From ba3c581372ea3121375d2647e37f0341c964cdfa Mon Sep 17 00:00:00 2001 From: MHSanaei Date: Sun, 17 May 2026 07:15:16 +0200 Subject: [PATCH] feat(clients): make clients+client_inbounds the runtime source of truth Adds ClientService.SyncInbound that reconciles the new tables from each inbound's clients list whenever existing service paths mutate settings.clients. Wires it into AddInbound, UpdateInbound, AddInboundClient, UpdateInboundClient, DelInboundClient, DelInboundClientByEmail, DelDepletedClients, autoRenewClients, and the timestamp-backfill path in adjustTraffics, plus DetachInbound on DelInbound. GetXrayConfig now builds settings.clients from the new tables before writing config.json, and getInboundsBySubId joins through them instead of JSON_EACH on settings JSON. Live Xray config and subscription endpoints are now driven by the relational view; settings.clients JSON stays in step as a side effect of every write. --- sub/subService.go | 11 ++-- web/service/client.go | 143 +++++++++++++++++++++++++++++++++++++++++ web/service/inbound.go | 108 +++++++++++++++++++++++++++++-- web/service/xray.go | 89 +++++++++++++------------ 4 files changed, 297 insertions(+), 54 deletions(-) create mode 100644 web/service/client.go diff --git a/sub/subService.go b/sub/subService.go index d769bf5a..afa9afca 100644 --- a/sub/subService.go +++ b/sub/subService.go @@ -144,15 +144,14 @@ func (s *SubService) GetSubs(subId string, host string) ([]string, int64, xray.C func (s *SubService) getInboundsBySubId(subId string) ([]*model.Inbound, error) { db := database.GetDB() var inbounds []*model.Inbound - // allow "hysteria2" so imports stored with the literal v2 protocol - // string still surface here (#4081) err := db.Model(model.Inbound{}).Preload("ClientStats").Where(`id in ( SELECT DISTINCT inbounds.id - FROM inbounds, - JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client + FROM inbounds + JOIN client_inbounds ON client_inbounds.inbound_id = inbounds.id + JOIN clients ON clients.id = client_inbounds.client_id WHERE - protocol in ('vmess','vless','trojan','shadowsocks','hysteria','hysteria2') - AND JSON_EXTRACT(client.value, '$.subId') = ? AND enable = ? + inbounds.protocol in ('vmess','vless','trojan','shadowsocks','hysteria','hysteria2') + AND clients.sub_id = ? AND inbounds.enable = ? )`, subId, true).Find(&inbounds).Error if err != nil { return nil, err diff --git a/web/service/client.go b/web/service/client.go new file mode 100644 index 00000000..354e3fc5 --- /dev/null +++ b/web/service/client.go @@ -0,0 +1,143 @@ +package service + +import ( + "errors" + "strings" + + "github.com/mhsanaei/3x-ui/v3/database" + "github.com/mhsanaei/3x-ui/v3/database/model" + + "gorm.io/gorm" +) + +type ClientService struct{} + +func (s *ClientService) SyncInbound(tx *gorm.DB, inboundId int, clients []model.Client) error { + if tx == nil { + tx = database.GetDB() + } + + if err := tx.Where("inbound_id = ?", inboundId).Delete(&model.ClientInbound{}).Error; err != nil { + return err + } + + for i := range clients { + c := clients[i] + email := strings.TrimSpace(c.Email) + if email == "" { + continue + } + + incoming := c.ToRecord() + row := &model.ClientRecord{} + err := tx.Where("email = ?", email).First(row).Error + if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { + return err + } + if errors.Is(err, gorm.ErrRecordNotFound) { + if err := tx.Create(incoming).Error; err != nil { + return err + } + row = incoming + } else { + row.UUID = incoming.UUID + row.Password = incoming.Password + row.Auth = incoming.Auth + row.Flow = incoming.Flow + row.Security = incoming.Security + row.Reverse = incoming.Reverse + row.SubID = incoming.SubID + row.LimitIP = incoming.LimitIP + row.TotalGB = incoming.TotalGB + row.ExpiryTime = incoming.ExpiryTime + row.Enable = incoming.Enable + row.TgID = incoming.TgID + row.Comment = incoming.Comment + row.Reset = incoming.Reset + if incoming.CreatedAt > 0 && (row.CreatedAt == 0 || incoming.CreatedAt < row.CreatedAt) { + row.CreatedAt = incoming.CreatedAt + } + if incoming.UpdatedAt > row.UpdatedAt { + row.UpdatedAt = incoming.UpdatedAt + } + if err := tx.Save(row).Error; err != nil { + return err + } + } + + link := model.ClientInbound{ + ClientId: row.Id, + InboundId: inboundId, + FlowOverride: c.Flow, + } + if err := tx.Create(&link).Error; err != nil { + return err + } + } + return nil +} + +func (s *ClientService) DetachInbound(tx *gorm.DB, inboundId int) error { + if tx == nil { + tx = database.GetDB() + } + return tx.Where("inbound_id = ?", inboundId).Delete(&model.ClientInbound{}).Error +} + +func (s *ClientService) ListForInbound(tx *gorm.DB, inboundId int) ([]model.Client, error) { + if tx == nil { + tx = database.GetDB() + } + type joinedRow struct { + model.ClientRecord + FlowOverride string + } + var rows []joinedRow + err := tx.Table("clients"). + Select("clients.*, client_inbounds.flow_override AS flow_override"). + Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id"). + Where("client_inbounds.inbound_id = ?", inboundId). + Order("clients.id ASC"). + Find(&rows).Error + if err != nil { + return nil, err + } + + out := make([]model.Client, 0, len(rows)) + for i := range rows { + c := rows[i].ToClient() + if rows[i].FlowOverride != "" { + c.Flow = rows[i].FlowOverride + } + out = append(out, *c) + } + return out, nil +} + +func (s *ClientService) GetRecordByEmail(tx *gorm.DB, email string) (*model.ClientRecord, error) { + if tx == nil { + tx = database.GetDB() + } + row := &model.ClientRecord{} + err := tx.Where("email = ?", email).First(row).Error + if err != nil { + return nil, err + } + return row, nil +} + +func (s *ClientService) GetInboundIdsForEmail(tx *gorm.DB, email string) ([]int, error) { + if tx == nil { + tx = database.GetDB() + } + var ids []int + err := tx.Table("client_inbounds"). + Select("client_inbounds.inbound_id"). + Joins("JOIN clients ON clients.id = client_inbounds.client_id"). + Where("clients.email = ?", email). + Scan(&ids).Error + if err != nil { + return nil, err + } + return ids, nil +} diff --git a/web/service/inbound.go b/web/service/inbound.go index 16bb2528..d23dbe86 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -24,7 +24,8 @@ import ( ) type InboundService struct { - xrayApi xray.XrayAPI + xrayApi xray.XrayAPI + clientService ClientService } func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) { @@ -395,6 +396,10 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo return inbound, false, err } + if err = s.clientService.SyncInbound(tx, inbound.Id, clients); err != nil { + return inbound, false, err + } + needRestart := false if inbound.Enable { rt, rterr := s.runtimeFor(inbound) @@ -447,6 +452,9 @@ func (s *InboundService) DelInbound(id int) (bool, error) { if err != nil { return false, err } + if err := s.clientService.DetachInbound(db, id); err != nil { + return false, err + } inbound, err := s.GetInbound(id) if err != nil { return false, err @@ -705,7 +713,18 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, } } - return inbound, needRestart, tx.Save(oldInbound).Error + if err = tx.Save(oldInbound).Error; err != nil { + return inbound, false, err + } + newClients, gcErr := s.GetClients(oldInbound) + if gcErr != nil { + err = gcErr + return inbound, false, err + } + if err = s.clientService.SyncInbound(tx, oldInbound.Id, newClients); err != nil { + return inbound, false, err + } + return inbound, needRestart, nil } func (s *InboundService) buildRuntimeInboundForAPI(tx *gorm.DB, inbound *model.Inbound) (*model.Inbound, error) { @@ -980,7 +999,18 @@ func (s *InboundService) AddInboundClient(data *model.Inbound) (bool, error) { } } - return needRestart, tx.Save(oldInbound).Error + if err = tx.Save(oldInbound).Error; err != nil { + return false, err + } + finalClients, gcErr := s.GetClients(oldInbound) + if gcErr != nil { + err = gcErr + return false, err + } + if err = s.clientService.SyncInbound(tx, oldInbound.Id, finalClients); err != nil { + return false, err + } + return needRestart, nil } func (s *InboundService) getClientPrimaryKey(protocol model.Protocol, client model.Client) string { @@ -1291,7 +1321,17 @@ func (s *InboundService) DelInboundClient(inboundId int, clientId string) (bool, } } } - return needRestart, db.Save(oldInbound).Error + if err := db.Save(oldInbound).Error; err != nil { + return false, err + } + finalClients, gcErr := s.GetClients(oldInbound) + if gcErr != nil { + return false, gcErr + } + if err := s.clientService.SyncInbound(db, inboundId, finalClients); err != nil { + return false, err + } + return needRestart, nil } func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId string) (bool, error) { @@ -1540,7 +1580,18 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin logger.Debug("Client old email not found") needRestart = true } - return needRestart, tx.Save(oldInbound).Error + if err = tx.Save(oldInbound).Error; err != nil { + return false, err + } + finalClients, gcErr := s.GetClients(oldInbound) + if gcErr != nil { + err = gcErr + return false, err + } + if err = s.clientService.SyncInbound(tx, oldInbound.Id, finalClients); err != nil { + return false, err + } + return needRestart, nil } const resetGracePeriodMs int64 = 30000 @@ -2002,6 +2053,20 @@ func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.Cl if err != nil { logger.Warning("AddClientTraffic update inbounds ", err) logger.Error(inbounds) + } else { + for _, ib := range inbounds { + if ib == nil { + continue + } + cs, gcErr := s.GetClients(ib) + if gcErr != nil { + logger.Warning("AddClientTraffic sync clients: GetClients failed", gcErr) + continue + } + if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil { + logger.Warning("AddClientTraffic sync clients: SyncInbound failed", syncErr) + } + } } } @@ -2096,6 +2161,19 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) { if err != nil { return false, 0, err } + for _, ib := range inbounds { + if ib == nil { + continue + } + cs, gcErr := s.GetClients(ib) + if gcErr != nil { + logger.Warning("autoRenewClients sync clients: GetClients failed", gcErr) + continue + } + if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil { + logger.Warning("autoRenewClients sync clients: SyncInbound failed", syncErr) + } + } err = tx.Save(traffics).Error if err != nil { return false, 0, err @@ -3156,6 +3234,14 @@ func (s *InboundService) DelDepletedClients(id int) (err error) { if err = tx.Save(inbound).Error; err != nil { return err } + survivingClients, gcErr := s.GetClients(inbound) + if gcErr != nil { + err = gcErr + return err + } + if err = s.clientService.SyncInbound(tx, inbound.Id, survivingClients); err != nil { + return err + } } // Drop now-orphaned rows. With id >= 0, a row is safe to drop only when @@ -3924,7 +4010,17 @@ func (s *InboundService) DelInboundClientByEmail(inboundId int, email string) (b } } - return needRestart, db.Save(oldInbound).Error + if err := db.Save(oldInbound).Error; err != nil { + return false, err + } + finalClients, gcErr := s.GetClients(oldInbound) + if gcErr != nil { + return false, gcErr + } + if err := s.clientService.SyncInbound(db, inboundId, finalClients); err != nil { + return false, err + } + return needRestart, nil } type SubLinkProvider interface { diff --git a/web/service/xray.go b/web/service/xray.go index 09908aab..0e36f05e 100644 --- a/web/service/xray.go +++ b/web/service/xray.go @@ -116,57 +116,62 @@ func (s *XrayService) GetXrayConfig() (*xray.Config, error) { if inbound.NodeID != nil { continue } - // get settings clients settings := map[string]any{} json.Unmarshal([]byte(inbound.Settings), &settings) - clients, ok := settings["clients"].([]any) - if ok { - // Fast O(N) lookup map for client traffic enablement - clientStats := inbound.ClientStats - enableMap := make(map[string]bool, len(clientStats)) - for _, clientTraffic := range clientStats { - enableMap[clientTraffic.Email] = clientTraffic.Enable + + dbClients, listErr := s.inboundService.clientService.ListForInbound(nil, inbound.Id) + if listErr != nil { + return nil, listErr + } + + clientStats := inbound.ClientStats + enableMap := make(map[string]bool, len(clientStats)) + for _, clientTraffic := range clientStats { + enableMap[clientTraffic.Email] = clientTraffic.Enable + } + + var finalClients []any + for i := range dbClients { + c := dbClients[i] + if enable, exists := enableMap[c.Email]; exists && !enable { + logger.Infof("Remove Inbound User %s due to expiration or traffic limit", c.Email) + continue } - - // filter and clean clients - var final_clients []any - for _, client := range clients { - c, ok := client.(map[string]any) - if !ok { - continue - } - - email, _ := c["email"].(string) - - // check users active or not via stats - if enable, exists := enableMap[email]; exists && !enable { - logger.Infof("Remove Inbound User %s due to expiration or traffic limit", email) - continue - } - - // check manual disabled flag - if manualEnable, ok := c["enable"].(bool); ok && !manualEnable { - continue - } - - // clear client config for additional parameters - for key := range c { - if key != "email" && key != "id" && key != "password" && key != "flow" && key != "method" && key != "auth" && key != "reverse" { - delete(c, key) - } - if flow, ok := c["flow"].(string); ok && flow == "xtls-rprx-vision-udp443" { - c["flow"] = "xtls-rprx-vision" - } - } - final_clients = append(final_clients, any(c)) + if !c.Enable { + continue } + flow := c.Flow + if flow == "xtls-rprx-vision-udp443" { + flow = "xtls-rprx-vision" + } + entry := map[string]any{"email": c.Email} + if c.ID != "" { + entry["id"] = c.ID + } + if c.Password != "" { + entry["password"] = c.Password + } + if flow != "" { + entry["flow"] = flow + } + if c.Auth != "" { + entry["auth"] = c.Auth + } + if c.Security != "" { + entry["method"] = c.Security + } + if c.Reverse != nil { + entry["reverse"] = c.Reverse + } + finalClients = append(finalClients, entry) + } - settings["clients"] = final_clients + if _, hadClients := settings["clients"]; hadClients || len(finalClients) > 0 { + settings["clients"] = finalClients modifiedSettings, err := json.MarshalIndent(settings, "", " ") if err != nil { return nil, err } - inbound.Settings = string(modifiedSettings) }