go-platform/services/storage_migration.go
2026-04-09 16:26:38 +08:00

192 lines
4.4 KiB
Go

package services
import (
"fmt"
"mime/multipart"
"os"
"path/filepath"
"strings"
"sync"
"server/models"
)
// MigrationProgress 迁移进度
type MigrationProgress struct {
Total int
Success int
Failed int
Current string
Errors []string
mu sync.Mutex
}
// AddSuccess 增加成功计数
func (p *MigrationProgress) AddSuccess() {
p.mu.Lock()
defer p.mu.Unlock()
p.Success++
}
// AddFailed 增加失败计数
func (p *MigrationProgress) AddFailed(err string) {
p.mu.Lock()
defer p.mu.Unlock()
p.Failed++
p.Errors = append(p.Errors, err)
}
// SetCurrent 设置当前处理的文件
func (p *MigrationProgress) SetCurrent(filename string) {
p.mu.Lock()
defer p.mu.Unlock()
p.Current = filename
}
// GetProgress 获取进度信息
func (p *MigrationProgress) GetProgress() (int, int, int, string) {
p.mu.Lock()
defer p.mu.Unlock()
return p.Total, p.Success, p.Failed, p.Current
}
// StorageMigration 存储迁移服务
type StorageMigration struct {
fromService StorageService
toService StorageService
progress *MigrationProgress
}
// NewStorageMigration 创建存储迁移服务
func NewStorageMigration(from, to StorageService) *StorageMigration {
return &StorageMigration{
fromService: from,
toService: to,
progress: &MigrationProgress{
Errors: make([]string, 0),
},
}
}
// MigrateFile 迁移单个文件
func (m *StorageMigration) MigrateFile(file *models.SystemFile) error {
m.progress.SetCurrent(file.Name)
// 如果是本地存储,从本地读取文件
if localFrom, ok := m.fromService.(*LocalStorage); ok {
// 从本地文件系统读取
localPath := strings.TrimPrefix(file.Src, "/")
filePath := filepath.Join(localFrom.BaseDir, localPath)
f, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("打开本地文件失败: %w", err)
}
defer f.Close()
// 获取文件信息
stat, err := f.Stat()
if err != nil {
return fmt.Errorf("获取文件信息失败: %w", err)
}
// 创建 multipart.FileHeader
header := &multipart.FileHeader{
Filename: file.Name,
Size: stat.Size(),
}
// 上传到目标存储
result, err := m.toService.Upload(f, header)
if err != nil {
return fmt.Errorf("上传到目标存储失败: %w", err)
}
// 更新数据库记录
_, err = models.Orm.QueryTable(new(models.SystemFile)).
Filter("id", file.ID).
Update(map[string]interface{}{
"src": result.URL,
})
if err != nil {
// 上传成功但更新数据库失败,尝试删除已上传的文件
_ = m.toService.Delete(result.Key)
return fmt.Errorf("更新数据库失败: %w", err)
}
m.progress.AddSuccess()
return nil
}
// 如果是七牛云存储,需要先下载再上传(这里简化处理)
return fmt.Errorf("暂不支持从七牛云迁移到本地")
}
// MigrateAll 迁移所有文件
func (m *StorageMigration) MigrateAll(tid uint64) error {
// 获取所有文件
var files []models.SystemFile
_, err := models.Orm.QueryTable(new(models.SystemFile)).
Filter("tid", tid).
Filter("delete_time__isnull", true).
All(&files)
if err != nil {
return fmt.Errorf("获取文件列表失败: %w", err)
}
m.progress.Total = len(files)
// 并发迁移(限制并发数)
concurrency := 5
sem := make(chan struct{}, concurrency)
var wg sync.WaitGroup
for i := range files {
wg.Add(1)
go func(file *models.SystemFile) {
defer wg.Done()
sem <- struct{}{} // 获取信号量
defer func() { <-sem }() // 释放信号量
if err := m.MigrateFile(file); err != nil {
m.progress.AddFailed(fmt.Sprintf("%s: %v", file.Name, err))
}
}(&files[i])
}
wg.Wait()
return nil
}
// GetProgress 获取迁移进度
func (m *StorageMigration) GetProgress() *MigrationProgress {
return m.progress
}
// MigrateLocalToQiniu 从本地存储迁移到七牛云
func MigrateLocalToQiniu(tid uint64) (*MigrationProgress, error) {
// 获取存储配置
cfg, err := models.GetStorageConfig()
if err != nil {
return nil, fmt.Errorf("获取存储配置失败: %w", err)
}
if cfg.StorageType != "qiniu" {
return nil, fmt.Errorf("当前存储类型不是七牛云")
}
// 创建存储服务
localStorage := NewLocalStorage()
qiniuStorage := NewQiniuStorage(cfg)
// 创建迁移服务
migration := NewStorageMigration(localStorage, qiniuStorage)
// 执行迁移
if err := migration.MigrateAll(tid); err != nil {
return migration.GetProgress(), err
}
return migration.GetProgress(), nil
}