Implementations
This example relates to downloading Orders, which represent custom datasets served on request.
👉 Goal: Create a parameterised pipeline that can pull ordered data directly into your Azure blob storage or data lake in a repeatable and parallelised way, with optional throttling of the API calls made to the Weather DataHub. 💪
Our finished pipeline will be able to copy multiple files in parallel, and it will look like this:
- Register an account on the Met Office Weather DataHub, define your data set orders and subscribe to a plan (a free developer plan is available at the time of writing). Have your order ID to hand, as well as your client ID and secret for the Weather DataHub.
- Provision or identify the following resources in your Azure subscription:
- Azure Storage Account (this can optionally have hierarchical namespace enabled to make it an Azure Datalake gen2)
- An Azure Data Factory must be available. If provisioning a new Data Factory, choose to skip setting up the git integration for now.
- Option 1: New Azure Data Factory
- Option 2: Existing Azure Data Factory, with existing git integration
- Option 3: Existing Azure Data Factory, not integrated with source control
Start by forking this repo into your own git account.
From the ADF portal (https://adf.azure.com), configure git integration with the following parameters:
- Repository Type: GitHub
- GitHub account: `<your github user name or organisation>`
- Repository name: `work_with_weather_data`
- Collaboration branch: `main`
- Publish branch: `adf_publish` *
- Root folder: `/orchestration/adf`

\* Note: the `adf_publish` branch will be an orphan branch containing only the ARM templates that can be used to deploy the solution into other environments.
Create a new branch in your existing repo to explore this example.
Using the GitHub portal or local clones of the relevant repos, manually copy the files in the `/orchestration/adf` folder of this repo into the same folder structure in your existing repo. Your root folder as set up in your existing git integration may be in a different location, for example `/adf` or `/` - that is fine.
Open the Azure Data Factory portal, refresh, and then switch to the new branch created above.
You can either follow Option 1 above and allow all existing content of your data factory to be imported into your forked repo.
⚠️ Make sure to remove any hard-coded credentials from your existing content before doing this.
Alternatively, manually create stubs for the assets described below in your Azure Data Factory UI, and use the code view (the `{}` button in the top right corner of the authoring window) to replicate the configuration from this sample - the corresponding code is in the `./adf` folder of this repo.
Navigate to the Manage tab within Data Factory, delete the `azuregigdatalake` linked service, and replace it with an entry linking your own storage account (Link a blob storage account or Link an Azure Data Lake Storage Gen2 account).
Then find the dataset representing the target location for the ingest (see Dataset 3 under Assets below) and ensure it uses your new linked service. Ensure you create a `bronze` container or equivalent destination in your data lake, and update the path in the dataset accordingly.
🎉 You're ready to run the pipeline: click the Debug button at the top of the pipeline canvas to execute a debug run. A panel will pop up asking for the Data Package ID, the Data Package version, and your API key for the OS Data Hub.
In this sample, the ingest is limited to 10 gzipped archives specified in the data package. Review the detail below to adapt this to your needs.
This sample defines one pipeline and three datasets.
Dataset 1: The PackageIndex dataset represents the data retrieved by the initial API call (GET request) to the OS Download API. It is an HTTP dataset of type `Json`, and we will use it in a `Lookup` activity in the pipeline later on.
The package id and package version are parameterised in the Relative URL field as follows:
@concat('downloads/v1/dataPackages/',dataset().package_id,'/versions/',dataset().package_version)
To support this, the Parameters tab for the dataset needs to be configured as follows:
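If you prefer working from the JSON in the `./adf` folder rather than the UI, the relevant parts of this dataset look roughly like the sketch below. It is trimmed for illustration: the linked service reference is omitted and the exact property layout may differ slightly in your factory.

```json
{
  "name": "PackageIndex",
  "properties": {
    "type": "Json",
    "parameters": {
      "package_id": { "type": "string" },
      "package_version": { "type": "string" }
    },
    "typeProperties": {
      "location": {
        "type": "HttpServerLocation",
        "relativeUrl": {
          "value": "@concat('downloads/v1/dataPackages/',dataset().package_id,'/versions/',dataset().package_version)",
          "type": "Expression"
        }
      }
    }
  }
}
```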
Dataset 2: The OSMM_source_file dataset represents a single file to be downloaded as part of the package. Since the files are gzipped (`.gz`) files served by an HTTP server, it is an HTTP dataset of type `Binary`.
The relative URL is parameterised as above, but additionally with the specific filename. Later, we will loop through the list of filenames that make up a package in a `ForEach` activity in the pipeline.
@concat('downloads/v1/dataPackages/',dataset().package_id,'/versions/',dataset().package_version,'/downloads?fileName=',dataset().filename)
To support this, we have an additional parameter configured on this dataset.
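In the dataset JSON this simply means one extra entry in the parameters block, something along these lines (a fragment only, following the same sketch format as for Dataset 1):

```json
"parameters": {
  "package_id": { "type": "string" },
  "package_version": { "type": "string" },
  "filename": { "type": "string" }
}
```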
Dataset 3: The OSMM_bronze_gml_gz dataset represents the target location for our pipeline, and is configured as an Azure Data Lake Gen2 dataset. Since we will be copying the files 'as is', this is also of type `Binary`, meaning the copy process will not attempt to read any data schema within the files.
The target path within the `bronze` container of the data lake is parameterised as follows:
@concat('gz/OSMM/v1/dataPackages/',dataset().package_id,'/version/',dataset().package_version)
requiring the same parameters to be configured on the dataset as in the case of the source dataset above.
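As a rough sketch, the location block of this dataset could look like the fragment below. `AzureBlobFSLocation` is the location type ADF uses for Data Lake Gen2 datasets, the `bronze` container matches the setup step earlier, and compression settings are omitted here.

```json
"typeProperties": {
  "location": {
    "type": "AzureBlobFSLocation",
    "fileSystem": "bronze",
    "folderPath": {
      "value": "@concat('gz/OSMM/v1/dataPackages/',dataset().package_id,'/version/',dataset().package_version)",
      "type": "Expression"
    }
  }
}
```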
At pipeline level, we configure the following parameters:
and the following variables, all of which will be used in the activities outlined below.
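The screenshots are not reproduced here, but based on the expressions used throughout this example, the pipeline-level JSON would contain something close to the following. The names are taken from the activities below; the types are simply the ones you would pick for them in the UI.

```json
"parameters": {
  "data_package": { "type": "String" },
  "data_package_version": { "type": "String" },
  "api_key": { "type": "String" }
},
"variables": {
  "download_urls": { "type": "Array" },
  "download_urls_sample": { "type": "Array" }
}
```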
The lookup activity fetches the initial response from the OS Download API:
As we are expecting a single JSON object to be returned, we also tick the First row only box.
Since the PackageIndex dataset is parameterised, we have to supply values for `package_id` and `package_version` here. Rather than hardcoding these, we have parameterised the pipeline itself as shown above, so that these values need to be supplied on pipeline invocation.
In the screenshot above, we supply:
- package_id: `@pipeline().parameters.data_package`
- package_version: `@pipeline().parameters.data_package_version`
- Additional headers (format `"<key>:<value>"`): `@concat('key:',pipeline().parameters.api_key)`
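Put together, the Lookup activity would look roughly like the sketch below. The activity name matches the one referenced in the next step; the source and store settings (including the additional `key` header) are omitted for brevity and will differ depending on how you configure the HTTP source.

```json
{
  "name": "Lookup Package Index",
  "type": "Lookup",
  "typeProperties": {
    "firstRowOnly": true,
    "dataset": {
      "referenceName": "PackageIndex",
      "type": "DatasetReference",
      "parameters": {
        "package_id": "@pipeline().parameters.data_package",
        "package_version": "@pipeline().parameters.data_package_version"
      }
    }
  }
}
```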
The call to the package index by the lookup activity returns a json object. By looking at the output of this step in a debug run, we can see the structure of the returned object.
To view the output of an activity, hover over the entry in the debug run log, accessible either via the output tab on the pipeline, or via the monitor pane in ADF:
As a result of the First row only setting on the lookup activity, the structure of the object returned looks as follows:
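The screenshot is not reproduced here, but based on the expressions used in the next activities and the per-file objects shown further down, the shape is approximately the following (illustrative values, other fields omitted):

```json
{
  "firstRow": {
    "downloads": [
      {
        "fileName": "5436275-NJ6050.gz",
        "url": "https://api.os.uk/downloads/v1/dataPackages/0040154231/versions/5500114/downloads?fileName=5436275-NJ6050.gz",
        "size": 2833612,
        "md5": "2a0605f83156824e8a03c8fe1b1e6102"
      }
    ]
  }
}
```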
In the Set Variable activity, named `set array download_urls`, we therefore set:

- Name: `download_urls` (the variable needs to be configured at pipeline level as an Array, as shown above)
- Value: `@activity('Lookup Package Index').output.firstRow.downloads`
Since the array of download URLs can be very large, we set another variable holding a small sample of these, so that we can iteratively develop the rest of the pipeline using the sample only.

In the second Set Variable activity, named `set array download_urls_sample`, we therefore set:

- Name: `download_urls_sample` (the variable needs to be configured at pipeline level as an Array, as shown above)
- Value: `@take(variables('download_urls'),10)`
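In JSON, the two Set Variable activities are compact. Here is a sketch of the second one; the first follows the same pattern with `download_urls` as the variable name and the lookup expression as the value.

```json
{
  "name": "set array download_urls_sample",
  "type": "SetVariable",
  "dependsOn": [
    { "activity": "set array download_urls", "dependencyConditions": [ "Succeeded" ] }
  ],
  "typeProperties": {
    "variableName": "download_urls_sample",
    "value": {
      "value": "@take(variables('download_urls'),10)",
      "type": "Expression"
    }
  }
}
```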
Since the OS Data Hub implements a rate limit, the implementation here enables the throttling of parallel requests to the API through a combination of parallel batch size and wait time between batches.
Note: with this throttling implementation, it is possible to throttle to 50 calls per second or slower, as the minimum duration of a Wait activity is 1 second and the maximum batch size setting in a ForEach activity is 50.
In this example, we set the batch size to 5, giving us 2 batches of downloads in our sample of 10 URLs, with at most 5 requests in flight per 1 second wait.
Setting Items to the array stored in the `download_urls_sample` variable means that, within the `ForEach` loop, each object in the array will be accessible as `@item()`.
We know from reviewing the output of the lookup activity above that each of these objects has the following structure:
{
"fileName": "5436275-NJ6050.gz",
"url": "https://api.os.uk/downloads/v1/dataPackages/0040154231/versions/5500114/downloads?fileName=5436275-NJ6050.gz",
"size": 2833612,
"md5": "2a0605f83156824e8a03c8fe1b1e6102"
},
As the Sequential box is not ticked, the inner activities (a call to the download endpoint and a 1 second wait) will be executed for 5 files simultaneously:
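A trimmed sketch of how this ForEach could look in the pipeline JSON. The activity names inside the loop are illustrative, and the Copy activity's source and sink settings are left out here because they are covered by the parameter lists below.

```json
{
  "name": "ForEach download",
  "type": "ForEach",
  "typeProperties": {
    "items": {
      "value": "@variables('download_urls_sample')",
      "type": "Expression"
    },
    "isSequential": false,
    "batchCount": 5,
    "activities": [
      { "name": "Copy file to bronze", "type": "Copy" },
      { "name": "Wait 1s", "type": "Wait", "typeProperties": { "waitTimeInSeconds": 1 } }
    ]
  }
}
```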
The source of the copy activity within the ForEach loop supplies both the pipeline parameters specifying `package_id` and `package_version`, and the `fileName` from each `@item()`, to construct the relative URL that we have already parameterised in the dataset as outlined above.

Note: we could also have parameterised the dataset differently so as to supply the entire relative URL, by doing string manipulation of the `@item().url` field to remove the base URL.

In the source tab of the copy activity, we supply:
- package_id: `@pipeline().parameters.data_package`
- package_version: `@pipeline().parameters.data_package_version`
- filename: `@item().fileName`
- Additional headers (format `"<key>:<value>"`): `@concat('key:',pipeline().parameters.api_key)`
The sink of the copy activity supplies the same parameters so that the parameterised target data path in the datalake can be constructed within the dataset:
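As a sketch, the dataset references on the copy activity therefore end up looking something like this fragment (the accompanying store and format settings are omitted):

```json
"inputs": [
  {
    "referenceName": "OSMM_source_file",
    "type": "DatasetReference",
    "parameters": {
      "package_id": "@pipeline().parameters.data_package",
      "package_version": "@pipeline().parameters.data_package_version",
      "filename": "@item().fileName"
    }
  }
],
"outputs": [
  {
    "referenceName": "OSMM_bronze_gml_gz",
    "type": "DatasetReference",
    "parameters": {
      "package_id": "@pipeline().parameters.data_package",
      "package_version": "@pipeline().parameters.data_package_version"
    }
  }
]
```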
When triggering the pipeline, for example with the Debug button, the `api_key` has to be supplied, ensuring that the key is not persisted in the code base of the ADF in the linked git repo.
Here is an example of a throttled run, copying batches of 5 files simultaneously with a wait time of 5 seconds.
Hover over an individual copy activity execution to reveal the button to show details relating to the data transfer:
The resulting files can be reviewed in the data lake, for example via the storage browser integrated into the Azure Portal.
Bonus Tip: Setting the compression type of the target dataset (see Dataset 3 under Assets above) to `None` instead of `gzip (.gz)` would unzip the ingested archives on the fly. In that case, ensure the parameterised filepath in the target dataset is updated (using the `@replace` function), as these archives otherwise unzip to a file of the same name but without an extension.
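For example, a parameterised file name on the target dataset could swap the extension along these lines. This is an illustration only: the sample as shipped does not pass a filename to the target dataset, and the `.gml` target extension is an assumption based on the dataset name.

```json
"fileName": {
  "value": "@replace(dataset().filename,'.gz','.gml')",
  "type": "Expression"
}
```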