From 97f284c9a7404b6973823cf5e77314d98f6232e2 Mon Sep 17 00:00:00 2001 From: test999 Date: Mon, 6 Apr 2026 08:16:10 +0300 Subject: [PATCH 1/8] feat: implement real-time traffic monitoring and UI updates using a high-performance WebSocket hub and background job system --- web/html/inbounds.html | 25 +++++++++++++++++--- web/html/xray.html | 8 +++++++ web/job/xray_traffic_job.go | 32 +++++++++++++++---------- web/websocket/hub.go | 47 +++++++++++++++++++++++++++++++++---- web/websocket/notifier.go | 22 +++++++++++++++++ 5 files changed, 114 insertions(+), 20 deletions(-) diff --git a/web/html/inbounds.html b/web/html/inbounds.html index b945da90..fa6462ee 100644 --- a/web/html/inbounds.html +++ b/web/html/inbounds.html @@ -1605,11 +1605,25 @@ } }); + // Listen for invalidate signals (sent when payload is too large for WebSocket) + // The server sends a lightweight notification and we re-fetch via REST API + let invalidateTimer = null; + window.wsClient.on('invalidate', (payload) => { + if (payload && (payload.type === 'inbounds' || payload.type === 'traffic')) { + // Debounce to avoid flooding the REST API with multiple invalidate signals + if (invalidateTimer) clearTimeout(invalidateTimer); + invalidateTimer = setTimeout(() => { + invalidateTimer = null; + this.getDBInbounds(); + }, 1000); + } + }); + // Listen for traffic updates window.wsClient.on('traffic', (payload) => { // Note: Do NOT update total consumed traffic (stats.up, stats.down) from this event // because clientTraffics contains delta/incremental values, not total accumulated values. - // Total traffic is updated via the 'inbounds' event which contains accumulated values from database. + // Total traffic is updated via the 'inbounds' WebSocket event (or 'invalidate' fallback for large panels). // Update online clients list in real-time if (payload && Array.isArray(payload.onlineClients)) { @@ -1627,22 +1641,27 @@ this.onlineClients = nextOnlineClients; if (onlineChanged) { // Recalculate client counts to update online status + // Use $set for Vue 2 reactivity — direct array index assignment is not reactive this.dbInbounds.forEach(dbInbound => { const inbound = this.inbounds.find(ib => ib.id === dbInbound.id); if (inbound && this.clientCount[dbInbound.id]) { - this.clientCount[dbInbound.id] = this.getClientCounts(dbInbound, inbound); + this.$set(this.clientCount, dbInbound.id, this.getClientCounts(dbInbound, inbound)); } }); + // Always trigger UI refresh — not just when filter is enabled if (this.enableFilter) { this.filterInbounds(); + } else { + this.searchInbounds(this.searchKey); } } } // Update last online map in real-time + // Replace entirely (server sends the full map) to avoid unbounded growth from deleted clients if (payload && payload.lastOnlineMap && typeof payload.lastOnlineMap === 'object') { - this.lastOnlineMap = { ...this.lastOnlineMap, ...payload.lastOnlineMap }; + this.lastOnlineMap = payload.lastOnlineMap; } }); diff --git a/web/html/xray.html b/web/html/xray.html index ebe31f48..d1a12e45 100644 --- a/web/html/xray.html +++ b/web/html/xray.html @@ -1075,6 +1075,14 @@ this.$forceUpdate(); } }); + + // Handle invalidate signals (sent when payload is too large for WebSocket, + // or when traffic job notifies about data changes) + window.wsClient.on('invalidate', (payload) => { + if (payload && payload.type === 'outbounds') { + this.refreshOutboundTraffic(); + } + }); } while (true) { diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go index 8d2bfbd6..b1a775f0 100644 --- a/web/job/xray_traffic_job.go +++ b/web/job/xray_traffic_job.go @@ -50,7 +50,13 @@ func (j *XrayTrafficJob) Run() { j.xrayService.SetToNeedRestart() } - // Get online clients and last online map for real-time status updates + // If no frontend client is connected, skip all WebSocket broadcasting routines, + // including expensive DB queries for online clients and JSON marshaling. + if !websocket.HasClients() { + return + } + + // Update online clients list and map onlineClients := j.inboundService.GetOnlineClients() lastOnlineMap, err := j.inboundService.GetClientsLastOnline() if err != nil { @@ -58,8 +64,17 @@ func (j *XrayTrafficJob) Run() { lastOnlineMap = make(map[string]int64) } + // Broadcast traffic update (deltas and online stats) via WebSocket + trafficUpdate := map[string]any{ + "traffics": traffics, + "clientTraffics": clientTraffics, + "onlineClients": onlineClients, + "lastOnlineMap": lastOnlineMap, + } + websocket.BroadcastTraffic(trafficUpdate) + // Fetch updated inbounds from database with accumulated traffic values - // This ensures frontend receives the actual total traffic, not just delta values + // This ensures frontend receives the actual total traffic for real-time UI refresh. updatedInbounds, err := j.inboundService.GetAllInbounds() if err != nil { logger.Warning("get all inbounds for websocket failed:", err) @@ -70,16 +85,8 @@ func (j *XrayTrafficJob) Run() { logger.Warning("get all outbounds for websocket failed:", err) } - // Broadcast traffic update via WebSocket with accumulated values from database - trafficUpdate := map[string]any{ - "traffics": traffics, - "clientTraffics": clientTraffics, - "onlineClients": onlineClients, - "lastOnlineMap": lastOnlineMap, - } - websocket.BroadcastTraffic(trafficUpdate) - - // Broadcast full inbounds update for real-time UI refresh + // The web socket hub will automatically check the payload size. + // If < 1MB, it pushes real-time. If > 1MB, it sends a lightweight 'invalidate' signal. if updatedInbounds != nil { websocket.BroadcastInbounds(updatedInbounds) } @@ -87,7 +94,6 @@ func (j *XrayTrafficJob) Run() { if updatedOutbounds != nil { websocket.BroadcastOutbounds(updatedOutbounds) } - } func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) { diff --git a/web/websocket/hub.go b/web/websocket/hub.go index 8aa5903c..e2c0072c 100644 --- a/web/websocket/hub.go +++ b/web/websocket/hub.go @@ -21,6 +21,7 @@ const ( MessageTypeNotification MessageType = "notification" // System notification MessageTypeXrayState MessageType = "xray_state" // Xray state change MessageTypeOutbounds MessageType = "outbounds" // Outbounds list update + MessageTypeInvalidate MessageType = "invalidate" // Lightweight signal telling frontend to re-fetch data via REST ) // Message represents a WebSocket message @@ -259,6 +260,11 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) { return } + // Skip all work if no clients are connected + if h.GetClientCount() == 0 { + return + } + msg := Message{ Type: messageType, Payload: payload, @@ -271,10 +277,12 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) { return } - // Limit message size to prevent memory issues + // If message exceeds size limit, send a lightweight invalidate notification + // instead of dropping it entirely — the frontend will re-fetch via REST API const maxMessageSize = 1024 * 1024 // 1MB if len(data) > maxMessageSize { - logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data)) + logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType) + h.broadcastInvalidate(messageType) return } @@ -298,6 +306,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) { return } + // Skip all work if no clients are connected + if h.GetClientCount() == 0 { + return + } + msg := Message{ Type: messageType, Payload: payload, @@ -310,10 +323,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) { return } - // Limit message size to prevent memory issues + // If message exceeds size limit, send a lightweight invalidate notification const maxMessageSize = 1024 * 1024 // 1MB if len(data) > maxMessageSize { - logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data)) + logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType) + h.broadcastInvalidate(messageType) return } @@ -374,6 +388,31 @@ func (h *Hub) Stop() { } } +// broadcastInvalidate sends a lightweight invalidate message to all clients, +// telling them to re-fetch the specified data type via REST API. +// This is used when the full payload exceeds the WebSocket message size limit. +func (h *Hub) broadcastInvalidate(originalType MessageType) { + msg := Message{ + Type: MessageTypeInvalidate, + Payload: map[string]string{"type": string(originalType)}, + Time: getCurrentTimestamp(), + } + + data, err := json.Marshal(msg) + if err != nil { + logger.Error("Failed to marshal invalidate message:", err) + return + } + + // Non-blocking send with timeout + select { + case h.broadcast <- data: + case <-time.After(100 * time.Millisecond): + logger.Warning("WebSocket broadcast channel is full, dropping invalidate message") + case <-h.ctx.Done(): + } +} + // getCurrentTimestamp returns current Unix timestamp in milliseconds func getCurrentTimestamp() int64 { return time.Now().UnixMilli() diff --git a/web/websocket/notifier.go b/web/websocket/notifier.go index 74cf61b2..d271aff6 100644 --- a/web/websocket/notifier.go +++ b/web/websocket/notifier.go @@ -24,6 +24,16 @@ func GetHub() *Hub { return wsHub } +// HasClients returns true if there are any WebSocket clients connected. +// Use this to skip expensive work (DB queries, serialization) when no browser is open. +func HasClients() bool { + hub := GetHub() + if hub == nil { + return false + } + return hub.GetClientCount() > 0 +} + // BroadcastStatus broadcasts server status update to all connected clients func BroadcastStatus(status any) { hub := GetHub() @@ -80,3 +90,15 @@ func BroadcastXrayState(state string, errorMsg string) { hub.Broadcast(MessageTypeXrayState, stateUpdate) } } + +// BroadcastInvalidate sends a lightweight invalidate signal for the given data type, +// telling connected frontends to re-fetch data via REST API. +// Use this instead of BroadcastInbounds/BroadcastOutbounds when you know the payload +// will be too large, to avoid wasting resources on serialization. +func BroadcastInvalidate(dataType MessageType) { + hub := GetHub() + if hub != nil { + hub.broadcastInvalidate(dataType) + } +} + From e7f2790236c1770369cebc3435fe4a479997c64e Mon Sep 17 00:00:00 2001 From: test999 Date: Mon, 6 Apr 2026 13:21:33 +0300 Subject: [PATCH 2/8] feat: add bulk client management support and improve inbound data handling --- install.sh | 2 +- update.sh | 2 +- web/assets/js/model/dbinbound.js | 20 ++++++- web/controller/websocket.go | 6 +- web/html/inbounds.html | 80 +++++++++++++++----------- web/html/index.html | 6 +- web/html/modals/client_bulk_modal.html | 4 +- web/html/modals/client_modal.html | 8 +-- web/html/settings.html | 7 +-- web/html/xray.html | 7 +-- web/service/xray.go | 40 +++++++------ web/websocket/hub.go | 4 +- 12 files changed, 108 insertions(+), 78 deletions(-) diff --git a/install.sh b/install.sh index af6b8a51..2ee1fd2c 100644 --- a/install.sh +++ b/install.sh @@ -581,7 +581,7 @@ prompt_and_setup_ssl() { # 3.1 Request Domain to compose Panel URL later read -rp "Please enter domain name certificate issued for: " custom_domain - custom_domain="${custom_domain// /}" # Убираем пробелы + custom_domain="${custom_domain// /}" # Remove spaces # 3.2 Loop for Certificate Path while true; do diff --git a/update.sh b/update.sh index b9cb3ddc..fd7cfc3c 100755 --- a/update.sh +++ b/update.sh @@ -609,7 +609,7 @@ prompt_and_setup_ssl() { # 3.1 Request Domain to compose Panel URL later read -rp "Please enter domain name certificate issued for: " custom_domain - custom_domain="${custom_domain// /}" # Убираем пробелы + custom_domain="${custom_domain// /}" # Remove spaces # 3.2 Loop for Certificate Path while true; do diff --git a/web/assets/js/model/dbinbound.js b/web/assets/js/model/dbinbound.js index befc618e..9132e94e 100644 --- a/web/assets/js/model/dbinbound.js +++ b/web/assets/js/model/dbinbound.js @@ -91,6 +91,10 @@ class DBInbound { } toInbound() { + if (this._cachedInbound) { + return this._cachedInbound; + } + let settings = {}; if (!ObjectUtil.isEmpty(this.settings)) { settings = JSON.parse(this.settings); @@ -116,7 +120,21 @@ class DBInbound { sniffing: sniffing, clientStats: this.clientStats, }; - return Inbound.fromJson(config); + + this._cachedInbound = Inbound.fromJson(config); + return this._cachedInbound; + } + + getClientStats(email) { + if (!this._clientStatsMap) { + this._clientStatsMap = new Map(); + if (this.clientStats && Array.isArray(this.clientStats)) { + for (const stats of this.clientStats) { + this._clientStatsMap.set(stats.email, stats); + } + } + } + return this._clientStatsMap.get(email); } isMultiUser() { diff --git a/web/controller/websocket.go b/web/controller/websocket.go index 0ad5c845..61e6f879 100644 --- a/web/controller/websocket.go +++ b/web/controller/websocket.go @@ -30,8 +30,10 @@ const ( ) var upgrader = ws.Upgrader{ - ReadBufferSize: 4096, // Increased from 1024 for better performance - WriteBufferSize: 4096, // Increased from 1024 for better performance + ReadBufferSize: 32768, // Huge buffers for huge databases + WriteBufferSize: 32768, // Huge buffers to reduce TCP fragmentation + EnableCompression: true, // Automatically GZIP large messages unconditionally + CheckOrigin: func(r *http.Request) bool { // Check origin for security origin := r.Header.Get("Origin") diff --git a/web/html/inbounds.html b/web/html/inbounds.html index fa6462ee..c85e84eb 100644 --- a/web/html/inbounds.html +++ b/web/html/inbounds.html @@ -6,7 +6,7 @@ - + @@ -14,10 +14,7 @@ - - - +
@@ -1304,7 +1301,6 @@ if (!clients || !Array.isArray(clients)) return; index = this.findIndexOfClient(dbInbound.protocol, clients, client); if (index < 0 || !clients[index]) return; - clients[index].enable = !clients[index].enable; clientId = this.getClientId(dbInbound.protocol, clients[index]); await this.updateClient(clients[index], dbInboundId, clientId); this.loading(false); @@ -1317,7 +1313,7 @@ }, getInboundClients(dbInbound) { if (!dbInbound) return null; - const inbound = dbInbound.toInbound(); + const inbound = this.inbounds.find(ib => ib.id === dbInbound.id) || dbInbound.toInbound(); return inbound && inbound.clients ? inbound.clients : null; }, resetClientTraffic(client, dbInboundId, confirmation = true) { @@ -1367,42 +1363,54 @@ isExpiry(dbInbound, index) { return dbInbound.toInbound().isExpiry(index); }, + getClientStats(dbInbound, email) { + if (!dbInbound) return null; + if (!dbInbound._clientStatsMap) { + dbInbound._clientStatsMap = new Map(); + if (dbInbound.clientStats && Array.isArray(dbInbound.clientStats)) { + for (const stats of dbInbound.clientStats) { + dbInbound._clientStatsMap.set(stats.email, stats); + } + } + } + return dbInbound._clientStatsMap.get(email); + }, getUpStats(dbInbound, email) { - if (email.length == 0) return 0; - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return 0; + let clientStats = this.getClientStats(dbInbound, email); return clientStats ? clientStats.up : 0; }, getDownStats(dbInbound, email) { - if (email.length == 0) return 0; - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return 0; + let clientStats = this.getClientStats(dbInbound, email); return clientStats ? clientStats.down : 0; }, getSumStats(dbInbound, email) { - if (email.length == 0) return 0; - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return 0; + let clientStats = this.getClientStats(dbInbound, email); return clientStats ? clientStats.up + clientStats.down : 0; }, getAllTimeClient(dbInbound, email) { - if (email.length == 0) return 0; - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return 0; + let clientStats = this.getClientStats(dbInbound, email); if (!clientStats) return 0; return clientStats.allTime || (clientStats.up + clientStats.down); }, getRemStats(dbInbound, email) { - if (email.length == 0) return 0; - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return 0; + let clientStats = this.getClientStats(dbInbound, email); if (!clientStats) return 0; - remained = clientStats.total - (clientStats.up + clientStats.down); + let remained = clientStats.total - (clientStats.up + clientStats.down); return remained > 0 ? remained : 0; }, clientStatsColor(dbInbound, email) { - if (email.length == 0) return ColorUtils.clientUsageColor(); - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return ColorUtils.clientUsageColor(); + let clientStats = this.getClientStats(dbInbound, email); return ColorUtils.clientUsageColor(clientStats, app.trafficDiff) }, statsProgress(dbInbound, email) { - if (email.length == 0) return 100; - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return 100; + let clientStats = this.getClientStats(dbInbound, email); if (!clientStats) return 0; if (clientStats.total == 0) return 100; return 100 * (clientStats.down + clientStats.up) / clientStats.total; @@ -1415,11 +1423,11 @@ return 100 * (1 - (remainedSeconds / resetSeconds)); }, statsExpColor(dbInbound, email) { - if (email.length == 0) return '#7a316f'; - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return '#7a316f'; + let clientStats = this.getClientStats(dbInbound, email); if (!clientStats) return '#7a316f'; - statsColor = ColorUtils.usageColor(clientStats.down + clientStats.up, this.trafficDiff, clientStats.total); - expColor = ColorUtils.usageColor(new Date().getTime(), this.expireDiff, clientStats.expiryTime); + let statsColor = ColorUtils.usageColor(clientStats.down + clientStats.up, this.trafficDiff, clientStats.total); + let expColor = ColorUtils.usageColor(new Date().getTime(), this.expireDiff, clientStats.expiryTime); switch (true) { case statsColor == "red" || expColor == "red": return "#cf3c3c"; // Red @@ -1432,12 +1440,12 @@ } }, isClientEnabled(dbInbound, email) { - clientStats = dbInbound.clientStats ? dbInbound.clientStats.find(stats => stats.email === email) : null; + let clientStats = dbInbound ? this.getClientStats(dbInbound, email) : null; return clientStats ? clientStats['enable'] : true; }, isClientDepleted(dbInbound, email) { - if (!email || !dbInbound || !dbInbound.clientStats) return false; - const stats = dbInbound.clientStats.find(s => s.email === email); + if (!email || !dbInbound) return false; + const stats = this.getClientStats(dbInbound, email); if (!stats) return false; const total = stats.total ?? 0; const used = (stats.up ?? 0) + (stats.down ?? 0); @@ -1557,12 +1565,18 @@ pagination(obj) { if (this.pageSize > 0 && obj.length > this.pageSize) { // Set page options based on object size - sizeOptions = []; - for (i = this.pageSize; i <= obj.length; i = i + this.pageSize) { - sizeOptions.push(i.toString()); + let sizeOptions = [this.pageSize.toString()]; + const increments = [2, 5, 10, 20]; + for (const m of increments) { + const val = this.pageSize * m; + if (val < obj.length && val <= 1000) { + sizeOptions.push(val.toString()); + } } // Add option to see all in one page - sizeOptions.push(i.toString()); + if (!sizeOptions.includes(obj.length.toString())) { + sizeOptions.push(obj.length.toString()); + } p = { showSizeChanger: true, diff --git a/web/html/index.html b/web/html/index.html index bbbbb708..47645f7d 100644 --- a/web/html/index.html +++ b/web/html/index.html @@ -6,7 +6,7 @@ - + @@ -15,9 +15,7 @@