feat: implement async modpack import with websockets
This commit is contained in:
@@ -12,6 +12,7 @@ import (
|
|||||||
"gitea.mrixs.me/minecraft-platform/backend/internal/api"
|
"gitea.mrixs.me/minecraft-platform/backend/internal/api"
|
||||||
"gitea.mrixs.me/minecraft-platform/backend/internal/core"
|
"gitea.mrixs.me/minecraft-platform/backend/internal/core"
|
||||||
"gitea.mrixs.me/minecraft-platform/backend/internal/database"
|
"gitea.mrixs.me/minecraft-platform/backend/internal/database"
|
||||||
|
"gitea.mrixs.me/minecraft-platform/backend/internal/ws"
|
||||||
|
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
"github.com/go-chi/chi/v5/middleware"
|
"github.com/go-chi/chi/v5/middleware"
|
||||||
@@ -31,12 +32,17 @@ func main() {
|
|||||||
userRepo := &database.UserRepository{DB: dbPool}
|
userRepo := &database.UserRepository{DB: dbPool}
|
||||||
serverRepo := &database.ServerRepository{DB: dbPool}
|
serverRepo := &database.ServerRepository{DB: dbPool}
|
||||||
modpackRepo := &database.ModpackRepository{DB: dbPool}
|
modpackRepo := &database.ModpackRepository{DB: dbPool}
|
||||||
|
jobRepo := &database.JobRepository{DB: dbPool}
|
||||||
|
|
||||||
// --- Инициализация сервисов ---
|
// --- Инициализация сервисов ---
|
||||||
userService := &core.UserService{Repo: userRepo}
|
userService := &core.UserService{Repo: userRepo}
|
||||||
authService := &core.AuthService{UserRepo: userRepo}
|
authService := &core.AuthService{UserRepo: userRepo}
|
||||||
serverPoller := &core.ServerPoller{Repo: serverRepo}
|
serverPoller := &core.ServerPoller{Repo: serverRepo}
|
||||||
|
|
||||||
|
// --- Инициализация WebSocket Hub ---
|
||||||
|
hub := ws.NewHub()
|
||||||
|
go hub.Run()
|
||||||
|
|
||||||
keyPath := os.Getenv("RSA_PRIVATE_KEY_PATH")
|
keyPath := os.Getenv("RSA_PRIVATE_KEY_PATH")
|
||||||
if keyPath == "" {
|
if keyPath == "" {
|
||||||
slog.Error("RSA_PRIVATE_KEY_PATH environment variable is not set")
|
slog.Error("RSA_PRIVATE_KEY_PATH environment variable is not set")
|
||||||
@@ -74,7 +80,9 @@ func main() {
|
|||||||
launcherHandler := &api.LauncherHandler{ModpackRepo: modpackRepo}
|
launcherHandler := &api.LauncherHandler{ModpackRepo: modpackRepo}
|
||||||
modpackHandler := &api.ModpackHandler{
|
modpackHandler := &api.ModpackHandler{
|
||||||
ModpackRepo: modpackRepo,
|
ModpackRepo: modpackRepo,
|
||||||
|
JobRepo: jobRepo,
|
||||||
JanitorService: janitorService,
|
JanitorService: janitorService,
|
||||||
|
Hub: hub,
|
||||||
}
|
}
|
||||||
adminUserHandler := &api.AdminUserHandler{UserRepo: userRepo}
|
adminUserHandler := &api.AdminUserHandler{UserRepo: userRepo}
|
||||||
|
|
||||||
@@ -120,6 +128,11 @@ func main() {
|
|||||||
r.Route("/api/admin", func(r chi.Router) {
|
r.Route("/api/admin", func(r chi.Router) {
|
||||||
r.Use(api.AdminMiddleware)
|
r.Use(api.AdminMiddleware)
|
||||||
|
|
||||||
|
// WebSocket endpoint for jobs
|
||||||
|
r.Get("/ws/jobs", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ws.ServeWs(hub, w, r)
|
||||||
|
})
|
||||||
|
|
||||||
r.Route("/modpacks", func(r chi.Router) {
|
r.Route("/modpacks", func(r chi.Router) {
|
||||||
r.Post("/import", modpackHandler.ImportModpack)
|
r.Post("/import", modpackHandler.ImportModpack)
|
||||||
})
|
})
|
||||||
|
|||||||
1
go.mod
1
go.mod
@@ -7,6 +7,7 @@ require (
|
|||||||
github.com/go-chi/chi/v5 v5.2.1
|
github.com/go-chi/chi/v5 v5.2.1
|
||||||
github.com/golang-jwt/jwt/v5 v5.2.2
|
github.com/golang-jwt/jwt/v5 v5.2.2
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
|
github.com/gorilla/websocket v1.5.3
|
||||||
github.com/jackc/pgx/v5 v5.7.5
|
github.com/jackc/pgx/v5 v5.7.5
|
||||||
golang.org/x/crypto v0.39.0
|
golang.org/x/crypto v0.39.0
|
||||||
)
|
)
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -9,6 +9,8 @@ github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeD
|
|||||||
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
|
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
|
||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||||
|
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||||
|
|||||||
@@ -16,15 +16,25 @@ const ClaimsContextKey = contextKey("claims")
|
|||||||
// AuthMiddleware проверяет JWT токен и добавляет claims в контекст запроса.
|
// AuthMiddleware проверяет JWT токен и добавляет claims в контекст запроса.
|
||||||
func AuthMiddleware(next http.Handler) http.Handler {
|
func AuthMiddleware(next http.Handler) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var tokenString string
|
||||||
|
|
||||||
|
// 1. Проверяем заголовок Authorization
|
||||||
authHeader := r.Header.Get("Authorization")
|
authHeader := r.Header.Get("Authorization")
|
||||||
if authHeader == "" {
|
if authHeader != "" {
|
||||||
http.Error(w, "Authorization header required", http.StatusUnauthorized)
|
tokenString = strings.TrimPrefix(authHeader, "Bearer ")
|
||||||
return
|
if tokenString == authHeader { // Не было префикса Bearer
|
||||||
|
http.Error(w, "Invalid token format", http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tokenString := strings.TrimPrefix(authHeader, "Bearer ")
|
// 2. Если заголовка нет, проверяем параметр query (для WebSocket)
|
||||||
if tokenString == authHeader {
|
if tokenString == "" {
|
||||||
http.Error(w, "Invalid token format", http.StatusUnauthorized)
|
tokenString = r.URL.Query().Get("token")
|
||||||
|
}
|
||||||
|
|
||||||
|
if tokenString == "" {
|
||||||
|
http.Error(w, "Authorization required", http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,8 +2,10 @@ package api
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
@@ -11,50 +13,51 @@ import (
|
|||||||
"gitea.mrixs.me/minecraft-platform/backend/internal/core/importer"
|
"gitea.mrixs.me/minecraft-platform/backend/internal/core/importer"
|
||||||
"gitea.mrixs.me/minecraft-platform/backend/internal/database"
|
"gitea.mrixs.me/minecraft-platform/backend/internal/database"
|
||||||
"gitea.mrixs.me/minecraft-platform/backend/internal/models"
|
"gitea.mrixs.me/minecraft-platform/backend/internal/models"
|
||||||
|
"gitea.mrixs.me/minecraft-platform/backend/internal/ws"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ModpackHandler struct {
|
type ModpackHandler struct {
|
||||||
ModpackRepo *database.ModpackRepository
|
ModpackRepo *database.ModpackRepository
|
||||||
|
JobRepo *database.JobRepository
|
||||||
JanitorService *core.FileJanitorService
|
JanitorService *core.FileJanitorService
|
||||||
|
Hub *ws.Hub
|
||||||
}
|
}
|
||||||
|
|
||||||
// ImportModpack обрабатывает загрузку и импорт модпака.
|
// ImportJobParams содержит параметры для фоновой задачи импорта
|
||||||
|
type ImportJobParams struct {
|
||||||
|
ImporterType string
|
||||||
|
ImportMethod string
|
||||||
|
SourceURL string
|
||||||
|
TempZipPath string
|
||||||
|
Name string
|
||||||
|
DisplayName string
|
||||||
|
MCVersion string
|
||||||
|
}
|
||||||
|
|
||||||
|
// ImportModpack обрабатывает запрос на импорт и запускает асинхронную задачу
|
||||||
func (h *ModpackHandler) ImportModpack(w http.ResponseWriter, r *http.Request) {
|
func (h *ModpackHandler) ImportModpack(w http.ResponseWriter, r *http.Request) {
|
||||||
if err := r.ParseMultipartForm(512 << 20); err != nil {
|
if err := r.ParseMultipartForm(512 << 20); err != nil {
|
||||||
http.Error(w, "File too large", http.StatusBadRequest)
|
http.Error(w, "File too large", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
importerType := r.FormValue("importerType")
|
params := ImportJobParams{
|
||||||
importMethod := r.FormValue("importMethod")
|
ImporterType: r.FormValue("importerType"),
|
||||||
sourceURL := r.FormValue("sourceUrl")
|
ImportMethod: r.FormValue("importMethod"),
|
||||||
|
SourceURL: r.FormValue("sourceUrl"),
|
||||||
|
Name: r.FormValue("name"),
|
||||||
|
DisplayName: r.FormValue("displayName"),
|
||||||
|
MCVersion: r.FormValue("mcVersion"),
|
||||||
|
}
|
||||||
|
|
||||||
var tempZipPath string
|
// Валидация
|
||||||
var err error
|
if params.Name == "" || params.DisplayName == "" || params.MCVersion == "" {
|
||||||
|
http.Error(w, "Missing required fields", http.StatusBadRequest)
|
||||||
// --- Выбираем импортер ---
|
|
||||||
var imp importer.ModpackImporter
|
|
||||||
storagePath := os.Getenv("MODPACKS_STORAGE_PATH")
|
|
||||||
|
|
||||||
switch importerType {
|
|
||||||
case "simple":
|
|
||||||
imp = &importer.SimpleZipImporter{StoragePath: storagePath}
|
|
||||||
case "curseforge":
|
|
||||||
apiKey := os.Getenv("CURSEFORGE_API_KEY")
|
|
||||||
if apiKey == "" {
|
|
||||||
http.Error(w, "CurseForge API key is not configured on the server", http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
imp = importer.NewCurseForgeImporter(storagePath, apiKey)
|
|
||||||
case "modrinth":
|
|
||||||
imp = importer.NewModrinthImporter(storagePath)
|
|
||||||
default:
|
|
||||||
http.Error(w, "Invalid importer type", http.StatusBadRequest)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Получаем zip-файл ---
|
// Обработка загрузки файла
|
||||||
if importMethod == "file" {
|
if params.ImportMethod == "file" {
|
||||||
file, _, err := r.FormFile("file")
|
file, _, err := r.FormFile("file")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, "Invalid file upload", http.StatusBadRequest)
|
http.Error(w, "Invalid file upload", http.StatusBadRequest)
|
||||||
@@ -67,55 +70,136 @@ func (h *ModpackHandler) ImportModpack(w http.ResponseWriter, r *http.Request) {
|
|||||||
http.Error(w, "Could not create temp file", http.StatusInternalServerError)
|
http.Error(w, "Could not create temp file", http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer tempFile.Close()
|
// Не удаляем файл здесь, так как он нужен worker-у. Удалим в worker-е.
|
||||||
defer os.Remove(tempFile.Name())
|
// defer os.Remove(tempFile.Name())
|
||||||
|
|
||||||
if _, err := io.Copy(tempFile, file); err != nil {
|
if _, err := io.Copy(tempFile, file); err != nil {
|
||||||
|
tempFile.Close()
|
||||||
|
os.Remove(tempFile.Name())
|
||||||
http.Error(w, "Could not save temp file", http.StatusInternalServerError)
|
http.Error(w, "Could not save temp file", http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
tempZipPath = tempFile.Name()
|
tempFile.Close()
|
||||||
|
params.TempZipPath = tempFile.Name()
|
||||||
} else if importMethod == "url" {
|
|
||||||
cfImporter, ok := imp.(*importer.CurseForgeImporter)
|
|
||||||
if !ok {
|
|
||||||
http.Error(w, "Importer type does not support URL import", http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
tempZipPath, err = cfImporter.DownloadModpackFromURL(sourceURL)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, fmt.Sprintf("Failed to download from URL: %v", err), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer os.Remove(tempZipPath)
|
|
||||||
|
|
||||||
} else {
|
|
||||||
http.Error(w, "Invalid import method", http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Запускаем импорт ---
|
// Создаем задачу в БД
|
||||||
files, err := imp.Import(tempZipPath)
|
jobID, err := h.JobRepo.CreateJob(r.Context())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, fmt.Sprintf("Import failed: %v", err), http.StatusInternalServerError)
|
if params.ImportMethod == "file" {
|
||||||
|
os.Remove(params.TempZipPath)
|
||||||
|
}
|
||||||
|
http.Error(w, fmt.Sprintf("Failed to create job: %v", err), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Сохраняем результат в БД ---
|
// Запускаем worker
|
||||||
|
go h.processImportJob(jobID, params)
|
||||||
|
|
||||||
|
// Отправляем ответ
|
||||||
|
w.WriteHeader(http.StatusAccepted)
|
||||||
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||||
|
"job_id": jobID,
|
||||||
|
"message": "Import job started",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// processImportJob выполняет импорт в фоне
|
||||||
|
func (h *ModpackHandler) processImportJob(jobID int, params ImportJobParams) {
|
||||||
|
ctx := context.Background()
|
||||||
|
h.updateJobStatus(ctx, jobID, models.JobStatusDownloading, 10, "")
|
||||||
|
|
||||||
|
// Очистка временного файла по завершении
|
||||||
|
if params.TempZipPath != "" {
|
||||||
|
defer os.Remove(params.TempZipPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
storagePath := os.Getenv("MODPACKS_STORAGE_PATH")
|
||||||
|
var imp importer.ModpackImporter
|
||||||
|
|
||||||
|
// Настройка импортера
|
||||||
|
switch params.ImporterType {
|
||||||
|
case "simple":
|
||||||
|
imp = &importer.SimpleZipImporter{StoragePath: storagePath}
|
||||||
|
case "curseforge":
|
||||||
|
apiKey := os.Getenv("CURSEFORGE_API_KEY")
|
||||||
|
if apiKey == "" {
|
||||||
|
h.updateJobStatus(ctx, jobID, models.JobStatusFailed, 0, "CurseForge API key missing")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
imp = importer.NewCurseForgeImporter(storagePath, apiKey)
|
||||||
|
case "modrinth":
|
||||||
|
imp = importer.NewModrinthImporter(storagePath)
|
||||||
|
default:
|
||||||
|
h.updateJobStatus(ctx, jobID, models.JobStatusFailed, 0, "Invalid importer type")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Загрузка по URL если нужно
|
||||||
|
if params.ImportMethod == "url" {
|
||||||
|
h.updateJobStatus(ctx, jobID, models.JobStatusDownloading, 20, "Downloading from URL...")
|
||||||
|
|
||||||
|
// Логика скачивания зависит от типа импортера, пока поддерживаем CurseForge
|
||||||
|
// TODO: Сделать интерфейс DownloadableImporter
|
||||||
|
if cfImporter, ok := imp.(*importer.CurseForgeImporter); ok {
|
||||||
|
zipPath, err := cfImporter.DownloadModpackFromURL(params.SourceURL)
|
||||||
|
if err != nil {
|
||||||
|
h.updateJobStatus(ctx, jobID, models.JobStatusFailed, 0, fmt.Sprintf("Download failed: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
params.TempZipPath = zipPath
|
||||||
|
defer os.Remove(zipPath) // Удаляем скачанный файл после обработки
|
||||||
|
} else {
|
||||||
|
// Для других типов пока не поддерживаем URL download внутри импортера
|
||||||
|
h.updateJobStatus(ctx, jobID, models.JobStatusFailed, 0, "URL import not supported for this type")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
h.updateJobStatus(ctx, jobID, models.JobStatusProcessing, 40, "Processing modpack files...")
|
||||||
|
|
||||||
|
// Импорт файлов
|
||||||
|
files, err := imp.Import(params.TempZipPath)
|
||||||
|
if err != nil {
|
||||||
|
h.updateJobStatus(ctx, jobID, models.JobStatusFailed, 0, fmt.Sprintf("Import failed: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
h.updateJobStatus(ctx, jobID, models.JobStatusProcessing, 80, "Saving to database...")
|
||||||
|
|
||||||
|
// Сохранение в БД
|
||||||
modpack := &models.Modpack{
|
modpack := &models.Modpack{
|
||||||
Name: r.FormValue("name"),
|
Name: params.Name,
|
||||||
DisplayName: r.FormValue("displayName"),
|
DisplayName: params.DisplayName,
|
||||||
MinecraftVersion: r.FormValue("mcVersion"),
|
MinecraftVersion: params.MCVersion,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = h.ModpackRepo.CreateModpackTx(r.Context(), modpack, files)
|
err = h.ModpackRepo.CreateModpackTx(ctx, modpack, files)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, fmt.Sprintf("Database save failed: %v", err), http.StatusInternalServerError)
|
h.updateJobStatus(ctx, jobID, models.JobStatusFailed, 0, fmt.Sprintf("Database save failed: %v", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
w.WriteHeader(http.StatusCreated)
|
h.updateJobStatus(ctx, jobID, models.JobStatusCompleted, 100, "Success")
|
||||||
fmt.Fprintf(w, "Modpack '%s' imported successfully with %d files.", modpack.DisplayName, len(files))
|
|
||||||
|
|
||||||
|
// Запуск Janitor-а
|
||||||
go h.JanitorService.CleanOrphanedFiles(context.Background())
|
go h.JanitorService.CleanOrphanedFiles(context.Background())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// updateJobStatus обновляет статус в БД и отправляет уведомление через WebSocket
|
||||||
|
func (h *ModpackHandler) updateJobStatus(ctx context.Context, jobID int, status models.ImportJobStatus, progress int, errMsg string) {
|
||||||
|
// Обновляем БД
|
||||||
|
if err := h.JobRepo.UpdateJobStatus(ctx, jobID, status, progress, errMsg); err != nil {
|
||||||
|
slog.Error("Failed to update job status in DB", "jobID", jobID, "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Отправляем в WebSocket
|
||||||
|
update := map[string]interface{}{
|
||||||
|
"job_id": jobID,
|
||||||
|
"status": status,
|
||||||
|
"progress": progress,
|
||||||
|
"error_message": errMsg,
|
||||||
|
}
|
||||||
|
msg, _ := json.Marshal(update)
|
||||||
|
h.Hub.BroadcastMessage(msg)
|
||||||
|
}
|
||||||
|
|||||||
58
internal/database/job_repository.go
Normal file
58
internal/database/job_repository.go
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"gitea.mrixs.me/minecraft-platform/backend/internal/models"
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
type JobRepository struct {
|
||||||
|
DB *pgxpool.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateJob создает новую задачу со статусом pending
|
||||||
|
func (r *JobRepository) CreateJob(ctx context.Context) (int, error) {
|
||||||
|
var jobID int
|
||||||
|
query := `INSERT INTO modpack_import_jobs (status, progress, created_at, updated_at)
|
||||||
|
VALUES ($1, $2, NOW(), NOW()) RETURNING id`
|
||||||
|
|
||||||
|
err := r.DB.QueryRow(ctx, query, models.JobStatusPending, 0).Scan(&jobID)
|
||||||
|
return jobID, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateJobStatus обновляет статус и прогресс задачи
|
||||||
|
func (r *JobRepository) UpdateJobStatus(ctx context.Context, jobID int, status models.ImportJobStatus, progress int, errorMessage string) error {
|
||||||
|
query := `UPDATE modpack_import_jobs
|
||||||
|
SET status = $1, progress = $2, error_message = $3, updated_at = NOW()
|
||||||
|
WHERE id = $4`
|
||||||
|
|
||||||
|
_, err := r.DB.Exec(ctx, query, status, progress, errorMessage, jobID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetJob получает задачу по ID
|
||||||
|
func (r *JobRepository) GetJob(ctx context.Context, jobID int) (*models.ImportJob, error) {
|
||||||
|
job := &models.ImportJob{}
|
||||||
|
query := `SELECT id, status, progress, error_message, created_at, updated_at
|
||||||
|
FROM modpack_import_jobs WHERE id = $1`
|
||||||
|
|
||||||
|
var errMsg *string // Для обработки NULL
|
||||||
|
err := r.DB.QueryRow(ctx, query, jobID).Scan(
|
||||||
|
&job.ID, &job.Status, &job.Progress, &errMsg, &job.CreatedAt, &job.UpdatedAt,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if err == pgx.ErrNoRows {
|
||||||
|
return nil, nil // Или спец ошибка
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if errMsg != nil {
|
||||||
|
job.ErrorMessage = *errMsg
|
||||||
|
}
|
||||||
|
|
||||||
|
return job, nil
|
||||||
|
}
|
||||||
24
internal/models/job.go
Normal file
24
internal/models/job.go
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
package models
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
// ImportJobStatus определяет возможные статусы задачи импорта
|
||||||
|
type ImportJobStatus string
|
||||||
|
|
||||||
|
const (
|
||||||
|
JobStatusPending ImportJobStatus = "pending"
|
||||||
|
JobStatusDownloading ImportJobStatus = "downloading"
|
||||||
|
JobStatusProcessing ImportJobStatus = "processing"
|
||||||
|
JobStatusCompleted ImportJobStatus = "completed"
|
||||||
|
JobStatusFailed ImportJobStatus = "failed"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ImportJob представляет задачу на импорт модпака
|
||||||
|
type ImportJob struct {
|
||||||
|
ID int `json:"id"`
|
||||||
|
Status ImportJobStatus `json:"status"`
|
||||||
|
Progress int `json:"progress"` // 0-100
|
||||||
|
ErrorMessage string `json:"error_message,omitempty"`
|
||||||
|
CreatedAt time.Time `json:"created_at"`
|
||||||
|
UpdatedAt time.Time `json:"updated_at"`
|
||||||
|
}
|
||||||
179
internal/ws/hub.go
Normal file
179
internal/ws/hub.go
Normal file
@@ -0,0 +1,179 @@
|
|||||||
|
package ws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log/slog"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Time allowed to write a message to the peer.
|
||||||
|
writeWait = 10 * time.Second
|
||||||
|
|
||||||
|
// Time allowed to read the next pong message from the peer.
|
||||||
|
pongWait = 60 * time.Second
|
||||||
|
|
||||||
|
// Send pings to peer with this period. Must be less than pongWait.
|
||||||
|
pingPeriod = (pongWait * 9) / 10
|
||||||
|
)
|
||||||
|
|
||||||
|
var upgrader = websocket.Upgrader{
|
||||||
|
ReadBufferSize: 1024,
|
||||||
|
WriteBufferSize: 1024,
|
||||||
|
// Разрешаем CORS для разработки (в продакшене лучше ограничить)
|
||||||
|
CheckOrigin: func(r *http.Request) bool {
|
||||||
|
return true
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hub maintains the set of active clients and broadcasts messages to the clients.
|
||||||
|
type Hub struct {
|
||||||
|
// Registered clients.
|
||||||
|
clients map[*Client]bool
|
||||||
|
|
||||||
|
// Inbound messages from the clients (not used for now, only broadcast).
|
||||||
|
broadcast chan []byte
|
||||||
|
|
||||||
|
// Register requests from the clients.
|
||||||
|
register chan *Client
|
||||||
|
|
||||||
|
// Unregister requests from clients.
|
||||||
|
unregister chan *Client
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHub() *Hub {
|
||||||
|
return &Hub{
|
||||||
|
broadcast: make(chan []byte),
|
||||||
|
register: make(chan *Client),
|
||||||
|
unregister: make(chan *Client),
|
||||||
|
clients: make(map[*Client]bool),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Hub) Run() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case client := <-h.register:
|
||||||
|
h.mu.Lock()
|
||||||
|
h.clients[client] = true
|
||||||
|
h.mu.Unlock()
|
||||||
|
slog.Debug("WS: Client registered")
|
||||||
|
|
||||||
|
case client := <-h.unregister:
|
||||||
|
h.mu.Lock()
|
||||||
|
if _, ok := h.clients[client]; ok {
|
||||||
|
delete(h.clients, client)
|
||||||
|
close(client.send)
|
||||||
|
slog.Debug("WS: Client unregistered")
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
|
||||||
|
case message := <-h.broadcast:
|
||||||
|
h.mu.Lock()
|
||||||
|
for client := range h.clients {
|
||||||
|
select {
|
||||||
|
case client.send <- message:
|
||||||
|
default:
|
||||||
|
close(client.send)
|
||||||
|
delete(h.clients, client)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Broadcast отправляет сообщение всем подключенным клиентам
|
||||||
|
func (h *Hub) BroadcastMessage(msg []byte) {
|
||||||
|
h.broadcast <- msg
|
||||||
|
}
|
||||||
|
|
||||||
|
// Client is a middleman between the websocket connection and the hub.
|
||||||
|
type Client struct {
|
||||||
|
hub *Hub
|
||||||
|
|
||||||
|
// The websocket connection.
|
||||||
|
conn *websocket.Conn
|
||||||
|
|
||||||
|
// Buffered channel of outbound messages.
|
||||||
|
send chan []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// writePump pumps messages from the hub to the websocket connection.
|
||||||
|
func (c *Client) writePump() {
|
||||||
|
ticker := time.NewTicker(pingPeriod)
|
||||||
|
defer func() {
|
||||||
|
ticker.Stop()
|
||||||
|
c.conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case message, ok := <-c.send:
|
||||||
|
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||||
|
if !ok {
|
||||||
|
// The hub closed the channel.
|
||||||
|
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w, err := c.conn.NextWriter(websocket.TextMessage)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Write(message)
|
||||||
|
|
||||||
|
if err := w.Close(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-ticker.C:
|
||||||
|
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||||
|
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// readPump pumps messages from the websocket connection to the hub.
|
||||||
|
// (Needed to process PONGs and detect disconnects)
|
||||||
|
func (c *Client) readPump() {
|
||||||
|
defer func() {
|
||||||
|
c.hub.unregister <- c
|
||||||
|
c.conn.Close()
|
||||||
|
}()
|
||||||
|
c.conn.SetReadLimit(512)
|
||||||
|
c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||||
|
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
|
||||||
|
for {
|
||||||
|
_, _, err := c.conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
||||||
|
slog.Error("WS: error", "error", err)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServeWs handles websocket requests from the peer.
|
||||||
|
func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
|
||||||
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("WS: Failed to upgrade connection", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
|
||||||
|
client.hub.register <- client
|
||||||
|
|
||||||
|
// Allow collection of memory referenced by the caller by doing all work in
|
||||||
|
// new goroutines.
|
||||||
|
go client.writePump()
|
||||||
|
go client.readPump()
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user