3x-ui/web/runtime/local.go
MHSanaei 8e7d215b4a
feat(nodes): traffic-writer queue, full-mirror sync, WS event fixes
- Traffic-writer single-consumer queue (web/service/traffic_writer.go)
  serialises every DB write that touches up/down/all_time/last_online
  (AddTraffic, SetRemoteTraffic, Reset*, UpdateClientTrafficByEmail) so
  overlapping goroutines can no longer clobber each other's column-scoped
  Updates with a stale tx.Save.

- DB pool: WAL + busy_timeout=10s + synchronous=NORMAL +
  _txlock=immediate, MaxOpenConns=8 / MaxIdleConns=4. The immediate-tx
  PRAGMA fixes residual "database is locked [0ms]" cases where deferred-tx
  writer-upgrade conflicts bypass busy_timeout.

- SetRemoteTraffic full-mirrors node-authoritative state into central:
  settings JSON, remark, listen, port, total, expiry, all_time, enable,
  plus per-client total/expiry/reset/all_time. Inbounds and
  client_traffics rows present on the node but missing from central are
  created; rows missing from the node snapshot are deleted (with cascading
  client_traffics removal).

- NodeTrafficSyncJob detects structural changes from the mirror and
  broadcasts invalidate(inbounds) so open central UIs re-fetch via REST
  on node-side add/del/edit without manual refresh.

- XrayTrafficJob broadcasts invalidate(inbounds) when auto-disable flips
  client_traffics.enable so the per-client toggle reflects depletion
  without manual refresh.

- Frontend: inbounds page now subscribes to the BroadcastInbounds 'inbounds'
  WS event (full-list pushes from add/del/update controllers were silently
  dropped). Also fixes the invalidate payload field name (dataType -> type).
  The restart-panel modal switched from Promise-wrap to onOk-only so Cancel
  actually cancels.

- Node files trimmed of stale prose-comments; cron cadence dropped
  10s -> 5s to match the inbounds page UX.

- README badges and Go module path bumped v2 -> v3 to match module rename.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-10 16:25:23 +02:00

98 lines
2.1 KiB
Go

package runtime
import (
"context"
"encoding/json"
"errors"
"sync"
"github.com/mhsanaei/3x-ui/v3/database/model"
"github.com/mhsanaei/3x-ui/v3/xray"
)
// LocalDeps bundles the callbacks a Local runtime is constructed with.
type LocalDeps struct {
	// APIPort reports the port on which the local xray API is reached.
	// Local treats a value <= 0 as "local xray is not running".
	APIPort func() int

	// SetNeedRestart is invoked by Local.RestartXray. It may be left nil,
	// in which case RestartXray does nothing.
	SetNeedRestart func()
}
// Local is the runtime that reports its name as "local". It applies inbound
// and user changes through an xray API client opened on the port reported by
// its deps.
type Local struct {
	// deps holds the callbacks supplied to NewLocal.
	deps LocalDeps

	// mu is held for the full duration of every withAPI call, so API
	// operations issued through one Local never overlap.
	mu sync.Mutex
}
// NewLocal returns a Local that uses the given dependencies. The mutex starts
// in its zero (unlocked) state.
func NewLocal(deps LocalDeps) *Local {
	local := new(Local)
	local.deps = deps
	return local
}
// Name returns the fixed identifier of this runtime, "local".
func (l *Local) Name() string {
	const name = "local"
	return name
}
// withAPI runs fn against a freshly initialised xray API client and closes
// that client afterwards.
//
// l.mu is held for the whole call, so at most one API operation issued
// through this Local runs at a time. A new client is initialised on the port
// reported by deps.APIPort on every call.
//
// If deps.APIPort is nil or reports a port <= 0, fn is not invoked and an
// error is returned. An error from api.Init is likewise returned without
// invoking fn. Otherwise the result of fn is returned unchanged.
func (l *Local) withAPI(fn func(api *xray.XrayAPI) error) error {
	l.mu.Lock()
	defer l.mu.Unlock()

	// APIPort is an injected dependency. Guard against it being unset (as
	// RestartXray does for SetNeedRestart) rather than panicking on a call
	// through a nil function value.
	port := 0
	if l.deps.APIPort != nil {
		port = l.deps.APIPort()
	}
	if port <= 0 {
		return errors.New("local xray is not running")
	}

	var api xray.XrayAPI
	if err := api.Init(port); err != nil {
		return err
	}
	defer api.Close()

	return fn(&api)
}
// AddInbound serialises the xray inbound config generated from ib to indented
// JSON and passes it to the xray API's AddInbound via withAPI.
//
// A marshalling error is returned before the API is contacted. The context
// argument is not used.
func (l *Local) AddInbound(_ context.Context, ib *model.Inbound) error {
	cfg := ib.GenXrayInboundConfig()
	payload, marshalErr := json.MarshalIndent(cfg, "", " ")
	if marshalErr != nil {
		return marshalErr
	}

	push := func(api *xray.XrayAPI) error {
		return api.AddInbound(payload)
	}
	return l.withAPI(push)
}
// DelInbound asks the xray API, via withAPI, to delete the inbound whose tag
// is ib.Tag. The context argument is not used.
func (l *Local) DelInbound(_ context.Context, ib *model.Inbound) error {
	remove := func(api *xray.XrayAPI) error {
		return api.DelInbound(ib.Tag)
	}
	return l.withAPI(remove)
}
// UpdateInbound replaces oldIb with newIb: it deletes oldIb and then, only if
// newIb.Enable is true, adds newIb. When newIb is disabled it returns nil
// after the delete attempt.
func (l *Local) UpdateInbound(ctx context.Context, oldIb, newIb *model.Inbound) error {
	// The delete is best-effort: its error is deliberately discarded, so the
	// add below is attempted whether or not the removal succeeded.
	// NOTE(review): presumably the old inbound may not be loaded in xray
	// (e.g. it was disabled) — confirm.
	_ = l.DelInbound(ctx, oldIb)

	if newIb.Enable {
		return l.AddInbound(ctx, newIb)
	}
	return nil
}
// AddUser passes userMap to the xray API's AddUser via withAPI, together with
// ib's protocol (converted to a string) and ib.Tag. The context argument is
// not used.
func (l *Local) AddUser(_ context.Context, ib *model.Inbound, userMap map[string]any) error {
	add := func(api *xray.XrayAPI) error {
		protocol := string(ib.Protocol)
		return api.AddUser(protocol, ib.Tag, userMap)
	}
	return l.withAPI(add)
}
// RemoveUser asks the xray API, via withAPI, to remove the user identified by
// email from the inbound tagged ib.Tag. The context argument is not used.
func (l *Local) RemoveUser(_ context.Context, ib *model.Inbound, email string) error {
	remove := func(api *xray.XrayAPI) error {
		return api.RemoveUser(ib.Tag, email)
	}
	return l.withAPI(remove)
}
// RestartXray invokes the SetNeedRestart callback when one was supplied and
// always returns nil. It does not call the xray API itself. The context
// argument is not used.
func (l *Local) RestartXray(_ context.Context) error {
	flag := l.deps.SetNeedRestart
	if flag == nil {
		return nil
	}
	flag()
	return nil
}
// ResetClientTraffic is a no-op for the local runtime: every argument is
// ignored and nil is always returned.
func (l *Local) ResetClientTraffic(context.Context, *model.Inbound, string) error {
	return nil
}
// ResetInboundClientTraffics is a no-op for the local runtime: every argument
// is ignored and nil is always returned.
func (l *Local) ResetInboundClientTraffics(context.Context, *model.Inbound) error {
	return nil
}
// ResetAllTraffics is a no-op for the local runtime: the context is ignored
// and nil is always returned.
func (l *Local) ResetAllTraffics(context.Context) error {
	return nil
}