3x-ui/web/job/node_heartbeat_job.go
MHSanaei f4f0af576a
feat(ws): live updates on inbounds/xray/nodes pages, drop polling + manual refresh
Replaces the legacy polling + manual-refresh model with WebSocket pushes
across the three live-data pages. The hub already broadcast traffic /
client_stats / outbounds; this wires the frontend to consume them and
adds a new `nodes` channel for the heartbeat job's snapshot.

Frontend
- new useWebSocket composable: page-scoped singleton WebSocketClient
  whose on/off subscriptions follow the component lifecycle; disconnect
  is left to page unload
- inbounds: useInbounds gains applyTrafficEvent / applyClientStatsEvent
  / applyInvalidate that merge counters and online/lastOnline in place;
  InboundsPage subscribes; InboundList drops the auto-refresh popover,
  the refresh button, and the now-unused refreshing prop
- xray outbounds: useXraySetting gains applyOutboundsEvent; XrayPage
  subscribes; OutboundsTab drops the refresh button + emit
- nodes: useNodes gains applyNodesEvent and stops the 5s
  setInterval/visibilitychange polling; NodesPage subscribes;
  NodeList drops the refresh button and ReloadOutlined import

Backend
- web/websocket: new MessageTypeNodes + BroadcastNodes notifier
- node_heartbeat_job: after wg.Wait(), reload the nodes table once and
  BroadcastNodes(updated). Gated on websocket.HasClients() so a panel
  with no open browser skips the extra DB read

Bug fixes spotted in this pass
- websocket.js #buildUrl defaulted basePath to '' when the global was
  missing (dev mode), producing `ws://host:portws` and a SyntaxError
  on the WebSocket constructor. Fall back to '/' and ensure a leading
  slash.
- vite.config.js: forward /ws to ws://localhost:2053 with ws: true so
  the dev server (port 5173) reaches the Go backend's WebSocket
- NodeFormModal: a-input-password's visibilityToggle is Boolean in
  AntD Vue 4; the v3-era object form (`{ visible, 'onUpdate:visible' }`)
  triggered a Vue prop-type warning. Drop the override (default true
  shows the eye icon and toggles internally) and remove the orphaned
  tokenVisible ref
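For illustration, the base-path fix in websocket.js #buildUrl can be expressed in Go (the actual change lives in the JavaScript client). `normalizeBasePath`, `buildWSURL`, and `buggyWSURL` are hypothetical helpers; forcing a trailing slash is an extra assumption beyond the described fix, added so the `ws` segment can never fuse with the preceding path element.

```go
package main

import "strings"

// normalizeBasePath applies the described fix: an empty base path falls
// back to "/", and a leading slash is enforced. The trailing-slash rule
// is an assumption added for this sketch.
func normalizeBasePath(basePath string) string {
	if basePath == "" {
		return "/"
	}
	if !strings.HasPrefix(basePath, "/") {
		basePath = "/" + basePath
	}
	if !strings.HasSuffix(basePath, "/") {
		basePath += "/"
	}
	return basePath
}

// buildWSURL joins scheme ("ws" or "wss"), host (including port) and the
// normalized base path with the "ws" endpoint.
func buildWSURL(scheme, host, basePath string) string {
	return scheme + "://" + host + normalizeBasePath(basePath) + "ws"
}

// buggyWSURL reproduces the old behavior: an empty base path is
// concatenated as-is, gluing "ws" onto the port.
func buggyWSURL(scheme, host, basePath string) string {
	return scheme + "://" + host + basePath + "ws"
}
```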

Translations
- pages.inbounds.autoRefresh / autoRefreshInterval: removed from all
  13 locales (UI gone)
- pages.nodes.refresh: removed from all 13 locales (UI gone)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-09 17:30:31 +02:00

package job

import (
	"context"
	"sync"
	"time"

	"github.com/mhsanaei/3x-ui/v2/database/model"
	"github.com/mhsanaei/3x-ui/v2/logger"
	"github.com/mhsanaei/3x-ui/v2/web/service"
	"github.com/mhsanaei/3x-ui/v2/web/websocket"
)

// nodeHeartbeatConcurrency caps how many remote panels we probe at once.
// This leaves plenty of headroom for typical deployments (tens of nodes)
// without letting a misconfigured run open thousands of sockets.
const nodeHeartbeatConcurrency = 32

// nodeHeartbeatRequestTimeout bounds a single probe. The cron is @every 10s,
// so this needs to stay well under that to avoid run pile-up.
const nodeHeartbeatRequestTimeout = 6 * time.Second

// NodeHeartbeatJob probes every enabled remote node once per cron tick
// and persists the result. Disabled nodes are skipped entirely so a
// long-broken node can be parked without burning sockets every 10s.
type NodeHeartbeatJob struct {
	nodeService service.NodeService
	// Coarse mutex prevents two ticks from running concurrently if probes
	// pile up under network failure. The next tick simply skips when
	// the previous one is still draining.
	running sync.Mutex
}

// NewNodeHeartbeatJob constructs a heartbeat job. The robfig/cron
// scheduler will hand the same instance to every tick, so the
// running mutex carries across runs as intended.
func NewNodeHeartbeatJob() *NodeHeartbeatJob {
	return &NodeHeartbeatJob{}
}

// Run implements cron.Job. Each tick probes all enabled nodes with
// bounded concurrency, waits for every probe to finish, then pushes
// the refreshed node list to connected WebSocket clients.
func (j *NodeHeartbeatJob) Run() {
	if !j.running.TryLock() {
		// Previous tick still in flight; skip this one.
		return
	}
	defer j.running.Unlock()

	nodes, err := j.nodeService.GetAll()
	if err != nil {
		logger.Warning("node heartbeat: load nodes failed:", err)
		return
	}
	if len(nodes) == 0 {
		return
	}

	sem := make(chan struct{}, nodeHeartbeatConcurrency)
	var wg sync.WaitGroup
	for _, n := range nodes {
		if !n.Enable {
			continue
		}
		wg.Add(1)
		sem <- struct{}{}
		go func(n *model.Node) {
			defer wg.Done()
			defer func() { <-sem }()
			j.probeOne(n)
		}(n)
	}
	wg.Wait()

	// Push the fresh list to any open Nodes page over WebSocket so the
	// status / latency / cpu / mem cells update without the user clicking
	// refresh. Skip the DB read entirely when no browser is connected;
	// this matches the gating pattern in xray_traffic_job.
	if !websocket.HasClients() {
		return
	}
	updated, err := j.nodeService.GetAll()
	if err != nil {
		logger.Warning("node heartbeat: load nodes for broadcast failed:", err)
		return
	}
	websocket.BroadcastNodes(updated)
}

// probeOne runs a single probe and persists the result. We deliberately
// don't return errors: partial failures across the node set should not
// abort other probes, and the LastError column carries the message for
// the UI to surface.
func (j *NodeHeartbeatJob) probeOne(n *model.Node) {
	ctx, cancel := context.WithTimeout(context.Background(), nodeHeartbeatRequestTimeout)
	defer cancel()

	patch, err := j.nodeService.Probe(ctx, n)
	if err != nil {
		patch.Status = "offline"
	} else {
		patch.Status = "online"
	}
	if updErr := j.nodeService.UpdateHeartbeat(n.Id, patch); updErr != nil {
		// A row deleted mid-tick produces "rows affected = 0", which
		// gorm reports as nil, so any error we get here is real.
		logger.Warning("node heartbeat: update node", n.Id, "failed:", updErr)
	}
}