From d3db828b46ac2579e4f6f7eeef0c2170499b2b5a Mon Sep 17 00:00:00 2001 From: MHSanaei Date: Thu, 4 Jun 2026 21:32:15 +0200 Subject: [PATCH] perf(clients): scale-audit remaining client/inbound endpoints to 200k Drive every client/inbound/group endpoint at 100k-200k clients on PostgreSQL and fix the latent issues found in previously-unbenchmarked paths: - enrichClientStats: chunk the email IN lookup (was an unchunked bind that crashed past 65535 clients without traffic rows, taking down GetInbounds/GetInboundDetail/GetAllInbounds) - GetOnlineClients: add the missing nil-process guard its siblings already have, so ListPaged no longer panics before xray starts - GetClientTrafficByEmail: read UUID/subId from the indexed clients table instead of parsing the inbound's full settings JSON (439ms to ~1.5ms, flat in N) - BulkResetTraffic: replace the per-email serialized loop with one chunked bulk UPDATE in a single transaction - DelDepleted: delegate to the already-batched BulkDelete instead of deleting each depleted client one by one Adds a postgres-gated full endpoint sweep plus an A/B benchmark, and SQLite correctness tests for the changed methods. --- frontend/public/openapi.json | 30 ++++ web/service/api_scale_postgres_test.go | 216 +++++++++++++++++++++++++ web/service/bulk_traffic_test.go | 149 +++++++++++++++++ web/service/client.go | 79 +++++---- web/service/inbound.go | 45 +++++- 5 files changed, 483 insertions(+), 36 deletions(-) create mode 100644 web/service/api_scale_postgres_test.go create mode 100644 web/service/bulk_traffic_test.go diff --git a/frontend/public/openapi.json b/frontend/public/openapi.json index dab418d5..e926b679 100644 --- a/frontend/public/openapi.json +++ b/frontend/public/openapi.json @@ -1495,6 +1495,36 @@ } } }, + "/panel/api/server/getMigration": { + "get": { + "tags": [ + "Server" + ], + "summary": "Stream a cross-engine migration file as an attachment: a .dump (SQL text) on SQLite, or a .db SQLite database built from the live data on PostgreSQL.", + "operationId": "get_panel_api_server_getMigration", + "responses": { + "200": { + "description": "Successful response", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "success": { + "type": "boolean" + }, + "msg": { + "type": "string" + }, + "obj": {} + } + } + } + } + } + } + } + }, "/panel/api/server/getNewUUID": { "get": { "tags": [ diff --git a/web/service/api_scale_postgres_test.go b/web/service/api_scale_postgres_test.go new file mode 100644 index 00000000..ce0c0442 --- /dev/null +++ b/web/service/api_scale_postgres_test.go @@ -0,0 +1,216 @@ +package service + +import ( + "fmt" + "os" + "strings" + "testing" + "time" + + "github.com/mhsanaei/3x-ui/v3/database" + "github.com/mhsanaei/3x-ui/v3/database/model" + xuilogger "github.com/mhsanaei/3x-ui/v3/logger" + "github.com/mhsanaei/3x-ui/v3/xray" + + "github.com/op/go-logging" +) + +func seedClientTraffics(t *testing.T, inboundId int, clients []model.Client) { + t.Helper() + db := database.GetDB() + rows := make([]xray.ClientTraffic, len(clients)) + for i := range clients { + rows[i] = xray.ClientTraffic{ + InboundId: inboundId, + Email: clients[i].Email, + Enable: true, + Total: clients[i].TotalGB, + ExpiryTime: clients[i].ExpiryTime, + } + } + if err := db.CreateInBatches(rows, 1000).Error; err != nil { + t.Fatalf("seed client_traffics: %v", err) + } +} + +// TestAllAPIsPostgresScale exercises every client/inbound/group service method +// reachable from the REST API at 100k/200k clients, asserting none crash on the +// PostgreSQL bind-parameter ceiling and logging the wall-clock cost of each. +func TestAllAPIsPostgresScale(t *testing.T) { + if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" { + t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark") + } + xuilogger.InitLogger(logging.ERROR) + if err := database.InitDB(""); err != nil { + t.Fatalf("InitDB: %v", err) + } + t.Cleanup(func() { _ = database.CloseDB() }) + + svc := &ClientService{} + inboundSvc := &InboundService{} + settingSvc := &SettingService{} + const userId = 1 + const m = 2000 + sizes := []int{50000, 100000, 200000} + + for _, n := range sizes { + t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) { + db := database.GetDB() + if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics, client_groups RESTART IDENTITY CASCADE").Error; err != nil { + t.Fatalf("truncate: %v", err) + } + + clients := makeScaleClients(n) + exp := time.Now().AddDate(1, 0, 0).UnixMilli() + for i := range clients { + clients[i].ExpiryTime = exp + clients[i].TotalGB = 100 << 30 + } + ib := &model.Inbound{UserId: userId, Tag: fmt.Sprintf("all-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)} + if err := db.Create(ib).Error; err != nil { + t.Fatalf("create inbound: %v", err) + } + ib2 := &model.Inbound{UserId: userId, Tag: fmt.Sprintf("all2-%d", n), Enable: true, Port: 40001, Protocol: model.VLESS, Settings: `{"clients":[]}`} + if err := db.Create(ib2).Error; err != nil { + t.Fatalf("create inbound2: %v", err) + } + if err := svc.SyncInbound(nil, ib.Id, clients); err != nil { + t.Fatalf("seed SyncInbound: %v", err) + } + + run := func(name string, fn func() error) { + start := time.Now() + if err := fn(); err != nil { + t.Fatalf("%s: %v", name, err) + } + t.Logf("N=%-7d %-26s %v", n, name, time.Since(start).Round(time.Millisecond)) + } + + run("GetInboundDetail(noTraffic)", func() error { _, err := inboundSvc.GetInboundDetail(ib.Id); return err }) + + seedClientTraffics(t, ib.Id, clients) + db.Exec("ANALYZE") + + emails := make([]string, n) + for i := 0; i < n; i++ { + emails[i] = clients[i].Email + } + emailsM := emails[:m] + + run("GetInbounds", func() error { _, err := inboundSvc.GetInbounds(userId); return err }) + run("GetInboundsSlim", func() error { _, err := inboundSvc.GetInboundsSlim(userId); return err }) + run("GetInboundDetail", func() error { _, err := inboundSvc.GetInboundDetail(ib.Id); return err }) + run("GetInboundOptions", func() error { _, err := inboundSvc.GetInboundOptions(userId); return err }) + run("ListPaged", func() error { _, err := svc.ListPaged(inboundSvc, settingSvc, ClientPageParams{Page: 1, PageSize: 25}); return err }) + run("ListPaged+search", func() error { + _, err := svc.ListPaged(inboundSvc, settingSvc, ClientPageParams{Page: 1, PageSize: 25, Search: "user-0012345"}) + return err + }) + run("GetClientsLastOnline", func() error { _, err := inboundSvc.GetClientsLastOnline(); return err }) + run("GetClientTrafficByEmail", func() error { _, err := inboundSvc.GetClientTrafficByEmail(emails[n/2]); return err }) + run("GetRecordByEmail", func() error { _, err := svc.GetRecordByEmail(nil, emails[n/2]); return err }) + + run("ListGroups", func() error { _, err := svc.ListGroups(); return err }) + run("AddToGroup(M)", func() error { _, err := svc.AddToGroup(emailsM, "g1"); return err }) + run("EmailsByGroup", func() error { _, err := svc.EmailsByGroup("g1"); return err }) + run("RenameGroup", func() error { _, err := svc.RenameGroup("g1", "g2"); return err }) + run("DeleteGroup", func() error { _, err := svc.DeleteGroup("g2"); return err }) + + run("ResetInboundTraffic", func() error { return inboundSvc.ResetInboundTraffic(ib.Id) }) + run("Inbound.ResetAllTraffics", func() error { return inboundSvc.ResetAllTraffics() }) + run("Client.ResetAllTraffics", func() error { _, err := svc.ResetAllTraffics(); return err }) + run("BulkResetTraffic(M)", func() error { _, err := svc.BulkResetTraffic(inboundSvc, emailsM); return err }) + + run("UpdateByEmail", func() error { + upd := clients[n/3] + upd.Comment = "touched" + _, err := svc.UpdateByEmail(inboundSvc, upd.Email, upd) + return err + }) + run("AttachByEmail", func() error { _, err := svc.AttachByEmail(inboundSvc, emails[n/3], []int{ib2.Id}); return err }) + run("DetachByEmailMany", func() error { _, err := svc.DetachByEmailMany(inboundSvc, emails[n/3], []int{ib2.Id}); return err }) + + depEmails := emails[:1000] + for _, batch := range chunkStrings(depEmails, sqlInChunk) { + if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Update("down", int64(200)<<30).Error; err != nil { + t.Fatalf("mark depleted: %v", err) + } + } + run("DelDepleted(1k)", func() error { _, _, err := svc.DelDepleted(inboundSvc); return err }) + + run("DelInbound(full)", func() error { _, err := inboundSvc.DelInbound(ib.Id); return err }) + }) + } +} + +// TestGetClientTrafficByEmailABScale measures the GetClientTrafficByEmail change: +// old path (GetClientByEmail, which parses the inbound's entire settings JSON to +// find one client) vs new path (UUID/subId read from the indexed clients table). +func TestGetClientTrafficByEmailABScale(t *testing.T) { + if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" { + t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark") + } + xuilogger.InitLogger(logging.ERROR) + if err := database.InitDB(""); err != nil { + t.Fatalf("InitDB: %v", err) + } + t.Cleanup(func() { _ = database.CloseDB() }) + + svc := &ClientService{} + inboundSvc := &InboundService{} + const reps = 10 + sizes := []int{50000, 100000, 200000} + + oldImpl := func(email string) error { + tr, client, err := inboundSvc.GetClientByEmail(email) + if err != nil { + return err + } + if tr != nil && client != nil { + tr.UUID = client.ID + tr.SubId = client.SubID + } + return nil + } + + for _, n := range sizes { + t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) { + db := database.GetDB() + if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil { + t.Fatalf("truncate: %v", err) + } + clients := makeScaleClients(n) + ib := &model.Inbound{UserId: 1, Tag: fmt.Sprintf("ctbe-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)} + if err := db.Create(ib).Error; err != nil { + t.Fatalf("create inbound: %v", err) + } + if err := svc.SyncInbound(nil, ib.Id, clients); err != nil { + t.Fatalf("seed SyncInbound: %v", err) + } + seedClientTraffics(t, ib.Id, clients) + db.Exec("ANALYZE") + + targets := []string{clients[0].Email, clients[n/2].Email, clients[n-1].Email} + + start := time.Now() + for i := 0; i < reps; i++ { + if _, err := inboundSvc.GetClientTrafficByEmail(targets[i%len(targets)]); err != nil { + t.Fatalf("new GetClientTrafficByEmail: %v", err) + } + } + newDur := time.Since(start) / reps + + start = time.Now() + for i := 0; i < reps; i++ { + if err := oldImpl(targets[i%len(targets)]); err != nil { + t.Fatalf("old GetClientTrafficByEmail: %v", err) + } + } + oldDur := time.Since(start) / reps + + t.Logf("N=%-7d new=%-9v old=%-9v speedup=%.0fx", n, + newDur.Round(time.Microsecond), oldDur.Round(time.Millisecond), + float64(oldDur)/float64(maxDur(newDur, time.Microsecond))) + }) + } +} diff --git a/web/service/bulk_traffic_test.go b/web/service/bulk_traffic_test.go new file mode 100644 index 00000000..0e6c92fe --- /dev/null +++ b/web/service/bulk_traffic_test.go @@ -0,0 +1,149 @@ +package service + +import ( + "testing" + "time" + + "github.com/mhsanaei/3x-ui/v3/database" + "github.com/mhsanaei/3x-ui/v3/database/model" + "github.com/mhsanaei/3x-ui/v3/xray" +) + +func mkTraffic(t *testing.T, inboundId int, email string, up, down, total, expiry int64, enable bool) { + t.Helper() + row := xray.ClientTraffic{ + InboundId: inboundId, + Email: email, + Up: up, + Down: down, + Total: total, + ExpiryTime: expiry, + Enable: enable, + } + if err := database.GetDB().Create(&row).Error; err != nil { + t.Fatalf("create traffic %s: %v", email, err) + } +} + +func trafficOf(t *testing.T, email string) xray.ClientTraffic { + t.Helper() + var row xray.ClientTraffic + if err := database.GetDB().Where("email = ?", email).First(&row).Error; err != nil { + t.Fatalf("load traffic %s: %v", email, err) + } + return row +} + +func TestBulkResetTrafficZeroesUsageAndReenables(t *testing.T) { + setupBulkDB(t) + svc := &ClientService{} + inboundSvc := &InboundService{} + + source := []model.Client{ + {Email: "alice@x", ID: "11111111-1111-1111-1111-111111111111", SubID: "sa", Enable: true}, + {Email: "bob@x", ID: "22222222-2222-2222-2222-222222222222", SubID: "sb", Enable: true}, + {Email: "carol@x", ID: "33333333-3333-3333-3333-333333333333", SubID: "sc", Enable: true}, + } + ib := mkInbound(t, 21001, model.VLESS, clientsSettings(t, source)) + if err := svc.SyncInbound(nil, ib.Id, source); err != nil { + t.Fatalf("seed linkage: %v", err) + } + mkTraffic(t, ib.Id, "alice@x", 10, 20, 0, 0, false) + mkTraffic(t, ib.Id, "bob@x", 5, 5, 0, 0, true) + mkTraffic(t, ib.Id, "carol@x", 7, 0, 0, 0, true) + + affected, err := svc.BulkResetTraffic(inboundSvc, []string{"alice@x", "bob@x"}) + if err != nil { + t.Fatalf("BulkResetTraffic: %v", err) + } + if affected != 2 { + t.Fatalf("expected 2 affected, got %d", affected) + } + + for _, e := range []string{"alice@x", "bob@x"} { + tr := trafficOf(t, e) + if tr.Up != 0 || tr.Down != 0 { + t.Fatalf("%s: expected up/down 0, got up=%d down=%d", e, tr.Up, tr.Down) + } + if !tr.Enable { + t.Fatalf("%s: expected re-enabled", e) + } + } + + carol := trafficOf(t, "carol@x") + if carol.Up != 7 { + t.Fatalf("carol not in list should be untouched, got up=%d", carol.Up) + } +} + +func TestDelDepletedRemovesOnlyDepleted(t *testing.T) { + setupBulkDB(t) + svc := &ClientService{} + inboundSvc := &InboundService{} + + source := []model.Client{ + {Email: "alice@x", ID: "11111111-1111-1111-1111-111111111111", SubID: "sa", Enable: true}, + {Email: "bob@x", ID: "22222222-2222-2222-2222-222222222222", SubID: "sb", Enable: true}, + {Email: "carol@x", ID: "33333333-3333-3333-3333-333333333333", SubID: "sc", Enable: true}, + } + ib := mkInbound(t, 21002, model.VLESS, clientsSettings(t, source)) + if err := svc.SyncInbound(nil, ib.Id, source); err != nil { + t.Fatalf("seed linkage: %v", err) + } + past := time.Now().Add(-time.Hour).UnixMilli() + mkTraffic(t, ib.Id, "alice@x", 60, 60, 100, 0, true) + mkTraffic(t, ib.Id, "bob@x", 10, 10, 100, 0, true) + mkTraffic(t, ib.Id, "carol@x", 0, 0, 0, past, true) + + deleted, _, err := svc.DelDepleted(inboundSvc) + if err != nil { + t.Fatalf("DelDepleted: %v", err) + } + if deleted != 2 { + t.Fatalf("expected 2 deleted (alice traffic-depleted, carol expired), got %d", deleted) + } + + if _, err := svc.GetRecordByEmail(nil, "bob@x"); err != nil { + t.Fatalf("bob should survive: %v", err) + } + for _, e := range []string{"alice@x", "carol@x"} { + if _, err := svc.GetRecordByEmail(nil, e); err == nil { + t.Fatalf("%s should be deleted", e) + } + } + + reloaded, _ := inboundSvc.GetInbound(ib.Id) + jsonClients, _ := inboundSvc.GetClients(reloaded) + if len(jsonClients) != 1 || jsonClients[0].Email != "bob@x" { + t.Fatalf("settings JSON should contain only bob, got %d clients", len(jsonClients)) + } +} + +func TestGetClientTrafficByEmailReadsClientsTable(t *testing.T) { + setupBulkDB(t) + svc := &ClientService{} + inboundSvc := &InboundService{} + + source := []model.Client{ + {Email: "alice@x", ID: "11111111-1111-1111-1111-111111111111", SubID: "sa", Enable: true}, + } + ib := mkInbound(t, 21003, model.VLESS, clientsSettings(t, source)) + if err := svc.SyncInbound(nil, ib.Id, source); err != nil { + t.Fatalf("seed linkage: %v", err) + } + mkTraffic(t, ib.Id, "alice@x", 1, 2, 0, 0, true) + + tr, err := inboundSvc.GetClientTrafficByEmail("alice@x") + if err != nil { + t.Fatalf("GetClientTrafficByEmail: %v", err) + } + if tr == nil { + t.Fatalf("expected traffic, got nil") + } + if tr.UUID != "11111111-1111-1111-1111-111111111111" { + t.Fatalf("UUID not enriched from clients table, got %q", tr.UUID) + } + if tr.SubId != "sa" { + t.Fatalf("SubId not enriched from clients table, got %q", tr.SubId) + } +} diff --git a/web/service/client.go b/web/service/client.go index b269a570..cf26344e 100644 --- a/web/service/client.go +++ b/web/service/client.go @@ -1747,14 +1747,43 @@ func (s *ClientService) BulkResetTraffic(inboundSvc *InboundService, emails []st if len(emails) == 0 { return 0, nil } - count := 0 - for _, email := range emails { - if _, err := s.ResetTrafficByEmail(inboundSvc, email); err != nil { - return count, err + seen := map[string]struct{}{} + cleanEmails := make([]string, 0, len(emails)) + for _, e := range emails { + e = strings.TrimSpace(e) + if e == "" { + continue } - count++ + if _, ok := seen[e]; ok { + continue + } + seen[e] = struct{}{} + cleanEmails = append(cleanEmails, e) } - return count, nil + if len(cleanEmails) == 0 { + return 0, nil + } + + affected := 0 + err := submitTrafficWrite(func() error { + db := database.GetDB() + return db.Transaction(func(tx *gorm.DB) error { + for _, batch := range chunkStrings(cleanEmails, sqlInChunk) { + res := tx.Model(xray.ClientTraffic{}). + Where("email IN ?", batch). + Updates(map[string]any{"enable": true, "up": 0, "down": 0}) + if res.Error != nil { + return res.Error + } + affected += int(res.RowsAffected) + } + return nil + }) + }) + if err != nil { + return 0, err + } + return affected, nil } func (s *ClientService) CreateGroup(name string) error { @@ -3334,33 +3363,27 @@ func (s *ClientService) DelDepleted(inboundSvc *InboundService) (int, bool, erro return 0, false, nil } - emails := make(map[string]struct{}, len(rows)) + seen := make(map[string]struct{}, len(rows)) + emails := make([]string, 0, len(rows)) for _, r := range rows { - if r.Email != "" { - emails[r.Email] = struct{}{} + if r.Email == "" { + continue } + if _, ok := seen[r.Email]; ok { + continue + } + seen[r.Email] = struct{}{} + emails = append(emails, r.Email) + } + if len(emails) == 0 { + return 0, false, nil } - needRestart := false - deleted := 0 - for email := range emails { - var rec model.ClientRecord - if err := db.Where("email = ?", email).First(&rec).Error; err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - continue - } - return deleted, needRestart, err - } - nr, err := s.Delete(inboundSvc, rec.Id, false) - if err != nil { - return deleted, needRestart, err - } - if nr { - needRestart = true - } - deleted++ + res, needRestart, err := s.BulkDelete(inboundSvc, emails, false) + if err != nil { + return res.Deleted, needRestart, err } - return deleted, needRestart, nil + return res.Deleted, needRestart, nil } func (s *ClientService) ResetAllClientTraffics(inboundSvc *InboundService, id int) error { diff --git a/web/service/inbound.go b/web/service/inbound.go index 9978fc32..3e513bc3 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -83,8 +83,17 @@ func (s *InboundService) enrichClientStats(db *gorm.DB, inbounds []*model.Inboun emails = append(emails, e) } var extra []xray.ClientTraffic - if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", emails).Find(&extra).Error; err != nil { - logger.Warning("enrichClientStats:", err) + var loadErr error + for _, batch := range chunkStrings(emails, sqlInChunk) { + var page []xray.ClientTraffic + if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil { + loadErr = err + break + } + extra = append(extra, page...) + } + if loadErr != nil { + logger.Warning("enrichClientStats:", loadErr) } else { byEmail := make(map[string]xray.ClientTraffic, len(extra)) for _, st := range extra { @@ -3048,16 +3057,33 @@ func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, e } func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.ClientTraffic, err error) { - // Prefer retrieving along with client to reflect actual enabled state from inbound settings - t, client, err := s.GetClientByEmail(email) + db := database.GetDB() + var traffics []*xray.ClientTraffic + if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Find(&traffics).Error; err != nil { + logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err) + return nil, err + } + if len(traffics) == 0 { + return nil, nil + } + t := traffics[0] + + if rec, rErr := s.clientService.GetRecordByEmail(db, email); rErr == nil && rec != nil { + c := rec.ToClient() + t.UUID = c.ID + t.SubId = c.SubID + return t, nil + } + + t2, client, err := s.GetClientByEmail(email) if err != nil { logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err) return nil, err } - if t != nil && client != nil { - t.UUID = client.ID - t.SubId = client.SubID - return t, nil + if t2 != nil && client != nil { + t2.UUID = client.ID + t2.SubId = client.SubID + return t2, nil } return nil, nil } @@ -3386,6 +3412,9 @@ func (s *InboundService) MigrateDB() { } func (s *InboundService) GetOnlineClients() []string { + if p == nil { + return []string{} + } return p.GetOnlineClients() }