diff --git a/core.py b/core.py index 56e70b2..55a0379 100644 --- a/core.py +++ b/core.py @@ -57,6 +57,8 @@ ('japan', 'tokyo'): 'Japan', } +execution_start_dt = dt.datetime.now() + UNUSED_FIELDS = ['csgo_client_version', 'csgo_server_version', 'csgo_patch_version', @@ -182,7 +184,7 @@ async def update_cache_info(): except Exception: logging.exception('Caught exception in the main thread!') - + @scheduler.scheduled_job('cron', hour=execution_cron_hour, minute=execution_cron_minute) async def unique_monthly(): # noinspection PyBroadException diff --git a/functions/info_formatters.py b/functions/info_formatters.py index 8094177..215703f 100644 --- a/functions/info_formatters.py +++ b/functions/info_formatters.py @@ -75,10 +75,6 @@ def format_server_status(data: ServerStatusData, locale: Locale) -> str: game_servers_dt = f'{format_datetime(game_servers_dt, "HH:mm:ss, dd MMM", locale=lang_code).title()} (UTC)' text = ( - f'
Currently we\'re unable to detect CS2\'s game coordinator status, ' - f'because one of our dependencies is broken. Other trackers are working fine. ' - f'Please accept our apologies.' - f'\n\n' f'{locale.game_status_text.format(tick, *states)}' f'\n\n' f'{locale.latest_data_update.format(game_servers_dt)}' diff --git a/game_coordinator.py b/game_coordinator.py index 6060d1c..6a80814 100644 --- a/game_coordinator.py +++ b/game_coordinator.py @@ -4,13 +4,17 @@ import logging from pathlib import Path import platform +import sys import time from zoneinfo import ZoneInfo from apscheduler.schedulers.asyncio import AsyncIOScheduler -from steam import App -from steam.ext.csgo import Client as CS2GCClient -from steam.ext.csgo.protobufs.sdk import GcConnectionStatus +from apscheduler.schedulers.gevent import GeventScheduler +from csgo.client import CSGOClient +from pyrogram import Client, idle +from steam.client import SteamClient +from steam.enums import EResult + if platform.system() == 'Linux': # noinspection PyPackageRequirements @@ -19,176 +23,236 @@ uvloop.install() import config -from functions import utime -from utypes import GameVersion, States, SteamWebAPI +from functions import locale, utime +from utypes import GameVersion, States VALVE_TIMEZONE = ZoneInfo('America/Los_Angeles') +loc = locale('ru') -logging.basicConfig(format='%(asctime)s | %(name)s: %(message)s', - datefmt='%H:%M:%S — %d/%m/%Y', - force=True) +available_alerts = {'public_build_id': loc.notifs_build_public, + 'dpr_build_id': loc.notifs_build_dpr, + 'dprp_build_id': loc.notifs_build_dprp, + 'dpr_build_sync_id': f'{loc.notifs_build_dpr} 🔃', + 'dprp_build_sync_id': f'{loc.notifs_build_dprp} 🔃', + 'cs2_app_changenumber': loc.notifs_build_cs2_client, + 'cs2_server_changenumber': loc.notifs_build_cs2_server} -logger = logging.getLogger(f'{config.BOT_NAME}.GCCollector') -logger.setLevel(logging.INFO) +logging.basicConfig(level=logging.INFO, + format='%(asctime)s | GC: %(message)s', + datefmt='%H:%M:%S — %d/%m/%Y') +bot = Client(config.BOT_GC_MODULE_NAME, + api_id=config.API_ID, + api_hash=config.API_HASH, + bot_token=config.BOT_TOKEN, + no_updates=True, + workdir=config.SESS_FOLDER) +client = SteamClient() +client.set_credential_location(config.STEAM_CREDS_PATH) +cs = CSGOClient(client) +gevent_scheduler = GeventScheduler() +async_scheduler = AsyncIOScheduler() -api = SteamWebAPI(config.STEAM_API_KEY) +@client.on('error') +def handle_error(result): + logging.info(f'Logon result: {result!r}') -class GCCollector(CS2GCClient): - APPS_TO_FETCH = App(id=730), App(id=2275500), App(id=2275530) # the last two apps don't get fetched - cache: dict[str, ...] +@client.on('channel_secured') +def send_relogin(): + if client.relogin_available: + client.relogin() - def __init__(self, cache_file_path: Path, **kwargs): - super().__init__(**kwargs) - self.cache_file_path = cache_file_path - self.load_cache() +@client.on('connected') +def log_connect(): + logging.info(f'Connected to {client.current_server_addr}') - self.scheduler = AsyncIOScheduler() - self.scheduler.add_job(self.update_depots, 'interval', seconds=45) - # self.scheduler.add_job(self.update_players_count, 'interval', seconds=45) # for GC requesting which doesn't work for now - self.scheduler.add_job(self.update_players_count_alter, 'interval', minutes=2) # currently use WebAPI as an alternative - async def login(self, *args, **kwargs): - logger.info('Logging in...') - await super().login(*args, **kwargs) +@client.on('reconnect') +def handle_reconnect(delay): + logging.info(f'Reconnect in {delay}s...') - async def on_ready(self): - logger.info('Logged in successfully.') - async def on_disconnect(self): - logger.info('Disconnected.') - self.scheduler.pause() +@client.on('disconnected') +def handle_disconnect(): + logging.info('Disconnected.') - logger.info('Reconnecting...') - await self.login(username=config.STEAM_USERNAME, password=config.STEAM_PASS) - result = self.is_ready() + if client.relogin_available: + logging.info('Reconnecting...') + client.reconnect(maxdelay=30) # todo: could be broken - needs to be tested somehow - logger.info('Reconnected successfully.' if result else 'Failed to reconnect.') - if result: - self.scheduler.resume() + # sys.exit() - async def on_gc_ready(self): - logger.info('CS launched.') - self.scheduler.start() - async def on_gc_status_change(self, status: GcConnectionStatus): # currently doesn't get called - logger.info(f'{status.name!r} (on_gc_status_change)') +@client.on('logged_on') +def handle_after_logon(): + cs.launch() + async_scheduler.start() + gevent_scheduler.start() - statuses = {0: States.NORMAL, 1: States.INTERNAL_SERVER_ERROR, 2: States.OFFLINE, - 3: States.RELOADING, 4: States.INTERNAL_STEAM_ERROR} - game_coordinator = statuses.get(status.value, States.UNKNOWN) - if game_coordinator != self.cache.get('game_coordinator'): - self.update_cache({'game_coordinator': game_coordinator.literal}) +@cs.on('ready') +def cs_launched(): + logging.info('CS launched.') - logger.info(f'Successfully dumped game coordinator status: {game_coordinator.literal}') - async def update_depots(self): - try: - data = await self.fetch_product_info(apps=self.APPS_TO_FETCH) - data = {int(app.id): app for app in data} - logging.info(data) - - main_data = data[730] - public_build_id = main_data.public_branch.build_id - dpr_build_id = main_data.get_branch('dpr').build_id - dprp_build_id = main_data.get_branch('dprp').build_id - - # currently can't track todo: investigate, is it steam.py's issue or valve's - # cs2_app_change_number = data[2275500].change_number - # cs2_server_change_number = data[2275530].change_number - except Exception: - logger.exception('Caught an exception while trying to fetch depots!') - return +@cs.on('connection_status') +def update_gc_status(status): + statuses = {0: States.NORMAL, 1: States.INTERNAL_SERVER_ERROR, 2: States.OFFLINE, + 3: States.RELOADING, 4: States.INTERNAL_STEAM_ERROR} + game_coordinator = statuses.get(status, States.UNKNOWN) + + with open(config.CACHE_FILE_PATH, encoding='utf-8') as f: + cache = json.load(f) + + if game_coordinator != cache.get('game_coordinator'): + cache['game_coordinator'] = game_coordinator.literal + + with open(config.CACHE_FILE_PATH, 'w', encoding='utf-8') as f: + json.dump(cache, f, indent=4) + + logging.info(f'Successfully dumped game coordinator status: {game_coordinator.literal}') + + +@async_scheduler.scheduled_job('interval', seconds=45) +async def update_depots(): + # noinspection PyBroadException + try: + data = client.get_product_info(apps=[730, 2275500, 2275530], timeout=15)['apps'] + main_data = data[730] - if public_build_id != self.cache.get('public_build_id'): - _ = asyncio.create_task(self.update_game_version()) + public_build_id = int(main_data['depots']['branches']['public']['buildid']) + dpr_build_id = int(main_data['depots']['branches']['dpr']['buildid']) + dprp_build_id = int(main_data['depots']['branches']['dprp']['buildid']) - self.update_cache({ - 'public_build_id': public_build_id, - 'dpr_build_id': dpr_build_id, - 'dprp_build_id': dprp_build_id, - # 'cs2_app_changenumber': cs2_app_change_number, - # 'cs2_server_changenumber': cs2_server_change_number - }) + cs2_app_change_number = data[2275500]['_change_number'] + cs2_server_change_number = data[2275530]['_change_number'] + except Exception: + logging.exception('Caught an exception while trying to fetch depots!') + return - logger.info('Successfully dumped game build IDs.') + with open(config.CACHE_FILE_PATH, encoding='utf-8') as f: + cache = json.load(f) - async def update_game_version(self): - timeout = 30 * 60 - timeout_start = time.time() - while time.time() < timeout_start + timeout: - try: - data = GameVersion.request() + new_data = {'cs2_app_changenumber': cs2_app_change_number, + 'cs2_server_changenumber': cs2_server_change_number, + 'dprp_build_id': dprp_build_id, + 'dpr_build_id': dpr_build_id, + 'public_build_id': public_build_id} - no_version_data_cached = (data.cs2_client_version is None) - version_has_changed = (data.cs2_client_version != self.cache.get('cs2_client_version')) + for _id, new_value in new_data.items(): + if cache.get(_id) != new_value: + cache[_id] = new_value + if _id == 'dpr_build_id' and new_value == cache['public_build_id']: + await send_alert('dpr_build_sync_id', new_value) + continue + if _id == 'dprp_build_id' and new_value == cache['public_build_id']: + await send_alert('dprp_build_sync_id', new_value) + continue + if _id == 'public_build_id': + _ = asyncio.create_task(update_game_version) + await send_alert(_id, new_value) - # We also want the data to be up-to-date, so we check datetime - new_version_datetime = (dt.datetime.fromtimestamp(data.cs2_version_timestamp) - .replace(tzinfo=VALVE_TIMEZONE).astimezone(dt.UTC)) + with open(config.CACHE_FILE_PATH, 'w', encoding='utf-8') as f: + json.dump(cache, f, indent=4) - is_up_to_date = utime.utcnow() - new_version_datetime < dt.timedelta(hours=12) + logging.info('Successfully dumped game build IDs.') - if no_version_data_cached or (version_has_changed and is_up_to_date): - self.update_cache(data.asdict()) - return - except Exception: - logging.exception('Caught an exception while trying to get new version!') - await asyncio.sleep(45) - # sometimes SteamDB updates the info much later (xPaw: Zzz...) - # because of this, we retry in an hour - await asyncio.sleep(60 * 60) - await self.update_game_version() +def update_game_version(): + timeout = 30 * 60 + timeout_start = time.time() + while time.time() < timeout_start + timeout: + # noinspection PyBroadException + try: + data = GameVersion.request() + + with open(config.CACHE_FILE_PATH, encoding='utf-8') as f: + cache = json.load(f) + + # Made to ensure we will grab the latest public data if we *somehow* don't have anything cached + no_cached_data = (cache.get('cs2_client_version') is None) + + # We also want to ensure that the data is up-to-date, so we check datetime + new_data_datetime = (dt.datetime.fromtimestamp(data.cs2_version_timestamp) + .replace(tzinfo=VALVE_TIMEZONE).astimezone(dt.UTC)) + is_up_to_date = utime.utcnow() - new_data_datetime < dt.timedelta(hours=12) + + if no_cached_data or (is_up_to_date and data.cs2_client_version != cache.get('cs2_client_version')): + for key, value in data.asdict().items(): + cache[key] = value + + with open(config.CACHE_FILE_PATH, 'w', encoding='utf-8') as f: + json.dump(cache, f, indent=4) + sys.exit() + except Exception: + logging.exception('Caught an exception while trying to get new version!') + asyncio.sleep(45) + continue + asyncio.sleep(45) + # xPaw: Zzz... + # because of this, we retry in an hour + asyncio.sleep(60 * 60) + update_game_version() + - async def update_players_count(self): - player_count = await self.get_app(730).player_count() # currently doesn't work - freezes the function entirely - self.update_cache({'online_players': player_count}) +@gevent_scheduler.scheduled_job('interval', seconds=45) +def online_players(): + value = client.get_player_count(730) - logger.info(f'Successfully dumped player count: {player_count}') + with open(config.CACHE_FILE_PATH, 'r', encoding='utf-8') as f: + cache = json.load(f) - async def update_players_count_alter(self): - response = api.get_number_of_current_players(appid=730).get('response') # getting this value from gc is more accurate + if value != cache.get('online_players'): + cache['online_players'] = value - player_count = 0 - if response.get('result') == 1 and response.get('player_count'): - player_count = response['player_count'] + with open(config.CACHE_FILE_PATH, 'w', encoding='utf-8') as f: + json.dump(cache, f, indent=4) - self.update_cache({'online_players': player_count}) + logging.info(f'Successfully dumped player count: {value}') - logger.info(f'Successfully dumped player count: {player_count}') - def load_cache(self): - """Loads cache into ``self.cache``.""" +async def send_alert(key: str, new_value: int): + logging.info(f'Found new change: {key}, sending alert...') - with open(self.cache_file_path, encoding='utf-8') as f: - self.cache = json.load(f) + alert_sample = available_alerts.get(key) - def dump_cache(self): - """Dumps ``self.cache`` to the cache file.""" + if alert_sample is None: + logging.warning(f'Got wrong key to send alert: {key}') + return - with open(self.cache_file_path, 'w', encoding='utf-8') as f: - json.dump(self.cache, f, indent=4, ensure_ascii=False) + text = alert_sample.format(new_value) - def update_cache(self, new_info: dict[str, ...]): - """Loads the cache into ``self.cache``, updates it with new info and dumps back to the cache file.""" + if not config.TEST_MODE: + chat_list = [config.INCS2CHAT, config.CSTRACKER] + else: + chat_list = [config.AQ] - self.load_cache() + for chat_id in chat_list: + msg = await bot.send_message(chat_id, text, disable_web_page_preview=True) + if chat_id == config.INCS2CHAT: + await msg.pin(disable_notification=True) - for k, v in new_info.items(): - self.cache[k] = v - self.dump_cache() +async def main(): + try: + logging.info('Logging in...') + result = client.login(username=config.STEAM_USERNAME, password=config.STEAM_PASS) + if result != EResult.OK: + logging.error(f"Failed to login: {result!r}") + sys.exit(1) -def main(): - collector = GCCollector(cache_file_path=config.CACHE_FILE_PATH) - collector.run(username=config.STEAM_USERNAME, password=config.STEAM_PASS, debug=True) + logging.info('Logged in successfully.') + await bot.start() + await idle() + except KeyboardInterrupt: + if client.connected: + logging.info('Logout...') + client.logout() if __name__ == '__main__': - main() + asyncio.run(main()) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 4eeda5b..5168d16 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/utypes/profiles.py b/utypes/profiles.py index b59d952..4fdb3a8 100644 --- a/utypes/profiles.py +++ b/utypes/profiles.py @@ -1,10 +1,10 @@ from dataclasses import astuple, dataclass from enum import StrEnum -import hashlib import re from typing import NamedTuple, Self -from steam import ID, InvalidID +from steam import steamid +from steam.steamid import SteamID import requests import config @@ -242,9 +242,9 @@ def from_dict(cls, stats: dict) -> Self: @classmethod async def get(cls, data) -> Self: try: - steamid = await parse_steamid(data) + _id = parse_steamid(data) - response = api.get_user_game_stats(steamid=steamid.id64, appid=730) + response = api.get_user_game_stats(steamid=_id.as_64, appid=730) if not response: raise ParseUserStatsError(ErrorCode.PROFILE_IS_PRIVATE) @@ -252,7 +252,7 @@ async def get(cls, data) -> Self: raise ParseUserStatsError(ErrorCode.NO_STATS_AVAILABLE) stats_dict = {stat['name']: stat['value'] for stat in response['playerstats']['stats']} - stats_dict['steamid'] = steamid.id64 + stats_dict['steamid'] = _id.as_64 return cls.from_dict(stats_dict) except requests.exceptions.HTTPError as e: @@ -312,10 +312,10 @@ def _extract_faceit_data(data: dict): @classmethod async def get(cls, data) -> Self: try: - steamid = await parse_steamid(data) + _id = parse_steamid(data) - bans = api.get_player_bans(steamids=str(steamid.id64)) - user_data = api.get_player_summaries(steamids=str(steamid.id64))["response"]["players"][0] + bans = api.get_player_bans(steamids=str(_id.as_64)) + user_data = api.get_player_summaries(steamids=str(_id.as_64))["response"]["players"][0] vanity = user_data['profileurl'] @@ -325,10 +325,10 @@ async def get(cls, data) -> Self: account_created = user_data.get('timecreated') vanity_url = vanity.split('/')[-2] - if vanity_url == str(steamid.id64): + if vanity_url == str(_id.as_64): vanity_url = None - faceit_api_link = f'https://api.faceit.com/search/v2/players?query={steamid.id64}' + faceit_api_link = f'https://api.faceit.com/search/v2/players?query={_id.as_64}' faceit_api_response = requests.get(faceit_api_link, timeout=15).json()['payload']['results'] faceit_elo, faceit_lvl, faceit_url, faceit_ban = cls._extract_faceit_data(faceit_api_response) @@ -345,12 +345,12 @@ async def get(cls, data) -> Self: trade_ban = (bans_data['EconomyBan'] == 'banned') return cls(vanity_url, - steamid.id64, - steamid.id, + _id.as_64, + _id.id, account_created, - steamid.invite_url, - steamid.invite_code, - cls.get_csgo_friend_code(steamid), + _id.invite_url, + _id.as_invite_code, + _id.as_csgo_friend_code, faceit_url, faceit_elo, faceit_lvl, @@ -369,54 +369,27 @@ async def get(cls, data) -> Self: raise ParseUserStatsError(ErrorCode.PROFILE_IS_PRIVATE) raise e - @staticmethod - def get_csgo_friend_code(steamid: ID) -> str | None: - hashed = int.from_bytes( - hashlib.md5( - (b"CSGO" + steamid.id.to_bytes(4, "big"))[::-1], - ).digest()[:4], - "little", - ) - result = 0 - for i in range(8): - id_nib = (steamid.id64 >> (i * 4)) & 0xF - hash_nib = (hashed >> i) & 0x1 - a = (result << 4) | id_nib - result = ((result >> 28) << 32) | a - result = ((result >> 31) << 32) | ((a << 1) | hash_nib) - result = int.from_bytes(result.to_bytes(8, "big"), "little") - code: list[str] = [] - for i in range(13): - if i in {4, 9}: - code.append("-") - code.append(_csgofrcode_chars[result & 31]) - result >>= 5 - return "".join(code[5:]) - def to_tuple(self) -> tuple: return astuple(self) -async def parse_steamid(data: str) -> ID: +def parse_steamid(data: str) -> SteamID: data = data.strip() if STEAM_PROFILE_LINK_PATTERN.match(data): if not data.startswith('http'): data = 'https://' + data - if (steamid := await ID.from_url(data)) is None: + if (_id := steamid.from_url(data)) is None: raise ParseUserStatsError(ErrorCode.INVALID_LINK) - return steamid + return _id - try: - if (steamid := ID(data)).is_valid(): - return steamid - except InvalidID: - pass + if (_id := SteamID(data)).is_valid(): + return _id data = f'https://steamcommunity.com/id/{data}' - if (steamid := await ID.from_url(data)) is None: + if (_id := steamid.from_url(data)) is None: raise ParseUserStatsError(ErrorCode.INVALID_REQUEST) - return steamid + return _id