feat(limit): add inbound device-limit enforcement with safe unban flow

- add inbound deviceLimit model/frontend fields and translations

- add CheckDeviceLimitJob with observation window and xray API ban/unban

- prevent job re-entrancy and restore users when limit is disabled

- reduce lock scope via snapshots to avoid blocking log parsing
This commit is contained in:
Sora39831 2026-04-06 10:46:48 +08:00
parent 4a1dac89b6
commit 83b61d9da4
10 changed files with 386 additions and 2 deletions

View file

@ -42,6 +42,7 @@ type Inbound struct {
Remark string `json:"remark" form:"remark"` // Human-readable remark
Enable bool `json:"enable" form:"enable" gorm:"index:idx_enable_traffic_reset,priority:1"` // Whether the inbound is enabled
ExpiryTime int64 `json:"expiryTime" form:"expiryTime"` // Expiration timestamp
DeviceLimit int `json:"deviceLimit" form:"deviceLimit" gorm:"column:device_limit;default:0"` // Active device/IP limit by inbound
TrafficReset string `json:"trafficReset" form:"trafficReset" gorm:"default:never;index:idx_enable_traffic_reset,priority:2"` // Traffic reset schedule
LastTrafficResetTime int64 `json:"lastTrafficResetTime" form:"lastTrafficResetTime" gorm:"default:0"` // Last traffic reset timestamp
ClientStats []xray.ClientTraffic `gorm:"foreignKey:InboundId;references:Id" json:"clientStats" form:"clientStats"` // Client traffic statistics

View file

@ -10,6 +10,7 @@ class DBInbound {
this.remark = "";
this.enable = true;
this.expiryTime = 0;
this.deviceLimit = 0;
this.trafficReset = "never";
this.lastTrafficResetTime = 0;
@ -148,4 +149,4 @@ class DBInbound {
const inbound = this.toInbound();
return inbound.genInboundLinks(this.remark, remarkModel);
}
}
}

View file

@ -49,6 +49,20 @@
:min="0"></a-input-number>
</a-form-item>
<a-form-item>
<template slot="label">
<a-tooltip>
<template slot="title">
<span>{{ i18n "pages.inbounds.deviceLimitDesc" }}</span>
</template>
{{ i18n "pages.inbounds.deviceLimit" }}
<a-icon type="question-circle"></a-icon>
</a-tooltip>
</template>
<a-input-number v-model.number="dbInbound.deviceLimit"
:min="0"></a-input-number>
</a-form-item>
<a-form-item>
<template slot="label">
<a-tooltip>
@ -170,4 +184,4 @@
</a-collapse-panel>
</a-collapse>
{{end}}
{{end}}

View file

@ -645,6 +645,11 @@
align: 'left',
width: 50,
scopedSlots: { customRender: 'clients' },
}, {
title: '{{ i18n "pages.inbounds.deviceLimit" }}',
align: 'center',
width: 45,
dataIndex: "deviceLimit",
}, {
title: '{{ i18n "pages.inbounds.traffic" }}',
align: 'center',
@ -1030,6 +1035,7 @@
remark: dbInbound.remark + " - Cloned",
enable: dbInbound.enable,
expiryTime: dbInbound.expiryTime,
deviceLimit: dbInbound.deviceLimit,
trafficReset: dbInbound.trafficReset,
lastTrafficResetTime: dbInbound.lastTrafficResetTime,
@ -1076,6 +1082,7 @@
remark: dbInbound.remark,
enable: dbInbound.enable,
expiryTime: dbInbound.expiryTime,
deviceLimit: dbInbound.deviceLimit,
trafficReset: dbInbound.trafficReset,
lastTrafficResetTime: dbInbound.lastTrafficResetTime,
@ -1101,6 +1108,7 @@
remark: dbInbound.remark,
enable: dbInbound.enable,
expiryTime: dbInbound.expiryTime,
deviceLimit: dbInbound.deviceLimit,
trafficReset: dbInbound.trafficReset,
lastTrafficResetTime: dbInbound.lastTrafficResetTime,

View file

@ -0,0 +1,345 @@
package job
import (
"bufio"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"os"
"regexp"
"sync"
"sync/atomic"
"time"
"github.com/mhsanaei/3x-ui/v2/database"
"github.com/mhsanaei/3x-ui/v2/database/model"
"github.com/mhsanaei/3x-ui/v2/logger"
"github.com/mhsanaei/3x-ui/v2/web/service"
"github.com/mhsanaei/3x-ui/v2/xray"
)
var (
activeClientIPs = make(map[string]map[string]time.Time)
activeClientsLock sync.RWMutex
clientStatus = make(map[string]bool)
clientStatusMu sync.RWMutex
)
type CheckDeviceLimitJob struct {
inboundService service.InboundService
xrayService *service.XrayService
xrayAPI xray.XrayAPI
lastPosition int64
violationStartTime map[string]time.Time
violationMu sync.Mutex
running atomic.Bool
}
type deviceInboundInfo struct {
Limit int
Tag string
Protocol model.Protocol
Enable bool
}
func NewCheckDeviceLimitJob(xrayService *service.XrayService) *CheckDeviceLimitJob {
return &CheckDeviceLimitJob{
xrayService: xrayService,
violationStartTime: make(map[string]time.Time),
}
}
func (j *CheckDeviceLimitJob) Run() {
// Avoid concurrent re-entrancy when previous run is still processing.
if !j.running.CompareAndSwap(false, true) {
return
}
defer j.running.Store(false)
if j.xrayService == nil || !j.xrayService.IsXrayRunning() {
return
}
j.cleanupExpiredIPs()
j.parseAccessLog()
j.checkAllClientsLimit()
}
func (j *CheckDeviceLimitJob) cleanupExpiredIPs() {
activeClientsLock.Lock()
defer activeClientsLock.Unlock()
now := time.Now()
const activeTTL = 3 * time.Minute
for email, ips := range activeClientIPs {
for ip, lastSeen := range ips {
if now.Sub(lastSeen) > activeTTL {
delete(activeClientIPs[email], ip)
}
}
if len(activeClientIPs[email]) == 0 {
delete(activeClientIPs, email)
}
}
}
func (j *CheckDeviceLimitJob) parseAccessLog() {
logPath, err := xray.GetAccessLogPath()
if err != nil || logPath == "none" || logPath == "" {
return
}
file, err := os.Open(logPath)
if err != nil {
return
}
defer file.Close()
if _, err = file.Seek(j.lastPosition, 0); err != nil {
return
}
scanner := bufio.NewScanner(file)
emailRegex := regexp.MustCompile(`email: ([^ ]+)`)
ipRegex := regexp.MustCompile(`from (?:tcp:|udp:)?\[?([0-9a-fA-F\.:]+)\]?:\d+ accepted`)
activeClientsLock.Lock()
defer activeClientsLock.Unlock()
now := time.Now()
for scanner.Scan() {
line := scanner.Text()
emailMatch := emailRegex.FindStringSubmatch(line)
ipMatch := ipRegex.FindStringSubmatch(line)
if len(emailMatch) <= 1 || len(ipMatch) <= 1 {
continue
}
email := emailMatch[1]
ip := ipMatch[1]
if ip == "127.0.0.1" || ip == "::1" {
continue
}
if _, ok := activeClientIPs[email]; !ok {
activeClientIPs[email] = make(map[string]time.Time)
}
activeClientIPs[email][ip] = now
}
currentPosition, err := file.Seek(0, io.SeekEnd)
if err != nil {
return
}
if currentPosition < j.lastPosition {
j.lastPosition = 0
return
}
j.lastPosition = currentPosition
}
func (j *CheckDeviceLimitJob) checkAllClientsLimit() {
db := database.GetDB()
var allInbounds []*model.Inbound
if err := db.Find(&allInbounds).Error; err != nil || len(allInbounds) == 0 {
return
}
apiPort := j.xrayService.GetAPIPort()
if apiPort == 0 {
return
}
if err := j.xrayAPI.Init(apiPort); err != nil {
return
}
defer j.xrayAPI.Close()
inboundInfoMap := make(map[int]deviceInboundInfo, len(allInbounds))
for _, inbound := range allInbounds {
inboundInfoMap[inbound.Id] = deviceInboundInfo{
Limit: inbound.DeviceLimit,
Tag: inbound.Tag,
Protocol: inbound.Protocol,
Enable: inbound.Enable,
}
}
activeCounts := make(map[string]int)
activeClientsLock.RLock()
for email, ips := range activeClientIPs {
activeCounts[email] = len(ips)
}
activeClientsLock.RUnlock()
bannedSnapshot := make(map[string]bool)
clientStatusMu.RLock()
for email, banned := range clientStatus {
if banned {
bannedSnapshot[email] = true
}
}
clientStatusMu.RUnlock()
for email, activeIPCount := range activeCounts {
traffic, err := j.inboundService.GetClientTrafficByEmail(email)
if err != nil || traffic == nil {
continue
}
info, ok := inboundInfoMap[traffic.InboundId]
if !ok {
continue
}
isBanned := j.isClientBanned(email)
enforcementEnabled := info.Enable && info.Limit > 0
// If this client was previously banned but device-limit is now disabled
// (or inbound is disabled), immediately restore access.
if isBanned && !enforcementEnabled {
j.violationMu.Lock()
delete(j.violationStartTime, email)
j.violationMu.Unlock()
j.unbanUser(email, activeIPCount, info)
continue
}
if !enforcementEnabled {
continue
}
if activeIPCount > info.Limit && !isBanned {
j.violationMu.Lock()
startTime, exists := j.violationStartTime[email]
if !exists {
j.violationStartTime[email] = time.Now()
j.violationMu.Unlock()
continue
}
if time.Since(startTime) < 3*time.Minute {
j.violationMu.Unlock()
continue
}
delete(j.violationStartTime, email)
j.violationMu.Unlock()
j.banUser(email, activeIPCount, info)
}
if activeIPCount <= info.Limit {
j.violationMu.Lock()
delete(j.violationStartTime, email)
j.violationMu.Unlock()
if isBanned {
j.unbanUser(email, activeIPCount, info)
}
}
}
for email := range bannedSnapshot {
if _, online := activeCounts[email]; online {
continue
}
traffic, err := j.inboundService.GetClientTrafficByEmail(email)
if err != nil || traffic == nil {
continue
}
info, ok := inboundInfoMap[traffic.InboundId]
if !ok {
// Inbound no longer exists; clear stale ban marker.
j.clearClientBanned(email)
j.violationMu.Lock()
delete(j.violationStartTime, email)
j.violationMu.Unlock()
continue
}
// Offline users should be restored when enforcement is disabled too.
if !info.Enable || info.Limit <= 0 {
j.unbanUser(email, 0, info)
continue
}
j.unbanUser(email, 0, info)
}
}
func (j *CheckDeviceLimitJob) banUser(email string, activeIPCount int, info deviceInboundInfo) {
_, client, err := j.inboundService.GetClientByEmail(email)
if err != nil || client == nil {
return
}
logger.Infof("[DeviceLimit] banning email=%s limit=%d current=%d", email, info.Limit, activeIPCount)
_ = j.xrayAPI.RemoveUser(info.Tag, email)
time.Sleep(5 * time.Second)
tempClient := *client
if tempClient.ID != "" {
tempClient.ID = randomUUID()
}
if tempClient.Password != "" {
tempClient.Password = randomUUID()
}
clientMap := map[string]any{}
clientJSON, _ := json.Marshal(tempClient)
_ = json.Unmarshal(clientJSON, &clientMap)
if err = j.xrayAPI.AddUser(string(info.Protocol), info.Tag, clientMap); err != nil {
logger.Warningf("[DeviceLimit] failed to ban user %s: %v", email, err)
return
}
j.setClientBanned(email, true)
}
func (j *CheckDeviceLimitJob) unbanUser(email string, activeIPCount int, info deviceInboundInfo) {
_, client, err := j.inboundService.GetClientByEmail(email)
if err != nil || client == nil {
return
}
logger.Infof("[DeviceLimit] unbanning email=%s limit=%d current=%d", email, info.Limit, activeIPCount)
_ = j.xrayAPI.RemoveUser(info.Tag, email)
time.Sleep(5 * time.Second)
clientMap := map[string]any{}
clientJSON, _ := json.Marshal(client)
_ = json.Unmarshal(clientJSON, &clientMap)
if err = j.xrayAPI.AddUser(string(info.Protocol), info.Tag, clientMap); err != nil {
logger.Warningf("[DeviceLimit] failed to restore user %s: %v", email, err)
return
}
j.clearClientBanned(email)
}
func randomUUID() string {
uuid := make([]byte, 16)
if _, err := rand.Read(uuid); err != nil {
return fmt.Sprintf("fallback-%d", time.Now().UnixNano())
}
uuid[6] = (uuid[6] & 0x0f) | 0x40
uuid[8] = (uuid[8] & 0x3f) | 0x80
return hex.EncodeToString(uuid[0:4]) + "-" + hex.EncodeToString(uuid[4:6]) + "-" + hex.EncodeToString(uuid[6:8]) + "-" + hex.EncodeToString(uuid[8:10]) + "-" + hex.EncodeToString(uuid[10:16])
}
func (j *CheckDeviceLimitJob) isClientBanned(email string) bool {
clientStatusMu.RLock()
defer clientStatusMu.RUnlock()
return clientStatus[email]
}
func (j *CheckDeviceLimitJob) setClientBanned(email string, banned bool) {
clientStatusMu.Lock()
clientStatus[email] = banned
clientStatusMu.Unlock()
}
func (j *CheckDeviceLimitJob) clearClientBanned(email string) {
clientStatusMu.Lock()
delete(clientStatus, email)
clientStatusMu.Unlock()
}

View file

@ -489,6 +489,7 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound,
oldInbound.Remark = inbound.Remark
oldInbound.Enable = inbound.Enable
oldInbound.ExpiryTime = inbound.ExpiryTime
oldInbound.DeviceLimit = inbound.DeviceLimit
oldInbound.TrafficReset = inbound.TrafficReset
oldInbound.Listen = inbound.Listen
oldInbound.Port = inbound.Port

View file

@ -33,6 +33,14 @@ func (s *XrayService) IsXrayRunning() bool {
return p != nil && p.IsRunning()
}
// GetAPIPort returns the currently configured Xray API port.
func (s *XrayService) GetAPIPort() int {
if p == nil {
return 0
}
return p.GetAPIPort()
}
// GetXrayErr returns the error from the Xray process, if any.
func (s *XrayService) GetXrayErr() error {
if p == nil {

View file

@ -215,6 +215,8 @@
"monitorDesc" = "Leave blank to listen on all IPs"
"meansNoLimit" = "= Unlimited. (unit: GB)"
"totalFlow" = "Total Flow"
"deviceLimit" = "Device Limit"
"deviceLimitDesc" = "Max active devices/IPs allowed for this inbound. (0 = unlimited)"
"leaveBlankToNeverExpire" = "Leave blank to never expire"
"noRecommendKeepDefault" = "It is recommended to keep the default"
"certificatePath" = "File Path"

View file

@ -215,6 +215,8 @@
"monitorDesc" = "留空表示监听所有 IP"
"meansNoLimit" = "= 无限制单位GB)"
"totalFlow" = "总流量"
"deviceLimit" = "设备限制"
"deviceLimitDesc" = "限制该入站可同时在线的设备/IP 数量0 = 不限制)"
"leaveBlankToNeverExpire" = "留空表示永不过期"
"noRecommendKeepDefault" = "建议保留默认值"
"certificatePath" = "文件路径"

View file

@ -320,6 +320,8 @@ func (s *Server) startTask() {
// check client ips from log file every 10 sec
s.cron.AddJob("@every 10s", job.NewCheckClientIpJob())
// check active device limits every 10 sec
s.cron.AddJob("@every 10s", job.NewCheckDeviceLimitJob(&s.xrayService))
// check client ips from log file every day
s.cron.AddJob("@daily", job.NewClearLogsJob())