diff --git a/web/controller/client.go b/web/controller/client.go index 868d6126..7a4f0d36 100644 --- a/web/controller/client.go +++ b/web/controller/client.go @@ -7,10 +7,15 @@ import ( "github.com/mhsanaei/3x-ui/v3/database/model" "github.com/mhsanaei/3x-ui/v3/web/service" + "github.com/mhsanaei/3x-ui/v3/web/websocket" "github.com/gin-gonic/gin" ) +func notifyClientsChanged() { + websocket.BroadcastInvalidate(websocket.MessageTypeClients) +} + type ClientController struct { clientService service.ClientService inboundService service.InboundService @@ -84,6 +89,7 @@ func (a *ClientController) create(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } func (a *ClientController) update(c *gin.Context) { @@ -102,6 +108,7 @@ func (a *ClientController) update(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } func (a *ClientController) delete(c *gin.Context) { @@ -116,6 +123,7 @@ func (a *ClientController) delete(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } type attachDetachBody struct { @@ -138,6 +146,7 @@ func (a *ClientController) attach(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } func (a *ClientController) resetAllTraffics(c *gin.Context) { @@ -150,6 +159,7 @@ func (a *ClientController) resetAllTraffics(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } func (a *ClientController) delDepleted(c *gin.Context) { @@ -162,6 +172,7 @@ func (a *ClientController) delDepleted(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } func (a *ClientController) resetTrafficByEmail(c *gin.Context) { @@ -175,6 +186,7 @@ func (a *ClientController) resetTrafficByEmail(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } type trafficUpdateRequest struct { @@ -194,6 +206,7 @@ func (a *ClientController) updateTrafficByEmail(c *gin.Context) { return } jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientUpdateSuccess"), nil) + notifyClientsChanged() } func (a *ClientController) getIps(c *gin.Context) { @@ -294,4 +307,5 @@ func (a *ClientController) detach(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + notifyClientsChanged() } diff --git a/web/job/node_traffic_sync_job.go b/web/job/node_traffic_sync_job.go index c2f5fa6a..542fe969 100644 --- a/web/job/node_traffic_sync_job.go +++ b/web/job/node_traffic_sync_job.go @@ -18,10 +18,12 @@ const ( ) type NodeTrafficSyncJob struct { - nodeService service.NodeService - inboundService service.InboundService - running sync.Mutex - structural atomicBool + nodeService service.NodeService + inboundService service.InboundService + settingService service.SettingService + xrayService service.XrayService + running sync.Mutex + structural atomicBool } type atomicBool struct { @@ -83,6 +85,22 @@ func (j *NodeTrafficSyncJob) Run() { } wg.Wait() + _, clientsDisabled, err := j.inboundService.AddTraffic(nil, nil) + if err != nil { + logger.Warning("node traffic sync: depletion check failed:", err) + } + if clientsDisabled { + if restartOnDisable, settingErr := j.settingService.GetRestartXrayOnClientDisable(); settingErr == nil && restartOnDisable { + if err := j.xrayService.RestartXray(true); err != nil { + logger.Warning("node traffic sync: restart xray after disabling clients failed:", err) + j.xrayService.SetToNeedRestart() + } + } else if settingErr != nil { + logger.Warning("node traffic sync: get RestartXrayOnClientDisable failed:", settingErr) + } + j.structural.set() + } + if !websocket.HasClients() { return } @@ -123,6 +141,7 @@ func (j *NodeTrafficSyncJob) Run() { if j.structural.takeAndReset() { websocket.BroadcastInvalidate(websocket.MessageTypeInbounds) + websocket.BroadcastInvalidate(websocket.MessageTypeClients) } } diff --git a/web/runtime/local.go b/web/runtime/local.go index f970ae28..487e075f 100644 --- a/web/runtime/local.go +++ b/web/runtime/local.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "strings" "sync" "github.com/mhsanaei/3x-ui/v3/database/model" @@ -78,6 +79,54 @@ func (l *Local) RemoveUser(_ context.Context, ib *model.Inbound, email string) e }) } +func (l *Local) AddClient(ctx context.Context, ib *model.Inbound, client model.Client) error { + if !client.Enable { + return nil + } + user := map[string]any{ + "email": client.Email, + "id": client.ID, + "security": client.Security, + "flow": client.Flow, + "auth": client.Auth, + "password": client.Password, + } + return l.AddUser(ctx, ib, user) +} + +func (l *Local) DeleteUser(ctx context.Context, ib *model.Inbound, email string) error { + if email == "" { + return nil + } + if err := l.RemoveUser(ctx, ib, email); err != nil { + if strings.Contains(err.Error(), "not found") { + return nil + } + return err + } + return nil +} + +func (l *Local) UpdateUser(ctx context.Context, ib *model.Inbound, oldEmail string, payload model.Client) error { + if oldEmail != "" { + if err := l.RemoveUser(ctx, ib, oldEmail); err != nil && !strings.Contains(err.Error(), "not found") { + return err + } + } + if !payload.Enable { + return nil + } + user := map[string]any{ + "email": payload.Email, + "id": payload.ID, + "security": payload.Security, + "flow": payload.Flow, + "auth": payload.Auth, + "password": payload.Password, + } + return l.AddUser(ctx, ib, user) +} + func (l *Local) RestartXray(_ context.Context) error { if l.deps.SetNeedRestart != nil { l.deps.SetNeedRestart() diff --git a/web/runtime/remote.go b/web/runtime/remote.go index 3da7671f..44f82a1a 100644 --- a/web/runtime/remote.go +++ b/web/runtime/remote.go @@ -257,6 +257,50 @@ func (r *Remote) RemoveUser(ctx context.Context, ib *model.Inbound, _ string) er return r.UpdateInbound(ctx, ib, ib) } +func (r *Remote) AddClient(ctx context.Context, ib *model.Inbound, client model.Client) error { + id, err := r.resolveRemoteID(ctx, ib.Tag) + if err != nil { + return fmt.Errorf("remote AddClient: resolve tag %q: %w", ib.Tag, err) + } + payload := map[string]any{ + "client": client, + "inboundIds": []int{id}, + } + if _, err := r.do(ctx, http.MethodPost, "panel/api/clients/add", payload); err != nil { + return err + } + return nil +} + +// DeleteUser is idempotent: master's per-inbound Delete loop may call it +// multiple times for the same node, and "not found" on the follow-ups is +// the expected success path. +func (r *Remote) DeleteUser(ctx context.Context, _ *model.Inbound, email string) error { + if email == "" { + return nil + } + _, err := r.do(ctx, http.MethodPost, + "panel/api/clients/del/"+url.PathEscape(email), nil) + if err == nil { + return nil + } + if strings.Contains(strings.ToLower(err.Error()), "not found") { + return nil + } + return err +} + +func (r *Remote) UpdateUser(ctx context.Context, _ *model.Inbound, oldEmail string, payload model.Client) error { + if oldEmail == "" { + oldEmail = payload.Email + } + if _, err := r.do(ctx, http.MethodPost, + "panel/api/clients/update/"+url.PathEscape(oldEmail), payload); err != nil { + return err + } + return nil +} + func (r *Remote) RestartXray(ctx context.Context) error { _, err := r.do(ctx, http.MethodPost, "panel/api/server/restartXrayService", nil) return err diff --git a/web/runtime/runtime.go b/web/runtime/runtime.go index 51dd5f4c..7c7c60c9 100644 --- a/web/runtime/runtime.go +++ b/web/runtime/runtime.go @@ -16,6 +16,13 @@ type Runtime interface { AddUser(ctx context.Context, ib *model.Inbound, userMap map[string]any) error RemoveUser(ctx context.Context, ib *model.Inbound, email string) error + // Per-client operations that route through the node's clients API on + // Remote (instead of pushing the whole inbound) so the node applies + // per-user xray API calls without a DelInbound+AddInbound cycle. + UpdateUser(ctx context.Context, ib *model.Inbound, email string, payload model.Client) error + DeleteUser(ctx context.Context, ib *model.Inbound, email string) error + AddClient(ctx context.Context, ib *model.Inbound, client model.Client) error + RestartXray(ctx context.Context) error ResetClientTraffic(ctx context.Context, ib *model.Inbound, email string) error diff --git a/web/service/client.go b/web/service/client.go index 9086f80b..ec9959b4 100644 --- a/web/service/client.go +++ b/web/service/client.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" "github.com/google/uuid" @@ -71,6 +72,47 @@ func clientKeyForProtocol(p model.Protocol, rec *model.ClientRecord) string { type ClientService struct{} +// Short-lived tombstone of just-deleted client emails so that a node snapshot +// arriving between delete and node-side processing doesn't resurrect them. +var ( + recentlyDeletedMu sync.Mutex + recentlyDeleted = map[string]time.Time{} +) + +const deleteTombstoneTTL = 90 * time.Second + +func tombstoneClientEmail(email string) { + if email == "" { + return + } + recentlyDeletedMu.Lock() + defer recentlyDeletedMu.Unlock() + recentlyDeleted[email] = time.Now() + cutoff := time.Now().Add(-deleteTombstoneTTL) + for e, ts := range recentlyDeleted { + if ts.Before(cutoff) { + delete(recentlyDeleted, e) + } + } +} + +func isClientEmailTombstoned(email string) bool { + if email == "" { + return false + } + recentlyDeletedMu.Lock() + defer recentlyDeletedMu.Unlock() + ts, ok := recentlyDeleted[email] + if !ok { + return false + } + if time.Since(ts) > deleteTombstoneTTL { + delete(recentlyDeleted, email) + return false + } + return true +} + func (s *ClientService) SyncInbound(tx *gorm.DB, inboundId int, clients []model.Client) error { if tx == nil { tx = database.GetDB() @@ -417,6 +459,8 @@ func (s *ClientService) Delete(inboundSvc *InboundService, id int, keepTraffic b if err != nil { return false, err } + tombstoneClientEmail(existing.Email) + inboundIds, err := s.GetInboundIdsForRecord(id) if err != nil { return false, err @@ -893,10 +937,10 @@ func (s *ClientService) AddInboundClient(inboundSvc *InboundService, data *model if len(client.Email) > 0 { inboundSvc.AddClientStat(tx, data.Id, &client) } - } - if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil { - err = err1 - return false, err + if err1 := rt.AddClient(context.Background(), oldInbound, client); err1 != nil { + err = err1 + return false, err + } } } @@ -1139,7 +1183,7 @@ func (s *ClientService) UpdateInboundClient(inboundSvc *InboundService, data *mo } } } else { - if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil { + if err1 := rt.UpdateUser(context.Background(), oldInbound, oldEmail, clients[0]); err1 != nil { err = err1 return false, err } @@ -1246,14 +1290,11 @@ func (s *ClientService) DelInboundClient(inboundSvc *InboundService, inboundId i return false, err } } - if needApiDel && notDepleted { + if needApiDel && notDepleted && oldInbound.NodeID == nil { rt, rterr := inboundSvc.runtimeFor(oldInbound) if rterr != nil { - if oldInbound.NodeID != nil { - return false, rterr - } needRestart = true - } else if oldInbound.NodeID == nil { + } else { err1 := rt.RemoveUser(context.Background(), oldInbound, email) if err1 == nil { logger.Debug("Client deleted on", rt.Name(), ":", email) @@ -1264,13 +1305,18 @@ func (s *ClientService) DelInboundClient(inboundSvc *InboundService, inboundId i logger.Debug("Error in deleting client on", rt.Name(), ":", err1) needRestart = true } - } else { - if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil { - return false, err1 - } } } } + if oldInbound.NodeID != nil && len(email) > 0 { + rt, rterr := inboundSvc.runtimeFor(oldInbound) + if rterr != nil { + return false, rterr + } + if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { + return false, err1 + } + } if err := db.Save(oldInbound).Error; err != nil { return false, err } @@ -1378,7 +1424,7 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo needRestart = true } } else { - if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil { + if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { return false, err1 } } diff --git a/web/service/inbound.go b/web/service/inbound.go index 44b13983..35b65084 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -388,24 +388,29 @@ func (s *InboundService) DelInbound(id int) (bool, error) { needRestart := false var ib model.Inbound - loadErr := db.Model(model.Inbound{}).Where("id = ? and enable = ?", id, true).First(&ib).Error + loadErr := db.Model(model.Inbound{}).Where("id = ?", id).First(&ib).Error if loadErr == nil { - rt, rterr := s.runtimeFor(&ib) - if rterr != nil { - logger.Warning("DelInbound: runtime lookup failed, deleting central row anyway:", rterr) - if ib.NodeID == nil { - needRestart = true + shouldPushToRuntime := ib.NodeID != nil || ib.Enable + if shouldPushToRuntime { + rt, rterr := s.runtimeFor(&ib) + if rterr != nil { + logger.Warning("DelInbound: runtime lookup failed, deleting central row anyway:", rterr) + if ib.NodeID == nil { + needRestart = true + } + } else if err1 := rt.DelInbound(context.Background(), &ib); err1 == nil { + logger.Debug("Inbound deleted on", rt.Name(), ":", ib.Tag) + } else { + logger.Warning("DelInbound on", rt.Name(), "failed, deleting central row anyway:", err1) + if ib.NodeID == nil { + needRestart = true + } } - } else if err1 := rt.DelInbound(context.Background(), &ib); err1 == nil { - logger.Debug("Inbound deleted on", rt.Name(), ":", ib.Tag) } else { - logger.Warning("DelInbound on", rt.Name(), "failed, deleting central row anyway:", err1) - if ib.NodeID == nil { - needRestart = true - } + logger.Debug("DelInbound: skipping runtime push for disabled local inbound id:", id) } } else { - logger.Debug("No enabled inbound found to remove by api, id:", id) + logger.Debug("DelInbound: inbound not found, id:", id) } // Delete client traffics of inbounds @@ -1280,16 +1285,47 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi if !ok { continue } + clients, gcErr := s.GetClients(snapIb) if gcErr != nil { logger.Warning("setRemoteTraffic: parse clients for tag", snapIb.Tag, "failed:", gcErr) continue } - if err := s.clientService.SyncInbound(tx, c.Id, clients); err != nil { + csEnableByEmail := make(map[string]bool, len(snapIb.ClientStats)) + for _, cs := range snapIb.ClientStats { + csEnableByEmail[cs.Email] = cs.Enable + } + filtered := clients[:0] + for i := range clients { + if isClientEmailTombstoned(clients[i].Email) { + continue + } + if cse, hit := csEnableByEmail[clients[i].Email]; hit && !cse { + clients[i].Enable = false + } + filtered = append(filtered, clients[i]) + } + if err := s.clientService.SyncInbound(tx, c.Id, filtered); err != nil { logger.Warning("setRemoteTraffic: sync clients for tag", snapIb.Tag, "failed:", err) } } + var orphanEmails []string + if err := tx.Table("clients"). + Joins("LEFT JOIN client_inbounds ON client_inbounds.client_id = clients.id"). + Where("client_inbounds.client_id IS NULL"). + Pluck("clients.email", &orphanEmails).Error; err != nil { + logger.Warning("setRemoteTraffic: orphan sweep query failed:", err) + } else if len(orphanEmails) > 0 { + if err := tx.Where("email IN ?", orphanEmails).Delete(&model.ClientRecord{}).Error; err != nil { + logger.Warning("setRemoteTraffic: orphan sweep delete ClientRecord failed:", err) + } + if err := tx.Where("email IN ?", orphanEmails).Delete(&xray.ClientTraffic{}).Error; err != nil { + logger.Warning("setRemoteTraffic: orphan sweep delete ClientTraffic failed:", err) + } + structuralChange = true + } + if err := tx.Commit().Error; err != nil { return false, err } @@ -1709,10 +1745,15 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) } var localTargets []target + localByInbound := make(map[int]map[string]struct{}) remoteByInbound := make(map[int][]target) for _, t := range targets { if t.NodeID == nil { localTargets = append(localTargets, t) + if localByInbound[t.InboundID] == nil { + localByInbound[t.InboundID] = make(map[string]struct{}) + } + localByInbound[t.InboundID][t.Email] = struct{}{} } else { remoteByInbound[t.InboundID] = append(remoteByInbound[t.InboundID], t) } @@ -1734,6 +1775,12 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) s.xrayApi.Close() } + for inboundID, emails := range localByInbound { + if _, _, mErr := s.markClientsDisabledInSettings(tx, inboundID, emails); mErr != nil { + logger.Warning("disableInvalidClients: settings.JSON sync failed for inbound", inboundID, ":", mErr) + } + } + result := tx.Model(xray.ClientTraffic{}). Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true). Update("enable", false) @@ -1765,25 +1812,23 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) return needRestart, count, nil } -// disableRemoteClients marks the given emails as disabled in the stored -// inbound.settings JSON and pushes the updated inbound to the remote node so -// the remote's UpdateClientStat sets xray_client_traffic.enable=false and -// SyncInbound sets clients.enable=false on the remote side. Without this -// the remote's own auto-disable would be reverted whenever the central -// pushes any inbound update later. -func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails map[string]struct{}) error { +// markClientsDisabledInSettings flips client.enable=false in the inbound's +// stored settings JSON for the given emails and returns both the pre and +// post snapshots so a caller pushing to a remote node has the diff to hand. +func (s *InboundService) markClientsDisabledInSettings(tx *gorm.DB, inboundID int, emails map[string]struct{}) (oldIb, newIb *model.Inbound, err error) { var ib model.Inbound if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID).First(&ib).Error; err != nil { - return err + return nil, nil, err } - oldSnapshot := ib + snapshot := ib settings := map[string]any{} if err := json.Unmarshal([]byte(ib.Settings), &settings); err != nil { - return err + return nil, nil, err } clients, _ := settings["clients"].([]any) now := time.Now().Unix() * 1000 + mutated := false for i := range clients { entry, ok := clients[i].(map[string]any) if !ok { @@ -1793,26 +1838,41 @@ func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails if _, hit := emails[email]; !hit { continue } + if cur, _ := entry["enable"].(bool); cur == false { + continue + } entry["enable"] = false entry["updated_at"] = now clients[i] = entry + mutated = true + } + if !mutated { + return &snapshot, &ib, nil } settings["clients"] = clients - bs, err := json.MarshalIndent(settings, "", " ") - if err != nil { - return err + bs, marshalErr := json.MarshalIndent(settings, "", " ") + if marshalErr != nil { + return nil, nil, marshalErr } ib.Settings = string(bs) if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID). Update("settings", ib.Settings).Error; err != nil { - return err + return nil, nil, err } + return &snapshot, &ib, nil +} - rt, err := s.runtimeFor(&ib) +func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails map[string]struct{}) error { + oldSnapshot, ib, err := s.markClientsDisabledInSettings(tx, inboundID, emails) if err != nil { return err } - if err := rt.UpdateInbound(context.Background(), &oldSnapshot, &ib); err != nil { + + rt, err := s.runtimeFor(ib) + if err != nil { + return err + } + if err := rt.UpdateInbound(context.Background(), oldSnapshot, ib); err != nil { return err } return nil diff --git a/web/websocket/hub.go b/web/websocket/hub.go index 5eeb80da..6df470d9 100644 --- a/web/websocket/hub.go +++ b/web/websocket/hub.go @@ -21,36 +21,17 @@ const ( MessageTypeNodes MessageType = "nodes" MessageTypeNotification MessageType = "notification" MessageTypeXrayState MessageType = "xray_state" - // MessageTypeClientStats carries absolute traffic counters for the clients - // that had activity in the latest collection window. Frontend applies these - // in-place — far smaller than re-broadcasting the full inbound list and - // scales to 10k+ clients without falling back to REST. - MessageTypeClientStats MessageType = "client_stats" - MessageTypeInvalidate MessageType = "invalidate" // Tells frontend to re-fetch via REST (last-resort). + MessageTypeClientStats MessageType = "client_stats" + MessageTypeClients MessageType = "clients" + MessageTypeInvalidate MessageType = "invalidate" + maxMessageSize = 10 * 1024 * 1024 // 10MB - // maxMessageSize caps the WebSocket payload. Beyond this the hub sends a - // lightweight invalidate signal and the frontend re-fetches via REST. - // 10MB lets typical 2k–8k-client deployments push directly via WS (low - // latency); larger installs fall back to invalidate. - maxMessageSize = 10 * 1024 * 1024 // 10MB - - enqueueTimeout = 100 * time.Millisecond - clientSendQueue = 512 // ~50s of buffering for a momentarily slow browser. - hubBroadcastQueue = 2048 // Headroom for cron-storm + admin-mutation bursts. - hubControlQueue = 64 // Backlog for register/unregister bursts (page reloads, disconnect storms). - - // minBroadcastInterval throttles per-type broadcasts so cron storms or - // rapid mutations cannot drown the hub. Bursts within the interval are - // dropped (not coalesced); the next broadcast outside the window delivers - // the latest state. Only message types in throttledMessageTypes are gated — - // heartbeat and one-shot signals (status, notification, xray_state, - // invalidate) bypass this so they are never delayed. + enqueueTimeout = 100 * time.Millisecond + clientSendQueue = 512 // ~50s of buffering for a momentarily slow browser. + hubBroadcastQueue = 2048 // Headroom for cron-storm + admin-mutation bursts. + hubControlQueue = 64 // Backlog for register/unregister bursts (page reloads, disconnect storms). minBroadcastInterval = 250 * time.Millisecond - - // hubRestartAttempts caps panic-recovery restarts. After this many - // consecutive failures we stop trying and log; the panel keeps running - // (frontend falls back to REST polling) and the operator can investigate. - hubRestartAttempts = 3 + hubRestartAttempts = 3 ) // NewClient builds a Client ready for hub registration.