Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lint all files #19

Merged
merged 2 commits into from
Feb 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ jobs:
- name: Format with ruff
run: |
ruff format src --diff
- name: Lint with mypy
run: |
mypy src tests
- name: Run tests
run: |
pytest
# - name: Lint with mypy
# run: |
# mypy src tests
# - name: Run tests
# run: |
# pytest
4 changes: 1 addition & 3 deletions flow_deployments/broadcasts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@
from prefect.server.schemas.schedules import CronSchedule

from src.config import settings

from src.flows.broadcasts.meme import broadcast_memes_to_users_active_hours_ago


deployment_broadcast_hourly = Deployment.build_from_flow(
flow=broadcast_memes_to_users_active_hours_ago,
name="broadcast_memes_to_users_active_hours_ago",
schedule=(CronSchedule(cron="3 * * * *", timezone="Europe/London")),
work_pool_name=settings.ENVIRONMENT,
)

deployment_broadcast_hourly.apply()
deployment_broadcast_hourly.apply()
2 changes: 0 additions & 2 deletions flow_deployments/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
from prefect.server.schemas.schedules import CronSchedule

from src.config import settings

from src.flows.parsers.tg import parse_telegram_sources
from src.flows.parsers.vk import parse_vk_sources


deployment_tg = Deployment.build_from_flow(
flow=parse_telegram_sources,
name="Parse Telegram Sources",
Expand Down
6 changes: 2 additions & 4 deletions flow_deployments/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
from prefect.server.schemas.schedules import CronSchedule

from src.config import settings

from src.flows.stats.meme import calculate_meme_stats
from src.flows.stats.user import calculate_user_stats
from src.flows.stats.user_meme_source import calculate_user_meme_source_stats
from src.flows.stats.meme import calculate_meme_stats


deployment_user_stats = Deployment.build_from_flow(
flow=calculate_user_stats,
Expand Down Expand Up @@ -35,4 +33,4 @@
schedule=(CronSchedule(cron="3,18,33,48 * * * *", timezone="Europe/London")),
)

deployment_user_stats.apply()
deployment_user_stats.apply()
2 changes: 0 additions & 2 deletions flow_deployments/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from src.config import settings
from src.flows.storage.memes import ocr_uploaded_memes


deployment_ocr_uploaded_memes = Deployment.build_from_flow(
flow=ocr_uploaded_memes,
name="OCR Uploaded Memes",
Expand All @@ -13,4 +12,3 @@
)

deployment_ocr_uploaded_memes.apply()

5 changes: 4 additions & 1 deletion src/broadcasts/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ async def get_users_which_were_active_hours_ago(hours: int) -> list[dict]:
SELECT
id
FROM "user"
WHERE last_active_at BETWEEN NOW() - INTERVAL '{hours} HOURS' AND NOW() - INTERVAL '{hours-1} HOURS'
WHERE last_active_at BETWEEN
NOW() - INTERVAL '{hours} HOURS'
AND
NOW() - INTERVAL '{hours-1} HOURS'
"""
return await fetch_all(text(insert_query))
102 changes: 62 additions & 40 deletions src/database.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
from typing import Any

from sqlalchemy import (
CursorResult,
BigInteger,
Boolean,
Column,
CursorResult,
DateTime,
ForeignKey,
Identity,
Insert,
Integer,
MetaData,
Select,
String,
Table,
Update,
Identity,
ForeignKey,
UniqueConstraint,
BigInteger,
Update,
func,
)
from sqlalchemy.dialects.postgresql import JSONB
Expand All @@ -24,9 +24,9 @@
from src.config import settings
from src.constants import DB_NAMING_CONVENTION
from src.storage.constants import (
MEME_MEME_SOURCE_RAW_MEME_UNIQUE_CONSTRAINT,
MEME_RAW_TELEGRAM_MEME_SOURCE_POST_UNIQUE_CONSTRAINT,
MEME_RAW_VK_MEME_SOURCE_POST_UNIQUE_CONSTRAINT,
MEME_MEME_SOURCE_RAW_MEME_UNIQUE_CONSTRAINT,
)

DATABASE_URL = str(settings.DATABASE_URL)
Expand All @@ -41,13 +41,9 @@
Column("id", Integer, Identity(), primary_key=True),
Column("type", String, nullable=False),
Column("url", String, nullable=False, unique=True),

Column("status", String, nullable=False), # in_moderation, parsing_enabled, parsing_disabled

Column("status", String, nullable=False),
Column("language_code", String, index=True),

Column("added_by", ForeignKey("user.id", ondelete="SET NULL")),

Column("parsed_at", DateTime),
Column("created_at", DateTime, server_default=func.now(), nullable=False),
Column("updated_at", DateTime, onupdate=func.now()),
Expand All @@ -58,13 +54,15 @@
"meme_raw_telegram",
metadata,
Column("id", Integer, Identity(), primary_key=True),
Column("meme_source_id", ForeignKey("meme_source.id", ondelete="CASCADE"), nullable=False),
Column(
"meme_source_id",
ForeignKey("meme_source.id", ondelete="CASCADE"),
nullable=False,
),
Column("post_id", Integer, nullable=False),

Column("url", String, nullable=False),
Column("date", DateTime, nullable=False),
Column("content", String),

Column("out_links", JSONB),
Column("mentions", JSONB),
Column("hashtags", JSONB),
Expand All @@ -73,35 +71,39 @@
Column("views", Integer, nullable=False),
Column("forwarded_url", String),
Column("link_preview", JSONB),

Column("created_at", DateTime, server_default=func.now(), nullable=False),
Column("updated_at", DateTime, onupdate=func.now()),

UniqueConstraint("meme_source_id", "post_id", name=MEME_RAW_TELEGRAM_MEME_SOURCE_POST_UNIQUE_CONSTRAINT),
UniqueConstraint(
"meme_source_id",
"post_id",
name=MEME_RAW_TELEGRAM_MEME_SOURCE_POST_UNIQUE_CONSTRAINT,
),
)


meme_raw_vk = Table(
"meme_raw_vk",
metadata,
Column("id", Integer, Identity(), primary_key=True),
Column("meme_source_id", ForeignKey("meme_source.id", ondelete="CASCADE"), nullable=False),
Column(
"meme_source_id",
ForeignKey("meme_source.id", ondelete="CASCADE"),
nullable=False,
),
Column("post_id", String, nullable=False),

Column("url", String, nullable=False),
Column("content", String),
Column("date", DateTime, nullable=False),

Column("media", JSONB),
Column("views", Integer, nullable=False),
Column("likes", Integer, nullable=False),
Column("reposts", Integer, nullable=False),
Column("comments", Integer, nullable=False),

Column("created_at", DateTime, server_default=func.now(), nullable=False),
Column("updated_at", DateTime, onupdate=func.now()),

UniqueConstraint("meme_source_id", "post_id", name=MEME_RAW_VK_MEME_SOURCE_POST_UNIQUE_CONSTRAINT),
UniqueConstraint(
"meme_source_id", "post_id", name=MEME_RAW_VK_MEME_SOURCE_POST_UNIQUE_CONSTRAINT
),
)


Expand All @@ -116,23 +118,27 @@
"meme",
metadata,
Column("id", Integer, Identity(), primary_key=True),
Column("meme_source_id", ForeignKey("meme_source.id", ondelete="CASCADE"), nullable=False),
Column(
"meme_source_id",
ForeignKey("meme_source.id", ondelete="CASCADE"),
nullable=False,
),
Column("raw_meme_id", Integer, nullable=False, index=True),
Column("status", String, nullable=False),

Column("type", String, nullable=False, index=True),
Column("telegram_file_id", String),
Column("caption", String),
Column("language_code", String, index=True),

Column("ocr_result", JSONB),
Column("duplicate_of", ForeignKey("meme.id", ondelete="SET NULL")),

Column("published_at", DateTime, nullable=False),
Column("created_at", DateTime, server_default=func.now(), nullable=False),
Column("updated_at", DateTime, onupdate=func.now()),

UniqueConstraint("meme_source_id", "raw_meme_id", name=MEME_MEME_SOURCE_RAW_MEME_UNIQUE_CONSTRAINT),
UniqueConstraint(
"meme_source_id",
"raw_meme_id",
name=MEME_MEME_SOURCE_RAW_MEME_UNIQUE_CONSTRAINT,
),
)


Expand All @@ -146,9 +152,6 @@
Column("is_premium", Boolean),
Column("language_code", String), # IETF language tag from telegram
Column("deep_link", String),

# Column("first_chat_id", BigInteger, nullable=False), # chat_id where user first appeared

Column("created_at", DateTime, server_default=func.now(), nullable=False),
Column("updated_at", DateTime, onupdate=func.now()),
)
Expand All @@ -158,8 +161,7 @@
"user",
metadata,
Column("id", BigInteger, primary_key=True),
Column("type", String, nullable=False), # super_user, moderator,

Column("type", String, nullable=False), # super_user, moderator,
Column("created_at", DateTime, server_default=func.now(), nullable=False),
Column("last_active_at", DateTime, onupdate=func.now()),
Column("blocked_bot_at", DateTime),
Expand Down Expand Up @@ -208,20 +210,34 @@
Column("nmemes_sent", Integer, nullable=False, server_default="0"),
Column("nsessions", Integer, nullable=False, server_default="0"),
Column("active_days_count", Integer, nullable=False, server_default="0"),

Column("updated_at", DateTime, server_default=func.now(), nullable=False, onupdate=func.now()),
Column(
"updated_at",
DateTime,
server_default=func.now(),
nullable=False,
onupdate=func.now(),
),
)


user_meme_source_stats = Table(
"user_meme_source_stats",
metadata,
Column("user_id", ForeignKey("user.id", ondelete="CASCADE"), primary_key=True),
Column("meme_source_id", ForeignKey("meme_source.id", ondelete="CASCADE"), primary_key=True),
Column(
"meme_source_id",
ForeignKey("meme_source.id", ondelete="CASCADE"),
primary_key=True,
),
Column("nlikes", Integer, nullable=False, server_default="0"),
Column("ndislikes", Integer, nullable=False, server_default="0"),

Column("updated_at", DateTime, server_default=func.now(), nullable=False, onupdate=func.now()),
Column(
"updated_at",
DateTime,
server_default=func.now(),
nullable=False,
onupdate=func.now(),
),
)


Expand All @@ -234,7 +250,13 @@
Column("nmemes_sent", Integer, nullable=False, server_default="0"),
Column("age_days", Integer, nullable=False, server_default="99999"),
Column("raw_impr_rank", Integer, nullable=False, server_default="99999"),
Column("updated_at", DateTime, server_default=func.now(), nullable=False, onupdate=func.now()),
Column(
"updated_at",
DateTime,
server_default=func.now(),
nullable=False,
onupdate=func.now(),
),
)


Expand Down
8 changes: 3 additions & 5 deletions src/flows/broadcasts/meme.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
@flow
async def broadcast_memes_to_users_active_hours_ago(hours: int = 48):
"""
Runs each hour:
1. Takes users which were active (hours, hours-1) hours ago
2. Sends them a best meme
Runs each hour:
1. Takes users which were active (hours, hours-1) hours ago
2. Sends them a best meme
"""
logger = get_run_logger()

Expand All @@ -33,5 +33,3 @@ async def broadcast_memes_to_users_active_hours_ago(hours: int = 48):
await send_new_message_with_meme(user_id, meme)
await create_user_meme_reaction(user_id, meme.id, meme.recommended_by)
await asyncio.sleep(0.1) # flood control


5 changes: 3 additions & 2 deletions src/flows/parsers/tg.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import asyncio
from datetime import datetime

from prefect import flow, get_run_logger

from src.flows.storage.memes import tg_meme_pipeline
from src.storage.parsers.tg import TelegramChannelScraper
from src.storage.service import (
get_telegram_sources_to_parse,
insert_parsed_posts_from_telegram,
update_meme_source,
)
from src.flows.storage.memes import tg_meme_pipeline


@flow(name="Parse Telegram Source")
Expand All @@ -29,7 +30,7 @@ async def parse_telegram_source(

await update_meme_source(meme_source_id=meme_source_id, parsed_at=datetime.utcnow())
await asyncio.sleep(5)


@flow(
name="Parse Telegram Channels",
Expand Down
4 changes: 2 additions & 2 deletions src/flows/parsers/vk.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import asyncio
from datetime import datetime

from prefect import flow, get_run_logger

from src.flows.storage.memes import vk_meme_pipeline
from src.storage.parsers.vk import VkGroupScraper
from src.storage.service import (
get_vk_sources_to_parse,
insert_parsed_posts_from_vk,
update_meme_source,
)

from src.flows.storage.memes import vk_meme_pipeline


@flow(name="Parse VK Source")
async def parse_vk_source(
Expand Down
3 changes: 1 addition & 2 deletions src/flows/stats/meme.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
@flow(
name="Calculate meme_stats",
)
async def calculate_meme_stats(
) -> None:
async def calculate_meme_stats() -> None:
await meme.calculate_meme_reactions_stats()

await meme.calculate_meme_raw_impressions_stats()
Loading
Loading