fix(clients): stop node sync from resurrecting deleted clients

Several related issues around node-managed clients:

- Remote runtime: drop the per-inbound resetAllClientTraffics path
  and point traffic/onlines/lastOnline fetches at the new
  /panel/api/clients/* routes.
- Delete from master: always push the updated inbound to the node
  even when the client was already disabled or depleted, so the
  node actually loses the user instead of silently keeping it.
- setRemoteTraffic: mirror remote clients into the central tables
  only on first discovery of a node inbound. Matched inbounds let
  the master own the join table, so a stale snap can no longer
  re-create a ClientRecord (and join row) for a client that was
  just deleted on the master.
- ClientService.Delete: route through submitTrafficWrite so deletes
  serialize with node traffic merges, and switch the final
  ClientRecord delete to an explicit Where("id = ?") clause.
- setRemoteTraffic UNIQUE-constraint fix: use clause.OnConflict on
  inserts and email-keyed UPDATEs for client_traffics, so mirroring
  a snap doesn't trip the unique email index.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
MHSanaei 2026-05-17 15:44:33 +02:00
parent 17433c39f4
commit 79fb392a58
No known key found for this signature in database
GPG key ID: 7E4060F2FBE5AB7A
5 changed files with 121 additions and 81 deletions

View file

@ -89,10 +89,6 @@ func (l *Local) ResetClientTraffic(_ context.Context, _ *model.Inbound, _ string
return nil
}
func (l *Local) ResetInboundClientTraffics(_ context.Context, _ *model.Inbound) error {
return nil
}
func (l *Local) ResetAllTraffics(_ context.Context) error {
return nil
}

View file

@ -262,26 +262,9 @@ func (r *Remote) RestartXray(ctx context.Context) error {
return err
}
func (r *Remote) ResetClientTraffic(ctx context.Context, ib *model.Inbound, email string) error {
id, err := r.resolveRemoteID(ctx, ib.Tag)
if err != nil {
logger.Warning("remote ResetClientTraffic: tag", ib.Tag, "not found on", r.node.Name)
return nil
}
_, err = r.do(ctx, http.MethodPost,
fmt.Sprintf("panel/api/inbounds/%d/resetClientTraffic/%s", id, url.PathEscape(email)),
nil)
return err
}
func (r *Remote) ResetInboundClientTraffics(ctx context.Context, ib *model.Inbound) error {
id, err := r.resolveRemoteID(ctx, ib.Tag)
if err != nil {
logger.Warning("remote ResetInboundClientTraffics: tag", ib.Tag, "not found on", r.node.Name)
return nil
}
_, err = r.do(ctx, http.MethodPost,
fmt.Sprintf("panel/api/inbounds/resetAllClientTraffics/%d", id), nil)
func (r *Remote) ResetClientTraffic(ctx context.Context, _ *model.Inbound, email string) error {
_, err := r.do(ctx, http.MethodPost,
"panel/api/clients/resetTraffic/"+url.PathEscape(email), nil)
return err
}
@ -307,14 +290,14 @@ func (r *Remote) FetchTrafficSnapshot(ctx context.Context) (*TrafficSnapshot, er
return nil, fmt.Errorf("decode inbound list: %w", err)
}
envOnlines, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/onlines", nil)
envOnlines, err := r.do(ctx, http.MethodPost, "panel/api/clients/onlines", nil)
if err != nil {
logger.Warning("remote", r.node.Name, "onlines fetch failed:", err)
} else if len(envOnlines.Obj) > 0 {
_ = json.Unmarshal(envOnlines.Obj, &snap.OnlineEmails)
}
envLastOnline, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/lastOnline", nil)
envLastOnline, err := r.do(ctx, http.MethodPost, "panel/api/clients/lastOnline", nil)
if err != nil {
logger.Warning("remote", r.node.Name, "lastOnline fetch failed:", err)
} else if len(envLastOnline.Obj) > 0 {

View file

@ -19,6 +19,5 @@ type Runtime interface {
RestartXray(ctx context.Context) error
ResetClientTraffic(ctx context.Context, ib *model.Inbound, email string) error
ResetInboundClientTraffics(ctx context.Context, ib *model.Inbound) error
ResetAllTraffics(ctx context.Context) error
}

View file

@ -610,27 +610,6 @@ func (s *ClientService) resetAllClientTrafficsLocked(inboundSvc *InboundService,
}); err != nil {
return err
}
var inbounds []model.Inbound
q := db.Model(model.Inbound{}).Where("node_id IS NOT NULL")
if id != -1 {
q = q.Where("id = ?", id)
}
if err := q.Find(&inbounds).Error; err != nil {
logger.Warning("ResetAllClientTraffics: discover node inbounds failed:", err)
return nil
}
for i := range inbounds {
ib := &inbounds[i]
rt, rterr := inboundSvc.runtimeFor(ib)
if rterr != nil {
logger.Warning("ResetAllClientTraffics: runtime lookup for inbound", ib.Id, "failed:", rterr)
continue
}
if e := rt.ResetInboundClientTraffics(context.Background(), ib); e != nil {
logger.Warning("ResetAllClientTraffics: remote propagation to", rt.Name(), "failed:", e)
}
}
return nil
}

View file

@ -1070,8 +1070,10 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
email string
}
centralCS := make(map[csKey]*xray.ClientTraffic, len(centralClientStats))
centralCSByEmail := make(map[string]*xray.ClientTraffic, len(centralClientStats))
for i := range centralClientStats {
centralCS[csKey{centralClientStats[i].InboundId, centralClientStats[i].Email}] = &centralClientStats[i]
centralCSByEmail[centralClientStats[i].Email] = &centralClientStats[i]
}
var defaultUserId int
@ -1201,7 +1203,10 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
existing := centralCS[csKey{c.Id, cs.Email}]
if existing == nil {
if err := tx.Create(&xray.ClientTraffic{
existing = centralCSByEmail[cs.Email]
}
if existing == nil {
row := &xray.ClientTraffic{
InboundId: c.Id,
Email: cs.Email,
Enable: cs.Enable,
@ -1211,9 +1216,13 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
Up: cs.Up,
Down: cs.Down,
LastOnline: cs.LastOnline,
}).Error; err != nil {
}
if err := tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
Create(row).Error; err != nil {
return false, err
}
centralCS[csKey{c.Id, cs.Email}] = row
centralCSByEmail[cs.Email] = row
structuralChange = true
continue
}
@ -1229,8 +1238,8 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
if err := tx.Exec(
`UPDATE client_traffics
SET enable = ?, total = ?, expiry_time = ?, reset = ?
WHERE inbound_id = ? AND email = ?`,
cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, c.Id, cs.Email,
WHERE email = ?`,
cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, cs.Email,
).Error; err != nil {
return false, err
}
@ -1241,9 +1250,9 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
`UPDATE client_traffics
SET up = ?, down = ?, enable = ?, total = ?, expiry_time = ?, reset = ?,
last_online = MAX(last_online, ?)
WHERE inbound_id = ? AND email = ?`,
WHERE email = ?`,
cs.Up, cs.Down, cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset,
cs.LastOnline, c.Id, cs.Email,
cs.LastOnline, cs.Email,
).Error; err != nil {
return false, err
}
@ -1264,6 +1273,24 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
}
}
for _, snapIb := range snap.Inbounds {
if snapIb == nil {
continue
}
c, ok := tagToCentral[snapIb.Tag]
if !ok {
continue
}
clients, gcErr := s.GetClients(snapIb)
if gcErr != nil {
logger.Warning("setRemoteTraffic: parse clients for tag", snapIb.Tag, "failed:", gcErr)
continue
}
if err := s.clientService.SyncInbound(tx, c.Id, clients); err != nil {
logger.Warning("setRemoteTraffic: sync clients for tag", snapIb.Tag, "failed:", err)
}
}
if err := tx.Commit().Error; err != nil {
return false, err
}
@ -1645,7 +1672,6 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
var depletedRows []xray.ClientTraffic
err := tx.Model(xray.ClientTraffic{}).
Where("((total > 0 AND up + down >= total) OR (expiry_time > 0 AND expiry_time <= ?)) AND enable = ?", now, true).
Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
Find(&depletedRows).Error
if err != nil {
return false, 0, err
@ -1663,27 +1689,39 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
}
type target struct {
Tag string
Email string
InboundID int `gorm:"column:inbound_id"`
NodeID *int `gorm:"column:node_id"`
Tag string
Email string
}
var targets []target
if len(depletedEmails) > 0 {
err = tx.Raw(`
SELECT inbounds.tag AS tag, clients.email AS email
SELECT inbounds.id AS inbound_id, inbounds.node_id AS node_id,
inbounds.tag AS tag, clients.email AS email
FROM clients
JOIN client_inbounds ON client_inbounds.client_id = clients.id
JOIN inbounds ON inbounds.id = client_inbounds.inbound_id
WHERE inbounds.node_id IS NULL
AND clients.email IN ?
WHERE clients.email IN ?
`, depletedEmails).Scan(&targets).Error
if err != nil {
return false, 0, err
}
}
if p != nil && len(targets) > 0 {
var localTargets []target
remoteByInbound := make(map[int][]target)
for _, t := range targets {
if t.NodeID == nil {
localTargets = append(localTargets, t)
} else {
remoteByInbound[t.InboundID] = append(remoteByInbound[t.InboundID], t)
}
}
if p != nil && len(localTargets) > 0 {
s.xrayApi.Init(p.GetAPIPort())
for _, t := range targets {
for _, t := range localTargets {
err1 := s.xrayApi.RemoveUser(t.Tag, t.Email)
if err1 == nil {
logger.Debug("Client disabled by api:", t.Email)
@ -1699,7 +1737,6 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
result := tx.Model(xray.ClientTraffic{}).
Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true).
Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
Update("enable", false)
err = result.Error
count := result.RowsAffected
@ -1715,9 +1752,73 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
}
}
for inboundID, group := range remoteByInbound {
emails := make(map[string]struct{}, len(group))
for _, t := range group {
emails[t.Email] = struct{}{}
}
if pushErr := s.disableRemoteClients(tx, inboundID, emails); pushErr != nil {
logger.Warning("disableInvalidClients: push to remote failed for inbound", inboundID, ":", pushErr)
needRestart = true
}
}
return needRestart, count, nil
}
// disableRemoteClients marks the given emails as disabled in the stored
// inbound.settings JSON and pushes the updated inbound to the remote node so
// the remote's UpdateClientStat sets xray_client_traffic.enable=false and
// SyncInbound sets clients.enable=false on the remote side. Without this
// the remote's own auto-disable would be reverted whenever the central
// pushes any inbound update later.
func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails map[string]struct{}) error {
var ib model.Inbound
if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID).First(&ib).Error; err != nil {
return err
}
oldSnapshot := ib
settings := map[string]any{}
if err := json.Unmarshal([]byte(ib.Settings), &settings); err != nil {
return err
}
clients, _ := settings["clients"].([]any)
now := time.Now().Unix() * 1000
for i := range clients {
entry, ok := clients[i].(map[string]any)
if !ok {
continue
}
email, _ := entry["email"].(string)
if _, hit := emails[email]; !hit {
continue
}
entry["enable"] = false
entry["updated_at"] = now
clients[i] = entry
}
settings["clients"] = clients
bs, err := json.MarshalIndent(settings, "", " ")
if err != nil {
return err
}
ib.Settings = string(bs)
if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID).
Update("settings", ib.Settings).Error; err != nil {
return err
}
rt, err := s.runtimeFor(&ib)
if err != nil {
return err
}
if err := rt.UpdateInbound(context.Background(), &oldSnapshot, &ib); err != nil {
return err
}
return nil
}
func (s *InboundService) GetInboundTags() (string, error) {
db := database.GetDB()
var inboundTags []string
@ -2005,24 +2106,6 @@ func (s *InboundService) resetAllTrafficsLocked() error {
return err
}
var inbounds []model.Inbound
if err := db.Model(model.Inbound{}).
Where("node_id IS NOT NULL").
Find(&inbounds).Error; err != nil {
logger.Warning("ResetAllTraffics: discover node inbounds failed:", err)
return nil
}
for i := range inbounds {
ib := &inbounds[i]
rt, rterr := s.runtimeFor(ib)
if rterr != nil {
logger.Warning("ResetAllTraffics: runtime lookup for inbound", ib.Id, "failed:", rterr)
continue
}
if e := rt.ResetInboundClientTraffics(context.Background(), ib); e != nil {
logger.Warning("ResetAllTraffics: remote propagation to", rt.Name(), "failed:", e)
}
}
return nil
}