perf(clients): scale add/delete and bulk client operations

Follow-up to the SyncInbound bulk rewrite, fixing the remaining O(M*N)
and O(M)-round-trip behaviour in the add/delete and bulk paths that made
them time out on large inbounds (worst case minutes), especially on
PostgreSQL.

- compactOrphans: chunk the "email IN (...)" lookup (400/batch) instead
  of binding every email at once. A single huge IN exceeded PostgreSQL's
  65535-parameter limit (and SQLite's) and made the planner pathological,
  so add/delete failed outright past ~100k clients.

- emailsUsedByOtherInbounds: new batched form used by delInboundClients
  (BulkDetach) and bulkDelInboundClients (BulkDelete), replacing a
  per-email global JSON scan (O(M*N)) with one scan, and skipped entirely
  when keepTraffic is set.

- BulkCreate: rewritten to validate/dedup in one pass, then group clients
  by inbound and add them in a single addInboundClient call per inbound
  (one getAllEmailSubIDs, one settings rewrite, one SyncInbound) instead
  of running the full single-create pipeline per client.

- Bulk delete/adjust: batch DelClientStat/DelClientIPs with IN deletes
  and wrap the settings Save + SyncInbound in one transaction, so the
  per-row writes share a single fsync instead of one per row.

Measured on PostgreSQL 16 (one inbound, M=2000 affected clients):
  - create: 8m35s (M=500) -> ~1-5s
  - detach: 52s -> ~4s (flat in N)
  - delete: ~16s -> ~1-4s
  - adjust: ~20s -> ~7-10s
add/delete of a single client on a 200k-client inbound stays in seconds.

sync_scale_postgres_test.go adds skip-gated benchmarks (XUI_DB_TYPE=
postgres) for the single add/delete and the five bulk operations.
This commit is contained in:
MHSanaei 2026-06-04 19:41:00 +02:00
parent 756746dbca
commit f185d3315c
No known key found for this signature in database
GPG key ID: 7E4060F2FBE5AB7A
6 changed files with 426 additions and 80 deletions

View file

@ -249,7 +249,7 @@ export default function ClientBulkAddModal({
)} )}
{form.emailMethod < 2 && ( {form.emailMethod < 2 && (
<Form.Item label={t('pages.clients.clientCount')}> <Form.Item label={t('pages.clients.clientCount')}>
<InputNumber value={form.quantity} min={1} max={100} onChange={(v) => update('quantity', Number(v) || 1)} /> <InputNumber value={form.quantity} min={1} max={1000} onChange={(v) => update('quantity', Number(v) || 1)} />
</Form.Item> </Form.Item>
)} )}

View file

@ -71,6 +71,7 @@ import type { ClientFilters } from './filters';
import './ClientsPage.css'; import './ClientsPage.css';
const FILTER_STATE_KEY = 'clientsFilterState'; const FILTER_STATE_KEY = 'clientsFilterState';
const DISABLED_PAGE_SIZE = 200;
function UngroupIcon() { function UngroupIcon() {
return ( return (
@ -276,10 +277,7 @@ export default function ClientsPage() {
const activeCount = activeFilterCount(filters); const activeCount = activeFilterCount(filters);
useEffect(() => { useEffect(() => {
if (pageSize > 0) { setTablePageSize(pageSize > 0 ? pageSize : DISABLED_PAGE_SIZE);
setTablePageSize(pageSize);
}
}, [pageSize]); }, [pageSize]);
const onlineSet = useMemo(() => new Set(onlines || []), [onlines]); const onlineSet = useMemo(() => new Set(onlines || []), [onlines]);

View file

@ -182,7 +182,7 @@ export const ClientBulkAddFormSchema = z.object({
lastNum: z.number().int().min(1), lastNum: z.number().int().min(1),
emailPrefix: z.string(), emailPrefix: z.string(),
emailPostfix: z.string(), emailPostfix: z.string(),
quantity: z.number().int().min(1).max(100), quantity: z.number().int().min(1).max(1000),
subId: z.string(), subId: z.string(),
group: z.string(), group: z.string(),
comment: z.string(), comment: z.string(),

View file

@ -124,19 +124,23 @@ func compactOrphans(db *gorm.DB, clients []any) []any {
if len(emails) == 0 { if len(emails) == 0 {
return clients return clients
} }
var existingEmails []string existing := make(map[string]struct{}, len(emails))
if err := db.Model(&model.ClientRecord{}).Where("email IN ?", emails).Pluck("email", &existingEmails).Error; err != nil { const orphanChunk = 400
for start := 0; start < len(emails); start += orphanChunk {
end := min(start+orphanChunk, len(emails))
var found []string
if err := db.Model(&model.ClientRecord{}).Where("email IN ?", emails[start:end]).Pluck("email", &found).Error; err != nil {
logger.Warning("compactOrphans pluck:", err) logger.Warning("compactOrphans pluck:", err)
return clients return clients
} }
if len(existingEmails) == len(emails) { for _, e := range found {
return clients
}
existing := make(map[string]struct{}, len(existingEmails))
for _, e := range existingEmails {
existing[e] = struct{}{} existing[e] = struct{}{}
} }
out := make([]any, 0, len(existingEmails)) }
if len(existing) == len(emails) {
return clients
}
out := make([]any, 0, len(existing))
for _, c := range clients { for _, c := range clients {
cm, ok := c.(map[string]any) cm, ok := c.(map[string]any)
if !ok { if !ok {
@ -1244,13 +1248,25 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId
} }
oldInbound.Settings = string(newSettings) oldInbound.Settings = string(newSettings)
var sharedSet map[string]bool
if !keepTraffic {
removedEmails := make([]string, 0, len(removed))
for _, r := range removed {
if r.email != "" {
removedEmails = append(removedEmails, r.email)
}
}
var sharedErr error
sharedSet, sharedErr = inboundSvc.emailsUsedByOtherInbounds(removedEmails, inboundId)
if sharedErr != nil {
return false, sharedErr
}
}
needRestart := false needRestart := false
for _, r := range removed { for _, r := range removed {
email := r.email email := r.email
emailShared, err := inboundSvc.emailUsedByOtherInbounds(email, inboundId) emailShared := sharedSet[strings.ToLower(strings.TrimSpace(email))]
if err != nil {
return needRestart, err
}
if !emailShared && !keepTraffic { if !emailShared && !keepTraffic {
if err := inboundSvc.DelClientIPs(db, email); err != nil { if err := inboundSvc.DelClientIPs(db, email); err != nil {
logger.Error("Error in delete client IPs") logger.Error("Error in delete client IPs")
@ -2644,20 +2660,22 @@ func (s *ClientService) bulkAdjustInboundClients(
} }
db := database.GetDB() db := database.GetDB()
if err := db.Save(oldInbound).Error; err != nil { txErr := db.Transaction(func(tx *gorm.DB) error {
if err := tx.Save(oldInbound).Error; err != nil {
return err
}
finalClients, gcErr := inboundSvc.GetClients(oldInbound)
if gcErr != nil {
return gcErr
}
return s.SyncInbound(tx, inboundId, finalClients)
})
if txErr != nil {
for email := range foundEmails { for email := range foundEmails {
if _, skip := res.perEmailSkipped[email]; !skip { if _, skip := res.perEmailSkipped[email]; !skip {
res.perEmailSkipped[email] = err.Error() res.perEmailSkipped[email] = txErr.Error()
} }
} }
return res
}
finalClients, gcErr := inboundSvc.GetClients(oldInbound)
if gcErr == nil {
if syncErr := s.SyncInbound(db, inboundId, finalClients); syncErr != nil {
logger.Warning("bulkAdjust SyncInbound:", syncErr)
}
} }
return res return res
@ -2920,27 +2938,39 @@ func (s *ClientService) bulkDelInboundClients(
} }
} }
for email := range foundEmails { var sharedSet map[string]bool
shared, sharedErr := inboundSvc.emailUsedByOtherInbounds(email, inboundId) if !keepTraffic {
var sharedErr error
sharedSet, sharedErr = inboundSvc.emailsUsedByOtherInbounds(foundList, inboundId)
if sharedErr != nil { if sharedErr != nil {
for email := range foundEmails {
res.perEmailSkipped[email] = sharedErr.Error() res.perEmailSkipped[email] = sharedErr.Error()
delete(foundEmails, email) delete(foundEmails, email)
continue
} }
if shared || keepTraffic { return res
continue
} }
if delErr := inboundSvc.DelClientIPs(db, email); delErr != nil { }
if !keepTraffic {
purge := make([]string, 0, len(foundEmails))
for email := range foundEmails {
if !sharedSet[strings.ToLower(strings.TrimSpace(email))] {
purge = append(purge, email)
}
}
if len(purge) > 0 {
if delErr := inboundSvc.delClientIPsByEmails(db, purge); delErr != nil {
logger.Error("Error in delete client IPs") logger.Error("Error in delete client IPs")
for _, email := range purge {
res.perEmailSkipped[email] = delErr.Error() res.perEmailSkipped[email] = delErr.Error()
delete(foundEmails, email) delete(foundEmails, email)
continue
} }
if delErr := inboundSvc.DelClientStat(db, email); delErr != nil { } else if delErr := inboundSvc.delClientStatsByEmails(db, purge); delErr != nil {
logger.Error("Delete stats Data Error") logger.Error("Delete stats Data Error")
for _, email := range purge {
res.perEmailSkipped[email] = delErr.Error() res.perEmailSkipped[email] = delErr.Error()
delete(foundEmails, email) delete(foundEmails, email)
continue }
}
} }
} }
@ -2981,21 +3011,22 @@ func (s *ClientService) bulkDelInboundClients(
} }
} }
if err := db.Save(oldInbound).Error; err != nil { txErr := db.Transaction(func(tx *gorm.DB) error {
for email := range foundEmails { if err := tx.Save(oldInbound).Error; err != nil {
if _, skip := res.perEmailSkipped[email]; !skip { return err
res.perEmailSkipped[email] = err.Error()
} }
}
return res
}
finalClients, err := inboundSvc.GetClients(oldInbound) finalClients, err := inboundSvc.GetClients(oldInbound)
if err != nil { if err != nil {
return res return err
}
return s.SyncInbound(tx, inboundId, finalClients)
})
if txErr != nil {
for email := range foundEmails {
if _, skip := res.perEmailSkipped[email]; !skip {
res.perEmailSkipped[email] = txErr.Error()
}
} }
if err := s.SyncInbound(db, inboundId, finalClients); err != nil {
return res
} }
return res return res
@ -3012,28 +3043,201 @@ type BulkCreateReport struct {
Reason string `json:"reason"` Reason string `json:"reason"`
} }
// BulkCreate iterates payloads sequentially. Each item is the same shape
// the single-create endpoint accepts, so callers can submit a heterogeneous
// list (different inboundIds, plans, etc.) in one round-trip.
func (s *ClientService) BulkCreate(inboundSvc *InboundService, payloads []ClientCreatePayload) (BulkCreateResult, bool, error) { func (s *ClientService) BulkCreate(inboundSvc *InboundService, payloads []ClientCreatePayload) (BulkCreateResult, bool, error) {
result := BulkCreateResult{} result := BulkCreateResult{}
needRestart := false if len(payloads) == 0 {
for i := range payloads { return result, false, nil
p := payloads[i] }
email := strings.TrimSpace(p.Client.Email)
nr, err := s.Create(inboundSvc, &p) skip := func(email, reason string) {
if err != nil { if strings.TrimSpace(email) == "" {
if email == "" {
email = "(missing email)" email = "(missing email)"
} }
result.Skipped = append(result.Skipped, BulkCreateReport{Email: email, Reason: err.Error()}) result.Skipped = append(result.Skipped, BulkCreateReport{Email: email, Reason: reason})
}
emailSubIDs, err := inboundSvc.getAllEmailSubIDs()
if err != nil {
emailSubIDs = nil
}
type prepared struct {
client model.Client
inboundIds []int
}
prep := make([]prepared, 0, len(payloads))
emails := make([]string, 0, len(payloads))
subIDs := make([]string, 0, len(payloads))
seenEmail := make(map[string]struct{}, len(payloads))
seenSubID := make(map[string]string, len(payloads))
for i := range payloads {
client := payloads[i].Client
email := strings.TrimSpace(client.Email)
if email == "" {
skip("", "client email is required")
continue continue
} }
if nr { if verr := validateClientEmail(email); verr != nil {
skip(email, verr.Error())
continue
}
if verr := validateClientSubID(client.SubID); verr != nil {
skip(email, verr.Error())
continue
}
if len(payloads[i].InboundIds) == 0 {
skip(email, "at least one inbound is required")
continue
}
client.Email = email
if client.SubID == "" {
client.SubID = uuid.NewString()
}
if !client.Enable {
client.Enable = true
}
now := time.Now().UnixMilli()
if client.CreatedAt == 0 {
client.CreatedAt = now
}
client.UpdatedAt = now
le := strings.ToLower(email)
if _, dup := seenEmail[le]; dup {
skip(email, "email already in use: "+email)
continue
}
if owner, ok := seenSubID[client.SubID]; ok && owner != le {
skip(email, "subId already in use: "+client.SubID)
continue
}
seenEmail[le] = struct{}{}
seenSubID[client.SubID] = le
prep = append(prep, prepared{client: client, inboundIds: payloads[i].InboundIds})
emails = append(emails, email)
subIDs = append(subIDs, client.SubID)
}
if len(prep) == 0 {
return result, false, nil
}
db := database.GetDB()
const lookupChunk = 400
existingEmailSub := make(map[string]string, len(emails))
for start := 0; start < len(emails); start += lookupChunk {
end := min(start+lookupChunk, len(emails))
var rows []model.ClientRecord
if e := db.Where("email IN ?", emails[start:end]).Find(&rows).Error; e != nil {
return result, false, e
}
for i := range rows {
existingEmailSub[strings.ToLower(rows[i].Email)] = rows[i].SubID
}
}
existingSubOwner := make(map[string]string, len(subIDs))
for start := 0; start < len(subIDs); start += lookupChunk {
end := min(start+lookupChunk, len(subIDs))
var rows []model.ClientRecord
if e := db.Where("sub_id IN ?", subIDs[start:end]).Find(&rows).Error; e != nil {
return result, false, e
}
for i := range rows {
existingSubOwner[rows[i].SubID] = strings.ToLower(rows[i].Email)
}
}
inboundCache := make(map[int]*model.Inbound)
getIb := func(id int) (*model.Inbound, error) {
if ib, ok := inboundCache[id]; ok {
return ib, nil
}
ib, e := inboundSvc.GetInbound(id)
if e != nil {
return nil, e
}
inboundCache[id] = ib
return ib, nil
}
byInbound := make(map[int][]model.Client)
idxByInbound := make(map[int][]int)
inboundOrder := make([]int, 0)
failed := make([]bool, len(prep))
reason := make([]string, len(prep))
for idx := range prep {
le := strings.ToLower(prep[idx].client.Email)
if existSub, ok := existingEmailSub[le]; ok && existSub != prep[idx].client.SubID {
failed[idx] = true
reason[idx] = "email already in use: " + prep[idx].client.Email
continue
}
if owner, ok := existingSubOwner[prep[idx].client.SubID]; ok && owner != le {
failed[idx] = true
reason[idx] = "subId already in use: " + prep[idx].client.SubID
continue
}
ok := true
for _, ibId := range prep[idx].inboundIds {
ib, e := getIb(ibId)
if e != nil {
failed[idx] = true
reason[idx] = e.Error()
ok = false
break
}
if e := s.fillProtocolDefaults(&prep[idx].client, ib); e != nil {
failed[idx] = true
reason[idx] = e.Error()
ok = false
break
}
}
if !ok {
continue
}
for _, ibId := range prep[idx].inboundIds {
ib, _ := getIb(ibId)
if _, seen := byInbound[ibId]; !seen {
inboundOrder = append(inboundOrder, ibId)
}
byInbound[ibId] = append(byInbound[ibId], clientWithInboundFlow(prep[idx].client, ib))
idxByInbound[ibId] = append(idxByInbound[ibId], idx)
}
}
needRestart := false
for _, ibId := range inboundOrder {
payload, e := json.Marshal(map[string][]model.Client{"clients": byInbound[ibId]})
if e == nil {
var nr bool
nr, e = s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs)
if e == nil && nr {
needRestart = true needRestart = true
} }
}
if e != nil {
for _, idx := range idxByInbound[ibId] {
failed[idx] = true
if reason[idx] == "" {
reason[idx] = e.Error()
}
}
}
}
for idx := range prep {
if failed[idx] {
skip(prep[idx].client.Email, reason[idx])
} else {
result.Created++ result.Created++
} }
}
return result, needRestart, nil return result, needRestart, nil
} }

View file

@ -438,6 +438,37 @@ func (s *InboundService) emailUsedByOtherInbounds(email string, exceptInboundId
return count > 0, nil return count > 0, nil
} }
func (s *InboundService) emailsUsedByOtherInbounds(emails []string, exceptInboundId int) (map[string]bool, error) {
shared := make(map[string]bool, len(emails))
want := make(map[string]struct{}, len(emails))
for _, e := range emails {
e = strings.ToLower(strings.TrimSpace(e))
if e != "" {
want[e] = struct{}{}
}
}
if len(want) == 0 {
return shared, nil
}
db := database.GetDB()
var rows []string
query := fmt.Sprintf(
"SELECT DISTINCT LOWER(%s) %s WHERE inbounds.id != ?",
database.JSONFieldText("client.value", "email"),
database.JSONClientsFromInbound(),
)
if err := db.Raw(query, exceptInboundId).Scan(&rows).Error; err != nil {
return nil, err
}
for _, e := range rows {
e = strings.ToLower(strings.TrimSpace(e))
if _, ok := want[e]; ok {
shared[e] = true
}
}
return shared, nil
}
// normalizeStreamSettings clears StreamSettings for protocols that don't use it. // normalizeStreamSettings clears StreamSettings for protocols that don't use it.
// Only vmess, vless, trojan, shadowsocks, and hysteria protocols use streamSettings. // Only vmess, vless, trojan, shadowsocks, and hysteria protocols use streamSettings.
func (s *InboundService) normalizeStreamSettings(inbound *model.Inbound) { func (s *InboundService) normalizeStreamSettings(inbound *model.Inbound) {
@ -2438,6 +2469,32 @@ func (s *InboundService) DelClientIPs(tx *gorm.DB, email string) error {
return tx.Where("client_email = ?", email).Delete(model.InboundClientIps{}).Error return tx.Where("client_email = ?", email).Delete(model.InboundClientIps{}).Error
} }
func (s *InboundService) delClientStatsByEmails(tx *gorm.DB, emails []string) error {
const chunk = 400
for start := 0; start < len(emails); start += chunk {
end := min(start+chunk, len(emails))
batch := emails[start:end]
if err := tx.Where("email IN ?", batch).Delete(xray.ClientTraffic{}).Error; err != nil {
return err
}
if err := tx.Where("email IN ?", batch).Delete(&model.NodeClientTraffic{}).Error; err != nil {
return err
}
}
return nil
}
func (s *InboundService) delClientIPsByEmails(tx *gorm.DB, emails []string) error {
const chunk = 400
for start := 0; start < len(emails); start += chunk {
end := min(start+chunk, len(emails))
if err := tx.Where("client_email IN ?", emails[start:end]).Delete(model.InboundClientIps{}).Error; err != nil {
return err
}
}
return nil
}
func (s *InboundService) GetClientInboundByTrafficID(trafficId int) (traffic *xray.ClientTraffic, inbound *model.Inbound, err error) { func (s *InboundService) GetClientInboundByTrafficID(trafficId int) (traffic *xray.ClientTraffic, inbound *model.Inbound, err error) {
db := database.GetDB() db := database.GetDB()
var traffics []*xray.ClientTraffic var traffics []*xray.ClientTraffic

View file

@ -222,13 +222,100 @@ func TestAddDelClientPostgresScale(t *testing.T) {
} }
delDur := time.Since(start) delDur := time.Since(start)
var recCount int64 var recCount, linkCount int64
db.Model(&model.ClientRecord{}).Count(&recCount) db.Model(&model.ClientRecord{}).Count(&recCount)
if int(recCount) != n { db.Model(&model.ClientInbound{}).Where("inbound_id = ?", ib.Id).Count(&linkCount)
t.Fatalf("record count after add+del = %d, want %d", recCount, n)
}
t.Logf("N=%-7d add=%-10v del=%-10v", n, addDur.Round(time.Millisecond), delDur.Round(time.Millisecond)) t.Logf("N=%-7d add=%-10v del=%-10v records=%d links=%d", n,
addDur.Round(time.Millisecond), delDur.Round(time.Millisecond), recCount, linkCount)
})
}
}
func TestBulkOpsPostgresScale(t *testing.T) {
if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" {
t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark")
}
if err := database.InitDB(""); err != nil {
t.Fatalf("InitDB: %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
svc := &ClientService{}
inboundSvc := &InboundService{}
sizes := []int{5000, 20000, 50000, 100000}
const m = 2000
for _, n := range sizes {
t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
db := database.GetDB()
if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil {
t.Fatalf("truncate: %v", err)
}
clients := makeScaleClients(n)
exp := time.Now().AddDate(1, 0, 0).UnixMilli()
for i := range clients {
clients[i].ExpiryTime = exp
clients[i].TotalGB = 100 << 30
}
ib := &model.Inbound{Tag: fmt.Sprintf("bulk-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)}
if err := db.Create(ib).Error; err != nil {
t.Fatalf("create inbound: %v", err)
}
if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
t.Fatalf("seed SyncInbound: %v", err)
}
ib2 := &model.Inbound{Tag: fmt.Sprintf("bulk2-%d", n), Enable: true, Port: 40001, Protocol: model.VLESS, Settings: `{"clients":[]}`}
if err := db.Create(ib2).Error; err != nil {
t.Fatalf("create inbound2: %v", err)
}
emailsM := make([]string, m)
for i := 0; i < m; i++ {
emailsM[i] = clients[i].Email
}
t0 := time.Now()
if _, _, err := svc.BulkAdjust(inboundSvc, emailsM, 7, 1<<30); err != nil {
t.Fatalf("BulkAdjust: %v", err)
}
adjustDur := time.Since(t0)
t0 = time.Now()
if _, _, err := svc.BulkAttach(inboundSvc, emailsM, []int{ib2.Id}); err != nil {
t.Fatalf("BulkAttach: %v", err)
}
attachDur := time.Since(t0)
t0 = time.Now()
if _, _, err := svc.BulkDetach(inboundSvc, emailsM, []int{ib2.Id}); err != nil {
t.Fatalf("BulkDetach: %v", err)
}
detachDur := time.Since(t0)
payloads := make([]ClientCreatePayload, m)
for i := 0; i < m; i++ {
payloads[i] = ClientCreatePayload{
Client: model.Client{ID: uuid.NewString(), Email: fmt.Sprintf("bulknew-%07d@scale", i), SubID: fmt.Sprintf("bnsub-%07d", i), Enable: true},
InboundIds: []int{ib.Id},
}
}
t0 = time.Now()
if _, _, err := svc.BulkCreate(inboundSvc, payloads); err != nil {
t.Fatalf("BulkCreate: %v", err)
}
createDur := time.Since(t0)
t0 = time.Now()
if _, _, err := svc.BulkDelete(inboundSvc, emailsM, false); err != nil {
t.Fatalf("BulkDelete: %v", err)
}
deleteDur := time.Since(t0)
t.Logf("N=%-6d M=%d adjust=%-9v attach=%-9v detach=%-9v create=%-9v delete=%-9v", n, m,
adjustDur.Round(time.Millisecond), attachDur.Round(time.Millisecond), detachDur.Round(time.Millisecond),
createDur.Round(time.Millisecond), deleteDur.Round(time.Millisecond))
}) })
} }
} }