Bruno Arine

Using Pydantic objects with Azure Durable Function triggers

Azure’s Durable Functions are asynchronous, scalable cousins of standard Azure Functions. An app is built around a main orchestrator function (optionally with sub-orchestrators) and a series of activity triggers, which are subfunctions that the orchestrator launches asynchronously.

Both activity triggers and orchestrator functions run in virtual instances that communicate with each other via a REST API. So, according to the Azure documentation, the inputs and outputs of all these functions should be JSON-serializable.

But looking into the Python implementation of Azure Durable Functions on GitHub, I found out that these functions work with any Python object, provided its class implements two methods: from_json and to_json.
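
To make that contract concrete, here is a minimal sketch using only the standard library. The CityRequest class is hypothetical, and the last lines merely simulate the round trip by hand; the actual serialization happens inside the Azure library and may wrap the payload differently.

```python
import json


class CityRequest:
    """Hypothetical plain class exposing the two expected methods."""

    def __init__(self, city: str):
        self.city = city

    def to_json(self) -> str:
        # Serialize the instance to a JSON string
        return json.dumps({"city": self.city})

    @classmethod
    def from_json(cls, data: str) -> "CityRequest":
        # Rebuild an instance from the string produced by to_json
        return cls(**json.loads(data))


# Simulate by hand the trip between orchestrator and activity
payload = CityRequest("Seattle").to_json()
restored = CityRequest.from_json(payload)
print(restored.city)
```

Any class that can turn itself into a string and back this way should qualify, which is what makes Pydantic models a natural fit.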

I already use Pydantic extensively in my REST APIs because of its convenient data validation checks, so I thought it would be a good idea to use it with Durable Functions as well. A bonus of working with data class-like objects rather than dictionaries is that the IDE can auto-complete argument names and point out typos.

So, let’s adapt the example in the Azure documentation. First, we need to create a Model class that extends Pydantic’s BaseModel with the two methods that Azure expects.

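import json

from pydantic import BaseModel


class Model(BaseModel):
    """Base model exposing the two methods Durable Functions looks for."""

    def to_json(self) -> str:
        # Serialize the model to a JSON string (Pydantic v2 API)
        return self.model_dump_json()

    @classmethod
    def from_json(cls, obj: str):
        # Parse the JSON string and let Pydantic validate the fields
        return cls(**json.loads(obj))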

Next, we create a custom request and response model for our activity.

class HelloRequest(Model):
    city: str

class HelloResponse(Model):
    message: str
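
Before wiring these into a function app, it can help to check the round trip locally. The sketch below repeats the Model base class so that it runs on its own, and assumes Pydantic v2 is installed; the payload and restored names are only for illustration.

```python
import json

from pydantic import BaseModel


class Model(BaseModel):
    def to_json(self) -> str:
        return self.model_dump_json()

    @classmethod
    def from_json(cls, obj: str):
        return cls(**json.loads(obj))


class HelloRequest(Model):
    city: str


# Serialize a request, then rebuild it from the JSON string
payload = HelloRequest(city="Tokyo").to_json()
restored = HelloRequest.from_json(payload)

# Pydantic models compare equal when type and field values match
print(restored == HelloRequest(city="Tokyo"))
```

If the two objects compare equal, the model survives the same kind of string round trip that the inputs and outputs of an activity go through.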

And presto! The activity below will run just as it would with a dict or str:

@myApp.activity_trigger(input_name="request")
def hello(request: HelloRequest):
    return HelloResponse(message=f"Hello {request.city}")
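
Here is the complete function app, adapted from the example in the Azure documentation. It assumes the model classes defined above are in scope.

import azure.functions as func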
import azure.durable_functions as df

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# An HTTP-triggered function with a Durable Functions client binding
@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
    function_name = req.route_params.get('functionName')
    instance_id = await client.start_new(function_name)
    response = client.create_check_status_response(req, instance_id)
    return response

# Orchestrator - coordinates the activity functions using Pydantic models for type safety
@myApp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
    result1 = yield context.call_activity(hello, HelloRequest(city="Seattle"))
    result2 = yield context.call_activity(hello, HelloRequest(city="Tokyo"))
    result3 = yield context.call_activity(hello, HelloRequest(city="London"))

    return [result1, result2, result3]

# Activity - receives and returns Pydantic models instead of dictionaries
@myApp.activity_trigger(input_name="request")
def hello(request: HelloRequest):
    return HelloResponse(message=f"Hello {request.city}")

This is, of course, a simple demonstration of the technique. In this example it may not seem useful to replace a simple string with a Pydantic model object. But it’s especially useful when your activity triggers take a lot of different values that have to be packed into a dictionary.
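
As a rough illustration of that last point, consider a hypothetical ReportRequest with several fields (the class and its fields are made up for this sketch, and Pydantic v2 is assumed). In a function app it would extend the Model base class from earlier rather than BaseModel directly.

```python
from pydantic import BaseModel, ValidationError


class ReportRequest(BaseModel):
    # Hypothetical activity input with several values
    city: str
    units: str = "metric"
    days: int = 3


# A well-formed request: defaults fill in the fields left out
ok = ReportRequest(city="London", days=5)

# A plain dict with a misspelled key would travel on unnoticed.
# The model rejects it because the required "city" field is missing.
try:
    ReportRequest(**{"ctiy": "London"})
    error = None
except ValidationError as exc:
    error = exc

print(ok.units, ok.days, type(error).__name__)
```

The mistake surfaces when the request is built in the orchestrator, instead of somewhere deep inside the activity.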
