Skip to content

Commit

Permalink
cosmetic changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ohld committed Jun 5, 2024
1 parent 12c351a commit b6b5018
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 27 deletions.
12 changes: 12 additions & 0 deletions flow_deployments/broadcasts.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
broadcast_next_meme_to_active_1w_ago,
broadcast_next_meme_to_active_4w_ago,
broadcast_next_meme_to_active_15m_ago,
broadcast_next_meme_to_active_24h_ago,
broadcast_next_meme_to_active_48h_ago,
)

Expand All @@ -19,6 +20,17 @@

deployment_broadcast_15m_ago.apply()

# broadcasts meme in 48h after last activity
deployment_broadcast_24h_ago = Deployment.build_from_flow(
flow=broadcast_next_meme_to_active_24h_ago,
name="broadcast_next_meme_to_active_24h_ago",
schedule=(CronSchedule(cron="5 * * * *", timezone="Europe/London")),
work_pool_name=settings.ENVIRONMENT,
)

deployment_broadcast_24h_ago.apply()


# broadcasts meme in 48h after last activity
deployment_broadcast_48h_ago = Deployment.build_from_flow(
flow=broadcast_next_meme_to_active_48h_ago,
Expand Down
9 changes: 8 additions & 1 deletion src/flows/broadcasts/meme.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ async def broadcast_next_meme_to_users(users):

for user in users:
user_id = user["id"]
await check_queue(user["id"])
await check_queue(user_id)
meme = await get_next_meme_for_user(user_id)
if meme:
await send_meme_to_user(bot, user_id, meme)
logger.info(f"Sent meme_id={meme.id} to #{user_id}")
await asyncio.sleep(0.2) # flood control


Expand All @@ -44,6 +45,12 @@ async def broadcast_next_meme_to_active_15m_ago():
await broadcast_next_meme_to_users(users)


@flow
async def broadcast_next_meme_to_active_24h_ago():
users = await get_users_active_minutes_ago(24 * 60, 24 * 60 + 60)
await broadcast_next_meme_to_users(users)


@flow
async def broadcast_next_meme_to_active_48h_ago():
users = await get_users_active_minutes_ago(48 * 60, 48 * 60 + 60)
Expand Down
17 changes: 7 additions & 10 deletions src/recommendations/meme_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,12 @@ async def generate_recommendations(user_id, limit):
)

else:
if r < 0.25:
if r < 0.5:
candidates = await classic(
user_id, limit=limit, exclude_meme_ids=meme_ids_in_queue
)
elif r < 0.5:
candidates = await like_spread_and_recent_memes(
user_id, limit=limit, exclude_meme_ids=meme_ids_in_queue
)
elif r < 0.75:
candidates = await get_best_memes_from_each_source(
user_id, limit=limit, exclude_meme_ids=meme_ids_in_queue
)
else:
candidates = await less_seen_meme_and_source(
candidates = await like_spread_and_recent_memes(
user_id, limit=limit, exclude_meme_ids=meme_ids_in_queue
)

Expand All @@ -122,6 +114,11 @@ async def generate_recommendations(user_id, limit):
user_id, limit=limit, exclude_meme_ids=meme_ids_in_queue
)

if len(candidates) == 0 and user_info["nmemes_sent"] > 1000:
candidates = await less_seen_meme_and_source(
user_id, limit=limit, exclude_meme_ids=meme_ids_in_queue
)

if len(candidates) == 0:
# TODO: fallback to some algo which will always return something
return
Expand Down
10 changes: 5 additions & 5 deletions src/storage/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,32 +108,32 @@ async def insert_parsed_posts_from_vk(

async def insert_parsed_posts_from_ig(
meme_source_id: int,
vk_posts: list[IgPostParsingResult,],
ig_posts: list[IgPostParsingResult,],
) -> None:
result = await fetch_all(
select(meme_raw_ig.c.post_id)
.where(meme_raw_ig.c.meme_source_id == meme_source_id)
.where(meme_raw_ig.c.post_id.in_([post.post_id for post in vk_posts]))
.where(meme_raw_ig.c.post_id.in_([post.post_id for post in ig_posts]))
)
post_ids_in_db = {row["post_id"] for row in result}

posts_to_create = [
post.model_dump() | {"meme_source_id": meme_source_id}
for post in vk_posts
for post in ig_posts
if post.post_id not in post_ids_in_db
]

if len(posts_to_create) > 0:
print(f"Going to insert {len(posts_to_create)} new posts.")
await execute(insert(vk_posts).values(posts_to_create))
await execute(insert(meme_raw_ig).values(posts_to_create))

posts_to_update = [
post.model_dump()
| {
"meme_source_id": meme_source_id,
"updated_at": datetime.now(timezone.utc).replace(tzinfo=None),
}
for post in vk_posts
for post in ig_posts
if post.post_id in post_ids_in_db
]

Expand Down
4 changes: 2 additions & 2 deletions src/tgbot/handlers/admin/user_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ async def handle_show_user_info(
await update.message.reply_text(f"🚫 User @{username} not found.")
return

selected_user_info = await update_user_info_cache(selected_user["id"])

# TODO: create a function which creates a user info string
await calculate_user_stats() # regenerate user stats
await calculate_inviter_stats()

selected_user_info = await update_user_info_cache(selected_user["id"])

report = await get_user_stats_report(selected_user_info["id"])

await update.message.reply_text(
Expand Down
7 changes: 1 addition & 6 deletions src/tgbot/handlers/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,12 @@
from telegram import Update
from telegram.ext import ContextTypes

from src.recommendations.meme_queue import (
clear_meme_queue_for_user,
generate_cold_start_recommendations,
)
from src.tgbot.handlers.deep_link import handle_deep_link_used
from src.tgbot.handlers.language import (
handle_language_settings,
init_user_languages_from_tg_user,
)
from src.tgbot.logs import log
from src.tgbot.senders.next_message import next_message
from src.tgbot.service import create_user, save_tg_user
from src.tgbot.user_info import update_user_info_cache

Expand All @@ -33,7 +28,7 @@ async def handle_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
deep_link=deep_link,
)

user = await create_user(id=user_id, nickname=update.effective_user.name)
user = await create_user(id=user_id)
await init_user_languages_from_tg_user(update.effective_user)
if deep_link:
asyncio.create_task(
Expand Down
5 changes: 2 additions & 3 deletions src/tgbot/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ async def save_tg_user(

async def create_user(
id: int,
nickname: str | None = None,
) -> None:
"""
Creates a row in user table
Expand All @@ -54,8 +53,8 @@ async def create_user(
sql = f"""
INSERT
INTO "user"
(id, type, last_active_at, nickname)
VALUES ({id}, '{UserType.USER.value}', NOW(), '{nickname}')
(id, type, last_active_at)
VALUES ({id}, '{UserType.USER.value}', NOW())
ON CONFLICT(id)
DO UPDATE SET
blocked_bot_at = NULL,
Expand Down

0 comments on commit b6b5018

Please sign in to comment.