// Package job contains scheduled background jobs for the 3x-ui panel.
package job
import (
	"bufio"
	"encoding/json"
	"errors"
	"io"
	"log"
	"os"
	"os/exec"
	"regexp"
	"runtime"
	"sort"
	"time"

	"github.com/mhsanaei/3x-ui/v2/database"
	"github.com/mhsanaei/3x-ui/v2/database/model"
	"github.com/mhsanaei/3x-ui/v2/logger"
	"github.com/mhsanaei/3x-ui/v2/xray"
)
// IPWithTimestamp tracks an IP address with its last seen timestamp.
// It is the JSON element type persisted in model.InboundClientIps.Ips.
type IPWithTimestamp struct {
	IP        string `json:"ip"`        // client IP as extracted from the access log
	Timestamp int64  `json:"timestamp"` // unix seconds when the IP was last observed
}
// CheckClientIpJob monitors client IP addresses from access logs and manages IP blocking based on configured limits.
type CheckClientIpJob struct {
	lastClear     int64    // unix time of the last access-log rotation
	disAllowedIps []string // IPs queued for fail2ban during the most recent run
}
// job is the package-level singleton instance set by NewCheckClientIpJob.
var job *CheckClientIpJob

// defaultXrayAPIPort is the fallback API inbound port used when neither
// the running config nor the stored template reveals one.
const defaultXrayAPIPort = 62789
// ipStaleAfterSeconds controls how long a client IP kept in the
// per-client tracking table (model.InboundClientIps.Ips) is considered
// still "active" before it's evicted during the next scan.
//
// Without this eviction, an IP that connected once and then went away
// keeps sitting in the table with its old timestamp. Because the
// excess-IP selector sorts ascending ("oldest wins, newest loses") to
// protect the original/current connections, that stale entry keeps
// occupying a slot and the IP that is *actually* currently using the
// config gets classified as "new excess" and banned by fail2ban on
// every single run — producing the continuous ban loop from #4077.
//
// 30 minutes is chosen so an actively-streaming client (where xray
// emits a fresh `accepted` log line whenever it opens a new TCP
// connection) will always refresh its timestamp well within the window,
// but a client that has really stopped using the config will drop out
// of the table in a bounded time and free its slot.
const ipStaleAfterSeconds = int64(30 * 60)
// NewCheckClientIpJob creates a new client IP monitoring job instance.
2023-02-28 19:54:29 +00:00
func NewCheckClientIpJob() *CheckClientIpJob {
job = new(CheckClientIpJob)
2023-02-28 19:54:29 +00:00
return job
}
func (j *CheckClientIpJob) Run() {
if j.lastClear == 0 {
j.lastClear = time.Now().Unix()
}
shouldClearAccessLog := false
iplimitActive := j.hasLimitIp()
2024-10-28 19:13:42 +00:00
f2bInstalled := j.checkFail2BanInstalled()
isAccessLogAvailable := j.checkAccessLogAvailable(iplimitActive)
2025-09-11 09:05:06 +00:00
if isAccessLogAvailable {
2025-09-10 19:12:37 +00:00
if runtime.GOOS == "windows" {
2025-09-11 09:05:06 +00:00
if iplimitActive {
2025-09-10 19:12:37 +00:00
shouldClearAccessLog = j.processLogFile()
}
2024-10-28 19:13:42 +00:00
} else {
2025-09-11 09:05:06 +00:00
if iplimitActive {
if f2bInstalled {
shouldClearAccessLog = j.processLogFile()
} else {
if !f2bInstalled {
logger.Warning("[LimitIP] Fail2Ban is not installed, Please install Fail2Ban from the x-ui bash menu.")
}
2025-09-10 19:12:37 +00:00
}
2024-10-28 19:13:42 +00:00
}
}
}
if shouldClearAccessLog || (isAccessLogAvailable && time.Now().Unix()-j.lastClear > 3600) {
j.clearAccessLog()
}
}
func (j *CheckClientIpJob) clearAccessLog() {
logAccessP, err := os.OpenFile(xray.GetAccessPersistentLogPath(), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
j.checkError(err)
2025-08-17 11:37:49 +00:00
defer logAccessP.Close()
accessLogPath, err := xray.GetAccessLogPath()
j.checkError(err)
file, err := os.Open(accessLogPath)
j.checkError(err)
2025-08-17 11:37:49 +00:00
defer file.Close()
_, err = io.Copy(logAccessP, file)
j.checkError(err)
err = os.Truncate(accessLogPath, 0)
j.checkError(err)
2025-08-17 11:37:49 +00:00
j.lastClear = time.Now().Unix()
2023-02-28 19:54:29 +00:00
}
// hasLimitIp reports whether any client on any inbound has LimitIP > 0,
// i.e. whether IP limiting is enabled anywhere at all.
func (j *CheckClientIpJob) hasLimitIp() bool {
	db := database.GetDB()

	var inbounds []*model.Inbound
	if err := db.Model(model.Inbound{}).Find(&inbounds).Error; err != nil {
		return false
	}

	for _, inbound := range inbounds {
		if inbound.Settings == "" {
			continue
		}

		settings := map[string][]model.Client{}
		// Skip inbounds whose settings blob is not valid JSON instead of
		// silently iterating a nil client list (the original ignored
		// this error entirely).
		if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
			continue
		}

		for _, client := range settings["clients"] {
			if client.LimitIP > 0 {
				return true
			}
		}
	}

	return false
}
func (j *CheckClientIpJob) processLogFile() bool {
ipRegex := regexp.MustCompile(`from (?:tcp:|udp:)?\[?([0-9a-fA-F\.:]+)\]?:\d+ accepted`)
2024-10-28 19:13:42 +00:00
emailRegex := regexp.MustCompile(`email: (.+)$`)
timestampRegex := regexp.MustCompile(`^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})`)
2024-10-28 19:13:42 +00:00
accessLogPath, _ := xray.GetAccessLogPath()
file, _ := os.Open(accessLogPath)
defer file.Close()
// Track IPs with their last seen timestamp
inboundClientIps := make(map[string]map[string]int64, 100)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
2023-02-28 19:54:29 +00:00
2024-10-28 19:13:42 +00:00
ipMatches := ipRegex.FindStringSubmatch(line)
if len(ipMatches) < 2 {
continue
}
2023-02-28 19:54:29 +00:00
2024-10-28 19:13:42 +00:00
ip := ipMatches[1]
2023-02-28 19:54:29 +00:00
2024-10-28 19:13:42 +00:00
if ip == "127.0.0.1" || ip == "::1" {
continue
}
2023-04-13 19:37:13 +00:00
2024-10-28 19:13:42 +00:00
emailMatches := emailRegex.FindStringSubmatch(line)
if len(emailMatches) < 2 {
continue
2023-02-28 19:54:29 +00:00
}
2024-10-28 19:13:42 +00:00
email := emailMatches[1]
// Extract timestamp from log line
var timestamp int64
timestampMatches := timestampRegex.FindStringSubmatch(line)
if len(timestampMatches) >= 2 {
t, err := time.Parse("2006/01/02 15:04:05", timestampMatches[1])
if err == nil {
timestamp = t.Unix()
} else {
timestamp = time.Now().Unix()
}
} else {
timestamp = time.Now().Unix()
}
2024-10-28 19:13:42 +00:00
if _, exists := inboundClientIps[email]; !exists {
inboundClientIps[email] = make(map[string]int64)
}
// Update timestamp - keep the latest
if existingTime, ok := inboundClientIps[email][ip]; !ok || timestamp > existingTime {
inboundClientIps[email][ip] = timestamp
2024-10-28 19:13:42 +00:00
}
}
shouldCleanLog := false
for email, ipTimestamps := range inboundClientIps {
2023-02-28 19:54:29 +00:00
// Convert to IPWithTimestamp slice
ipsWithTime := make([]IPWithTimestamp, 0, len(ipTimestamps))
for ip, timestamp := range ipTimestamps {
ipsWithTime = append(ipsWithTime, IPWithTimestamp{IP: ip, Timestamp: timestamp})
2024-10-28 19:13:42 +00:00
}
2024-12-16 13:26:47 +00:00
clientIpsRecord, err := j.getInboundClientIps(email)
2023-04-13 19:37:13 +00:00
if err != nil {
j.addInboundClientIps(email, ipsWithTime)
2024-10-28 19:13:42 +00:00
continue
2023-02-28 19:54:29 +00:00
}
2024-10-28 19:13:42 +00:00
shouldCleanLog = j.updateInboundClientIps(clientIpsRecord, email, ipsWithTime) || shouldCleanLog
2023-04-13 19:37:13 +00:00
}
return shouldCleanLog
2023-02-28 19:54:29 +00:00
}
// mergeClientIps combines the persisted and freshly observed
// IP-with-timestamp lists for a single client into a map keyed by IP.
// An entry is dropped if its last-seen timestamp is older than
// staleCutoff; when the same IP appears in both lists, the newer
// timestamp wins.
//
// Extracted as a helper so updateInboundClientIps can stay DB-oriented
// and the merge policy can be exercised by a unit test.
// (Parameters renamed from old/new: `new` shadowed the builtin.)
func mergeClientIps(persisted, observed []IPWithTimestamp, staleCutoff int64) map[string]int64 {
	ipMap := make(map[string]int64, len(persisted)+len(observed))
	for _, ipTime := range persisted {
		if ipTime.Timestamp < staleCutoff {
			continue
		}
		ipMap[ipTime.IP] = ipTime.Timestamp
	}
	for _, ipTime := range observed {
		if ipTime.Timestamp < staleCutoff {
			continue
		}
		if existingTime, ok := ipMap[ipTime.IP]; !ok || ipTime.Timestamp > existingTime {
			ipMap[ipTime.IP] = ipTime.Timestamp
		}
	}
	return ipMap
}
func (j *CheckClientIpJob) checkFail2BanInstalled() bool {
cmd := "fail2ban-client"
args := []string{"-h"}
err := exec.Command(cmd, args...).Run()
2024-10-28 19:13:42 +00:00
return err == nil
}
func (j *CheckClientIpJob) checkAccessLogAvailable(iplimitActive bool) bool {
accessLogPath, err := xray.GetAccessLogPath()
if err != nil {
return false
}
if accessLogPath == "none" || accessLogPath == "" {
if iplimitActive {
2024-09-24 11:24:10 +00:00
logger.Warning("[LimitIP] Access log path is not set, Please configure the access log path in Xray configs.")
}
return false
}
return true
}
func (j *CheckClientIpJob) checkError(e error) {
2023-04-13 19:37:13 +00:00
if e != nil {
2023-02-28 19:54:29 +00:00
logger.Warning("client ip job err:", e)
}
}
func (j *CheckClientIpJob) getInboundClientIps(clientEmail string) (*model.InboundClientIps, error) {
2023-02-28 19:54:29 +00:00
db := database.GetDB()
InboundClientIps := &model.InboundClientIps{}
err := db.Model(model.InboundClientIps{}).Where("client_email = ?", clientEmail).First(InboundClientIps).Error
if err != nil {
return nil, err
}
return InboundClientIps, nil
}
func (j *CheckClientIpJob) addInboundClientIps(clientEmail string, ipsWithTime []IPWithTimestamp) error {
2023-02-28 19:54:29 +00:00
inboundClientIps := &model.InboundClientIps{}
jsonIps, err := json.Marshal(ipsWithTime)
j.checkError(err)
2023-02-28 19:54:29 +00:00
inboundClientIps.ClientEmail = clientEmail
inboundClientIps.Ips = string(jsonIps)
db := database.GetDB()
tx := db.Begin()
defer func() {
if err == nil {
tx.Commit()
} else {
tx.Rollback()
2023-02-28 19:54:29 +00:00
}
}()
err = tx.Save(inboundClientIps).Error
if err != nil {
return err
}
return nil
}
func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.InboundClientIps, clientEmail string, newIpsWithTime []IPWithTimestamp) bool {
// Get the inbound configuration
inbound, err := j.getInboundByEmail(clientEmail)
2024-07-08 21:08:00 +00:00
if err != nil {
logger.Errorf("failed to fetch inbound settings for email %s: %s", clientEmail, err)
return false
}
2023-02-28 19:54:29 +00:00
if inbound.Settings == "" {
2024-07-08 21:08:00 +00:00
logger.Debug("wrong data:", inbound)
return false
2023-02-28 19:54:29 +00:00
}
settings := map[string][]model.Client{}
json.Unmarshal([]byte(inbound.Settings), &settings)
clients := settings["clients"]
// Find the client's IP limit
var limitIp int
var clientFound bool
for _, client := range clients {
if client.Email == clientEmail {
limitIp = client.LimitIP
clientFound = true
break
}
}
if !clientFound || limitIp <= 0 || !inbound.Enable {
// No limit or inbound disabled, just update and return
jsonIps, _ := json.Marshal(newIpsWithTime)
inboundClientIps.Ips = string(jsonIps)
db := database.GetDB()
db.Save(inboundClientIps)
return false
}
// Parse old IPs from database
var oldIpsWithTime []IPWithTimestamp
if inboundClientIps.Ips != "" {
json.Unmarshal([]byte(inboundClientIps.Ips), &oldIpsWithTime)
}
Fix IP Limit continuous ban loop from stale DB entries (#4077) After 60abeaa flipped the excess-IP selector to "oldest wins, newest loses" (to protect the original/current connections), the per-client IP table in `inbound_client_ips.ips` never evicted IPs that stopped connecting. Their stored timestamp stayed ancient, so on every subsequent run they counted as the "oldest protected" slot(s) and whichever IP was actually using the config now was classified as "new excess" and re-banned via fail2ban. This is exactly the #4077 scenario: two IPs connect once and get recorded, the ban lifts after the configured duration, the lone legitimate IP that reconnects gets banned again, and again, and again — a permanent 3xipl.log loop with no real abuser anywhere. Fix: when merging the persisted `old` list with the freshly observed `new` log lines, drop entries whose last-seen timestamp is older than `ipStaleAfterSeconds` (30 minutes). A client that's actually still active refreshes its timestamp any time xray emits a new `accepted` line for a fresh TCP, so the cutoff is far above even idle streaming sessions; a client that's genuinely gone falls out of the table in bounded time and frees its slot. Extracted the merge into `mergeClientIps` so it can be exercised by unit tests without spinning up the full DB-backed job. Tests cover: - stale old entry is dropped (the #4077 regression) - fresh old entries are still carried forward (access-log rotation is still backed by the persisted table) - newer timestamp wins when the same IP appears in both lists - a clock-skewed old `new` entry can't resurrect a stale IP - a zero cutoff never over-evicts Closes #4077
2026-04-22 13:53:32 +00:00
// Merge old and new IPs, evicting entries that haven't been
// re-observed in a while. See mergeClientIps / #4077 for why.
ipMap := mergeClientIps(oldIpsWithTime, newIpsWithTime, time.Now().Unix()-ipStaleAfterSeconds)
// Convert back to slice and sort by timestamp (oldest first)
// This ensures we always protect the original/current connections and ban new excess ones.
allIps := make([]IPWithTimestamp, 0, len(ipMap))
for ip, timestamp := range ipMap {
allIps = append(allIps, IPWithTimestamp{IP: ip, Timestamp: timestamp})
}
sort.Slice(allIps, func(i, j int) bool {
return allIps[i].Timestamp < allIps[j].Timestamp // Ascending order (oldest first)
})
shouldCleanLog := false
j.disAllowedIps = []string{}
2023-02-28 19:54:29 +00:00
// Open log file
2024-07-08 21:08:00 +00:00
logIpFile, err := os.OpenFile(xray.GetIPLimitLogPath(), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
2024-07-08 21:08:00 +00:00
logger.Errorf("failed to open IP limit log file: %s", err)
return false
}
defer logIpFile.Close()
log.SetOutput(logIpFile)
log.SetFlags(log.LstdFlags)
// Check if we exceed the limit
if len(allIps) > limitIp {
shouldCleanLog = true
// Keep the oldest IPs (currently active connections) and ban the new excess ones.
keptIps := allIps[:limitIp]
bannedIps := allIps[limitIp:]
// Log banned IPs in the format fail2ban filters expect: [LIMIT_IP] Email = X || Disconnecting OLD IP = Y || Timestamp = Z
for _, ipTime := range bannedIps {
j.disAllowedIps = append(j.disAllowedIps, ipTime.IP)
log.Printf("[LIMIT_IP] Email = %s || Disconnecting OLD IP = %s || Timestamp = %d", clientEmail, ipTime.IP, ipTime.Timestamp)
2023-02-28 19:54:29 +00:00
}
// Actually disconnect banned IPs by temporarily removing and re-adding user
// This forces Xray to drop existing connections from banned IPs
if len(bannedIps) > 0 {
j.disconnectClientTemporarily(inbound, clientEmail, clients)
}
// Update database with only the currently active (kept) IPs
jsonIps, _ := json.Marshal(keptIps)
inboundClientIps.Ips = string(jsonIps)
} else {
// Under limit, save all IPs
jsonIps, _ := json.Marshal(allIps)
inboundClientIps.Ips = string(jsonIps)
}
2023-02-28 19:54:29 +00:00
db := database.GetDB()
err = db.Save(inboundClientIps).Error
2024-07-08 21:08:00 +00:00
if err != nil {
logger.Error("failed to save inboundClientIps:", err)
return false
}
if len(j.disAllowedIps) > 0 {
logger.Infof("[LIMIT_IP] Client %s: Kept %d current IPs, queued %d new IPs for fail2ban", clientEmail, limitIp, len(j.disAllowedIps))
}
return shouldCleanLog
2023-02-28 19:54:29 +00:00
}
// disconnectClientTemporarily removes and re-adds a client via the Xray
// API to force-drop existing connections from banned IPs. Best-effort:
// every failure is logged and aborts the attempt without side effects
// beyond what already happened.
func (j *CheckClientIpJob) disconnectClientTemporarily(inbound *model.Inbound, clientEmail string, clients []model.Client) {
	var xrayAPI xray.XrayAPI
	apiPort := j.resolveXrayAPIPort()
	if err := xrayAPI.Init(apiPort); err != nil {
		logger.Warningf("[LIMIT_IP] Failed to init Xray API for disconnection: %v", err)
		return
	}
	defer xrayAPI.Close()

	// Locate the client's config and convert it to the map shape the
	// API expects. (The original discarded both marshal errors.)
	var clientConfig map[string]any
	for _, client := range clients {
		if client.Email == clientEmail {
			clientBytes, err := json.Marshal(client)
			if err != nil {
				logger.Warningf("[LIMIT_IP] Failed to marshal client %s: %v", clientEmail, err)
				return
			}
			if err := json.Unmarshal(clientBytes, &clientConfig); err != nil {
				logger.Warningf("[LIMIT_IP] Failed to build client config for %s: %v", clientEmail, err)
				return
			}
			break
		}
	}
	if clientConfig == nil {
		return
	}

	// Only perform remove/re-add for protocols supported by XrayAPI.AddUser
	protocol := string(inbound.Protocol)
	switch protocol {
	case "vmess", "vless", "trojan", "shadowsocks":
		// supported protocols, continue
	default:
		logger.Warningf("[LIMIT_IP] Temporary disconnect is not supported for protocol %s on inbound %s", protocol, inbound.Tag)
		return
	}

	// For Shadowsocks, ensure the required "cipher" field is present by
	// reading it from the inbound settings (e.g., settings["method"]).
	// (Reuses the protocol variable instead of re-converting inbound.Protocol.)
	if protocol == "shadowsocks" {
		var inboundSettings map[string]any
		if err := json.Unmarshal([]byte(inbound.Settings), &inboundSettings); err != nil {
			logger.Warningf("[LIMIT_IP] Failed to parse inbound settings for shadowsocks cipher: %v", err)
		} else if method, ok := inboundSettings["method"].(string); ok && method != "" {
			clientConfig["cipher"] = method
		}
	}

	// Remove the user to disconnect all of their connections.
	if err := xrayAPI.RemoveUser(inbound.Tag, clientEmail); err != nil {
		logger.Warningf("[LIMIT_IP] Failed to remove user %s: %v", clientEmail, err)
		return
	}

	// Give the disconnection a moment to take effect.
	time.Sleep(100 * time.Millisecond)

	// Re-add the user so new (non-banned) connections succeed again.
	if err := xrayAPI.AddUser(protocol, inbound.Tag, clientConfig); err != nil {
		logger.Warningf("[LIMIT_IP] Failed to re-add user %s: %v", clientEmail, err)
	}
}
// resolveXrayAPIPort returns the API inbound port, trying in order: the
// running config file, the stored template config, then the default.
func (j *CheckClientIpJob) resolveXrayAPIPort() int {
	port, configErr := getAPIPortFromConfigPath(xray.GetConfigPath())
	if configErr == nil {
		return port
	}

	var templateErr error
	var template model.Setting
	if err := database.GetDB().Where("key = ?", "xrayTemplateConfig").First(&template).Error; err != nil {
		templateErr = err
	} else if tplPort, parseErr := getAPIPortFromConfigData([]byte(template.Value)); parseErr != nil {
		templateErr = parseErr
	} else {
		return tplPort
	}

	logger.Warningf(
		"[LIMIT_IP] Could not determine Xray API port from config or template; falling back to default port %d (config error: %v, template error: %v)",
		defaultXrayAPIPort,
		configErr,
		templateErr,
	)
	return defaultXrayAPIPort
}
// getAPIPortFromConfigPath reads the Xray config file at configPath and
// extracts the API inbound port from its contents.
func getAPIPortFromConfigPath(configPath string) (int, error) {
	data, err := os.ReadFile(configPath)
	if err != nil {
		return 0, err
	}
	return getAPIPortFromConfigData(data)
}
// getAPIPortFromConfigData parses an Xray JSON config and returns the
// port of the inbound tagged "api", or an error when none is present.
func getAPIPortFromConfigData(configData []byte) (int, error) {
	var cfg xray.Config
	if err := json.Unmarshal(configData, &cfg); err != nil {
		return 0, err
	}
	for _, inboundConfig := range cfg.InboundConfigs {
		if inboundConfig.Tag == "api" && inboundConfig.Port > 0 {
			return inboundConfig.Port, nil
		}
	}
	return 0, errors.New("api inbound port not found")
}
func (j *CheckClientIpJob) getInboundByEmail(clientEmail string) (*model.Inbound, error) {
2023-02-28 19:54:29 +00:00
db := database.GetDB()
2024-12-16 13:26:47 +00:00
inbound := &model.Inbound{}
2024-12-16 13:26:47 +00:00
err := db.Model(&model.Inbound{}).Where("settings LIKE ?", "%"+clientEmail+"%").First(inbound).Error
2023-02-28 19:54:29 +00:00
if err != nil {
return nil, err
}
2024-12-16 13:26:47 +00:00
return inbound, nil
}