본문 바로가기
개인 공부 일지/데이터 엔지니어링

Workflow Orchestration (1) - Airflow

by 대체로 무해함 2026. 1. 3.

 워크플로 오케스트레이션에 대해 학습이 필요하다는 생각이 들었고, 단순히 Airflow면 오케이라는 생각이었다. 막상 시작하려니 Airflow만 있는 것은 당연히 아니었기에 이왕 공부하는 김에 연관 있는 툴에 대해 관심이 생겼다. 그래서 이번엔 워크플로 오케스트레이션들에 대해 학습을 순차적으로 해보기로 했다. 그렇지만 실제 사용 방법이나 호출 방법에 대한 학습은 프로젝트 등을 진행하면서 전부 공부하는 것이 좋다고 생각해, 우선 전체적인 개념이나 아키텍처 위주로 공부할 생각에 있다.

Airflow란?

 Apache Airflow는 오픈소스 워크플로 관리 플랫폼으로 데이터 처리 작업을 자동화하고 관리하는 도구. 프로그래밍 방식으로 데이터 파이프라인을 작성, 스케줄링, 모니터링할 수 있다. 주로 ETL이나 ELT 방식의 데이터 파이프라인에 활용되며, 배치 처리 환경뿐만 아니라 실시간 처리 환경에서도 활용할 수 있다. 이로 인해서 데이터 웨어하우스 테이블 새로 고침, 품질 검사 자동화, ML 정기 학습 등 다양한 분야에 활용할 수 있다.

 

 Airflow는 DAG를 통해 데이터 처리 작업을 정의하고 데이터 파이프라인 및 워크플로의 실행 순서를 지정한다. 여러 DAG가 서로 의존하는 경우 종속성 그래프를 통해 관계를 명확히 할 수 있으며, 데이터 파이프라인 문제를 식별하고 해결할 수 있다.

DAG(Directed Acyclic Graph)?

 DAG(방향성 비순환 그래프)는 노드가 순환을 발생시키지 않는 단방향 그래프를 말한다. 데이터, 작업 또는 이벤트를 나타내는 노드 간 관계를 시각화할 수 있어, 목표를 위해 작업 일정과 같이 특정 순서로 발생하는 시스템을 묘사하는데 활용된다.

 

 대표적으로 ETL(수집, 변환, 적재)의 파이프 라인을 DAG로 표현하면 아래와 같은 그래프를 만들 수 있다. 좀 더 절차가 복잡해지더라도 DAG로 표현함으로써 선행되어야 하는 작업이 무엇인지, 어떤 순서로 전체 절차가 진행되는지 파악하기 쉬워진다.

DAG 예시

 

 그래프를 통해 명확하게 선행되는 작업을 정의할 수 있으므로 여러 작업에 대해 독립적으로 작업을 진행할 수 있다. 그래서 Airflow는 Task에 대해 병렬로 진행이 가능하다는 특징이 있다.

Airflow를 활용하기 좋은 환경이란?

 Airflow는 복잡하고 반복적인 파이프라인을 쉽게 정의하는데 효과적이며, 웹 인터페이스를 통해 수동으로 트리거하거나, 로그를 조사할 수 있다. 시스템을 묘사하는 DAG는 멱등성을 지키도록 지향하며, 이를 통해 DAG를 여러 번 실행하거나, Task에 대해 Backfill 또는 실패한 Task만 재실행할 수 있다. 이런 부분을 통해 Airflow는 버전 관리 등을 활용할 수 있으며, 협업에 효과적인 도구로 활용할 수 있다.

 

 다만 배치 지향적인 워크플로를 목적으로 설계된 만큼 Stream 서비스나 지속적인 이벤트를 기반으로 하는 작업엔 적합하지 않다. 그러므로 Apache Kafka와 외부 스트리밍 서비스를 통해 Stream 처리에 대한 역할을 위임하는 것이 더 좋은 방향이다.

설치 방법

pip install apache-airflow-task-sdk

Airflow 필수 구성 요소

Scheduler

 Airflow 스케줄러는 모든 Task와 DAG를 모니터링한 후, 종속 작업이 완료되면 Task 인스턴스를 실행하는 역할을 수행한다. 내부적으로 서브 프로세스를 실행하며, 지정된 DAG 디렉토리에 있는 모든 DAG를 모니터링하고 동기화한다. 기본적으로 1분마다 DAG 파싱 결과를 수집하고, 실행 가능한 Task가 활성화되어 있는지 확인한다.

 

스케줄러를 시작하려면 다음 명령을 실행하면 된다

airflow scheduler

 

 cron 또는 timedelta 스케줄이 있는 DAG의 경우, 지정된 기간이 끝날 때까지 작업을 트리거하지 않는다. 그래서 @daily Task는 하루 늦게 실행하는 것 같을 수 있지만, 지정된 기간이 지나지 않아서 밀리는 것이다.

Executor

 Task 인스턴스가 실행되는 매커니즘. 별도의 구성요소가 아닌 스케줄러의 구성 속성으로 스케줄러 프로세스 내에서 실행된다. 공통 API를 가지고 있어, 설치 환경에 따라 이를 교체할 수 있다.

[core]
executor = KubernetesExecutor

////////////////

[core]
executor = my.custom.executor.module.ExecutorClass

 

 [core] 섹션에 있는 옵션으로 설정되며, 내장 실행기의 경우 이름으로 설정할 수 있다. 사용자 지정 실행기를 구성할 경우, 파이썬 클래스의 모듈 경로를 넣어서 구성할 수 있다.

Dag Processor

Dag Processor Flow

 

 DAG 파일을 파싱하여 메타데이터 데이터베이스에 직렬화하는 구성 요소. 2가지 프로세스로 구성된다.

  • DagFileProcessorManager : 처리해야 할 파일을 결정하는 무한루프를 실행하는 프로세스
  • DagFileProcessorProcess : 개별 파일 하나 이상의 Dag 객체로 변환하기 위해 시작되는 프로세스

Web Server

 DAG 및 Task의 동작을 검사, 실행 및 디버깅할 수 있는 사용자 UI를 제공하는 웹 서버

Folder of Dag files

 스케줄러가 실행할 작업과 실행 시기를 결정하기 위해 읽는 Dag 파일이 담긴 폴더

Metadata Database

 일반적으로 PostgreSQL 또는 MySQL을 사용하며, 작업, DAG 및 변수의 상태를 저장한다


Airflow 구성 요소 배포

 파이썬 환경에서 패키지 형태로 설치할 수 있고, 스케줄러와 웹 서버를 통해 단일 머신에서 실행할 수 있다. 분산 환경 위에서도 실행될 수 있으며, 서로 다른 머신에서 서로 다른 보안 경계를 가지고 실행될 수 있다.

 

 구성 요소를 분리하고, 구성 요소를 서로 격리하여 서로 다른 작업을 수행할 수 있도록 해서 보안을 강화할 수 있다. 예를 들어서 Dag 프로세서와 스케줄러를 분리하면 스케줄러는 Dag 파일에 접근하지 못하는 방식으로 구현될 수 있다.


아키텍처

Airflow 기본 배포

단일 머신 아키텍처

 

 가장 간단한 배포 방식으로 단일 장치에서 운영 및 관리되는 아키텍처. 스케줄러와 워커, 웹 서버가 동일한 Python 프로세스에서 실행되고, 스케줄러는 Dag 파일을 로컬 파일 시스템에서 읽어오는 형태가 된다.

분산형 아키텍처

분산형 아키텍처

 

 여러 사용자 역할을 나눠 사용할 수 있으며, 보안을 위해 다른 인스턴스에 구성 요소를 나눠서 배포할 수 있다. 단, DAG 파일은 모든 구성 요소 간에 동기화되어야 하고 다양한 매커니즘을 통해 동기화한다.


Workloads

Dags

워크 플로를 실행하는데 필요한 모든 것을 캡슐화한 모델로 다음의 내용을 포함한다.

  • Schedule : 워크플로가 실행되는 시점
  • Tasks : 워커가 수행할 수 있는 개별적인 작업 단위
  • Task Dependencies : 작업이 실행되는 순서 및 조건
  • Callbacks : 전체 워크플로가 완료될 때 수행되는 작업
  • Additional Parameters : 추가적인 여러 운영 세부 정보

 DAG를 선언하려면 다음의 방법을 사용할 수 있다.

 

1. with 컨텍스트를 사용하는 방법

 import datetime

 from airflow.sdk import DAG
 from airflow.providers.standard.operators.empty import EmptyOperator

 with DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@daily",
 ):
     EmptyOperator(task_id="task")

 

2. 생성자를 사용하여 연산자에 전달하는 방법

 import datetime

 from airflow.sdk import DAG
 from airflow.providers.standard.operators.empty import EmptyOperator

 my_dag = DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@daily",
 )
 EmptyOperator(task_id="task", dag=my_dag)

 

3. @dag 데코레이터를 사용하여 Dag generator를 만드는 방법

import datetime

from airflow.sdk import dag
from airflow.providers.standard.operators.empty import EmptyOperator


@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
    EmptyOperator(task_id="task")


generate_dag()

 

 DAG는 일련의 Task를 실행하는 방식으로 구성되며, 보통 3가지 유형의 Task를 사용한다.

Operators

 DAG의 대부분의 구성 요소를 빠르게 구축할 수 있도록 하는 미리 정의된 작업으로 DAG 내에 선언적으로 정의할 수 있다.

with DAG("my-dag") as dag:
    ping = HttpOperator(endpoint="http://example.com/update/")
    email = EmailOperator(to="admin@example.com", subject="Update complete")

    ping >> email

 

 대표적인 Operator는 아래와 같은 유형들이 있으며, 이외에도 커뮤니티 등을 통해 많은 Operator를 사용할 수 있다

  • BashOperator : 명령어 실행 클래스
  • PythonOperator : 파이썬 코드 실행 클래스(@task 데코레이터를 사용하여 임의의 파이썬 함수를 실행 가능(=TaskFlow) )
  • PostgresOperaotr : PostgreSQL 작업 클래스

Sensors

 외부 이벤트 발생을 기다리는 것에 초점을 맞춘 Operator의 하위 클래스. 주로 유휴 상태이기 때문에 2가지 동작 모드를 제공하여 사용 효율을 더 높일 수 있다.

  • poke (기본값) : 실행 시간 동안 Worker 슬롯 하나를 차지하는 유형
  • reschedule : Sensor가 확인 중에만 Worker 슬롯을 사용하며, 그 사이엔 설정된 시간 동안 대기 시간을 가지는 유형

 두 동작 모드의 절충점으로 지연 시간을 사용할 수 있으며, 보통 매초 확인이 필요한 경우 poke 모드를, 매분 확인이 필요한 경우 reschedule 모드를 추천한다.

TaskFlow

 Operator 대신 일반 파이썬 코드를 통해 DAG를 작성하는 경우 TaskFlow API의 @task 데코레이터를 사용할 수 있다. Dag 파일에서 TaskFlow 함수를 호출하면 함수를 실행하는 대신 결과에 대한 XCom을 나타내는 객체(XComArg)를 얻게 되고, 객체를 하위 작업이나 연산자의 입력으로 사용할 수 있다.

from airflow.sdk import task
from airflow.providers.smtp.operators.smtp import EmailOperator

@task
def get_ip():
    return my_ip_service.get_main_ip()

@task(multiple_outputs=True)
def compose_email(external_ip):
    return {
        'subject':f'Server connected from {external_ip}',
        'body': f'Your server executing Airflow is connected from the external IP {external_ip}<br>'
    }

email_info = compose_email(get_ip())

EmailOperator(
    task_id='send_email_notification',
    to='example@example.com',
    subject=email_info['subject'],
    html_content=email_info['body']
)

 

 예를 들어, 위와 같은 코드에서 send_email_notification은 EmailOperator로, Operator이지만 반환값을 사용하여 TaskFlow로 선언된 compose_email의 하위에 위치함을 파악할 수 있다.

 

 뿐만 아니라 TaskFlow 함수를 호출할 때, 일반 값이나 변수를 사용할 수 있으며, 키워드 인수를 통해 Airflow 컨텍스트 변수에도 접근이 가능하다. **kwargs 또는 Task의 시그니처를 추가하면 모든 Airflow 컨텍스트 변수를 kwargs 딕셔너리에 사용할 수 있다.


Control Flow

 DAG는 여러 번 실행되도록 설계되었으며, 여러 DAG가 병렬적으로 실행될 수 있다. Task들은 서로의 의존성을 선언할 수 있는데, >> 연산자 혹은 << 연산자나 set_upstream, set_downstream 메소드를 통해 나타낼 수 있다.

first_task >> [second_task, third_task]
fourth_task << third_task

//////////////////////////////////////////

first_task.set_downstream([second_task, third_task])
fourth_task.set_upstream(third_task)

 

 이렇게 설정한 종속성은 그래프의 Edge 구성하며, Airflow가 작업을 실행하는 순서를 결정하는 기준이 된다. 기본적으로 작업은 상위 작업이 완료되길 기다리며, Branching, LastestOnly, Trigger Rules와 같은 기능을 사용하여 사용자가 순서를 지정할 수 있다.

 

 Task 간에 데이터를 전달하는 것은 다음의 3가지 방법을 사용한다.

  • XComs : Task에서 작은 메타 데이터 조각을 주고받을 수 있는 시스템
  • Storage Service : 스토리지에 대용량 파일을 업로드하고 다운로드하여 주고받는 방식
  • TaskFlow API : TaskFlow API는 암시적 XCom을 사용하여 작업 간 데이터를 자동으로 전달

 단, 각 Task는 Airflow에서 Worker의 여유 공간 발생한 경우 전송하므로, DAG의 모든 작업이 동일한 Worker 또는 동일 장치에서 실행된다는 보장은 가질 수 없다.


참고

https://yozm.wishket.com/magazine/detail/2874/

 

데이터 엔지니어링 핵심 도구 ‘Apache Airflow’ 개념 정리 | 요즘IT

오늘은 데이터 엔지니어링의 핵심 도구인 ‘Apache Airflow’에 대해 이야기해 보려고 합니다. 에어플로우는 데이터 엔지니어(DE)와 데이터 애널리틱스 엔지니어(DAE)라면 반드시 알아야 하는 툴 중

yozm.wishket.com

https://airflow.apache.org/docs/apache-airflow/stable/index.html

 

What is Airflow®? — Airflow 3.1.5 Documentation

 

airflow.apache.org

https://www.ibm.com/kr-ko/think/topics/directed-acyclic-graph

 

방향성 비순환 그래프(DAG)란 무엇인가요? | IBM

방향성 비순환 그래프(DAG)는 노드가 주기를 형성하지 않는 단방향 연결로 연결된 그래프 유형입니다.

www.ibm.com