Skip to content

Commit

Permalink
✨简化下载流程
Browse files Browse the repository at this point in the history
✨更改识别流程
✨简化配置
✨增加私聊发送文件
  • Loading branch information
kitUIN committed Dec 10, 2022
1 parent 40120e0 commit d837ee7
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 158 deletions.
266 changes: 162 additions & 104 deletions nonebot-plugin-ncm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,79 +1,106 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Tuple, Any, Union

import nonebot
from nonebot.rule import Rule
from nonebot.log import logger
from nonebot import on_regex, on_command, on_message
from nonebot.adapters.onebot.v11 import (Message, Bot,
MessageSegment,
ActionFailed,
GroupMessageEvent,
PrivateMessageEvent)
import re

from nonebot.log import logger
from nonebot.matcher import Matcher
from nonebot.params import CommandArg, RegexGroup, Arg
from .data_source import nncm, music, ncm_config, playlist, setting, Q, cmd
from nonebot.rule import Rule

from .data_source import nncm, ncm_config, setting, Q, cmd

# =======nonebot-plugin-help=======
__plugin_meta__ = nonebot.plugin.PluginMetadata(
name='✨ 基于go-cqhttp与nonebot2的 网易云 无损音乐下载 ✨',
description='您的简单插件描述',
description='网易云 无损音乐下载',
usage=f'''将网易云歌曲/歌单分享到群聊即可自动解析\n回复机器人解析消息即可自动下载(需要时间)\n
{cmd}ncm t 开启解析\n{cmd}ncm t 关闭解析\n{cmd}点歌 歌名:点歌''',
extra={'version': '1.4.0'}
{cmd}ncm t:开启解析\n{cmd}ncm t:关闭解析\n{cmd}点歌 歌名:点歌''',
extra={'version': '1.5.0'}
)


# ========nonebot-plugin-ncm======
# ===========Constant=============
TRUE = ["True", "T", "true", "t"]
FALSE = ["False", "F", "false", "f"]
ADMIN = ["owner", "admin", "member"]


# ===============Rule=============
async def song_is_open(event: GroupMessageEvent) -> bool:
info = setting.search(Q["group_id"] == event.dict()["group_id"])
if info:
if info[0]["song"]:
return ncm_config.ncm_song
else:
setting.insert({"group_id": event.dict()["group_id"], "song": False, "list": False})
return False
async def song_is_open(event: Union[GroupMessageEvent, PrivateMessageEvent]) -> bool:
if isinstance(event, GroupMessageEvent):
info = setting.search(Q["group_id"] == event.group_id)
if info:
return info[0]["song"]
else:
setting.insert({"group_id": event.group_id, "song": False, "list": False})
return False
elif isinstance(event, PrivateMessageEvent):
info = setting.search(Q["user_id"] == event.user_id)
if info:
return info[0]["song"]
else:
setting.insert({"user_id": event.user_id, "song": True, "list": True})
return True


async def playlist_is_open(event: GroupMessageEvent) -> bool:
info = setting.search(Q["group_id"] == event.dict()["group_id"])
async def playlist_is_open(event: Union[GroupMessageEvent, PrivateMessageEvent]) -> bool:
if isinstance(event, GroupMessageEvent):
info = setting.search(Q["group_id"] == event.group_id)
if info:
return info[0]["list"]
else:
setting.insert({"group_id": event.group_id, "song": False, "list": False})
return False
elif isinstance(event, PrivateMessageEvent):
info = setting.search(Q["user_id"] == event.user_id)
if info:
return info[0]["list"]
else:
setting.insert({"user_id": event.user_id, "song": True, "list": True})
return True


async def check_search() -> bool:
info = setting.search(Q["global"] == "search")
if info:
if info[0]["list"]:
return ncm_config.ncm_list
return info[0]["value"]
else:
setting.insert({"group_id": event.dict()["group_id"], "song": False, "list": False})
return False
setting.insert({"global": "search", "value": True})
return True


async def search_check() -> bool:
return ncm_config.ncm_search
async def music_set_rule(event: Union[GroupMessageEvent, PrivateMessageEvent]) -> bool:
# 权限设置
return event.sender.role in ADMIN[:ncm_config.ncm_admin_level] or event.get_user_id() in ncm_config.superusers


async def music_reply_rule(event: GroupMessageEvent):
return event.reply and event.reply.sender.user_id == event.self_id
async def music_reply_rule(event: Union[GroupMessageEvent, PrivateMessageEvent]):
return event.reply and event.get_plaintext() == "下载"


# ============Matcher=============
ncm_set = on_command("ncm",
rule=Rule(music_set_rule),
priority=1, block=False)
'''功能设置'''
music_regex = on_regex("(song|url)\?id=([0-9]+)(|&)",
rule=Rule(song_is_open),
priority=2, block=False)
'''歌曲id识别'''
playlist_regex = on_regex("playlist\?id=([0-9]+)&",
rule=Rule(playlist_is_open),
priority=2, block=False)
'''歌单识别'''
music_reply = on_message(priority=2,
rule=Rule(music_reply_rule),
block=False)
'''回复下载'''
search = on_command("点歌",
rule=Rule(search_check),
rule=Rule(check_search),
priority=2, block=False)
'''点歌'''

Expand All @@ -90,103 +117,134 @@ async def receive_song(bot: Bot,
song: Message = Arg(),
):
nncm.get_session(bot, event)
_id = await nncm.search_song(keyword=song.__str__(), limit=1)
try:
await bot.send(event=event, message=Message(MessageSegment.music(type_="163", id_=_id)))
if isinstance(event, GroupMessageEvent):
await nncm.parse_song(_id)
except ActionFailed:
await search.finish(event=event, message="[WARNING]: 合并转发(群)消息发送失败: 账号可能被风控")
_id = await nncm.search_song(keyword=str(song), limit=1)
message_id = await bot.send(event=event, message=Message(MessageSegment.music(type_="163", id_=_id)))
nncm.get_song(message_id=message_id["message_id"], nid=_id)
# try:

# except ActionFailed as e:
# logger.error(e.info)
# await search.finish(event=event, message=f"[WARNING]: 网易云卡片消息发送失败: 账号可能被风控")


@music_regex.handle()
async def music_receive(bot: Bot, event: GroupMessageEvent, regroup: Tuple[Any, ...] = RegexGroup()):
async def music_receive(bot: Bot, event: Union[GroupMessageEvent, PrivateMessageEvent], regroup: Tuple[Any, ...] = RegexGroup()):
nid = regroup[1]
logger.debug(f"已识别NID:{nid}的歌曲")
logger.info(f"已识别NID:{nid}的歌曲")
nncm.get_session(bot, event)
await nncm.parse_song(nid)
nncm.get_song(nid)


@playlist_regex.handle()
async def music_receive(bot: Bot, event: GroupMessageEvent, regroup: Tuple[Any, ...] = RegexGroup()):
async def music_list_receive(bot: Bot, event: Union[GroupMessageEvent, PrivateMessageEvent], regroup: Tuple[Any, ...] = RegexGroup()):
lid = regroup[0]
logger.debug(f"已识别LID:{lid}的歌单")
logger.info(f"已识别LID:{lid}的歌单")
nncm.get_session(bot, event)
msg = await nncm.playlist(lid=lid)
await bot.send(event=event, message=Message(MessageSegment.text(msg)))


@music_reply.handle()
async def music_reply_receive(bot: Bot, event: GroupMessageEvent):
try: # 防止其他回复状况报错
message: str = event.dict()["reply"]["message"][0].data["text"]
except Exception:
return
async def music_reply_receive(bot: Bot, event: Union[GroupMessageEvent, PrivateMessageEvent]):
# logger.info(event.dict()["reply"]["message_id"])
nncm.get_session(bot, event)
nid = re.search("ID:([0-9]*)", message)
if nid:
info = nncm.check_message()
if info is None:
return
if info["type"] == "song" and await song_is_open(event):
await bot.send(event=event, message="少女祈祷中🙏...")
await nncm.download(ids=[int(nid[1])])
data = music.search(Q["id"] == int(nid[1]))
await nncm.download(ids=[int(info["nid"])])
data = await nncm.music_check(info["nid"])
if data:
await nncm.upload_group_file(data)
if isinstance(event, GroupMessageEvent):
await nncm.upload_group_file(data)
elif isinstance(event, PrivateMessageEvent):
await nncm.upload_private_file(data)
else:
logger.error("数据库中未有该音乐地址数据")
else:
lid = re.search("LIST:([0-9]*)", message)
info = playlist.search(Q["playlist_id"] == lid[1])
if info:
await nncm.download(ids=info[0]["ids"])
for i in info[0]["ids"]:
data = music.search(Q["id"] == i)
if data:
await nncm.upload_group_file(data)
else:
logger.error("数据库中未有该音乐地址数据")
else:
logger.error("数据库中未发现该歌单ID")

elif info["type"] == "playlist" and await playlist_is_open(event):
await nncm.download(ids=info["ids"])
for i in info["ids"]:
data = await nncm.music_check(i)
if data:
await nncm.upload_group_file(data)
else:
logger.error("数据库中未有该音乐地址数据")


@ncm_set.handle()
async def set_receive(bot: Bot, event: GroupMessageEvent, args: Message = CommandArg()): # 功能设置接收
true = ["True", "T", "true", "t"]
false = ["False", "F", "false", "f"]
async def set_receive(bot: Bot, event: Union[GroupMessageEvent, PrivateMessageEvent], args: Message = CommandArg()): # 功能设置接收
logger.debug(f"权限为{event.sender.role}的用户<{event.sender.nickname}>尝试使用命令{cmd}ncm {args}")
if event.sender.role not in ncm_config.ncm_admin:
logger.debug(f"执行错误:用户<{event.sender.nickname}>权限{event.sender.role}不在{ncm_config.ncm_admin}中")
elif event.get_user_id() not in ncm_config.superusers:
logger.debug(f"执行错误:用户<{event.sender.nickname}>非超级管理员(SUPERUSERS)")
if event.sender.role in ncm_config.ncm_admin or event.get_user_id() in ncm_config.superusers:
if args:
args = args.__str__().split()
if args:
args = str(args).split()
if len(args) == 1:
mold = args[0]
else:
msg = f"{cmd}ncm:获取命令菜单\r\n说明:网易云歌曲分享到群内后回复机器人即可下载\r\n" \
f"{cmd}ncm t:开启解析\r\n{cmd}ncm f:关闭解析\n{cmd}点歌 歌名:点歌"
return await ncm_set.finish(message=MessageSegment.text(msg))

info = setting.search(Q["group_id"] == event.dict()["group_id"])
# logger.info(info)
if info:
if mold in true:
if isinstance(event, GroupMessageEvent):
info = setting.search(Q["group_id"] == event.group_id)
# logger.info(info)
info[0]["song"] = True
info[0]["list"] = True
setting.update(info[0], Q["group_id"] == event.dict()["group_id"])
msg = "已开启自动下载功能"
await bot.send(event=event, message=Message(MessageSegment.text(msg)))
elif mold in false:
info[0]["song"] = False
info[0]["list"] = False
setting.update(info[0], Q["group_id"] == event.dict()["group_id"])
msg = "已关闭自动下载功能"
await bot.send(event=event, message=Message(MessageSegment.text(msg)))
logger.debug(f"用户<{event.sender.nickname}>执行操作成功")
else:
if mold in true:
setting.insert({"group_id": event.dict()["group_id"], "song": True, "list": True})
elif mold in false:
setting.insert({"group_id": event.dict()["group_id"], "song": False, "list": False})

if info:
if mold in TRUE:
info[0]["song"] = True
info[0]["list"] = True
setting.update(info[0], Q["group_id"] == event.group_id)
msg = "已开启自动下载功能"
await bot.send(event=event, message=Message(MessageSegment.text(msg)))
elif mold in FALSE:
info[0]["song"] = False
info[0]["list"] = False
setting.update(info[0], Q["group_id"] == event.group_id)
msg = "已关闭自动下载功能"
await bot.send(event=event, message=Message(MessageSegment.text(msg)))
logger.debug(f"用户<{event.sender.nickname}>执行操作成功")
else:
if mold in TRUE:
setting.insert({"group_id": event.group_id, "song": True, "list": True})
elif mold in FALSE:
setting.insert({"group_id": event.group_id, "song": False, "list": False})
elif isinstance(event, PrivateMessageEvent):
info = setting.search(Q["user_id"] == event.user_id)
# logger.info(info)
if info:
if mold in TRUE:
info[0]["song"] = True
info[0]["list"] = True
setting.update(info[0], Q["user_id"] == event.user_id)
msg = "已开启自动下载功能"
await bot.send(event=event, message=Message(MessageSegment.text(msg)))
elif mold in FALSE:
info[0]["song"] = False
info[0]["list"] = False
setting.update(info[0], Q["user_id"] == event.user_id)
msg = "已关闭自动下载功能"
await bot.send(event=event, message=Message(MessageSegment.text(msg)))
logger.debug(f"用户<{event.sender.nickname}>执行操作成功")
else:
if mold in TRUE:
setting.insert({"user_id": event.user_id, "song": True, "list": True})
elif mold in FALSE:
setting.insert({"user_id": event.user_id, "song": False, "list": False})
elif len(args) == 2 and args[0] == "search":
mold = args[1]
info = setting.search(Q["global"] == "search")
if info:
if mold in TRUE:
info[0]["value"] = True
setting.update(info[0], Q["global"] == "search")
msg = "已开启点歌功能"
await bot.send(event=event, message=Message(MessageSegment.text(msg)))
elif mold in FALSE:
info[0]["value"] = False
setting.update(info[0], Q["global"] == "search")
msg = "已关闭点歌功能"
await bot.send(event=event, message=Message(MessageSegment.text(msg)))
logger.debug(f"用户<{event.sender.nickname}>执行操作成功")
else:
if mold in TRUE:
setting.insert({"global": "search", "value": True})
elif mold in FALSE:
setting.insert({"global": "search", "value": False})
else:
await bot.send(event=event, message=Message(MessageSegment.text("你咩有权限哦~")))
msg = f"{cmd}ncm:获取命令菜单\r\n说明:网易云歌曲分享到群内后回复机器人即可下载\r\n" \
f"{cmd}ncm t:开启解析\r\n{cmd}ncm f:关闭解析\n{cmd}点歌 歌名:点歌"
return await ncm_set.finish(message=MessageSegment.text(msg))
16 changes: 2 additions & 14 deletions nonebot-plugin-ncm/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
class Config(BaseModel, extra=Extra.ignore):
superusers: list = []

ncm_admin: list = ["owner", "admin"]
'''设置命令权限(非解析下载,仅解析功能开关设置)'''
ncm_admin_level: int = 1
'''设置命令权限(1:仅限superusers和群主,2:在1的基础上管理员,3:所有用户)'''

ncm_phone: str = ""
'''手机号'''
Expand All @@ -18,18 +18,6 @@ class Config(BaseModel, extra=Extra.ignore):
ncm_password: str = ""
'''密码'''

ncm_song: bool = True
'''单曲解析总开关'''

ncm_list: bool = True
'''歌单解析总开关'''

whitelist: list = []
'''白名单(一键导入)'''

ncm_search: bool = True
'''点歌总开关'''


global_config = nonebot.get_driver().config
ncm_config = Config(**global_config.dict()) # 载入配置
Loading

0 comments on commit d837ee7

Please sign in to comment.