diff --git a/web/service/inbound.go b/web/service/inbound.go index 99fc13a8..d011e093 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -1537,6 +1537,9 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) { for _, traffic := range traffics { inbound_ids = append(inbound_ids, traffic.InboundId) } + // Dedupe so an inbound hosting N expired clients is fetched and saved once + // per tick instead of N times across chunk boundaries. + inbound_ids = uniqueInts(inbound_ids) // Chunked to stay under SQLite's bind-variable limit when many inbounds // are touched in a single tick. for _, batch := range chunkInts(inbound_ids, sqliteMaxVars) { @@ -2523,6 +2526,23 @@ func uniqueNonEmptyStrings(in []string) []string { return out } +// uniqueInts returns a deduplicated copy of in, preserving order of first occurrence. +func uniqueInts(in []int) []int { + if len(in) == 0 { + return nil + } + seen := make(map[int]struct{}, len(in)) + out := make([]int, 0, len(in)) + for _, v := range in { + if _, ok := seen[v]; ok { + continue + } + seen[v] = struct{}{} + out = append(out, v) + } + return out +} + // chunkStrings splits s into consecutive sub-slices of at most size elements. // Returns nil for an empty input or non-positive size. func chunkStrings(s []string, size int) [][]string { diff --git a/web/websocket/hub.go b/web/websocket/hub.go index f6589900..29ba384e 100644 --- a/web/websocket/hub.go +++ b/web/websocket/hub.go @@ -39,10 +39,11 @@ const ( hubControlQueue = 64 // Backlog for register/unregister bursts (page reloads, disconnect storms). // minBroadcastInterval throttles per-type broadcasts so cron storms or - // rapid mutations cannot drown the hub. Bursts collapse to one delivery. - // Only message types in throttledMessageTypes are gated — heartbeat and - // real-time signals (status, traffic, client_stats, notification, - // xray_state, invalidate) bypass this so they are never delayed. + // rapid mutations cannot drown the hub. Bursts within the interval are + // dropped (not coalesced); the next broadcast outside the window delivers + // the latest state. Only message types in throttledMessageTypes are gated — + // heartbeat and real-time signals (status, traffic, client_stats, + // notification, xray_state, invalidate) bypass this so they are never delayed. minBroadcastInterval = 250 * time.Millisecond // hubRestartAttempts caps panic-recovery restarts. After this many @@ -274,8 +275,10 @@ func trySend(c *Client, msg []byte) (ok bool) { // Broadcast serializes payload and queues it for delivery to all clients. // If the serialized message exceeds maxMessageSize, an invalidate signal is -// queued instead so the frontend re-fetches via REST. Bursts of the same -// message type within minBroadcastInterval collapse to a single delivery. +// queued instead so the frontend re-fetches via REST. Broadcasts of throttled +// message types (see throttledMessageTypes) within minBroadcastInterval of +// the previous one are dropped — the next legitimate mutation will push the +// fresh state. func (h *Hub) Broadcast(messageType MessageType, payload any) { if h == nil || payload == nil || h.GetClientCount() == 0 { return