diff --git a/web/service/traffic_writer.go b/web/service/traffic_writer.go index b3fa97b6..b15c459a 100644 --- a/web/service/traffic_writer.go +++ b/web/service/traffic_writer.go @@ -21,39 +21,64 @@ type trafficWriteRequest struct { } var ( + twMu sync.Mutex twQueue chan *trafficWriteRequest - twCtx context.Context twCancel context.CancelFunc twDone chan struct{} - twOnce sync.Once ) +// StartTrafficWriter spins up the serial writer goroutine. Safe to call again +// after StopTrafficWriter — each Start/Stop cycle gets fresh channels. The +// previous sync.Once-based implementation deadlocked after a SIGHUP-driven +// panel restart: Stop killed the consumer goroutine but Once prevented Start +// from spawning a new one, so every later submitTrafficWrite blocked forever +// on <-req.done with no consumer (including the AddTraffic call inside +// XrayService.GetXrayConfig that runs from startTask). func StartTrafficWriter() { - twOnce.Do(func() { - twQueue = make(chan *trafficWriteRequest, trafficWriterQueueSize) - twCtx, twCancel = context.WithCancel(context.Background()) - twDone = make(chan struct{}) - go runTrafficWriter() - }) + twMu.Lock() + defer twMu.Unlock() + if twQueue != nil { + return + } + queue := make(chan *trafficWriteRequest, trafficWriterQueueSize) + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + twQueue = queue + twCancel = cancel + twDone = done + go runTrafficWriter(queue, ctx, done) } +// StopTrafficWriter cancels the writer context and waits for the goroutine to +// drain any pending requests before returning. Resets the package state so a +// subsequent StartTrafficWriter can spawn a fresh consumer. func StopTrafficWriter() { - if twCancel != nil { - twCancel() - <-twDone + twMu.Lock() + cancel := twCancel + done := twDone + twQueue = nil + twCancel = nil + twDone = nil + twMu.Unlock() + + if cancel != nil { + cancel() + } + if done != nil { + <-done } } -func runTrafficWriter() { - defer close(twDone) +func runTrafficWriter(queue chan *trafficWriteRequest, ctx context.Context, done chan struct{}) { + defer close(done) for { select { - case req := <-twQueue: + case req := <-queue: req.done <- safeApply(req.apply) - case <-twCtx.Done(): + case <-ctx.Done(): for { select { - case req := <-twQueue: + case req := <-queue: req.done <- safeApply(req.apply) default: return @@ -74,12 +99,16 @@ func safeApply(fn func() error) (err error) { } func submitTrafficWrite(fn func() error) error { - if twQueue == nil { + twMu.Lock() + queue := twQueue + twMu.Unlock() + + if queue == nil { return safeApply(fn) } req := &trafficWriteRequest{apply: fn, done: make(chan error, 1)} select { - case twQueue <- req: + case queue <- req: case <-time.After(trafficWriterSubmitTimeout): return errors.New("traffic writer queue full") }