This commit is contained in:
lolka1333 2026-05-05 14:42:34 +02:00
parent ebb4c483d0
commit cea179221c
2 changed files with 29 additions and 6 deletions

View file

@ -1537,6 +1537,9 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
for _, traffic := range traffics {
inbound_ids = append(inbound_ids, traffic.InboundId)
}
// Dedupe so an inbound hosting N expired clients is fetched and saved once
// per tick instead of N times across chunk boundaries.
inbound_ids = uniqueInts(inbound_ids)
// Chunked to stay under SQLite's bind-variable limit when many inbounds
// are touched in a single tick.
for _, batch := range chunkInts(inbound_ids, sqliteMaxVars) {
@ -2523,6 +2526,23 @@ func uniqueNonEmptyStrings(in []string) []string {
return out
}
// uniqueInts returns a deduplicated copy of in, preserving order of first occurrence.
func uniqueInts(in []int) []int {
if len(in) == 0 {
return nil
}
seen := make(map[int]struct{}, len(in))
out := make([]int, 0, len(in))
for _, v := range in {
if _, ok := seen[v]; ok {
continue
}
seen[v] = struct{}{}
out = append(out, v)
}
return out
}
// chunkStrings splits s into consecutive sub-slices of at most size elements.
// Returns nil for an empty input or non-positive size.
func chunkStrings(s []string, size int) [][]string {

View file

@ -39,10 +39,11 @@ const (
hubControlQueue = 64 // Backlog for register/unregister bursts (page reloads, disconnect storms).
// minBroadcastInterval throttles per-type broadcasts so cron storms or
// rapid mutations cannot drown the hub. Bursts collapse to one delivery.
// Only message types in throttledMessageTypes are gated — heartbeat and
// real-time signals (status, traffic, client_stats, notification,
// xray_state, invalidate) bypass this so they are never delayed.
// rapid mutations cannot drown the hub. Bursts within the interval are
// dropped (not coalesced); the next broadcast outside the window delivers
// the latest state. Only message types in throttledMessageTypes are gated —
// heartbeat and real-time signals (status, traffic, client_stats,
// notification, xray_state, invalidate) bypass this so they are never delayed.
minBroadcastInterval = 250 * time.Millisecond
// hubRestartAttempts caps panic-recovery restarts. After this many
@ -274,8 +275,10 @@ func trySend(c *Client, msg []byte) (ok bool) {
// Broadcast serializes payload and queues it for delivery to all clients.
// If the serialized message exceeds maxMessageSize, an invalidate signal is
// queued instead so the frontend re-fetches via REST. Bursts of the same
// message type within minBroadcastInterval collapse to a single delivery.
// queued instead so the frontend re-fetches via REST. Broadcasts of throttled
// message types (see throttledMessageTypes) within minBroadcastInterval of
// the previous one are dropped — the next legitimate mutation will push the
// fresh state.
func (h *Hub) Broadcast(messageType MessageType, payload any) {
if h == nil || payload == nil || h.GetClientCount() == 0 {
return