배치 처리 완벽 심화 가이드 2편: Python 배치 + Airflow 완전 정복 + 트러블슈팅
이 글은 배치 처리 심화 가이드 2부작 중 2편이다. Python 배치 처리 기법, Apache Airflow의 아키텍처와 실전 운영, Spring Batch vs Airflow 비교, 그리고 배치 장애 시나리오와 트러블슈팅을 다룬다.
- 1편: 배치 기본 원칙 + Spring Batch 아키텍처 / 핵심 기능 / 실전 운영
- 2편 (현재): Python 배치 + Airflow 아키텍처·운영 + 비교 분석 + 트러블슈팅
6. Python 배치 처리 기초
6.1 순수 Python 배치 (cron + script 패턴)
가장 기본적인 Python 배치는 cron으로 스크립트를 실행하는 방식이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| ★ Python cron 배치 구조
crontab:
0 2 * * * /usr/bin/python3 /app/batch/daily_settlement.py
daily_settlement.py:
┌─────────────────────────────────────┐
│ 1. 설정 로드 (DB 연결, 로깅) │
│ 2. 데이터 조회 (SELECT) │
│ 3. 처리 (집계, 변환) │
│ 4. 결과 저장 (INSERT/UPDATE) │
│ 5. 알림 (Slack, Email) │
│ 6. 정리 (커넥션 닫기) │
└─────────────────────────────────────┘
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
| # ★ 기본 Python 배치 스크립트 구조
import logging
import sys
from datetime import date, timedelta
from contextlib import contextmanager
import psycopg2
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(f'/var/log/batch/settlement_{date.today()}.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
@contextmanager
def get_connection():
conn = psycopg2.connect(
host='db-host', dbname='orders', user='batch_user', password='***'
)
try:
yield conn
finally:
conn.close()
def run_settlement(target_date: date):
logger.info(f"정산 배치 시작: target_date={target_date}")
with get_connection() as conn:
with conn.cursor() as cur:
# 1. 데이터 조회
cur.execute("""
SELECT seller_id, SUM(amount) as total, COUNT(*) as cnt
FROM orders
WHERE order_date = %s AND status = 'COMPLETED'
GROUP BY seller_id
""", (target_date,))
rows = cur.fetchall()
logger.info(f"대상 판매자 수: {len(rows)}")
# 2. 정산 결과 저장 (UPSERT — 멱등성)
for seller_id, total, cnt in rows:
fee = total * 0.03
settle_amount = total - fee
cur.execute("""
INSERT INTO settlement (seller_id, settle_date, amount, fee, settle_amount)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (seller_id, settle_date)
DO UPDATE SET amount = EXCLUDED.amount,
fee = EXCLUDED.fee,
settle_amount = EXCLUDED.settle_amount
""", (seller_id, target_date, total, fee, settle_amount))
conn.commit()
logger.info(f"정산 배치 완료: {len(rows)}건 처리")
if __name__ == '__main__':
target = date.today() - timedelta(days=1)
try:
run_settlement(target)
except Exception as e:
logger.error(f"정산 배치 실패: {e}", exc_info=True)
sys.exit(1)
|
6.2 pandas를 활용한 데이터 처리
pandas는 데이터 변환·집계에 강력하지만, 대용량에서는 메모리 관리가 핵심이다.
1
2
3
4
5
6
7
8
9
10
| ★ pandas 대용량 처리 — chunksize 활용
일반 방식 (위험!):
df = pd.read_sql("SELECT * FROM orders", conn) # 1000만 건 → OOM 💀
chunksize 방식 (안전):
for chunk in pd.read_sql(..., chunksize=10000): # 1만 건씩
process(chunk)
write(chunk)
# 메모리: 항상 1만 건 수준 유지
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| # ★ pandas chunksize를 활용한 대용량 ETL
import pandas as pd
from sqlalchemy import create_engine
source_engine = create_engine('postgresql://user:pass@source-db/orders')
target_engine = create_engine('postgresql://user:pass@target-db/analytics')
def etl_daily_orders(target_date: str):
query = f"""
SELECT order_id, seller_id, buyer_id, amount, category, order_date
FROM orders WHERE order_date = '{target_date}'
"""
total_processed = 0
for chunk in pd.read_sql(query, source_engine, chunksize=50000):
# Transform
chunk['amount_krw'] = chunk['amount'].astype(float)
chunk['fee'] = chunk['amount_krw'] * 0.03
chunk['settle_amount'] = chunk['amount_krw'] - chunk['fee']
result = chunk[['order_id', 'seller_id', 'amount_krw',
'fee', 'settle_amount', 'order_date']]
# Load
result.to_sql('daily_order_summary', target_engine,
if_exists='append', index=False, method='multi',
chunksize=5000)
total_processed += len(chunk)
logger.info(f"진행: {total_processed}건 처리 완료")
logger.info(f"ETL 완료: 총 {total_processed}건")
|
6.3 SQLAlchemy를 활용한 DB 배치
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| # ★ SQLAlchemy Core — 벌크 UPSERT
from sqlalchemy import create_engine, text
from sqlalchemy.dialects.postgresql import insert
engine = create_engine('postgresql://user:pass@db/app', pool_size=5)
def bulk_upsert_settlements(settlements: list[dict]):
"""대량 UPSERT — PostgreSQL ON CONFLICT 활용"""
with engine.begin() as conn:
stmt = insert(settlement_table).values(settlements)
upsert_stmt = stmt.on_conflict_do_update(
index_elements=['seller_id', 'settle_date'],
set_={
'amount': stmt.excluded.amount,
'fee': stmt.excluded.fee,
'settle_amount': stmt.excluded.settle_amount,
'updated_at': text('NOW()')
}
)
result = conn.execute(upsert_stmt)
logger.info(f"UPSERT 완료: {result.rowcount}건")
def batch_process_in_chunks(query: str, chunk_size: int = 10000):
"""커서 기반 청크 처리 — 메모리 효율적"""
with engine.connect() as conn:
result = conn.execution_options(stream_results=True).execute(text(query))
while True:
rows = result.fetchmany(chunk_size)
if not rows:
break
settlements = [transform_to_settlement(row) for row in rows]
bulk_upsert_settlements(settlements)
|
6.4 대용량 처리 테크닉
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| ★ Python 대용량 처리 옵션
┌──────────────────────────────────────────────┐
│ Generator — 메모리 절약의 기본 │
│ "한 번에 하나씩 생성, 모두 메모리에 올리지 않음" │
├──────────────────────────────────────────────┤
│ multiprocessing — CPU-bound 병렬화 │
│ "GIL 우회, 코어 수만큼 진짜 병렬" │
├──────────────────────────────────────────────┤
│ asyncio — I/O-bound 동시성 │
│ "네트워크/DB 대기 시간 활용, 단일 스레드" │
├──────────────────────────────────────────────┤
│ concurrent.futures — 간편한 병렬 처리 │
│ "ThreadPool / ProcessPool 추상화" │
└──────────────────────────────────────────────┘
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| # ★ Generator 패턴 — 대용량 파일 처리
def read_large_csv(filepath: str, chunk_size: int = 10000):
"""메모리를 chunk_size만큼만 사용하는 Generator"""
import csv
with open(filepath, 'r') as f:
reader = csv.DictReader(f)
chunk = []
for row in reader:
chunk.append(row)
if len(chunk) >= chunk_size:
yield chunk
chunk = []
if chunk:
yield chunk
for chunk in read_large_csv('/data/orders_20260413.csv'):
process_and_save(chunk)
# ★ multiprocessing 패턴 — CPU-bound 병렬 처리
from multiprocessing import Pool
def process_partition(args):
partition_id, start_id, end_id = args
engine = create_engine('postgresql://...') # 프로세스별 커넥션
query = f"SELECT * FROM orders WHERE id BETWEEN {start_id} AND {end_id}"
df = pd.read_sql(query, engine)
result = heavy_computation(df)
result.to_sql('results', engine, if_exists='append', index=False)
return len(result)
partitions = [(i, i*100000+1, (i+1)*100000) for i in range(8)]
with Pool(processes=8) as pool:
results = pool.map(process_partition, partitions)
print(f"총 처리 건수: {sum(results)}")
# ★ asyncio 패턴 — I/O-bound 동시 처리 (API 호출)
import asyncio
import aiohttp
async def batch_api_calls(item_ids: list[int]):
async with aiohttp.ClientSession() as session:
semaphore = asyncio.Semaphore(50) # 동시 요청 제한
async def limited_fetch(item_id):
async with semaphore:
async with session.get(f"{API_URL}/items/{item_id}") as resp:
return await resp.json()
results = await asyncio.gather(
*[limited_fetch(id) for id in item_ids]
)
return results
|
6.5 Python 배치의 한계 → 오케스트레이터 필요성
1
2
3
4
5
6
7
8
9
10
| ★ 순수 Python 배치의 한계
1. 의존성 관리 없음 — "Job A 완료 후 Job B 실행" → cron으로 불가
2. 재시도/실패 복구 수동 구현 — 모든 에러 핸들링을 직접 코딩
3. 모니터링 부재 — "어떤 Job이 언제 성공/실패했는지" 로그 뒤적
4. 스케줄링 제약 — cron은 "실행 시각"만 설정 가능
5. 백필(Backfill) 어려움 — 과거 날짜 재처리 시 수동 파라미터 변경
↓ 이 모든 문제를 해결하는 것이
Apache Airflow
|
7. Apache Airflow 아키텍처 Deep Dive
7.1 Airflow란 무엇인가
Apache Airflow는 워크플로우 오케스트레이터(Workflow Orchestrator) 이다. 2014년 Airbnb의 Maxime Beauchemin이 개발했으며, 2019년 Apache Top-Level 프로젝트가 되었다.
1
2
3
4
5
6
7
8
9
10
11
12
| ★ Airflow의 핵심 철학
┌──────────────────────────────────────────────┐
│ "Airflow is NOT a data processing tool." │
│ "Airflow is a WORKFLOW ORCHESTRATOR." │
│ │
│ Airflow는 데이터를 직접 처리하지 않는다. │
│ "무엇을 언제 어떤 순서로 실행할지" 관리한다. │
│ │
│ 실제 처리: Spark, dbt, Python 스크립트, API 등 │
│ Airflow: 실행 순서·조건·재시도·알림 관리 │
└──────────────────────────────────────────────┘
|
| Airflow가 잘하는 것 |
Airflow가 적합하지 않은 것 |
| 작업 의존성/순서 관리 |
대용량 데이터 직접 처리 |
| 스케줄링 + 백필 |
실시간 스트리밍 처리 |
| 재시도 + 알림 |
밀리초 단위 저지연 처리 |
| 모니터링 웹 UI |
무한 루프 / 상시 실행 |
7.2 DAG / Task / Operator 구조
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| ★ Airflow의 핵심 구조
DAG (Directed Acyclic Graph) = 작업의 의존 관계를 방향성 비순환 그래프로 표현
[extract] → [transform] → [load] → [notify]
│ ▲
└── [validate] ──────────┘
DAG > Task > Operator 계층:
DAG: "daily_settlement_pipeline"
├─ Task: "extract_orders" ← Operator: PythonOperator
├─ Task: "validate_data" ← Operator: PythonOperator
├─ Task: "run_settlement" ← Operator: BashOperator
├─ Task: "export_to_s3" ← Operator: S3UploadOperator
└─ Task: "notify_slack" ← Operator: SlackOperator
Task = DAG 내의 하나의 작업 노드
Operator = Task가 "무엇을 하는지" 정의하는 템플릿
|
주요 Operator 표:
| Operator |
역할 |
사용 예시 |
PythonOperator |
Python 함수 실행 |
ETL 로직, 데이터 검증 |
BashOperator |
Shell 명령 실행 |
스크립트 실행, 파일 처리 |
SqlOperator |
SQL 실행 |
테이블 생성, 데이터 삽입 |
HttpOperator |
HTTP 요청 |
REST API 호출 |
SparkSubmitOperator |
Spark Job 제출 |
대용량 분산 처리 |
KubernetesPodOperator |
K8s Pod으로 실행 |
격리된 환경에서 실행 |
BranchPythonOperator |
조건 분기 |
if/else 로직 |
TriggerDagRunOperator |
다른 DAG 트리거 |
DAG 간 의존성 |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
| # ★ DAG 정의 예시 — 일일 정산 파이프라인
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email_on_failure': True,
'email': ['data-team@company.com'],
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='daily_settlement',
default_args=default_args,
description='일일 정산 배치 파이프라인',
schedule='0 2 * * *', # 매일 새벽 2시
start_date=datetime(2026, 1, 1),
catchup=False, # 과거 미실행분 자동 실행 안 함
tags=['settlement', 'daily'],
) as dag:
extract = PythonOperator(
task_id='extract_orders',
python_callable=extract_orders,
op_kwargs={'date': '{{ ds }}'}, # 실행 날짜 자동 주입
)
validate = PythonOperator(
task_id='validate_data',
python_callable=validate_data,
)
settle = BashOperator(
task_id='run_settlement',
bash_command='python /app/batch/settlement.py --date {{ ds }}',
)
notify = SlackWebhookOperator(
task_id='notify_completion',
slack_webhook_conn_id='slack_webhook',
message='✅ {{ ds }} 정산 배치 완료',
)
extract >> validate >> settle >> notify
|
7.3 Scheduler / Executor / Worker 내부 동작
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| ★ Airflow 내부 아키텍처
┌────────────────────────────────────────────────────┐
│ Web Server │
│ (Flask 기반 UI — DAG 목록, 실행 이력, 로그 조회) │
└──────────────────────┬─────────────────────────────┘
│ (메타데이터 DB 공유)
┌──────────────────────┴─────────────────────────────┐
│ Metadata DB │
│ (PostgreSQL / MySQL) │
│ DAG 정보, Task 상태, 스케줄 이력, XCom 데이터 │
└──────────────────────┬─────────────────────────────┘
│
┌──────────────────────┴─────────────────────────────┐
│ Scheduler │
│ 1. DAG 파일 파싱 (dags_folder 주기적 스캔) │
│ 2. DagRun 생성 (스케줄 시각 도달 시) │
│ 3. TaskInstance 생성 및 의존성 확인 │
│ 4. 실행 가능한 Task를 Executor에 전달 │
└──────────────────────┬─────────────────────────────┘
│
┌──────────────────────┴─────────────────────────────┐
│ Executor │
│ ┌─────────────────────────────────────────────┐ │
│ │ LocalExecutor: 같은 머신에서 프로세스로 실행 │ │
│ │ CeleryExecutor: Celery Worker에 분산 실행 │ │
│ │ KubernetesExecutor: K8s Pod으로 실행 │ │
│ └─────────────────────────────────────────────┘ │
└──────────────────────┬─────────────────────────────┘
▼
┌──────────────────────────────────────────────────────┐
│ Worker(s) │
│ 실제 Task 코드를 실행하는 프로세스/컨테이너 │
└──────────────────────────────────────────────────────┘
|
| Executor |
구조 |
장점 |
단점 |
적합 케이스 |
| SequentialExecutor |
단일 프로세스 순차 |
설정 간단 |
병렬 불가 |
개발/테스트 전용 |
| LocalExecutor |
로컬 멀티프로세스 |
설치 간단, 병렬 가능 |
단일 서버 한계 |
소~중규모 |
| CeleryExecutor |
Celery + 메시지 브로커 |
수평 확장, 안정적 |
Redis/RabbitMQ 필요 |
중~대규모 |
| KubernetesExecutor |
Task마다 Pod 생성 |
완벽한 격리, 자원 효율 |
K8s 필요, Pod 생성 지연 |
클라우드 네이티브 |
1
2
3
4
5
6
7
8
9
| ★ CeleryExecutor vs KubernetesExecutor 동작 비교
CeleryExecutor:
Scheduler → Redis/RabbitMQ(큐) → Worker(상시 대기) → Task 실행
→ Worker가 항상 떠 있음 → 빠른 시작, 리소스 상시 점유
KubernetesExecutor:
Scheduler → K8s API → Pod 생성 → Task 실행 → Pod 종료
→ Task마다 Pod → 완벽한 격리, 유휴 시 비용 0, 시작 지연 있음
|
XCom(Cross-Communication) 은 Task 간에 소량의 데이터를 주고받는 메커니즘이다.
1
2
3
4
5
6
7
8
9
| ★ XCom 동작 원리
Task A Metadata DB Task B
│ xcom_push │ │
│ key="row_count" │ │
│ value=150000 │ │
│─────────────────────▶│ xcom_pull │
│ │◀───────────────────────────│
│ │──────── 150000 ──────────▶│
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| # ★ XCom 사용 예시
def extract_orders(**context):
orders = fetch_orders_from_db(context['ds'])
row_count = len(orders)
context['ti'].xcom_push(key='row_count', value=row_count)
context['ti'].xcom_push(key='file_path', value='/data/orders.parquet')
return row_count # return 값은 key='return_value'로 자동 저장
def validate_data(**context):
row_count = context['ti'].xcom_pull(
task_ids='extract_orders', key='row_count')
file_path = context['ti'].xcom_pull(
task_ids='extract_orders', key='file_path')
if row_count == 0:
raise ValueError("추출된 데이터가 없습니다!")
# Jinja 템플릿으로도 접근 가능
settle = BashOperator(
task_id='run_settlement',
bash_command='python settle.py --count {{ ti.xcom_pull(task_ids="extract_orders", key="row_count") }}',
)
|
⚠️ 주의 — XCom 대용량 데이터 금지:
1
2
3
4
5
6
7
| ❌ 안티패턴: XCom으로 DataFrame 전달
xcom_push(key='data', value=huge_dataframe.to_dict())
→ Metadata DB에 수 GB 저장 💀
✓ 올바른 패턴: 파일 경로/S3 키만 전달
df.to_parquet('/data/temp/orders_20260413.parquet')
xcom_push(key='file_path', value='/data/temp/orders_20260413.parquet')
|
7.5 Connection / Hook / Provider
1
2
3
4
5
6
| ★ Connection / Hook / Provider 관계
Provider Package (pip install apache-airflow-providers-xxx)
├─ Operators (고수준 API) — S3UploadOperator 등
├─ Hooks (저수준 API) — S3Hook.get_conn()
└─ Connection (연결 정보) — host, port, login, password, extra JSON
|
| 개념 |
역할 |
설정 위치 |
| Connection |
외부 시스템 연결 정보 |
Airflow UI / 환경변수 / Secrets Backend |
| Hook |
Connection을 사용하여 외부 시스템과 실제 통신 |
Provider 패키지 내 제공 |
| Provider |
특정 서비스용 Operator+Hook 패키지 |
pip install |
1
2
3
4
5
6
7
8
9
10
| # ★ Hook 직접 사용 예시
from airflow.providers.postgres.hooks.postgres import PostgresHook
def extract_with_hook(**context):
hook = PostgresHook(postgres_conn_id='orders_db')
df = hook.get_pandas_df(
"SELECT * FROM orders WHERE order_date = %(date)s",
parameters={'date': context['ds']}
)
return len(df)
|
8. Airflow 실전 운영
8.1 DAG 설계 패턴
패턴 1: 작은 Task로 분리
1
2
3
4
5
6
7
8
| ❌ 하나의 거대한 Task:
[do_everything] ← 1시간짜리 단일 Task
→ 중간 실패 시 처음부터 다시 실행
✓ 작은 Task로 분리:
[extract] → [validate] → [transform] → [load] → [notify]
→ 실패한 Task만 재실행
→ 각 Task 소요 시간/상태 명확
|
패턴 2: 멱등성 보장 + 날짜 파라미터
1
2
3
4
5
6
7
8
9
10
11
12
| # ✓ 멱등한 DAG — 날짜 파라미터 활용
def process_orders(**context):
execution_date = context['ds'] # '2026-04-13'
# DELETE-INSERT 패턴으로 멱등성 보장
hook = PostgresHook('orders_db')
hook.run(f"DELETE FROM daily_summary WHERE summary_date = '{execution_date}'")
# ... INSERT ...
# ❌ 안티패턴: datetime.now() 사용
def bad_process():
today = datetime.now().date() # 백필 시 잘못된 날짜!
|
날짜 파라미터 이해
1
2
3
4
5
6
7
8
9
10
| ★ Airflow 날짜 파라미터 — 가장 혼동하는 부분!
스케줄: 매일 02:00 / start_date: 2026-04-01
실행 시각 │ logical_date (ds) │ data_interval
2026-04-02 02:00 │ 2026-04-01 │ 04-01 ~ 04-02
2026-04-03 02:00 │ 2026-04-02 │ 04-02 ~ 04-03
2026-04-04 02:00 │ 2026-04-03 │ 04-03 ~ 04-04
→ {{ ds }}는 "처리 대상 날짜" (실행 날짜보다 1일 전)
|
실무 팁: Airflow 2.x부터 execution_date 대신 logical_date와 data_interval_start/end를 사용한다. 새 프로젝트는 data_interval_start를 사용하는 것이 직관적이다.
8.2 에러 핸들링과 재시도
1
2
3
4
5
6
| ★ Airflow 에러 핸들링 계층
Level 1: Task 자동 재시도 — retries=3, retry_delay
Level 2: 콜백 알림 — on_failure_callback → Slack/Email
Level 3: trigger_rule — 조건부 실행 (all_done, one_failed 등)
Level 4: SLA 미달 알림 — sla=timedelta(hours=2)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| # ★ 에러 핸들링 종합 예시
def on_failure(context):
ti = context['ti']
message = (
f":red_circle: *배치 실패*\n"
f"DAG: `{context['dag'].dag_id}`\n"
f"Task: `{ti.task_id}`\n"
f"Error: `{context.get('exception', 'Unknown')}`\n"
f"<{ti.log_url}|로그 확인>"
)
send_slack_message(message)
with DAG('settlement_dag', ...) as dag:
process = PythonOperator(
task_id='process_settlement',
python_callable=process_settlement,
retries=3,
retry_delay=timedelta(minutes=5),
retry_exponential_backoff=True,
max_retry_delay=timedelta(minutes=30),
on_failure_callback=on_failure,
execution_timeout=timedelta(hours=2),
)
# 성공/실패 무관하게 항상 실행
cleanup = PythonOperator(
task_id='cleanup',
python_callable=cleanup,
trigger_rule='all_done',
)
process >> cleanup
|
trigger_rule 종류:
| trigger_rule |
실행 조건 |
용도 |
all_success (기본) |
모든 상위 Task 성공 |
일반적인 순차 실행 |
all_failed |
모든 상위 Task 실패 |
전체 실패 시 보상 작업 |
all_done |
모든 상위 Task 완료 (성공/실패 무관) |
리소스 정리 |
one_success |
하나라도 성공 |
병렬 Task 중 하나만 성공하면 됨 |
one_failed |
하나라도 실패 |
실패 알림 |
none_failed_min_one_success |
실패 없음 + 최소 1개 성공 |
브랜치 후 합류 |
8.3 모니터링
1
2
3
4
5
6
7
8
9
| ★ Airflow 모니터링 아키텍처
Airflow Scheduler/Worker
│ StatsD/Prometheus
▼
Prometheus → Grafana Dashboard
│
▼
AlertManager → Slack / PagerDuty
|
| 메트릭 |
설명 |
위험 임계치 |
dag_processing.total_parse_time |
DAG 파싱 소요 시간 |
> 30초 |
scheduler.tasks.running |
현재 실행 중인 Task 수 |
Worker 슬롯의 90% |
scheduler.tasks.starving |
실행 대기 중인 Task 수 |
지속적으로 > 0 |
dagrun.duration.success |
DAG 전체 실행 시간 |
SLA의 80% |
zombie_tasks_killed |
좀비 Task 종료 횟수 |
> 0 (잦으면 Worker 불안정) |
pool.open_slots |
Pool 사용 가능 슬롯 |
0이면 병목 |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| # ★ SLA 설정 예시
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
task_names = [t.task_id for t in task_list]
send_slack_message(
f":warning: *SLA 위반!*\n"
f"DAG: `{dag.dag_id}`\n"
f"Tasks: {task_names}"
)
with DAG('daily_settlement', sla_miss_callback=sla_miss_callback, ...) as dag:
extract = PythonOperator(
task_id='extract',
python_callable=extract_fn,
sla=timedelta(hours=1), # 1시간 이내 완료되어야 함
)
|
8.4 Variable / Connection 관리
1
2
3
4
5
6
7
8
9
10
11
| ★ Variable 사용 시 주의사항
❌ 안티패턴: 모듈 레벨에서 호출 → DAG 파싱마다 DB 조회!
API_URL = Variable.get("api_url")
✓ 올바른 패턴 1: Task 실행 시점에 조회
def my_task(**context):
api_url = Variable.get("api_url")
✓ 올바른 패턴 2: Jinja 템플릿 (파싱 시 DB 조회 안 함)
bash_command='curl {{ var.value.api_url }}/endpoint'
|
Secrets Backend (프로덕션 권장):
1
2
3
4
5
| ★ Secrets 우선순위
1. 환경변수: AIRFLOW_CONN_MY_DB=postgresql://...
2. Secrets Backend: AWS Secrets Manager / HashiCorp Vault (최우선 권장)
3. Metadata DB: Airflow UI에서 설정 (비권장)
|
8.5 대규모 DAG 운영 노하우
DAG 파싱 성능 최적화:
1
2
3
4
5
6
7
8
| # ❌ 안티패턴: 모듈 레벨에서 무거운 import
import pandas as pd # DAG 파싱마다 import 발생
import tensorflow as tf
# ✓ 올바른 패턴: 함수 내에서 import
def heavy_processing(**context):
import pandas as pd # Task 실행 시에만 import
df = pd.read_sql(...)
|
Pool로 리소스 제어:
1
2
3
4
5
6
7
8
9
| ★ Pool — 동시 실행 Task 수 제한
Pool: "db_connection_pool" (slots=10)
[Task A] ── slot 1
[Task B] ── slot 2
...
[Task J] ── slot 10
[Task K] ── 대기 (slot 없음)
→ DB 커넥션 수 제한 등에 활용
|
1
2
3
4
5
6
7
| task = PythonOperator(
task_id='heavy_db_query',
python_callable=query_fn,
pool='db_connection_pool',
pool_slots=2,
priority_weight=10,
)
|
.airflowignore:
1
2
3
4
5
6
| # dags_folder에 위치 — 파싱 제외 대상
test_*.py
*_test.py
utils/
__pycache__/
deprecated_*.py
|
9. Spring Batch vs Airflow 비교
9.1 아키텍처 철학 차이
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| ★ 근본적인 차이: "프레임워크" vs "오케스트레이터"
Spring Batch: Airflow:
┌──────────────────┐ ┌──────────────────┐
│ 데이터를 직접 │ │ 작업을 조율하는 │
│ 읽고, 처리하고, │ │ 지휘자 │
│ 쓰는 전 과정을 │ │ 실제 처리는 │
│ 프레임워크가 지원 │ │ 외부 시스템이 수행 │
│ Read → Process │ │ 실행 순서/조건/ │
│ → Write │ │ 재시도/모니터링 │
│ 트랜잭션/체크포인트 │ │ 전담 │
└──────────────────┘ └──────────────────┘
비유:
Spring Batch = 공장의 조립 라인 (직접 만든다)
Airflow = 공장장 (어떤 라인을 언제 돌릴지 관리한다)
|
9.2 상세 비교표
| 비교 항목 |
Spring Batch |
Apache Airflow |
| 유형 |
배치 처리 프레임워크 |
워크플로우 오케스트레이터 |
| 언어 |
Java/Kotlin |
Python |
| 데이터 처리 |
직접 처리 (Reader/Processor/Writer) |
외부 시스템에 위임 |
| 트랜잭션 |
Spring TX 관리 (chunk 단위) |
없음 (각 Task가 자체 관리) |
| 체크포인트 |
내장 (ExecutionContext) |
없음 (Task 단위 재시도) |
| 재시작 |
실패 chunk부터 재개 |
실패 Task부터 재실행 |
| 스케줄링 |
외부 필요 (cron, Quartz) |
내장 (cron, timetable) |
| 의존성 관리 |
Step 간 순차/분기 (단일 Job 내) |
DAG로 복잡한 의존 관계 |
| 모니터링 |
Actuator + 외부 도구 |
내장 Web UI |
| 백필 |
수동 파라미터 변경 |
내장 (catchup, backfill CLI) |
| 확장성 |
Partitioning, 원격 청킹 |
Executor (Celery, K8s) |
| 에러 핸들링 |
Skip/Retry/faultTolerant |
retries/callback/trigger_rule |
| 학습 곡선 |
중간 (Spring 생태계 필요) |
낮음~중간 (Python 기반) |
| 운영 부담 |
낮음 (JVM 앱으로 배포) |
중간~높음 (Scheduler+Worker+DB+UI) |
9.3 사용 사례 매핑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| ★ 이런 상황에는 이것을 선택하라
Spring Batch:
✓ Java/Spring 기반 시스템에서 데이터 직접 처리
✓ 트랜잭션 정합성이 매우 중요 (정산, 결제)
✓ chunk 단위 체크포인트/재시작이 필요한 대용량
✓ 기존 Spring 인프라/팀 역량 활용
✓ Skip/Retry 등 세밀한 에러 핸들링 필요
Airflow:
✓ 여러 시스템/서비스를 조합하는 워크플로우
✓ ETL/ELT 파이프라인 (Spark, dbt 등 오케스트레이션)
✓ 복잡한 의존 관계 (DAG)가 있는 작업 체인
✓ Python 중심 데이터 팀
✓ 웹 UI 기반 모니터링/관리 필요
✓ 백필(과거 데이터 재처리)이 빈번
둘 다 사용 (하이브리드):
✓ Airflow로 전체 파이프라인 오케스트레이션
✓ Spring Batch Job은 Airflow에서 REST API로 트리거
✓ 대용량 처리는 Spring Batch, 흐름 관리는 Airflow
|
9.4 하이브리드 아키텍처
실무에서는 Spring Batch와 Airflow를 함께 사용하는 하이브리드가 매우 효과적이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| ★ 하이브리드 아키텍처 — Airflow + Spring Batch
Airflow (오케스트레이터)
│
├─ Task 1: [데이터 검증 (Python)]
│
├─ Task 2: [Spring Batch Job 트리거 (HTTP)]
│ POST /batch/jobs/settlement
│ → JobLauncher.run(job, params)
│ → 대용량 정산 처리 (chunk 기반)
│
├─ Task 3: [완료 대기 (HttpSensor)]
│ → Spring Batch Job 상태 폴링
│ → COMPLETED 확인
│
├─ Task 4: [후속 처리 (Python/Spark)]
└─ Task 5: [알림 (Slack)]
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| // ★ Spring Batch REST API 엔드포인트
@RestController
@RequestMapping("/batch/jobs")
public class BatchJobController {
private final JobLauncher jobLauncher;
private final Job settlementJob;
private final JobExplorer jobExplorer;
@PostMapping("/settlement")
public ResponseEntity<Map<String, Object>> runSettlement(
@RequestParam String date) {
JobParameters params = new JobParametersBuilder()
.addLocalDate("date", LocalDate.parse(date))
.addLong("timestamp", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncher.run(settlementJob, params);
return ResponseEntity.ok(Map.of(
"executionId", execution.getId(),
"status", execution.getStatus().name()
));
}
@GetMapping("/settlement/status/{executionId}")
public ResponseEntity<Map<String, Object>> getStatus(
@PathVariable Long executionId) {
JobExecution execution = jobExplorer.getJobExecution(executionId);
return ResponseEntity.ok(Map.of(
"executionId", executionId,
"status", execution.getStatus().name(),
"exitCode", execution.getExitStatus().getExitCode()
));
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| # ★ Airflow에서 Spring Batch Job 트리거
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
trigger_batch = SimpleHttpOperator(
task_id='trigger_settlement_batch',
http_conn_id='spring_batch_api',
endpoint='/batch/jobs/settlement',
method='POST',
data=json.dumps({'date': '{{ ds }}'}),
headers={'Content-Type': 'application/json'},
response_filter=lambda resp: json.loads(resp.text)['executionId'],
do_xcom_push=True,
)
wait_batch = HttpSensor(
task_id='wait_settlement_complete',
http_conn_id='spring_batch_api',
endpoint='/batch/jobs/settlement/status/{{ ti.xcom_pull(task_ids="trigger_settlement_batch") }}',
response_check=lambda resp: json.loads(resp.text)['status'] == 'COMPLETED',
poke_interval=60,
timeout=7200,
mode='reschedule',
)
trigger_batch >> wait_batch >> downstream_tasks
|
10. 배치 처리 장애 시나리오 & 트러블슈팅
10.1 OOM 문제
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| ★ OOM 발생 원인과 해결 — 기술별
Spring Batch:
원인 1: JpaPagingItemReader가 영속성 컨텍스트에 모든 엔티티 캐싱
해결: entityManager.clear() 호출 또는 JdbcPagingItemReader 전환
원인 2: chunk-size가 너무 큼
해결: chunk-size 축소 (10000 → 1000)
원인 3: ItemProcessor에서 List에 데이터 축적
해결: 상태 없는 Processor 설계
Python:
원인 1: pd.read_sql()로 전체 테이블 로드
해결: chunksize 파라미터 사용
원인 2: 리스트에 결과를 계속 append
해결: Generator 사용, chunk 단위 처리
Airflow Worker:
원인 1: Task 내에서 대량 데이터 메모리 처리
해결: 외부 시스템(Spark, dbt)에 처리 위임
원인 2: DAG 파싱 시 무거운 라이브러리 import
해결: 함수 내 지연 import
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| // ★ Spring Batch JPA OOM 해결 — 영속성 컨텍스트 클리어
@Bean
public Step jpaStep(JobRepository jobRepository,
PlatformTransactionManager tm) {
return new StepBuilder("jpaStep", jobRepository)
.<Order, Settlement>chunk(1000, tm)
.reader(jpaPagingReader())
.processor(processor())
.writer(writer())
.listener(new ChunkListener() {
@Autowired private EntityManager entityManager;
@Override
public void afterChunk(ChunkContext context) {
entityManager.clear(); // ★ chunk마다 영속성 컨텍스트 클리어!
}
})
.build();
}
|
10.2 데드락
1
2
3
4
5
6
7
8
9
| ★ 멀티스레드 배치에서의 DB 데드락
Thread-1: UPDATE accounts SET balance = balance - 100 WHERE id = A (A 락 획득)
Thread-1: UPDATE accounts SET balance = balance + 100 WHERE id = B (B 대기...)
Thread-2: UPDATE accounts SET balance = balance - 50 WHERE id = B (B 락 획득)
Thread-2: UPDATE accounts SET balance = balance + 50 WHERE id = A (A 대기...)
→ Thread-1은 B를 기다리고, Thread-2는 A를 기다림 → 데드락! 💀
|
| 전략 |
설명 |
| 락 순서 고정 |
항상 ID 오름차순으로 락 획득 |
| Partitioning |
파티션 간 데이터 겹침 방지 |
| 낙관적 락 |
JPA @Version 컬럼으로 충돌 감지 |
| Retry |
데드락 발생 시 자동 재시도 |
| 격리 수준 조정 |
SERIALIZABLE 회피, READ_COMMITTED 사용 |
1
2
3
4
5
6
7
8
9
10
11
12
| // ★ 데드락 방지 — Partitioning + Retry 조합
@Bean
public Step safeStep(JobRepository jobRepository, PlatformTransactionManager tm) {
return new StepBuilder("safeStep", jobRepository)
.<Account, Account>chunk(500, tm)
.reader(partitionedReader(null, null)) // ID 범위 분할
.writer(accountWriter())
.faultTolerant()
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
|
10.3 느린 배치 진단과 최적화
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| ★ 느린 배치 진단 플로우차트
배치가 느리다!
│
├─ Read가 느림?
│ ├─ 인덱스 누락 → CREATE INDEX
│ ├─ 전체 테이블 스캔 → WHERE 조건 추가
│ └─ fetchSize 미설정 → fetchSize 증가
│
├─ Process가 느림?
│ ├─ 건별 API 호출 → 벌크 API로 전환
│ ├─ 복잡한 계산 → 캐싱 / 사전 계산
│ └─ N+1 쿼리 → JOIN / 배치 조회
│
└─ Write가 느림?
├─ 건별 INSERT → 벌크 INSERT
├─ 인덱스 과다 → 배치 전 인덱스 비활성화
└─ auto-commit → 배치 커밋
|
| 증상 |
원인 |
해결 |
효과 |
| Read 시간 > 60% |
인덱스 누락 |
인덱스 추가 |
10~100배 |
| Write 시간 > 60% |
건별 INSERT |
JDBC Batch + rewriteBatchedStatements |
5~20배 |
| 특정 chunk만 느림 |
데이터 편향 |
균등 파티셔닝 |
2~5배 |
| 전체적으로 느림 |
싱글 스레드 |
Partitioning (8~16 파티션) |
5~10배 |
| GC 빈번 |
chunk-size 과대 |
chunk-size 축소, 힙 증가 |
2~3배 |
10.4 중복 실행 방지
1
2
3
4
5
6
7
8
9
10
11
| ★ 중복 실행 방지 전략
Spring Batch:
① JobParameters 고유성 (같은 파라미터 재실행 차단)
② ShedLock (분산 락)
③ DB 기반 분산 락 (SELECT FOR UPDATE)
Airflow:
① max_active_runs=1 (DAG당 동시 실행 1개)
② depends_on_past=True (이전 성공 후 실행)
③ Pool (동시 Task 수 제한)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| // ★ DB 기반 분산 락
@Component
public class DistributedLockRunner {
private final JdbcTemplate jdbcTemplate;
private final JobLauncher jobLauncher;
public JobExecution runWithLock(Job job, JobParameters params) {
String lockKey = job.getName() + "_" + params.getString("date");
boolean acquired = jdbcTemplate.update(
"INSERT INTO batch_lock (lock_key, locked_at, locked_by) " +
"VALUES (?, NOW(), ?) " +
"ON DUPLICATE KEY UPDATE lock_key = lock_key",
lockKey, InetAddress.getLocalHost().getHostName()
) > 0;
if (!acquired) {
throw new BatchLockException("이미 실행 중: " + lockKey);
}
try {
return jobLauncher.run(job, params);
} finally {
jdbcTemplate.update("DELETE FROM batch_lock WHERE lock_key = ?", lockKey);
}
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
| # ★ Airflow 중복 실행 방지
with DAG(
'daily_settlement',
max_active_runs=1, # ★ DAG 동시 실행 1개
catchup=False,
...
) as dag:
task = PythonOperator(
task_id='settle',
depends_on_past=True, # ★ 이전 실행 성공 후에만 실행
...
)
|
10.5 배치 안티패턴 모음
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| ★ 배치 안티패턴 TOP 7
1. 🚫 "만능 배치" — 하나의 Job에 모든 로직
→ 하나 실패하면 전체 실패
✓ 역할별로 Job 분리
2. 🚫 "날짜 하드코딩" — LocalDate.now() / datetime.now()
→ 백필/재실행 시 잘못된 날짜
✓ JobParameters / Airflow 로 날짜 주입
3. 🚫 "트랜잭션 무시" — 전체를 하나의 TX로
→ 실패 시 전체 롤백, 메모리 폭발
✓ chunk 단위 트랜잭션
4. 🚫 "로깅 없는 배치" — print()만 또는 로그 없음
→ 장애 시 원인 추적 불가
✓ 구조화된 로깅 + 처리 건수/시간 기록
5. 🚫 "멱등성 없는 배치" — INSERT만 사용
→ 재실행 시 데이터 중복
✓ UPSERT 또는 DELETE-INSERT 패턴
6. 🚫 "알림 없는 배치" — 실패해도 아무도 모름
→ 다음날 발견
✓ 성공/실패 알림 필수 (Slack, Email)
7. 🚫 "테스트 없는 배치" — 프로덕션이 첫 테스트
✓ 소량 데이터로 통합 테스트
✓ Step 단위 단위 테스트
✓ 스테이징 환경에서 검증
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| // ★ Spring Batch 테스트 코드 예시
@SpringBatchTest
@SpringBootTest
class SettlementStepTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@Test
void 정산_Step_정상_처리() {
// 테스트 데이터 준비
jdbcTemplate.update(
"INSERT INTO orders (order_id, seller_id, amount, status, order_date) " +
"VALUES (1, 'S001', 10000, 'COMPLETED', '2026-04-13')");
jdbcTemplate.update(
"INSERT INTO orders (order_id, seller_id, amount, status, order_date) " +
"VALUES (2, 'S001', 20000, 'COMPLETED', '2026-04-13')");
// Step 실행
JobExecution execution = jobLauncherTestUtils.launchStep("settlementStep",
new JobParametersBuilder()
.addLocalDate("date", LocalDate.of(2026, 4, 13))
.toJobParameters());
// 검증
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
Map<String, Object> settlement = jdbcTemplate.queryForMap(
"SELECT * FROM settlement WHERE seller_id = 'S001'");
assertThat(settlement.get("amount")).isEqualTo(new BigDecimal("30000"));
}
@Test
void 멱등성_검증_두번_실행해도_같은_결과() {
// ... 데이터 준비 ...
jobLauncherTestUtils.launchStep("settlementStep", params());
jobLauncherTestUtils.launchStep("settlementStep", params());
int count = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM settlement WHERE seller_id = 'S001'", Integer.class);
assertThat(count).isEqualTo(1); // 중복 없이 1건만 존재
}
}
|
마무리: 배치 처리 심화 가이드 전체 요약
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| ★ 2부작 전체 요약
┌──────────────────────────────────────────────────────┐
│ Part 1 — 배치 기본 + Spring Batch 완전 정복 │
│ ① 배치 = 데이터를 모아서 한꺼번에 처리 │
│ ② 핵심 원칙: 멱등성, 재시작, Chunk, Skip/Retry │
│ ③ Job → Step → Chunk(R/P/W) 아키텍처 │
│ ④ 6개 메타데이터 테이블로 상태 관리 │
│ ⑤ 멀티스레드/파티셔닝으로 병렬화 │
│ ⑥ ShedLock / 모니터링 / 성능 튜닝 │
├──────────────────────────────────────────────────────┤
│ Part 2 — Python + Airflow + 비교 + 트러블슈팅 │
│ ⑦ Python: pandas chunksize + multiprocessing │
│ ⑧ Airflow: DAG/Operator/Executor/XCom │
│ ⑨ Airflow 운영: 멱등 DAG, SLA, Pool, 모니터링 │
│ ⑩ Spring Batch = 프레임워크, Airflow = 오케스트레이터 │
│ ⑪ 하이브리드: Airflow → Spring Batch REST 트리거 │
│ ⑫ OOM/데드락/느린 배치 진단 + 안티패턴 7선 │
└──────────────────────────────────────────────────────┘
★ 기술 선택 요약
"Java 기반 대용량 데이터 직접 처리" → Spring Batch
"Python 기반 워크플로우 오케스트레이션" → Airflow
"대규모 파이프라인 + 정밀한 데이터 처리" → Airflow + Spring Batch 하이브리드
|
배치 처리는 화려하지 않지만, 기업의 데이터 처리 인프라에서 가장 중요한 기둥 중 하나이다. 정산이 1원이라도 틀리면 안 되고, 보고서가 하루라도 늦으면 안 되며, 데이터 이관이 누락되면 분석 전체가 틀어진다. 이 가이드가 배치 시스템을 더 견고하고 효율적으로 만드는 데 도움이 되길 바란다.