3x-ui/web/job/node_traffic_sync_job.go
MHSanaei 8e7d215b4a
feat(nodes): traffic-writer queue, full-mirror sync, WS event fixes
- Traffic-writer single-consumer queue (web/service/traffic_writer.go)
  serialises every DB write that touches up/down/all_time/last_online
  (AddTraffic, SetRemoteTraffic, Reset*, UpdateClientTrafficByEmail) so
  overlapping goroutines can no longer clobber each other's column-scoped
  Updates with a stale tx.Save.

- DB pool: WAL + busy_timeout=10s + synchronous=NORMAL + _txlock=
  immediate, MaxOpenConns=8 / MaxIdleConns=4. The immediate-tx PRAGMA
  fixes residual "database is locked [0ms]" cases where deferred-tx
  writer-upgrade conflicts bypass busy_timeout.

- SetRemoteTraffic full-mirrors node-authoritative state into central:
  settings JSON, remark, listen, port, total, expiry, all_time, enable,
  plus per-client total/expiry/reset/all_time. Inbounds and
  client_traffics rows present on node but missing from central are
  created; rows missing from snap are deleted (with cascading
  client_traffics removal).

- NodeTrafficSyncJob detects structural changes from the mirror and
  broadcasts invalidate(inbounds) so open central UIs re-fetch via REST
  on node-side add/del/edit without manual refresh.

- XrayTrafficJob broadcasts invalidate(inbounds) when auto-disable flips
  client_traffics.enable so the per-client toggle reflects depletion
  without manual refresh.

- Frontend: inbounds page now subscribes to the BroadcastInbounds 'inbounds'
  WS event (full-list pushes from add/del/update controllers were silently
  dropped). Fixes invalidate payload field (dataType -> type). Restart-
  panel modal switched from Promise-wrap to onOk-only so Cancel actually
  cancels.

- Node files trimmed of stale prose-comments; cron cadence dropped
  10s -> 5s to match the inbounds page UX.

- README badges and Go module path bumped v2 -> v3 to match module rename.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-10 16:25:23 +02:00

194 lines
4.1 KiB
Go

package job
import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"github.com/mhsanaei/3x-ui/v3/database/model"
	"github.com/mhsanaei/3x-ui/v3/logger"
	"github.com/mhsanaei/3x-ui/v3/web/runtime"
	"github.com/mhsanaei/3x-ui/v3/web/service"
	"github.com/mhsanaei/3x-ui/v3/web/websocket"
)
const (
// nodeTrafficSyncConcurrency caps how many nodes are synced in parallel per pass.
nodeTrafficSyncConcurrency = 8
// nodeTrafficSyncRequestTimeout bounds each per-node snapshot fetch.
nodeTrafficSyncRequestTimeout = 4 * time.Second
)
// NodeTrafficSyncJob periodically pulls traffic snapshots from enabled
// online nodes, mirrors them into the central database, and pushes
// websocket updates to connected UIs.
type NodeTrafficSyncJob struct {
nodeService service.NodeService
inboundService service.InboundService
// running makes overlapping cron ticks skip instead of queueing:
// Run only proceeds when TryLock succeeds.
running sync.Mutex
// structural is set when a sync pass detects a node-side structural
// change (inbound add/del/edit) and is consumed at the end of Run to
// broadcast an invalidate to open UIs.
structural atomicBool
}
// atomicBool is a flag that may be set concurrently from many goroutines
// and consumed (read-and-clear) by a single reader.
//
// The previous implementation hand-rolled this with a sync.Mutex around a
// plain bool; the standard library's atomic.Bool (Go 1.19+) provides the
// same semantics lock-free, and takeAndReset maps directly onto Swap.
type atomicBool struct {
	v atomic.Bool
}

// set marks the flag true.
func (a *atomicBool) set() {
	a.v.Store(true)
}

// takeAndReset returns the current value and clears the flag in a single
// atomic step, so a concurrent set is never lost between read and reset.
func (a *atomicBool) takeAndReset() bool {
	return a.v.Swap(false)
}
type emailSet struct {
mu sync.Mutex
m map[string]struct{}
}
func newEmailSet() *emailSet { return &emailSet{m: make(map[string]struct{})} }
func (s *emailSet) addAll(emails []string) {
if len(emails) == 0 {
return
}
s.mu.Lock()
for _, e := range emails {
if e != "" {
s.m[e] = struct{}{}
}
}
s.mu.Unlock()
}
func (s *emailSet) slice() []string {
s.mu.Lock()
defer s.mu.Unlock()
out := make([]string, 0, len(s.m))
for e := range s.m {
out = append(out, e)
}
return out
}
// NewNodeTrafficSyncJob builds a job with zero-value services and flags;
// the cron scheduler is expected to invoke Run on it periodically.
func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
	job := new(NodeTrafficSyncJob)
	return job
}
// Run performs one synchronization pass: it fans snapshot fetches out to
// all enabled online nodes, mirrors the results into the central DB, and
// then broadcasts traffic/statistics updates to connected websocket UIs.
// Overlapping invocations are skipped via a non-blocking TryLock.
//
// The original body mixed the fan-out sync and the three broadcast steps
// in one 70-line method; it is decomposed here into focused helpers with
// the same control flow and early returns.
func (j *NodeTrafficSyncJob) Run() {
	if !j.running.TryLock() {
		return
	}
	defer j.running.Unlock()

	mgr := runtime.GetManager()
	if mgr == nil {
		return
	}

	touched, ok := j.syncAllNodes(mgr)
	if !ok {
		return
	}

	// Broadcasting is pointless without any connected UI.
	if !websocket.HasClients() {
		return
	}
	j.broadcastOnline()
	j.broadcastStats(touched)

	// A structural change (node-side inbound add/del/edit) means open UIs
	// must re-fetch the inbound list via REST.
	if j.structural.takeAndReset() {
		websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
	}
}

// syncAllNodes fetches and mirrors snapshots from every enabled online
// node with bounded concurrency. It returns the set of client e-mails
// whose traffic rows were touched, and false when there was nothing to
// do (node-list load failure or no nodes configured).
func (j *NodeTrafficSyncJob) syncAllNodes(mgr *runtime.Manager) (*emailSet, bool) {
	nodes, err := j.nodeService.GetAll()
	if err != nil {
		logger.Warning("node traffic sync: load nodes failed:", err)
		return nil, false
	}
	if len(nodes) == 0 {
		return nil, false
	}

	touched := newEmailSet()
	sem := make(chan struct{}, nodeTrafficSyncConcurrency)
	var wg sync.WaitGroup
	for _, n := range nodes {
		if !n.Enable || n.Status != "online" {
			continue
		}
		wg.Add(1)
		sem <- struct{}{} // acquire a concurrency slot before spawning
		go func(n *model.Node) {
			defer wg.Done()
			defer func() { <-sem }()
			j.syncOne(mgr, n, touched)
		}(n)
	}
	wg.Wait()
	return touched, true
}

// broadcastOnline pushes the current online-client list and last-online
// map to all websocket subscribers, normalizing nils to empty values so
// the frontend always receives both keys.
func (j *NodeTrafficSyncJob) broadcastOnline() {
	online := j.inboundService.GetOnlineClients()
	if online == nil {
		online = []string{}
	}
	lastOnline, err := j.inboundService.GetClientsLastOnline()
	if err != nil {
		logger.Warning("node traffic sync: get last-online failed:", err)
	}
	if lastOnline == nil {
		lastOnline = map[string]int64{}
	}
	websocket.BroadcastTraffic(map[string]any{
		"onlineClients": online,
		"lastOnlineMap": lastOnline,
	})
}

// broadcastStats pushes per-client traffic rows (for the e-mails touched
// this pass) and the per-inbound traffic summary, but only when at least
// one of the two payload sections is non-empty.
func (j *NodeTrafficSyncJob) broadcastStats(touched *emailSet) {
	clientStats := map[string]any{}
	if emails := touched.slice(); len(emails) > 0 {
		if stats, err := j.inboundService.GetActiveClientTraffics(emails); err != nil {
			logger.Warning("node traffic sync: get client traffics for websocket failed:", err)
		} else if len(stats) > 0 {
			clientStats["clients"] = stats
		}
	}
	if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
		logger.Warning("node traffic sync: get inbounds summary for websocket failed:", err)
	} else if len(summary) > 0 {
		clientStats["inbounds"] = summary
	}
	if len(clientStats) > 0 {
		websocket.BroadcastClientStats(clientStats)
	}
}
// syncOne pulls a traffic snapshot from a single node, merges it into the
// central database, and records which client e-mails were affected so the
// caller can push fresh per-client stats for exactly those rows.
func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, touched *emailSet) {
	ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
	defer cancel()

	remote, err := mgr.RemoteFor(n)
	if err != nil {
		logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
		return
	}

	snap, err := remote.FetchTrafficSnapshot(ctx)
	if err != nil {
		logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err)
		// Unreachable node: drop its contribution to the online-client set.
		j.inboundService.ClearNodeOnlineClients(n.Id)
		return
	}

	changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap)
	if err != nil {
		logger.Warning("node traffic sync: merge for", n.Name, "failed:", err)
		return
	}
	if changed {
		// Structural difference between node and central: flag for the
		// end-of-pass invalidate broadcast.
		j.structural.set()
	}

	// Collect every non-empty client e-mail across all inbounds, then
	// register them with a single set insertion.
	var emails []string
	for _, inbound := range snap.Inbounds {
		if inbound == nil {
			continue
		}
		for _, stat := range inbound.ClientStats {
			if stat.Email != "" {
				emails = append(emails, stat.Email)
			}
		}
	}
	touched.addAll(emails)
}