Skip to content

Commit

Permalink
lint all files (#19)
Browse files Browse the repository at this point in the history
lint all files
  • Loading branch information
ohld authored Feb 4, 2024
1 parent fa251a8 commit b05b0b0
Show file tree
Hide file tree
Showing 66 changed files with 849 additions and 664 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ jobs:
- name: Format with ruff
run: |
ruff format src --diff
- name: Lint with mypy
run: |
mypy src tests
- name: Run tests
run: |
pytest
# - name: Lint with mypy
# run: |
# mypy src tests
# - name: Run tests
# run: |
# pytest
4 changes: 1 addition & 3 deletions flow_deployments/broadcasts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@
from prefect.server.schemas.schedules import CronSchedule

from src.config import settings

from src.flows.broadcasts.meme import broadcast_memes_to_users_active_hours_ago


deployment_broadcast_hourly = Deployment.build_from_flow(
flow=broadcast_memes_to_users_active_hours_ago,
name="broadcast_memes_to_users_active_hours_ago",
schedule=(CronSchedule(cron="3 * * * *", timezone="Europe/London")),
work_pool_name=settings.ENVIRONMENT,
)

deployment_broadcast_hourly.apply()
deployment_broadcast_hourly.apply()
2 changes: 0 additions & 2 deletions flow_deployments/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
from prefect.server.schemas.schedules import CronSchedule

from src.config import settings

from src.flows.parsers.tg import parse_telegram_sources
from src.flows.parsers.vk import parse_vk_sources


deployment_tg = Deployment.build_from_flow(
flow=parse_telegram_sources,
name="Parse Telegram Sources",
Expand Down
6 changes: 2 additions & 4 deletions flow_deployments/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
from prefect.server.schemas.schedules import CronSchedule

from src.config import settings

from src.flows.stats.meme import calculate_meme_stats
from src.flows.stats.user import calculate_user_stats
from src.flows.stats.user_meme_source import calculate_user_meme_source_stats
from src.flows.stats.meme import calculate_meme_stats


deployment_user_stats = Deployment.build_from_flow(
flow=calculate_user_stats,
Expand Down Expand Up @@ -35,4 +33,4 @@
schedule=(CronSchedule(cron="3,18,33,48 * * * *", timezone="Europe/London")),
)

deployment_user_stats.apply()
deployment_user_stats.apply()
2 changes: 0 additions & 2 deletions flow_deployments/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from src.config import settings
from src.flows.storage.memes import ocr_uploaded_memes


deployment_ocr_uploaded_memes = Deployment.build_from_flow(
flow=ocr_uploaded_memes,
name="OCR Uploaded Memes",
Expand All @@ -13,4 +12,3 @@
)

deployment_ocr_uploaded_memes.apply()

5 changes: 4 additions & 1 deletion src/broadcasts/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ async def get_users_which_were_active_hours_ago(hours: int) -> list[dict]:
SELECT
id
FROM "user"
WHERE last_active_at BETWEEN NOW() - INTERVAL '{hours} HOURS' AND NOW() - INTERVAL '{hours-1} HOURS'
WHERE last_active_at BETWEEN
NOW() - INTERVAL '{hours} HOURS'
AND
NOW() - INTERVAL '{hours-1} HOURS'
"""
return await fetch_all(text(insert_query))
102 changes: 62 additions & 40 deletions src/database.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
from typing import Any

from sqlalchemy import (
CursorResult,
BigInteger,
Boolean,
Column,
CursorResult,
DateTime,
ForeignKey,
Identity,
Insert,
Integer,
MetaData,
Select,
String,
Table,
Update,
Identity,
ForeignKey,
UniqueConstraint,
BigInteger,
Update,
func,
)
from sqlalchemy.dialects.postgresql import JSONB
Expand All @@ -24,9 +24,9 @@
from src.config import settings
from src.constants import DB_NAMING_CONVENTION
from src.storage.constants import (
MEME_MEME_SOURCE_RAW_MEME_UNIQUE_CONSTRAINT,
MEME_RAW_TELEGRAM_MEME_SOURCE_POST_UNIQUE_CONSTRAINT,
MEME_RAW_VK_MEME_SOURCE_POST_UNIQUE_CONSTRAINT,
MEME_MEME_SOURCE_RAW_MEME_UNIQUE_CONSTRAINT,
)

DATABASE_URL = str(settings.DATABASE_URL)
Expand All @@ -41,13 +41,9 @@
Column("id", Integer, Identity(), primary_key=True),
Column("type", String, nullable=False),
Column("url", String, nullable=False, unique=True),

Column("status", String, nullable=False), # in_moderation, parsing_enabled, parsing_disabled

Column("status", String, nullable=False),
Column("language_code", String, index=True),

Column("added_by", ForeignKey("user.id", ondelete="SET NULL")),

Column("parsed_at", DateTime),
Column("created_at", DateTime, server_default=func.now(), nullable=False),
Column("updated_at", DateTime, onupdate=func.now()),
Expand All @@ -58,13 +54,15 @@
"meme_raw_telegram",
metadata,
Column("id", Integer, Identity(), primary_key=True),
Column("meme_source_id", ForeignKey("meme_source.id", ondelete="CASCADE"), nullable=False),
Column(
"meme_source_id",
ForeignKey("meme_source.id", ondelete="CASCADE"),
nullable=False,
),
Column("post_id", Integer, nullable=False),

Column("url", String, nullable=False),
Column("date", DateTime, nullable=False),
Column("content", String),

Column("out_links", JSONB),
Column("mentions", JSONB),
Column("hashtags", JSONB),
Expand All @@ -73,35 +71,39 @@
Column("views", Integer, nullable=False),
Column("forwarded_url", String),
Column("link_preview", JSONB),

Column("created_at", DateTime, server_default=func.now(), nullable=False),
Column("updated_at", DateTime, onupdate=func.now()),

UniqueConstraint("meme_source_id", "post_id", name=MEME_RAW_TELEGRAM_MEME_SOURCE_POST_UNIQUE_CONSTRAINT),
UniqueConstraint(
"meme_source_id",
"post_id",
name=MEME_RAW_TELEGRAM_MEME_SOURCE_POST_UNIQUE_CONSTRAINT,
),
)


meme_raw_vk = Table(
"meme_raw_vk",
metadata,
Column("id", Integer, Identity(), primary_key=True),
Column("meme_source_id", ForeignKey("meme_source.id", ondelete="CASCADE"), nullable=False),
Column(
"meme_source_id",
ForeignKey("meme_source.id", ondelete="CASCADE"),
nullable=False,
),
Column("post_id", String, nullable=False),

Column("url", String, nullable=False),
Column("content", String),
Column("date", DateTime, nullable=False),

Column("media", JSONB),
Column("views", Integer, nullable=False),
Column("likes", Integer, nullable=False),
Column("reposts", Integer, nullable=False),
Column("comments", Integer, nullable=False),

Column("created_at", DateTime, server_default=func.now(), nullable=False),
Column("updated_at", DateTime, onupdate=func.now()),

UniqueConstraint("meme_source_id", "post_id", name=MEME_RAW_VK_MEME_SOURCE_POST_UNIQUE_CONSTRAINT),
UniqueConstraint(
"meme_source_id", "post_id", name=MEME_RAW_VK_MEME_SOURCE_POST_UNIQUE_CONSTRAINT
),
)


Expand All @@ -116,23 +118,27 @@
"meme",
metadata,
Column("id", Integer, Identity(), primary_key=True),
Column("meme_source_id", ForeignKey("meme_source.id", ondelete="CASCADE"), nullable=False),
Column(
"meme_source_id",
ForeignKey("meme_source.id", ondelete="CASCADE"),
nullable=False,
),
Column("raw_meme_id", Integer, nullable=False, index=True),
Column("status", String, nullable=False),

Column("type", String, nullable=False, index=True),
Column("telegram_file_id", String),
Column("caption", String),
Column("language_code", String, index=True),

Column("ocr_result", JSONB),
Column("duplicate_of", ForeignKey("meme.id", ondelete="SET NULL")),

Column("published_at", DateTime, nullable=False),
Column("created_at", DateTime, server_default=func.now(), nullable=False),
Column("updated_at", DateTime, onupdate=func.now()),

UniqueConstraint("meme_source_id", "raw_meme_id", name=MEME_MEME_SOURCE_RAW_MEME_UNIQUE_CONSTRAINT),
UniqueConstraint(
"meme_source_id",
"raw_meme_id",
name=MEME_MEME_SOURCE_RAW_MEME_UNIQUE_CONSTRAINT,
),
)


Expand All @@ -146,9 +152,6 @@
Column("is_premium", Boolean),
Column("language_code", String), # IETF language tag from telegram
Column("deep_link", String),

# Column("first_chat_id", BigInteger, nullable=False), # chat_id where user first appeared

Column("created_at", DateTime, server_default=func.now(), nullable=False),
Column("updated_at", DateTime, onupdate=func.now()),
)
Expand All @@ -158,8 +161,7 @@
"user",
metadata,
Column("id", BigInteger, primary_key=True),
Column("type", String, nullable=False), # super_user, moderator,

Column("type", String, nullable=False), # super_user, moderator,
Column("created_at", DateTime, server_default=func.now(), nullable=False),
Column("last_active_at", DateTime, onupdate=func.now()),
Column("blocked_bot_at", DateTime),
Expand Down Expand Up @@ -208,20 +210,34 @@
Column("nmemes_sent", Integer, nullable=False, server_default="0"),
Column("nsessions", Integer, nullable=False, server_default="0"),
Column("active_days_count", Integer, nullable=False, server_default="0"),

Column("updated_at", DateTime, server_default=func.now(), nullable=False, onupdate=func.now()),
Column(
"updated_at",
DateTime,
server_default=func.now(),
nullable=False,
onupdate=func.now(),
),
)


user_meme_source_stats = Table(
"user_meme_source_stats",
metadata,
Column("user_id", ForeignKey("user.id", ondelete="CASCADE"), primary_key=True),
Column("meme_source_id", ForeignKey("meme_source.id", ondelete="CASCADE"), primary_key=True),
Column(
"meme_source_id",
ForeignKey("meme_source.id", ondelete="CASCADE"),
primary_key=True,
),
Column("nlikes", Integer, nullable=False, server_default="0"),
Column("ndislikes", Integer, nullable=False, server_default="0"),

Column("updated_at", DateTime, server_default=func.now(), nullable=False, onupdate=func.now()),
Column(
"updated_at",
DateTime,
server_default=func.now(),
nullable=False,
onupdate=func.now(),
),
)


Expand All @@ -234,7 +250,13 @@
Column("nmemes_sent", Integer, nullable=False, server_default="0"),
Column("age_days", Integer, nullable=False, server_default="99999"),
Column("raw_impr_rank", Integer, nullable=False, server_default="99999"),
Column("updated_at", DateTime, server_default=func.now(), nullable=False, onupdate=func.now()),
Column(
"updated_at",
DateTime,
server_default=func.now(),
nullable=False,
onupdate=func.now(),
),
)


Expand Down
8 changes: 3 additions & 5 deletions src/flows/broadcasts/meme.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
@flow
async def broadcast_memes_to_users_active_hours_ago(hours: int = 48):
"""
Runs each hour:
1. Takes users which were active (hours, hours-1) hours ago
2. Sends them a best meme
Runs each hour:
1. Takes users which were active (hours, hours-1) hours ago
2. Sends them a best meme
"""
logger = get_run_logger()

Expand All @@ -33,5 +33,3 @@ async def broadcast_memes_to_users_active_hours_ago(hours: int = 48):
await send_new_message_with_meme(user_id, meme)
await create_user_meme_reaction(user_id, meme.id, meme.recommended_by)
await asyncio.sleep(0.1) # flood control


5 changes: 3 additions & 2 deletions src/flows/parsers/tg.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import asyncio
from datetime import datetime

from prefect import flow, get_run_logger

from src.flows.storage.memes import tg_meme_pipeline
from src.storage.parsers.tg import TelegramChannelScraper
from src.storage.service import (
get_telegram_sources_to_parse,
insert_parsed_posts_from_telegram,
update_meme_source,
)
from src.flows.storage.memes import tg_meme_pipeline


@flow(name="Parse Telegram Source")
Expand All @@ -29,7 +30,7 @@ async def parse_telegram_source(

await update_meme_source(meme_source_id=meme_source_id, parsed_at=datetime.utcnow())
await asyncio.sleep(5)


@flow(
name="Parse Telegram Channels",
Expand Down
4 changes: 2 additions & 2 deletions src/flows/parsers/vk.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import asyncio
from datetime import datetime

from prefect import flow, get_run_logger

from src.flows.storage.memes import vk_meme_pipeline
from src.storage.parsers.vk import VkGroupScraper
from src.storage.service import (
get_vk_sources_to_parse,
insert_parsed_posts_from_vk,
update_meme_source,
)

from src.flows.storage.memes import vk_meme_pipeline


@flow(name="Parse VK Source")
async def parse_vk_source(
Expand Down
3 changes: 1 addition & 2 deletions src/flows/stats/meme.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
@flow(
name="Calculate meme_stats",
)
async def calculate_meme_stats(
) -> None:
async def calculate_meme_stats() -> None:
await meme.calculate_meme_reactions_stats()

await meme.calculate_meme_raw_impressions_stats()
Loading

0 comments on commit b05b0b0

Please sign in to comment.