diff --git a/Processor/ProcessC2CMessage.go b/Processor/ProcessC2CMessage.go index 67589d14..a8aeccea 100644 --- a/Processor/ProcessC2CMessage.go +++ b/Processor/ProcessC2CMessage.go @@ -154,7 +154,7 @@ func (p *Processors) ProcessC2CMessage(data *dto.WSC2CMessageData) error { // Convert OnebotGroupMessage to map and send privateMsgMap := structToMap(privateMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(privateMsgMap) + go p.BroadcastMessageToAll(privateMsgMap, p.Apiv2, data) //组合FriendData struserid := strconv.FormatInt(userid64, 10) userdata := structs.FriendData{ @@ -296,7 +296,7 @@ func (p *Processors) ProcessC2CMessage(data *dto.WSC2CMessageData) error { // Convert OnebotGroupMessage to map and send groupMsgMap := structToMap(groupMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(groupMsgMap) + go p.BroadcastMessageToAll(groupMsgMap, p.Apiv2, data) //组合FriendData struserid := strconv.FormatInt(userid64, 10) diff --git a/Processor/ProcessChannelDirectMessage.go b/Processor/ProcessChannelDirectMessage.go index 94d73567..50a099e3 100644 --- a/Processor/ProcessChannelDirectMessage.go +++ b/Processor/ProcessChannelDirectMessage.go @@ -171,7 +171,7 @@ func (p *Processors) ProcessChannelDirectMessage(data *dto.WSDirectMessageData) // Convert OnebotGroupMessage to map and send privateMsgMap := structToMap(privateMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(privateMsgMap) + go p.BroadcastMessageToAll(privateMsgMap, p.Apiv2, data) } else { if !p.Settings.GlobalChannelToGroup { //将频道私信作为普通频道信息 @@ -280,7 +280,7 @@ func (p *Processors) ProcessChannelDirectMessage(data *dto.WSDirectMessageData) // 将 onebotMsg 结构体转换为 map[string]interface{} msgMap := structToMap(onebotMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(msgMap) + go p.BroadcastMessageToAll(msgMap, p.Apiv2, data) } else { //将频道信息转化为群信息(特殊需求情况下) //将channelid写入bolt,可取出guild_id @@ -444,7 +444,7 @@ func (p *Processors) ProcessChannelDirectMessage(data *dto.WSDirectMessageData) // Convert OnebotGroupMessage to map and send groupMsgMap := structToMap(groupMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(groupMsgMap) + go p.BroadcastMessageToAll(groupMsgMap, p.Apiv2, data) } } diff --git a/Processor/ProcessGroupAddBot.go b/Processor/ProcessGroupAddBot.go index 973af06e..4bf65c5a 100644 --- a/Processor/ProcessGroupAddBot.go +++ b/Processor/ProcessGroupAddBot.go @@ -125,11 +125,11 @@ func (p *Processors) ProcessGroupAddBot(data *dto.GroupAddBotEvent) error { } groupMsgMap := structToMap(Request) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(groupMsgMap) + go p.BroadcastMessageToAll(groupMsgMap, p.Apiv2, data) groupMsgMap = structToMap(Notice) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(groupMsgMap) + go p.BroadcastMessageToAll(groupMsgMap, p.Apiv2, data) // 转换appid AppIDString := strconv.FormatUint(p.Settings.AppID, 10) diff --git a/Processor/ProcessGroupDelBot.go b/Processor/ProcessGroupDelBot.go index 44d1287e..59e8fc39 100644 --- a/Processor/ProcessGroupDelBot.go +++ b/Processor/ProcessGroupDelBot.go @@ -72,6 +72,6 @@ func (p *Processors) ProcessGroupDelBot(data *dto.GroupAddBotEvent) error { } groupMsgMap := structToMap(Notice) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(groupMsgMap) + go p.BroadcastMessageToAll(groupMsgMap, p.Apiv2, data) return nil } diff --git a/Processor/ProcessGroupMessage.go b/Processor/ProcessGroupMessage.go index 15622618..c5c16ea3 100644 --- a/Processor/ProcessGroupMessage.go +++ b/Processor/ProcessGroupMessage.go @@ -188,6 +188,6 @@ func (p *Processors) ProcessGroupMessage(data *dto.WSGroupATMessageData) error { // Convert OnebotGroupMessage to map and send groupMsgMap := structToMap(groupMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(groupMsgMap) + go p.BroadcastMessageToAll(groupMsgMap, p.Apiv2, data) return nil } diff --git a/Processor/ProcessGroupMsgReceive.go b/Processor/ProcessGroupMsgReceive.go index 3df37683..a0d27b40 100644 --- a/Processor/ProcessGroupMsgReceive.go +++ b/Processor/ProcessGroupMsgReceive.go @@ -93,7 +93,7 @@ func (p *Processors) ProcessGroupMsgRecive(data *dto.GroupMsgReceiveEvent) error noticeMap := structToMap(notice) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(noticeMap) + go p.BroadcastMessageToAll(noticeMap, p.Apiv2, data) // 转换appid AppIDString := strconv.FormatUint(p.Settings.AppID, 10) @@ -205,7 +205,7 @@ func (p *Processors) ProcessGroupMsgRecive(data *dto.GroupMsgReceiveEvent) error // Convert OnebotGroupMessage to map and send groupMsgMap := structToMap(groupMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(groupMsgMap) + go p.BroadcastMessageToAll(groupMsgMap, p.Apiv2, data) // 转换appid AppIDString := strconv.FormatUint(p.Settings.AppID, 10) diff --git a/Processor/ProcessGroupMsgReject.go b/Processor/ProcessGroupMsgReject.go index ae53d668..c3f72d96 100644 --- a/Processor/ProcessGroupMsgReject.go +++ b/Processor/ProcessGroupMsgReject.go @@ -94,7 +94,7 @@ func (p *Processors) ProcessGroupMsgReject(data *dto.GroupMsgRejectEvent) error noticeMap := structToMap(notice) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(noticeMap) + go p.BroadcastMessageToAll(noticeMap, p.Apiv2, data) // 转换appid AppIDString := strconv.FormatUint(p.Settings.AppID, 10) @@ -206,7 +206,7 @@ func (p *Processors) ProcessGroupMsgReject(data *dto.GroupMsgRejectEvent) error // Convert OnebotGroupMessage to map and send groupMsgMap := structToMap(groupMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(groupMsgMap) + go p.BroadcastMessageToAll(groupMsgMap, p.Apiv2, data) // 转换appid AppIDString := strconv.FormatUint(p.Settings.AppID, 10) diff --git a/Processor/ProcessGuildATMessage.go b/Processor/ProcessGuildATMessage.go index aef11d04..bb2cf227 100644 --- a/Processor/ProcessGuildATMessage.go +++ b/Processor/ProcessGuildATMessage.go @@ -132,7 +132,7 @@ func (p *Processors) ProcessGuildATMessage(data *dto.WSATMessageData) error { msgMap := structToMap(onebotMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(msgMap) + go p.BroadcastMessageToAll(msgMap, p.Apiv2, data) } else { // GlobalChannelToGroup为true时的处理逻辑 //将频道转化为一个群 @@ -298,7 +298,7 @@ func (p *Processors) ProcessGuildATMessage(data *dto.WSATMessageData) error { // Convert OnebotGroupMessage to map and send groupMsgMap := structToMap(groupMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(groupMsgMap) + go p.BroadcastMessageToAll(groupMsgMap, p.Apiv2, data) } diff --git a/Processor/ProcessGuildNormalMessage.go b/Processor/ProcessGuildNormalMessage.go index 36b9a32a..d7d39589 100644 --- a/Processor/ProcessGuildNormalMessage.go +++ b/Processor/ProcessGuildNormalMessage.go @@ -131,7 +131,7 @@ func (p *Processors) ProcessGuildNormalMessage(data *dto.WSMessageData) error { msgMap := structToMap(onebotMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(msgMap) + go p.BroadcastMessageToAll(msgMap, p.Apiv2, data) } else { // GlobalChannelToGroup为true时的处理逻辑 //将频道转化为一个群 @@ -305,7 +305,7 @@ func (p *Processors) ProcessGuildNormalMessage(data *dto.WSMessageData) error { groupMsgMap := structToMap(groupMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(groupMsgMap) + go p.BroadcastMessageToAll(groupMsgMap, p.Apiv2, data) } return nil diff --git a/Processor/ProcessInlineSearch.go b/Processor/ProcessInlineSearch.go index c7a4d867..6e339119 100644 --- a/Processor/ProcessInlineSearch.go +++ b/Processor/ProcessInlineSearch.go @@ -107,7 +107,7 @@ func (p *Processors) ProcessInlineSearch(data *dto.WSInteractionData) error { noticeMap := structToMap(notice) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(noticeMap) + go p.BroadcastMessageToAll(noticeMap, p.Apiv2, data) // 转换appid AppIDString := strconv.FormatUint(p.Settings.AppID, 10) @@ -221,7 +221,7 @@ func (p *Processors) ProcessInlineSearch(data *dto.WSInteractionData) error { // Convert OnebotGroupMessage to map and send groupMsgMap := structToMap(groupMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(groupMsgMap) + go p.BroadcastMessageToAll(groupMsgMap, p.Apiv2, data) // 转换appid AppIDString := strconv.FormatUint(p.Settings.AppID, 10) @@ -300,7 +300,7 @@ func (p *Processors) ProcessInlineSearch(data *dto.WSInteractionData) error { // Convert OnebotGroupMessage to map and send privateMsgMap := structToMap(privateMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(privateMsgMap) + go p.BroadcastMessageToAll(privateMsgMap, p.Apiv2, data) // 转换appid AppIDString := strconv.FormatUint(p.Settings.AppID, 10) @@ -361,7 +361,7 @@ func (p *Processors) ProcessInlineSearch(data *dto.WSInteractionData) error { msgMap := structToMap(onebotMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(msgMap) + go p.BroadcastMessageToAll(msgMap, p.Apiv2, data) // TODO: 实现eventid } diff --git a/Processor/ProcessThreadMessage.go b/Processor/ProcessThreadMessage.go index 833ec72d..1d9d596f 100644 --- a/Processor/ProcessThreadMessage.go +++ b/Processor/ProcessThreadMessage.go @@ -137,7 +137,7 @@ func (p *Processors) ProcessThreadMessage(data *dto.WSThreadData) error { msgMap := structToMap(onebotMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(msgMap) + go p.BroadcastMessageToAll(msgMap, p.Apiv2, data) return nil } else { @@ -256,7 +256,7 @@ func (p *Processors) ProcessThreadMessage(data *dto.WSThreadData) error { msgMap := structToMap(onebotMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(msgMap) + go p.BroadcastMessageToAll(msgMap, p.Apiv2, data) } else { //转化为群信息 //将频道转化为一个群 @@ -394,7 +394,7 @@ func (p *Processors) ProcessThreadMessage(data *dto.WSThreadData) error { groupMsgMap := structToMap(groupMsg) //上报信息到onebotv11应用端(正反ws) - p.BroadcastMessageToAll(groupMsgMap) + go p.BroadcastMessageToAll(groupMsgMap, p.Apiv2, data) } } diff --git a/Processor/Processor.go b/Processor/Processor.go index 62a8cd6d..85b95f6b 100644 --- a/Processor/Processor.go +++ b/Processor/Processor.go @@ -15,6 +15,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" "github.com/hashicorp/go-multierror" @@ -230,35 +231,101 @@ func (p *Processors) SendMessageToAllClients(message map[string]interface{}) err } // 方便快捷的发信息函数 -func (p *Processors) BroadcastMessageToAll(message map[string]interface{}) error { - var errors []string +func (p *Processors) BroadcastMessageToAll(message map[string]interface{}, api openapi.MessageAPI, data interface{}) error { + var wg sync.WaitGroup + errorCh := make(chan string, len(p.Wsclient)+len(p.WsServerClients)) + defer close(errorCh) - // 发送到我们作为客户端的Wsclient + // 并发发送到我们作为客户端的Wsclient for _, client := range p.Wsclient { - //mylog.Printf("第%v个Wsclient", test) - err := client.SendMessage(message) - if err != nil { - errors = append(errors, fmt.Sprintf("error sending private message via wsclient: %v", err)) - } + wg.Add(1) + go func(c callapi.WebSocketServerClienter) { + defer wg.Done() + if err := c.SendMessage(message); err != nil { + errorCh <- fmt.Sprintf("error sending message via wsclient: %v", err) + } + }(client) } - // 发送到我们作为服务器连接到我们的WsServerClients + // 并发发送到我们作为服务器连接到我们的WsServerClients for _, serverClient := range p.WsServerClients { - err := serverClient.SendMessage(message) - if err != nil { - errors = append(errors, fmt.Sprintf("error sending private message via WsServerClient: %v", err)) + wg.Add(1) + go func(sc callapi.WebSocketServerClienter) { + defer wg.Done() + if err := sc.SendMessage(message); err != nil { + errorCh <- fmt.Sprintf("error sending message via WsServerClient: %v", err) + } + }(serverClient) + } + + wg.Wait() // 等待所有goroutine完成 + + var errors []string + failed := 0 + for len(errorCh) > 0 { + err := <-errorCh + errors = append(errors, err) + failed++ + } + + // 检查是否所有尝试都失败了 + if failed == len(p.Wsclient)+len(p.WsServerClients) { + // 处理全部失败的情况 + fmt.Println("All message sending attempts failed.") + downtimemessgae := config.GetDowntimeMessage() + switch v := data.(type) { + case *dto.WSGroupATMessageData: + msgtocreate := &dto.MessageToCreate{ + Content: downtimemessgae, + MsgID: v.ID, + MsgSeq: 1, + MsgType: 0, // 默认文本类型 + } + api.PostGroupMessage(context.Background(), v.GroupID, msgtocreate) + case *dto.WSATMessageData: + msgtocreate := &dto.MessageToCreate{ + Content: downtimemessgae, + MsgID: v.ID, + MsgSeq: 1, + MsgType: 0, // 默认文本类型 + } + api.PostMessage(context.Background(), v.ChannelID, msgtocreate) + case *dto.WSMessageData: + msgtocreate := &dto.MessageToCreate{ + Content: downtimemessgae, + MsgID: v.ID, + MsgSeq: 1, + MsgType: 0, // 默认文本类型 + } + api.PostMessage(context.Background(), v.ChannelID, msgtocreate) + case *dto.WSDirectMessageData: + msgtocreate := &dto.MessageToCreate{ + Content: downtimemessgae, + MsgID: v.ID, + MsgSeq: 1, + MsgType: 0, // 默认文本类型 + } + api.PostMessage(context.Background(), v.GuildID, msgtocreate) + case *dto.WSC2CMessageData: + msgtocreate := &dto.MessageToCreate{ + Content: downtimemessgae, + MsgID: v.ID, + MsgSeq: 1, + MsgType: 0, // 默认文本类型 + } + api.PostC2CMessage(context.Background(), v.Author.ID, msgtocreate) } } - // 在循环结束后处理记录的错误 + // 判断是否填写了反向post地址 + if !allEmpty(config.GetPostUrl()) { + go PostMessageToUrls(message) + } + if len(errors) > 0 { return fmt.Errorf(strings.Join(errors, "; ")) } - //判断是否填写了反向post地址 - if !allEmpty(config.GetPostUrl()) { - PostMessageToUrls(message) - } return nil } @@ -272,59 +339,70 @@ func allEmpty(addresses []string) bool { return true } -// 上报信息给反向Http +// PostMessageToUrls 使用并发 goroutines 上报信息给多个反向 HTTP URL func PostMessageToUrls(message map[string]interface{}) { // 获取上报 URL 列表 postUrls := config.GetPostUrl() // 检查 postUrls 是否为空 - if len(postUrls) > 0 { - - // 转换 message 为 JSON 字符串 - jsonString, err := handlers.ConvertMapToJSONString(message) - if err != nil { - mylog.Printf("Error converting message to JSON: %v", err) - return - } - - for _, url := range postUrls { - // 创建请求体 - reqBody := bytes.NewBufferString(jsonString) + if len(postUrls) == 0 { + return + } - // 创建 POST 请求 - req, err := http.NewRequest("POST", url, reqBody) - if err != nil { - mylog.Printf("Error creating POST request to %s: %v", url, err) - continue - } + // 转换 message 为 JSON 字符串 + jsonString, err := handlers.ConvertMapToJSONString(message) + if err != nil { + mylog.Printf("Error converting message to JSON: %v", err) + return + } - // 设置请求头 - req.Header.Set("Content-Type", "application/json") - // 设置 X-Self-ID - var selfid string - if config.GetUseUin() { - selfid = config.GetUinStr() - } else { - selfid = config.GetAppIDStr() - } + // 使用 WaitGroup 等待所有 goroutines 完成 + var wg sync.WaitGroup + for _, url := range postUrls { + wg.Add(1) + // 启动一个 goroutine + go func(url string) { + defer wg.Done() // 确保减少 WaitGroup 的计数器 + sendPostRequest(jsonString, url) + }(url) + } + wg.Wait() // 等待所有 goroutine 完成 +} - req.Header.Set("X-Self-ID", selfid) +// sendPostRequest 发送单个 POST 请求 +func sendPostRequest(jsonString, url string) { + // 创建请求体 + reqBody := bytes.NewBufferString(jsonString) - // 发送请求 - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - mylog.Printf("Error sending POST request to %s: %v", url, err) - continue - } + // 创建 POST 请求 + req, err := http.NewRequest("POST", url, reqBody) + if err != nil { + mylog.Printf("Error creating POST request to %s: %v", url, err) + return + } - // 处理响应 - defer resp.Body.Close() - // 可以添加更多的响应处理逻辑,如检查状态码等 + // 设置请求头 + req.Header.Set("Content-Type", "application/json") + // 设置 X-Self-ID + var selfid string + if config.GetUseUin() { + selfid = config.GetUinStr() + } else { + selfid = config.GetAppIDStr() + } + req.Header.Set("X-Self-ID", selfid) - mylog.Printf("Posted to %s successfully", url) - } + // 发送请求 + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + mylog.Printf("Error sending POST request to %s: %v", url, err) + return } + defer resp.Body.Close() // 确保释放网络资源 + + // 可以在此处添加更多的响应处理逻辑 + mylog.Printf("Posted to %s successfully", url) } func (p *Processors) HandleFrameworkCommand(messageText string, data interface{}, Type string) error { diff --git a/config/config.go b/config/config.go index d6abf063..79f57b77 100644 --- a/config/config.go +++ b/config/config.go @@ -2292,3 +2292,13 @@ func GetNativeMD() bool { } return instance.Settings.NativeMD } + +// 获取DowntimeMessage +func GetDowntimeMessage() string { + mu.Lock() + defer mu.Unlock() + if instance != nil { + return instance.Settings.DowntimeMessage + } + return "" +} diff --git a/main.go b/main.go index 1fbe134b..2318f1ce 100644 --- a/main.go +++ b/main.go @@ -48,6 +48,7 @@ func main() { // 定义faststart命令行标志。默认为false。 fastStart := flag.Bool("faststart", false, "start without initialization if set") tidy := flag.Bool("tidy", false, "backup and tidy your config.yml") + m := flag.Bool("m", false, "Maintenance mode") // 解析命令行参数到定义的标志。 flag.Parse() @@ -116,6 +117,12 @@ func main() { loggerAdapter := mylog.NewMyLogAdapter(logLevel, config.GetSaveLogs()) botgo.SetLogger(loggerAdapter) + if *m { + // 维护模式 + conf.Settings.WsAddress = []string{"ws://127.0.0.1:50000"} + conf.Settings.EnableWsServer = false + } + // 创建webui数据库 webui.InitializeDB() defer webui.CloseDB() diff --git a/server/wsserver.go b/server/wsserver.go index 5bd2b366..ae5aed28 100644 --- a/server/wsserver.go +++ b/server/wsserver.go @@ -150,7 +150,7 @@ func processWSMessage(client *WebSocketServerClient, msg []byte) { mylog.Println("Received from WebSocket onebotv11 client:", wsclient.TruncateMessage(message, 500)) // 调用callapi - callapi.CallAPIFromDict(client, client.API, client.APIv2, message) + go callapi.CallAPIFromDict(client, client.API, client.APIv2, message) } // 发信息给client diff --git a/structs/structs.go b/structs/structs.go index 6d231da7..6940c952 100644 --- a/structs/structs.go +++ b/structs/structs.go @@ -95,12 +95,13 @@ type Settings struct { AutoWithdrawTime int `yaml:"auto_withdraw_time"` VisualPrefixsBypass []string `yaml:"visual_prefixs_bypass"` //开发增强类 - DevlopAcDir string `yaml:"develop_access_token_dir"` - DevBotid string `yaml:"develop_bot_id"` - SandBoxMode bool `yaml:"sandbox_mode"` - DevMessgeID bool `yaml:"dev_message_id"` - SendError bool `yaml:"send_error"` - SaveError bool `yaml:"save_error"` + DevlopAcDir string `yaml:"develop_access_token_dir"` + DevBotid string `yaml:"develop_bot_id"` + SandBoxMode bool `yaml:"sandbox_mode"` + DevMessgeID bool `yaml:"dev_message_id"` + SendError bool `yaml:"send_error"` + SaveError bool `yaml:"save_error"` + DowntimeMessage string `yaml:"downtime_message"` //增长营销类 SelfIntroduce []string `yaml:"self_introduce"` //api修改 diff --git a/template/config_template.go b/template/config_template.go index 8cefb04b..3006a586 100644 --- a/template/config_template.go +++ b/template/config_template.go @@ -133,6 +133,7 @@ settings: dev_message_id : false #在沙盒和测试环境使用无限制msg_id 仅沙盒有效,正式环境请关闭,内测结束后,tx侧未来会移除 send_error : true #将报错用文本发出,避免机器人被审核报无响应 save_error : false #将保存保存在log文件夹,方便开发者定位发送错误. + downtime_message : "我正在维护中~请不要担心,维护结束就回来~维护时间:(1小时)" #增长营销类(推荐gensokyo-broadcast项目) self_introduce : ["",""] #自我介绍,可设置多个随机发送,当不为空时,机器人被邀入群会发送自定义自我介绍 需手动添加新textintent - "GroupAddRobotEventHandler" - "GroupDelRobotEventHandler" diff --git a/wsclient/ws.go b/wsclient/ws.go index 3435e87c..f635fd33 100644 --- a/wsclient/ws.go +++ b/wsclient/ws.go @@ -189,7 +189,7 @@ func (client *WebSocketClient) recvMessage(msg []byte) { } mylog.Println("Received from onebotv11 server:", TruncateMessage(message, 800)) // 调用callapi - callapi.CallAPIFromDict(client, client.api, client.apiv2, message) + go callapi.CallAPIFromDict(client, client.api, client.apiv2, message) } // 截断信息