From 8e7d215b4a41f2ee79d918adf63a73eaa9180fa7 Mon Sep 17 00:00:00 2001
From: MHSanaei
Date: Sun, 10 May 2026 16:25:23 +0200
Subject: [PATCH] feat(nodes): traffic-writer queue, full-mirror sync, WS event fixes

- Traffic-writer single-consumer queue (web/service/traffic_writer.go)
  serialises every DB write that touches up/down/all_time/last_online
  (AddTraffic, SetRemoteTraffic, Reset*, UpdateClientTrafficByEmail),
  so overlapping goroutines can no longer clobber each other's
  column-scoped Updates with a stale tx.Save (a sketch of the pattern
  follows after this list).

- DB pool: WAL + busy_timeout=10s + synchronous=NORMAL +
  _txlock=immediate, MaxOpenConns=8 / MaxIdleConns=4. The
  _txlock=immediate setting fixes residual "database is locked [0ms]"
  cases where deferred-tx writer-upgrade conflicts bypass busy_timeout.

- SetRemoteTraffic full-mirrors node-authoritative state into central:
  settings JSON, remark, listen, port, total, expiry, all_time, enable,
  plus per-client total/expiry/reset/all_time. Inbounds and
  client_traffics rows present on the node but missing from central
  are created; rows missing from the snapshot are deleted (with
  cascading client_traffics removal).

- NodeTrafficSyncJob detects structural changes from the mirror and
  broadcasts invalidate(inbounds), so open central UIs re-fetch via
  REST on node-side add/del/edit without a manual refresh.

- XrayTrafficJob broadcasts invalidate(inbounds) when auto-disable
  flips client_traffics.enable, so the per-client toggle reflects
  depletion without a manual refresh.

- Frontend: the inbounds page now subscribes to the BroadcastInbounds
  'inbounds' WS event (full-list pushes from the add/del/update
  controllers were silently dropped). Fixes the invalidate payload
  field (dataType -> type). The restart-panel modal is switched from a
  Promise wrap to onOk-only so Cancel actually cancels.

- Node files trimmed of stale prose comments; cron cadence dropped
  from 10s to 5s to match the inbounds page UX.

- README badges and the Go module path bumped v2 -> v3 to match the
  module rename.
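The web/service/traffic_writer.go hunk is not included in this excerpt, so the
following is only a rough, illustrative sketch of the single-consumer queue the
first bullet describes, not the actual file contents. submitTrafficWrite is the
name the inbound.go hunk below calls into; the body here is an assumption:

    // Illustrative sketch only, not the contents of web/service/traffic_writer.go.
    // One goroutine owns every traffic-column write; callers queue a closure and
    // block until it has run, so concurrent writers can never interleave their
    // UPDATEs on up/down/all_time/last_online.
    package service

    type trafficWrite struct {
        fn   func() error
        done chan error
    }

    var trafficWrites = make(chan trafficWrite, 256) // buffered so producers rarely block

    func init() {
        go func() {
            for w := range trafficWrites {
                w.done <- w.fn() // one DB write at a time, in arrival order
            }
        }()
    }

    // submitTrafficWrite runs fn on the single consumer and returns its error.
    func submitTrafficWrite(fn func() error) error {
        w := trafficWrite{fn: fn, done: make(chan error, 1)}
        trafficWrites <- w
        return <-w.done
    }

Under a shape like this, SetRemoteTraffic (and the AddTraffic, Reset*, and
UpdateClientTrafficByEmail paths named above) wrap their DB work in
submitTrafficWrite, so a node sync tick and the local stats job can no longer
race on the same rows.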
Co-Authored-By: Claude Opus 4.7 --- .gitignore | 2 + README.ar_EG.md | 4 +- README.es_ES.md | 4 +- README.fa_IR.md | 4 +- README.md | 4 +- README.ru_RU.md | 4 +- README.zh_CN.md | 4 +- database/db.go | 21 +- frontend/src/composables/useWebSocket.js | 2 +- frontend/src/pages/inbounds/InboundsPage.vue | 2 + frontend/src/pages/inbounds/useInbounds.js | 13 +- frontend/src/pages/settings/SettingsPage.vue | 40 +- web/controller/node.go | 25 - web/job/node_heartbeat_job.go | 35 +- web/job/node_traffic_sync_job.go | 171 ++++--- web/job/xray_traffic_job.go | 1 + web/runtime/local.go | 45 +- web/runtime/manager.go | 39 -- web/runtime/remote.go | 92 +--- web/runtime/runtime.go | 42 -- web/service/inbound.go | 483 ++++++++++--------- web/service/node.go | 46 -- web/service/traffic_writer.go | 87 ++++ web/web.go | 10 +- web/websocket/hub.go | 18 +- 25 files changed, 559 insertions(+), 639 deletions(-) create mode 100644 web/service/traffic_writer.go diff --git a/.gitignore b/.gitignore index 1761ece2..81a1f0c4 100644 --- a/.gitignore +++ b/.gitignore @@ -33,6 +33,8 @@ Thumbs.db # Ignore Go build files *.exe x-ui.db +x-ui.db-shm +x-ui.db-wal # Ignore Docker specific files docker-compose.override.yml diff --git a/README.ar_EG.md b/README.ar_EG.md index eb9c634b..0dd359f8 100644 --- a/README.ar_EG.md +++ b/README.ar_EG.md @@ -12,8 +12,8 @@ [![GO Version](https://img.shields.io/github/go-mod/go-version/mhsanaei/3x-ui.svg)](#) [![Downloads](https://img.shields.io/github/downloads/mhsanaei/3x-ui/total.svg)](https://github.com/MHSanaei/3x-ui/releases/latest) [![License](https://img.shields.io/badge/license-GPL%20V3-blue.svg?longCache=true)](https://www.gnu.org/licenses/gpl-3.0.en.html) -[![Go Reference](https://pkg.go.dev/badge/github.com/mhsanaei/3x-ui/v2.svg)](https://pkg.go.dev/github.com/mhsanaei/3x-ui/v2) -[![Go Report Card](https://goreportcard.com/badge/github.com/mhsanaei/3x-ui/v2)](https://goreportcard.com/report/github.com/mhsanaei/3x-ui/v2) +[![Go Reference](https://pkg.go.dev/badge/github.com/mhsanaei/3x-ui/v3.svg)](https://pkg.go.dev/github.com/mhsanaei/3x-ui/v3) +[![Go Report Card](https://goreportcard.com/badge/github.com/mhsanaei/3x-ui/v3)](https://goreportcard.com/report/github.com/mhsanaei/3x-ui/v3) **3X-UI** — لوحة تحكم متقدمة مفتوحة المصدر تعتمد على الويب مصممة لإدارة خادم Xray-core. توفر واجهة سهلة الاستخدام لتكوين ومراقبة بروتوكولات VPN والوكيل المختلفة. diff --git a/README.es_ES.md b/README.es_ES.md index caddb406..50758f08 100644 --- a/README.es_ES.md +++ b/README.es_ES.md @@ -12,8 +12,8 @@ [![GO Version](https://img.shields.io/github/go-mod/go-version/mhsanaei/3x-ui.svg)](#) [![Downloads](https://img.shields.io/github/downloads/mhsanaei/3x-ui/total.svg)](https://github.com/MHSanaei/3x-ui/releases/latest) [![License](https://img.shields.io/badge/license-GPL%20V3-blue.svg?longCache=true)](https://www.gnu.org/licenses/gpl-3.0.en.html) -[![Go Reference](https://pkg.go.dev/badge/github.com/mhsanaei/3x-ui/v2.svg)](https://pkg.go.dev/github.com/mhsanaei/3x-ui/v2) -[![Go Report Card](https://goreportcard.com/badge/github.com/mhsanaei/3x-ui/v2)](https://goreportcard.com/report/github.com/mhsanaei/3x-ui/v2) +[![Go Reference](https://pkg.go.dev/badge/github.com/mhsanaei/3x-ui/v3.svg)](https://pkg.go.dev/github.com/mhsanaei/3x-ui/v3) +[![Go Report Card](https://goreportcard.com/badge/github.com/mhsanaei/3x-ui/v3)](https://goreportcard.com/report/github.com/mhsanaei/3x-ui/v3) **3X-UI** — panel de control avanzado basado en web de código abierto diseñado para gestionar el servidor Xray-core. 
Ofrece una interfaz fácil de usar para configurar y monitorear varios protocolos VPN y proxy. diff --git a/README.fa_IR.md b/README.fa_IR.md index 67584828..e08fd8c2 100644 --- a/README.fa_IR.md +++ b/README.fa_IR.md @@ -12,8 +12,8 @@ [![GO Version](https://img.shields.io/github/go-mod/go-version/mhsanaei/3x-ui.svg)](#) [![Downloads](https://img.shields.io/github/downloads/mhsanaei/3x-ui/total.svg)](https://github.com/MHSanaei/3x-ui/releases/latest) [![License](https://img.shields.io/badge/license-GPL%20V3-blue.svg?longCache=true)](https://www.gnu.org/licenses/gpl-3.0.en.html) -[![Go Reference](https://pkg.go.dev/badge/github.com/mhsanaei/3x-ui/v2.svg)](https://pkg.go.dev/github.com/mhsanaei/3x-ui/v2) -[![Go Report Card](https://goreportcard.com/badge/github.com/mhsanaei/3x-ui/v2)](https://goreportcard.com/report/github.com/mhsanaei/3x-ui/v2) +[![Go Reference](https://pkg.go.dev/badge/github.com/mhsanaei/3x-ui/v3.svg)](https://pkg.go.dev/github.com/mhsanaei/3x-ui/v3) +[![Go Report Card](https://goreportcard.com/badge/github.com/mhsanaei/3x-ui/v3)](https://goreportcard.com/report/github.com/mhsanaei/3x-ui/v3) **3X-UI** — یک پنل کنترل پیشرفته مبتنی بر وب با کد باز که برای مدیریت سرور Xray-core طراحی شده است. این پنل یک رابط کاربری آسان برای پیکربندی و نظارت بر پروتکل‌های مختلف VPN و پراکسی ارائه می‌دهد. diff --git a/README.md b/README.md index 400db1ad..4009acdb 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,8 @@ [![GO Version](https://img.shields.io/github/go-mod/go-version/mhsanaei/3x-ui.svg)](#) [![Downloads](https://img.shields.io/github/downloads/mhsanaei/3x-ui/total.svg)](https://github.com/MHSanaei/3x-ui/releases/latest) [![License](https://img.shields.io/badge/license-GPL%20V3-blue.svg?longCache=true)](https://www.gnu.org/licenses/gpl-3.0.en.html) -[![Go Reference](https://pkg.go.dev/badge/github.com/mhsanaei/3x-ui/v2.svg)](https://pkg.go.dev/github.com/mhsanaei/3x-ui/v2) -[![Go Report Card](https://goreportcard.com/badge/github.com/mhsanaei/3x-ui/v2)](https://goreportcard.com/report/github.com/mhsanaei/3x-ui/v2) +[![Go Reference](https://pkg.go.dev/badge/github.com/mhsanaei/3x-ui/v3.svg)](https://pkg.go.dev/github.com/mhsanaei/3x-ui/v3) +[![Go Report Card](https://goreportcard.com/badge/github.com/mhsanaei/3x-ui/v3)](https://goreportcard.com/report/github.com/mhsanaei/3x-ui/v3) **3X-UI** — advanced, open-source web-based control panel designed for managing Xray-core server. It offers a user-friendly interface for configuring and monitoring various VPN and proxy protocols. 
diff --git a/README.ru_RU.md b/README.ru_RU.md index efc4bf86..8ec606a7 100644 --- a/README.ru_RU.md +++ b/README.ru_RU.md @@ -12,8 +12,8 @@ [![GO Version](https://img.shields.io/github/go-mod/go-version/mhsanaei/3x-ui.svg)](#) [![Downloads](https://img.shields.io/github/downloads/mhsanaei/3x-ui/total.svg)](https://github.com/MHSanaei/3x-ui/releases/latest) [![License](https://img.shields.io/badge/license-GPL%20V3-blue.svg?longCache=true)](https://www.gnu.org/licenses/gpl-3.0.en.html) -[![Go Reference](https://pkg.go.dev/badge/github.com/mhsanaei/3x-ui/v2.svg)](https://pkg.go.dev/github.com/mhsanaei/3x-ui/v2) -[![Go Report Card](https://goreportcard.com/badge/github.com/mhsanaei/3x-ui/v2)](https://goreportcard.com/report/github.com/mhsanaei/3x-ui/v2) +[![Go Reference](https://pkg.go.dev/badge/github.com/mhsanaei/3x-ui/v3.svg)](https://pkg.go.dev/github.com/mhsanaei/3x-ui/v3) +[![Go Report Card](https://goreportcard.com/badge/github.com/mhsanaei/3x-ui/v3)](https://goreportcard.com/report/github.com/mhsanaei/3x-ui/v3) **3X-UI** — продвинутая панель управления с открытым исходным кодом на основе веб-интерфейса, разработанная для управления сервером Xray-core. Предоставляет удобный интерфейс для настройки и мониторинга различных VPN и прокси-протоколов. diff --git a/README.zh_CN.md b/README.zh_CN.md index 13d5075d..12c7f9cb 100644 --- a/README.zh_CN.md +++ b/README.zh_CN.md @@ -12,8 +12,8 @@ [![GO Version](https://img.shields.io/github/go-mod/go-version/mhsanaei/3x-ui.svg)](#) [![Downloads](https://img.shields.io/github/downloads/mhsanaei/3x-ui/total.svg)](https://github.com/MHSanaei/3x-ui/releases/latest) [![License](https://img.shields.io/badge/license-GPL%20V3-blue.svg?longCache=true)](https://www.gnu.org/licenses/gpl-3.0.en.html) -[![Go Reference](https://pkg.go.dev/badge/github.com/mhsanaei/3x-ui/v2.svg)](https://pkg.go.dev/github.com/mhsanaei/3x-ui/v2) -[![Go Report Card](https://goreportcard.com/badge/github.com/mhsanaei/3x-ui/v2)](https://goreportcard.com/report/github.com/mhsanaei/3x-ui/v2) +[![Go Reference](https://pkg.go.dev/badge/github.com/mhsanaei/3x-ui/v3.svg)](https://pkg.go.dev/github.com/mhsanaei/3x-ui/v3) +[![Go Report Card](https://goreportcard.com/badge/github.com/mhsanaei/3x-ui/v3)](https://goreportcard.com/report/github.com/mhsanaei/3x-ui/v3) **3X-UI** — 一个基于网页的高级开源控制面板,专为管理 Xray-core 服务器而设计。它提供了用户友好的界面,用于配置和监控各种 VPN 和代理协议。 diff --git a/database/db.go b/database/db.go index 8c079755..64d3765d 100644 --- a/database/db.go +++ b/database/db.go @@ -10,6 +10,7 @@ import ( "os" "path" "slices" + "time" "github.com/mhsanaei/3x-ui/v3/config" "github.com/mhsanaei/3x-ui/v3/database/model" @@ -148,11 +149,29 @@ func InitDB(dbPath string) error { c := &gorm.Config{ Logger: gormLogger, } - db, err = gorm.Open(sqlite.Open(dbPath), c) + dsn := dbPath + "?_journal_mode=WAL&_busy_timeout=10000&_synchronous=NORMAL&_txlock=immediate" + db, err = gorm.Open(sqlite.Open(dsn), c) if err != nil { return err } + sqlDB, err := db.DB() + if err != nil { + return err + } + if _, err := sqlDB.Exec("PRAGMA journal_mode=WAL"); err != nil { + return err + } + if _, err := sqlDB.Exec("PRAGMA busy_timeout=10000"); err != nil { + return err + } + if _, err := sqlDB.Exec("PRAGMA synchronous=NORMAL"); err != nil { + return err + } + sqlDB.SetMaxOpenConns(8) + sqlDB.SetMaxIdleConns(4) + sqlDB.SetConnMaxLifetime(time.Hour) + if err := initModels(); err != nil { return err } diff --git a/frontend/src/composables/useWebSocket.js b/frontend/src/composables/useWebSocket.js index a4810d21..04107afb 100644 --- 
a/frontend/src/composables/useWebSocket.js +++ b/frontend/src/composables/useWebSocket.js @@ -24,7 +24,7 @@ function getSharedClient() { // useWebSocket({ // traffic: (payload) => applyTrafficEvent(payload), // client_stats: (payload) => applyClientStatsEvent(payload), -// invalidate: ({ dataType }) => { if (dataType === 'inbounds') refresh(); }, +// invalidate: ({ type }) => { if (type === 'inbounds') refresh(); }, // }); // // Built-in lifecycle events ('connected' / 'disconnected' / 'error') diff --git a/frontend/src/pages/inbounds/InboundsPage.vue b/frontend/src/pages/inbounds/InboundsPage.vue index 83996aba..5e13b86b 100644 --- a/frontend/src/pages/inbounds/InboundsPage.vue +++ b/frontend/src/pages/inbounds/InboundsPage.vue @@ -50,6 +50,7 @@ const { applyTrafficEvent, applyClientStatsEvent, applyInvalidate, + applyInboundsEvent, } = useInbounds(); // Live updates over WebSocket — replaces the old 5s polling loop. @@ -60,6 +61,7 @@ useWebSocket({ traffic: applyTrafficEvent, client_stats: applyClientStatsEvent, invalidate: applyInvalidate, + inbounds: applyInboundsEvent, }); const { isMobile } = useMediaQuery(); // Node list lives on the central panel; the Inbounds page consumes diff --git a/frontend/src/pages/inbounds/useInbounds.js b/frontend/src/pages/inbounds/useInbounds.js index ca252832..c89e1ad6 100644 --- a/frontend/src/pages/inbounds/useInbounds.js +++ b/frontend/src/pages/inbounds/useInbounds.js @@ -191,6 +191,8 @@ export function useInbounds() { if (typeof upd.up === 'number') ib.up = upd.up; if (typeof upd.down === 'number') ib.down = upd.down; if (typeof upd.allTime === 'number') ib.allTime = upd.allTime; + if (typeof upd.total === 'number') ib.total = upd.total; + if (typeof upd.enable === 'boolean') ib.enable = upd.enable; touched = true; } } @@ -209,14 +211,15 @@ export function useInbounds() { if (typeof upd.up === 'number') stat.up = upd.up; if (typeof upd.down === 'number') stat.down = upd.down; if (typeof upd.total === 'number') stat.total = upd.total; + if (typeof upd.allTime === 'number') stat.allTime = upd.allTime; if (typeof upd.expiryTime === 'number') stat.expiryTime = upd.expiryTime; + if (typeof upd.enable === 'boolean') stat.enable = upd.enable; touched = true; } } } if (touched) { - // shallowRef → trigger reactivity by reassigning the same array. dbInbounds.value = [...dbInbounds.value]; rebuildClientCount(); } @@ -228,11 +231,16 @@ export function useInbounds() { // re-fetch via REST". function applyInvalidate(payload) { if (!payload || typeof payload !== 'object') return; - if (payload.dataType === 'inbounds') { + if (payload.type === 'inbounds') { refresh(); } } + function applyInboundsEvent(payload) { + if (!Array.isArray(payload)) return; + setInbounds(payload); + } + // Recompute the per-inbound roll-up after any in-place mutation. // Cheap because rollupClients only iterates a single inbound's // clients + clientStats arrays. @@ -319,5 +327,6 @@ export function useInbounds() { applyTrafficEvent, applyClientStatsEvent, applyInvalidate, + applyInboundsEvent, }; } diff --git a/frontend/src/pages/settings/SettingsPage.vue b/frontend/src/pages/settings/SettingsPage.vue index 198fd8ad..9c0a7e3a 100644 --- a/frontend/src/pages/settings/SettingsPage.vue +++ b/frontend/src/pages/settings/SettingsPage.vue @@ -96,27 +96,25 @@ function rebuildUrlAfterRestart() { return url.toString(); } -async function restartPanel() { - await new Promise((resolve, reject) => { - Modal.confirm({ - title: 'Restart panel', - content: 'Restart the panel now? 
Your session will reconnect once it comes back.', - okText: 'Restart', - cancelText: 'Cancel', - onOk: () => resolve(), - onCancel: () => reject(new Error('cancelled')), - }); - }).catch(() => null); - - spinning.value = true; - try { - const msg = await HttpUtil.post('/panel/setting/restartPanel'); - if (!msg?.success) return; - await PromiseUtil.sleep(5000); - window.location.replace(rebuildUrlAfterRestart()); - } finally { - spinning.value = false; - } +function restartPanel() { + Modal.confirm({ + title: t('pages.settings.restartPanel'), + content: t('pages.settings.restartPanelDesc'), + okText: t('pages.settings.restartPanel'), + okButtonProps: { danger: true }, + cancelText: t('cancel'), + async onOk() { + spinning.value = true; + try { + const msg = await HttpUtil.post('/panel/setting/restartPanel'); + if (!msg?.success) return; + await PromiseUtil.sleep(5000); + window.location.replace(rebuildUrlAfterRestart()); + } finally { + spinning.value = false; + } + }, + }); } // Conf alerts mirror the legacy banner — pure derivation off allSetting. diff --git a/web/controller/node.go b/web/controller/node.go index 675bb34e..ab0127d2 100644 --- a/web/controller/node.go +++ b/web/controller/node.go @@ -13,15 +13,10 @@ import ( "github.com/gin-gonic/gin" ) -// NodeController exposes CRUD + probe endpoints for managing remote -// 3x-ui panels registered as nodes. All routes mount under -// /panel/api/nodes/ via APIController.initRouter and inherit its -// session-or-bearer auth from checkAPIAuth. type NodeController struct { nodeService service.NodeService } -// NewNodeController creates the controller and wires its routes onto g. func NewNodeController(g *gin.RouterGroup) *NodeController { a := &NodeController{} a.initRouter(g) @@ -37,14 +32,8 @@ func (a *NodeController) initRouter(g *gin.RouterGroup) { g.POST("/del/:id", a.del) g.POST("/setEnable/:id", a.setEnable) - // /test takes a transient payload (no DB write) so the user can - // validate connectivity before saving the node. g.POST("/test", a.test) - // /probe/:id triggers a synchronous probe of an already-saved node - // without waiting for the next 10s heartbeat tick. g.POST("/probe/:id", a.probe) - // /history/:id/:metric/:bucket returns up to 60 averaged buckets of - // the per-node CPU or Mem time series collected by the heartbeat job. g.GET("/history/:id/:metric/:bucket", a.history) } @@ -115,8 +104,6 @@ func (a *NodeController) del(c *gin.Context) { jsonMsg(c, I18nWeb(c, "pages.nodes.toasts.delete"), nil) } -// setEnable accepts a JSON body { "enable": bool } so the toggle -// switch can flip a node without sending the whole record back. func (a *NodeController) setEnable(c *gin.Context) { id, err := strconv.Atoi(c.Param("id")) if err != nil { @@ -137,18 +124,12 @@ func (a *NodeController) setEnable(c *gin.Context) { jsonMsg(c, I18nWeb(c, "pages.nodes.toasts.update"), nil) } -// test runs Probe against a transient Node payload without writing to -// the DB. Used by the form modal to validate connectivity before save. func (a *NodeController) test(c *gin.Context) { n := &model.Node{} if err := c.ShouldBind(n); err != nil { jsonMsg(c, I18nWeb(c, "pages.nodes.toasts.test"), err) return } - // Reuse normalize-style defaults so the form can leave scheme/basePath - // blank and still get a sensible probe URL. We do this by round-tripping - // through Create's validator without the DB write — a tiny duplication - // here vs. exposing normalize publicly. 
if n.Scheme == "" { n.Scheme = "https" } @@ -162,9 +143,6 @@ func (a *NodeController) test(c *gin.Context) { jsonObj(c, patch.ToUI(err == nil), nil) } -// probe triggers a one-off probe against a saved node and persists -// the result so the dashboard updates immediately, without waiting -// for the next heartbeat tick. func (a *NodeController) probe(c *gin.Context) { id, err := strconv.Atoi(c.Param("id")) if err != nil { @@ -188,9 +166,6 @@ func (a *NodeController) probe(c *gin.Context) { jsonObj(c, patch.ToUI(probeErr == nil), nil) } -// history returns averaged buckets of the per-node CPU/Mem time-series. -// Mirrors the system-level /panel/api/server/history/:metric/:bucket -// endpoint so the frontend can reuse the same fetch logic. func (a *NodeController) history(c *gin.Context) { id, err := strconv.Atoi(c.Param("id")) if err != nil { diff --git a/web/job/node_heartbeat_job.go b/web/job/node_heartbeat_job.go index 02e7532e..f1d7a113 100644 --- a/web/job/node_heartbeat_job.go +++ b/web/job/node_heartbeat_job.go @@ -11,37 +11,22 @@ import ( "github.com/mhsanaei/3x-ui/v3/web/websocket" ) -// nodeHeartbeatConcurrency caps how many remote panels we probe at once. -// Plenty of headroom for typical deployments (tens of nodes) without -// letting a misconfigured run open thousands of sockets at once. -const nodeHeartbeatConcurrency = 32 +const ( + nodeHeartbeatConcurrency = 32 + nodeHeartbeatRequestTimeout = 4 * time.Second +) -// nodeHeartbeatRequestTimeout bounds a single probe. The cron is @every 10s, -// so this needs to stay well under that to avoid run pile-up. -const nodeHeartbeatRequestTimeout = 6 * time.Second - -// NodeHeartbeatJob probes every enabled remote node once per cron tick -// and persists the result. Disabled nodes are skipped entirely so a -// long-broken node can be parked without burning sockets every 10s. type NodeHeartbeatJob struct { nodeService service.NodeService - - // Coarse mutex prevents two ticks running concurrently if probes - // pile up under network failure. The next tick simply skips when - // the previous one is still draining. - running sync.Mutex + running sync.Mutex } -// NewNodeHeartbeatJob constructs a heartbeat job. The robfig/cron -// scheduler will hand the same instance to every tick, so the -// running mutex carries across runs as intended. func NewNodeHeartbeatJob() *NodeHeartbeatJob { return &NodeHeartbeatJob{} } func (j *NodeHeartbeatJob) Run() { if !j.running.TryLock() { - // Previous tick still in flight — skip this one. return } defer j.running.Unlock() @@ -71,10 +56,6 @@ func (j *NodeHeartbeatJob) Run() { } wg.Wait() - // Push the fresh list to any open Nodes page over WebSocket so the - // status / latency / cpu / mem cells update without the user clicking - // refresh. Skip the DB read entirely when no browser is connected — - // matches the gating pattern in xray_traffic_job. if !websocket.HasClients() { return } @@ -86,10 +67,6 @@ func (j *NodeHeartbeatJob) Run() { websocket.BroadcastNodes(updated) } -// probeOne runs a single probe and persists the result. We deliberately -// don't return errors — partial failures across the node set should not -// abort other probes, and the LastError column carries the message for -// the UI to surface. 
func (j *NodeHeartbeatJob) probeOne(n *model.Node) { ctx, cancel := context.WithTimeout(context.Background(), nodeHeartbeatRequestTimeout) defer cancel() @@ -100,8 +77,6 @@ func (j *NodeHeartbeatJob) probeOne(n *model.Node) { patch.Status = "online" } if updErr := j.nodeService.UpdateHeartbeat(n.Id, patch); updErr != nil { - // A row deleted mid-tick produces "rows affected = 0", which - // gorm reports as nil — so any error we get here is real. logger.Warning("node heartbeat: update node", n.Id, "failed:", updErr) } } diff --git a/web/job/node_traffic_sync_job.go b/web/job/node_traffic_sync_job.go index ff4c225e..99fe2183 100644 --- a/web/job/node_traffic_sync_job.go +++ b/web/job/node_traffic_sync_job.go @@ -12,41 +12,67 @@ import ( "github.com/mhsanaei/3x-ui/v3/web/websocket" ) -// nodeTrafficSyncConcurrency caps how many nodes we sync simultaneously. -// Each sync does three HTTP calls in series, so the wall-clock budget -// per node is the request timeout below — keeping the cap modest avoids -// flooding the network while still getting through dozens of nodes -// inside a 10s tick. -const nodeTrafficSyncConcurrency = 8 +const ( + nodeTrafficSyncConcurrency = 8 + nodeTrafficSyncRequestTimeout = 4 * time.Second +) -// nodeTrafficSyncRequestTimeout bounds the per-node sync. Three probes -// in series at 8s each would blow past the cron interval, so the budget -// here covers the whole snapshot — FetchTrafficSnapshot internally caps -// each HTTP call at the runtime's own 10s ceiling but uses ctx for the -// outer total. -const nodeTrafficSyncRequestTimeout = 8 * time.Second - -// NodeTrafficSyncJob pulls absolute traffic + online stats from every -// enabled, currently-online remote node and merges them into the central -// DB. Mirrors NodeHeartbeatJob's structure: TryLock to skip pile-ups, -// errgroup-style fan-out with a concurrency cap, per-node ctx timeout. -// -// Offline nodes are skipped entirely — the heartbeat job already owns -// status tracking, and we'd just waste sockets retrying a node we know -// is unreachable. As soon as heartbeat marks a node online again, the -// next traffic tick picks it up. type NodeTrafficSyncJob struct { nodeService service.NodeService inboundService service.InboundService - - // Coarse mutex prevents two ticks running concurrently if a single - // sync stalls past the 10s cron interval (rare but possible when - // many nodes are slow simultaneously). - running sync.Mutex + running sync.Mutex + structural atomicBool +} + +type atomicBool struct { + mu sync.Mutex + v bool +} + +func (a *atomicBool) set() { + a.mu.Lock() + a.v = true + a.mu.Unlock() +} + +func (a *atomicBool) takeAndReset() bool { + a.mu.Lock() + v := a.v + a.v = false + a.mu.Unlock() + return v +} + +type emailSet struct { + mu sync.Mutex + m map[string]struct{} +} + +func newEmailSet() *emailSet { return &emailSet{m: make(map[string]struct{})} } + +func (s *emailSet) addAll(emails []string) { + if len(emails) == 0 { + return + } + s.mu.Lock() + for _, e := range emails { + if e != "" { + s.m[e] = struct{}{} + } + } + s.mu.Unlock() +} + +func (s *emailSet) slice() []string { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]string, 0, len(s.m)) + for e := range s.m { + out = append(out, e) + } + return out } -// NewNodeTrafficSyncJob builds a singleton sync job. Cron hands the same -// instance to every tick so the running mutex is preserved across runs. 
func NewNodeTrafficSyncJob() *NodeTrafficSyncJob { return &NodeTrafficSyncJob{} } @@ -59,8 +85,6 @@ func (j *NodeTrafficSyncJob) Run() { mgr := runtime.GetManager() if mgr == nil { - // Server still booting — pre-Manager runs are normal during - // the first few seconds of startup. return } @@ -73,6 +97,7 @@ func (j *NodeTrafficSyncJob) Run() { return } + touched := newEmailSet() sem := make(chan struct{}, nodeTrafficSyncConcurrency) var wg sync.WaitGroup for _, n := range nodes { @@ -84,37 +109,54 @@ func (j *NodeTrafficSyncJob) Run() { go func(n *model.Node) { defer wg.Done() defer func() { <-sem }() - j.syncOne(mgr, n) + j.syncOne(mgr, n, touched) }(n) } wg.Wait() - // One broadcast per tick, batched across all nodes — frontend code - // is invariant to whether the rows came from local xray or a node, - // so we reuse the same WebSocket envelope XrayTrafficJob uses. - if websocket.HasClients() { - online := j.inboundService.GetOnlineClients() - if online == nil { - online = []string{} + if !websocket.HasClients() { + return + } + + online := j.inboundService.GetOnlineClients() + if online == nil { + online = []string{} + } + lastOnline, err := j.inboundService.GetClientsLastOnline() + if err != nil { + logger.Warning("node traffic sync: get last-online failed:", err) + } + if lastOnline == nil { + lastOnline = map[string]int64{} + } + websocket.BroadcastTraffic(map[string]any{ + "onlineClients": online, + "lastOnlineMap": lastOnline, + }) + + clientStats := map[string]any{} + if emails := touched.slice(); len(emails) > 0 { + if stats, err := j.inboundService.GetActiveClientTraffics(emails); err != nil { + logger.Warning("node traffic sync: get client traffics for websocket failed:", err) + } else if len(stats) > 0 { + clientStats["clients"] = stats } - lastOnline, err := j.inboundService.GetClientsLastOnline() - if err != nil { - logger.Warning("node traffic sync: get last-online failed:", err) - } - if lastOnline == nil { - lastOnline = map[string]int64{} - } - websocket.BroadcastTraffic(map[string]any{ - "onlineClients": online, - "lastOnlineMap": lastOnline, - }) + } + if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil { + logger.Warning("node traffic sync: get inbounds summary for websocket failed:", err) + } else if len(summary) > 0 { + clientStats["inbounds"] = summary + } + if len(clientStats) > 0 { + websocket.BroadcastClientStats(clientStats) + } + + if j.structural.takeAndReset() { + websocket.BroadcastInvalidate(websocket.MessageTypeInbounds) } } -// syncOne fetches and merges one node's snapshot. Errors are logged -// per-node and don't propagate; one slow node shouldn't keep the rest -// from running. -func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) { +func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, touched *emailSet) { ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout) defer cancel() @@ -126,12 +168,27 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) { snap, err := rt.FetchTrafficSnapshot(ctx) if err != nil { logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err) - // Drop node-online contribution so a hiccup doesn't leave the - // online filter showing stale clients indefinitely. 
j.inboundService.ClearNodeOnlineClients(n.Id) return } - if err := j.inboundService.SetRemoteTraffic(n.Id, snap); err != nil { + changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap) + if err != nil { logger.Warning("node traffic sync: merge for", n.Name, "failed:", err) + return + } + if changed { + j.structural.set() + } + for _, ib := range snap.Inbounds { + if ib == nil { + continue + } + emails := make([]string, 0, len(ib.ClientStats)) + for _, cs := range ib.ClientStats { + if cs.Email != "" { + emails = append(emails, cs.Email) + } + } + touched.addAll(emails) } } diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go index f4f0998d..5464a3a0 100644 --- a/web/job/xray_traffic_job.go +++ b/web/job/xray_traffic_job.go @@ -54,6 +54,7 @@ func (j *XrayTrafficJob) Run() { j.xrayService.SetToNeedRestart() } } + websocket.BroadcastInvalidate(websocket.MessageTypeInbounds) } if ExternalTrafficInformEnable, err := j.settingService.GetExternalTrafficInformEnable(); ExternalTrafficInformEnable { j.informTrafficToExternalAPI(traffics, clientTraffics) diff --git a/web/runtime/local.go b/web/runtime/local.go index 45e550c3..b50cb9de 100644 --- a/web/runtime/local.go +++ b/web/runtime/local.go @@ -10,45 +10,22 @@ import ( "github.com/mhsanaei/3x-ui/v3/xray" ) -// LocalDeps wires the runtime to the panel's xray process and the -// service.XrayService restart trigger via callbacks. We use callbacks -// (not an interface to *service.XrayService) because the runtime -// package would otherwise cycle-import service. type LocalDeps struct { - // APIPort returns the xray gRPC API port the local engine is - // currently listening on. Returns 0 when xray isn't running yet — - // callers should treat that as a transient error. - APIPort func() int - // SetNeedRestart trips the panel's "restart xray on next cron tick" - // flag. Mirrors how InboundController.addInbound calls - // xrayService.SetToNeedRestart() today. + APIPort func() int SetNeedRestart func() } -// Local implements Runtime against the panel's own xray process. Each -// call follows the existing inbound.go pattern: open a gRPC client, -// run one operation, close. Per-call init keeps the connection state -// scoped so a stuck call can't leak across operations. type Local struct { deps LocalDeps - - // Serialise gRPC operations — xray's HandlerService isn't documented - // as concurrent-safe and the existing InboundService implicitly - // runs one op at a time per request. This matches that. - mu sync.Mutex + mu sync.Mutex } -// NewLocal builds a Local runtime. deps.APIPort and deps.SetNeedRestart -// are required; callers that want a no-op restart can pass `func(){}`. func NewLocal(deps LocalDeps) *Local { return &Local{deps: deps} } func (l *Local) Name() string { return "local" } -// withAPI runs fn against a freshly-initialised XrayAPI client and -// guarantees Close() afterwards. Returns an error if the gRPC port -// isn't available yet (xray still starting / stopped). func (l *Local) withAPI(fn func(api *xray.XrayAPI) error) error { l.mu.Lock() defer l.mu.Unlock() @@ -82,17 +59,8 @@ func (l *Local) DelInbound(_ context.Context, ib *model.Inbound) error { } func (l *Local) UpdateInbound(ctx context.Context, oldIb, newIb *model.Inbound) error { - // xray-core has no in-place inbound update — drop and re-add. - // Matches what InboundService.UpdateInbound did inline. - if err := l.DelInbound(ctx, oldIb); err != nil { - // Best-effort: continue to AddInbound so a transient remove - // failure (e.g. 
inbound already gone) doesn't strand us. The - // caller's needRestart fallback will reconcile from config. - _ = err - } + _ = l.DelInbound(ctx, oldIb) if !newIb.Enable { - // Disabled inbounds aren't pushed to xray; we already removed - // the old one above. return nil } return l.AddInbound(ctx, newIb) @@ -117,13 +85,6 @@ func (l *Local) RestartXray(_ context.Context) error { return nil } -// Reset methods are intentional no-ops for Local. The central DB UPDATE -// that runs in InboundService.Reset* before this call has already zeroed -// the counters that xray reads; on the next stats poll the gRPC service -// will pick up matching values. Pre-Phase-1 the panel never issued an -// xrayApi reset call here either — keeping the same shape avoids a -// behaviour change for single-panel users. - func (l *Local) ResetClientTraffic(_ context.Context, _ *model.Inbound, _ string) error { return nil } diff --git a/web/runtime/manager.go b/web/runtime/manager.go index d8739f21..4d12a1cf 100644 --- a/web/runtime/manager.go +++ b/web/runtime/manager.go @@ -8,14 +8,6 @@ import ( "github.com/mhsanaei/3x-ui/v3/database/model" ) -// Manager is the entry point for service code that needs a Runtime. -// One singleton lives in the package-level `manager` var, set at -// server bootstrap (web.go calls SetManager once). InboundService and -// friends read it via GetManager(). -// -// Local runs forever; Remotes are built lazily per nodeID and cached. -// Cache invalidation runs on node Update/Delete (NodeService hooks -// InvalidateNode) so a token rotation surfaces the next call. type Manager struct { local Runtime @@ -23,9 +15,6 @@ type Manager struct { remotes map[int]*Remote } -// NewManager wires the singleton with the deps Local needs. The runtime -// package can't import service so the caller (web.go) supplies the -// callbacks that bridge into XrayService. func NewManager(localDeps LocalDeps) *Manager { return &Manager{ local: NewLocal(localDeps), @@ -33,10 +22,6 @@ func NewManager(localDeps LocalDeps) *Manager { } } -// RuntimeFor picks the right adapter for an inbound based on NodeID. -// Returns local when nodeID is nil; otherwise looks up the node row -// (or returns the cached Remote for it). The caller does not need to -// know which kind they got — that's the point of the abstraction. func (m *Manager) RuntimeFor(nodeID *int) (Runtime, error) { if nodeID == nil { return m.local, nil @@ -48,8 +33,6 @@ func (m *Manager) RuntimeFor(nodeID *int) (Runtime, error) { } m.mu.RUnlock() - // Cache miss — load the node row and build a Remote. We re-check - // under the write lock to avoid duplicate construction under load. m.mu.Lock() defer m.mu.Unlock() if rt, ok := m.remotes[*nodeID]; ok { @@ -67,16 +50,8 @@ func (m *Manager) RuntimeFor(nodeID *int) (Runtime, error) { return rt, nil } -// Local returns the singleton local runtime. Used by code that needs -// to operate on the panel's own xray regardless of which inbound it -// came from (e.g. on-demand restart from the UI). func (m *Manager) Local() Runtime { return m.local } -// RemoteFor returns the Remote adapter for an already-loaded node row. -// Differs from RuntimeFor in two ways: it skips the DB lookup (caller -// hands in the node), and it returns the concrete *Remote so callers -// like NodeTrafficSyncJob can reach FetchTrafficSnapshot, which the -// Runtime interface doesn't expose. 
func (m *Manager) RemoteFor(node *model.Node) (*Remote, error) { if node == nil { return nil, errors.New("node is nil") @@ -98,18 +73,12 @@ func (m *Manager) RemoteFor(node *model.Node) (*Remote, error) { return rt, nil } -// InvalidateNode drops the cached Remote for nodeID so the next -// RuntimeFor call rebuilds it from the (possibly updated) node row. -// Called from NodeService.Update / Delete. func (m *Manager) InvalidateNode(nodeID int) { m.mu.Lock() defer m.mu.Unlock() delete(m.remotes, nodeID) } -// loadNode reads a node row directly from the DB. Kept package-local -// to avoid pulling NodeService into the runtime — service depends on -// runtime, not the other way around. func loadNode(id int) (*model.Node, error) { db := database.GetDB() n := &model.Node{} @@ -119,25 +88,17 @@ func loadNode(id int) (*model.Node, error) { return n, nil } -// Singleton wiring ------------------------------------------------------- - var ( managerMu sync.RWMutex manager *Manager ) -// SetManager installs the process-wide Manager. web.go calls this once -// during NewServer. Tests can call it again with a stub. func SetManager(m *Manager) { managerMu.Lock() defer managerMu.Unlock() manager = m } -// GetManager returns the installed Manager, or nil before SetManager -// has run. Callers should treat nil as "still booting" — the existing -// behaviour for code paths that only run on the local engine continues -// to work via a pre-wired fallback set up in init() below. func GetManager() *Manager { managerMu.RLock() defer managerMu.RUnlock() diff --git a/web/runtime/remote.go b/web/runtime/remote.go index 57ce17be..fbd314d7 100644 --- a/web/runtime/remote.go +++ b/web/runtime/remote.go @@ -18,13 +18,8 @@ import ( "github.com/mhsanaei/3x-ui/v3/logger" ) -// remoteHTTPTimeout bounds a single remote API call. Generous enough for -// a slow node under load, short enough that a wedged remote doesn't -// block the central panel's UI thread for the user. const remoteHTTPTimeout = 10 * time.Second -// remoteHTTPClient is shared so repeated calls to the same node reuse -// connections. Per-request timeouts are set via context. var remoteHTTPClient = &http.Client{ Transport: &http.Transport{ MaxIdleConns: 64, @@ -33,22 +28,12 @@ var remoteHTTPClient = &http.Client{ }, } -// envelope mirrors entity.Msg without depending on the entity package -// (avoids a cycle on the controller side that pulls in this runtime). type envelope struct { Success bool `json:"success"` Msg string `json:"msg"` Obj json.RawMessage `json:"obj"` } -// Remote implements Runtime by calling the existing /panel/api/inbounds/* -// endpoints on a remote 3x-ui panel. The remote is authenticated as -// the central panel via its per-node Bearer token. -// -// remoteIDByTag caches the {tag → remote inbound id} mapping so the -// hot path (update/delete/addClient) avoids /list lookups. The cache -// is in-memory and rebuilt lazily on first miss after a process restart -// or InvalidateNode call. type Remote struct { node *model.Node @@ -56,10 +41,6 @@ type Remote struct { remoteIDByTag map[string]int } -// NewRemote constructs a Remote runtime for one node. The node pointer -// is cached; callers that mutate node config (via NodeService.Update) -// must drop the runtime through Manager.InvalidateNode so a fresh one -// picks up the new fields. 
func NewRemote(n *model.Node) *Remote { return &Remote{ node: n, @@ -69,8 +50,6 @@ func NewRemote(n *model.Node) *Remote { func (r *Remote) Name() string { return "node:" + r.node.Name } -// baseURL composes the panel root for r.node, e.g. https://1.2.3.4:2053/ -// Always ends in '/' so callers can append "panel/api/...". func (r *Remote) baseURL() string { bp := r.node.BasePath if bp == "" { @@ -82,13 +61,6 @@ func (r *Remote) baseURL() string { return fmt.Sprintf("%s://%s:%d%s", r.node.Scheme, r.node.Address, r.node.Port, bp) } -// do issues an HTTP request against the remote panel and decodes the -// entity.Msg envelope. Returns an error for transport failures, non-2xx -// responses, or {success:false} bodies. -// -// body may be nil. For application/x-www-form-urlencoded calls (the -// existing controllers bind via c.ShouldBind which prefers form-encoded) -// pass url.Values; for JSON pass any other type and we'll marshal it. func (r *Remote) do(ctx context.Context, method, path string, body any) (*envelope, error) { if r.node.ApiToken == "" { return nil, errors.New("node has no API token configured") @@ -102,7 +74,6 @@ func (r *Remote) do(ctx context.Context, method, path string, body any) (*envelo ) switch b := body.(type) { case nil: - // nothing case url.Values: reqBody = strings.NewReader(b.Encode()) contentType = "application/x-www-form-urlencoded" @@ -151,10 +122,6 @@ func (r *Remote) do(ctx context.Context, method, path string, body any) (*envelo return &env, nil } -// resolveRemoteID returns the remote panel's local inbound ID for the -// given tag. Cache-backed; on miss it hits /panel/api/inbounds/list and -// repopulates the whole map (one-shot list is cheaper than per-tag -// lookups when several inbounds need resolving in sequence). func (r *Remote) resolveRemoteID(ctx context.Context, tag string) (int, error) { if id, ok := r.cacheGet(tag); ok { return id, nil @@ -187,9 +154,6 @@ func (r *Remote) cacheDel(tag string) { delete(r.remoteIDByTag, tag) } -// refreshRemoteIDs replaces the in-memory tag→id map with whatever the -// node currently has. Called on cache miss; also a useful recovery path -// when the remote panel is rebuilt or we get a "not found" on update. func (r *Remote) refreshRemoteIDs(ctx context.Context) error { env, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil) if err != nil { @@ -216,17 +180,11 @@ func (r *Remote) refreshRemoteIDs(ctx context.Context) error { } func (r *Remote) AddInbound(ctx context.Context, ib *model.Inbound) error { - // Strip NodeID from the wire payload so the remote stores a "local" - // row from its own perspective. We also ship the full model.Inbound - // minus runtime metadata. Tag is preserved so central + remote agree - // on the identifier — relies on InboundController being patched to - // not overwrite a non-empty Tag. payload := wireInbound(ib) env, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/add", payload) if err != nil { return err } - // Response body contains the saved inbound (with the remote's Id). var created struct { Id int `json:"id"` Tag string `json:"tag"` @@ -242,9 +200,7 @@ func (r *Remote) AddInbound(ctx context.Context, ib *model.Inbound) error { func (r *Remote) DelInbound(ctx context.Context, ib *model.Inbound) error { id, err := r.resolveRemoteID(ctx, ib.Tag) if err != nil { - // Already gone on remote — treat as success so a sync after a - // remote panel reset doesn't strand the central panel. 
- logger.Warning("remote DelInbound: tag", ib.Tag, "not found on", r.node.Name, "— treating as success") + logger.Warning("remote DelInbound: tag", ib.Tag, "not found on", r.node.Name) return nil } if _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/del/"+strconv.Itoa(id), nil); err != nil { @@ -255,21 +211,14 @@ func (r *Remote) DelInbound(ctx context.Context, ib *model.Inbound) error { } func (r *Remote) UpdateInbound(ctx context.Context, oldIb, newIb *model.Inbound) error { - // The remote's old row is keyed by oldIb.Tag (tags can change on - // edit if listen/port changed). We update by remote-id so the row - // keeps its identity even when its tag flips. id, err := r.resolveRemoteID(ctx, oldIb.Tag) if err != nil { - // Remote lost the row — fall back to add. This can happen if - // the node panel was reset; we'd rather end up with the inbound - // existing than fail the user's update. return r.AddInbound(ctx, newIb) } payload := wireInbound(newIb) if _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/update/"+strconv.Itoa(id), payload); err != nil { return err } - // Tag may have changed — remap the cache. if oldIb.Tag != newIb.Tag { r.cacheDel(oldIb.Tag) } @@ -277,18 +226,6 @@ func (r *Remote) UpdateInbound(ctx context.Context, oldIb, newIb *model.Inbound) return nil } -// AddUser pushes a single client into the remote inbound's settings JSON. -// We can't reuse the central panel's xrayApi.AddUser shape directly -// because the remote's HTTP endpoint expects {id, settings} where -// settings is a JSON string with a "clients":[...] array. The central -// panel's InboundService has already updated its own settings JSON -// before calling us, so we just ship the new full settings to the -// remote via /update — simpler than reconstructing the partial AddUser -// payload remote-side. -// -// Caller passes the full updated *model.Inbound on the same code path -// AddUser is called from in InboundService. To avoid changing the -// Runtime interface for that, AddUser/RemoveUser delegate to UpdateInbound. func (r *Remote) AddUser(ctx context.Context, ib *model.Inbound, _ map[string]any) error { return r.UpdateInbound(ctx, ib, ib) } @@ -305,8 +242,7 @@ func (r *Remote) RestartXray(ctx context.Context) error { func (r *Remote) ResetClientTraffic(ctx context.Context, ib *model.Inbound, email string) error { id, err := r.resolveRemoteID(ctx, ib.Tag) if err != nil { - // Already gone on remote — central reset is enough. - logger.Warning("remote ResetClientTraffic: tag", ib.Tag, "not found on", r.node.Name, "— treating as success") + logger.Warning("remote ResetClientTraffic: tag", ib.Tag, "not found on", r.node.Name) return nil } _, err = r.do(ctx, http.MethodPost, @@ -318,7 +254,7 @@ func (r *Remote) ResetClientTraffic(ctx context.Context, ib *model.Inbound, emai func (r *Remote) ResetInboundClientTraffics(ctx context.Context, ib *model.Inbound) error { id, err := r.resolveRemoteID(ctx, ib.Tag) if err != nil { - logger.Warning("remote ResetInboundClientTraffics: tag", ib.Tag, "not found on", r.node.Name, "— treating as success") + logger.Warning("remote ResetInboundClientTraffics: tag", ib.Tag, "not found on", r.node.Name) return nil } _, err = r.do(ctx, http.MethodPost, @@ -331,22 +267,12 @@ func (r *Remote) ResetAllTraffics(ctx context.Context) error { return err } -// TrafficSnapshot is what NodeTrafficSyncJob pulls from a remote node -// every cron tick. 
Inbounds carry absolute up/down/all_time + ClientStats -// (the same shape /panel/api/inbounds/list returns); the two map fields -// come from the dedicated /onlines and /lastOnline endpoints. type TrafficSnapshot struct { Inbounds []*model.Inbound OnlineEmails []string LastOnlineMap map[string]int64 } -// FetchTrafficSnapshot pulls the three pieces in series. Sequential is -// fine because the cron job already fans out across nodes — adding -// per-node parallelism on top would just thrash the remote. -// -// Not on the Runtime interface: only the sync job needs it, and Local -// has no equivalent (XrayTrafficJob already covers the local engine). func (r *Remote) FetchTrafficSnapshot(ctx context.Context) (*TrafficSnapshot, error) { snap := &TrafficSnapshot{LastOnlineMap: map[string]int64{}} @@ -360,9 +286,6 @@ func (r *Remote) FetchTrafficSnapshot(ctx context.Context) (*TrafficSnapshot, er envOnlines, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/onlines", nil) if err != nil { - // Onlines/lastOnline are nice-to-have. A failure here shouldn't - // invalidate the inbound counter merge — log and continue with - // empty values, the next tick may succeed. logger.Warning("remote", r.node.Name, "onlines fetch failed:", err) } else if len(envOnlines.Obj) > 0 { _ = json.Unmarshal(envOnlines.Obj, &snap.OnlineEmails) @@ -378,17 +301,8 @@ func (r *Remote) FetchTrafficSnapshot(ctx context.Context) (*TrafficSnapshot, er return snap, nil } -// wireInbound builds the request body for /panel/api/inbounds/add and -// /update. Mirrors the form fields the existing InboundController -// expects via c.ShouldBind — we use form-encoded to match exactly. -// -// We deliberately omit Id (remote assigns its own), UserId (remote's -// fallback user takes over), NodeID (the remote sees itself as local), -// and ClientStats (those are joined-table data the remote rebuilds). func wireInbound(ib *model.Inbound) url.Values { v := url.Values{} - v.Set("up", strconv.FormatInt(ib.Up, 10)) - v.Set("down", strconv.FormatInt(ib.Down, 10)) v.Set("total", strconv.FormatInt(ib.Total, 10)) v.Set("remark", ib.Remark) v.Set("enable", strconv.FormatBool(ib.Enable)) diff --git a/web/runtime/runtime.go b/web/runtime/runtime.go index 17d99274..f7f91e83 100644 --- a/web/runtime/runtime.go +++ b/web/runtime/runtime.go @@ -1,12 +1,3 @@ -// Package runtime abstracts the live xray engine that an inbound's -// configuration is shipped to. Two implementations exist: Local talks -// to the panel's own xray via gRPC (the original behaviour); Remote -// talks to another 3x-ui panel's HTTP API as a managed Node. -// -// InboundService picks a Runtime per-inbound based on Inbound.NodeID. -// The point of the abstraction is to keep `if node != nil` checks out -// of the service code as Phase 2/3 features (traffic sync, subscription -// per-node) build on top. package runtime import ( @@ -15,52 +6,19 @@ import ( "github.com/mhsanaei/3x-ui/v3/database/model" ) -// Runtime is the live-engine adapter for one inbound's worth of -// operations. Implementations must be safe for concurrent use — the -// service layer does not synchronise calls. type Runtime interface { - // Name identifies the adapter in logs ("local", "node:"). Name() string - // AddInbound deploys an inbound to the engine. The Tag field on ib - // is treated as the source of truth for identifying the inbound on - // the remote side; Local ignores it. AddInbound(ctx context.Context, ib *model.Inbound) error - - // DelInbound removes the inbound identified by ib.Tag. 
DelInbound(ctx context.Context, ib *model.Inbound) error - - // UpdateInbound replaces the existing inbound with newIb. oldIb - // carries the previous config so the adapter can compute a minimal - // diff (Local: drop+add by tag; Remote: HTTP update by remote-id). UpdateInbound(ctx context.Context, oldIb, newIb *model.Inbound) error - // AddUser hot-adds a client to the inbound identified by ib.Tag. - // userMap matches the shape that xray.XrayAPI.AddUser already takes - // — keys: email, id, password, auth, security, flow, cipher. AddUser(ctx context.Context, ib *model.Inbound, userMap map[string]any) error - - // RemoveUser hot-removes the client by email from ib's inbound. RemoveUser(ctx context.Context, ib *model.Inbound, email string) error - // RestartXray asks the engine to fully restart. For Local this just - // flips the SetToNeedRestart flag and lets the cron pick it up; for - // Remote it issues an HTTP POST to /panel/api/server/restartXrayService. RestartXray(ctx context.Context) error - // ResetClientTraffic zeros the up/down counters for one client on the - // engine. Local: no-op — the central DB UPDATE that runs before this - // call is sufficient, and xray's gRPC stats counter resets on the next - // poll. Remote: HTTP POST so the next traffic sync doesn't pull the - // pre-reset absolute back from the node. ResetClientTraffic(ctx context.Context, ib *model.Inbound, email string) error - - // ResetInboundClientTraffics zeros every client of one inbound. Same - // Local/Remote split as ResetClientTraffic. ResetInboundClientTraffics(ctx context.Context, ib *model.Inbound) error - - // ResetAllTraffics zeros every inbound counter on the engine. Used by - // the panel-wide "reset all traffic" action; called once per affected - // node so that nodes with no inbounds for the current panel are skipped. ResetAllTraffics(ctx context.Context) error } diff --git a/web/service/inbound.go b/web/service/inbound.go index 47311980..65c60826 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -23,25 +23,10 @@ import ( "gorm.io/gorm/clause" ) -// InboundService provides business logic for managing Xray inbound configurations. -// It handles CRUD operations for inbounds, client management, traffic monitoring, -// and integration with the Xray API for real-time updates. type InboundService struct { - // xrayApi is retained for backwards compatibility with bulk paths - // that still talk to the local engine directly (e.g. traffic-reset - // jobs that scope to NodeID IS NULL inbounds anyway). New code paths - // route through runtimeFor() instead so they can target remote nodes. xrayApi xray.XrayAPI } -// runtimeFor returns the Runtime adapter for an inbound's destination -// engine. Returns the local runtime when the inbound has no NodeID -// (legacy/local inbounds); otherwise the cached Remote for that node. -// -// nil is returned only when the runtime Manager hasn't been wired yet -// (extremely early bootstrap). Callers treat nil as a transient error -// and either fall back to needRestart=true or surface "panel still -// starting" upstream. func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) { mgr := runtime.GetManager() if mgr == nil { @@ -399,10 +384,6 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo if inbound.Enable { rt, rterr := s.runtimeFor(inbound) if rterr != nil { - // Fail-fast on remote routing errors. 
Assign to the named - // `err` so the deferred tx handler rolls back the central - // DB row that tx.Save just inserted — otherwise we'd leave - // an orphan that the user sees succeed despite the toast. err = rterr return inbound, false, err } @@ -411,12 +392,9 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo } else { logger.Debug("Unable to add inbound on", rt.Name(), ":", err1) if inbound.NodeID != nil { - // Remote add failed — roll back so central + node stay - // in sync (no row on either side). err = err1 return inbound, false, err } - // Local: keep the existing fall-through-to-restart behaviour. needRestart = true } } @@ -424,25 +402,13 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo return inbound, needRestart, err } -// DelInbound deletes an inbound configuration by ID. -// It removes the inbound from the database and the running Xray instance if active. -// Returns whether Xray needs restart and any error. func (s *InboundService) DelInbound(id int) (bool, error) { db := database.GetDB() needRestart := false - // Load the full inbound (not just the tag) so we know its NodeID and - // can route the runtime call to the right engine. Skip-on-not-found - // preserves the old "no-op when DB row doesn't exist" behaviour. var ib model.Inbound loadErr := db.Model(model.Inbound{}).Where("id = ? and enable = ?", id, true).First(&ib).Error if loadErr == nil { - // Delete is best-effort on the runtime side: the user's intent is - // to get rid of the inbound, so a missing node row, an offline - // node, or a remote-side "already gone" should NEVER block the - // central DB cleanup. Worst case the remote keeps an orphan that - // the user can clean up manually — far less painful than the row - // being stuck on central. rt, rterr := s.runtimeFor(&ib) if rterr != nil { logger.Warning("DelInbound: runtime lookup failed, deleting central row anyway:", rterr) @@ -531,11 +497,6 @@ func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) { } inbound.Enable = enable - // Sync xray runtime via the Runtime adapter. For local inbounds we - // also rebuild the runtime config (drops clients flagged as disabled - // in ClientTraffic) so the live xray sees the same filtered view it - // did pre-refactor. Remote runtimes ship the unfiltered inbound — - // the remote panel does its own filtering before pushing to its xray. needRestart := false rt, rterr := s.runtimeFor(inbound) if rterr != nil { @@ -573,9 +534,6 @@ func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) { return needRestart, nil } -// UpdateInbound modifies an existing inbound configuration. -// It validates changes, updates the database, and syncs with the running Xray instance. -// Returns the updated inbound, whether Xray needs restart, and any error. 
func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, bool, error) { exist, err := s.checkPortConflict(inbound, inbound.Id) if err != nil { @@ -667,8 +625,6 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, } } - oldInbound.Up = inbound.Up - oldInbound.Down = inbound.Down oldInbound.Total = inbound.Total oldInbound.Remark = inbound.Remark oldInbound.Enable = inbound.Enable @@ -696,13 +652,9 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, } needRestart = true } else { - // Use a snapshot of the OLD tag so the remote can resolve its - // remote-id even when the new tag has changed (port/listen edit). oldSnapshot := *oldInbound oldSnapshot.Tag = tag if oldInbound.NodeID == nil { - // Local: keep the old del-then-add-filtered behaviour to - // preserve runtime client filtering. if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 == nil { logger.Debug("Old inbound deleted on", rt.Name(), ":", tag) } @@ -719,10 +671,6 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, } } } else { - // Remote: a single UpdateInbound call (the Remote adapter - // resolves remote-id by old tag, then POSTs /update/{id}). - // Assign to the outer `err` on failure so the deferred tx - // handler rolls back the central DB write. if !inbound.Enable { if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil { err = err2 @@ -851,13 +799,15 @@ func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inb return err } } - // Added clients — create their stats rows. for i := range newClients { email := newClients[i].Email if email == "" { continue } if _, existed := oldEmails[email]; existed { + if err := s.UpdateClientStat(tx, email, &newClients[i]); err != nil { + return err + } continue } if err := s.AddClientStat(tx, oldInbound.Id, &newClients[i]); err != nil { @@ -964,9 +914,6 @@ func (s *InboundService) AddInboundClient(data *model.Inbound) (bool, error) { } needRestart = true } else if oldInbound.NodeID == nil { - // Local: per-client AddUser keeps existing connections alive - // (incremental hot-add). Walk every new client; on any failure - // fall back to needRestart so cron rebuilds from scratch. for _, client := range clients { if len(client.Email) == 0 { needRestart = true @@ -997,11 +944,6 @@ func (s *InboundService) AddInboundClient(data *model.Inbound) (bool, error) { } } } else { - // Remote: a single UpdateInbound ships the new clients in one - // HTTP round-trip rather than N. Settings are already mutated - // in-memory (oldInbound.Settings) so the remote sees the final - // state. Per-client ClientStat rows still need the central DB - // update so the loop runs that branch first. for _, client := range clients { if len(client.Email) > 0 { s.AddClientStat(tx, data.Id, &client) @@ -1318,8 +1260,6 @@ func (s *InboundService) DelInboundClient(inboundId int, clientId string) (bool, needRestart = true } } else { - // Remote: settings already mutated above; one UpdateInbound - // ships the post-deletion state to the node. if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil { return false, err1 } @@ -1530,8 +1470,6 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin } needRestart = true } else if oldInbound.NodeID == nil { - // Local: paired Remove+Add on the live xray, keeping other - // clients online (full-restart fallback on partial failure). 
if oldClients[clientIndex].Enable { err1 := rt.RemoveUser(context.Background(), oldInbound, oldEmail) if err1 == nil { logger.Debug("Old client deleted on", rt.Name(), ":", clients[0].Email) } else { logger.Debug("Error in deleting old client on", rt.Name(), ":", err1) needRestart = true } } } } else { - // Remote: settings already mutated; one UpdateInbound suffices. if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil { err = err1 return false, err } @@ -1578,43 +1515,69 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin } return needRestart, tx.Save(oldInbound).Error } -// resetGracePeriodMs is the window after a reset during which incoming -// traffic snapshots from the node are ignored if they would resurrect -// non-zero counters. Three sync ticks (10s each) is enough headroom for -// the central → node reset HTTP call to land before the next pull. const resetGracePeriodMs int64 = 30000 -// SetRemoteTraffic merges absolute counters from a remote node into the -// central DB. Unlike AddTraffic, which adds deltas pulled from the local -// xray gRPC stats endpoint, this SETs the values — the node already has -// the canonical absolute value and we just mirror it. -// -// Rows in the post-reset grace window are skipped if the snapshot would -// regress them, so a user-initiated reset survives until the propagation -// HTTP call has completed on the node. After the grace window expires -// the snapshot wins regardless (the node is authoritative for the -// inbounds it hosts). -func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot) error { +func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) { + var structuralChange bool + err := submitTrafficWrite(func() error { + var inner error + structuralChange, inner = s.setRemoteTrafficLocked(nodeID, snap) + return inner + }) + return structuralChange, err +} + +func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) { if snap == nil || nodeID <= 0 { - return nil + return false, nil } db := database.GetDB() now := time.Now().UnixMilli() - // Load central inbounds for this node so we can resolve tag→id and - // honour the per-inbound grace window. One query covers every row - // touched in this tick. var central []model.Inbound if err := db.Model(model.Inbound{}). Where("node_id = ?", nodeID). Find(&central).Error; err != nil { - return err + return false, err } tagToCentral := make(map[string]*model.Inbound, len(central)) for i := range central { tagToCentral[central[i].Tag] = &central[i] } + var centralClientStats []xray.ClientTraffic + if len(central) > 0 { + ids := make([]int, 0, len(central)) + for i := range central { + ids = append(ids, central[i].Id) + } + if err := db.Model(xray.ClientTraffic{}). + Where("inbound_id IN ?", ids).
+ Find(&centralClientStats).Error; err != nil { + return false, err + } + } + type csKey struct { + inboundID int + email string + } + centralCS := make(map[csKey]*xray.ClientTraffic, len(centralClientStats)) + for i := range centralClientStats { + centralCS[csKey{centralClientStats[i].InboundId, centralClientStats[i].Email}] = &centralClientStats[i] + } + + var defaultUserId int + if len(central) > 0 { + defaultUserId = central[0].UserId + } else { + var u model.User + if err := db.Model(model.User{}).Order("id asc").First(&u).Error; err == nil { + defaultUserId = u.Id + } else { + defaultUserId = 1 + } + } + tx := db.Begin() committed := false defer func() { @@ -1623,42 +1586,101 @@ func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnaps } }() - // Per-inbound counter merge. Skip rows whose central allTime is - // suspiciously lower than the snapshot AND we're inside the grace - // window — that's the "reset hit central but not the node yet" - // pattern we want to defer until next tick. + structuralChange := false + + snapTags := make(map[string]struct{}, len(snap.Inbounds)) for _, snapIb := range snap.Inbounds { if snapIb == nil { continue } + snapTags[snapIb.Tag] = struct{}{} + c, ok := tagToCentral[snapIb.Tag] if !ok { - continue // node has an inbound the central doesn't know about — ignore - } - snapAllTime := snapIb.AllTime - if snapAllTime == 0 { - snapAllTime = snapIb.Up + snapIb.Down - } - inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs - if inGrace && snapAllTime > c.AllTime { - logger.Debug("SetRemoteTraffic: skipping inbound", c.Id, "in reset grace window") + newIb := model.Inbound{ + UserId: defaultUserId, + NodeID: &nodeID, + Tag: snapIb.Tag, + Listen: snapIb.Listen, + Port: snapIb.Port, + Protocol: snapIb.Protocol, + Settings: snapIb.Settings, + StreamSettings: snapIb.StreamSettings, + Sniffing: snapIb.Sniffing, + TrafficReset: snapIb.TrafficReset, + Enable: snapIb.Enable, + Remark: snapIb.Remark, + Total: snapIb.Total, + ExpiryTime: snapIb.ExpiryTime, + Up: snapIb.Up, + Down: snapIb.Down, + AllTime: snapIb.AllTime, + } + if err := tx.Create(&newIb).Error; err != nil { + logger.Warning("setRemoteTraffic: create central inbound for tag", snapIb.Tag, "failed:", err) + continue + } + tagToCentral[snapIb.Tag] = &newIb + structuralChange = true continue } + + inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs + + updates := map[string]any{ + "enable": snapIb.Enable, + "remark": snapIb.Remark, + "listen": snapIb.Listen, + "port": snapIb.Port, + "protocol": snapIb.Protocol, + "total": snapIb.Total, + "expiry_time": snapIb.ExpiryTime, + "settings": snapIb.Settings, + "stream_settings": snapIb.StreamSettings, + "sniffing": snapIb.Sniffing, + "traffic_reset": snapIb.TrafficReset, + } + if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) { + updates["up"] = snapIb.Up + updates["down"] = snapIb.Down + } + if snapIb.AllTime > c.AllTime { + updates["all_time"] = snapIb.AllTime + } + + if c.Settings != snapIb.Settings || + c.Remark != snapIb.Remark || + c.Listen != snapIb.Listen || + c.Port != snapIb.Port || + c.Total != snapIb.Total || + c.ExpiryTime != snapIb.ExpiryTime || + c.Enable != snapIb.Enable { + structuralChange = true + } + if err := tx.Model(model.Inbound{}). Where("id = ?", c.Id).
- Updates(map[string]any{ - "up": snapIb.Up, - "down": snapIb.Down, - "all_time": snapAllTime, - }).Error; err != nil { - return err + Updates(updates).Error; err != nil { + return false, err } } - // Per-client merge. The snapshot's ClientStats are nested under - // each Inbound, so flatten before walking. Each client_traffics row - // is keyed by (inbound_id, email) — we resolve inbound_id from the - // central inbound row matched above. + for _, c := range central { + if _, kept := snapTags[c.Tag]; kept { + continue + } + if err := tx.Where("inbound_id = ?", c.Id). + Delete(&xray.ClientTraffic{}).Error; err != nil { + return false, err + } + if err := tx.Where("id = ?", c.Id). + Delete(&model.Inbound{}).Error; err != nil { + return false, err + } + delete(tagToCentral, c.Tag) + structuralChange = true + } + for _, snapIb := range snap.Inbounds { if snapIb == nil { continue @@ -1667,52 +1689,105 @@ func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnaps if !ok { continue } - // Honour the same grace window for client rows: if the parent - // inbound was just reset, leave its clients alone too. inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs + + snapEmails := make(map[string]struct{}, len(snapIb.ClientStats)) for _, cs := range snapIb.ClientStats { - snapAllTime := cs.AllTime - if snapAllTime == 0 { - snapAllTime = cs.Up + cs.Down - } - if inGrace { - // Skip client rows whose snapshot would push counters - // back up; allow rows that are zero on the node side - // (those are normal — node was reset alongside central). - if snapAllTime > 0 { - continue + snapEmails[cs.Email] = struct{}{} + + existing := centralCS[csKey{c.Id, cs.Email}] + if existing == nil { + if err := tx.Create(&xray.ClientTraffic{ + InboundId: c.Id, + Email: cs.Email, + Enable: cs.Enable, + Total: cs.Total, + ExpiryTime: cs.ExpiryTime, + Reset: cs.Reset, + Up: cs.Up, + Down: cs.Down, + AllTime: cs.AllTime, + LastOnline: cs.LastOnline, + }).Error; err != nil { + return false, err } + structuralChange = true + continue } - // MAX(last_online, ?) so a momentary clock skew on the node - // can't regress the central row's last-seen timestamp. + + if existing.Enable != cs.Enable || + existing.Total != cs.Total || + existing.ExpiryTime != cs.ExpiryTime || + existing.Reset != cs.Reset { + structuralChange = true + } + + allTime := existing.AllTime + if cs.AllTime > allTime { + allTime = cs.AllTime + } + + if inGrace && cs.Up+cs.Down > 0 { + if err := tx.Exec( + `UPDATE client_traffics + SET enable = ?, total = ?, expiry_time = ?, reset = ?, all_time = ? + WHERE inbound_id = ? AND email = ?`, + cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, allTime, c.Id, cs.Email, + ).Error; err != nil { + return false, err + } + continue + } + if err := tx.Exec( `UPDATE client_traffics - SET up = ?, down = ?, all_time = ?, last_online = MAX(last_online, ?) + SET up = ?, down = ?, enable = ?, total = ?, expiry_time = ?, reset = ?, + all_time = ?, last_online = MAX(last_online, ?) WHERE inbound_id = ? AND email = ?`, - cs.Up, cs.Down, snapAllTime, cs.LastOnline, c.Id, cs.Email, + cs.Up, cs.Down, cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, allTime, + cs.LastOnline, c.Id, cs.Email, ).Error; err != nil { - return err + return false, err } } + + for k, existing := range centralCS { + if k.inboundID != c.Id { + continue + } + if _, kept := snapEmails[k.email]; kept { + continue + } + if err := tx.Where("inbound_id = ? AND email = ?", c.Id, existing.Email). 
+ Delete(&xray.ClientTraffic{}).Error; err != nil { + return false, err + } + structuralChange = true + } } if err := tx.Commit().Error; err != nil { - return err + return false, err } committed = true - // Push the node's online-clients contribution into xray.Process so - // GetOnlineClients returns the union of local + every node. Empty - // list still calls Set so a node that just had everyone disconnect - // updates promptly. if p != nil { p.SetNodeOnlineClients(nodeID, snap.OnlineEmails) } - return nil + return structuralChange, nil } -func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) { +func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) { + err = submitTrafficWrite(func() error { + var inner error + needRestart, clientsDisabled, inner = s.addTrafficLocked(inboundTraffics, clientTraffics) + return inner + }) + return +} + +func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) { var err error db := database.GetDB() tx := db.Begin() @@ -1767,7 +1842,7 @@ func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic for _, traffic := range traffics { if traffic.IsInbound { - err = tx.Model(&model.Inbound{}).Where("tag = ?", traffic.Tag). + err = tx.Model(&model.Inbound{}).Where("tag = ? AND node_id IS NULL", traffic.Tag). Updates(map[string]any{ "up": gorm.Expr("up + ?", traffic.Up), "down": gorm.Expr("down + ?", traffic.Down), @@ -1797,7 +1872,10 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr emails = append(emails, traffic.Email) } dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics)) - err = tx.Model(xray.ClientTraffic{}).Where("email IN (?)", emails).Find(&dbClientTraffics).Error + err = tx.Model(xray.ClientTraffic{}). + Where("email IN (?) AND inbound_id IN (?)", emails, + tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")). + Find(&dbClientTraffics).Error if err != nil { return err } @@ -1911,7 +1989,10 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) { now := time.Now().Unix() * 1000 var err, err1 error - err = tx.Model(xray.ClientTraffic{}).Where("reset > 0 and expiry_time > 0 and expiry_time <= ?", now).Find(&traffics).Error + err = tx.Model(xray.ClientTraffic{}). + Where("reset > 0 and expiry_time > 0 and expiry_time <= ?", now). + Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")). + Find(&traffics).Error if err != nil { return false, 0, err } @@ -2017,7 +2098,7 @@ func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error var tags []string err := tx.Table("inbounds"). Select("inbounds.tag"). - Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true). + Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true). Scan(&tags).Error if err != nil { return false, 0, err @@ -2036,7 +2117,7 @@ func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error } result := tx.Model(model.Inbound{}). - Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true). 
+ Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true). Update("enable", false) err := result.Error count := result.RowsAffected @@ -2050,6 +2131,7 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) var depletedRows []xray.ClientTraffic err := tx.Model(xray.ClientTraffic{}). Where("((total > 0 AND up + down >= total) OR (expiry_time > 0 AND expiry_time <= ?)) AND enable = ?", now, true). + Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")). Find(&depletedRows).Error if err != nil { return false, 0, err @@ -2152,6 +2234,7 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) result := tx.Model(xray.ClientTraffic{}). Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true). + Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")). Update("enable", false) err = result.Error count := result.RowsAffected @@ -2163,8 +2246,6 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) return needRestart, count, nil } - // Mirror enable=false + the row's authoritative quota/expiry into every - // (inbound, email) we just removed via the API. inboundEmailMap := make(map[int]map[string]struct{}) for _, t := range targets { if inboundEmailMap[t.InboundId] == nil { @@ -2744,22 +2825,24 @@ func (s *InboundService) ResetClientTrafficLimitByEmail(clientEmail string, tota } func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error { - db := database.GetDB() - - // Reset traffic stats in ClientTraffic table - result := db.Model(xray.ClientTraffic{}). - Where("email = ?", clientEmail). - Updates(map[string]any{"enable": true, "up": 0, "down": 0}) - - err := result.Error - if err != nil { - return err - } - - return nil + return submitTrafficWrite(func() error { + db := database.GetDB() + return db.Model(xray.ClientTraffic{}). + Where("email = ?", clientEmail). + Updates(map[string]any{"enable": true, "up": 0, "down": 0}).Error + }) } -func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (bool, error) { +func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (needRestart bool, err error) { + err = submitTrafficWrite(func() error { + var inner error + needRestart, inner = s.resetClientTrafficLocked(id, clientEmail) + return inner + }) + return +} + +func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (bool, error) { needRestart := false traffic, err := s.GetClientTrafficByEmail(clientEmail) @@ -2825,18 +2908,11 @@ func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (bool, e return false, err } - // Stamp last_traffic_reset_time on the parent inbound so the next - // NodeTrafficSyncJob tick honours the grace window and doesn't pull - // the pre-reset absolute back from the node. now := time.Now().UnixMilli() _ = db.Model(model.Inbound{}). Where("id = ?", id). Update("last_traffic_reset_time", now).Error - // Propagate to the remote node if this inbound is node-managed. - // Best-effort: an offline node shouldn't block a user-driven reset - // — the central DB is already zeroed and the next successful sync - // (within the grace window) will re-pull whatever the node has. 
inbound, err := s.GetInbound(id) if err == nil && inbound != nil && inbound.NodeID != nil { if rt, rterr := s.runtimeFor(inbound); rterr == nil { @@ -2852,6 +2928,12 @@ func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (bool, e } func (s *InboundService) ResetAllClientTraffics(id int) error { + return submitTrafficWrite(func() error { + return s.resetAllClientTrafficsLocked(id) + }) +} + +func (s *InboundService) resetAllClientTrafficsLocked(id int) error { db := database.GetDB() now := time.Now().Unix() * 1000 @@ -2889,19 +2971,12 @@ func (s *InboundService) ResetAllClientTraffics(id int) error { return err } - // Propagate to remote nodes after the central DB is settled. Single - // inbound: one rt.ResetInboundClientTraffics call. id == -1 (all - // inbounds across panel): walk every node-managed inbound and call - // the per-inbound endpoint — there's no panel-wide endpoint that - // only resets clients without zeroing inbound counters. var inbounds []model.Inbound q := db.Model(model.Inbound{}).Where("node_id IS NOT NULL") if id != -1 { q = q.Where("id = ?", id) } if err := q.Find(&inbounds).Error; err != nil { - // Failed to discover which inbounds to propagate to — central - // DB is already correct, log and move on. logger.Warning("ResetAllClientTraffics: discover node inbounds failed:", err) return nil } @@ -2920,6 +2995,12 @@ func (s *InboundService) ResetAllClientTraffics(id int) error { } func (s *InboundService) ResetAllTraffics() error { + return submitTrafficWrite(func() error { + return s.resetAllTrafficsLocked() + }) +} + +func (s *InboundService) resetAllTrafficsLocked() error { db := database.GetDB() now := time.Now().UnixMilli() @@ -2933,10 +3014,6 @@ func (s *InboundService) ResetAllTraffics() error { return err } - // Propagate to every node that has at least one inbound on this - // panel. We can't blanket-call rt.ResetAllTraffics because that - // would also zero traffic for inbounds the node hosts but the - // central panel doesn't know about — instead reset per inbound. var inbounds []model.Inbound if err := db.Model(model.Inbound{}). Where("node_id IS NOT NULL"). @@ -2959,13 +3036,12 @@ func (s *InboundService) ResetAllTraffics() error { } func (s *InboundService) ResetInboundTraffic(id int) error { - db := database.GetDB() - - result := db.Model(model.Inbound{}). - Where("id = ?", id). - Updates(map[string]any{"up": 0, "down": 0}) - - return result.Error + return submitTrafficWrite(func() error { + db := database.GetDB() + return db.Model(model.Inbound{}). + Where("id = ?", id). + Updates(map[string]any{"up": 0, "down": 0}).Error + }) } func (s *InboundService) DelDepletedClients(id int) (err error) { @@ -3229,11 +3305,6 @@ func chunkInts(s []int, size int) [][]int { return out } -// GetActiveClientTraffics returns the absolute ClientTraffic rows for the given -// emails. Used by the WebSocket delta path to push per-client absolute -// counters without re-serializing the full inbound list. The query is chunked -// to stay under SQLite's bind-variable limit on very large active sets. -// Empty input returns (nil, nil). func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) { uniq := uniqueNonEmptyStrings(emails) if len(uniq) == 0 { @@ -3251,9 +3322,6 @@ func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.Clien return traffics, nil } -// InboundTrafficSummary is the minimal projection of an inbound's traffic -// counters used by the WebSocket delta path. 
Excludes Settings/StreamSettings -// blobs so the broadcast stays compact even with many inbounds. type InboundTrafficSummary struct { Id int `json:"id"` Up int64 `json:"up"` @@ -3263,9 +3331,6 @@ type InboundTrafficSummary struct { Enable bool `json:"enable"` } -// GetInboundsTrafficSummary returns inbound-level absolute traffic counters -// (no per-client expansion). Companion to GetActiveClientTraffics — together -// they replace the heavy "full inbound list" broadcast on each cron tick. func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) { db := database.GetDB() var summaries []InboundTrafficSummary @@ -3293,26 +3358,20 @@ func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.Cl } func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error { - db := database.GetDB() - - // Keep all_time monotonic: it represents historical cumulative usage and - // must never be less than the currently-tracked up+down. Without this, - // the UI showed "Общий трафик" (allTime) below the live consumed value - // after admins manually edited a client's counters. - result := db.Model(xray.ClientTraffic{}). - Where("email = ?", email). - Updates(map[string]any{ - "up": upload, - "down": download, - "all_time": gorm.Expr("CASE WHEN COALESCE(all_time, 0) < ? THEN ? ELSE all_time END", upload+download, upload+download), - }) - - err := result.Error - if err != nil { - logger.Warningf("Error updating ClientTraffic with email %s: %v", email, err) + return submitTrafficWrite(func() error { + db := database.GetDB() + err := db.Model(xray.ClientTraffic{}). + Where("email = ?", email). + Updates(map[string]any{ + "up": upload, + "down": download, + "all_time": gorm.Expr("CASE WHEN COALESCE(all_time, 0) < ? THEN ? ELSE all_time END", upload+download, upload+download), + }).Error + if err != nil { + logger.Warningf("Error updating ClientTraffic with email %s: %v", email, err) + } return err - } - return nil + }) } func (s *InboundService) GetClientTrafficByID(id string) ([]xray.ClientTraffic, error) { @@ -3642,18 +3701,12 @@ func (s *InboundService) GetOnlineClients() []string { return p.GetOnlineClients() } -// SetNodeOnlineClients records a remote node's online-clients list on -// the panel-wide xray.Process so GetOnlineClients returns the union of -// local + every node's contribution. Called by NodeTrafficSyncJob. func (s *InboundService) SetNodeOnlineClients(nodeID int, emails []string) { if p != nil { p.SetNodeOnlineClients(nodeID, emails) } } -// ClearNodeOnlineClients drops one node's contribution to the online -// set. Used when the per-node sync probe fails so a downed node -// doesn't keep its clients listed as online forever. func (s *InboundService) ClearNodeOnlineClients(nodeID int) { if p != nil { p.ClearNodeOnlineClients(nodeID) diff --git a/web/service/node.go b/web/service/node.go index afb76f3c..9cdaf2b7 100644 --- a/web/service/node.go +++ b/web/service/node.go @@ -16,10 +16,6 @@ import ( "github.com/mhsanaei/3x-ui/v3/web/runtime" ) -// HeartbeatPatch is the slice of fields a single Probe() result writes -// back to a Node row. We pass it as a struct (not a *model.Node) so the -// heartbeat path can't accidentally clobber configuration columns the -// user just edited. type HeartbeatPatch struct { Status string LastHeartbeat int64 @@ -31,13 +27,8 @@ type HeartbeatPatch struct { LastError string } -// NodeService manages remote 3x-ui nodes registered with this panel. 
-// It owns CRUD for the Node model and the HTTP probe used by both the -// heartbeat job and the on-demand "test connection" UI action. type NodeService struct{} -// httpClient is shared so repeated probes reuse TCP/TLS connections. -// Timeout is per-request, set on each Do() via context. var nodeHTTPClient = &http.Client{ Transport: &http.Transport{ MaxIdleConns: 64, @@ -62,8 +53,6 @@ func (s *NodeService) GetById(id int) (*model.Node, error) { return n, nil } -// normalize fills in defaults and trims accidental whitespace before save. -// Pulled out so Create and Update share the same rules. func (s *NodeService) normalize(n *model.Node) error { n.Name = strings.TrimSpace(n.Name) n.Address = strings.TrimSpace(n.Address) @@ -109,9 +98,6 @@ func (s *NodeService) Update(id int, in *model.Node) error { if err := db.Where("id = ?", id).First(existing).Error; err != nil { return err } - // Only persist user-controlled columns. Heartbeat fields stay where - // the heartbeat job last wrote them so a no-op edit doesn't blank - // the dashboard out for ten seconds. updates := map[string]any{ "name": in.Name, "remark": in.Remark, @@ -125,8 +111,6 @@ func (s *NodeService) Update(id int, in *model.Node) error { if err := db.Model(model.Node{}).Where("id = ?", id).Updates(updates).Error; err != nil { return err } - // Drop any cached Remote so the next inbound op picks up the fresh - // address/token. Cheap to do unconditionally — the next miss rebuilds. if mgr := runtime.GetManager(); mgr != nil { mgr.InvalidateNode(id) } @@ -141,8 +125,6 @@ func (s *NodeService) Delete(id int) error { if mgr := runtime.GetManager(); mgr != nil { mgr.InvalidateNode(id) } - // Drop in-memory series so a freshly created node with the same id - // doesn't inherit stale points (sqlite reuses ids freely). nodeMetrics.drop(nodeMetricKey(id, "cpu")) nodeMetrics.drop(nodeMetricKey(id, "mem")) return nil @@ -153,9 +135,6 @@ func (s *NodeService) SetEnable(id int, enable bool) error { return db.Model(model.Node{}).Where("id = ?", id).Update("enable", enable).Error } -// UpdateHeartbeat persists the slice of fields written by a probe. We -// don't touch updated_at via gorm autoUpdateTime here — that field is -// reserved for user-driven config edits. func (s *NodeService) UpdateHeartbeat(id int, p HeartbeatPatch) error { db := database.GetDB() updates := map[string]any{ @@ -171,9 +150,6 @@ func (s *NodeService) UpdateHeartbeat(id int, p HeartbeatPatch) error { if err := db.Model(model.Node{}).Where("id = ?", id).Updates(updates).Error; err != nil { return err } - // Only record online ticks. Offline probes carry zeroed cpu/mem and - // would draw a misleading dip on the chart; the gap on the x-axis is - // the truthful representation of "we couldn't reach the node". if p.Status == "online" { now := time.Unix(p.LastHeartbeat, 0) nodeMetrics.append(nodeMetricKey(id, "cpu"), now, p.CpuPct) @@ -182,28 +158,14 @@ func (s *NodeService) UpdateHeartbeat(id int, p HeartbeatPatch) error { return nil } -// nodeMetricKey is the namespacing used inside the singleton ring buffer -// so per-node metrics don't collide with each other or with the system -// metrics in the sibling singleton. func nodeMetricKey(id int, metric string) string { return "node:" + strconv.Itoa(id) + ":" + metric } -// AggregateNodeMetric returns up to maxPoints averaged buckets for one -// node's metric (currently "cpu" or "mem"). Output shape matches -// AggregateSystemMetric: {"t": unixSec, "v": value}. 
func (s *NodeService) AggregateNodeMetric(id int, metric string, bucketSeconds int, maxPoints int) []map[string]any { return nodeMetrics.aggregate(nodeMetricKey(id, metric), bucketSeconds, maxPoints) } -// Probe issues a single GET to the node's /panel/api/server/status and -// returns a HeartbeatPatch. On error the patch is zero-valued except -// for LastError; the caller is responsible for setting Status="offline". -// -// The remote endpoint requires authentication: we send the per-node -// ApiToken as a Bearer token, which the remote APIController.checkAPIAuth -// validates. Calls without a token would just get a 404, which masks -// the existence of the API entirely. func (s *NodeService) Probe(ctx context.Context, n *model.Node) (HeartbeatPatch, error) { patch := HeartbeatPatch{LastHeartbeat: time.Now().Unix()} url := fmt.Sprintf("%s://%s:%d%spanel/api/server/status", @@ -233,16 +195,10 @@ func (s *NodeService) Probe(ctx context.Context, n *model.Node) (HeartbeatPatch, return patch, errors.New(patch.LastError) } - // The remote wraps Status in entity.Msg. We decode into a typed - // envelope rather than map[string]any so a schema change on the - // remote shows up as a Go error instead of a silent zero-fill. var envelope struct { Success bool `json:"success"` Msg string `json:"msg"` Obj *struct { - Cpu uint64 `json:"-"` - // Status fields we care about. Decode CPU/Mem nested - // structs minimally — anything else gets discarded. CpuPct float64 `json:"cpu"` Mem struct { Current uint64 `json:"current"` @@ -272,8 +228,6 @@ func (s *NodeService) Probe(ctx context.Context, n *model.Node) (HeartbeatPatch, return patch, nil } -// EnvelopeForUI is the shape a frontend test-connection action expects. -// Pulling it out keeps the controller dumb. type ProbeResultUI struct { Status string `json:"status"` LatencyMs int `json:"latencyMs"` diff --git a/web/service/traffic_writer.go b/web/service/traffic_writer.go new file mode 100644 index 00000000..b3fa97b6 --- /dev/null +++ b/web/service/traffic_writer.go @@ -0,0 +1,87 @@ +package service + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/mhsanaei/3x-ui/v3/logger" +) + +const ( + trafficWriterQueueSize = 256 + trafficWriterSubmitTimeout = 5 * time.Second +) + +type trafficWriteRequest struct { + apply func() error + done chan error +} + +var ( + twQueue chan *trafficWriteRequest + twCtx context.Context + twCancel context.CancelFunc + twDone chan struct{} + twOnce sync.Once +) + +func StartTrafficWriter() { + twOnce.Do(func() { + twQueue = make(chan *trafficWriteRequest, trafficWriterQueueSize) + twCtx, twCancel = context.WithCancel(context.Background()) + twDone = make(chan struct{}) + go runTrafficWriter() + }) +} + +func StopTrafficWriter() { + if twCancel != nil { + twCancel() + <-twDone + } +} + +func runTrafficWriter() { + defer close(twDone) + for { + select { + case req := <-twQueue: + req.done <- safeApply(req.apply) + case <-twCtx.Done(): + for { + select { + case req := <-twQueue: + req.done <- safeApply(req.apply) + default: + return + } + } + } + } +} + +func safeApply(fn func() error) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("traffic writer panic: %v", r) + logger.Error(err.Error()) + } + }() + return fn() +} + +func submitTrafficWrite(fn func() error) error { + if twQueue == nil { + return safeApply(fn) + } + req := &trafficWriteRequest{apply: fn, done: make(chan error, 1)} + select { + case twQueue <- req: + case <-time.After(trafficWriterSubmitTimeout): + return 
errors.New("traffic writer queue full") + } + return <-req.done +} diff --git a/web/web.go b/web/web.go index bb05056c..9a62c895 100644 --- a/web/web.go +++ b/web/web.go @@ -286,12 +286,9 @@ func (s *Server) startTask() { // check client ips from log file every 10 sec s.cron.AddJob("@every 10s", job.NewCheckClientIpJob()) - // Probe every enabled remote node every 10 sec - s.cron.AddJob("@every 10s", job.NewNodeHeartbeatJob()) + s.cron.AddJob("@every 5s", job.NewNodeHeartbeatJob()) - // Pull traffic + online-clients from every online node every 10 sec - // and merge absolute counters into the central DB. - s.cron.AddJob("@every 10s", job.NewNodeTrafficSyncJob()) + s.cron.AddJob("@every 5s", job.NewNodeTrafficSyncJob()) // check client ips from log file every day s.cron.AddJob("@daily", job.NewClearLogsJob()) @@ -362,6 +359,8 @@ func (s *Server) Start() (err error) { if err != nil { return err } + service.StartTrafficWriter() + s.cron = cron.New(cron.WithLocation(loc), cron.WithSeconds()) s.cron.Start() @@ -446,6 +445,7 @@ func (s *Server) Stop() error { if s.cron != nil { s.cron.Stop() } + service.StopTrafficWriter() if s.tgbotService.IsRunning() { s.tgbotService.Stop() } diff --git a/web/websocket/hub.go b/web/websocket/hub.go index 7d43e3e6..5eeb80da 100644 --- a/web/websocket/hub.go +++ b/web/websocket/hub.go @@ -43,8 +43,8 @@ const ( // rapid mutations cannot drown the hub. Bursts within the interval are // dropped (not coalesced); the next broadcast outside the window delivers // the latest state. Only message types in throttledMessageTypes are gated — - // heartbeat and real-time signals (status, traffic, client_stats, - // notification, xray_state, invalidate) bypass this so they are never delayed. + // heartbeat and one-shot signals (status, notification, xray_state, + // invalidate) bypass this so they are never delayed. minBroadcastInterval = 250 * time.Millisecond // hubRestartAttempts caps panic-recovery restarts. After this many @@ -103,19 +103,13 @@ func NewHub() *Hub { } } -// throttledMessageTypes is the explicit allow-list of message types subject to -// the per-type rate limit. Everything else (status, traffic, client_stats, -// notification, xray_state, invalidate) is heartbeat- or signal-class and must -// not be delayed. Keeping the set explicit (vs. an exclusion list) makes the -// intent obvious when new message types are added — by default they bypass. var throttledMessageTypes = map[MessageType]struct{}{ - MessageTypeInbounds: {}, - MessageTypeOutbounds: {}, + MessageTypeInbounds: {}, + MessageTypeOutbounds: {}, + MessageTypeTraffic: {}, + MessageTypeClientStats: {}, } -// shouldThrottle returns true if a broadcast of msgType is rate-limited and -// happened within minBroadcastInterval of the previous one. Only message types -// in throttledMessageTypes are gated. func (h *Hub) shouldThrottle(msgType MessageType) bool { if _, gated := throttledMessageTypes[msgType]; !gated { return false