- Replace plain textarea with CodeMirror editor (YAML syntax highlighting, line numbers, auto-indent) for Clash subscription template - Fix confAlerts crash when subClashURI/subURI/subJsonURI is null/undefined (prevented save button from enabling) - Add yaml.js CodeMirror mode asset - Include docs and .gitignore cleanup
54 KiB
Multi-Node Shared Control Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Add a minimal multi-node shared-control mode to 3x-ui where master owns shared-account writes, worker nodes rebuild local Xray config from synchronized snapshots, and all nodes flush traffic deltas back to MariaDB without counter loss.
Architecture: Keep the current controller -> service -> database -> local Xray flow intact. Add node-role config, shared metadata tables, cache-backed worker sync, and a durable traffic delta flush path around the existing services instead of changing Xray protocol behavior. In shared mode, workers never write shared account definitions directly, and traffic reconciliation that mutates enable/expiry state runs only on master.
Tech Stack: Go, GORM, MariaDB, SQLite compatibility mode for legacy single-node installs, existing web/service, web/job, web/controller, xray models, shell installers install.sh and x-ui.sh.
File Map
Existing files to modify
config/config.go— add typed node-role config readers, validation, and runtime file path helpers.config/config_test.go— cover node config defaults, validation, and shared runtime file paths.main.go— validate node config at startup, extendsettingCLI flags, and print node settings insetting -show.database/db.go— migrate shared metadata models and seed the shared version row.database/db_test.go— verify metadata tables, version helpers, and node-state upsert behavior.web/service/inbound.go— enforce master-only shared writes, bump shared version on successful shared mutations, and extract shared traffic reconciliation fromAddTraffic.web/service/xray.go— split config building from DB reads so worker sync can rebuild from cached snapshots.web/web.go— start worker sync / master heartbeat loops and the traffic flush loop with server lifecycle context.web/job/xray_traffic_job.go— branch shared-mode traffic collection away from direct DB writes.xray/client_traffic.go— add a composite unique key for(inbound_id, email)so atomic delta upserts are safe.x-ui.sh— show node config and add minimal node-management actions.install.sh— add fresh-install prompts for MariaDB and node role while preserving upgrade behavior.README.md— add high-level multi-node shared-control documentation.README.zh_CN.md— add Chinese operator guidance.
New files to create
database/model/shared_state.go—shared_accounts_versionmetadata model.database/model/node_state.go— node heartbeat / sync status model.database/shared_state.go— shared version and node-state repository helpers.web/service/node_guard.go— node-role helpers andRequireMaster.web/service/node_guard_test.go— node-role guard and transactional version-bump tests.web/service/node_cache.go— shared snapshot load/save helpers.web/service/node_sync.go— worker snapshot polling, cache refresh, heartbeat, and node-state updates.web/service/node_sync_test.go— snapshot persistence and sync-loop unit tests.web/service/traffic_pending.go— durable pending traffic delta store.web/service/traffic_flush.go— shared-mode delta collection and batch flush service.web/service/traffic_flush_test.go— pending delta merge and flush success/failure coverage.docs/multi-node-sync.md— operator runbook and manual verification checklist.docs/superpowers/progress/2026-04-10-multi-node-shared-control-progress.md— task-by-task execution tracker with mode and commit checkpoints.
Runtime files created by the feature
/etc/x-ui/x-ui.json— now also storesnodeRole,nodeId,syncInterval,trafficFlushInterval./etc/x-ui/shared-cache.json— last good shared account snapshot used by workers./etc/x-ui/traffic-pending.json— durable queue of unflushed traffic deltas.
Task 1: Add node config, runtime file paths, and startup validation
Files:
-
Modify:
config/config.go -
Modify:
config/config_test.go -
Modify:
main.goExecution Mode: Inline -
Step 1: Write the failing config tests
Add these tests to config/config_test.go:
func writeTestSettingsFile(t *testing.T, settings map[string]any) {
t.Helper()
data, err := json.MarshalIndent(settings, "", " ")
if err != nil {
t.Fatalf("MarshalIndent error: %v", err)
}
if err := os.WriteFile(GetSettingPath(), data, 0644); err != nil {
t.Fatalf("WriteFile error: %v", err)
}
}
func TestGetNodeConfigFromJSONDefaults(t *testing.T) {
tmpDir := t.TempDir()
t.Setenv("XUI_DB_FOLDER", tmpDir)
writeTestSettingsFile(t, map[string]any{})
cfg := GetNodeConfigFromJSON()
if cfg.Role != NodeRoleMaster {
t.Fatalf("expected default role %q, got %q", NodeRoleMaster, cfg.Role)
}
if cfg.NodeID != "" {
t.Fatalf("expected empty default node id, got %q", cfg.NodeID)
}
if cfg.SyncIntervalSeconds != 30 {
t.Fatalf("expected default sync interval 30, got %d", cfg.SyncIntervalSeconds)
}
if cfg.TrafficFlushSeconds != 10 {
t.Fatalf("expected default traffic flush interval 10, got %d", cfg.TrafficFlushSeconds)
}
}
func TestValidateNodeConfigWorkerRequiresNodeID(t *testing.T) {
err := ValidateNodeConfig(NodeConfig{
Role: NodeRoleWorker,
SyncIntervalSeconds: 30,
TrafficFlushSeconds: 10,
}, DBConfig{Type: "mariadb"})
if err == nil {
t.Fatal("expected worker without node id to fail validation")
}
}
func TestValidateNodeConfigWorkerRequiresMariaDB(t *testing.T) {
err := ValidateNodeConfig(NodeConfig{
Role: NodeRoleWorker,
NodeID: "worker-1",
SyncIntervalSeconds: 30,
TrafficFlushSeconds: 10,
}, DBConfig{Type: "sqlite"})
if err == nil {
t.Fatal("expected worker on sqlite to fail validation")
}
}
func TestSharedRuntimeFilePaths(t *testing.T) {
tmpDir := t.TempDir()
t.Setenv("XUI_DB_FOLDER", tmpDir)
if got := GetSharedCachePath(); got != filepath.Join(tmpDir, "shared-cache.json") {
t.Fatalf("unexpected shared cache path: %s", got)
}
if got := GetTrafficPendingPath(); got != filepath.Join(tmpDir, "traffic-pending.json") {
t.Fatalf("unexpected traffic pending path: %s", got)
}
}
- Step 2: Run the config test subset and confirm it fails
Run:
go test ./config -run 'Test(GetNodeConfigFromJSONDefaults|ValidateNodeConfigWorkerRequiresNodeID|ValidateNodeConfigWorkerRequiresMariaDB|SharedRuntimeFilePaths)' -v
Expected: FAIL with undefined NodeConfig, NodeRoleMaster, GetNodeConfigFromJSON, ValidateNodeConfig, GetSharedCachePath, or GetTrafficPendingPath.
- Step 3: Implement node config and runtime path helpers
Add these types and helpers in config/config.go:
type NodeRole string
const (
NodeRoleMaster NodeRole = "master"
NodeRoleWorker NodeRole = "worker"
)
type NodeConfig struct {
Role NodeRole
NodeID string
SyncIntervalSeconds int
TrafficFlushSeconds int
}
func GetSharedCachePath() string {
return filepath.Join(GetDBFolderPath(), "shared-cache.json")
}
func GetTrafficPendingPath() string {
return filepath.Join(GetDBFolderPath(), "traffic-pending.json")
}
func readGroupedInt(settings map[string]any, key string, fallback int) int {
readInt := func(value any) (int, bool) {
switch v := value.(type) {
case float64:
return int(v), true
case int:
return v, true
case string:
i, err := strconv.Atoi(v)
if err == nil {
return i, true
}
}
return 0, false
}
if groups, ok := settingGroupAliases[key]; ok {
for _, groupName := range groups {
if group, ok := settings[groupName].(map[string]any); ok {
if value, ok := readInt(group[key]); ok {
return value
}
}
}
}
if value, ok := readInt(settings[key]); ok {
return value
}
return fallback
}
func GetNodeConfigFromJSON() NodeConfig {
data, err := os.ReadFile(GetSettingPath())
if err != nil {
return NodeConfig{Role: NodeRoleMaster, SyncIntervalSeconds: 30, TrafficFlushSeconds: 10}
}
var settings map[string]any
if err := json.Unmarshal(data, &settings); err != nil {
return NodeConfig{Role: NodeRoleMaster, SyncIntervalSeconds: 30, TrafficFlushSeconds: 10}
}
role := readGroupedString(settings, "nodeRole")
if role == "" {
role = string(NodeRoleMaster)
}
return NodeConfig{
Role: NodeRole(role),
NodeID: readGroupedString(settings, "nodeId"),
SyncIntervalSeconds: readGroupedInt(settings, "syncInterval", 30),
TrafficFlushSeconds: readGroupedInt(settings, "trafficFlushInterval", 10),
}
}
func ValidateNodeConfig(nodeCfg NodeConfig, dbCfg DBConfig) error {
switch nodeCfg.Role {
case NodeRoleMaster, NodeRoleWorker:
default:
return fmt.Errorf("invalid nodeRole %q", nodeCfg.Role)
}
if nodeCfg.Role == NodeRoleWorker && nodeCfg.NodeID == "" {
return fmt.Errorf("worker mode requires nodeId")
}
if nodeCfg.Role == NodeRoleWorker && dbCfg.Type != "mariadb" {
return fmt.Errorf("worker mode requires mariadb")
}
if nodeCfg.SyncIntervalSeconds <= 0 {
return fmt.Errorf("syncInterval must be positive")
}
if nodeCfg.TrafficFlushSeconds <= 0 {
return fmt.Errorf("trafficFlushInterval must be positive")
}
return nil
}
Also extend settingGroupAliases so these keys can be read from both top-level JSON and the legacy other group:
"nodeRole": {"other"},
"nodeId": {"other"},
"syncInterval": {"other"},
"trafficFlushInterval": {"other"},
- Step 4: Validate node config at startup and expose CLI setters
Patch main.go in two places.
At startup, validate config before DB init:
func runWebServer() {
log.Printf("Starting %v %v", config.GetName(), config.GetVersion())
dbCfg := config.GetDBConfigFromJSON()
nodeCfg := config.GetNodeConfigFromJSON()
if err := config.ValidateNodeConfig(nodeCfg, dbCfg); err != nil {
log.Fatalf("invalid node configuration: %v", err)
}
switch config.GetLogLevel() {
Extend setting flags and setting -show output:
var nodeRoleFlag string
var nodeIDFlag string
var syncIntervalFlag int
var trafficFlushIntervalFlag int
settingCmd.StringVar(&nodeRoleFlag, "nodeRole", "", "Set node role (master or worker)")
settingCmd.StringVar(&nodeIDFlag, "nodeId", "", "Set node identifier")
settingCmd.IntVar(&syncIntervalFlag, "syncInterval", 0, "Set shared sync interval in seconds")
settingCmd.IntVar(&trafficFlushIntervalFlag, "trafficFlushInterval", 0, "Set traffic flush interval in seconds")
func showSetting(show bool) {
if show {
nodeCfg := config.GetNodeConfigFromJSON()
fmt.Println("nodeRole:", nodeCfg.Role)
fmt.Println("nodeId:", nodeCfg.NodeID)
fmt.Println("syncInterval:", nodeCfg.SyncIntervalSeconds)
fmt.Println("trafficFlushInterval:", nodeCfg.TrafficFlushSeconds)
}
}
When setters are used, validate before writing:
candidate := config.GetNodeConfigFromJSON()
if nodeRoleFlag != "" {
candidate.Role = config.NodeRole(nodeRoleFlag)
}
if nodeIDFlag != "" {
candidate.NodeID = nodeIDFlag
}
if syncIntervalFlag > 0 {
candidate.SyncIntervalSeconds = syncIntervalFlag
}
if trafficFlushIntervalFlag > 0 {
candidate.TrafficFlushSeconds = trafficFlushIntervalFlag
}
if err := config.ValidateNodeConfig(candidate, config.GetDBConfigFromJSON()); err != nil {
fmt.Println("Invalid node settings:", err)
return
}
Then persist with WriteSettingToJSON:
if nodeRoleFlag != "" {
_ = config.WriteSettingToJSON("nodeRole", nodeRoleFlag)
}
if nodeIDFlag != "" {
_ = config.WriteSettingToJSON("nodeId", nodeIDFlag)
}
if syncIntervalFlag > 0 {
_ = config.WriteSettingToJSON("syncInterval", strconv.Itoa(syncIntervalFlag))
}
if trafficFlushIntervalFlag > 0 {
_ = config.WriteSettingToJSON("trafficFlushInterval", strconv.Itoa(trafficFlushIntervalFlag))
}
- Step 5: Run config tests and package discovery
Run:
go test ./config -run 'Test(GetNodeConfigFromJSONDefaults|ValidateNodeConfigWorkerRequiresNodeID|ValidateNodeConfigWorkerRequiresMariaDB|SharedRuntimeFilePaths|GetDBConfigFromJSONSupportsModulePurposeLayout|WriteSettingToJSONUsesModulePurposeGroup)' -v
go test ./... -run TestNonExistent -count=0
Expected:
-
the focused
./configtests PASS -
package discovery succeeds without running unrelated tests
-
Step 6: Checkpoint Commit the config work
Run:
git add config/config.go config/config_test.go main.go
git commit -m "feat: add node config and startup validation"
After commit, update docs/superpowers/progress/2026-04-10-multi-node-shared-control-progress.md:
- mark Task 1 complete
- record the short commit hash beside Task 1
Task 2: Add shared metadata models and repository helpers
Files:
-
Create:
database/model/shared_state.go -
Create:
database/model/node_state.go -
Create:
database/shared_state.go -
Modify:
database/db.go -
Modify:
database/db_test.goExecution Mode: Inline -
Step 1: Write the failing database tests
Add these tests to database/db_test.go:
func TestInitDB_CreatesSharedMetadataTables(t *testing.T) {
setupTestDB(t)
for _, table := range []string{"shared_states", "node_states"} {
var count int64
if err := db.Table(table).Count(&count).Error; err != nil {
t.Fatalf("table %s should exist: %v", table, err)
}
}
}
func TestBumpSharedAccountsVersion(t *testing.T) {
setupTestDB(t)
version, err := GetSharedAccountsVersion(GetDB())
if err != nil {
t.Fatalf("GetSharedAccountsVersion error: %v", err)
}
if version != 0 {
t.Fatalf("expected seeded version 0, got %d", version)
}
tx := GetDB().Begin()
if err := BumpSharedAccountsVersion(tx); err != nil {
t.Fatalf("BumpSharedAccountsVersion error: %v", err)
}
if err := tx.Commit().Error; err != nil {
t.Fatalf("Commit error: %v", err)
}
version, err = GetSharedAccountsVersion(GetDB())
if err != nil {
t.Fatalf("GetSharedAccountsVersion error: %v", err)
}
if version != 1 {
t.Fatalf("expected bumped version 1, got %d", version)
}
}
func TestUpsertNodeState(t *testing.T) {
setupTestDB(t)
state := &model.NodeState{
NodeID: "worker-1",
NodeRole: "worker",
LastSeenVersion: 7,
LastError: "dial tcp timeout",
}
if err := UpsertNodeState(GetDB(), state); err != nil {
t.Fatalf("UpsertNodeState error: %v", err)
}
var stored model.NodeState
if err := GetDB().First(&stored, "node_id = ?", "worker-1").Error; err != nil {
t.Fatalf("lookup node state failed: %v", err)
}
if stored.LastSeenVersion != 7 {
t.Fatalf("expected last seen version 7, got %d", stored.LastSeenVersion)
}
if stored.LastError != "dial tcp timeout" {
t.Fatalf("expected last error to round-trip, got %q", stored.LastError)
}
}
- Step 2: Run the database test subset and confirm it fails
Run:
go test ./database -run 'Test(InitDB_CreatesSharedMetadataTables|BumpSharedAccountsVersion|UpsertNodeState)' -v
Expected: FAIL with missing model.NodeState, GetSharedAccountsVersion, BumpSharedAccountsVersion, or UpsertNodeState.
- Step 3: Add metadata models and DB helpers
Create database/model/shared_state.go:
package model
type SharedState struct {
Key string `json:"key" gorm:"primaryKey"`
Version int64 `json:"version" gorm:"not null;default:0"`
UpdatedAt int64 `json:"updatedAt"`
}
Create database/model/node_state.go:
package model
type NodeState struct {
NodeID string `json:"nodeId" gorm:"primaryKey"`
NodeRole string `json:"nodeRole" gorm:"not null"`
LastSyncAt int64 `json:"lastSyncAt"`
LastHeartbeatAt int64 `json:"lastHeartbeatAt"`
LastSeenVersion int64 `json:"lastSeenVersion"`
LastError string `json:"lastError"`
UpdatedAt int64 `json:"updatedAt"`
}
Create database/shared_state.go:
package database
import (
"time"
"github.com/mhsanaei/3x-ui/v2/database/model"
"gorm.io/gorm"
)
const SharedAccountsVersionKey = "shared_accounts_version"
func txOrDB(tx *gorm.DB) *gorm.DB {
if tx != nil {
return tx
}
return GetDB()
}
func seedSharedAccountsVersion(tx *gorm.DB) error {
return txOrDB(tx).FirstOrCreate(&model.SharedState{
Key: SharedAccountsVersionKey,
Version: 0,
UpdatedAt: time.Now().Unix(),
}, &model.SharedState{Key: SharedAccountsVersionKey}).Error
}
func GetSharedAccountsVersion(tx *gorm.DB) (int64, error) {
state := &model.SharedState{}
err := txOrDB(tx).First(state, "key = ?", SharedAccountsVersionKey).Error
if err != nil {
return 0, err
}
return state.Version, nil
}
func BumpSharedAccountsVersion(tx *gorm.DB) error {
now := time.Now().Unix()
return txOrDB(tx).Model(&model.SharedState{}).
Where("key = ?", SharedAccountsVersionKey).
Updates(map[string]any{
"version": gorm.Expr("version + 1"),
"updated_at": now,
}).Error
}
func UpsertNodeState(tx *gorm.DB, state *model.NodeState) error {
state.UpdatedAt = time.Now().Unix()
return txOrDB(tx).Save(state).Error
}
- Step 4: Register metadata models and seed the shared version row
Patch database/db.go:
func initModels() error {
models := []any{
&model.User{},
&model.Inbound{},
&model.OutboundTraffics{},
&model.Setting{},
&model.InboundClientIps{},
&xray.ClientTraffic{},
&model.HistoryOfSeeders{},
&model.SharedState{},
&model.NodeState{},
}
for _, model := range models {
if err := db.AutoMigrate(model); err != nil {
return err
}
}
if err := seedSharedAccountsVersion(db); err != nil {
return err
}
return nil
}
- Step 5: Run the database tests
Run:
go test ./database -run 'Test(InitDB_CreatesSharedMetadataTables|BumpSharedAccountsVersion|UpsertNodeState|InitDB_CreatesTables|InitDB_Idempotent)' -v
Expected: PASS
- Step 6: Checkpoint Commit the metadata work
Run:
git add database/model/shared_state.go database/model/node_state.go database/shared_state.go database/db.go database/db_test.go
git commit -m "feat: add shared metadata models and helpers"
After commit, update docs/superpowers/progress/2026-04-10-multi-node-shared-control-progress.md:
- mark Task 2 complete
- record the short commit hash beside Task 2
Task 3: Enforce master-only shared writes and transactional version bumping
Files:
-
Create:
web/service/node_guard.go -
Create:
web/service/node_guard_test.go -
Modify:
web/service/inbound.goExecution Mode: Inline -
Step 1: Write the failing node-guard tests
Create web/service/node_guard_test.go with:
package service
import (
"encoding/json"
"os"
"path/filepath"
"testing"
"github.com/mhsanaei/3x-ui/v2/config"
"github.com/mhsanaei/3x-ui/v2/database"
)
func writeNodeGuardSettings(t *testing.T, settings map[string]any) {
t.Helper()
data, err := json.MarshalIndent(settings, "", " ")
if err != nil {
t.Fatalf("MarshalIndent error: %v", err)
}
if err := os.WriteFile(config.GetSettingPath(), data, 0644); err != nil {
t.Fatalf("WriteFile error: %v", err)
}
}
func setupNodeGuardDB(t *testing.T) {
t.Helper()
tmpDir := t.TempDir()
t.Setenv("XUI_DB_FOLDER", tmpDir)
if err := database.InitDBWithPath(filepath.Join(tmpDir, "service.db")); err != nil {
t.Fatalf("InitDBWithPath error: %v", err)
}
t.Cleanup(func() { database.CloseDB() })
}
func TestRequireMasterRejectsWorker(t *testing.T) {
tmpDir := t.TempDir()
t.Setenv("XUI_DB_FOLDER", tmpDir)
writeNodeGuardSettings(t, map[string]any{
"dbType": "mariadb",
"nodeRole": "worker",
"nodeId": "worker-1",
})
if err := RequireMaster(); err == nil {
t.Fatal("expected worker mode to be rejected")
}
}
func TestRequireMasterAllowsMaster(t *testing.T) {
tmpDir := t.TempDir()
t.Setenv("XUI_DB_FOLDER", tmpDir)
writeNodeGuardSettings(t, map[string]any{
"dbType": "mariadb",
"nodeRole": "master",
})
if err := RequireMaster(); err != nil {
t.Fatalf("expected master mode to pass: %v", err)
}
}
func TestBumpSharedAccountsVersionRollsBackWithTransaction(t *testing.T) {
setupNodeGuardDB(t)
tx := database.GetDB().Begin()
if err := database.BumpSharedAccountsVersion(tx); err != nil {
t.Fatalf("BumpSharedAccountsVersion error: %v", err)
}
tx.Rollback()
version, err := database.GetSharedAccountsVersion(database.GetDB())
if err != nil {
t.Fatalf("GetSharedAccountsVersion error: %v", err)
}
if version != 0 {
t.Fatalf("expected rolled-back version to remain 0, got %d", version)
}
}
- Step 2: Run the guard tests and confirm they fail
Run:
go test ./web/service -run 'Test(RequireMasterRejectsWorker|RequireMasterAllowsMaster|BumpSharedAccountsVersionRollsBackWithTransaction)' -v
Expected: FAIL with undefined RequireMaster.
- Step 3: Add shared-write guard helpers
Create web/service/node_guard.go:
package service
import (
"errors"
"github.com/mhsanaei/3x-ui/v2/config"
)
var ErrSharedWriteRequiresMaster = errors.New("shared-account writes are only allowed on master nodes")
func IsWorker() bool {
return config.GetNodeConfigFromJSON().Role == config.NodeRoleWorker
}
func IsMaster() bool {
return !IsWorker()
}
func RequireMaster() error {
if IsWorker() {
return ErrSharedWriteRequiresMaster
}
return nil
}
func IsSharedModeEnabled() bool {
return config.GetDBConfigFromJSON().Type == "mariadb"
}
- Step 4: Guard shared writes and bump shared version inside successful transactions
Patch web/service/inbound.go with one reusable helper:
func ensureSharedWriteAllowed() error {
return RequireMaster()
}
func bumpSharedVersion(tx *gorm.DB) error {
return database.BumpSharedAccountsVersion(tx)
}
Apply this prologue to shared mutators:
if err := ensureSharedWriteAllowed(); err != nil {
return nil, false, err
}
Apply the version bump immediately before commit in:
AddInboundUpdateInboundDelInboundAddInboundClientDelInboundClientUpdateInboundClientResetClientTrafficResetAllTrafficsDelDepletedClientsDelInboundClientByEmail
Use the same pattern in each method’s existing transaction, preserving that method’s current return type:
if err := tx.Save(oldInbound).Error; err != nil {
return false, err
}
if err := bumpSharedVersion(tx); err != nil {
return false, err
}
return needRestart, nil
Do not change controller behavior in this task. Let the existing controller paths surface the service-layer error message.
- Step 5: Run the node-guard tests
Run:
go test ./web/service -run 'Test(RequireMasterRejectsWorker|RequireMasterAllowsMaster|BumpSharedAccountsVersionRollsBackWithTransaction)' -v
Expected: PASS
- Step 6: Checkpoint Commit the guard work
Run:
git add web/service/node_guard.go web/service/node_guard_test.go web/service/inbound.go
git commit -m "feat: guard shared writes and bump version transactionally"
After commit, update docs/superpowers/progress/2026-04-10-multi-node-shared-control-progress.md:
- mark Task 3 complete
- record the short commit hash beside Task 3
Task 4: Add shared snapshot cache, worker sync loop, and snapshot-driven Xray rebuild
Files:
-
Create:
web/service/node_cache.go -
Create:
web/service/node_sync.go -
Create:
web/service/node_sync_test.go -
Modify:
web/service/xray.go -
Modify:
web/web.goExecution Mode: Subagent-Driven -
Step 1: Write the failing snapshot and sync tests
Create web/service/node_sync_test.go:
package service
import (
"path/filepath"
"testing"
"github.com/mhsanaei/3x-ui/v2/database/model"
)
func TestLoadAndSaveSharedAccountsSnapshot(t *testing.T) {
path := filepath.Join(t.TempDir(), "shared-cache.json")
snapshot := &SharedAccountsSnapshot{
Version: 2,
Inbounds: []*model.Inbound{
{Id: 1, Tag: "inbound-443", Enable: true, Port: 443},
},
}
if err := SaveSharedAccountsSnapshot(path, snapshot); err != nil {
t.Fatalf("SaveSharedAccountsSnapshot error: %v", err)
}
loaded, err := LoadSharedAccountsSnapshot(path)
if err != nil {
t.Fatalf("LoadSharedAccountsSnapshot error: %v", err)
}
if loaded.Version != 2 || len(loaded.Inbounds) != 1 {
t.Fatalf("unexpected snapshot round-trip: %+v", loaded)
}
}
func TestSyncOnceSkipsApplyWhenVersionUnchanged(t *testing.T) {
applyCalls := 0
svc := &NodeSyncService{
cachePath: filepath.Join(t.TempDir(), "shared-cache.json"),
loadVersion: func() (int64, error) { return 3, nil },
loadSnapshot: func(int64) (*SharedAccountsSnapshot, error) {
t.Fatal("loadSnapshot should not run when version is unchanged")
return nil, nil
},
applySnapshot: func(*SharedAccountsSnapshot) error {
applyCalls++
return nil
},
lastSeenVersion: 3,
}
if err := svc.SyncOnce(); err != nil {
t.Fatalf("SyncOnce error: %v", err)
}
if applyCalls != 0 {
t.Fatalf("expected applySnapshot to be skipped, got %d calls", applyCalls)
}
}
func TestSyncOnceRefreshesCacheAndAppliesSnapshot(t *testing.T) {
cachePath := filepath.Join(t.TempDir(), "shared-cache.json")
applyCalls := 0
svc := &NodeSyncService{
cachePath: cachePath,
loadVersion: func() (int64, error) { return 4, nil },
loadSnapshot: func(version int64) (*SharedAccountsSnapshot, error) {
return &SharedAccountsSnapshot{
Version: version,
Inbounds: []*model.Inbound{
{Id: 7, Tag: "worker-8443", Enable: true, Port: 8443},
},
}, nil
},
applySnapshot: func(snapshot *SharedAccountsSnapshot) error {
applyCalls++
if snapshot.Version != 4 {
t.Fatalf("expected snapshot version 4, got %d", snapshot.Version)
}
return nil
},
}
if err := svc.SyncOnce(); err != nil {
t.Fatalf("SyncOnce error: %v", err)
}
if applyCalls != 1 {
t.Fatalf("expected one apply call, got %d", applyCalls)
}
loaded, err := LoadSharedAccountsSnapshot(cachePath)
if err != nil {
t.Fatalf("LoadSharedAccountsSnapshot error: %v", err)
}
if loaded.Version != 4 {
t.Fatalf("expected cached version 4, got %d", loaded.Version)
}
}
func TestBootstrapFromCacheAppliesCachedSnapshot(t *testing.T) {
cachePath := filepath.Join(t.TempDir(), "shared-cache.json")
if err := SaveSharedAccountsSnapshot(cachePath, &SharedAccountsSnapshot{
Version: 5,
Inbounds: []*model.Inbound{
{Id: 9, Tag: "cached-9443", Enable: true, Port: 9443},
},
}); err != nil {
t.Fatalf("SaveSharedAccountsSnapshot error: %v", err)
}
applyCalls := 0
svc := &NodeSyncService{
cachePath: cachePath,
applySnapshot: func(snapshot *SharedAccountsSnapshot) error {
applyCalls++
if snapshot.Version != 5 {
t.Fatalf("expected cached version 5, got %d", snapshot.Version)
}
return nil
},
}
if err := svc.BootstrapFromCache(); err != nil {
t.Fatalf("BootstrapFromCache error: %v", err)
}
if applyCalls != 1 {
t.Fatalf("expected one cached apply, got %d", applyCalls)
}
}
- Step 2: Run the sync test subset and confirm it fails
Run:
go test ./web/service -run 'Test(LoadAndSaveSharedAccountsSnapshot|SyncOnceSkipsApplyWhenVersionUnchanged|SyncOnceRefreshesCacheAndAppliesSnapshot|BootstrapFromCacheAppliesCachedSnapshot)' -v
Expected: FAIL with undefined SharedAccountsSnapshot, SaveSharedAccountsSnapshot, LoadSharedAccountsSnapshot, or NodeSyncService.
- Step 3: Add cache helpers and refactor Xray config building away from DB reads
Create web/service/node_cache.go:
package service
import (
"encoding/json"
"os"
"github.com/mhsanaei/3x-ui/v2/database/model"
)
type SharedAccountsSnapshot struct {
Version int64 `json:"version"`
Inbounds []*model.Inbound `json:"inbounds"`
}
func LoadSharedAccountsSnapshot(path string) (*SharedAccountsSnapshot, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
snapshot := &SharedAccountsSnapshot{}
if err := json.Unmarshal(data, snapshot); err != nil {
return nil, err
}
return snapshot, nil
}
func SaveSharedAccountsSnapshot(path string, snapshot *SharedAccountsSnapshot) error {
data, err := json.MarshalIndent(snapshot, "", " ")
if err != nil {
return err
}
return os.WriteFile(path, data, 0644)
}
Refactor web/service/xray.go so worker sync can build from cached inbounds:
func (s *XrayService) BuildConfigFromInbounds(inbounds []*model.Inbound) (*xray.Config, error) {
templateConfig, err := s.settingService.GetXrayConfigTemplate()
if err != nil {
return nil, err
}
xrayConfig := &xray.Config{}
if err := json.Unmarshal([]byte(templateConfig), xrayConfig); err != nil {
return nil, err
}
for _, inbound := range inbounds {
if !inbound.Enable {
continue
}
// move the existing settings/streamSettings normalization logic here
inboundConfig := inbound.GenXrayInboundConfig()
xrayConfig.InboundConfigs = append(xrayConfig.InboundConfigs, *inboundConfig)
}
return xrayConfig, nil
}
func (s *XrayService) RestartXrayWithConfig(xrayConfig *xray.Config, isForce bool) error {
lock.Lock()
defer lock.Unlock()
isManuallyStopped.Store(false)
if s.IsXrayRunning() {
if !isForce && p.GetConfig().Equals(xrayConfig) && !isNeedXrayRestart.Load() {
return nil
}
p.Stop()
}
p = xray.NewProcess(xrayConfig)
result = ""
return p.Start()
}
func (s *XrayService) GetXrayConfig() (*xray.Config, error) {
inbounds, err := s.inboundService.GetAllInbounds()
if err != nil {
return nil, err
}
return s.BuildConfigFromInbounds(inbounds)
}
func (s *XrayService) ApplySharedSnapshot(snapshot *SharedAccountsSnapshot) error {
xrayConfig, err := s.BuildConfigFromInbounds(snapshot.Inbounds)
if err != nil {
return err
}
return s.RestartXrayWithConfig(xrayConfig, false)
}
- Step 4: Implement the node sync service
Create web/service/node_sync.go:
package service
import (
"context"
"os"
"time"
"github.com/mhsanaei/3x-ui/v2/config"
"github.com/mhsanaei/3x-ui/v2/database"
"github.com/mhsanaei/3x-ui/v2/database/model"
)
type NodeSyncService struct {
xrayService XrayService
cachePath string
lastSeenVersion int64
loadVersion func() (int64, error)
loadSnapshot func(int64) (*SharedAccountsSnapshot, error)
applySnapshot func(*SharedAccountsSnapshot) error
}
func NewNodeSyncService() *NodeSyncService {
svc := &NodeSyncService{
cachePath: config.GetSharedCachePath(),
}
svc.loadVersion = func() (int64, error) {
return database.GetSharedAccountsVersion(database.GetDB())
}
svc.loadSnapshot = func(version int64) (*SharedAccountsSnapshot, error) {
inbounds, err := svc.xrayService.inboundService.GetAllInbounds()
if err != nil {
return nil, err
}
return &SharedAccountsSnapshot{Version: version, Inbounds: inbounds}, nil
}
svc.applySnapshot = svc.xrayService.ApplySharedSnapshot
return svc
}
func (s *NodeSyncService) updateNodeState(version int64, syncErr error, didSync bool) {
nodeCfg := config.GetNodeConfigFromJSON()
now := time.Now().Unix()
state := &model.NodeState{
NodeID: nodeCfg.NodeID,
NodeRole: string(nodeCfg.Role),
LastHeartbeatAt: now,
LastSeenVersion: version,
}
if didSync {
state.LastSyncAt = now
}
if syncErr != nil {
state.LastError = syncErr.Error()
}
_ = database.UpsertNodeState(database.GetDB(), state)
}
func (s *NodeSyncService) BootstrapFromCache() error {
snapshot, err := LoadSharedAccountsSnapshot(s.cachePath)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
s.lastSeenVersion = snapshot.Version
return s.applySnapshot(snapshot)
}
func (s *NodeSyncService) SyncOnce() error {
version, err := s.loadVersion()
if err != nil {
s.updateNodeState(s.lastSeenVersion, err, false)
return err
}
if version == s.lastSeenVersion {
s.updateNodeState(version, nil, false)
return nil
}
snapshot, err := s.loadSnapshot(version)
if err != nil {
s.updateNodeState(s.lastSeenVersion, err, false)
return err
}
if err := SaveSharedAccountsSnapshot(s.cachePath, snapshot); err != nil {
s.updateNodeState(s.lastSeenVersion, err, false)
return err
}
if err := s.applySnapshot(snapshot); err != nil {
s.updateNodeState(s.lastSeenVersion, err, false)
return err
}
s.lastSeenVersion = version
s.updateNodeState(version, nil, true)
return nil
}
func (s *NodeSyncService) Run(ctx context.Context, interval time.Duration) {
_ = s.BootstrapFromCache()
_ = s.SyncOnce()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_ = s.SyncOnce()
}
}
}
func (s *NodeSyncService) RunHeartbeatLoop(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
version, _ := database.GetSharedAccountsVersion(database.GetDB())
s.updateNodeState(version, nil, false)
}
}
}
- Step 5: Start worker sync or master heartbeat on server startup
Patch web/web.go:
func (s *Server) startNodeLoops() {
nodeCfg := config.GetNodeConfigFromJSON()
nodeSyncService := service.NewNodeSyncService()
interval := time.Duration(nodeCfg.SyncIntervalSeconds) * time.Second
if nodeCfg.Role == config.NodeRoleWorker {
go nodeSyncService.Run(s.ctx, interval)
return
}
if nodeCfg.NodeID != "" {
go nodeSyncService.RunHeartbeatLoop(s.ctx, interval)
}
}
Call it from Start() after s.startTask():
s.startTask()
s.startNodeLoops()
- Step 6: Run the sync tests
Run:
go test ./web/service -run 'Test(LoadAndSaveSharedAccountsSnapshot|SyncOnceSkipsApplyWhenVersionUnchanged|SyncOnceRefreshesCacheAndAppliesSnapshot|BootstrapFromCacheAppliesCachedSnapshot)' -v
Expected: PASS
- Step 7: Checkpoint Commit the sync work
Run:
git add web/service/node_cache.go web/service/node_sync.go web/service/node_sync_test.go web/service/xray.go web/web.go
git commit -m "feat: add cache-backed worker sync and heartbeat loops"
After commit, update docs/superpowers/progress/2026-04-10-multi-node-shared-control-progress.md:
- mark Task 4 complete
- record the short commit hash beside Task 4
Task 5: Add durable traffic delta persistence and safe shared-mode flushes
Files:
-
Create:
web/service/traffic_pending.go -
Create:
web/service/traffic_flush.go -
Create:
web/service/traffic_flush_test.go -
Modify:
web/job/xray_traffic_job.go -
Modify:
web/service/inbound.go -
Modify:
web/web.go -
Modify:
xray/client_traffic.goExecution Mode: Subagent-Driven -
Step 1: Write the failing pending-delta and flush tests
Create web/service/traffic_flush_test.go:
package service
import (
"errors"
"path/filepath"
"testing"
"github.com/mhsanaei/3x-ui/v2/database"
"github.com/mhsanaei/3x-ui/v2/database/model"
"github.com/mhsanaei/3x-ui/v2/xray"
)
func TestTrafficPendingStoreMerge(t *testing.T) {
store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json"))
if err := store.Merge([]TrafficDelta{{InboundID: 1, Email: "alice@example.com", UpDelta: 7}}); err != nil {
t.Fatalf("Merge error: %v", err)
}
if err := store.Merge([]TrafficDelta{{InboundID: 1, Email: "alice@example.com", DownDelta: 9}}); err != nil {
t.Fatalf("Merge error: %v", err)
}
deltas, err := store.Load()
if err != nil {
t.Fatalf("Load error: %v", err)
}
if len(deltas) != 1 {
t.Fatalf("expected one merged delta, got %d", len(deltas))
}
if deltas[0].UpDelta != 7 || deltas[0].DownDelta != 9 {
t.Fatalf("unexpected merged delta: %+v", deltas[0])
}
}
func TestFlushOnceClearsPendingOnSuccess(t *testing.T) {
tmpDir := t.TempDir()
t.Setenv("XUI_DB_FOLDER", tmpDir)
if err := database.InitDBWithPath(filepath.Join(tmpDir, "flush.db")); err != nil {
t.Fatalf("InitDBWithPath error: %v", err)
}
defer database.CloseDB()
if err := database.GetDB().Create(&model.Inbound{Id: 1, Tag: "inbound-443", Enable: true}).Error; err != nil {
t.Fatalf("seed inbound failed: %v", err)
}
if err := database.GetDB().Create(&xray.ClientTraffic{InboundId: 1, Email: "alice@example.com"}).Error; err != nil {
t.Fatalf("seed client traffic failed: %v", err)
}
store := NewTrafficPendingStore(filepath.Join(tmpDir, "traffic-pending.json"))
if err := store.Merge([]TrafficDelta{{InboundID: 1, Email: "alice@example.com", UpDelta: 7, DownDelta: 9}}); err != nil {
t.Fatalf("Merge error: %v", err)
}
svc := NewTrafficFlushService(store)
if err := svc.FlushOnce(); err != nil {
t.Fatalf("FlushOnce error: %v", err)
}
var clientTraffic xray.ClientTraffic
if err := database.GetDB().First(&clientTraffic, "inbound_id = ? AND email = ?", 1, "alice@example.com").Error; err != nil {
t.Fatalf("lookup client traffic failed: %v", err)
}
if clientTraffic.Up != 7 || clientTraffic.Down != 9 {
t.Fatalf("unexpected flushed traffic: %+v", clientTraffic)
}
deltas, err := store.Load()
if err != nil {
t.Fatalf("Load error: %v", err)
}
if len(deltas) != 0 {
t.Fatalf("expected pending deltas to be cleared, got %+v", deltas)
}
}
func TestFlushOnceKeepsPendingOnFailure(t *testing.T) {
store := NewTrafficPendingStore(filepath.Join(t.TempDir(), "traffic-pending.json"))
if err := store.Merge([]TrafficDelta{{InboundID: 1, Email: "alice@example.com", UpDelta: 3}}); err != nil {
t.Fatalf("Merge error: %v", err)
}
svc := NewTrafficFlushService(store)
svc.flushFn = func([]TrafficDelta) error { return errors.New("boom") }
if err := svc.FlushOnce(); err == nil {
t.Fatal("expected flush failure")
}
deltas, err := store.Load()
if err != nil {
t.Fatalf("Load error: %v", err)
}
if len(deltas) != 1 {
t.Fatalf("expected pending delta to remain, got %+v", deltas)
}
}
- Step 2: Run the flush test subset and confirm it fails
Run:
go test ./web/service -run 'Test(TrafficPendingStoreMerge|FlushOnceClearsPendingOnSuccess|FlushOnceKeepsPendingOnFailure)' -v
Expected: FAIL with undefined TrafficDelta, NewTrafficPendingStore, or NewTrafficFlushService.
- Step 3: Implement the pending-delta store and add the composite unique key
Create web/service/traffic_pending.go:
package service
import (
"encoding/json"
"fmt"
"os"
"sync"
)
type TrafficDelta struct {
InboundID int `json:"inboundId"`
Email string `json:"email"`
UpDelta int64 `json:"upDelta"`
DownDelta int64 `json:"downDelta"`
}
type TrafficPendingStore struct {
path string
mu sync.Mutex
}
func NewTrafficPendingStore(path string) *TrafficPendingStore {
return &TrafficPendingStore{path: path}
}
func (s *TrafficPendingStore) Load() ([]TrafficDelta, error) {
data, err := os.ReadFile(s.path)
if os.IsNotExist(err) {
return []TrafficDelta{}, nil
}
if err != nil {
return nil, err
}
var deltas []TrafficDelta
if err := json.Unmarshal(data, &deltas); err != nil {
return nil, err
}
return deltas, nil
}
func (s *TrafficPendingStore) Save(deltas []TrafficDelta) error {
data, err := json.MarshalIndent(deltas, "", " ")
if err != nil {
return err
}
return os.WriteFile(s.path, data, 0644)
}
func (s *TrafficPendingStore) Merge(newDeltas []TrafficDelta) error {
s.mu.Lock()
defer s.mu.Unlock()
current, err := s.Load()
if err != nil {
return err
}
index := map[string]int{}
for i, delta := range current {
index[deltaKey(delta.InboundID, delta.Email)] = i
}
for _, delta := range newDeltas {
key := deltaKey(delta.InboundID, delta.Email)
if idx, ok := index[key]; ok {
current[idx].UpDelta += delta.UpDelta
current[idx].DownDelta += delta.DownDelta
continue
}
index[key] = len(current)
current = append(current, delta)
}
return s.Save(current)
}
func deltaKey(inboundID int, email string) string {
return fmt.Sprintf("%d:%s", inboundID, email)
}
Patch xray/client_traffic.go so shared flushes can use deterministic upserts:
type ClientTraffic struct {
Id int `json:"id" form:"id" gorm:"primaryKey;autoIncrement"`
InboundId int `json:"inboundId" form:"inboundId" gorm:"uniqueIndex:idx_client_traffics_inbound_email"`
Enable bool `json:"enable" form:"enable"`
Email string `json:"email" form:"email" gorm:"uniqueIndex:idx_client_traffics_inbound_email"`
- Step 4: Implement shared-mode flushes and master-only traffic reconciliation
Create web/service/traffic_flush.go:
package service
import (
"context"
"time"
"github.com/mhsanaei/3x-ui/v2/config"
"github.com/mhsanaei/3x-ui/v2/database"
"github.com/mhsanaei/3x-ui/v2/database/model"
"github.com/mhsanaei/3x-ui/v2/xray"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
type TrafficFlushService struct {
store *TrafficPendingStore
inbounds InboundService
flushFn func([]TrafficDelta) error
}
func NewTrafficFlushService(store *TrafficPendingStore) *TrafficFlushService {
svc := &TrafficFlushService{store: store}
svc.flushFn = svc.flushToDatabase
return svc
}
func (s *TrafficFlushService) Collect(clientTraffics []*xray.ClientTraffic) error {
deltas := make([]TrafficDelta, 0, len(clientTraffics))
for _, traffic := range clientTraffics {
if traffic.Up == 0 && traffic.Down == 0 {
continue
}
deltas = append(deltas, TrafficDelta{
InboundID: traffic.InboundId,
Email: traffic.Email,
UpDelta: traffic.Up,
DownDelta: traffic.Down,
})
}
return s.store.Merge(deltas)
}
func (s *TrafficFlushService) flushToDatabase(deltas []TrafficDelta) error {
return database.GetDB().Transaction(func(tx *gorm.DB) error {
for _, delta := range deltas {
if err := tx.Model(&model.Inbound{}).
Where("id = ?", delta.InboundID).
Updates(map[string]any{
"up": gorm.Expr("up + ?", delta.UpDelta),
"down": gorm.Expr("down + ?", delta.DownDelta),
"all_time": gorm.Expr("COALESCE(all_time, 0) + ?", delta.UpDelta+delta.DownDelta),
}).Error; err != nil {
return err
}
row := xray.ClientTraffic{
InboundId: delta.InboundID,
Email: delta.Email,
Up: delta.UpDelta,
Down: delta.DownDelta,
AllTime: delta.UpDelta + delta.DownDelta,
}
if err := tx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "inbound_id"}, {Name: "email"}},
DoUpdates: clause.Assignments(map[string]any{
"up": gorm.Expr("up + ?", delta.UpDelta),
"down": gorm.Expr("down + ?", delta.DownDelta),
"all_time": gorm.Expr("all_time + ?", delta.UpDelta+delta.DownDelta),
}),
}).Create(&row).Error; err != nil {
return err
}
}
if IsMaster() {
_, err := s.inbounds.ReconcileSharedTrafficState(tx)
if err != nil {
return err
}
}
return nil
})
}
func (s *TrafficFlushService) FlushOnce() error {
deltas, err := s.store.Load()
if err != nil || len(deltas) == 0 {
return err
}
if err := s.flushFn(deltas); err != nil {
return err
}
return s.store.Save([]TrafficDelta{})
}
func (s *TrafficFlushService) Run(ctx context.Context) {
interval := time.Duration(config.GetNodeConfigFromJSON().TrafficFlushSeconds) * time.Second
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
_ = s.FlushOnce()
return
case <-ticker.C:
_ = s.FlushOnce()
}
}
}
Extract master-only reconciliation from web/service/inbound.go:
func (s *InboundService) ReconcileSharedTrafficState(tx *gorm.DB) (bool, error) {
needRestart0, _, err := s.autoRenewClients(tx)
if err != nil {
return false, err
}
needRestart1, _, err := s.disableInvalidClients(tx)
if err != nil {
return false, err
}
needRestart2, _, err := s.disableInvalidInbounds(tx)
if err != nil {
return false, err
}
return needRestart0 || needRestart1 || needRestart2, nil
}
Do not let worker shared-mode traffic processing call AddTraffic(), because that path mutates shared enable/expiry state.
- Step 5: Route shared-mode traffic collection through the pending store and start the flush loop
Patch web/job/xray_traffic_job.go:
type XrayTrafficJob struct {
settingService service.SettingService
xrayService service.XrayService
inboundService service.InboundService
outboundService service.OutboundService
trafficFlushSvc *service.TrafficFlushService
}
func NewXrayTrafficJob() *XrayTrafficJob {
return &XrayTrafficJob{
trafficFlushSvc: service.NewTrafficFlushService(
service.NewTrafficPendingStore(config.GetTrafficPendingPath()),
),
}
}
In Run(), branch on shared mode:
if service.IsSharedModeEnabled() {
if err := j.trafficFlushSvc.Collect(clientTraffics); err != nil {
logger.Warning("collect shared traffic failed:", err)
}
} else {
err, needRestart0 := j.inboundService.AddTraffic(traffics, clientTraffics)
if err != nil {
logger.Warning("add inbound traffic failed:", err)
}
if needRestart0 {
j.xrayService.SetToNeedRestart()
}
}
Start the flush loop in web/web.go:
func (s *Server) startTrafficFlushLoop() {
store := service.NewTrafficPendingStore(config.GetTrafficPendingPath())
flushService := service.NewTrafficFlushService(store)
go flushService.Run(s.ctx)
}
Call it from Start() after s.startNodeLoops():
s.startTrafficFlushLoop()
- Step 6: Run the flush tests and package discovery
Run:
go test ./web/service -run 'Test(TrafficPendingStoreMerge|FlushOnceClearsPendingOnSuccess|FlushOnceKeepsPendingOnFailure)' -v
go test ./... -run TestNonExistent -count=0
Expected:
-
the focused flush tests PASS
-
package discovery still succeeds after the new service wiring
-
Step 7: Checkpoint Commit the shared traffic work
Run:
git add web/service/traffic_pending.go web/service/traffic_flush.go web/service/traffic_flush_test.go web/job/xray_traffic_job.go web/service/inbound.go web/web.go xray/client_traffic.go
git commit -m "feat: add durable traffic deltas and shared flush loop"
After commit, update docs/superpowers/progress/2026-04-10-multi-node-shared-control-progress.md:
- mark Task 5 complete
- record the short commit hash beside Task 5
Task 6: Expose node management in shell tools and the installer
Files:
-
Modify:
x-ui.sh -
Modify:
install.shExecution Mode: Subagent-Driven -
Step 1: Add read helpers and node status display to
x-ui.sh
Add reusable JSON readers near the existing DB helpers:
get_node_setting() {
local key="$1"
local default_value="$2"
local json_path="/etc/x-ui/x-ui.json"
if [ ! -f "$json_path" ]; then
echo "$default_value"
return
fi
jq -r "$key // $default_value" "$json_path" 2>/dev/null
}
show_node_status() {
local node_role
local node_id
local sync_interval
local flush_interval
node_role=$(get_node_setting '.nodeRole' '"master"')
node_id=$(get_node_setting '.nodeId' '""')
sync_interval=$(get_node_setting '.syncInterval' '30')
flush_interval=$(get_node_setting '.trafficFlushInterval' '10')
echo "Node role: ${node_role}"
echo "Node ID: ${node_id:-<empty>}"
echo "Sync interval: ${sync_interval}s"
echo "Traffic flush interval: ${flush_interval}s"
}
- Step 2: Add minimal node-management actions to
x-ui.sh
Add menu actions that call the existing Go binary instead of editing JSON directly:
set_node_role() {
read -rp "Enter node role (master/worker): " node_role
if [ "$node_role" != "master" ] && [ "$node_role" != "worker" ]; then
echo "Invalid node role"
return 1
fi
${xui_folder}/x-ui setting -nodeRole "$node_role"
}
set_node_id() {
read -rp "Enter node ID: " node_id
${xui_folder}/x-ui setting -nodeId "$node_id"
}
Menu text should stay minimal:
-
show current node role
-
set
master/worker -
set
nodeId -
remind the operator to restart after changes
-
Step 3: Prompt for MariaDB and node role during fresh installs
Patch the fresh-install branch in install.sh:
read -rp "Database type [mariadb]: " db_type
db_type=${db_type:-mariadb}
${xui_folder}/x-ui setting -dbType "$db_type"
if [ "$db_type" = "mariadb" ]; then
read -rp "MariaDB host [127.0.0.1]: " db_host
read -rp "MariaDB port [3306]: " db_port
read -rp "MariaDB user: " db_user
read -rsp "MariaDB password: " db_pass
echo
read -rp "MariaDB database [3xui]: " db_name
${xui_folder}/x-ui setting -dbHost "${db_host:-127.0.0.1}" -dbPort "${db_port:-3306}" -dbUser "$db_user" -dbPassword "$db_pass" -dbName "${db_name:-3xui}"
fi
read -rp "Node role [master]: " node_role
node_role=${node_role:-master}
if [ "$node_role" = "worker" ]; then
read -rp "Node ID: " node_id
${xui_folder}/x-ui setting -nodeRole worker -nodeId "$node_id"
else
${xui_folder}/x-ui setting -nodeRole master
fi
Do not add this prompt path to upgrades. Preserve existing SQLite upgrade behavior for old installs.
- Step 4: Run shell syntax checks and a CLI smoke check
Run:
bash -n x-ui.sh
bash -n install.sh
./x-ui setting -show true
Expected:
-
both shell scripts pass
bash -n -
./x-ui setting -show trueprintsnodeRole,nodeId,syncInterval, andtrafficFlushInterval -
Step 5: Checkpoint Commit the operator tooling work
Run:
git add x-ui.sh install.sh
git commit -m "feat: add node management shell and installer flows"
After commit, update docs/superpowers/progress/2026-04-10-multi-node-shared-control-progress.md:
- mark Task 6 complete
- record the short commit hash beside Task 6
Task 7: Document the feature and run focused verification
Files:
-
Create:
docs/multi-node-sync.md -
Modify:
README.md -
Modify:
README.zh_CN.mdExecution Mode: Subagent-Driven -
Step 1: Write the operator runbook
Create docs/multi-node-sync.md with these sections:
# Multi-Node Shared Control
## Roles
- `master`: the only node allowed to change shared account definitions
- `worker`: rebuilds local Xray config from shared snapshots and flushes traffic deltas
## Requirements
- shared mode requires MariaDB
- each worker needs a unique `nodeId`
- workers keep `/etc/x-ui/shared-cache.json` for outage survival
## Runtime Loops
- workers poll `shared_accounts_version` every `syncInterval`
- all nodes flush `/etc/x-ui/traffic-pending.json` every `trafficFlushInterval`
- only `master` runs shared traffic reconciliation that can disable or renew clients
- Step 2: Add concise README entries in both languages
Append a short section to README.md:
## Multi-Node Shared Control
- use MariaDB as the shared control database
- keep one `master` node for shared-account writes
- configure other nodes as `worker`
- workers rebuild local Xray config from synchronized snapshots
- traffic is flushed back as deltas, not absolute totals
Append the matching section to README.zh_CN.md:
## 多节点共享控制
- 使用 MariaDB 作为共享控制数据库
- 仅保留一个 `master` 节点负责共享账号写入
- 其他节点配置为 `worker`
- `worker` 通过同步快照重建本地 Xray 配置
- 流量按增量回刷,不覆盖绝对总量
- Step 3: Add the manual verification checklist to the runbook
Append this checklist to docs/multi-node-sync.md:
## Manual Verification
1. Start a `master` node on MariaDB.
2. Start a `worker` node on the same MariaDB with a unique `nodeId`.
3. Change an inbound or client on `master`.
4. Confirm the worker sees a newer `shared_accounts_version` and rebuilds local Xray.
5. Generate traffic on both nodes.
6. Confirm aggregated MariaDB counters increase without overwriting each other.
7. Stop MariaDB briefly and confirm the worker continues using `shared-cache.json`.
8. Restore MariaDB and confirm pending traffic deltas flush successfully.
- Step 4: Run focused verification
Run:
go test ./config ./database ./web/service -v
go test ./... -run TestNonExistent -count=0
Expected:
-
focused packages PASS
-
package discovery succeeds across the repo
-
Step 5: Checkpoint Commit the docs
Run:
git add docs/multi-node-sync.md README.md README.zh_CN.md
git commit -m "docs: add multi-node shared control guidance"
After commit, update docs/superpowers/progress/2026-04-10-multi-node-shared-control-progress.md:
- mark Task 7 complete
- record the short commit hash beside Task 7
Rollout Order
- Task 1 — config, runtime file paths, CLI setters, startup validation
- Task 2 — metadata tables and repository helpers
- Task 3 — master-only guards and version bumping
- Task 4 — cache-backed worker sync and snapshot-driven Xray rebuild
- Task 5 — durable traffic delta collection, atomic flush, and master-only reconciliation
- Task 6 — shell and installer flows
- Task 7 — docs and focused verification
Execution Strategy
- Tasks 1–3 execute Inline in the current session to establish the shared foundations before parallel work begins.
- Tasks 4–7 execute Subagent-Driven after Tasks 1–3 are complete and committed.
- Each task is a checkpoint and must end with its own git commit; do not batch adjacent tasks into one commit.
- After each checkpoint commit, update
docs/superpowers/progress/2026-04-10-multi-node-shared-control-progress.mdbefore moving to the next task.
Acceptance Criteria
masteris the only node that can mutate shared account definitions.- successful shared-account writes increment
shared_accounts_versionin the same transaction. workernodes poll the shared version and rebuild local Xray config from cached snapshots.- workers keep serving from
shared-cache.jsonwhen MariaDB is temporarily unavailable. - traffic is stored locally as deltas and flushed back without overwriting aggregate totals.
- shared-mode traffic collection on
workerno longer callsInboundService.AddTraffic()and therefore no longer mutates shared enable / expiry state. - only
masterperforms shared traffic reconciliation that can disable or renew clients and inbounds. x-ui.sh,install.sh, and the README documents expose the node role and shared-control workflow clearly.
Self-Review
- Spec coverage: the plan covers node config, shared metadata, master-only writes, worker snapshot sync, cache fallback, durable traffic deltas, master-only reconciliation after flush, operator tooling, and docs.
- Placeholder scan: no unfinished placeholder markers remain.
- Type consistency:
NodeConfig,SharedAccountsSnapshot,TrafficDelta,NodeSyncService,TrafficFlushService,RequireMaster, andReconcileSharedTrafficStateare used consistently across tasks.
Execution Handoff
Selected execution strategy:
- Inline foundation: Tasks 1–3
- Subagent-Driven expansion: Tasks 4–7
Progress tracker:
docs/superpowers/progress/2026-04-10-multi-node-shared-control-progress.md