feat(clients): make clients+client_inbounds the runtime source of truth

Adds ClientService.SyncInbound that reconciles the new tables from
each inbound's clients list whenever existing service paths mutate
settings.clients. Wires it into AddInbound, UpdateInbound,
AddInboundClient, UpdateInboundClient, DelInboundClient,
DelInboundClientByEmail, DelDepletedClients, autoRenewClients, and
the timestamp-backfill path in adjustTraffics, plus DetachInbound
on DelInbound.

GetXrayConfig now builds settings.clients from the new tables before
writing config.json, and getInboundsBySubId joins through them
instead of JSON_EACH on settings JSON. Live Xray config and
subscription endpoints are now driven by the relational view;
settings.clients JSON stays in step as a side effect of every write.
This commit is contained in:
MHSanaei 2026-05-17 07:15:16 +02:00
parent c251482f26
commit ba3c581372
No known key found for this signature in database
GPG key ID: 7E4060F2FBE5AB7A
4 changed files with 297 additions and 54 deletions

View file

@ -144,15 +144,14 @@ func (s *SubService) GetSubs(subId string, host string) ([]string, int64, xray.C
func (s *SubService) getInboundsBySubId(subId string) ([]*model.Inbound, error) { func (s *SubService) getInboundsBySubId(subId string) ([]*model.Inbound, error) {
db := database.GetDB() db := database.GetDB()
var inbounds []*model.Inbound var inbounds []*model.Inbound
// allow "hysteria2" so imports stored with the literal v2 protocol
// string still surface here (#4081)
err := db.Model(model.Inbound{}).Preload("ClientStats").Where(`id in ( err := db.Model(model.Inbound{}).Preload("ClientStats").Where(`id in (
SELECT DISTINCT inbounds.id SELECT DISTINCT inbounds.id
FROM inbounds, FROM inbounds
JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client JOIN client_inbounds ON client_inbounds.inbound_id = inbounds.id
JOIN clients ON clients.id = client_inbounds.client_id
WHERE WHERE
protocol in ('vmess','vless','trojan','shadowsocks','hysteria','hysteria2') inbounds.protocol in ('vmess','vless','trojan','shadowsocks','hysteria','hysteria2')
AND JSON_EXTRACT(client.value, '$.subId') = ? AND enable = ? AND clients.sub_id = ? AND inbounds.enable = ?
)`, subId, true).Find(&inbounds).Error )`, subId, true).Find(&inbounds).Error
if err != nil { if err != nil {
return nil, err return nil, err

143
web/service/client.go Normal file
View file

@ -0,0 +1,143 @@
package service
import (
"errors"
"strings"
"github.com/mhsanaei/3x-ui/v3/database"
"github.com/mhsanaei/3x-ui/v3/database/model"
"gorm.io/gorm"
)
type ClientService struct{}
func (s *ClientService) SyncInbound(tx *gorm.DB, inboundId int, clients []model.Client) error {
if tx == nil {
tx = database.GetDB()
}
if err := tx.Where("inbound_id = ?", inboundId).Delete(&model.ClientInbound{}).Error; err != nil {
return err
}
for i := range clients {
c := clients[i]
email := strings.TrimSpace(c.Email)
if email == "" {
continue
}
incoming := c.ToRecord()
row := &model.ClientRecord{}
err := tx.Where("email = ?", email).First(row).Error
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
if errors.Is(err, gorm.ErrRecordNotFound) {
if err := tx.Create(incoming).Error; err != nil {
return err
}
row = incoming
} else {
row.UUID = incoming.UUID
row.Password = incoming.Password
row.Auth = incoming.Auth
row.Flow = incoming.Flow
row.Security = incoming.Security
row.Reverse = incoming.Reverse
row.SubID = incoming.SubID
row.LimitIP = incoming.LimitIP
row.TotalGB = incoming.TotalGB
row.ExpiryTime = incoming.ExpiryTime
row.Enable = incoming.Enable
row.TgID = incoming.TgID
row.Comment = incoming.Comment
row.Reset = incoming.Reset
if incoming.CreatedAt > 0 && (row.CreatedAt == 0 || incoming.CreatedAt < row.CreatedAt) {
row.CreatedAt = incoming.CreatedAt
}
if incoming.UpdatedAt > row.UpdatedAt {
row.UpdatedAt = incoming.UpdatedAt
}
if err := tx.Save(row).Error; err != nil {
return err
}
}
link := model.ClientInbound{
ClientId: row.Id,
InboundId: inboundId,
FlowOverride: c.Flow,
}
if err := tx.Create(&link).Error; err != nil {
return err
}
}
return nil
}
func (s *ClientService) DetachInbound(tx *gorm.DB, inboundId int) error {
if tx == nil {
tx = database.GetDB()
}
return tx.Where("inbound_id = ?", inboundId).Delete(&model.ClientInbound{}).Error
}
func (s *ClientService) ListForInbound(tx *gorm.DB, inboundId int) ([]model.Client, error) {
if tx == nil {
tx = database.GetDB()
}
type joinedRow struct {
model.ClientRecord
FlowOverride string
}
var rows []joinedRow
err := tx.Table("clients").
Select("clients.*, client_inbounds.flow_override AS flow_override").
Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
Where("client_inbounds.inbound_id = ?", inboundId).
Order("clients.id ASC").
Find(&rows).Error
if err != nil {
return nil, err
}
out := make([]model.Client, 0, len(rows))
for i := range rows {
c := rows[i].ToClient()
if rows[i].FlowOverride != "" {
c.Flow = rows[i].FlowOverride
}
out = append(out, *c)
}
return out, nil
}
func (s *ClientService) GetRecordByEmail(tx *gorm.DB, email string) (*model.ClientRecord, error) {
if tx == nil {
tx = database.GetDB()
}
row := &model.ClientRecord{}
err := tx.Where("email = ?", email).First(row).Error
if err != nil {
return nil, err
}
return row, nil
}
func (s *ClientService) GetInboundIdsForEmail(tx *gorm.DB, email string) ([]int, error) {
if tx == nil {
tx = database.GetDB()
}
var ids []int
err := tx.Table("client_inbounds").
Select("client_inbounds.inbound_id").
Joins("JOIN clients ON clients.id = client_inbounds.client_id").
Where("clients.email = ?", email).
Scan(&ids).Error
if err != nil {
return nil, err
}
return ids, nil
}

View file

@ -25,6 +25,7 @@ import (
type InboundService struct { type InboundService struct {
xrayApi xray.XrayAPI xrayApi xray.XrayAPI
clientService ClientService
} }
func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) { func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) {
@ -395,6 +396,10 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo
return inbound, false, err return inbound, false, err
} }
if err = s.clientService.SyncInbound(tx, inbound.Id, clients); err != nil {
return inbound, false, err
}
needRestart := false needRestart := false
if inbound.Enable { if inbound.Enable {
rt, rterr := s.runtimeFor(inbound) rt, rterr := s.runtimeFor(inbound)
@ -447,6 +452,9 @@ func (s *InboundService) DelInbound(id int) (bool, error) {
if err != nil { if err != nil {
return false, err return false, err
} }
if err := s.clientService.DetachInbound(db, id); err != nil {
return false, err
}
inbound, err := s.GetInbound(id) inbound, err := s.GetInbound(id)
if err != nil { if err != nil {
return false, err return false, err
@ -705,7 +713,18 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound,
} }
} }
return inbound, needRestart, tx.Save(oldInbound).Error if err = tx.Save(oldInbound).Error; err != nil {
return inbound, false, err
}
newClients, gcErr := s.GetClients(oldInbound)
if gcErr != nil {
err = gcErr
return inbound, false, err
}
if err = s.clientService.SyncInbound(tx, oldInbound.Id, newClients); err != nil {
return inbound, false, err
}
return inbound, needRestart, nil
} }
func (s *InboundService) buildRuntimeInboundForAPI(tx *gorm.DB, inbound *model.Inbound) (*model.Inbound, error) { func (s *InboundService) buildRuntimeInboundForAPI(tx *gorm.DB, inbound *model.Inbound) (*model.Inbound, error) {
@ -980,7 +999,18 @@ func (s *InboundService) AddInboundClient(data *model.Inbound) (bool, error) {
} }
} }
return needRestart, tx.Save(oldInbound).Error if err = tx.Save(oldInbound).Error; err != nil {
return false, err
}
finalClients, gcErr := s.GetClients(oldInbound)
if gcErr != nil {
err = gcErr
return false, err
}
if err = s.clientService.SyncInbound(tx, oldInbound.Id, finalClients); err != nil {
return false, err
}
return needRestart, nil
} }
func (s *InboundService) getClientPrimaryKey(protocol model.Protocol, client model.Client) string { func (s *InboundService) getClientPrimaryKey(protocol model.Protocol, client model.Client) string {
@ -1291,7 +1321,17 @@ func (s *InboundService) DelInboundClient(inboundId int, clientId string) (bool,
} }
} }
} }
return needRestart, db.Save(oldInbound).Error if err := db.Save(oldInbound).Error; err != nil {
return false, err
}
finalClients, gcErr := s.GetClients(oldInbound)
if gcErr != nil {
return false, gcErr
}
if err := s.clientService.SyncInbound(db, inboundId, finalClients); err != nil {
return false, err
}
return needRestart, nil
} }
func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId string) (bool, error) { func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId string) (bool, error) {
@ -1540,7 +1580,18 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin
logger.Debug("Client old email not found") logger.Debug("Client old email not found")
needRestart = true needRestart = true
} }
return needRestart, tx.Save(oldInbound).Error if err = tx.Save(oldInbound).Error; err != nil {
return false, err
}
finalClients, gcErr := s.GetClients(oldInbound)
if gcErr != nil {
err = gcErr
return false, err
}
if err = s.clientService.SyncInbound(tx, oldInbound.Id, finalClients); err != nil {
return false, err
}
return needRestart, nil
} }
const resetGracePeriodMs int64 = 30000 const resetGracePeriodMs int64 = 30000
@ -2002,6 +2053,20 @@ func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.Cl
if err != nil { if err != nil {
logger.Warning("AddClientTraffic update inbounds ", err) logger.Warning("AddClientTraffic update inbounds ", err)
logger.Error(inbounds) logger.Error(inbounds)
} else {
for _, ib := range inbounds {
if ib == nil {
continue
}
cs, gcErr := s.GetClients(ib)
if gcErr != nil {
logger.Warning("AddClientTraffic sync clients: GetClients failed", gcErr)
continue
}
if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
logger.Warning("AddClientTraffic sync clients: SyncInbound failed", syncErr)
}
}
} }
} }
@ -2096,6 +2161,19 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
if err != nil { if err != nil {
return false, 0, err return false, 0, err
} }
for _, ib := range inbounds {
if ib == nil {
continue
}
cs, gcErr := s.GetClients(ib)
if gcErr != nil {
logger.Warning("autoRenewClients sync clients: GetClients failed", gcErr)
continue
}
if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
logger.Warning("autoRenewClients sync clients: SyncInbound failed", syncErr)
}
}
err = tx.Save(traffics).Error err = tx.Save(traffics).Error
if err != nil { if err != nil {
return false, 0, err return false, 0, err
@ -3156,6 +3234,14 @@ func (s *InboundService) DelDepletedClients(id int) (err error) {
if err = tx.Save(inbound).Error; err != nil { if err = tx.Save(inbound).Error; err != nil {
return err return err
} }
survivingClients, gcErr := s.GetClients(inbound)
if gcErr != nil {
err = gcErr
return err
}
if err = s.clientService.SyncInbound(tx, inbound.Id, survivingClients); err != nil {
return err
}
} }
// Drop now-orphaned rows. With id >= 0, a row is safe to drop only when // Drop now-orphaned rows. With id >= 0, a row is safe to drop only when
@ -3924,7 +4010,17 @@ func (s *InboundService) DelInboundClientByEmail(inboundId int, email string) (b
} }
} }
return needRestart, db.Save(oldInbound).Error if err := db.Save(oldInbound).Error; err != nil {
return false, err
}
finalClients, gcErr := s.GetClients(oldInbound)
if gcErr != nil {
return false, gcErr
}
if err := s.clientService.SyncInbound(db, inboundId, finalClients); err != nil {
return false, err
}
return needRestart, nil
} }
type SubLinkProvider interface { type SubLinkProvider interface {

View file

@ -116,57 +116,62 @@ func (s *XrayService) GetXrayConfig() (*xray.Config, error) {
if inbound.NodeID != nil { if inbound.NodeID != nil {
continue continue
} }
// get settings clients
settings := map[string]any{} settings := map[string]any{}
json.Unmarshal([]byte(inbound.Settings), &settings) json.Unmarshal([]byte(inbound.Settings), &settings)
clients, ok := settings["clients"].([]any)
if ok { dbClients, listErr := s.inboundService.clientService.ListForInbound(nil, inbound.Id)
// Fast O(N) lookup map for client traffic enablement if listErr != nil {
return nil, listErr
}
clientStats := inbound.ClientStats clientStats := inbound.ClientStats
enableMap := make(map[string]bool, len(clientStats)) enableMap := make(map[string]bool, len(clientStats))
for _, clientTraffic := range clientStats { for _, clientTraffic := range clientStats {
enableMap[clientTraffic.Email] = clientTraffic.Enable enableMap[clientTraffic.Email] = clientTraffic.Enable
} }
// filter and clean clients var finalClients []any
var final_clients []any for i := range dbClients {
for _, client := range clients { c := dbClients[i]
c, ok := client.(map[string]any) if enable, exists := enableMap[c.Email]; exists && !enable {
if !ok { logger.Infof("Remove Inbound User %s due to expiration or traffic limit", c.Email)
continue continue
} }
if !c.Enable {
email, _ := c["email"].(string)
// check users active or not via stats
if enable, exists := enableMap[email]; exists && !enable {
logger.Infof("Remove Inbound User %s due to expiration or traffic limit", email)
continue continue
} }
flow := c.Flow
// check manual disabled flag if flow == "xtls-rprx-vision-udp443" {
if manualEnable, ok := c["enable"].(bool); ok && !manualEnable { flow = "xtls-rprx-vision"
continue }
entry := map[string]any{"email": c.Email}
if c.ID != "" {
entry["id"] = c.ID
}
if c.Password != "" {
entry["password"] = c.Password
}
if flow != "" {
entry["flow"] = flow
}
if c.Auth != "" {
entry["auth"] = c.Auth
}
if c.Security != "" {
entry["method"] = c.Security
}
if c.Reverse != nil {
entry["reverse"] = c.Reverse
}
finalClients = append(finalClients, entry)
} }
// clear client config for additional parameters if _, hadClients := settings["clients"]; hadClients || len(finalClients) > 0 {
for key := range c { settings["clients"] = finalClients
if key != "email" && key != "id" && key != "password" && key != "flow" && key != "method" && key != "auth" && key != "reverse" {
delete(c, key)
}
if flow, ok := c["flow"].(string); ok && flow == "xtls-rprx-vision-udp443" {
c["flow"] = "xtls-rprx-vision"
}
}
final_clients = append(final_clients, any(c))
}
settings["clients"] = final_clients
modifiedSettings, err := json.MarshalIndent(settings, "", " ") modifiedSettings, err := json.MarshalIndent(settings, "", " ")
if err != nil { if err != nil {
return nil, err return nil, err
} }
inbound.Settings = string(modifiedSettings) inbound.Settings = string(modifiedSettings)
} }