From d1e733b9e96725b3b8f07c867469b3853afe30cc Mon Sep 17 00:00:00 2001 From: MHSanaei Date: Thu, 4 Jun 2026 20:35:30 +0200 Subject: [PATCH] perf(clients): chunk IN queries and de-quadratic bulk delete/group/list Bulk client operations bound their entire working set in a single WHERE x IN (...) clause, which exceeds PostgreSQL's 65535-parameter limit (and SQLite's 32766) and gives the planner a pathological query, so they failed outright on inbounds/selections larger than the limit. Every such query is now chunked at 400 items: - BulkDelete / delete-all-clients: six IN queries chunked, and the per-row delete tombstone (which swept the whole in-memory map on every call, O(N^2)) replaced with a single bulk sweep. - BulkAdjust: record and inbound-mapping lookups chunked. - AddToGroup / RemoveFromGroup (bulk add/remove to group): three IN queries chunked. - replaceGroupValue (rename/delete group): inbound-mapping lookup chunked. - List (all-clients listing): link and traffic lookups chunked. Measured on PostgreSQL 16: delete-all-clients on a 100k-client inbound now completes in ~7s (previously crashed at the parameter limit); bulk add/remove to group ~6s and full client list ~1s at 100k. sync_scale_postgres_test.go adds skip-gated benchmarks for delete-all, group add/remove, and list. --- web/service/client.go | 188 +++++++++++++++++------- web/service/sync_scale_postgres_test.go | 110 ++++++++++++++ 2 files changed, 244 insertions(+), 54 deletions(-) diff --git a/web/service/client.go b/web/service/client.go index 7e110300..b269a570 100644 --- a/web/service/client.go +++ b/web/service/client.go @@ -174,6 +174,26 @@ func tombstoneClientEmail(email string) { } } +func tombstoneClientEmails(emails []string) { + if len(emails) == 0 { + return + } + now := time.Now() + cutoff := now.Add(-deleteTombstoneTTL) + recentlyDeletedMu.Lock() + defer recentlyDeletedMu.Unlock() + for _, email := range emails { + if email != "" { + recentlyDeleted[email] = now + } + } + for e, ts := range recentlyDeleted { + if ts.Before(cutoff) { + delete(recentlyDeleted, e) + } + } +} + func isClientEmailTombstoned(email string) bool { if email == "" { return false @@ -462,20 +482,26 @@ func (s *ClientService) List() ([]ClientWithAttachments, error) { } } - var links []model.ClientInbound - if err := db.Where("client_id IN ?", clientIds).Find(&links).Error; err != nil { - return nil, err - } attachments := make(map[int][]int, len(rows)) - for _, l := range links { - attachments[l.ClientId] = append(attachments[l.ClientId], l.InboundId) + for _, batch := range chunkInts(clientIds, sqlInChunk) { + var links []model.ClientInbound + if err := db.Where("client_id IN ?", batch).Find(&links).Error; err != nil { + return nil, err + } + for _, l := range links { + attachments[l.ClientId] = append(attachments[l.ClientId], l.InboundId) + } } trafficByEmail := make(map[string]*xray.ClientTraffic, len(emails)) if len(emails) > 0 { var stats []xray.ClientTraffic - if err := db.Where("email IN ?", emails).Find(&stats).Error; err != nil { - return nil, err + for _, batch := range chunkStrings(emails, sqlInChunk) { + var batchStats []xray.ClientTraffic + if err := db.Where("email IN ?", batch).Find(&batchStats).Error; err != nil { + return nil, err + } + stats = append(stats, batchStats...) } for i := range stats { trafficByEmail[stats[i].Email] = &stats[i] @@ -1800,8 +1826,12 @@ func (s *ClientService) AddToGroup(emails []string, group string) (int, error) { } var records []model.ClientRecord - if err := db.Where("email IN ?", emails).Find(&records).Error; err != nil { - return 0, err + for _, batch := range chunkStrings(emails, sqlInChunk) { + var rows []model.ClientRecord + if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil { + return 0, err + } + records = append(records, rows...) } if len(records) == 0 { return 0, nil @@ -1812,21 +1842,33 @@ func (s *ClientService) AddToGroup(emails []string, group string) (int, error) { } tx := db.Begin() - if err := tx.Model(&model.ClientRecord{}). - Where("email IN ?", affectedEmails). - UpdateColumn("group_name", group).Error; err != nil { - tx.Rollback() - return 0, err + for _, batch := range chunkStrings(affectedEmails, sqlInChunk) { + if err := tx.Model(&model.ClientRecord{}). + Where("email IN ?", batch). + UpdateColumn("group_name", group).Error; err != nil { + tx.Rollback() + return 0, err + } } var inboundIDs []int - if err := tx.Table("client_inbounds"). - Joins("JOIN clients ON clients.id = client_inbounds.client_id"). - Where("clients.email IN ?", affectedEmails). - Distinct("client_inbounds.inbound_id"). - Pluck("inbound_id", &inboundIDs).Error; err != nil { - tx.Rollback() - return 0, err + inboundIDSeen := make(map[int]struct{}) + for _, batch := range chunkStrings(affectedEmails, sqlInChunk) { + var ids []int + if err := tx.Table("client_inbounds"). + Joins("JOIN clients ON clients.id = client_inbounds.client_id"). + Where("clients.email IN ?", batch). + Distinct("client_inbounds.inbound_id"). + Pluck("inbound_id", &ids).Error; err != nil { + tx.Rollback() + return 0, err + } + for _, id := range ids { + if _, ok := inboundIDSeen[id]; !ok { + inboundIDSeen[id] = struct{}{} + inboundIDs = append(inboundIDs, id) + } + } } emailSet := make(map[string]struct{}, len(affectedEmails)) @@ -1918,13 +1960,23 @@ func (s *ClientService) replaceGroupValue(oldName, newName string) (int, error) } var inboundIDs []int - if err := tx.Table("client_inbounds"). - Joins("JOIN clients ON clients.id = client_inbounds.client_id"). - Where("clients.email IN ?", affectedEmails). - Distinct("client_inbounds.inbound_id"). - Pluck("inbound_id", &inboundIDs).Error; err != nil { - tx.Rollback() - return 0, err + inboundIDSeen := make(map[int]struct{}) + for _, batch := range chunkStrings(affectedEmails, sqlInChunk) { + var ids []int + if err := tx.Table("client_inbounds"). + Joins("JOIN clients ON clients.id = client_inbounds.client_id"). + Where("clients.email IN ?", batch). + Distinct("client_inbounds.inbound_id"). + Pluck("inbound_id", &ids).Error; err != nil { + tx.Rollback() + return 0, err + } + for _, id := range ids { + if _, ok := inboundIDSeen[id]; !ok { + inboundIDSeen[id] = struct{}{} + inboundIDs = append(inboundIDs, id) + } + } } for _, ibID := range inboundIDs { @@ -2394,8 +2446,12 @@ func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string, db := database.GetDB() var records []model.ClientRecord - if err := db.Where("email IN ?", cleanEmails).Find(&records).Error; err != nil { - return result, false, err + for _, batch := range chunkStrings(cleanEmails, sqlInChunk) { + var rows []model.ClientRecord + if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil { + return result, false, err + } + records = append(records, rows...) } recordsByEmail := make(map[string]*model.ClientRecord, len(records)) for i := range records { @@ -2471,8 +2527,12 @@ func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string, } var mappings []model.ClientInbound - if err := db.Where("client_id IN ?", plannedIds).Find(&mappings).Error; err != nil { - return result, false, err + for _, batch := range chunkInts(plannedIds, sqlInChunk) { + var rows []model.ClientInbound + if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil { + return result, false, err + } + mappings = append(mappings, rows...) } emailsByInbound := map[int][]string{} for _, m := range mappings { @@ -2693,6 +2753,8 @@ type BulkDeleteReport struct { Reason string `json:"reason"` } +const sqlInChunk = 400 + // BulkDelete removes every client in the list in one optimized pass. // Instead of running the full single-delete pipeline N times (which would // re-read, re-parse, and re-write each inbound's settings JSON for every @@ -2723,14 +2785,20 @@ func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string, db := database.GetDB() var records []model.ClientRecord - if err := db.Where("email IN ?", cleanEmails).Find(&records).Error; err != nil { - return result, false, err + for _, batch := range chunkStrings(cleanEmails, sqlInChunk) { + var rows []model.ClientRecord + if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil { + return result, false, err + } + records = append(records, rows...) } recordsByEmail := make(map[string]*model.ClientRecord, len(records)) + tombstoneEmails := make([]string, 0, len(records)) for i := range records { recordsByEmail[records[i].Email] = &records[i] - tombstoneClientEmail(records[i].Email) + tombstoneEmails = append(tombstoneEmails, records[i].Email) } + tombstoneClientEmails(tombstoneEmails) skippedReasons := map[string]string{} for _, email := range cleanEmails { @@ -2749,8 +2817,12 @@ func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string, emailsByInbound := map[int][]string{} if len(clientIds) > 0 { var mappings []model.ClientInbound - if err := db.Where("client_id IN ?", clientIds).Find(&mappings).Error; err != nil { - return result, false, err + for _, batch := range chunkInts(clientIds, sqlInChunk) { + var rows []model.ClientInbound + if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil { + return result, false, err + } + mappings = append(mappings, rows...) } for _, m := range mappings { email, ok := recordIdToEmail[m.ClientId] @@ -2785,19 +2857,25 @@ func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string, } if len(successIds) > 0 { - if err := db.Where("client_id IN ?", successIds).Delete(&model.ClientInbound{}).Error; err != nil { - return result, needRestart, err + for _, batch := range chunkInts(successIds, sqlInChunk) { + if err := db.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; err != nil { + return result, needRestart, err + } } if !keepTraffic && len(successEmails) > 0 { - if err := db.Where("email IN ?", successEmails).Delete(&xray.ClientTraffic{}).Error; err != nil { - return result, needRestart, err - } - if err := db.Where("client_email IN ?", successEmails).Delete(&model.InboundClientIps{}).Error; err != nil { - return result, needRestart, err + for _, batch := range chunkStrings(successEmails, sqlInChunk) { + if err := db.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; err != nil { + return result, needRestart, err + } + if err := db.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; err != nil { + return result, needRestart, err + } } } - if err := db.Where("id IN ?", successIds).Delete(&model.ClientRecord{}).Error; err != nil { - return result, needRestart, err + for _, batch := range chunkInts(successIds, sqlInChunk) { + if err := db.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; err != nil { + return result, needRestart, err + } } } @@ -2927,13 +3005,15 @@ func (s *ClientService) bulkDelInboundClients( Email string Enable bool } - var rows []trafficRow - if err := db.Model(xray.ClientTraffic{}). - Where("email IN ?", foundList). - Select("email, enable"). - Scan(&rows).Error; err == nil { - for _, r := range rows { - notDepletedByEmail[r.Email] = r.Enable + for _, batch := range chunkStrings(foundList, sqlInChunk) { + var rows []trafficRow + if err := db.Model(xray.ClientTraffic{}). + Where("email IN ?", batch). + Select("email, enable"). + Scan(&rows).Error; err == nil { + for _, r := range rows { + notDepletedByEmail[r.Email] = r.Enable + } } } } diff --git a/web/service/sync_scale_postgres_test.go b/web/service/sync_scale_postgres_test.go index 1897172b..35a990ab 100644 --- a/web/service/sync_scale_postgres_test.go +++ b/web/service/sync_scale_postgres_test.go @@ -232,6 +232,116 @@ func TestAddDelClientPostgresScale(t *testing.T) { } } +func TestGroupAndListPostgresScale(t *testing.T) { + if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" { + t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark") + } + if err := database.InitDB(""); err != nil { + t.Fatalf("InitDB: %v", err) + } + t.Cleanup(func() { _ = database.CloseDB() }) + + svc := &ClientService{} + sizes := []int{5000, 100000} + + for _, n := range sizes { + t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) { + db := database.GetDB() + if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil { + t.Fatalf("truncate: %v", err) + } + clients := makeScaleClients(n) + ib := &model.Inbound{Tag: fmt.Sprintf("grp-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)} + if err := db.Create(ib).Error; err != nil { + t.Fatalf("create inbound: %v", err) + } + if err := svc.SyncInbound(nil, ib.Id, clients); err != nil { + t.Fatalf("seed SyncInbound: %v", err) + } + db.Exec("ANALYZE") + emails := make([]string, n) + for i := 0; i < n; i++ { + emails[i] = clients[i].Email + } + + start := time.Now() + if _, err := svc.AddToGroup(emails, "benchgroup"); err != nil { + t.Fatalf("AddToGroup: %v", err) + } + addDur := time.Since(start) + + start = time.Now() + if _, err := svc.RemoveFromGroup(emails); err != nil { + t.Fatalf("RemoveFromGroup: %v", err) + } + rmDur := time.Since(start) + + start = time.Now() + list, err := svc.List() + if err != nil { + t.Fatalf("List: %v", err) + } + listDur := time.Since(start) + if len(list) != n { + t.Fatalf("List returned %d, want %d", len(list), n) + } + + t.Logf("N=%-7d bulkAdd=%-9v bulkRemove=%-9v list=%-9v", n, + addDur.Round(time.Millisecond), rmDur.Round(time.Millisecond), listDur.Round(time.Millisecond)) + }) + } +} + +func TestDelAllClientsPostgresScale(t *testing.T) { + if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" { + t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark") + } + if err := database.InitDB(""); err != nil { + t.Fatalf("InitDB: %v", err) + } + t.Cleanup(func() { _ = database.CloseDB() }) + + svc := &ClientService{} + inboundSvc := &InboundService{} + sizes := []int{5000, 50000, 100000} + + for _, n := range sizes { + t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) { + db := database.GetDB() + if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil { + t.Fatalf("truncate: %v", err) + } + clients := makeScaleClients(n) + ib := &model.Inbound{Tag: fmt.Sprintf("delall-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)} + if err := db.Create(ib).Error; err != nil { + t.Fatalf("create inbound: %v", err) + } + if err := svc.SyncInbound(nil, ib.Id, clients); err != nil { + t.Fatalf("seed SyncInbound: %v", err) + } + + emails, err := inboundSvc.EmailsByInbound(ib.Id) + if err != nil { + t.Fatalf("EmailsByInbound: %v", err) + } + start := time.Now() + res, _, err := svc.BulkDelete(inboundSvc, emails, false) + if err != nil { + t.Fatalf("BulkDelete: %v", err) + } + dur := time.Since(start) + + var recCount, linkCount int64 + db.Model(&model.ClientRecord{}).Count(&recCount) + db.Model(&model.ClientInbound{}).Where("inbound_id = ?", ib.Id).Count(&linkCount) + if recCount != 0 || linkCount != 0 { + t.Fatalf("after delAll: records=%d links=%d want 0/0", recCount, linkCount) + } + t.Logf("N=%-7d delAllClients=%-10v deleted=%d", n, dur.Round(time.Millisecond), res.Deleted) + }) + } +} + func TestBulkOpsPostgresScale(t *testing.T) { if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" { t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark")