add rate limiter to telegram
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
// yamusic-bot/cmd/bot/main.go
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
@@ -15,6 +17,7 @@ import (
|
||||
"gitea.mrixs.me/Mrixs/yamusic-bot/pkg/tagger"
|
||||
"gitea.mrixs.me/Mrixs/yamusic-bot/pkg/yamusic"
|
||||
tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -54,11 +57,23 @@ func main() {
|
||||
// 4. Инициализация компонентов
|
||||
downloaderComponent := downloader.NewHTTPDownloader()
|
||||
taggerComponent := tagger.NewID3Tagger()
|
||||
telegramClient := bot.NewTelegramClientAdapter(tgAPI, cfg.TelegramCacheChatID)
|
||||
|
||||
// Создаем БЫСТРЫЙ лимитер для общих вызовов API (в секунду)
|
||||
fastLimiter := rate.NewLimiter(rate.Limit(cfg.TelegramAPIRateLimit), cfg.TelegramAPIRateLimit)
|
||||
slog.Info("General Telegram API rate limit set", "requests_per_second", cfg.TelegramAPIRateLimit)
|
||||
|
||||
// Создаем МЕДЛЕННЫЙ лимитер для кэш-канала (в минуту)
|
||||
// rate.Limit измеряется в событиях/секунду, поэтому конвертируем
|
||||
// Burst size = 1, чтобы избежать отправки пачки сообщений и долгого ожидания
|
||||
cacheRPS := float64(cfg.TelegramCacheRateLimitPerMinute) / 60.0
|
||||
cacheLimiter := rate.NewLimiter(rate.Limit(cacheRPS), 1)
|
||||
slog.Info("Cache channel Telegram API rate limit set", "requests_per_minute", cfg.TelegramCacheRateLimitPerMinute)
|
||||
|
||||
// Передаем оба лимитера в адаптер
|
||||
telegramClient := bot.NewTelegramClientAdapter(tgAPI, cfg.TelegramCacheChatID, fastLimiter, cacheLimiter)
|
||||
|
||||
trackProcessor := processor.NewTrackProcessor(db, yandexClient, downloaderComponent, taggerComponent, telegramClient)
|
||||
|
||||
// Передаем taggerComponent в admin.NewHandler
|
||||
adminHandler := admin.NewHandler(db, telegramClient, yandexClient, taggerComponent, startTime)
|
||||
inlineHandler := bot.NewInlineHandler(yandexClient, trackProcessor, telegramClient)
|
||||
|
||||
|
||||
@@ -6,24 +6,35 @@ import (
|
||||
"log/slog"
|
||||
|
||||
tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// TelegramClientAdapter адаптирует библиотеку tgbotapi под наш интерфейс interfaces.TelegramClient.
|
||||
type TelegramClientAdapter struct {
|
||||
api *tgbotapi.BotAPI
|
||||
cacheChatID int64
|
||||
fastLimiter *rate.Limiter // Для общих быстрых запросов
|
||||
cacheLimiter *rate.Limiter // Для медленных запросов в кэш-канал
|
||||
}
|
||||
|
||||
// NewTelegramClientAdapter создает новый адаптер.
|
||||
func NewTelegramClientAdapter(api *tgbotapi.BotAPI, cacheChatID int64) *TelegramClientAdapter {
|
||||
func NewTelegramClientAdapter(api *tgbotapi.BotAPI, cacheChatID int64, fastLimiter, cacheLimiter *rate.Limiter) *TelegramClientAdapter {
|
||||
return &TelegramClientAdapter{
|
||||
api: api,
|
||||
cacheChatID: cacheChatID,
|
||||
fastLimiter: fastLimiter,
|
||||
cacheLimiter: cacheLimiter,
|
||||
}
|
||||
}
|
||||
|
||||
// SendAudioToCacheChannel загружает аудиофайл в кэш-канал и возвращает его FileID.
|
||||
// ИСПОЛЬЗУЕТ МЕДЛЕННЫЙ ЛИМИТЕР.
|
||||
func (t *TelegramClientAdapter) SendAudioToCacheChannel(ctx context.Context, audioPath, title, performer string) (string, error) {
|
||||
// Ждем, пока МЕДЛЕННЫЙ лимитер разрешит выполнить запрос
|
||||
if err := t.cacheLimiter.Wait(ctx); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
audio := tgbotapi.NewAudio(t.cacheChatID, tgbotapi.FilePath(audioPath))
|
||||
audio.Title = title
|
||||
audio.Performer = performer
|
||||
@@ -41,11 +52,17 @@ func (t *TelegramClientAdapter) SendAudioToCacheChannel(ctx context.Context, aud
|
||||
}
|
||||
|
||||
// AnswerInlineQuery отвечает на inline-запрос.
|
||||
// ИСПОЛЬЗУЕТ БЫСТРЫЙ ЛИМИТЕР.
|
||||
func (t *TelegramClientAdapter) AnswerInlineQuery(ctx context.Context, queryID string, results []interface{}) error {
|
||||
// Ждем, пока БЫСТРЫЙ лимитер разрешит выполнить запрос
|
||||
if err := t.fastLimiter.Wait(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
inlineConfig := tgbotapi.InlineConfig{
|
||||
InlineQueryID: queryID,
|
||||
Results: results,
|
||||
CacheTime: 1, // Кэшируем результат на стороне Telegram на 1 секунду
|
||||
CacheTime: 1,
|
||||
}
|
||||
|
||||
if _, err := t.api.Request(inlineConfig); err != nil {
|
||||
@@ -55,7 +72,13 @@ func (t *TelegramClientAdapter) AnswerInlineQuery(ctx context.Context, queryID s
|
||||
}
|
||||
|
||||
// SendMessage отправляет текстовое сообщение.
|
||||
// ИСПОЛЬЗУЕТ БЫСТРЫЙ ЛИМИТЕР.
|
||||
func (t *TelegramClientAdapter) SendMessage(ctx context.Context, chatID int64, text string) error {
|
||||
// Ждем, пока БЫСТРЫЙ лимитер разрешит выполнить запрос
|
||||
if err := t.fastLimiter.Wait(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msg := tgbotapi.NewMessage(chatID, text)
|
||||
if _, err := t.api.Send(msg); err != nil {
|
||||
return fmt.Errorf("failed to send message: %w", err)
|
||||
|
||||
@@ -18,7 +18,9 @@ type Config struct {
|
||||
LogLevel string `env:"LOG_LEVEL" envDefault:"info"`
|
||||
ProcessorWorkers int `env:"PROCESSOR_WORKERS" envDefault:"4"`
|
||||
YandexAPIRateLimit int `env:"YANDEX_API_RATE_LIMIT" envDefault:"5"`
|
||||
TelegramAdminIDs []int64 `env:"-"` // Это поле будет заполнено после парсинга
|
||||
TelegramAPIRateLimit int `env:"TELEGRAM_API_RATE_LIMIT" envDefault:"25"` // Общий лимит в секунду
|
||||
TelegramCacheRateLimitPerMinute int `env:"TELEGRAM_CACHE_RATE_LIMIT_PER_MINUTE" envDefault:"20"` // Лимит для кэш-канала в минуту
|
||||
TelegramAdminIDs []int64 `env:"-"`
|
||||
}
|
||||
|
||||
// New загружает конфигурацию из переменных окружения и парсит необходимые поля.
|
||||
|
||||
Reference in New Issue
Block a user