|
210 | 210 | "metadata": {},
|
211 | 211 | "source": [
|
212 | 212 | "## Data Ingestion Pipeline \n",
|
213 |
| - "For this demo, we will use NOAA weather data from [Azure Open Datasets](https://azure.microsoft.com/services/open-datasets/). You can replace this with your own dataset, or you can skip this pipeline if you already have a time-series based `TabularDataset`.\n", |
| 213 | + "For this demo, we will use NOAA weather data from [Azure Open Datasets](https://azure.microsoft.com/services/open-datasets/). You can replace this with your own dataset, or you can skip this pipeline if you already have a time-series based `TabularDataset`.\n" |
| 214 | + ] |
| 215 | + }, |
| 216 | + { |
| 217 | + "cell_type": "code", |
| 218 | + "execution_count": null, |
| 219 | + "metadata": {}, |
| 220 | + "outputs": [], |
| 221 | + "source": [ |
| 222 | + "# The name and target column of the Dataset to create \n", |
| 223 | + "dataset = \"NOAA-Weather-DS4\"\n", |
| 224 | + "target_column_name = \"temperature\"" |
| 225 | + ] |
| 226 | + }, |
| 227 | + { |
| 228 | + "cell_type": "markdown", |
| 229 | + "metadata": {}, |
| 230 | + "source": [ |
214 | 231 | "\n",
|
215 | 232 | "### Upload Data Step\n",
|
216 | 233 | "The data ingestion pipeline has a single step with a script to query the latest weather data and upload it to the blob store. During the first run, the script will create and register a time-series based `TabularDataset` with the past one week of weather data. For each subsequent run, the script will create a partition in the blob store by querying NOAA for new weather data since the last modified time of the dataset (`dataset.data_changed_time`) and creating a data.csv file."
|
|
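The `upload_weather_data.py` script referenced by this step is not included in the diff. The sketch below is only an illustration of the logic described above, assuming the `azureml-opendatasets` package, the workspace default datastore, and a hypothetical `--ds_name` argument, folder layout, and timestamp column; none of these details are confirmed by the diff itself.

```python
# Illustrative sketch only -- not the actual upload_weather_data.py from this repo.
import argparse
from datetime import datetime, timedelta

from azureml.core import Run, Dataset
from azureml.opendatasets import NoaaIsdWeather  # assumes azureml-opendatasets is installed

parser = argparse.ArgumentParser()
parser.add_argument("--ds_name", type=str)  # hypothetical argument name
args = parser.parse_args()

run = Run.get_context()
ws = run.experiment.workspace
dstor = ws.get_default_datastore()

try:
    # Subsequent runs: only fetch data newer than the dataset's last change.
    existing = Dataset.get_by_name(ws, args.ds_name)
    start = existing.data_changed_time
except Exception:
    # First run: seed the dataset with the past week of weather data.
    start = datetime.utcnow() - timedelta(days=7)
end = datetime.utcnow()

# Query NOAA ISD weather from Azure Open Datasets and write one csv partition.
df = NoaaIsdWeather(start, end).to_pandas_dataframe()
df.to_csv("data.csv", index=False)
partition = end.strftime("%Y-%m-%d-%H")  # hypothetical partition layout
dstor.upload_files(["data.csv"],
                   target_path=f"weather-data/{partition}",  # hypothetical folder
                   overwrite=True)

# (Re)register the time-series TabularDataset over all partitions.
tab_ds = Dataset.Tabular.from_delimited_files(
    path=[(dstor, "weather-data/**/*.csv")])
tab_ds = tab_ds.with_timestamp_columns(timestamp="datetime")  # column name assumed
tab_ds.register(ws, name=args.ds_name, create_new_version=True)
```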
225 | 242 | "from azureml.pipeline.core import Pipeline, PipelineParameter\n",
|
226 | 243 | "from azureml.pipeline.steps import PythonScriptStep\n",
|
227 | 244 | "\n",
|
228 |
| - "# The name of the Dataset to create \n", |
229 |
| - "dataset = \"NOAA-Weather-DS4\"\n", |
230 | 245 | "ds_name = PipelineParameter(name=\"ds_name\", default_value=dataset)\n",
|
231 | 246 | "upload_data_step = PythonScriptStep(script_name=\"upload_weather_data.py\", \n",
|
232 | 247 | " allow_reuse=False,\n",
|
|
272 | 287 | "## Training Pipeline\n",
|
273 | 288 | "### Prepare Training Data Step\n",
|
274 | 289 | "\n",
|
275 |
| - "Script to bring data into common X,y format. We need to set allow_reuse flag to False to allow the pipeline to run even when inputs don't change. We also need the name of the model to check the time the model was last trained." |
| 290 | + "Script to check if new data is available since the model was last trained. If no new data is available, we cancel the remaining pipeline steps. We need to set allow_reuse flag to False to allow the pipeline to run even when inputs don't change. We also need the name of the model to check the time the model was last trained." |
276 | 291 | ]
|
277 | 292 | },
|
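Like the upload script, `check_data.py` is not included in this diff. A minimal sketch of the check it describes might look like the following, assuming the dataset's `data_changed_time` is compared against the registered model's `created_time` and that cancelling the parent pipeline run is how the remaining steps are stopped; both are assumptions, not confirmed by the diff.

```python
# Illustrative sketch only -- not the actual check_data.py from this repo.
import argparse

from azureml.core import Run, Dataset
from azureml.core.model import Model

parser = argparse.ArgumentParser()
parser.add_argument("--ds_name", type=str)
parser.add_argument("--model_name", type=str)
args = parser.parse_args()

run = Run.get_context()
ws = run.experiment.workspace

dataset = Dataset.get_by_name(ws, args.ds_name)

try:
    model = Model(ws, name=args.model_name)
except Exception:
    model = None  # no model registered yet, so always continue to training

if model is not None and dataset.data_changed_time <= model.created_time:
    # No new data since the last training run: stop the rest of the pipeline.
    print("No new data found; cancelling remaining pipeline steps.")
    run.parent.cancel()
else:
    print("New data found; continuing to the training step.")
```

Because the step is created with `allow_reuse=False`, this check runs on every scheduled pipeline execution instead of being satisfied from a cached result.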
278 | 293 | {
|
|
283 | 298 | "source": [
|
284 | 299 | "from azureml.pipeline.core import PipelineData\n",
|
285 | 300 | "\n",
|
286 |
| - "target_column = PipelineParameter(\"target_column\", default_value=\"y\")\n", |
287 | 301 | "# The model name with which to register the trained model in the workspace.\n",
|
288 |
| - "model_name = PipelineParameter(\"model_name\", default_value=\"y\")\n", |
289 |
| - "output_x = PipelineData(\"output_x\", datastore=dstor)\n", |
290 |
| - "output_y = PipelineData(\"output_y\", datastore=dstor)" |
| 302 | + "model_name = PipelineParameter(\"model_name\", default_value=\"noaaweatherds\")" |
291 | 303 | ]
|
292 | 304 | },
|
293 | 305 | {
|
|
299 | 311 | "data_prep_step = PythonScriptStep(script_name=\"check_data.py\", \n",
|
300 | 312 | " allow_reuse=False,\n",
|
301 | 313 | " name=\"check_data\",\n",
|
302 |
| - " arguments=[\"--target_column\", target_column,\n", |
303 |
| - " \"--output_x\", output_x,\n", |
304 |
| - " \"--output_y\", output_y,\n", |
305 |
| - " \"--ds_name\", ds_name,\n", |
306 |
| - " \"--model_name\", model_name],\n", |
307 |
| - " outputs=[output_x, output_y], \n", |
| 314 | + " arguments=[\"--ds_name\", ds_name,\n", |
| 315 | + " \"--model_name\", model_name],\n", |
308 | 316 | " compute_target=compute_target, \n",
|
309 | 317 | " runconfig=conda_run_config)"
|
310 | 318 | ]
|
311 | 319 | },
|
| 320 | + { |
| 321 | + "cell_type": "code", |
| 322 | + "execution_count": null, |
| 323 | + "metadata": {}, |
| 324 | + "outputs": [], |
| 325 | + "source": [ |
| 326 | + "from azureml.core import Dataset\n", |
| 327 | + "train_ds = Dataset.get_by_name(ws, dataset)\n", |
| 328 | + "train_ds = train_ds.drop_columns([\"partition_date\"])" |
| 329 | + ] |
| 330 | + }, |
312 | 331 | {
|
313 | 332 | "cell_type": "markdown",
|
314 | 333 | "metadata": {},
|
|
324 | 343 | "outputs": [],
|
325 | 344 | "source": [
|
326 | 345 | "from azureml.train.automl import AutoMLConfig\n",
|
327 |
| - "from azureml.train.automl.runtime import AutoMLStep\n", |
| 346 | + "from azureml.train.automl import AutoMLStep\n", |
328 | 347 | "\n",
|
329 | 348 | "automl_settings = {\n",
|
330 |
| - " \"iteration_timeout_minutes\": 20,\n", |
331 |
| - " \"experiment_timeout_minutes\": 30,\n", |
| 349 | + " \"iteration_timeout_minutes\": 10,\n", |
| 350 | + " \"experiment_timeout_minutes\": 10,\n", |
332 | 351 | " \"n_cross_validations\": 3,\n",
|
333 | 352 | " \"primary_metric\": 'r2_score',\n",
|
334 | 353 | " \"preprocess\": True,\n",
|
|
342 | 361 | " debug_log = 'automl_errors.log',\n",
|
343 | 362 | " path = \".\",\n",
|
344 | 363 | " compute_target=compute_target,\n",
|
345 |
| - " run_configuration=conda_run_config,\n", |
346 |
| - " data_script = \"get_data.py\",\n", |
| 364 | + " training_data = train_ds,\n", |
| 365 | + " label_column_name = target_column_name,\n", |
347 | 366 | " **automl_settings\n",
|
348 | 367 | " )"
|
349 | 368 | ]
|
|
378 | 397 | "automl_step = AutoMLStep(\n",
|
379 | 398 | " name='automl_module',\n",
|
380 | 399 | " automl_config=automl_config,\n",
|
381 |
| - " inputs=[output_x, output_y],\n", |
382 | 400 | " outputs=[metirics_data, model_data],\n",
|
383 | 401 | " allow_reuse=False)"
|
384 | 402 | ]
|
|
432 | 450 | "outputs": [],
|
433 | 451 | "source": [
|
434 | 452 | "training_pipeline_run = experiment.submit(training_pipeline, pipeline_parameters={\n",
|
435 |
| - " \"target_column\": \"temperature\", \"ds_name\": dataset, \"model_name\": \"noaaweatherds\"})" |
| 453 | + " \"ds_name\": dataset, \"model_name\": \"noaaweatherds\"})" |
436 | 454 | ]
|
437 | 455 | },
|
438 | 456 | {
|
|
475 | 493 | "source": [
|
476 | 494 | "from azureml.pipeline.core import Schedule\n",
|
477 | 495 | "schedule = Schedule.create(workspace=ws, name=\"RetrainingSchedule\",\n",
|
478 |
| - " pipeline_parameters={\"target_column\": \"temperature\",\"ds_name\": dataset, \"model_name\": \"noaaweatherds\"},\n", |
| 496 | + " pipeline_parameters={\"ds_name\": dataset, \"model_name\": \"noaaweatherds\"},\n", |
479 | 497 | " pipeline_id=published_pipeline.id, \n",
|
480 | 498 | " experiment_name=experiment_name, \n",
|
481 | 499 | " datastore=dstor,\n",
|
|