Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TelegramNotificationAction #94

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion datahub-actions/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ def get_long_description():
},
"tag_propagation": set(),
"term_propagation": set(),
"snowflake_tag_propagation": {f"acryl-datahub[snowflake]>={acryl_datahub_min_version}"}
"snowflake_tag_propagation": {f"acryl-datahub[snowflake]>={acryl_datahub_min_version}"},
"telegram": {
"pyTelegramBotAPI>=4.12.0"
}
# Transformer Plugins (None yet)
}

Expand Down Expand Up @@ -137,6 +140,7 @@ def get_long_description():
"tag_propagation",
"term_propagation",
"snowflake_tag_propagation",
"telegram"
]
for dependency in plugins[plugin]
),
Expand All @@ -157,6 +161,7 @@ def get_long_description():
"tag_propagation",
"term_propagation",
"snowflake_tag_propagation",
"telegram"
]
for dependency in plugins[plugin]
),
Expand All @@ -172,6 +177,7 @@ def get_long_description():
"tag_propagation = datahub_actions.plugin.action.tag.tag_propagation_action:TagPropagationAction",
"term_propagation = datahub_actions.plugin.action.term.term_propagation_action:TermPropagationAction",
"snowflake_tag_propagation = datahub_actions.plugin.action.snowflake.tag_propagator:SnowflakeTagPropagatorAction",
"telegram = datahub_actions.plugin.action.telegram.telegram:TelegramNotificationAction"
],
"datahub_actions.transformer.plugins": [],
"datahub_actions.source.plugins": [],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2021 Acryl Data, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import json
import logging

import telebot
from datahub.configuration.common import ConfigModel
from datahub.metadata.schema_classes import \
EntityChangeEventClass as EntityChangeEvent
from datahub_actions.action.action import Action
from datahub_actions.event.event_envelope import EventEnvelope
from datahub_actions.pipeline.pipeline_context import PipelineContext
from datahub_actions.utils.datahub_util import DATAHUB_SYSTEM_ACTOR_URN
from datahub_actions.utils.social_util import (
StructuredMessage, get_message_from_entity_change_event,
get_telegram_welcome_message, pretty_any_text)
from pydantic import SecretStr
from ratelimit import limits, sleep_and_retry

logger = logging.getLogger(__name__)


@sleep_and_retry
@limits(calls=1, period=1)
def post_message(client, chat_id, text):
client.send_message(
chat_id=chat_id,
text=text,
parse_mode="HTML"
)


class TelegramNotificationConfig(ConfigModel):
bot_token: SecretStr
chat_id: SecretStr
base_url: str = "http://127.0.0.1:9002/"
suppress_system_activity: bool = True


class TelegramNotificationAction(Action):
def name(self):
return "SlackNotificationAction"

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action":
action_config = TelegramNotificationConfig.parse_obj(config_dict or {})
logger.info(f"Telegram notification action configured with {action_config}")
return cls(action_config, ctx)

def __init__(self, action_config: TelegramNotificationConfig, ctx: PipelineContext):
self.ctx = ctx
self.action_config = action_config
self.bot = telebot.TeleBot(self.action_config.bot_token.get_secret_value())

self.bot.send_message(
chat_id=self.action_config.chat_id.get_secret_value(),
text=get_telegram_welcome_message(self.action_config.base_url),
parse_mode="HTML"
)

def act(self, event: EventEnvelope) -> None:
try:
message = json.dumps(
json.loads(event.as_json()),
indent=4
)
logger.debug(f"Received event: {message}")
if event.event_type == "EntityChangeEvent_v1":
assert isinstance(event.event, EntityChangeEvent)

if (
event.event.auditStamp.actor == DATAHUB_SYSTEM_ACTOR_URN
and self.action_config.suppress_system_activity
):
return None

semantic_message = get_message_from_entity_change_event(
event.event,
self.action_config.base_url,
self.ctx.graph.graph if self.ctx.graph else None,
channel="telegram"
)

if semantic_message:
post_message(
client=self.bot,
chat_id=self.action_config.chat_id.get_secret_value(),
text=semantic_message
)
else:
logger.debug("Skipping message because it didn't match out filter")
except Exception as e:
logger.debug(f"Failed to process event", e)

def close(self) -> None:
pass
39 changes: 28 additions & 11 deletions datahub-actions/src/datahub_actions/utils/social_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
from dataclasses import dataclass
from typing import Dict, Optional

from telebot.formatting import hlink, hbold
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import EntityChangeEventClass as EntityChangeEvent
from datahub.metadata.schema_classes import \
EntityChangeEventClass as EntityChangeEvent
from datahub.utilities.urns.urn import Urn

from datahub_actions.utils.name_resolver import (
get_entity_name_from_urn,
get_entity_qualifier_from_urn,
)
from datahub_actions.utils.name_resolver import (get_entity_name_from_urn,
get_entity_qualifier_from_urn)

logger = logging.getLogger(__name__)

Expand All @@ -26,6 +25,8 @@ def make_url_with_title(title: str, url: str, channel: str) -> str:
if channel == "slack":
# slack uses mrkdwn format
return f"<{url}|{title}>"
elif channel == 'telegram':
return hlink(title, url)
else:
return f"[{title}]({url})"

Expand All @@ -35,6 +36,8 @@ def make_bold(text: str, channel: str) -> str:
return text
if channel == "slack":
return f"*{text}*"
elif channel == 'telegram':
return hbold(text)
else:
return f"**{text}**"

Expand All @@ -46,6 +49,19 @@ class StructuredMessage:
text: Optional[str]


def get_telegram_welcome_message(datahub_home_url: str):
hostname = "unknown-host"
try:
import os

hostname = os.uname()[1]
except Exception as e:
logger.warn(f"Failed to acquire hostname with {e}")
pass

return f"👀 I'll be watching for interesting events on {hlink('DataHub', datahub_home_url)} and keep you updated when anything changes. ⚡"


def get_welcome_message(datahub_home_url: str) -> StructuredMessage:
hostname = "unknown-host"
try:
Expand All @@ -58,6 +74,7 @@ def get_welcome_message(datahub_home_url: str) -> StructuredMessage:

current_time: str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_timezone: str = str(datetime.datetime.now().astimezone().tzinfo)

return StructuredMessage(
title="DataHub Bot 🌟",
properties={
Expand Down Expand Up @@ -127,12 +144,12 @@ def get_message_from_entity_change_event(
)

if category == "lifecycle":
message = f">✏️ {make_bold(actor_name,channel)} has {operation} {entity_specialized_type} {entity_message_trailer}."
message = f"✏️ {make_bold(actor_name,channel)} has {operation} {entity_specialized_type} {entity_message_trailer}."
elif category == "technical_schema":
if event.modifier and event.modifier.startswith("urn:li:schemaField"):
message = f">✏️ {make_bold(actor_name,channel)} has {operation} field {make_bold(modifier_name,channel)} in schema for {entity_specialized_type} {entity_message_trailer}."
message = f"✏️ {make_bold(actor_name,channel)} has {operation} field {make_bold(modifier_name,channel)} in schema for {entity_specialized_type} {entity_message_trailer}."
else:
message = f">✏️ {make_bold(actor_name,channel)} has {operation} {make_bold(modifier_name,channel)} schema for {entity_specialized_type} {entity_message_trailer}."
message = f"✏️ {make_bold(actor_name,channel)} has {operation} {make_bold(modifier_name,channel)} schema for {entity_specialized_type} {entity_message_trailer}."
else:
message = f">✏️ {make_bold(actor_name,channel)} has {operation} {category} {make_bold(modifier_name,channel)} for {entity_specialized_type} {entity_message_trailer}."
return message
message = f"✏️ {make_bold(actor_name,channel)} has {operation} {category} {make_bold(modifier_name,channel)} for {entity_specialized_type} {entity_message_trailer}."
return message