feat: implement real-time traffic monitoring and UI updates using a high-performance WebSocket hub and background job system

This commit is contained in:
test999 2026-04-06 08:16:10 +03:00
parent 169b216d7e
commit 97f284c9a7
5 changed files with 114 additions and 20 deletions

View file

@ -1605,11 +1605,25 @@
} }
}); });
// Listen for invalidate signals (sent when payload is too large for WebSocket)
// The server sends a lightweight notification and we re-fetch via REST API
let invalidateTimer = null;
window.wsClient.on('invalidate', (payload) => {
if (payload && (payload.type === 'inbounds' || payload.type === 'traffic')) {
// Debounce to avoid flooding the REST API with multiple invalidate signals
if (invalidateTimer) clearTimeout(invalidateTimer);
invalidateTimer = setTimeout(() => {
invalidateTimer = null;
this.getDBInbounds();
}, 1000);
}
});
// Listen for traffic updates // Listen for traffic updates
window.wsClient.on('traffic', (payload) => { window.wsClient.on('traffic', (payload) => {
// Note: Do NOT update total consumed traffic (stats.up, stats.down) from this event // Note: Do NOT update total consumed traffic (stats.up, stats.down) from this event
// because clientTraffics contains delta/incremental values, not total accumulated values. // because clientTraffics contains delta/incremental values, not total accumulated values.
// Total traffic is updated via the 'inbounds' event which contains accumulated values from database. // Total traffic is updated via the 'inbounds' WebSocket event (or 'invalidate' fallback for large panels).
// Update online clients list in real-time // Update online clients list in real-time
if (payload && Array.isArray(payload.onlineClients)) { if (payload && Array.isArray(payload.onlineClients)) {
@ -1627,22 +1641,27 @@
this.onlineClients = nextOnlineClients; this.onlineClients = nextOnlineClients;
if (onlineChanged) { if (onlineChanged) {
// Recalculate client counts to update online status // Recalculate client counts to update online status
// Use $set for Vue 2 reactivity — direct array index assignment is not reactive
this.dbInbounds.forEach(dbInbound => { this.dbInbounds.forEach(dbInbound => {
const inbound = this.inbounds.find(ib => ib.id === dbInbound.id); const inbound = this.inbounds.find(ib => ib.id === dbInbound.id);
if (inbound && this.clientCount[dbInbound.id]) { if (inbound && this.clientCount[dbInbound.id]) {
this.clientCount[dbInbound.id] = this.getClientCounts(dbInbound, inbound); this.$set(this.clientCount, dbInbound.id, this.getClientCounts(dbInbound, inbound));
} }
}); });
// Always trigger UI refresh — not just when filter is enabled
if (this.enableFilter) { if (this.enableFilter) {
this.filterInbounds(); this.filterInbounds();
} else {
this.searchInbounds(this.searchKey);
} }
} }
} }
// Update last online map in real-time // Update last online map in real-time
// Replace entirely (server sends the full map) to avoid unbounded growth from deleted clients
if (payload && payload.lastOnlineMap && typeof payload.lastOnlineMap === 'object') { if (payload && payload.lastOnlineMap && typeof payload.lastOnlineMap === 'object') {
this.lastOnlineMap = { ...this.lastOnlineMap, ...payload.lastOnlineMap }; this.lastOnlineMap = payload.lastOnlineMap;
} }
}); });

View file

@ -1075,6 +1075,14 @@
this.$forceUpdate(); this.$forceUpdate();
} }
}); });
// Handle invalidate signals (sent when payload is too large for WebSocket,
// or when traffic job notifies about data changes)
window.wsClient.on('invalidate', (payload) => {
if (payload && payload.type === 'outbounds') {
this.refreshOutboundTraffic();
}
});
} }
while (true) { while (true) {

View file

@ -50,7 +50,13 @@ func (j *XrayTrafficJob) Run() {
j.xrayService.SetToNeedRestart() j.xrayService.SetToNeedRestart()
} }
// Get online clients and last online map for real-time status updates // If no frontend client is connected, skip all WebSocket broadcasting routines,
// including expensive DB queries for online clients and JSON marshaling.
if !websocket.HasClients() {
return
}
// Update online clients list and map
onlineClients := j.inboundService.GetOnlineClients() onlineClients := j.inboundService.GetOnlineClients()
lastOnlineMap, err := j.inboundService.GetClientsLastOnline() lastOnlineMap, err := j.inboundService.GetClientsLastOnline()
if err != nil { if err != nil {
@ -58,8 +64,17 @@ func (j *XrayTrafficJob) Run() {
lastOnlineMap = make(map[string]int64) lastOnlineMap = make(map[string]int64)
} }
// Broadcast traffic update (deltas and online stats) via WebSocket
trafficUpdate := map[string]any{
"traffics": traffics,
"clientTraffics": clientTraffics,
"onlineClients": onlineClients,
"lastOnlineMap": lastOnlineMap,
}
websocket.BroadcastTraffic(trafficUpdate)
// Fetch updated inbounds from database with accumulated traffic values // Fetch updated inbounds from database with accumulated traffic values
// This ensures frontend receives the actual total traffic, not just delta values // This ensures frontend receives the actual total traffic for real-time UI refresh.
updatedInbounds, err := j.inboundService.GetAllInbounds() updatedInbounds, err := j.inboundService.GetAllInbounds()
if err != nil { if err != nil {
logger.Warning("get all inbounds for websocket failed:", err) logger.Warning("get all inbounds for websocket failed:", err)
@ -70,16 +85,8 @@ func (j *XrayTrafficJob) Run() {
logger.Warning("get all outbounds for websocket failed:", err) logger.Warning("get all outbounds for websocket failed:", err)
} }
// Broadcast traffic update via WebSocket with accumulated values from database // The web socket hub will automatically check the payload size.
trafficUpdate := map[string]any{ // If < 1MB, it pushes real-time. If > 1MB, it sends a lightweight 'invalidate' signal.
"traffics": traffics,
"clientTraffics": clientTraffics,
"onlineClients": onlineClients,
"lastOnlineMap": lastOnlineMap,
}
websocket.BroadcastTraffic(trafficUpdate)
// Broadcast full inbounds update for real-time UI refresh
if updatedInbounds != nil { if updatedInbounds != nil {
websocket.BroadcastInbounds(updatedInbounds) websocket.BroadcastInbounds(updatedInbounds)
} }
@ -87,7 +94,6 @@ func (j *XrayTrafficJob) Run() {
if updatedOutbounds != nil { if updatedOutbounds != nil {
websocket.BroadcastOutbounds(updatedOutbounds) websocket.BroadcastOutbounds(updatedOutbounds)
} }
} }
func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) { func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {

View file

@ -21,6 +21,7 @@ const (
MessageTypeNotification MessageType = "notification" // System notification MessageTypeNotification MessageType = "notification" // System notification
MessageTypeXrayState MessageType = "xray_state" // Xray state change MessageTypeXrayState MessageType = "xray_state" // Xray state change
MessageTypeOutbounds MessageType = "outbounds" // Outbounds list update MessageTypeOutbounds MessageType = "outbounds" // Outbounds list update
MessageTypeInvalidate MessageType = "invalidate" // Lightweight signal telling frontend to re-fetch data via REST
) )
// Message represents a WebSocket message // Message represents a WebSocket message
@ -259,6 +260,11 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) {
return return
} }
// Skip all work if no clients are connected
if h.GetClientCount() == 0 {
return
}
msg := Message{ msg := Message{
Type: messageType, Type: messageType,
Payload: payload, Payload: payload,
@ -271,10 +277,12 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) {
return return
} }
// Limit message size to prevent memory issues // If message exceeds size limit, send a lightweight invalidate notification
// instead of dropping it entirely — the frontend will re-fetch via REST API
const maxMessageSize = 1024 * 1024 // 1MB const maxMessageSize = 1024 * 1024 // 1MB
if len(data) > maxMessageSize { if len(data) > maxMessageSize {
logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data)) logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType)
h.broadcastInvalidate(messageType)
return return
} }
@ -298,6 +306,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) {
return return
} }
// Skip all work if no clients are connected
if h.GetClientCount() == 0 {
return
}
msg := Message{ msg := Message{
Type: messageType, Type: messageType,
Payload: payload, Payload: payload,
@ -310,10 +323,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) {
return return
} }
// Limit message size to prevent memory issues // If message exceeds size limit, send a lightweight invalidate notification
const maxMessageSize = 1024 * 1024 // 1MB const maxMessageSize = 1024 * 1024 // 1MB
if len(data) > maxMessageSize { if len(data) > maxMessageSize {
logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data)) logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType)
h.broadcastInvalidate(messageType)
return return
} }
@ -374,6 +388,31 @@ func (h *Hub) Stop() {
} }
} }
// broadcastInvalidate sends a lightweight invalidate message to all clients,
// telling them to re-fetch the specified data type via REST API.
// This is used when the full payload exceeds the WebSocket message size limit.
func (h *Hub) broadcastInvalidate(originalType MessageType) {
msg := Message{
Type: MessageTypeInvalidate,
Payload: map[string]string{"type": string(originalType)},
Time: getCurrentTimestamp(),
}
data, err := json.Marshal(msg)
if err != nil {
logger.Error("Failed to marshal invalidate message:", err)
return
}
// Non-blocking send with timeout
select {
case h.broadcast <- data:
case <-time.After(100 * time.Millisecond):
logger.Warning("WebSocket broadcast channel is full, dropping invalidate message")
case <-h.ctx.Done():
}
}
// getCurrentTimestamp returns current Unix timestamp in milliseconds // getCurrentTimestamp returns current Unix timestamp in milliseconds
func getCurrentTimestamp() int64 { func getCurrentTimestamp() int64 {
return time.Now().UnixMilli() return time.Now().UnixMilli()

View file

@ -24,6 +24,16 @@ func GetHub() *Hub {
return wsHub return wsHub
} }
// HasClients returns true if there are any WebSocket clients connected.
// Use this to skip expensive work (DB queries, serialization) when no browser is open.
func HasClients() bool {
hub := GetHub()
if hub == nil {
return false
}
return hub.GetClientCount() > 0
}
// BroadcastStatus broadcasts server status update to all connected clients // BroadcastStatus broadcasts server status update to all connected clients
func BroadcastStatus(status any) { func BroadcastStatus(status any) {
hub := GetHub() hub := GetHub()
@ -80,3 +90,15 @@ func BroadcastXrayState(state string, errorMsg string) {
hub.Broadcast(MessageTypeXrayState, stateUpdate) hub.Broadcast(MessageTypeXrayState, stateUpdate)
} }
} }
// BroadcastInvalidate sends a lightweight invalidate signal for the given data type,
// telling connected frontends to re-fetch data via REST API.
// Use this instead of BroadcastInbounds/BroadcastOutbounds when you know the payload
// will be too large, to avoid wasting resources on serialization.
func BroadcastInvalidate(dataType MessageType) {
hub := GetHub()
if hub != nil {
hub.broadcastInvalidate(dataType)
}
}