diff --git a/.gitignore b/.gitignore
index a9f3816fd..8d69e6551 100644
--- a/.gitignore
+++ b/.gitignore
@@ -138,10 +138,21 @@ dmypy.json
# Cython debug symbols
cython_debug/
+configs/config.py
+demo.py
test.py
server_ip.py
-game_utils.py
member_activity_handle.py
Yu-Gi-Oh/
csgo/
fantasy_card/
+data/
+log/
+backup/
+extensive_plugin/
+test/
+bot.py
+data/
+.env
+.env.dev
+resources/
diff --git a/README.md b/README.md
index cf3fe3db8..816f8c122 100644
--- a/README.md
+++ b/README.md
@@ -128,7 +128,7 @@
- [x] 移动图片 (同上)
- [x] 删除图片 (同上)
- [x] 群内B站订阅
-- [x] 群词条
+- [x] 词条设置
- [x] 休息吧/醒来
### 已实现的超级用户功能
@@ -245,8 +245,51 @@ __Docker 全量版(包含 真寻Bot PostgreSQL数据库 go-cqhttp webui等)_
**点击上方的 GitHub 徽标查看教程**
PS: **ARM平台** 请使用全量版 同时 **如果你的机器 RAM < 1G 可能无法正常启动全量版容器**
+## [爱发电](https://afdian.net/@HibiKier)
+
+爱发电 以及 感谢投喂
+
+
+### 感谢名单
+(可以告诉我你的 __github__ 地址,我偷偷换掉0v|)
+[爱发电用户_b9S4](https://afdian.net/u/3d8f30581a2911edba6d52540025c377)
+[爱发电用户_c58s](https://afdian.net/u/a6ad8dda195e11ed9a4152540025c377)
+[爱发电用户_eNr9](https://afdian.net/u/05fdb41c0c9a11ed814952540025c377)
+[MangataAkihi](https://github.com/Sakuracio)
+[炀](https://afdian.net/u/69b76e9ec77b11ec874f52540025c377)
+[爱发电用户_Bc6j](https://afdian.net/u/8546be24f44111eca64052540025c377)
+[大魔王](https://github.com/xipesoy)
+[CopilotLaLaLa](https://github.com/CopilotLaLaLa)
+[嘿小欧](https://afdian.net/u/daa4bec4f24911ec82e552540025c377)
+[回忆的秋千](https://afdian.net/u/e315d9c6f14f11ecbeef52540025c377)
+[十年くん](https://github.com/shinianj)
+[哇](https://afdian.net/u/9b266244f23911eca19052540025c377)
+[yajiwa](https://github.com/yajiwa)
+[爆金币](https://afdian.net/u/0d78879ef23711ecb22452540025c377)
+
+
+
+
+
## 更新
+### 2022/8/21
+
+* 重构群词条,改为词库Plus,增加 精准|模糊|正则 问题匹配,问题与回答均支持at,image,face,超级用户额外提供 全局|私聊 词库设置,数据迁移目前只提供了问题和回答都是纯文本的词条
+* 修复b站转发解析av号无法解析
+* 改进插件 `我有一个朋友`,避免触发过于频繁 [@pull/1001](https://github.com/HibiKier/zhenxun_bot/pull/1001)
+* 原神便笺新增洞天宝钱和参量质变仪提示 [@pull/1005](https://github.com/HibiKier/zhenxun_bot/pull/1005)
+* 新增米游社签到功能,自动领取(白嫖)米游币 [@pull/991](https://github.com/HibiKier/zhenxun_bot/pull/991)
+
+### 2022/8/14
+
+* 修复epic未获取到时间时出错
+* 修复订阅主播时动态获取的id是直播间id
+
+### 2022/8/8
+
+* 修复赛马娘重载卡池失败的问题 [@pull/969](https://github.com/HibiKier/zhenxun_bot/pull/969)
+
### 2022/8/3
* 修复 bili动态链接在投稿视频时URL和分割线连在一起 [@pull/951](https://github.com/HibiKier/zhenxun_bot/pull/961)
@@ -268,7 +311,7 @@ PS: **ARM平台** 请使用全量版 同时 **如果你的机器 RAM < 1G 可能
* 替换了cos和bt的url [@pull/951](https://github.com/HibiKier/zhenxun_bot/pull/951)
* 发言记录统计添加日消息统计 [@pull/953](https://github.com/HibiKier/zhenxun_bot/pull/953)
-### 2022/7/24
+### 2022/7/24 \[v0.1.6.2]
* 订阅up动态提供直链
@@ -738,10 +781,6 @@ __..... 更多更新信息请查看文档__
## Todo
- [ ] web管理
-## 爱发电
-
-
-
## 感谢
[botuniverse / onebot](https://github.com/botuniverse/onebot) :超棒的机器人协议
[Mrs4s / go-cqhttp](https://github.com/Mrs4s/go-cqhttp) :cqhttp的golang实现,轻量、原生跨平台.
diff --git a/basic_plugins/init_plugin_config/init_plugins_config.py b/basic_plugins/init_plugin_config/init_plugins_config.py
index dec01b78e..b3ce429d4 100755
--- a/basic_plugins/init_plugin_config/init_plugins_config.py
+++ b/basic_plugins/init_plugin_config/init_plugins_config.py
@@ -6,7 +6,6 @@
from utils.text_utils import prompt2cn
from utils.utils import get_matchers
from ruamel import yaml
-import nonebot
_yaml = YAML(typ="safe")
diff --git a/basic_plugins/scripts.py b/basic_plugins/scripts.py
index eb592dd28..488f9d15e 100755
--- a/basic_plugins/scripts.py
+++ b/basic_plugins/scripts.py
@@ -108,6 +108,18 @@ async def _():
"ALTER TABLE genshin ADD bind_group Integer;",
"genshin"
), # 新增原神群号绑定字段
+ (
+ "ALTER TABLE genshin ADD login_ticket VARCHAR(255) DEFAULT '';",
+ "genshin"
+ ), # 新增米游社login_ticket绑定字段
+ (
+ "ALTER TABLE genshin ADD stuid VARCHAR(255) DEFAULT '';",
+ "genshin"
+ ), # 新增米游社stuid绑定字段
+ (
+ "ALTER TABLE genshin ADD stoken VARCHAR(255) DEFAULT '';",
+ "genshin"
+ ), # 新增米游社stoken绑定字段
(
"ALTER TABLE chat_history ADD plain_text Text;",
"chat_history"
diff --git a/plugins/bilibili_sub/__init__.py b/plugins/bilibili_sub/__init__.py
index 989627e32..d05b54898 100755
--- a/plugins/bilibili_sub/__init__.py
+++ b/plugins/bilibili_sub/__init__.py
@@ -234,7 +234,7 @@ async def _():
rst = await get_sub_status(sub.sub_id, sub.sub_type)
await send_sub_msg(rst, sub, bot)
if sub.sub_type == "live":
- rst = await get_sub_status(sub.sub_id, "up")
+ rst = await get_sub_status(sub.uid, "up")
await send_sub_msg(rst, sub, bot)
except Exception as e:
logger.error(f"B站订阅推送发生错误 sub_id:{sub.sub_id if sub else 0} {type(e)}:{e}")
diff --git a/plugins/bilibili_sub/data_source.py b/plugins/bilibili_sub/data_source.py
index c06460ec0..69dd47c3e 100755
--- a/plugins/bilibili_sub/data_source.py
+++ b/plugins/bilibili_sub/data_source.py
@@ -38,39 +38,39 @@ async def add_live_sub(live_id: int, sub_user: str) -> str:
:param sub_user: 订阅用户 id # 7384933:private or 7384933:2342344(group)
:return:
"""
+ # try:
try:
- async with db.transaction():
- try:
- """bilibili_api.live库的LiveRoom类中get_room_info改为bilireq.live库的get_room_info_by_id方法"""
- live_info = await get_room_info_by_id(live_id)
- except ResponseCodeError:
- return f"未找到房间号Id:{live_id} 的信息,请检查Id是否正确"
- uid = live_info["uid"]
- room_id = live_info["room_id"]
- short_id = live_info["short_id"]
- title = live_info["title"]
- live_status = live_info["live_status"]
- if await BilibiliSub.add_bilibili_sub(
- room_id,
- "live",
- sub_user,
- uid=uid,
- live_short_id=short_id,
- live_status=live_status,
- ):
- await _get_up_status(live_id)
- uname = (await BilibiliSub.get_sub(live_id)).uname
- return (
- "已成功订阅主播:\n"
- f"\ttitle:{title}\n"
- f"\tname: {uname}\n"
- f"\tlive_id:{live_id}\n"
- f"\tuid:{uid}"
- )
- else:
- return "添加订阅失败..."
- except Exception as e:
- logger.error(f"订阅主播live_id:{live_id} 发生了错误 {type(e)}:{e}")
+ """bilibili_api.live库的LiveRoom类中get_room_info改为bilireq.live库的get_room_info_by_id方法"""
+ live_info = await get_room_info_by_id(live_id)
+ except ResponseCodeError:
+ return f"未找到房间号Id:{live_id} 的信息,请检查Id是否正确"
+ uid = live_info["uid"]
+ room_id = live_info["room_id"]
+ short_id = live_info["short_id"]
+ title = live_info["title"]
+ live_status = live_info["live_status"]
+ await BilibiliSub.add_bilibili_sub(
+ room_id,
+ "live",
+ sub_user,
+ uid=uid,
+ live_short_id=short_id,
+ live_status=live_status,
+ )
+ # await _get_up_status(live_id)
+ uname = (await BilibiliSub.get_sub(live_id)).uname
+ # uname = 1
+ return (
+ "已成功订阅主播:\n"
+ f"\ttitle:{title}\n"
+ f"\tname: {uname}\n"
+ f"\tlive_id:{live_id}\n"
+ f"\tuid:{uid}"
+ )
+ # else:
+ # return "添加订阅失败..."
+ # except Exception as e:
+ # logger.error(f"订阅主播live_id:{live_id} 发生了错误 {type(e)}:{e}")
return "添加订阅失败..."
@@ -243,7 +243,7 @@ async def _get_live_status(id_: int) -> Optional[str]:
async def _get_up_status(id_: int) -> Optional[str]:
"""
获取用户投稿状态
- :param id_: 用户 id
+ :param id_: 订阅 id
:return:
"""
_user = await BilibiliSub.get_sub(id_)
diff --git a/plugins/bilibili_sub/model.py b/plugins/bilibili_sub/model.py
index 449c36ce5..3a1e8ff6c 100755
--- a/plugins/bilibili_sub/model.py
+++ b/plugins/bilibili_sub/model.py
@@ -65,44 +65,43 @@ async def add_bilibili_sub(
:param season_update_time: 番剧更新时间
"""
try:
- async with db.transaction():
- query = (
- await cls.query.where(cls.sub_id == sub_id)
- .with_for_update()
- .gino.first()
+ query = (
+ await cls.query.where(cls.sub_id == sub_id)
+ .with_for_update()
+ .gino.first()
+ )
+ sub_user = sub_user if sub_user[-1] == "," else f"{sub_user},"
+ if query:
+ if sub_user not in query.sub_users:
+ sub_users = query.sub_users + sub_user
+ await query.update(sub_users=sub_users).apply()
+ else:
+ sub = await cls.create(
+ sub_id=sub_id, sub_type=sub_type, sub_users=sub_user
)
- sub_user = sub_user if sub_user[-1] == "," else f"{sub_user},"
- if query:
- if sub_user not in query.sub_users:
- sub_users = query.sub_users + sub_user
- await query.update(sub_users=sub_users).apply()
- else:
- sub = await cls.create(
- sub_id=sub_id, sub_type=sub_type, sub_users=sub_user
- )
- await sub.update(
- live_short_id=live_short_id
- if live_short_id
- else sub.live_short_id,
- live_status=live_status if live_status else sub.live_status,
- dynamic_upload_time=dynamic_upload_time
- if dynamic_upload_time
- else sub.dynamic_upload_time,
- uid=uid if uid else sub.uid,
- uname=uname if uname else sub.uname,
- latest_video_created=latest_video_created
- if latest_video_created
- else sub.latest_video_created,
- season_update_time=season_update_time
- if season_update_time
- else sub.season_update_time,
- season_current_episode=season_current_episode
- if season_current_episode
- else sub.season_current_episode,
- season_id=season_id if season_id else sub.season_id,
- season_name=season_name if season_name else sub.season_name,
- ).apply()
- return True
+ await sub.update(
+ live_short_id=live_short_id
+ if live_short_id
+ else sub.live_short_id,
+ live_status=live_status if live_status else sub.live_status,
+ dynamic_upload_time=dynamic_upload_time
+ if dynamic_upload_time
+ else sub.dynamic_upload_time,
+ uid=uid if uid else sub.uid,
+ uname=uname if uname else sub.uname,
+ latest_video_created=latest_video_created
+ if latest_video_created
+ else sub.latest_video_created,
+ season_update_time=season_update_time
+ if season_update_time
+ else sub.season_update_time,
+ season_current_episode=season_current_episode
+ if season_current_episode
+ else sub.season_current_episode,
+ season_id=season_id if season_id else sub.season_id,
+ season_name=season_name if season_name else sub.season_name,
+ ).apply()
+ return True
except Exception as e:
logger.info(f"bilibili_sub 添加订阅错误 {type(e)}: {e}")
return False
diff --git a/plugins/draw_card/handles/base_handle.py b/plugins/draw_card/handles/base_handle.py
index 9e019884b..ddfc59afa 100644
--- a/plugins/draw_card/handles/base_handle.py
+++ b/plugins/draw_card/handles/base_handle.py
@@ -50,6 +50,7 @@ class UpEvent(BaseModel):
start_time: Optional[datetime] # 开始时间
end_time: Optional[datetime] # 结束时间
up_char: List[UpChar] # up对象
+ up_name: str = "" # up名称
TC = TypeVar("TC", bound="BaseData")
diff --git a/plugins/draw_card/handles/pretty_handle.py b/plugins/draw_card/handles/pretty_handle.py
index da56bd645..4011e416c 100644
--- a/plugins/draw_card/handles/pretty_handle.py
+++ b/plugins/draw_card/handles/pretty_handle.py
@@ -350,48 +350,52 @@ async def update_up_char(self):
char_img = ""
card_img = ""
up_chars = []
+ up_chars_name = []
up_cards = []
+ up_cards_name = []
soup = BeautifulSoup(result, "lxml")
heads = soup.find_all("span", {"class": "mw-headline"})
for head in heads:
- if "时间" in head.text:
+ if "时间" in head.text or "期间" in head.text:
time = head.find_next("p").text.split("\n")[0]
if "~" in time:
start, end = time.split("~")
start_time = dateparser.parse(start)
end_time = dateparser.parse(end)
elif "赛马娘" in head.text:
- char_img = head.find_next("a", {"class": "image"}).find("img")[
+ char_img = head.find_next("center").find("img")[
"src"
]
lines = str(head.find_next("p").text).split("\n")
chars = [
line
for line in lines
- if "★" in line and "(" in line and ")" in line
+ if "★" in line and "【" in line and "】" in line
]
- for char in chars:
+ for char in set(chars): # list去重
star = char.count("★")
- name = re.split(r"[()]", char)[-2].strip()
+ name = re.split(r"[【】]", char)[-2].strip()
up_chars.append(
UpChar(name=name, star=star, limited=False, zoom=70)
)
+ up_chars_name.append(name)
elif "支援卡" in head.text:
- card_img = head.find_next("a", {"class": "image"}).find("img")[
+ card_img = head.find_next("center").find("img")[
"src"
]
lines = str(head.find_next("p").text).split("\n")
cards = [
line
for line in lines
- if "R" in line and "(" in line and ")" in line
+ if "R" in line and "【" in line and "】" in line
]
for card in cards:
star = 3 if "SSR" in card else 2 if "SR" in card else 1
- name = re.split(r"[()]", card)[-2].strip()
+ name = re.split(r"[【】]", card)[-2].strip()
up_cards.append(
UpChar(name=name, star=star, limited=False, zoom=70)
)
+ up_cards_name.append(name)
if start_time and end_time:
if start_time <= datetime.now() <= end_time:
self.UP_CHAR = UpEvent(
@@ -400,6 +404,7 @@ async def update_up_char(self):
start_time=start_time,
end_time=end_time,
up_char=up_chars,
+ up_name=up_chars_name,
)
self.UP_CARD = UpEvent(
title=title,
@@ -407,6 +412,7 @@ async def update_up_char(self):
start_time=start_time,
end_time=end_time,
up_char=up_cards,
+ up_name=up_cards_name,
)
self.dump_up_char()
logger.info(f"成功获取{self.game_name_cn}当前up信息...当前up池: {title}")
@@ -418,9 +424,10 @@ async def _reload_pool(self) -> Optional[Message]:
self.load_up_char()
if self.UP_CHAR and self.UP_CARD:
return Message(
- Message.template("重载成功!\n当前UP池子:{}{:image}{:image}").format(
- self.UP_CHAR.title,
+ Message.template("重载成功!\n当前UP池子:{}{:image}\n当前支援卡池子:{}{:image}").format(
+ self.UP_CHAR.up_name,
self.UP_CHAR.pool_img,
+ self.UP_CARD.up_name,
self.UP_CARD.pool_img,
)
)
diff --git a/plugins/epic/data_source.py b/plugins/epic/data_source.py
index ba4f5dd87..cbdc48c15 100755
--- a/plugins/epic/data_source.py
+++ b/plugins/epic/data_source.py
@@ -96,12 +96,15 @@ async def get_epic_free(bot: Bot, type_event: str):
if pair["key"] == "publisherName":
game_pub = pair["value"]
game_desp = game["description"]
- end_date_iso = game["promotions"]["promotionalOffers"][0][
- "promotionalOffers"
- ][0]["endDate"][:-1]
- end_date = datetime.fromisoformat(end_date_iso).strftime(
- "%b.%d %H:%M"
- )
+ try:
+ end_date_iso = game["promotions"]["promotionalOffers"][0][
+ "promotionalOffers"
+ ][0]["endDate"][:-1]
+ end_date = datetime.fromisoformat(end_date_iso).strftime(
+ "%b.%d %H:%M"
+ )
+ except IndexError:
+ end_date = '未知'
# API 返回不包含游戏商店 URL,此处自行拼接,可能出现少数游戏 404 请反馈
if game.get("productSlug"):
game_url = "https://store.epicgames.com/zh-CN/p/{}".format(
diff --git a/plugins/genshin/query_user/_models/__init__.py b/plugins/genshin/query_user/_models/__init__.py
index e972f8716..5c775c3f1 100644
--- a/plugins/genshin/query_user/_models/__init__.py
+++ b/plugins/genshin/query_user/_models/__init__.py
@@ -19,6 +19,9 @@ class Genshin(db.Model):
resin_remind = db.Column(db.Boolean(), default=False) # 树脂提醒
resin_recovery_time = db.Column(db.DateTime(timezone=True)) # 满树脂提醒日期
bind_group = db.Column(db.BigInteger())
+ login_ticket = db.Column(db.String(), default="")
+ stuid = db.Column(db.String(), default="")
+ stoken = db.Column(db.String(), default="")
_idx1 = db.Index("genshin_uid_idx1", "user_qq", "uid", unique=True)
@@ -386,3 +389,96 @@ async def reset_today_query_uid(cls):
for u in await cls.query.with_for_update().gino.all():
if u.today_query_uid:
await u.update(today_query_uid="").apply()
+
+ @classmethod
+ async def set_stuid(cls, uid: int, stuid: str) -> bool:
+ """
+ 说明:
+ 设置stuid
+ 参数:
+ :param uid: 原神uid
+ :param stuid: stuid
+ """
+ query = cls.query.where(cls.uid == uid).with_for_update()
+ user = await query.gino.first()
+ if user:
+ await user.update(stuid=stuid).apply()
+ return True
+ return False
+
+ @classmethod
+ async def set_stoken(cls, uid: int, stoken: str) -> bool:
+ """
+ 说明:
+ 设置stoken
+ 参数:
+ :param uid: 原神uid
+ :param stoken: stoken
+ """
+ query = cls.query.where(cls.uid == uid).with_for_update()
+ user = await query.gino.first()
+ if user:
+ await user.update(stoken=stoken).apply()
+ return True
+ return False
+
+ @classmethod
+ async def set_login_ticket(cls, uid: int, login_ticket: str) -> bool:
+ """
+ 说明:
+ 设置login_ticket
+ 参数:
+ :param uid: 原神uid
+ :param login_ticket: login_ticket
+ """
+ query = cls.query.where(cls.uid == uid).with_for_update()
+ user = await query.gino.first()
+ if user:
+ await user.update(login_ticket=login_ticket).apply()
+ return True
+ return False
+
+ # 获取login_ticket
+ @classmethod
+ async def get_login_ticket(cls, uid: int) -> Optional[str]:
+ """
+ 说明:
+ 获取login_ticket
+ 参数:
+ :param uid: 原神uid
+ """
+ query = cls.query.where(cls.uid == uid)
+ user = await query.gino.first()
+ if user:
+ return user.login_ticket
+ return None
+
+ # 获取stuid
+ @classmethod
+ async def get_stuid(cls, uid: int) -> Optional[str]:
+ """
+ 说明:
+ 获取stuid
+ 参数:
+ :param uid: 原神uid
+ """
+ query = cls.query.where(cls.uid == uid)
+ user = await query.gino.first()
+ if user:
+ return user.stuid
+ return None
+
+ # 获取stoken
+ @classmethod
+ async def get_stoken(cls, uid: int) -> Optional[str]:
+ """
+ 说明:
+ 获取stoken
+ 参数:
+ :param uid: 原神uid
+ """
+ query = cls.query.where(cls.uid == uid)
+ user = await query.gino.first()
+ if user:
+ return user.stoken
+ return None
diff --git a/plugins/genshin/query_user/bind/__init__.py b/plugins/genshin/query_user/bind/__init__.py
index 1ca8ace7b..8cb3be7d9 100644
--- a/plugins/genshin/query_user/bind/__init__.py
+++ b/plugins/genshin/query_user/bind/__init__.py
@@ -5,6 +5,8 @@
from services.log import logger
from nonebot.params import CommandArg, Command
from typing import Tuple
+from utils.http_utils import AsyncHttpx
+import json
__zx_plugin_name__ = "原神绑定"
@@ -39,6 +41,10 @@
unbind = on_command("原神解绑", priority=5, block=True)
+web_Api = "https://api-takumi.mihoyo.com"
+bbs_Cookie_url = "https://webapi.account.mihoyo.com/Api/cookie_accountinfo_by_loginticket?login_ticket={}"
+bbs_Cookie_url2 = web_Api + "/auth/api/getMultiTokenByLoginTicket?login_ticket={}&token_types=3&uid={}"
+
@bind.handle()
async def _(event: MessageEvent, cmd: Tuple[str, ...] = Command(), arg: Message = CommandArg()):
@@ -81,6 +87,33 @@ async def _(event: MessageEvent, cmd: Tuple[str, ...] = Command(), arg: Message
if msg.endswith('"') or msg.endswith("'"):
msg = msg[:-1]
await Genshin.set_cookie(uid, msg)
+ cookie = msg
+ # 用: 代替=, ,代替;
+ cookie = '{"' + cookie.replace('=', '": "').replace("; ", '","') + '"}'
+ print(cookie)
+ cookie_json = json.loads(cookie)
+ print(cookie_json)
+ login_ticket = cookie_json['login_ticket']
+ # try:
+ res = await AsyncHttpx.get(url=bbs_Cookie_url.format(login_ticket))
+ res.encoding = "utf-8"
+ data = json.loads(res.text)
+ print(data)
+ if "成功" in data["data"]["msg"]:
+ stuid = str(data["data"]["cookie_info"]["account_id"])
+ res = await AsyncHttpx.get(url=bbs_Cookie_url2.format(
+ login_ticket, stuid))
+ res.encoding = "utf-8"
+ data = json.loads(res.text)
+ stoken = data["data"]["list"][0]["token"]
+ # await Genshin.set_cookie(uid, cookie)
+ await Genshin.set_stoken(uid, stoken)
+ await Genshin.set_stuid(uid, stuid)
+ await Genshin.set_login_ticket(uid, login_ticket)
+ # except Exception as e:
+ # await bind.finish("获取登陆信息失败,请检查cookie是否正确或更新cookie")
+ elif data["data"]["msg"] == "登录信息已失效,请重新登录":
+ await bind.finish("登录信息失效,请重新获取最新cookie进行绑定")
_x = f"已成功为uid:{uid} 设置cookie"
if isinstance(event, GroupMessageEvent):
await Genshin.set_bind_group(uid, event.group_id)
diff --git a/plugins/genshin/query_user/genshin_sign/__init__.py b/plugins/genshin/query_user/genshin_sign/__init__.py
index 7f75ae5c3..18463bf4b 100644
--- a/plugins/genshin/query_user/genshin_sign/__init__.py
+++ b/plugins/genshin/query_user/genshin_sign/__init__.py
@@ -1,4 +1,5 @@
from .data_source import get_sign_reward_list, genshin_sign
+from ..mihoyobbs_sign import mihoyobbs_sign
from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent
from nonebot import on_command
from services.log import logger
@@ -51,10 +52,13 @@ async def _(event: MessageEvent, cmd: Tuple[str, ...] = Command()):
if cmd == "原神我硬签":
try:
msg = await genshin_sign(uid)
+ return_data = await mihoyobbs_sign(event.user_id)
+ await genshin_matcher.send(return_data)
logger.info(
f"(USER {event.user_id}, "
f"GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'}) UID:{uid} 原神签到"
)
+ logger.info(msg)
# 硬签,移除定时任务
try:
for i in range(3):
@@ -66,7 +70,7 @@ async def _(event: MessageEvent, cmd: Tuple[str, ...] = Command()):
await u.clear_sign_time(uid)
next_date = await Genshin.random_sign_time(uid)
add_job(event.user_id, uid, next_date)
- msg += f"因开启自动签到\n下一次签到时间为:{next_date.replace(microsecond=0)}"
+ msg += f"\n因开启自动签到\n下一次签到时间为:{next_date.replace(microsecond=0)}"
except Exception as e:
msg = "原神签到失败..请尝试检查cookie或报告至管理员!"
logger.info(
diff --git a/plugins/genshin/query_user/genshin_sign/data_source.py b/plugins/genshin/query_user/genshin_sign/data_source.py
index 9211fb0cc..ba39e1e00 100644
--- a/plugins/genshin/query_user/genshin_sign/data_source.py
+++ b/plugins/genshin/query_user/genshin_sign/data_source.py
@@ -1,7 +1,7 @@
from utils.http_utils import AsyncHttpx
from configs.config import Config
from services.log import logger
-from .._utils import random_hex, get_old_ds
+from ..mihoyobbs_sign.setting import *
from .._models import Genshin
from typing import Optional, Dict
import hashlib
@@ -21,30 +21,37 @@ async def genshin_sign(uid: int) -> Optional[str]:
return "签到失败..."
status = data["message"]
if status == "OK":
- sign_info = await _get_sign_info(uid)
- if sign_info:
- sign_info = sign_info["data"]
- sign_list = await get_sign_reward_list()
- get_reward = sign_list["data"]["awards"][
- int(sign_info["total_sign_day"]) - 1
- ]["name"]
- reward_num = sign_list["data"]["awards"][
- int(sign_info["total_sign_day"]) - 1
- ]["cnt"]
- get_im = f"本次签到获得:{get_reward}x{reward_num}"
- if status == "OK" and sign_info["is_sign"]:
- return f"\n原神签到成功!\n{get_im}\n本月漏签次数:{sign_info['sign_cnt_missed']}"
+ try:
+ sign_info = await _get_sign_info(uid)
+ if sign_info:
+ sign_info = sign_info["data"]
+ sign_list = await get_sign_reward_list()
+ get_reward = sign_list["data"]["awards"][
+ int(sign_info["total_sign_day"]) - 1
+ ]["name"]
+ reward_num = sign_list["data"]["awards"][
+ int(sign_info["total_sign_day"]) - 1
+ ]["cnt"]
+ get_im = f"本次签到获得:{get_reward}x{reward_num}"
+ logger.info("get_im:" + get_im + "\nsign_info:" + str(sign_info))
+ if status == "OK" and sign_info["is_sign"]:
+ return f"原神签到成功!\n{get_im}\n本月漏签次数:{sign_info['sign_cnt_missed']}"
+ except Exception as e:
+ logger.error(f"原神签到发生错误 UID:{str(data)}")
+ return f"原神签到发生错误: {str(data)}"
else:
return status
- return None
+ if data["data"]["risk_code"] == 375:
+ return "原神签到失败\n账号可能被风控,请前往米游社手动签到!"
+ return str(data)
# 获取请求Header里的DS 当web为true则生成网页端的DS
def get_ds(web: bool) -> str:
if web:
- n = "9nQiU3AV0rJSIBWgdynfoGMGKaklfbM7"
+ n = mihoyobbs_Salt_web
else:
- n = "9nQiU3AV0rJSIBWgdynfoGMGKaklfbM7"
+ n = mihoyobbs_Salt
i = str(timestamp())
r = random_text(6)
c = md5("salt=" + n + "&t=" + i + "&r=" + r)
@@ -82,24 +89,15 @@ async def _sign(uid: int, server_id: str = "cn_gf01") -> Optional[Dict[str, str]
server_id = "cn_qd01"
try:
cookie = await Genshin.get_user_cookie(uid, True)
+ headers['DS'] = get_ds(web=True)
+ headers['Referer'] = 'https://webstatic.mihoyo.com/bbs/event/signin-ys/index.html?bbs_auth_required=true' \
+ f'&act_id={genshin_Act_id}&utm_source=bbs&utm_medium=mys&utm_campaign=icon'
+ headers['Cookie'] = cookie
+ headers['x-rpc-device_id'] = get_device_id(cookie)
req = await AsyncHttpx.post(
- url="https://api-takumi.mihoyo.com/event/bbs_sign_reward/sign",
- headers={
- 'Accept': 'application/json, text/plain, */*',
- 'DS': get_ds(web=True),
- 'Origin': 'https://webstatic.mihoyo.com',
- 'x-rpc-app_version': "2.34.1",
- 'User-Agent': 'Mozilla/5.0 (Linux; Android 9; Unspecified Device) AppleWebKit/537.36 (KHTML, like Gecko) '
- 'Version/4.0 Chrome/39.0.0.0 Mobile Safari/537.36 miHoYoBBS/2.3.0',
- 'x-rpc-client_type': "5",
- 'Referer': 'https://webstatic.mihoyo.com/bbs/event/signin-ys/index.html?bbs_auth_required=true&act_id=e202009291139501&utm_source=bbs&utm_medium=mys&utm_campaign=icon',
- 'Accept-Encoding': 'gzip, deflate',
- 'Accept-Language': 'zh-CN,en-US;q=0.8',
- 'X-Requested-With': 'com.mihoyo.hyperion',
- "Cookie": cookie,
- 'x-rpc-device_id': get_device_id(cookie)
- },
- json={"act_id": "e202009291139501", "uid": uid, "region": server_id},
+ url=genshin_Signurl,
+ headers=headers,
+ json={"act_id": genshin_Act_id, "uid": uid, "region": server_id},
)
return req.json()
except Exception as e:
diff --git a/plugins/genshin/query_user/genshin_sign/init_task.py b/plugins/genshin/query_user/genshin_sign/init_task.py
index 8b9791412..b14534001 100644
--- a/plugins/genshin/query_user/genshin_sign/init_task.py
+++ b/plugins/genshin/query_user/genshin_sign/init_task.py
@@ -1,4 +1,5 @@
from .data_source import genshin_sign
+from ..mihoyobbs_sign import mihoyobbs_sign
from models.group_member_info import GroupInfoUser
from utils.message_builder import at
from services.log import logger
@@ -57,6 +58,11 @@ async def _sign(user_id: int, uid: int, count: int):
:param uid: uid
:param count: 执行次数
"""
+ try:
+ return_data = await mihoyobbs_sign(user_id)
+ except Exception as e:
+ logger.error(f"mihoyobbs_sign error:{e}")
+ return_data = "米游社签到失败,请尝试发送'米游社签到'进行手动签到"
if count < 3:
try:
msg = await genshin_sign(uid)
@@ -101,6 +107,7 @@ async def _sign(user_id: int, uid: int, count: int):
bot = get_bot()
if bot:
if user_id in [x["user_id"] for x in await bot.get_friend_list()]:
+ await bot.send_private_msg(user_id=user_id, message=return_data)
await bot.send_private_msg(user_id=user_id, message=msg)
else:
if not (group_id := await Genshin.get_bind_group(uid)):
diff --git a/plugins/genshin/query_user/mihoyobbs_sign/__init__.py b/plugins/genshin/query_user/mihoyobbs_sign/__init__.py
new file mode 100644
index 000000000..8e61f8287
--- /dev/null
+++ b/plugins/genshin/query_user/mihoyobbs_sign/__init__.py
@@ -0,0 +1,79 @@
+from nonebot.adapters.onebot.v11 import MessageEvent
+from nonebot import on_command
+from services.log import logger
+# from .init_task import add_job, scheduler, _sign
+# from apscheduler.jobstores.base import JobLookupError
+from .._models import Genshin
+from nonebot.params import Command
+from typing import Tuple
+from .mihoyobbs import *
+
+
+__zx_plugin_name__ = "米游社自动签到"
+__plugin_usage__ = """
+usage:
+ 发送'米游社签到'或绑定原神自动签到
+ 即可手动/自动进行米游社签到
+ (若启用了原神自动签到会在签到原神同时完成米游币领取)
+ --> 每天白嫖90-110米游币不香吗
+ 注:需要重新绑定原神cookie!!!
+ 遇到问题请提issue或@作者
+""".strip()
+__plugin_des__ = "米游社自动签到任务"
+__plugin_cmd__ = ["米游社签到", "米游社我硬签"]
+__plugin_type__ = ("原神相关",)
+__plugin_version__ = 0.1
+__plugin_author__ = "HDU_Nbsp"
+__plugin_settings__ = {
+ "level": 5,
+ "default_status": True,
+ "limit_superuser": False,
+ "cmd": ["原神签到"],
+}
+
+mihoyobbs_matcher = on_command(
+ "米游社签到", aliases={"米游社我硬签"}, priority=5, block=True
+)
+
+
+@mihoyobbs_matcher.handle()
+async def _(event: MessageEvent, cmd: Tuple[str, ...] = Command()):
+ await mihoyobbs_matcher.send("提交米游社签到申请")
+ return_data = await mihoyobbs_sign(event.user_id)
+ if return_data:
+ await mihoyobbs_matcher.finish(return_data)
+ else:
+ await mihoyobbs_matcher.finish("米游社签到失败,请查看控制台输出")
+
+
+async def mihoyobbs_sign(user_id):
+ uid = await Genshin.get_user_uid(user_id)
+ stuid = await Genshin.get_stuid(uid)
+ stoken = await Genshin.get_stoken(uid)
+ cookie = await Genshin.get_user_cookie(uid)
+ bbs = mihoyobbs.Mihoyobbs(stuid=stuid, stoken=stoken, cookie=cookie)
+ await bbs.init()
+ return_data = ""
+ if bbs.Task_do["bbs_Sign"] and bbs.Task_do["bbs_Read_posts"] and bbs.Task_do["bbs_Like_posts"] and \
+ bbs.Task_do["bbs_Share"]:
+ return_data += f"今天的米游社签到任务已经全部完成了!\n" \
+ f"一共获得{mihoyobbs.today_have_get_coins}个米游币\n目前有{mihoyobbs.Have_coins}个米游币"
+ logger.info(f"今天已经全部完成了!一共获得{mihoyobbs.today_have_get_coins}个米游币,目前有{mihoyobbs.Have_coins}个米游币")
+ else:
+ i = 0
+ print("开始签到")
+ print(mihoyobbs.today_have_get_coins)
+ while mihoyobbs.today_get_coins != 0 and i < 3:
+ # if i > 0:
+ await bbs.refresh_list()
+ await bbs.signing()
+ await bbs.read_posts()
+ await bbs.like_posts()
+ await bbs.share_post()
+ await bbs.get_tasks_list()
+ i += 1
+ return_data += "\n" + f"今天已经获得{mihoyobbs.today_have_get_coins}个米游币\n" \
+ f"还能获得{mihoyobbs.today_get_coins}个米游币\n目前有{mihoyobbs.Have_coins}个米游币"
+ logger.info(f"今天已经获得{mihoyobbs.today_have_get_coins}个米游币,"
+ f"还能获得{mihoyobbs.today_get_coins}个米游币,目前有{mihoyobbs.Have_coins}个米游币")
+ return return_data
diff --git a/plugins/genshin/query_user/mihoyobbs_sign/error.py b/plugins/genshin/query_user/mihoyobbs_sign/error.py
new file mode 100644
index 000000000..b5e6ff009
--- /dev/null
+++ b/plugins/genshin/query_user/mihoyobbs_sign/error.py
@@ -0,0 +1,6 @@
+class CookieError(Exception):
+ def __init__(self, info):
+ self.info = info
+
+ def __str__(self):
+ return repr(self.info)
diff --git a/plugins/genshin/query_user/mihoyobbs_sign/mihoyobbs.py b/plugins/genshin/query_user/mihoyobbs_sign/mihoyobbs.py
new file mode 100644
index 000000000..e8d67a8f6
--- /dev/null
+++ b/plugins/genshin/query_user/mihoyobbs_sign/mihoyobbs.py
@@ -0,0 +1,193 @@
+from services.log import logger
+from .error import CookieError
+from utils.http_utils import AsyncHttpx
+from .setting import *
+from .tools import *
+import json
+
+today_get_coins = 0
+today_have_get_coins = 0 # 这个变量以后可能会用上,先留着了
+Have_coins = 0
+
+
+class Mihoyobbs:
+ def __init__(self, stuid: str, stoken: str, cookie: str) -> None:
+ self.postsList = None
+ self.headers = {
+ "DS": get_ds(web=False),
+ "cookie": f'stuid={stuid};stoken={stoken}',
+ "x-rpc-client_type": mihoyobbs_Client_type,
+ "x-rpc-app_version": mihoyobbs_Version,
+ "x-rpc-sys_version": "6.0.1",
+ "x-rpc-channel": "miyousheluodi",
+ "x-rpc-device_id": get_device_id(cookie=cookie),
+ "x-rpc-device_name": random_text(random.randint(1, 10)),
+ "x-rpc-device_model": "Mi 10",
+ "Referer": "https://app.mihoyo.com",
+ "Host": "bbs-api.mihoyo.com",
+ "User-Agent": "okhttp/4.8.0"
+ }
+ self.Task_do = {
+ "bbs_Sign": False,
+ "bbs_Read_posts": False,
+ "bbs_Read_posts_num": 3,
+ "bbs_Like_posts": False,
+ "bbs_Like_posts_num": 5,
+ "bbs_Share": False
+ }
+
+ async def init(self):
+ await self.get_tasks_list()
+ # 如果这三个任务都做了就没必要获取帖子了
+ if self.Task_do["bbs_Read_posts"] and self.Task_do["bbs_Like_posts"] and self.Task_do["bbs_Share"]:
+ pass
+ else:
+ self.postsList = await self.get_list()
+
+ async def refresh_list(self) -> None:
+ self.postsList = await self.get_list()
+
+ # 获取任务列表,用来判断做了哪些任务
+ async def get_tasks_list(self):
+ global today_get_coins
+ global today_have_get_coins
+ global Have_coins
+ logger.info("正在获取任务列表")
+ req = await AsyncHttpx.get(url=bbs_Tasks_list, headers=self.headers)
+ data = req.json()
+ if "err" in data["message"] or data["retcode"] == -100:
+ logger.error("获取任务列表失败,你的cookie可能已过期,请重新设置cookie。")
+ raise CookieError('Cookie expires')
+ else:
+ today_get_coins = data["data"]["can_get_points"]
+ today_have_get_coins = data["data"]["already_received_points"]
+ Have_coins = data["data"]["total_points"]
+ # 如果当日可获取米游币数量为0直接判断全部任务都完成了
+ if today_get_coins == 0:
+ self.Task_do["bbs_Sign"] = True
+ self.Task_do["bbs_Read_posts"] = True
+ self.Task_do["bbs_Like_posts"] = True
+ self.Task_do["bbs_Share"] = True
+ else:
+ # 如果第0个大于或等于62则直接判定任务没做
+ if data["data"]["states"][0]["mission_id"] >= 62:
+ logger.info(f"今天可以获得{today_get_coins}个米游币")
+ pass
+ else:
+ logger.info(f"还有任务未完成,今天还能获得{today_get_coins}米游币")
+ for i in data["data"]["states"]:
+ # 58是讨论区签到
+ if i["mission_id"] == 58:
+ if i["is_get_award"]:
+ self.Task_do["bbs_Sign"] = True
+ # 59是看帖子
+ elif i["mission_id"] == 59:
+ if i["is_get_award"]:
+ self.Task_do["bbs_Read_posts"] = True
+ else:
+ self.Task_do["bbs_Read_posts_num"] -= i["happened_times"]
+ # 60是给帖子点赞
+ elif i["mission_id"] == 60:
+ if i["is_get_award"]:
+ self.Task_do["bbs_Like_posts"] = True
+ else:
+ self.Task_do["bbs_Like_posts_num"] -= i["happened_times"]
+ # 61是分享帖子
+ elif i["mission_id"] == 61:
+ if i["is_get_award"]:
+ self.Task_do["bbs_Share"] = True
+ # 分享帖子,是最后一个任务,到这里了下面都是一次性任务,直接跳出循环
+ break
+
+ # 获取要帖子列表
+ async def get_list(self) -> list:
+ temp_list = []
+ logger.info("正在获取帖子列表......")
+ req = await AsyncHttpx.get(url=bbs_List_url.format(mihoyobbs_List_Use[0]["forumId"]),
+ headers=self.headers)
+ data = req.json()["data"]["list"]
+ for n in range(5):
+ r_l = random.choice(data)
+ while r_l["post"]["subject"] in str(temp_list):
+ r_l = random.choice(data)
+ temp_list.append([r_l["post"]["post_id"], r_l["post"]["subject"]])
+ # temp_list.append([data["data"]["list"][n]["post"]["post_id"], data["data"]["list"][n]["post"]["subject"]])
+
+ logger.info("已获取{}个帖子".format(len(temp_list)))
+ return temp_list
+
+ # 进行签到操作
+ async def signing(self):
+ if self.Task_do["bbs_Sign"]:
+ logger.info("讨论区任务已经完成过了~")
+ else:
+ logger.info("正在签到......")
+ header = {}
+ header.update(self.headers)
+ for i in mihoyobbs_List_Use:
+ header["DS"] = get_ds2("", json.dumps({"gids": i["id"]}))
+ req = await AsyncHttpx.post(url=bbs_Sign_url, json={"gids": i["id"]}, headers=header)
+ data = req.json()
+ if "err" not in data["message"]:
+ logger.info(str(i["name"] + data["message"]))
+ time.sleep(random.randint(2, 8))
+ else:
+ logger.error("签到失败,你的cookie可能已过期,请重新设置cookie。")
+ raise CookieError('Cookie expires')
+
+ # 看帖子
+ async def read_posts(self):
+ if self.Task_do["bbs_Read_posts"]:
+ logger.info("看帖任务已经完成过了~")
+ else:
+ logger.info("正在看帖......")
+ for i in range(self.Task_do["bbs_Read_posts_num"]):
+ req = await AsyncHttpx.get(url=bbs_Detail_url.format(self.postsList[i][0]), headers=self.headers)
+ data = req.json()
+ if data["message"] == "OK":
+ logger.debug("看帖:{} 成功".format(self.postsList[i][1]))
+ time.sleep(random.randint(2, 8))
+
+ # 点赞
+ async def like_posts(self):
+ if self.Task_do["bbs_Like_posts"]:
+ logger.info("点赞任务已经完成过了~")
+ else:
+ logger.info("正在点赞......")
+ for i in range(self.Task_do["bbs_Like_posts_num"]):
+ req = await AsyncHttpx.post(url=bbs_Like_url, headers=self.headers,
+ json={"post_id": self.postsList[i][0], "is_cancel": False})
+ data = req.json()
+ if data["message"] == "OK":
+ logger.debug("点赞:{} 成功".format(self.postsList[i][1]))
+ # 判断取消点赞是否打开
+ # if config.config["mihoyobbs"]["un_like"] :
+ # time.sleep(random.randint(2, 8))
+ # req = httpx.post(url=bbs_Like_url, headers=self.headers,
+ # json={"post_id": self.postsList[i][0], "is_cancel": True})
+ # data = req.json()
+ # if data["message"] == "OK":
+ # logger.debug("取消点赞:{} 成功".format(self.postsList[i][1]))
+ time.sleep(random.randint(2, 8))
+
+ # 分享操作
+
+ async def share_post(self):
+ if self.Task_do["bbs_Share"]:
+ logger.info("分享任务已经完成过了~")
+ else:
+ logger.info("正在执行分享任务......")
+ for i in range(3):
+ req = await AsyncHttpx.get(url=bbs_Share_url.format(self.postsList[0][0]), headers=self.headers)
+ data = req.json()
+ if data["message"] == "OK":
+ logger.debug("分享:{} 成功".format(self.postsList[0][1]))
+ logger.info("分享任务执行成功......")
+ break
+ else:
+ logger.debug(f"分享任务执行失败,正在执行第{i + 2}次,共3次")
+ time.sleep(random.randint(2, 8))
+ time.sleep(random.randint(2, 8))
+
+
+
diff --git a/plugins/genshin/query_user/mihoyobbs_sign/setting.py b/plugins/genshin/query_user/mihoyobbs_sign/setting.py
new file mode 100644
index 000000000..e6b7e538d
--- /dev/null
+++ b/plugins/genshin/query_user/mihoyobbs_sign/setting.py
@@ -0,0 +1,124 @@
+# 米游社的Salt
+mihoyobbs_Salt = "z8DRIUjNDT7IT5IZXvrUAxyupA1peND9"
+mihoyobbs_Salt2 = "t0qEgfub6cvueAPgR5m9aQWWVciEer7v"
+mihoyobbs_Salt_web = "9nQiU3AV0rJSIBWgdynfoGMGKaklfbM7"
+# 米游社的版本
+mihoyobbs_Version = "2.34.1" # Slat和Version相互对应
+# 米游社的客户端类型
+mihoyobbs_Client_type = "2" # 1为ios 2为安卓
+mihoyobbs_Client_type_web = "5" # 4为pc web 5为mobile web
+# 米游社的分区列表
+mihoyobbs_List = [{
+ "id": "1",
+ "forumId": "1",
+ "name": "崩坏3",
+ "url": "https://bbs.mihoyo.com/bh3/"
+}, {
+ "id": "2",
+ "forumId": "26",
+ "name": "原神",
+ "url": "https://bbs.mihoyo.com/ys/"
+}, {
+ "id": "3",
+ "forumId": "30",
+ "name": "崩坏2",
+ "url": "https://bbs.mihoyo.com/bh2/"
+}, {
+ "id": "4",
+ "forumId": "37",
+ "name": "未定事件簿",
+ "url": "https://bbs.mihoyo.com/wd/"
+}, {
+ "id": "5",
+ "forumId": "34",
+ "name": "大别野",
+ "url": "https://bbs.mihoyo.com/dby/"
+}, {
+ "id": "6",
+ "forumId": "52",
+ "name": "崩坏:星穹铁道",
+ "url": "https://bbs.mihoyo.com/sr/"
+}, {
+ "id": "8",
+ "forumId": "57",
+ "name": "绝区零",
+ "url": "https://bbs.mihoyo.com/zzz/"
+}]
+
+game_id2name = {
+ "bh2_cn": "崩坏2",
+ "bh3_cn": "崩坏3",
+ "nxx_cn": "未定事件簿",
+ "hk4e_cn": "原神",
+}
+# Config Load之后run里面进行列表的选择
+mihoyobbs_List_Use = [{
+ "id": "2",
+ "forumId": "26",
+ "name": "原神",
+ "url": "https://bbs.mihoyo.com/ys/"
+},
+ # 不玩原神可以把签到讨论区换为大别墅
+ # {
+ # "id": "5",
+ # "forumId": "34",
+ # "name": "大别野",
+ # "url": "https://bbs.mihoyo.com/dby/"
+ # }
+]
+
+# 游戏签到的请求头
+headers = {
+ 'Accept': 'application/json, text/plain, */*',
+ 'DS': "",
+ 'Origin': 'https://webstatic.mihoyo.com',
+ 'x-rpc-app_version': mihoyobbs_Version,
+ 'User-Agent': 'Mozilla/5.0 (Linux; Android 12; Unspecified Device) AppleWebKit/537.36 (KHTML, like Gecko) '
+ f'Version/4.0 Chrome/103.0.5060.129 Mobile Safari/537.36 miHoYoBBS/{mihoyobbs_Version}',
+ 'x-rpc-client_type': mihoyobbs_Client_type_web,
+ 'Referer': '',
+ 'Accept-Encoding': 'gzip, deflate',
+ 'Accept-Language': 'zh-CN,en-US;q=0.8',
+ 'X-Requested-With': 'com.mihoyo.hyperion',
+ "Cookie": "",
+ 'x-rpc-device_id': ""
+}
+
+# 通用设置
+bbs_Api = "https://bbs-api.mihoyo.com"
+web_Api = "https://api-takumi.mihoyo.com"
+account_Info_url = web_Api + "/binding/api/getUserGameRolesByCookie?game_biz="
+
+# 米游社的API列表
+bbs_Cookie_url = "https://webapi.account.mihoyo.com/Api/cookie_accountinfo_by_loginticket?login_ticket={}"
+bbs_Cookie_url2 = web_Api + "/auth/api/getMultiTokenByLoginTicket?login_ticket={}&token_types=3&uid={}"
+bbs_Tasks_list = bbs_Api + "/apihub/sapi/getUserMissionsState" # 获取任务列表
+bbs_Sign_url = bbs_Api + "/apihub/app/api/signIn" # post
+bbs_List_url = bbs_Api + "/post/api/getForumPostList?forum_id={}&is_good=false&is_hot=false&page_size=20&sort_type=1"
+bbs_Detail_url = bbs_Api + "/post/api/getPostFull?post_id={}"
+bbs_Share_url = bbs_Api + "/apihub/api/getShareConf?entity_id={}&entity_type=1"
+bbs_Like_url = bbs_Api + "/apihub/sapi/upvotePost" # post json
+
+# 崩坏2自动签到相关的相关设置
+honkai2_Act_id = "e202203291431091"
+honkai2_checkin_rewards = f'{web_Api}/event/luna/home?lang=zh-cn&act_id={honkai2_Act_id}'
+honkai2_Is_signurl = web_Api + "/event/luna/info?lang=zh-cn&act_id={}®ion={}&uid={}"
+honkai2_Sign_url = web_Api + "/event/luna/sign"
+
+# 崩坏3自动签到相关的设置
+honkai3rd_Act_id = "e202207181446311"
+honkai3rd_checkin_rewards = f'{web_Api}/event/luna/home?lang=zh-cn&act_id={honkai3rd_Act_id}'
+honkai3rd_Is_signurl = web_Api + "/event/luna/info?lang=zh-cn&act_id={}®ion={}&uid={}"
+honkai3rd_Sign_url = web_Api + "/event/luna/sign"
+
+# 未定事件簿自动签到相关设置
+tearsofthemis_Act_id = "e202202251749321"
+tearsofthemis_checkin_rewards = f'{web_Api}/event/luna/home?lang=zh-cn&act_id={tearsofthemis_Act_id}'
+tearsofthemis_Is_signurl = honkai2_Is_signurl
+tearsofthemis_Sign_url = honkai2_Sign_url # 和二崩完全一致
+
+# 原神自动签到相关的设置
+genshin_Act_id = "e202009291139501"
+genshin_checkin_rewards = f'{web_Api}/event/bbs_sign_reward/home?act_id={genshin_Act_id}'
+genshin_Is_signurl = web_Api + "/event/bbs_sign_reward/info?act_id={}®ion={}&uid={}"
+genshin_Signurl = web_Api + "/event/bbs_sign_reward/sign"
diff --git a/plugins/genshin/query_user/mihoyobbs_sign/tools.py b/plugins/genshin/query_user/mihoyobbs_sign/tools.py
new file mode 100644
index 000000000..2552330ef
--- /dev/null
+++ b/plugins/genshin/query_user/mihoyobbs_sign/tools.py
@@ -0,0 +1,65 @@
+import uuid
+import time
+import random
+import string
+import hashlib
+from .setting import *
+
+
+# md5计算
+def md5(text: str) -> str:
+ md5 = hashlib.md5()
+ md5.update(text.encode())
+ return md5.hexdigest()
+
+
+# 随机文本
+def random_text(num: int) -> str:
+ return ''.join(random.sample(string.ascii_lowercase + string.digits, num))
+
+
+# 时间戳
+def timestamp() -> int:
+ return int(time.time())
+
+
+# 获取请求Header里的DS 当web为true则生成网页端的DS
+def get_ds(web: bool) -> str:
+ if web:
+ n = mihoyobbs_Salt_web
+ else:
+ n = mihoyobbs_Salt
+ i = str(timestamp())
+ r = random_text(6)
+ c = md5("salt=" + n + "&t=" + i + "&r=" + r)
+ return f"{i},{r},{c}"
+
+
+# 获取请求Header里的DS(版本2) 这个版本ds之前见到都是查询接口里的
+def get_ds2(q: str, b: str) -> str:
+ n = mihoyobbs_Salt2
+ i = str(timestamp())
+ r = str(random.randint(100001, 200000))
+ add = f'&b={b}&q={q}'
+ c = md5("salt=" + n + "&t=" + i + "&r=" + r + add)
+ return f"{i},{r},{c}"
+
+
+# 生成一个device id
+def get_device_id(cookie) -> str:
+ return str(uuid.uuid3(uuid.NAMESPACE_URL, cookie))
+
+
+# 获取签到的奖励名称
+def get_item(raw_data: dict) -> str:
+ temp_name = raw_data["name"]
+ temp_cnt = raw_data["cnt"]
+ return f"{temp_name}x{temp_cnt}"
+
+
+# 获取明天早晨0点的时间戳
+def next_day() -> int:
+ now_time = int(time.time())
+ next_day_time = now_time - now_time % 86400 + time.timezone + 86400
+ return next_day_time
+
diff --git a/plugins/genshin/query_user/query_memo/data_source.py b/plugins/genshin/query_user/query_memo/data_source.py
index f1f7fb77d..007c90de8 100644
--- a/plugins/genshin/query_user/query_memo/data_source.py
+++ b/plugins/genshin/query_user/query_memo/data_source.py
@@ -26,12 +26,17 @@
@driver.on_startup
async def _():
for name, url in zip(
- ["resin.png", "task.png", "resin_discount.png"],
+ [
+ "resin.png", "task.png", "resin_discount.png", "chengehu.png",
+ "zhibian.png"
+ ],
[
"https://upload-bbs.mihoyo.com/upload/2021/09/29/8819732/54266243c7d15ba31690c8f5d63cc3c6_71491376413333325"
"20.png?x-oss-process=image//resize,s_600/quality,q_80/auto-orient,0/interlace,1/format,png",
"https://patchwiki.biligame.com/images/ys/thumb/c/cc/6k6kuj1kte6m1n7hexqfrn92z6h4yhh.png/60px-委托任务logo.png",
"https://patchwiki.biligame.com/images/ys/d/d9/t1hv6wpucbwucgkhjntmzroh90nmcdv.png",
+ "https://s3.bmp.ovh/imgs/2022/07/24/28d9338c7da4bcb2.png",
+ "https://genshin.honeyhunterworld.com/img/gadget/i_3016.png",
],
):
file = memo_path / name
@@ -40,7 +45,8 @@ async def _():
logger.info(f"已下载原神便签资源 -> {file}...")
-async def get_user_memo(user_id: int, uid: int, uname: str) -> Optional[Union[str, MessageSegment]]:
+async def get_user_memo(user_id: int, uid: int,
+ uname: str) -> Optional[Union[str, MessageSegment]]:
uid = str(uid)
if uid[0] in ["1", "2"]:
server_id = "cn_gf01"
@@ -54,12 +60,16 @@ async def get_user_memo(user_id: int, uid: int, uname: str) -> Optional[Union[st
async def get_memo(uid: str, server_id: str) -> "Union[str, dict], int":
try:
req = await AsyncHttpx.get(
- url=f"https://api-takumi-record.mihoyo.com/game_record/app/genshin/api/dailyNote?server={server_id}&role_id={uid}",
+ url=
+ f"https://api-takumi-record.mihoyo.com/game_record/app/genshin/api/dailyNote?server={server_id}&role_id={uid}",
headers={
"DS": get_ds(f"role_id={uid}&server={server_id}"),
- "x-rpc-app_version": Config.get_config("genshin", "mhyVersion"),
- "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1",
- "x-rpc-client_type": Config.get_config("genshin", "client_type"),
+ "x-rpc-app_version": Config.get_config("genshin",
+ "mhyVersion"),
+ "User-Agent":
+ "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1",
+ "x-rpc-client_type": Config.get_config("genshin",
+ "client_type"),
"Referer": "https://webstatic.mihoyo.com/",
"Cookie": await Genshin.get_user_cookie(int(uid))
},
@@ -75,11 +85,18 @@ async def get_memo(uid: str, server_id: str) -> "Union[str, dict], int":
return "发生了一些错误,请稍后再试", 998
-def create_border(
- image_name: str, content: str, notice_text: str, value: str
-) -> BuildImage:
- border = BuildImage(500, 100, color="#E0D9D1", font="HYWenHei-85W.ttf", font_size=20)
- text_bk = BuildImage(350, 96, color="#F5F1EB", font_size=23, font="HYWenHei-85W.ttf")
+def create_border(image_name: str, content: str, notice_text: str,
+ value: str) -> BuildImage:
+ border = BuildImage(500,
+ 75,
+ color="#E0D9D1",
+ font="HYWenHei-85W.ttf",
+ font_size=20)
+ text_bk = BuildImage(350,
+ 75,
+ color="#F5F1EB",
+ font_size=23,
+ font="HYWenHei-85W.ttf")
_x = 70 if image_name == "resin.png" else 50
_px = 10 if image_name == "resin.png" else 20
text_bk.paste(
@@ -88,7 +105,7 @@ def create_border(
True,
center_type="by_height",
)
- text_bk.text((87, 20), content)
+ text_bk.text((87, 15), content)
text_bk.paste(
BuildImage(
0,
@@ -98,18 +115,19 @@ def create_border(
font="HYWenHei-85W.ttf",
font_size=17,
),
- (87, 50),
+ (87, 45),
True,
)
font_width, _ = border.getsize(value)
- border.text((350 + 76 - int(font_width / 2), 0), value, center_type="by_height")
+ border.text((350 + 76 - int(font_width / 2), 0),
+ value,
+ center_type="by_height")
border.paste(text_bk, (2, 0), center_type="by_height")
return border
-async def parse_data_and_draw(
- user_id: int, uid: str, server_id: str, uname: str
-) -> Union[str, MessageSegment]:
+async def parse_data_and_draw(user_id: int, uid: str, server_id: str,
+ uname: str) -> Union[str, MessageSegment]:
data, code = await get_memo(uid, server_id)
if code != 200:
return data
@@ -120,13 +138,11 @@ async def parse_data_and_draw(
if not role_avatar.exists():
await AsyncHttpx.download_file(x["avatar_side_icon"], role_avatar)
return await asyncio.get_event_loop().run_in_executor(
- None, _parse_data_and_draw, data, user_avatar, uid, uname
- )
+ None, _parse_data_and_draw, data, user_avatar, uid, uname)
-def _parse_data_and_draw(
- data: dict, user_avatar: BytesIO, uid: int, uname: str
-) -> Union[str, MessageSegment]:
+def _parse_data_and_draw(data: dict, user_avatar: BytesIO, uid: int,
+ uname: str) -> Union[str, MessageSegment]:
current_resin = data["current_resin"] # 当前树脂
max_resin = data["max_resin"] # 最大树脂
resin_recovery_time = data["resin_recovery_time"] # 树脂全部回复时间
@@ -137,17 +153,37 @@ def _parse_data_and_draw(
current_expedition_num = data["current_expedition_num"] # 当前挖矿人数
max_expedition_num = data["max_expedition_num"] # 每日挖矿最大人数
expeditions = data["expeditions"] # 挖矿详情
-
+ current_coin = data["current_home_coin"] # 当前宝钱
+ max_coin = data["max_home_coin"] # 最大宝钱
+ coin_recovery_time = data["home_coin_recovery_time"] # 宝钱全部回复时间
+ transformer_available = data["transformer"]["obtained"] # 参量质变仪可获取
+ transformer_state = data["transformer"]["recovery_time"][
+ "reached"] # 参量质变仪状态
+ transformer_recovery_time = data["transformer"]["recovery_time"][
+ "Day"] # 参量质变仪回复时间
+ transformer_recovery_hour = data["transformer"]["recovery_time"][
+ "Hour"] # 参量质变仪回复时间
+ coin_minute, coin_second = divmod(int(coin_recovery_time), 60)
+ coin_hour, coin_minute = divmod(coin_minute, 60)
+ #print(data)
minute, second = divmod(int(resin_recovery_time), 60)
hour, minute = divmod(minute, 60)
- A = BuildImage(1030, 520, color="#f1e9e1", font_size=15, font="HYWenHei-85W.ttf")
+ A = BuildImage(1030,
+ 570,
+ color="#f1e9e1",
+ font_size=15,
+ font="HYWenHei-85W.ttf")
A.text((10, 15), "原神便笺 | Create By ZhenXun", (198, 186, 177))
ava = BuildImage(100, 100, background=user_avatar)
ava.circle()
A.paste(ava, (40, 40), True)
A.paste(
- BuildImage(0, 0, plain_text=uname, font_size=20, font="HYWenHei-85W.ttf"),
+ BuildImage(0,
+ 0,
+ plain_text=uname,
+ font_size=20,
+ font="HYWenHei-85W.ttf"),
(160, 62),
True,
)
@@ -177,26 +213,52 @@ def _parse_data_and_draw(
"今日委托已全部完成" if finished_task_num == total_task_num else "今日委托完成数量不足",
f"{finished_task_num}/{total_task_num}",
)
- A.paste(border, (10, 265))
+ A.paste(border, (10, 235))
border = create_border(
"resin_discount.png",
"值得铭记的强敌",
"本周剩余消耗减半次数",
f"{remain_resin_discount_num}/{resin_discount_num_limit}",
)
- A.paste(border, (10, 375))
- expeditions_border = BuildImage(
- 470, 430, color="#E0D9D1", font="HYWenHei-85W.ttf", font_size=20
+ A.paste(border, (10, 315))
+ border = create_border(
+ "chengehu.png",
+ "洞天财翁-洞天宝钱",
+ "洞天财翁已达到存储上限"
+ if current_coin == max_coin else f"{coin_hour}小时{coin_minute}分钟后存满",
+ f"{current_coin}/{max_coin}",
)
- expeditions_text = BuildImage(
- 466, 426, color="#F5F1EB", font_size=23, font="HYWenHei-85W.ttf"
+ A.paste(border, (10, 395))
+ border = create_border(
+ "zhibian.png",
+ "参量质变仪",
+ "不存在" if not transformer_available else
+ "已准备完成 " if transformer_state else f"{transformer_recovery_hour}小时后可使用" if not transformer_recovery_time else f"{transformer_recovery_time}天后可使用",
+ "不存在" if not transformer_available else
+ "可使用" if transformer_state else "冷却中",
)
+ A.paste(border, (10, 475))
+
+ expeditions_border = BuildImage(470,
+ 510,
+ color="#E0D9D1",
+ font="HYWenHei-85W.ttf",
+ font_size=20)
+ expeditions_text = BuildImage(466,
+ 506,
+ color="#F5F1EB",
+ font_size=23,
+ font="HYWenHei-85W.ttf")
expeditions_text.text(
- (5, 5), f"探索派遣限制{current_expedition_num}/{max_expedition_num}", (100, 100, 98)
- )
+ (5, 5), f"探索派遣限制{current_expedition_num}/{max_expedition_num}",
+ (100, 100, 98))
h = 45
for x in expeditions:
- _bk = BuildImage(400, 66, color="#ECE3D8", font="HYWenHei-85W.ttf", font_size=21)
+ _bk = BuildImage(400,
+ 82,
+ color="#ECE3D8",
+ font="HYWenHei-85W.ttf",
+ font_size=21)
file_name = x["avatar_side_icon"].split("_")[-1]
role_avatar = memo_path / "role_avatar" / file_name
_ava_img = BuildImage(75, 75, background=role_avatar)
@@ -227,7 +289,7 @@ def _parse_data_and_draw(
_bk.circle_corner(20)
expeditions_text.paste(_bk, (25, h), True)
- h += 75
+ h += 75 + 16
expeditions_border.paste(expeditions_text, center_type="center")
diff --git a/plugins/one_friend/__init__.py b/plugins/one_friend/__init__.py
index 69b4d51c6..2c6add408 100755
--- a/plugins/one_friend/__init__.py
+++ b/plugins/one_friend/__init__.py
@@ -27,7 +27,7 @@
}
one_friend = on_regex(
- "^我.*?朋友.*?[想问问|说|让我问问|想问|让我问|想知道|让我帮他问问|让我帮他问|让我帮忙问|让我帮忙问问|问](.*)",
+ "^我.{0,4}朋友.{0,2}(?:想问问|说|让我问问|想问|让我问|想知道|让我帮他问问|让我帮他问|让我帮忙问|让我帮忙问问|问)(.{0,30})$",
priority=4,
block=True,
)
diff --git a/plugins/parse_bilibili_json.py b/plugins/parse_bilibili_json.py
index ca6628aa0..ff20ed8e7 100755
--- a/plugins/parse_bilibili_json.py
+++ b/plugins/parse_bilibili_json.py
@@ -114,7 +114,7 @@ async def _(event: GroupMessageEvent):
msg = msg[index + 2 : index + 11]
if is_number(msg):
url = f"https://www.bilibili.com/video/{msg}"
- vd_info = await video.get_video_base_info(msg)
+ vd_info = await video.get_video_base_info('av' + msg)
elif "https://b23.tv" in msg:
url = "https://" + msg[msg.find("b23.tv"): msg.find("b23.tv") + 14]
async with aiohttp.ClientSession(
diff --git a/plugins/pix_gallery/_model/omega_pixiv_illusts.py b/plugins/pix_gallery/_model/omega_pixiv_illusts.py
index 71cc30076..02ac051aa 100644
--- a/plugins/pix_gallery/_model/omega_pixiv_illusts.py
+++ b/plugins/pix_gallery/_model/omega_pixiv_illusts.py
@@ -1,4 +1,4 @@
-from typing import Optional, List
+from typing import Optional, List, Tuple
from datetime import datetime
from services.db_context import db
@@ -113,7 +113,7 @@ async def check_exists(cls, pid: int) -> bool:
return bool(query)
@classmethod
- async def get_keyword_num(cls, tags: List[str] = None) -> "int, int, int":
+ async def get_keyword_num(cls, tags: List[str] = None) -> Tuple[int, int, int]:
"""
说明:
获取相关关键词(keyword, tag)在图库中的数量
diff --git a/plugins/sign_in/utils.py b/plugins/sign_in/utils.py
index 885aa8ac7..f1a62bc61 100755
--- a/plugins/sign_in/utils.py
+++ b/plugins/sign_in/utils.py
@@ -24,8 +24,6 @@
import asyncio
import random
import nonebot
-import time
-import locale
import os
diff --git a/plugins/word_bank/__init__.py b/plugins/word_bank/__init__.py
index 3e8535db9..76b267046 100644
--- a/plugins/word_bank/__init__.py
+++ b/plugins/word_bank/__init__.py
@@ -10,25 +10,4 @@
default_value=5
)
-Config.add_plugin_config(
- "word_bank",
- "WORD_BANK_FUZZY",
- False,
- help_="模糊匹配",
- default_value=False
-)
-Config.add_plugin_config(
- "word_bank",
- "WORD_BANK_KEY",
- True,
- help_="关键字匹配",
- default_value=True
-)
-Config.add_plugin_config(
- "word_bank",
- "WORD_BANK_MIX",
- 25,
- help_="查看词条时图片内最多显示条数",
- default_value=25
-)
-nonebot.load_plugins("plugins/word_bank")
+nonebot.load_plugins("test/word_bank")
diff --git a/plugins/word_bank/_config.py b/plugins/word_bank/_config.py
new file mode 100644
index 000000000..d1f6ab671
--- /dev/null
+++ b/plugins/word_bank/_config.py
@@ -0,0 +1,23 @@
+
+
+scope2int = {
+ "全局": 0,
+ "群聊": 1,
+ "私聊": 2,
+}
+
+type2int = {
+ "精准": 0,
+ "模糊": 1,
+ "正则": 2,
+ "图片": 3,
+}
+
+int2type = {
+ 0: "精准",
+ 1: "模糊",
+ 2: "正则",
+ 3: "图片",
+}
+
+
diff --git a/plugins/word_bank/_data_source.py b/plugins/word_bank/_data_source.py
index 62b1be05b..11a42bb4b 100644
--- a/plugins/word_bank/_data_source.py
+++ b/plugins/word_bank/_data_source.py
@@ -1,54 +1,208 @@
-from .model import WordBank
-from typing import Union
-
-
-class WordBankBuilder:
-
- def __init__(self, user_id: int, group_id: int, problem: str):
- self._data = {
- "user_id": user_id,
- "group_id": group_id}
- self.problem = problem
-
- def set_placeholder(self, id_: int, placeholder: Union[str, int]):
- """
- 设置占位符
- :param id_: 站位id
- :param placeholder: 占位符内容
- """
- if self._data.get("placeholder") is None:
- self._data["placeholder"] = []
- self._data["placeholder"].append((id_, placeholder))
-
- def set_answer(self, answer: str):
- """
- 设置回答
- :param answer: 回答
- """
- self._data["answer"] = answer
-
- def set_problem(self, problem: str):
- """
- 设置问题
- :param problem: 问题
- """
- self._data["problem"] = problem
-
- async def save(self, search_type):
- user_id = self._data["user_id"]
- group_id = self._data["group_id"]
- problem = self._data["problem"]
- answer = self._data["answer"]
- placeholder = self._data.get("placeholder")
- return await WordBank.add_problem_answer(user_id, group_id, search_type, problem, answer, placeholder)
-
- async def update(self, index):
- user_id = self._data["user_id"]
- group_id = self._data["group_id"]
- problem = self._data["problem"]
- answer = self._data["answer"]
- placeholder = self._data.get("placeholder")
- return await WordBank.update_problem_answer(user_id, group_id, problem, answer, index, placeholder)
-
- def __str__(self):
- return str(self._data)
+from pathlib import Path
+
+from nonebot.adapters.onebot.v11 import Message, MessageSegment
+
+from services import logger
+from utils.image_utils import text2image
+from utils.message_builder import image
+from ._model import WordBank
+from typing import Optional, Tuple, Union, List, Any
+from utils.utils import is_number
+import nonebot
+
+driver = nonebot.get_driver()
+
+
+async def get_problem_str(
+ id_: Union[str, int], group_id: Optional[int] = None, word_scope: int = 1
+) -> Tuple[str, int]:
+ """
+ 说明:
+ 通过id获取问题字符串
+ 参数:
+ :param id_: 下标
+ :param group_id: 群号
+ :param word_scope: 获取类型
+ """
+ if word_scope in [0, 2]:
+ all_problem = await WordBank.get_problem_by_scope(word_scope)
+ else:
+ all_problem = await WordBank.get_group_all_problem(group_id)
+ if id_.startswith("id:"):
+ id_ = id_.split(":")[-1]
+ if not is_number(id_) or int(id_) < 0 or int(id_) > len(all_problem):
+ return "id必须为数字且在范围内", 999
+ return all_problem[int(id_)][0], 200
+
+
+async def update_word(params: str, group_id: Optional[int] = None, word_scope: int = 1) -> str:
+ """
+ 说明:
+ 修改群词条
+ 参数:
+ :param params: 参数
+ :param group_id: 群号
+ :param word_scope: 词条范围
+ """
+ return await word_handle(params, group_id, "update", word_scope)
+
+
+async def delete_word(params: str, group_id: Optional[int] = None, word_scope: int = 1) -> str:
+ """
+ 说明:
+ 删除群词条
+ 参数:
+ :param params: 参数
+ :param group_id: 群号
+ :param word_scope: 词条范围
+ """
+ return await word_handle(params, group_id, "delete", word_scope)
+
+
+async def word_handle(params: str, group_id: Optional[int], type_: str, word_scope: int = 0) -> str:
+ """
+ 说明:
+ 词条操作
+ 参数:
+ :param params: 参数
+ :param group_id: 群号
+ :param type_: 类型
+ :param word_scope: 词条范围
+ """
+ params = params.split()
+ problem = params[0]
+ if problem.startswith("id:"):
+ problem, code = await get_problem_str(problem, group_id, word_scope)
+ if code != 200:
+ return problem
+ if type_ == "delete":
+ index = params[1] if len(params) > 1 else None
+ if index:
+ answer_num = len(await WordBank.get_problem_all_answer(problem, group_id))
+ if not is_number(index) or int(index) < 0 or int(index) > answer_num:
+ return "指定回答下标id必须为数字且在范围内"
+ index = int(index)
+ await WordBank.delete_group_problem(problem, group_id, index, word_scope)
+ return "删除词条成功"
+ if type_ == "update":
+ replace_str = params[1]
+ await WordBank.update_group_problem(problem, replace_str, group_id, word_scope=word_scope)
+ return "修改词条成功"
+
+
+async def show_word(
+ problem: str,
+ id_: Optional[int],
+ gid: Optional[int],
+ group_id: Optional[int] = None,
+ word_scope: Optional[int] = None,
+) -> Union[str, List[Union[str, Message]]]:
+ if problem:
+ msg_list = []
+ if word_scope is not None:
+ problem = (await WordBank.get_problem_by_scope(word_scope))[id_][0]
+ id_ = None
+ _problem_list = await WordBank.get_problem_all_answer(
+ problem, id_ if id_ is not None else gid, group_id if gid is None else None, word_scope
+ )
+ for index, msg in enumerate(_problem_list):
+ if isinstance(msg, Message):
+ temp = ""
+ for seg in msg:
+ if seg.type == "text":
+ temp += seg
+ elif seg.type == "face":
+ temp += f"[face:{seg.data.id}]"
+ elif seg.type == "at":
+ temp += f'[at:{seg.data["qq"]}]'
+ elif seg.type == "image":
+ temp += f"[image]"
+ msg += temp
+ msg_list.append(f"{index}." + msg if isinstance(msg, str) else msg[1])
+ msg_list = [
+ f'词条:{problem or (f"id: {id_}" if id_ is not None else f"gid: {gid}")} 的回答'
+ ] + msg_list
+ return msg_list
+ else:
+ if group_id:
+ _problem_list = await WordBank.get_group_all_problem(group_id)
+ else:
+ _problem_list = await WordBank.get_problem_by_scope(word_scope)
+ global_problem_list = await WordBank.get_problem_by_scope(0)
+ if not _problem_list and not global_problem_list:
+ return "未收录任何词条.."
+ msg_list = await build_message(_problem_list)
+ global_msg_list = await build_message(global_problem_list)
+ if global_msg_list:
+ msg_list.append("###以下为全局词条###")
+ msg_list = msg_list + global_msg_list
+ return msg_list
+
+
+async def build_message(_problem_list: List[Tuple[Any, Union[MessageSegment, str]]]):
+ index = 0
+ str_temp_list = []
+ msg_list = []
+ temp_str = ""
+ for _, problem in _problem_list:
+ if len(temp_str.split("\n")) > 50:
+ img = await text2image(
+ temp_str,
+ padding=10,
+ color="#f9f6f2",
+ )
+ msg_list.append(image(b64=img.pic2bs4()))
+ temp_str = ""
+ if isinstance(problem, str):
+ if problem not in str_temp_list:
+ str_temp_list.append(problem)
+ temp_str += f"{index}. {problem}\n"
+ else:
+ if temp_str:
+ img = await text2image(
+ temp_str,
+ padding=10,
+ color="#f9f6f2",
+ )
+ msg_list.append(image(b64=img.pic2bs4()))
+ temp_str = ""
+ msg_list.append(f"{index}." + problem)
+ index += 1
+ if temp_str:
+ img = await text2image(
+ temp_str,
+ padding=10,
+ color="#f9f6f2",
+ )
+ msg_list.append(image(b64=img.pic2bs4()))
+ return msg_list
+
+
+@driver.on_startup
+async def _():
+ try:
+ from ._old_model import WordBank as OldWordBank
+ except ModuleNotFoundError:
+ return
+ if await WordBank.get_group_all_problem(0):
+ return
+ logger.info('开始迁移词条 纯文本 数据')
+ word_list = await OldWordBank.get_all()
+ for word in word_list:
+ problem: str = word.problem
+ user_id = word.user_qq
+ group_id = word.group_id
+ format_ = word.format
+ answer = word.answer
+ # 仅对纯文本做处理
+ if '[CQ' not in problem and '[CQ' not in answer and '[_to_me' not in problem:
+ if not format_:
+ await WordBank.add_problem_answer(user_id, group_id, 1, 0, problem, answer)
+ await WordBank.add_problem_answer(0, 0, 999, 0, '_[OK', '_[OK')
+ logger.info('词条 纯文本 数据迁移完成')
+ (Path() / 'plugins' / 'word_bank' / '_old_model.py').unlink()
+
+
+
+
+
diff --git a/plugins/word_bank/_model.py b/plugins/word_bank/_model.py
new file mode 100644
index 000000000..816d25983
--- /dev/null
+++ b/plugins/word_bank/_model.py
@@ -0,0 +1,459 @@
+import time
+from nonebot.internal.adapter.template import MessageTemplate
+from nonebot.adapters.onebot.v11 import (
+ Message,
+ MessageEvent,
+ GroupMessageEvent,
+ MessageSegment,
+)
+from services.db_context import db
+from typing import Optional, List, Union, Tuple, Any
+from datetime import datetime
+from configs.path_config import DATA_PATH
+import random
+from ._config import int2type
+from utils.image_utils import get_img_hash
+from utils.http_utils import AsyncHttpx
+import re
+
+from utils.message_builder import image, face, at
+from utils.utils import get_message_img
+
+path = DATA_PATH / "word_bank"
+
+
+class WordBank(db.Model):
+ __tablename__ = "word_bank2"
+
+ id = db.Column(db.Integer(), primary_key=True)
+ user_qq = db.Column(db.BigInteger(), nullable=False)
+ group_id = db.Column(db.Integer())
+ word_scope = db.Column(
+ db.Integer(), nullable=False, default=0
+ ) # 生效范围 0: 全局 1: 群聊 2: 私聊
+ word_type = db.Column(
+ db.Integer(), nullable=False, default=0
+ ) # 词条类型 0: 完全匹配 1: 模糊 2: 正则 3: 图片
+ status = db.Column(db.Boolean(), nullable=False, default=True) # 词条状态
+ problem = db.Column(db.String(), nullable=False) # 问题,为图片时使用图片hash
+ answer = db.Column(db.String(), nullable=False) # 回答
+ placeholder = db.Column(db.String()) # 占位符
+ image_path = db.Column(db.String()) # 使用图片作为问题时图片存储的路径
+ create_time = db.Column(db.DateTime(), nullable=False)
+ update_time = db.Column(db.DateTime(), nullable=False)
+
+ @classmethod
+ async def exists(
+ cls,
+ user_id: Optional[int],
+ group_id: Optional[int],
+ problem: str,
+ word_scope: Optional[int] = None,
+ word_type: Optional[int] = None,
+ ) -> bool:
+ """
+ 说明:
+ 检测问题是否存在
+ 参数:
+ :param user_id: 用户id
+ :param group_id: 群号
+ :param problem: 问题
+ :param word_scope: 词条范围
+ :param word_type: 词条类型
+ """
+ query = cls.query.where(cls.problem == problem)
+ if user_id:
+ query = query.where(cls.user_qq == user_id)
+ if group_id:
+ query = query.where(cls.group_id == group_id)
+ if word_type:
+ query = query.where(cls.word_type == word_type)
+ if word_scope:
+ query = query.where(cls.word_scope == word_scope)
+ return bool(await query.gino.first())
+
+ @classmethod
+ async def add_problem_answer(
+ cls,
+ user_id: int,
+ group_id: Optional[int],
+ word_scope: int,
+ word_type: int,
+ problem: Union[str, Message],
+ answer: Union[str, Message],
+ ):
+ """
+ 说明:
+ 添加或新增一个问答
+ 参数:
+ :param user_id: 用户id
+ :param group_id: 群号
+ :param word_scope: 词条范围,
+ :param word_type: 词条类型,
+ :param problem: 问题
+ :param answer: 回答
+ """
+ # 对图片做额外处理
+ image_path = None
+ if word_type == 3:
+ url = get_message_img(problem)[0]
+ _file = (
+ path / "problem" / f"{group_id}" / f"{user_id}_{int(time.time())}.jpg"
+ )
+ _file.parent.mkdir(exist_ok=True, parents=True)
+ await AsyncHttpx.download_file(url, _file)
+ problem = str(get_img_hash(_file))
+ image_path = f"problem/{group_id}/{user_id}_{int(time.time())}.jpg"
+ answer, _list = await cls._answer2format(answer, user_id, group_id)
+ await cls.create(
+ user_qq=user_id,
+ group_id=group_id,
+ word_scope=word_scope,
+ word_type=word_type,
+ status=True,
+ problem=problem,
+ answer=answer,
+ image_path=image_path,
+ placeholder=",".join(_list),
+ create_time=datetime.now().replace(microsecond=0),
+ update_time=datetime.now().replace(microsecond=0),
+ )
+
+ @classmethod
+ async def _answer2format(
+ cls, answer: Union[str, Message], user_id: int, group_id: int
+ ) -> Tuple[str, List[Any]]:
+ """
+ 说明:
+ 将CQ码转化为占位符
+ 参数:
+ :param answer: 回答内容
+ :param user_id: 用户id
+ :param group_id: 群号
+ """
+ if isinstance(answer, str):
+ return answer, []
+ _list = []
+ text = ""
+ index = 0
+ for seg in answer:
+ if isinstance(seg, str):
+ text += seg
+ elif seg.type == "text":
+ text += seg.data["text"]
+ elif seg.type == "face":
+ text += f"[face:placeholder_{index}]"
+ _list.append(seg.data.id)
+ elif seg.type == "at":
+ text += f"[at:placeholder_{index}]"
+ _list.append(seg.data["qq"])
+ else:
+ text += f"[image:placeholder_{index}]"
+ index += 1
+ t = int(time.time())
+ _file = path / "answer" / f"{group_id}" / f"{user_id}_{t}.jpg"
+ _file.parent.mkdir(exist_ok=True, parents=True)
+ await AsyncHttpx.download_file(seg.data["url"], _file)
+ _list.append(f"answer/{group_id}/{user_id}_{t}.jpg")
+ return text, _list
+
+ @classmethod
+ async def _format2answer(
+ cls,
+ problem: str,
+ answer: Union[str, Message],
+ user_id: int,
+ group_id: int,
+ query: Optional["WordBank"] = None,
+ ) -> Union[str, Message]:
+ """
+ 说明:
+ 将占位符转换为CQ码
+ 参数:
+ :param problem: 问题内容
+ :param answer: 回答内容
+ :param user_id: 用户id
+ :param group_id: 群号
+ """
+ if query:
+ answer = query.answer
+ else:
+ query = await cls.query.where(
+ (cls.problem == problem)
+ & (cls.user_qq == user_id)
+ & (cls.group_id == group_id)
+ & (cls.answer == answer)
+ ).gino.first()
+ if query and query.placeholder:
+ type_list = re.findall(rf"\[(.*):placeholder_.*]", answer)
+ temp_answer = re.sub(rf"\[(.*):placeholder_.*]", "{}", answer)
+ seg_list = []
+ for t, p in zip(type_list, query.placeholder.split(",")):
+ if t == "image":
+ seg_list.append(image(path / p))
+ elif t == "face":
+ seg_list.append(face(p))
+ elif t == "at":
+ seg_list.append(at(p))
+ return MessageTemplate(temp_answer, Message).format(*seg_list)
+ return answer
+
+ @classmethod
+ async def check(
+ cls,
+ event: MessageEvent,
+ problem: str,
+ word_scope: Optional[int] = None,
+ word_type: Optional[int] = None,
+ ) -> Optional[Any]:
+ """
+ 说明:
+ 检测是否包含该问题并获取所有回答
+ 参数:
+ :param event: event
+ :param problem: 问题内容
+ :param word_scope: 词条范围
+ :param word_type: 词条类型
+ """
+ query = cls.query
+ sql_text = "SELECT * FROM public.word_bank where 1 = 1"
+ # 救命!!没找到gino的正则表达式方法,暂时使用sql语句
+ if isinstance(event, GroupMessageEvent):
+ if word_scope:
+ query = query.where(cls.word_scope == word_scope)
+ sql_text += f" and word_scope = {word_scope}"
+ else:
+ query = query.where(
+ (cls.group_id == event.group_id) | (cls.word_scope == 0)
+ )
+ sql_text += f" and (group_id = {event.group_id} or word_scope = 0)"
+ else:
+ query = query.where((cls.word_scope == 2) | (cls.word_scope == 0))
+ sql_text += f" and (word_scope = 2 or word_scope = 0)"
+ if word_type:
+ query = query.where(cls.word_scope == word_type)
+ sql_text += f" and word_scope = {word_scope}"
+ # 完全匹配
+ if await query.where(cls.problem == problem).gino.first():
+ return query.where(cls.problem == problem)
+ # 正则匹配
+ if await db.first(
+ db.text(sql_text + f" and word_type = 2 and '{problem}' ~ problem;")
+ ):
+ return sql_text + f" and word_type = 2 and '{problem}' ~ problem;"
+ # 模糊匹配
+ if await db.first(
+ db.text(sql_text + f" and word_type = 1 and '{problem}' ~ problem;")
+ ):
+ return sql_text + f" and word_type = 1 and '{problem}' ~ problem;"
+ return None
+
+ @classmethod
+ async def get_answer(
+ cls,
+ event: MessageEvent,
+ problem: str,
+ word_scope: Optional[int] = None,
+ word_type: Optional[int] = None,
+ ) -> Optional[Union[str, Message]]:
+ """
+ 说明:
+ 根据问题内容获取随机回答
+ 参数:
+ :param event: event
+ :param problem: 问题内容
+ :param word_scope: 词条范围
+ :param word_type: 词条类型
+ """
+ query = await cls.check(event, problem, word_scope, word_type)
+ if query is not None:
+ if isinstance(query, str):
+ answer_list = await db.all(db.text(query))
+ answer = random.choice(answer_list)
+ return (
+ await cls._format2answer(problem, answer[7], answer[1], answer[2])
+ if answer.placeholder
+ else answer.answer
+ )
+ else:
+ answer_list = await query.gino.all()
+ answer = random.choice(answer_list)
+ return (
+ await cls._format2answer(
+ problem, answer.answer, answer.user_qq, answer.group_id
+ )
+ if answer.placeholder
+ else answer.answer
+ )
+
+ @classmethod
+ async def get_problem_all_answer(
+ cls,
+ problem: str,
+ index: Optional[int] = None,
+ group_id: Optional[int] = None,
+ word_scope: Optional[int] = 0,
+ ) -> List[Union[str, Message]]:
+ """
+ 说明:
+ 获取指定问题所有回答
+ 参数:
+ :param problem: 问题
+ :param index: 下标
+ :param group_id: 群号
+ :param word_scope: 词条范围
+ """
+ if index is not None:
+ if group_id:
+ problem = (await cls.query.where(cls.group_id == group_id).gino.all())[
+ index
+ ]
+ else:
+ problem = (
+ await cls.query.where(
+ cls.word_scope == (word_scope or 0)
+ ).gino.all()
+ )[index]
+ problem = problem.problem
+ answer = cls.query.where(cls.problem == problem)
+ if group_id:
+ answer = answer.where(cls.group_id == group_id)
+ return [
+ await cls._format2answer("", "", 0, 0, x) for x in (await answer.gino.all())
+ ]
+
+ @classmethod
+ async def delete_group_problem(
+ cls,
+ problem: str,
+ group_id: int,
+ index: Optional[int] = None,
+ word_scope: int = 1,
+ ):
+ """
+ 说明:
+ 删除指定问题全部或指定回答
+ 参数:
+ :param problem: 问题文本
+ :param group_id: 群号
+ :param index: 回答下标
+ :param word_scope: 词条范围
+ """
+ if index is not None:
+ if group_id:
+ query = await cls.query.where(
+ (cls.group_id == group_id) & (cls.problem == problem)
+ ).gino.all()
+ else:
+ query = await cls.query.where(
+ (cls.word_scope == 0) & (cls.problem == problem)
+ ).gino.all()
+ await query[index].delete()
+ else:
+ if group_id:
+ await WordBank.delete.where(
+ (cls.group_id == group_id) & (cls.problem == problem)
+ ).gino.status()
+ else:
+ await WordBank.delete.where(
+ (cls.word_scope == word_scope) & (cls.problem == problem)
+ ).gino.status()
+
+ @classmethod
+ async def update_group_problem(
+ cls,
+ problem: str,
+ replace_str: str,
+ group_id: int,
+ index: Optional[int] = None,
+ word_scope: int = 1,
+ ):
+ """
+ 说明:
+ 修改词条问题
+ 参数:
+ :param problem: 问题
+ :param replace_str: 替换问题
+ :param group_id: 群号
+ :param index: 下标
+ :param word_scope: 词条范围
+ """
+ if index is not None:
+ if group_id:
+ query = await cls.query.where(
+ (cls.group_id == group_id) & (cls.problem == problem)
+ ).gino.all()
+ else:
+ query = await cls.query.where(
+ (cls.word_scope == word_scope) & (cls.problem == problem)
+ ).gino.all()
+ await query[index].update(problem=replace_str).apply()
+ else:
+ if group_id:
+ await WordBank.update.values(problem=replace_str).where(
+ (cls.group_id == group_id) & (cls.problem == problem)
+ ).gino.status()
+ else:
+ await WordBank.update.values(problem=replace_str).where(
+ (cls.word_scope == word_scope) & (cls.problem == problem)
+ ).gino.status()
+
+ @classmethod
+ async def get_group_all_problem(
+ cls, group_id: int
+ ) -> List[Tuple[Any, Union[MessageSegment, str]]]:
+ """
+ 说明:
+ 获取群聊所有词条
+ 参数:
+ :param group_id: 群号
+ """
+ return cls._handle_problem(
+ await cls.query.where(cls.group_id == group_id).gino.all()
+ )
+
+ @classmethod
+ async def get_problem_by_scope(cls, word_scope: int):
+ """
+ 说明:
+ 通过词条范围获取词条
+ 参数:
+ :param word_scope: 词条范围
+ """
+ return cls._handle_problem(
+ await cls.query.where(cls.word_scope == word_scope).gino.all()
+ )
+
+ @classmethod
+ async def get_problem_by_type(cls, word_type: int):
+ """
+ 说明:
+ 通过词条类型获取词条
+ 参数:
+ :param word_type: 词条类型
+ """
+ return cls._handle_problem(
+ await cls.query.where(cls.word_type == word_type).gino.all()
+ )
+
+ @classmethod
+ def _handle_problem(cls, msg_list: List[Union[str, MessageSegment]]):
+ """
+ 说明:
+ 格式化处理问题
+ 参数:
+ :param msg_list: 消息列表
+ """
+ _tmp = []
+ problem_list = []
+ for q in msg_list:
+ if q.problem not in _tmp:
+ problem = (
+ q.problem,
+ image(path / q.image_path)
+ if q.image_path
+ else f"[{int2type[q.word_type]}] " + q.problem,
+ )
+ problem_list.append(problem)
+ _tmp.append(q.problem)
+ return problem_list
diff --git a/plugins/word_bank/_old_model.py b/plugins/word_bank/_old_model.py
new file mode 100644
index 000000000..84678def0
--- /dev/null
+++ b/plugins/word_bank/_old_model.py
@@ -0,0 +1,20 @@
+from services.db_context import db
+from typing import List
+
+
+class WordBank(db.Model):
+ __tablename__ = "word_bank"
+
+ user_qq = db.Column(db.BigInteger(), nullable=False)
+ group_id = db.Column(db.Integer())
+ search_type = db.Column(db.Integer(), nullable=False, default=0)
+ problem = db.Column(db.String(), nullable=False)
+ answer = db.Column(db.String(), nullable=False)
+ format = db.Column(db.String())
+ create_time = db.Column(db.DateTime(), nullable=False)
+ update_time = db.Column(db.DateTime(), nullable=False)
+
+ @classmethod
+ async def get_all(cls) -> List['WordBank']:
+ return await cls.query.gino.all()
+
diff --git a/plugins/word_bank/_rule.py b/plugins/word_bank/_rule.py
index a452aa20f..91f8f4989 100644
--- a/plugins/word_bank/_rule.py
+++ b/plugins/word_bank/_rule.py
@@ -1,18 +1,31 @@
-import re
-from nonebot.adapters.onebot.v11 import GroupMessageEvent, Event
-from utils.utils import get_message_img_file
-from .model import WordBank
+import random
+from nonebot.adapters.onebot.v11 import MessageEvent
-async def check(event: Event) -> bool:
- if isinstance(event, GroupMessageEvent):
- msg = event.raw_message
- list_img = get_message_img_file(event.json())
- if list_img:
- for img_file in list_img:
- strinfo = re.compile(f"{img_file},.*?]")
- msg = strinfo.sub(f'{img_file}]', msg)
- strinfo_face = re.compile(f",type=sticker]")
- msg = strinfo_face.sub(f']', msg)
- return bool(await WordBank.check(event.group_id, msg,))
+from configs.path_config import TEMP_PATH
+from utils.image_utils import get_img_hash
+from utils.utils import get_message_text, get_message_img, get_message_at
+from ._model import WordBank
+from utils.http_utils import AsyncHttpx
+
+
+async def check(event: MessageEvent) -> bool:
+ text = get_message_text(event.message)
+ img = get_message_img(event.message)
+ at = get_message_at(event.message)
+ rand = random.randint(1, 100)
+ problem = text
+ if not text and len(img) == 1:
+ if await AsyncHttpx.download_file(img[0], TEMP_PATH / f"{event.user_id}_{rand}_word_bank_check.jpg"):
+ problem = str(get_img_hash(TEMP_PATH / f"{event.user_id}_{rand}_word_bank_check.jpg"))
+ if at:
+ temp = ''
+ for seg in event.message:
+ if seg.type == 'at':
+ temp += f"[at:{seg.data['qq']}]"
+ else:
+ temp += seg
+ problem = temp
+ if problem:
+ return await WordBank.check(event, problem) is not None
return False
diff --git a/plugins/word_bank/message_handle.py b/plugins/word_bank/message_handle.py
index 43b41cd9a..7aa7bd8df 100644
--- a/plugins/word_bank/message_handle.py
+++ b/plugins/word_bank/message_handle.py
@@ -1,14 +1,14 @@
-from utils.message_builder import image, at, face
-from typing import Tuple
+import random
+
+from services import logger
+from utils.image_utils import get_img_hash
from ._rule import check
-from .model import WordBank
-from configs.path_config import DATA_PATH
-from nonebot.adapters.onebot.v11 import GroupMessageEvent
-from utils.utils import get_message_at, get_message_img, change_img_md5
+from ._model import WordBank
+from configs.path_config import DATA_PATH, TEMP_PATH
+from nonebot.adapters.onebot.v11 import GroupMessageEvent, MessageEvent
+from utils.utils import get_message_img, get_message_text, get_message_at
from nonebot import on_message
-from models.group_member_info import GroupInfoUser
-from utils.utils import get_message_img_file, is_number
-import re
+from utils.http_utils import AsyncHttpx
__zx_plugin_name__ = "词库问答回复操作 [Hidden]"
@@ -19,129 +19,33 @@
@message_handle.handle()
-async def _(event: GroupMessageEvent):
- msg = event.raw_message
- list_img = get_message_img_file(event.json())
- if list_img:
- for img_file in list_img:
- strinfo = re.compile(f"{img_file},.*?]")
- msg = strinfo.sub(f'{img_file}]', msg)
- strinfo_face = re.compile(f",type=sticker]")
- msg = strinfo_face.sub(f']', msg)
- q = await WordBank.check(event.group_id, msg, )
- await message_handle.send(await get_one_answer(event, q.format, q.answer))
-
-
-# 处理单条回答
-async def get_one_answer(event, format: str, _answer: str, all: bool = True) -> str:
- path = data_dir / f"{event.group_id}"
- placeholder_list = (
- [
- (x.split("<_s>")[0], x.split("<_s>")[1])
- for x in format.split("")[:-1]
- ]
- if format
- else []
- )
- answer = ""
- _a = _answer
- if not placeholder_list:
- answer = _a
- return answer
- else:
- for idx, placeholder in placeholder_list:
- if placeholder.endswith("jpg"):
- change_img_md5(path / placeholder)
- answer += _a[: _a.find(f"[__placeholder_{idx}]")] + image(
- path / placeholder
- )
+async def _(event: MessageEvent):
+ text = get_message_text(event.message)
+ img = get_message_img(event.message)
+ at = get_message_at(event.message)
+ problem = None
+ if not text and img and len(img) == 1:
+ rand = random.randint(1, 10000)
+ if await AsyncHttpx.download_file(img[0], TEMP_PATH / f"{event.user_id}_{rand}_word_bank.jpg"):
+ problem = str(get_img_hash(TEMP_PATH / f"{event.user_id}_{rand}_word_bank.jpg"))
+ elif at:
+ temp = ''
+ for seg in event.message:
+ if seg.type == 'at':
+ temp += f"[at:{seg.data['qq']}]"
else:
- if all:
- answer += _a[: _a.find(f"[__placeholder_{idx}]")] + at(int(placeholder))
- else:
- q = await GroupInfoUser.get_member_info(
- int(placeholder), event.group_id)
- answer += _a[: _a.find(f"[__placeholder_{idx}]")] + "@" + q.user_name
- _a = _a[_a.find(f"[__placeholder_{idx}]") + len(f"[__placeholder_{idx}]"):]
- return answer + _a
-
-
-# 处理单条问题
-async def get_one_problem(event, problem: str, ) -> Tuple[str, str]:
- strinfo = re.compile(f",subType=\d")
- problem = strinfo.sub('', problem)
- _problem = problem
- _p = problem
- problem = ''
- for img in get_message_img(event.json()):
- _x = img.split("?")[0]
- r = re.search(rf"\[CQ:image,file=(.*),url={_x}.*?]", _p)
- if r:
- _problem = _problem.replace(
- rf",url={img}",
- f"",
+ temp += seg
+ problem = temp
+ elif text:
+ problem = text
+ if problem:
+ if msg := await WordBank.get_answer(event, problem):
+ await message_handle.send(msg)
+ logger.info(
+ f"(USER {event.user_id}, GROUP "
+ f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
+ f" 触发词条 {problem}"
)
- problem += _p[: _p.find(f"[CQ:image,file={r.group(1)},url={img}]")] + image(img)
- _p = _p[
- _p.find(f"[CQ:image,file={r.group(1)},url={img}]") + len(f"[CQ:image,file={r.group(1)},url={img}]"):]
- for at_ in get_message_at(event.json()):
- r = re.search(rf"\[CQ:at,qq={at_}]", problem)
- if r:
- q = await GroupInfoUser.get_member_info(
- int(at_), event.group_id)
- problem += _p[: _p.find(f"[CQ:at,qq={at_}]")] + "@" + q.user_name
- _p = _p[_p.find(f"[CQ:at,qq={at_}]") + len(f"[CQ:at,qq={at_}]"):]
- return _problem, problem + _p
-
-
-# 显示单条数据库问题
-async def get_one_image_problem(event, problem: str) -> str:
- path = data_dir / f"{event.group_id}" / "problem"
- placeholder_list = []
- idx = 0
- img_list = re.findall(rf"\[CQ:image,file=(.*?)]", problem)
- at_list = re.findall(rf"\[CQ:at,qq=(.*?)]", problem)
- if img_list:
- for img in img_list:
- problem = problem.replace(f'[CQ:image,file={img}]', f'[__placeholder_{idx}]', 1)
- placeholder_list.append([idx, img])
- idx += 1
- if at_list:
- for ats in at_list:
- problem = problem.replace(f'[CQ:at,qq={ats}]', f'[__placeholder_{idx}]', 1)
- placeholder_list.append([idx, ats])
- idx += 1
- _p = problem
- problem = ''
- if not placeholder_list:
- problem = _p
- return problem
- else:
- for idx, placeholder in placeholder_list:
- if is_number(placeholder):
- q = await GroupInfoUser.get_member_info(
- int(placeholder), event.group_id)
- problem += _p[: _p.find(f"[__placeholder_{idx}]")] + "@" + q.user_name
- else:
- problem += _p[: _p.find(f"[__placeholder_{idx}]")] + image(
- path / f"{placeholder}.jpg"
- )
- _p = _p[_p.find(f"[__placeholder_{idx}]") + len(f"[__placeholder_{idx}]"):]
- return problem + _p
-# 替换cq码
-async def replace_cq(group_id, msg: str, is_face: bool = True) -> str:
- strinfo_img = re.compile(f"\[CQ:image.*?]")
- msg = strinfo_img.sub('[图片]', msg)
- at_list = re.findall(rf"\[CQ:at,qq=(.*?)]", msg)
- if at_list:
- for ats in at_list:
- q = await GroupInfoUser.get_member_info(
- int(ats), group_id)
- msg = msg.replace(f'[CQ:at,qq={ats}]', "@" + q.user_name)
- if is_face:
- strinfo_face = re.compile(f"\[CQ:face,id=.*?]")
- msg = strinfo_face.sub('[表情]', msg)
- return msg
diff --git a/plugins/word_bank/model.py b/plugins/word_bank/model.py
deleted file mode 100644
index 5408782a6..000000000
--- a/plugins/word_bank/model.py
+++ /dev/null
@@ -1,269 +0,0 @@
-from services.db_context import db
-from typing import Optional, List, Union, Tuple
-from datetime import datetime
-from configs.path_config import DATA_PATH
-import random
-from configs.config import Config
-
-
-class WordBank(db.Model):
- __tablename__ = "word_bank"
-
- user_qq = db.Column(db.BigInteger(), nullable=False)
- group_id = db.Column(db.Integer())
- search_type = db.Column(db.Integer(), nullable=False, default=0)
- problem = db.Column(db.String(), nullable=False)
- answer = db.Column(db.String(), nullable=False)
- format = db.Column(db.String())
- create_time = db.Column(db.DateTime(), nullable=False)
- update_time = db.Column(db.DateTime(), nullable=False)
-
- @classmethod
- async def add_problem_answer(
- cls,
- user_id: int,
- group_id: Optional[int],
- search_type: [int],
- problem: str,
- answer: str,
- format_: Optional[List[Tuple[int, Union[int, str]]]],
- ) -> bool:
- """
- 添加或新增一个问答
- :param user_id: 用户id
- :param group_id: 群号
- :search_type: 问题类型,
- :param problem: 问题
- :param answer: 回答
- :param format_: 格式化数据
- """
- _str = None
- if format_:
- _str = ""
- for x, y in format_:
- _str += f"{x}<_s>{y}"
- return await cls._problem_answer_handle(
- user_id, group_id, problem, "add", search_type=search_type, answer=answer, format_=_str
- )
-
- @classmethod
- async def delete_problem_answer(
- cls, user_id: int, group_id: Optional[int], problem: str, index: Optional[int]
- ) -> str:
- """
- 删除某问题一个或全部回答
- :param user_id: 用户id
- :param group_id: 群号
- :param problem: 问题
- :param index: 回答下标
- """
- return await cls._problem_answer_handle(
- user_id, group_id, problem, "delete", index=index
- )
-
- @classmethod
- async def update_problem_answer(
- cls,
- user_id: int,
- group_id: Optional[int],
- problem: str,
- answer: str,
- index: Optional[int],
- format_: Optional[List[Tuple[int, Union[int, str]]]],
- ) -> str:
- """
- 修改某问题一个或全部回答
- :param user_id: 用户id
- :param group_id: 群号
- :param problem: 问题
- :param index: 回答下标
- """
- _str = None
- if format_:
- _str = ""
- for x, y in format_:
- _str += f"{x}<_s>{y}"
- return await cls._problem_answer_handle(
- user_id, group_id, problem, "update", answer=answer, index=index, format_=_str
- )
-
- @classmethod
- async def get_problem_answer(
- cls, user_id: int, group_id: Optional[int], problem: str
- ) -> List[str]:
- """
- 获取问题的所有回答
- :param user_id: 用户id
- :param group_id: 群号
- :param problem: 问题
- """
- return await cls._problem_answer_handle(user_id, group_id, problem, "get")
-
- @classmethod
- async def get_group_all_answer(cls, group_id: int, problem: str) -> List[str]:
- """
- 获取群聊指定词条所有回答
- :param group_id: 群号
- :param problem: 问题
- """
- q = await cls.query.where(
- (cls.group_id == group_id) & (cls.problem == problem)
- ).gino.all()
-
- return [(x.answer, x.format) for x in q] if q else None
-
- @classmethod
- async def get_group_all_problem(cls, group_id: int) -> List[str]:
- """
- 获取群聊所有词条
- :param group_id: 群号
- """
- q = await cls.query.where(cls.group_id == group_id).gino.all()
- q = [x.problem for x in q]
- q.sort()
- _tmp = []
- for problem in q:
- _tmp.append(problem)
- return list(set(_tmp))
-
- @classmethod
- async def check(cls, group_id: int, problem: str) -> Optional["WordBank"]:
- """
- 检测词条并随机返回
- :param group_id: 群号
- :param problem: 问题
- """
- if problem:
- FUZZY = Config.get_config("word_bank", "WORD_BANK_FUZZY")
- KEY = Config.get_config("word_bank", "WORD_BANK_KEY")
- q = await cls.query.where(
- (cls.group_id == group_id) & (cls.problem == problem)
- ).gino.all()
- if KEY and FUZZY:
- q_fuzzy = await cls.query.where(
- (cls.group_id == group_id) & (cls.search_type == 2) & (
- cls.problem.contains(f'{problem}'))).gino.all()
- q_key = await cls.query.where((cls.group_id == group_id) & (cls.search_type == 1)).gino.all()
- q_key = [x for x in q_key if str(x.problem) in (problem)]
- q += q_fuzzy + q_key
- elif FUZZY:
- q_fuzzy = await cls.query.where(
- (cls.group_id == group_id) & (cls.search_type == 2) & (
- cls.problem.contains(f'{problem}'))).gino.all()
- q += q_fuzzy
- elif KEY:
- q_key = await cls.query.where((cls.group_id == group_id) & (cls.search_type == 1)).gino.all()
- q_key = [x for x in q_key if str(x.problem) in (problem)]
- q += q_key
- else:
- return None
-
- return random.choice(q) if q else None
-
- @classmethod
- async def _problem_answer_handle(
- cls,
- user_id: int,
- group_id: Optional[int],
- problem: str,
- type_: str,
- *,
- search_type: [int] = 0,
- answer: Optional[str] = None,
- index: Optional[int] = None,
- format_: Optional[str] = None,
- ) -> Union[List[Union[str, Tuple[str, str]]], bool, str]:
- """
- 添加或新增一个问答
- :param user_id: 用户id
- :param group_id: 群号
- :param problem: 问题
- :param type_: 操作类型
- :param answer: 回答
- :param format_: 格式化数据
- """
- if problem.startswith("id:"):
- problem_index = int(problem.split(":")[-1])
- q = await cls.get_group_all_problem(group_id)
- if not q:
- return []
- if len(q) > problem_index:
- problem = q[problem_index]
- if group_id:
- q = cls.query.where((cls.group_id == group_id) & (cls.problem == problem))
- else:
- q = cls.query.where((cls.user_qq == user_id) & (cls.problem == problem))
- if type_ == "add":
- q = await q.where((cls.answer == answer) & (cls.search_type == search_type)).gino.all()
- try:
- if not q or ".jpg" in format_:
- await cls.create(
- user_qq=user_id,
- group_id=group_id,
- search_type=search_type,
- problem=problem,
- answer=answer,
- format=format_,
- create_time=datetime.now().replace(microsecond=0),
- update_time=datetime.now().replace(microsecond=0),
- )
- except:
- return False
- return True
- elif type_ == "delete":
- q = await q.with_for_update().gino.all()
- if q:
- path = DATA_PATH / "word_bank" / f"{group_id}"
- if index is not None:
- q = [q[index]]
- answer = "\n".join([x.answer for x in q])
- for x in q:
- format_ = x.format
- if format_:
- for sp in format_.split("")[:-1]:
- _, image_name = sp.split("<_s>")
- if image_name.endswith("jpg"):
- _path = path / image_name
- if _path.exists():
- _path.unlink()
- await cls.delete.where(
- (cls.update_time == x.update_time)
- & (cls.problem == problem)
- & (cls.answer == x.answer)
- & (cls.group_id == group_id)
- ).gino.status()
- return answer
- elif type_ == "update":
- new_format = format_
- new_answer = answer
- q = await q.with_for_update().gino.all()
- if q:
- path = DATA_PATH / "word_bank" / f"{group_id}"
- if index is not None:
- q = [q[index]]
- else:
- q = [q[0]]
- for x in q:
- format_ = x.format
- if format_:
- for sp in format_.split("")[:-1]:
- _, image_name = sp.split("<_s>")
- if image_name.endswith("jpg"):
- _path = path / image_name
- if _path.exists():
- _path.unlink()
- await cls.update.values(answer=new_answer,
- format=new_format,
- update_time=datetime.now().replace(microsecond=0), ).where(
- (cls.problem == problem)
- & (cls.answer == x.answer)
- & (cls.group_id == group_id)
- & (cls.group_id == group_id)
- & (cls.update_time == x.update_time)
- ).gino.status()
- return True
- elif type_ == "get":
- q = await q.gino.all()
- if q:
- return [(x.answer, x.format.split("")[:-1]) for x in q]
- return False
diff --git a/plugins/word_bank/word_handle.py b/plugins/word_bank/word_handle.py
new file mode 100644
index 000000000..f729883fb
--- /dev/null
+++ b/plugins/word_bank/word_handle.py
@@ -0,0 +1,305 @@
+from typing import Tuple, Any, Optional
+
+from nonebot.internal.params import Arg, ArgStr
+from nonebot.typing import T_State
+
+from utils.utils import get_message_at, is_number, get_message_img
+from nonebot.params import CommandArg, RegexGroup, Command
+from services.log import logger
+from configs.path_config import DATA_PATH
+from utils.message_builder import custom_forward_msg
+from ._model import WordBank
+from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent, Message, MessageEvent, PrivateMessageEvent
+from nonebot import on_command, on_regex
+from configs.config import Config
+from ._data_source import delete_word, update_word, show_word
+from ._config import scope2int, type2int
+
+__zx_plugin_name__ = "词库问答 [Admin]"
+__plugin_usage__ = r"""
+usage:
+ 对指定问题的随机回答,对相同问题可以设置多个不同回答
+ 删除词条后每个词条的id可能会变化,请查看后再删除
+ 更推荐使用id方式删除
+ 问题回答支持的CQ:at, face, image
+ 查看词条命令:群聊时为 群词条+全局词条,私聊时为 私聊词条+全局词条
+ 添加词条正则:添加词条(模糊|正则|图片)?问\s*?(\S*)\s*?答\s?(\S*)
+ 指令:
+ 添加词条 ?[模糊|正则|图片]问...答...:添加问答词条,可重复添加相同问题的不同回答
+ 删除词条 [问题/下标] ?[下标]:删除指定词条指定或全部回答
+ 修改词条 [问题/下标] [新问题]:修改词条问题
+ 查看词条 ?[问题/下标]:查看全部词条或对应词条回答
+ 示例:添加词条问图片答嗨嗨嗨
+ [图片]...
+ 示例:添加词条@萝莉 我来啦
+ 示例:添加词条问谁是萝莉答是我
+ 示例:删除词条 谁是萝莉
+ 示例:删除词条 谁是萝莉 0
+ 示例:删除词条 id:0 1
+ 示例:修改词条 谁是萝莉 是你
+ 示例:修改词条 id:0 是你
+ 示例:查看词条
+ 示例:查看词条 谁是萝莉
+ 示例:查看词条 id:0 (群/私聊词条)
+ 示例:查看词条 gid:0 (全局词条)
+""".strip()
+__plugin_superuser_usage__ = r"""
+usage:
+ 在私聊中超级用户额外设置
+ 指令:
+ (全局|私聊)?添加词条\s*?(模糊|正则|图片)?问\s*?(\S*)\s*?答\s?(\S*):添加问答词条,可重复添加相同问题的不同回答
+ 全局添加词条
+ 私聊添加词条
+ (私聊情况下)删除词条: 删除私聊词条
+ (私聊情况下)删除全局词条
+ (私聊情况下)修改词条: 修改词条私聊词条
+ (私聊情况下)修改全局词条
+ 用法与普通用法相同
+""".strip()
+__plugin_des__ = "自定义词条内容随机回复"
+__plugin_cmd__ = [
+ "添加词条 ?[模糊/关键字]问...答..",
+ "删除词条 [问题/下标] ?[下标]",
+ "修改词条 [问题/下标] ?[下标/新回答] [新回答]",
+ "查看词条 ?[问题/下标]",
+]
+__plugin_version__ = 0.3
+__plugin_author__ = "HibiKier & yajiwa"
+__plugin_settings__ = {
+ "admin_level": Config.get_config("word_bank", "WORD_BANK_LEVEL [LEVEL]"),
+ "cmd": ["词库问答", "添加词条", "删除词条", "修改词条", "查看词条"],
+}
+
+data_dir = DATA_PATH / "word_bank"
+data_dir.mkdir(parents=True, exist_ok=True)
+
+add_word = on_regex(
+ r"^(全局|私聊)?添加词条\s*?(模糊|正则|图片)?问\s*?(\S*)\s*?答\s?(\S*)", priority=5, block=True
+)
+
+delete_word_matcher = on_command("删除词条", aliases={'删除全局词条'}, priority=5, block=True)
+
+update_word_matcher = on_command("修改词条", aliases={'修改全局词条'}, priority=5, block=True)
+
+show_word_matcher = on_command("显示词条", aliases={"查看词条"}, priority=5, block=True)
+
+
+@add_word.handle()
+async def _(
+ bot: Bot,
+ event: MessageEvent,
+ state: T_State,
+ reg_group: Tuple[Any, ...] = RegexGroup(),
+):
+ if str(event.user_id) not in bot.config.superusers:
+ await add_word.finish('权限不足捏')
+ word_scope, word_type, problem, answer = reg_group
+ if (
+ word_scope
+ and word_scope in ["全局", "私聊"]
+ and str(event.user_id) not in bot.config.superusers
+ ):
+ await add_word.finish("权限不足,无法添加该范围词条")
+ if (not problem or not problem.strip()) and word_type != "图片":
+ await add_word.finish("词条问题不能为空!")
+ if (not answer or not answer.strip()) and not len(get_message_img(event.message)):
+ await add_word.finish("词条回答不能为空!")
+ if word_type != "图片":
+ state["problem_image"] = "YES"
+ answer = event.message
+ # 对at问题对额外处理
+ if get_message_at(event.message):
+ for index, seg in enumerate(event.message):
+ if seg.type == 'text' and '答' in str(seg):
+ _problem = event.message[:index]
+ answer = event.message[index:]
+ answer[0] = str(answer[0])[str(answer).index('答')+1:]
+ _problem[0] = str(_problem[0])[str(_problem).index('问')+1:]
+ temp = ''
+ for g in _problem:
+ if isinstance(g, str) or g.type == 'text':
+ temp += g
+ elif g.type == 'at':
+ temp += f"[at:{g.data['qq']}]"
+ problem = temp
+ break
+ index = len((word_scope or "") + "添加词条" + (word_type or "") + problem) + 1
+ event.message[0] = event.message[0].data["text"][index + 1 :].strip()
+ state["word_scope"] = word_scope
+ state["word_type"] = word_type
+ state["problem"] = problem
+ state["answer"] = answer
+
+
+@add_word.got("problem_image", prompt="请发送该回答设置的问题图片")
+async def _(
+ event: MessageEvent,
+ word_scope: Optional[str] = ArgStr("word_scope"),
+ word_type: Optional[str] = ArgStr("word_type"),
+ problem: Optional[str] = ArgStr("problem"),
+ answer: Message = Arg("answer"),
+ problem_image: Message = Arg("problem_image"),
+):
+ try:
+ await WordBank.add_problem_answer(
+ event.user_id,
+ event.group_id if isinstance(event, GroupMessageEvent) and (not word_scope or word_scope == '1') else 0,
+ scope2int[word_scope] if word_scope else 1,
+ type2int[word_type] if word_type else 0,
+ problem or problem_image,
+ answer,
+ )
+ except Exception as e:
+ logger.error(
+ f"(USER {event.user_id}, GROUP "
+ f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
+ f" 添加词条 {problem} 发生错误 {type(e)}: {e} "
+ )
+ await add_word.finish(f"添加词条 {problem} 发生错误!")
+ await add_word.send("添加词条 " + (problem or problem_image) + " 成功!")
+ logger.info(
+ f"(USER {event.user_id}, GROUP "
+ f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
+ f" 添加词条 {problem} 成功!"
+ )
+
+
+@delete_word_matcher.handle()
+async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
+ if not (msg := arg.extract_plain_text().strip()):
+ await delete_word_matcher.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
+ result = await delete_word(msg, event.group_id)
+ await delete_word_matcher.send(result)
+ logger.info(
+ f"(USER {event.user_id}, GROUP "
+ f"{event.group_id})"
+ f" 删除词条:" + msg
+ )
+
+
+@delete_word_matcher.handle()
+async def _(bot: Bot, event: PrivateMessageEvent, arg: Message = CommandArg(), cmd: Tuple[str, ...] = Command()):
+ if str(event.user_id) not in bot.config.superusers:
+ await delete_word_matcher.finish("权限不足捏!")
+ if not (msg := arg.extract_plain_text().strip()):
+ await delete_word_matcher.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
+ result = await delete_word(msg, word_scope=2 if cmd[0] == '删除词条' else 0)
+ await delete_word_matcher.send(result)
+ logger.info(
+ f"(USER {event.user_id})"
+ f" 删除词条:" + msg
+ )
+
+
+@update_word_matcher.handle()
+async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
+ if not (msg := arg.extract_plain_text().strip()):
+ await update_word_matcher.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
+ if len(msg.split()) < 2:
+ await update_word_matcher.finish("此命令需要两个参数,请查看帮助")
+ result = await update_word(msg, event.group_id)
+ await update_word_matcher.send(result)
+ logger.info(
+ f"(USER {event.user_id}, GROUP "
+ f"{event.group_id})"
+ f" 更新词条词条:" + msg
+ )
+
+
+@update_word_matcher.handle()
+async def _(bot: Bot, event: PrivateMessageEvent, arg: Message = CommandArg(), cmd: Tuple[str, ...] = Command()):
+ if str(event.user_id) not in bot.config.superusers:
+ await delete_word_matcher.finish("权限不足捏!")
+ if not (msg := arg.extract_plain_text().strip()):
+ await update_word_matcher.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
+ if len(msg.split()) < 2:
+ await update_word_matcher.finish("此命令需要两个参数,请查看帮助")
+ result = await update_word(msg, word_scope=2 if cmd[0] == '修改词条' else 0)
+ await update_word_matcher.send(result)
+ logger.info(
+ f"(USER {event.user_id})"
+ f" 更新词条词条:" + msg
+ )
+
+
+@show_word_matcher.handle()
+async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()):
+ if problem := arg.extract_plain_text().strip():
+ id_ = None
+ gid = None
+ if problem.startswith("id:"):
+ id_ = problem.split(":")[-1]
+ if (
+ not is_number(id_)
+ or int(id_) < 0
+ or int(id_)
+ > len(await WordBank.get_group_all_problem(event.group_id))
+ ):
+ await show_word_matcher.finish("id必须为数字且在范围内")
+ id_ = int(id_)
+ if problem.startswith("gid:"):
+ gid = problem.split(":")[-1]
+ if (
+ not is_number(gid)
+ or int(gid) < 0
+ or int(gid)
+ > len(await WordBank.get_problem_by_scope(0))
+ ):
+ await show_word_matcher.finish("gid必须为数字且在范围内")
+ gid = int(gid)
+ msg_list = await show_word(problem, id_, gid, None if gid else event.group_id)
+ else:
+ msg_list = await show_word(problem, None, None, event.group_id)
+ if isinstance(msg_list, str):
+ await show_word_matcher.send(msg_list)
+ else:
+ await bot.send_group_forward_msg(
+ group_id=event.group_id, messages=custom_forward_msg(msg_list, bot.self_id)
+ )
+ logger.info(
+ f"(USER {event.user_id}, GROUP "
+ f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
+ f" 发送查看词条回答:" + problem
+ )
+
+
+@show_word_matcher.handle()
+async def _(event: PrivateMessageEvent, arg: Message = CommandArg()):
+ if problem := arg.extract_plain_text().strip():
+ id_ = None
+ gid = None
+ if problem.startswith("id:"):
+ id_ = problem.split(":")[-1]
+ if (
+ not is_number(id_)
+ or int(id_) < 0
+ or int(id_)
+ > len(await WordBank.get_problem_by_scope(2))
+ ):
+ await show_word_matcher.finish("id必须为数字且在范围内")
+ id_ = int(id_)
+ if problem.startswith("gid:"):
+ gid = problem.split(":")[-1]
+ if (
+ not is_number(gid)
+ or int(gid) < 0
+ or int(gid)
+ > len(await WordBank.get_problem_by_scope(0))
+ ):
+ await show_word_matcher.finish("gid必须为数字且在范围内")
+ gid = int(gid)
+ msg_list = await show_word(problem, id_, gid, word_scope=2 if id_ is not None else None)
+ else:
+ msg_list = await show_word(problem, None, None, word_scope=2)
+ if isinstance(msg_list, str):
+ await show_word_matcher.send(msg_list)
+ else:
+ t = ""
+ for msg in msg_list:
+ t += msg + '\n'
+ await show_word_matcher.send(t[:-1])
+ logger.info(
+ f"(USER {event.user_id}, GROUP "
+ f"private)"
+ f" 发送查看词条回答:" + problem
+ )
diff --git a/plugins/word_bank/word_hanlde.py b/plugins/word_bank/word_hanlde.py
deleted file mode 100644
index a0464586e..000000000
--- a/plugins/word_bank/word_hanlde.py
+++ /dev/null
@@ -1,311 +0,0 @@
-from utils.utils import get_message_at, is_number, get_message_img
-from nonebot.params import CommandArg
-from services.log import logger
-from configs.path_config import DATA_PATH
-from utils.http_utils import AsyncHttpx
-from ._data_source import WordBankBuilder
-from utils.message_builder import image
-from utils.image_utils import text2image
-from .message_handle import get_one_answer, get_one_problem, get_one_image_problem, replace_cq
-from .model import WordBank
-from nonebot.adapters.onebot.v11 import (
- Bot,
- GroupMessageEvent,
- Message
-)
-from nonebot import on_command
-import random
-import os
-import re
-from configs.config import NICKNAME, Config
-from models.group_member_info import GroupInfoUser
-
-__zx_plugin_name__ = "词库问答 [Admin]"
-__plugin_usage__ = """
-usage:
- 对指定问题的随机回答,对相同问题可以设置多个不同回答
- 删除词条后每个词条的id可能会变化,请查看后再删除
- 指令:
- 添加词条 ?[模糊/关键字|词]...答...:添加问答词条,可重复添加相同问题的不同回答
- 删除词条 [问题/下标] ?[下标]:删除指定词条指定或全部回答
- 修改词条 [问题/下标] ?[下标/新回答] [新回答]:修改指定词条指定回答默认修改为第一条
- 查看词条 ?[问题/下标]:查看全部词条或对应词条回答
- 示例:添加词条问谁是萝莉答是我
- 示例:删除词条 谁是萝莉
- 示例:删除词条 谁是萝莉 0
- 示例:删除词条 id:0
- 示例:修改词条 谁是萝莉 是你
- 示例:修改词条 谁是萝莉 0 是你
- 示例:修改词条 id:0 是你
- 示例:查看词条
- 示例:查看词条 谁是萝莉
- 示例:查看词条 id:0
-""".strip()
-__plugin_des__ = "自定义词条内容随机回复"
-__plugin_cmd__ = [
- "添加词条 ?[模糊/关键字]问...答..",
- "删除词条 [问题/下标] ?[下标]",
- "修改词条 [问题/下标] ?[下标/新回答] [新回答]",
- "查看词条 ?[问题/下标]",
-]
-__plugin_version__ = 0.3
-__plugin_author__ = "HibiKier & yajiwa"
-__plugin_settings__ = {
- "admin_level": Config.get_config("word_bank", "WORD_BANK_LEVEL [LEVEL]"),
- "cmd": ["词库问答", "添加词条", "删除词条", "修改词条", "查看词条"],
-}
-
-data_dir = DATA_PATH / "word_bank"
-data_dir.mkdir(parents=True, exist_ok=True)
-
-add_word = on_command("添加词条", priority=5, block=True)
-
-delete_word = on_command("删除词条", priority=5, block=True)
-
-update_word = on_command("修改词条", priority=5, block=True)
-
-show_word = on_command("显示词条", aliases={"查看词条"}, priority=5, block=True)
-
-
-@add_word.handle()
-async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
- msg = str(arg)
- r = re.search(r"问(.+)\s?答([\s\S]*)", msg)
- if not r:
- await add_word.finish("未检测到词条问题...")
- problem = r.group(1).strip()
- if not problem:
- await add_word.finish("未检测到词条问题...")
- answer = msg.split("答", maxsplit=1)[-1]
- if not answer:
- await add_word.finish("未检测到词条回答...")
- idx = 0
- _problem = problem
- search_type = 0
- if re.search("^关键字|词(.*)", msg):
- search_type = 1
- elif re.search("^模糊(.*)", msg):
- search_type = 2
- _builder = await get__builder(event, _problem, answer, idx)
- if await _builder.save(search_type):
- logger.info(f"已保存词条 问:{_builder.problem} 答:{answer}")
- await add_word.send("已保存词条:" + _builder.problem)
- else:
- await delete_word.send("保存失败,可能是回答重复")
-
-
-@delete_word.handle()
-async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
- msg = str(arg)
- if not msg:
- await delete_word.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
- index = None
- _sp_msg = msg.split()
- if len(_sp_msg) > 1:
- if is_number(_sp_msg[-1]):
- index = int(_sp_msg[-1])
- msg = " ".join(_sp_msg[:-1])
- problem = msg
- if problem.startswith("id:"):
- x = problem.split(":")[-1]
- if not is_number(x) or int(x) < 0:
- await delete_word.finish("id必须为数字且符合规范!")
- p = await WordBank.get_group_all_problem(event.group_id)
- if p:
- problem = p[int(x)]
- try:
- _problem, problem = await get_one_problem(event, problem)
- if answer := await WordBank.delete_problem_answer(
- event.user_id, event.group_id, _problem, index
- ):
-
- await delete_word.send(Message(
- "删除词条成功:\n问" + await replace_cq(event.group_id, problem, False) + f"\n回答:\n" + await replace_cq(
- event.group_id, answer, False) + "\n"))
- logger.info(
- f"(USER {event.user_id}, GROUP "
- f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
- f" 删除词条: {problem}"
- )
- else:
- await delete_word.send("删除词条:" + problem + "失败,可能该词条不存在")
- except IndexError:
- await delete_word.send("指定下标错误...请通过查看词条来确定..")
-
-
-@update_word.handle()
-async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
- msg = str(arg)
- if not msg:
- await update_word.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
- index = None
- new_answer = None
- problem = None
- _sp_msg = msg.split()
- len_msg = len(_sp_msg)
- if 1 < len_msg:
- problem = "".join(_sp_msg[0])
- if len_msg == 3:
- if is_number(_sp_msg[1]):
- index = int(_sp_msg[1])
- new_answer = "".join(_sp_msg[2:])
- else:
- new_answer = "".join(_sp_msg[1:])
- else:
- await update_word.finish("此命令之后需要跟随修改内容")
- idx = 0
- _problem = problem
- _builder = await get__builder(event, _problem, new_answer, idx)
-
- try:
- if await _builder.update(index):
- await update_word.send(f"修改词条成功:" + _builder.problem)
- logger.info(
- f"(USER {event.user_id}, GROUP "
- f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
- f" 修改词条: {problem}"
- )
- else:
- await update_word.send(f"修改词条:" + _builder.problem + f"失败,可能该词条不存在")
- except IndexError:
- await update_word.send("指定下标错误...请通过查看词条来确定..")
-
-
-@show_word.handle()
-async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()):
- msg = str(arg).strip()
- if not msg:
- _problem_list = await WordBank.get_group_all_problem(event.group_id)
- if not _problem_list:
- await show_word.finish("该群未收录任replace_cq何词条..")
- _problem_list = [f"\t{i}. {await replace_cq(event.group_id, x)}" for i, x in enumerate(_problem_list)]
- long_problem_list = len(_problem_list)
- max_line = Config.get_config("word_bank", "WORD_BANK_MIX")
- if long_problem_list > max_line:
- pic_list = []
- mes_list = []
- img_nu = long_problem_list // max_line
- one_msg = "该群已收录的词条:"
- await show_word.send(one_msg)
- for i in range(img_nu + 1):
- if _problem_list:
- one_img = image(
- b64=(await text2image("\n".join(_problem_list[:max_line]),
- padding=10,
- color="#f9f6f2",
- )).pic2bs4()
- )
- if img_nu > 2:
- pic_list.append(one_img)
- else:
- await show_word.send(one_img)
- del _problem_list[:max_line]
- if pic_list:
- for img in pic_list:
- data = {
- "type": "node",
- "data": {"name": f"{NICKNAME}", "uin": f"{bot.self_id}", "content": img},
- }
- mes_list.append(data)
- await bot.send_group_forward_msg(group_id=event.group_id, messages=mes_list)
- else:
- await show_word.send(
- image(
- b64=(await text2image(
- "该群已收录的词条:\n\n" + "\n".join(_problem_list),
- padding=10,
- color="#f9f6f2",
- )).pic2bs4()
- )
- )
- else:
- _answer_list = []
- if msg.startswith("id:"):
- x = msg.split(":")[-1]
- if not is_number(x) or int(x) < 0:
- return await delete_word.finish("id必须为数字且符合规范!")
- p = await WordBank.get_group_all_problem(event.group_id)
- if p:
- _problem = p[int(x)]
- _answer_list = await WordBank.get_group_all_answer(event.group_id, _problem)
- msg += '问' + await get_one_image_problem(event, _problem)
- else:
- _problem, msg = await get_one_problem(event, msg)
- _answer_list = await WordBank.get_group_all_answer(event.group_id, _problem)
- if not _answer_list:
- await show_word.send("未收录该词条...")
-
- else:
- # 解析图片和@
- _answer_img_nu_list = [await get_one_answer(event, format, answer, False) for answer, format in
- _answer_list]
- word_nu = len(_answer_img_nu_list)
- img_nu = 0
- answer = "词条" + msg + "\n回答:"
- for i, x, in enumerate(_answer_img_nu_list):
- r = re.findall(rf"\[CQ:image,file=", str(x))
- if r:
- img_nu += len(r)
- answer += "\n" + f"{i}." + x
- if (img_nu > 2 and word_nu > 5) or word_nu > 10 or img_nu > 4:
- data = {
- "type": "node",
- "data": {"name": f"{NICKNAME}", "uin": f"{bot.self_id}", "content": answer},
- }
- await bot.send_group_forward_msg(group_id=event.group_id, messages=data)
- else:
- await show_word.send(answer)
- # await show_word.send(f"词条 {msg} 回答:\n" + "\n".join(_answer_list))
-
-
-async def get__builder(event, _problem: str, answer: str, idx: int):
- (data_dir / f"{event.group_id}").mkdir(exist_ok=True, parents=True)
- (data_dir / f"{event.group_id}" / "problem").mkdir(exist_ok=True, parents=True)
- _builder = WordBankBuilder(event.user_id, event.group_id, _problem)
- problem = ''
- _p = _problem
- for at_ in get_message_at(event.json()):
- r = re.search(rf"\[CQ:at,qq={at_}]", answer)
- if r:
- answer = answer.replace(f"[CQ:at,qq={at_}]", f"[__placeholder_{idx}]", 1)
- _builder.set_placeholder(idx, at_)
- idx += 1
- r_problem = re.search(rf"\[CQ:at,qq={at_}]", _problem)
- if r_problem:
- q = await GroupInfoUser.get_member_info(
- int(at_), event.group_id)
- problem += _p[: _p.find(f"[CQ:at,qq={at_}]")] + "@" + q.user_name
- _p = _p[_p.find(f"[CQ:at,qq={at_}]") + len(f"[CQ:at,qq={at_}]"):]
- for img in get_message_img(event.json()):
- _x = img.split("?")[0]
- _x_list = img.split("?")
- r = re.search(rf"\[CQ:image,file=(.*),url={_x}.*?]", answer)
- if r:
- rand = random.randint(1, 10000) + random.randint(1, 114514)
- for _ in range(10):
- if f"__placeholder_{rand}_{idx}.jpg" not in os.listdir(data_dir / f"{event.group_id}"):
- break
- rand = random.randint(1, 10000) + random.randint(1, 114514)
- strinfo = re.compile(f"\[CQ:image,file={r.group(1)},.*url={_x_list[0]}\?{_x_list[1]}.*?]")
- answer = strinfo.sub(f"[__placeholder_{idx}]", answer)
- await AsyncHttpx.download_file(
- img, data_dir / f"{event.group_id}" / f"__placeholder_{rand}_{idx}.jpg"
- )
- _builder.set_placeholder(idx, f"__placeholder_{rand}_{idx}.jpg")
- idx += 1
- r_problem = re.search(rf"\[CQ:image,file=(.*?)(,subType=\d)?,url={_x}.*?]", _p)
- if r_problem:
- strinfo = re.compile(f"(,subType=\d)?,url={_x_list[0]}\?{_x_list[1]}.*?]")
- _problem = strinfo.sub(f"]", _problem)
- _p = strinfo.sub(f"]", _p)
- problem += _p[: _p.find(f"[CQ:image,file={r_problem.group(1)}]")] + image(img)
- _p = _p[_p.find(f"[CQ:image,file={r_problem.group(1)}]") + len(f"[CQ:image,file={r_problem.group(1)}]"):]
- problem_img = r_problem.group(1)
- if f"{problem_img}.jpg" not in os.listdir(data_dir / f"{event.group_id}" / f"problem"):
- await AsyncHttpx.download_file(
- img, data_dir / f"{event.group_id}" / f"problem" / f"{problem_img}.jpg"
- )
- _builder.set_answer(answer)
- _builder.set_problem(_problem)
- _builder.problem = problem + _p
- return _builder
diff --git a/resources/font/yuanshen.ttf b/resources/font/yuanshen.ttf
deleted file mode 100644
index 49d242530..000000000
Binary files a/resources/font/yuanshen.ttf and /dev/null differ
diff --git a/update_info.json b/update_info.json
index f6245b00f..a0f81caca 100644
--- a/update_info.json
+++ b/update_info.json
@@ -8,10 +8,7 @@
"configs/path_config.py",
"configs/utils",
"poetry.lock",
- "pyproject.toml",
- "resources/font",
- "resources/image/zhenxun",
- "resources/image/other"
+ "pyproject.toml"
],
"add_file": [],
"delete_file": []