diff --git a/datahub-actions/setup.py b/datahub-actions/setup.py index 80fd432c..d4348fa1 100644 --- a/datahub-actions/setup.py +++ b/datahub-actions/setup.py @@ -84,7 +84,10 @@ def get_long_description(): }, "tag_propagation": set(), "term_propagation": set(), - "snowflake_tag_propagation": {f"acryl-datahub[snowflake]>={acryl_datahub_min_version}"} + "snowflake_tag_propagation": {f"acryl-datahub[snowflake]>={acryl_datahub_min_version}"}, + "telegram": { + "pyTelegramBotAPI>=4.12.0" + } # Transformer Plugins (None yet) } @@ -137,6 +140,7 @@ def get_long_description(): "tag_propagation", "term_propagation", "snowflake_tag_propagation", + "telegram" ] for dependency in plugins[plugin] ), @@ -157,6 +161,7 @@ def get_long_description(): "tag_propagation", "term_propagation", "snowflake_tag_propagation", + "telegram" ] for dependency in plugins[plugin] ), @@ -172,6 +177,7 @@ def get_long_description(): "tag_propagation = datahub_actions.plugin.action.tag.tag_propagation_action:TagPropagationAction", "term_propagation = datahub_actions.plugin.action.term.term_propagation_action:TermPropagationAction", "snowflake_tag_propagation = datahub_actions.plugin.action.snowflake.tag_propagator:SnowflakeTagPropagatorAction", + "telegram = datahub_actions.plugin.action.telegram.telegram:TelegramNotificationAction" ], "datahub_actions.transformer.plugins": [], "datahub_actions.source.plugins": [], diff --git a/datahub-actions/src/datahub_actions/plugin/action/telegram/__init__.py b/datahub-actions/src/datahub_actions/plugin/action/telegram/__init__.py new file mode 100644 index 00000000..26b2bd5e --- /dev/null +++ b/datahub-actions/src/datahub_actions/plugin/action/telegram/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021 Acryl Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/datahub-actions/src/datahub_actions/plugin/action/telegram/telegram.py b/datahub-actions/src/datahub_actions/plugin/action/telegram/telegram.py new file mode 100644 index 00000000..01acef6e --- /dev/null +++ b/datahub-actions/src/datahub_actions/plugin/action/telegram/telegram.py @@ -0,0 +1,94 @@ +import json +import logging + +import telebot +from datahub.configuration.common import ConfigModel +from datahub.metadata.schema_classes import \ + EntityChangeEventClass as EntityChangeEvent +from datahub_actions.action.action import Action +from datahub_actions.event.event_envelope import EventEnvelope +from datahub_actions.pipeline.pipeline_context import PipelineContext +from datahub_actions.utils.datahub_util import DATAHUB_SYSTEM_ACTOR_URN +from datahub_actions.utils.social_util import ( + StructuredMessage, get_message_from_entity_change_event, + get_telegram_welcome_message, pretty_any_text) +from pydantic import SecretStr +from ratelimit import limits, sleep_and_retry + +logger = logging.getLogger(__name__) + + +@sleep_and_retry +@limits(calls=1, period=1) +def post_message(client, chat_id, text): + client.send_message( + chat_id=chat_id, + text=text, + parse_mode="HTML" + ) + + +class TelegramNotificationConfig(ConfigModel): + bot_token: SecretStr + chat_id: SecretStr + base_url: str = "http://127.0.0.1:9002/" + suppress_system_activity: bool = True + + +class TelegramNotificationAction(Action): + def name(self): + return "SlackNotificationAction" + + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action": + action_config = TelegramNotificationConfig.parse_obj(config_dict or {}) + logger.info(f"Telegram notification action configured with {action_config}") + return cls(action_config, ctx) + + def __init__(self, action_config: TelegramNotificationConfig, ctx: PipelineContext): + self.ctx = ctx + self.action_config = action_config + self.bot = telebot.TeleBot(self.action_config.bot_token.get_secret_value()) + + self.bot.send_message( + chat_id=self.action_config.chat_id.get_secret_value(), + text=get_telegram_welcome_message(self.action_config.base_url), + parse_mode="HTML" + ) + + def act(self, event: EventEnvelope) -> None: + try: + message = json.dumps( + json.loads(event.as_json()), + indent=4 + ) + logger.debug(f"Received event: {message}") + if event.event_type == "EntityChangeEvent_v1": + assert isinstance(event.event, EntityChangeEvent) + + if ( + event.event.auditStamp.actor == DATAHUB_SYSTEM_ACTOR_URN + and self.action_config.suppress_system_activity + ): + return None + + semantic_message = get_message_from_entity_change_event( + event.event, + self.action_config.base_url, + self.ctx.graph.graph if self.ctx.graph else None, + channel="telegram" + ) + + if semantic_message: + post_message( + client=self.bot, + chat_id=self.action_config.chat_id.get_secret_value(), + text=semantic_message + ) + else: + logger.debug("Skipping message because it didn't match out filter") + except Exception as e: + logger.debug(f"Failed to process event", e) + + def close(self) -> None: + pass \ No newline at end of file diff --git a/datahub-actions/src/datahub_actions/utils/social_util.py b/datahub-actions/src/datahub_actions/utils/social_util.py index 341934b1..cea3be88 100644 --- a/datahub-actions/src/datahub_actions/utils/social_util.py +++ b/datahub-actions/src/datahub_actions/utils/social_util.py @@ -3,14 +3,13 @@ from dataclasses import dataclass from typing import Dict, Optional +from telebot.formatting import hlink, hbold from datahub.ingestion.graph.client import DataHubGraph -from datahub.metadata.schema_classes import EntityChangeEventClass as EntityChangeEvent +from datahub.metadata.schema_classes import \ + EntityChangeEventClass as EntityChangeEvent from datahub.utilities.urns.urn import Urn - -from datahub_actions.utils.name_resolver import ( - get_entity_name_from_urn, - get_entity_qualifier_from_urn, -) +from datahub_actions.utils.name_resolver import (get_entity_name_from_urn, + get_entity_qualifier_from_urn) logger = logging.getLogger(__name__) @@ -26,6 +25,8 @@ def make_url_with_title(title: str, url: str, channel: str) -> str: if channel == "slack": # slack uses mrkdwn format return f"<{url}|{title}>" + elif channel == 'telegram': + return hlink(title, url) else: return f"[{title}]({url})" @@ -35,6 +36,8 @@ def make_bold(text: str, channel: str) -> str: return text if channel == "slack": return f"*{text}*" + elif channel == 'telegram': + return hbold(text) else: return f"**{text}**" @@ -46,6 +49,19 @@ class StructuredMessage: text: Optional[str] +def get_telegram_welcome_message(datahub_home_url: str): + hostname = "unknown-host" + try: + import os + + hostname = os.uname()[1] + except Exception as e: + logger.warn(f"Failed to acquire hostname with {e}") + pass + + return f"👀 I'll be watching for interesting events on {hlink('DataHub', datahub_home_url)} and keep you updated when anything changes. ⚡" + + def get_welcome_message(datahub_home_url: str) -> StructuredMessage: hostname = "unknown-host" try: @@ -58,6 +74,7 @@ def get_welcome_message(datahub_home_url: str) -> StructuredMessage: current_time: str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") current_timezone: str = str(datetime.datetime.now().astimezone().tzinfo) + return StructuredMessage( title="DataHub Bot 🌟", properties={ @@ -127,12 +144,12 @@ def get_message_from_entity_change_event( ) if category == "lifecycle": - message = f">✏️ {make_bold(actor_name,channel)} has {operation} {entity_specialized_type} {entity_message_trailer}." + message = f"✏️ {make_bold(actor_name,channel)} has {operation} {entity_specialized_type} {entity_message_trailer}." elif category == "technical_schema": if event.modifier and event.modifier.startswith("urn:li:schemaField"): - message = f">✏️ {make_bold(actor_name,channel)} has {operation} field {make_bold(modifier_name,channel)} in schema for {entity_specialized_type} {entity_message_trailer}." + message = f"✏️ {make_bold(actor_name,channel)} has {operation} field {make_bold(modifier_name,channel)} in schema for {entity_specialized_type} {entity_message_trailer}." else: - message = f">✏️ {make_bold(actor_name,channel)} has {operation} {make_bold(modifier_name,channel)} schema for {entity_specialized_type} {entity_message_trailer}." + message = f"✏️ {make_bold(actor_name,channel)} has {operation} {make_bold(modifier_name,channel)} schema for {entity_specialized_type} {entity_message_trailer}." else: - message = f">✏️ {make_bold(actor_name,channel)} has {operation} {category} {make_bold(modifier_name,channel)} for {entity_specialized_type} {entity_message_trailer}." - return message + message = f"✏️ {make_bold(actor_name,channel)} has {operation} {category} {make_bold(modifier_name,channel)} for {entity_specialized_type} {entity_message_trailer}." + return message \ No newline at end of file