As data engineers, we often face the challenge of orchestrating complex workflows. In this article, I’ll share insights from a recent project where I built a simple workflow orchestrator in Python.

Understanding Workflow Orchestration

Before diving into the implementation, let’s explore what makes a good workflow orchestration system. Modern data pipelines often require sophisticated scheduling, dependency management, and error handling capabilities.

Key Components of Apache Airflow

Apache Airflow has become the industry standard for workflow orchestration. Let’s examine its architecture to understand what makes an effective orchestration system:

  1. DAGs (Directed Acyclic Graphs): The fundamental concept in Airflow. DAGs define the structure, dependencies, and scheduling of tasks. Each task within a DAG can have retry logic and failure handling configured separately.

  2. Web Server: Provides a user interface for visualizing DAGs, monitoring execution, and debugging workflows. This component is crucial for operational visibility.

  3. Scheduler: Continuously analyzes DAGs to determine which tasks should be executed, considering dependencies, schedules, and preconditions.

  4. Metadata Database: Stores DAG states, configurations, execution results, and metadata. Typically implemented using PostgreSQL or MySQL in production environments.

  5. Workers: Execute the assigned tasks. Each worker can be configured to run multiple tasks in parallel depending on the executor configuration.

  6. Executors: Responsible for distributing tasks to workers. Airflow offers several executor types:

    • SequentialExecutor: Executes tasks sequentially, suitable for development
    • LocalExecutor: Executes tasks in parallel on a single machine
    • CeleryExecutor: Distributes tasks across multiple workers via a message broker, typically Redis or RabbitMQ
    • KubernetesExecutor: Executes each task in a separate Kubernetes pod

When designing an orchestration system, considering these components helps ensure scalability, reliability, and operational effectiveness.
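
To make the DAG concept concrete, here is a minimal sketch of a two-task Airflow DAG (assuming Airflow 2.4 or later; the task names and logic are purely illustrative and not part of the orchestrator we build below):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract():
    # The return value is stored as an XCom and can be pulled by downstream tasks
    return "raw data"


def transform(ti):
    # Pull the upstream task's return value from XCom
    raw = ti.xcom_pull(task_ids="extract")
    return raw.upper()


with DAG(
    dag_id="example_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)

    extract_task >> transform_task  # transform runs only after extract succeeds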

Our Architecture Overview

The system works as follows:

  • The Scheduler maintains a list of workflows
  • Each workflow contains sequential tasks and a scheduled execution time
  • When a workflow’s execution time arrives, the Scheduler sends tasks to the Worker
  • The Worker executes each task and returns results to the Scheduler
  • Results from each task become inputs to subsequent tasks

The Scheduler Component

import pickle
import time
from datetime import datetime
from typing import Any, Callable, List

import zmq


class Workflow:
    def __init__(
        self,
        tasks: List[Callable],
        start_time: datetime,
        name: str,
        conf: Any
    ):
        self.tasks = tasks
        self.start_time = start_time
        self.name = name
        self.conf = conf
        self.current_task_index = 0
        self.last_result = None
 
class Scheduler:
    def __init__(self, workflows: List[Workflow]):
        self.workflows = workflows
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REQ)
        self.socket.connect("tcp://localhost:5555")
 
    def should_execute(self, workflow: Workflow) -> bool:
        return datetime.now() >= workflow.start_time
 
    def send_task(self, func: Callable, arg: Any) -> Any:
        message = pickle.dumps((func, arg))
        self.socket.send(message)
        result = pickle.loads(self.socket.recv())
        return result
 
    def process_workflow(self, workflow: Workflow):
        print(f"Processing workflow: {workflow.name}")
        
        while workflow.current_task_index < len(workflow.tasks):
            current_task = workflow.tasks[workflow.current_task_index]
            
            if workflow.current_task_index == 0:
                input_data = workflow.conf
            else:
                input_data = workflow.last_result
 
            result = self.send_task(current_task, input_data)
            workflow.last_result = result
            workflow.current_task_index += 1
 
        print(f"Workflow {workflow.name} completed. Final result:", workflow.last_result)
        return workflow.last_result
 
    def run(self):
        print("Scheduler started...")
        
        while self.workflows:
            remaining_workflows = []
            
            for workflow in self.workflows:
                if self.should_execute(workflow):
                    self.process_workflow(workflow)
                else:
                    remaining_workflows.append(workflow)
            
            self.workflows = remaining_workflows
            
            if self.workflows:
                time.sleep(1)
 
        print("All workflows completed")

The Worker Component

import pickle
from typing import Any, Callable

import zmq


class Worker:
    def __init__(self):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REP)
        self.socket.bind("tcp://*:5555")
 
    def execute_task(self, task: Callable, arg: Any) -> Any:
        try:
            result = task(arg)
            return result
        except Exception as e:
            return f"Error executing task: {str(e)}"
 
    def run(self):
        print("Worker started, waiting for tasks...")
        
        while True:
            message = self.socket.recv()
            task, arg = pickle.loads(message)
            
            print(f"Executing task: {task.__name__}")
            result = self.execute_task(task, arg)
            
            self.socket.send(pickle.dumps(result))
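
To match the usage instructions below, a worker.py entry point can be as simple as the following sketch (the filename mirrors the comment in the next section; the rest just instantiates the class above):

# worker.py
if __name__ == "__main__":
    worker = Worker()
    worker.run()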

Using the Orchestrator

Here’s how you can use this orchestrator for a simple data processing workflow:

from datetime import datetime, timedelta

# Define your workflow (Workflow, Scheduler, and the task functions are assumed
# to be importable from your own modules)
workflow_employee_analysis = Workflow(
    tasks=[
        read_csv_file,          # Task 1
        filter_departments,     # Task 2
        compute_average_salary  # Task 3
    ],
    start_time=datetime.now() + timedelta(seconds=2),
    name="employee_salary_analysis",
    conf="employee_data.csv"
)
 
# Start the worker in a separate process
# In another terminal: python worker.py
 
# Run the scheduler
scheduler = Scheduler([workflow_employee_analysis])
scheduler.run()

This creates a workflow that:

  1. Reads data from a CSV file
  2. Filters the data based on specific criteria
  3. Computes metrics on the filtered data
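
The three task functions aren't shown above, but they could look roughly like this (a hedged sketch: the column names and the filter criterion are assumptions, not taken from the original code):

import csv
from typing import Any, Dict, List


def read_csv_file(path: str) -> List[Dict[str, Any]]:
    # Task 1: load the CSV into a list of row dictionaries
    with open(path, newline="") as f:
        return list(csv.DictReader(f))


def filter_departments(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Task 2: keep only the rows we care about ("department" is an assumed column)
    return [row for row in rows if row.get("department") == "Engineering"]


def compute_average_salary(rows: List[Dict[str, Any]]) -> float:
    # Task 3: average the "salary" column of the filtered rows
    salaries = [float(row["salary"]) for row in rows]
    return sum(salaries) / len(salaries) if salaries else 0.0

One caveat worth knowing: pickle serializes functions by reference (module and qualified name), so the worker process must be able to import the same task definitions, otherwise unpickling the message will fail.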

Key Design Considerations

This simple orchestrator demonstrates several important design principles:

  1. Separation of concerns: The scheduler manages workflow timing and sequencing, while the worker focuses solely on task execution.

  2. Decoupled communication: ZeroMQ provides a lightweight messaging layer between processes; the REQ/REP pattern used here is a simple synchronous request-reply exchange.

  3. Sequential task execution: Results from each task feed into subsequent tasks, enabling data transformations.

  4. Fault isolation: Worker failures don’t affect the scheduler, enhancing system resilience.

  5. Extensibility: New workflows can be easily added without modifying the core system.

Improvements for Production Use

While our orchestrator serves as a good learning example, a production-ready system would need additional features:

  1. Persistent storage: Task states and results should be stored in a database
  2. Retry mechanism: Failed tasks should be retried with exponential backoff (see the sketch after this list)
  3. Monitoring and logging: Comprehensive observability for troubleshooting
  4. Parallel execution: Ability to run multiple workflows simultaneously
  5. Authentication and authorization: Secure access to workflow management
  6. Web interface: Visual management of workflows and execution history
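
As an illustration of the retry point above, here is a minimal sketch that wraps the scheduler's send_task call with exponential backoff. It leans on the worker's existing convention of returning an "Error executing task" string on failure; the helper name and its parameters are hypothetical:

import random
import time


def send_task_with_retries(scheduler, func, arg, max_attempts=3, base_delay=1.0):
    # Hypothetical helper: retry a failed task with exponential backoff and jitter
    for attempt in range(1, max_attempts + 1):
        result = scheduler.send_task(func, arg)
        failed = isinstance(result, str) and result.startswith("Error executing task")
        if not failed:
            return result
        if attempt == max_attempts:
            raise RuntimeError(f"{func.__name__} failed after {max_attempts} attempts: {result}")
        # Wait 1s, 2s, 4s, ... plus a little jitter before retrying
        time.sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.5))

In a real system you would also want to distinguish transient errors (worth retrying) from permanent ones, which the string-based error convention makes awkward; serializing exceptions explicitly would be a better fit.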

Conclusion

Building a workflow orchestrator helps us understand the core principles behind tools like Apache Airflow, Luigi, and Prefect. While these production-grade systems offer far more features, the fundamental concepts remain the same.

By implementing our own orchestrator, we gain deeper insights into workflow management, task execution, and system design, all valuable skills for a data engineer. However, for production use, I recommend considering an existing orchestration framework.

The next time you find yourself repeating a complex sequence of data processing steps, consider how you might orchestrate those tasks more efficiently. Whether you use an existing tool or build a custom solution, understanding the core principles of workflow orchestration will serve you well in your data engineering journey.

You can find all the code in this public repository:

https://github.com/p-munhoz/simple-python-orchestrator