1 change: 1 addition & 0 deletions docs/tutorials.rst
@@ -12,6 +12,7 @@ Tutorials
tutorials/6-makers
tutorials/7-generalized-makers
tutorials/8-fireworks
tutorials/9-pydantic-validation

.. Note::
`@jageo <https://github.com/JaGeo>`_ also has a set of `Jobflow tutorials <https://jageo.github.io/Advanced_Jobflow_Tutorial/intro.html>`_ written within the context of computational materials science applications, which you may wish to check out after exploring the basics here.
361 changes: 361 additions & 0 deletions docs/tutorials/9-pydantic-validation.ipynb
@@ -0,0 +1,361 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "ba4737bb",
"metadata": {},
"source": [
"## Pydantic validation\n",
"\n",
"When building computational workflows with jobflow, you're often chaining together multiple jobs that pass data between each other. Without proper validation, several problems can occur:\n",
"\n",
"1. Silent Failures: A job might produce output in an unexpected format, causing downstream jobs to fail with cryptic error messages or produce incorrect results without warning.\n",
"\n",
"2. Missing Required Data: Without data validation jobs are not required to include a critical field in its output, and the error only appears several steps later in the workflow.\n",
"\n",
"3. Documentation Drift: Without enforced schemas, it's unclear what data structure each job expects or produces, making workflows harder to understand and maintain.\n",
"\n",
"Pydantic provides powerful data validation and settings management using Python type annotations. It allows users to define explicit schemas for job inputs and outputs using Python type hints, catch errors early at the job boundaries rather than deep in your workflow, auto-generate documentation of your data structures, ensure data consistency across complex, multi-step workflows, and validate at runtime with clear, informative error messages. For example this is used in `atomate2` to validate the outputs of computational tasks.\n",
"\n",
"In the example below, we define a simple Pydantic model to validate that the output of a job is a float."
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "e38a453e",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2025-10-27 12:32:49,377 INFO Started executing jobs locally\n",
"2025-10-27 12:32:49,380 INFO Starting job - add (8ae64cc3-2da4-4914-b967-3c45e808d7d5)\n",
"2025-10-27 12:32:49,381 INFO Finished job - add (8ae64cc3-2da4-4914-b967-3c45e808d7d5)\n",
"2025-10-27 12:32:49,383 INFO Finished executing jobs locally\n",
"{'8ae64cc3-2da4-4914-b967-3c45e808d7d5': {1: Response(output=FloatValidator(result=3.0), detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False, job_dir=PosixPath('/X/jobflow/docs/tutorials'))}}\n"
]
}
],
"source": [
"from pydantic import BaseModel, Field\n",
"\n",
"from jobflow import job, run_locally\n",
"from jobflow.core.job import apply_schema\n",
"\n",
"\n",
"class FloatValidator(BaseModel):\n",
" result: float = Field(..., description=\"The resulting float value\")\n",
"\n",
"\n",
"@job\n",
"def add(a, b):\n",
" return FloatValidator(result=a + b)\n",
"\n",
"\n",
"job_1 = add(1, 2)\n",
"response = run_locally(job_1)\n",
"\n",
"print(response)"
]
},
{
"cell_type": "markdown",
"id": "46f3ae65",
"metadata": {},
"source": [
"Or equivalently, we can use the `output_schema` parameter in the `job` decorator. In which case the results of the job can be returned as a dictionary and will be validated against the schema."
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "cc6ce11d",
"metadata": {},
"outputs": [],
"source": [
"@job(output_schema=FloatValidator)\n",
"def add(a, b):\n",
" return {\"result\": a + b}\n",
"\n",
"\n",
"# Or equivalently:\n",
"\n",
"\n",
"@job\n",
"def _add(a, b):\n",
" return apply_schema({\"result\": a + b}, FloatValidator)"
]
},
{
"cell_type": "markdown",
"id": "83ab2d85",
"metadata": {},
"source": [
"If the output does not conform to the schema, an error will be raised:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "8ef986e9",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2025-10-20 14:20:52,616 INFO Started executing jobs locally\n",
"2025-10-20 14:20:52,618 INFO Starting job - invalid_add (a22f7c1f-a80e-404f-9192-302487b4eaf7)\n",
"2025-10-20 14:20:52,621 INFO invalid_add failed with exception:\n",
"Traceback (most recent call last):\n",
" File \"/X/jobflow/src/jobflow/managers/local.py\", line 117, in _run_job\n",
" response = job.run(store=store)\n",
" File \"/X/jobflow/src/jobflow/core/job.py\", line 604, in run\n",
" response = function(*self.function_args, **self.function_kwargs)\n",
" File \"/var/folders/zh/3748r38115qb94_pvwg0cc6m0000gn/T/ipykernel_33718/2862673370.py\", line 3, in invalid_add\n",
" return FloatValidator(result={\"invalid_result\": a + b})\n",
" File \"/opt/homebrew/Caskroom/miniforge/base/envs/jobflow/lib/python3.14/site-packages/pydantic/main.py\", line 250, in __init__\n",
" validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)\n",
"pydantic_core._pydantic_core.ValidationError: 1 validation error for FloatValidator\n",
"result\n",
" Input should be a valid number [type=float_type, input_value={'invalid_result': 3}, input_type=dict]\n",
" For further information visit https://errors.pydantic.dev/2.12/v/float_type\n",
"\n",
"2025-10-20 14:20:52,622 INFO Finished executing jobs locally\n"
]
}
],
"source": [
"@job\n",
"def invalid_add(a, b):\n",
" return FloatValidator(result={\"invalid_result\": a + b})\n",
"\n",
"\n",
"invalid_job = invalid_add(1, 2)\n",
"\n",
"response_invalid = run_locally(invalid_job)"
]
},
{
"cell_type": "markdown",
"id": "e83cae72",
"metadata": {},
"source": [
"Similarly, it is possible to define input schemas using Pydantic models to validate the inputs of your jobs. This ensures that the data being processed meets the expected format and constraints."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6f7da2af",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2025-10-19 14:07:28,934 INFO Started executing jobs locally\n",
"2025-10-19 14:07:28,935 INFO Starting job - validated_add (8fd24cd1-82ff-42ac-8bcf-d75504818c71)\n",
"2025-10-19 14:07:28,936 INFO Finished job - validated_add (8fd24cd1-82ff-42ac-8bcf-d75504818c71)\n",
"2025-10-19 14:07:28,937 INFO Finished executing jobs locally\n",
"{'8fd24cd1-82ff-42ac-8bcf-d75504818c71': {1: Response(output=FloatValidator(result=7.0), detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False, job_dir=PosixPath('/X/docs/tutorials'))}}\n"
]
}
],
"source": [
"class InputValidator(BaseModel):\n",
" a: float = Field(..., description=\"First float value\")\n",
" b: float = Field(..., description=\"Second float value\")\n",
"\n",
"\n",
"@job\n",
"def validated_add(inputs: InputValidator):\n",
" return FloatValidator(result=inputs.a + inputs.b)\n",
"\n",
"\n",
"validated_job = validated_add(InputValidator(a=3.0, b=4.0))\n",
"validated_response = run_locally(validated_job)\n",
"\n",
"print(validated_response)"
]
},
{
"cell_type": "markdown",
"id": "1de11988",
"metadata": {},
"source": [
"If the input does not conform to the schema, an error will be raised before the job is executed."
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "8cec3c04",
"metadata": {},
"outputs": [
{
"ename": "ValidationError",
"evalue": "1 validation error for InputValidator\na\n Input should be a valid number, unable to parse string as a number [type=float_parsing, input_value='a', input_type=str]\n For further information visit https://errors.pydantic.dev/2.12/v/float_parsing",
"output_type": "error",
"traceback": [
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
"\u001b[31mValidationError\u001b[39m Traceback (most recent call last)",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[15]\u001b[39m\u001b[32m, line 1\u001b[39m\n\u001b[32m----> \u001b[39m\u001b[32m1\u001b[39m input_invalid_job = validated_add(\u001b[43mInputValidator\u001b[49m\u001b[43m(\u001b[49m\u001b[43ma\u001b[49m\u001b[43m=\u001b[49m\u001b[33;43m\"\u001b[39;49m\u001b[33;43ma\u001b[39;49m\u001b[33;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mb\u001b[49m\u001b[43m=\u001b[49m\u001b[32;43m4.0\u001b[39;49m\u001b[43m)\u001b[49m)\n\u001b[32m 3\u001b[39m run_locally(input_invalid_job)\n",
"\u001b[36mFile \u001b[39m\u001b[32m/opt/homebrew/Caskroom/miniforge/base/envs/jobflow/lib/python3.14/site-packages/pydantic/main.py:250\u001b[39m, in \u001b[36mBaseModel.__init__\u001b[39m\u001b[34m(self, **data)\u001b[39m\n\u001b[32m 248\u001b[39m \u001b[38;5;66;03m# `__tracebackhide__` tells pytest and some other tools to omit this function from tracebacks\u001b[39;00m\n\u001b[32m 249\u001b[39m __tracebackhide__ = \u001b[38;5;28;01mTrue\u001b[39;00m\n\u001b[32m--> \u001b[39m\u001b[32m250\u001b[39m validated_self = \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m__pydantic_validator__\u001b[49m\u001b[43m.\u001b[49m\u001b[43mvalidate_python\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdata\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mself_instance\u001b[49m\u001b[43m=\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m)\u001b[49m\n\u001b[32m 251\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m validated_self:\n\u001b[32m 252\u001b[39m warnings.warn(\n\u001b[32m 253\u001b[39m \u001b[33m'\u001b[39m\u001b[33mA custom validator is returning a value other than `self`.\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[33m'\u001b[39m\n\u001b[32m 254\u001b[39m \u001b[33m\"\u001b[39m\u001b[33mReturning anything other than `self` from a top level model validator isn\u001b[39m\u001b[33m'\u001b[39m\u001b[33mt supported when validating via `__init__`.\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[33m\"\u001b[39m\n\u001b[32m 255\u001b[39m \u001b[33m'\u001b[39m\u001b[33mSee the `model_validator` docs (https://docs.pydantic.dev/latest/concepts/validators/#model-validators) for more details.\u001b[39m\u001b[33m'\u001b[39m,\n\u001b[32m 256\u001b[39m stacklevel=\u001b[32m2\u001b[39m,\n\u001b[32m 257\u001b[39m )\n",
"\u001b[31mValidationError\u001b[39m: 1 validation error for InputValidator\na\n Input should be a valid number, unable to parse string as a number [type=float_parsing, input_value='a', input_type=str]\n For further information visit https://errors.pydantic.dev/2.12/v/float_parsing"
]
}
],
"source": [
"input_invalid_job = validated_add(InputValidator(a=\"a\", b=4.0))\n",
"\n",
"run_locally(input_invalid_job)"
]
},
{
"cell_type": "markdown",
"id": "035aba79",
"metadata": {},
"source": [
"By default, Pydantic models are not strict about extra fields. However, you can configure the model to forbid extra fields by setting `extra='forbid'`. This ensures that only the defined fields are accepted, and any additional fields will raise a validation error."
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "9d244220",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2025-10-19 14:36:24,898 INFO Started executing jobs locally\n",
"2025-10-19 14:36:24,900 INFO Starting job - validated_add (2b361380-3a7b-4f26-8c6e-c76c533fe66c)\n",
"2025-10-19 14:36:24,903 INFO Finished job - validated_add (2b361380-3a7b-4f26-8c6e-c76c533fe66c)\n",
"2025-10-19 14:36:24,903 INFO Finished executing jobs locally\n",
"{'2b361380-3a7b-4f26-8c6e-c76c533fe66c': {1: Response(output=OutputValidator(result=7.0), detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False, job_dir=PosixPath('/X/docs/tutorials'))}}\n"
]
}
],
"source": [
"class InputValidator(BaseModel):\n",
" a: float = Field(..., description=\"First float value\")\n",
" b: float = Field(..., description=\"Second float value\")\n",
"\n",
"\n",
"class OutputValidator(BaseModel, extra=\"forbid\"):\n",
" result: float = Field(..., description=\"The resulting float value\")\n",
"\n",
"\n",
"@job\n",
"def validated_add(inputs: InputValidator):\n",
" return OutputValidator(result=inputs.a + inputs.b)\n",
"\n",
"\n",
"validated_job = validated_add(InputValidator(a=3.0, b=4.0, c=5.0, d=6.0))\n",
"\n",
"validated_response = run_locally(validated_job)\n",
"\n",
"print(validated_response)"
]
},
{
"cell_type": "markdown",
"id": "0f0110c5",
"metadata": {},
"source": [
"In the code above, extra parameters in the input data are allowed by default, `c` and `d` will be ignored without raising an error.\n",
"\n",
"If instead we have an `invalid_add` that returns additional field, an error will be raised since `extra=forbid` has been specified in the `OutputValidator`:"
]
},
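{
"cell_type": "markdown",
"id": "b7e2c4a1",
"metadata": {},
"source": [
"A minimal sketch of that case, assuming the `OutputValidator` defined above. The `extra_field` keyword is a hypothetical name chosen only for illustration. Because the model forbids extra fields, constructing it inside the job should fail with a `ValidationError`, which `run_locally` logs as a job failure:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d3f9a6e5",
"metadata": {},
"outputs": [],
"source": [
"@job\n",
"def invalid_add(a, b):\n",
"    # `extra_field` (hypothetical) is not declared on OutputValidator, so\n",
"    # extra=\"forbid\" rejects it when the model is constructed.\n",
"    return OutputValidator(result=a + b, extra_field=a - b)\n",
"\n",
"\n",
"invalid_job = invalid_add(1, 2)\n",
"run_locally(invalid_job)"
]
},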
{
"cell_type": "markdown",
"id": "924a38e8",
"metadata": {},
"source": [
"Finally, if a field is missing from a Pydantic model, a validation error will also be raised."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c83f75c8",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2025-10-19 15:02:00,197 INFO Started executing jobs locally\n",
"2025-10-19 15:02:00,199 INFO Starting job - invalid_add (d03d4dd6-a66e-4551-862d-ca6f01b2032c)\n",
"2025-10-19 15:02:00,200 INFO invalid_add failed with exception:\n",
"Traceback (most recent call last):\n",
" File \"/X/src/jobflow/managers/local.py\", line 117, in _run_job\n",
" response = job.run(store=store)\n",
" File \"/X/src/jobflow/core/job.py\", line 604, in run\n",
" response = function(*self.function_args, **self.function_kwargs)\n",
" File \"/var/folders/zh/3748r38115qb94_pvwg0cc6m0000gn/T/ipykernel_34651/2447134739.py\", line 7, in invalid_add\n",
" return OutputValidator(result=a + b)\n",
" File \"/X/pydantic/main.py\", line 250, in __init__\n",
" validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)\n",
"pydantic_core._pydantic_core.ValidationError: 1 validation error for OutputValidator\n",
"extra_field\n",
" Field required [type=missing, input_value={'result': 3}, input_type=dict]\n",
" For further information visit https://errors.pydantic.dev/2.12/v/missing\n",
"\n",
"2025-10-19 15:02:00,201 INFO Finished executing jobs locally\n"
]
},
{
"data": {
"text/plain": [
"{}"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"class MissingOutputValidator(BaseModel):\n",
" result: float = Field(..., description=\"The resulting float value\")\n",
" extra_field: float = Field(..., description=\"An extra required float value\")\n",
"\n",
"\n",
"@job\n",
"def invalid_add(a, b):\n",
" return MissingOutputValidator(result=a + b)\n",
"\n",
"\n",
"invalid_job = invalid_add(1, 2)\n",
"run_locally(invalid_job)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "jobflow",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.14.0"
}
},
"nbformat": 4,
"nbformat_minor": 5
}