192 lines
4.4 KiB
Go
192 lines
4.4 KiB
Go
package services
|
|
|
|
import (
|
|
"fmt"
|
|
"mime/multipart"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
|
|
"server/models"
|
|
)
|
|
|
|
// MigrationProgress 迁移进度
|
|
type MigrationProgress struct {
|
|
Total int
|
|
Success int
|
|
Failed int
|
|
Current string
|
|
Errors []string
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// AddSuccess 增加成功计数
|
|
func (p *MigrationProgress) AddSuccess() {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
p.Success++
|
|
}
|
|
|
|
// AddFailed 增加失败计数
|
|
func (p *MigrationProgress) AddFailed(err string) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
p.Failed++
|
|
p.Errors = append(p.Errors, err)
|
|
}
|
|
|
|
// SetCurrent 设置当前处理的文件
|
|
func (p *MigrationProgress) SetCurrent(filename string) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
p.Current = filename
|
|
}
|
|
|
|
// GetProgress 获取进度信息
|
|
func (p *MigrationProgress) GetProgress() (int, int, int, string) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
return p.Total, p.Success, p.Failed, p.Current
|
|
}
|
|
|
|
// StorageMigration 存储迁移服务
|
|
type StorageMigration struct {
|
|
fromService StorageService
|
|
toService StorageService
|
|
progress *MigrationProgress
|
|
}
|
|
|
|
// NewStorageMigration 创建存储迁移服务
|
|
func NewStorageMigration(from, to StorageService) *StorageMigration {
|
|
return &StorageMigration{
|
|
fromService: from,
|
|
toService: to,
|
|
progress: &MigrationProgress{
|
|
Errors: make([]string, 0),
|
|
},
|
|
}
|
|
}
|
|
|
|
// MigrateFile 迁移单个文件
|
|
func (m *StorageMigration) MigrateFile(file *models.SystemFile) error {
|
|
m.progress.SetCurrent(file.Name)
|
|
|
|
// 如果是本地存储,从本地读取文件
|
|
if localFrom, ok := m.fromService.(*LocalStorage); ok {
|
|
// 从本地文件系统读取
|
|
localPath := strings.TrimPrefix(file.Src, "/")
|
|
filePath := filepath.Join(localFrom.BaseDir, localPath)
|
|
|
|
f, err := os.Open(filePath)
|
|
if err != nil {
|
|
return fmt.Errorf("打开本地文件失败: %w", err)
|
|
}
|
|
defer f.Close()
|
|
|
|
// 获取文件信息
|
|
stat, err := f.Stat()
|
|
if err != nil {
|
|
return fmt.Errorf("获取文件信息失败: %w", err)
|
|
}
|
|
|
|
// 创建 multipart.FileHeader
|
|
header := &multipart.FileHeader{
|
|
Filename: file.Name,
|
|
Size: stat.Size(),
|
|
}
|
|
|
|
// 上传到目标存储
|
|
result, err := m.toService.Upload(f, header)
|
|
if err != nil {
|
|
return fmt.Errorf("上传到目标存储失败: %w", err)
|
|
}
|
|
|
|
// 更新数据库记录
|
|
_, err = models.Orm.QueryTable(new(models.SystemFile)).
|
|
Filter("id", file.ID).
|
|
Update(map[string]interface{}{
|
|
"src": result.URL,
|
|
})
|
|
if err != nil {
|
|
// 上传成功但更新数据库失败,尝试删除已上传的文件
|
|
_ = m.toService.Delete(result.Key)
|
|
return fmt.Errorf("更新数据库失败: %w", err)
|
|
}
|
|
|
|
m.progress.AddSuccess()
|
|
return nil
|
|
}
|
|
|
|
// 如果是七牛云存储,需要先下载再上传(这里简化处理)
|
|
return fmt.Errorf("暂不支持从七牛云迁移到本地")
|
|
}
|
|
|
|
// MigrateAll 迁移所有文件
|
|
func (m *StorageMigration) MigrateAll(tid uint64) error {
|
|
// 获取所有文件
|
|
var files []models.SystemFile
|
|
_, err := models.Orm.QueryTable(new(models.SystemFile)).
|
|
Filter("tid", tid).
|
|
Filter("delete_time__isnull", true).
|
|
All(&files)
|
|
if err != nil {
|
|
return fmt.Errorf("获取文件列表失败: %w", err)
|
|
}
|
|
|
|
m.progress.Total = len(files)
|
|
|
|
// 并发迁移(限制并发数)
|
|
concurrency := 5
|
|
sem := make(chan struct{}, concurrency)
|
|
var wg sync.WaitGroup
|
|
|
|
for i := range files {
|
|
wg.Add(1)
|
|
go func(file *models.SystemFile) {
|
|
defer wg.Done()
|
|
sem <- struct{}{} // 获取信号量
|
|
defer func() { <-sem }() // 释放信号量
|
|
|
|
if err := m.MigrateFile(file); err != nil {
|
|
m.progress.AddFailed(fmt.Sprintf("%s: %v", file.Name, err))
|
|
}
|
|
}(&files[i])
|
|
}
|
|
|
|
wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
// GetProgress 获取迁移进度
|
|
func (m *StorageMigration) GetProgress() *MigrationProgress {
|
|
return m.progress
|
|
}
|
|
|
|
// MigrateLocalToQiniu 从本地存储迁移到七牛云
|
|
func MigrateLocalToQiniu(tid uint64) (*MigrationProgress, error) {
|
|
// 获取存储配置
|
|
cfg, err := models.GetStorageConfig()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("获取存储配置失败: %w", err)
|
|
}
|
|
|
|
if cfg.StorageType != "qiniu" {
|
|
return nil, fmt.Errorf("当前存储类型不是七牛云")
|
|
}
|
|
|
|
// 创建存储服务
|
|
localStorage := NewLocalStorage()
|
|
qiniuStorage := NewQiniuStorage(cfg)
|
|
|
|
// 创建迁移服务
|
|
migration := NewStorageMigration(localStorage, qiniuStorage)
|
|
|
|
// 执行迁移
|
|
if err := migration.MigrateAll(tid); err != nil {
|
|
return migration.GetProgress(), err
|
|
}
|
|
|
|
return migration.GetProgress(), nil
|
|
}
|