From 2ff3c12a427a1bcf91a88a466f2348c8df38a7b3 Mon Sep 17 00:00:00 2001 From: MHSanaei Date: Sun, 17 May 2026 23:29:29 +0200 Subject: [PATCH] fix(nodes): route per-client ops through node clients API + orphan sweep Adds Runtime methods AddClient, UpdateUser, and DeleteUser so master mutates clients on a node via /panel/api/clients/{add,update,del} rather than pushing the whole inbound. The previous rt.UpdateInbound path made the node DelInbound+AddInbound on every single-client change, briefly cycling every other user on the same inbound. DelInbound no longer filters by enable=true, so a disabled node inbound actually gets removed from the node instead of being resurrected by the next snap. setRemoteTrafficLocked now sweeps any ClientRecord with zero ClientInbound rows after SyncInbound rebuilds the attachments, which is how a node-side delete propagates back to master instead of leaving a detached ghost. ClientService.Delete tombstones the email first so a snap arriving mid-delete can't re-create the record. WebSocket broadcasts an "invalidate(clients)" message on every client mutation so the Clients page refreshes without manual reload. Co-Authored-By: Claude Opus 4.7 --- web/controller/client.go | 14 ++++ web/job/node_traffic_sync_job.go | 27 +++++-- web/runtime/local.go | 49 +++++++++++++ web/runtime/remote.go | 44 ++++++++++++ web/runtime/runtime.go | 7 ++ web/service/client.go | 76 ++++++++++++++++---- web/service/inbound.go | 120 +++++++++++++++++++++++-------- web/websocket/hub.go | 37 +++------- 8 files changed, 297 insertions(+), 77 deletions(-) diff --git a/web/controller/client.go b/web/controller/client.go index 868d6126..7a4f0d36 100644 --- a/web/controller/client.go +++ b/web/controller/client.go @@ -7,10 +7,15 @@ import ( "github.com/mhsanaei/3x-ui/v3/database/model" "github.com/mhsanaei/3x-ui/v3/web/service" + "github.com/mhsanaei/3x-ui/v3/web/websocket" "github.com/gin-gonic/gin" ) +func notifyClientsChanged() { + websocket.BroadcastInvalidate(websocket.MessageTypeClients) +} + type ClientController struct { clientService service.ClientService inboundService service.InboundService @@ -84,6 +89,7 @@ func (a *ClientController) create(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } func (a *ClientController) update(c *gin.Context) { @@ -102,6 +108,7 @@ func (a *ClientController) update(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } func (a *ClientController) delete(c *gin.Context) { @@ -116,6 +123,7 @@ func (a *ClientController) delete(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } type attachDetachBody struct { @@ -138,6 +146,7 @@ func (a *ClientController) attach(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } func (a *ClientController) resetAllTraffics(c *gin.Context) { @@ -150,6 +159,7 @@ func (a *ClientController) resetAllTraffics(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } func (a *ClientController) delDepleted(c *gin.Context) { @@ -162,6 +172,7 @@ func (a *ClientController) delDepleted(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } func (a *ClientController) resetTrafficByEmail(c *gin.Context) { @@ -175,6 +186,7 @@ func (a *ClientController) resetTrafficByEmail(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } type trafficUpdateRequest struct { @@ -194,6 +206,7 @@ func (a *ClientController) updateTrafficByEmail(c *gin.Context) { return } jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientUpdateSuccess"), nil) + notifyClientsChanged() } func (a *ClientController) getIps(c *gin.Context) { @@ -294,4 +307,5 @@ func (a *ClientController) detach(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } diff --git a/web/job/node_traffic_sync_job.go b/web/job/node_traffic_sync_job.go index c2f5fa6a..542fe969 100644 --- a/web/job/node_traffic_sync_job.go +++ b/web/job/node_traffic_sync_job.go @@ -18,10 +18,12 @@ const ( ) type NodeTrafficSyncJob struct { - nodeService service.NodeService - inboundService service.InboundService - running sync.Mutex - structural atomicBool + nodeService service.NodeService + inboundService service.InboundService + settingService service.SettingService + xrayService service.XrayService + running sync.Mutex + structural atomicBool } type atomicBool struct { @@ -83,6 +85,22 @@ func (j *NodeTrafficSyncJob) Run() { } wg.Wait() + _, clientsDisabled, err := j.inboundService.AddTraffic(nil, nil) + if err != nil { + logger.Warning("node traffic sync: depletion check failed:", err) + } + if clientsDisabled { + if restartOnDisable, settingErr := j.settingService.GetRestartXrayOnClientDisable(); settingErr == nil && restartOnDisable { + if err := j.xrayService.RestartXray(true); err != nil { + logger.Warning("node traffic sync: restart xray after disabling clients failed:", err) + j.xrayService.SetToNeedRestart() + } + } else if settingErr != nil { + logger.Warning("node traffic sync: get RestartXrayOnClientDisable failed:", settingErr) + } + j.structural.set() + } + if !websocket.HasClients() { return } @@ -123,6 +141,7 @@ func (j *NodeTrafficSyncJob) Run() { if j.structural.takeAndReset() { websocket.BroadcastInvalidate(websocket.MessageTypeInbounds) + websocket.BroadcastInvalidate(websocket.MessageTypeClients) } } diff --git a/web/runtime/local.go b/web/runtime/local.go index f970ae28..487e075f 100644 --- a/web/runtime/local.go +++ b/web/runtime/local.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "strings" "sync" "github.com/mhsanaei/3x-ui/v3/database/model" @@ -78,6 +79,54 @@ func (l *Local) RemoveUser(_ context.Context, ib *model.Inbound, email string) e }) } +func (l *Local) AddClient(ctx context.Context, ib *model.Inbound, client model.Client) error { + if !client.Enable { + return nil + } + user := map[string]any{ + "email": client.Email, + "id": client.ID, + "security": client.Security, + "flow": client.Flow, + "auth": client.Auth, + "password": client.Password, + } + return l.AddUser(ctx, ib, user) +} + +func (l *Local) DeleteUser(ctx context.Context, ib *model.Inbound, email string) error { + if email == "" { + return nil + } + if err := l.RemoveUser(ctx, ib, email); err != nil { + if strings.Contains(err.Error(), "not found") { + return nil + } + return err + } + return nil +} + +func (l *Local) UpdateUser(ctx context.Context, ib *model.Inbound, oldEmail string, payload model.Client) error { + if oldEmail != "" { + if err := l.RemoveUser(ctx, ib, oldEmail); err != nil && !strings.Contains(err.Error(), "not found") { + return err + } + } + if !payload.Enable { + return nil + } + user := map[string]any{ + "email": payload.Email, + "id": payload.ID, + "security": payload.Security, + "flow": payload.Flow, + "auth": payload.Auth, + "password": payload.Password, + } + return l.AddUser(ctx, ib, user) +} + func (l *Local) RestartXray(_ context.Context) error { if l.deps.SetNeedRestart != nil { l.deps.SetNeedRestart() diff --git a/web/runtime/remote.go b/web/runtime/remote.go index 3da7671f..44f82a1a 100644 --- a/web/runtime/remote.go +++ b/web/runtime/remote.go @@ -257,6 +257,50 @@ func (r *Remote) RemoveUser(ctx context.Context, ib *model.Inbound, _ string) er return r.UpdateInbound(ctx, ib, ib) } +func (r *Remote) AddClient(ctx context.Context, ib *model.Inbound, client model.Client) error { + id, err := r.resolveRemoteID(ctx, ib.Tag) + if err != nil { + return fmt.Errorf("remote AddClient: resolve tag %q: %w", ib.Tag, err) + } + payload := map[string]any{ + "client": client, + "inboundIds": []int{id}, + } + if _, err := r.do(ctx, http.MethodPost, "panel/api/clients/add", payload); err != nil { + return err + } + return nil +} + +// DeleteUser is idempotent: master's per-inbound Delete loop may call it +// multiple times for the same node, and "not found" on the follow-ups is +// the expected success path. +func (r *Remote) DeleteUser(ctx context.Context, _ *model.Inbound, email string) error { + if email == "" { + return nil + } + _, err := r.do(ctx, http.MethodPost, + "panel/api/clients/del/"+url.PathEscape(email), nil) + if err == nil { + return nil + } + if strings.Contains(strings.ToLower(err.Error()), "not found") { + return nil + } + return err +} + +func (r *Remote) UpdateUser(ctx context.Context, _ *model.Inbound, oldEmail string, payload model.Client) error { + if oldEmail == "" { + oldEmail = payload.Email + } + if _, err := r.do(ctx, http.MethodPost, + "panel/api/clients/update/"+url.PathEscape(oldEmail), payload); err != nil { + return err + } + return nil +} + func (r *Remote) RestartXray(ctx context.Context) error { _, err := r.do(ctx, http.MethodPost, "panel/api/server/restartXrayService", nil) return err diff --git a/web/runtime/runtime.go b/web/runtime/runtime.go index 51dd5f4c..7c7c60c9 100644 --- a/web/runtime/runtime.go +++ b/web/runtime/runtime.go @@ -16,6 +16,13 @@ type Runtime interface { AddUser(ctx context.Context, ib *model.Inbound, userMap map[string]any) error RemoveUser(ctx context.Context, ib *model.Inbound, email string) error + // Per-client operations that route through the node's clients API on + // Remote (instead of pushing the whole inbound) so the node applies + // per-user xray API calls without a DelInbound+AddInbound cycle. + UpdateUser(ctx context.Context, ib *model.Inbound, email string, payload model.Client) error + DeleteUser(ctx context.Context, ib *model.Inbound, email string) error + AddClient(ctx context.Context, ib *model.Inbound, client model.Client) error + RestartXray(ctx context.Context) error ResetClientTraffic(ctx context.Context, ib *model.Inbound, email string) error diff --git a/web/service/client.go b/web/service/client.go index 9086f80b..ec9959b4 100644 --- a/web/service/client.go +++ b/web/service/client.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" "github.com/google/uuid" @@ -71,6 +72,47 @@ func clientKeyForProtocol(p model.Protocol, rec *model.ClientRecord) string { type ClientService struct{} +// Short-lived tombstone of just-deleted client emails so that a node snapshot +// arriving between delete and node-side processing doesn't resurrect them. +var ( + recentlyDeletedMu sync.Mutex + recentlyDeleted = map[string]time.Time{} +) + +const deleteTombstoneTTL = 90 * time.Second + +func tombstoneClientEmail(email string) { + if email == "" { + return + } + recentlyDeletedMu.Lock() + defer recentlyDeletedMu.Unlock() + recentlyDeleted[email] = time.Now() + cutoff := time.Now().Add(-deleteTombstoneTTL) + for e, ts := range recentlyDeleted { + if ts.Before(cutoff) { + delete(recentlyDeleted, e) + } + } +} + +func isClientEmailTombstoned(email string) bool { + if email == "" { + return false + } + recentlyDeletedMu.Lock() + defer recentlyDeletedMu.Unlock() + ts, ok := recentlyDeleted[email] + if !ok { + return false + } + if time.Since(ts) > deleteTombstoneTTL { + delete(recentlyDeleted, email) + return false + } + return true +} + func (s *ClientService) SyncInbound(tx *gorm.DB, inboundId int, clients []model.Client) error { if tx == nil { tx = database.GetDB() @@ -417,6 +459,8 @@ func (s *ClientService) Delete(inboundSvc *InboundService, id int, keepTraffic b if err != nil { return false, err } + tombstoneClientEmail(existing.Email) + inboundIds, err := s.GetInboundIdsForRecord(id) if err != nil { return false, err @@ -893,10 +937,10 @@ func (s *ClientService) AddInboundClient(inboundSvc *InboundService, data *model if len(client.Email) > 0 { inboundSvc.AddClientStat(tx, data.Id, &client) } - } - if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil { - err = err1 - return false, err + if err1 := rt.AddClient(context.Background(), oldInbound, client); err1 != nil { + err = err1 + return false, err + } } } @@ -1139,7 +1183,7 @@ func (s *ClientService) UpdateInboundClient(inboundSvc *InboundService, data *mo } } } else { - if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil { + if err1 := rt.UpdateUser(context.Background(), oldInbound, oldEmail, clients[0]); err1 != nil { err = err1 return false, err } @@ -1246,14 +1290,11 @@ func (s *ClientService) DelInboundClient(inboundSvc *InboundService, inboundId i return false, err } } - if needApiDel && notDepleted { + if needApiDel && notDepleted && oldInbound.NodeID == nil { rt, rterr := inboundSvc.runtimeFor(oldInbound) if rterr != nil { - if oldInbound.NodeID != nil { - return false, rterr - } needRestart = true - } else if oldInbound.NodeID == nil { + } else { err1 := rt.RemoveUser(context.Background(), oldInbound, email) if err1 == nil { logger.Debug("Client deleted on", rt.Name(), ":", email) @@ -1264,13 +1305,18 @@ func (s *ClientService) DelInboundClient(inboundSvc *InboundService, inboundId i logger.Debug("Error in deleting client on", rt.Name(), ":", err1) needRestart = true } - } else { - if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil { - return false, err1 - } } } } + if oldInbound.NodeID != nil && len(email) > 0 { + rt, rterr := inboundSvc.runtimeFor(oldInbound) + if rterr != nil { + return false, rterr + } + if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { + return false, err1 + } + } if err := db.Save(oldInbound).Error; err != nil { return false, err } @@ -1378,7 +1424,7 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo needRestart = true } } else { - if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil { + if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { return false, err1 } } diff --git a/web/service/inbound.go b/web/service/inbound.go index 44b13983..35b65084 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -388,24 +388,29 @@ func (s *InboundService) DelInbound(id int) (bool, error) { needRestart := false var ib model.Inbound - loadErr := db.Model(model.Inbound{}).Where("id = ? and enable = ?", id, true).First(&ib).Error + loadErr := db.Model(model.Inbound{}).Where("id = ?", id).First(&ib).Error if loadErr == nil { - rt, rterr := s.runtimeFor(&ib) - if rterr != nil { - logger.Warning("DelInbound: runtime lookup failed, deleting central row anyway:", rterr) - if ib.NodeID == nil { - needRestart = true + shouldPushToRuntime := ib.NodeID != nil || ib.Enable + if shouldPushToRuntime { + rt, rterr := s.runtimeFor(&ib) + if rterr != nil { + logger.Warning("DelInbound: runtime lookup failed, deleting central row anyway:", rterr) + if ib.NodeID == nil { + needRestart = true + } + } else if err1 := rt.DelInbound(context.Background(), &ib); err1 == nil { + logger.Debug("Inbound deleted on", rt.Name(), ":", ib.Tag) + } else { + logger.Warning("DelInbound on", rt.Name(), "failed, deleting central row anyway:", err1) + if ib.NodeID == nil { + needRestart = true + } } - } else if err1 := rt.DelInbound(context.Background(), &ib); err1 == nil { - logger.Debug("Inbound deleted on", rt.Name(), ":", ib.Tag) } else { - logger.Warning("DelInbound on", rt.Name(), "failed, deleting central row anyway:", err1) - if ib.NodeID == nil { - needRestart = true - } + logger.Debug("DelInbound: skipping runtime push for disabled local inbound id:", id) } } else { - logger.Debug("No enabled inbound found to remove by api, id:", id) + logger.Debug("DelInbound: inbound not found, id:", id) } // Delete client traffics of inbounds @@ -1280,16 +1285,47 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi if !ok { continue } + clients, gcErr := s.GetClients(snapIb) if gcErr != nil { logger.Warning("setRemoteTraffic: parse clients for tag", snapIb.Tag, "failed:", gcErr) continue } - if err := s.clientService.SyncInbound(tx, c.Id, clients); err != nil { + csEnableByEmail := make(map[string]bool, len(snapIb.ClientStats)) + for _, cs := range snapIb.ClientStats { + csEnableByEmail[cs.Email] = cs.Enable + } + filtered := clients[:0] + for i := range clients { + if isClientEmailTombstoned(clients[i].Email) { + continue + } + if cse, hit := csEnableByEmail[clients[i].Email]; hit && !cse { + clients[i].Enable = false + } + filtered = append(filtered, clients[i]) + } + if err := s.clientService.SyncInbound(tx, c.Id, filtered); err != nil { logger.Warning("setRemoteTraffic: sync clients for tag", snapIb.Tag, "failed:", err) } } + var orphanEmails []string + if err := tx.Table("clients"). + Joins("LEFT JOIN client_inbounds ON client_inbounds.client_id = clients.id"). + Where("client_inbounds.client_id IS NULL"). + Pluck("clients.email", &orphanEmails).Error; err != nil { + logger.Warning("setRemoteTraffic: orphan sweep query failed:", err) + } else if len(orphanEmails) > 0 { + if err := tx.Where("email IN ?", orphanEmails).Delete(&model.ClientRecord{}).Error; err != nil { + logger.Warning("setRemoteTraffic: orphan sweep delete ClientRecord failed:", err) + } + if err := tx.Where("email IN ?", orphanEmails).Delete(&xray.ClientTraffic{}).Error; err != nil { + logger.Warning("setRemoteTraffic: orphan sweep delete ClientTraffic failed:", err) + } + structuralChange = true + } + if err := tx.Commit().Error; err != nil { return false, err } @@ -1709,10 +1745,15 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) } var localTargets []target + localByInbound := make(map[int]map[string]struct{}) remoteByInbound := make(map[int][]target) for _, t := range targets { if t.NodeID == nil { localTargets = append(localTargets, t) + if localByInbound[t.InboundID] == nil { + localByInbound[t.InboundID] = make(map[string]struct{}) + } + localByInbound[t.InboundID][t.Email] = struct{}{} } else { remoteByInbound[t.InboundID] = append(remoteByInbound[t.InboundID], t) } @@ -1734,6 +1775,12 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) s.xrayApi.Close() } + for inboundID, emails := range localByInbound { + if _, _, mErr := s.markClientsDisabledInSettings(tx, inboundID, emails); mErr != nil { + logger.Warning("disableInvalidClients: settings.JSON sync failed for inbound", inboundID, ":", mErr) + } + } + result := tx.Model(xray.ClientTraffic{}). Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true). Update("enable", false) @@ -1765,25 +1812,23 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) return needRestart, count, nil } -// disableRemoteClients marks the given emails as disabled in the stored -// inbound.settings JSON and pushes the updated inbound to the remote node so -// the remote's UpdateClientStat sets xray_client_traffic.enable=false and -// SyncInbound sets clients.enable=false on the remote side. Without this -// the remote's own auto-disable would be reverted whenever the central -// pushes any inbound update later. -func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails map[string]struct{}) error { +// markClientsDisabledInSettings flips client.enable=false in the inbound's +// stored settings JSON for the given emails and returns both the pre and +// post snapshots so a caller pushing to a remote node has the diff to hand. +func (s *InboundService) markClientsDisabledInSettings(tx *gorm.DB, inboundID int, emails map[string]struct{}) (oldIb, newIb *model.Inbound, err error) { var ib model.Inbound if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID).First(&ib).Error; err != nil { - return err + return nil, nil, err } - oldSnapshot := ib + snapshot := ib settings := map[string]any{} if err := json.Unmarshal([]byte(ib.Settings), &settings); err != nil { - return err + return nil, nil, err } clients, _ := settings["clients"].([]any) now := time.Now().Unix() * 1000 + mutated := false for i := range clients { entry, ok := clients[i].(map[string]any) if !ok { @@ -1793,26 +1838,41 @@ func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails if _, hit := emails[email]; !hit { continue } + if cur, _ := entry["enable"].(bool); cur == false { + continue + } entry["enable"] = false entry["updated_at"] = now clients[i] = entry + mutated = true + } + if !mutated { + return &snapshot, &ib, nil } settings["clients"] = clients - bs, err := json.MarshalIndent(settings, "", " ") - if err != nil { - return err + bs, marshalErr := json.MarshalIndent(settings, "", " ") + if marshalErr != nil { + return nil, nil, marshalErr } ib.Settings = string(bs) if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID). Update("settings", ib.Settings).Error; err != nil { - return err + return nil, nil, err } + return &snapshot, &ib, nil +} - rt, err := s.runtimeFor(&ib) +func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails map[string]struct{}) error { + oldSnapshot, ib, err := s.markClientsDisabledInSettings(tx, inboundID, emails) if err != nil { return err } - if err := rt.UpdateInbound(context.Background(), &oldSnapshot, &ib); err != nil { + + rt, err := s.runtimeFor(ib) + if err != nil { + return err + } + if err := rt.UpdateInbound(context.Background(), oldSnapshot, ib); err != nil { return err } return nil diff --git a/web/websocket/hub.go b/web/websocket/hub.go index 5eeb80da..6df470d9 100644 --- a/web/websocket/hub.go +++ b/web/websocket/hub.go @@ -21,36 +21,17 @@ const ( MessageTypeNodes MessageType = "nodes" MessageTypeNotification MessageType = "notification" MessageTypeXrayState MessageType = "xray_state" - // MessageTypeClientStats carries absolute traffic counters for the clients - // that had activity in the latest collection window. Frontend applies these - // in-place — far smaller than re-broadcasting the full inbound list and - // scales to 10k+ clients without falling back to REST. - MessageTypeClientStats MessageType = "client_stats" - MessageTypeInvalidate MessageType = "invalidate" // Tells frontend to re-fetch via REST (last-resort). + MessageTypeClientStats MessageType = "client_stats" + MessageTypeClients MessageType = "clients" + MessageTypeInvalidate MessageType = "invalidate" + maxMessageSize = 10 * 1024 * 1024 // 10MB - // maxMessageSize caps the WebSocket payload. Beyond this the hub sends a - // lightweight invalidate signal and the frontend re-fetches via REST. - // 10MB lets typical 2k–8k-client deployments push directly via WS (low - // latency); larger installs fall back to invalidate. - maxMessageSize = 10 * 1024 * 1024 // 10MB - - enqueueTimeout = 100 * time.Millisecond - clientSendQueue = 512 // ~50s of buffering for a momentarily slow browser. - hubBroadcastQueue = 2048 // Headroom for cron-storm + admin-mutation bursts. - hubControlQueue = 64 // Backlog for register/unregister bursts (page reloads, disconnect storms). - - // minBroadcastInterval throttles per-type broadcasts so cron storms or - // rapid mutations cannot drown the hub. Bursts within the interval are - // dropped (not coalesced); the next broadcast outside the window delivers - // the latest state. Only message types in throttledMessageTypes are gated — - // heartbeat and one-shot signals (status, notification, xray_state, - // invalidate) bypass this so they are never delayed. + enqueueTimeout = 100 * time.Millisecond + clientSendQueue = 512 // ~50s of buffering for a momentarily slow browser. + hubBroadcastQueue = 2048 // Headroom for cron-storm + admin-mutation bursts. + hubControlQueue = 64 // Backlog for register/unregister bursts (page reloads, disconnect storms). minBroadcastInterval = 250 * time.Millisecond - - // hubRestartAttempts caps panic-recovery restarts. After this many - // consecutive failures we stop trying and log; the panel keeps running - // (frontend falls back to REST polling) and the operator can investigate. - hubRestartAttempts = 3 + hubRestartAttempts = 3 ) // NewClient builds a Client ready for hub registration.