NIRVANA
[Kafka] Kafka 구성 요소 살펴보고 간단 실습해보기 본문
📌Kafka란?
2008년 링크드인에서 내부 실시간 데이터 처리를 위해 개발한 소프트웨어 플랫폼으로 Scala와 Java로 작성 되었다. 사용자의 event data를 실시간으로 수집하고 바로 action 취하기 위해 사용되며 2011년 초에 아파치 재단에 프로젝트로 편입 되어 오픈소스화되었다.
- 실시간 데이터를 처리 하기 위해 설계된 오픈소스 분산 스트리밍 플랫폼
- Scalability와 Fault Tolerance를 제공하는 Publish-Subscription 메시징 시스템
- Publish-Subscription
: producer가 생성한 data를 저장하는 메시지 큐를 두어 producer와 consumer가 독립적으로 실행될 수 있도록 진행 → 시스템의 안정성을 높이고 다수의 consumer가 동일한 data를 소비할 수 있도록 함- Publisher(producer)
- data를 생성하는 프로세스
- 생성한 data를 큐에 저장
- Subscriber(Consumer)
- 큐에 저장된 data를 소비하는 프로세스
- Publisher(producer)
- Publish-Subscription
- High Throughput과 Low Latency를 지원, 실시간 데이터 처리에 맞게 구현
- 분산 아키텍처를 따르므로 Scale Out이란 형태로 스케일 가능
- 정해진 보유 기간(retention period) 동안 메시지 저장
📌Kafka의 특징
1. 보유 기간 동안 메시지를 저장
기존 메시징 시스템의 경우 큐에 저장된 데이터를 consumer가 읽으면 큐에서 바로 제거하는 시스템이었으나, 카프카의 경우 Consumer가 데이터를 읽었는지 여부와 상관 없이 지정된 보유 기간 동안 데이터를 저장
- 기본 보유 기간은 일주일
- 기간 외에 하나의 topic이 가질 수 있는 data 크기 동안 data를 저장하는 방식도 가능
2. 메시지의 생산과 소비를 분리
Kafka Broker를 통해 Producer와 Consumer는 각각 독립적으로 메시지를 생산하고 소비하므로 생산자와 소비자가 각자의 속도에 맞춰 독립적으로 작업할 수 있음
이를 통해 시스템의 안전성을 높이고 back pressure 문제의 발생을 감소 시킬 수 있음
- back pressure
- 데이터의 생산 속도와 소비 속도의 불균형 시에 발생하는 문제. producer의 생산 속도에 비해 consumer의 소비 속도가 느려 시스템에 부하가 발생하는 것을 의미함
3. 한 partition 내에서 메시지 순서를 보장
partition 내부에서 Append-only 방식으로 메시지를 추가하고, consumer는 메시지를 읽을 때 offset을 순서대로 소비하므로 하나의 partition 내부에서는 메시지 순서를 보장할 수 있지만, 다수의 partition에 걸쳐 토픽이 생성될 경우에는 전체 토픽에 대한 순서 보장이 어려움. 해당 토픽에 대한 메시지 순서는 토픽 생성 시에 설정한 Consistency 모델에 따라 달라지게 됨
- Eventual Consistency
- Producer가 Topic event에 데이터를 작성할 때, n개의 Partition 복제본에 정보가 전파될 때 까지 기다린 후에 데이터 return
- write 작업 시에는 n개의 복제본에까지의 작성을 보장하므로 시간이 (비교적) 적게 소요되나, Read 작업 시에는 data 일관성을 보장하지 못한다는 단점 존재
- Strong Consistency
- Producer가 Topic event에 데이터를 작성할 때, Partition 복제본 전부에 전달 될 때까지 대기 후에 데이터 return
- write 작업 시에는 전체 복제본에 모두 작성을 보장해야 하므로 시간이 (비교적) 오래 소요되나, Read 작업 시에는 data의 일관성을 보장할 수 있다는 장점 존재
+) kafka는 하나의 Topic을 여러 개의 Partiton으로 나누어서 분산 저장하게 됨
이때, 하나의 Partiton 내부에서는 각 event의 순서를 보장가능하나, 전체 partition에 대해서는 순서 보장이 어려움
📌Kafka Cluster 구성 요소

Topic
: Kafka에서 데이터를 스트리밍 방식으로 다루는 핵심 단위. kafka에서는 데이터 이벤트 스트림을 Topic이라고 부른다.
- Topic은 다수의 event로 구성되며, event는 timestamp를 기반으로 정렬됨
- 즉, topic은 다수의 시간 순으로 정렬된 message들로 구성됨
- offset이 존재하여 0부터 n까지의 일련번호가 붙음
- topic은 저장될 시에 확장성을 위해 다수의 partition으로 나누어져 저장되고, 각 partition은 각각 다른 broker 서버에 저장됨
- Topic은 producer에 의해 생성되며 정해진 보유 기간 동안 데이터를 유지함
- Fault Tolerance를 위해 여러 broker에 중복되어 저장됨
Event(Message) 구조

- 최대 1MB의 크기를 가짐
- Timestamp는 보통 데이터가 topic에 추가된 시점을 의미
- Key 자체도 복잡한 구조를 가질 수 있으며, Key는 partitioning 시에 Partition Key로 사용됨
- Header는 선택적 구성 요소, Key-Value에 대한 경량 메타 데이터
Partition

하나의 topic은 다수의 partition으로 나누어져 broker에 나누어 저장되고, 각 partition은 fail-over를 위해 복제본을 생성하게 된다. 이때, 복제 되어 여러개가 된 partition은 내부에서 leader와 follower와 나누어지고, 쓰기 작업은 leader를 통해서만 진행하게 된다. (읽기 작업의 경우 leader와 follower가 모두 참여함)
- 하나의 event가 어떤 partition에 속하는 지를 결정하는 방법은 key의 유무에 따라 달라지게 됨
- 키가 존재하면 hashing값을 partition의 수로 나눈 나머지로 결정
- 키가 없다면 라운드 로빈 방식으로 결정
- 하나의 partition은 여러 개의 segment로 구성됨. 이때, segment는 변경되지 않는 추가만 되는 로그파일로 일종의 commit-log로 볼 수 있음
- 각 segment는 디스크 상에 존재하는 하나의 파일로, 최대 크기가 존재하며 정해진 크기를 넘어가면 새로운 segment 파일을 생성함
- 각 segment는 데이터 오프셋 범위를 가짐
- partition의 특성은 사실상 Segment의 특성으로 볼 수 있다.
- immutable
- append-only
Kafka Broker
producer가 생성한 실제 데이터를 저장하는 서버. kafka cluster는 기본적으로 다수의 broker로 구성되며 여기에 원활한 관리와 부가 기능을 위한 다른 서비스들이 추가 되는 형식이다. (ex, zookeeper)
- 실제 producer, consumer와 통신을 수행
- Broker는 물리 서버 혹은 VM위에서 동작하며, 해당 서버의 디스크에 partition 데이터들을 기록
- 하나의 broker는 다수의 partition을 관리 및 운영하는 역할을 맡음
- Broker의 수를 늘림으로써 클러스터 용량을 늘릴 수 있음(Scale out)
Producer
- topic으로 전송되는 데이터를 생성하는 프로세스
- 대부분의 프로그래밍 언어로 작성 가능
- event가 들어갈 partition 지정을 producer가 선택하게 함
- 기본 partition 선택: hash(key) % parition의 수
- 라운드 로빈
- 그외 커스텀 로직
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
from kafka import KafkaAdminClient
from kafka.producer import KafkaProducer
topic_name = "fake_people"
bootstrap_servers = ["localhost:9092"]
create_topic(bootstrap_servers, topic_name, 4)
#카프카 producer 생성
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
client_id="Fake_Person_Producer", #카프카 프로듀서 id 지정
)
#토픽 생성 함수
def create_topic (bootstrap_servers, name, partitions, replica=1):
#카프카 브로커와 연결되는 관리자 클라이언트 객체 생성
#토픽 생성, 삭제, 설정 변경 등의 관리 작업 수행 시에 사용
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
try:
#kafka 토픽 생성에 필요한 정보를 담은 객체 생성
#해당 객체 자체가 토픽을 생성하는 것 X → 토픽을 만들기 위한 설정을 담는 것 O
topic = NewTopic(
name=name, #토픽 이름 지정
num_partitions=partitions, #이벤트를 나누어 저장할 파티션 개수
replication_factor=replica #파티션당 복제본 생성 개수
)
#kafka에 실제로 토픽을 생성하는 함수, 리스트 형태로 전달
client.create_topics([topic])
except TopicAlreadyExistsError as e:
print(e)
pass
finally:
client.close()
Consumer
- topic을 기반으로 message를 읽는 프로세스
- offset을 가지며, 마지막으로 읽어들인 message 위치 정보를 기억
- consumer group이라는 개념으로 scaling이 가능하며, 하나의 topic을 여러 consumer가 읽게할 수 있음
- 동일한 그룹에 속한 consumer들은 토픽의 파티션을 서로 나우어 병렬로 처리
- 각 파티션은 하나의 consumer에게만 할당 되며, 그룹 내에서 메시지가 중복처리 되지 않도록 함
- 서로 다른 consumer group이 동일한 topic을 구독할 수 있으며, 이는 메시지가 여러 번 처리 됨을 의미함
from kafka import TopicPartition, OffsetAndMetadata
from kafka.consumer import KafkaConsumer
topic_name = "fake_people"
bootstrap_servers = ["localhost:9092"]
consumer_group_id = "manual_fake_group"
consumer = KafkaConsumer(
bootstrap_servers = bootstrap_servers, #연결할 카프카 브로커 주소
group_id = consumer_groupt_id, #컨슈머가 속한 컨슈머 그룹 지정
key_deserializer=key_deserializer, #key 역직렬화
value_deserializer=value_deserializer, #value 역직렬화
auto_offset_reset='earliest', #파티션 데이터를 어디서부터 읽을지 결정, 처음 메시지부터 읽도록 함(토픽에 저장되어 있는)
enable_auto_commit=False #자동 커밋 꺼두는 편이 정확성 좋음
)
정리
- producer는 특정 topic으로 생성한 데이터를 전송. 이때, topic은 여러 개의 event로 구성 되며 각 event는 timestamp를 갖고 시간순으로 정렬됨. 또한 각 event가 들어갈 partition을 결정하는 방법 역시 producer 단에서 선택하게 됨.
- broker에서 producer가 생성한 데이터를 partition으로 나누어 저장. partition은 다시 여러 개의 segment로 구성되며, 최대 크기를 가지고 최대 크기만큼 event가 차면 새로운 segment를 생성함. 하나의 partition은 하나의 로그 파일로 볼 수 있음(segment의 특징)
- consumer는 topic을 구독하여 broker에 저장된 data를 partition 단위로 읽어옴
'DataEngineering' 카테고리의 다른 글
| [Airflow] airflow - spark 연동(YARN 및 Spark 복습) (0) | 2025.03.04 |
|---|---|
| [Airflow] 항공권 정보 ETL 파이프라인 만들어보기 (0) | 2025.02.02 |
| [Airflow] Airflow DAG 작성 연습 및 Backfill 이해하기 (3) | 2025.01.26 |
| [SnowFlake] S3버킷 연동 및 Json 데이터를 관계형 테이블에 로드 (2) | 2024.12.26 |
| 아파치 파케이(Parquet)와 열 지향 데이터베이스 (1) | 2024.09.22 |