diff --git a/web/runtime/local.go b/web/runtime/local.go index b50cb9de..f970ae28 100644 --- a/web/runtime/local.go +++ b/web/runtime/local.go @@ -89,10 +89,6 @@ func (l *Local) ResetClientTraffic(_ context.Context, _ *model.Inbound, _ string return nil } -func (l *Local) ResetInboundClientTraffics(_ context.Context, _ *model.Inbound) error { - return nil -} - func (l *Local) ResetAllTraffics(_ context.Context) error { return nil } diff --git a/web/runtime/remote.go b/web/runtime/remote.go index 9cc83f32..3da7671f 100644 --- a/web/runtime/remote.go +++ b/web/runtime/remote.go @@ -262,26 +262,9 @@ func (r *Remote) RestartXray(ctx context.Context) error { return err } -func (r *Remote) ResetClientTraffic(ctx context.Context, ib *model.Inbound, email string) error { - id, err := r.resolveRemoteID(ctx, ib.Tag) - if err != nil { - logger.Warning("remote ResetClientTraffic: tag", ib.Tag, "not found on", r.node.Name) - return nil - } - _, err = r.do(ctx, http.MethodPost, - fmt.Sprintf("panel/api/inbounds/%d/resetClientTraffic/%s", id, url.PathEscape(email)), - nil) - return err -} - -func (r *Remote) ResetInboundClientTraffics(ctx context.Context, ib *model.Inbound) error { - id, err := r.resolveRemoteID(ctx, ib.Tag) - if err != nil { - logger.Warning("remote ResetInboundClientTraffics: tag", ib.Tag, "not found on", r.node.Name) - return nil - } - _, err = r.do(ctx, http.MethodPost, - fmt.Sprintf("panel/api/inbounds/resetAllClientTraffics/%d", id), nil) +func (r *Remote) ResetClientTraffic(ctx context.Context, _ *model.Inbound, email string) error { + _, err := r.do(ctx, http.MethodPost, + "panel/api/clients/resetTraffic/"+url.PathEscape(email), nil) return err } @@ -307,14 +290,14 @@ func (r *Remote) FetchTrafficSnapshot(ctx context.Context) (*TrafficSnapshot, er return nil, fmt.Errorf("decode inbound list: %w", err) } - envOnlines, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/onlines", nil) + envOnlines, err := r.do(ctx, http.MethodPost, "panel/api/clients/onlines", nil) if err != nil { logger.Warning("remote", r.node.Name, "onlines fetch failed:", err) } else if len(envOnlines.Obj) > 0 { _ = json.Unmarshal(envOnlines.Obj, &snap.OnlineEmails) } - envLastOnline, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/lastOnline", nil) + envLastOnline, err := r.do(ctx, http.MethodPost, "panel/api/clients/lastOnline", nil) if err != nil { logger.Warning("remote", r.node.Name, "lastOnline fetch failed:", err) } else if len(envLastOnline.Obj) > 0 { diff --git a/web/runtime/runtime.go b/web/runtime/runtime.go index f7f91e83..51dd5f4c 100644 --- a/web/runtime/runtime.go +++ b/web/runtime/runtime.go @@ -19,6 +19,5 @@ type Runtime interface { RestartXray(ctx context.Context) error ResetClientTraffic(ctx context.Context, ib *model.Inbound, email string) error - ResetInboundClientTraffics(ctx context.Context, ib *model.Inbound) error ResetAllTraffics(ctx context.Context) error } diff --git a/web/service/client.go b/web/service/client.go index a3e7d935..1f45481a 100644 --- a/web/service/client.go +++ b/web/service/client.go @@ -610,27 +610,6 @@ func (s *ClientService) resetAllClientTrafficsLocked(inboundSvc *InboundService, }); err != nil { return err } - - var inbounds []model.Inbound - q := db.Model(model.Inbound{}).Where("node_id IS NOT NULL") - if id != -1 { - q = q.Where("id = ?", id) - } - if err := q.Find(&inbounds).Error; err != nil { - logger.Warning("ResetAllClientTraffics: discover node inbounds failed:", err) - return nil - } - for i := range inbounds { - ib := &inbounds[i] - rt, rterr := inboundSvc.runtimeFor(ib) - if rterr != nil { - logger.Warning("ResetAllClientTraffics: runtime lookup for inbound", ib.Id, "failed:", rterr) - continue - } - if e := rt.ResetInboundClientTraffics(context.Background(), ib); e != nil { - logger.Warning("ResetAllClientTraffics: remote propagation to", rt.Name(), "failed:", e) - } - } return nil } diff --git a/web/service/inbound.go b/web/service/inbound.go index 85d97f0a..54fe96ae 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -1070,8 +1070,10 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi email string } centralCS := make(map[csKey]*xray.ClientTraffic, len(centralClientStats)) + centralCSByEmail := make(map[string]*xray.ClientTraffic, len(centralClientStats)) for i := range centralClientStats { centralCS[csKey{centralClientStats[i].InboundId, centralClientStats[i].Email}] = ¢ralClientStats[i] + centralCSByEmail[centralClientStats[i].Email] = ¢ralClientStats[i] } var defaultUserId int @@ -1201,7 +1203,10 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi existing := centralCS[csKey{c.Id, cs.Email}] if existing == nil { - if err := tx.Create(&xray.ClientTraffic{ + existing = centralCSByEmail[cs.Email] + } + if existing == nil { + row := &xray.ClientTraffic{ InboundId: c.Id, Email: cs.Email, Enable: cs.Enable, @@ -1211,9 +1216,13 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi Up: cs.Up, Down: cs.Down, LastOnline: cs.LastOnline, - }).Error; err != nil { + } + if err := tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}). + Create(row).Error; err != nil { return false, err } + centralCS[csKey{c.Id, cs.Email}] = row + centralCSByEmail[cs.Email] = row structuralChange = true continue } @@ -1229,8 +1238,8 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi if err := tx.Exec( `UPDATE client_traffics SET enable = ?, total = ?, expiry_time = ?, reset = ? - WHERE inbound_id = ? AND email = ?`, - cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, c.Id, cs.Email, + WHERE email = ?`, + cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, cs.Email, ).Error; err != nil { return false, err } @@ -1241,9 +1250,9 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi `UPDATE client_traffics SET up = ?, down = ?, enable = ?, total = ?, expiry_time = ?, reset = ?, last_online = MAX(last_online, ?) - WHERE inbound_id = ? AND email = ?`, + WHERE email = ?`, cs.Up, cs.Down, cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, - cs.LastOnline, c.Id, cs.Email, + cs.LastOnline, cs.Email, ).Error; err != nil { return false, err } @@ -1264,6 +1273,24 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi } } + for _, snapIb := range snap.Inbounds { + if snapIb == nil { + continue + } + c, ok := tagToCentral[snapIb.Tag] + if !ok { + continue + } + clients, gcErr := s.GetClients(snapIb) + if gcErr != nil { + logger.Warning("setRemoteTraffic: parse clients for tag", snapIb.Tag, "failed:", gcErr) + continue + } + if err := s.clientService.SyncInbound(tx, c.Id, clients); err != nil { + logger.Warning("setRemoteTraffic: sync clients for tag", snapIb.Tag, "failed:", err) + } + } + if err := tx.Commit().Error; err != nil { return false, err } @@ -1645,7 +1672,6 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) var depletedRows []xray.ClientTraffic err := tx.Model(xray.ClientTraffic{}). Where("((total > 0 AND up + down >= total) OR (expiry_time > 0 AND expiry_time <= ?)) AND enable = ?", now, true). - Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")). Find(&depletedRows).Error if err != nil { return false, 0, err @@ -1663,27 +1689,39 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) } type target struct { - Tag string - Email string + InboundID int `gorm:"column:inbound_id"` + NodeID *int `gorm:"column:node_id"` + Tag string + Email string } var targets []target if len(depletedEmails) > 0 { err = tx.Raw(` - SELECT inbounds.tag AS tag, clients.email AS email + SELECT inbounds.id AS inbound_id, inbounds.node_id AS node_id, + inbounds.tag AS tag, clients.email AS email FROM clients JOIN client_inbounds ON client_inbounds.client_id = clients.id JOIN inbounds ON inbounds.id = client_inbounds.inbound_id - WHERE inbounds.node_id IS NULL - AND clients.email IN ? + WHERE clients.email IN ? `, depletedEmails).Scan(&targets).Error if err != nil { return false, 0, err } } - if p != nil && len(targets) > 0 { + var localTargets []target + remoteByInbound := make(map[int][]target) + for _, t := range targets { + if t.NodeID == nil { + localTargets = append(localTargets, t) + } else { + remoteByInbound[t.InboundID] = append(remoteByInbound[t.InboundID], t) + } + } + + if p != nil && len(localTargets) > 0 { s.xrayApi.Init(p.GetAPIPort()) - for _, t := range targets { + for _, t := range localTargets { err1 := s.xrayApi.RemoveUser(t.Tag, t.Email) if err1 == nil { logger.Debug("Client disabled by api:", t.Email) @@ -1699,7 +1737,6 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) result := tx.Model(xray.ClientTraffic{}). Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true). - Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")). Update("enable", false) err = result.Error count := result.RowsAffected @@ -1715,9 +1752,73 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) } } + for inboundID, group := range remoteByInbound { + emails := make(map[string]struct{}, len(group)) + for _, t := range group { + emails[t.Email] = struct{}{} + } + if pushErr := s.disableRemoteClients(tx, inboundID, emails); pushErr != nil { + logger.Warning("disableInvalidClients: push to remote failed for inbound", inboundID, ":", pushErr) + needRestart = true + } + } + return needRestart, count, nil } +// disableRemoteClients marks the given emails as disabled in the stored +// inbound.settings JSON and pushes the updated inbound to the remote node so +// the remote's UpdateClientStat sets xray_client_traffic.enable=false and +// SyncInbound sets clients.enable=false on the remote side. Without this +// the remote's own auto-disable would be reverted whenever the central +// pushes any inbound update later. +func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails map[string]struct{}) error { + var ib model.Inbound + if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID).First(&ib).Error; err != nil { + return err + } + oldSnapshot := ib + + settings := map[string]any{} + if err := json.Unmarshal([]byte(ib.Settings), &settings); err != nil { + return err + } + clients, _ := settings["clients"].([]any) + now := time.Now().Unix() * 1000 + for i := range clients { + entry, ok := clients[i].(map[string]any) + if !ok { + continue + } + email, _ := entry["email"].(string) + if _, hit := emails[email]; !hit { + continue + } + entry["enable"] = false + entry["updated_at"] = now + clients[i] = entry + } + settings["clients"] = clients + bs, err := json.MarshalIndent(settings, "", " ") + if err != nil { + return err + } + ib.Settings = string(bs) + if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID). + Update("settings", ib.Settings).Error; err != nil { + return err + } + + rt, err := s.runtimeFor(&ib) + if err != nil { + return err + } + if err := rt.UpdateInbound(context.Background(), &oldSnapshot, &ib); err != nil { + return err + } + return nil +} + func (s *InboundService) GetInboundTags() (string, error) { db := database.GetDB() var inboundTags []string @@ -2005,24 +2106,6 @@ func (s *InboundService) resetAllTrafficsLocked() error { return err } - var inbounds []model.Inbound - if err := db.Model(model.Inbound{}). - Where("node_id IS NOT NULL"). - Find(&inbounds).Error; err != nil { - logger.Warning("ResetAllTraffics: discover node inbounds failed:", err) - return nil - } - for i := range inbounds { - ib := &inbounds[i] - rt, rterr := s.runtimeFor(ib) - if rterr != nil { - logger.Warning("ResetAllTraffics: runtime lookup for inbound", ib.Id, "failed:", rterr) - continue - } - if e := rt.ResetInboundClientTraffics(context.Background(), ib); e != nil { - logger.Warning("ResetAllTraffics: remote propagation to", rt.Name(), "failed:", e) - } - } return nil }