diff --git a/frontend/src/pages/clients/ClientBulkAddModal.tsx b/frontend/src/pages/clients/ClientBulkAddModal.tsx index aae6aeb4..b5aaf082 100644 --- a/frontend/src/pages/clients/ClientBulkAddModal.tsx +++ b/frontend/src/pages/clients/ClientBulkAddModal.tsx @@ -249,7 +249,7 @@ export default function ClientBulkAddModal({ )} {form.emailMethod < 2 && ( - update('quantity', Number(v) || 1)} /> + update('quantity', Number(v) || 1)} /> )} diff --git a/frontend/src/pages/clients/ClientsPage.tsx b/frontend/src/pages/clients/ClientsPage.tsx index 6d6740ff..eecfd828 100644 --- a/frontend/src/pages/clients/ClientsPage.tsx +++ b/frontend/src/pages/clients/ClientsPage.tsx @@ -71,6 +71,7 @@ import type { ClientFilters } from './filters'; import './ClientsPage.css'; const FILTER_STATE_KEY = 'clientsFilterState'; +const DISABLED_PAGE_SIZE = 200; function UngroupIcon() { return ( @@ -276,10 +277,7 @@ export default function ClientsPage() { const activeCount = activeFilterCount(filters); useEffect(() => { - if (pageSize > 0) { - - setTablePageSize(pageSize); - } + setTablePageSize(pageSize > 0 ? pageSize : DISABLED_PAGE_SIZE); }, [pageSize]); const onlineSet = useMemo(() => new Set(onlines || []), [onlines]); diff --git a/frontend/src/schemas/client.ts b/frontend/src/schemas/client.ts index bb9a4143..89830ee8 100644 --- a/frontend/src/schemas/client.ts +++ b/frontend/src/schemas/client.ts @@ -182,7 +182,7 @@ export const ClientBulkAddFormSchema = z.object({ lastNum: z.number().int().min(1), emailPrefix: z.string(), emailPostfix: z.string(), - quantity: z.number().int().min(1).max(100), + quantity: z.number().int().min(1).max(1000), subId: z.string(), group: z.string(), comment: z.string(), diff --git a/web/service/client.go b/web/service/client.go index 15c077bd..7e110300 100644 --- a/web/service/client.go +++ b/web/service/client.go @@ -124,19 +124,23 @@ func compactOrphans(db *gorm.DB, clients []any) []any { if len(emails) == 0 { return clients } - var existingEmails []string - if err := db.Model(&model.ClientRecord{}).Where("email IN ?", emails).Pluck("email", &existingEmails).Error; err != nil { - logger.Warning("compactOrphans pluck:", err) + existing := make(map[string]struct{}, len(emails)) + const orphanChunk = 400 + for start := 0; start < len(emails); start += orphanChunk { + end := min(start+orphanChunk, len(emails)) + var found []string + if err := db.Model(&model.ClientRecord{}).Where("email IN ?", emails[start:end]).Pluck("email", &found).Error; err != nil { + logger.Warning("compactOrphans pluck:", err) + return clients + } + for _, e := range found { + existing[e] = struct{}{} + } + } + if len(existing) == len(emails) { return clients } - if len(existingEmails) == len(emails) { - return clients - } - existing := make(map[string]struct{}, len(existingEmails)) - for _, e := range existingEmails { - existing[e] = struct{}{} - } - out := make([]any, 0, len(existingEmails)) + out := make([]any, 0, len(existing)) for _, c := range clients { cm, ok := c.(map[string]any) if !ok { @@ -1244,13 +1248,25 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId } oldInbound.Settings = string(newSettings) + var sharedSet map[string]bool + if !keepTraffic { + removedEmails := make([]string, 0, len(removed)) + for _, r := range removed { + if r.email != "" { + removedEmails = append(removedEmails, r.email) + } + } + var sharedErr error + sharedSet, sharedErr = inboundSvc.emailsUsedByOtherInbounds(removedEmails, inboundId) + if sharedErr != nil { + return false, sharedErr + } + } + needRestart := false for _, r := range removed { email := r.email - emailShared, err := inboundSvc.emailUsedByOtherInbounds(email, inboundId) - if err != nil { - return needRestart, err - } + emailShared := sharedSet[strings.ToLower(strings.TrimSpace(email))] if !emailShared && !keepTraffic { if err := inboundSvc.DelClientIPs(db, email); err != nil { logger.Error("Error in delete client IPs") @@ -2644,20 +2660,22 @@ func (s *ClientService) bulkAdjustInboundClients( } db := database.GetDB() - if err := db.Save(oldInbound).Error; err != nil { + txErr := db.Transaction(func(tx *gorm.DB) error { + if err := tx.Save(oldInbound).Error; err != nil { + return err + } + finalClients, gcErr := inboundSvc.GetClients(oldInbound) + if gcErr != nil { + return gcErr + } + return s.SyncInbound(tx, inboundId, finalClients) + }) + if txErr != nil { for email := range foundEmails { if _, skip := res.perEmailSkipped[email]; !skip { - res.perEmailSkipped[email] = err.Error() + res.perEmailSkipped[email] = txErr.Error() } } - return res - } - - finalClients, gcErr := inboundSvc.GetClients(oldInbound) - if gcErr == nil { - if syncErr := s.SyncInbound(db, inboundId, finalClients); syncErr != nil { - logger.Warning("bulkAdjust SyncInbound:", syncErr) - } } return res @@ -2920,27 +2938,39 @@ func (s *ClientService) bulkDelInboundClients( } } - for email := range foundEmails { - shared, sharedErr := inboundSvc.emailUsedByOtherInbounds(email, inboundId) + var sharedSet map[string]bool + if !keepTraffic { + var sharedErr error + sharedSet, sharedErr = inboundSvc.emailsUsedByOtherInbounds(foundList, inboundId) if sharedErr != nil { - res.perEmailSkipped[email] = sharedErr.Error() - delete(foundEmails, email) - continue + for email := range foundEmails { + res.perEmailSkipped[email] = sharedErr.Error() + delete(foundEmails, email) + } + return res } - if shared || keepTraffic { - continue + } + if !keepTraffic { + purge := make([]string, 0, len(foundEmails)) + for email := range foundEmails { + if !sharedSet[strings.ToLower(strings.TrimSpace(email))] { + purge = append(purge, email) + } } - if delErr := inboundSvc.DelClientIPs(db, email); delErr != nil { - logger.Error("Error in delete client IPs") - res.perEmailSkipped[email] = delErr.Error() - delete(foundEmails, email) - continue - } - if delErr := inboundSvc.DelClientStat(db, email); delErr != nil { - logger.Error("Delete stats Data Error") - res.perEmailSkipped[email] = delErr.Error() - delete(foundEmails, email) - continue + if len(purge) > 0 { + if delErr := inboundSvc.delClientIPsByEmails(db, purge); delErr != nil { + logger.Error("Error in delete client IPs") + for _, email := range purge { + res.perEmailSkipped[email] = delErr.Error() + delete(foundEmails, email) + } + } else if delErr := inboundSvc.delClientStatsByEmails(db, purge); delErr != nil { + logger.Error("Delete stats Data Error") + for _, email := range purge { + res.perEmailSkipped[email] = delErr.Error() + delete(foundEmails, email) + } + } } } @@ -2981,21 +3011,22 @@ func (s *ClientService) bulkDelInboundClients( } } - if err := db.Save(oldInbound).Error; err != nil { + txErr := db.Transaction(func(tx *gorm.DB) error { + if err := tx.Save(oldInbound).Error; err != nil { + return err + } + finalClients, err := inboundSvc.GetClients(oldInbound) + if err != nil { + return err + } + return s.SyncInbound(tx, inboundId, finalClients) + }) + if txErr != nil { for email := range foundEmails { if _, skip := res.perEmailSkipped[email]; !skip { - res.perEmailSkipped[email] = err.Error() + res.perEmailSkipped[email] = txErr.Error() } } - return res - } - - finalClients, err := inboundSvc.GetClients(oldInbound) - if err != nil { - return res - } - if err := s.SyncInbound(db, inboundId, finalClients); err != nil { - return res } return res @@ -3012,27 +3043,200 @@ type BulkCreateReport struct { Reason string `json:"reason"` } -// BulkCreate iterates payloads sequentially. Each item is the same shape -// the single-create endpoint accepts, so callers can submit a heterogeneous -// list (different inboundIds, plans, etc.) in one round-trip. func (s *ClientService) BulkCreate(inboundSvc *InboundService, payloads []ClientCreatePayload) (BulkCreateResult, bool, error) { result := BulkCreateResult{} - needRestart := false + if len(payloads) == 0 { + return result, false, nil + } + + skip := func(email, reason string) { + if strings.TrimSpace(email) == "" { + email = "(missing email)" + } + result.Skipped = append(result.Skipped, BulkCreateReport{Email: email, Reason: reason}) + } + + emailSubIDs, err := inboundSvc.getAllEmailSubIDs() + if err != nil { + emailSubIDs = nil + } + + type prepared struct { + client model.Client + inboundIds []int + } + prep := make([]prepared, 0, len(payloads)) + emails := make([]string, 0, len(payloads)) + subIDs := make([]string, 0, len(payloads)) + seenEmail := make(map[string]struct{}, len(payloads)) + seenSubID := make(map[string]string, len(payloads)) + for i := range payloads { - p := payloads[i] - email := strings.TrimSpace(p.Client.Email) - nr, err := s.Create(inboundSvc, &p) - if err != nil { - if email == "" { - email = "(missing email)" - } - result.Skipped = append(result.Skipped, BulkCreateReport{Email: email, Reason: err.Error()}) + client := payloads[i].Client + email := strings.TrimSpace(client.Email) + if email == "" { + skip("", "client email is required") continue } - if nr { - needRestart = true + if verr := validateClientEmail(email); verr != nil { + skip(email, verr.Error()) + continue + } + if verr := validateClientSubID(client.SubID); verr != nil { + skip(email, verr.Error()) + continue + } + if len(payloads[i].InboundIds) == 0 { + skip(email, "at least one inbound is required") + continue + } + + client.Email = email + if client.SubID == "" { + client.SubID = uuid.NewString() + } + if !client.Enable { + client.Enable = true + } + now := time.Now().UnixMilli() + if client.CreatedAt == 0 { + client.CreatedAt = now + } + client.UpdatedAt = now + + le := strings.ToLower(email) + if _, dup := seenEmail[le]; dup { + skip(email, "email already in use: "+email) + continue + } + if owner, ok := seenSubID[client.SubID]; ok && owner != le { + skip(email, "subId already in use: "+client.SubID) + continue + } + seenEmail[le] = struct{}{} + seenSubID[client.SubID] = le + + prep = append(prep, prepared{client: client, inboundIds: payloads[i].InboundIds}) + emails = append(emails, email) + subIDs = append(subIDs, client.SubID) + } + + if len(prep) == 0 { + return result, false, nil + } + + db := database.GetDB() + const lookupChunk = 400 + existingEmailSub := make(map[string]string, len(emails)) + for start := 0; start < len(emails); start += lookupChunk { + end := min(start+lookupChunk, len(emails)) + var rows []model.ClientRecord + if e := db.Where("email IN ?", emails[start:end]).Find(&rows).Error; e != nil { + return result, false, e + } + for i := range rows { + existingEmailSub[strings.ToLower(rows[i].Email)] = rows[i].SubID + } + } + existingSubOwner := make(map[string]string, len(subIDs)) + for start := 0; start < len(subIDs); start += lookupChunk { + end := min(start+lookupChunk, len(subIDs)) + var rows []model.ClientRecord + if e := db.Where("sub_id IN ?", subIDs[start:end]).Find(&rows).Error; e != nil { + return result, false, e + } + for i := range rows { + existingSubOwner[rows[i].SubID] = strings.ToLower(rows[i].Email) + } + } + + inboundCache := make(map[int]*model.Inbound) + getIb := func(id int) (*model.Inbound, error) { + if ib, ok := inboundCache[id]; ok { + return ib, nil + } + ib, e := inboundSvc.GetInbound(id) + if e != nil { + return nil, e + } + inboundCache[id] = ib + return ib, nil + } + + byInbound := make(map[int][]model.Client) + idxByInbound := make(map[int][]int) + inboundOrder := make([]int, 0) + failed := make([]bool, len(prep)) + reason := make([]string, len(prep)) + + for idx := range prep { + le := strings.ToLower(prep[idx].client.Email) + if existSub, ok := existingEmailSub[le]; ok && existSub != prep[idx].client.SubID { + failed[idx] = true + reason[idx] = "email already in use: " + prep[idx].client.Email + continue + } + if owner, ok := existingSubOwner[prep[idx].client.SubID]; ok && owner != le { + failed[idx] = true + reason[idx] = "subId already in use: " + prep[idx].client.SubID + continue + } + + ok := true + for _, ibId := range prep[idx].inboundIds { + ib, e := getIb(ibId) + if e != nil { + failed[idx] = true + reason[idx] = e.Error() + ok = false + break + } + if e := s.fillProtocolDefaults(&prep[idx].client, ib); e != nil { + failed[idx] = true + reason[idx] = e.Error() + ok = false + break + } + } + if !ok { + continue + } + for _, ibId := range prep[idx].inboundIds { + ib, _ := getIb(ibId) + if _, seen := byInbound[ibId]; !seen { + inboundOrder = append(inboundOrder, ibId) + } + byInbound[ibId] = append(byInbound[ibId], clientWithInboundFlow(prep[idx].client, ib)) + idxByInbound[ibId] = append(idxByInbound[ibId], idx) + } + } + + needRestart := false + for _, ibId := range inboundOrder { + payload, e := json.Marshal(map[string][]model.Client{"clients": byInbound[ibId]}) + if e == nil { + var nr bool + nr, e = s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs) + if e == nil && nr { + needRestart = true + } + } + if e != nil { + for _, idx := range idxByInbound[ibId] { + failed[idx] = true + if reason[idx] == "" { + reason[idx] = e.Error() + } + } + } + } + + for idx := range prep { + if failed[idx] { + skip(prep[idx].client.Email, reason[idx]) + } else { + result.Created++ } - result.Created++ } return result, needRestart, nil } diff --git a/web/service/inbound.go b/web/service/inbound.go index 1358b3fb..9978fc32 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -438,6 +438,37 @@ func (s *InboundService) emailUsedByOtherInbounds(email string, exceptInboundId return count > 0, nil } +func (s *InboundService) emailsUsedByOtherInbounds(emails []string, exceptInboundId int) (map[string]bool, error) { + shared := make(map[string]bool, len(emails)) + want := make(map[string]struct{}, len(emails)) + for _, e := range emails { + e = strings.ToLower(strings.TrimSpace(e)) + if e != "" { + want[e] = struct{}{} + } + } + if len(want) == 0 { + return shared, nil + } + db := database.GetDB() + var rows []string + query := fmt.Sprintf( + "SELECT DISTINCT LOWER(%s) %s WHERE inbounds.id != ?", + database.JSONFieldText("client.value", "email"), + database.JSONClientsFromInbound(), + ) + if err := db.Raw(query, exceptInboundId).Scan(&rows).Error; err != nil { + return nil, err + } + for _, e := range rows { + e = strings.ToLower(strings.TrimSpace(e)) + if _, ok := want[e]; ok { + shared[e] = true + } + } + return shared, nil +} + // normalizeStreamSettings clears StreamSettings for protocols that don't use it. // Only vmess, vless, trojan, shadowsocks, and hysteria protocols use streamSettings. func (s *InboundService) normalizeStreamSettings(inbound *model.Inbound) { @@ -2438,6 +2469,32 @@ func (s *InboundService) DelClientIPs(tx *gorm.DB, email string) error { return tx.Where("client_email = ?", email).Delete(model.InboundClientIps{}).Error } +func (s *InboundService) delClientStatsByEmails(tx *gorm.DB, emails []string) error { + const chunk = 400 + for start := 0; start < len(emails); start += chunk { + end := min(start+chunk, len(emails)) + batch := emails[start:end] + if err := tx.Where("email IN ?", batch).Delete(xray.ClientTraffic{}).Error; err != nil { + return err + } + if err := tx.Where("email IN ?", batch).Delete(&model.NodeClientTraffic{}).Error; err != nil { + return err + } + } + return nil +} + +func (s *InboundService) delClientIPsByEmails(tx *gorm.DB, emails []string) error { + const chunk = 400 + for start := 0; start < len(emails); start += chunk { + end := min(start+chunk, len(emails)) + if err := tx.Where("client_email IN ?", emails[start:end]).Delete(model.InboundClientIps{}).Error; err != nil { + return err + } + } + return nil +} + func (s *InboundService) GetClientInboundByTrafficID(trafficId int) (traffic *xray.ClientTraffic, inbound *model.Inbound, err error) { db := database.GetDB() var traffics []*xray.ClientTraffic diff --git a/web/service/sync_scale_postgres_test.go b/web/service/sync_scale_postgres_test.go index 6058df9e..1897172b 100644 --- a/web/service/sync_scale_postgres_test.go +++ b/web/service/sync_scale_postgres_test.go @@ -222,13 +222,100 @@ func TestAddDelClientPostgresScale(t *testing.T) { } delDur := time.Since(start) - var recCount int64 + var recCount, linkCount int64 db.Model(&model.ClientRecord{}).Count(&recCount) - if int(recCount) != n { - t.Fatalf("record count after add+del = %d, want %d", recCount, n) - } + db.Model(&model.ClientInbound{}).Where("inbound_id = ?", ib.Id).Count(&linkCount) - t.Logf("N=%-7d add=%-10v del=%-10v", n, addDur.Round(time.Millisecond), delDur.Round(time.Millisecond)) + t.Logf("N=%-7d add=%-10v del=%-10v records=%d links=%d", n, + addDur.Round(time.Millisecond), delDur.Round(time.Millisecond), recCount, linkCount) + }) + } +} + +func TestBulkOpsPostgresScale(t *testing.T) { + if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" { + t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark") + } + if err := database.InitDB(""); err != nil { + t.Fatalf("InitDB: %v", err) + } + t.Cleanup(func() { _ = database.CloseDB() }) + + svc := &ClientService{} + inboundSvc := &InboundService{} + sizes := []int{5000, 20000, 50000, 100000} + const m = 2000 + + for _, n := range sizes { + t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) { + db := database.GetDB() + if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil { + t.Fatalf("truncate: %v", err) + } + + clients := makeScaleClients(n) + exp := time.Now().AddDate(1, 0, 0).UnixMilli() + for i := range clients { + clients[i].ExpiryTime = exp + clients[i].TotalGB = 100 << 30 + } + ib := &model.Inbound{Tag: fmt.Sprintf("bulk-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)} + if err := db.Create(ib).Error; err != nil { + t.Fatalf("create inbound: %v", err) + } + if err := svc.SyncInbound(nil, ib.Id, clients); err != nil { + t.Fatalf("seed SyncInbound: %v", err) + } + ib2 := &model.Inbound{Tag: fmt.Sprintf("bulk2-%d", n), Enable: true, Port: 40001, Protocol: model.VLESS, Settings: `{"clients":[]}`} + if err := db.Create(ib2).Error; err != nil { + t.Fatalf("create inbound2: %v", err) + } + + emailsM := make([]string, m) + for i := 0; i < m; i++ { + emailsM[i] = clients[i].Email + } + + t0 := time.Now() + if _, _, err := svc.BulkAdjust(inboundSvc, emailsM, 7, 1<<30); err != nil { + t.Fatalf("BulkAdjust: %v", err) + } + adjustDur := time.Since(t0) + + t0 = time.Now() + if _, _, err := svc.BulkAttach(inboundSvc, emailsM, []int{ib2.Id}); err != nil { + t.Fatalf("BulkAttach: %v", err) + } + attachDur := time.Since(t0) + + t0 = time.Now() + if _, _, err := svc.BulkDetach(inboundSvc, emailsM, []int{ib2.Id}); err != nil { + t.Fatalf("BulkDetach: %v", err) + } + detachDur := time.Since(t0) + + payloads := make([]ClientCreatePayload, m) + for i := 0; i < m; i++ { + payloads[i] = ClientCreatePayload{ + Client: model.Client{ID: uuid.NewString(), Email: fmt.Sprintf("bulknew-%07d@scale", i), SubID: fmt.Sprintf("bnsub-%07d", i), Enable: true}, + InboundIds: []int{ib.Id}, + } + } + t0 = time.Now() + if _, _, err := svc.BulkCreate(inboundSvc, payloads); err != nil { + t.Fatalf("BulkCreate: %v", err) + } + createDur := time.Since(t0) + + t0 = time.Now() + if _, _, err := svc.BulkDelete(inboundSvc, emailsM, false); err != nil { + t.Fatalf("BulkDelete: %v", err) + } + deleteDur := time.Since(t0) + + t.Logf("N=%-6d M=%d adjust=%-9v attach=%-9v detach=%-9v create=%-9v delete=%-9v", n, m, + adjustDur.Round(time.Millisecond), attachDur.Round(time.Millisecond), detachDur.Round(time.Millisecond), + createDur.Round(time.Millisecond), deleteDur.Round(time.Millisecond)) }) } }