feat(inbounds): one client identity across multiple inbounds via subId

Lets the operator add the same email under the same subId to several
inbounds. Xray reports traffic per email, so a single client_traffics
row acts as the shared accumulator — no aggregation overhead, quota and
expiry stay consistent.

- Email validation allows duplicates only when subId matches
- AddClientStat upserts via OnConflict DoNothing (idempotent on rerun)
- Stat/IP rows survive client deletion when a sibling inbound still
  references the email
- enrichClientStats tops up GORM-preloaded stats with rows whose
  inbound_id points at a sibling, so every panel view sees usage
- disableInvalidClients cascades enable=false and syncs the row's
  total/expiry into every sibling JSON when the shared identity expires
- DelDepletedClients removes the depleted client from all referencing
  inbounds, batched
- Subscription services dedupe traffic by email so shared quota is
  counted once

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
MHSanaei 2026-05-09 03:57:57 +02:00
parent d6fe3d3823
commit ef066a19fc
No known key found for this signature in database
GPG key ID: 7E4060F2FBE5AB7A
4 changed files with 525 additions and 241 deletions

View file

@ -38,6 +38,7 @@ func (s *SubClashService) GetClash(subId string, host string) (string, string, e
var clientTraffics []xray.ClientTraffic
var proxies []map[string]any
seenEmails := make(map[string]struct{})
for _, inbound := range inbounds {
clients, err := s.inboundService.GetClients(inbound)
if err != nil {
@ -56,7 +57,7 @@ func (s *SubClashService) GetClash(subId string, host string) (string, string, e
}
for _, client := range clients {
if client.SubID == subId {
clientTraffics = append(clientTraffics, s.SubService.getClientTraffics(inbound.ClientStats, client.Email))
_, clientTraffics = s.SubService.appendUniqueTraffic(seenEmails, clientTraffics, inbound.ClientStats, client.Email)
proxies = append(proxies, s.getProxies(inbound, client, host)...)
}
}

View file

@ -97,6 +97,7 @@ func (s *SubJsonService) GetJson(subId string, host string) (string, string, err
var clientTraffics []xray.ClientTraffic
var configArray []json_util.RawMessage
seenEmails := make(map[string]struct{})
// Prepare Inbounds
for _, inbound := range inbounds {
clients, err := s.inboundService.GetClients(inbound)
@ -117,9 +118,8 @@ func (s *SubJsonService) GetJson(subId string, host string) (string, string, err
for _, client := range clients {
if client.SubID == subId {
clientTraffics = append(clientTraffics, s.SubService.getClientTraffics(inbound.ClientStats, client.Email))
newConfigs := s.getConfig(inbound, client, host)
configArray = append(configArray, newConfigs...)
_, clientTraffics = s.SubService.appendUniqueTraffic(seenEmails, clientTraffics, inbound.ClientStats, client.Email)
configArray = append(configArray, s.getConfig(inbound, client, host)...)
}
}
}

View file

@ -61,6 +61,7 @@ func (s *SubService) GetSubs(subId string, host string) ([]string, int64, xray.C
if err != nil {
s.datepicker = "gregorian"
}
seenEmails := make(map[string]struct{})
for _, inbound := range inbounds {
clients, err := s.inboundService.GetClients(inbound)
if err != nil {
@ -82,10 +83,9 @@ func (s *SubService) GetSubs(subId string, host string) ([]string, int64, xray.C
if client.Enable {
hasEnabledClient = true
}
link := s.getLink(inbound, client.Email)
result = append(result, link)
ct := s.getClientTraffics(inbound.ClientStats, client.Email)
clientTraffics = append(clientTraffics, ct)
result = append(result, s.getLink(inbound, client.Email))
var ct xray.ClientTraffic
ct, clientTraffics = s.appendUniqueTraffic(seenEmails, clientTraffics, inbound.ClientStats, client.Email)
if ct.LastOnline > lastOnline {
lastOnline = ct.LastOnline
}
@ -138,6 +138,19 @@ func (s *SubService) getInboundsBySubId(subId string) ([]*model.Inbound, error)
return inbounds, nil
}
// appendUniqueTraffic resolves the traffic stats for email and appends them
// to acc only the first time email is seen. Shared-email mode lets one
// client_traffics row underpin several inbounds, so without dedupe its
// quota and usage would be counted once per inbound.
func (s *SubService) appendUniqueTraffic(seen map[string]struct{}, acc []xray.ClientTraffic, stats []xray.ClientTraffic, email string) (xray.ClientTraffic, []xray.ClientTraffic) {
ct := s.getClientTraffics(stats, email)
if _, dup := seen[email]; !dup {
seen[email] = struct{}{}
acc = append(acc, ct)
}
return ct, acc
}
func (s *SubService) getClientTraffics(traffics []xray.ClientTraffic, email string) xray.ClientTraffic {
for _, traffic := range traffics {
if traffic.Email == email {

View file

@ -18,6 +18,7 @@ import (
"github.com/mhsanaei/3x-ui/v2/xray"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// InboundService provides business logic for managing Xray inbound configurations.
@ -33,8 +34,86 @@ type CopyClientsResult struct {
Errors []string `json:"errors"`
}
// GetInbounds retrieves all inbounds for a specific user.
// Returns a slice of inbound models with their associated client statistics.
// enrichClientStats parses each inbound's clients once, fills in the
// UUID/SubId fields on the preloaded ClientStats, and tops up rows owned by
// a sibling inbound (shared-email mode — the row is keyed on email so it
// only preloads on its owning inbound).
func (s *InboundService) enrichClientStats(db *gorm.DB, inbounds []*model.Inbound) {
if len(inbounds) == 0 {
return
}
clientsByInbound := make([][]model.Client, len(inbounds))
seenByInbound := make([]map[string]struct{}, len(inbounds))
missing := make(map[string]struct{})
for i, inbound := range inbounds {
clients, _ := s.GetClients(inbound)
clientsByInbound[i] = clients
seen := make(map[string]struct{}, len(inbound.ClientStats))
for _, st := range inbound.ClientStats {
if st.Email != "" {
seen[strings.ToLower(st.Email)] = struct{}{}
}
}
seenByInbound[i] = seen
for _, c := range clients {
if c.Email == "" {
continue
}
if _, ok := seen[strings.ToLower(c.Email)]; !ok {
missing[c.Email] = struct{}{}
}
}
}
if len(missing) > 0 {
emails := make([]string, 0, len(missing))
for e := range missing {
emails = append(emails, e)
}
var extra []xray.ClientTraffic
if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", emails).Find(&extra).Error; err != nil {
logger.Warning("enrichClientStats:", err)
} else {
byEmail := make(map[string]xray.ClientTraffic, len(extra))
for _, st := range extra {
byEmail[strings.ToLower(st.Email)] = st
}
for i, inbound := range inbounds {
for _, c := range clientsByInbound[i] {
if c.Email == "" {
continue
}
key := strings.ToLower(c.Email)
if _, ok := seenByInbound[i][key]; ok {
continue
}
if st, ok := byEmail[key]; ok {
inbound.ClientStats = append(inbound.ClientStats, st)
seenByInbound[i][key] = struct{}{}
}
}
}
}
}
for i, inbound := range inbounds {
clients := clientsByInbound[i]
if len(clients) == 0 || len(inbound.ClientStats) == 0 {
continue
}
cMap := make(map[string]model.Client, len(clients))
for _, c := range clients {
cMap[strings.ToLower(c.Email)] = c
}
for j := range inbound.ClientStats {
email := strings.ToLower(inbound.ClientStats[j].Email)
if c, ok := cMap[email]; ok {
inbound.ClientStats[j].UUID = c.ID
inbound.ClientStats[j].SubId = c.SubID
}
}
}
}
// GetInbounds retrieves all inbounds for a specific user with client stats.
func (s *InboundService) GetInbounds(userId int) ([]*model.Inbound, error) {
db := database.GetDB()
var inbounds []*model.Inbound
@ -42,30 +121,11 @@ func (s *InboundService) GetInbounds(userId int) ([]*model.Inbound, error) {
if err != nil && err != gorm.ErrRecordNotFound {
return nil, err
}
// Enrich client stats with UUID/SubId from inbound settings
for _, inbound := range inbounds {
clients, _ := s.GetClients(inbound)
if len(clients) == 0 || len(inbound.ClientStats) == 0 {
continue
}
// Build a map email -> client
cMap := make(map[string]model.Client, len(clients))
for _, c := range clients {
cMap[strings.ToLower(c.Email)] = c
}
for i := range inbound.ClientStats {
email := strings.ToLower(inbound.ClientStats[i].Email)
if c, ok := cMap[email]; ok {
inbound.ClientStats[i].UUID = c.ID
inbound.ClientStats[i].SubId = c.SubID
}
}
}
s.enrichClientStats(db, inbounds)
return inbounds, nil
}
// GetAllInbounds retrieves all inbounds from the database.
// Returns a slice of all inbound models with their associated client statistics.
// GetAllInbounds retrieves all inbounds with client stats.
func (s *InboundService) GetAllInbounds() ([]*model.Inbound, error) {
db := database.GetDB()
var inbounds []*model.Inbound
@ -73,24 +133,7 @@ func (s *InboundService) GetAllInbounds() ([]*model.Inbound, error) {
if err != nil && err != gorm.ErrRecordNotFound {
return nil, err
}
// Enrich client stats with UUID/SubId from inbound settings
for _, inbound := range inbounds {
clients, _ := s.GetClients(inbound)
if len(clients) == 0 || len(inbound.ClientStats) == 0 {
continue
}
cMap := make(map[string]model.Client, len(clients))
for _, c := range clients {
cMap[strings.ToLower(c.Email)] = c
}
for i := range inbound.ClientStats {
email := strings.ToLower(inbound.ClientStats[i].Email)
if c, ok := cMap[email]; ok {
inbound.ClientStats[i].UUID = c.ID
inbound.ClientStats[i].SubId = c.SubID
}
}
}
s.enrichClientStats(db, inbounds)
return inbounds, nil
}
@ -122,7 +165,7 @@ func (s *InboundService) getAllEmails() ([]string, error) {
db := database.GetDB()
var emails []string
err := db.Raw(`
SELECT JSON_EXTRACT(client.value, '$.email')
SELECT DISTINCT JSON_EXTRACT(client.value, '$.email')
FROM inbounds,
JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
`).Scan(&emails).Error
@ -132,55 +175,97 @@ func (s *InboundService) getAllEmails() ([]string, error) {
return emails, nil
}
func (s *InboundService) contains(slice []string, str string) bool {
lowerStr := strings.ToLower(str)
for _, s := range slice {
if strings.ToLower(s) == lowerStr {
return true
}
// getAllEmailSubIDs returns email→subId. An email seen with two different
// non-empty subIds is locked (mapped to "") so neither identity can claim it.
func (s *InboundService) getAllEmailSubIDs() (map[string]string, error) {
db := database.GetDB()
var rows []struct {
Email string
SubID string
}
return false
err := db.Raw(`
SELECT JSON_EXTRACT(client.value, '$.email') AS email,
JSON_EXTRACT(client.value, '$.subId') AS sub_id
FROM inbounds,
JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
`).Scan(&rows).Error
if err != nil {
return nil, err
}
result := make(map[string]string, len(rows))
for _, r := range rows {
email := strings.ToLower(strings.Trim(r.Email, "\""))
if email == "" {
continue
}
subID := strings.Trim(r.SubID, "\"")
if existing, ok := result[email]; ok {
if existing != subID {
result[email] = ""
}
continue
}
result[email] = subID
}
return result, nil
}
func lowerAll(in []string) []string {
out := make([]string, len(in))
for i, s := range in {
out[i] = strings.ToLower(s)
}
return out
}
// emailUsedByOtherInbounds reports whether email lives in any inbound other
// than exceptInboundId. Empty email returns false.
func (s *InboundService) emailUsedByOtherInbounds(email string, exceptInboundId int) (bool, error) {
if email == "" {
return false, nil
}
db := database.GetDB()
var count int64
err := db.Raw(`
SELECT COUNT(*)
FROM inbounds,
JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
WHERE inbounds.id != ?
AND LOWER(JSON_EXTRACT(client.value, '$.email')) = LOWER(?)
`, exceptInboundId, email).Scan(&count).Error
if err != nil {
return false, err
}
return count > 0, nil
}
// checkEmailsExistForClients validates a batch of incoming clients. An email
// collides only when the existing holder has a different (or empty) subId —
// matching non-empty subIds let multiple inbounds share one identity.
func (s *InboundService) checkEmailsExistForClients(clients []model.Client) (string, error) {
allEmails, err := s.getAllEmails()
emailSubIDs, err := s.getAllEmailSubIDs()
if err != nil {
return "", err
}
var emails []string
seen := make(map[string]string, len(clients))
for _, client := range clients {
if client.Email != "" {
if s.contains(emails, client.Email) {
return client.Email, nil
}
if s.contains(allEmails, client.Email) {
return client.Email, nil
}
emails = append(emails, client.Email)
if client.Email == "" {
continue
}
}
return "", nil
}
func (s *InboundService) checkEmailExistForInbound(inbound *model.Inbound) (string, error) {
clients, err := s.GetClients(inbound)
if err != nil {
return "", err
}
allEmails, err := s.getAllEmails()
if err != nil {
return "", err
}
var emails []string
for _, client := range clients {
if client.Email != "" {
if s.contains(emails, client.Email) {
key := strings.ToLower(client.Email)
// Within the same payload, the same email must carry the same subId;
// otherwise we would silently merge two distinct identities.
if prev, ok := seen[key]; ok {
if prev != client.SubID || client.SubID == "" {
return client.Email, nil
}
if s.contains(allEmails, client.Email) {
continue
}
seen[key] = client.SubID
if existingSub, ok := emailSubIDs[key]; ok {
if client.SubID == "" || existingSub == "" || existingSub != client.SubID {
return client.Email, nil
}
emails = append(emails, client.Email)
}
}
return "", nil
@ -209,7 +294,11 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo
return inbound, false, err
}
existEmail, err := s.checkEmailExistForInbound(inbound)
clients, err := s.GetClients(inbound)
if err != nil {
return inbound, false, err
}
existEmail, err := s.checkEmailsExistForClients(clients)
if err != nil {
return inbound, false, err
}
@ -217,11 +306,6 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo
return inbound, false, common.NewError("Duplicate email:", existEmail)
}
clients, err := s.GetClients(inbound)
if err != nil {
return inbound, false, err
}
// Ensure created_at and updated_at on clients in settings
if len(clients) > 0 {
var settings map[string]any
@ -675,7 +759,8 @@ func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inb
newEmails[newClients[i].Email] = struct{}{}
}
// Removed clients — drop their stats rows.
// Drop stats rows for removed emails — but not when a sibling inbound
// still references the email, since the row is the shared accumulator.
for i := range oldClients {
email := oldClients[i].Email
if email == "" {
@ -684,6 +769,13 @@ func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inb
if _, kept := newEmails[email]; kept {
continue
}
stillUsed, err := s.emailUsedByOtherInbounds(email, oldInbound.Id)
if err != nil {
return err
}
if stillUsed {
continue
}
if err := s.DelClientStat(tx, email); err != nil {
return err
}
@ -1080,11 +1172,20 @@ func (s *InboundService) DelInboundClient(inboundId int, clientId string) (bool,
db := database.GetDB()
err = s.DelClientIPs(db, email)
// Keep the client_traffics row and IPs alive when another inbound still
// references this email — siblings depend on the shared accounting state.
emailShared, err := s.emailUsedByOtherInbounds(email, inboundId)
if err != nil {
logger.Error("Error in delete client IPs")
return false, err
}
if !emailShared {
err = s.DelClientIPs(db, email)
if err != nil {
logger.Error("Error in delete client IPs")
return false, err
}
}
needRestart := false
if len(email) > 0 {
@ -1094,10 +1195,12 @@ func (s *InboundService) DelInboundClient(inboundId int, clientId string) (bool,
logger.Error("Get stats error")
return false, err
}
err = s.DelClientStat(db, email)
if err != nil {
logger.Error("Delete stats Data Error")
return false, err
if !emailShared {
err = s.DelClientStat(db, email)
if err != nil {
logger.Error("Delete stats Data Error")
return false, err
}
}
if needApiDel && notDepleted {
s.xrayApi.Init(p.GetAPIPort())
@ -1254,25 +1357,60 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin
if len(clients[0].Email) > 0 {
if len(oldEmail) > 0 {
err = s.UpdateClientStat(tx, oldEmail, &clients[0])
if err != nil {
return false, err
// Repointing onto an email that already has a row would collide on
// the unique constraint, so retire the donor and let the surviving
// row carry the merged identity.
emailUnchanged := strings.EqualFold(oldEmail, clients[0].Email)
targetExists := int64(0)
if !emailUnchanged {
if err = tx.Model(xray.ClientTraffic{}).Where("email = ?", clients[0].Email).Count(&targetExists).Error; err != nil {
return false, err
}
}
err = s.UpdateClientIPs(tx, oldEmail, clients[0].Email)
if err != nil {
return false, err
if emailUnchanged || targetExists == 0 {
err = s.UpdateClientStat(tx, oldEmail, &clients[0])
if err != nil {
return false, err
}
err = s.UpdateClientIPs(tx, oldEmail, clients[0].Email)
if err != nil {
return false, err
}
} else {
stillUsed, sErr := s.emailUsedByOtherInbounds(oldEmail, data.Id)
if sErr != nil {
return false, sErr
}
if !stillUsed {
if err = s.DelClientStat(tx, oldEmail); err != nil {
return false, err
}
if err = s.DelClientIPs(tx, oldEmail); err != nil {
return false, err
}
}
// Refresh the surviving row with the new client's limits/expiry.
if err = s.UpdateClientStat(tx, clients[0].Email, &clients[0]); err != nil {
return false, err
}
}
} else {
s.AddClientStat(tx, data.Id, &clients[0])
}
} else {
err = s.DelClientStat(tx, oldEmail)
stillUsed, err := s.emailUsedByOtherInbounds(oldEmail, data.Id)
if err != nil {
return false, err
}
err = s.DelClientIPs(tx, oldEmail)
if err != nil {
return false, err
if !stillUsed {
err = s.DelClientStat(tx, oldEmail)
if err != nil {
return false, err
}
err = s.DelClientIPs(tx, oldEmail)
if err != nil {
return false, err
}
}
}
needRestart := false
@ -1655,34 +1793,104 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
now := time.Now().Unix() * 1000
needRestart := false
var clientsToDisable []struct {
var depletedRows []xray.ClientTraffic
err := tx.Model(xray.ClientTraffic{}).
Where("((total > 0 AND up + down >= total) OR (expiry_time > 0 AND expiry_time <= ?)) AND enable = ?", now, true).
Find(&depletedRows).Error
if err != nil {
return false, 0, err
}
if len(depletedRows) == 0 {
return false, 0, nil
}
rowByEmail := make(map[string]*xray.ClientTraffic, len(depletedRows))
depletedEmails := make([]string, 0, len(depletedRows))
for i := range depletedRows {
if depletedRows[i].Email == "" {
continue
}
rowByEmail[strings.ToLower(depletedRows[i].Email)] = &depletedRows[i]
depletedEmails = append(depletedEmails, depletedRows[i].Email)
}
// Resolve inbound membership only for the depleted emails — pushing the
// filter into SQLite avoids dragging every panel client through Go for
// the common case where most clients are healthy.
var memberships []struct {
InboundId int
Tag string
Email string
SubID string `gorm:"column:sub_id"`
}
if len(depletedEmails) > 0 {
err = tx.Raw(`
SELECT inbounds.id AS inbound_id,
inbounds.tag AS tag,
JSON_EXTRACT(client.value, '$.email') AS email,
JSON_EXTRACT(client.value, '$.subId') AS sub_id
FROM inbounds,
JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
WHERE LOWER(JSON_EXTRACT(client.value, '$.email')) IN ?
`, lowerAll(depletedEmails)).Scan(&memberships).Error
if err != nil {
return false, 0, err
}
}
// Discover the row holder's subId per email. Only siblings sharing it
// get cascaded; legacy data where two identities reuse the same email
// stays isolated to the row owner.
holderSub := make(map[string]string, len(rowByEmail))
for _, m := range memberships {
email := strings.ToLower(strings.Trim(m.Email, "\""))
row, ok := rowByEmail[email]
if !ok || m.InboundId != row.InboundId {
continue
}
holderSub[email] = strings.Trim(m.SubID, "\"")
}
type target struct {
InboundId int
Tag string
Email string
}
err := tx.Table("inbounds").
Select("inbounds.id as inbound_id, inbounds.tag, client_traffics.email").
Joins("JOIN client_traffics ON inbounds.id = client_traffics.inbound_id").
Where("((client_traffics.total > 0 AND client_traffics.up + client_traffics.down >= client_traffics.total) OR (client_traffics.expiry_time > 0 AND client_traffics.expiry_time <= ?)) AND client_traffics.enable = ?", now, true).
Scan(&clientsToDisable).Error
if err != nil {
return false, 0, err
var targets []target
for _, m := range memberships {
email := strings.ToLower(strings.Trim(m.Email, "\""))
row, ok := rowByEmail[email]
if !ok {
continue
}
expected, hasSub := holderSub[email]
mSub := strings.Trim(m.SubID, "\"")
switch {
case !hasSub || expected == "":
if m.InboundId != row.InboundId {
continue
}
case mSub != expected:
continue
}
targets = append(targets, target{
InboundId: m.InboundId,
Tag: m.Tag,
Email: strings.Trim(m.Email, "\""),
})
}
if p != nil {
if p != nil && len(targets) > 0 {
s.xrayApi.Init(p.GetAPIPort())
for _, client := range clientsToDisable {
err1 := s.xrayApi.RemoveUser(client.Tag, client.Email)
for _, t := range targets {
err1 := s.xrayApi.RemoveUser(t.Tag, t.Email)
if err1 == nil {
logger.Debug("Client disabled by api:", client.Email)
logger.Debug("Client disabled by api:", t.Email)
} else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", t.Email)) {
logger.Debug("User is already disabled. Nothing to do more...")
} else {
if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", client.Email)) {
logger.Debug("User is already disabled. Nothing to do more...")
} else {
logger.Debug("Error in disabling client by api:", err1)
needRestart = true
}
logger.Debug("Error in disabling client by api:", err1)
needRestart = true
}
}
s.xrayApi.Close()
@ -1697,58 +1905,71 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
return needRestart, count, err
}
// Also set enable=false in inbounds.settings JSON so clients are visibly disabled
if len(clientsToDisable) > 0 {
inboundEmailMap := make(map[int]map[string]struct{})
for _, c := range clientsToDisable {
if inboundEmailMap[c.InboundId] == nil {
inboundEmailMap[c.InboundId] = make(map[string]struct{})
}
inboundEmailMap[c.InboundId][c.Email] = struct{}{}
if len(targets) == 0 {
return needRestart, count, nil
}
// Mirror enable=false + the row's authoritative quota/expiry into every
// (inbound, email) we just removed via the API.
inboundEmailMap := make(map[int]map[string]struct{})
for _, t := range targets {
if inboundEmailMap[t.InboundId] == nil {
inboundEmailMap[t.InboundId] = make(map[string]struct{})
}
inboundIds := make([]int, 0, len(inboundEmailMap))
for id := range inboundEmailMap {
inboundIds = append(inboundIds, id)
inboundEmailMap[t.InboundId][t.Email] = struct{}{}
}
inboundIds := make([]int, 0, len(inboundEmailMap))
for id := range inboundEmailMap {
inboundIds = append(inboundIds, id)
}
var inbounds []*model.Inbound
if err = tx.Model(model.Inbound{}).Where("id IN ?", inboundIds).Find(&inbounds).Error; err != nil {
logger.Warning("disableInvalidClients fetch inbounds:", err)
return needRestart, count, nil
}
dirty := make([]*model.Inbound, 0, len(inbounds))
for _, inbound := range inbounds {
settings := map[string]any{}
if jsonErr := json.Unmarshal([]byte(inbound.Settings), &settings); jsonErr != nil {
continue
}
var inbounds []*model.Inbound
if err = tx.Model(model.Inbound{}).Where("id IN ?", inboundIds).Find(&inbounds).Error; err != nil {
logger.Warning("disableInvalidClients fetch inbounds:", err)
return needRestart, count, nil
clientsRaw, ok := settings["clients"].([]any)
if !ok {
continue
}
for _, inbound := range inbounds {
settings := map[string]any{}
if jsonErr := json.Unmarshal([]byte(inbound.Settings), &settings); jsonErr != nil {
continue
}
clients, ok := settings["clients"].([]any)
emailSet := inboundEmailMap[inbound.Id]
changed := false
for i := range clientsRaw {
c, ok := clientsRaw[i].(map[string]any)
if !ok {
continue
}
emailSet := inboundEmailMap[inbound.Id]
changed := false
for i := range clients {
c, ok := clients[i].(map[string]any)
if !ok {
continue
}
email, _ := c["email"].(string)
if _, shouldDisable := emailSet[email]; shouldDisable {
c["enable"] = false
c["updated_at"] = time.Now().Unix() * 1000
clients[i] = c
changed = true
}
email, _ := c["email"].(string)
if _, shouldDisable := emailSet[email]; !shouldDisable {
continue
}
if changed {
settings["clients"] = clients
modifiedSettings, jsonErr := json.MarshalIndent(settings, "", " ")
if jsonErr != nil {
continue
}
inbound.Settings = string(modifiedSettings)
c["enable"] = false
if row, ok := rowByEmail[strings.ToLower(email)]; ok {
c["totalGB"] = row.Total
c["expiryTime"] = row.ExpiryTime
}
c["updated_at"] = now
clientsRaw[i] = c
changed = true
}
if err = tx.Save(inbounds).Error; err != nil {
if !changed {
continue
}
settings["clients"] = clientsRaw
modifiedSettings, jsonErr := json.MarshalIndent(settings, "", " ")
if jsonErr != nil {
continue
}
inbound.Settings = string(modifiedSettings)
dirty = append(dirty, inbound)
}
if len(dirty) > 0 {
if err = tx.Save(dirty).Error; err != nil {
logger.Warning("disableInvalidClients update inbound settings:", err)
}
}
@ -1824,19 +2045,20 @@ func (s *InboundService) MigrationRemoveOrphanedTraffics() {
`)
}
// AddClientStat inserts a per-client accounting row, no-op on email
// conflict. Xray reports traffic per email, so the surviving row acts as
// the shared accumulator for inbounds that re-use the same identity.
func (s *InboundService) AddClientStat(tx *gorm.DB, inboundId int, client *model.Client) error {
clientTraffic := xray.ClientTraffic{}
clientTraffic.InboundId = inboundId
clientTraffic.Email = client.Email
clientTraffic.Total = client.TotalGB
clientTraffic.ExpiryTime = client.ExpiryTime
clientTraffic.Enable = client.Enable
clientTraffic.Up = 0
clientTraffic.Down = 0
clientTraffic.Reset = client.Reset
result := tx.Create(&clientTraffic)
err := result.Error
return err
clientTraffic := xray.ClientTraffic{
InboundId: inboundId,
Email: client.Email,
Total: client.TotalGB,
ExpiryTime: client.ExpiryTime,
Enable: client.Enable,
Reset: client.Reset,
}
return tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
Create(&clientTraffic).Error
}
func (s *InboundService) UpdateClientStat(tx *gorm.DB, email string, client *model.Client) error {
@ -2415,77 +2637,117 @@ func (s *InboundService) DelDepletedClients(id int) (err error) {
}
}()
whereText := "reset = 0 and inbound_id "
if id < 0 {
whereText += "> ?"
} else {
whereText += "= ?"
}
// Only consider truly depleted clients: expired OR traffic exhausted
// Collect depleted emails globally — a shared-email row owned by one
// inbound depletes every sibling that lists the email.
now := time.Now().Unix() * 1000
depletedClients := []xray.ClientTraffic{}
depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))"
var depletedRows []xray.ClientTraffic
err = db.Model(xray.ClientTraffic{}).
Where(whereText+" and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))", id, now).
Select("inbound_id, GROUP_CONCAT(email) as email").
Group("inbound_id").
Find(&depletedClients).Error
Where(depletedClause, now).
Find(&depletedRows).Error
if err != nil {
return err
}
if len(depletedRows) == 0 {
return nil
}
for _, depletedClient := range depletedClients {
emails := strings.Split(depletedClient.Email, ",")
oldInbound, err := s.GetInbound(depletedClient.InboundId)
if err != nil {
depletedEmails := make(map[string]struct{}, len(depletedRows))
for _, r := range depletedRows {
if r.Email == "" {
continue
}
depletedEmails[strings.ToLower(r.Email)] = struct{}{}
}
if len(depletedEmails) == 0 {
return nil
}
var inbounds []*model.Inbound
inboundQuery := db.Model(model.Inbound{})
if id >= 0 {
inboundQuery = inboundQuery.Where("id = ?", id)
}
if err = inboundQuery.Find(&inbounds).Error; err != nil {
return err
}
for _, inbound := range inbounds {
var settings map[string]any
if err = json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
return err
}
var oldSettings map[string]any
err = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings)
if err != nil {
return err
rawClients, ok := settings["clients"].([]any)
if !ok {
continue
}
oldClients := oldSettings["clients"].([]any)
var newClients []any
for _, client := range oldClients {
deplete := false
c := client.(map[string]any)
for _, email := range emails {
if email == c["email"].(string) {
deplete = true
break
}
}
if !deplete {
newClients := make([]any, 0, len(rawClients))
removed := 0
for _, client := range rawClients {
c, ok := client.(map[string]any)
if !ok {
newClients = append(newClients, client)
continue
}
email, _ := c["email"].(string)
if _, isDepleted := depletedEmails[strings.ToLower(email)]; isDepleted {
removed++
continue
}
newClients = append(newClients, client)
}
if len(newClients) > 0 {
oldSettings["clients"] = newClients
newSettings, err := json.MarshalIndent(oldSettings, "", " ")
if err != nil {
return err
}
oldInbound.Settings = string(newSettings)
err = tx.Save(oldInbound).Error
if err != nil {
return err
}
} else {
// Delete inbound if no client remains
s.DelInbound(depletedClient.InboundId)
if removed == 0 {
continue
}
if len(newClients) == 0 {
s.DelInbound(inbound.Id)
continue
}
settings["clients"] = newClients
ns, mErr := json.MarshalIndent(settings, "", " ")
if mErr != nil {
return mErr
}
inbound.Settings = string(ns)
if err = tx.Save(inbound).Error; err != nil {
return err
}
}
// Delete stats only for truly depleted clients
err = tx.Where(whereText+" and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))", id, now).Delete(xray.ClientTraffic{}).Error
if err != nil {
// Drop now-orphaned rows. With id >= 0, a row is safe to drop only when
// no out-of-scope inbound still references the email.
if id < 0 {
err = tx.Where(depletedClause, now).Delete(xray.ClientTraffic{}).Error
return err
}
emails := make([]string, 0, len(depletedEmails))
for e := range depletedEmails {
emails = append(emails, e)
}
var stillReferenced []string
if err = tx.Raw(`
SELECT DISTINCT LOWER(JSON_EXTRACT(client.value, '$.email'))
FROM inbounds,
JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
WHERE LOWER(JSON_EXTRACT(client.value, '$.email')) IN ?
`, emails).Scan(&stillReferenced).Error; err != nil {
return err
}
stillSet := make(map[string]struct{}, len(stillReferenced))
for _, e := range stillReferenced {
stillSet[e] = struct{}{}
}
toDelete := make([]string, 0, len(emails))
for _, e := range emails {
if _, kept := stillSet[e]; !kept {
toDelete = append(toDelete, e)
}
}
if len(toDelete) > 0 {
if err = tx.Where("LOWER(email) IN ?", toDelete).Delete(xray.ClientTraffic{}).Error; err != nil {
return err
}
}
return nil
}
@ -3142,16 +3404,24 @@ func (s *InboundService) DelInboundClientByEmail(inboundId int, email string) (b
db := database.GetDB()
// remove IP bindings
if err := s.DelClientIPs(db, email); err != nil {
logger.Error("Error in delete client IPs")
// Drop the row and IPs only when this was the last inbound referencing
// the email — siblings still need the shared accounting state.
emailShared, err := s.emailUsedByOtherInbounds(email, inboundId)
if err != nil {
return false, err
}
if !emailShared {
if err := s.DelClientIPs(db, email); err != nil {
logger.Error("Error in delete client IPs")
return false, err
}
}
needRestart := false
// remove stats too
if len(email) > 0 {
if len(email) > 0 && !emailShared {
traffic, err := s.GetClientTrafficByEmail(email)
if err != nil {
return false, err