mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-05-13 09:36:05 +00:00
- Traffic-writer single-consumer queue (web/service/traffic_writer.go) serialises every DB write that touches up/down/all_time/last_online (AddTraffic, SetRemoteTraffic, Reset*, UpdateClientTrafficByEmail) so overlapping goroutines can no longer clobber each other's column-scoped Updates with a stale tx.Save. - DB pool: WAL + busy_timeout=10s + synchronous=NORMAL + _txlock=immediate, MaxOpenConns=8 / MaxIdleConns=4. The immediate-tx PRAGMA fixes residual "database is locked [0ms]" cases where deferred-tx writer-upgrade conflicts bypass busy_timeout. - SetRemoteTraffic full-mirrors node-authoritative state into central: settings JSON, remark, listen, port, total, expiry, all_time, enable, plus per-client total/expiry/reset/all_time. Inbounds and client_traffics rows present on node but missing from central are created; rows missing from snap are deleted (with cascading client_traffics removal). - NodeTrafficSyncJob detects structural changes from the mirror and broadcasts invalidate(inbounds) so open central UIs re-fetch via REST on node-side add/del/edit without manual refresh. - XrayTrafficJob broadcasts invalidate(inbounds) when auto-disable flips client_traffics.enable so the per-client toggle reflects depletion without manual refresh. - Frontend: inbounds page now subscribes to the BroadcastInbounds 'inbounds' WS event (full-list pushes from add/del/update controllers were silently dropped). Fixes invalidate payload field (dataType -> type). Restart-panel modal switched from Promise-wrap to onOk-only so Cancel actually cancels. - Node files trimmed of stale prose-comments; cron cadence dropped 10s -> 5s to match the inbounds page UX. - README badges and Go module path bumped v2 -> v3 to match module rename. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
87 lines
1.5 KiB
Go
87 lines
1.5 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/mhsanaei/3x-ui/v3/logger"
|
|
)
|
|
|
|
// Tuning parameters for the traffic-writer queue.
const (
	// trafficWriterQueueSize is the buffer capacity of the request channel
	// created by StartTrafficWriter.
	trafficWriterQueueSize = 256

	// trafficWriterSubmitTimeout bounds how long submitTrafficWrite waits for
	// a free slot in the queue before giving up.
	trafficWriterSubmitTimeout = 5 * time.Second
)
|
|
|
|
// trafficWriteRequest is one unit of work handed to the traffic-writer
// goroutine.
type trafficWriteRequest struct {
	// apply is the callback the writer goroutine executes.
	apply func() error
	// done receives the result of apply exactly once.
	done chan error
}
|
|
|
|
var (
|
|
twQueue chan *trafficWriteRequest
|
|
twCtx context.Context
|
|
twCancel context.CancelFunc
|
|
twDone chan struct{}
|
|
twOnce sync.Once
|
|
)
|
|
|
|
func StartTrafficWriter() {
|
|
twOnce.Do(func() {
|
|
twQueue = make(chan *trafficWriteRequest, trafficWriterQueueSize)
|
|
twCtx, twCancel = context.WithCancel(context.Background())
|
|
twDone = make(chan struct{})
|
|
go runTrafficWriter()
|
|
})
|
|
}
|
|
|
|
func StopTrafficWriter() {
|
|
if twCancel != nil {
|
|
twCancel()
|
|
<-twDone
|
|
}
|
|
}
|
|
|
|
func runTrafficWriter() {
|
|
defer close(twDone)
|
|
for {
|
|
select {
|
|
case req := <-twQueue:
|
|
req.done <- safeApply(req.apply)
|
|
case <-twCtx.Done():
|
|
for {
|
|
select {
|
|
case req := <-twQueue:
|
|
req.done <- safeApply(req.apply)
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func safeApply(fn func() error) (err error) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
err = fmt.Errorf("traffic writer panic: %v", r)
|
|
logger.Error(err.Error())
|
|
}
|
|
}()
|
|
return fn()
|
|
}
|
|
|
|
func submitTrafficWrite(fn func() error) error {
|
|
if twQueue == nil {
|
|
return safeApply(fn)
|
|
}
|
|
req := &trafficWriteRequest{apply: fn, done: make(chan error, 1)}
|
|
select {
|
|
case twQueue <- req:
|
|
case <-time.After(trafficWriterSubmitTimeout):
|
|
return errors.New("traffic writer queue full")
|
|
}
|
|
return <-req.done
|
|
}
|