Add new import logs scenario #315

# Workflow: Import Treasure Data Logs from Data Landing Area

This example shows how you can use a workflow to ingest Treasure Data logs from a Data Landing Area into your Treasure Data account.

# How to Run

## Requirement

The workflow requires that the Data Landing Areas feature is enabled in your Treasure Data account and that you have received your User ID for accessing it.

## Steps

First, edit the configuration. You can find the following settings in the `import_td_logs.dig` file.

| Parameter | Description |
| ---- | ---- |
| api_endpoint | The endpoint of the Treasure Data API (e.g. https://api.treasuredata.com). See this [document](https://docs.treasuredata.com/display/public/PD/Sites+and+Endpoints). |
| dla_host | The hostname of the Data Landing Area (e.g. dla1.treasuredata-co.jp) |
| user_id | Your user_id, received from TD when you enabled the Data Landing Areas feature |
| account_id | Your TD account_id |
| query_logs_table | The table name where query logs are stored (e.g. query_logs) |
| workflow_logs_table | The table name where workflow logs are stored (e.g. workflow_logs) |
| users_table | The table name where user data is stored (e.g. users) |
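
These settings live in the `_export` block at the top of `import_td_logs.dig` (the full file appears further below). For reference, the block with its example values looks like this:

```yaml
_export:
  td:
    database: treasure-data-logs
  api_endpoint: https://api.treasuredata.com
  dla_host: dla1.treasuredata-co.jp
  user_id: abcdefg012345
  account_id: 1
  query_logs_table: query_logs
  workflow_logs_table: workflow_logs
  users_table: users
```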

Next, upload the workflow to Treasure Data.

```sh
# Upload
$ td wf push import_td_logs
```

Set secrets with your private key, the counterpart of the public key you gave to TD when you enabled the Data Landing Areas feature.

```sh
$ td wf secrets --project import_td_logs --set sftp.dla_secret_key_file=@~/.ssh/id_rsa_dla
$ td wf secrets --project import_td_logs --set td.apikey
```

You can trigger the session manually to watch it execute.

```sh
# Run
$ td wf start import_td_logs import_td_logs --session now
```

If you have any questions, contact [email protected].

config/query_log.yml:

```yaml
in:
  type: sftp
  host: ${dla_host}
  user: ${user_id}
  secret_key_file: {"content": "${secret:sftp.dla_secret_key_file}"}
  path_prefix: "/treasure-data-logs/production/${account_id}/query_logs/v1/data.csv"
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ","
    quote: "\""
    escape: "\""
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
      - {name: date, type: string}
      - {name: account_id, type: string}
      - {name: user_id, type: string}
      - {name: project_name, type: string}
      - {name: workflow_name, type: string}
      - {name: task_id, type: string}
      - {name: job_id, type: long}
      - {name: query_id, type: string}
      - {name: created_at, type: string}
      - {name: scheduled_at, type: string}
      - {name: start_at, type: string}
      - {name: end_at, type: string}
      - {name: queued_sec, type: long}
      - {name: running_sec, type: long}
      - {name: result_type, type: string}
      - {name: load_type, type: string}
      - {name: records, type: long}
      - {name: type, type: string}
      - {name: query_status, type: string}
      - {name: result_size, type: long}
      - {name: split_hours, type: double}
      - {name: time, type: long}
out: {}
```

config/workflow_log.yml:

```yaml
in:
  type: sftp
  host: ${dla_host}
  user: ${user_id}
  secret_key_file: {"content": "${secret:sftp.dla_secret_key_file}"}
  path_prefix: "/treasure-data-logs/production/${account_id}/workflow_logs/v1/data.csv"
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ","
    quote: "\""
    escape: "\""
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
      - {name: account_id, type: string}
      - {name: project_id, type: string}
      - {name: workflow_id, type: string}
      - {name: session_id, type: string}
      - {name: attempt_id, type: string}
      - {name: task_id, type: string}
      - {name: user_id, type: string}
      - {name: project_name, type: string}
      - {name: workflow_name, type: string}
      - {name: timezone, type: string}
      - {name: session_time, type: string}
      - {name: attempt_created_at, type: string}
      - {name: attempt_finished_at, type: string}
      - {name: task_name, type: string}
      - {name: task_start_at, type: string}
      - {name: task_end_at, type: string}
      - {name: attempt_running_sec, type: string}
      - {name: task_running_sec, type: string}
      - {name: state, type: string}
      - {name: date, type: string}
      - {name: time, type: long}
out: {}
```

import_td_logs.dig:

```yaml
timezone: UTC

schedule:
  daily>: 03:00:00

_export:
  td:
    database: treasure-data-logs
  api_endpoint: https://api.treasuredata.com
  dla_host: dla1.treasuredata-co.jp
  user_id: abcdefg012345
  account_id: 1
  query_logs_table: query_logs
  workflow_logs_table: workflow_logs
  users_table: users

+create_databases:
  td_ddl>:
  create_databases: [${td.database}]

+create_table:
  td_ddl>:
  create_tables: [${query_logs_table}, ${workflow_logs_table}, ${users_table}]

+import:
  +query_logs:
    td_load>: config/query_log.yml
    table: ${query_logs_table}

  +workflow_logs:
    td_load>: config/workflow_log.yml
    table: ${workflow_logs_table}

  +users:
    _env:
      TD_API_KEY: ${secret:td.apikey}
    py>: script.import_td_users.import_users
    database: ${td.database}
    table: ${users_table}
    api_endpoint: ${api_endpoint}
    docker:
      image: "digdag/digdag-python:3.9"
```

script/import_td_users.py:

```python
import os
import sys
os.system(f"{sys.executable} -m pip install -U pandas requests pytd==1.3.0")
import pandas as pd
import pytd
import requests

td_apikey = os.getenv("TD_API_KEY")


def import_users(database, table, api_endpoint):
    # get users data
    headers = {'Authorization': 'TD1 {}'.format(td_apikey)}
    r = requests.get('{}/v3/user/list'.format(api_endpoint), headers=headers)

    # write users data
    df = pd.json_normalize(r.json(), record_path=['users'])
    client = pytd.Client(apikey=td_apikey, database=database)
    client.load_table_from_dataframe(
        df, table, writer='bulk_import', if_exists='overwrite')
```
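
Note that the `py>` operator in `import_td_logs.dig` calls `import_users` with `database`, `table`, and `api_endpoint` as keyword arguments. For a local test you can call the function directly; this is a minimal sketch, assuming `TD_API_KEY` is exported in your shell, you run from the project root, and the argument values are the example defaults from `import_td_logs.dig`:

```python
import os

# importing the module also triggers its runtime pip install of pandas/requests/pytd
from script.import_td_users import import_users

assert os.getenv("TD_API_KEY"), "export TD_API_KEY before running"

# same keyword arguments the py> task in import_td_logs.dig passes
import_users(
    database="treasure-data-logs",
    table="users",
    api_endpoint="https://api.treasuredata.com",
)
```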

Review comment: Add a statement: "This is an opt-in feature. Contact your Customer Success rep or Technical Support if you have an interest in this feature."

Reply: Thanks! Will fix.