fix: unify clientStats cache, throttle clarity, hub constants

This commit is contained in:
lolka1333 2026-04-28 23:41:14 +02:00
parent 7f7bcb6893
commit df6b46d9c9
2 changed files with 32 additions and 37 deletions

View file

@ -1137,29 +1137,11 @@
this.dbInbounds.forEach(ib => inboundsById.set(ib.id, ib));
const touched = new Set();
// Per-inbound email→clientStat lookup cache. Without this, finding
// each delta target was O(N) (linear scan of clientStats), which
// turned into O(activeClients × totalClients) over the loop and
// re-introduced UI freezes at 10k+ client scale. We invalidate the
// cache when the underlying clientStats array reference changes.
const statsByEmail = (dbInbound) => {
if (!Array.isArray(dbInbound.clientStats)) return null;
if (dbInbound._clientStatsMap && dbInbound._clientStatsMapSrc === dbInbound.clientStats) {
return dbInbound._clientStatsMap;
}
const map = new Map();
for (const cs of dbInbound.clientStats) map.set(cs.email, cs);
dbInbound._clientStatsMap = map;
dbInbound._clientStatsMapSrc = dbInbound.clientStats;
return map;
};
if (Array.isArray(payload.clients) && payload.clients.length > 0) {
for (const stat of payload.clients) {
const dbInbound = inboundsById.get(stat.inboundId);
if (!dbInbound || !Array.isArray(dbInbound.clientStats)) continue;
const csMap = statsByEmail(dbInbound);
const cs = csMap ? csMap.get(stat.email) : null;
const cs = this.getClientStats(dbInbound, stat.email);
if (!cs) continue;
cs.up = stat.up;
cs.down = stat.down;
@ -1868,15 +1850,18 @@
isExpiry(dbInbound, index) {
return dbInbound.toInbound().isExpiry(index);
},
// getClientStats returns the cached email→clientStat lookup for an
// inbound, building it lazily. The cache is invalidated when the
// underlying clientStats array reference changes (full re-fetch),
// so delta updates and post-refetch lookups never see stale entries.
// This is the single source of truth — applyClientStatsDelta uses it too.
getClientStats(dbInbound, email) {
if (!dbInbound) return null;
if (!dbInbound._clientStatsMap) {
dbInbound._clientStatsMap = new Map();
if (dbInbound.clientStats && Array.isArray(dbInbound.clientStats)) {
for (const stats of dbInbound.clientStats) {
dbInbound._clientStatsMap.set(stats.email, stats);
}
}
if (!dbInbound || !Array.isArray(dbInbound.clientStats)) return null;
if (!dbInbound._clientStatsMap || dbInbound._clientStatsMapSrc !== dbInbound.clientStats) {
const map = new Map();
for (const cs of dbInbound.clientStats) map.set(cs.email, cs);
dbInbound._clientStatsMap = map;
dbInbound._clientStatsMapSrc = dbInbound.clientStats;
}
return dbInbound._clientStatsMap.get(email);
},

View file

@ -36,11 +36,13 @@ const (
enqueueTimeout = 100 * time.Millisecond
clientSendQueue = 512 // ~50s of buffering for a momentarily slow browser.
hubBroadcastQueue = 2048 // Headroom for cron-storm + admin-mutation bursts.
hubControlQueue = 64 // Backlog for register/unregister bursts (page reloads, disconnect storms).
// minBroadcastInterval throttles per-type broadcasts so cron storms or
// rapid mutations cannot drown the hub. Bursts collapse to one delivery.
// Status/traffic/notifications/xray_state/invalidate bypass this gate so
// real-time signals are never delayed.
// Only message types in throttledMessageTypes are gated — heartbeat and
// real-time signals (status, traffic, client_stats, notification,
// xray_state, invalidate) bypass this so they are never delayed.
minBroadcastInterval = 250 * time.Millisecond
// hubRestartAttempts caps panic-recovery restarts. After this many
@ -91,21 +93,29 @@ func NewHub() *Hub {
return &Hub{
clients: make(map[*Client]struct{}),
broadcast: make(chan []byte, hubBroadcastQueue),
register: make(chan *Client, 64),
unregister: make(chan *Client, 64),
register: make(chan *Client, hubControlQueue),
unregister: make(chan *Client, hubControlQueue),
ctx: ctx,
cancel: cancel,
lastBroadcast: make(map[MessageType]time.Time),
}
}
// shouldThrottle returns true if a broadcast of msgType happened within
// minBroadcastInterval. Status/traffic/invalidate skip the gate so heartbeats
// and re-fetch signals are never dropped.
// throttledMessageTypes is the explicit allow-list of message types subject to
// the per-type rate limit. Everything else (status, traffic, client_stats,
// notification, xray_state, invalidate) is heartbeat- or signal-class and must
// not be delayed. Keeping the set explicit (vs. an exclusion list) makes the
// intent obvious when new message types are added — by default they bypass.
var throttledMessageTypes = map[MessageType]struct{}{
MessageTypeInbounds: {},
MessageTypeOutbounds: {},
}
// shouldThrottle returns true if a broadcast of msgType is rate-limited and
// happened within minBroadcastInterval of the previous one. Only message types
// in throttledMessageTypes are gated.
func (h *Hub) shouldThrottle(msgType MessageType) bool {
switch msgType {
case MessageTypeStatus, MessageTypeTraffic, MessageTypeClientStats,
MessageTypeInvalidate, MessageTypeNotification, MessageTypeXrayState:
if _, gated := throttledMessageTypes[msgType]; !gated {
return false
}
h.throttleMu.Lock()