diff --git a/web/html/inbounds.html b/web/html/inbounds.html index fc7b5307..22f6352c 100644 --- a/web/html/inbounds.html +++ b/web/html/inbounds.html @@ -1137,29 +1137,11 @@ this.dbInbounds.forEach(ib => inboundsById.set(ib.id, ib)); const touched = new Set(); - // Per-inbound email→clientStat lookup cache. Without this, finding - // each delta target was O(N) (linear scan of clientStats), which - // turned into O(activeClients × totalClients) over the loop and - // re-introduced UI freezes at 10k+ client scale. We invalidate the - // cache when the underlying clientStats array reference changes. - const statsByEmail = (dbInbound) => { - if (!Array.isArray(dbInbound.clientStats)) return null; - if (dbInbound._clientStatsMap && dbInbound._clientStatsMapSrc === dbInbound.clientStats) { - return dbInbound._clientStatsMap; - } - const map = new Map(); - for (const cs of dbInbound.clientStats) map.set(cs.email, cs); - dbInbound._clientStatsMap = map; - dbInbound._clientStatsMapSrc = dbInbound.clientStats; - return map; - }; - if (Array.isArray(payload.clients) && payload.clients.length > 0) { for (const stat of payload.clients) { const dbInbound = inboundsById.get(stat.inboundId); if (!dbInbound || !Array.isArray(dbInbound.clientStats)) continue; - const csMap = statsByEmail(dbInbound); - const cs = csMap ? csMap.get(stat.email) : null; + const cs = this.getClientStats(dbInbound, stat.email); if (!cs) continue; cs.up = stat.up; cs.down = stat.down; @@ -1868,15 +1850,18 @@ isExpiry(dbInbound, index) { return dbInbound.toInbound().isExpiry(index); }, + // getClientStats returns the cached email→clientStat lookup for an + // inbound, building it lazily. The cache is invalidated when the + // underlying clientStats array reference changes (full re-fetch), + // so delta updates and post-refetch lookups never see stale entries. + // This is the single source of truth — applyClientStatsDelta uses it too. getClientStats(dbInbound, email) { - if (!dbInbound) return null; - if (!dbInbound._clientStatsMap) { - dbInbound._clientStatsMap = new Map(); - if (dbInbound.clientStats && Array.isArray(dbInbound.clientStats)) { - for (const stats of dbInbound.clientStats) { - dbInbound._clientStatsMap.set(stats.email, stats); - } - } + if (!dbInbound || !Array.isArray(dbInbound.clientStats)) return null; + if (!dbInbound._clientStatsMap || dbInbound._clientStatsMapSrc !== dbInbound.clientStats) { + const map = new Map(); + for (const cs of dbInbound.clientStats) map.set(cs.email, cs); + dbInbound._clientStatsMap = map; + dbInbound._clientStatsMapSrc = dbInbound.clientStats; } return dbInbound._clientStatsMap.get(email); }, diff --git a/web/websocket/hub.go b/web/websocket/hub.go index 04c58d7b..f6589900 100644 --- a/web/websocket/hub.go +++ b/web/websocket/hub.go @@ -36,11 +36,13 @@ const ( enqueueTimeout = 100 * time.Millisecond clientSendQueue = 512 // ~50s of buffering for a momentarily slow browser. hubBroadcastQueue = 2048 // Headroom for cron-storm + admin-mutation bursts. + hubControlQueue = 64 // Backlog for register/unregister bursts (page reloads, disconnect storms). // minBroadcastInterval throttles per-type broadcasts so cron storms or // rapid mutations cannot drown the hub. Bursts collapse to one delivery. - // Status/traffic/notifications/xray_state/invalidate bypass this gate so - // real-time signals are never delayed. + // Only message types in throttledMessageTypes are gated — heartbeat and + // real-time signals (status, traffic, client_stats, notification, + // xray_state, invalidate) bypass this so they are never delayed. minBroadcastInterval = 250 * time.Millisecond // hubRestartAttempts caps panic-recovery restarts. After this many @@ -91,21 +93,29 @@ func NewHub() *Hub { return &Hub{ clients: make(map[*Client]struct{}), broadcast: make(chan []byte, hubBroadcastQueue), - register: make(chan *Client, 64), - unregister: make(chan *Client, 64), + register: make(chan *Client, hubControlQueue), + unregister: make(chan *Client, hubControlQueue), ctx: ctx, cancel: cancel, lastBroadcast: make(map[MessageType]time.Time), } } -// shouldThrottle returns true if a broadcast of msgType happened within -// minBroadcastInterval. Status/traffic/invalidate skip the gate so heartbeats -// and re-fetch signals are never dropped. +// throttledMessageTypes is the explicit allow-list of message types subject to +// the per-type rate limit. Everything else (status, traffic, client_stats, +// notification, xray_state, invalidate) is heartbeat- or signal-class and must +// not be delayed. Keeping the set explicit (vs. an exclusion list) makes the +// intent obvious when new message types are added — by default they bypass. +var throttledMessageTypes = map[MessageType]struct{}{ + MessageTypeInbounds: {}, + MessageTypeOutbounds: {}, +} + +// shouldThrottle returns true if a broadcast of msgType is rate-limited and +// happened within minBroadcastInterval of the previous one. Only message types +// in throttledMessageTypes are gated. func (h *Hub) shouldThrottle(msgType MessageType) bool { - switch msgType { - case MessageTypeStatus, MessageTypeTraffic, MessageTypeClientStats, - MessageTypeInvalidate, MessageTypeNotification, MessageTypeXrayState: + if _, gated := throttledMessageTypes[msgType]; !gated { return false } h.throttleMu.Lock()