diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f4137d5..e6c7afa 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -5,10 +5,10 @@ ci: autoupdate_branch: main autoupdate_schedule: monthly autoupdate_commit_msg: ':arrow_up: auto update by pre-commit hooks' - + repos: - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.0.291 + rev: v0.5.6 hooks: - id: ruff args: [--fix, --exit-non-zero-on-fix] @@ -19,7 +19,7 @@ repos: # - id: pyright - repo: https://github.com/psf/black - rev: 23.7.0 + rev: 24.8.0 hooks: - id: black stages: [commit] diff --git a/nonebot_plugin_l4d2_server/__main__.py b/nonebot_plugin_l4d2_server/__main__.py index bcb769a..7a6734f 100644 --- a/nonebot_plugin_l4d2_server/__main__.py +++ b/nonebot_plugin_l4d2_server/__main__.py @@ -26,7 +26,7 @@ from .config import config from .l4_help import get_l4d2_core_help -from .l4_local import * # noqa: F401, F403 +from .l4_local import * # noqa: F403 from .l4_request import ( COMMAND, get_all_server_detail, diff --git a/nonebot_plugin_l4d2_server/config.py b/nonebot_plugin_l4d2_server/config.py index 4ee15f0..9e1200e 100644 --- a/nonebot_plugin_l4d2_server/config.py +++ b/nonebot_plugin_l4d2_server/config.py @@ -1,5 +1,4 @@ from pathlib import Path -from typing import List from nonebot import get_plugin_config from pydantic import BaseModel @@ -12,6 +11,9 @@ ICONPATH = DATAPATH / "icon" +global map_index +map_index = 0 + class ConfigModel(BaseModel): l4_anne: bool = False diff --git a/nonebot_plugin_l4d2_server/l4_anne/__init__.py b/nonebot_plugin_l4d2_server/l4_anne/__init__.py index b5c7392..a79155c 100644 --- a/nonebot_plugin_l4d2_server/l4_anne/__init__.py +++ b/nonebot_plugin_l4d2_server/l4_anne/__init__.py @@ -1,4 +1,5 @@ -from nonebot import log as log, on_command # noqa: N999 +from nonebot import log as log +from nonebot import on_command from nonebot.adapters import Event, Message from nonebot.log import logger from nonebot.params import CommandArg diff --git a/nonebot_plugin_l4d2_server/l4_help/Help.json b/nonebot_plugin_l4d2_server/l4_help/Help.json index 47f91d6..057ef05 100644 --- a/nonebot_plugin_l4d2_server/l4_help/Help.json +++ b/nonebot_plugin_l4d2_server/l4_help/Help.json @@ -83,6 +83,14 @@ "need_ck": false, "need_sk": false, "need_admin": true + }, + { + "name": "l4地图上传", + "desc": "进入交互上传地图文件", + "eg": "l4地图上传", + "need_ck": false, + "need_sk": false, + "need_admin": true } ] }, diff --git a/nonebot_plugin_l4d2_server/l4_help/__init__.py b/nonebot_plugin_l4d2_server/l4_help/__init__.py index d938f96..9b4cab4 100644 --- a/nonebot_plugin_l4d2_server/l4_help/__init__.py +++ b/nonebot_plugin_l4d2_server/l4_help/__init__.py @@ -10,7 +10,7 @@ from ..l4_image.model import PluginHelp from .draw import get_help -__version__ = "1.0.0a2" +__version__ = "1.0.0b1" TEXT_PATH = Path(__file__).parent / "texture2d" HELP_DATA = Path(__file__).parent / "Help.json" diff --git a/nonebot_plugin_l4d2_server/l4_image/anne_pil.py b/nonebot_plugin_l4d2_server/l4_image/anne_pil.py index 3614df0..62faf5e 100644 --- a/nonebot_plugin_l4d2_server/l4_image/anne_pil.py +++ b/nonebot_plugin_l4d2_server/l4_image/anne_pil.py @@ -1,8 +1,9 @@ from pathlib import Path -from nonebot_plugin_l4d2_server.utils.api.models import AnnePlayer2 from PIL import Image, ImageFont +from nonebot_plugin_l4d2_server.utils.api.models import AnnePlayer2 + from ..config import config font = ImageFont.truetype(config.l4_font) @@ -10,7 +11,7 @@ anne_path = Path(__file__).parent / "img" / "anne" -async def anne_player_info(msg: AnnePlayer2): +async def anne_player_info(msg: AnnePlayer2): # noqa: RUF029 back_img = Image.open(anne_path / "back1.jpg") base_img = Image.new("RGBA", (back_img.size), (255, 255, 255, 50)) back_img.paste(base_img, (0, 0), base_img) diff --git a/nonebot_plugin_l4d2_server/l4_image/convert.py b/nonebot_plugin_l4d2_server/l4_image/convert.py index 2614a84..888a916 100644 --- a/nonebot_plugin_l4d2_server/l4_image/convert.py +++ b/nonebot_plugin_l4d2_server/l4_image/convert.py @@ -21,32 +21,28 @@ def core_font(size: int) -> ImageFont.FreeTypeFont: async def convert_img( img: Image.Image, is_base64: bool = False, -) -> bytes: - ... +) -> bytes: ... @overload async def convert_img( img: Image.Image, - is_base64: bool = True, -) -> str: - ... + is_base64: bool = True, # noqa: FBT001 +) -> str: ... @overload async def convert_img( img: bytes, is_base64: bool = False, -) -> str: - ... +) -> str: ... @overload async def convert_img( img: Path, is_base64: bool = False, -) -> str: - ... +) -> str: ... async def convert_img( @@ -78,7 +74,7 @@ async def convert_img( async with aiofiles.open(img, "rb") as fp: img = await fp.read() - logger.success("图片处理完成!") + logger.success("图片处理完成!") return f"base64://{b64encode(img).decode()}" @@ -90,7 +86,7 @@ def convert_img_sync(img_path: Path): return f"base64://{b64encode(img).decode()}" -async def str_lenth(r: str, size: int, limit: int = 540) -> str: +async def str_lenth(r: str, size: int, limit: int = 540) -> str: # noqa: RUF029 result = "" temp = 0 for i in r: diff --git a/nonebot_plugin_l4d2_server/l4_image/download.py b/nonebot_plugin_l4d2_server/l4_image/download.py index d787087..d4652c6 100644 --- a/nonebot_plugin_l4d2_server/l4_image/download.py +++ b/nonebot_plugin_l4d2_server/l4_image/download.py @@ -27,20 +27,22 @@ async def download_url(url: str) -> bytes: logger.warning(f"Error downloading {url}, retry {i}/3: {e}") await asyncio.sleep(3) - raise Exception(f"{url} 下载失败!") + raise Exception(f"{url} 下载失败!") async def download_head(user_id: str) -> bytes: url = f"http://q1.qlogo.cn/g?b=qq&nk={user_id}&s=640" data = await download_url(url) - if hashlib.md5(data).hexdigest() == "acef72340ac0e914090bd35799f5594e": # noqa: S324 + if ( + hashlib.md5(data).hexdigest() == "acef72340ac0e914090bd35799f5594e" + ): # noqa: S324 url = f"http://q1.qlogo.cn/g?b=qq&nk={user_id}&s=100" data = await download_url(url) return data def square_to_circle(im: ImageS): - """im是正方形,变圆形""" + """im是正方形,变圆形""" size = im.size mask = Image.new("L", size, 0) draw = ImageDraw.Draw(mask) diff --git a/nonebot_plugin_l4d2_server/l4_image/html_img.py b/nonebot_plugin_l4d2_server/l4_image/html_img.py index f8e4d74..93f308f 100644 --- a/nonebot_plugin_l4d2_server/l4_image/html_img.py +++ b/nonebot_plugin_l4d2_server/l4_image/html_img.py @@ -23,7 +23,7 @@ async def server_ip_pic(server_dict: List[OutServer]): """ - 输入一个字典列表,输出图片 + 输入一个字典列表,输出图片 msg_dict:folder/name/map_/players/max_players/Players/[Name] """ for server_info in server_dict: @@ -92,7 +92,7 @@ async def get_server_img(plugins: List[OutServer]) -> Optional[bytes]: # async def server_group_ip_pic(msg_list: List[ServerGroup]) -> bytes: # """ -# 输入一个群组字典列表,输出图片 +# 输入一个群组字典列表,输出图片 # msg_dict:folder/name/map_/players/max_players/Players/[Name] # """ # template = env.get_template("group_ip.html") diff --git a/nonebot_plugin_l4d2_server/l4_image/image_tools.py b/nonebot_plugin_l4d2_server/l4_image/image_tools.py index baaba4d..e3bd03c 100644 --- a/nonebot_plugin_l4d2_server/l4_image/image_tools.py +++ b/nonebot_plugin_l4d2_server/l4_image/image_tools.py @@ -44,7 +44,10 @@ def get_v4_bg(w: int, h: int, is_dark: bool = False, is_blur: bool = False): return img.convert("RGBA") -async def shift_image_hue(img: Image.Image, angle: float = 30) -> Image.Image: +async def shift_image_hue( + img: Image.Image, + angle: float = 30, +) -> Image.Image: # noqa: RUF029 alpha = img.getchannel("A") img = img.convert("HSV") @@ -198,7 +201,7 @@ def easy_paste( """ inplace method 快速粘贴, 自动获取被粘贴图像的坐标。 - pos应当是粘贴点坐标,direction指定粘贴点方位,例如lt为左上 + pos应当是粘贴点坐标,direction指定粘贴点方位,例如lt为左上 """ x, y = pos size_x, size_y = im_paste.size @@ -379,11 +382,11 @@ def get_bg_color( for i in range(color): bg = tuple( q.getpalette()[ # type:ignore - i * 3 : (i * 3) + 3 # noqa:E203 + i * 3 : (i * 3) + 3 ], ) light_value = bg[0] * 0.3 + bg[1] * 0.6 + bg[2] * 0.1 - if abs(light_value - based_light) < temp: # noqa:E203 + if abs(light_value - based_light) < temp: bg_color = bg temp = abs(light_value - based_light) return bg_color # type:ignore diff --git a/nonebot_plugin_l4d2_server/l4_local/__init__.py b/nonebot_plugin_l4d2_server/l4_local/__init__.py index 54dbcba..e04d2e1 100644 --- a/nonebot_plugin_l4d2_server/l4_local/__init__.py +++ b/nonebot_plugin_l4d2_server/l4_local/__init__.py @@ -1,33 +1,42 @@ - from pathlib import Path -from nonebot import on_command +from nonebot.adapters import Event from nonebot.log import logger -from nonebot_plugin_alconna import UniMessage +from nonebot.matcher import Matcher +from nonebot_plugin_alconna import File, UniMessage, UniMsg, on_alconna -from ..config import config +from ..config import config, map_index from ..l4_image.convert import text2pic +from ..utils.utils import mes_list +from .file import updown_l4d2_vpk + +vpk_path = config.l4_local[map_index] local_path_list = config.l4_local if not local_path_list: - logger.warning("未填写本地服务器路径,如果想要使用本地服务器功能,请填写本地服务器路径") + logger.warning( + "未填写本地服务器路径,如果想要使用本地服务器功能,请填写本地服务器路径", + ) else: local_path: list[Path] = [] for folder_path in local_path_list: - path = Path(folder_path) - - if path.is_dir(): - for nextdir in path.iterdir(): - # 如果找到了名为left4dead2的目录,返回True - if nextdir.name == "left4dead2" and nextdir.is_dir(): + path = Path(folder_path) + + if path.is_dir(): + for nextdir in path.iterdir(): + # 如果找到了名为left4dead2的目录,返回True + if nextdir.name == "left4dead2" and nextdir.is_dir(): local_path.append(nextdir) continue - logger.info(f"本地服务器路径列表:{local_path}") - - global map_index - map_index = 0 + logger.debug(f"本地服务器路径列表:{local_path}") + + search_map = on_alconna( + "l4map", + aliases={"l4地图查询", "l4地图"}, + priority=20, + block=True, + ) - search_map = on_command("l4map", aliases={"l4地图查询", "l4地图"}, priority=5, block=True) @search_map.handle() async def _(): supath = local_path[map_index] / "addons" @@ -40,7 +49,46 @@ async def _(): vpk_list.append(sudir.name) if not vpk_list: await UniMessage.text("未找到可用的VPK文件").finish() - out_msg = "\n".join(f"{index + 1}、{line}" for index, line in enumerate(vpk_list)) - + out_msg = "\n".join( + f"{index + 1}、{line}" for index, line in enumerate(vpk_list) + ) + img = await text2pic(f"服务器地图:\n{out_msg}") - await UniMessage.image(raw=img).send() \ No newline at end of file + await UniMessage.image(raw=img).send() + + up = on_alconna( + "l4upload", + aliases={"l4地图上传"}, + priority=5, + block=True, + ) + + @up.handle() + async def _(): + await UniMessage.text("请发送地图文件").finish() + + @up.got("map_url", prompt="图来") + async def _(ev: Event, msg: UniMsg, matcher: Matcher): + if not msg.has(File): + await UniMessage.text("不是文件,退出交互").finish() + args = ev.dict() + if args["notice_type"] != "offline_file": + matcher.set_arg("txt", args) # type: ignore + return + l4_file_path = config.l4_local[map_index] + map_path = Path(l4_file_path, vpk_path) # type: ignore + # 检查下载路径是否存在 + if not Path(l4_file_path).exists(): # type: ignore + await UniMessage.text("你填写的路径不存在辣").finish() + if not Path(map_path).exists(): + await UniMessage.text("这个路径并不是求生服务器的路径,请再看看罢").finish() + url: str = args["file"]["url"] + name: str = args["file"]["name"] + # 如果不符合格式则忽略 + await up.send("已收到文件,开始下载") + vpk_files = await updown_l4d2_vpk(map_path, name, url) + if vpk_files: + mes = "解压成功,新增以下几个vpk文件" + await UniMessage.text(mes_list(mes, vpk_files)).finish() + else: + await UniMessage.text("你可能上传了相同的文件,或者解压失败了捏").finish() diff --git a/old_project/l4d2_file/utils.py b/nonebot_plugin_l4d2_server/l4_local/file.py similarity index 92% rename from old_project/l4d2_file/utils.py rename to nonebot_plugin_l4d2_server/l4_local/file.py index 0d7054f..1f3ce6a 100644 --- a/old_project/l4d2_file/utils.py +++ b/nonebot_plugin_l4d2_server/l4_local/file.py @@ -1,4 +1,5 @@ import io +import platform import zipfile from pathlib import Path from typing import Callable, Dict, List @@ -8,8 +9,9 @@ from nonebot.log import logger from pyunpack import Archive -from ..l4d2_utils.config import systems -from ..l4d2_utils.utils import get_file, get_vpk +from ..utils.utils import get_file, get_vpk + +systems = platform.system() async def updown_l4d2_vpk(map_paths: Path, name: str, url: str): @@ -52,9 +54,8 @@ def unpack_rarfile(down_file: Path, down_path: Path): def open_packet(name: str, down_file: Path) -> str: """解压压缩包""" down_path = down_file.parent - logger.info("文件名为:" + name) + logger.info("文件名为:" + name) logger.info(f"系统为{systems}") - if name.endswith(".vpk"): return "vpk文件已下载" @@ -79,7 +80,7 @@ def support_gbk(zip_file: ZipFile): """ 压缩包中文恢复 """ - if type(zip_file) == ZipFile: + if isinstance(zip_file, ZipFile): name_to_info = zip_file.NameToInfo # copy map first for name, info in name_to_info.copy().items(): @@ -91,7 +92,7 @@ def support_gbk(zip_file: ZipFile): return zip_file -async def all_zip_to_one(data_list: List[bytes]): +async def all_zip_to_one(data_list: List[bytes]): # noqa: RUF029 """多压缩包文件合并""" file_list = [io.BytesIO(data).getbuffer() for data in data_list] data_file = io.BytesIO() diff --git a/nonebot_plugin_l4d2_server/l4_request/draw_msg.py b/nonebot_plugin_l4d2_server/l4_request/draw_msg.py index 960dff7..5a063bf 100644 --- a/nonebot_plugin_l4d2_server/l4_request/draw_msg.py +++ b/nonebot_plugin_l4d2_server/l4_request/draw_msg.py @@ -52,7 +52,7 @@ async def get_much_server(server_json: List[NserverOut], command: str): out_server: List[OutServer] = [] search_list: List[Tuple[str, int]] = [] for i in server_json: - search_list.append((i["host"], i["port"])) # noqa: PERF401 + search_list.append((i["host"], i["port"])) all_server = await L4API.a2s_info(search_list, is_player=True) diff --git a/nonebot_plugin_l4d2_server/utils/utils.py b/nonebot_plugin_l4d2_server/utils/utils.py index 56bbde2..f269362 100644 --- a/nonebot_plugin_l4d2_server/utils/utils.py +++ b/nonebot_plugin_l4d2_server/utils/utils.py @@ -6,8 +6,10 @@ import aiofiles import aiohttp +import nonebot from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent, MessageEvent from nonebot.log import logger +from nonebot_plugin_alconna import UniMessage async def get_file(url: str, down_file: Path): @@ -174,7 +176,7 @@ def split_maohao(msg: str) -> Tuple[str, int]: headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:107.0) Gecko/20100101 Firefox/107.0", # noqa: E501 + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:107.0) Gecko/20100101 Firefox/107.0", } @@ -198,7 +200,16 @@ async def url_to_msg(url: str): return None -async def get_message_at(datas: str) -> Optional[int]: # noqa: F811 +async def get_message_at(datas: str) -> Optional[int]: data: Dict[str, Any] = json.loads(datas) at_list = [int(msg["data"]["qq"]) for msg in data["message"] if msg["type"] == "at"] return at_list[0] if at_list else None + + +async def send_ip_msg(msg: str): + try: + await UniMessage.text(msg).finish() + except nonebot.adapters.qq: + msg_new = msg.split("\n")[:-2] + msg_out = "\n".join(msg_new) + await UniMessage.text(msg_out).send() diff --git a/old_project/l4d2_anne/__init__.py b/old_project/l4d2_anne/__init__.py deleted file mode 100644 index da3d48f..0000000 --- a/old_project/l4d2_anne/__init__.py +++ /dev/null @@ -1,94 +0,0 @@ -from nonebot import on_command -from nonebot.adapters.onebot.v11 import Message, MessageEvent -from nonebot.matcher import Matcher -from nonebot.params import CommandArg -from nonebot_plugin_alconna.uniseg import UniMessage - -# from .l4d2_file.input_json import * -from ..l4d2_utils.config import MASTER -from ..l4d2_utils.utils import at_to_usrid, get_message_at -from .server import updata_anne_server -from .utils import bind_steam, name_exist, search_anne - -# anne -anne_player = on_command("Ranne", aliases={"求生anne"}, priority=25, block=True) -anne_bind = on_command( - "Rbind", - aliases={"steam绑定", "求生绑定", "anne绑定"}, - priority=20, - block=True, -) -del_bind = on_command( - "del_bind", - aliases={"steam解绑", "求生解绑", "anne解绑"}, - priority=20, - block=True, -) -updata = on_command( - "updata_anne", - aliases={"求生更新anne"}, - priority=20, - block=True, - permission=MASTER, -) - - -@anne_player.handle() -async def _(matcher: Matcher, event: MessageEvent, args: Message = CommandArg()): - name = args.extract_plain_text() - name = name.strip() - at = await get_message_at(event.json()) - usr_id = at_to_usrid(at) - if not usr_id: - usr_id = event.user_id - # 没有参数则从db里找数据 - msg = await search_anne(name, str(usr_id)) - if isinstance(msg, str): - await matcher.finish(msg) - elif isinstance(msg, bytes): - await UniMessage.image(raw=msg).send() - - -@anne_bind.handle() -async def _(matcher: Matcher, event: MessageEvent, args: Message = CommandArg()): - tag = args.extract_plain_text() - tag = tag.strip() - if tag == "" or tag.isspace(): - await matcher.finish("虚空绑定?") - usr_id = str(event.user_id) - nickname = event.sender.card or event.sender.nickname - if not nickname: - nickname = "宁宁" - msg = await bind_steam(usr_id, tag, nickname) - await matcher.finish(msg) - - -@del_bind.handle() -async def _(matcher: Matcher, event: MessageEvent): - usr_id = event.user_id - msg = name_exist(str(usr_id)) - if not msg: - return - await matcher.finish(msg) - - -@del_bind.handle() -async def _(matcher: Matcher, event: MessageEvent): - usr_id = event.user_id - msg = name_exist(str(usr_id)) - if not msg: - return - await matcher.finish(msg) - - -@updata.handle() -async def _(matcher: Matcher, args: Message = CommandArg()): - """更新""" - if args: - # 占位先,除了电信服还有再加 - ... - anne_ip_dict = await updata_anne_server() - if not anne_ip_dict: - await matcher.finish("网络开小差了捏") - server_number = len(anne_ip_dict["云"]) - await matcher.finish(f"更新成功\n一共更新了{server_number}个电信anne服ip") diff --git a/old_project/l4d2_anne/analysis.py b/old_project/l4d2_anne/analysis.py deleted file mode 100644 index 479f915..0000000 --- a/old_project/l4d2_anne/analysis.py +++ /dev/null @@ -1,54 +0,0 @@ -import pandas as pd - -from .startand import NUMBER_MAP, SAVE_MAP - - -async def df_to_guoguanlv(df: pd.DataFrame): - """分析救援关过图率""" - data = df[df["游戏模式"] == "AnneHappy药役"] - other = df[df["游戏模式"].isin(["牛牛冲刺", "单人装逼"])] - all_map = len(data["地图"]) - other_map = len(other["地图"]) - resen = 0 - last_maps = {} - for m in SAVE_MAP: - prefix = m.split("m")[0] - if prefix in last_maps: - last_maps[prefix] = max(last_maps[prefix], m) - else: - last_maps[prefix] = m - - map_counts = {} - - n = 0 - for key in last_maps: - count = len(data[data["地图"].str.startswith(key)]) - if count == 0: - continue - last_map = last_maps[key] - map_count = len(data[data["地图"] == last_map]) - map_counts[key] = map_count * NUMBER_MAP[n] / count - quan = count / all_map - resen += quan * map_counts[key] - n += 1 - - # result = [] - # for i in range(1, 15): - # key = 'c{}'.format(i) - # if key in map_counts: - # result.append('{}:{}%'.format(key, round(map_counts[key] * 100))) - - # print(result) - # result = '救援图过关率: {:.2%}'.format(resen) - - # 加上特殊关卡 - try: - resen += other_map / (all_map + other_map) - result = {"救援关": str("{:.2%}".format(resen))} - except (TypeError, KeyError): - result = {"救援关": "错误"} - except ZeroDivisionError: - result = {"救援关": "0.00%"} - except Exception: - result = {"救援关": "错误"} - return result diff --git a/old_project/l4d2_anne/server.py b/old_project/l4d2_anne/server.py deleted file mode 100644 index a4e105f..0000000 --- a/old_project/l4d2_anne/server.py +++ /dev/null @@ -1,47 +0,0 @@ -import json -from pathlib import Path -from typing import Dict, List - -import httpx -from bs4 import BeautifulSoup - -from ..l4d2_utils.config import ANNE_IP, CONFIG_PATH, anne_url, headers - - -async def updata_anne_server(): - """更新anne服务器列表""" - data = httpx.get(anne_url, headers=headers).content # noqa: ASYNC100 - soup = BeautifulSoup(data, "html.parser") - tbody = soup.find("tbody") - if not tbody: - return None - n = 0 - ip_list = [] - while n < 50: - n += 1 - tr = tbody.find(id=f"server_{n}") # type: ignore - if tr: - td: str = tr.select_one("td:nth-of-type(5)").get_text() # type: ignore - else: - continue - if td: - ip_list.append(td) - else: - continue - if not ip_list: - return None - ip_dict: Dict[str, List[Dict[str, str]]] = {"云": []} - n: int = 0 - - for i, ip in enumerate(ip_list, start=1): - ip_dict["云"].append({"id": str(i), "ip": ip}) - - # ANNE_IP.update(ip_dict) - with Path(CONFIG_PATH.parent / "l4d2/云.json").open( - mode="w", - encoding="utf-8", - ) as f: - json.dump(ip_dict, f, indent=4, ensure_ascii=False) - # print(ANNE_IP) - ANNE_IP.update(ip_dict) - return ip_dict diff --git a/old_project/l4d2_anne/startand.py b/old_project/l4d2_anne/startand.py deleted file mode 100644 index 94c4624..0000000 --- a/old_project/l4d2_anne/startand.py +++ /dev/null @@ -1,17 +0,0 @@ -NUMBER_MAP = [4, 5, 4, 5, 5, 3, 3, 5, 2, 5, 5, 5, 4, 2] -SAVE_MAP = [ - "c1m4_atrium", - "c2m5_concert", - "c3m4_plantation", - "c4m5_milltown_escape", - "c5m5_bridge", - "c6m3_port", - "c7m3_port", - "c8m5_rooftop", - "c9m2_lots", - "c10m5_houseboat", - "c11m5_runway", - "c12m5_cornfield", - "c13m4_cutthroatcreek", - "c14m2_lighthouse", -] diff --git a/old_project/l4d2_anne/utils.py b/old_project/l4d2_anne/utils.py deleted file mode 100644 index bdbd7a4..0000000 --- a/old_project/l4d2_anne/utils.py +++ /dev/null @@ -1,292 +0,0 @@ -from typing import List - -import httpx -import pandas as pd -from bs4 import BeautifulSoup -from nonebot.log import logger - -from ..l4d2_data.players import L4D2Player -from ..l4d2_image import out_png -from ..l4d2_utils.seach import anne_search -from .analysis import df_to_guoguanlv - -# from .anne_telecom import ANNE_API - - -s = L4D2Player() -headers = { - "User-Agent": ( - "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:107.0) " - "Gecko/20100101 Firefox/107.0" - ), -} - - -async def anne_html(name: str): - """搜索里提取玩家信息,返回列表字典""" - data_title = anne_search(name) - if not data_title: - return None - data = data_title[0] - title = data_title[1] - if len(data) == 0 or data[0] == "No Player found.": - return [] - data_list: list = [] - logger.info(data) - for i in data: - i: BeautifulSoup - - def find_text(soup, attr_en, attr_alt): - try: - element = soup.find("td", {"data-title": attr_en}) - if element is None: - element = soup.find("td", {"data-title": attr_alt}) - return element.text.strip() if element else "" - except AttributeError: - return "" - - rank = find_text(i, "Rank:", "排名:") - player = find_text(i, "Player:", "玩家:") - points = find_text(i, "Points:", "分数:") - playtime = find_text(i, "Playtime:", "游玩时间:") - last_online = find_text(i, "Last Online:", "最后上线时间:") - onclick = i["onclick"] - steamid = onclick.split("=")[2].strip("'") # type: ignore - play_json = { - title[0]: rank, - title[1]: player, - title[2]: points, - # title[3]:country, - title[3]: playtime, - title[4]: last_online, - title[5]: steamid, - } - data_list.append(play_json) - logger.info("搜寻数据") - return data_list - - -def anne_html_msg(data_list: list): - """从搜索结果的字典列表中,返回发送信息""" - mes = "搜索到以下玩家信息" - - for ns, one in enumerate(data_list, start=0): - one: dict - x = 6 - - titles = list(one.keys()) - for i in range(x): - mes += "\n" + titles[i] + ":" + str(one[titles[i]]) - mes += "\n--------------------" - if ns > 4: - break - return mes - - -async def write_player(id_, msg: str, nickname: str): - """绑定用户""" - # 判断是steam - if msg.startswith("STEAM"): - # try: - data_tuple = s.query_player_qq(id_) - if data_tuple is not None: - qq, nicknam, steamid = data_tuple - else: - nicknam = None - await s.add_player_all(id_, nicknam, msg) - # except TypeError: - # await s._add_player_steamid(id_ , msg) - return "绑定成功喵~\nQQ:" + nickname + "\n" + "steamid:" + msg - # try: - data_tuple = s.query_player_qq(id_) - if data_tuple is not None: - id_, nicknam, steamid = data_tuple - else: - steamid = None - await s.add_player_all(id_, msg, steamid) - # except TypeError: - # await s._add_player_nickname(id_ , msg ) - return "绑定成功喵~\nQQ:" + nickname + "\n" + "steam昵称:" + msg - - -def del_player(id_: str): - """删除绑定信息,返回消息""" - if not s.query_player_qq(id_): - return "你还没有绑定过,请使用[求生绑定+昵称/steamid]" - if s.delete_player: - return "删除成功喵~" - return None - - -async def id_to_mes(name: str): - """根据name从数据库,返回steamid、或者空白""" - data_tuple = await s.search_data(None, name, None) - if data_tuple: - return data_tuple[2] - return None - - -def anne_rank_dict(name: str): - """用steamid,查详情,输出字典""" - data_dict = {} - url = f"https://sb.trygek.com/l4d_stats/ranking/player.php?steamid={name}" - - data = httpx.get(url=url, headers=headers, timeout=5) - if data.status_code != 200: - return [f"查询错误,状态码{data.status_code}"] - data = data.content.decode("utf-8") - data = BeautifulSoup(data, "html.parser") - detail = data.find_all("table") - n = 0 - data_list: List[dict] = [] - while n < 2: - detail2 = detail[n] - tr = detail2.find_all("tr") - for i in tr: - title = i.find("td", {"class": "w-50"}) - value = title.find_next_sibling("td") - new_dict = {title.text: value.text} - data_dict.update(new_dict) - data_list.append(data_dict) - n += 1 - # 获取头像 - element: str = data.find_all(attrs={"style": "cursor:pointer"})[0].get("onclick") - player_url = element.split("'")[1] - data_list[0].update({"个人资料": player_url}) - # 获取一言 - message = data.select( - ( - "html body div.content.text-center.text-md-left " - "div.container.text-left div.col-md-12.h-100 " - "div.card-body.worldmap.d-flex.flex-column.justify-content-center." - "text-center span" - ), - ) - msg_list = [] - for i in message: - msg_list.append(i.text) - data_list[0].update({"一言": msg_list}) - return data_list - - -def anne_rank_dict_msg(data_list): - """字典转msg""" - msg = "" - for data_dict in data_list: - mes = "" - for i in data_dict: - mes += "\n" + i + data_dict[i] - mes += "\n--------------------" - msg += mes - return msg - - -async def anne_message(name: str, usr_id: str): - """获取anne信息可输出信息""" - if name: - logger.info("关键词查询" + name) - if not name.startswith("STEAM"): - steamid = await id_to_mes(name) - if not steamid: - logger.info("没有找到qq,使用默认头像") - message = await anne_html(name) - if not message: - return None - usr_id = "1145149191810" - if len(message) == 0: - return "没有叫这个名字的...\n" - if len(message) > 1: - return anne_html_msg(message) - name = message[0]["steamid"] - else: - name = steamid - - # steamid - msg = anne_rank_dict(name)[0] - if isinstance(msg, dict): - msg.update(await df_to_guoguanlv(await anne_map_msg(name))) - logger.info("使用图片") - msg = await out_png(usr_id, msg) - return msg - """ - 1、qq>数据>没有数据,返回 - 2、qq>数据>steamid>查询 - 3、qq>数据>昵称>查询 - """ - logger.info("qq信息查询") - data_tuple = s.query_player_qq(usr_id) - logger.info(data_tuple) - if not data_tuple: - return "没有绑定信息...请使用【求生绑定 xxx】\n" - # 只有名字,先查询数据在判断 - if data_tuple[2]: - name = data_tuple[2] - elif data_tuple[1]: - name = await id_to_mes(data_tuple[1]) # type: ignore - logger.info(name) - if not name: - message = await anne_html(data_tuple[1]) - if not message: - return None - usr_id = "1145149191810" - if len(message) == 0: - return "没有叫这个名字的...\n" - if len(message) > 1: - return anne_html_msg(message) - name = message[0]["steamid"] - - # name是steamid - msg = anne_rank_dict(name)[0] - if isinstance(msg, dict): - msg.update(await df_to_guoguanlv(await anne_map_msg(name))) - logger.info("使用图片") - msg = await out_png(usr_id, msg) - return msg - - -async def anne_map_msg(steamid: str): - """steamid->地图信息""" - url = f"https://sb.trygek.com/l4d_stats/ranking/timedmaps.php?steamid={steamid}" - - data = httpx.get(url, headers=headers, timeout=5).content.decode( # noqa: ASYNC100 - "utf-8", - ) - soup = BeautifulSoup(data, "html.parser") - data_list = [] - cards = soup.select("div.card.rounded-0") - for card in cards: - tbodies = card.select("tbody") - for tbody in tbodies: - rows = [td.text.strip() for td in tbody.find_all("td")] - for i in range(0, len(rows), 9): - row = rows[i : i + 9] - data_list.append(row) - return pd.DataFrame( - data_list, - columns=[ - "游戏模式", - "地图", - "难度", - "完成时间", - "特感数量", - "刷新间隔", - "B数使用", - "刷特模式", - "Anne版本", - ], - ) - - -def name_exist(id_: str): - """删除绑定信息""" - return del_player(id_) - - -async def search_anne(name: str, usr_id: str): - """获取anne成绩""" - return await anne_message(name, usr_id) - - -async def bind_steam(id_: str, msg: str, nickname: str): - """绑定qq-steam""" - return await write_player(id_, msg, nickname) diff --git a/old_project/l4d2_data/__init__.py b/old_project/l4d2_data/__init__.py deleted file mode 100644 index 0b1e8e6..0000000 --- a/old_project/l4d2_data/__init__.py +++ /dev/null @@ -1,105 +0,0 @@ -import sqlite3 - -from nonebot.log import logger - -from ..l4d2_utils.config import ( - DATASQLITE, - L4d2_BOOLEAN, - L4d2_INTEGER, - L4d2_players_tag, - L4d2_server_tag, - L4d2_TEXT, - table_data, - tables_columns, -) - - -class L4D2DataSqlite: - """连接数据库和断开数据库,以及一些检查函数""" - - def __init__(self): - """连接数据库""" - self.datasqlite_path = DATASQLITE - self.datasqlite_path.mkdir(parents=True, exist_ok=True) - self.conn = sqlite3.connect(self.datasqlite_path / "L4D2.db") - self._check_tables_exist() - self._check_data_existence() - self._check_data_validity() - logger.info("已连接求生数据库") - - def _base_conn(self): - return self.conn - - def _check_tables_exist(self) -> None: - """ - 检查表是否存在 - """ - c = self.conn.cursor() - for table in table_data: - c.execute( - f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table}'", - ) - if c.fetchone() is None: - if table == "L4d2_players": - c.execute(f"CREATE TABLE {table} (qq INTEGER PRIMARY KEY)") - elif table == "L4D2_server": - c.execute(f"CREATE TABLE {table} (number INTEGER PRIMARY KEY)") - self.conn.commit() - - def _check_data_existence(self) -> None: - """ - 检查表头是否存在,如果不存在则新建并填充默认值 - """ - c = self.conn.cursor() - for table, tag in tables_columns.items(): - for column in tag: - c.execute(f"PRAGMA table_info({table})") - if not any(col[1] == column for col in c.fetchall()): - if column in L4d2_BOOLEAN: - c.execute( - f"ALTER TABLE {table} ADD COLUMN {column} BOOLEAN DEFAULT 0", - ) - elif column in L4d2_INTEGER: - c.execute( - f"ALTER TABLE {table} ADD COLUMN {column} INTEGER DEFAULT NULL", - ) - else: - c.execute( - f"ALTER TABLE {table} ADD COLUMN {column} TEXT DEFAULT NULL", - ) - self.conn.commit() - - def _check_data_validity(self) -> None: - """ - 检查数据库数据的合法性 - 错误数据默认填充NULL或者False - """ - c = self.conn.cursor() - columns = None - table = None - for table in table_data: - columns = L4d2_players_tag if table == "L4d2_players" else L4d2_server_tag - if not columns: - return - for column in columns: - if column in L4d2_INTEGER: - c.execute( - f"UPDATE {table} SET {column} = NULL WHERE typeof({column}) != 'integer'", - ) - elif column in L4d2_TEXT: - c.execute( - f"UPDATE {table} SET {column} = NULL WHERE typeof({column}) != 'text'", - ) - elif column in L4d2_BOOLEAN: - c.execute( - f"UPDATE {table} SET {column} = 'False' WHERE typeof({column}) != 'boolean'", - ) - self.conn.commit() - - def _close(self): - """断开连接到数据库""" - self.conn.close() - logger.info("已断开求生数据库") - - -sq_L4D2 = L4D2DataSqlite() # noqa: N816 diff --git a/old_project/l4d2_data/config.py b/old_project/l4d2_data/config.py deleted file mode 100644 index 92ca461..0000000 --- a/old_project/l4d2_data/config.py +++ /dev/null @@ -1,18 +0,0 @@ -# from pathlib import Path -# import yaml - -# DATABASE = Path() / "data" / "L4D2" -# class L4D2Config: - -# def __init__(self): -# self.config_yaml = "config.yaml" -# config_data = self._config_data() - -# self - -# def _config_data(self): -# """配置数据""" -# with open(self.config_yaml, 'r', encoding='utf-8') as e: -# a = e.read() -# data = yaml.safe_load(a) -# return data diff --git a/old_project/l4d2_data/database.py b/old_project/l4d2_data/database.py deleted file mode 100644 index e69de29..0000000 diff --git a/old_project/l4d2_data/players.py b/old_project/l4d2_data/players.py deleted file mode 100644 index e87fc09..0000000 --- a/old_project/l4d2_data/players.py +++ /dev/null @@ -1,100 +0,0 @@ -import sqlite3 -from typing import List, Optional, Tuple, Union - -from ..l4d2_utils.config import DATASQLITE - - -class L4D2Player: - """数据库L4D2_Player表的操作""" - - def __init__(self): - """连接数据库""" - self.datasqlite_path = DATASQLITE - self.conn = sqlite3.connect(self.datasqlite_path / "L4D2.db") - self.c = self.conn.cursor() - - async def _add_player_nickname(self, qq, nickname): - """绑定昵称""" - # try: - self.c.execute( - "INSERT INTO L4d2_players (qq, nickname, steamid) VALUES (?,?,NULL)", - (qq, nickname), - ) - self.conn.commit() - # return True - # except sqlite3.IntegrityError: - # return False - - async def _add_player_steamid(self, qq, steamid): - """绑定steamid""" - self.c.execute( - "INSERT INTO L4d2_players (qq, nickname, steamid) VALUES (?,NULL,?)", - (qq, steamid), - ) - self.conn.commit() - - async def add_player_all(self, qq, nickname, steamid): - """用新数据覆盖旧数据""" - # try: - self.c.execute( - "INSERT OR REPLACE INTO L4d2_players (qq, nickname, steamid) VALUES (?,?,?)", # noqa: E501 - (qq, nickname, steamid), - ) - self.conn.commit() - return True - # except sqlite3.IntegrityError: - # return False - - def delete_player(self, qq): - """解除绑定""" - self.c.execute(f"DELETE FROM L4d2_players WHERE qq = {qq}") - self.conn.commit() - return True - - def query_player_qq(self, qq) -> Union[tuple, None]: - """通过qq获取数据""" - self.c.execute(f"SELECT * FROM L4d2_players WHERE qq = '{qq}'") - return self.c.fetchone() - - async def _query_player_nickname(self, nickname: str) -> Union[tuple, None]: - """通过nickname获取数据""" - self.c.execute(f"SELECT * FROM L4d2_players WHERE nickname = '{nickname}'") - return self.c.fetchone() - - async def _query_player_steamid(self, steamid: str): - """通过steamid获取数据""" - self.c.execute(f"SELECT * FROM L4d2_players WHERE steamid = '{steamid}'") - return self.c.fetchone() - - async def search_data( - self, - qq: Optional[str], - nickname: Optional[str], - steamid: Optional[str], - ) -> Union[tuple, None]: - """ - 输入元组查询,优先qq其次steamid最后nickname,不需要值可以为None - 输出为元组,如果为空输出None - data = (qq , nickname , steamid ) - """ - if qq: - self.c.execute("SELECT * FROM L4d2_players WHERE qq=?", (qq,)) - result = self.c.fetchone() - if result: - return result - if steamid: - self.c.execute("SELECT * FROM L4d2_players WHERE steamid=?", (steamid,)) - result = self.c.fetchone() - if result: - return result - if nickname: - self.c.execute("SELECT * FROM L4d2_players WHERE nickname=?", (nickname,)) - result = self.c.fetchone() - if result: - return result - return None - - def _query_all_player(self) -> List[Tuple]: - """获取所有玩家信息""" - self.c.execute("SELECT * FROM L4d2_players") - return self.c.fetchall() diff --git a/old_project/l4d2_data/serverip.py b/old_project/l4d2_data/serverip.py deleted file mode 100644 index d8a33e6..0000000 --- a/old_project/l4d2_data/serverip.py +++ /dev/null @@ -1,40 +0,0 @@ -import sqlite3 - -from ..l4d2_utils.config import DATASQLITE - - -class L4D2Server: - """数据库L4D2_server表的操作""" - - def __init__(self): - """连接数据库""" - self.datasqlite_path = DATASQLITE - self.conn = sqlite3.connect(self.datasqlite_path / "L4D2.db") - self.c = self.conn.cursor() - - async def bind_server_ip(self, qqgroup, host, port): - """绑定群订阅ip""" - self.c.execute( - "INSERT OR REPLACE INTO L4D2_server (qqgroup, host, port) VALUES (?,?,?)", - (qqgroup, host, port), - ) - self.conn.commit() - - async def query_server_ip(self, qqgroup): - """输入群号,返回数据库里订阅ip元组列表""" - self.c.execute( - f"SELECT number, qqgroup ,host ,port FROM L4D2_server WHERE qqgroup = {qqgroup}", - ) - return self.c.fetchall() - - async def del_server_ip(self, id_: int): - """删除指定id的ip""" - self.c.execute(f"DELETE FROM L4D2_server WHERE number = {id_}") - self.conn.commit() - - async def query_number(self, number: int): - """通过序号找服务器""" - self.c.execute( - f"SELECT qqgroup , host ,port FROM L4D2_server WHERE number = {number}", - ) - return self.c.fetchone() diff --git a/old_project/l4d2_file/__init__.py b/old_project/l4d2_file/__init__.py deleted file mode 100644 index 067fa2f..0000000 --- a/old_project/l4d2_file/__init__.py +++ /dev/null @@ -1,223 +0,0 @@ -import re -from pathlib import Path -from typing import Tuple - -from nonebot import on_command, on_regex -from nonebot.adapters.onebot.v11 import Event, Message, NoticeEvent -from nonebot.log import logger -from nonebot.matcher import Matcher -from nonebot.params import ArgPlainText, CommandArg, RegexGroup - -# from nonebot.typing import T_State -from ..l4d2_utils.config import MASTER, config_manager, file_format, l4_config, vpk_path -from ..l4d2_utils.rule import wenjian -from ..l4d2_utils.txt_to_img import mode_txt_to_img -from ..l4d2_utils.utils import del_map, get_vpk, mes_list, rename_map -from .input_json import upload # noqa: F401 -from .utils import updown_l4d2_vpk - -up = on_command( - "l4_upload", - aliases={"l4地图上传"}, - priority=20, - block=True, -) - - -rename_vpk = on_regex( - r"^l4地图\s*(\S+.*?)\s*(改|改名)?\s*(\S+.*?)\s*$", - flags=re.DOTALL, - block=True, - priority=20, - permission=MASTER, -) - -find_vpk = on_command("l4_map", aliases={"l4地图"}, priority=25, block=True) -del_vpk = on_command( - "l4_del_map", - aliases={"l4地图删除", "地图删除"}, - priority=20, - permission=MASTER, -) - - -check_path = on_command( - "l4_check", - aliases={"l4路径"}, - priority=20, - block=True, - permission=MASTER, -) -smx_file = on_command( - "l4_smx", - aliases={"l4插件"}, - priority=20, - block=True, - permission=MASTER, -) - - -@up.handle() -async def _(): - ... - - -@up.got("map_url", prompt="图来") -async def _(matcher: Matcher, event: Event): - if not isinstance(event, NoticeEvent) or not wenjian(event): - await matcher.finish("未检测到地图") - return - args = event.dict() - if args["notice_type"] != "offline_file": - matcher.set_arg("txt", args) # type: ignore - return - l4_file_path = l4_config.l4_ipall[l4_config.l4_number]["location"] - map_path = Path(l4_file_path, vpk_path) # type: ignore - # 检查下载路径是否存在 - if not Path(l4_file_path).exists(): # type: ignore - await matcher.finish("你填写的路径不存在辣") - if not Path(map_path).exists(): - await matcher.finish("这个路径并不是求生服务器的路径,请再看看罢") - url: str = args["file"]["url"] - name: str = args["file"]["name"] - # 如果不符合格式则忽略 - await up.send("已收到文件,开始下载") - vpk_files = await updown_l4d2_vpk(map_path, name, url) - if vpk_files: - mes = "解压成功,新增以下几个vpk文件" - await matcher.finish(mes_list(mes, vpk_files)) - else: - await matcher.finish("你可能上传了相同的文件,或者解压失败了捏") - - -path_list: str = "请选择上传位置(输入阿拉伯数字)" -times = 0 -for one_path in l4_config.l4_ipall: - times += 1 - path_msg = one_path["location"] - path_list += f"\n {times!s} | {path_msg}" - - -@up.got("is_sure", prompt=path_list) -async def _(matcher: Matcher): - args = matcher.get_arg("txt") - l4_file = l4_config.l4_ipall - if args is None: - await matcher.finish("获取文件出错辣,再试一次吧") - return - - is_sure = str(matcher.get_arg("is_sure")).strip() - if not is_sure.isdigit(): - await matcher.finish("已取消上传") - - file_path: str = "" - for one_server in l4_file: - if one_server["id_rank"] == is_sure: - file_path = one_server["location"] - if not file_path: - await matcher.finish("没有这个序号拉baka") - - map_path = Path(file_path, vpk_path) - - # 检查下载路径是否存在 - if not Path(file_path).exists(): - await matcher.finish("你填写的路径不存在辣") - if not map_path.exists(): - await matcher.finish("这个路径并不是求生服务器的路径,请再看看罢") - - url = args["file"]["url"] - name = args["file"]["name"] - # 如果不符合格式则忽略 - if not name.endswith(file_format): # type: ignore - return - - await matcher.send("已收到文件,开始下载") - vpk_files = await updown_l4d2_vpk(map_path, name, url) # type: ignore - - if vpk_files: - logger.info("检查到新增文件") - mes = "解压成功,新增以下几个vpk文件" - elif vpk_files is None: - await matcher.finish("文件错误") - return - else: - mes = "你可能上传了相同的文件,或者解压失败了捏" - - await matcher.finish(mes_list(mes, vpk_files)) - - -@find_vpk.handle() -async def _(): - map_path = Path(l4_config.l4_ipall[l4_config.l4_number]["location"], vpk_path) - name_vpk = get_vpk(map_path) - logger.info("获取文件列表成功") - mes = "当前服务器下有以下vpk文件" - msg = mes_list("", name_vpk).replace(" ", "") - - await mode_txt_to_img(mes, msg) - - -@del_vpk.handle() -async def _(matcher: Matcher, args: Message = CommandArg()): - num1 = args.extract_plain_text() - if num1: - matcher.set_arg("num", args) - - -@del_vpk.got("num", prompt="你要删除第几个序号的地图(阿拉伯数字)") -async def _(matcher: Matcher, tag: str = ArgPlainText("num")): - map_path = Path(l4_config.l4_ipall[l4_config.l4_number]["location"], vpk_path) - vpk_name = del_map(int(tag), map_path) - await matcher.finish("已删除地图:" + vpk_name) - - -@rename_vpk.handle() -async def _( - matcher: Matcher, - matched: Tuple[int, str, str] = RegexGroup(), -): - num, useless, rename = matched - map_path = Path(l4_config.l4_ipall[l4_config.l4_number]["location"], vpk_path) - logger.info("检查是否名字是.vpk后缀") - if not rename.endswith(".vpk"): - rename = rename + ".vpk" - logger.info("尝试改名") - try: - map_name = rename_map(num, rename, map_path) - if map_name: - await matcher.finish("改名成功\n原名:" + map_name + "\n新名称:" + rename) - except ValueError: - await matcher.finish("参数错误,输入【求生地图】获取全部名称") - - -@check_path.handle() -async def _(matcher: Matcher, args: Message = CommandArg()): - msg = args.extract_plain_text() - if msg.startswith("切换"): - msg_number = int("".join(msg.replace("切换", " ").split())) - if msg_number > len(l4_config.l4_ipall) or msg_number < 0: - await matcher.send("没有这个序号的路径呐") - else: - l4_config.l4_number = msg_number - 1 - now_path = l4_config.l4_ipall[l4_config.l4_number]["location"] - await matcher.send( - f"已经切换路径为\n{l4_config.l4_number + 1!s}、{now_path}", - ) # noqa: E501 - config_manager.save() - else: - now_path = l4_config.l4_ipall[l4_config.l4_number]["location"] - await matcher.send(f"当前的路径为\n{l4_config.l4_number + 1!s}、{now_path}") - - -@smx_file.handle() -async def _(): - smx_path = Path( - l4_config.l4_ipall[l4_config.l4_number]["location"], - "left4dead2/addons/sourcemod/plugins", - ) - name_smx = get_vpk(smx_path, file_=".smx") - logger.info("获取文件列表成功") - mes = "当前服务器下有以下smx文件" - msg = "" - msg = mes_list(msg, name_smx).replace(" ", "") - await mode_txt_to_img(mes, msg) diff --git a/old_project/l4d2_file/ayromote.py b/old_project/l4d2_file/ayromote.py deleted file mode 100644 index 1bede77..0000000 --- a/old_project/l4d2_file/ayromote.py +++ /dev/null @@ -1,64 +0,0 @@ -# import asyncio -# import asyncssh - - -# class AsyncSSHClient: -# def __init__(self, host, port, username, password=None, private_key=None): -# self.host = host -# self.port = port -# self.username = username -# self.password = password -# self.private_key = private_key - -# async def connect(self): -# if self.private_key is not None: -# try: -# key = asyncssh.read_private_key(self.private_key) -# except asyncssh.Error as exc: -# raise ValueError(f"Unable to read private key: {exc}") -# else: -# key = None - -# self.conn = await asyncssh.connect( -# self.host, -# self.port, -# username=self.username, -# password=self.password, -# client_keys=key, -# ) - -# async def upload(self, local_path, remote_path): -# async with self.conn.sftp() as sftp: -# await sftp.put(local_path, remote_path) - -# async def delete(self, remote_path): -# async with self.conn.sftp() as sftp: -# await sftp.remove(remote_path) - -# async def listdir(self, remote_path): -# async with self.conn.sftp() as sftp: -# return await sftp.listdir(remote_path) - -# async def close(self): -# self.conn.close() - - -# async def remote( -# mode: str, -# host: str, -# user: str, -# password: str, -# local_path="", -# port=22, -# remote_path="", -# ): -# """mode:upload、read、del""" -# client = AsyncSSHClient(host, port, user, password) -# await client.connect() -# if mode == "upload": -# await client.upload(local_path, remote_path) -# elif mode == "read": -# file = await client.read(remote_path) -# return file -# elif mode == "del": -# await client.delete(remote_path) diff --git a/old_project/l4d2_file/input_json.py b/old_project/l4d2_file/input_json.py deleted file mode 100644 index b8be0bb..0000000 --- a/old_project/l4d2_file/input_json.py +++ /dev/null @@ -1,77 +0,0 @@ -import json -from pathlib import Path -from typing import Dict, List - -from nonebot import on_notice -from nonebot.adapters.onebot.v11 import NoticeEvent -from nonebot.log import logger -from nonebot.matcher import Matcher - -from ..l4d2_utils.steam import url_to_msg - -upload = on_notice(priority=1) - - -@upload.handle() -async def _(matcher: Matcher, event: NoticeEvent): - try: - arg = event.dict() - files: dict = arg["file"] - name: str = files["name"] - if arg["notice_type"] == "offline_file" and name.endswith(".json"): - try: - msg = await url_to_msg(files["url"]) - if not msg: - return - jsons: Dict[str, List[Dict[str, str]]] = json.loads(msg) - except json.decoder: - logger.info("求生json格式不正确") - await matcher.finish("求生json格式不正确") - return - if not validate_json(jsons): - logger.info("求生json格式不正确") - await matcher.finish("求生json格式不正确") - print(name) - key = await up_date(jsons, name) - if key: - # logger.info(jsons) - msg = "输入成功\n" - for key, value in jsons.items(): - msg += f"【{key}】指令:{len(value)}个\n" - logger.info(msg) - await matcher.send(msg) - except KeyError: - pass - - -async def validate_json(json_data): - try: - data = json.loads(json_data) - if not isinstance(data, dict): - return False - - for key, value in data.items(): - if not isinstance(value, list): - return False - for item in value: - if not isinstance(item, dict): - return False - if not all(key in item for key in ["id", "ip"]): - return False - if True: - return True - - except json.JSONDecodeError: - return False - - -async def up_date(data: Dict[str, List[Dict[str, str]]], name: str): - print(data) - directory = Path("data/L4D2/l4d2") - directory.mkdir(parents=True, exist_ok=True) - - file_path = directory / name - with file_path.open("w") as json_file: - json.dump(data, json_file) - - return True diff --git a/old_project/l4d2_file/remote.py b/old_project/l4d2_file/remote.py deleted file mode 100644 index 075a584..0000000 --- a/old_project/l4d2_file/remote.py +++ /dev/null @@ -1,86 +0,0 @@ -# import asyncio -# import paramiko - - -# class SSHClient: -# def __init__(self, hostname, port, username, password): -# self._hostname = hostname -# self._port = port -# self._username = username -# self._password = password -# self._ssh = None - -# async def connect(self): -# self._ssh = paramiko.SSHClient() -# self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) -# await asyncio.get_event_loop().run_in_executor( -# None, -# self._ssh.connect, -# self._hostname, -# self._port, -# self._username, -# self._password, -# ) - -# async def upload(self, local_file_path, remote_file_path): -# with paramiko.Transport((self._hostname, self._port)) as transport: -# await asyncio.get_event_loop().run_in_executor( -# None, transport.connect, None, self._username, self._password -# ) -# sftp = paramiko.SFTPClient.from_transport(transport) -# await asyncio.get_event_loop().run_in_executor( -# None, sftp.put, local_file_path, remote_file_path -# ) - -# async def delete(self, remote_file_path): -# with paramiko.Transport((self._hostname, self._port)) as transport: -# await asyncio.get_event_loop().run_in_executor( -# None, transport.connect, None, self._username, self._password -# ) -# sftp = paramiko.SFTPClient.from_transport(transport) -# await asyncio.get_event_loop().run_in_executor( -# None, sftp.remove, remote_file_path -# ) - -# async def read(self, remote_dir_path): -# with paramiko.Transport((self._hostname, self._port)) as transport: -# await asyncio.get_event_loop().run_in_executor( -# None, transport.connect, None, self._username, self._password -# ) -# sftp = paramiko.SFTPClient.from_transport(transport) -# return await asyncio.get_event_loop().run_in_executor( -# None, sftp.listdir, remote_dir_path -# ) - -# async def close(self): -# if self._ssh is not None: -# self._ssh.close() - - -# async def main(): -# ssh = SSHClient("example.com", 22, "username", "password") -# await ssh.connect() - -# await ssh.upload("/path/to/local/file", "/path/to/remote/file") -# await ssh.delete("/path/to/remote/file") -# files = await ssh.list("/path/to/remote/directory") -# print(files) - -# await ssh.close() - -# if __name__ == '__main__': -# asyncio.run(main()) - - -# async def remote( -# mode: str, host, user, password, local_path="", port=22, remote_path="" -# ): -# """mode:upload、read、del""" -# client = SSHClient(host, port, user, password) -# if mode == "upload": -# await client.upload(local_path, remote_path) -# elif mode == "read": -# file = await client.read(remote_path) -# return file -# elif mode == "del": -# await client.delete(remote_path) diff --git a/old_project/l4d2_queries/__init__.py b/old_project/l4d2_queries/__init__.py deleted file mode 100644 index 67e89c9..0000000 --- a/old_project/l4d2_queries/__init__.py +++ /dev/null @@ -1,317 +0,0 @@ -import json -from time import sleep -from typing import Dict, List, Tuple, Union - -from nonebot import on_command, on_keyword -from nonebot.adapters import Event, Message -from nonebot.adapters.onebot.v11 import GroupMessageEvent -from nonebot.log import logger -from nonebot.matcher import Matcher -from nonebot.params import ArgPlainText, CommandArg, CommandStart, Keyword, RawCommand -from nonebot_plugin_alconna.uniseg import UniMessage - -from ..l4d2_image import server_group_ip_pic -from ..l4d2_queries.qqgroup import add_ip, del_ip, get_number_url, show_ip -from ..l4d2_queries.utils import queries_server -from ..l4d2_server.rcon import command_server -from ..l4d2_utils.config import DATA_PATH, MASTER, driver, l4_config -from ..l4d2_utils.txt_to_img import mode_txt_to_img -from ..l4d2_utils.utils import split_maohao, str_to_picstr -from .local_ip import ALL_HOST -from .qqgroup import get_tan_jian -from .send_msg import get_group_ip_to_msg, get_ip_to_mes -from .utils import server_key - -tan_jian = on_command("tj", aliases={"探监"}, priority=20, block=True) -prison = on_command("zl", aliases={"坐牢"}, priority=20, block=True) -open_prison = on_command("kl", aliases={"开牢"}, priority=20, block=True) -rcon_to_server = on_command( - "rcon", - aliases={"求生服务器指令", "服务器指令"}, - permission=MASTER, -) - -# 查询 -queries_comm = on_keyword( - keywords={"queries", "求生ip", "求生IP", "connect"}, - priority=20, - block=True, -) -add_queries = on_command( - "addq", - aliases={"求生添加订阅"}, - priority=20, - block=True, - permission=MASTER, -) -del_queries = on_command( - "delq", - aliases={"求生取消订阅"}, - priority=20, - block=True, - permission=MASTER, -) -add2_queries = on_command( - "l4add", - aliases={"l4添加查询", "l4增加查询"}, - priority=20, - block=True, - permission=MASTER, -) -del2_queries = on_command( - "l4del", - aliases={"l4删除查询", "l4取消查询"}, - priority=20, - block=True, - permission=MASTER, -) -show_queries = on_command("showq", aliases={"求生订阅"}, priority=20, block=True) -join_server = on_command("ld_jr", aliases={"求生加入"}, priority=20, block=True) - -updata_himi = on_command( - "update_hime", - aliases={"公益服更新,l4公益服更新"}, - priority=10, - block=True, -) - - -async def get_des_ip(): - """初始化""" - - def count_ips(ip_dict: Dict[str, List[Dict[str, str]]]): - """输出加载ip""" - global ANNE_IP - for key, value in ip_dict.items(): - if key in ["error_", "success_"]: - ip_dict.pop(key) - break - count = len(value) - logger.info(f"已加载:{key} | {count}个") - if key == "云": - ANNE_IP = {key: value} - sleep(1) - - count_ips(ALL_HOST) - ip_anne_list = [] - try: - for one_tag in l4_config.l4_zl_tag: - ips = ALL_HOST[one_tag] - ip_anne_list: List[Tuple[str, str, str]] = [] - for one_ip in ips: - host, port = split_maohao(one_ip["ip"]) - ip_anne_list.append((one_ip["id"], host, port)) - except (KeyError, TypeError): - pass - await get_read_ip(ip_anne_list) - - @tan_jian.handle() - async def _(matcher: Matcher): - msg = await get_tan_jian(ip_anne_list, 1) - await str_to_picstr(push_msg=msg, matcher=matcher) - - @prison.handle() - async def _(matcher: Matcher): - msg = await get_tan_jian(ip_anne_list, 2) - await str_to_picstr(push_msg=msg, matcher=matcher) - - @open_prison.handle() - async def _(matcher: Matcher): - msg = await get_tan_jian(ip_anne_list, 3) - await str_to_picstr(push_msg=msg, matcher=matcher) - - -async def get_read_ip(ip_anne_list: List[Tuple[str, str, str]]): - get_ip = on_command("云", aliases=server_key(), priority=50, block=True) - if not ip_anne_list: - ... - - @get_ip.handle() - async def _( - matcher: Matcher, - start: str = CommandStart(), - command: str = RawCommand(), - args: Message = CommandArg(), - ): - """例: - 指令: /橘5 - start: /(command开头指令) - command: /橘(响应的全部指令) - args: 5(响应的指令后的数字) - """ - print(start, command, args) - if start: - command = command.replace(start, "") - if command == "anne": - command = "云" - msg: str = args.extract_plain_text() - if "组" in msg: - logger.info(f"关键词:{command}") - # 以群组模式输出 - push_msg = await get_group_ip_to_msg(command) - if push_msg is None or not push_msg: - await matcher.finish("当前对象里并没有组") - print(push_msg) - msg_img = await server_group_ip_pic(push_msg) - await UniMessage.image(raw=msg_img).send() - else: - push_msg = await get_ip_to_mes(msg, command) - if push_msg is None: - push_msg = "" - if isinstance(push_msg, str): - await matcher.finish(push_msg) - # await UniMessage.image(raw=push_msg).send() - await push_msg.finish() - - -# tests = on_command("测试1") - -# @tests.handle() -# async def _(event: Event,arg:Message=CommandArg()): -# logger.info(event) -# logger.info(arg.extract_plain_text()) - - -@add_queries.handle() -async def _(matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg()): - msg = args.extract_plain_text() - if len(msg) == 0: - await matcher.finish("请在该指令后加入参数,例如【114.51.49.19:1810】") - [host, port] = split_maohao(msg) - group_id = event.group_id - msg = await add_ip(group_id, host, port) - await matcher.finish(msg) - - -@del_queries.handle() -async def _(event: GroupMessageEvent, matcher: Matcher, args: Message = CommandArg()): - msg = args.extract_plain_text() - if not msg.isdigit(): - await matcher.finish("请输入正确的序号数字") - group_id = event.group_id - msg = await del_ip(group_id, msg) - await matcher.finish(msg) - - -@show_queries.handle() -async def _(matcher: Matcher, event: GroupMessageEvent): - group_id = event.group_id - msg = await show_ip(group_id) - if not msg: - await matcher.finish("当前没有启动的服务器捏") - if isinstance(msg, str): - await matcher.finish(msg) - else: - await UniMessage.image(raw=msg).finish() - - -@queries_comm.handle() -async def _(matcher: Matcher, event: Event, keyword: str = Keyword()): - msg = event.get_plaintext() - - if not msg: - await matcher.finish("ip格式如中括号内【127.0.0.1】【114.51.49.19:1810】") - ip = msg.split(keyword)[-1].split("\r")[0].split("\n")[0].split(" ") - one_msg = None - for one in ip: - if one and one[-1].isdigit(): - one_msg = one - break - if not one_msg: - await matcher.finish() - ip_list = split_maohao(one_msg) - msg = await queries_server(ip_list) - await str_to_picstr(msg, matcher, keyword) - - -@join_server.handle() -async def _(args: Message = CommandArg()): - msg = args.extract_plain_text() - url = await get_number_url(msg) - await join_server.finish(url) - - -@rcon_to_server.handle() -async def _(matcher: Matcher, args: Message = CommandArg()): - msg = args.extract_plain_text() - if msg: - matcher.set_arg("command", args) - - -@rcon_to_server.got("command", prompt="请输入向服务器发送的指令") -async def _(matcher: Matcher, tag: str = ArgPlainText("command")): - tag = tag.strip() - msg = await command_server(tag) - try: - await mode_txt_to_img("服务器返回", msg) - except Exception as E: - await matcher.finish(str(E), reply_message=True) - - -async def init(): - global matchers - # print('启动辣') - - await get_des_ip() - - -@driver.on_startup -async def _(): - await init() - - -@add2_queries.handle() -async def _(matcher: Matcher, arg: Message = CommandArg()): - arg_list = arg.extract_plain_text().split(" ") - if len(arg_list) == 2: - tag, ip = arg_list - file_name = f"{tag}.json" - file_path = DATA_PATH.resolve() / "l4d2" / file_name - with file_path.open(mode="r", encoding="utf-8") as f: - tag_data: Dict[str, List[Dict[str, Union[int, str]]]] = json.load(f) - num_list: List[str] = [] - for one_number in tag_data[tag]: - num_list.append(str(one_number["id"])) - add_num = 0 - for i in range(1, 100): - if str(i) not in num_list: - add_num = i - break - continue - elif len(arg_list) == 3: - tag, add_num, ip = arg_list - file_name = f"{tag}.json" - file_path = DATA_PATH.resolve() / "l4d2" / file_name - with file_path.open(mode="r", encoding="utf-8") as f: - tag_data: Dict[str, List[Dict[str, Union[int, str]]]] = json.load(f) - else: - await matcher.finish("参数不正确") - if add_num != 0: - tag_data[tag].append({"id": int(add_num), "ip": ip}) - with file_path.open(mode="w", encoding="utf-8") as f: - json.dump(tag_data, f, ensure_ascii=False) - await matcher.finish( - f"""成功添加 - 指令: {tag}{add_num} - ip: {ip} - """, - ) - await matcher.finish("参数不正确") - - -@del2_queries.handle() -async def _(matcher: Matcher, arg: Message = CommandArg()): - arg_list = arg.extract_plain_text().split(" ") - if len(arg_list) == 2: - tag, num = arg_list - file_name = f"{tag}.json" - file_path = DATA_PATH.resolve() / "l4d2" / file_name - with file_path.open(mode="r", encoding="utf-8") as f: - tag_data: Dict[str, List[Dict[str, Union[int, str]]]] = json.load(f) - for one_number in tag_data[tag]: - if str(one_number["id"]) == num: - tag_data[tag].remove(one_number) - with file_path.open(mode="w", encoding="utf-8") as f: - json.dump(tag_data, f, ensure_ascii=False) - await matcher.finish(f"成功删除指令指令: {tag}") - await matcher.finish("参数不正确") diff --git a/old_project/l4d2_queries/local_ip.py b/old_project/l4d2_queries/local_ip.py deleted file mode 100644 index 6c5c321..0000000 --- a/old_project/l4d2_queries/local_ip.py +++ /dev/null @@ -1,43 +0,0 @@ -import json -from pathlib import Path -from typing import Dict, List - -filename = Path("data/L4D2/l4d2.json") -global_file = Path(Path(__file__).parent.parent, filename) - - -def load_ip_json(): - # 本地模块 - local_host: Dict[str, List[Dict[str, str]]] = {} - try: - # 获取所有json文件的路径 - json_files = Path("data/L4D2/l4d2").glob("*.json") - - # 将所有json文件中的字典对象合并为一个字典 - for file_path in json_files: - try: - with file_path.open("r", encoding="utf-8") as f: - local_host.update(json.load(f)) - except Exception: - print("导入错误", file_path) - except Exception: - pass - return local_host - - -def load_group_json(): - try: - group_host: Dict[str, List[str]] = json.load( - filename.open("r", encoding="utf8"), - ) - except (IOError, FileNotFoundError): - filename.parent.mkdir(parents=True, exist_ok=True) - data: Dict[str, List[str]] = {"anne": ["云"]} - with filename.open("w") as f: - json.dump(data, f) - group_host: Dict[str, List[str]] = {} - return group_host - - -ALL_HOST: Dict[str, List[Dict[str, str]]] = load_ip_json() -Group_All_HOST: Dict[str, List[str]] = load_group_json() diff --git a/old_project/l4d2_queries/qqgroup.py b/old_project/l4d2_queries/qqgroup.py deleted file mode 100644 index 244f5b8..0000000 --- a/old_project/l4d2_queries/qqgroup.py +++ /dev/null @@ -1,368 +0,0 @@ -import asyncio -import random -from pathlib import Path -from typing import Dict, List, Tuple - -import a2s -import aiofiles -from nonebot.log import logger - -from ..l4d2_data.serverip import L4D2Server -from ..l4d2_image import server_ip_pic -from ..l4d2_utils.classcal import ServerStatus -from ..l4d2_utils.message import KAILAO, PRISON, QUEREN -from ..l4d2_utils.utils import split_maohao -from .local_ip import ALL_HOST -from .utils import ( - msg_ip_to_list, - player_queries, - player_queries_anne_dict, - queries, - queries_dict, -) - -try: - import ujson as json -except ImportError: - import json -si = L4D2Server() -errors = ( - ConnectionRefusedError, - ConnectionResetError, - asyncio.exceptions.TimeoutError, - OSError, -) -# errors = (TypeError,KeyError,ValueError,ConnectionResetError,TimeoutError) - - -async def get_qqgroup_ip_msg(qqgroup): - """首先,获取qq群订阅数据,再依次queries返回ip原标""" - return await si.query_server_ip(qqgroup) - - -async def bind_group_ip(group: int, host: str, port: int): - ip_list = await si.query_server_ip(group) - if (host, port) in ip_list: - return "本群已添加过该ip辣" - await si.bind_server_ip(group, host, port) - return "绑定成功喵,新增ip" + host - - -async def del_group_ip(group: int, number: int): - number = int(number) - logger.info(number) - try: - groups, host, port = await si.query_number(number) - except TypeError: - return "没有这个序号哦" - if groups != group: - return "本群可没有订阅过这个ip" - await si.del_server_ip(number) - return "取消成功喵,已删除序号" + str(number) - - -async def qq_ip_queries(msg: List[tuple]): - """输入一个ip的二元元组组成的列表,返回一个输出消息的列表 - 未来作图这里重置""" - messsage = "" - for i in msg: - number, qqgroup, host, port = i - msg2 = await player_queries(host, port) - msg1 = await queries(host, port) - messsage += f"序号、{number!s} \n{msg1}{msg2}--------------------\n" - return messsage - - -async def qq_ip_querie(msg: List[Tuple[str, str, int]], igr: bool = True): - msg_list: List[ServerStatus] = [] - tasks = [] # 用来保存异步任务 - if msg != []: - for i in msg: - try: - number: str - host: str - port: int - qqgroup: str = "" # 初始化为"" - if len(i) == 3: - number, host, port = i - else: - number, qqgroup, host, port = i # type: ignore - # 将异步任务添加到任务列表中 - tasks.append( - asyncio.create_task( - process_message( - number, - host, - port, - msg_list, - igr, - qqgroup, - ), - ), - ) - except ValueError: - continue # 处理异常情况 - # 等待所有异步任务完成 - await asyncio.gather(*tasks) - # 对msg_list按照number顺序排序 - # msg_list.sort(key=lambda x: x["number"]) - send_list = sorted(msg_list, key=lambda x: int(x.number)) - result = {"msg_list": send_list} - - else: - result: Dict[str, List[ServerStatus]] = {} - return result - - -async def qq_ip_queries_pic(msg: list, igr=False): - result = await qq_ip_querie(msg, igr) - if "msg_list" in result: - pic = await server_ip_pic(result["msg_list"]) - else: - pic = None - return pic - - -async def process_message( - number: str, - host: str, - port: int, - msg_list: List[ServerStatus], - igr: bool, - qqgroup: str = "", -): - try: - msg2 = await player_queries_anne_dict(host, port) - msg1 = await queries_dict(host, port) - msg3 = await server_rule_dict(host, port) - msg1.update( - { - "Players": msg2, - "number": number, - }, - ) - msg1.update(msg3) - if qqgroup: - msg1.update({"tag": qqgroup}) - msg_list.append(ServerStatus(**msg1)) - except errors: - if igr: - pass - else: - # 空白字典 - null_dict = ServerStatus(number=number) - msg_list.append(null_dict) - - -async def process_mode_1(host: str, port: int): - """探监逻辑""" - msg2 = await player_queries_anne_dict(host, port) - point = sum(int(i["Score"]) for i in msg2) - logger.info(point) - msg1 = await queries_dict(host, port) - sp = msg1["name"] - if "特" not in sp or "HT" in sp: - return None - - sps = int(sp.split("特")[0].split("[")[-1]) - points = point / 4 - if points / sps < 10: - return None - - msg1.update({"Players": msg2, "ranks": point, "ips": f"{host}:{port}"}) - return msg1 - - -async def process_mode_2(host: str, port: int): - """坐牢逻辑""" - msg1 = await queries_dict(host, port) - if "普通药役" in msg1["name"] and "缺人" in msg1["name"]: - msg2 = await player_queries_anne_dict(host, port) - msg1.update({"Players": msg2, "ips": f"{host}:{port}"}) - return msg1 - return None - - -async def process_mode_3(host: str, port: int): - """开牢逻辑""" - msg1 = await queries_dict(host, port) - if "[" not in msg1["name"]: - msg2 = await player_queries_anne_dict(host, port) - msg1.update({"Players": msg2, "ips": f"{host}:{port}"}) - return msg1 - return None - - -async def get_tan_jian(msg: List[Tuple[int, str, int]], mode: int): - """获取anne列表抽一个""" - msg_list = [] - random.shuffle(msg) - for _number, host, port in msg: - if mode == 1: - result = await process_mode_1(host, port) - if result: - msg_list.append(result) - elif mode == 2: - result = await process_mode_2(host, port) - if result: - msg_list.append(result) - elif mode == 3: - result = await process_mode_3(host, port) - if result: - msg_list.append(result) - if msg_list != []: - break - # 随机选一个牢房 - logger.info(msg_list) - if len(msg_list) == 0: - return "暂时没有这种牢房捏" - logger.info(len(msg_list)) - mse = msg_list[0] - message: str = "" - if mode == 1: - ranks = mse["ranks"] - if ranks <= 300: - message = random.choice(PRISON[1]) - if 300 < ranks <= 450: - message = random.choice(PRISON[2]) - if ranks > 450: - message = random.choice(PRISON[3]) - if mode == 2: - player_point = mse["players"] - if player_point == "1": - message = random.choice(QUEREN[1]) - elif player_point == "2": - message = random.choice(QUEREN[2]) - elif player_point == "3": - message = random.choice(QUEREN[3]) - else: - message = random.choice(QUEREN[4]) - if mode == 3: - message = random.choice(KAILAO) - message += f"\n名称:{mse['name']}\n" - message += f"地图:{mse['map_']} \n" - message += f"玩家:{mse['players']} / {mse['max_players']}\n" - try: - message += await msg_ip_to_list(mse["Players"]) - except KeyError: - message += "服务器里,是空空的呢\n" - return message - - -async def get_server_ip(number): - group, host, port = await si.query_number(number) - try: - return str(host) + ":" + str(port) - except TypeError: - return None - - -async def write_json(data_str: str): - """ - 添加数据或者删除数据 - - 【求生更新 添加 腐竹 ip 模式 序号】 - - 【求生更新 添加 腐竹 ip 模式】 - - 【求生更新 删除 腐竹 序号】 - """ - data_list = data_str.split() - logger.info(data_list) - if data_list[0] == "添加": - add_server = {} - server_dict = ALL_HOST.get(data_list[1], {}) - if not server_dict: - logger.info("新建分支") - ALL_HOST[data_list[1]] = [] - for key, value in ALL_HOST.items(): - if data_list[1] == key: - ids = [server["id"] for server in value] - # 序号 - if len(data_list) == 4: - data_num = int(max(ids, default=0)) + 1 - add_server.update({"id": data_num}) - elif len(data_list) == 5: - if not data_list[4].isdigit(): - return "请输入【求生更新 添加 腐竹 ip 模式 序号】" - data_num = int(data_list[4]) - if data_num in ids: - return "该序号已存在,请删除原序号【求生更新 删除 腐竹 序号】" - add_server.update({"id": data_num}) - else: - return "请输入【求生更新 添加 腐竹 ip 模式 序号】" - # 模式,ip - try: - host, port = split_maohao(data_list[2]) - add_server.update({"host": host, "port": port}) - except KeyError: - return "ip格式不正确【114.11.4.514:9191】" - add_server.update({"version": data_list[3]}) - value.append(add_server) - ALL_HOST[key] = value - with Path("data/L4D2/l4d2.json").open("w", encoding="utf8") as f_new: - json.dump(ALL_HOST, f_new, ensure_ascii=False, indent=4) - return f"添加成功,指令为{key}{data_num}" - return None - - if data_list[0] == "删除": - for key, value in ALL_HOST.items(): - if data_list[1] == key: - try: - data_num = int(data_list[2]) - except ValueError: - return "序号应该为大于0的正整数,请输入【求生更新 删除 腐竹 序号】" - for i, server in enumerate(value): - if data_num == server["id"]: - value.pop(i) - if not value: - ALL_HOST.pop(key) - async with aiofiles.open( - "data/L4D2/l4d2.json", - "w", - encoding="utf8", - ) as f_new: - await f_new.write( - json.dumps(ALL_HOST, ensure_ascii=False, indent=4), - ) - await f_new.flush() - return "删除成功喵" - return "序号不正确,请输入【求生更新 删除 腐竹 序号】" - return "腐竹名不存在,请输入【求生更新 删除 腐竹 序号】" - return None - - -async def add_ip(group_id, host, port): - """先查找是否存在,如果不存在则创建""" - return await bind_group_ip(group_id, host, port) - - -async def del_ip(group_id, number): - """删除群ip""" - return await del_group_ip(group_id, number) - - -async def show_ip(group_id): - """先查找群ip,再根据群ip返回""" - data_list = await get_qqgroup_ip_msg(group_id) - logger.info(data_list) - if len(data_list) == 0: - return "本群没有订阅" - return await qq_ip_queries_pic(data_list) - - -async def get_number_url(number): - ip = await get_server_ip(number) - if not ip: - return "该序号不存在" - return f"connect {ip}" - - -async def server_rule_dict(ip: str, port: int): - port = int(port) - ip = str(ip) - msg_dict = {} - # message_dict = await l4d(ip,port) - try: - msg: dict = await a2s.arules((ip, port)) # type: ignore - msg_dict["tick"] = msg["l4d2_tickrate_enabler"] + "tick" - except Exception: - msg_dict["tick"] = "" - return msg_dict diff --git a/old_project/l4d2_queries/send_msg.py b/old_project/l4d2_queries/send_msg.py deleted file mode 100644 index 6d6660c..0000000 --- a/old_project/l4d2_queries/send_msg.py +++ /dev/null @@ -1,135 +0,0 @@ -import asyncio -from typing import Dict, List, Optional, Tuple - -from nonebot.log import logger -from nonebot_plugin_alconna.uniseg import UniMessage - -from ..l4d2_queries.local_ip import ALL_HOST -from ..l4d2_queries.qqgroup import qq_ip_queries_pic -from ..l4d2_queries.utils import get_anne_server_ip, json_server_to_tag_dict -from ..l4d2_utils.classcal import ServerGroup, ServerStatus -from ..l4d2_utils.utils import split_maohao -from .local_ip import Group_All_HOST -from .qqgroup import qq_ip_querie - - -async def get_ip_to_mes(msg: str, command: str = ""): - if not msg: - # 以图片输出全部当前 - igr = False - # if command in gamemode_list: - # this_ips = [ - # d for l in ALL_HOST.values() for d in l if d.get("version") == command - # ] - # igr = True - # else: - this_ips = ALL_HOST[command] - ip_list: List[Tuple[str, str, str]] = [] - for one_ip in this_ips: - host, port = split_maohao(one_ip["ip"]) - msg_tuple = (one_ip["id"], host, port) - ip_list.append(msg_tuple) - img = await qq_ip_queries_pic(ip_list, igr) - return UniMessage.image(raw=img) if img else None - - if not msg[0].isdigit(): - # if any(mode in msg for mode in gamemode_list): - # pass - # else: - return None - message = await json_server_to_tag_dict(command, msg) - if len(message) == 0: - # 关键词不匹配,忽略 - return None - ip = str(message["ip"]) - logger.info(ip) - - try: - msg_send: Optional[str] = await get_anne_server_ip(ip) - if msg_send is not None: - return msg_send - - except (OSError, asyncio.exceptions.TimeoutError): - return "服务器无响应" - - -# async def get_read_group_ip(): -# """输出群组服务器""" -# get_grou_ip = on_command("anne", aliases=group_key(), priority=80, block=True) - -# @get_grou_ip.handle() -# async def _( -# matcher: Matcher, -# start: str = CommandStart(), -# command: str = RawCommand(), -# args: Message = CommandArg(), -# ): -# if start: -# command = command.replace(start, "") -# msg: str = args.extract_plain_text() -# push_msg = await get_group_ip_to_msg(msg, command) -# msg_img = await server_group_ip_pic(push_msg) -# if isinstance(push_msg, bytes): -# await MessageFactory([Image(push_msg)]).finish() -# elif msg and type(push_msg) == list: -# await MessageFactory([Image(push_msg[0]), Text(push_msg[-1])]).finish() -# elif msg and isinstance(push_msg, str): -# await str_to_picstr(push_msg, matcher) -# await matcher.finish() - - -async def get_group_ip_to_msg(command: str): - """输出群组ip的dict信息""" - if command in Group_All_HOST: - group_tag_list: List[str] = Group_All_HOST[command] - elif command in ALL_HOST: - group_tag_list = [command] - else: - return None - logger.info(f"组内关键词{group_tag_list}") - group_ip_dict: Dict[str, List[Dict[str, str]]] = {} - tag = len(group_tag_list) == 0 - return_list: List[ServerGroup] = [] - - for id_number, tag in enumerate(ALL_HOST): - one_group = ALL_HOST[tag] - if tag in group_tag_list and tag: - group_ip_dict.update({tag: one_group}) - ip_tuple_list: List[Tuple[str, str, int]] = [] - for one_server in one_group: - number = one_server["id"] - host, port = split_maohao(one_server["ip"]) - ip_tuple_list.append((number, host, int(port))) - msg_group_server = await qq_ip_querie(ip_tuple_list) - return_list.append( - await check_group_msg(msg_group_server, id_number, command), - ) - - return return_list - # 还没写完 - # host, port = split_maohao(one_ip["ip"]) - # msg_tuple = (one_ip["id"], host, port) - # ip_list.append(msg_tuple) - # img = await qq_ip_queries_pic(ip_list, igr) - - -async def check_group_msg( - msg: Dict[str, List[ServerStatus]], - number: int, - command: str, -): - server_info = ServerGroup() - server_info.server_id = number - - for server_group in msg["msg_list"]: - # 服务器,服务器玩家数量 - # 当前/总数 - server_info.server_tag = command - if server_group.name == "null": - server_info.server_all_number += 1 - continue - server_info.server_all_number += 1 - server_info.server_number += 1 - server_info.server_people += server_group.players - server_info.server_all_people += server_group.max_players - return server_info diff --git a/old_project/l4d2_queries/utils.py b/old_project/l4d2_queries/utils.py deleted file mode 100644 index 79933d8..0000000 --- a/old_project/l4d2_queries/utils.py +++ /dev/null @@ -1,212 +0,0 @@ -# -*- coding: utf-8 -*- -import random -import struct -from typing import List - -import a2s -from nonebot.log import logger -from pydantic import BaseModel - -from ..l4d2_utils.classcal import PlayerInfo -from ..l4d2_utils.config import l4_config -from ..l4d2_utils.txt_to_img import mode_txt_to_img -from ..l4d2_utils.utils import split_maohao -from .local_ip import ALL_HOST, Group_All_HOST - - -class GROUPMSG(BaseModel): - tag: str - online_server: int - empty_server: int - full_server: int - max_server: int - - online_player: int - max_player: int - - def __str__(self) -> str: - return f"""组:{self.tag} - 在线服务器:{self.online_server}/{self.max_server} - 空服务器:{self.empty_server}/{self.max_server} - 在线玩家数量:{self.online_player}/{self.max_player}""" - - -async def queries_server(msg: list) -> str: - """查询ip返回信息""" - ip = msg[0] - port = msg[1] - msgs = "" - try: - msgs = await queries(ip, port) - msgs += await player_queries(ip, port) - except (struct.error, TimeoutError): - pass - # except Exception: - # msgs = '有无法识别的用户名' - # return msgs - return msgs - - -async def get_anne_server_ip(ip: str, ismsg: bool = False): - """输出查询ip和ping""" - if ismsg: - ... - host, port = split_maohao(ip) - data = await queries_server([host, port]) - if l4_config.l4_image: - await mode_txt_to_img( - data.split("\n")[0], - data.replace(data.split("\n")[0], f"\nconnect {ip}"), - ) - return None - data += f"\nconnect {ip}" - return data - - -async def json_server_to_tag_dict(key: str, msg: str): - """ - l4d2字典转tag的dict结果 - - 1、先匹配腐竹 - - 2、匹配数字(几服),没有参数则从结果里随机返回一个 - """ - data_dict = {} - msg = msg.replace(" ", "") - # 腐竹循环 - for tag, value in ALL_HOST.items(): - value: List[dict] - if tag == key: - data_dict.update({"server": tag}) - if not msg: - # 腐竹 - data_dict.update(random.choice(value)) - elif msg.isdigit(): - logger.info("腐竹 + 序号") - for server in value: - if msg == str(server["id"]): - data_dict.update(server) - break - - return data_dict - - -async def player_queries_anne_dict(ip: str, port: int): - """anne算法返回玩家""" - port = int(port) - # message_dic = await l4d2.APlayer(ip,port,times=5) - message_list: List[a2s.Player] = await a2s.aplayers((ip, port)) # type: ignore - msg_list: List[PlayerInfo] = [] - if message_list != []: - for i in message_list: - msg_list.append( - PlayerInfo( - name=i.name, - Score=i.score, - Duration=await convert_duration(i.duration), - ), - # { - # "name": i.name, - # "Score": i.score, - # "Duration": await convert_duration(i.duration), - # }, - ) - return msg_list - - -async def player_queries(ip: str, port: int): - port = int(port) - message_list = await player_queries_anne_dict(ip, port) - return await msg_ip_to_list(message_list) - - -async def msg_ip_to_list(message_list: List[PlayerInfo]): - message = "" - n = 0 - if message_list == []: - message += "服务器里,是空空的呢\n" - else: - max_duration_len = max([len(str(i.Duration)) for i in message_list]) - max_score_len = max([len(str(i.Score)) for i in message_list]) - for i in message_list: - n += 1 - name = i.name - score = i.Score - if score == "0": - score = "摸" - duration = i.Duration - soc = "[{:>{}}]".format(score, max_score_len) - dur = "{:^{}}".format(duration, max_duration_len) - message += f"{soc} | {dur} | {name} \n" - return message - - -async def convert_duration(duration: float) -> str: - minutes, seconds = divmod(duration, 60) - hours, minutes = divmod(minutes, 60) - time_str = "" - if hours > 0: - time_str += f"{int(hours)}h " - if minutes > 0 or hours > 0: - time_str += f"{int(minutes)}m " - time_str += f"{int(seconds)}s" - return time_str.strip() - - -async def queries(ip: str, port: int): - port = int(port) - msg_dict = await queries_dict(ip, port) - message = f"名称:{msg_dict['name']}\n" - message += f"地图:{msg_dict['map_']}\n" - message += f"延迟:{msg_dict['ping']}\n" - message += f"玩家:{msg_dict['players']} / {msg_dict['max_players']}\n" - return message - - -async def queries_dict(ip: str, port: int) -> dict: - port = int(port) - ip = str(ip) - msg_dict = {} - # message_dict = await l4d(ip,port) - msg: a2s.SourceInfo = await a2s.ainfo((ip, port)) # type: ignore - msg_dict["folder"] = msg.folder - msg_dict["name"] = msg.server_name - msg_dict["map_"] = msg.map_name - msg_dict["players"] = msg.player_count - msg_dict["max_players"] = msg.max_players - msg_dict["rank_players"] = f"{msg.player_count}/{msg.max_players}" - msg_dict["ip"] = str(ip) + ":" + str(port) - msg_dict["ping"] = f"{msg.ping * 1000:.0f}ms" - msg_dict["system"] = f"{msg.platform}.svg" - if msg_dict["players"] < msg_dict["max_players"]: - msg_dict["enabled"] = True - else: - msg_dict["enabled"] = False - return msg_dict - - -def server_key(): - """响应的服务器开头""" - a = set() - for tag1, _value in ALL_HOST.items(): - try: - a.add(tag1) - except AttributeError: - a.add("希腊那我从来没有想过这个事情") - return a - - -def group_key(): - """响应群组服务器开头""" - a = set() - for tag1, _value in Group_All_HOST.items(): - try: - a.add(tag1) - except AttributeError: - a.add("希腊那我从来没有想过这个事情") - return a - - -async def auto_id(msg_list: List[int]): - for one in range(1, 100): - if one not in msg_list: - return one - return 0 diff --git a/old_project/l4d2_server/__init__.py b/old_project/l4d2_server/__init__.py deleted file mode 100644 index edcac19..0000000 --- a/old_project/l4d2_server/__init__.py +++ /dev/null @@ -1,119 +0,0 @@ -from typing import List, Union - -from nonebot import on_command -from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent, Message -from nonebot.log import logger -from nonebot.matcher import Matcher -from nonebot.params import Arg, ArgPlainText, CommandArg -from nonebot.typing import T_State -from nonebot_plugin_alconna.uniseg import UniMessage - -from ..l4d2_file.utils import all_zip_to_one -from ..l4d2_image.vtfs import img_to_vtf -from ..l4d2_utils.steam import url_to_byte -from ..l4d2_utils.utils import upload_file -from .workshop import workshop_msg - -# 下载内容 -up_workshop = on_command( - "workshop", - aliases={"创意工坊下载", "求生创意工坊"}, - priority=20, - block=True, -) -vtf_make = on_command("vtf_make", aliases={"求生喷漆"}, priority=20, block=True) - - -@up_workshop.handle() -async def _(matcher: Matcher, args: Message = CommandArg()): - msg = args.extract_plain_text().strip() - if msg: - matcher.set_arg("ip", args) - - -@up_workshop.got("ip", prompt="请输入创意工坊网址或者物品id") -async def _(matcher: Matcher, state: T_State, tag: str = ArgPlainText("ip")): - # 这一部分注释类型有大问题,反正能跑就不改了 - msg = await workshop_msg(tag) - if not msg: - await matcher.finish("没有这个物品捏") - elif isinstance(msg, dict): - pic = await url_to_byte(msg["图片地址"]) - if not pic: - return - message: str = "" - for item, value in msg.items(): - if item in ["图片地址", "下载地址", "细节"] or not isinstance(item, str): - continue - message += f"{item} : {value}\n" - message += "如果需要上传,请发送 'yes'" - state["dic"] = msg - await UniMessage.text(message).image(raw=pic).finish() - - elif isinstance(msg, list): - lenge = len(msg) - pic = await url_to_byte(msg[0]["图片地址"]) # type: ignore - message: str = f"有{lenge}个文件\n" - ones = [] - for one in msg: - for item, value in one.items(): - if item in ["图片地址", "下载地址", "细节"]: - continue - message += f"{item}:{value}\n" - ones.append(one) - state["dic"] = ones - - -@up_workshop.got("is_sure") -async def _(matcher: Matcher, bot: Bot, event: GroupMessageEvent, state: T_State): - is_sure = str(state["is_sure"]) - if is_sure == "yes": - data_dict: Union[dict, List[dict]] = state["dic"] - logger.info("开始上传") - if isinstance(data_dict, dict): - data_file = await url_to_byte(data_dict["下载地址"]) - if not data_file: - return - file_name = data_dict["名字"] + ".vpk" - await matcher.send("获取地址成功,尝试上传") - await upload_file(bot, event, data_file, file_name) - else: - data_file_list = [] - for data_one in data_dict: - data_file = await url_to_byte(data_one["下载地址"]) - data_file_list.append(data_file) - if not data_file: - return - file_name = data_one["名字"] + ".vpk" - await all_zip_to_one(data_file_list) - await upload_file(bot, event, data_file, file_name) - else: - await matcher.finish("已取消上传") - - -@vtf_make.handle() -async def _(matcher: Matcher, state: T_State, args: Message = CommandArg()): - msg: str = args.extract_plain_text() - if msg not in ["拉伸", "填充", "覆盖", ""]: - await matcher.finish("错误的图片处理方式") - if msg == "": - msg = "拉伸" - state["way"] = msg - logger.info("方式", msg) - - -@vtf_make.got("image", prompt="请发送喷漆图片") -async def _(bot: Bot, event: GroupMessageEvent, state: T_State, tag=Arg("image")): - pic_msg: GroupMessageEvent = state["image"][0] - pic_url = pic_msg.dict()["data"]["url"] - logger.info(pic_url) - logger.info(type(pic_url)) - tag = state["way"] - pic_bytes = await url_to_byte(pic_url) - if not pic_bytes: - return - img_io = await img_to_vtf(pic_bytes, tag) - img_bytes = img_io.getbuffer() - usr_id = event.get_user_id() - file_name: str = str(usr_id) + ".vtf" - await upload_file(bot, event, img_bytes, file_name) diff --git a/old_project/l4d2_server/index.py b/old_project/l4d2_server/index.py deleted file mode 100644 index e69de29..0000000 diff --git a/old_project/l4d2_server/rcon.py b/old_project/l4d2_server/rcon.py deleted file mode 100644 index dad1dd3..0000000 --- a/old_project/l4d2_server/rcon.py +++ /dev/null @@ -1,53 +0,0 @@ -import asyncio -from pathlib import Path - -import aiofiles -from rcon.source import rcon - -from ..l4d2_utils.config import CHECK_FILE, l4_config - - -async def rcon_server(password: str, msg: str): - try: - return await asyncio.wait_for( - rcon( - command=msg, - host=l4_config.l4_ipall[CHECK_FILE]["host"], - port=l4_config.l4_ipall[CHECK_FILE]["port"], - passwd=password, - ), - timeout=30, - ) - except asyncio.TimeoutError: - return "超时" - - -async def read_server_cfg_rcon(): - """如果没有输入rcon,尝试自动获取""" - if not l4_config.l4_ipall[CHECK_FILE]["rcon"]: - cfg_server = Path( - l4_config.l4_ipall[CHECK_FILE]["location"], - "left4dead2/cfg/server.cfg", - ) - async with aiofiles.open(cfg_server, "r") as cfg: - content: str = await cfg.read() - lines = content.split("\n") - for line in lines: - if line.startswith("rcon_password"): - password = line.split(" ")[-1] - return password.strip('"') - return l4_config.l4_ipall[CHECK_FILE]["rcon"] - - -async def rcon_command(rcon, cmd): - return await rcon_server(rcon, cmd.strip()) - - -async def command_server(msg: str): - rcon = await read_server_cfg_rcon() - msg = await rcon_command(rcon, msg) - if not msg: - msg = "你可能发送了一个无用指令,或者换图导致服务器无响应" - elif msg.startswith("Unknown command"): - msg = "无效指令:" + msg.replace("Unknown command", "").strip() - return msg.strip().replace("\n", "") diff --git a/old_project/l4d2_server/workshop.py b/old_project/l4d2_server/workshop.py deleted file mode 100644 index 22ffda6..0000000 --- a/old_project/l4d2_server/workshop.py +++ /dev/null @@ -1,80 +0,0 @@ -from typing import Dict, List, Union - -import httpx -from nonebot.log import logger - -try: - import ujson as json -except ImportError: - import json - - -async def workshop_to_dict(msg: str): - """把创意工坊的id,转化为信息字典""" - i = await api_get_json(msg) - - # 处理是否是多地图文件 - if i["file_url"] == i["preview_url"]: - return await primary_map(i) # type: ignore - return await only_map(i) - - -async def api_get_json(msg: str): - url_search = "https://db.steamworkshopdownloader.io/prod/api/details/file" - # data = {msg: ""} - data = [int(msg)] - headers = { - "Accept": "application/json, text/plain, */*", - "Accept-Encoding": "gzip, deflate, br", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", - "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8", - "Origin": "https://steamworkshopdownloader.io", - "Referer": "https://steamworkshopdownloader.io/", - "Sec-Ch-Ua": '"Not/A)Brand";v="99", "Microsoft Edge";v="115", "Chromium";v="115"', - "Sec-Ch-Ua-Mobile": "?0", - "Sec-Ch-Ua-Platform": '"Windows"', - "Sec-Fetch-Dest": "empty", - "Sec-Fetch-Mode": "cors", - "Sec-Fetch-Site": "same-site", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36 Edg/115.0.1901.188", - } - async with httpx.AsyncClient() as client: - response = await client.post(url_search, headers=headers, json=data) - data_msg = response.content.decode("utf-8", errors="ignore") - logger.info(response.status_code) - logger.info(data_msg) - return json.loads(data_msg[1:-1]) - - -async def only_map(i: Dict[str, str]): - """单地图下载""" - out: Dict[str, str] = {} - out["名字"] = i["title"] - out["游戏"] = i["app_name"] - out["下载地址"] = i["file_url"] - out["图片地址"] = i["preview_url"] - out["细节"] = i["file_description"] - return out - - -async def primary_map(i: Dict[str, List[Dict[str, str]]]): - """主地图返回多地图参数""" - map_list: List[Union[Dict[str, List[Dict[str, str]]], Dict[str, str]]] = [] - map_list.append(i) - for one in i["children"]: - map_list.append(await api_get_json(one["publishedfileid"])) - return map_list - - -async def workshop_msg(msg: str): - """url变成id,拼接post请求""" - if msg.startswith("https://steamcommunity.com/sharedfiles/filedetails/?id"): - if "&" in msg: - msg = msg.split("&")[0] - else: - pass - msg = msg.replace("https://steamcommunity.com/sharedfiles/filedetails/?id=", "") - if msg.isdigit(): - data: Union[dict, List[dict]] = await workshop_to_dict(msg) - return data - return None diff --git a/old_project/l4d2_utils/classcal.py b/old_project/l4d2_utils/classcal.py deleted file mode 100644 index 0e0b902..0000000 --- a/old_project/l4d2_utils/classcal.py +++ /dev/null @@ -1,53 +0,0 @@ -from typing import List - -from pydantic import BaseModel - - -class PlayerInfo(BaseModel): - """读取玩家信息""" - - name: str = "" - Score: int = 0 - Duration: str = "" - - -class ServerStatus(BaseModel): - """单服务器查询信息""" - - name: str = "null" - """服务器名称""" - map_: str = "null" - """服务器地图""" - players: int = 0 - """当前玩家数量""" - Players: List[PlayerInfo] = [] - """当前玩家信息""" - max_players: int = 0 - """最大玩家数量""" - rank_players: str = "null" - """玩家数量对比""" - ping: str = "null" - """服务器延迟""" - number: str = "null" - """服务器序号""" - ip: str = "null" - """服务器ip""" - system: str = "m.svg" - """服务器系统,["l","w","m"]""" - - -class ServerGroup(BaseModel): - """组服务器信息""" - - server_id: int = 0 - """服务器序号""" - server_tag: str = "服务器" - """服务器组名字""" - server_number: int = 0 - """群组当期启动服务器数量""" - server_all_number: int = 0 - """群组总服务器数量""" - server_people: int = 0 - """群组当前在线人数""" - server_all_people: int = 0 - """群组当前总容纳人数""" diff --git a/old_project/l4d2_utils/command.py b/old_project/l4d2_utils/command.py deleted file mode 100644 index 2a6f407..0000000 --- a/old_project/l4d2_utils/command.py +++ /dev/null @@ -1,23 +0,0 @@ -from nonebot import on_command - -from .config import MASTER - -help_ = on_command("l4_help", aliases={"求生帮助"}, priority=20, block=True) - - -connect_rcon = on_command( - "Rrcon", - aliases={"求生连接", "求生链接", "求生rcon"}, - priority=50, - block=False, -) -end_connect = ["stop", "结束", "连接结束", "结束连接"] -search_api = on_command( - "search", - aliases={"求生三方"}, - priority=20, - block=True, - permission=MASTER, -) -# which_map = on_keyword("是什么图"), priority=20, block=False) -reload_ip = on_command("l4_reload", aliases={"重载ip"}, priority=30, permission=MASTER) diff --git a/old_project/l4d2_utils/config.py b/old_project/l4d2_utils/config.py deleted file mode 100644 index a4fa5e2..0000000 --- a/old_project/l4d2_utils/config.py +++ /dev/null @@ -1,203 +0,0 @@ -import platform -from pathlib import Path -from typing import Any, Dict, List - -from nonebot import get_driver -from nonebot.adapters.onebot.v11.permission import ( - GROUP_ADMIN, - GROUP_OWNER, - PRIVATE_FRIEND, -) -from nonebot.permission import SUPERUSER -from pydantic import BaseModel, Field -from ruamel import yaml - -file_format = (".vpk", ".zip", ".7z", "rar") -# 权限 - -driver = get_driver() -COMMAND_START = list(driver.config.command_start) - -headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:107.0) Gecko/20100101 Firefox/107.0", # noqa: E501 -} - -try: - NICKNAME: str = next(iter(driver.config.nickname)) -except Exception: - NICKNAME = "bot" -CHECK_FILE: int = 0 - - -re_master = SUPERUSER | GROUP_OWNER -MASTER = SUPERUSER | GROUP_ADMIN | GROUP_OWNER -ADMINISTRATOR = SUPERUSER | GROUP_ADMIN | GROUP_OWNER | PRIVATE_FRIEND -# file 填写求生服务器所在路径 - -DATA_PATH = Path() / "data" / "L4D2" -CONFIG_PATH = DATA_PATH / "l4d2.yml" -CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) - - -class L4d2GroupConfig(BaseModel): - enable: bool = Field(True, alias="是否启用求生功能") - map_master: List[str] = Field([], alias="分群地图管理员") - - def update(self, **kwargs): - for key, value in kwargs.items(): - if key in self.__fields__: - self.__setattr__(key, value) - - -class L4d2Config(BaseModel): - total_enable: bool = Field(True, alias="是否全局启用求生功能") - l4_image: bool = Field(False, alias="是否启用图片") - web_username: str = Field("l4d2", alias="后台管理用户名") - web_password: str = Field("admin", alias="后台管理密码") - l4_style: str = Field("standard", alias="图片风格") - l4_img_name: int = Field(4, alias="图片显示玩家数量") - l4_ipall: List[Dict[str, Any]] = Field( - [ - { - "id_rank": "1", - "place": False, - "location": "C:\\l4d2", - "host": "127.0.0.1", - "port": "20715", - "rcon": "114514", - "server_id": "本地地图", - }, - { - "id_rank": "2", - "place": True, - "location": "/home/unbuntu/coop", - "host": "11.4.51.4", - "port": "20715", - "rcon": "9191810", - "account": "root", - "password": "114514", - "server_id": "远程地图", - }, - ], - alias="l4服务器ip集合", - ) - l4_zl_tag: List[str] = Field(["云"], alias="坐牢三指令") - l4_number: int = Field(1, alias="第几个地图路径") - web_secret_key: str = Field( - "49c294d32f69b732ef6447c18379451ce1738922a75cd1d4812ef150318a2ed0", - alias="后台管理token密钥", - ) - l4_master: List[str] = Field(default=["114514919"], alias="求生地图全局管理员qq") - # l4_ip:bool = Field(False, alias='查询地图是否显示ip') - l4_font: str = Field(default="simsun.ttc", alias="字体") - l4_only: bool = Field(default=False, alias="下载地图是是否阻碍其他指令") - l4_push_interval: int = Field(default=3, alias="定时任务间隔") - l4_push_times: int = Field(default=10, alias="定时任务次数") - l4_connect: bool = Field(default=True, alias="是否在查服命令后加入connect ip") - l4_group_upload: bool = Field( - default=False, - alias="是否在群里传地图的时候,提示上传服务器", - ) - group_config: Dict[int, L4d2GroupConfig] = Field( - default_factory=dict, - alias="分群配置", - ) - - def update(self, **kwargs): - for key, value in kwargs.items(): - if key in self.__fields__: - self.__setattr__(key, value) - - -class L4d2ConfigManager: - def __init__(self): - self.file_path = CONFIG_PATH - if self.file_path.exists(): - self.config = L4d2Config.parse_obj( - yaml.load( - self.file_path.read_text(encoding="utf-8"), - Loader=yaml.Loader, - ), - ) - else: - self.config = L4d2Config() # type: ignore - self.save() - - def get_group_config(self, group_id: int) -> L4d2GroupConfig: - if group_id not in self.config.group_config: - self.config.group_config[group_id] = L4d2GroupConfig() # type: ignore - self.save() - return self.config.group_config[group_id] - - @property - def config_list(self) -> List[str]: - return list(self.config.dict(by_alias=True).keys()) - - def save(self): - with self.file_path.open("w", encoding="utf-8") as f: - yaml.dump( - self.config.dict(by_alias=True), - f, - indent=2, - Dumper=yaml.RoundTripDumper, - allow_unicode=True, - ) - - -# 参数设置为全局变量 -global config_manager, l4_config -config_manager = L4d2ConfigManager() -l4_config = config_manager.config - - -class UserModel(BaseModel): - username: str - password: str - - -""" -地图路径 -""" -vpk_path = "left4dead2/addons" - - -PLAYERSDATA = Path() / "data/L4D2/image/players" -"""用户数据路径""" -TEXT_PATH = Path(__file__).parent.parent / "data/L4D2/image" -"""图片存储路径""" -TEXT_XPATH = Path() / "data/L4D2/image" -"""内置图片路径""" - - -PLAYERSDATA = Path() / "data/L4D2/sql" -"""数据库路径""" -DATASQLITE = Path().parent.parent / "data/L4D2/sql/L4D2.db" -"""数据库!""" - -table_data = ["L4d2_players", "L4D2_server"] -"""数据库表""" -L4d2_players_tag = ["qq", "nickname", "steamid"] -"""数据库表1""" -L4d2_server_tag = ["id", "qqgroup", "host", "port", "rcon"] -"""数据库表2""" -L4d2_INTEGER = ["id", "qq", "qqgroup", "port"] -"""INITEGER的表头""" -L4d2_TEXT = ["nickname", "steamid", "host", "rcon", "path"] -"""TEXT的表头""" -L4d2_BOOLEAN = ["use"] -"""BOOLEAN的表头""" - -tables_columns = {table_data[0]: L4d2_players_tag, table_data[1]: L4d2_server_tag} - -# 求生anne服务器 -anne_url = "https://sb.trygek.com/" - - -# 系统 -if platform.system() == "Windows": - systems: str = "win" -elif platform.system() == "Linux": - systems: str = "linux" -else: - systems: str = "others" -ANNE_IP: Dict[str, List[Dict[str, str]]] = {} diff --git a/old_project/l4d2_utils/message.py b/old_project/l4d2_utils/message.py deleted file mode 100644 index 374d400..0000000 --- a/old_project/l4d2_utils/message.py +++ /dev/null @@ -1,59 +0,0 @@ -# 坐牢 -PRISON = { - 1: [ - "⭐低级牢房", - "⭐牢动最光荣", - "⭐能者多牢", - "⭐年年有牢", - ], - 2: [ - "⭐⭐中级牢房", - "⭐⭐牢不可破", - "⭐⭐牢不可破联盟", - "⭐⭐牢底坐穿联盟", - "⭐⭐牢饭恰饱联盟", - "⭐⭐个个都是人才说话又好听", - "⭐⭐来点狱卒", - ], - 3: [ - "⭐⭐⭐曼德拉的前半生", - "⭐⭐⭐肖申克的救赎", - "⭐⭐⭐甲级战犯", - "⭐⭐⭐刘培强正在尝试逃离冷冻仓", - ], -} - -QUEREN = { - 1: [ - "📞📞1Q3速来坐牢", - "📞📞📞发起了anne坐牢", - "📞📞📞摇人摇人,一个人玩个锤子", - "📞📞📞1=3速来坐牢", - "📞📞📞差三个速来", - ], - 2: [ - "📞📞2Q2速来坐牢", - "📞📞2=2这牢十分的珍贵,让同志们先做", - "📞📞牢房过半,速速人牢", - "📞📞2Q2有大佬带逛街速来", - "📞📞2=2速来坐牢", - ], - 3: [ - "📞牢房三缺一,速来", - "📞3Q1来个大佬带逛街", - "📞3Q1就等你了", - "📞别来他们三个还没一个人分高", - ], - 4: [ - "📞有人旁观,速来", - "📞有人越狱,呼叫支援", - "📞别让牢房成为寂寞", - "📞有人还没进,希望你网络比他好", - ], -} -KAILAO = [ - "🏠又来坐牢了?", - "🏠已经变成牢房的样子了!", - "🏠是谁在呼唤牢房?", - "🏠牢房青云阁雅座四位", -] diff --git a/old_project/l4d2_utils/rule.py b/old_project/l4d2_utils/rule.py deleted file mode 100644 index 7671ae0..0000000 --- a/old_project/l4d2_utils/rule.py +++ /dev/null @@ -1,35 +0,0 @@ -from nonebot.adapters.onebot.v11 import Message, NoticeEvent -from nonebot.params import CommandArg, Depends -from nonebot.rule import Rule - -from .config import file_format, l4_config - - -async def full_command(arg: Message = CommandArg()) -> bool: - return not bool(str(arg)) - - -def FullCommand() -> Rule: # noqa: N802 - return Rule(full_command) - - -def FullCommandDepend(): # noqa: N802 - return Depends(full_command) - - -def wenjian(event: NoticeEvent): - args = event.dict() - try: - name: str = args["file"]["name"] - usr_id = str(args["user_id"]) - except KeyError: - return False - if args["notice_type"] == "offline_file": - if l4_config.l4_master: - return name.endswith(file_format) and usr_id in l4_config.l4_master - return name.endswith(file_format) - if args["notice_type"] == "group_upload": - if l4_config.l4_master: - return usr_id in l4_config.l4_master and name.endswith(file_format) - return False - return False diff --git a/old_project/l4d2_utils/seach.py b/old_project/l4d2_utils/seach.py deleted file mode 100644 index feb6200..0000000 --- a/old_project/l4d2_utils/seach.py +++ /dev/null @@ -1,43 +0,0 @@ -import httpx -from bs4 import BeautifulSoup - - -def anne_search(name: str): - """输入名字返回列表[""" - url = "https://sb.trygek.com/l4d_stats/ranking/search.php" - data = {"search": name} - headers = {"Content-Type": "application/x-www-form-urlencoded"} - data = httpx.post(url, data=data, headers=headers, timeout=60).content.decode( - "utf-8", - ) - soup = BeautifulSoup(data, "html.parser") - # 获取标题 - title = [] - thead = soup.find("thead") - if not thead: - return None - for i in thead.find_all("td"): # type: ignore - tag = i.text.strip() - title.append(tag) - title.append("steamid") - # 角色信息 - datas_table = soup.find("table") - if not datas_table: - return None - datas_tbody = datas_table.find("tbody") - if not datas_tbody: - return None - datas = datas_tbody.find_all("tr") # type: ignore - return [datas, title] - - -def name_steamid_html(name): - """您称通过网页来返回求生steamid""" - data_title = anne_search(name) - if not data_title: - return None - data = data_title[0] - for i in data: - onclick: str = i["onclick"] - return onclick.split("=")[2].strip("'") - return None diff --git a/old_project/l4d2_utils/steam.py b/old_project/l4d2_utils/steam.py deleted file mode 100644 index e69de29..0000000 diff --git a/old_project/l4d2_utils/txt_to_img.py b/old_project/l4d2_utils/txt_to_img.py deleted file mode 100644 index b3a4f8d..0000000 --- a/old_project/l4d2_utils/txt_to_img.py +++ /dev/null @@ -1,32 +0,0 @@ -import base64 -from typing import Optional - -from nonebot_plugin_saa import Image, MessageFactory, Text -from nonebot_plugin_txt2img import Txt2Img - -from .config import l4_config - -l4_font = l4_config.l4_font -"""直接超的智障回复""" - - -async def mode_txt_to_img( - title: str, - text: str, - ex_text: Optional[str] = None, - font_size: int = 32, - ex_msg: Optional[str] = None, -): - """文字转图片,如果有额外的信息则构造图文""" - txt2img = Txt2Img() - txt2img.set_font_size(font_size) - pic = txt2img.draw(title, text) - pic = base64.b64decode(pic) - if ex_text: - ex_msg = ex_text - if not ex_msg: - msg = MessageFactory([Image(pic)]) - else: - msg = MessageFactory([Image(pic), Text(ex_msg)]) - await msg.finish() - return diff --git a/old_project/l4d2_utils/utils.py b/old_project/l4d2_utils/utils.py deleted file mode 100644 index e69de29..0000000 diff --git a/pyproject.toml b/pyproject.toml index d6f0875..e1c728a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "nonebot-plugin-l4d2-server" -version = "1.0.0a2" +version = "1.0.0b1" description = "L4D2 server related operations plugin for NoneBot2" authors = [{ name = "Agnes_Digital", email = "Z735803792@163.com" }] requires-python = ">=3.9,<4.0" @@ -45,3 +45,50 @@ includes = [] [build-system] requires = ["pdm-backend"] build-backend = "pdm.backend" + + +[tool.ruff] +ignore = [ + "B008", + "B905", + "E501", + "N999", + "FBT002", + "PGH003", + "RUF001", + "RUF002", + "RUF003", + "RUF006", + "RUF100", + "SIM117", + "TRY002", + "TRY003", + "FBT001", +] +select = [ + "A", + "ARG", + "ASYNC", + "B", + "C4", + "COM", + "E", + "F", + "FBT", + "FLY", + "I", + "ISC", + "N", + "PIE", + "PGH", + "PTH", + "PYI", + "Q", + "RET", + "RSE", + "RUF", + "SIM", + "SLF", + "SLOT", + "TRY", +]