mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-05-13 17:46:02 +00:00
- Traffic-writer single-consumer queue (web/service/traffic_writer.go) serialises every DB write that touches up/down/all_time/last_online (AddTraffic, SetRemoteTraffic, Reset*, UpdateClientTrafficByEmail) so overlapping goroutines can no longer clobber each other's column-scoped Updates with a stale tx.Save. - DB pool: WAL + busy_timeout=10s + synchronous=NORMAL + _txlock=immediate, MaxOpenConns=8 / MaxIdleConns=4. The immediate-tx PRAGMA fixes residual "database is locked [0ms]" cases where deferred-tx writer-upgrade conflicts bypass busy_timeout. - SetRemoteTraffic full-mirrors node-authoritative state into central: settings JSON, remark, listen, port, total, expiry, all_time, enable, plus per-client total/expiry/reset/all_time. Inbounds and client_traffics rows present on node but missing from central are created; rows missing from snap are deleted (with cascading client_traffics removal). - NodeTrafficSyncJob detects structural changes from the mirror and broadcasts invalidate(inbounds) so open central UIs re-fetch via REST on node-side add/del/edit without manual refresh. - XrayTrafficJob broadcasts invalidate(inbounds) when auto-disable flips client_traffics.enable so the per-client toggle reflects depletion without manual refresh. - Frontend: inbounds page now subscribes to the BroadcastInbounds 'inbounds' WS event (full-list pushes from add/del/update controllers were silently dropped). Fixes invalidate payload field (dataType -> type). Restart-panel modal switched from Promise-wrap to onOk-only so Cancel actually cancels. - Node files trimmed of stale prose-comments; cron cadence dropped 10s -> 5s to match the inbounds page UX. - README badges and Go module path bumped v2 -> v3 to match module rename. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
82 lines
1.7 KiB
Go
82 lines
1.7 KiB
Go
package job
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/mhsanaei/3x-ui/v3/database/model"
|
|
"github.com/mhsanaei/3x-ui/v3/logger"
|
|
"github.com/mhsanaei/3x-ui/v3/web/service"
|
|
"github.com/mhsanaei/3x-ui/v3/web/websocket"
|
|
)
|
|
|
|
// nodeHeartbeatConcurrency is the capacity of the semaphore channel used by
// NodeHeartbeatJob.Run, i.e. the maximum number of probes in flight at once.
const nodeHeartbeatConcurrency = 32

// nodeHeartbeatRequestTimeout is the deadline applied to the context of each
// individual node probe.
const nodeHeartbeatRequestTimeout time.Duration = 4 * time.Second
|
|
|
|
type NodeHeartbeatJob struct {
|
|
nodeService service.NodeService
|
|
running sync.Mutex
|
|
}
|
|
|
|
func NewNodeHeartbeatJob() *NodeHeartbeatJob {
|
|
return &NodeHeartbeatJob{}
|
|
}
|
|
|
|
func (j *NodeHeartbeatJob) Run() {
|
|
if !j.running.TryLock() {
|
|
return
|
|
}
|
|
defer j.running.Unlock()
|
|
|
|
nodes, err := j.nodeService.GetAll()
|
|
if err != nil {
|
|
logger.Warning("node heartbeat: load nodes failed:", err)
|
|
return
|
|
}
|
|
if len(nodes) == 0 {
|
|
return
|
|
}
|
|
|
|
sem := make(chan struct{}, nodeHeartbeatConcurrency)
|
|
var wg sync.WaitGroup
|
|
for _, n := range nodes {
|
|
if !n.Enable {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
sem <- struct{}{}
|
|
go func(n *model.Node) {
|
|
defer wg.Done()
|
|
defer func() { <-sem }()
|
|
j.probeOne(n)
|
|
}(n)
|
|
}
|
|
wg.Wait()
|
|
|
|
if !websocket.HasClients() {
|
|
return
|
|
}
|
|
updated, err := j.nodeService.GetAll()
|
|
if err != nil {
|
|
logger.Warning("node heartbeat: load nodes for broadcast failed:", err)
|
|
return
|
|
}
|
|
websocket.BroadcastNodes(updated)
|
|
}
|
|
|
|
func (j *NodeHeartbeatJob) probeOne(n *model.Node) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), nodeHeartbeatRequestTimeout)
|
|
defer cancel()
|
|
patch, err := j.nodeService.Probe(ctx, n)
|
|
if err != nil {
|
|
patch.Status = "offline"
|
|
} else {
|
|
patch.Status = "online"
|
|
}
|
|
if updErr := j.nodeService.UpdateHeartbeat(n.Id, patch); updErr != nil {
|
|
logger.Warning("node heartbeat: update node", n.Id, "failed:", updErr)
|
|
}
|
|
}
|