fix: ws hub, inbound service, and frontend correctness

- propagate DelInbound error on disable path in SetInboundEnable
- skip empty emails in updateClientTraffics to avoid constraint violations
- use consistent IN ? clause, drop redundant ErrRecordNotFound guards
- Hub.Unregister: direct removeClient fallback when channel is full
- applyClientStatsDelta: O(1) email lookup via per-inbound Map cache
- WS payload size check: Blob.size instead of .length for real byte count
This commit is contained in:
lolka1333 2026-04-28 15:55:26 +02:00
parent 88d71dad5e
commit 6ceddf83fd
4 changed files with 74 additions and 23 deletions

View file

@ -136,13 +136,17 @@ class WebSocketClient {
#onMessage(event) { #onMessage(event) {
const data = event.data; const data = event.data;
// Reject oversized payloads up front. event.data is a string for // Reject oversized payloads up front. We compare actual UTF-8 byte
// text frames; .length is the character count which is always ≤ byte // length (via Blob.size) against the limit — string.length counts
// count, so checking it is a conservative gate. // UTF-16 code units, which can undercount real bytes by up to 4× for
if (typeof data === 'string' && data.length > WebSocketClient.#MAX_PAYLOAD_BYTES) { // payloads with non-ASCII characters and bypass the cap.
console.error(`WebSocket: payload too large (${data.length} chars), closing`); if (typeof data === 'string') {
try { this.ws?.close(1009, 'message too big'); } catch { /* ignore */ } const byteLen = new Blob([data]).size;
return; if (byteLen > WebSocketClient.#MAX_PAYLOAD_BYTES) {
console.error(`WebSocket: payload too large (${byteLen} bytes), closing`);
try { this.ws?.close(1009, 'message too big'); } catch { /* ignore */ }
return;
}
} }
let message; let message;
try { try {

View file

@ -1137,11 +1137,29 @@
this.dbInbounds.forEach(ib => inboundsById.set(ib.id, ib)); this.dbInbounds.forEach(ib => inboundsById.set(ib.id, ib));
const touched = new Set(); const touched = new Set();
// Per-inbound email→clientStat lookup cache. Without this, finding
// each delta target was O(N) (linear scan of clientStats), which
// turned into O(activeClients × totalClients) over the loop and
// re-introduced UI freezes at 10k+ client scale. We invalidate the
// cache when the underlying clientStats array reference changes.
const statsByEmail = (dbInbound) => {
if (!Array.isArray(dbInbound.clientStats)) return null;
if (dbInbound._clientStatsMap && dbInbound._clientStatsMapSrc === dbInbound.clientStats) {
return dbInbound._clientStatsMap;
}
const map = new Map();
for (const cs of dbInbound.clientStats) map.set(cs.email, cs);
dbInbound._clientStatsMap = map;
dbInbound._clientStatsMapSrc = dbInbound.clientStats;
return map;
};
if (Array.isArray(payload.clients) && payload.clients.length > 0) { if (Array.isArray(payload.clients) && payload.clients.length > 0) {
for (const stat of payload.clients) { for (const stat of payload.clients) {
const dbInbound = inboundsById.get(stat.inboundId); const dbInbound = inboundsById.get(stat.inboundId);
if (!dbInbound || !Array.isArray(dbInbound.clientStats)) continue; if (!dbInbound || !Array.isArray(dbInbound.clientStats)) continue;
const cs = dbInbound.clientStats.find(c => c.email === stat.email); const csMap = statsByEmail(dbInbound);
const cs = csMap ? csMap.get(stat.email) : null;
if (!cs) continue; if (!cs) continue;
cs.up = stat.up; cs.up = stat.up;
cs.down = stat.down; cs.down = stat.down;

View file

@ -423,13 +423,21 @@ func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) {
inbound.Enable = enable inbound.Enable = enable
// Sync xray runtime: drop the live inbound, add it back if we're enabling. // Sync xray runtime: drop the live inbound, add it back if we're enabling.
// "User not found"-style errors from DelInbound mean the inbound was
// already absent from the live config — that's fine. Any other error
// means the live config and DB diverged, so we ask the caller to
// schedule a restart.
needRestart := false needRestart := false
s.xrayApi.Init(p.GetAPIPort()) s.xrayApi.Init(p.GetAPIPort())
defer s.xrayApi.Close() defer s.xrayApi.Close()
_ = s.xrayApi.DelInbound(inbound.Tag) if err := s.xrayApi.DelInbound(inbound.Tag); err != nil &&
!strings.Contains(err.Error(), "not found") {
logger.Debug("SetInboundEnable: DelInbound via api failed:", err)
needRestart = true
}
if !enable { if !enable {
return false, nil return needRestart, nil
} }
runtimeInbound, err := s.buildRuntimeInboundForAPI(db, inbound) runtimeInbound, err := s.buildRuntimeInboundForAPI(db, inbound)
@ -667,27 +675,44 @@ func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inb
return err return err
} }
// Email is the unique key for ClientTraffic rows. Clients without an
// email have no stats row to sync — skip them on both sides instead of
// risking a unique-constraint hit or accidental delete of an unrelated row.
oldEmails := make(map[string]struct{}, len(oldClients)) oldEmails := make(map[string]struct{}, len(oldClients))
for i := range oldClients { for i := range oldClients {
if oldClients[i].Email == "" {
continue
}
oldEmails[oldClients[i].Email] = struct{}{} oldEmails[oldClients[i].Email] = struct{}{}
} }
newEmails := make(map[string]struct{}, len(newClients)) newEmails := make(map[string]struct{}, len(newClients))
for i := range newClients { for i := range newClients {
if newClients[i].Email == "" {
continue
}
newEmails[newClients[i].Email] = struct{}{} newEmails[newClients[i].Email] = struct{}{}
} }
// Removed clients — drop their stats rows. // Removed clients — drop their stats rows.
for i := range oldClients { for i := range oldClients {
if _, kept := newEmails[oldClients[i].Email]; kept { email := oldClients[i].Email
if email == "" {
continue continue
} }
if err := s.DelClientStat(tx, oldClients[i].Email); err != nil { if _, kept := newEmails[email]; kept {
continue
}
if err := s.DelClientStat(tx, email); err != nil {
return err return err
} }
} }
// Added clients — create their stats rows. // Added clients — create their stats rows.
for i := range newClients { for i := range newClients {
if _, existed := oldEmails[newClients[i].Email]; existed { email := newClients[i].Email
if email == "" {
continue
}
if _, existed := oldEmails[email]; existed {
continue continue
} }
if err := s.AddClientStat(tx, oldInbound.Id, &newClients[i]); err != nil { if err := s.AddClientStat(tx, oldInbound.Id, &newClients[i]); err != nil {
@ -2398,8 +2423,7 @@ func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.Clien
} }
db := database.GetDB() db := database.GetDB()
var traffics []*xray.ClientTraffic var traffics []*xray.ClientTraffic
err := db.Model(xray.ClientTraffic{}).Where("email IN (?)", emails).Find(&traffics).Error if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", emails).Find(&traffics).Error; err != nil {
if err != nil && err != gorm.ErrRecordNotFound {
return nil, err return nil, err
} }
return traffics, nil return traffics, nil
@ -2423,10 +2447,9 @@ type InboundTrafficSummary struct {
func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) { func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) {
db := database.GetDB() db := database.GetDB()
var summaries []InboundTrafficSummary var summaries []InboundTrafficSummary
err := db.Model(&model.Inbound{}). if err := db.Model(&model.Inbound{}).
Select("id, up, down, total, all_time, enable"). Select("id, up, down, total, all_time, enable").
Find(&summaries).Error Find(&summaries).Error; err != nil {
if err != nil && err != gorm.ErrRecordNotFound {
return nil, err return nil, err
} }
return summaries, nil return summaries, nil

View file

@ -337,11 +337,16 @@ func (h *Hub) Register(c *Client) {
} }
} }
// Unregister removes a client from the hub. Non-blocking: if the unregister // Unregister removes a client from the hub. Fast path queues for the hub
// channel is full (transient burst), the request is dropped — the client will // goroutine; if the channel is saturated (disconnect storm) we fall back
// be unregistered on its next failed send or when the hub shuts down. // to a direct removal under the write lock so dead clients aren't left in
// A blocking send here is unsafe because callers may include the hub goroutine // the registry waiting for their Send buffer to fill (minutes of wasted
// itself, which would self-deadlock. // fanout work at low broadcast rates).
//
// Direct removal is safe from any caller: external goroutines (read/write
// pumps) hold no hub locks, and the hub goroutine itself never holds h.mu
// when it calls Unregister — fanout releases its RLock before per-client
// sends, so we can't self-deadlock here.
func (h *Hub) Unregister(c *Client) { func (h *Hub) Unregister(c *Client) {
if h == nil || c == nil { if h == nil || c == nil {
return return
@ -349,6 +354,7 @@ func (h *Hub) Unregister(c *Client) {
select { select {
case h.unregister <- c: case h.unregister <- c:
default: default:
h.removeClient(c)
} }
} }