Skip to content

Commit 83bf2e8

Browse files
authored
add compare_workflow_task sample to monitoring/app (#405)
* add compare_workflow_task sample to monitoring/app
1 parent 0ee80ba commit 83bf2e8

File tree

11 files changed

+134
-0
lines changed

11 files changed

+134
-0
lines changed
+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Workflow: Scenario (compare tasks' durations in a workflow between multiple attempts)
2+
3+
## Scenario
4+
5+
The purpose of this scenario is to compare each task's duration in a workflow between multiple attempts.
6+
7+
### Steps
8+
#### 1. push this workflow to Treasure Data
9+
```
10+
> cd compare_workflow_task
11+
> td push compare_workflow_task
12+
```
13+
14+
#### 2. configure endpoint settings
15+
- api_endpoint
16+
- workflow_endpoint
17+
![](images/1.png)
18+
19+
#### 3. configure attempts (you want to compare attempt)
20+
![](images/2.png)
21+
22+
#### 4. register td.apikey as a secret (The owner of td.apikey must be the owner of the attempts which you specify.)
23+
![](images/3.png)
24+
25+
#### 5. run workflow
26+
![](images/4.png)
27+
28+
29+
After this workflow run, you can get the following query result.
30+
![](images/5.png)
31+
32+
![](images/6.png)
33+
34+
You can compare each task's duration between multiple attempts.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
_export:
2+
td:
3+
database: temporary_${session_id}
4+
tables:
5+
tasks: tasks
6+
api_endpoint: api.treasuredata.com
7+
workflow_endpoint: api-workflow.treasuredata.com
8+
attempt_ids:
9+
- 1201247649
10+
- 1200176632
11+
- 1199185996
12+
13+
+create_temporary_db:
14+
td_ddl>:
15+
create_databases: ["${td.database}"]
16+
17+
+get_attempt_task:
18+
py>: scripts.ingest_task.run
19+
session_unixtime: ${session_unixtime}
20+
dest_db: ${td.database}
21+
dest_table: ${td.tables.tasks}
22+
attempt_ids: ${attempt_ids.join(',')}
23+
api_endpoint: ${td.api_endpoint}
24+
workflow_endpoint: ${td.workflow_endpoint}
25+
docker:
26+
image: "digdag/digdag-python:3.9"
27+
_env:
28+
TD_API_KEY: ${secret:td.apikey}
29+
30+
+gen_query:
31+
td>: queries/gen_query.sql
32+
store_last_results: true
33+
34+
+compare_task:
35+
td>: queries/compare_task.sql
36+
37+
+delete_temporary_db:
38+
td_ddl>:
39+
drop_databases: ["${td.database}"]
40+
41+
416 KB
Loading
416 KB
Loading
265 KB
Loading
151 KB
Loading
416 KB
Loading
262 KB
Loading
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
select
2+
fullname as task_name,
3+
${td.last_results.query}
4+
from ${td.tables.tasks}
5+
group by 1
6+
order by max(id)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
with temp1 as (
2+
select 'max(If(attemptid=''' || attemptid || ''', DATE_DIFF(''second'', DATE_PARSE(startedat, ''%Y-%m-%dT%H:%i:%sZ''), DATE_PARSE(updatedat, ''%Y-%m-%dT%H:%i:%sZ'')), NULL)) as "' || attemptid || '"' as query_fragment from ${td.tables.tasks}
3+
group by 1
4+
)
5+
select array_join(array_agg(query_fragment), ',') as query from temp1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import requests
2+
import os
3+
import pytd
4+
import pandas as pd
5+
import json
6+
7+
def convert_to_json(s):
8+
return json.dumps(s)
9+
10+
def get_task_info(base_url, headers, ids):
11+
l = []
12+
for i in ids:
13+
url = base_url % i
14+
print(url)
15+
res = requests.get(url=url, headers=headers)
16+
if res.status_code != requests.codes.ok:
17+
res.raise_for_status()
18+
tasks = res.json()['tasks']
19+
for t in tasks:
20+
t['attemptid'] = i
21+
l.extend(tasks)
22+
return l
23+
24+
def insert_task_info(import_unixtime, endpoint, apikey, dest_db, dest_table, tasks):
25+
df = pd.DataFrame(tasks)
26+
df['time'] = int(import_unixtime)
27+
df['config'] = df['config'].apply(convert_to_json)
28+
df['upstreams'] = df['upstreams'].apply(convert_to_json)
29+
df['exportParams'] = df['exportParams'].apply(convert_to_json)
30+
df['storeParams'] = df['storeParams'].apply(convert_to_json)
31+
df['stateParams'] = df['stateParams'].apply(convert_to_json)
32+
df['error'] = df['error'].apply(convert_to_json)
33+
client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db)
34+
client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')
35+
36+
def run(session_unixtime, dest_db, dest_table, attempt_ids, api_endpoint='api.treasuredata.com', workflow_endpoint='api-workflow.treasuredata.com'):
37+
id_list = attempt_ids.split(',')
38+
if len(id_list) == 0:
39+
print('no attempt id')
40+
return
41+
42+
workflow_url = 'https://%s/api/attempts' % workflow_endpoint + '/%s/tasks'
43+
headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']}
44+
l = get_task_info(workflow_url, headers, id_list)
45+
if len(l) == 0:
46+
print('no update record')
47+
return
48+
insert_task_info(session_unixtime, 'https://%s' % api_endpoint, os.environ['TD_API_KEY'], dest_db, dest_table, l)

0 commit comments

Comments
 (0)