오픈소스 기반 Data Lake는 클라우드 벤더 종속을 피하면서도 엔터프라이즈급 기능을 제공합니다.
📋 목차
- 아키텍처 개요 및 컴포넌트 역할
- 데이터 흐름 상세 분석
- Windows 11 로컬 환경 구축 가이드
- 엔터프라이즈 확장 전략
- Iceberg 버전 관리 및 타임 트래블
- ML 파이프라인 연계 아키텍처
- Kubernetes 기반 확장 설계
- React UI 연동 구현
- 운영 및 모니터링 가이드
1. 아키텍처 개요 및 컴포넌트 역할
1.1 전체 아키텍처 다이어그램
┌─────────────────────────────────────────────────────────────┐
│ Data Lake Architecture │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌───────────┐ │
│ │ Clients │ │ React UI │ │ MLflow │ │
│ │ (Python/SQL) │ │ Dashboard │ │ Tracking │ │
│ └──────┬───────┘ └──────┬───────┘ └─────┬─────┘ │
│ │ │ │ │
│ └──────────┬──────────┴─────────────────────┘ │
│ │ │
│ ┌──────────▼──────────┐ │
│ │ Trino Cluster │ ← Query Engine │
│ │ (Distributed SQL) │ │
│ └──────────┬──────────┘ │
│ │ │
│ ┌──────────▼──────────┐ │
│ │ Apache Iceberg │ ← Table Format │
│ │ (Metadata Layer) │ │
│ │ - Snapshot Mgmt │ │
│ │ - Schema Evolution │ │
│ │ - Time Travel │ │
│ └──────────┬──────────┘ │
│ │ │
│ ┌──────────▼──────────┐ │
│ │ MinIO │ ← Object Storage │
│ │ (S3-Compatible) │ │
│ │ - Data Files │ │
│ │ - Metadata Files │ │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
1.2 각 컴포넌트의 실무 역할
MinIO (Storage Layer)
핵심 역할:
- 객체 스토리지 제공: S3 API 호환 인터페이스로 무제한 확장 가능
- 데이터 지속성: Parquet, ORC, Avro 등 컬럼형 포맷 저장
- 버전 관리 지원: 객체 수준 버전 관리 (Iceberg와 별개)
실무 관점:
실제 저장 구조:
s3://lakehouse/
├── warehouse/
│ ├── db1.db/
│ │ ├── table1/
│ │ │ ├── data/
│ │ │ │ ├── part-00000.parquet
│ │ │ │ ├── part-00001.parquet
│ │ │ └── metadata/
│ │ │ ├── snap-12345-1-abc.avro
│ │ │ ├── v1.metadata.json
│ │ │ └── version-hint.text
성능 특성:
- 처리량: 단일 노드 10-20 GB/s (NVMe SSD 기준)
- 지연시간: 첫 바이트까지 < 10ms
- 확장성: 페타바이트급 지원
Apache Iceberg (Table Format Layer)
핵심 역할:
- 메타데이터 관리: 스냅샷, 파티션, 스키마를 JSON/Avro로 추적
- ACID 트랜잭션: Optimistic Concurrency Control 기반
- 스키마 진화: 컬럼 추가/삭제/변경 without 재작성
메타데이터 구조:
{
"format-version": 2,
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
"location": "s3://lakehouse/warehouse/db1.db/sales",
"last-sequence-number": 34,
"last-updated-ms": 1609459200000,
"last-column-id": 5,
"schemas": [...],
"current-snapshot-id": 3055729675574597004,
"snapshots": [
{
"snapshot-id": 3055729675574597004,
"timestamp-ms": 1609459200000,
"manifest-list": "s3://lakehouse/.../snap-3055729675574597004-1-abc.avro"
}
]
}
실무 장점:
- 히든 파티셔닝: 사용자는 파티션 의식 불필요
- 증분 읽기: 마지막 스냅샷 이후 변경분만 조회
- 시간 여행: 과거 임의 시점 데이터 조회
Trino (Query Engine)
핵심 역할:
- 분산 SQL 실행: MPP (Massively Parallel Processing)
- 다중 데이터 소스 연합: Iceberg + PostgreSQL + MySQL 동시 조인
- 벡터화 실행: SIMD 명령어 활용 고속 처리
쿼리 실행 흐름:
1. SQL 파싱 → AST 생성
2. 논리 계획 최적화 (Predicate Pushdown, Projection Pruning)
3. 물리 계획 생성 (분산 실행 계획)
4. Iceberg 메타데이터 조회 → 파일 목록 획득
5. MinIO에서 Parquet 파일 스캔 (병렬)
6. 컬럼형 벡터 처리 → 결과 집계
7. 클라이언트 반환
성능 최적화:
- Predicate Pushdown: WHERE 조건을 파일 스캔 단계에 적용
- Partition Pruning: 불필요한 파티션 스킵
- Dynamic Filtering: 조인 시 런타임 필터 생성
2. 데이터 흐름 상세 분석
2.1 데이터 적재 (Ingestion) 흐름
# Python에서 Iceberg 테이블에 데이터 적재
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("IcebergWriter") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.2") \
.config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.lakehouse.type", "hadoop") \
.config("spark.sql.catalog.lakehouse.warehouse", "s3a://lakehouse/warehouse") \
.config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
.config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
.config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.getOrCreate()
# 데이터 생성
df = spark.createDataFrame([
(1, "Alice", 30, "2024-01-15"),
(2, "Bob", 35, "2024-01-15"),
(3, "Charlie", 28, "2024-01-16")
], ["id", "name", "age", "date"])
# Iceberg 테이블 생성 및 적재
df.writeTo("lakehouse.db1.users") \
.using("iceberg") \
.partitionedBy("date") \
.createOrReplace()
내부 동작:
- Spark가 데이터를 Parquet 파일로 변환
- MinIO에 파일 업로드 (s3a://lakehouse/warehouse/db1.db/users/data/)
- Iceberg가 스냅샷 메타데이터 생성
- 메타데이터도 MinIO에 저장
2.2 데이터 조회 (Query) 흐름
-- Trino에서 실행
SELECT name, age
FROM lakehouse.db1.users
WHERE date = '2024-01-15' AND age > 30;
실행 계획:
Fragment 0 [SINGLE]
Output[name, age]
└─ Filter[age > 30]
└─ TableScan[iceberg.lakehouse.db1.users]
Partition Filter: date = '2024-01-15'
Predicate: age > 30
Files: 1 (125.5 KB)
Rows: 2
상세 흐름:
- 메타데이터 조회: Trino가 MinIO에서 v1.metadata.json 읽기
- 파티션 프루닝: date = '2024-01-15' 파티션만 선택
- 파일 필터링: Parquet 파일의 통계(min/max) 확인
- 데이터 스캔: 선택된 파일만 읽어서 필터 적용
- 결과 반환: 2건의 행만 반환
2.3 업데이트/삭제 흐름 (Copy-on-Write)
-- Trino에서 UPDATE 실행
UPDATE lakehouse.db1.users
SET age = 36
WHERE name = 'Bob';
Iceberg의 ACID 구현:
1. 기존 스냅샷 확인 (snapshot-3055729675574597004)
2. 영향받는 파일 식별 (part-00001.parquet)
3. 해당 파일 읽어서 변경 적용 → 새 파일 생성 (part-00002.parquet)
4. 새 스냅샷 생성 (snapshot-3055729675574597005)
- 기존 파일 제외 + 새 파일 포함
5. 메타데이터 원자적 업데이트 (v2.metadata.json 생성)
중요: 기존 파일(part-00001.parquet)은 삭제되지 않음 → 타임 트래블 가능
3. Windows 11 로컬 환경 구축 가이드
3.1 사전 요구사항
# 1. Docker Desktop 설치 (WSL 2 백엔드)
# https://www.docker.com/products/docker-desktop/
# 2. 시스템 요구사항 확인
# - RAM: 최소 16GB (권장 32GB)
# - CPU: 4코어 이상
# - 디스크: 50GB 여유 공간
# 3. WSL 2 활성화 확인
wsl --list --verbose
# 출력: * docker-desktop-data Running 2
3.2 Docker Compose 구성
디렉토리 구조:
C:\datalake\
├── docker-compose.yml
├── configs/
│ ├── trino/
│ │ ├── catalog/
│ │ │ └── iceberg.properties
│ │ └── config.properties
│ └── minio/
│ └── policy.json
├── data/
│ ├── minio/
│ └── trino/
└── scripts/
└── init-minio.sh
docker-compose.yml
version: '3.9'
services:
# MinIO 객체 스토리지
minio:
image: minio/minio:RELEASE.2024-01-11T07-46-16Z
container_name: minio
ports:
- "9000:9000" # API
- "9001:9001" # Console
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin123
MINIO_DOMAIN: minio
command: server /data --console-address ":9001"
volumes:
- ./data/minio:/data
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
networks:
- datalake
# MinIO 초기화 (버킷 생성)
minio-init:
image: minio/mc:RELEASE.2024-01-11T05-49-32Z
container_name: minio-init
depends_on:
- minio
entrypoint: >
/bin/sh -c "
sleep 10;
/usr/bin/mc alias set myminio http://minio:9000 minioadmin minioadmin123;
/usr/bin/mc mb --ignore-existing myminio/lakehouse;
/usr/bin/mc mb --ignore-existing myminio/iceberg;
/usr/bin/mc anonymous set download myminio/lakehouse;
exit 0;
"
networks:
- datalake
# Trino Coordinator
trino:
image: trinodb/trino:435
container_name: trino
ports:
- "8080:8080"
volumes:
- ./configs/trino/catalog:/etc/trino/catalog
- ./configs/trino/config.properties:/etc/trino/config.properties
environment:
- JAVA_OPTS=-Xmx8G
depends_on:
- minio
networks:
- datalake
# Hive Metastore (Iceberg 카탈로그용)
hive-metastore:
image: apache/hive:3.1.3
container_name: hive-metastore
ports:
- "9083:9083"
environment:
SERVICE_NAME: metastore
DB_DRIVER: postgres
SERVICE_OPTS: "-Djavax.jdo.option.ConnectionDriverName=org.postgresql.Driver
-Djavax.jdo.option.ConnectionURL=jdbc:postgresql://postgres:5432/metastore
-Djavax.jdo.option.ConnectionUserName=hive
-Djavax.jdo.option.ConnectionPassword=hivepassword"
depends_on:
- postgres
networks:
- datalake
# PostgreSQL (Hive Metastore용)
postgres:
image: postgres:15-alpine
container_name: postgres
ports:
- "5432:5432"
environment:
POSTGRES_USER: hive
POSTGRES_PASSWORD: hivepassword
POSTGRES_DB: metastore
volumes:
- ./data/postgres:/var/lib/postgresql/data
networks:
- datalake
# Spark (데이터 적재용)
spark:
image: bitnami/spark:3.5.0
container_name: spark
ports:
- "8888:8888" # Jupyter
- "4040:4040" # Spark UI
environment:
- SPARK_MODE=master
- SPARK_MASTER_HOST=spark
- SPARK_MASTER_PORT=7077
volumes:
- ./notebooks:/opt/notebooks
- ./configs/spark:/opt/bitnami/spark/conf
command: >
bash -c "pip install jupyterlab pyspark pandas pyarrow &&
jupyter lab --ip=0.0.0.0 --port=8888 --no-browser --allow-root
--NotebookApp.token='' --NotebookApp.password=''"
networks:
- datalake
networks:
datalake:
driver: bridge
3.3 Trino 설정 파일
configs/trino/config.properties
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
discovery.uri=http://localhost:8080
# 메모리 설정
query.max-memory=5GB
query.max-memory-per-node=2GB
query.max-total-memory-per-node=3GB
# 동시 실행
task.concurrency=4
task.max-worker-threads=8
configs/trino/catalog/iceberg.properties
connector.name=iceberg
iceberg.catalog.type=hive_metastore
hive.metastore.uri=thrift://hive-metastore:9083
# MinIO 연결 설정
fs.native-s3.enabled=true
s3.endpoint=http://minio:9000
s3.path-style-access=true
s3.aws-access-key=minioadmin
s3.aws-secret-key=minioadmin123
# Iceberg 설정
iceberg.file-format=PARQUET
iceberg.compression-codec=SNAPPY
3.4 환경 구축 실행
# 1. 프로젝트 디렉토리 생성
cd C:\
mkdir datalake
cd datalake
# 2. 설정 파일 생성 (위 내용 복사)
mkdir -p configs\trino\catalog
mkdir -p data\minio
mkdir -p notebooks
# 3. Docker Compose 실행
docker-compose up -d
# 4. 로그 확인
docker-compose logs -f
# 5. 서비스 상태 확인
docker-compose ps
접속 확인:
- MinIO Console: http://localhost:9001 (minioadmin / minioadmin123)
- Trino Web UI: http://localhost:8080
- Jupyter Lab: http://localhost:8888
3.5 초기 데이터 적재 테스트
Jupyter Notebook에서 실행:
from pyspark.sql import SparkSession
# Spark 세션 생성
spark = SparkSession.builder \
.appName("IcebergTest") \
.config("spark.jars.packages",
"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2,"
"org.apache.hadoop:hadoop-aws:3.3.4") \
.config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.lakehouse.type", "hive") \
.config("spark.sql.catalog.lakehouse.uri", "thrift://hive-metastore:9083") \
.config("spark.sql.catalog.lakehouse.warehouse", "s3a://lakehouse/warehouse") \
.config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
.config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
.config("spark.hadoop.fs.s3a.secret.key", "minioadmin123") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.getOrCreate()
# 테스트 데이터 생성
data = [
(1, "Product A", 100, "Electronics", "2024-01-15"),
(2, "Product B", 200, "Clothing", "2024-01-15"),
(3, "Product C", 150, "Electronics", "2024-01-16"),
(4, "Product D", 300, "Home", "2024-01-16"),
]
df = spark.createDataFrame(data,
["id", "product_name", "price", "category", "sale_date"])
# Iceberg 테이블 생성
spark.sql("CREATE DATABASE IF NOT EXISTS lakehouse.sales_db")
df.writeTo("lakehouse.sales_db.products") \
.using("iceberg") \
.partitionedBy("sale_date") \
.tableProperty("write.format.default", "parquet") \
.tableProperty("write.metadata.compression-codec", "gzip") \
.createOrReplace()
print("✅ Iceberg 테이블 생성 완료!")
# 데이터 확인
spark.sql("SELECT * FROM lakehouse.sales_db.products").show()
4. 엔터프라이즈 확장 전략
4.1 보안 강화 요소
4.1.1 인증 및 인가
현재 로컬 환경:
MinIO: 기본 계정 (minioadmin)
Trino: 인증 없음
엔터프라이즈 요구사항:
계층 로컬 환경 엔터프라이즈 환경
| 사용자 인증 | 없음 | LDAP/Active Directory 연동 |
| SSO | 미지원 | OAuth 2.0 / SAML 2.0 |
| API 인증 | 기본키 | IAM Role / JWT Token |
| 네트워크 | HTTP | TLS 1.3 암호화 |
| 감사 로그 | 없음 | 모든 쿼리/접근 기록 |
Trino 엔터프라이즈 인증 설정:
# configs/trino/config.properties
http-server.authentication.type=PASSWORD
http-server.https.enabled=true
http-server.https.port=8443
http-server.https.keystore.path=/etc/trino/keystore.jks
http-server.https.keystore.key=changeit
# LDAP 인증
http-server.authentication.password-authenticator=ldap
authentication.ldap.url=ldaps://ldap.company.com:636
authentication.ldap.user-bind-pattern=uid=${USER},ou=users,dc=company,dc=com
MinIO IAM 정책 예제:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": "arn:aws:s3:::lakehouse/warehouse/sales_db/*",
"Condition": {
"IpAddress": {
"aws:SourceIp": "10.0.0.0/8"
}
}
},
{
"Effect": "Deny",
"Action": "s3:DeleteObject",
"Resource": "*"
}
]
}
4.1.2 데이터 암호화
암호화 계층:
┌─────────────────────────────────────┐
│ 1. 전송 중 암호화 (TLS 1.3) │
│ - 클라이언트 ↔ Trino │
│ - Trino ↔ MinIO │
├─────────────────────────────────────┤
│ 2. 저장 암호화 (Encryption at Rest)│
│ - MinIO SSE-S3 (AES-256) │
│ - 또는 SSE-KMS (키 관리 분리) │
├─────────────────────────────────────┤
│ 3. 컬럼 수준 암호화 │
│ - PII 데이터 (주민번호, 카드번호)│
│ - Parquet 파일 내 암호화 컬럼 │
└─────────────────────────────────────┘
MinIO 서버 측 암호화 설정:
# KMS 기반 암호화 활성화
mc admin config set myminio sse-kms \
endpoint="https://kms.company.com" \
key_id="arn:aws:kms:region:account:key/key-id"
# 버킷 기본 암호화 설정
mc encrypt set sse-s3 myminio/lakehouse
4.1.3 네트워크 격리
DMZ 기반 3계층 아키텍처:
┌─────────────────────────────────────────────┐
│ Web/Application Tier │
│ (React UI, API Gateway) │
│ VLAN: 10.10.10.0/24 │
└──────────────┬──────────────────────────────┘
│ Firewall Rules
│ - Port 8080 (Trino) only
┌──────────────▼──────────────────────────────┐
│ Query/Processing Tier │
│ (Trino, Spark) │
│ VLAN: 10.10.20.0/24 │
└──────────────┬──────────────────────────────┘
│ Firewall Rules
│ - Port 9000 (MinIO) only
┌──────────────▼──────────────────────────────┐
│ Storage Tier │
│ (MinIO Cluster) │
│ VLAN: 10.10.30.0/24 │
│ - 외부 접근 완전 차단 │
└─────────────────────────────────────────────┘
4.2 거버넌스 구현
4.2.1 데이터 카탈로그 (Apache Atlas 연동)
아키텍처:
# docker-compose에 추가
atlas:
image: sburn/apache-atlas:2.3.0
ports:
- "21000:21000"
environment:
- ATLAS_OPTS=-Xmx2048m
volumes:
- ./configs/atlas:/apache-atlas-2.3.0/conf
# Trino-Atlas 연동
trino-atlas-connector:
image: custom/trino-atlas-hook
environment:
ATLAS_ENDPOINT: http://atlas:21000
ATLAS_USERNAME: admin
ATLAS_PASSWORD: admin
메타데이터 자동 등록:
# Iceberg 테이블 생성 시 Atlas에 자동 등록
from pyatlas import AtlasClient
atlas = AtlasClient("http://atlas:21000", "admin", "admin")
# 테이블 엔티티 생성
table_entity = {
"typeName": "iceberg_table",
"attributes": {
"name": "sales_db.products",
"qualifiedName": "lakehouse.sales_db.products@prod",
"owner": "data-team",
"description": "제품 판매 데이터",
"columns": [
{"name": "id", "type": "int", "isPII": False},
{"name": "product_name", "type": "string", "isPII": False},
{"name": "price", "type": "int", "isPII": False}
],
"classifications": ["CONFIDENTIAL", "SALES_DATA"]
}
}
atlas.create_entity(table_entity)
4.2.2 데이터 품질 관리 (Great Expectations)
import great_expectations as gx
# 데이터 품질 검증
context = gx.get_context()
# Expectation Suite 정의
suite = context.create_expectation_suite("sales_validation")
# 검증 규칙
validator = context.get_validator(
batch_request={
"datasource_name": "iceberg_datasource",
"data_connector_name": "default",
"data_asset_name": "lakehouse.sales_db.products"
},
expectation_suite_name="sales_validation"
)
# 규칙 추가
validator.expect_column_values_to_not_be_null("id")
validator.expect_column_values_to_be_unique("id")
validator.expect_column_values_to_be_between("price", min_value=0, max_value=10000)
validator.expect_column_values_to_be_in_set("category",
["Electronics", "Clothing", "Home"])
# 검증 실행
results = validator.validate()
print(results)
4.2.3 접근 제어 (Row-Level Security)
Trino Row Filter 설정:
-- 부서별 데이터 접근 제한
CREATE OR REPLACE VIEW lakehouse.sales_db.products_secure AS
SELECT *
FROM lakehouse.sales_db.products
WHERE
CASE
WHEN current_user() = 'finance_team' THEN category IN ('Electronics', 'Home')
WHEN current_user() = 'sales_team' THEN TRUE
ELSE category = 'Clothing'
END;
-- 권한 부여
GRANT SELECT ON lakehouse.sales_db.products_secure TO ROLE analyst;
4.3 고가용성 (HA) 및 이중화
4.3.1 MinIO 클러스터 구성
Erasure Coding 기반 분산 배포:
# 4노드 분산 MinIO (8드라이브)
version: '3.9'
services:
minio1:
image: minio/minio
command: server http://minio{1...4}/data{1...2}
environment:
MINIO_ROOT_USER: admin
MINIO_ROOT_PASSWORD: password
volumes:
- /mnt/disk1:/data1
- /mnt/disk2:/data2
minio2:
image: minio/minio
command: server http://minio{1...4}/data{1...2}
volumes:
- /mnt/disk3:/data1
- /mnt/disk4:/data2
minio3:
image: minio/minio
command: server http://minio{1...4}/data{1...2}
volumes:
- /mnt/disk5:/data1
- /mnt/disk6:/data2
minio4:
image: minio/minio
command: server http://minio{1...4}/data{1...2}
volumes:
- /mnt/disk7:/data1
- /mnt/disk8:/data2
# Load Balancer
nginx:
image: nginx:alpine
ports:
- "9000:9000"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
nginx.conf (MinIO 로드 밸런싱):
upstream minio_backend {
least_conn;
server minio1:9000;
server minio2:9000;
server minio3:9000;
server minio4:9000;
}
server {
listen 9000;
location / {
proxy_pass http://minio_backend;
proxy_set_header Host $host;
proxy_http_version 1.1;
proxy_set_header Connection "";
chunked_transfer_encoding off;
}
}
4.3.2 Trino 클러스터 확장
Coordinator + Worker 분리:
# Trino Coordinator (1대)
trino-coordinator:
image: trinodb/trino:435
ports:
- "8080:8080"
volumes:
- ./coordinator-config.properties:/etc/trino/config.properties
environment:
- COORDINATOR=true
- WORKERS=trino-worker-1:8080,trino-worker-2:8080,trino-worker-3:8080
# Trino Workers (3대)
trino-worker-1:
image: trinodb/trino:435
volumes:
- ./worker-config.properties:/etc/trino/config.properties
environment:
- COORDINATOR=false
- DISCOVERY_URI=http://trino-coordinator:8080
trino-worker-2:
image: trinodb/trino:435
# ... 동일 설정
trino-worker-3:
image: trinodb/trino:435
# ... 동일 설정
Autoscaling 정책 (Kubernetes 환경):
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: trino-worker-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: trino-worker
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
4.4 재해 복구 (DR)
백업 전략:
# MinIO 원격 복제 설정
mc admin replicate add myminio \
--remote-bucket lakehouse-dr \
--remote-endpoint https://minio-dr.company.com \
--remote-access-key \
--remote-secret-key \
--replicate-deletes
# Iceberg 메타데이터 백업 (S3 Lifecycle)
mc ilm add myminio/lakehouse/warehouse --expiry-days 90 \
--transition-days 30 --storage-class GLACIER
RPO/RTO 목표:
구분 로컬 환경 엔터프라이즈 환경
| RPO | N/A | < 1시간 (증분 복제) |
| RTO | N/A | < 4시간 (자동 페일오버) |
| 백업 주기 | 수동 | 실시간 + 일 1회 전체 |
5. Apache Iceberg 버전 관리 및 타임 트래블
5.1 스냅샷 아키텍처
Iceberg 스냅샷 구조:
Timeline: ──────────────────────────────────────────►
t1 t2 t3 t4
│ │ │ │
Snapshot: S1 ────► S2 ────► S3 ────► S4 (current)
│ │ │ │
Files: F1 F1,F2 F2,F3 F3,F4,F5
│ │ │ │
Metadata: v1.json v2.json v3.json v4.json
각 스냅샷의 구성 요소:
- Manifest List: 변경된 파티션 목록
- Manifest Files: 실제 데이터 파일 목록
- Data Files: Parquet/ORC 파일
5.2 타임 트래블 실전 예제
5.2.1 기본 타임 트래블
-- 현재 데이터 조회
SELECT * FROM lakehouse.sales_db.products;
-- 결과: 4건 (최신 상태)
-- 특정 시점으로 조회 (스냅샷 ID)
SELECT * FROM lakehouse.sales_db.products
FOR VERSION AS OF 3055729675574597004;
-- 결과: 2건 (과거 스냅샷)
-- 특정 타임스탬프로 조회
SELECT * FROM lakehouse.sales_db.products
FOR TIMESTAMP AS OF TIMESTAMP '2024-01-15 10:30:00';
-- 결과: 해당 시점의 데이터
-- 스냅샷 히스토리 조회
SELECT * FROM lakehouse.sales_db."products$snapshots"
ORDER BY committed_at DESC;
출력 예시:
committed_at snapshot_id parent_id operation manifest_list
| 2024-01-16 14:30:00 | 3055729675574597005 | 3055729675574597004 | append | s3://…snap-005.avro |
| 2024-01-15 10:30:00 | 3055729675574597004 | NULL | append | s3://…snap-004.avro |
5.2.2 증분 읽기 (Incremental Read)
-- 특정 스냅샷 이후 변경분만 조회
SELECT * FROM lakehouse.sales_db.products
FOR VERSION AS OF 3055729675574597005
WHERE iceberg.snapshot_id > 3055729675574597004;
-- 시간 범위 기반 증분 조회
SELECT * FROM lakehouse.sales_db.products
FOR TIMESTAMP AS OF TIMESTAMP '2024-01-16 00:00:00'
WHERE sale_date BETWEEN '2024-01-15' AND '2024-01-16';
Python에서 증분 읽기:
# PySpark를 통한 증분 읽기
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# 마지막 처리 스냅샷 ID 저장
last_snapshot = 3055729675574597004
# 증분 데이터 읽기
df_incremental = spark.read \
.format("iceberg") \
.option("start-snapshot-id", last_snapshot) \
.option("end-snapshot-id", "current") \
.load("lakehouse.sales_db.products")
df_incremental.show()
# 새 스냅샷 ID 저장 (다음 실행용)
new_snapshot = spark.sql("""
SELECT snapshot_id
FROM lakehouse.sales_db.products.snapshots
ORDER BY committed_at DESC LIMIT 1
""").collect()[0][0]
print(f"Next start snapshot: {new_snapshot}")
5.3 스냅샷 롤백
5.3.1 특정 스냅샷으로 롤백
-- 롤백 전 현재 상태 확인
SELECT COUNT(*) FROM lakehouse.sales_db.products;
-- 결과: 4건
-- 스냅샷 히스토리 확인
SELECT snapshot_id, committed_at, operation
FROM lakehouse.sales_db."products$snapshots"
ORDER BY committed_at DESC;
-- 특정 스냅샷으로 롤백
CALL lakehouse.system.rollback_to_snapshot(
'sales_db.products',
3055729675574597004
);
-- 롤백 후 데이터 확인
SELECT COUNT(*) FROM lakehouse.sales_db.products;
-- 결과: 2건 (롤백됨)
5.3.2 타임스탬프 기반 롤백
-- 특정 시간으로 롤백
CALL lakehouse.system.rollback_to_timestamp(
'sales_db.products',
TIMESTAMP '2024-01-15 10:30:00'
);
5.3.3 롤백 안전장치 (태그 활용)
-- 중요 시점에 태그 생성
CALL lakehouse.system.create_tag(
'sales_db.products',
'v1.0-production',
3055729675574597004
);
-- 태그로 롤백 (더 안전)
CALL lakehouse.system.rollback_to_tag(
'sales_db.products',
'v1.0-production'
);
-- 태그 목록 조회
SELECT * FROM lakehouse.sales_db."products$tags";
5.4 스키마 진화 (Schema Evolution)
-- 컬럼 추가 (기존 데이터에 NULL 값)
ALTER TABLE lakehouse.sales_db.products
ADD COLUMN discount_rate DOUBLE;
-- 컬럼 이름 변경
ALTER TABLE lakehouse.sales_db.products
RENAME COLUMN product_name TO name;
-- 컬럼 타입 변경 (호환 가능한 타입만)
ALTER TABLE lakehouse.sales_db.products
ALTER COLUMN price TYPE BIGINT;
-- 컬럼 삭제 (논리적 삭제, 데이터는 유지)
ALTER TABLE lakehouse.sales_db.products
DROP COLUMN discount_rate;
-- 스키마 히스토리 조회
SELECT * FROM lakehouse.sales_db."products$schemas";
스키마 버전 관리:
schema_id committed_at fields
| 0 | 2024-01-15 10:00:00 | id, product_name, price |
| 1 | 2024-01-16 11:00:00 | id, name, price (renamed) |
| 2 | 2024-01-16 14:00:00 | id, name, price, discount_rate |
5.5 파티션 진화 (Partition Evolution)
-- 초기 파티션 (일별)
CREATE TABLE lakehouse.sales_db.orders (
order_id BIGINT,
customer_id BIGINT,
order_date DATE,
amount DECIMAL(10,2)
)
USING iceberg
PARTITIONED BY (order_date);
-- 데이터가 쌓인 후 파티션 변경 (월별로 전환)
ALTER TABLE lakehouse.sales_db.orders
DROP PARTITION FIELD order_date;
ALTER TABLE lakehouse.sales_db.orders
ADD PARTITION FIELD months(order_date);
-- 기존 데이터는 일별 파티션 유지, 새 데이터는 월별 파티션
-- Iceberg가 자동으로 처리!
5.6 메타데이터 최적화
-- 작은 파일 병합 (Compaction)
CALL lakehouse.system.rewrite_data_files(
table => 'sales_db.products',
options => map(
'target-file-size-bytes', '536870912', -- 512MB
'min-input-files', '5'
)
);
-- 오래된 스냅샷 정리 (7일 이전)
CALL lakehouse.system.expire_snapshots(
table => 'sales_db.products',
older_than => TIMESTAMP '2024-01-08 00:00:00',
retain_last => 10 -- 최소 10개 스냅샷 유지
);
-- 고아 파일 정리 (참조되지 않는 파일)
CALL lakehouse.system.remove_orphan_files(
table => 'sales_db.products',
older_than => TIMESTAMP '2024-01-01 00:00:00'
);
-- 메타데이터 파일 압축
CALL lakehouse.system.rewrite_manifests('sales_db.products');
6. ML 파이프라인 연계 아키텍처
6.1 전체 아키텍처
┌──────────────────────────────────────────────────────────┐
│ ML Lifecycle │
├──────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌─────────┐│
│ │ Data Lake │─────►│Feature Store │─────►│Training ││
│ │ (Iceberg) │ │ (Feast) │ │ (Spark) ││
│ └─────────────┘ └──────────────┘ └────┬────┘│
│ │ │ │
│ │ Trino Query │ │
│ │ ┌────▼────┐│
│ ┌──────▼──────┐ │ MLflow ││
│ │ EDA │ │Tracking ││
│ │ (Jupyter) │ └────┬────┘│
│ └─────────────┘ │ │
│ ┌────▼────┐│
│ │ Model ││
│ │Registry ││
│ └────┬────┘│
│ │ │
│ ┌────▼────┐│
│ │Inference││
│ │ Serving ││
│ └─────────┘│
└──────────────────────────────────────────────────────────┘
6.2 데이터 추출 및 특징 엔지니어링
6.2.1 Trino를 통한 데이터 추출
from trino.dbapi import connect
import pandas as pd
# Trino 연결
conn = connect(
host='localhost',
port=8080,
user='datascientist',
catalog='lakehouse',
schema='sales_db'
)
# 학습 데이터 쿼리
query = """
SELECT
p.product_id,
p.category,
p.price,
p.brand,
COUNT(o.order_id) as order_count,
SUM(o.quantity) as total_quantity,
AVG(r.rating) as avg_rating,
DATE_DIFF('day', p.created_at, CURRENT_DATE) as product_age_days
FROM products p
LEFT JOIN orders o ON p.product_id = o.product_id
LEFT JOIN reviews r ON p.product_id = r.product_id
WHERE o.order_date >= DATE '2024-01-01'
GROUP BY p.product_id, p.category, p.price, p.brand, p.created_at
"""
df = pd.read_sql(query, conn)
print(f"Extracted {len(df)} rows")
6.2.2 Spark를 통한 대규모 특징 추출
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
.appName("FeatureEngineering") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2") \
.getOrCreate()
# Iceberg 테이블 직접 읽기
df = spark.read \
.format("iceberg") \
.load("lakehouse.sales_db.orders")
# 특징 생성
features = df.groupBy("customer_id").agg(
count("order_id").alias("total_orders"),
sum("amount").alias("total_spending"),
avg("amount").alias("avg_order_value"),
max("order_date").alias("last_order_date"),
min("order_date").alias("first_order_date")
).withColumn(
"customer_lifetime_days",
datediff(col("last_order_date"), col("first_order_date"))
).withColumn(
"avg_days_between_orders",
col("customer_lifetime_days") / col("total_orders")
)
# Feature Store에 저장 (새 Iceberg 테이블)
features.writeTo("lakehouse.ml_features.customer_features") \
.using("iceberg") \
.createOrReplace()
6.3 Feature Store 연동 (Feast)
6.3.1 Feast 설정
# feature_repo/feature_store.yaml
project: sales_ml
registry: s3://lakehouse/feast/registry.db
provider: local
online_store:
type: redis
connection_string: "localhost:6379"
offline_store:
type: file # 또는 spark
Feature 정의:
# feature_repo/features.py
from feast import Entity, Feature, FeatureView, ValueType
from feast.data_source import FileSource
# Entity 정의
customer = Entity(
name="customer_id",
value_type=ValueType.INT64,
description="고객 ID"
)
# Data Source (Iceberg 테이블 참조)
customer_source = FileSource(
path="s3://lakehouse/warehouse/ml_features.db/customer_features",
event_timestamp_column="last_order_date",
created_timestamp_column="feature_created_at"
)
# Feature View
customer_features_view = FeatureView(
name="customer_features",
entities=["customer_id"],
ttl=timedelta(days=30),
features=[
Feature(name="total_orders", dtype=ValueType.INT64),
Feature(name="total_spending", dtype=ValueType.DOUBLE),
Feature(name="avg_order_value", dtype=ValueType.DOUBLE),
],
online=True,
batch_source=customer_source,
)
Feature Store 배포:
cd feature_repo
feast apply # Registry에 등록
feast materialize-incremental $(date +%Y-%m-%d) # Online Store 동기화
6.4 MLflow 기반 실험 관리
6.4.1 MLflow 서버 설정
# docker-compose.yml에 추가
mlflow:
image: python:3.10-slim
container_name: mlflow
ports:
- "5000:5000"
volumes:
- ./mlruns:/mlruns
command: >
bash -c "pip install mlflow boto3 &&
mlflow server
--backend-store-uri sqlite:///mlruns/mlflow.db
--default-artifact-root s3://lakehouse/mlflow-artifacts
--host 0.0.0.0"
environment:
AWS_ACCESS_KEY_ID: minioadmin
AWS_SECRET_ACCESS_KEY: minioadmin123
MLFLOW_S3_ENDPOINT_URL: http://minio:9000
networks:
- datalake
6.4.2 학습 파이프라인
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
# MLflow 설정
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("customer_churn_prediction")
# Iceberg에서 데이터 로드 (Trino 경유)
from trino.dbapi import connect
import pandas as pd
conn = connect(host='localhost', port=8080, catalog='lakehouse', schema='ml_features')
df = pd.read_sql("SELECT * FROM customer_features", conn)
# 레이블 생성 (예: 이탈 고객 여부)
df['is_churned'] = (df['avg_days_between_orders'] > 90).astype(int)
X = df[['total_orders', 'total_spending', 'avg_order_value']]
y = df['is_churned']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# MLflow Run 시작
with mlflow.start_run(run_name="rf_v1"):
# 하이퍼파라미터 로깅
params = {
"n_estimators": 100,
"max_depth": 10,
"min_samples_split": 5
}
mlflow.log_params(params)
# 모델 학습
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
# 예측 및 평가
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
# 메트릭 로깅
mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("f1_score", f1)
# 모델 저장 (S3/MinIO)
mlflow.sklearn.log_model(model, "model")
# 데이터셋 버전 정보 로깅
snapshot_id = pd.read_sql("""
SELECT snapshot_id
FROM lakehouse.ml_features.customer_features.snapshots
ORDER BY committed_at DESC LIMIT 1
""", conn).iloc[0, 0]
mlflow.log_param("iceberg_snapshot_id", snapshot_id)
print(f"✅ Model logged with accuracy: {accuracy:.3f}")
6.5 모델 서빙 아키텍처
6.5.1 Batch Inference (Spark)
import mlflow.pyfunc
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# MLflow에서 모델 로드
model_uri = "models:/customer_churn_model/Production"
model = mlflow.pyfunc.spark_udf(spark, model_uri, result_type="double")
# 추론 대상 데이터 (Iceberg)
df = spark.read \
.format("iceberg") \
.load("lakehouse.ml_features.customer_features")
# 배치 예측
predictions = df.withColumn(
"churn_probability",
model(struct("total_orders", "total_spending", "avg_order_value"))
)
# 결과 저장 (새 Iceberg 테이블)
predictions.writeTo("lakehouse.ml_results.churn_predictions") \
.using("iceberg") \
.partitionedBy("prediction_date") \
.createOrReplace()
6.5.2 Real-time Inference (FastAPI)
from fastapi import FastAPI
from pydantic import BaseModel
import mlflow.pyfunc
import pandas as pd
from feast import FeatureStore
app = FastAPI()
# 모델 로드 (시작 시 1회)
model = mlflow.pyfunc.load_model("models:/customer_churn_model/Production")
feast_store = FeatureStore(repo_path="./feature_repo")
class PredictionRequest(BaseModel):
customer_id: int
@app.post("/predict")
def predict_churn(request: PredictionRequest):
# Feature Store에서 실시간 특징 조회
features = feast_store.get_online_features(
features=[
"customer_features:total_orders",
"customer_features:total_spending",
"customer_features:avg_order_value"
],
entity_rows=[{"customer_id": request.customer_id}]
).to_df()
# 예측
prediction = model.predict(features)
return {
"customer_id": request.customer_id,
"churn_probability": float(prediction[0]),
"risk_level": "High" if prediction[0] > 0.7 else "Low"
}
# 실행: uvicorn main:app --host 0.0.0.0 --port 8000
6.6 운영 시 주의점
6.6.1 데이터 버전 추적
# 학습 데이터의 Iceberg 스냅샷 ID를 반드시 기록
with mlflow.start_run():
# 데이터 버전 정보
snapshot_info = spark.sql("""
SELECT snapshot_id, committed_at
FROM lakehouse.ml_features.customer_features.snapshots
ORDER BY committed_at DESC LIMIT 1
""").collect()[0]
mlflow.log_param("data_snapshot_id", snapshot_info.snapshot_id)
mlflow.log_param("data_snapshot_time", snapshot_info.committed_at)
# 나중에 재현 가능
# df = spark.read.format("iceberg")
# .option("snapshot-id", snapshot_info.snapshot_id)
# .load("lakehouse.ml_features.customer_features")
6.6.2 특징 드리프트 모니터링
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
# 학습 데이터 (과거 스냅샷)
reference_df = spark.read \
.format("iceberg") \
.option("snapshot-id", 3055729675574597004) \
.load("lakehouse.ml_features.customer_features") \
.toPandas()
# 현재 데이터
current_df = spark.read \
.format("iceberg") \
.load("lakehouse.ml_features.customer_features") \
.toPandas()
# 드리프트 분석
report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=reference_df, current_data=current_df)
# S3에 리포트 저장
report.save_html("s3://lakehouse/ml-monitoring/drift_report.html")
6.6.3 모델 재학습 트리거
# Airflow DAG 예제
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def check_data_freshness():
"""새 데이터 확인"""
latest_snapshot_time = spark.sql("""
SELECT MAX(committed_at)
FROM lakehouse.ml_features.customer_features.snapshots
""").collect()[0][0]
if (datetime.now() - latest_snapshot_time).days > 7:
raise Exception("Data is stale! Trigger retraining.")
def retrain_model():
"""모델 재학습 (위 코드 재사용)"""
# ... 학습 로직
dag = DAG(
'ml_pipeline',
start_date=datetime(2024, 1, 1),
schedule_interval='@weekly',
catchup=False
)
task_check = PythonOperator(
task_id='check_data',
python_callable=check_data_freshness,
dag=dag
)
task_train = PythonOperator(
task_id='retrain_model',
python_callable=retrain_model,
dag=dag
)
task_check >> task_train
7. Kubernetes 기반 확장 설계
7.1 Helm Chart 구조
datalake-helm/
├── Chart.yaml
├── values.yaml
├── templates/
│ ├── minio/
│ │ ├── statefulset.yaml
│ │ ├── service.yaml
│ │ └── pvc.yaml
│ ├── trino/
│ │ ├── coordinator-deployment.yaml
│ │ ├── worker-deployment.yaml
│ │ └── service.yaml
│ ├── hive-metastore/
│ │ └── deployment.yaml
│ └── ingress.yaml
└── values/
├── dev.yaml
├── staging.yaml
└── production.yaml
7.2 MinIO StatefulSet
# templates/minio/statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: minio
spec:
serviceName: minio-headless
replicas: 4 # 4노드 분산 배포
selector:
matchLabels:
app: minio
template:
metadata:
labels:
app: minio
spec:
containers:
- name: minio
image: minio/minio:RELEASE.2024-01-11T07-46-16Z
args:
- server
- http://minio-{0...3}.minio-headless.default.svc.cluster.local/data
env:
- name: MINIO_ROOT_USER
valueFrom:
secretKeyRef:
name: minio-secret
key: root-user
- name: MINIO_ROOT_PASSWORD
valueFrom:
secretKeyRef:
name: minio-secret
key: root-password
ports:
- containerPort: 9000
name: api
- containerPort: 9001
name: console
volumeMounts:
- name: data
mountPath: /data
resources:
requests:
memory: "4Gi"
cpu: "2"
limits:
memory: "8Gi"
cpu: "4"
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: ["ReadWriteOnce"]
storageClassName: fast-ssd
resources:
requests:
storage: 500Gi
7.3 Trino 분산 배포
Coordinator
# templates/trino/coordinator-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: trino-coordinator
spec:
replicas: 1 # Coordinator는 단일 인스턴스
selector:
matchLabels:
app: trino
component: coordinator
template:
metadata:
labels:
app: trino
component: coordinator
spec:
containers:
- name: trino
image: trinodb/trino:435
ports:
- containerPort: 8080
name: http
env:
- name: COORDINATOR
value: "true"
- name: WORKERS
value: "trino-worker:8080"
volumeMounts:
- name: config
mountPath: /etc/trino
- name: catalog
mountPath: /etc/trino/catalog
resources:
requests:
memory: "8Gi"
cpu: "4"
limits:
memory: "16Gi"
cpu: "8"
volumes:
- name: config
configMap:
name: trino-config
- name: catalog
configMap:
name: trino-catalog
Worker
# templates/trino/worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: trino-worker
spec:
replicas: 5 # Worker는 수평 확장
selector:
matchLabels:
app: trino
component: worker
template:
metadata:
labels:
app: trino
component: worker
spec:
containers:
- name: trino
image: trinodb/trino:435
env:
- name: COORDINATOR
value: "false"
- name: DISCOVERY_URI
value: "http://trino-coordinator:8080"
volumeMounts:
- name: config
mountPath: /etc/trino
resources:
requests:
memory: "16Gi"
cpu: "8"
limits:
memory: "32Gi"
cpu: "16"
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: trino-worker-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: trino-worker
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
7.4 Ingress 및 TLS 설정
# templates/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: datalake-ingress
annotations:
cert-manager.io/cluster-issuer: "letsencrypt-prod"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
spec:
tls:
- hosts:
- trino.company.com
- minio.company.com
secretName: datalake-tls
rules:
- host: trino.company.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: trino-coordinator
port:
number: 8080
- host: minio.company.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: minio
port:
number: 9001
7.5 배포 및 관리
# Helm Chart 설치
helm install datalake ./datalake-helm \
-f values/production.yaml \
--namespace datalake \
--create-namespace
# 업그레이드
helm upgrade datalake ./datalake-helm \
-f values/production.yaml \
--namespace datalake
# Worker 수 조정
kubectl scale deployment trino-worker --replicas=10 -n datalake
# 롤링 업데이트 (무중단)
kubectl set image deployment/trino-worker \
trino=trinodb/trino:436 \
-n datalake
# 모니터링
kubectl logs -f deployment/trino-coordinator -n datalake
kubectl top pods -n datalake
8. React UI 연동 구현
8.1 아키텍처
┌─────────────────────────────────────────┐
│ React Frontend (Port 3000) │
│ - Data Catalog │
│ - Query Editor │
│ - Dashboard │
└──────────────┬──────────────────────────┘
│ REST API
┌──────────────▼──────────────────────────┐
│ Backend API (FastAPI/Flask) │
│ - Trino Connection Pool │
│ - Query Optimization │
│ - Authentication │
└──────────────┬──────────────────────────┘
│ JDBC/HTTP
┌──────────────▼──────────────────────────┐
│ Trino Coordinator │
└─────────────────────────────────────────┘
8.2 Backend API (FastAPI)
# backend/main.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from trino.dbapi import connect
from trino.auth import BasicAuthentication
import pandas as pd
app = FastAPI(title="Data Lake API")
# CORS 설정
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Trino 연결 풀
def get_trino_connection():
return connect(
host='trino-coordinator',
port=8080,
user='api_user',
catalog='lakehouse',
schema='sales_db',
auth=BasicAuthentication("api_user", "password")
)
class QueryRequest(BaseModel):
sql: str
limit: int = 1000
@app.post("/api/query")
async def execute_query(request: QueryRequest):
"""SQL 쿼리 실행"""
try:
conn = get_trino_connection()
cursor = conn.cursor()
# SQL Injection 방지 (간단한 검증)
if any(keyword in request.sql.upper() for keyword in ['DROP', 'DELETE', 'TRUNCATE']):
raise HTTPException(status_code=403, detail="Destructive queries not allowed")
# 쿼리 실행
cursor.execute(f"{request.sql} LIMIT {request.limit}")
# 결과 변환
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
return {
"columns": columns,
"data": [dict(zip(columns, row)) for row in rows],
"row_count": len(rows)
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/catalogs")
async def list_catalogs():
"""카탈로그 목록 조회"""
conn = get_trino_connection()
df = pd.read_sql("SHOW CATALOGS", conn)
return {"catalogs": df['Catalog'].tolist()}
@app.get("/api/schemas/{catalog}")
async def list_schemas(catalog: str):
"""스키마 목록 조회"""
conn = get_trino_connection()
df = pd.read_sql(f"SHOW SCHEMAS FROM {catalog}", conn)
return {"schemas": df['Schema'].tolist()}
@app.get("/api/tables/{catalog}/{schema}")
async def list_tables(catalog: str, schema: str):
"""테이블 목록 조회"""
conn = get_trino_connection()
df = pd.read_sql(f"SHOW TABLES FROM {catalog}.{schema}", conn)
return {"tables": df['Table'].tolist()}
@app.get("/api/table-info/{catalog}/{schema}/{table}")
async def get_table_info(catalog: str, schema: str, table: str):
"""테이블 상세 정보"""
conn = get_trino_connection()
# 컬럼 정보
columns_df = pd.read_sql(
f"DESCRIBE {catalog}.{schema}.{table}",
conn
)
# 스냅샷 정보 (Iceberg 전용)
try:
snapshots_df = pd.read_sql(
f"SELECT * FROM {catalog}.{schema}.\"{table}$snapshots\" ORDER BY committed_at DESC LIMIT 10",
conn
)
snapshots = snapshots_df.to_dict('records')
except:
snapshots = []
# 파티션 정보
try:
partitions_df = pd.read_sql(
f"SELECT * FROM {catalog}.{schema}.\"{table}$partitions\" LIMIT 100",
conn
)
partitions = partitions_df.to_dict('records')
except:
partitions = []
return {
"columns": columns_df.to_dict('records'),
"snapshots": snapshots,
"partitions": partitions
}
# uvicorn main:app --host 0.0.0.0 --port 8000
8.3 React Frontend
8.3.1 프로젝트 구조
frontend/
├── src/
│ ├── components/
│ │ ├── QueryEditor.jsx
│ │ ├── DataCatalog.jsx
│ │ ├── TableViewer.jsx
│ │ └── SnapshotTimeline.jsx
│ ├── services/
│ │ └── api.js
│ ├── App.jsx
│ └── main.jsx
├── package.json
└── vite.config.js
8.3.2 API 서비스
// src/services/api.js
import axios from 'axios';
const API_BASE_URL = 'http://localhost:8000/api';
const api = axios.create({
baseURL: API_BASE_URL,
timeout: 30000,
headers: {
'Content-Type': 'application/json',
},
});
export const executeQuery = async (sql, limit = 1000) => {
const response = await api.post('/query', { sql, limit });
return response.data;
};
export const getCatalogs = async () => {
const response = await api.get('/catalogs');
return response.data.catalogs;
};
export const getSchemas = async (catalog) => {
const response = await api.get(`/schemas/${catalog}`);
return response.data.schemas;
};
export const getTables = async (catalog, schema) => {
const response = await api.get(`/tables/${catalog}/${schema}`);
return response.data.tables;
};
export const getTableInfo = async (catalog, schema, table) => {
const response = await api.get(`/table-info/${catalog}/${schema}/${table}`);
return response.data;
};
8.3.3 쿼리 에디터 컴포넌트
// src/components/QueryEditor.jsx
import React, { useState } from 'react';
import { executeQuery } from '../services/api';
import { Box, Button, Textarea, Table } from '@chakra-ui/react';
export const QueryEditor = () => {
const [sql, setSql] = useState('SELECT * FROM lakehouse.sales_db.products');
const [result, setResult] = useState(null);
const [loading, setLoading] = useState(false);
const handleExecute = async () => {
setLoading(true);
try {
const data = await executeQuery(sql);
setResult(data);
} catch (error) {
alert(`Error: ${error.message}`);
} finally {
setLoading(false);
}
};
return (
<Box>
<Textarea
value={sql}
onChange={(e) => setSql(e.target.value)}
placeholder="Enter SQL query..."
size="lg"
fontFamily="monospace"
minHeight="200px"
/>
<Button
onClick={handleExecute}
isLoading={loading}
colorScheme="blue"
mt={4}
>
Execute Query
</Button>
{result && (
<Box mt={6}>
<Text>Rows: {result.row_count}</Text>
<Table variant="simple" mt={4}>
<Thead>
<Tr>
{result.columns.map(col => (
<Th key={col}>{col}</Th>
))}
</Tr>
</Thead>
<Tbody>
{result.data.map((row, idx) => (
<Tr key={idx}>
{result.columns.map(col => (
<Td key={col}>{row[col]}</Td>
))}
</Tr>
))}
</Tbody>
</Table>
</Box>
)}
</Box>
);
};
8.3.4 데이터 카탈로그 컴포넌트
// src/components/DataCatalog.jsx
import React, { useState, useEffect } from 'react';
import { getCatalogs, getSchemas, getTables, getTableInfo } from '../services/api';
import { Box, VStack, Accordion, AccordionItem, AccordionButton, AccordionPanel } from '@chakra-ui/react';
export const DataCatalog = ({ onTableSelect }) => {
const [catalogs, setCatalogs] = useState([]);
const [schemas, setSchemas] = useState({});
const [tables, setTables] = useState({});
useEffect(() => {
loadCatalogs();
}, []);
const loadCatalogs = async () => {
const data = await getCatalogs();
setCatalogs(data);
};
const loadSchemas = async (catalog) => {
if (!schemas[catalog]) {
const data = await getSchemas(catalog);
setSchemas({ ...schemas, [catalog]: data });
}
};
const loadTables = async (catalog, schema) => {
const key = `${catalog}.${schema}`;
if (!tables[key]) {
const data = await getTables(catalog, schema);
setTables({ ...tables, [key]: data });
}
};
const handleTableClick = async (catalog, schema, table) => {
const info = await getTableInfo(catalog, schema, table);
onTableSelect({ catalog, schema, table, info });
};
return (
<Box width="300px" borderRight="1px" borderColor="gray.200" p={4}>
<Accordion allowMultiple>
{catalogs.map(catalog => (
<AccordionItem key={catalog}>
<AccordionButton onClick={() => loadSchemas(catalog)}>
📦 {catalog}
</AccordionButton>
<AccordionPanel>
{schemas[catalog]?.map(schema => (
<Accordion key={schema} allowMultiple>
<AccordionItem>
<AccordionButton onClick={() => loadTables(catalog, schema)}>
📂 {schema}
</AccordionButton>
<AccordionPanel>
{tables[`${catalog}.${schema}`]?.map(table => (
<Box
key={table}
p={2}
cursor="pointer"
_hover={{ bg: 'gray.100' }}
onClick={() => handleTableClick(catalog, schema, table)}
>
📄 {table}
</Box>
))}
</AccordionPanel>
</AccordionItem>
</Accordion>
))}
</AccordionPanel>
</AccordionItem>
))}
</Accordion>
</Box>
);
};
8.3.5 스냅샷 타임라인
// src/components/SnapshotTimeline.jsx
import React from 'react';
import { Box, Text, VStack, Badge } from '@chakra-ui/react';
import { format } from 'date-fns';
export const SnapshotTimeline = ({ snapshots }) => {
return (
<VStack align="stretch" spacing={4}>
<Text fontSize="xl" fontWeight="bold">Snapshot History</Text>
{snapshots.map((snap, idx) => (
<Box
key={snap.snapshot_id}
p={4}
borderLeft="4px"
borderColor={idx === 0 ? 'green.500' : 'gray.300'}
bg={idx === 0 ? 'green.50' : 'white'}
>
<Text fontWeight="bold">
Snapshot #{snap.snapshot_id}
{idx === 0 && <Badge ml={2} colorScheme="green">CURRENT</Badge>}
</Text>
<Text fontSize="sm" color="gray.600">
{format(new Date(snap.committed_at), 'yyyy-MM-dd HH:mm:ss')}
</Text>
<Text fontSize="sm">Operation: {snap.operation}</Text>
<Text fontSize="sm">Parent: {snap.parent_id || 'None'}</Text>
</Box>
))}
</VStack>
);
};
8.4 Docker Compose에 추가
# React 개발 서버
react-ui:
build:
context: ./frontend
dockerfile: Dockerfile
ports:
- "3000:3000"
volumes:
- ./frontend:/app
- /app/node_modules
environment:
- REACT_APP_API_URL=http://localhost:8000
networks:
- datalake
# Backend API
api-server:
build:
context: ./backend
dockerfile: Dockerfile
ports:
- "8000:8000"
environment:
- TRINO_HOST=trino
- TRINO_PORT=8080
depends_on:
- trino
networks:
- datalake
9. 운영 및 모니터링 가이드
9.1 로깅 및 메트릭
9.1.1 Prometheus + Grafana 스택
# docker-compose.yml에 추가
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./configs/prometheus.yml:/etc/prometheus/prometheus.yml
networks:
- datalake
grafana:
image: grafana/grafana:latest
ports:
- "3001:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- ./grafana-data:/var/lib/grafana
networks:
- datalake
prometheus.yml:
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'minio'
static_configs:
- targets: ['minio:9000']
metrics_path: /minio/v2/metrics/cluster
- job_name: 'trino'
static_configs:
- targets: ['trino:8080']
metrics_path: /v1/info
9.1.2 주요 모니터링 지표
MinIO 메트릭:
- minio_cluster_capacity_raw_total_bytes: 전체 용량
- minio_cluster_capacity_raw_free_bytes: 여유 공간
- minio_s3_requests_total: API 요청 수
- minio_s3_errors_total: 에러 수
Trino 메트릭:
- trino.execution.QueuedQueries: 대기 중인 쿼리
- trino.execution.RunningQueries: 실행 중인 쿼리
- trino.memory.AssignedQueries: 메모리 할당량
- trino.execution.QueryWallTime: 쿼리 실행 시간
9.2 성능 최적화 체크리스트
## 데이터 레이아웃 최적화
- [ ] 파티셔닝 전략 수립 (날짜/카테고리)
- [ ] 파일 크기 최적화 (128MB ~ 512MB)
- [ ] Compaction 주기 설정 (주 1회)
- [ ] Orphan file 정리 (월 1회)
## 쿼리 최적화
- [ ] Predicate Pushdown 활용
- [ ] Partition Pruning 확인
- [ ] 통계 정보 수집 (ANALYZE TABLE)
- [ ] Dynamic Filtering 활성화
## 인프라 최적화
- [ ] SSD 스토리지 사용
- [ ] 네트워크 대역폭 확보 (10Gbps+)
- [ ] Trino Worker 메모리 증설
- [ ] MinIO Erasure Coding 설정
9.3 백업 및 복구 절차
# MinIO 데이터 백업 (mc 클라이언트)
mc mirror --preserve myminio/lakehouse backup-server/lakehouse-backup
# Iceberg 메타데이터 백업
mc cp --recursive myminio/lakehouse/warehouse/*/metadata/ \
backup-server/metadata-backup/$(date +%Y%m%d)/
# PostgreSQL Metastore 백업
docker exec postgres pg_dump -U hive metastore > metastore_backup.sql
# 복구 절차
# 1. MinIO 데이터 복원
mc mirror backup-server/lakehouse-backup myminio/lakehouse
# 2. Metastore 복원
docker exec -i postgres psql -U hive metastore < metastore_backup.sql
# 3. Iceberg 메타데이터 정합성 확인
trino --execute "SELECT * FROM lakehouse.information_schema.tables"
10. 요약 및 다음 단계
10.1 구축 완료 체크리스트
✅ **로컬 환경 (Windows 11)**
- [ ] Docker Desktop 설치 및 WSL 2 설정
- [ ] MinIO 클러스터 구성 (docker-compose)
- [ ] Trino + Hive Metastore 연동
- [ ] Spark를 통한 Iceberg 테이블 생성
- [ ] Jupyter Notebook 환경 구축
✅ **데이터 파이프라인**
- [ ] 데이터 적재 파이프라인 구현
- [ ] 타임 트래블 기능 테스트
- [ ] 스냅샷 롤백 시나리오 검증
- [ ] 스키마 진화 테스트
✅ **ML 연계**
- [ ] Feature Store (Feast) 설정
- [ ] MLflow 실험 추적
- [ ] 배치 추론 파이프라인
- [ ] 실시간 추론 API
✅ **UI 개발**
- [ ] React 프론트엔드 구축
- [ ] FastAPI 백엔드 개발
- [ ] 데이터 카탈로그 구현
- [ ] 쿼리 에디터 통합
✅ **엔터프라이즈 준비**
- [ ] LDAP/AD 인증 연동
- [ ] TLS 암호화 설정
- [ ] 감사 로그 구성
- [ ] Kubernetes 마이그레이션 계획
10.2 다음 단계 로드맵
Phase 1: 안정화 (1-2개월)
- 로컬 환경 철저한 테스트
- 성능 벤치마크 (TPC-DS 쿼리)
- 장애 시나리오 시뮬레이션
Phase 2: 프로덕션 준비 (2-3개월)
- Kubernetes 클러스터 구축
- CI/CD 파이프라인 (GitLab/Jenkins)
- 보안 강화 (Vault, IAM)
Phase 3: 엔터프라이즈 배포 (3-6개월)
- 온프레미스 하드웨어 프로비저닝
- 네트워크 격리 및 방화벽 설정
- 데이터 거버넌스 정책 수립
- 운영팀 교육
Phase 4: 확장 (6개월+)
- 멀티 리전 복제
- 실시간 스트리밍 (Kafka + Flink)
- 고급 분석 (Superset, Metabase)
10.3 참고 자료
공식 문서:
- Apache Iceberg: https://iceberg.apache.org/docs/latest/
- Trino: https://trino.io/docs/current/
- MinIO: https://min.io/docs/minio/linux/index.html
커뮤니티:
- Trino Slack: https://trino.io/slack.html
- Iceberg Slack: https://apache-iceberg.slack.com
벤치마크:
결론
이 가이드는 완전 오픈소스 기반 Data Lake 구축의 전 과정을 다룹니다:
- 로컬 개발 환경: Windows 11 + Docker로 빠른 프로토타이핑
- 프로덕션 아키텍처: Kubernetes 기반 고가용성 설계
- 엔터프라이즈 보안: 인증, 암호화, 감사 로그
- ML 통합: Feature Store + MLflow + 추론 서빙
- 사용자 경험: React 기반 직관적 UI
핵심 강점:
- ✅ 클라우드 벤더 독립적 (AWS/Azure/GCP 이관 자유)
- ✅ TCO 절감 (라이선스 비용 제로)
- ✅ 완전한 데이터 통제 (규정 준수 용이)
- ✅ 대기업/공공기관 아키텍처 표준 부합
지금 바로 docker-compose up으로 시작하세요! 🚀
'교육' 카테고리의 다른 글
| mariadb에서 datetime 유형의 0시 이후 조회 예제 (0) | 2026.01.20 |
|---|---|
| node v14.17.0 설치 URL (0) | 2026.01.19 |
| Data Lake Windows 환경에서의 현실적 권장 아키텍처 (0) | 2026.01.15 |
| 초기 로더 CSS 파일 가이드 (0) | 2026.01.15 |
| Worker Configuration Type Definition 파일 가이드 (0) | 2026.01.15 |