As data engineers, we often face the challenge of orchestrating complex workflows. In this article, I’ll share insights from a recent project where I built a simple workflow orchestrator in Python.
Understanding Workflow Orchestration
Before diving into the implementation, let’s explore what makes a good workflow orchestration system. Modern data pipelines often require sophisticated scheduling, dependency management, and error handling capabilities.
Key Components of Apache Airflow
Apache Airflow has become the industry standard for workflow orchestration. Let’s examine its architecture to understand what makes an effective orchestration system:
- DAGs (Directed Acyclic Graphs): The fundamental concept in Airflow. DAGs define the structure, dependencies, and scheduling of tasks. Each task within a DAG can have retry logic and failure handling configured separately (see the sketch at the end of this section).
- Web Server: Provides a user interface for visualizing DAGs, monitoring execution, and debugging workflows. This component is crucial for operational visibility.
- Scheduler: Continuously analyzes DAGs to determine which tasks should be executed, considering dependencies, schedules, and preconditions.
- Metadata Database: Stores DAG states, configurations, execution results, and metadata. Typically implemented using PostgreSQL or MySQL in production environments.
- Workers: Execute the assigned tasks. Each worker can be configured to run multiple tasks in parallel depending on the executor configuration.
- Executors: Responsible for distributing tasks to workers. Airflow offers several executor types:
  - SequentialExecutor: Executes tasks sequentially, suitable for development
  - LocalExecutor: Executes tasks in parallel on a single machine
  - CeleryExecutor: Distributes tasks across multiple workers using a message broker, typically Redis or RabbitMQ
  - KubernetesExecutor: Executes each task in a separate Kubernetes pod
When designing an orchestration system, considering these components helps ensure scalability, reliability, and operational effectiveness.
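To make the DAG idea concrete, here is a rough sketch of what a minimal Airflow DAG with per-task retry settings might look like. Exact imports and parameter names vary between Airflow versions, so treat this as illustrative rather than canonical:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract():
    print("extracting...")


def transform():
    print("transforming...")


with DAG(
    dag_id="example_etl",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Each task carries its own retry policy
    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
        retries=3,
        retry_delay=timedelta(minutes=5),
    )
    transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform,
        retries=1,
    )
    # Dependencies: transform runs only after extract succeeds
    extract_task >> transform_task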
Our Architecture Overview
The system works as follows:
- The Scheduler maintains a list of workflows
- Each workflow contains sequential tasks and a scheduled execution time
- When a workflow’s execution time arrives, the Scheduler sends tasks to the Worker
- The Worker executes each task and returns results to the Scheduler
- Results from each task become inputs to subsequent tasks
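Put differently, each workflow is a chain of single-argument functions in which one task's output becomes the next task's input. Ignoring scheduling and the network for a moment, the execution model boils down to something like this sketch:

def run_pipeline(tasks, conf):
    # Conceptual model only: feed the initial configuration into the first
    # task, then thread each result into the following task.
    data = conf
    for task in tasks:
        data = task(data)
    return data

The Scheduler and Worker below implement this same loop, but with task execution pushed to a separate process over ZeroMQ.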
The Scheduler Component
import pickle
import time
from datetime import datetime
from typing import Any, Callable, List

import zmq


class Workflow:
    def __init__(
        self,
        tasks: List[Callable],
        start_time: datetime,
        name: str,
        conf: Any,
    ):
        self.tasks = tasks                # ordered list of task functions
        self.start_time = start_time      # when the workflow becomes eligible to run
        self.name = name
        self.conf = conf                  # initial input passed to the first task
        self.current_task_index = 0
        self.last_result = None
class Scheduler:
    def __init__(self, workflows: List[Workflow]):
        self.workflows = workflows
        # REQ socket: send one task, then block until the worker replies
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REQ)
        self.socket.connect("tcp://localhost:5555")

    def should_execute(self, workflow: Workflow) -> bool:
        return datetime.now() >= workflow.start_time

    def send_task(self, func: Callable, arg: Any) -> Any:
        # Serialize the (function, argument) pair and wait for the result
        message = pickle.dumps((func, arg))
        self.socket.send(message)
        result = pickle.loads(self.socket.recv())
        return result

    def process_workflow(self, workflow: Workflow):
        print(f"Processing workflow: {workflow.name}")
        while workflow.current_task_index < len(workflow.tasks):
            current_task = workflow.tasks[workflow.current_task_index]
            # The first task receives the workflow configuration;
            # every later task receives the previous task's result.
            if workflow.current_task_index == 0:
                input_data = workflow.conf
            else:
                input_data = workflow.last_result
            result = self.send_task(current_task, input_data)
            workflow.last_result = result
            workflow.current_task_index += 1
        print(f"Workflow {workflow.name} completed. Final result:", workflow.last_result)
        return workflow.last_result

    def run(self):
        print("Scheduler started...")
        while self.workflows:
            remaining_workflows = []
            for workflow in self.workflows:
                if self.should_execute(workflow):
                    self.process_workflow(workflow)
                else:
                    remaining_workflows.append(workflow)
            self.workflows = remaining_workflows
            if self.workflows:
                time.sleep(1)
        print("All workflows completed")
The Worker Component
import pickle
from typing import Any, Callable

import zmq


class Worker:
    def __init__(self):
        # REP socket: receive a task, execute it, send back the result
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REP)
        self.socket.bind("tcp://*:5555")

    def execute_task(self, task: Callable, arg: Any) -> Any:
        try:
            result = task(arg)
            return result
        except Exception as e:
            # Return the error as the task result so the scheduler
            # is never left waiting for a reply
            return f"Error executing task: {str(e)}"

    def run(self):
        print("Worker started, waiting for tasks...")
        while True:
            message = self.socket.recv()
            task, arg = pickle.loads(message)
            print(f"Executing task: {task.__name__}")
            result = self.execute_task(task, arg)
            self.socket.send(pickle.dumps(result))
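If the Worker class lives in its own worker.py module, starting it is just a matter of adding an entry point. A minimal sketch, assuming that module layout:

# worker.py
if __name__ == "__main__":
    Worker().run()

One subtlety worth noting: pickle serializes functions by reference (module and name) rather than by value, so the worker process must be able to import the same task functions the scheduler sends it.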
Using the Orchestrator
Here’s how you can use this orchestrator for a simple data processing workflow:
from datetime import datetime, timedelta

# Define your workflow
workflow_employee_analysis = Workflow(
    tasks=[
        read_csv_file,            # Task 1
        filter_departments,       # Task 2
        compute_average_salary,   # Task 3
    ],
    start_time=datetime.now() + timedelta(seconds=2),
    name="employee_salary_analysis",
    conf="employee_data.csv",
)

# Start the worker in a separate process
# In another terminal: python worker.py

# Run the scheduler
scheduler = Scheduler([workflow_employee_analysis])
scheduler.run()
This creates a workflow that:
- Reads data from a CSV file
- Filters the data based on specific criteria
- Computes metrics on the filtered data
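The three task functions are ordinary Python functions that each take one argument and return the input for the next step; they are not part of the orchestrator itself. A minimal sketch of what they might look like (the column names and the department filter are illustrative assumptions):

import csv
from typing import Dict, List


def read_csv_file(path: str) -> List[Dict[str, str]]:
    # Task 1: load the CSV into a list of row dictionaries
    with open(path, newline="") as f:
        return list(csv.DictReader(f))


def filter_departments(rows: List[Dict[str, str]]) -> List[Dict[str, str]]:
    # Task 2: keep only the rows of interest (column and value are assumed)
    return [row for row in rows if row["department"] == "Engineering"]


def compute_average_salary(rows: List[Dict[str, str]]) -> float:
    # Task 3: aggregate the filtered rows into a single metric
    return sum(float(row["salary"]) for row in rows) / len(rows)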
Key Design Considerations
This simple orchestrator demonstrates several important design principles:
- Separation of concerns: The scheduler manages workflow timing and sequencing, while the worker focuses solely on task execution.
- Message-based communication: ZeroMQ provides a lightweight messaging layer that decouples the scheduler from the worker.
- Sequential task execution: Results from each task feed into subsequent tasks, enabling step-by-step data transformations.
- Fault isolation: Task exceptions are caught on the worker and returned as results, so a failing task does not crash the scheduler.
- Extensibility: New workflows can be added without modifying the core system.
Improvements for Production Use
While our orchestrator serves as a good learning example, a production-ready system would need additional features:
- Persistent storage: Task states and results should be stored in a database
- Retry mechanism: Failed tasks should be retried with exponential backoff (a minimal sketch follows this list)
- Monitoring and logging: Comprehensive observability for troubleshooting
- Parallel execution: Ability to run multiple workflows simultaneously
- Authentication and authorization: Secure access to workflow management
- Web interface: Visual management of workflows and execution history
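As one illustration of the retry point above, a wrapper like the following could stand in for the direct task call in Worker.execute_task. The attempt count and delays are arbitrary choices, not part of the original design:

import random
import time
from typing import Any, Callable


def execute_with_retries(task: Callable, arg: Any,
                         max_attempts: int = 3, base_delay: float = 1.0) -> Any:
    # Retry a failing task with exponential backoff plus a little jitter
    for attempt in range(1, max_attempts + 1):
        try:
            return task(arg)
        except Exception:
            if attempt == max_attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.5))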
Conclusion
Building a workflow orchestrator helps us understand the core principles behind tools such as Apache Airflow, Luigi, and Prefect. While these production-grade systems offer far more features, the fundamental concepts remain the same.
By implementing our own orchestrator, we gain deeper insights into workflow management, task execution, and system design, all valuable skills for a data engineer. However, for production use, I recommend considering an existing orchestration framework.
The next time you find yourself repeating a complex sequence of data processing steps, consider how you might orchestrate those tasks more efficiently. Whether you use an existing tool or build a custom solution, understanding the core principles of workflow orchestration will serve you well in your data engineering journey.
You can find all the code in this public repository: