yunzerwebsiteallinone/go/services/reminder_scheduler.go
2026-06-18 00:58:26 +08:00

372 lines
10 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package services
import (
"bytes"
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"server/models"
)
// ReminderSender is the delivery contract implemented by every reminder
// channel in this file (SMS, e-mail, Bark push, in-site message).
//
// Send delivers a single reminder using the supplied title and content.
// The implementations in this file return (true, nil) when the message was
// handed off successfully and (false, err) when delivery failed.
type ReminderSender interface {
Send(ctx context.Context, reminder *models.PlatformScheduleReminder, title, content string) (success bool, err error)
}
// SMSSender delivers reminders as text messages through the system SMS gateway.
type SMSSender struct{}

// Send enqueues an outbound SMS task on the gateway returned by
// getDefaultSystemSMSConfig.
//
// The destination number is reminder.ReceiverTarget when it is set and
// non-empty; otherwise it falls back to the Phone of the AdminUser identified
// by reminder.ReceiverUserID. The message text is "title: content". Only an
// HTTP 200 reply counts as success; any other status is returned as an error
// that includes the response body.
func (s *SMSSender) Send(ctx context.Context, reminder *models.PlatformScheduleReminder, title, content string) (bool, error) {
	gatewayBase, gatewayKey, cfgErr := getDefaultSystemSMSConfig()
	if cfgErr != nil {
		return false, cfgErr
	}

	// Resolve the destination number: explicit target first, user profile second.
	mobile := ""
	if target := reminder.ReceiverTarget; target == nil || *target == "" {
		var account models.AdminUser
		lookupErr := models.Orm.QueryTable(new(models.AdminUser)).Filter("id", reminder.ReceiverUserID).One(&account)
		if lookupErr == nil && account.Phone != nil {
			mobile = *account.Phone
		}
	} else {
		mobile = *target
	}
	if mobile == "" {
		return false, fmt.Errorf("未配置手机号")
	}

	// The marshal error is dropped: the payload is a map of plain strings.
	body, _ := json.Marshal(map[string]interface{}{
		"phone":   mobile,
		"content": title + ": " + content,
	})
	endpoint := strings.TrimRight(gatewayBase, "/") + "/api/v1/business/outbound-tasks"
	httpClient := &http.Client{Timeout: 10 * time.Second}

	request, reqErr := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
	if reqErr != nil {
		return false, reqErr
	}
	request.Header.Set("Content-Type", "application/json")
	request.Header.Set("X-Api-Key", gatewayKey)

	response, doErr := httpClient.Do(request)
	if doErr != nil {
		return false, doErr
	}
	defer response.Body.Close()

	if response.StatusCode == http.StatusOK {
		return true, nil
	}
	// A failed body read is tolerated; whatever was read still goes into the error.
	raw, _ := io.ReadAll(response.Body)
	return false, fmt.Errorf("网关返回HTTP状态码: %d, 返回内容: %s", response.StatusCode, string(raw))
}
// EmailSender delivers reminders as HTML e-mails through the system SMTP account.
type EmailSender struct{}

// Send renders the reminder as an HTML e-mail and sends it using the first
// mailbox returned by ListSystemEmails.
//
// The recipient is reminder.ReceiverTarget when it is set and non-empty;
// otherwise it is the Email of the AdminUser identified by
// reminder.ReceiverUserID. When the reminder carries an AckToken, the mail also
// contains a confirmation button that links to
// {system_domain}/api/schedule/reminder/ack?token=<token>.
//
// title and content are HTML-escaped before being placed in the body and the
// token is query-escaped, so reminder text cannot inject markup into the mail.
// The subject line receives the unescaped title.
//
// ctx is accepted to satisfy ReminderSender but is not consulted, because
// SendHTMLEmailSMTP takes no context.
//
// It returns (true, nil) on success and (false, err) otherwise.
func (s *EmailSender) Send(ctx context.Context, reminder *models.PlatformScheduleReminder, title, content string) (bool, error) {
	emails, err := ListSystemEmails()
	if err != nil {
		// Surface the real cause instead of reporting it as "not configured".
		return false, fmt.Errorf("读取系统邮箱配置失败: %w", err)
	}
	if len(emails) == 0 {
		return false, fmt.Errorf("未配置系统邮箱")
	}
	emailCfg := emails[0]
	if emailCfg.FromAddress == "" || emailCfg.Host == "" {
		return false, fmt.Errorf("未配置系统邮箱")
	}

	// Resolve the recipient: explicit target first, user profile second.
	toEmail := ""
	if reminder.ReceiverTarget != nil && *reminder.ReceiverTarget != "" {
		toEmail = *reminder.ReceiverTarget
	} else {
		var user models.AdminUser
		if err := models.Orm.QueryTable(new(models.AdminUser)).Filter("id", reminder.ReceiverUserID).One(&user); err == nil && user.Email != nil {
			toEmail = *user.Email
		}
	}
	if toEmail == "" {
		return false, fmt.Errorf("未配置收件邮箱")
	}

	sysDomain := models.GetPlatformSettingValue("system_domain", "https://api.yunzer.cn")
	ackToken := ""
	if reminder.AckToken != nil {
		ackToken = *reminder.AckToken
	}

	// Build the HTML body. Title and content are escaped because they
	// originate from schedule data and must be rendered as text, not markup.
	htmlBody := fmt.Sprintf(`
<div style="font-family: Arial, sans-serif; padding: 20px; border: 1px solid #eee; border-radius: 5px; max-width: 600px; margin: 0 auto;">
<h2 style="color: #409EFF; margin-bottom: 20px;">日程提醒:%s</h2>
<p style="font-size: 16px; line-height: 1.6; color: #333;">%s</p>
<hr style="border: 0; border-top: 1px solid #eee; margin: 20px 0;" />
`, escapeReminderHTML(title), escapeReminderHTML(content))
	if ackToken != "" {
		ackURL := fmt.Sprintf("%s/api/schedule/reminder/ack?token=%s", strings.TrimRight(sysDomain, "/"), url.QueryEscape(ackToken))
		// The URL is escaped once more for use inside the href attribute.
		htmlBody += fmt.Sprintf(`
<div style="text-align: center; margin-top: 30px;">
<a href="%s" target="_blank" style="background-color: #409EFF; color: #fff; padding: 12px 24px; text-decoration: none; border-radius: 4px; font-weight: bold; display: inline-block;">
收到,确认此提醒
</a>
</div>
<p style="font-size: 12px; color: #999; text-align: center; margin-top: 15px;">确认收到后,系统将不再向您发送该日程的重复提醒。</p>
`, escapeReminderHTML(ackURL))
	}
	htmlBody += "</div>"

	cfg := SMTPConfig{
		FromAddress: emailCfg.FromAddress,
		Host:        emailCfg.Host,
		Port:        emailCfg.Port,
		Password:    emailCfg.Password,
		Encryption:  emailCfg.Encryption,
		Timeout:     emailCfg.Timeout,
	}
	if emailCfg.FromName != nil {
		cfg.FromName = *emailCfg.FromName
	}

	if err := SendHTMLEmailSMTP(cfg, toEmail, title, htmlBody); err != nil {
		return false, err
	}
	return true, nil
}

// escapeReminderHTML escapes the five characters that are significant in HTML
// text and attribute values (& < > " '), using the same replacements as the
// standard library's html.EscapeString.
func escapeReminderHTML(s string) string {
	return strings.NewReplacer(
		"&", "&amp;",
		"<", "&lt;",
		">", "&gt;",
		`"`, "&#34;",
		"'", "&#39;",
	).Replace(s)
}
// BarkSender delivers reminders as Bark push notifications.
type BarkSender struct{}

// Send pushes the reminder through a Bark server with a GET request of the
// form {bark_server_url}/{deviceKey}/{title}/{content}.
//
// The device key is reminder.ReceiverTarget when it is set and non-empty;
// otherwise the "bark_device_key" platform setting is used. When the reminder
// carries an AckToken, a hint line is appended to the content and the
// acknowledgement link is passed to Bark through the "url" query parameter.
//
// Every dynamic URL component (device key, title, content, token) is escaped
// so that reserved characters cannot alter the request path or query.
//
// It returns (true, nil) on an HTTP 200 reply and (false, err) otherwise.
func (s *BarkSender) Send(ctx context.Context, reminder *models.PlatformScheduleReminder, title, content string) (bool, error) {
	var deviceKey string
	if target := reminder.ReceiverTarget; target != nil && *target != "" {
		deviceKey = *target
	} else {
		deviceKey = models.GetPlatformSettingValue("bark_device_key", "")
	}
	if deviceKey == "" {
		return false, fmt.Errorf("Bark 设备 Key 未配置")
	}

	serverURL := models.GetPlatformSettingValue("bark_server_url", "https://api.day.app")
	sysDomain := models.GetPlatformSettingValue("system_domain", "https://api.yunzer.cn")
	ackToken := ""
	if reminder.AckToken != nil {
		ackToken = *reminder.AckToken
	}

	pushContent := content
	if ackToken != "" {
		pushContent += "\n确认收到请点击→"
	}

	// Escape each path segment individually; the device key is escaped as well
	// so an unexpected character in it cannot introduce extra path segments.
	barkURL := fmt.Sprintf("%s/%s/%s/%s",
		strings.TrimRight(serverURL, "/"),
		url.PathEscape(deviceKey),
		url.PathEscape(title),
		url.PathEscape(pushContent),
	)
	if ackToken != "" {
		ackURL := fmt.Sprintf("%s/api/schedule/reminder/ack?token=%s", strings.TrimRight(sysDomain, "/"), url.QueryEscape(ackToken))
		// Bark opens the address given in the "url" parameter when the push is tapped.
		barkURL += "?url=" + url.QueryEscape(ackURL)
	}

	client := &http.Client{Timeout: 10 * time.Second}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, barkURL, nil)
	if err != nil {
		return false, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// A failed body read is tolerated; whatever was read still goes into the error.
		bodyBytes, _ := io.ReadAll(resp.Body)
		return false, fmt.Errorf("Bark返回HTTP状态码: %d, 返回内容: %s", resp.StatusCode, string(bodyBytes))
	}
	return true, nil
}
// SiteMsgSender delivers reminders as in-site messages stored in the database.
type SiteMsgSender struct{}

// Send inserts one unread SystemReminderList row addressed to
// reminder.ReceiverUserID, with sender id 0 and sender type "system".
// ctx is accepted to satisfy ReminderSender but is not used by the insert.
// It returns (true, nil) when the insert succeeds and (false, err) otherwise.
func (s *SiteMsgSender) Send(ctx context.Context, reminder *models.PlatformScheduleReminder, title, content string) (bool, error) {
	createdAt := time.Now()
	message := &models.SystemReminderList{
		Title:        title,
		Content:      content,
		SenderID:     0,
		SenderType:   "system",
		ReceiverID:   reminder.ReceiverUserID,
		ReceiverType: "platform", // platform-side user
		IsRead:       0,
		CreateTime:   &createdAt,
	}
	if _, insertErr := models.Orm.Insert(message); insertErr != nil {
		return false, insertErr
	}
	return true, nil
}
// generateUUID returns a random UUID string in the 8-4-4-4-12 lowercase
// hexadecimal layout, filled from crypto/rand with the version nibble set to 4
// and the variant bits set to binary 10.
func generateUUID() string {
	var raw [16]byte
	// The read result is discarded here; on failure the buffer keeps whatever
	// bytes it already holds.
	_, _ = rand.Read(raw[:])
	raw[6] = raw[6]&0x0f | 0x40 // version 4
	raw[8] = raw[8]&0x3f | 0x80 // variant bits 10xxxxxx

	// %x on a byte slice yields two hex digits per byte, 32 digits in total.
	digits := fmt.Sprintf("%x", raw[:])
	return digits[0:8] + "-" + digits[8:12] + "-" + digits[12:16] + "-" + digits[16:20] + "-" + digits[20:32]
}
// StartReminderScheduler starts a background goroutine that calls
// scanAndSendReminders on every tick of a one-minute ticker. The goroutine
// stops the ticker and exits as soon as a receive on stopChan succeeds (a value
// is sent or the channel is closed). The function itself returns immediately.
func StartReminderScheduler(stopChan chan struct{}) {
	tick := time.NewTicker(time.Minute)
	go func() {
		defer tick.Stop()
		for {
			select {
			case <-stopChan:
				return
			case <-tick.C:
				scanAndSendReminders()
			}
		}
	}()
}
// scanAndSendReminders performs one scheduler pass:
//
//  1. stamps every due, non-deleted reminder whose status is 0 or 1 and whose
//     scan_lock is empty with a fresh batch id, so that concurrent passes do
//     not pick up the same rows;
//  2. loads the rows carrying that batch id;
//  3. for each row, sends the reminder through the sender registered for its
//     RemindChannel, writes a send-log row, and writes back the new send
//     count, status, next remind time and an emptied scan_lock.
//
// SMS and SITE_MSG reminders are finished (status 2) after one attempt. Other
// registered channels (EMAIL, BARK) stay in status 1 and are rescheduled by
// RepeatIntervalMinutes until they are acknowledged or MaxSendCount is
// reached. Rows with an unregistered channel are set to status 2 without
// sending.
//
// Errors are not returned; the pass is best-effort and runs again on the next
// tick.
func scanAndSendReminders() {
	// 1. Generate a unique batch id used to claim rows for this pass.
	scanBatch := generateUUID()
	now := time.Now()

	// 2. Claim the due rows. Only rows with an empty scan_lock are taken, which
	// prevents concurrent passes from sending the same reminder twice.
	_, err := models.Orm.Raw(`
UPDATE yz_platform_schedule_reminder
SET scan_lock = ?, update_time = NOW()
WHERE next_remind_time <= ?
AND remind_status IN (0, 1)
AND is_deleted = 0
AND (scan_lock = '' OR scan_lock IS NULL)
`, scanBatch, now).Exec()
	if err != nil {
		return
	}

	// 3. Load the rows this pass managed to claim.
	var list []models.PlatformScheduleReminder
	_, err = models.Orm.QueryTable(new(models.PlatformScheduleReminder)).
		Filter("scan_lock", scanBatch).
		Filter("remind_status__in", 0, 1).
		Filter("is_deleted", 0).
		All(&list)
	if err != nil {
		// The claim above may have stamped rows that could not be loaded.
		// Nothing has been sent yet, so hand them back; otherwise they would
		// keep this batch id forever and never be scanned again.
		releaseReminderScanLock(scanBatch)
		return
	}
	if len(list) == 0 {
		return
	}

	// Channel name -> sender implementation.
	senders := map[string]ReminderSender{
		"SMS":      &SMSSender{},
		"EMAIL":    &EmailSender{},
		"BARK":     &BarkSender{},
		"SITE_MSG": &SiteMsgSender{},
	}

	for i := range list {
		reminder := &list[i]

		// 3.1 Load the schedule for its content. The title is always the fixed
		// text "日程提醒"; a generic body is used when the schedule cannot be read.
		var schedule models.PlatformSchedule
		err := models.Orm.QueryTable(new(models.PlatformSchedule)).
			Filter("id", reminder.ScheduleID).
			One(&schedule)
		title := "日程提醒"
		content := "您有一个待处理的日程时间已到,请注意查收。"
		if err == nil {
			content = schedule.Content
		}

		sender, ok := senders[reminder.RemindChannel]
		if !ok {
			// Unknown channel: mark the reminder as finished and release the lock.
			_, _ = models.Orm.QueryTable(new(models.PlatformScheduleReminder)).
				Filter("id", reminder.ID).
				Update(map[string]interface{}{
					"remind_status": 2,
					"scan_lock":     "",
					"update_time":   time.Now(),
				})
			continue
		}

		// Deliver through the selected channel.
		ctx := context.Background()
		success, sendErr := sender.Send(ctx, reminder, title, content)

		// 3.2 Record the attempt in the send log.
		sendResult := int8(0)
		var failReason *string
		if success {
			sendResult = 1
		} else if sendErr != nil {
			// Cap the reason at 255 bytes without splitting a multi-byte
			// character, so the stored text stays valid UTF-8.
			errStr := truncateReminderFailReason(sendErr.Error(), 255)
			failReason = &errStr
		}
		logRow := &models.PlatformScheduleReminderSendLog{
			ReminderID: reminder.ID,
			SendTime:   time.Now(),
			SendResult: sendResult,
			FailReason: failReason,
		}
		// Best effort: a failed log insert must not stop the status write-back.
		_, _ = models.Orm.Insert(logRow)

		// 3.3 Decide the next status and next send time by channel type.
		newSendCount := reminder.SendCount + 1
		newStatus := reminder.RemindStatus
		if reminder.RemindChannel == "SMS" || reminder.RemindChannel == "SITE_MSG" {
			// One-shot channels: finished after a single attempt.
			newStatus = 2
		} else {
			// Repeating channels (EMAIL / BARK): keep reminding while the
			// reminder is not acknowledged and max_send_count is not reached.
			if reminder.AckStatus == 0 && newSendCount < reminder.MaxSendCount {
				newStatus = 1 // still reminding
				// Schedule the next attempt.
				reminder.NextRemindTime = time.Now().Add(time.Duration(reminder.RepeatIntervalMinutes) * time.Minute)
			} else {
				// Limit reached or already acknowledged.
				newStatus = 2
			}
		}

		// 3.4 Write the result back to the reminder row and release the lock.
		_, _ = models.Orm.QueryTable(new(models.PlatformScheduleReminder)).
			Filter("id", reminder.ID).
			Update(map[string]interface{}{
				"SendCount":      newSendCount,
				"NextRemindTime": reminder.NextRemindTime,
				"RemindStatus":   newStatus,
				"ScanLock":       "", // release the scan lock
				"UpdateTime":     time.Now(),
			})
	}
}

// releaseReminderScanLock clears scan_lock on every reminder row that still
// carries scanBatch, making those rows eligible for a later pass.
func releaseReminderScanLock(scanBatch string) {
	// Best effort: there is no caller to report to, and a later pass cannot
	// retry the release, so the error is intentionally dropped.
	_, _ = models.Orm.Raw(`
UPDATE yz_platform_schedule_reminder
SET scan_lock = '', update_time = NOW()
WHERE scan_lock = ?
`, scanBatch).Exec()
}

// truncateReminderFailReason returns s shortened to at most maxBytes bytes,
// cutting only at the start of a character so a multi-byte UTF-8 sequence is
// never split.
func truncateReminderFailReason(s string, maxBytes int) string {
	if len(s) <= maxBytes {
		return s
	}
	cut := 0
	// Ranging over a string yields the byte offset at which each character
	// starts; keep the last such offset that does not exceed the limit.
	for i := range s {
		if i > maxBytes {
			break
		}
		cut = i
	}
	return s[:cut]
}