From 6ceddf83fdbadb5542b5312d0c4f835ad9b1f215 Mon Sep 17 00:00:00 2001 From: lolka1333 Date: Tue, 28 Apr 2026 15:55:26 +0200 Subject: [PATCH] fix: ws hub, inbound service, and frontend correctness - propagate DelInbound error on disable path in SetInboundEnable - skip empty emails in updateClientTraffics to avoid constraint violations - use consistent IN ? clause, drop redundant ErrRecordNotFound guards - Hub.Unregister: direct removeClient fallback when channel is full - applyClientStatsDelta: O(1) email lookup via per-inbound Map cache - WS payload size check: Blob.size instead of .length for real byte count --- web/assets/js/websocket.js | 18 +++++++++------- web/html/inbounds.html | 20 +++++++++++++++++- web/service/inbound.go | 43 +++++++++++++++++++++++++++++--------- web/websocket/hub.go | 16 +++++++++----- 4 files changed, 74 insertions(+), 23 deletions(-) diff --git a/web/assets/js/websocket.js b/web/assets/js/websocket.js index 409863cb..64dd2769 100644 --- a/web/assets/js/websocket.js +++ b/web/assets/js/websocket.js @@ -136,13 +136,17 @@ class WebSocketClient { #onMessage(event) { const data = event.data; - // Reject oversized payloads up front. event.data is a string for - // text frames; .length is the character count which is always ≤ byte - // count, so checking it is a conservative gate. - if (typeof data === 'string' && data.length > WebSocketClient.#MAX_PAYLOAD_BYTES) { - console.error(`WebSocket: payload too large (${data.length} chars), closing`); - try { this.ws?.close(1009, 'message too big'); } catch { /* ignore */ } - return; + // Reject oversized payloads up front. We compare actual UTF-8 byte + // length (via Blob.size) against the limit — string.length counts + // UTF-16 code units, which can undercount real bytes by up to 4× for + // payloads with non-ASCII characters and bypass the cap. + if (typeof data === 'string') { + const byteLen = new Blob([data]).size; + if (byteLen > WebSocketClient.#MAX_PAYLOAD_BYTES) { + console.error(`WebSocket: payload too large (${byteLen} bytes), closing`); + try { this.ws?.close(1009, 'message too big'); } catch { /* ignore */ } + return; + } } let message; try { diff --git a/web/html/inbounds.html b/web/html/inbounds.html index 06733d9f..fc7b5307 100644 --- a/web/html/inbounds.html +++ b/web/html/inbounds.html @@ -1137,11 +1137,29 @@ this.dbInbounds.forEach(ib => inboundsById.set(ib.id, ib)); const touched = new Set(); + // Per-inbound email→clientStat lookup cache. Without this, finding + // each delta target was O(N) (linear scan of clientStats), which + // turned into O(activeClients × totalClients) over the loop and + // re-introduced UI freezes at 10k+ client scale. We invalidate the + // cache when the underlying clientStats array reference changes. + const statsByEmail = (dbInbound) => { + if (!Array.isArray(dbInbound.clientStats)) return null; + if (dbInbound._clientStatsMap && dbInbound._clientStatsMapSrc === dbInbound.clientStats) { + return dbInbound._clientStatsMap; + } + const map = new Map(); + for (const cs of dbInbound.clientStats) map.set(cs.email, cs); + dbInbound._clientStatsMap = map; + dbInbound._clientStatsMapSrc = dbInbound.clientStats; + return map; + }; + if (Array.isArray(payload.clients) && payload.clients.length > 0) { for (const stat of payload.clients) { const dbInbound = inboundsById.get(stat.inboundId); if (!dbInbound || !Array.isArray(dbInbound.clientStats)) continue; - const cs = dbInbound.clientStats.find(c => c.email === stat.email); + const csMap = statsByEmail(dbInbound); + const cs = csMap ? csMap.get(stat.email) : null; if (!cs) continue; cs.up = stat.up; cs.down = stat.down; diff --git a/web/service/inbound.go b/web/service/inbound.go index 395ee27e..aa0d2f51 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -423,13 +423,21 @@ func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) { inbound.Enable = enable // Sync xray runtime: drop the live inbound, add it back if we're enabling. + // "User not found"-style errors from DelInbound mean the inbound was + // already absent from the live config — that's fine. Any other error + // means the live config and DB diverged, so we ask the caller to + // schedule a restart. needRestart := false s.xrayApi.Init(p.GetAPIPort()) defer s.xrayApi.Close() - _ = s.xrayApi.DelInbound(inbound.Tag) + if err := s.xrayApi.DelInbound(inbound.Tag); err != nil && + !strings.Contains(err.Error(), "not found") { + logger.Debug("SetInboundEnable: DelInbound via api failed:", err) + needRestart = true + } if !enable { - return false, nil + return needRestart, nil } runtimeInbound, err := s.buildRuntimeInboundForAPI(db, inbound) @@ -667,27 +675,44 @@ func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inb return err } + // Email is the unique key for ClientTraffic rows. Clients without an + // email have no stats row to sync — skip them on both sides instead of + // risking a unique-constraint hit or accidental delete of an unrelated row. oldEmails := make(map[string]struct{}, len(oldClients)) for i := range oldClients { + if oldClients[i].Email == "" { + continue + } oldEmails[oldClients[i].Email] = struct{}{} } newEmails := make(map[string]struct{}, len(newClients)) for i := range newClients { + if newClients[i].Email == "" { + continue + } newEmails[newClients[i].Email] = struct{}{} } // Removed clients — drop their stats rows. for i := range oldClients { - if _, kept := newEmails[oldClients[i].Email]; kept { + email := oldClients[i].Email + if email == "" { continue } - if err := s.DelClientStat(tx, oldClients[i].Email); err != nil { + if _, kept := newEmails[email]; kept { + continue + } + if err := s.DelClientStat(tx, email); err != nil { return err } } // Added clients — create their stats rows. for i := range newClients { - if _, existed := oldEmails[newClients[i].Email]; existed { + email := newClients[i].Email + if email == "" { + continue + } + if _, existed := oldEmails[email]; existed { continue } if err := s.AddClientStat(tx, oldInbound.Id, &newClients[i]); err != nil { @@ -2398,8 +2423,7 @@ func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.Clien } db := database.GetDB() var traffics []*xray.ClientTraffic - err := db.Model(xray.ClientTraffic{}).Where("email IN (?)", emails).Find(&traffics).Error - if err != nil && err != gorm.ErrRecordNotFound { + if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", emails).Find(&traffics).Error; err != nil { return nil, err } return traffics, nil @@ -2423,10 +2447,9 @@ type InboundTrafficSummary struct { func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) { db := database.GetDB() var summaries []InboundTrafficSummary - err := db.Model(&model.Inbound{}). + if err := db.Model(&model.Inbound{}). Select("id, up, down, total, all_time, enable"). - Find(&summaries).Error - if err != nil && err != gorm.ErrRecordNotFound { + Find(&summaries).Error; err != nil { return nil, err } return summaries, nil diff --git a/web/websocket/hub.go b/web/websocket/hub.go index a672eaa3..04c58d7b 100644 --- a/web/websocket/hub.go +++ b/web/websocket/hub.go @@ -337,11 +337,16 @@ func (h *Hub) Register(c *Client) { } } -// Unregister removes a client from the hub. Non-blocking: if the unregister -// channel is full (transient burst), the request is dropped — the client will -// be unregistered on its next failed send or when the hub shuts down. -// A blocking send here is unsafe because callers may include the hub goroutine -// itself, which would self-deadlock. +// Unregister removes a client from the hub. Fast path queues for the hub +// goroutine; if the channel is saturated (disconnect storm) we fall back +// to a direct removal under the write lock so dead clients aren't left in +// the registry waiting for their Send buffer to fill (minutes of wasted +// fanout work at low broadcast rates). +// +// Direct removal is safe from any caller: external goroutines (read/write +// pumps) hold no hub locks, and the hub goroutine itself never holds h.mu +// when it calls Unregister — fanout releases its RLock before per-client +// sends, so we can't self-deadlock here. func (h *Hub) Unregister(c *Client) { if h == nil || c == nil { return @@ -349,6 +354,7 @@ func (h *Hub) Unregister(c *Client) { select { case h.unregister <- c: default: + h.removeClient(c) } }