Skip to content

Commit d34bfe8

Browse files
authored
Merge pull request #413 from treasure-data/engage_roi_reporting
Add engage ROI reporting workflow
2 parents bef86e0 + 5bcd9da commit d34bfe8

File tree

12 files changed

+1603
-0
lines changed

12 files changed

+1603
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Initialize reporting_agent package
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
"""
5+
Events Master Data Collection Script
6+
7+
This script collects data from CDP APIs and Engage API to create Events Master table
8+
which serves as a reference for linking campaigns, journeys, and activations.
9+
10+
Required environment variables:
11+
- TD_API_KEY: Treasure Data API key
12+
- TD_API_SERVER: Treasure Data API server URL
13+
- TD_PRESTO_API: TD Presto API URL
14+
- TD_PLAZMA_API: TD Plazma API URL
15+
- CDP_API_BASE: CDP API base URL (defaults to https://api-cdp.treasuredata.com)
16+
- ENGAGE_API_BASE: Engage API base URL (defaults to https://engage-api.treasuredata.com)
17+
"""
18+
19+
import os
20+
import logging
21+
import requests
22+
import pytd
23+
import pandas as pd
24+
25+
# Configure logging
26+
logging.basicConfig(
27+
level=logging.INFO,
28+
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
29+
)
30+
logger = logging.getLogger(__name__)
31+
32+
# CDP API endpoints
33+
CDP_API_BASE = os.environ.get('CDP_API_BASE', "https://api-cdp.treasuredata.com")
34+
CDP_PARENT_SEGMENTS_API = "/entities/parent_segments"
35+
CDP_SEGMENT_FOLDERS_API = "/tree/audiences/{audience_id}/segment_folders"
36+
CDP_JOURNEY_API = "/entities/journeys"
37+
38+
# Engage API endpoints
39+
ENGAGE_API_BASE = os.environ.get('ENGAGE_API_BASE', "https://engage-api.treasuredata.com")
40+
ENGAGE_CAMPAIGNS_API = "/api/campaigns"
41+
42+
def get_api_key():
43+
"""Get API key from environment variable"""
44+
api_key = os.environ.get('TD_API_KEY')
45+
if not api_key:
46+
raise ValueError("TD_API_KEY environment variable is not set")
47+
return api_key
48+
49+
def get_headers():
50+
"""Get headers for API requests"""
51+
return {
52+
'Authorization': f'TD1 {get_api_key()}',
53+
'Content-Type': 'application/json'
54+
}
55+
56+
def fetch_cdp_parent_segments():
57+
"""Fetch all parent segments (audiences) from CDP API"""
58+
logger.info("Fetching parent segments (audiences) from CDP API")
59+
url = f"{CDP_API_BASE}{CDP_PARENT_SEGMENTS_API}"
60+
response = requests.get(url, headers=get_headers())
61+
response.raise_for_status()
62+
return response.json()
63+
64+
def fetch_segment_folders(audience_id):
65+
"""Fetch segment folders for a specific audience ID"""
66+
logger.info(f"Fetching segment folders for audience ID {audience_id}")
67+
url = f"{CDP_API_BASE}{CDP_SEGMENT_FOLDERS_API}".format(audience_id=audience_id)
68+
response = requests.get(url, headers=get_headers())
69+
response.raise_for_status()
70+
return response.json()
71+
72+
def fetch_journeys(folder_id):
73+
"""Fetch journeys for a specific folder ID"""
74+
logger.info(f"Fetching journeys for folder ID {folder_id}")
75+
url = f"{CDP_API_BASE}{CDP_JOURNEY_API}"
76+
params = {"folder_id": folder_id}
77+
response = requests.get(url, headers=get_headers(), params=params)
78+
response.raise_for_status()
79+
return response.json()
80+
81+
def fetch_engage_campaigns():
82+
"""Fetch all campaigns from Engage API"""
83+
logger.info("Fetching campaigns from Engage API")
84+
url = f"{ENGAGE_API_BASE}{ENGAGE_CAMPAIGNS_API}"
85+
response = requests.get(url, headers=get_headers())
86+
response.raise_for_status()
87+
return response.json()
88+
89+
def collect_events_master_data():
90+
"""Collect and combine data from CDP and Engage APIs to create Events Master data"""
91+
events_master_data = []
92+
93+
# Fetch and process CDP journeys
94+
try:
95+
ps_response = fetch_cdp_parent_segments()
96+
audiences = ps_response.get('data', [])
97+
for audience in audiences:
98+
audience_id = audience.get('id')
99+
if not audience_id:
100+
continue
101+
102+
folder_response = fetch_segment_folders(audience_id)
103+
folders = folder_response.get('data', [])
104+
for folder in folders:
105+
folder_id = folder.get('id')
106+
if not folder_id:
107+
continue
108+
109+
journey_response = fetch_journeys(folder_id)
110+
journeys = journey_response.get('data', [])
111+
for journey in journeys:
112+
journey_id = journey.get('id')
113+
journey_name = journey.get('name')
114+
launched_at = journey.get('launched_at')
115+
116+
attributes = journey.get('attributes', None)
117+
if not attributes:
118+
continue
119+
120+
# Process journey stages
121+
stages = attributes.get('stages', [])
122+
for stage in stages:
123+
stage_id = stage.get('id')
124+
stage_name = stage.get('name')
125+
126+
events_master_data.append({
127+
'journey_id': journey_id,
128+
'journey_name': journey_name,
129+
'journey_stage_id': stage_id,
130+
'journey_stage_name': stage_name,
131+
'campaign_id': None,
132+
'campaign_name': None,
133+
'launched_at': launched_at
134+
})
135+
except Exception as e:
136+
logger.error(f"Error fetching CDP journeys: {e}")
137+
138+
# Fetch and process Engage campaigns
139+
try:
140+
campaigns_response = fetch_engage_campaigns()
141+
campaigns = campaigns_response.get('data', [])
142+
143+
for campaign in campaigns:
144+
attributes = campaign.get('attributes', {})
145+
campaign_id = campaign.get('id')
146+
campaign_name = attributes.get('name')
147+
launched_at = attributes.get('launched_at')
148+
149+
events_master_data.append({
150+
'journey_id': None,
151+
'journey_name': None,
152+
'journey_stage_id': None,
153+
'journey_stage_name': None,
154+
'campaign_id': campaign_id,
155+
'campaign_name': campaign_name,
156+
'launched_at': launched_at
157+
})
158+
except Exception as e:
159+
logger.error(f"Error fetching Engage campaigns: {e}")
160+
161+
return events_master_data
162+
163+
def update_events_master(database, events_master_table):
164+
"""Update the Events Master table in Treasure Data"""
165+
logger.info(f"Updating Events Master table: {database}.{events_master_table}")
166+
167+
# Collect events master data
168+
events_master_data = collect_events_master_data()
169+
170+
if not events_master_data:
171+
logger.warning("No events master data collected, aborting update")
172+
return
173+
174+
# Convert to DataFrame for TD import
175+
df = pd.DataFrame(events_master_data)
176+
177+
# Initialize TD client
178+
client = pytd.Client(
179+
apikey=get_api_key(),
180+
endpoint=os.environ.get('TD_API_SERVER', 'https://api.treasuredata.com'),
181+
database=database
182+
)
183+
184+
# Create or replace table
185+
logger.info(f"Writing {len(df)} records to {database}.{events_master_table}")
186+
client.load_table_from_dataframe(
187+
df,
188+
f"{database}.{events_master_table}",
189+
if_exists='overwrite'
190+
)
191+
192+
logger.info(f"Successfully updated {database}.{events_master_table}")
193+
194+
if __name__ == "__main__":
195+
# For local testing only
196+
import argparse
197+
parser = argparse.ArgumentParser(description='Update Events Master table')
198+
parser.add_argument('--database', required=True, help='TD database name')
199+
parser.add_argument('--table', required=True, help='TD table name')
200+
args = parser.parse_args()
201+
202+
update_events_master(args.database, args.table)

0 commit comments

Comments
 (0)