From 83b61d9da4c4c7c82f5e6de672f89f501cf4bdac Mon Sep 17 00:00:00 2001
From: Sora39831 <540587985@qq.com>
Date: Mon, 6 Apr 2026 10:46:48 +0800
Subject: [PATCH] feat(limit): add inbound device-limit enforcement with safe
unban flow
- add inbound deviceLimit model/frontend fields and translations
- add CheckDeviceLimitJob with observation window and xray API ban/unban
- prevent job re-entrancy and restore users when limit is disabled
- reduce lock scope via snapshots to avoid blocking log parsing
---
database/model/model.go | 1 +
web/assets/js/model/dbinbound.js | 3 +-
web/html/form/inbound.html | 16 +-
web/html/inbounds.html | 8 +
web/job/check_device_limit_job.go | 345 +++++++++++++++++++++++++++
web/service/inbound.go | 1 +
web/service/xray.go | 8 +
web/translation/translate.en_US.toml | 2 +
web/translation/translate.zh_CN.toml | 2 +
web/web.go | 2 +
10 files changed, 386 insertions(+), 2 deletions(-)
create mode 100644 web/job/check_device_limit_job.go
diff --git a/database/model/model.go b/database/model/model.go
index 1865d1a9..98252f62 100644
--- a/database/model/model.go
+++ b/database/model/model.go
@@ -42,6 +42,7 @@ type Inbound struct {
Remark string `json:"remark" form:"remark"` // Human-readable remark
Enable bool `json:"enable" form:"enable" gorm:"index:idx_enable_traffic_reset,priority:1"` // Whether the inbound is enabled
ExpiryTime int64 `json:"expiryTime" form:"expiryTime"` // Expiration timestamp
+ DeviceLimit int `json:"deviceLimit" form:"deviceLimit" gorm:"column:device_limit;default:0"` // Active device/IP limit by inbound
TrafficReset string `json:"trafficReset" form:"trafficReset" gorm:"default:never;index:idx_enable_traffic_reset,priority:2"` // Traffic reset schedule
LastTrafficResetTime int64 `json:"lastTrafficResetTime" form:"lastTrafficResetTime" gorm:"default:0"` // Last traffic reset timestamp
ClientStats []xray.ClientTraffic `gorm:"foreignKey:InboundId;references:Id" json:"clientStats" form:"clientStats"` // Client traffic statistics
diff --git a/web/assets/js/model/dbinbound.js b/web/assets/js/model/dbinbound.js
index befc618e..ea20d855 100644
--- a/web/assets/js/model/dbinbound.js
+++ b/web/assets/js/model/dbinbound.js
@@ -10,6 +10,7 @@ class DBInbound {
this.remark = "";
this.enable = true;
this.expiryTime = 0;
+ this.deviceLimit = 0;
this.trafficReset = "never";
this.lastTrafficResetTime = 0;
@@ -148,4 +149,4 @@ class DBInbound {
const inbound = this.toInbound();
return inbound.genInboundLinks(this.remark, remarkModel);
}
-}
\ No newline at end of file
+}
diff --git a/web/html/form/inbound.html b/web/html/form/inbound.html
index 8b59dc28..dcce4e0f 100644
--- a/web/html/form/inbound.html
+++ b/web/html/form/inbound.html
@@ -49,6 +49,20 @@
:min="0">
+
+
+
+
+ {{ i18n "pages.inbounds.deviceLimitDesc" }}
+
+ {{ i18n "pages.inbounds.deviceLimit" }}
+
+
+
+
+
+
@@ -170,4 +184,4 @@
-{{end}}
\ No newline at end of file
+{{end}}
diff --git a/web/html/inbounds.html b/web/html/inbounds.html
index f0298a8a..fda4aca2 100644
--- a/web/html/inbounds.html
+++ b/web/html/inbounds.html
@@ -645,6 +645,11 @@
align: 'left',
width: 50,
scopedSlots: { customRender: 'clients' },
+ }, {
+ title: '{{ i18n "pages.inbounds.deviceLimit" }}',
+ align: 'center',
+ width: 45,
+ dataIndex: "deviceLimit",
}, {
title: '{{ i18n "pages.inbounds.traffic" }}',
align: 'center',
@@ -1030,6 +1035,7 @@
remark: dbInbound.remark + " - Cloned",
enable: dbInbound.enable,
expiryTime: dbInbound.expiryTime,
+ deviceLimit: dbInbound.deviceLimit,
trafficReset: dbInbound.trafficReset,
lastTrafficResetTime: dbInbound.lastTrafficResetTime,
@@ -1076,6 +1082,7 @@
remark: dbInbound.remark,
enable: dbInbound.enable,
expiryTime: dbInbound.expiryTime,
+ deviceLimit: dbInbound.deviceLimit,
trafficReset: dbInbound.trafficReset,
lastTrafficResetTime: dbInbound.lastTrafficResetTime,
@@ -1101,6 +1108,7 @@
remark: dbInbound.remark,
enable: dbInbound.enable,
expiryTime: dbInbound.expiryTime,
+ deviceLimit: dbInbound.deviceLimit,
trafficReset: dbInbound.trafficReset,
lastTrafficResetTime: dbInbound.lastTrafficResetTime,
diff --git a/web/job/check_device_limit_job.go b/web/job/check_device_limit_job.go
new file mode 100644
index 00000000..b3ae68a2
--- /dev/null
+++ b/web/job/check_device_limit_job.go
@@ -0,0 +1,345 @@
+package job
+
+import (
+ "bufio"
+ "crypto/rand"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "io"
+ "os"
+ "regexp"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/mhsanaei/3x-ui/v2/database"
+ "github.com/mhsanaei/3x-ui/v2/database/model"
+ "github.com/mhsanaei/3x-ui/v2/logger"
+ "github.com/mhsanaei/3x-ui/v2/web/service"
+ "github.com/mhsanaei/3x-ui/v2/xray"
+)
+
+var (
+ activeClientIPs = make(map[string]map[string]time.Time)
+ activeClientsLock sync.RWMutex
+
+ clientStatus = make(map[string]bool)
+ clientStatusMu sync.RWMutex
+)
+
+type CheckDeviceLimitJob struct {
+ inboundService service.InboundService
+ xrayService *service.XrayService
+ xrayAPI xray.XrayAPI
+ lastPosition int64
+
+ violationStartTime map[string]time.Time
+ violationMu sync.Mutex
+ running atomic.Bool
+}
+
+type deviceInboundInfo struct {
+ Limit int
+ Tag string
+ Protocol model.Protocol
+ Enable bool
+}
+
+func NewCheckDeviceLimitJob(xrayService *service.XrayService) *CheckDeviceLimitJob {
+ return &CheckDeviceLimitJob{
+ xrayService: xrayService,
+ violationStartTime: make(map[string]time.Time),
+ }
+}
+
+func (j *CheckDeviceLimitJob) Run() {
+ // Avoid concurrent re-entrancy when previous run is still processing.
+ if !j.running.CompareAndSwap(false, true) {
+ return
+ }
+ defer j.running.Store(false)
+
+ if j.xrayService == nil || !j.xrayService.IsXrayRunning() {
+ return
+ }
+ j.cleanupExpiredIPs()
+ j.parseAccessLog()
+ j.checkAllClientsLimit()
+}
+
+func (j *CheckDeviceLimitJob) cleanupExpiredIPs() {
+ activeClientsLock.Lock()
+ defer activeClientsLock.Unlock()
+
+ now := time.Now()
+ const activeTTL = 3 * time.Minute
+ for email, ips := range activeClientIPs {
+ for ip, lastSeen := range ips {
+ if now.Sub(lastSeen) > activeTTL {
+ delete(activeClientIPs[email], ip)
+ }
+ }
+ if len(activeClientIPs[email]) == 0 {
+ delete(activeClientIPs, email)
+ }
+ }
+}
+
+func (j *CheckDeviceLimitJob) parseAccessLog() {
+ logPath, err := xray.GetAccessLogPath()
+ if err != nil || logPath == "none" || logPath == "" {
+ return
+ }
+
+ file, err := os.Open(logPath)
+ if err != nil {
+ return
+ }
+ defer file.Close()
+
+ if _, err = file.Seek(j.lastPosition, 0); err != nil {
+ return
+ }
+
+ scanner := bufio.NewScanner(file)
+ emailRegex := regexp.MustCompile(`email: ([^ ]+)`)
+ ipRegex := regexp.MustCompile(`from (?:tcp:|udp:)?\[?([0-9a-fA-F\.:]+)\]?:\d+ accepted`)
+
+ activeClientsLock.Lock()
+ defer activeClientsLock.Unlock()
+
+ now := time.Now()
+ for scanner.Scan() {
+ line := scanner.Text()
+ emailMatch := emailRegex.FindStringSubmatch(line)
+ ipMatch := ipRegex.FindStringSubmatch(line)
+ if len(emailMatch) <= 1 || len(ipMatch) <= 1 {
+ continue
+ }
+
+ email := emailMatch[1]
+ ip := ipMatch[1]
+ if ip == "127.0.0.1" || ip == "::1" {
+ continue
+ }
+
+ if _, ok := activeClientIPs[email]; !ok {
+ activeClientIPs[email] = make(map[string]time.Time)
+ }
+ activeClientIPs[email][ip] = now
+ }
+
+ currentPosition, err := file.Seek(0, io.SeekEnd)
+ if err != nil {
+ return
+ }
+ if currentPosition < j.lastPosition {
+ j.lastPosition = 0
+ return
+ }
+ j.lastPosition = currentPosition
+}
+
+func (j *CheckDeviceLimitJob) checkAllClientsLimit() {
+ db := database.GetDB()
+ var allInbounds []*model.Inbound
+ if err := db.Find(&allInbounds).Error; err != nil || len(allInbounds) == 0 {
+ return
+ }
+
+ apiPort := j.xrayService.GetAPIPort()
+ if apiPort == 0 {
+ return
+ }
+ if err := j.xrayAPI.Init(apiPort); err != nil {
+ return
+ }
+ defer j.xrayAPI.Close()
+
+ inboundInfoMap := make(map[int]deviceInboundInfo, len(allInbounds))
+ for _, inbound := range allInbounds {
+ inboundInfoMap[inbound.Id] = deviceInboundInfo{
+ Limit: inbound.DeviceLimit,
+ Tag: inbound.Tag,
+ Protocol: inbound.Protocol,
+ Enable: inbound.Enable,
+ }
+ }
+
+ activeCounts := make(map[string]int)
+ activeClientsLock.RLock()
+ for email, ips := range activeClientIPs {
+ activeCounts[email] = len(ips)
+ }
+ activeClientsLock.RUnlock()
+
+ bannedSnapshot := make(map[string]bool)
+ clientStatusMu.RLock()
+ for email, banned := range clientStatus {
+ if banned {
+ bannedSnapshot[email] = true
+ }
+ }
+ clientStatusMu.RUnlock()
+
+ for email, activeIPCount := range activeCounts {
+ traffic, err := j.inboundService.GetClientTrafficByEmail(email)
+ if err != nil || traffic == nil {
+ continue
+ }
+
+ info, ok := inboundInfoMap[traffic.InboundId]
+ if !ok {
+ continue
+ }
+
+ isBanned := j.isClientBanned(email)
+ enforcementEnabled := info.Enable && info.Limit > 0
+
+ // If this client was previously banned but device-limit is now disabled
+ // (or inbound is disabled), immediately restore access.
+ if isBanned && !enforcementEnabled {
+ j.violationMu.Lock()
+ delete(j.violationStartTime, email)
+ j.violationMu.Unlock()
+ j.unbanUser(email, activeIPCount, info)
+ continue
+ }
+
+ if !enforcementEnabled {
+ continue
+ }
+
+ if activeIPCount > info.Limit && !isBanned {
+ j.violationMu.Lock()
+ startTime, exists := j.violationStartTime[email]
+ if !exists {
+ j.violationStartTime[email] = time.Now()
+ j.violationMu.Unlock()
+ continue
+ }
+ if time.Since(startTime) < 3*time.Minute {
+ j.violationMu.Unlock()
+ continue
+ }
+ delete(j.violationStartTime, email)
+ j.violationMu.Unlock()
+
+ j.banUser(email, activeIPCount, info)
+ }
+
+ if activeIPCount <= info.Limit {
+ j.violationMu.Lock()
+ delete(j.violationStartTime, email)
+ j.violationMu.Unlock()
+
+ if isBanned {
+ j.unbanUser(email, activeIPCount, info)
+ }
+ }
+ }
+
+ for email := range bannedSnapshot {
+ if _, online := activeCounts[email]; online {
+ continue
+ }
+ traffic, err := j.inboundService.GetClientTrafficByEmail(email)
+ if err != nil || traffic == nil {
+ continue
+ }
+ info, ok := inboundInfoMap[traffic.InboundId]
+ if !ok {
+ // Inbound no longer exists; clear stale ban marker.
+ j.clearClientBanned(email)
+ j.violationMu.Lock()
+ delete(j.violationStartTime, email)
+ j.violationMu.Unlock()
+ continue
+ }
+ // Offline users should be restored when enforcement is disabled too.
+ if !info.Enable || info.Limit <= 0 {
+ j.unbanUser(email, 0, info)
+ continue
+ }
+ j.unbanUser(email, 0, info)
+ }
+}
+
+func (j *CheckDeviceLimitJob) banUser(email string, activeIPCount int, info deviceInboundInfo) {
+ _, client, err := j.inboundService.GetClientByEmail(email)
+ if err != nil || client == nil {
+ return
+ }
+
+ logger.Infof("[DeviceLimit] banning email=%s limit=%d current=%d", email, info.Limit, activeIPCount)
+ _ = j.xrayAPI.RemoveUser(info.Tag, email)
+ time.Sleep(5 * time.Second)
+
+ tempClient := *client
+ if tempClient.ID != "" {
+ tempClient.ID = randomUUID()
+ }
+ if tempClient.Password != "" {
+ tempClient.Password = randomUUID()
+ }
+
+ clientMap := map[string]any{}
+ clientJSON, _ := json.Marshal(tempClient)
+ _ = json.Unmarshal(clientJSON, &clientMap)
+
+ if err = j.xrayAPI.AddUser(string(info.Protocol), info.Tag, clientMap); err != nil {
+ logger.Warningf("[DeviceLimit] failed to ban user %s: %v", email, err)
+ return
+ }
+ j.setClientBanned(email, true)
+}
+
+func (j *CheckDeviceLimitJob) unbanUser(email string, activeIPCount int, info deviceInboundInfo) {
+ _, client, err := j.inboundService.GetClientByEmail(email)
+ if err != nil || client == nil {
+ return
+ }
+
+ logger.Infof("[DeviceLimit] unbanning email=%s limit=%d current=%d", email, info.Limit, activeIPCount)
+ _ = j.xrayAPI.RemoveUser(info.Tag, email)
+ time.Sleep(5 * time.Second)
+
+ clientMap := map[string]any{}
+ clientJSON, _ := json.Marshal(client)
+ _ = json.Unmarshal(clientJSON, &clientMap)
+
+ if err = j.xrayAPI.AddUser(string(info.Protocol), info.Tag, clientMap); err != nil {
+ logger.Warningf("[DeviceLimit] failed to restore user %s: %v", email, err)
+ return
+ }
+ j.clearClientBanned(email)
+}
+
+func randomUUID() string {
+ uuid := make([]byte, 16)
+ if _, err := rand.Read(uuid); err != nil {
+ return fmt.Sprintf("fallback-%d", time.Now().UnixNano())
+ }
+ uuid[6] = (uuid[6] & 0x0f) | 0x40
+ uuid[8] = (uuid[8] & 0x3f) | 0x80
+ return hex.EncodeToString(uuid[0:4]) + "-" + hex.EncodeToString(uuid[4:6]) + "-" + hex.EncodeToString(uuid[6:8]) + "-" + hex.EncodeToString(uuid[8:10]) + "-" + hex.EncodeToString(uuid[10:16])
+}
+
+func (j *CheckDeviceLimitJob) isClientBanned(email string) bool {
+ clientStatusMu.RLock()
+ defer clientStatusMu.RUnlock()
+ return clientStatus[email]
+}
+
+func (j *CheckDeviceLimitJob) setClientBanned(email string, banned bool) {
+ clientStatusMu.Lock()
+ clientStatus[email] = banned
+ clientStatusMu.Unlock()
+}
+
+func (j *CheckDeviceLimitJob) clearClientBanned(email string) {
+ clientStatusMu.Lock()
+ delete(clientStatus, email)
+ clientStatusMu.Unlock()
+}
diff --git a/web/service/inbound.go b/web/service/inbound.go
index 669827b4..e0daa47f 100644
--- a/web/service/inbound.go
+++ b/web/service/inbound.go
@@ -489,6 +489,7 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound,
oldInbound.Remark = inbound.Remark
oldInbound.Enable = inbound.Enable
oldInbound.ExpiryTime = inbound.ExpiryTime
+ oldInbound.DeviceLimit = inbound.DeviceLimit
oldInbound.TrafficReset = inbound.TrafficReset
oldInbound.Listen = inbound.Listen
oldInbound.Port = inbound.Port
diff --git a/web/service/xray.go b/web/service/xray.go
index 511ffdda..b9d4a74a 100644
--- a/web/service/xray.go
+++ b/web/service/xray.go
@@ -33,6 +33,14 @@ func (s *XrayService) IsXrayRunning() bool {
return p != nil && p.IsRunning()
}
+// GetAPIPort returns the currently configured Xray API port.
+func (s *XrayService) GetAPIPort() int {
+ if p == nil {
+ return 0
+ }
+ return p.GetAPIPort()
+}
+
// GetXrayErr returns the error from the Xray process, if any.
func (s *XrayService) GetXrayErr() error {
if p == nil {
diff --git a/web/translation/translate.en_US.toml b/web/translation/translate.en_US.toml
index 047ba4a7..4a21c4f6 100644
--- a/web/translation/translate.en_US.toml
+++ b/web/translation/translate.en_US.toml
@@ -215,6 +215,8 @@
"monitorDesc" = "Leave blank to listen on all IPs"
"meansNoLimit" = "= Unlimited. (unit: GB)"
"totalFlow" = "Total Flow"
+"deviceLimit" = "Device Limit"
+"deviceLimitDesc" = "Max active devices/IPs allowed for this inbound. (0 = unlimited)"
"leaveBlankToNeverExpire" = "Leave blank to never expire"
"noRecommendKeepDefault" = "It is recommended to keep the default"
"certificatePath" = "File Path"
diff --git a/web/translation/translate.zh_CN.toml b/web/translation/translate.zh_CN.toml
index 73d71b60..7efede28 100644
--- a/web/translation/translate.zh_CN.toml
+++ b/web/translation/translate.zh_CN.toml
@@ -215,6 +215,8 @@
"monitorDesc" = "留空表示监听所有 IP"
"meansNoLimit" = "= 无限制(单位:GB)"
"totalFlow" = "总流量"
+"deviceLimit" = "设备限制"
+"deviceLimitDesc" = "限制该入站可同时在线的设备/IP 数量(0 = 不限制)"
"leaveBlankToNeverExpire" = "留空表示永不过期"
"noRecommendKeepDefault" = "建议保留默认值"
"certificatePath" = "文件路径"
diff --git a/web/web.go b/web/web.go
index 60934048..44c5601c 100644
--- a/web/web.go
+++ b/web/web.go
@@ -320,6 +320,8 @@ func (s *Server) startTask() {
// check client ips from log file every 10 sec
s.cron.AddJob("@every 10s", job.NewCheckClientIpJob())
+ // check active device limits every 10 sec
+ s.cron.AddJob("@every 10s", job.NewCheckDeviceLimitJob(&s.xrayService))
// check client ips from log file every day
s.cron.AddJob("@daily", job.NewClearLogsJob())