mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-06-06 13:14:11 +00:00
Adds Runtime methods AddClient, UpdateUser, and DeleteUser so master
mutates clients on a node via /panel/api/clients/{add,update,del} rather
than pushing the whole inbound. The previous rt.UpdateInbound path made
the node DelInbound+AddInbound on every single-client change, briefly
cycling every other user on the same inbound.
DelInbound no longer filters by enable=true, so a disabled node inbound
actually gets removed from the node instead of being resurrected by the
next snap.
setRemoteTrafficLocked now sweeps any ClientRecord with zero
ClientInbound rows after SyncInbound rebuilds the attachments, which is
how a node-side delete propagates back to master instead of leaving a
detached ghost. ClientService.Delete tombstones the email first so a
snap arriving mid-delete can't re-create the record.
WebSocket broadcasts an "invalidate(clients)" message on every client
mutation so the Clients page refreshes without manual reload.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
171 lines
4.1 KiB
Go
171 lines
4.1 KiB
Go
package job
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/mhsanaei/3x-ui/v3/database/model"
|
|
"github.com/mhsanaei/3x-ui/v3/logger"
|
|
"github.com/mhsanaei/3x-ui/v3/web/runtime"
|
|
"github.com/mhsanaei/3x-ui/v3/web/service"
|
|
"github.com/mhsanaei/3x-ui/v3/web/websocket"
|
|
)
|
|
|
|
const (
|
|
nodeTrafficSyncConcurrency = 8
|
|
nodeTrafficSyncRequestTimeout = 4 * time.Second
|
|
)
|
|
|
|
type NodeTrafficSyncJob struct {
|
|
nodeService service.NodeService
|
|
inboundService service.InboundService
|
|
settingService service.SettingService
|
|
xrayService service.XrayService
|
|
running sync.Mutex
|
|
structural atomicBool
|
|
}
|
|
|
|
type atomicBool struct {
|
|
mu sync.Mutex
|
|
v bool
|
|
}
|
|
|
|
func (a *atomicBool) set() {
|
|
a.mu.Lock()
|
|
a.v = true
|
|
a.mu.Unlock()
|
|
}
|
|
|
|
func (a *atomicBool) takeAndReset() bool {
|
|
a.mu.Lock()
|
|
v := a.v
|
|
a.v = false
|
|
a.mu.Unlock()
|
|
return v
|
|
}
|
|
|
|
func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
|
|
return &NodeTrafficSyncJob{}
|
|
}
|
|
|
|
func (j *NodeTrafficSyncJob) Run() {
|
|
if !j.running.TryLock() {
|
|
return
|
|
}
|
|
defer j.running.Unlock()
|
|
|
|
mgr := runtime.GetManager()
|
|
if mgr == nil {
|
|
return
|
|
}
|
|
|
|
nodes, err := j.nodeService.GetAll()
|
|
if err != nil {
|
|
logger.Warning("node traffic sync: load nodes failed:", err)
|
|
return
|
|
}
|
|
if len(nodes) == 0 {
|
|
return
|
|
}
|
|
|
|
sem := make(chan struct{}, nodeTrafficSyncConcurrency)
|
|
var wg sync.WaitGroup
|
|
for _, n := range nodes {
|
|
if !n.Enable || n.Status != "online" {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
sem <- struct{}{}
|
|
go func(n *model.Node) {
|
|
defer wg.Done()
|
|
defer func() { <-sem }()
|
|
j.syncOne(mgr, n)
|
|
}(n)
|
|
}
|
|
wg.Wait()
|
|
|
|
_, clientsDisabled, err := j.inboundService.AddTraffic(nil, nil)
|
|
if err != nil {
|
|
logger.Warning("node traffic sync: depletion check failed:", err)
|
|
}
|
|
if clientsDisabled {
|
|
if restartOnDisable, settingErr := j.settingService.GetRestartXrayOnClientDisable(); settingErr == nil && restartOnDisable {
|
|
if err := j.xrayService.RestartXray(true); err != nil {
|
|
logger.Warning("node traffic sync: restart xray after disabling clients failed:", err)
|
|
j.xrayService.SetToNeedRestart()
|
|
}
|
|
} else if settingErr != nil {
|
|
logger.Warning("node traffic sync: get RestartXrayOnClientDisable failed:", settingErr)
|
|
}
|
|
j.structural.set()
|
|
}
|
|
|
|
if !websocket.HasClients() {
|
|
return
|
|
}
|
|
|
|
lastOnline, err := j.inboundService.GetClientsLastOnline()
|
|
if err != nil {
|
|
logger.Warning("node traffic sync: get last-online failed:", err)
|
|
}
|
|
if lastOnline == nil {
|
|
lastOnline = map[string]int64{}
|
|
}
|
|
|
|
j.inboundService.RefreshOnlineClientsFromMap(lastOnline)
|
|
|
|
online := j.inboundService.GetOnlineClients()
|
|
if online == nil {
|
|
online = []string{}
|
|
}
|
|
websocket.BroadcastTraffic(map[string]any{
|
|
"onlineClients": online,
|
|
"lastOnlineMap": lastOnline,
|
|
})
|
|
|
|
clientStats := map[string]any{}
|
|
if stats, err := j.inboundService.GetAllClientTraffics(); err != nil {
|
|
logger.Warning("node traffic sync: get all client traffics for websocket failed:", err)
|
|
} else if len(stats) > 0 {
|
|
clientStats["clients"] = stats
|
|
}
|
|
if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
|
|
logger.Warning("node traffic sync: get inbounds summary for websocket failed:", err)
|
|
} else if len(summary) > 0 {
|
|
clientStats["inbounds"] = summary
|
|
}
|
|
if len(clientStats) > 0 {
|
|
websocket.BroadcastClientStats(clientStats)
|
|
}
|
|
|
|
if j.structural.takeAndReset() {
|
|
websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
|
|
websocket.BroadcastInvalidate(websocket.MessageTypeClients)
|
|
}
|
|
}
|
|
|
|
func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
|
|
defer cancel()
|
|
|
|
rt, err := mgr.RemoteFor(n)
|
|
if err != nil {
|
|
logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
|
|
return
|
|
}
|
|
snap, err := rt.FetchTrafficSnapshot(ctx)
|
|
if err != nil {
|
|
logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err)
|
|
j.inboundService.ClearNodeOnlineClients(n.Id)
|
|
return
|
|
}
|
|
changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap)
|
|
if err != nil {
|
|
logger.Warning("node traffic sync: merge for", n.Name, "failed:", err)
|
|
return
|
|
}
|
|
if changed {
|
|
j.structural.set()
|
|
}
|
|
}
|