배치 처리 완벽 심화 가이드 2편: Python 배치 + Airflow 완전 정복 + 트러블슈팅

배치 처리 완벽 심화 가이드 2편: Python 배치 + Airflow 완전 정복 + 트러블슈팅

이 글은 배치 처리 심화 가이드 2부작 중 2편이다. Python 배치 처리 기법, Apache Airflow의 아키텍처와 실전 운영, Spring Batch vs Airflow 비교, 그리고 배치 장애 시나리오와 트러블슈팅을 다룬다.

  • 1편: 배치 기본 원칙 + Spring Batch 아키텍처 / 핵심 기능 / 실전 운영
  • 2편 (현재): Python 배치 + Airflow 아키텍처·운영 + 비교 분석 + 트러블슈팅

6. Python 배치 처리 기초

6.1 순수 Python 배치 (cron + script 패턴)

가장 기본적인 Python 배치는 cron으로 스크립트를 실행하는 방식이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
★ Python cron 배치 구조

crontab:
  0 2 * * * /usr/bin/python3 /app/batch/daily_settlement.py

daily_settlement.py:
  ┌─────────────────────────────────────┐
  │  1. 설정 로드 (DB 연결, 로깅)         │
  │  2. 데이터 조회 (SELECT)              │
  │  3. 처리 (집계, 변환)                 │
  │  4. 결과 저장 (INSERT/UPDATE)         │
  │  5. 알림 (Slack, Email)              │
  │  6. 정리 (커넥션 닫기)                │
  └─────────────────────────────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# ★ 기본 Python 배치 스크립트 구조
import logging
import sys
from datetime import date, timedelta
from contextlib import contextmanager
import psycopg2

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler(f'/var/log/batch/settlement_{date.today()}.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

@contextmanager
def get_connection():
    conn = psycopg2.connect(
        host='db-host', dbname='orders', user='batch_user', password='***'
    )
    try:
        yield conn
    finally:
        conn.close()

def run_settlement(target_date: date):
    logger.info(f"정산 배치 시작: target_date={target_date}")

    with get_connection() as conn:
        with conn.cursor() as cur:
            # 1. 데이터 조회
            cur.execute("""
                SELECT seller_id, SUM(amount) as total, COUNT(*) as cnt
                FROM orders
                WHERE order_date = %s AND status = 'COMPLETED'
                GROUP BY seller_id
            """, (target_date,))
            rows = cur.fetchall()
            logger.info(f"대상 판매자 수: {len(rows)}")

            # 2. 정산 결과 저장 (UPSERT — 멱등성)
            for seller_id, total, cnt in rows:
                fee = total * 0.03
                settle_amount = total - fee
                cur.execute("""
                    INSERT INTO settlement (seller_id, settle_date, amount, fee, settle_amount)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT (seller_id, settle_date)
                    DO UPDATE SET amount = EXCLUDED.amount,
                                  fee = EXCLUDED.fee,
                                  settle_amount = EXCLUDED.settle_amount
                """, (seller_id, target_date, total, fee, settle_amount))

            conn.commit()
            logger.info(f"정산 배치 완료: {len(rows)}건 처리")

if __name__ == '__main__':
    target = date.today() - timedelta(days=1)
    try:
        run_settlement(target)
    except Exception as e:
        logger.error(f"정산 배치 실패: {e}", exc_info=True)
        sys.exit(1)

6.2 pandas를 활용한 데이터 처리

pandas는 데이터 변환·집계에 강력하지만, 대용량에서는 메모리 관리가 핵심이다.

1
2
3
4
5
6
7
8
9
10
★ pandas 대용량 처리 — chunksize 활용

일반 방식 (위험!):
  df = pd.read_sql("SELECT * FROM orders", conn)  # 1000만 건 → OOM 💀

chunksize 방식 (안전):
  for chunk in pd.read_sql(..., chunksize=10000):  # 1만 건씩
      process(chunk)
      write(chunk)
      # 메모리: 항상 1만 건 수준 유지
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# ★ pandas chunksize를 활용한 대용량 ETL
import pandas as pd
from sqlalchemy import create_engine

source_engine = create_engine('postgresql://user:pass@source-db/orders')
target_engine = create_engine('postgresql://user:pass@target-db/analytics')

def etl_daily_orders(target_date: str):
    query = f"""
        SELECT order_id, seller_id, buyer_id, amount, category, order_date
        FROM orders WHERE order_date = '{target_date}'
    """
    total_processed = 0

    for chunk in pd.read_sql(query, source_engine, chunksize=50000):
        # Transform
        chunk['amount_krw'] = chunk['amount'].astype(float)
        chunk['fee'] = chunk['amount_krw'] * 0.03
        chunk['settle_amount'] = chunk['amount_krw'] - chunk['fee']

        result = chunk[['order_id', 'seller_id', 'amount_krw',
                        'fee', 'settle_amount', 'order_date']]

        # Load
        result.to_sql('daily_order_summary', target_engine,
                      if_exists='append', index=False, method='multi',
                      chunksize=5000)

        total_processed += len(chunk)
        logger.info(f"진행: {total_processed}건 처리 완료")

    logger.info(f"ETL 완료: 총 {total_processed}")

6.3 SQLAlchemy를 활용한 DB 배치

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# ★ SQLAlchemy Core — 벌크 UPSERT
from sqlalchemy import create_engine, text
from sqlalchemy.dialects.postgresql import insert

engine = create_engine('postgresql://user:pass@db/app', pool_size=5)

def bulk_upsert_settlements(settlements: list[dict]):
    """대량 UPSERT — PostgreSQL ON CONFLICT 활용"""
    with engine.begin() as conn:
        stmt = insert(settlement_table).values(settlements)
        upsert_stmt = stmt.on_conflict_do_update(
            index_elements=['seller_id', 'settle_date'],
            set_={
                'amount': stmt.excluded.amount,
                'fee': stmt.excluded.fee,
                'settle_amount': stmt.excluded.settle_amount,
                'updated_at': text('NOW()')
            }
        )
        result = conn.execute(upsert_stmt)
        logger.info(f"UPSERT 완료: {result.rowcount}")

def batch_process_in_chunks(query: str, chunk_size: int = 10000):
    """커서 기반 청크 처리 — 메모리 효율적"""
    with engine.connect() as conn:
        result = conn.execution_options(stream_results=True).execute(text(query))
        while True:
            rows = result.fetchmany(chunk_size)
            if not rows:
                break
            settlements = [transform_to_settlement(row) for row in rows]
            bulk_upsert_settlements(settlements)

6.4 대용량 처리 테크닉

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
★ Python 대용량 처리 옵션

┌──────────────────────────────────────────────┐
│  Generator — 메모리 절약의 기본                  │
│  "한 번에 하나씩 생성, 모두 메모리에 올리지 않음"   │
├──────────────────────────────────────────────┤
│  multiprocessing — CPU-bound 병렬화            │
│  "GIL 우회, 코어 수만큼 진짜 병렬"               │
├──────────────────────────────────────────────┤
│  asyncio — I/O-bound 동시성                    │
│  "네트워크/DB 대기 시간 활용, 단일 스레드"         │
├──────────────────────────────────────────────┤
│  concurrent.futures — 간편한 병렬 처리           │
│  "ThreadPool / ProcessPool 추상화"             │
└──────────────────────────────────────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# ★ Generator 패턴 — 대용량 파일 처리
def read_large_csv(filepath: str, chunk_size: int = 10000):
    """메모리를 chunk_size만큼만 사용하는 Generator"""
    import csv
    with open(filepath, 'r') as f:
        reader = csv.DictReader(f)
        chunk = []
        for row in reader:
            chunk.append(row)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk

for chunk in read_large_csv('/data/orders_20260413.csv'):
    process_and_save(chunk)


# ★ multiprocessing 패턴 — CPU-bound 병렬 처리
from multiprocessing import Pool

def process_partition(args):
    partition_id, start_id, end_id = args
    engine = create_engine('postgresql://...')  # 프로세스별 커넥션
    query = f"SELECT * FROM orders WHERE id BETWEEN {start_id} AND {end_id}"
    df = pd.read_sql(query, engine)
    result = heavy_computation(df)
    result.to_sql('results', engine, if_exists='append', index=False)
    return len(result)

partitions = [(i, i*100000+1, (i+1)*100000) for i in range(8)]
with Pool(processes=8) as pool:
    results = pool.map(process_partition, partitions)
    print(f"총 처리 건수: {sum(results)}")


# ★ asyncio 패턴 — I/O-bound 동시 처리 (API 호출)
import asyncio
import aiohttp

async def batch_api_calls(item_ids: list[int]):
    async with aiohttp.ClientSession() as session:
        semaphore = asyncio.Semaphore(50)  # 동시 요청 제한

        async def limited_fetch(item_id):
            async with semaphore:
                async with session.get(f"{API_URL}/items/{item_id}") as resp:
                    return await resp.json()

        results = await asyncio.gather(
            *[limited_fetch(id) for id in item_ids]
        )
        return results

6.5 Python 배치의 한계 → 오케스트레이터 필요성

1
2
3
4
5
6
7
8
9
10
★ 순수 Python 배치의 한계

  1. 의존성 관리 없음 — "Job A 완료 후 Job B 실행" → cron으로 불가
  2. 재시도/실패 복구 수동 구현 — 모든 에러 핸들링을 직접 코딩
  3. 모니터링 부재 — "어떤 Job이 언제 성공/실패했는지" 로그 뒤적
  4. 스케줄링 제약 — cron은 "실행 시각"만 설정 가능
  5. 백필(Backfill) 어려움 — 과거 날짜 재처리 시 수동 파라미터 변경

         ↓ 이 모든 문제를 해결하는 것이
           Apache Airflow

7. Apache Airflow 아키텍처 Deep Dive

7.1 Airflow란 무엇인가

Apache Airflow는 워크플로우 오케스트레이터(Workflow Orchestrator) 이다. 2014년 Airbnb의 Maxime Beauchemin이 개발했으며, 2019년 Apache Top-Level 프로젝트가 되었다.

1
2
3
4
5
6
7
8
9
10
11
12
★ Airflow의 핵심 철학

┌──────────────────────────────────────────────┐
│  "Airflow is NOT a data processing tool."     │
│  "Airflow is a WORKFLOW ORCHESTRATOR."        │
│                                               │
│  Airflow는 데이터를 직접 처리하지 않는다.         │
│  "무엇을 언제 어떤 순서로 실행할지" 관리한다.      │
│                                               │
│  실제 처리: Spark, dbt, Python 스크립트, API 등  │
│  Airflow:  실행 순서·조건·재시도·알림 관리         │
└──────────────────────────────────────────────┘
Airflow가 잘하는 것 Airflow가 적합하지 않은 것
작업 의존성/순서 관리 대용량 데이터 직접 처리
스케줄링 + 백필 실시간 스트리밍 처리
재시도 + 알림 밀리초 단위 저지연 처리
모니터링 웹 UI 무한 루프 / 상시 실행

7.2 DAG / Task / Operator 구조

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
★ Airflow의 핵심 구조

DAG (Directed Acyclic Graph) = 작업의 의존 관계를 방향성 비순환 그래프로 표현

  [extract] → [transform] → [load] → [notify]
      │                        ▲
      └── [validate] ──────────┘

DAG > Task > Operator 계층:

  DAG: "daily_settlement_pipeline"
    ├─ Task: "extract_orders"      ← Operator: PythonOperator
    ├─ Task: "validate_data"       ← Operator: PythonOperator
    ├─ Task: "run_settlement"      ← Operator: BashOperator
    ├─ Task: "export_to_s3"       ← Operator: S3UploadOperator
    └─ Task: "notify_slack"       ← Operator: SlackOperator

  Task = DAG 내의 하나의 작업 노드
  Operator = Task가 "무엇을 하는지" 정의하는 템플릿

주요 Operator 표:

Operator 역할 사용 예시
PythonOperator Python 함수 실행 ETL 로직, 데이터 검증
BashOperator Shell 명령 실행 스크립트 실행, 파일 처리
SqlOperator SQL 실행 테이블 생성, 데이터 삽입
HttpOperator HTTP 요청 REST API 호출
SparkSubmitOperator Spark Job 제출 대용량 분산 처리
KubernetesPodOperator K8s Pod으로 실행 격리된 환경에서 실행
BranchPythonOperator 조건 분기 if/else 로직
TriggerDagRunOperator 다른 DAG 트리거 DAG 간 의존성
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# ★ DAG 정의 예시 — 일일 정산 파이프라인
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-team@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='daily_settlement',
    default_args=default_args,
    description='일일 정산 배치 파이프라인',
    schedule='0 2 * * *',              # 매일 새벽 2시
    start_date=datetime(2026, 1, 1),
    catchup=False,                     # 과거 미실행분 자동 실행 안 함
    tags=['settlement', 'daily'],
) as dag:

    extract = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders,
        op_kwargs={'date': '{{ ds }}'},  # 실행 날짜 자동 주입
    )

    validate = PythonOperator(
        task_id='validate_data',
        python_callable=validate_data,
    )

    settle = BashOperator(
        task_id='run_settlement',
        bash_command='python /app/batch/settlement.py --date {{ ds }}',
    )

    notify = SlackWebhookOperator(
        task_id='notify_completion',
        slack_webhook_conn_id='slack_webhook',
        message='✅ {{ ds }} 정산 배치 완료',
    )

    extract >> validate >> settle >> notify

7.3 Scheduler / Executor / Worker 내부 동작

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
★ Airflow 내부 아키텍처

┌────────────────────────────────────────────────────┐
│                     Web Server                      │
│  (Flask 기반 UI — DAG 목록, 실행 이력, 로그 조회)      │
└──────────────────────┬─────────────────────────────┘
                       │ (메타데이터 DB 공유)
┌──────────────────────┴─────────────────────────────┐
│                    Metadata DB                      │
│  (PostgreSQL / MySQL)                                │
│  DAG 정보, Task 상태, 스케줄 이력, XCom 데이터         │
└──────────────────────┬─────────────────────────────┘
                       │
┌──────────────────────┴─────────────────────────────┐
│                     Scheduler                       │
│  1. DAG 파일 파싱 (dags_folder 주기적 스캔)            │
│  2. DagRun 생성 (스케줄 시각 도달 시)                  │
│  3. TaskInstance 생성 및 의존성 확인                    │
│  4. 실행 가능한 Task를 Executor에 전달                 │
└──────────────────────┬─────────────────────────────┘
                       │
┌──────────────────────┴─────────────────────────────┐
│                     Executor                        │
│  ┌─────────────────────────────────────────────┐    │
│  │ LocalExecutor: 같은 머신에서 프로세스로 실행     │    │
│  │ CeleryExecutor: Celery Worker에 분산 실행     │    │
│  │ KubernetesExecutor: K8s Pod으로 실행          │    │
│  └─────────────────────────────────────────────┘    │
└──────────────────────┬─────────────────────────────┘
                       ▼
┌──────────────────────────────────────────────────────┐
│                    Worker(s)                          │
│  실제 Task 코드를 실행하는 프로세스/컨테이너              │
└──────────────────────────────────────────────────────┘
Executor 구조 장점 단점 적합 케이스
SequentialExecutor 단일 프로세스 순차 설정 간단 병렬 불가 개발/테스트 전용
LocalExecutor 로컬 멀티프로세스 설치 간단, 병렬 가능 단일 서버 한계 소~중규모
CeleryExecutor Celery + 메시지 브로커 수평 확장, 안정적 Redis/RabbitMQ 필요 중~대규모
KubernetesExecutor Task마다 Pod 생성 완벽한 격리, 자원 효율 K8s 필요, Pod 생성 지연 클라우드 네이티브
1
2
3
4
5
6
7
8
9
★ CeleryExecutor vs KubernetesExecutor 동작 비교

CeleryExecutor:
  Scheduler → Redis/RabbitMQ(큐) → Worker(상시 대기) → Task 실행
  → Worker가 항상 떠 있음 → 빠른 시작, 리소스 상시 점유

KubernetesExecutor:
  Scheduler → K8s API → Pod 생성 → Task 실행 → Pod 종료
  → Task마다 Pod → 완벽한 격리, 유휴 시 비용 0, 시작 지연 있음

7.4 XCom (Task 간 데이터 전달)

XCom(Cross-Communication) 은 Task 간에 소량의 데이터를 주고받는 메커니즘이다.

1
2
3
4
5
6
7
8
9
★ XCom 동작 원리

Task A                   Metadata DB                  Task B
   │  xcom_push           │                            │
   │  key="row_count"     │                            │
   │  value=150000        │                            │
   │─────────────────────▶│    xcom_pull               │
   │                      │◀───────────────────────────│
   │                      │──────── 150000 ──────────▶│
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# ★ XCom 사용 예시
def extract_orders(**context):
    orders = fetch_orders_from_db(context['ds'])
    row_count = len(orders)
    context['ti'].xcom_push(key='row_count', value=row_count)
    context['ti'].xcom_push(key='file_path', value='/data/orders.parquet')
    return row_count  # return 값은 key='return_value'로 자동 저장

def validate_data(**context):
    row_count = context['ti'].xcom_pull(
        task_ids='extract_orders', key='row_count')
    file_path = context['ti'].xcom_pull(
        task_ids='extract_orders', key='file_path')
    if row_count == 0:
        raise ValueError("추출된 데이터가 없습니다!")

# Jinja 템플릿으로도 접근 가능
settle = BashOperator(
    task_id='run_settlement',
    bash_command='python settle.py --count {{ ti.xcom_pull(task_ids="extract_orders", key="row_count") }}',
)

⚠️ 주의 — XCom 대용량 데이터 금지:

1
2
3
4
5
6
7
❌ 안티패턴: XCom으로 DataFrame 전달
   xcom_push(key='data', value=huge_dataframe.to_dict())
   → Metadata DB에 수 GB 저장 💀

✓ 올바른 패턴: 파일 경로/S3 키만 전달
   df.to_parquet('/data/temp/orders_20260413.parquet')
   xcom_push(key='file_path', value='/data/temp/orders_20260413.parquet')

7.5 Connection / Hook / Provider

1
2
3
4
5
6
★ Connection / Hook / Provider 관계

Provider Package (pip install apache-airflow-providers-xxx)
  ├─ Operators (고수준 API) — S3UploadOperator 등
  ├─ Hooks (저수준 API)    — S3Hook.get_conn()
  └─ Connection (연결 정보) — host, port, login, password, extra JSON
개념 역할 설정 위치
Connection 외부 시스템 연결 정보 Airflow UI / 환경변수 / Secrets Backend
Hook Connection을 사용하여 외부 시스템과 실제 통신 Provider 패키지 내 제공
Provider 특정 서비스용 Operator+Hook 패키지 pip install
1
2
3
4
5
6
7
8
9
10
# ★ Hook 직접 사용 예시
from airflow.providers.postgres.hooks.postgres import PostgresHook

def extract_with_hook(**context):
    hook = PostgresHook(postgres_conn_id='orders_db')
    df = hook.get_pandas_df(
        "SELECT * FROM orders WHERE order_date = %(date)s",
        parameters={'date': context['ds']}
    )
    return len(df)

8. Airflow 실전 운영

8.1 DAG 설계 패턴

패턴 1: 작은 Task로 분리

1
2
3
4
5
6
7
8
❌ 하나의 거대한 Task:
  [do_everything]  ← 1시간짜리 단일 Task
  → 중간 실패 시 처음부터 다시 실행

✓ 작은 Task로 분리:
  [extract] → [validate] → [transform] → [load] → [notify]
  → 실패한 Task만 재실행
  → 각 Task 소요 시간/상태 명확

패턴 2: 멱등성 보장 + 날짜 파라미터

1
2
3
4
5
6
7
8
9
10
11
12
# ✓ 멱등한 DAG — 날짜 파라미터 활용
def process_orders(**context):
    execution_date = context['ds']  # '2026-04-13'

    # DELETE-INSERT 패턴으로 멱등성 보장
    hook = PostgresHook('orders_db')
    hook.run(f"DELETE FROM daily_summary WHERE summary_date = '{execution_date}'")
    # ... INSERT ...

# ❌ 안티패턴: datetime.now() 사용
def bad_process():
    today = datetime.now().date()  # 백필 시 잘못된 날짜!

날짜 파라미터 이해

1
2
3
4
5
6
7
8
9
10
★ Airflow 날짜 파라미터 — 가장 혼동하는 부분!

스케줄: 매일 02:00 / start_date: 2026-04-01

실행 시각         │  logical_date (ds)  │  data_interval
2026-04-02 02:00  │  2026-04-01         │  04-01 ~ 04-02
2026-04-03 02:00  │  2026-04-02         │  04-02 ~ 04-03
2026-04-04 02:00  │  2026-04-03         │  04-03 ~ 04-04

→ {{ ds }}는 "처리 대상 날짜" (실행 날짜보다 1일 전)

실무 팁: Airflow 2.x부터 execution_date 대신 logical_datedata_interval_start/end를 사용한다. 새 프로젝트는 data_interval_start를 사용하는 것이 직관적이다.

8.2 에러 핸들링과 재시도

1
2
3
4
5
6
★ Airflow 에러 핸들링 계층

  Level 1: Task 자동 재시도 — retries=3, retry_delay
  Level 2: 콜백 알림 — on_failure_callback → Slack/Email
  Level 3: trigger_rule — 조건부 실행 (all_done, one_failed 등)
  Level 4: SLA 미달 알림 — sla=timedelta(hours=2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# ★ 에러 핸들링 종합 예시
def on_failure(context):
    ti = context['ti']
    message = (
        f":red_circle: *배치 실패*\n"
        f"DAG: `{context['dag'].dag_id}`\n"
        f"Task: `{ti.task_id}`\n"
        f"Error: `{context.get('exception', 'Unknown')}`\n"
        f"<{ti.log_url}|로그 확인>"
    )
    send_slack_message(message)

with DAG('settlement_dag', ...) as dag:
    process = PythonOperator(
        task_id='process_settlement',
        python_callable=process_settlement,
        retries=3,
        retry_delay=timedelta(minutes=5),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=30),
        on_failure_callback=on_failure,
        execution_timeout=timedelta(hours=2),
    )

    # 성공/실패 무관하게 항상 실행
    cleanup = PythonOperator(
        task_id='cleanup',
        python_callable=cleanup,
        trigger_rule='all_done',
    )

    process >> cleanup

trigger_rule 종류:

trigger_rule 실행 조건 용도
all_success (기본) 모든 상위 Task 성공 일반적인 순차 실행
all_failed 모든 상위 Task 실패 전체 실패 시 보상 작업
all_done 모든 상위 Task 완료 (성공/실패 무관) 리소스 정리
one_success 하나라도 성공 병렬 Task 중 하나만 성공하면 됨
one_failed 하나라도 실패 실패 알림
none_failed_min_one_success 실패 없음 + 최소 1개 성공 브랜치 후 합류

8.3 모니터링

1
2
3
4
5
6
7
8
9
★ Airflow 모니터링 아키텍처

  Airflow Scheduler/Worker
       │ StatsD/Prometheus
       ▼
  Prometheus → Grafana Dashboard
       │
       ▼
  AlertManager → Slack / PagerDuty
메트릭 설명 위험 임계치
dag_processing.total_parse_time DAG 파싱 소요 시간 > 30초
scheduler.tasks.running 현재 실행 중인 Task 수 Worker 슬롯의 90%
scheduler.tasks.starving 실행 대기 중인 Task 수 지속적으로 > 0
dagrun.duration.success DAG 전체 실행 시간 SLA의 80%
zombie_tasks_killed 좀비 Task 종료 횟수 > 0 (잦으면 Worker 불안정)
pool.open_slots Pool 사용 가능 슬롯 0이면 병목
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# ★ SLA 설정 예시
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    task_names = [t.task_id for t in task_list]
    send_slack_message(
        f":warning: *SLA 위반!*\n"
        f"DAG: `{dag.dag_id}`\n"
        f"Tasks: {task_names}"
    )

with DAG('daily_settlement', sla_miss_callback=sla_miss_callback, ...) as dag:
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_fn,
        sla=timedelta(hours=1),  # 1시간 이내 완료되어야 함
    )

8.4 Variable / Connection 관리

1
2
3
4
5
6
7
8
9
10
11
★ Variable 사용 시 주의사항

❌ 안티패턴: 모듈 레벨에서 호출 → DAG 파싱마다 DB 조회!
  API_URL = Variable.get("api_url")

✓ 올바른 패턴 1: Task 실행 시점에 조회
  def my_task(**context):
      api_url = Variable.get("api_url")

✓ 올바른 패턴 2: Jinja 템플릿 (파싱 시 DB 조회 안 함)
  bash_command='curl {{ var.value.api_url }}/endpoint'

Secrets Backend (프로덕션 권장):

1
2
3
4
5
★ Secrets 우선순위

  1. 환경변수: AIRFLOW_CONN_MY_DB=postgresql://...
  2. Secrets Backend: AWS Secrets Manager / HashiCorp Vault (최우선 권장)
  3. Metadata DB: Airflow UI에서 설정 (비권장)

8.5 대규모 DAG 운영 노하우

DAG 파싱 성능 최적화:

1
2
3
4
5
6
7
8
# ❌ 안티패턴: 모듈 레벨에서 무거운 import
import pandas as pd        # DAG 파싱마다 import 발생
import tensorflow as tf

# ✓ 올바른 패턴: 함수 내에서 import
def heavy_processing(**context):
    import pandas as pd    # Task 실행 시에만 import
    df = pd.read_sql(...)

Pool로 리소스 제어:

1
2
3
4
5
6
7
8
9
★ Pool — 동시 실행 Task 수 제한

  Pool: "db_connection_pool" (slots=10)
  [Task A] ── slot 1
  [Task B] ── slot 2
  ...
  [Task J] ── slot 10
  [Task K] ── 대기 (slot 없음)
  → DB 커넥션 수 제한 등에 활용
1
2
3
4
5
6
7
task = PythonOperator(
    task_id='heavy_db_query',
    python_callable=query_fn,
    pool='db_connection_pool',
    pool_slots=2,
    priority_weight=10,
)

.airflowignore:

1
2
3
4
5
6
# dags_folder에 위치 — 파싱 제외 대상
test_*.py
*_test.py
utils/
__pycache__/
deprecated_*.py

9. Spring Batch vs Airflow 비교

9.1 아키텍처 철학 차이

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
★ 근본적인 차이: "프레임워크" vs "오케스트레이터"

Spring Batch:                        Airflow:
  ┌──────────────────┐              ┌──────────────────┐
  │ 데이터를 직접       │              │ 작업을 조율하는      │
  │ 읽고, 처리하고,     │              │ 지휘자                │
  │ 쓰는 전 과정을      │              │ 실제 처리는          │
  │ 프레임워크가 지원    │              │ 외부 시스템이 수행     │
  │ Read → Process     │              │ 실행 순서/조건/       │
  │ → Write            │              │ 재시도/모니터링       │
  │ 트랜잭션/체크포인트  │              │ 전담                  │
  └──────────────────┘              └──────────────────┘

비유:
  Spring Batch = 공장의 조립 라인 (직접 만든다)
  Airflow = 공장장 (어떤 라인을 언제 돌릴지 관리한다)

9.2 상세 비교표

비교 항목 Spring Batch Apache Airflow
유형 배치 처리 프레임워크 워크플로우 오케스트레이터
언어 Java/Kotlin Python
데이터 처리 직접 처리 (Reader/Processor/Writer) 외부 시스템에 위임
트랜잭션 Spring TX 관리 (chunk 단위) 없음 (각 Task가 자체 관리)
체크포인트 내장 (ExecutionContext) 없음 (Task 단위 재시도)
재시작 실패 chunk부터 재개 실패 Task부터 재실행
스케줄링 외부 필요 (cron, Quartz) 내장 (cron, timetable)
의존성 관리 Step 간 순차/분기 (단일 Job 내) DAG로 복잡한 의존 관계
모니터링 Actuator + 외부 도구 내장 Web UI
백필 수동 파라미터 변경 내장 (catchup, backfill CLI)
확장성 Partitioning, 원격 청킹 Executor (Celery, K8s)
에러 핸들링 Skip/Retry/faultTolerant retries/callback/trigger_rule
학습 곡선 중간 (Spring 생태계 필요) 낮음~중간 (Python 기반)
운영 부담 낮음 (JVM 앱으로 배포) 중간~높음 (Scheduler+Worker+DB+UI)

9.3 사용 사례 매핑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
★ 이런 상황에는 이것을 선택하라

  Spring Batch:
  ✓ Java/Spring 기반 시스템에서 데이터 직접 처리
  ✓ 트랜잭션 정합성이 매우 중요 (정산, 결제)
  ✓ chunk 단위 체크포인트/재시작이 필요한 대용량
  ✓ 기존 Spring 인프라/팀 역량 활용
  ✓ Skip/Retry 등 세밀한 에러 핸들링 필요

  Airflow:
  ✓ 여러 시스템/서비스를 조합하는 워크플로우
  ✓ ETL/ELT 파이프라인 (Spark, dbt 등 오케스트레이션)
  ✓ 복잡한 의존 관계 (DAG)가 있는 작업 체인
  ✓ Python 중심 데이터 팀
  ✓ 웹 UI 기반 모니터링/관리 필요
  ✓ 백필(과거 데이터 재처리)이 빈번

  둘 다 사용 (하이브리드):
  ✓ Airflow로 전체 파이프라인 오케스트레이션
  ✓ Spring Batch Job은 Airflow에서 REST API로 트리거
  ✓ 대용량 처리는 Spring Batch, 흐름 관리는 Airflow

9.4 하이브리드 아키텍처

실무에서는 Spring Batch와 Airflow를 함께 사용하는 하이브리드가 매우 효과적이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
★ 하이브리드 아키텍처 — Airflow + Spring Batch

Airflow (오케스트레이터)
  │
  ├─ Task 1: [데이터 검증 (Python)]
  │
  ├─ Task 2: [Spring Batch Job 트리거 (HTTP)]
  │             POST /batch/jobs/settlement
  │             → JobLauncher.run(job, params)
  │             → 대용량 정산 처리 (chunk 기반)
  │
  ├─ Task 3: [완료 대기 (HttpSensor)]
  │             → Spring Batch Job 상태 폴링
  │             → COMPLETED 확인
  │
  ├─ Task 4: [후속 처리 (Python/Spark)]
  └─ Task 5: [알림 (Slack)]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// ★ Spring Batch REST API 엔드포인트
@RestController
@RequestMapping("/batch/jobs")
public class BatchJobController {

    private final JobLauncher jobLauncher;
    private final Job settlementJob;
    private final JobExplorer jobExplorer;

    @PostMapping("/settlement")
    public ResponseEntity<Map<String, Object>> runSettlement(
            @RequestParam String date) {
        JobParameters params = new JobParametersBuilder()
            .addLocalDate("date", LocalDate.parse(date))
            .addLong("timestamp", System.currentTimeMillis())
            .toJobParameters();
        JobExecution execution = jobLauncher.run(settlementJob, params);
        return ResponseEntity.ok(Map.of(
            "executionId", execution.getId(),
            "status", execution.getStatus().name()
        ));
    }

    @GetMapping("/settlement/status/{executionId}")
    public ResponseEntity<Map<String, Object>> getStatus(
            @PathVariable Long executionId) {
        JobExecution execution = jobExplorer.getJobExecution(executionId);
        return ResponseEntity.ok(Map.of(
            "executionId", executionId,
            "status", execution.getStatus().name(),
            "exitCode", execution.getExitStatus().getExitCode()
        ));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# ★ Airflow에서 Spring Batch Job 트리거
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor

trigger_batch = SimpleHttpOperator(
    task_id='trigger_settlement_batch',
    http_conn_id='spring_batch_api',
    endpoint='/batch/jobs/settlement',
    method='POST',
    data=json.dumps({'date': '{{ ds }}'}),
    headers={'Content-Type': 'application/json'},
    response_filter=lambda resp: json.loads(resp.text)['executionId'],
    do_xcom_push=True,
)

wait_batch = HttpSensor(
    task_id='wait_settlement_complete',
    http_conn_id='spring_batch_api',
    endpoint='/batch/jobs/settlement/status/{{ ti.xcom_pull(task_ids="trigger_settlement_batch") }}',
    response_check=lambda resp: json.loads(resp.text)['status'] == 'COMPLETED',
    poke_interval=60,
    timeout=7200,
    mode='reschedule',
)

trigger_batch >> wait_batch >> downstream_tasks

10. 배치 처리 장애 시나리오 & 트러블슈팅

10.1 OOM 문제

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
★ OOM 발생 원인과 해결 — 기술별

Spring Batch:
  원인 1: JpaPagingItemReader가 영속성 컨텍스트에 모든 엔티티 캐싱
  해결:   entityManager.clear() 호출 또는 JdbcPagingItemReader 전환

  원인 2: chunk-size가 너무 큼
  해결:   chunk-size 축소 (10000 → 1000)

  원인 3: ItemProcessor에서 List에 데이터 축적
  해결:   상태 없는 Processor 설계

Python:
  원인 1: pd.read_sql()로 전체 테이블 로드
  해결:   chunksize 파라미터 사용

  원인 2: 리스트에 결과를 계속 append
  해결:   Generator 사용, chunk 단위 처리

Airflow Worker:
  원인 1: Task 내에서 대량 데이터 메모리 처리
  해결:   외부 시스템(Spark, dbt)에 처리 위임

  원인 2: DAG 파싱 시 무거운 라이브러리 import
  해결:   함수 내 지연 import
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// ★ Spring Batch JPA OOM 해결 — 영속성 컨텍스트 클리어
@Bean
public Step jpaStep(JobRepository jobRepository,
                    PlatformTransactionManager tm) {
    return new StepBuilder("jpaStep", jobRepository)
        .<Order, Settlement>chunk(1000, tm)
        .reader(jpaPagingReader())
        .processor(processor())
        .writer(writer())
        .listener(new ChunkListener() {
            @Autowired private EntityManager entityManager;

            @Override
            public void afterChunk(ChunkContext context) {
                entityManager.clear();  // ★ chunk마다 영속성 컨텍스트 클리어!
            }
        })
        .build();
}

10.2 데드락

1
2
3
4
5
6
7
8
9
★ 멀티스레드 배치에서의 DB 데드락

  Thread-1: UPDATE accounts SET balance = balance - 100 WHERE id = A  (A 락 획득)
  Thread-1: UPDATE accounts SET balance = balance + 100 WHERE id = B  (B 대기...)

  Thread-2: UPDATE accounts SET balance = balance - 50  WHERE id = B  (B 락 획득)
  Thread-2: UPDATE accounts SET balance = balance + 50  WHERE id = A  (A 대기...)

  → Thread-1은 B를 기다리고, Thread-2는 A를 기다림 → 데드락! 💀
전략 설명
락 순서 고정 항상 ID 오름차순으로 락 획득
Partitioning 파티션 간 데이터 겹침 방지
낙관적 락 JPA @Version 컬럼으로 충돌 감지
Retry 데드락 발생 시 자동 재시도
격리 수준 조정 SERIALIZABLE 회피, READ_COMMITTED 사용
1
2
3
4
5
6
7
8
9
10
11
12
// ★ 데드락 방지 — Partitioning + Retry 조합
@Bean
public Step safeStep(JobRepository jobRepository, PlatformTransactionManager tm) {
    return new StepBuilder("safeStep", jobRepository)
        .<Account, Account>chunk(500, tm)
        .reader(partitionedReader(null, null))  // ID 범위 분할
        .writer(accountWriter())
        .faultTolerant()
            .retryLimit(3)
            .retry(DeadlockLoserDataAccessException.class)
        .build();
}

10.3 느린 배치 진단과 최적화

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
★ 느린 배치 진단 플로우차트

배치가 느리다!
    │
    ├─ Read가 느림?
    │   ├─ 인덱스 누락 → CREATE INDEX
    │   ├─ 전체 테이블 스캔 → WHERE 조건 추가
    │   └─ fetchSize 미설정 → fetchSize 증가
    │
    ├─ Process가 느림?
    │   ├─ 건별 API 호출 → 벌크 API로 전환
    │   ├─ 복잡한 계산 → 캐싱 / 사전 계산
    │   └─ N+1 쿼리 → JOIN / 배치 조회
    │
    └─ Write가 느림?
        ├─ 건별 INSERT → 벌크 INSERT
        ├─ 인덱스 과다 → 배치 전 인덱스 비활성화
        └─ auto-commit → 배치 커밋
증상 원인 해결 효과
Read 시간 > 60% 인덱스 누락 인덱스 추가 10~100배
Write 시간 > 60% 건별 INSERT JDBC Batch + rewriteBatchedStatements 5~20배
특정 chunk만 느림 데이터 편향 균등 파티셔닝 2~5배
전체적으로 느림 싱글 스레드 Partitioning (8~16 파티션) 5~10배
GC 빈번 chunk-size 과대 chunk-size 축소, 힙 증가 2~3배

10.4 중복 실행 방지

1
2
3
4
5
6
7
8
9
10
11
★ 중복 실행 방지 전략

Spring Batch:
  ① JobParameters 고유성 (같은 파라미터 재실행 차단)
  ② ShedLock (분산 락)
  ③ DB 기반 분산 락 (SELECT FOR UPDATE)

Airflow:
  ① max_active_runs=1 (DAG당 동시 실행 1개)
  ② depends_on_past=True (이전 성공 후 실행)
  ③ Pool (동시 Task 수 제한)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// ★ DB 기반 분산 락
@Component
public class DistributedLockRunner {
    private final JdbcTemplate jdbcTemplate;
    private final JobLauncher jobLauncher;

    public JobExecution runWithLock(Job job, JobParameters params) {
        String lockKey = job.getName() + "_" + params.getString("date");

        boolean acquired = jdbcTemplate.update(
            "INSERT INTO batch_lock (lock_key, locked_at, locked_by) " +
            "VALUES (?, NOW(), ?) " +
            "ON DUPLICATE KEY UPDATE lock_key = lock_key",
            lockKey, InetAddress.getLocalHost().getHostName()
        ) > 0;

        if (!acquired) {
            throw new BatchLockException("이미 실행 중: " + lockKey);
        }
        try {
            return jobLauncher.run(job, params);
        } finally {
            jdbcTemplate.update("DELETE FROM batch_lock WHERE lock_key = ?", lockKey);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
# ★ Airflow 중복 실행 방지
with DAG(
    'daily_settlement',
    max_active_runs=1,           # ★ DAG 동시 실행 1개
    catchup=False,
    ...
) as dag:
    task = PythonOperator(
        task_id='settle',
        depends_on_past=True,    # ★ 이전 실행 성공 후에만 실행
        ...
    )

10.5 배치 안티패턴 모음

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
★ 배치 안티패턴 TOP 7

  1. 🚫 "만능 배치" — 하나의 Job에 모든 로직
     → 하나 실패하면 전체 실패
     ✓ 역할별로 Job 분리

  2. 🚫 "날짜 하드코딩" — LocalDate.now() / datetime.now()
     → 백필/재실행 시 잘못된 날짜
     ✓ JobParameters / Airflow 로 날짜 주입

  3. 🚫 "트랜잭션 무시" — 전체를 하나의 TX로
     → 실패 시 전체 롤백, 메모리 폭발
     ✓ chunk 단위 트랜잭션

  4. 🚫 "로깅 없는 배치" — print()만 또는 로그 없음
     → 장애 시 원인 추적 불가
     ✓ 구조화된 로깅 + 처리 건수/시간 기록

  5. 🚫 "멱등성 없는 배치" — INSERT만 사용
     → 재실행 시 데이터 중복
     ✓ UPSERT 또는 DELETE-INSERT 패턴

  6. 🚫 "알림 없는 배치" — 실패해도 아무도 모름
     → 다음날 발견
     ✓ 성공/실패 알림 필수 (Slack, Email)

  7. 🚫 "테스트 없는 배치" — 프로덕션이 첫 테스트
     ✓ 소량 데이터로 통합 테스트
     ✓ Step 단위 단위 테스트
     ✓ 스테이징 환경에서 검증
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// ★ Spring Batch 테스트 코드 예시
@SpringBatchTest
@SpringBootTest
class SettlementStepTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Test
    void 정산_Step_정상_처리() {
        // 테스트 데이터 준비
        jdbcTemplate.update(
            "INSERT INTO orders (order_id, seller_id, amount, status, order_date) " +
            "VALUES (1, 'S001', 10000, 'COMPLETED', '2026-04-13')");
        jdbcTemplate.update(
            "INSERT INTO orders (order_id, seller_id, amount, status, order_date) " +
            "VALUES (2, 'S001', 20000, 'COMPLETED', '2026-04-13')");

        // Step 실행
        JobExecution execution = jobLauncherTestUtils.launchStep("settlementStep",
            new JobParametersBuilder()
                .addLocalDate("date", LocalDate.of(2026, 4, 13))
                .toJobParameters());

        // 검증
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);

        Map<String, Object> settlement = jdbcTemplate.queryForMap(
            "SELECT * FROM settlement WHERE seller_id = 'S001'");
        assertThat(settlement.get("amount")).isEqualTo(new BigDecimal("30000"));
    }

    @Test
    void 멱등성_검증_두번_실행해도_같은_결과() {
        // ... 데이터 준비 ...
        jobLauncherTestUtils.launchStep("settlementStep", params());
        jobLauncherTestUtils.launchStep("settlementStep", params());

        int count = jdbcTemplate.queryForObject(
            "SELECT COUNT(*) FROM settlement WHERE seller_id = 'S001'", Integer.class);
        assertThat(count).isEqualTo(1);  // 중복 없이 1건만 존재
    }
}

마무리: 배치 처리 심화 가이드 전체 요약

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
★ 2부작 전체 요약

┌──────────────────────────────────────────────────────┐
│  Part 1 — 배치 기본 + Spring Batch 완전 정복           │
│  ① 배치 = 데이터를 모아서 한꺼번에 처리                  │
│  ② 핵심 원칙: 멱등성, 재시작, Chunk, Skip/Retry         │
│  ③ Job → Step → Chunk(R/P/W) 아키텍처                 │
│  ④ 6개 메타데이터 테이블로 상태 관리                      │
│  ⑤ 멀티스레드/파티셔닝으로 병렬화                        │
│  ⑥ ShedLock / 모니터링 / 성능 튜닝                     │
├──────────────────────────────────────────────────────┤
│  Part 2 — Python + Airflow + 비교 + 트러블슈팅         │
│  ⑦ Python: pandas chunksize + multiprocessing        │
│  ⑧ Airflow: DAG/Operator/Executor/XCom              │
│  ⑨ Airflow 운영: 멱등 DAG, SLA, Pool, 모니터링        │
│  ⑩ Spring Batch = 프레임워크, Airflow = 오케스트레이터  │
│  ⑪ 하이브리드: Airflow → Spring Batch REST 트리거      │
│  ⑫ OOM/데드락/느린 배치 진단 + 안티패턴 7선              │
└──────────────────────────────────────────────────────┘

★ 기술 선택 요약

"Java 기반 대용량 데이터 직접 처리"               → Spring Batch
"Python 기반 워크플로우 오케스트레이션"            → Airflow
"대규모 파이프라인 + 정밀한 데이터 처리"           → Airflow + Spring Batch 하이브리드

배치 처리는 화려하지 않지만, 기업의 데이터 처리 인프라에서 가장 중요한 기둥 중 하나이다. 정산이 1원이라도 틀리면 안 되고, 보고서가 하루라도 늦으면 안 되며, 데이터 이관이 누락되면 분석 전체가 틀어진다. 이 가이드가 배치 시스템을 더 견고하고 효율적으로 만드는 데 도움이 되길 바란다.