Timer-triggered purge of orchestration instances on Azure
In one of my recent projects, I used Azure Durable Functions extensively to orchestrate multi-stage, asynchronous machine learning inference requests. It worked like a charm in spite of Azure’s misleading and/or outdated documentation on the Python SDK of their own product.
But at some point in the project, I had to implement a timer-triggered orchestration history purge to cut down on storage costs, since, as of this writing, Azure keeps orchestration history indefinitely unless you purge it. That’s when Azure’s poor documentation struck again.
Azure provides an example of how to do this in the section of its documentation on Durable Functions instance management. Here’s the code snippet they use for a timer-triggered purge function:
import azure.functions as func
import azure.durable_functions as df
from azure.durable_functions.models.DurableOrchestrationStatus import OrchestrationRuntimeStatus
from datetime import datetime, timedelta

async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    created_time_from = datetime.min
    created_time_to = datetime.today() + timedelta(days = -30)
    runtime_statuses = [OrchestrationRuntimeStatus.Completed]

    return await client.purge_instance_history_by(created_time_from, created_time_to, runtime_statuses)
The first thing to notice is that all of the article’s examples, including the one above, use Python SDK v1, whereas v2 is the version Azure itself recommends (and uses everywhere else in the docs). Worst of all, there is no timer trigger binding anywhere in the snippet: the function signature takes an HTTP request!
The solution
Here’s how I ended up scheduling orchestration history purges on Azure using the Python SDK v2 (please adjust it to your own needs):
import azure.functions as func
import azure.durable_functions as df
from azure.durable_functions.models.DurableOrchestrationStatus import OrchestrationRuntimeStatus
from datetime import datetime, timedelta, timezone
import logging

app = df.DFApp(http_auth_level=func.AuthLevel.FUNCTION)

# NCRONTAB schedule (second minute hour day month day-of-week): every day at 05:00.
# run_on_startup=True also fires the purge whenever the host starts; set it to False if you don't want that.
@app.function_name(name="PurgeOrchestrationHistory")
@app.schedule(schedule="0 0 5 * * *", arg_name="timer", run_on_startup=True, use_monitor=False)
@app.durable_client_input(client_name="purgeClient")
async def purge_orchestration_history(timer: func.TimerRequest, purgeClient) -> None:
    logging.info("Purging finished instances...")
    # Purge window: instances created between roughly 10 years ago and 5 days ago.
    created_time_from = datetime.now(timezone.utc) + timedelta(days=-3650)
    created_time_to = datetime.now(timezone.utc) + timedelta(days=-5)
    # Only instances that ran to completion are purged.
    runtime_statuses = [OrchestrationRuntimeStatus.Completed]
    response = await purgeClient.purge_instance_history_by(created_time_from, created_time_to, runtime_statuses)
    logging.info(f"Success! Number of finished purged instances: {response.instances_deleted}")
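If you want to adjust the purge window without touching the function body, the date arithmetic can be pulled into a small helper. The sketch below is only an illustration: `purge_window`, `retention_days`, and `lookback_days` are hypothetical names of my own, not part of the Azure SDK, and the defaults simply mirror the 5-day and 3650-day values used above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def purge_window(
    retention_days: int = 5,
    lookback_days: int = 3650,
    now: Optional[datetime] = None,
) -> Tuple[datetime, datetime]:
    """Return (created_time_from, created_time_to) as timezone-aware datetimes.

    Hypothetical helper, not part of the Azure SDK. Instances created inside
    the returned window are the candidates for purging.
    """
    if retention_days < 0 or lookback_days <= retention_days:
        raise ValueError("expected 0 <= retention_days < lookback_days")
    # Allow a fixed "now" to be injected so the window can be unit tested.
    now = now or datetime.now(timezone.utc)
    created_time_from = now - timedelta(days=lookback_days)
    created_time_to = now - timedelta(days=retention_days)
    return created_time_from, created_time_to


if __name__ == "__main__":
    start, end = purge_window()
    print(f"Would purge instances created between {start.isoformat()} and {end.isoformat()}")
```

Inside the timer function, the two `datetime.now(...)` lines could then be replaced by `created_time_from, created_time_to = purge_window()`, and the retention period could be read from an app setting of your choice instead of being hard-coded.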