본문 바로가기

교육

MinIO + Apache Iceberg + Trino 기반 Data Lake 구축 종합 가이드

반응형

오픈소스 기반 Data Lake는 클라우드 벤더 종속을 피하면서도 엔터프라이즈급 기능을 제공합니다.

 

📋 목차

  1. 아키텍처 개요 및 컴포넌트 역할
  2. 데이터 흐름 상세 분석
  3. Windows 11 로컬 환경 구축 가이드
  4. 엔터프라이즈 확장 전략
  5. Iceberg 버전 관리 및 타임 트래블
  6. ML 파이프라인 연계 아키텍처
  7. Kubernetes 기반 확장 설계
  8. React UI 연동 구현
  9. 운영 및 모니터링 가이드

 

1. 아키텍처 개요 및 컴포넌트 역할

1.1 전체 아키텍처 다이어그램

┌─────────────────────────────────────────────────────────────┐
│                     Data Lake Architecture                   │
├─────────────────────────────────────────────────────────────┤
│                                                               │
│  ┌──────────────┐      ┌──────────────┐      ┌───────────┐ │
│  │   Clients    │      │   React UI   │      │  MLflow   │ │
│  │ (Python/SQL) │      │  Dashboard   │      │ Tracking  │ │
│  └──────┬───────┘      └──────┬───────┘      └─────┬─────┘ │
│         │                     │                     │        │
│         └──────────┬──────────┴─────────────────────┘        │
│                    │                                          │
│         ┌──────────▼──────────┐                              │
│         │    Trino Cluster    │ ← Query Engine              │
│         │  (Distributed SQL)  │                              │
│         └──────────┬──────────┘                              │
│                    │                                          │
│         ┌──────────▼──────────┐                              │
│         │  Apache Iceberg     │ ← Table Format              │
│         │  (Metadata Layer)   │                              │
│         │  - Snapshot Mgmt    │                              │
│         │  - Schema Evolution │                              │
│         │  - Time Travel      │                              │
│         └──────────┬──────────┘                              │
│                    │                                          │
│         ┌──────────▼──────────┐                              │
│         │      MinIO          │ ← Object Storage            │
│         │  (S3-Compatible)    │                              │
│         │  - Data Files       │                              │
│         │  - Metadata Files   │                              │
│         └─────────────────────┘                              │
│                                                               │
└─────────────────────────────────────────────────────────────┘

1.2 각 컴포넌트의 실무 역할

MinIO (Storage Layer)

핵심 역할:

  • 객체 스토리지 제공: S3 API 호환 인터페이스로 무제한 확장 가능
  • 데이터 지속성: Parquet, ORC, Avro 등 컬럼형 포맷 저장
  • 버전 관리 지원: 객체 수준 버전 관리 (Iceberg와 별개)

실무 관점:

실제 저장 구조:
s3://lakehouse/
├── warehouse/
│   ├── db1.db/
│   │   ├── table1/
│   │   │   ├── data/
│   │   │   │   ├── part-00000.parquet
│   │   │   │   ├── part-00001.parquet
│   │   │   └── metadata/
│   │   │       ├── snap-12345-1-abc.avro
│   │   │       ├── v1.metadata.json
│   │   │       └── version-hint.text

성능 특성:

  • 처리량: 단일 노드 10-20 GB/s (NVMe SSD 기준)
  • 지연시간: 첫 바이트까지 < 10ms
  • 확장성: 페타바이트급 지원

Apache Iceberg (Table Format Layer)

핵심 역할:

  • 메타데이터 관리: 스냅샷, 파티션, 스키마를 JSON/Avro로 추적
  • ACID 트랜잭션: Optimistic Concurrency Control 기반
  • 스키마 진화: 컬럼 추가/삭제/변경 without 재작성

메타데이터 구조:

{
  "format-version": 2,
  "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
  "location": "s3://lakehouse/warehouse/db1.db/sales",
  "last-sequence-number": 34,
  "last-updated-ms": 1609459200000,
  "last-column-id": 5,
  "schemas": [...],
  "current-snapshot-id": 3055729675574597004,
  "snapshots": [
    {
      "snapshot-id": 3055729675574597004,
      "timestamp-ms": 1609459200000,
      "manifest-list": "s3://lakehouse/.../snap-3055729675574597004-1-abc.avro"
    }
  ]
}

실무 장점:

  1. 히든 파티셔닝: 사용자는 파티션 의식 불필요
  2. 증분 읽기: 마지막 스냅샷 이후 변경분만 조회
  3. 시간 여행: 과거 임의 시점 데이터 조회

Trino (Query Engine)

핵심 역할:

  • 분산 SQL 실행: MPP (Massively Parallel Processing)
  • 다중 데이터 소스 연합: Iceberg + PostgreSQL + MySQL 동시 조인
  • 벡터화 실행: SIMD 명령어 활용 고속 처리

쿼리 실행 흐름:

1. SQL 파싱 → AST 생성
2. 논리 계획 최적화 (Predicate Pushdown, Projection Pruning)
3. 물리 계획 생성 (분산 실행 계획)
4. Iceberg 메타데이터 조회 → 파일 목록 획득
5. MinIO에서 Parquet 파일 스캔 (병렬)
6. 컬럼형 벡터 처리 → 결과 집계
7. 클라이언트 반환

성능 최적화:

  • Predicate Pushdown: WHERE 조건을 파일 스캔 단계에 적용
  • Partition Pruning: 불필요한 파티션 스킵
  • Dynamic Filtering: 조인 시 런타임 필터 생성

2. 데이터 흐름 상세 분석

2.1 데이터 적재 (Ingestion) 흐름

# Python에서 Iceberg 테이블에 데이터 적재
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IcebergWriter") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.2") \
    .config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.lakehouse.type", "hadoop") \
    .config("spark.sql.catalog.lakehouse.warehouse", "s3a://lakehouse/warehouse") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# 데이터 생성
df = spark.createDataFrame([
    (1, "Alice", 30, "2024-01-15"),
    (2, "Bob", 35, "2024-01-15"),
    (3, "Charlie", 28, "2024-01-16")
], ["id", "name", "age", "date"])

# Iceberg 테이블 생성 및 적재
df.writeTo("lakehouse.db1.users") \
    .using("iceberg") \
    .partitionedBy("date") \
    .createOrReplace()

내부 동작:

  1. Spark가 데이터를 Parquet 파일로 변환
  2. MinIO에 파일 업로드 (s3a://lakehouse/warehouse/db1.db/users/data/)
  3. Iceberg가 스냅샷 메타데이터 생성
  4. 메타데이터도 MinIO에 저장

2.2 데이터 조회 (Query) 흐름

-- Trino에서 실행
SELECT name, age 
FROM lakehouse.db1.users 
WHERE date = '2024-01-15' AND age > 30;

실행 계획:

Fragment 0 [SINGLE]
    Output[name, age]
    └─ Filter[age > 30]
        └─ TableScan[iceberg.lakehouse.db1.users]
            Partition Filter: date = '2024-01-15'
            Predicate: age > 30
            Files: 1 (125.5 KB)
            Rows: 2

상세 흐름:

  1. 메타데이터 조회: Trino가 MinIO에서 v1.metadata.json 읽기
  2. 파티션 프루닝: date = '2024-01-15' 파티션만 선택
  3. 파일 필터링: Parquet 파일의 통계(min/max) 확인
  4. 데이터 스캔: 선택된 파일만 읽어서 필터 적용
  5. 결과 반환: 2건의 행만 반환

2.3 업데이트/삭제 흐름 (Copy-on-Write)

-- Trino에서 UPDATE 실행
UPDATE lakehouse.db1.users 
SET age = 36 
WHERE name = 'Bob';

Iceberg의 ACID 구현:

1. 기존 스냅샷 확인 (snapshot-3055729675574597004)
2. 영향받는 파일 식별 (part-00001.parquet)
3. 해당 파일 읽어서 변경 적용 → 새 파일 생성 (part-00002.parquet)
4. 새 스냅샷 생성 (snapshot-3055729675574597005)
   - 기존 파일 제외 + 새 파일 포함
5. 메타데이터 원자적 업데이트 (v2.metadata.json 생성)

중요: 기존 파일(part-00001.parquet)은 삭제되지 않음 → 타임 트래블 가능


3. Windows 11 로컬 환경 구축 가이드

3.1 사전 요구사항

# 1. Docker Desktop 설치 (WSL 2 백엔드)
# https://www.docker.com/products/docker-desktop/

# 2. 시스템 요구사항 확인
# - RAM: 최소 16GB (권장 32GB)
# - CPU: 4코어 이상
# - 디스크: 50GB 여유 공간

# 3. WSL 2 활성화 확인
wsl --list --verbose
# 출력: * docker-desktop-data  Running  2

3.2 Docker Compose 구성

디렉토리 구조:

C:\datalake\
├── docker-compose.yml
├── configs/
│   ├── trino/
│   │   ├── catalog/
│   │   │   └── iceberg.properties
│   │   └── config.properties
│   └── minio/
│       └── policy.json
├── data/
│   ├── minio/
│   └── trino/
└── scripts/
    └── init-minio.sh

docker-compose.yml

version: '3.9'

services:
  # MinIO 객체 스토리지
  minio:
    image: minio/minio:RELEASE.2024-01-11T07-46-16Z
    container_name: minio
    ports:
      - "9000:9000"  # API
      - "9001:9001"  # Console
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin123
      MINIO_DOMAIN: minio
    command: server /data --console-address ":9001"
    volumes:
      - ./data/minio:/data
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3
    networks:
      - datalake

  # MinIO 초기화 (버킷 생성)
  minio-init:
    image: minio/mc:RELEASE.2024-01-11T05-49-32Z
    container_name: minio-init
    depends_on:
      - minio
    entrypoint: >
      /bin/sh -c "
      sleep 10;
      /usr/bin/mc alias set myminio http://minio:9000 minioadmin minioadmin123;
      /usr/bin/mc mb --ignore-existing myminio/lakehouse;
      /usr/bin/mc mb --ignore-existing myminio/iceberg;
      /usr/bin/mc anonymous set download myminio/lakehouse;
      exit 0;
      "
    networks:
      - datalake

  # Trino Coordinator
  trino:
    image: trinodb/trino:435
    container_name: trino
    ports:
      - "8080:8080"
    volumes:
      - ./configs/trino/catalog:/etc/trino/catalog
      - ./configs/trino/config.properties:/etc/trino/config.properties
    environment:
      - JAVA_OPTS=-Xmx8G
    depends_on:
      - minio
    networks:
      - datalake

  # Hive Metastore (Iceberg 카탈로그용)
  hive-metastore:
    image: apache/hive:3.1.3
    container_name: hive-metastore
    ports:
      - "9083:9083"
    environment:
      SERVICE_NAME: metastore
      DB_DRIVER: postgres
      SERVICE_OPTS: "-Djavax.jdo.option.ConnectionDriverName=org.postgresql.Driver
                     -Djavax.jdo.option.ConnectionURL=jdbc:postgresql://postgres:5432/metastore
                     -Djavax.jdo.option.ConnectionUserName=hive
                     -Djavax.jdo.option.ConnectionPassword=hivepassword"
    depends_on:
      - postgres
    networks:
      - datalake

  # PostgreSQL (Hive Metastore용)
  postgres:
    image: postgres:15-alpine
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hivepassword
      POSTGRES_DB: metastore
    volumes:
      - ./data/postgres:/var/lib/postgresql/data
    networks:
      - datalake

  # Spark (데이터 적재용)
  spark:
    image: bitnami/spark:3.5.0
    container_name: spark
    ports:
      - "8888:8888"  # Jupyter
      - "4040:4040"  # Spark UI
    environment:
      - SPARK_MODE=master
      - SPARK_MASTER_HOST=spark
      - SPARK_MASTER_PORT=7077
    volumes:
      - ./notebooks:/opt/notebooks
      - ./configs/spark:/opt/bitnami/spark/conf
    command: >
      bash -c "pip install jupyterlab pyspark pandas pyarrow &&
               jupyter lab --ip=0.0.0.0 --port=8888 --no-browser --allow-root 
               --NotebookApp.token='' --NotebookApp.password=''"
    networks:
      - datalake

networks:
  datalake:
    driver: bridge

3.3 Trino 설정 파일

configs/trino/config.properties

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
discovery.uri=http://localhost:8080

# 메모리 설정
query.max-memory=5GB
query.max-memory-per-node=2GB
query.max-total-memory-per-node=3GB

# 동시 실행
task.concurrency=4
task.max-worker-threads=8

configs/trino/catalog/iceberg.properties

connector.name=iceberg
iceberg.catalog.type=hive_metastore
hive.metastore.uri=thrift://hive-metastore:9083

# MinIO 연결 설정
fs.native-s3.enabled=true
s3.endpoint=http://minio:9000
s3.path-style-access=true
s3.aws-access-key=minioadmin
s3.aws-secret-key=minioadmin123

# Iceberg 설정
iceberg.file-format=PARQUET
iceberg.compression-codec=SNAPPY

3.4 환경 구축 실행

# 1. 프로젝트 디렉토리 생성
cd C:\
mkdir datalake
cd datalake

# 2. 설정 파일 생성 (위 내용 복사)
mkdir -p configs\trino\catalog
mkdir -p data\minio
mkdir -p notebooks

# 3. Docker Compose 실행
docker-compose up -d

# 4. 로그 확인
docker-compose logs -f

# 5. 서비스 상태 확인
docker-compose ps

접속 확인:


3.5 초기 데이터 적재 테스트

Jupyter Notebook에서 실행:

from pyspark.sql import SparkSession

# Spark 세션 생성
spark = SparkSession.builder \
    .appName("IcebergTest") \
    .config("spark.jars.packages", 
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2,"
            "org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.lakehouse.type", "hive") \
    .config("spark.sql.catalog.lakehouse.uri", "thrift://hive-metastore:9083") \
    .config("spark.sql.catalog.lakehouse.warehouse", "s3a://lakehouse/warehouse") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin123") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# 테스트 데이터 생성
data = [
    (1, "Product A", 100, "Electronics", "2024-01-15"),
    (2, "Product B", 200, "Clothing", "2024-01-15"),
    (3, "Product C", 150, "Electronics", "2024-01-16"),
    (4, "Product D", 300, "Home", "2024-01-16"),
]

df = spark.createDataFrame(data, 
    ["id", "product_name", "price", "category", "sale_date"])

# Iceberg 테이블 생성
spark.sql("CREATE DATABASE IF NOT EXISTS lakehouse.sales_db")

df.writeTo("lakehouse.sales_db.products") \
    .using("iceberg") \
    .partitionedBy("sale_date") \
    .tableProperty("write.format.default", "parquet") \
    .tableProperty("write.metadata.compression-codec", "gzip") \
    .createOrReplace()

print("✅ Iceberg 테이블 생성 완료!")

# 데이터 확인
spark.sql("SELECT * FROM lakehouse.sales_db.products").show()

4. 엔터프라이즈 확장 전략

4.1 보안 강화 요소

4.1.1 인증 및 인가

현재 로컬 환경:

MinIO: 기본 계정 (minioadmin)
Trino: 인증 없음

엔터프라이즈 요구사항:

계층 로컬 환경 엔터프라이즈 환경

사용자 인증 없음 LDAP/Active Directory 연동
SSO 미지원 OAuth 2.0 / SAML 2.0
API 인증 기본키 IAM Role / JWT Token
네트워크 HTTP TLS 1.3 암호화
감사 로그 없음 모든 쿼리/접근 기록

Trino 엔터프라이즈 인증 설정:

# configs/trino/config.properties
http-server.authentication.type=PASSWORD
http-server.https.enabled=true
http-server.https.port=8443
http-server.https.keystore.path=/etc/trino/keystore.jks
http-server.https.keystore.key=changeit

# LDAP 인증
http-server.authentication.password-authenticator=ldap
authentication.ldap.url=ldaps://ldap.company.com:636
authentication.ldap.user-bind-pattern=uid=${USER},ou=users,dc=company,dc=com

MinIO IAM 정책 예제:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": "arn:aws:s3:::lakehouse/warehouse/sales_db/*",
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": "10.0.0.0/8"
        }
      }
    },
    {
      "Effect": "Deny",
      "Action": "s3:DeleteObject",
      "Resource": "*"
    }
  ]
}

4.1.2 데이터 암호화

암호화 계층:

┌─────────────────────────────────────┐
│  1. 전송 중 암호화 (TLS 1.3)       │
│     - 클라이언트 ↔ Trino           │
│     - Trino ↔ MinIO                │
├─────────────────────────────────────┤
│  2. 저장 암호화 (Encryption at Rest)│
│     - MinIO SSE-S3 (AES-256)       │
│     - 또는 SSE-KMS (키 관리 분리)  │
├─────────────────────────────────────┤
│  3. 컬럼 수준 암호화                │
│     - PII 데이터 (주민번호, 카드번호)│
│     - Parquet 파일 내 암호화 컬럼  │
└─────────────────────────────────────┘

MinIO 서버 측 암호화 설정:

# KMS 기반 암호화 활성화
mc admin config set myminio sse-kms \
    endpoint="https://kms.company.com" \
    key_id="arn:aws:kms:region:account:key/key-id"

# 버킷 기본 암호화 설정
mc encrypt set sse-s3 myminio/lakehouse

4.1.3 네트워크 격리

DMZ 기반 3계층 아키텍처:

┌─────────────────────────────────────────────┐
│           Web/Application Tier              │
│  (React UI, API Gateway)                    │
│  VLAN: 10.10.10.0/24                        │
└──────────────┬──────────────────────────────┘
               │ Firewall Rules
               │ - Port 8080 (Trino) only
┌──────────────▼──────────────────────────────┐
│           Query/Processing Tier             │
│  (Trino, Spark)                             │
│  VLAN: 10.10.20.0/24                        │
└──────────────┬──────────────────────────────┘
               │ Firewall Rules
               │ - Port 9000 (MinIO) only
┌──────────────▼──────────────────────────────┐
│           Storage Tier                      │
│  (MinIO Cluster)                            │
│  VLAN: 10.10.30.0/24                        │
│  - 외부 접근 완전 차단                      │
└─────────────────────────────────────────────┘

4.2 거버넌스 구현

4.2.1 데이터 카탈로그 (Apache Atlas 연동)

아키텍처:

# docker-compose에 추가
atlas:
  image: sburn/apache-atlas:2.3.0
  ports:
    - "21000:21000"
  environment:
    - ATLAS_OPTS=-Xmx2048m
  volumes:
    - ./configs/atlas:/apache-atlas-2.3.0/conf

# Trino-Atlas 연동
trino-atlas-connector:
  image: custom/trino-atlas-hook
  environment:
    ATLAS_ENDPOINT: http://atlas:21000
    ATLAS_USERNAME: admin
    ATLAS_PASSWORD: admin

메타데이터 자동 등록:

# Iceberg 테이블 생성 시 Atlas에 자동 등록
from pyatlas import AtlasClient

atlas = AtlasClient("http://atlas:21000", "admin", "admin")

# 테이블 엔티티 생성
table_entity = {
    "typeName": "iceberg_table",
    "attributes": {
        "name": "sales_db.products",
        "qualifiedName": "lakehouse.sales_db.products@prod",
        "owner": "data-team",
        "description": "제품 판매 데이터",
        "columns": [
            {"name": "id", "type": "int", "isPII": False},
            {"name": "product_name", "type": "string", "isPII": False},
            {"name": "price", "type": "int", "isPII": False}
        ],
        "classifications": ["CONFIDENTIAL", "SALES_DATA"]
    }
}

atlas.create_entity(table_entity)

4.2.2 데이터 품질 관리 (Great Expectations)

import great_expectations as gx

# 데이터 품질 검증
context = gx.get_context()

# Expectation Suite 정의
suite = context.create_expectation_suite("sales_validation")

# 검증 규칙
validator = context.get_validator(
    batch_request={
        "datasource_name": "iceberg_datasource",
        "data_connector_name": "default",
        "data_asset_name": "lakehouse.sales_db.products"
    },
    expectation_suite_name="sales_validation"
)

# 규칙 추가
validator.expect_column_values_to_not_be_null("id")
validator.expect_column_values_to_be_unique("id")
validator.expect_column_values_to_be_between("price", min_value=0, max_value=10000)
validator.expect_column_values_to_be_in_set("category", 
    ["Electronics", "Clothing", "Home"])

# 검증 실행
results = validator.validate()
print(results)

4.2.3 접근 제어 (Row-Level Security)

Trino Row Filter 설정:

-- 부서별 데이터 접근 제한
CREATE OR REPLACE VIEW lakehouse.sales_db.products_secure AS
SELECT *
FROM lakehouse.sales_db.products
WHERE 
  CASE 
    WHEN current_user() = 'finance_team' THEN category IN ('Electronics', 'Home')
    WHEN current_user() = 'sales_team' THEN TRUE
    ELSE category = 'Clothing'
  END;

-- 권한 부여
GRANT SELECT ON lakehouse.sales_db.products_secure TO ROLE analyst;

4.3 고가용성 (HA) 및 이중화

4.3.1 MinIO 클러스터 구성

Erasure Coding 기반 분산 배포:

# 4노드 분산 MinIO (8드라이브)
version: '3.9'

services:
  minio1:
    image: minio/minio
    command: server http://minio{1...4}/data{1...2}
    environment:
      MINIO_ROOT_USER: admin
      MINIO_ROOT_PASSWORD: password
    volumes:
      - /mnt/disk1:/data1
      - /mnt/disk2:/data2

  minio2:
    image: minio/minio
    command: server http://minio{1...4}/data{1...2}
    volumes:
      - /mnt/disk3:/data1
      - /mnt/disk4:/data2

  minio3:
    image: minio/minio
    command: server http://minio{1...4}/data{1...2}
    volumes:
      - /mnt/disk5:/data1
      - /mnt/disk6:/data2

  minio4:
    image: minio/minio
    command: server http://minio{1...4}/data{1...2}
    volumes:
      - /mnt/disk7:/data1
      - /mnt/disk8:/data2

  # Load Balancer
  nginx:
    image: nginx:alpine
    ports:
      - "9000:9000"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf

nginx.conf (MinIO 로드 밸런싱):

upstream minio_backend {
    least_conn;
    server minio1:9000;
    server minio2:9000;
    server minio3:9000;
    server minio4:9000;
}

server {
    listen 9000;
    location / {
        proxy_pass http://minio_backend;
        proxy_set_header Host $host;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        chunked_transfer_encoding off;
    }
}

4.3.2 Trino 클러스터 확장

Coordinator + Worker 분리:

# Trino Coordinator (1대)
trino-coordinator:
  image: trinodb/trino:435
  ports:
    - "8080:8080"
  volumes:
    - ./coordinator-config.properties:/etc/trino/config.properties
  environment:
    - COORDINATOR=true
    - WORKERS=trino-worker-1:8080,trino-worker-2:8080,trino-worker-3:8080

# Trino Workers (3대)
trino-worker-1:
  image: trinodb/trino:435
  volumes:
    - ./worker-config.properties:/etc/trino/config.properties
  environment:
    - COORDINATOR=false
    - DISCOVERY_URI=http://trino-coordinator:8080

trino-worker-2:
  image: trinodb/trino:435
  # ... 동일 설정

trino-worker-3:
  image: trinodb/trino:435
  # ... 동일 설정

Autoscaling 정책 (Kubernetes 환경):

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: trino-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: trino-worker
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80

4.4 재해 복구 (DR)

백업 전략:

# MinIO 원격 복제 설정
mc admin replicate add myminio \
  --remote-bucket lakehouse-dr \
  --remote-endpoint https://minio-dr.company.com \
  --remote-access-key  \
  --remote-secret-key  \
  --replicate-deletes

# Iceberg 메타데이터 백업 (S3 Lifecycle)
mc ilm add myminio/lakehouse/warehouse --expiry-days 90 \
  --transition-days 30 --storage-class GLACIER

RPO/RTO 목표:

구분 로컬 환경 엔터프라이즈 환경

RPO N/A < 1시간 (증분 복제)
RTO N/A < 4시간 (자동 페일오버)
백업 주기 수동 실시간 + 일 1회 전체

5. Apache Iceberg 버전 관리 및 타임 트래블

5.1 스냅샷 아키텍처

Iceberg 스냅샷 구조:

Timeline: ──────────────────────────────────────────►
          t1        t2        t3        t4
          │         │         │         │
Snapshot: S1 ────► S2 ────► S3 ────► S4 (current)
          │         │         │         │
Files:    F1        F1,F2     F2,F3     F3,F4,F5
          │         │         │         │
Metadata: v1.json   v2.json   v3.json   v4.json

각 스냅샷의 구성 요소:

  • Manifest List: 변경된 파티션 목록
  • Manifest Files: 실제 데이터 파일 목록
  • Data Files: Parquet/ORC 파일

5.2 타임 트래블 실전 예제

5.2.1 기본 타임 트래블

-- 현재 데이터 조회
SELECT * FROM lakehouse.sales_db.products;
-- 결과: 4건 (최신 상태)

-- 특정 시점으로 조회 (스냅샷 ID)
SELECT * FROM lakehouse.sales_db.products 
FOR VERSION AS OF 3055729675574597004;
-- 결과: 2건 (과거 스냅샷)

-- 특정 타임스탬프로 조회
SELECT * FROM lakehouse.sales_db.products 
FOR TIMESTAMP AS OF TIMESTAMP '2024-01-15 10:30:00';
-- 결과: 해당 시점의 데이터

-- 스냅샷 히스토리 조회
SELECT * FROM lakehouse.sales_db."products$snapshots"
ORDER BY committed_at DESC;

출력 예시:

committed_at snapshot_id parent_id operation manifest_list

2024-01-16 14:30:00 3055729675574597005 3055729675574597004 append s3://…snap-005.avro
2024-01-15 10:30:00 3055729675574597004 NULL append s3://…snap-004.avro

5.2.2 증분 읽기 (Incremental Read)

-- 특정 스냅샷 이후 변경분만 조회
SELECT * FROM lakehouse.sales_db.products 
FOR VERSION AS OF 3055729675574597005
WHERE iceberg.snapshot_id > 3055729675574597004;

-- 시간 범위 기반 증분 조회
SELECT * FROM lakehouse.sales_db.products 
FOR TIMESTAMP AS OF TIMESTAMP '2024-01-16 00:00:00'
WHERE sale_date BETWEEN '2024-01-15' AND '2024-01-16';

Python에서 증분 읽기:

# PySpark를 통한 증분 읽기
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 마지막 처리 스냅샷 ID 저장
last_snapshot = 3055729675574597004

# 증분 데이터 읽기
df_incremental = spark.read \
    .format("iceberg") \
    .option("start-snapshot-id", last_snapshot) \
    .option("end-snapshot-id", "current") \
    .load("lakehouse.sales_db.products")

df_incremental.show()

# 새 스냅샷 ID 저장 (다음 실행용)
new_snapshot = spark.sql("""
    SELECT snapshot_id 
    FROM lakehouse.sales_db.products.snapshots 
    ORDER BY committed_at DESC LIMIT 1
""").collect()[0][0]

print(f"Next start snapshot: {new_snapshot}")

5.3 스냅샷 롤백

5.3.1 특정 스냅샷으로 롤백

-- 롤백 전 현재 상태 확인
SELECT COUNT(*) FROM lakehouse.sales_db.products;
-- 결과: 4건

-- 스냅샷 히스토리 확인
SELECT snapshot_id, committed_at, operation 
FROM lakehouse.sales_db."products$snapshots"
ORDER BY committed_at DESC;

-- 특정 스냅샷으로 롤백
CALL lakehouse.system.rollback_to_snapshot(
    'sales_db.products', 
    3055729675574597004
);

-- 롤백 후 데이터 확인
SELECT COUNT(*) FROM lakehouse.sales_db.products;
-- 결과: 2건 (롤백됨)

5.3.2 타임스탬프 기반 롤백

-- 특정 시간으로 롤백
CALL lakehouse.system.rollback_to_timestamp(
    'sales_db.products',
    TIMESTAMP '2024-01-15 10:30:00'
);

5.3.3 롤백 안전장치 (태그 활용)

-- 중요 시점에 태그 생성
CALL lakehouse.system.create_tag(
    'sales_db.products',
    'v1.0-production',
    3055729675574597004
);

-- 태그로 롤백 (더 안전)
CALL lakehouse.system.rollback_to_tag(
    'sales_db.products',
    'v1.0-production'
);

-- 태그 목록 조회
SELECT * FROM lakehouse.sales_db."products$tags";

5.4 스키마 진화 (Schema Evolution)

-- 컬럼 추가 (기존 데이터에 NULL 값)
ALTER TABLE lakehouse.sales_db.products 
ADD COLUMN discount_rate DOUBLE;

-- 컬럼 이름 변경
ALTER TABLE lakehouse.sales_db.products 
RENAME COLUMN product_name TO name;

-- 컬럼 타입 변경 (호환 가능한 타입만)
ALTER TABLE lakehouse.sales_db.products 
ALTER COLUMN price TYPE BIGINT;

-- 컬럼 삭제 (논리적 삭제, 데이터는 유지)
ALTER TABLE lakehouse.sales_db.products 
DROP COLUMN discount_rate;

-- 스키마 히스토리 조회
SELECT * FROM lakehouse.sales_db."products$schemas";

스키마 버전 관리:

schema_id committed_at fields

0 2024-01-15 10:00:00 id, product_name, price
1 2024-01-16 11:00:00 id, name, price (renamed)
2 2024-01-16 14:00:00 id, name, price, discount_rate

5.5 파티션 진화 (Partition Evolution)

-- 초기 파티션 (일별)
CREATE TABLE lakehouse.sales_db.orders (
    order_id BIGINT,
    customer_id BIGINT,
    order_date DATE,
    amount DECIMAL(10,2)
)
USING iceberg
PARTITIONED BY (order_date);

-- 데이터가 쌓인 후 파티션 변경 (월별로 전환)
ALTER TABLE lakehouse.sales_db.orders
DROP PARTITION FIELD order_date;

ALTER TABLE lakehouse.sales_db.orders
ADD PARTITION FIELD months(order_date);

-- 기존 데이터는 일별 파티션 유지, 새 데이터는 월별 파티션
-- Iceberg가 자동으로 처리!

5.6 메타데이터 최적화

-- 작은 파일 병합 (Compaction)
CALL lakehouse.system.rewrite_data_files(
    table => 'sales_db.products',
    options => map(
        'target-file-size-bytes', '536870912',  -- 512MB
        'min-input-files', '5'
    )
);

-- 오래된 스냅샷 정리 (7일 이전)
CALL lakehouse.system.expire_snapshots(
    table => 'sales_db.products',
    older_than => TIMESTAMP '2024-01-08 00:00:00',
    retain_last => 10  -- 최소 10개 스냅샷 유지
);

-- 고아 파일 정리 (참조되지 않는 파일)
CALL lakehouse.system.remove_orphan_files(
    table => 'sales_db.products',
    older_than => TIMESTAMP '2024-01-01 00:00:00'
);

-- 메타데이터 파일 압축
CALL lakehouse.system.rewrite_manifests('sales_db.products');

6. ML 파이프라인 연계 아키텍처

6.1 전체 아키텍처

┌──────────────────────────────────────────────────────────┐
│                    ML Lifecycle                          │
├──────────────────────────────────────────────────────────┤
│                                                           │
│  ┌─────────────┐      ┌──────────────┐      ┌─────────┐│
│  │ Data Lake   │─────►│Feature Store │─────►│Training ││
│  │ (Iceberg)   │      │  (Feast)     │      │ (Spark) ││
│  └─────────────┘      └──────────────┘      └────┬────┘│
│         │                                         │      │
│         │ Trino Query                             │      │
│         │                                    ┌────▼────┐│
│  ┌──────▼──────┐                            │ MLflow  ││
│  │   EDA       │                            │Tracking ││
│  │ (Jupyter)   │                            └────┬────┘│
│  └─────────────┘                                 │      │
│                                              ┌────▼────┐│
│                                              │  Model  ││
│                                              │Registry ││
│                                              └────┬────┘│
│                                                   │      │
│                                              ┌────▼────┐│
│                                              │Inference││
│                                              │ Serving ││
│                                              └─────────┘│
└──────────────────────────────────────────────────────────┘

6.2 데이터 추출 및 특징 엔지니어링

6.2.1 Trino를 통한 데이터 추출

from trino.dbapi import connect
import pandas as pd

# Trino 연결
conn = connect(
    host='localhost',
    port=8080,
    user='datascientist',
    catalog='lakehouse',
    schema='sales_db'
)

# 학습 데이터 쿼리
query = """
SELECT 
    p.product_id,
    p.category,
    p.price,
    p.brand,
    COUNT(o.order_id) as order_count,
    SUM(o.quantity) as total_quantity,
    AVG(r.rating) as avg_rating,
    DATE_DIFF('day', p.created_at, CURRENT_DATE) as product_age_days
FROM products p
LEFT JOIN orders o ON p.product_id = o.product_id
LEFT JOIN reviews r ON p.product_id = r.product_id
WHERE o.order_date >= DATE '2024-01-01'
GROUP BY p.product_id, p.category, p.price, p.brand, p.created_at
"""

df = pd.read_sql(query, conn)
print(f"Extracted {len(df)} rows")

6.2.2 Spark를 통한 대규모 특징 추출

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("FeatureEngineering") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2") \
    .getOrCreate()

# Iceberg 테이블 직접 읽기
df = spark.read \
    .format("iceberg") \
    .load("lakehouse.sales_db.orders")

# 특징 생성
features = df.groupBy("customer_id").agg(
    count("order_id").alias("total_orders"),
    sum("amount").alias("total_spending"),
    avg("amount").alias("avg_order_value"),
    max("order_date").alias("last_order_date"),
    min("order_date").alias("first_order_date")
).withColumn(
    "customer_lifetime_days",
    datediff(col("last_order_date"), col("first_order_date"))
).withColumn(
    "avg_days_between_orders",
    col("customer_lifetime_days") / col("total_orders")
)

# Feature Store에 저장 (새 Iceberg 테이블)
features.writeTo("lakehouse.ml_features.customer_features") \
    .using("iceberg") \
    .createOrReplace()

6.3 Feature Store 연동 (Feast)

6.3.1 Feast 설정

# feature_repo/feature_store.yaml
project: sales_ml
registry: s3://lakehouse/feast/registry.db
provider: local
online_store:
    type: redis
    connection_string: "localhost:6379"
offline_store:
    type: file  # 또는 spark

Feature 정의:

# feature_repo/features.py
from feast import Entity, Feature, FeatureView, ValueType
from feast.data_source import FileSource

# Entity 정의
customer = Entity(
    name="customer_id",
    value_type=ValueType.INT64,
    description="고객 ID"
)

# Data Source (Iceberg 테이블 참조)
customer_source = FileSource(
    path="s3://lakehouse/warehouse/ml_features.db/customer_features",
    event_timestamp_column="last_order_date",
    created_timestamp_column="feature_created_at"
)

# Feature View
customer_features_view = FeatureView(
    name="customer_features",
    entities=["customer_id"],
    ttl=timedelta(days=30),
    features=[
        Feature(name="total_orders", dtype=ValueType.INT64),
        Feature(name="total_spending", dtype=ValueType.DOUBLE),
        Feature(name="avg_order_value", dtype=ValueType.DOUBLE),
    ],
    online=True,
    batch_source=customer_source,
)

Feature Store 배포:

cd feature_repo
feast apply  # Registry에 등록
feast materialize-incremental $(date +%Y-%m-%d)  # Online Store 동기화

6.4 MLflow 기반 실험 관리

6.4.1 MLflow 서버 설정

# docker-compose.yml에 추가
mlflow:
  image: python:3.10-slim
  container_name: mlflow
  ports:
    - "5000:5000"
  volumes:
    - ./mlruns:/mlruns
  command: >
    bash -c "pip install mlflow boto3 &&
             mlflow server 
             --backend-store-uri sqlite:///mlruns/mlflow.db
             --default-artifact-root s3://lakehouse/mlflow-artifacts
             --host 0.0.0.0"
  environment:
    AWS_ACCESS_KEY_ID: minioadmin
    AWS_SECRET_ACCESS_KEY: minioadmin123
    MLFLOW_S3_ENDPOINT_URL: http://minio:9000
  networks:
    - datalake

6.4.2 학습 파이프라인

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# MLflow 설정
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("customer_churn_prediction")

# Iceberg에서 데이터 로드 (Trino 경유)
from trino.dbapi import connect
import pandas as pd

conn = connect(host='localhost', port=8080, catalog='lakehouse', schema='ml_features')
df = pd.read_sql("SELECT * FROM customer_features", conn)

# 레이블 생성 (예: 이탈 고객 여부)
df['is_churned'] = (df['avg_days_between_orders'] > 90).astype(int)

X = df[['total_orders', 'total_spending', 'avg_order_value']]
y = df['is_churned']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# MLflow Run 시작
with mlflow.start_run(run_name="rf_v1"):
    # 하이퍼파라미터 로깅
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5
    }
    mlflow.log_params(params)
    
    # 모델 학습
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    
    # 예측 및 평가
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    
    # 메트릭 로깅
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)
    
    # 모델 저장 (S3/MinIO)
    mlflow.sklearn.log_model(model, "model")
    
    # 데이터셋 버전 정보 로깅
    snapshot_id = pd.read_sql("""
        SELECT snapshot_id 
        FROM lakehouse.ml_features.customer_features.snapshots 
        ORDER BY committed_at DESC LIMIT 1
    """, conn).iloc[0, 0]
    
    mlflow.log_param("iceberg_snapshot_id", snapshot_id)
    
    print(f"✅ Model logged with accuracy: {accuracy:.3f}")

6.5 모델 서빙 아키텍처

6.5.1 Batch Inference (Spark)

import mlflow.pyfunc
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# MLflow에서 모델 로드
model_uri = "models:/customer_churn_model/Production"
model = mlflow.pyfunc.spark_udf(spark, model_uri, result_type="double")

# 추론 대상 데이터 (Iceberg)
df = spark.read \
    .format("iceberg") \
    .load("lakehouse.ml_features.customer_features")

# 배치 예측
predictions = df.withColumn(
    "churn_probability",
    model(struct("total_orders", "total_spending", "avg_order_value"))
)

# 결과 저장 (새 Iceberg 테이블)
predictions.writeTo("lakehouse.ml_results.churn_predictions") \
    .using("iceberg") \
    .partitionedBy("prediction_date") \
    .createOrReplace()

6.5.2 Real-time Inference (FastAPI)

from fastapi import FastAPI
from pydantic import BaseModel
import mlflow.pyfunc
import pandas as pd
from feast import FeatureStore

app = FastAPI()

# 모델 로드 (시작 시 1회)
model = mlflow.pyfunc.load_model("models:/customer_churn_model/Production")
feast_store = FeatureStore(repo_path="./feature_repo")

class PredictionRequest(BaseModel):
    customer_id: int

@app.post("/predict")
def predict_churn(request: PredictionRequest):
    # Feature Store에서 실시간 특징 조회
    features = feast_store.get_online_features(
        features=[
            "customer_features:total_orders",
            "customer_features:total_spending",
            "customer_features:avg_order_value"
        ],
        entity_rows=[{"customer_id": request.customer_id}]
    ).to_df()
    
    # 예측
    prediction = model.predict(features)
    
    return {
        "customer_id": request.customer_id,
        "churn_probability": float(prediction[0]),
        "risk_level": "High" if prediction[0] > 0.7 else "Low"
    }

# 실행: uvicorn main:app --host 0.0.0.0 --port 8000

6.6 운영 시 주의점

6.6.1 데이터 버전 추적

# 학습 데이터의 Iceberg 스냅샷 ID를 반드시 기록
with mlflow.start_run():
    # 데이터 버전 정보
    snapshot_info = spark.sql("""
        SELECT snapshot_id, committed_at 
        FROM lakehouse.ml_features.customer_features.snapshots 
        ORDER BY committed_at DESC LIMIT 1
    """).collect()[0]
    
    mlflow.log_param("data_snapshot_id", snapshot_info.snapshot_id)
    mlflow.log_param("data_snapshot_time", snapshot_info.committed_at)
    
    # 나중에 재현 가능
    # df = spark.read.format("iceberg")
    #     .option("snapshot-id", snapshot_info.snapshot_id)
    #     .load("lakehouse.ml_features.customer_features")

6.6.2 특징 드리프트 모니터링

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

# 학습 데이터 (과거 스냅샷)
reference_df = spark.read \
    .format("iceberg") \
    .option("snapshot-id", 3055729675574597004) \
    .load("lakehouse.ml_features.customer_features") \
    .toPandas()

# 현재 데이터
current_df = spark.read \
    .format("iceberg") \
    .load("lakehouse.ml_features.customer_features") \
    .toPandas()

# 드리프트 분석
report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=reference_df, current_data=current_df)

# S3에 리포트 저장
report.save_html("s3://lakehouse/ml-monitoring/drift_report.html")

6.6.3 모델 재학습 트리거

# Airflow DAG 예제
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def check_data_freshness():
    """새 데이터 확인"""
    latest_snapshot_time = spark.sql("""
        SELECT MAX(committed_at) 
        FROM lakehouse.ml_features.customer_features.snapshots
    """).collect()[0][0]
    
    if (datetime.now() - latest_snapshot_time).days > 7:
        raise Exception("Data is stale! Trigger retraining.")

def retrain_model():
    """모델 재학습 (위 코드 재사용)"""
    # ... 학습 로직

dag = DAG(
    'ml_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@weekly',
    catchup=False
)

task_check = PythonOperator(
    task_id='check_data',
    python_callable=check_data_freshness,
    dag=dag
)

task_train = PythonOperator(
    task_id='retrain_model',
    python_callable=retrain_model,
    dag=dag
)

task_check >> task_train

7. Kubernetes 기반 확장 설계

7.1 Helm Chart 구조

datalake-helm/
├── Chart.yaml
├── values.yaml
├── templates/
│   ├── minio/
│   │   ├── statefulset.yaml
│   │   ├── service.yaml
│   │   └── pvc.yaml
│   ├── trino/
│   │   ├── coordinator-deployment.yaml
│   │   ├── worker-deployment.yaml
│   │   └── service.yaml
│   ├── hive-metastore/
│   │   └── deployment.yaml
│   └── ingress.yaml
└── values/
    ├── dev.yaml
    ├── staging.yaml
    └── production.yaml

7.2 MinIO StatefulSet

# templates/minio/statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: minio
spec:
  serviceName: minio-headless
  replicas: 4  # 4노드 분산 배포
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      containers:
      - name: minio
        image: minio/minio:RELEASE.2024-01-11T07-46-16Z
        args:
        - server
        - http://minio-{0...3}.minio-headless.default.svc.cluster.local/data
        env:
        - name: MINIO_ROOT_USER
          valueFrom:
            secretKeyRef:
              name: minio-secret
              key: root-user
        - name: MINIO_ROOT_PASSWORD
          valueFrom:
            secretKeyRef:
              name: minio-secret
              key: root-password
        ports:
        - containerPort: 9000
          name: api
        - containerPort: 9001
          name: console
        volumeMounts:
        - name: data
          mountPath: /data
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
          limits:
            memory: "8Gi"
            cpu: "4"
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: fast-ssd
      resources:
        requests:
          storage: 500Gi

7.3 Trino 분산 배포

Coordinator

# templates/trino/coordinator-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trino-coordinator
spec:
  replicas: 1  # Coordinator는 단일 인스턴스
  selector:
    matchLabels:
      app: trino
      component: coordinator
  template:
    metadata:
      labels:
        app: trino
        component: coordinator
    spec:
      containers:
      - name: trino
        image: trinodb/trino:435
        ports:
        - containerPort: 8080
          name: http
        env:
        - name: COORDINATOR
          value: "true"
        - name: WORKERS
          value: "trino-worker:8080"
        volumeMounts:
        - name: config
          mountPath: /etc/trino
        - name: catalog
          mountPath: /etc/trino/catalog
        resources:
          requests:
            memory: "8Gi"
            cpu: "4"
          limits:
            memory: "16Gi"
            cpu: "8"
      volumes:
      - name: config
        configMap:
          name: trino-config
      - name: catalog
        configMap:
          name: trino-catalog

Worker

# templates/trino/worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trino-worker
spec:
  replicas: 5  # Worker는 수평 확장
  selector:
    matchLabels:
      app: trino
      component: worker
  template:
    metadata:
      labels:
        app: trino
        component: worker
    spec:
      containers:
      - name: trino
        image: trinodb/trino:435
        env:
        - name: COORDINATOR
          value: "false"
        - name: DISCOVERY_URI
          value: "http://trino-coordinator:8080"
        volumeMounts:
        - name: config
          mountPath: /etc/trino
        resources:
          requests:
            memory: "16Gi"
            cpu: "8"
          limits:
            memory: "32Gi"
            cpu: "16"
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: trino-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: trino-worker
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70

7.4 Ingress 및 TLS 설정

# templates/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: datalake-ingress
  annotations:
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
spec:
  tls:
  - hosts:
    - trino.company.com
    - minio.company.com
    secretName: datalake-tls
  rules:
  - host: trino.company.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: trino-coordinator
            port:
              number: 8080
  - host: minio.company.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: minio
            port:
              number: 9001

7.5 배포 및 관리

# Helm Chart 설치
helm install datalake ./datalake-helm \
  -f values/production.yaml \
  --namespace datalake \
  --create-namespace

# 업그레이드
helm upgrade datalake ./datalake-helm \
  -f values/production.yaml \
  --namespace datalake

# Worker 수 조정
kubectl scale deployment trino-worker --replicas=10 -n datalake

# 롤링 업데이트 (무중단)
kubectl set image deployment/trino-worker \
  trino=trinodb/trino:436 \
  -n datalake

# 모니터링
kubectl logs -f deployment/trino-coordinator -n datalake
kubectl top pods -n datalake

8. React UI 연동 구현

8.1 아키텍처

┌─────────────────────────────────────────┐
│         React Frontend (Port 3000)      │
│  - Data Catalog                         │
│  - Query Editor                         │
│  - Dashboard                            │
└──────────────┬──────────────────────────┘
               │ REST API
┌──────────────▼──────────────────────────┐
│      Backend API (FastAPI/Flask)        │
│  - Trino Connection Pool                │
│  - Query Optimization                   │
│  - Authentication                       │
└──────────────┬──────────────────────────┘
               │ JDBC/HTTP
┌──────────────▼──────────────────────────┐
│          Trino Coordinator              │
└─────────────────────────────────────────┘

8.2 Backend API (FastAPI)

# backend/main.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from trino.dbapi import connect
from trino.auth import BasicAuthentication
import pandas as pd

app = FastAPI(title="Data Lake API")

# CORS 설정
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Trino 연결 풀
def get_trino_connection():
    return connect(
        host='trino-coordinator',
        port=8080,
        user='api_user',
        catalog='lakehouse',
        schema='sales_db',
        auth=BasicAuthentication("api_user", "password")
    )

class QueryRequest(BaseModel):
    sql: str
    limit: int = 1000

@app.post("/api/query")
async def execute_query(request: QueryRequest):
    """SQL 쿼리 실행"""
    try:
        conn = get_trino_connection()
        cursor = conn.cursor()
        
        # SQL Injection 방지 (간단한 검증)
        if any(keyword in request.sql.upper() for keyword in ['DROP', 'DELETE', 'TRUNCATE']):
            raise HTTPException(status_code=403, detail="Destructive queries not allowed")
        
        # 쿼리 실행
        cursor.execute(f"{request.sql} LIMIT {request.limit}")
        
        # 결과 변환
        columns = [desc[0] for desc in cursor.description]
        rows = cursor.fetchall()
        
        return {
            "columns": columns,
            "data": [dict(zip(columns, row)) for row in rows],
            "row_count": len(rows)
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/catalogs")
async def list_catalogs():
    """카탈로그 목록 조회"""
    conn = get_trino_connection()
    df = pd.read_sql("SHOW CATALOGS", conn)
    return {"catalogs": df['Catalog'].tolist()}

@app.get("/api/schemas/{catalog}")
async def list_schemas(catalog: str):
    """스키마 목록 조회"""
    conn = get_trino_connection()
    df = pd.read_sql(f"SHOW SCHEMAS FROM {catalog}", conn)
    return {"schemas": df['Schema'].tolist()}

@app.get("/api/tables/{catalog}/{schema}")
async def list_tables(catalog: str, schema: str):
    """테이블 목록 조회"""
    conn = get_trino_connection()
    df = pd.read_sql(f"SHOW TABLES FROM {catalog}.{schema}", conn)
    return {"tables": df['Table'].tolist()}

@app.get("/api/table-info/{catalog}/{schema}/{table}")
async def get_table_info(catalog: str, schema: str, table: str):
    """테이블 상세 정보"""
    conn = get_trino_connection()
    
    # 컬럼 정보
    columns_df = pd.read_sql(
        f"DESCRIBE {catalog}.{schema}.{table}", 
        conn
    )
    
    # 스냅샷 정보 (Iceberg 전용)
    try:
        snapshots_df = pd.read_sql(
            f"SELECT * FROM {catalog}.{schema}.\"{table}$snapshots\" ORDER BY committed_at DESC LIMIT 10",
            conn
        )
        snapshots = snapshots_df.to_dict('records')
    except:
        snapshots = []
    
    # 파티션 정보
    try:
        partitions_df = pd.read_sql(
            f"SELECT * FROM {catalog}.{schema}.\"{table}$partitions\" LIMIT 100",
            conn
        )
        partitions = partitions_df.to_dict('records')
    except:
        partitions = []
    
    return {
        "columns": columns_df.to_dict('records'),
        "snapshots": snapshots,
        "partitions": partitions
    }

# uvicorn main:app --host 0.0.0.0 --port 8000

8.3 React Frontend

8.3.1 프로젝트 구조

frontend/
├── src/
│   ├── components/
│   │   ├── QueryEditor.jsx
│   │   ├── DataCatalog.jsx
│   │   ├── TableViewer.jsx
│   │   └── SnapshotTimeline.jsx
│   ├── services/
│   │   └── api.js
│   ├── App.jsx
│   └── main.jsx
├── package.json
└── vite.config.js

8.3.2 API 서비스

// src/services/api.js
import axios from 'axios';

const API_BASE_URL = 'http://localhost:8000/api';

const api = axios.create({
  baseURL: API_BASE_URL,
  timeout: 30000,
  headers: {
    'Content-Type': 'application/json',
  },
});

export const executeQuery = async (sql, limit = 1000) => {
  const response = await api.post('/query', { sql, limit });
  return response.data;
};

export const getCatalogs = async () => {
  const response = await api.get('/catalogs');
  return response.data.catalogs;
};

export const getSchemas = async (catalog) => {
  const response = await api.get(`/schemas/${catalog}`);
  return response.data.schemas;
};

export const getTables = async (catalog, schema) => {
  const response = await api.get(`/tables/${catalog}/${schema}`);
  return response.data.tables;
};

export const getTableInfo = async (catalog, schema, table) => {
  const response = await api.get(`/table-info/${catalog}/${schema}/${table}`);
  return response.data;
};

8.3.3 쿼리 에디터 컴포넌트

// src/components/QueryEditor.jsx
import React, { useState } from 'react';
import { executeQuery } from '../services/api';
import { Box, Button, Textarea, Table } from '@chakra-ui/react';

export const QueryEditor = () => {
  const [sql, setSql] = useState('SELECT * FROM lakehouse.sales_db.products');
  const [result, setResult] = useState(null);
  const [loading, setLoading] = useState(false);

  const handleExecute = async () => {
    setLoading(true);
    try {
      const data = await executeQuery(sql);
      setResult(data);
    } catch (error) {
      alert(`Error: ${error.message}`);
    } finally {
      setLoading(false);
    }
  };

  return (
    <Box>
      <Textarea
        value={sql}
        onChange={(e) => setSql(e.target.value)}
        placeholder="Enter SQL query..."
        size="lg"
        fontFamily="monospace"
        minHeight="200px"
      />
      <Button 
        onClick={handleExecute} 
        isLoading={loading}
        colorScheme="blue"
        mt={4}
      >
        Execute Query
      </Button>

      {result && (
        <Box mt={6}>
          <Text>Rows: {result.row_count}</Text>
          <Table variant="simple" mt={4}>
            <Thead>
              <Tr>
                {result.columns.map(col => (
                  <Th key={col}>{col}</Th>
                ))}
              </Tr>
            </Thead>
            <Tbody>
              {result.data.map((row, idx) => (
                <Tr key={idx}>
                  {result.columns.map(col => (
                    <Td key={col}>{row[col]}</Td>
                  ))}
                </Tr>
              ))}
            </Tbody>
          </Table>
        </Box>
      )}
    </Box>
  );
};

8.3.4 데이터 카탈로그 컴포넌트

// src/components/DataCatalog.jsx
import React, { useState, useEffect } from 'react';
import { getCatalogs, getSchemas, getTables, getTableInfo } from '../services/api';
import { Box, VStack, Accordion, AccordionItem, AccordionButton, AccordionPanel } from '@chakra-ui/react';

export const DataCatalog = ({ onTableSelect }) => {
  const [catalogs, setCatalogs] = useState([]);
  const [schemas, setSchemas] = useState({});
  const [tables, setTables] = useState({});

  useEffect(() => {
    loadCatalogs();
  }, []);

  const loadCatalogs = async () => {
    const data = await getCatalogs();
    setCatalogs(data);
  };

  const loadSchemas = async (catalog) => {
    if (!schemas[catalog]) {
      const data = await getSchemas(catalog);
      setSchemas({ ...schemas, [catalog]: data });
    }
  };

  const loadTables = async (catalog, schema) => {
    const key = `${catalog}.${schema}`;
    if (!tables[key]) {
      const data = await getTables(catalog, schema);
      setTables({ ...tables, [key]: data });
    }
  };

  const handleTableClick = async (catalog, schema, table) => {
    const info = await getTableInfo(catalog, schema, table);
    onTableSelect({ catalog, schema, table, info });
  };

  return (
    <Box width="300px" borderRight="1px" borderColor="gray.200" p={4}>
      <Accordion allowMultiple>
        {catalogs.map(catalog => (
          <AccordionItem key={catalog}>
            <AccordionButton onClick={() => loadSchemas(catalog)}>
              📦 {catalog}
            </AccordionButton>
            <AccordionPanel>
              {schemas[catalog]?.map(schema => (
                <Accordion key={schema} allowMultiple>
                  <AccordionItem>
                    <AccordionButton onClick={() => loadTables(catalog, schema)}>
                      📂 {schema}
                    </AccordionButton>
                    <AccordionPanel>
                      {tables[`${catalog}.${schema}`]?.map(table => (
                        <Box 
                          key={table}
                          p={2}
                          cursor="pointer"
                          _hover={{ bg: 'gray.100' }}
                          onClick={() => handleTableClick(catalog, schema, table)}
                        >
                          📄 {table}
                        </Box>
                      ))}
                    </AccordionPanel>
                  </AccordionItem>
                </Accordion>
              ))}
            </AccordionPanel>
          </AccordionItem>
        ))}
      </Accordion>
    </Box>
  );
};

8.3.5 스냅샷 타임라인

// src/components/SnapshotTimeline.jsx
import React from 'react';
import { Box, Text, VStack, Badge } from '@chakra-ui/react';
import { format } from 'date-fns';

export const SnapshotTimeline = ({ snapshots }) => {
  return (
    <VStack align="stretch" spacing={4}>
      <Text fontSize="xl" fontWeight="bold">Snapshot History</Text>
      {snapshots.map((snap, idx) => (
        <Box 
          key={snap.snapshot_id}
          p={4}
          borderLeft="4px"
          borderColor={idx === 0 ? 'green.500' : 'gray.300'}
          bg={idx === 0 ? 'green.50' : 'white'}
        >
          <Text fontWeight="bold">
            Snapshot #{snap.snapshot_id}
            {idx === 0 && <Badge ml={2} colorScheme="green">CURRENT</Badge>}
          </Text>
          <Text fontSize="sm" color="gray.600">
            {format(new Date(snap.committed_at), 'yyyy-MM-dd HH:mm:ss')}
          </Text>
          <Text fontSize="sm">Operation: {snap.operation}</Text>
          <Text fontSize="sm">Parent: {snap.parent_id || 'None'}</Text>
        </Box>
      ))}
    </VStack>
  );
};

8.4 Docker Compose에 추가

# React 개발 서버
react-ui:
  build:
    context: ./frontend
    dockerfile: Dockerfile
  ports:
    - "3000:3000"
  volumes:
    - ./frontend:/app
    - /app/node_modules
  environment:
    - REACT_APP_API_URL=http://localhost:8000
  networks:
    - datalake

# Backend API
api-server:
  build:
    context: ./backend
    dockerfile: Dockerfile
  ports:
    - "8000:8000"
  environment:
    - TRINO_HOST=trino
    - TRINO_PORT=8080
  depends_on:
    - trino
  networks:
    - datalake

9. 운영 및 모니터링 가이드

9.1 로깅 및 메트릭

9.1.1 Prometheus + Grafana 스택

# docker-compose.yml에 추가
prometheus:
  image: prom/prometheus:latest
  ports:
    - "9090:9090"
  volumes:
    - ./configs/prometheus.yml:/etc/prometheus/prometheus.yml
  networks:
    - datalake

grafana:
  image: grafana/grafana:latest
  ports:
    - "3001:3000"
  environment:
    - GF_SECURITY_ADMIN_PASSWORD=admin
  volumes:
    - ./grafana-data:/var/lib/grafana
  networks:
    - datalake

prometheus.yml:

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'minio'
    static_configs:
      - targets: ['minio:9000']
    metrics_path: /minio/v2/metrics/cluster

  - job_name: 'trino'
    static_configs:
      - targets: ['trino:8080']
    metrics_path: /v1/info

9.1.2 주요 모니터링 지표

MinIO 메트릭:

  • minio_cluster_capacity_raw_total_bytes: 전체 용량
  • minio_cluster_capacity_raw_free_bytes: 여유 공간
  • minio_s3_requests_total: API 요청 수
  • minio_s3_errors_total: 에러 수

Trino 메트릭:

  • trino.execution.QueuedQueries: 대기 중인 쿼리
  • trino.execution.RunningQueries: 실행 중인 쿼리
  • trino.memory.AssignedQueries: 메모리 할당량
  • trino.execution.QueryWallTime: 쿼리 실행 시간

9.2 성능 최적화 체크리스트

## 데이터 레이아웃 최적화
- [ ] 파티셔닝 전략 수립 (날짜/카테고리)
- [ ] 파일 크기 최적화 (128MB ~ 512MB)
- [ ] Compaction 주기 설정 (주 1회)
- [ ] Orphan file 정리 (월 1회)

## 쿼리 최적화
- [ ] Predicate Pushdown 활용
- [ ] Partition Pruning 확인
- [ ] 통계 정보 수집 (ANALYZE TABLE)
- [ ] Dynamic Filtering 활성화

## 인프라 최적화
- [ ] SSD 스토리지 사용
- [ ] 네트워크 대역폭 확보 (10Gbps+)
- [ ] Trino Worker 메모리 증설
- [ ] MinIO Erasure Coding 설정

9.3 백업 및 복구 절차

# MinIO 데이터 백업 (mc 클라이언트)
mc mirror --preserve myminio/lakehouse backup-server/lakehouse-backup

# Iceberg 메타데이터 백업
mc cp --recursive myminio/lakehouse/warehouse/*/metadata/ \
  backup-server/metadata-backup/$(date +%Y%m%d)/

# PostgreSQL Metastore 백업
docker exec postgres pg_dump -U hive metastore > metastore_backup.sql

# 복구 절차
# 1. MinIO 데이터 복원
mc mirror backup-server/lakehouse-backup myminio/lakehouse

# 2. Metastore 복원
docker exec -i postgres psql -U hive metastore < metastore_backup.sql

# 3. Iceberg 메타데이터 정합성 확인
trino --execute "SELECT * FROM lakehouse.information_schema.tables"

10. 요약 및 다음 단계

10.1 구축 완료 체크리스트

✅ **로컬 환경 (Windows 11)**
- [ ] Docker Desktop 설치 및 WSL 2 설정
- [ ] MinIO 클러스터 구성 (docker-compose)
- [ ] Trino + Hive Metastore 연동
- [ ] Spark를 통한 Iceberg 테이블 생성
- [ ] Jupyter Notebook 환경 구축

✅ **데이터 파이프라인**
- [ ] 데이터 적재 파이프라인 구현
- [ ] 타임 트래블 기능 테스트
- [ ] 스냅샷 롤백 시나리오 검증
- [ ] 스키마 진화 테스트

✅ **ML 연계**
- [ ] Feature Store (Feast) 설정
- [ ] MLflow 실험 추적
- [ ] 배치 추론 파이프라인
- [ ] 실시간 추론 API

✅ **UI 개발**
- [ ] React 프론트엔드 구축
- [ ] FastAPI 백엔드 개발
- [ ] 데이터 카탈로그 구현
- [ ] 쿼리 에디터 통합

✅ **엔터프라이즈 준비**
- [ ] LDAP/AD 인증 연동
- [ ] TLS 암호화 설정
- [ ] 감사 로그 구성
- [ ] Kubernetes 마이그레이션 계획

10.2 다음 단계 로드맵

Phase 1: 안정화 (1-2개월)

  1. 로컬 환경 철저한 테스트
  2. 성능 벤치마크 (TPC-DS 쿼리)
  3. 장애 시나리오 시뮬레이션

Phase 2: 프로덕션 준비 (2-3개월)

  1. Kubernetes 클러스터 구축
  2. CI/CD 파이프라인 (GitLab/Jenkins)
  3. 보안 강화 (Vault, IAM)

Phase 3: 엔터프라이즈 배포 (3-6개월)

  1. 온프레미스 하드웨어 프로비저닝
  2. 네트워크 격리 및 방화벽 설정
  3. 데이터 거버넌스 정책 수립
  4. 운영팀 교육

Phase 4: 확장 (6개월+)

  1. 멀티 리전 복제
  2. 실시간 스트리밍 (Kafka + Flink)
  3. 고급 분석 (Superset, Metabase)

10.3 참고 자료

공식 문서:

커뮤니티:

벤치마크:


결론

이 가이드는 완전 오픈소스 기반 Data Lake 구축의 전 과정을 다룹니다:

  1. 로컬 개발 환경: Windows 11 + Docker로 빠른 프로토타이핑
  2. 프로덕션 아키텍처: Kubernetes 기반 고가용성 설계
  3. 엔터프라이즈 보안: 인증, 암호화, 감사 로그
  4. ML 통합: Feature Store + MLflow + 추론 서빙
  5. 사용자 경험: React 기반 직관적 UI

핵심 강점:

  • ✅ 클라우드 벤더 독립적 (AWS/Azure/GCP 이관 자유)
  • ✅ TCO 절감 (라이선스 비용 제로)
  • ✅ 완전한 데이터 통제 (규정 준수 용이)
  • ✅ 대기업/공공기관 아키텍처 표준 부합

지금 바로 docker-compose up으로 시작하세요! 🚀

반응형