Skip to content

Commit 399a857

Browse files
committed
add send email with excel & segment sync with activation
1 parent d34bfe8 commit 399a857

File tree

8 files changed

+409
-0
lines changed

8 files changed

+409
-0
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Segment sync (inter master segment replication) with activation via Activation Actions
2+
3+
----
4+
## Overview
5+
6+
This project provides a solution to replicating segments from one master segment to another using Activation Actions.
7+
8+
----
9+
## Implementation
10+
1. Copy and paste the code into a custom script in Treasure Workflows.
11+
12+
----
13+
## Considerations
14+
15+
This project doesn't handle "Segment Include/Exclude" segment rules.
16+
17+
----
18+
## Questions
19+
20+
Please feel free to reach out to [email protected] with any questions you have about using this code.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#config/params.yaml
2+
aud_api_ep: https://api-cdp.treasuredata.com #US endpoint
3+
target_audience_ids: ["12344567", "7654321"]
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
#scripts/seg_act_rep_inter_ms.py
2+
import os
3+
import requests
4+
5+
def main(**kwargs):
6+
# Init
7+
td_api_key = os.getenv("TD_API_KEY")
8+
audience_api_ep = kwargs.get("aud_api_ep")
9+
source_audience_id = str(os.getenv("AUDIENCE_ID"))
10+
target_audience_ids = kwargs.get("target_audience_ids")
11+
source_segment_id = str(os.getenv("SEGMENT_ID"))
12+
source_activation_id = str(os.getenv("ACTIVATION_ID"))
13+
get_segment_path = "/audiences/{audienceId}/segments/{segmentId}"
14+
get_activation_path = "/audiences/{audienceId}/segments/{segmentId}/syndications/{syndicationId}"
15+
get_folder_path = "/entities/folders/{folderId}"
16+
get_folder_list_path = "/audiences/{audienceId}/folders"
17+
get_segment_list_path = "/audiences/{audienceId}/folders/{folderId}/segments"
18+
post_folder_path = "/entities/folders"
19+
post_new_segment_path = "/audiences/{audienceId}/segments"
20+
post_new_activation_path = "/audiences/{audienceId}/segments/{segmentId}/syndications"
21+
22+
# Get source segment json data
23+
def get_source_segment():
24+
payload = {}
25+
headers = {
26+
"Authorization": f"TD1 {td_api_key}"
27+
}
28+
get_source_segment_path = audience_api_ep + get_segment_path.replace("{audienceId}", source_audience_id).replace("{segmentId}", source_segment_id)
29+
source_segment = requests.request("GET", get_source_segment_path, headers = headers, data = payload)
30+
31+
if source_segment.status_code == 200:
32+
source_segment_json = source_segment.json()
33+
else:
34+
print(f"Failed to fetch source segment. Status code: {source_segment.status_code} - {source_segment.reason} - {source_segment.text}")
35+
exit()
36+
37+
return source_segment_json
38+
39+
# Get target segment id
40+
def get_target_segment_id(segment_target_folder_id, source_segment_name):
41+
payload = {}
42+
headers = {
43+
"Authorization": f"TD1 {td_api_key}"
44+
}
45+
get_target_segment_list_path = audience_api_ep + get_segment_list_path.replace("{audienceId}", target_audience_id).replace("{folderId}", segment_target_folder_id)
46+
target_segment_list = requests.request("GET", get_target_segment_list_path, headers = headers, data = payload)
47+
48+
if target_segment_list.status_code == 200:
49+
target_segment_list_json = target_segment_list.json()
50+
target_segment = next((item for item in target_segment_list_json if item["name"] == source_segment_name), None)
51+
target_segment_id = target_segment["id"]
52+
else:
53+
print(f"Failed to fetch target segment. Status code: {target_segment_list.status_code} - {target_segment_list.reason} - {target_segment_list.text}")
54+
exit()
55+
56+
return target_segment_id
57+
58+
# Get source activation json data
59+
def get_source_activation():
60+
payload = {}
61+
headers = {
62+
"Authorization": f"TD1 {td_api_key}"
63+
}
64+
get_source_activation_path = audience_api_ep + get_activation_path.replace("{audienceId}", source_audience_id).replace("{segmentId}", source_segment_id).replace("{syndicationId}", source_activation_id)
65+
source_activation = requests.request("GET", get_source_activation_path, headers = headers, data = payload)
66+
67+
if source_activation.status_code == 200:
68+
source_activation_json = source_activation.json()
69+
else:
70+
print(f"Failed to fetch source activation. Status code: {source_activation.status_code} - {source_activation.reason} - {source_activation.text}")
71+
exit()
72+
73+
return source_activation_json
74+
75+
# Get source folder json data
76+
def get_source_folder(source_segment_json):
77+
payload = {}
78+
headers = {
79+
"Authorization": f"TD1 {td_api_key}"
80+
}
81+
source_folder_id = source_segment_json["segmentFolderId"]
82+
get_source_folder_path = audience_api_ep + get_folder_path.replace("{folderId}", source_folder_id)
83+
source_folder = requests.request("GET", get_source_folder_path, headers = headers, data = payload)
84+
85+
if source_folder.status_code == 200:
86+
source_folder_json = source_folder.json()
87+
else:
88+
print(f"Failed to fetch source folder. Status code: {source_folder.status_code} - {source_folder.reason} - {source_folder.text}")
89+
exit()
90+
91+
return source_folder_json
92+
93+
# Get source parent folder json data
94+
def get_source_parent_folder(source_folder_json):
95+
payload = {}
96+
headers = {
97+
"Authorization": f"TD1 {td_api_key}"
98+
}
99+
parent_folder_id = source_folder_json["data"]["relationships"]["parentFolder"]["data"]["id"]
100+
get_parent_folder_path = audience_api_ep + get_folder_path.replace("{folderId}", parent_folder_id)
101+
parent_folder = requests.request("GET", get_parent_folder_path, headers = headers, data = payload)
102+
103+
if parent_folder.status_code == 200:
104+
parent_folder_json = parent_folder.json()
105+
else:
106+
print(f"Failed to fetch parent folder. Status code: {parent_folder.status_code} - {parent_folder.reason} - {parent_folder.text}")
107+
exit()
108+
109+
return parent_folder_json
110+
111+
# Get target master segment folders json data
112+
def get_target_master_segment_folders():
113+
payload = {}
114+
headers = {
115+
"Authorization": f"TD1 {td_api_key}"
116+
}
117+
get_target_master_segment_folders_path = audience_api_ep + get_folder_list_path.replace("{audienceId}", target_audience_id)
118+
target_master_segment_folders = requests.request("GET", get_target_master_segment_folders_path, headers = headers, data = payload)
119+
120+
if target_master_segment_folders.status_code == 200:
121+
target_master_segment_folders_json = target_master_segment_folders.json()
122+
else:
123+
print(f"Failed to fetch target master segment. Status code: {target_master_segment_folders.status_code} - {target_master_segment_folders.reason} - {target_master_segment_folders.text}")
124+
exit()
125+
126+
return target_master_segment_folders_json
127+
128+
# Post new folder
129+
def post_target_folder(source_folder_json, segment_target_parent_folder_id):
130+
folder_name = source_folder_json["data"]["attributes"]["name"]
131+
payload = {
132+
"type": "folder-segment",
133+
"attributes": {
134+
"name": folder_name,
135+
"description": source_folder_json["data"]["attributes"]["description"]
136+
},
137+
"relationships": {
138+
"parentFolder": {
139+
"data": {
140+
"id": segment_target_parent_folder_id,
141+
"type": "folder-segment"
142+
}
143+
}
144+
}
145+
}
146+
headers = {
147+
"Authorization": f"TD1 {td_api_key}",
148+
"Content-Type": "application/json"
149+
}
150+
new_folder = requests.request("POST", audience_api_ep + post_folder_path, headers = headers, json = payload)
151+
if new_folder.status_code == 200:
152+
print(f"'{folder_name}' folder was synced to '{target_audience_id}' successfully!")
153+
new_folder_response_json = new_folder.json()
154+
target_folder_id = new_folder_response_json["data"]["id"]
155+
else:
156+
print(f"Failed to post folder: '{folder_name}' - Status code: {new_folder.status_code} - {new_folder.reason} - {new_folder.text}")
157+
exit()
158+
159+
return target_folder_id
160+
161+
# Post new segment
162+
def post_target_segment(source_segment_json, segment_target_folder_id):
163+
headers = {
164+
"Authorization": f"TD1 {td_api_key}"
165+
}
166+
payload = {
167+
"audienceId": target_audience_id,
168+
"name": source_segment_json["name"],
169+
"realtime": source_segment_json["realtime"],
170+
"isVisible": source_segment_json["isVisible"],
171+
"kind": source_segment_json["kind"],
172+
"description": source_segment_json["description"],
173+
"segmentFolderId": segment_target_folder_id,
174+
"rule": source_segment_json["rule"],
175+
}
176+
segment_name = source_segment_json["name"]
177+
new_segment = requests.request("POST", audience_api_ep + post_new_segment_path.replace("{audienceId}", target_audience_id), headers = headers, json = payload)
178+
if new_segment.status_code == 200:
179+
print(f"'{segment_name}' segment was synced to '{target_audience_id}' successfully!")
180+
new_segment_response_json = new_segment.json()
181+
target_segment_id = new_segment_response_json["id"]
182+
post_target_activation(get_source_activation(), target_segment_id)
183+
elif new_segment.status_code == 400:
184+
post_target_activation(get_source_activation(), get_target_segment_id(segment_target_folder_id, source_segment_json["name"]))
185+
else:
186+
print(f"Failed to post segment - Status code: {new_segment.status_code} - {new_segment.reason} - {new_segment.text}")
187+
exit()
188+
189+
# Post activation to new segment
190+
def post_target_activation(source_activation_json, target_segment_id):
191+
headers = {
192+
"Authorization": f"TD1 {td_api_key}"
193+
}
194+
payload = {
195+
"type": "syndication",
196+
"segmentId": target_segment_id,
197+
"name": source_activation_json["name"],
198+
"activationTemplateId": source_activation_json["activationTemplateId"],
199+
"allColumns": source_activation_json["allColumns"],
200+
"connectionId": source_activation_json["connectionId"],
201+
"description": source_activation_json["description"],
202+
"scheduleType": source_activation_json["scheduleType"],
203+
"scheduleOption": source_activation_json["scheduleOption"],
204+
"repeatSubFrequency": source_activation_json["repeatSubFrequency"],
205+
"timezone": source_activation_json["timezone"],
206+
"notifyOn": source_activation_json["notifyOn"],
207+
"emailRecipients": source_activation_json["emailRecipients"],
208+
"connectorConfig": source_activation_json["connectorConfig"],
209+
"audienceId": target_audience_id,
210+
"columns": source_activation_json["columns"],
211+
}
212+
activation_name = source_activation_json["name"]
213+
new_activation = requests.request("POST", audience_api_ep + post_new_activation_path.replace("{audienceId}", target_audience_id).replace("{segmentId}", target_segment_id), headers = headers, json = payload)
214+
if new_activation.status_code == 200:
215+
print(f"'{activation_name}' activation was synced to '{target_audience_id}' successfully!")
216+
else:
217+
print(f"Failed to post activation - Status code: {new_activation.status_code} - {new_activation.reason} - {new_activation.text}")
218+
exit()
219+
220+
# Sync segment with activation
221+
def replicate(target_master_segment_folders_json, source_folder_json, source_segment_json):
222+
target_master_segment_folders_names = [item["name"] for item in target_master_segment_folders_json]
223+
target_master_segment_folders_ids_names = {item["name"]: item["id"] for item in target_master_segment_folders_json}
224+
source_folder_name = source_folder_json["data"]["attributes"]["name"]
225+
source_folder_type = source_folder_json["data"]["relationships"]["parentFolder"]["data"]
226+
if source_folder_type is None:
227+
source_parent_folder_type = "root"
228+
else:
229+
source_parent_folder_json = get_source_parent_folder(source_folder_json)
230+
source_parent_folder_type = source_parent_folder_json["data"]["relationships"]["parentFolder"]["data"]
231+
if source_folder_name in target_master_segment_folders_names:
232+
segment_target_folder_id = target_master_segment_folders_ids_names[source_folder_name]
233+
post_target_segment(source_segment_json, segment_target_folder_id)
234+
elif source_parent_folder_type is None:
235+
segment_target_folder = next((item for item in target_master_segment_folders_json if item["parentFolderId"] is None), None)
236+
post_target_segment(source_segment_json, post_target_folder(source_folder_json, segment_target_folder["id"]))
237+
elif source_parent_folder_type == "root":
238+
segment_target_folder = next((item for item in target_master_segment_folders_json if item["parentFolderId"] is None), None)
239+
post_target_segment(source_segment_json, segment_target_folder["id"])
240+
else:
241+
print("Unexpected error")
242+
exit()
243+
244+
# Replicate segment to each target audience
245+
for ta_id in target_audience_ids:
246+
target_audience_id = ta_id
247+
replicate(get_target_master_segment_folders(), get_source_folder(get_source_segment()), get_source_segment())
248+
print("Fin")
249+
250+
# Main
251+
if __name__ == "__main__":
252+
main()
253+
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#segment_w_activation_inter_ms_sync.dig
2+
timezone: Asia/Tokyo
3+
4+
_export:
5+
!include : config/params.yaml
6+
7+
+echo_activation_actions_parameters:
8+
echo>: segment_id ==> ${segment_id}, segment_name ==> ${segment_name}, activation_id ==> ${activation_id}, audience_id ==> ${audience_id}
9+
+replicate:
10+
py>: scripts.seg_act_rep_inter_ms.main
11+
_env:
12+
TD_API_KEY: ${secret:td.apikey}
13+
SEGMENT_ID: ${segment_id}
14+
ACTIVATION_ID: ${activation_id}
15+
AUDIENCE_ID: ${audience_id}
16+
docker:
17+
image: "digdag/digdag-python:3.10"
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# Send email with excel attachment
2+
----
3+
## Overview
4+
5+
This project provides a solution to sending an email with an excel attachment that contains the activated segment audience.
6+
7+
----
8+
## Implementation
9+
1. Copy and paste the code into a custom script in Treasure Workflows.
10+
11+
----
12+
## Considerations
13+
14+
Need STMP relay setup, e.g.: Gmail.
15+
16+
----
17+
## Questions
18+
19+
Please feel free to reach out to [email protected] with any questions you have about using this code.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#config/params.yaml
2+
td_api_ep: https://api.treasuredata.com # US endpoint
3+
sender_email: [email protected]
4+
sender_name: "your sender email display name"
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#scripts/send_email_w_excel_attachment.py
2+
import os
3+
import sys
4+
os.system(f"{sys.executable} -m pip install openpyxl")
5+
6+
import pandas as pd
7+
import pytd
8+
from io import BytesIO
9+
import smtplib
10+
from email.message import EmailMessage
11+
from email.utils import formataddr
12+
13+
def main(**kwargs):
14+
# Init
15+
td_api_key = os.getenv("TD_API_KEY")
16+
td_api_ep = os.getenv("TD_API_EP")
17+
gmail_app_pw = os.getenv("GMAIL_APP_PW")
18+
database = os.getenv("DATABASE")
19+
src_tbl = os.getenv("SRC_TBL")
20+
sender_email = os.getenv("SENDER_EMAIL")
21+
sender_name = os.getenv("SENDER_NAME")
22+
23+
td = pytd.Client(apikey=td_api_key, endpoint=td_api_ep, database=database, default_engine="presto")
24+
# Create a dataframe by importing data from Activation Actions database.table
25+
data_res = td.query(f"SELECT * FROM TABLE(exclude_columns(input => TABLE({database}.{src_tbl}),columns => DESCRIPTOR(email_to,email_subject)))")
26+
data_df = pd.DataFrame(**data_res)
27+
email_to_res = td.query(f"SELECT email_to FROM {database}.{src_tbl} LIMIT 1")
28+
email_to_df = pd.DataFrame(**email_to_res)
29+
email_to = email_to_df.squeeze()
30+
email_subject_res = td.query(f"SELECT email_subject FROM {database}.{src_tbl} LIMIT 1")
31+
email_subject_df = pd.DataFrame(**email_subject_res)
32+
email_subject = email_subject_df.squeeze()
33+
34+
# Save the dataframe to an excel file in memory
35+
excel_buffer = BytesIO()
36+
data_df.to_excel(excel_buffer, index=False, sheet_name="Scores")
37+
excel_buffer.seek(0)
38+
39+
# Prepare the email
40+
msg = EmailMessage()
41+
msg["Subject"] = email_subject
42+
msg["From"] = formataddr((sender_name, sender_email))
43+
msg["To"] = email_to
44+
msg.set_content("Hi,\n\nPlease find the attached excel file.\n\nBest regards,\nTD CDP Sony SG")
45+
msg.add_attachment(
46+
excel_buffer.read(),
47+
maintype="application",
48+
subtype="vnd.openxmlformats-officedocument.spreadsheetml.sheet",
49+
filename="report.xlsx"
50+
)
51+
52+
# Send the email with Gmail SMTP
53+
smtp_server = "smtp.gmail.com"
54+
smtp_port = 587
55+
smtp_account = "[email protected]"
56+
password = gmail_app_pw # generate app password: https://myaccount.google.com/apppasswords
57+
58+
with smtplib.SMTP(smtp_server, smtp_port) as server:
59+
server.starttls()
60+
server.login(smtp_account, password)
61+
server.send_message(msg)
62+
63+
print("Email sent successfully!")
64+
print(msg)
65+
66+
# Main
67+
if __name__ == "__main__":
68+
main()

0 commit comments

Comments
 (0)