diff --git a/cmd/server/main.go b/cmd/server/main.go index ac9a6f9..ce4c683 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -12,6 +12,7 @@ import ( "gitea.mrixs.me/minecraft-platform/backend/internal/api" "gitea.mrixs.me/minecraft-platform/backend/internal/core" "gitea.mrixs.me/minecraft-platform/backend/internal/database" + "gitea.mrixs.me/minecraft-platform/backend/internal/ws" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" @@ -31,12 +32,17 @@ func main() { userRepo := &database.UserRepository{DB: dbPool} serverRepo := &database.ServerRepository{DB: dbPool} modpackRepo := &database.ModpackRepository{DB: dbPool} + jobRepo := &database.JobRepository{DB: dbPool} // --- Инициализация сервисов --- userService := &core.UserService{Repo: userRepo} authService := &core.AuthService{UserRepo: userRepo} serverPoller := &core.ServerPoller{Repo: serverRepo} + // --- Инициализация WebSocket Hub --- + hub := ws.NewHub() + go hub.Run() + keyPath := os.Getenv("RSA_PRIVATE_KEY_PATH") if keyPath == "" { slog.Error("RSA_PRIVATE_KEY_PATH environment variable is not set") @@ -74,7 +80,9 @@ func main() { launcherHandler := &api.LauncherHandler{ModpackRepo: modpackRepo} modpackHandler := &api.ModpackHandler{ ModpackRepo: modpackRepo, + JobRepo: jobRepo, JanitorService: janitorService, + Hub: hub, } adminUserHandler := &api.AdminUserHandler{UserRepo: userRepo} @@ -120,6 +128,11 @@ func main() { r.Route("/api/admin", func(r chi.Router) { r.Use(api.AdminMiddleware) + // WebSocket endpoint for jobs + r.Get("/ws/jobs", func(w http.ResponseWriter, r *http.Request) { + ws.ServeWs(hub, w, r) + }) + r.Route("/modpacks", func(r chi.Router) { r.Post("/import", modpackHandler.ImportModpack) }) diff --git a/go.mod b/go.mod index 075c78a..cab4662 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-chi/chi/v5 v5.2.1 github.com/golang-jwt/jwt/v5 v5.2.2 github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 github.com/jackc/pgx/v5 v5.7.5 golang.org/x/crypto v0.39.0 ) diff --git a/go.sum b/go.sum index 01face7..8cfb9de 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeD github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= diff --git a/internal/api/middleware.go b/internal/api/middleware.go index 123f352..f00aa65 100644 --- a/internal/api/middleware.go +++ b/internal/api/middleware.go @@ -16,15 +16,25 @@ const ClaimsContextKey = contextKey("claims") // AuthMiddleware проверяет JWT токен и добавляет claims в контекст запроса. func AuthMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var tokenString string + + // 1. Проверяем заголовок Authorization authHeader := r.Header.Get("Authorization") - if authHeader == "" { - http.Error(w, "Authorization header required", http.StatusUnauthorized) - return + if authHeader != "" { + tokenString = strings.TrimPrefix(authHeader, "Bearer ") + if tokenString == authHeader { // Не было префикса Bearer + http.Error(w, "Invalid token format", http.StatusUnauthorized) + return + } } - tokenString := strings.TrimPrefix(authHeader, "Bearer ") - if tokenString == authHeader { - http.Error(w, "Invalid token format", http.StatusUnauthorized) + // 2. Если заголовка нет, проверяем параметр query (для WebSocket) + if tokenString == "" { + tokenString = r.URL.Query().Get("token") + } + + if tokenString == "" { + http.Error(w, "Authorization required", http.StatusUnauthorized) return } diff --git a/internal/api/modpack_handler.go b/internal/api/modpack_handler.go index 0bb839c..ef7295b 100644 --- a/internal/api/modpack_handler.go +++ b/internal/api/modpack_handler.go @@ -2,8 +2,10 @@ package api import ( "context" + "encoding/json" "fmt" "io" + "log/slog" "net/http" "os" @@ -11,50 +13,51 @@ import ( "gitea.mrixs.me/minecraft-platform/backend/internal/core/importer" "gitea.mrixs.me/minecraft-platform/backend/internal/database" "gitea.mrixs.me/minecraft-platform/backend/internal/models" + "gitea.mrixs.me/minecraft-platform/backend/internal/ws" ) type ModpackHandler struct { ModpackRepo *database.ModpackRepository + JobRepo *database.JobRepository JanitorService *core.FileJanitorService + Hub *ws.Hub } -// ImportModpack обрабатывает загрузку и импорт модпака. +// ImportJobParams содержит параметры для фоновой задачи импорта +type ImportJobParams struct { + ImporterType string + ImportMethod string + SourceURL string + TempZipPath string + Name string + DisplayName string + MCVersion string +} + +// ImportModpack обрабатывает запрос на импорт и запускает асинхронную задачу func (h *ModpackHandler) ImportModpack(w http.ResponseWriter, r *http.Request) { if err := r.ParseMultipartForm(512 << 20); err != nil { http.Error(w, "File too large", http.StatusBadRequest) return } - importerType := r.FormValue("importerType") - importMethod := r.FormValue("importMethod") - sourceURL := r.FormValue("sourceUrl") + params := ImportJobParams{ + ImporterType: r.FormValue("importerType"), + ImportMethod: r.FormValue("importMethod"), + SourceURL: r.FormValue("sourceUrl"), + Name: r.FormValue("name"), + DisplayName: r.FormValue("displayName"), + MCVersion: r.FormValue("mcVersion"), + } - var tempZipPath string - var err error - - // --- Выбираем импортер --- - var imp importer.ModpackImporter - storagePath := os.Getenv("MODPACKS_STORAGE_PATH") - - switch importerType { - case "simple": - imp = &importer.SimpleZipImporter{StoragePath: storagePath} - case "curseforge": - apiKey := os.Getenv("CURSEFORGE_API_KEY") - if apiKey == "" { - http.Error(w, "CurseForge API key is not configured on the server", http.StatusInternalServerError) - return - } - imp = importer.NewCurseForgeImporter(storagePath, apiKey) - case "modrinth": - imp = importer.NewModrinthImporter(storagePath) - default: - http.Error(w, "Invalid importer type", http.StatusBadRequest) + // Валидация + if params.Name == "" || params.DisplayName == "" || params.MCVersion == "" { + http.Error(w, "Missing required fields", http.StatusBadRequest) return } - // --- Получаем zip-файл --- - if importMethod == "file" { + // Обработка загрузки файла + if params.ImportMethod == "file" { file, _, err := r.FormFile("file") if err != nil { http.Error(w, "Invalid file upload", http.StatusBadRequest) @@ -67,55 +70,136 @@ func (h *ModpackHandler) ImportModpack(w http.ResponseWriter, r *http.Request) { http.Error(w, "Could not create temp file", http.StatusInternalServerError) return } - defer tempFile.Close() - defer os.Remove(tempFile.Name()) + // Не удаляем файл здесь, так как он нужен worker-у. Удалим в worker-е. + // defer os.Remove(tempFile.Name()) if _, err := io.Copy(tempFile, file); err != nil { + tempFile.Close() + os.Remove(tempFile.Name()) http.Error(w, "Could not save temp file", http.StatusInternalServerError) return } - tempZipPath = tempFile.Name() - - } else if importMethod == "url" { - cfImporter, ok := imp.(*importer.CurseForgeImporter) - if !ok { - http.Error(w, "Importer type does not support URL import", http.StatusBadRequest) - return - } - tempZipPath, err = cfImporter.DownloadModpackFromURL(sourceURL) - if err != nil { - http.Error(w, fmt.Sprintf("Failed to download from URL: %v", err), http.StatusInternalServerError) - return - } - defer os.Remove(tempZipPath) - - } else { - http.Error(w, "Invalid import method", http.StatusBadRequest) - return + tempFile.Close() + params.TempZipPath = tempFile.Name() } - // --- Запускаем импорт --- - files, err := imp.Import(tempZipPath) + // Создаем задачу в БД + jobID, err := h.JobRepo.CreateJob(r.Context()) if err != nil { - http.Error(w, fmt.Sprintf("Import failed: %v", err), http.StatusInternalServerError) + if params.ImportMethod == "file" { + os.Remove(params.TempZipPath) + } + http.Error(w, fmt.Sprintf("Failed to create job: %v", err), http.StatusInternalServerError) return } - // --- Сохраняем результат в БД --- + // Запускаем worker + go h.processImportJob(jobID, params) + + // Отправляем ответ + w.WriteHeader(http.StatusAccepted) + json.NewEncoder(w).Encode(map[string]interface{}{ + "job_id": jobID, + "message": "Import job started", + }) +} + +// processImportJob выполняет импорт в фоне +func (h *ModpackHandler) processImportJob(jobID int, params ImportJobParams) { + ctx := context.Background() + h.updateJobStatus(ctx, jobID, models.JobStatusDownloading, 10, "") + + // Очистка временного файла по завершении + if params.TempZipPath != "" { + defer os.Remove(params.TempZipPath) + } + + storagePath := os.Getenv("MODPACKS_STORAGE_PATH") + var imp importer.ModpackImporter + + // Настройка импортера + switch params.ImporterType { + case "simple": + imp = &importer.SimpleZipImporter{StoragePath: storagePath} + case "curseforge": + apiKey := os.Getenv("CURSEFORGE_API_KEY") + if apiKey == "" { + h.updateJobStatus(ctx, jobID, models.JobStatusFailed, 0, "CurseForge API key missing") + return + } + imp = importer.NewCurseForgeImporter(storagePath, apiKey) + case "modrinth": + imp = importer.NewModrinthImporter(storagePath) + default: + h.updateJobStatus(ctx, jobID, models.JobStatusFailed, 0, "Invalid importer type") + return + } + + // Загрузка по URL если нужно + if params.ImportMethod == "url" { + h.updateJobStatus(ctx, jobID, models.JobStatusDownloading, 20, "Downloading from URL...") + + // Логика скачивания зависит от типа импортера, пока поддерживаем CurseForge + // TODO: Сделать интерфейс DownloadableImporter + if cfImporter, ok := imp.(*importer.CurseForgeImporter); ok { + zipPath, err := cfImporter.DownloadModpackFromURL(params.SourceURL) + if err != nil { + h.updateJobStatus(ctx, jobID, models.JobStatusFailed, 0, fmt.Sprintf("Download failed: %v", err)) + return + } + params.TempZipPath = zipPath + defer os.Remove(zipPath) // Удаляем скачанный файл после обработки + } else { + // Для других типов пока не поддерживаем URL download внутри импортера + h.updateJobStatus(ctx, jobID, models.JobStatusFailed, 0, "URL import not supported for this type") + return + } + } + + h.updateJobStatus(ctx, jobID, models.JobStatusProcessing, 40, "Processing modpack files...") + + // Импорт файлов + files, err := imp.Import(params.TempZipPath) + if err != nil { + h.updateJobStatus(ctx, jobID, models.JobStatusFailed, 0, fmt.Sprintf("Import failed: %v", err)) + return + } + + h.updateJobStatus(ctx, jobID, models.JobStatusProcessing, 80, "Saving to database...") + + // Сохранение в БД modpack := &models.Modpack{ - Name: r.FormValue("name"), - DisplayName: r.FormValue("displayName"), - MinecraftVersion: r.FormValue("mcVersion"), + Name: params.Name, + DisplayName: params.DisplayName, + MinecraftVersion: params.MCVersion, } - err = h.ModpackRepo.CreateModpackTx(r.Context(), modpack, files) + err = h.ModpackRepo.CreateModpackTx(ctx, modpack, files) if err != nil { - http.Error(w, fmt.Sprintf("Database save failed: %v", err), http.StatusInternalServerError) + h.updateJobStatus(ctx, jobID, models.JobStatusFailed, 0, fmt.Sprintf("Database save failed: %v", err)) return } - w.WriteHeader(http.StatusCreated) - fmt.Fprintf(w, "Modpack '%s' imported successfully with %d files.", modpack.DisplayName, len(files)) + h.updateJobStatus(ctx, jobID, models.JobStatusCompleted, 100, "Success") + // Запуск Janitor-а go h.JanitorService.CleanOrphanedFiles(context.Background()) } + +// updateJobStatus обновляет статус в БД и отправляет уведомление через WebSocket +func (h *ModpackHandler) updateJobStatus(ctx context.Context, jobID int, status models.ImportJobStatus, progress int, errMsg string) { + // Обновляем БД + if err := h.JobRepo.UpdateJobStatus(ctx, jobID, status, progress, errMsg); err != nil { + slog.Error("Failed to update job status in DB", "jobID", jobID, "error", err) + } + + // Отправляем в WebSocket + update := map[string]interface{}{ + "job_id": jobID, + "status": status, + "progress": progress, + "error_message": errMsg, + } + msg, _ := json.Marshal(update) + h.Hub.BroadcastMessage(msg) +} diff --git a/internal/database/job_repository.go b/internal/database/job_repository.go new file mode 100644 index 0000000..ac6bd82 --- /dev/null +++ b/internal/database/job_repository.go @@ -0,0 +1,58 @@ +package database + +import ( + "context" + + "gitea.mrixs.me/minecraft-platform/backend/internal/models" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +type JobRepository struct { + DB *pgxpool.Pool +} + +// CreateJob создает новую задачу со статусом pending +func (r *JobRepository) CreateJob(ctx context.Context) (int, error) { + var jobID int + query := `INSERT INTO modpack_import_jobs (status, progress, created_at, updated_at) + VALUES ($1, $2, NOW(), NOW()) RETURNING id` + + err := r.DB.QueryRow(ctx, query, models.JobStatusPending, 0).Scan(&jobID) + return jobID, err +} + +// UpdateJobStatus обновляет статус и прогресс задачи +func (r *JobRepository) UpdateJobStatus(ctx context.Context, jobID int, status models.ImportJobStatus, progress int, errorMessage string) error { + query := `UPDATE modpack_import_jobs + SET status = $1, progress = $2, error_message = $3, updated_at = NOW() + WHERE id = $4` + + _, err := r.DB.Exec(ctx, query, status, progress, errorMessage, jobID) + return err +} + +// GetJob получает задачу по ID +func (r *JobRepository) GetJob(ctx context.Context, jobID int) (*models.ImportJob, error) { + job := &models.ImportJob{} + query := `SELECT id, status, progress, error_message, created_at, updated_at + FROM modpack_import_jobs WHERE id = $1` + + var errMsg *string // Для обработки NULL + err := r.DB.QueryRow(ctx, query, jobID).Scan( + &job.ID, &job.Status, &job.Progress, &errMsg, &job.CreatedAt, &job.UpdatedAt, + ) + + if err != nil { + if err == pgx.ErrNoRows { + return nil, nil // Или спец ошибка + } + return nil, err + } + + if errMsg != nil { + job.ErrorMessage = *errMsg + } + + return job, nil +} diff --git a/internal/models/job.go b/internal/models/job.go new file mode 100644 index 0000000..06f6fd0 --- /dev/null +++ b/internal/models/job.go @@ -0,0 +1,24 @@ +package models + +import "time" + +// ImportJobStatus определяет возможные статусы задачи импорта +type ImportJobStatus string + +const ( + JobStatusPending ImportJobStatus = "pending" + JobStatusDownloading ImportJobStatus = "downloading" + JobStatusProcessing ImportJobStatus = "processing" + JobStatusCompleted ImportJobStatus = "completed" + JobStatusFailed ImportJobStatus = "failed" +) + +// ImportJob представляет задачу на импорт модпака +type ImportJob struct { + ID int `json:"id"` + Status ImportJobStatus `json:"status"` + Progress int `json:"progress"` // 0-100 + ErrorMessage string `json:"error_message,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} diff --git a/internal/ws/hub.go b/internal/ws/hub.go new file mode 100644 index 0000000..08d6ef5 --- /dev/null +++ b/internal/ws/hub.go @@ -0,0 +1,179 @@ +package ws + +import ( + "log/slog" + "net/http" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +const ( + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the peer. + pongWait = 60 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + // Разрешаем CORS для разработки (в продакшене лучше ограничить) + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +// Hub maintains the set of active clients and broadcasts messages to the clients. +type Hub struct { + // Registered clients. + clients map[*Client]bool + + // Inbound messages from the clients (not used for now, only broadcast). + broadcast chan []byte + + // Register requests from the clients. + register chan *Client + + // Unregister requests from clients. + unregister chan *Client + + mu sync.Mutex +} + +func NewHub() *Hub { + return &Hub{ + broadcast: make(chan []byte), + register: make(chan *Client), + unregister: make(chan *Client), + clients: make(map[*Client]bool), + } +} + +func (h *Hub) Run() { + for { + select { + case client := <-h.register: + h.mu.Lock() + h.clients[client] = true + h.mu.Unlock() + slog.Debug("WS: Client registered") + + case client := <-h.unregister: + h.mu.Lock() + if _, ok := h.clients[client]; ok { + delete(h.clients, client) + close(client.send) + slog.Debug("WS: Client unregistered") + } + h.mu.Unlock() + + case message := <-h.broadcast: + h.mu.Lock() + for client := range h.clients { + select { + case client.send <- message: + default: + close(client.send) + delete(h.clients, client) + } + } + h.mu.Unlock() + } + } +} + +// Broadcast отправляет сообщение всем подключенным клиентам +func (h *Hub) BroadcastMessage(msg []byte) { + h.broadcast <- msg +} + +// Client is a middleman between the websocket connection and the hub. +type Client struct { + hub *Hub + + // The websocket connection. + conn *websocket.Conn + + // Buffered channel of outbound messages. + send chan []byte +} + +// writePump pumps messages from the hub to the websocket connection. +func (c *Client) writePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.conn.Close() + }() + + for { + select { + case message, ok := <-c.send: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // The hub closed the channel. + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + w, err := c.conn.NextWriter(websocket.TextMessage) + if err != nil { + return + } + w.Write(message) + + if err := w.Close(); err != nil { + return + } + + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} + +// readPump pumps messages from the websocket connection to the hub. +// (Needed to process PONGs and detect disconnects) +func (c *Client) readPump() { + defer func() { + c.hub.unregister <- c + c.conn.Close() + }() + c.conn.SetReadLimit(512) + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + for { + _, _, err := c.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + slog.Error("WS: error", "error", err) + } + break + } + } +} + +// ServeWs handles websocket requests from the peer. +func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + slog.Error("WS: Failed to upgrade connection", "error", err) + return + } + client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)} + client.hub.register <- client + + // Allow collection of memory referenced by the caller by doing all work in + // new goroutines. + go client.writePump() + go client.readPump() +}