mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2025-11-07 15:52:52 +00:00
Fix: Graceful Telegram bot shutdown to prevent 409 Conflict (#3580)
* Fix: Graceful Telegram bot shutdown to prevent 409 Conflict Introduces a `botCancel` context and a global `StopBot()` function to ensure the Telegram bot's Long Polling operation is safely terminated (via context cancellation) before the service restarts. This prevents the "Conflict: another update consumer is running" (409) error upon panel restart. Changes: - Added `botCancel context.CancelFunc` to manage context cancellation. - Implemented global `StopBot()` function. - Updated `Tgbot.Stop()` to call `StopBot()`. - Modified `Tgbot.OnReceive()` to use the new cancellable context for `UpdatesViaLongPolling`. * Fix: Prevent race condition and goroutine leak in TgBot Addresses a critical race condition on the global `botCancel` variable, which could occur if `Tgbot.OnReceive()` was called concurrently (e.g., during rapid panel restarts or unexpected behavior). Changes in tgbot.go: - Added `tgBotMutex sync.Mutex` to ensure thread safety. - Protected `botCancel` creation and assignment in `OnReceive()` using the mutex, and added a check to prevent overwriting an active context, which avoids goroutine leaks. - Protected the cancellation and cleanup logic in `StopBot()` with the mutex. * Refactor: Replace time.Sleep with sync.WaitGroup for reliable TgBot shutdown Replaced the unreliable `time.Sleep(1 * time.Second)` in `service.StopBot()` with `sync.WaitGroup`. This ensures the Long Polling goroutine is explicitly waited for and reliably exits before the panel continues, preventing potential resource leaks and incomplete shutdowns during restarts. Changes: - Added `botWG sync.WaitGroup` variable. - Updated `service.StopBot()` to call `botWG.Wait()` instead of `time.Sleep()`. - Modified `Tgbot.OnReceive()` to correctly use `botWG.Add(1)` and `defer botWG.Done()` within the Long Polling goroutine. - Corrected the goroutine structure in `OnReceive()` to properly encapsulate all message handling logic.
This commit is contained in:
parent
6e46e9b16e
commit
020cd63e22
1 changed files with 223 additions and 170 deletions
|
|
@ -39,6 +39,14 @@ import (
|
|||
|
||||
var (
|
||||
bot *telego.Bot
|
||||
|
||||
// botCancel stores the function to cancel the context, stopping Long Polling gracefully.
|
||||
botCancel context.CancelFunc
|
||||
// tgBotMutex protects concurrent access to botCancel variable
|
||||
tgBotMutex sync.Mutex
|
||||
// botWG waits for the OnReceive Long Polling goroutine to finish.
|
||||
botWG sync.WaitGroup
|
||||
|
||||
botHandler *th.BotHandler
|
||||
adminIds []int64
|
||||
isRunning bool
|
||||
|
|
@ -306,8 +314,13 @@ func (t *Tgbot) SetHostname() {
|
|||
hostname = host
|
||||
}
|
||||
|
||||
// Stop stops the Telegram bot and cleans up resources.
|
||||
// Stop safely stops the Telegram bot's Long Polling operation.
|
||||
// This method now calls the global StopBot function and cleans up other resources.
|
||||
func (t *Tgbot) Stop() {
|
||||
// Call the global StopBot function to gracefully shut down Long Polling
|
||||
StopBot()
|
||||
|
||||
// Stop the bot handler (in case the goroutine hasn't exited yet)
|
||||
if botHandler != nil {
|
||||
botHandler.Stop()
|
||||
}
|
||||
|
|
@ -316,6 +329,27 @@ func (t *Tgbot) Stop() {
|
|||
adminIds = nil
|
||||
}
|
||||
|
||||
// StopBot safely stops the Telegram bot's Long Polling operation by cancelling its context.
|
||||
// This is the global function called from main.go's signal handler and t.Stop().
|
||||
func StopBot() {
|
||||
tgBotMutex.Lock()
|
||||
defer tgBotMutex.Unlock()
|
||||
|
||||
if botCancel != nil {
|
||||
logger.Info("Sending cancellation signal to Telegram bot...")
|
||||
|
||||
// Calling botCancel() cancels the context passed to UpdatesViaLongPolling,
|
||||
// which stops the Long Polling operation and closes the updates channel,
|
||||
// allowing the th.Start() goroutine to exit cleanly.
|
||||
botCancel()
|
||||
|
||||
botCancel = nil
|
||||
// Giving the goroutine a small delay to exit cleanly.
|
||||
botWG.Wait()
|
||||
logger.Info("Telegram bot successfully stopped.")
|
||||
}
|
||||
}
|
||||
|
||||
// encodeQuery encodes the query string if it's longer than 64 characters.
|
||||
func (t *Tgbot) encodeQuery(query string) string {
|
||||
// NOTE: we only need to hash for more than 64 chars
|
||||
|
|
@ -345,11 +379,29 @@ func (t *Tgbot) OnReceive() {
|
|||
params := telego.GetUpdatesParams{
|
||||
Timeout: 30, // Increased timeout to reduce API calls
|
||||
}
|
||||
// --- GRACEFUL SHUTDOWN FIX: Context creation ---
|
||||
tgBotMutex.Lock()
|
||||
|
||||
updates, _ := bot.UpdatesViaLongPolling(context.Background(), ¶ms)
|
||||
// Create a context with cancellation and store the cancel function.
|
||||
var ctx context.Context
|
||||
|
||||
// Check if botCancel is already set (to prevent race condition overwrite and goroutine leak)
|
||||
if botCancel == nil {
|
||||
ctx, botCancel = context.WithCancel(context.Background())
|
||||
} else {
|
||||
// If botCancel is already set, use a non-cancellable context for this redundant call.
|
||||
// This prevents overwriting the active botCancel and causing a goroutine leak from the previous call.
|
||||
logger.Warning("TgBot OnReceive called concurrently. Using background context for redundant call.")
|
||||
ctx = context.Background() // <<< ИЗМЕНЕНИЕ
|
||||
}
|
||||
|
||||
tgBotMutex.Unlock()
|
||||
|
||||
// Get updates channel using the context.
|
||||
updates, _ := bot.UpdatesViaLongPolling(ctx, ¶ms)
|
||||
botWG.Go(func() {
|
||||
|
||||
botHandler, _ = th.NewBotHandler(bot, updates)
|
||||
|
||||
botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error {
|
||||
delete(userStates, message.Chat.ID)
|
||||
t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.keyboardClosed"), tu.ReplyKeyboardRemove())
|
||||
|
|
@ -527,6 +579,7 @@ func (t *Tgbot) OnReceive() {
|
|||
}, th.AnyMessage())
|
||||
|
||||
botHandler.Start()
|
||||
})
|
||||
}
|
||||
|
||||
// answerCommand processes incoming command messages from Telegram users.
|
||||
|
|
|
|||
Loading…
Reference in a new issue