From 97f284c9a7404b6973823cf5e77314d98f6232e2 Mon Sep 17 00:00:00 2001 From: test999 Date: Mon, 6 Apr 2026 08:16:10 +0300 Subject: [PATCH] feat: implement real-time traffic monitoring and UI updates using a high-performance WebSocket hub and background job system --- web/html/inbounds.html | 25 +++++++++++++++++--- web/html/xray.html | 8 +++++++ web/job/xray_traffic_job.go | 32 +++++++++++++++---------- web/websocket/hub.go | 47 +++++++++++++++++++++++++++++++++---- web/websocket/notifier.go | 22 +++++++++++++++++ 5 files changed, 114 insertions(+), 20 deletions(-) diff --git a/web/html/inbounds.html b/web/html/inbounds.html index b945da90..fa6462ee 100644 --- a/web/html/inbounds.html +++ b/web/html/inbounds.html @@ -1605,11 +1605,25 @@ } }); + // Listen for invalidate signals (sent when payload is too large for WebSocket) + // The server sends a lightweight notification and we re-fetch via REST API + let invalidateTimer = null; + window.wsClient.on('invalidate', (payload) => { + if (payload && (payload.type === 'inbounds' || payload.type === 'traffic')) { + // Debounce to avoid flooding the REST API with multiple invalidate signals + if (invalidateTimer) clearTimeout(invalidateTimer); + invalidateTimer = setTimeout(() => { + invalidateTimer = null; + this.getDBInbounds(); + }, 1000); + } + }); + // Listen for traffic updates window.wsClient.on('traffic', (payload) => { // Note: Do NOT update total consumed traffic (stats.up, stats.down) from this event // because clientTraffics contains delta/incremental values, not total accumulated values. - // Total traffic is updated via the 'inbounds' event which contains accumulated values from database. + // Total traffic is updated via the 'inbounds' WebSocket event (or 'invalidate' fallback for large panels). // Update online clients list in real-time if (payload && Array.isArray(payload.onlineClients)) { @@ -1627,22 +1641,27 @@ this.onlineClients = nextOnlineClients; if (onlineChanged) { // Recalculate client counts to update online status + // Use $set for Vue 2 reactivity — direct array index assignment is not reactive this.dbInbounds.forEach(dbInbound => { const inbound = this.inbounds.find(ib => ib.id === dbInbound.id); if (inbound && this.clientCount[dbInbound.id]) { - this.clientCount[dbInbound.id] = this.getClientCounts(dbInbound, inbound); + this.$set(this.clientCount, dbInbound.id, this.getClientCounts(dbInbound, inbound)); } }); + // Always trigger UI refresh — not just when filter is enabled if (this.enableFilter) { this.filterInbounds(); + } else { + this.searchInbounds(this.searchKey); } } } // Update last online map in real-time + // Replace entirely (server sends the full map) to avoid unbounded growth from deleted clients if (payload && payload.lastOnlineMap && typeof payload.lastOnlineMap === 'object') { - this.lastOnlineMap = { ...this.lastOnlineMap, ...payload.lastOnlineMap }; + this.lastOnlineMap = payload.lastOnlineMap; } }); diff --git a/web/html/xray.html b/web/html/xray.html index ebe31f48..d1a12e45 100644 --- a/web/html/xray.html +++ b/web/html/xray.html @@ -1075,6 +1075,14 @@ this.$forceUpdate(); } }); + + // Handle invalidate signals (sent when payload is too large for WebSocket, + // or when traffic job notifies about data changes) + window.wsClient.on('invalidate', (payload) => { + if (payload && payload.type === 'outbounds') { + this.refreshOutboundTraffic(); + } + }); } while (true) { diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go index 8d2bfbd6..b1a775f0 100644 --- a/web/job/xray_traffic_job.go +++ b/web/job/xray_traffic_job.go @@ -50,7 +50,13 @@ func (j *XrayTrafficJob) Run() { j.xrayService.SetToNeedRestart() } - // Get online clients and last online map for real-time status updates + // If no frontend client is connected, skip all WebSocket broadcasting routines, + // including expensive DB queries for online clients and JSON marshaling. + if !websocket.HasClients() { + return + } + + // Update online clients list and map onlineClients := j.inboundService.GetOnlineClients() lastOnlineMap, err := j.inboundService.GetClientsLastOnline() if err != nil { @@ -58,8 +64,17 @@ func (j *XrayTrafficJob) Run() { lastOnlineMap = make(map[string]int64) } + // Broadcast traffic update (deltas and online stats) via WebSocket + trafficUpdate := map[string]any{ + "traffics": traffics, + "clientTraffics": clientTraffics, + "onlineClients": onlineClients, + "lastOnlineMap": lastOnlineMap, + } + websocket.BroadcastTraffic(trafficUpdate) + // Fetch updated inbounds from database with accumulated traffic values - // This ensures frontend receives the actual total traffic, not just delta values + // This ensures frontend receives the actual total traffic for real-time UI refresh. updatedInbounds, err := j.inboundService.GetAllInbounds() if err != nil { logger.Warning("get all inbounds for websocket failed:", err) @@ -70,16 +85,8 @@ func (j *XrayTrafficJob) Run() { logger.Warning("get all outbounds for websocket failed:", err) } - // Broadcast traffic update via WebSocket with accumulated values from database - trafficUpdate := map[string]any{ - "traffics": traffics, - "clientTraffics": clientTraffics, - "onlineClients": onlineClients, - "lastOnlineMap": lastOnlineMap, - } - websocket.BroadcastTraffic(trafficUpdate) - - // Broadcast full inbounds update for real-time UI refresh + // The web socket hub will automatically check the payload size. + // If < 1MB, it pushes real-time. If > 1MB, it sends a lightweight 'invalidate' signal. if updatedInbounds != nil { websocket.BroadcastInbounds(updatedInbounds) } @@ -87,7 +94,6 @@ func (j *XrayTrafficJob) Run() { if updatedOutbounds != nil { websocket.BroadcastOutbounds(updatedOutbounds) } - } func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) { diff --git a/web/websocket/hub.go b/web/websocket/hub.go index 8aa5903c..e2c0072c 100644 --- a/web/websocket/hub.go +++ b/web/websocket/hub.go @@ -21,6 +21,7 @@ const ( MessageTypeNotification MessageType = "notification" // System notification MessageTypeXrayState MessageType = "xray_state" // Xray state change MessageTypeOutbounds MessageType = "outbounds" // Outbounds list update + MessageTypeInvalidate MessageType = "invalidate" // Lightweight signal telling frontend to re-fetch data via REST ) // Message represents a WebSocket message @@ -259,6 +260,11 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) { return } + // Skip all work if no clients are connected + if h.GetClientCount() == 0 { + return + } + msg := Message{ Type: messageType, Payload: payload, @@ -271,10 +277,12 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) { return } - // Limit message size to prevent memory issues + // If message exceeds size limit, send a lightweight invalidate notification + // instead of dropping it entirely — the frontend will re-fetch via REST API const maxMessageSize = 1024 * 1024 // 1MB if len(data) > maxMessageSize { - logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data)) + logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType) + h.broadcastInvalidate(messageType) return } @@ -298,6 +306,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) { return } + // Skip all work if no clients are connected + if h.GetClientCount() == 0 { + return + } + msg := Message{ Type: messageType, Payload: payload, @@ -310,10 +323,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) { return } - // Limit message size to prevent memory issues + // If message exceeds size limit, send a lightweight invalidate notification const maxMessageSize = 1024 * 1024 // 1MB if len(data) > maxMessageSize { - logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data)) + logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType) + h.broadcastInvalidate(messageType) return } @@ -374,6 +388,31 @@ func (h *Hub) Stop() { } } +// broadcastInvalidate sends a lightweight invalidate message to all clients, +// telling them to re-fetch the specified data type via REST API. +// This is used when the full payload exceeds the WebSocket message size limit. +func (h *Hub) broadcastInvalidate(originalType MessageType) { + msg := Message{ + Type: MessageTypeInvalidate, + Payload: map[string]string{"type": string(originalType)}, + Time: getCurrentTimestamp(), + } + + data, err := json.Marshal(msg) + if err != nil { + logger.Error("Failed to marshal invalidate message:", err) + return + } + + // Non-blocking send with timeout + select { + case h.broadcast <- data: + case <-time.After(100 * time.Millisecond): + logger.Warning("WebSocket broadcast channel is full, dropping invalidate message") + case <-h.ctx.Done(): + } +} + // getCurrentTimestamp returns current Unix timestamp in milliseconds func getCurrentTimestamp() int64 { return time.Now().UnixMilli() diff --git a/web/websocket/notifier.go b/web/websocket/notifier.go index 74cf61b2..d271aff6 100644 --- a/web/websocket/notifier.go +++ b/web/websocket/notifier.go @@ -24,6 +24,16 @@ func GetHub() *Hub { return wsHub } +// HasClients returns true if there are any WebSocket clients connected. +// Use this to skip expensive work (DB queries, serialization) when no browser is open. +func HasClients() bool { + hub := GetHub() + if hub == nil { + return false + } + return hub.GetClientCount() > 0 +} + // BroadcastStatus broadcasts server status update to all connected clients func BroadcastStatus(status any) { hub := GetHub() @@ -80,3 +90,15 @@ func BroadcastXrayState(state string, errorMsg string) { hub.Broadcast(MessageTypeXrayState, stateUpdate) } } + +// BroadcastInvalidate sends a lightweight invalidate signal for the given data type, +// telling connected frontends to re-fetch data via REST API. +// Use this instead of BroadcastInbounds/BroadcastOutbounds when you know the payload +// will be too large, to avoid wasting resources on serialization. +func BroadcastInvalidate(dataType MessageType) { + hub := GetHub() + if hub != nil { + hub.broadcastInvalidate(dataType) + } +} +