From 278aa1c85c6af86a64326c924a746eeec5865389 Mon Sep 17 00:00:00 2001 From: Vlad Yaroslavlev Date: Fri, 2 Jan 2026 18:13:32 +0300 Subject: [PATCH] Fix telegram bot issue (#3608) * fix: improve Telegram bot handling for concurrent starts and graceful shutdown - Added logic to stop any existing long-polling loop when Start is called again. - Introduced a mutex to manage access to shared state variables, ensuring thread safety. - Updated the OnReceive method to prevent multiple concurrent executions. - Enhanced Stop method to ensure proper cleanup of resources and state management. * fix: enhance Telegram bot's long-polling management - Improved handling of concurrent starts by stopping existing long-polling loops. - Implemented mutex for thread-safe access to shared state variables. - Updated OnReceive method to prevent multiple executions. - Enhanced Stop method for better resource cleanup and state management. * . --- web/service/tgbot.go | 96 +++++++++++++++++++++++++------------------- 1 file changed, 55 insertions(+), 41 deletions(-) diff --git a/web/service/tgbot.go b/web/service/tgbot.go index 06c51faa..3a98dcb4 100644 --- a/web/service/tgbot.go +++ b/web/service/tgbot.go @@ -174,6 +174,10 @@ func (t *Tgbot) Start(i18nFS embed.FS) error { return err } + // If Start is called again (e.g. during reload), ensure any previous long-polling + // loop is stopped before creating a new bot / receiver. 
+ StopBot() + // Initialize hash storage to store callback queries hashStorage = global.NewHashStorage(20 * time.Minute) @@ -207,6 +211,7 @@ func (t *Tgbot) Start(i18nFS embed.FS) error { return err } + parsedAdminIds := make([]int64, 0) // Parse admin IDs from comma-separated string if tgBotID != "" { for _, adminID := range strings.Split(tgBotID, ",") { @@ -215,9 +220,12 @@ func (t *Tgbot) Start(i18nFS embed.FS) error { logger.Warning("Failed to parse admin ID from Telegram bot chat ID:", err) return err } - adminIds = append(adminIds, int64(id)) + parsedAdminIds = append(parsedAdminIds, int64(id)) } } + tgBotMutex.Lock() + adminIds = parsedAdminIds + tgBotMutex.Unlock() // Get Telegram bot proxy URL tgBotProxy, err := t.settingService.GetTgBotProxy() @@ -252,10 +260,12 @@ func (t *Tgbot) Start(i18nFS embed.FS) error { } // Start receiving Telegram bot messages - if !isRunning { + tgBotMutex.Lock() + alreadyRunning := isRunning || botCancel != nil + tgBotMutex.Unlock() + if !alreadyRunning { logger.Info("Telegram bot receiver started") go t.OnReceive() - isRunning = true } return nil @@ -300,6 +310,8 @@ func (t *Tgbot) NewBot(token string, proxyUrl string, apiServerUrl string) (*tel // IsRunning checks if the Telegram bot is currently running. func (t *Tgbot) IsRunning() bool { + tgBotMutex.Lock() + defer tgBotMutex.Unlock() return isRunning } @@ -317,34 +329,34 @@ func (t *Tgbot) SetHostname() { // Stop safely stops the Telegram bot's Long Polling operation. // This method now calls the global StopBot function and cleans up other resources. 
func (t *Tgbot) Stop() { - // Call the global StopBot function to gracefully shut down Long Polling StopBot() - - // Stop the bot handler (in case the goroutine hasn't exited yet) - if botHandler != nil { - botHandler.Stop() - } logger.Info("Stop Telegram receiver ...") - isRunning = false + tgBotMutex.Lock() adminIds = nil + tgBotMutex.Unlock() } // StopBot safely stops the Telegram bot's Long Polling operation by cancelling its context. // This is the global function called from main.go's signal handler and t.Stop(). func StopBot() { + // Don't hold the mutex while cancelling/waiting. tgBotMutex.Lock() - defer tgBotMutex.Unlock() + cancel := botCancel + botCancel = nil + handler := botHandler + botHandler = nil + isRunning = false + tgBotMutex.Unlock() - if botCancel != nil { + if handler != nil { + handler.Stop() + } + + if cancel != nil { logger.Info("Sending cancellation signal to Telegram bot...") - - // Calling botCancel() cancels the context passed to UpdatesViaLongPolling, - // which stops the Long Polling operation and closes the updates channel, - // allowing the th.Start() goroutine to exit cleanly. - botCancel() - - botCancel = nil - // Giving the goroutine a small delay to exit cleanly. + // Cancels the context passed to UpdatesViaLongPolling; this closes updates channel + // and lets botHandler.Start() exit cleanly. + cancel() botWG.Wait() logger.Info("Telegram bot successfully stopped.") } @@ -379,36 +391,38 @@ func (t *Tgbot) OnReceive() { params := telego.GetUpdatesParams{ Timeout: 30, // Increased timeout to reduce API calls } - // --- GRACEFUL SHUTDOWN FIX: Context creation --- + // Strict singleton: never start a second long-polling loop. tgBotMutex.Lock() - - // Create a context with cancellation and store the cancel function. 
- var ctx context.Context - - // Check if botCancel is already set (to prevent race condition overwrite and goroutine leak) - if botCancel == nil { - ctx, botCancel = context.WithCancel(context.Background()) - } else { - // If botCancel is already set, use a non-cancellable context for this redundant call. - // This prevents overwriting the active botCancel and causing a goroutine leak from the previous call. - logger.Warning("TgBot OnReceive called concurrently. Using background context for redundant call.") - ctx = context.Background() // <<< ИЗМЕНЕНИЕ + if botCancel != nil || isRunning { + tgBotMutex.Unlock() + logger.Warning("TgBot OnReceive called while already running; ignoring.") + return } + ctx, cancel := context.WithCancel(context.Background()) + botCancel = cancel + isRunning = true + // Add to WaitGroup before releasing the lock so StopBot() can't return + // before this receiver goroutine is accounted for. + botWG.Add(1) tgBotMutex.Unlock() // Get updates channel using the context. 
updates, _ := bot.UpdatesViaLongPolling(ctx, &params) - botWG.Go(func() { + go func() { + defer botWG.Done() + h, _ := th.NewBotHandler(bot, updates) + tgBotMutex.Lock() + botHandler = h + tgBotMutex.Unlock() - botHandler, _ = th.NewBotHandler(bot, updates) - botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error { + h.HandleMessage(func(ctx *th.Context, message telego.Message) error { delete(userStates, message.Chat.ID) t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.keyboardClosed"), tu.ReplyKeyboardRemove()) return nil }, th.TextEqual(t.I18nBot("tgbot.buttons.closeKeyboard"))) - botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error { + h.HandleMessage(func(ctx *th.Context, message telego.Message) error { // Use goroutine with worker pool for concurrent command processing go func() { messageWorkerPool <- struct{}{} // Acquire worker @@ -420,7 +434,7 @@ func (t *Tgbot) OnReceive() { return nil }, th.AnyCommand()) - botHandler.HandleCallbackQuery(func(ctx *th.Context, query telego.CallbackQuery) error { + h.HandleCallbackQuery(func(ctx *th.Context, query telego.CallbackQuery) error { // Use goroutine with worker pool for concurrent callback processing go func() { messageWorkerPool <- struct{}{} // Acquire worker @@ -432,7 +446,7 @@ func (t *Tgbot) OnReceive() { return nil }, th.AnyCallbackQueryWithMessage()) - botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error { + h.HandleMessage(func(ctx *th.Context, message telego.Message) error { if userState, exists := userStates[message.Chat.ID]; exists { switch userState { case "awaiting_id": @@ -578,8 +592,8 @@ func (t *Tgbot) OnReceive() { return nil }, th.AnyMessage()) - botHandler.Start() - }) + h.Start() + }() } // answerCommand processes incoming command messages from Telegram users.