From b40f869f2aa86194ad88cd0111db319d87b47314 Mon Sep 17 00:00:00 2001 From: MHSanaei Date: Fri, 5 Jun 2026 02:26:57 +0200 Subject: [PATCH] fix(node): keep client/inbound edits working when a node is offline (#4923, #4931) Node-backed client and inbound edits no longer hard-fail when the backing node is offline or disabled. Edits commit to the panel DB immediately and reconcile to the node when it reconnects (eventual consistency); the panel is the single source of truth for desired config. - Add Node.ConfigDirty/ConfigDirtyAt; mark a node dirty when an edit commits without reaching it (cleared via CAS on ConfigDirtyAt after a full reconcile). - nodePushPlan() reads node state fresh from the DB, skips the push for offline/disabled nodes (no 10s hang), and treats push failures as non-fatal across every mutation path (client add/update/del + bulk + attach/detach; inbound add/update/del/toggle/resetTraffic). - ReconcileNode() pushes the panel's desired config to a node on reconnect (refreshing the remote tag cache first) and prunes node-side orphans; runs before the traffic pull in the node sync job. - While a node is dirty the traffic pull applies only up/down deltas and node-initiated disables, never overwriting desired config from a stale node snapshot. - Surface a non-blocking 'saved; will sync on reconnect' warning to the UI. Validated with a two-panel Docker E2E: client delete/update, attach/detach, and inbound add/delete all reconcile correctly offline -> reconnect. --- database/model/model.go | 3 + frontend/src/generated/types.ts | 2 + frontend/src/generated/zod.ts | 2 + frontend/src/utils/index.ts | 9 + web/controller/client.go | 8 +- web/controller/util.go | 10 + web/job/node_traffic_sync_job.go | 25 +- web/runtime/remote.go | 13 + web/service/client.go | 344 ++++++++++++-------- web/service/client_group_node_sync_test.go | 2 +- web/service/inbound.go | 323 +++++++++++++----- web/service/node.go | 44 +++ web/service/node_client_traffic_sum_test.go | 2 +- web/service/node_dirty_test.go | 104 ++++++ web/service/node_tag_sync_test.go | 2 +- web/translation/en-US.json | 1 + 16 files changed, 674 insertions(+), 220 deletions(-) create mode 100644 web/service/node_dirty_test.go diff --git a/database/model/model.go b/database/model/model.go index 2db08a29..2092adae 100644 --- a/database/model/model.go +++ b/database/model/model.go @@ -396,6 +396,9 @@ type Node struct { UptimeSecs uint64 `json:"uptimeSecs"` LastError string `json:"lastError"` + ConfigDirty bool `json:"configDirty" gorm:"default:false"` + ConfigDirtyAt int64 `json:"configDirtyAt"` + InboundCount int `json:"inboundCount" gorm:"-"` ClientCount int `json:"clientCount" gorm:"-"` OnlineCount int `json:"onlineCount" gorm:"-"` diff --git a/frontend/src/generated/types.ts b/frontend/src/generated/types.ts index ccb8ebab..d45ecc99 100644 --- a/frontend/src/generated/types.ts +++ b/frontend/src/generated/types.ts @@ -322,6 +322,8 @@ export interface Node { apiToken: string; basePath: string; clientCount: number; + configDirty: boolean; + configDirtyAt: number; cpuPct: number; createdAt: number; depletedCount: number; diff --git a/frontend/src/generated/zod.ts b/frontend/src/generated/zod.ts index 68fdf192..74f76507 100644 --- a/frontend/src/generated/zod.ts +++ b/frontend/src/generated/zod.ts @@ -339,6 +339,8 @@ export const NodeSchema = z.object({ apiToken: z.string(), basePath: z.string(), clientCount: z.number().int(), + configDirty: z.boolean(), + configDirtyAt: z.number().int(), cpuPct: z.number(), createdAt: z.number().int(), depletedCount: z.number().int(), diff --git a/frontend/src/utils/index.ts b/frontend/src/utils/index.ts index a9a61a35..82d0a5aa 100644 --- a/frontend/src/utils/index.ts +++ b/frontend/src/utils/index.ts @@ -1,5 +1,6 @@ import axios from 'axios'; import type { AxiosError, AxiosRequestConfig, AxiosResponse } from 'axios'; +import i18next from 'i18next'; import { getMessage } from './messageBus'; type RespEnvelope = { success?: unknown; msg?: unknown; obj?: unknown }; @@ -32,6 +33,14 @@ export class HttpUtil { } const messageType = msg.success ? 'success' : 'error'; getMessage()[messageType](msg.msg); + if ( + msg.success && + msg.obj && + typeof msg.obj === 'object' && + (msg.obj as { nodePending?: unknown }).nodePending === true + ) { + getMessage().warning(i18next.t('pages.inbounds.toasts.savedNodeOfflineWillSync')); + } } static _respToMsg(resp: AxiosResponse | undefined): Msg { diff --git a/web/controller/client.go b/web/controller/client.go index 36d02f04..f6f54947 100644 --- a/web/controller/client.go +++ b/web/controller/client.go @@ -132,7 +132,7 @@ func (a *ClientController) create(c *gin.Context) { jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err) return } - jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientAddSuccess"), nil) + jsonMsgObj(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientAddSuccess"), pendingNodeObj(a.inboundService.AnyNodePending(payload.InboundIds)), nil) if needRestart { a.xrayService.SetToNeedRestart() } @@ -152,7 +152,7 @@ func (a *ClientController) update(c *gin.Context) { jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err) return } - jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientUpdateSuccess"), nil) + jsonMsgObj(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientUpdateSuccess"), pendingNodeObj(a.clientService.HasPendingNode(&a.inboundService, email)), nil) if needRestart { a.xrayService.SetToNeedRestart() } @@ -190,7 +190,7 @@ func (a *ClientController) attach(c *gin.Context) { jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err) return } - jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientAddSuccess"), nil) + jsonMsgObj(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientAddSuccess"), pendingNodeObj(a.inboundService.AnyNodePending(body.InboundIds)), nil) if needRestart { a.xrayService.SetToNeedRestart() } @@ -470,7 +470,7 @@ func (a *ClientController) detach(c *gin.Context) { jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err) return } - jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientDeleteSuccess"), nil) + jsonMsgObj(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientDeleteSuccess"), pendingNodeObj(a.inboundService.AnyNodePending(body.InboundIds)), nil) if needRestart { a.xrayService.SetToNeedRestart() } diff --git a/web/controller/util.go b/web/controller/util.go index d9358828..d5b9f9e9 100644 --- a/web/controller/util.go +++ b/web/controller/util.go @@ -182,6 +182,16 @@ func jsonMsgObj(c *gin.Context, msg string, obj any, err error) { c.JSON(http.StatusOK, m) } +// pendingNodeObj returns a response object flagging that the save committed +// locally but a backing node was offline/disabled, so the change will be +// mirrored to the node once it reconnects. Returns nil when nothing is pending. +func pendingNodeObj(pending bool) any { + if pending { + return gin.H{"nodePending": true} + } + return nil +} + // pureJsonMsg sends a pure JSON message response with custom status code. func pureJsonMsg(c *gin.Context, statusCode int, success bool, msg string) { c.JSON(statusCode, entity.Msg{ diff --git a/web/job/node_traffic_sync_job.go b/web/job/node_traffic_sync_job.go index e8177b6c..f86e0f51 100644 --- a/web/job/node_traffic_sync_job.go +++ b/web/job/node_traffic_sync_job.go @@ -15,6 +15,7 @@ import ( const ( nodeTrafficSyncConcurrency = 8 nodeTrafficSyncRequestTimeout = 4 * time.Second + nodeReconcileTimeout = 30 * time.Second ) type NodeTrafficSyncJob struct { @@ -151,21 +152,37 @@ func (j *NodeTrafficSyncJob) Run() { } func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) { - ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout) - defer cancel() - rt, err := mgr.RemoteFor(n) if err != nil { logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err) return } + + if n.ConfigDirty { + reconcileCtx, reconcileCancel := context.WithTimeout(context.Background(), nodeReconcileTimeout) + reconcileErr := j.inboundService.ReconcileNode(reconcileCtx, rt, n.Id) + reconcileCancel() + if reconcileErr != nil { + logger.Warning("node traffic sync: reconcile for", n.Name, "failed:", reconcileErr) + return + } + if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil { + logger.Warning("node traffic sync: clear dirty for", n.Name, "failed:", clearErr) + } + j.structural.set() + } + + ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout) + defer cancel() + snap, err := rt.FetchTrafficSnapshot(ctx) if err != nil { logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err) j.inboundService.ClearNodeOnlineClients(n.Id) return } - changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap) + _, _, dirty, _, _ := j.nodeService.NodeSyncState(n.Id) + changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap, dirty) if err != nil { logger.Warning("node traffic sync: merge for", n.Name, "failed:", err) return diff --git a/web/runtime/remote.go b/web/runtime/remote.go index 1e5ba422..fe374713 100644 --- a/web/runtime/remote.go +++ b/web/runtime/remote.go @@ -191,6 +191,19 @@ func (r *Remote) cacheDel(tag string) { delete(r.remoteIDByTag, tag) } +func (r *Remote) ListRemoteTags(ctx context.Context) ([]string, error) { + if err := r.refreshRemoteIDs(ctx); err != nil { + return nil, err + } + r.mu.RLock() + defer r.mu.RUnlock() + tags := make([]string, 0, len(r.remoteIDByTag)) + for tag := range r.remoteIDByTag { + tags = append(tags, tag) + } + return tags, nil +} + func (r *Remote) refreshRemoteIDs(ctx context.Context) error { env, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil) if err != nil { diff --git a/web/service/client.go b/web/service/client.go index cf26344e..232817b8 100644 --- a/web/service/client.go +++ b/web/service/client.go @@ -547,6 +547,17 @@ func validateClientSubID(subID string) error { return nil } +func (s *ClientService) HasPendingNode(inboundSvc *InboundService, email string) bool { + if strings.TrimSpace(email) == "" { + return false + } + ids, err := s.GetInboundIdsForEmail(nil, email) + if err != nil { + return false + } + return inboundSvc.AnyNodePending(ids) +} + func (s *ClientService) Create(inboundSvc *InboundService, payload *ClientCreatePayload) (bool, error) { if payload == nil { return false, common.NewError("empty payload") @@ -1290,6 +1301,7 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId } needRestart := false + markDirty := false for _, r := range removed { email := r.email emailShared := sharedSet[strings.ToLower(strings.TrimSpace(email))] @@ -1324,12 +1336,18 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId } } if oldInbound.NodeID != nil && len(email) > 0 { - rt, rterr := inboundSvc.runtimeFor(oldInbound) - if rterr != nil { - return needRestart, rterr + rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) + if perr != nil { + return needRestart, perr } - if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { - return needRestart, err1 + if dirty { + markDirty = true + } + if push { + if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { + logger.Warning("Error in deleting client on", rt.Name(), ":", err1) + markDirty = true + } } } } @@ -1344,6 +1362,11 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId if err := s.SyncInbound(db, inboundId, finalClients); err != nil { return needRestart, err } + if markDirty && oldInbound.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } + } return needRestart, nil } @@ -2722,27 +2745,33 @@ func (s *ClientService) bulkAdjustInboundClients( } oldInbound.Settings = string(newSettings) + markDirty := false if oldInbound.NodeID != nil { - rt, rterr := inboundSvc.runtimeFor(oldInbound) - if rterr != nil { + rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) + if perr != nil { for email := range foundEmails { - res.perEmailSkipped[email] = rterr.Error() + res.perEmailSkipped[email] = perr.Error() delete(foundEmails, email) } } else { - for email := range foundEmails { - entry := plan[email] - updated := *entry.record.ToClient() - if entry.applyExpiry { - updated.ExpiryTime = entry.newExpiry - } - if entry.applyTotal { - updated.TotalGB = entry.newTotal - } - updated.UpdatedAt = nowMs - if err1 := rt.UpdateUser(context.Background(), oldInbound, email, updated); err1 != nil { - res.perEmailSkipped[email] = err1.Error() - delete(foundEmails, email) + if dirty { + markDirty = true + } + if push { + for email := range foundEmails { + entry := plan[email] + updated := *entry.record.ToClient() + if entry.applyExpiry { + updated.ExpiryTime = entry.newExpiry + } + if entry.applyTotal { + updated.TotalGB = entry.newTotal + } + updated.UpdatedAt = nowMs + if err1 := rt.UpdateUser(context.Background(), oldInbound, email, updated); err1 != nil { + logger.Warning("Error in updating client on", rt.Name(), ":", err1) + markDirty = true + } } } } @@ -2765,6 +2794,10 @@ func (s *ClientService) bulkAdjustInboundClients( res.perEmailSkipped[email] = txErr.Error() } } + } else if markDirty && oldInbound.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } } return res @@ -3083,6 +3116,7 @@ func (s *ClientService) bulkDelInboundClients( } } + markDirty := false if oldInbound.NodeID == nil { rt, rterr := inboundSvc.runtimeFor(oldInbound) if rterr != nil { @@ -3104,17 +3138,22 @@ func (s *ClientService) bulkDelInboundClients( } } } else { - rt, rterr := inboundSvc.runtimeFor(oldInbound) - if rterr != nil { + rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) + if perr != nil { for email := range foundEmails { - res.perEmailSkipped[email] = rterr.Error() + res.perEmailSkipped[email] = perr.Error() delete(foundEmails, email) } } else { - for email := range foundEmails { - if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { - res.perEmailSkipped[email] = err1.Error() - delete(foundEmails, email) + if dirty { + markDirty = true + } + if push { + for email := range foundEmails { + if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { + logger.Warning("Error in deleting client on", rt.Name(), ":", err1) + markDirty = true + } } } } @@ -3136,6 +3175,10 @@ func (s *ClientService) bulkDelInboundClients( res.perEmailSkipped[email] = txErr.Error() } } + } else if markDirty && oldInbound.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } } return res @@ -3608,50 +3651,61 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model db := database.GetDB() tx := db.Begin() + markDirty := false defer func() { if err != nil { tx.Rollback() - } else { - tx.Commit() + return + } + tx.Commit() + if markDirty && oldInbound.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } } }() needRestart := false - rt, rterr := inboundSvc.runtimeFor(oldInbound) - if rterr != nil { - if oldInbound.NodeID != nil { - err = rterr - return false, err - } - needRestart = true - } else if oldInbound.NodeID == nil { - for _, client := range clients { - if len(client.Email) == 0 { - needRestart = true - continue - } - inboundSvc.AddClientStat(tx, data.Id, &client) - if !client.Enable { - continue - } - cipher := "" - if oldInbound.Protocol == "shadowsocks" { - cipher = oldSettings["method"].(string) - } - err1 := rt.AddUser(context.Background(), oldInbound, map[string]any{ - "email": client.Email, - "id": client.ID, - "auth": client.Auth, - "security": client.Security, - "flow": client.Flow, - "password": client.Password, - "cipher": cipher, - }) - if err1 == nil { - logger.Debug("Client added on", rt.Name(), ":", client.Email) - } else { - logger.Debug("Error in adding client on", rt.Name(), ":", err1) - needRestart = true + rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) + if perr != nil { + err = perr + return false, err + } + if dirty { + markDirty = true + } + if oldInbound.NodeID == nil { + if !push { + needRestart = true + } else { + for _, client := range clients { + if len(client.Email) == 0 { + needRestart = true + continue + } + inboundSvc.AddClientStat(tx, data.Id, &client) + if !client.Enable { + continue + } + cipher := "" + if oldInbound.Protocol == "shadowsocks" { + cipher = oldSettings["method"].(string) + } + err1 := rt.AddUser(context.Background(), oldInbound, map[string]any{ + "email": client.Email, + "id": client.ID, + "auth": client.Auth, + "security": client.Security, + "flow": client.Flow, + "password": client.Password, + "cipher": cipher, + }) + if err1 == nil { + logger.Debug("Client added on", rt.Name(), ":", client.Email) + } else { + logger.Debug("Error in adding client on", rt.Name(), ":", err1) + needRestart = true + } } } } else { @@ -3659,9 +3713,12 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model if len(client.Email) > 0 { inboundSvc.AddClientStat(tx, data.Id, &client) } - if err1 := rt.AddClient(context.Background(), oldInbound, client); err1 != nil { - err = err1 - return false, err + if push { + if err1 := rt.AddClient(context.Background(), oldInbound, client); err1 != nil { + logger.Warning("Error in adding client on", rt.Name(), ":", err1) + markDirty = true + push = false + } } } } @@ -3839,11 +3896,17 @@ func (s *ClientService) UpdateInboundClient(inboundSvc *InboundService, data *mo db := database.GetDB() tx := db.Begin() + markDirty := false defer func() { if err != nil { tx.Rollback() - } else { - tx.Commit() + return + } + tx.Commit() + if markDirty && oldInbound.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } } }() @@ -3903,50 +3966,55 @@ func (s *ClientService) UpdateInboundClient(inboundSvc *InboundService, data *mo } needRestart := false if len(oldEmail) > 0 { - rt, rterr := inboundSvc.runtimeFor(oldInbound) - if rterr != nil { - if oldInbound.NodeID != nil { - err = rterr - return false, err - } - needRestart = true - } else if oldInbound.NodeID == nil { - if oldClients[clientIndex].Enable { - err1 := rt.RemoveUser(context.Background(), oldInbound, oldEmail) - if err1 == nil { - logger.Debug("Old client deleted on", rt.Name(), ":", oldEmail) - } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", oldEmail)) { - logger.Debug("User is already deleted. Nothing to do more...") - } else { - logger.Debug("Error in deleting client on", rt.Name(), ":", err1) - needRestart = true + rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) + if perr != nil { + err = perr + return false, err + } + if dirty { + markDirty = true + } + if oldInbound.NodeID == nil { + if !push { + needRestart = true + } else { + if oldClients[clientIndex].Enable { + err1 := rt.RemoveUser(context.Background(), oldInbound, oldEmail) + if err1 == nil { + logger.Debug("Old client deleted on", rt.Name(), ":", oldEmail) + } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", oldEmail)) { + logger.Debug("User is already deleted. Nothing to do more...") + } else { + logger.Debug("Error in deleting client on", rt.Name(), ":", err1) + needRestart = true + } + } + if clients[0].Enable { + cipher := "" + if oldInbound.Protocol == "shadowsocks" { + cipher = oldSettings["method"].(string) + } + err1 := rt.AddUser(context.Background(), oldInbound, map[string]any{ + "email": clients[0].Email, + "id": clients[0].ID, + "security": clients[0].Security, + "flow": clients[0].Flow, + "auth": clients[0].Auth, + "password": clients[0].Password, + "cipher": cipher, + }) + if err1 == nil { + logger.Debug("Client edited on", rt.Name(), ":", clients[0].Email) + } else { + logger.Debug("Error in adding client on", rt.Name(), ":", err1) + needRestart = true + } } } - if clients[0].Enable { - cipher := "" - if oldInbound.Protocol == "shadowsocks" { - cipher = oldSettings["method"].(string) - } - err1 := rt.AddUser(context.Background(), oldInbound, map[string]any{ - "email": clients[0].Email, - "id": clients[0].ID, - "security": clients[0].Security, - "flow": clients[0].Flow, - "auth": clients[0].Auth, - "password": clients[0].Password, - "cipher": cipher, - }) - if err1 == nil { - logger.Debug("Client edited on", rt.Name(), ":", clients[0].Email) - } else { - logger.Debug("Error in adding client on", rt.Name(), ":", err1) - needRestart = true - } - } - } else { + } else if push { if err1 := rt.UpdateUser(context.Background(), oldInbound, oldEmail, clients[0]); err1 != nil { - err = err1 - return false, err + logger.Warning("Error in updating client on", rt.Name(), ":", err1) + markDirty = true } } } else { @@ -4038,6 +4106,7 @@ func (s *ClientService) DelInboundClient(inboundSvc *InboundService, inboundId i } } needRestart := false + markDirty := false if len(email) > 0 { var enables []bool @@ -4073,12 +4142,18 @@ func (s *ClientService) DelInboundClient(inboundSvc *InboundService, inboundId i } } if oldInbound.NodeID != nil && len(email) > 0 { - rt, rterr := inboundSvc.runtimeFor(oldInbound) - if rterr != nil { - return false, rterr + rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) + if perr != nil { + return false, perr } - if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { - return false, err1 + if dirty { + markDirty = true + } + if push { + if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { + logger.Warning("Error in deleting client on", rt.Name(), ":", err1) + markDirty = true + } } } if err := db.Save(oldInbound).Error; err != nil { @@ -4091,6 +4166,11 @@ func (s *ClientService) DelInboundClient(inboundSvc *InboundService, inboundId i if err := s.SyncInbound(db, inboundId, finalClients); err != nil { return false, err } + if markDirty && oldInbound.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } + } return needRestart, nil } @@ -4159,6 +4239,7 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo } needRestart := false + markDirty := false if len(email) > 0 && !emailShared { if !keepTraffic { @@ -4175,25 +4256,29 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo } if needApiDel { - rt, rterr := inboundSvc.runtimeFor(oldInbound) - if rterr != nil { - if oldInbound.NodeID != nil { - return false, rterr - } - needRestart = true - } else if oldInbound.NodeID == nil { - if err1 := rt.RemoveUser(context.Background(), oldInbound, email); err1 == nil { + rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) + if perr != nil { + return false, perr + } + if dirty { + markDirty = true + } + if oldInbound.NodeID == nil { + if !push { + needRestart = true + } else if err1 := rt.RemoveUser(context.Background(), oldInbound, email); err1 == nil { logger.Debug("Client deleted on", rt.Name(), ":", email) needRestart = false } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) { logger.Debug("User is already deleted. Nothing to do more...") } else { - logger.Debug("Error in deleting client on", rt.Name(), ":", err1) + logger.Debug("Error in deleting client on", rt.Name(), ":", email) needRestart = true } - } else { + } else if push { if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { - return false, err1 + logger.Warning("Error in deleting client on", rt.Name(), ":", err1) + markDirty = true } } } @@ -4209,6 +4294,11 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo if err := s.SyncInbound(db, inboundId, finalClients); err != nil { return false, err } + if markDirty && oldInbound.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } + } return needRestart, nil } diff --git a/web/service/client_group_node_sync_test.go b/web/service/client_group_node_sync_test.go index 01824b91..c9ddf2f2 100644 --- a/web/service/client_group_node_sync_test.go +++ b/web/service/client_group_node_sync_test.go @@ -62,7 +62,7 @@ func TestSetRemoteTraffic_PreservesPanelLocalGroupAndComment(t *testing.T) { } svc := InboundService{} - if _, err := svc.setRemoteTrafficLocked(nodeID, snap); err != nil { + if _, err := svc.setRemoteTrafficLocked(nodeID, snap, false); err != nil { t.Fatalf("setRemoteTrafficLocked: %v", err) } diff --git a/web/service/inbound.go b/web/service/inbound.go index 35e04686..33995368 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -41,6 +41,92 @@ func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) return mgr.RuntimeFor(ib.NodeID) } +func (s *InboundService) nodePushPlan(ib *model.Inbound) (runtime.Runtime, bool, bool, error) { + if ib.NodeID == nil { + rt, err := s.runtimeFor(ib) + if err != nil { + return nil, false, false, nil + } + return rt, true, false, nil + } + nodeSvc := NodeService{} + enabled, status, _, _, err := nodeSvc.NodeSyncState(*ib.NodeID) + if err != nil { + return nil, false, false, err + } + if !enabled || status == "offline" { + return nil, false, true, nil + } + rt, err := s.runtimeFor(ib) + if err != nil { + return nil, false, true, nil + } + return rt, true, false, nil +} + +func (s *InboundService) NodeIsPending(nodeID *int) bool { + if nodeID == nil { + return false + } + return (&NodeService{}).IsNodePending(*nodeID) +} + +func (s *InboundService) AnyNodePending(inboundIds []int) bool { + if len(inboundIds) == 0 { + return false + } + nodeSvc := NodeService{} + for _, id := range inboundIds { + ib, err := s.GetInbound(id) + if err != nil || ib.NodeID == nil { + continue + } + if nodeSvc.IsNodePending(*ib.NodeID) { + return true + } + } + return false +} + +func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, nodeID int) error { + if rt == nil || nodeID <= 0 { + return nil + } + db := database.GetDB() + var inbounds []*model.Inbound + if err := db.Model(model.Inbound{}).Where("node_id = ?", nodeID).Find(&inbounds).Error; err != nil { + return err + } + remoteTags, err := rt.ListRemoteTags(ctx) + if err != nil { + return err + } + prefix := nodeTagPrefix(&nodeID) + desiredTags := make(map[string]struct{}, len(inbounds)*2) + for _, ib := range inbounds { + desiredTags[ib.Tag] = struct{}{} + if prefix != "" { + if stripped, found := strings.CutPrefix(ib.Tag, prefix); found { + desiredTags[stripped] = struct{}{} + } else { + desiredTags[prefix+ib.Tag] = struct{}{} + } + } + if err := rt.UpdateInbound(ctx, ib, ib); err != nil { + return fmt.Errorf("reconcile inbound %q: %w", ib.Tag, err) + } + } + for _, tag := range remoteTags { + if _, want := desiredTags[tag]; want { + continue + } + if err := rt.DelInbound(ctx, &model.Inbound{Tag: tag}); err != nil { + return fmt.Errorf("reconcile delete %q: %w", tag, err) + } + } + return nil +} + type CopyClientsResult struct { Added []string `json:"added"` Skipped []string `json:"skipped"` @@ -575,11 +661,17 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo db := database.GetDB() tx := db.Begin() + markDirty := false defer func() { - if err == nil { - tx.Commit() - } else { + if err != nil { tx.Rollback() + return + } + tx.Commit() + if markDirty && inbound.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*inbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } } }() @@ -600,20 +692,25 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo needRestart := false if inbound.Enable { - rt, rterr := s.runtimeFor(inbound) - if rterr != nil { - err = rterr + rt, push, dirty, perr := s.nodePushPlan(inbound) + if perr != nil { + err = perr return inbound, false, err } - if err1 := rt.AddInbound(context.Background(), inbound); err1 == nil { - logger.Debug("New inbound added on", rt.Name(), ":", inbound.Tag) - } else { - logger.Debug("Unable to add inbound on", rt.Name(), ":", err1) - if inbound.NodeID != nil { - err = err1 - return inbound, false, err + if dirty { + markDirty = true + } + if push { + if err1 := rt.AddInbound(context.Background(), inbound); err1 == nil { + logger.Debug("New inbound added on", rt.Name(), ":", inbound.Tag) + } else { + logger.Debug("Unable to add inbound on", rt.Name(), ":", err1) + if inbound.NodeID != nil { + markDirty = true + } else { + needRestart = true + } } - needRestart = true } } @@ -624,24 +721,31 @@ func (s *InboundService) DelInbound(id int) (bool, error) { db := database.GetDB() needRestart := false + markDirty := false var ib model.Inbound loadErr := db.Model(model.Inbound{}).Where("id = ?", id).First(&ib).Error if loadErr == nil { shouldPushToRuntime := ib.NodeID != nil || ib.Enable if shouldPushToRuntime { - rt, rterr := s.runtimeFor(&ib) - if rterr != nil { - logger.Warning("DelInbound: runtime lookup failed, deleting central row anyway:", rterr) - if ib.NodeID == nil { - needRestart = true - } - } else if err1 := rt.DelInbound(context.Background(), &ib); err1 == nil { - logger.Debug("Inbound deleted on", rt.Name(), ":", ib.Tag) - } else { - logger.Warning("DelInbound on", rt.Name(), "failed, deleting central row anyway:", err1) - if ib.NodeID == nil { - needRestart = true + rt, push, dirty, perr := s.nodePushPlan(&ib) + if perr != nil { + logger.Warning("DelInbound: node lookup failed, deleting central row anyway:", perr) + markDirty = true + } else if push { + if err1 := rt.DelInbound(context.Background(), &ib); err1 == nil { + logger.Debug("Inbound deleted on", rt.Name(), ":", ib.Tag) + } else { + logger.Warning("DelInbound on", rt.Name(), "failed, deleting central row anyway:", err1) + if ib.NodeID == nil { + needRestart = true + } else { + markDirty = true + } } + } else if ib.NodeID == nil { + needRestart = true + } else if dirty { + markDirty = true } } else { logger.Debug("DelInbound: skipping runtime push for disabled local inbound id:", id) @@ -657,6 +761,11 @@ func (s *InboundService) DelInbound(id int) (bool, error) { if err := db.Delete(model.Inbound{}, id).Error; err != nil { return needRestart, err } + if markDirty && ib.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*ib.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } + } if !database.IsPostgres() { var count int64 if err := db.Model(&model.Inbound{}).Count(&count).Error; err != nil { @@ -740,12 +849,9 @@ func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) { inbound.Enable = enable needRestart := false - rt, rterr := s.runtimeFor(inbound) - if rterr != nil { - if inbound.NodeID != nil { - return false, rterr - } - return true, nil + rt, push, dirty, perr := s.nodePushPlan(inbound) + if perr != nil { + return false, perr } // Remote nodes interpret DelInbound as a real row delete (it hits @@ -754,13 +860,24 @@ func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) { // PATCH the remote row via UpdateInbound instead — preserves the // settings/client history and just flips the enable flag. if inbound.NodeID != nil { - if err := rt.UpdateInbound(context.Background(), inbound, inbound); err != nil { - logger.Debug("SetInboundEnable: remote UpdateInbound on", rt.Name(), "failed:", err) - return false, err + if push { + if err := rt.UpdateInbound(context.Background(), inbound, inbound); err != nil { + logger.Warning("SetInboundEnable: remote UpdateInbound on", rt.Name(), "failed:", err) + dirty = true + } + } + if dirty { + if dErr := (&NodeService{}).MarkNodeDirty(*inbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } } return false, nil } + if !push { + return true, nil + } + if err := rt.DelInbound(context.Background(), inbound); err != nil && !strings.Contains(err.Error(), "not found") { logger.Debug("SetInboundEnable: DelInbound on", rt.Name(), "failed:", err) @@ -807,11 +924,17 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, db := database.GetDB() tx := db.Begin() + markDirty := false defer func() { if err != nil { tx.Rollback() - } else { - tx.Commit() + return + } + tx.Commit() + if markDirty && oldInbound.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } } }() @@ -900,17 +1023,20 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, inbound.Tag = oldInbound.Tag needRestart := false - rt, rterr := s.runtimeFor(oldInbound) - if rterr != nil { - if oldInbound.NodeID != nil { - err = rterr - return inbound, false, err - } - needRestart = true - } else { - oldSnapshot := *oldInbound - oldSnapshot.Tag = tag - if oldInbound.NodeID == nil { + rt, push, dirty, perr := s.nodePushPlan(oldInbound) + if perr != nil { + err = perr + return inbound, false, err + } + if dirty { + markDirty = true + } + if oldInbound.NodeID == nil { + if !push { + needRestart = true + } else { + oldSnapshot := *oldInbound + oldSnapshot.Tag = tag if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 == nil { logger.Debug("Old inbound deleted on", rt.Name(), ":", tag) } @@ -926,16 +1052,18 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, needRestart = true } } - } else { - if !inbound.Enable { - if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil { - err = err2 - return inbound, false, err - } - } else if err2 := rt.UpdateInbound(context.Background(), &oldSnapshot, oldInbound); err2 != nil { - err = err2 - return inbound, false, err + } + } else if push { + oldSnapshot := *oldInbound + oldSnapshot.Tag = tag + if !inbound.Enable { + if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil { + logger.Warning("Unable to disable inbound on", rt.Name(), ":", err2) + markDirty = true } + } else if err2 := rt.UpdateInbound(context.Background(), &oldSnapshot, oldInbound); err2 != nil { + logger.Warning("Unable to update inbound on", rt.Name(), ":", err2) + markDirty = true } } @@ -1303,17 +1431,17 @@ func (s *InboundService) upsertNodeBaseline(tx *gorm.DB, nodeID int, email strin }).Create(&model.NodeClientTraffic{NodeId: nodeID, Email: email, Up: up, Down: down}).Error } -func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) { +func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) { var structuralChange bool err := submitTrafficWrite(func() error { var inner error - structuralChange, inner = s.setRemoteTrafficLocked(nodeID, snap) + structuralChange, inner = s.setRemoteTrafficLocked(nodeID, snap, dirty) return inner }) return structuralChange, err } -func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) { +func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) { if snap == nil || nodeID <= 0 { return false, nil } @@ -1425,6 +1553,9 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi c, ok := tagToCentral[snapIb.Tag] if !ok { + if dirty { + continue + } // Try snap.Tag first; on collision fall back to the n- // prefixed form so local+node can both own the same port. pickFreeTag := func() (string, error) { @@ -1491,42 +1622,48 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs - updates := map[string]any{ - "enable": snapIb.Enable, - "remark": snapIb.Remark, - "listen": snapIb.Listen, - "port": snapIb.Port, - "protocol": snapIb.Protocol, - "total": snapIb.Total, - "expiry_time": snapIb.ExpiryTime, - "settings": snapIb.Settings, - "stream_settings": snapIb.StreamSettings, - "sniffing": snapIb.Sniffing, - "traffic_reset": snapIb.TrafficReset, + updates := map[string]any{} + if !dirty { + updates["enable"] = snapIb.Enable + updates["remark"] = snapIb.Remark + updates["listen"] = snapIb.Listen + updates["port"] = snapIb.Port + updates["protocol"] = snapIb.Protocol + updates["total"] = snapIb.Total + updates["expiry_time"] = snapIb.ExpiryTime + updates["settings"] = snapIb.Settings + updates["stream_settings"] = snapIb.StreamSettings + updates["sniffing"] = snapIb.Sniffing + updates["traffic_reset"] = snapIb.TrafficReset } if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) { updates["up"] = snapIb.Up updates["down"] = snapIb.Down } - if c.Settings != snapIb.Settings || + if !dirty && (c.Settings != snapIb.Settings || c.Remark != snapIb.Remark || c.Listen != snapIb.Listen || c.Port != snapIb.Port || c.Total != snapIb.Total || c.ExpiryTime != snapIb.ExpiryTime || - c.Enable != snapIb.Enable { + c.Enable != snapIb.Enable) { structuralChange = true } - if err := tx.Model(model.Inbound{}). - Where("id = ?", c.Id). - Updates(updates).Error; err != nil { - return false, err + if len(updates) > 0 { + if err := tx.Model(model.Inbound{}). + Where("id = ?", c.Id). + Updates(updates).Error; err != nil { + return false, err + } } } for _, c := range central { + if dirty { + continue + } if _, kept := snapTags[c.Tag]; kept { continue } @@ -1581,6 +1718,9 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi } if _, rowExists := existingEmails[cs.Email]; !rowExists { + if dirty { + continue + } row := &xray.ClientTraffic{ InboundId: c.Id, Email: cs.Email, @@ -1642,6 +1782,9 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi } for k, existing := range centralCS { + if dirty { + continue + } if k.inboundID != c.Id { continue } @@ -1673,6 +1816,9 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi if !ok { continue } + if dirty { + continue + } var oldEmailsRows []string if err := tx.Table("clients"). Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id"). @@ -2674,12 +2820,20 @@ func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (b } for _, client := range clients { if client.Email == clientEmail && client.Enable { - rt, rterr := s.runtimeFor(inbound) - if rterr != nil { + rt, push, dirty, perr := s.nodePushPlan(inbound) + if perr != nil { + return false, perr + } + if !push { if inbound.NodeID != nil { - return false, rterr + if dirty { + if dErr := (&NodeService{}).MarkNodeDirty(*inbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } + } + } else { + needRestart = true } - needRestart = true break } cipher := "" @@ -2702,6 +2856,11 @@ func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (b }) if err1 == nil { logger.Debug("Client enabled on", rt.Name(), "due to reset traffic:", clientEmail) + } else if inbound.NodeID != nil { + logger.Warning("Error in enabling client on", rt.Name(), ":", err1) + if dErr := (&NodeService{}).MarkNodeDirty(*inbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } } else { logger.Debug("Error in enabling client on", rt.Name(), ":", err1) needRestart = true diff --git a/web/service/node.go b/web/service/node.go index de92cc8e..29e2ba7d 100644 --- a/web/service/node.go +++ b/web/service/node.go @@ -480,6 +480,50 @@ func (s *NodeService) UpdateHeartbeat(id int, p HeartbeatPatch) error { return nil } +func (s *NodeService) MarkNodeDirty(id int) error { + if id <= 0 { + return nil + } + return database.GetDB().Model(model.Node{}). + Where("id = ?", id). + Updates(map[string]any{ + "config_dirty": true, + "config_dirty_at": time.Now().UnixMilli(), + }).Error +} + +func (s *NodeService) ClearNodeDirty(id int, dirtyAt int64) error { + if id <= 0 { + return nil + } + return database.GetDB().Model(model.Node{}). + Where("id = ? AND config_dirty_at = ?", id, dirtyAt). + Update("config_dirty", false).Error +} + +func (s *NodeService) NodeSyncState(id int) (enabled bool, status string, dirty bool, dirtyAt int64, err error) { + if id <= 0 { + return false, "", false, 0, errors.New("invalid node id") + } + var row model.Node + err = database.GetDB().Model(model.Node{}). + Select("enable", "status", "config_dirty", "config_dirty_at"). + Where("id = ?", id). + First(&row).Error + if err != nil { + return false, "", false, 0, err + } + return row.Enable, row.Status, row.ConfigDirty, row.ConfigDirtyAt, nil +} + +func (s *NodeService) IsNodePending(id int) bool { + enabled, status, dirty, _, err := s.NodeSyncState(id) + if err != nil { + return false + } + return !enabled || status != "online" || dirty +} + func nodeMetricKey(id int, metric string) string { return "node:" + strconv.Itoa(id) + ":" + metric } diff --git a/web/service/node_client_traffic_sum_test.go b/web/service/node_client_traffic_sum_test.go index 0450dbfd..6762af76 100644 --- a/web/service/node_client_traffic_sum_test.go +++ b/web/service/node_client_traffic_sum_test.go @@ -36,7 +36,7 @@ func syncNode(t *testing.T, svc *InboundService, nodeID int, tag string, stats . snap := &runtime.TrafficSnapshot{ Inbounds: []*model.Inbound{{Tag: tag, ClientStats: stats}}, } - if _, err := svc.setRemoteTrafficLocked(nodeID, snap); err != nil { + if _, err := svc.setRemoteTrafficLocked(nodeID, snap, false); err != nil { t.Fatalf("setRemoteTrafficLocked node %d: %v", nodeID, err) } } diff --git a/web/service/node_dirty_test.go b/web/service/node_dirty_test.go new file mode 100644 index 00000000..8eb4a640 --- /dev/null +++ b/web/service/node_dirty_test.go @@ -0,0 +1,104 @@ +package service + +import ( + "testing" + + "github.com/mhsanaei/3x-ui/v3/database" + "github.com/mhsanaei/3x-ui/v3/database/model" + "github.com/mhsanaei/3x-ui/v3/web/runtime" +) + +// While a node is config-dirty (a local edit committed before it could be +// mirrored to the node), the traffic pull must not overwrite the central +// inbound's config columns from the node's stale snapshot — only traffic +// counters may advance. Otherwise a reconnecting node reverts the edit. +func TestSetRemoteTraffic_DirtyPreservesConfig(t *testing.T) { + setupConflictDB(t) + db := database.GetDB() + + node := &model.Node{Name: "n1", Address: "127.0.0.1", Port: 2096, ApiToken: "tok", Enable: true, Status: "online"} + if err := db.Create(node).Error; err != nil { + t.Fatalf("create node: %v", err) + } + id := node.Id + + const desiredSettings = `{"clients":[{"email":"a@x"}]}` + central := &model.Inbound{ + UserId: 1, + NodeID: &id, + Tag: "in-443-tcp", + Enable: true, + Port: 443, + Protocol: model.VLESS, + Settings: desiredSettings, + } + if err := db.Create(central).Error; err != nil { + t.Fatalf("create inbound: %v", err) + } + + snap := &runtime.TrafficSnapshot{ + Inbounds: []*model.Inbound{{ + Tag: "in-443-tcp", + Enable: true, + Port: 443, + Protocol: model.VLESS, + Settings: `{"clients":[{"email":"b@x"}]}`, + Up: 500, + Down: 700, + }}, + } + + svc := InboundService{} + if _, err := svc.setRemoteTrafficLocked(id, snap, true); err != nil { + t.Fatalf("setRemoteTrafficLocked dirty: %v", err) + } + + var got model.Inbound + if err := db.First(&got, central.Id).Error; err != nil { + t.Fatalf("reload inbound: %v", err) + } + if got.Settings != desiredSettings { + t.Fatalf("dirty pull overwrote settings: want %q got %q", desiredSettings, got.Settings) + } + if got.Up != 500 || got.Down != 700 { + t.Fatalf("traffic counters not applied while dirty: up=%d down=%d", got.Up, got.Down) + } +} + +// ClearNodeDirty must be a compare-and-swap on config_dirty_at so a concurrent +// edit that re-dirties the node during a reconcile is not silently cleared. +func TestNodeDirty_ClearIsCASOnDirtyAt(t *testing.T) { + setupConflictDB(t) + db := database.GetDB() + + node := &model.Node{Name: "n2", Address: "127.0.0.1", Port: 2096, ApiToken: "tok", Enable: true, Status: "online"} + if err := db.Create(node).Error; err != nil { + t.Fatalf("create node: %v", err) + } + + nodeSvc := NodeService{} + if err := nodeSvc.MarkNodeDirty(node.Id); err != nil { + t.Fatalf("MarkNodeDirty: %v", err) + } + _, _, dirty, dirtyAt, err := nodeSvc.NodeSyncState(node.Id) + if err != nil { + t.Fatalf("NodeSyncState: %v", err) + } + if !dirty { + t.Fatal("node should be dirty after MarkNodeDirty") + } + + if err := nodeSvc.ClearNodeDirty(node.Id, dirtyAt-1); err != nil { + t.Fatalf("ClearNodeDirty stale token: %v", err) + } + if _, _, stillDirty, _, _ := nodeSvc.NodeSyncState(node.Id); !stillDirty { + t.Fatal("stale-token clear must not clear the dirty flag") + } + + if err := nodeSvc.ClearNodeDirty(node.Id, dirtyAt); err != nil { + t.Fatalf("ClearNodeDirty matching token: %v", err) + } + if _, _, stillDirty, _, _ := nodeSvc.NodeSyncState(node.Id); stillDirty { + t.Fatal("matching-token clear must clear the dirty flag") + } +} diff --git a/web/service/node_tag_sync_test.go b/web/service/node_tag_sync_test.go index 89481387..ff34f021 100644 --- a/web/service/node_tag_sync_test.go +++ b/web/service/node_tag_sync_test.go @@ -46,7 +46,7 @@ func TestSetRemoteTraffic_KeepsInboundOnPrefixMismatch(t *testing.T) { } svc := InboundService{} - if _, err := svc.setRemoteTrafficLocked(nodeID, snap); err != nil { + if _, err := svc.setRemoteTrafficLocked(nodeID, snap, false); err != nil { t.Fatalf("setRemoteTrafficLocked: %v", err) } diff --git a/web/translation/en-US.json b/web/translation/en-US.json index 27429ecf..eb78e116 100644 --- a/web/translation/en-US.json +++ b/web/translation/en-US.json @@ -470,6 +470,7 @@ "inboundClientAddSuccess": "Inbound client(s) have been added.", "inboundClientDeleteSuccess": "Inbound client has been deleted.", "inboundClientUpdateSuccess": "Inbound client has been updated.", + "savedNodeOfflineWillSync": "Saved locally. A backing node is offline or disabled — the change will sync once it reconnects.", "delDepletedClientsSuccess": "All depleted clients have been deleted.", "resetAllClientTrafficSuccess": "Traffic for all clients has been reset.", "resetAllTrafficSuccess": "All traffic has been reset.",