package services import ( "fmt" "mime/multipart" "os" "path/filepath" "strings" "sync" "server/models" ) // MigrationProgress 迁移进度 type MigrationProgress struct { Total int Success int Failed int Current string Errors []string mu sync.Mutex } // AddSuccess 增加成功计数 func (p *MigrationProgress) AddSuccess() { p.mu.Lock() defer p.mu.Unlock() p.Success++ } // AddFailed 增加失败计数 func (p *MigrationProgress) AddFailed(err string) { p.mu.Lock() defer p.mu.Unlock() p.Failed++ p.Errors = append(p.Errors, err) } // SetCurrent 设置当前处理的文件 func (p *MigrationProgress) SetCurrent(filename string) { p.mu.Lock() defer p.mu.Unlock() p.Current = filename } // GetProgress 获取进度信息 func (p *MigrationProgress) GetProgress() (int, int, int, string) { p.mu.Lock() defer p.mu.Unlock() return p.Total, p.Success, p.Failed, p.Current } // StorageMigration 存储迁移服务 type StorageMigration struct { fromService StorageService toService StorageService progress *MigrationProgress } // NewStorageMigration 创建存储迁移服务 func NewStorageMigration(from, to StorageService) *StorageMigration { return &StorageMigration{ fromService: from, toService: to, progress: &MigrationProgress{ Errors: make([]string, 0), }, } } // MigrateFile 迁移单个文件 func (m *StorageMigration) MigrateFile(file *models.SystemFile) error { m.progress.SetCurrent(file.Name) // 如果是本地存储,从本地读取文件 if localFrom, ok := m.fromService.(*LocalStorage); ok { // 从本地文件系统读取 localPath := strings.TrimPrefix(file.Src, "/") filePath := filepath.Join(localFrom.BaseDir, localPath) f, err := os.Open(filePath) if err != nil { return fmt.Errorf("打开本地文件失败: %w", err) } defer f.Close() // 获取文件信息 stat, err := f.Stat() if err != nil { return fmt.Errorf("获取文件信息失败: %w", err) } // 创建 multipart.FileHeader header := &multipart.FileHeader{ Filename: file.Name, Size: stat.Size(), } // 上传到目标存储 result, err := m.toService.Upload(f, header) if err != nil { return fmt.Errorf("上传到目标存储失败: %w", err) } // 更新数据库记录 _, err = models.Orm.QueryTable(new(models.SystemFile)). Filter("id", file.ID). Update(map[string]interface{}{ "src": result.URL, }) if err != nil { // 上传成功但更新数据库失败,尝试删除已上传的文件 _ = m.toService.Delete(result.Key) return fmt.Errorf("更新数据库失败: %w", err) } m.progress.AddSuccess() return nil } // 如果是七牛云存储,需要先下载再上传(这里简化处理) return fmt.Errorf("暂不支持从七牛云迁移到本地") } // MigrateAll 迁移所有文件 func (m *StorageMigration) MigrateAll(tid uint64) error { // 获取所有文件 var files []models.SystemFile _, err := models.Orm.QueryTable(new(models.SystemFile)). Filter("tid", tid). Filter("delete_time__isnull", true). All(&files) if err != nil { return fmt.Errorf("获取文件列表失败: %w", err) } m.progress.Total = len(files) // 并发迁移(限制并发数) concurrency := 5 sem := make(chan struct{}, concurrency) var wg sync.WaitGroup for i := range files { wg.Add(1) go func(file *models.SystemFile) { defer wg.Done() sem <- struct{}{} // 获取信号量 defer func() { <-sem }() // 释放信号量 if err := m.MigrateFile(file); err != nil { m.progress.AddFailed(fmt.Sprintf("%s: %v", file.Name, err)) } }(&files[i]) } wg.Wait() return nil } // GetProgress 获取迁移进度 func (m *StorageMigration) GetProgress() *MigrationProgress { return m.progress } // MigrateLocalToQiniu 从本地存储迁移到七牛云 func MigrateLocalToQiniu(tid uint64) (*MigrationProgress, error) { // 获取存储配置 cfg, err := models.GetStorageConfig() if err != nil { return nil, fmt.Errorf("获取存储配置失败: %w", err) } if cfg.StorageType != "qiniu" { return nil, fmt.Errorf("当前存储类型不是七牛云") } // 创建存储服务 localStorage := NewLocalStorage() qiniuStorage := NewQiniuStorage(cfg) // 创建迁移服务 migration := NewStorageMigration(localStorage, qiniuStorage) // 执行迁移 if err := migration.MigrateAll(tid); err != nil { return migration.GetProgress(), err } return migration.GetProgress(), nil }