Kubernetes

Kafka Operator 설치 및 테스트

좋은 기회로 가시다님의 Database operator 수업을 듣게 되었고, kafka & Strimzi operator 수업에 배운 내용을 토대로, 쿠버네티스에서 카프카 운영에 도움을 주는 Strimzi 라는 오퍼레이터를 사용해 카프카의 안정성을 확인해 보겠습니다. 

카프카( Kafka ) 란 ?

https://kafka.apache.org/ 카프카 공식 홈페이지의 소개

아파치 카프카는 오픈소스 분산 이벤트 스트리밍 플랫폼(distributed event streaming platform) 입니다. 

아파치의 특징은 대표적으로 높은 처리량, 확장성, 영속성, 고가용성 입니다.

Kafka 아키텍처 및 주요 용어

  • KafkaCluster : 여러대의 브로커로 구성된 클러스터를 의미 합니다.
  • Broker : 카프카 애플리케이션이 설치된 서버를 말합니다. (동일 서버에 여러 브로커를 띄울수 있습니다)
  • Zookeeper : 카프카의 메타데이터 관리 및 브로커의 정상 상태 점검 (health check) 을 담당 합니다. 
  • Producer : 메시지(이벤트)를 발행하여 생산(Wirte) 하는 주체
  • Consumer : 메시지(이벤트)를 구독하여 소비(Read) 하는 주체
  • Topic : 메시지를 구분하는 단위 (파일시스템의 폴더와 유사)
  • Partition : 하나의 토픽을 여러 개로 나눈 것 (병렬 처리를 통한 고성능을 얻기 위함)
  • Offset : 파티션 내 각 메시지의 저장된 상대적 위치
  • Segment :  프로듀서가 전송한 실제 메시지가 브로커의 로컬 디스크에 저장되는 파일
  • message / record : 프로듀서가 브로커로 전송하거나 컨슈머가 읽어가는 데이터 조각

Operator란 ?

오퍼레이터는 쿠버네티스 추상화를 통해 관리 대상 소프트웨어의 전체 라이프사이클을 자동화, 애플리케이션을 패키징-배포-관리하는 방법론입니다. Operator가 파드 형태로 생성되면, 자신이 관리하는 CR(Custom Resource) 유형을 지속적으로 감시하고 현재 상태(current state)를 해당 리소스가 원하는 상태(desired state)로 유지하는 역할을 수행합니다.

 

아래 실습에 사용할 Strimzi 는 Apache kafka on kubernetes 를 목표로 운영되는 오픈소스 프로젝트 입니다. Kafka Operator를 사용해 클러스터 배포 및 다양한 설정에 필요한 기능을 제공합니다. 

쿠버네티스 클러스터 생성

저는 AWS cloudformation을 사용해 쿠버네티스 클러스터를 생성 했습니다.

테스트에 사용한 클러스터는 아래처럼 마스터노드 1대, 워커노드 3대 입니다.

aws 웹콘솔에서 쿠버네티스 클러스터 생성 확인

Kube-ops-view 배포

파드 배포 상태를 시각화하여 모니터링할 수 있도록 도와주는 kube-ops-view를 설치합니다.

설치가 끝나면 웹페이지에서 클러스터 내 배포된 파드의 현황을 한 눈에 볼수 있습니다.

kube-ops-veiw  설명

https://codeberg.org/hjacobs/kube-ops-viewㅇ

 

kube-ops-view

Kubernetes Operational View - read-only system dashboard for multiple K8s clusters

codeberg.org

Strimzi Operator 설치 ( with helm )

1. 네임스페이스 생성

kubectl create namespace kafka

2. strimzi repo 추가

helm repo add strimzi https://strimzi.io/charts/
helm repo update
helm show values strimzi/strimzi-kafka-operator

3. 차트 설치

tolerations 을 사용해 마스터 노드에 kafka operator 파드를  배포하도록 강제합니다.

 

printf 'tolerations: [{key: node-role.kubernetes.io/master, operator: Exists, effect: NoSchedule}]\n' | \
helm install kafka-operator strimzi/strimzi-kafka-operator --version 0.29.0 --namespace kafka \
  --set nodeSelector."kubernetes\.io/hostname"=k8s-m --values /dev/stdin

4. 배포 내역 확인 

  • 배포가 잘 되었는지 확인

  • 차례로 오퍼레이터가 지원하는 카프카 버전, CRD를 확인 합니다.

카프카 클러스터 생성

이제 본격적으로 오퍼레이터를 사용하여 카프카 클러스터를 만들어 봅시다. 

위에서 설명한 카프카 오퍼레이터는 커스텀 리소스를 지속적으로 주시하고 있습니다. 

사용자가 구성하고자 하는 카프카 클러스터 내역이 담긴 커스텀 리소스(custom resource , 이하 CR)를 배포하면 오퍼레이터는 CR에 적힌대로 카프카 클러스터를 생성합니다.

 

kafka CR 은 다음과 같습니다. (카프카 버전을 명시하지 않으면 default로 최신 버전을 설치하게 됩니다.)

apiVersion: kafka.strimzi.io/v1beta2
 kind: Kafka
 metadata:
   name: my-cluster
 spec:
   kafka:
     #version: 3.1.1
     replicas: 3
     listeners:
       - name: plain
         port: 9092
         type: internal
         tls: false
       - name: tls
         port: 9093
         type: internal
         tls: false
       - name: external
         port: 9094
         type: nodeport
         tls: false
     storage:
       type: jbod
       volumes:
       - id: 0
         type: persistent-claim
         size: 10Gi
         deleteClaim: true
     config:
       offsets.topic.replication.factor: 3
       transaction.state.log.replication.factor: 3
       transaction.state.log.min.isr: 2
       default.replication.factor: 3
       min.insync.replicas: 2
       #inter.broker.protocol.version: "3.1.1"
     template:
       pod:
         affinity:
           podAntiAffinity:
             requiredDuringSchedulingIgnoredDuringExecution:
               - labelSelector:
                   matchExpressions:
                     - key: app.kubernetes.io/name
                       operator: In
                       values:
                         - kafka
                 topologyKey: "kubernetes.io/hostname"
   zookeeper:
     replicas: 3
     storage:
       type: persistent-claim
       size: 10Gi
       deleteClaim: true
     template:
       pod:
         affinity:
           podAntiAffinity:
             requiredDuringSchedulingIgnoredDuringExecution:
               - labelSelector:
                   matchExpressions:
                     - key: app.kubernetes.io/name
                       operator: In
                       values:
                         - zookeeper
                 topologyKey: "kubernetes.io/hostname"
   entityOperator:
     topicOperator: {}
     userOperator: {}

 

배포가 완료되면 각 노드에 주키퍼와 카프카가 statefulset 형태로 설치됩니다. 

 

주키퍼와 클러스터의 상세 설정 정보는 configmap에 저장됩니다. 

아래 명령어를 통해 확인할 수 있습니다. 

kubectl describe cm -n kafka strimzi-cluster-operator
kubectl describe cm -n kafka my-cluster-zookeeper-config
kubectl describe cm -n kafka my-cluster-entity-topic-operator-config
kubectl describe cm -n kafka my-cluster-entity-user-operator-config
kubectl describe cm -n kafka my-cluster-kafka-config

테스트용 파드를 생성하여 카프카 클러스터 정보 확인

kafka client는 카프카 테스트에 필요한 다양한 도구를 제공합니다. 

 

아래 야믈 파일을 배포합니다. 

apiVersion: v1
kind: Pod
metadata:
  name: myclient1
  labels:
    app: myclient
spec:
  nodeName: k8s-m
  containers:
  - name: myclient1
    image: bitnami/kafka:3.2
    command: ["tail"]
    args: ["-f", "/dev/null"]
  terminationGracePeriodSeconds: 0

 

카프카 클라이언트가 지원하는 도구 리스트

 

카프카 클러스터의 다양한 정보 확인 

# 카프카 파드의 SVC 도메인이름을 변수에 지정
SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092

# 브로커 정보
kubectl exec -it myclient1 -- kafka-broker-api-versions.sh --bootstrap-server $SVCDNS

# 토픽 리스트 확인
kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --list

# 토픽 리스트 확인 (kubectl native)
kubectl get kafkatopics -n kafka

 

토픽 생성 및 메시지 주고 받기

토픽을 생성하기 위한 야믈파일을 만듭니다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: ${TOPICNAME}
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 1
  replicas: 3
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
    min.insync.replicas: 2

토픽을 생성합니다.

TOPICNAME=mytopic1 envsubst < mytopic.yaml | kubectl apply -f - -n kafka

 

mytopic1 이 생성 됨

토픽 상세 정보 확인 ( 설정값을 지정하지 않으면 기본값이 적용 됩니다 )

kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic1 --describe

토픽 파티션 갯수를 늘려보겠습니다

먼저, 파티션 1개 레플리카 3개의 토픽을 만듭니다.

kubectl exec -it myclient1 -- kafka-topics.sh --create --bootstrap-server $SVCDNS --topic mytopic2 --partitions 1 --replication-factor 3 --config retention.ms=172800000

클라이언트를 사용해 mytopic2 의 파티션 갯수를 2개로 늘립니다.

kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter --partitions 2
kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe

mytopic2 의 파티션이 2개로 늘어남

여기서 궁금한게 파티션을 늘리는건 가능한데 줄이는건 불가할까 ? 

정답은 가능하긴 하지만 권장하지 않는다는 것. 그러므로 파티션을 설정할 때 잘 고민해야 합니다.

파티션을 줄이려 하자 에러가 출력됨

토픽에 데이터 넣기

테스트를 위해 클라이언트의 두가지 스크립트를 사용합니다.

kafka-console-producer.sh

kafka-console-consumer.sh

# 카프카 파드의 SVC 도메인이름을 변수에 지정
SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092

# 토픽에 데이터 넣어보기 (producer)
kubectl exec -it myclient1 -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1

# 토픽 데이터 확인 (consumer)
kubectl exec -it myclient2 -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --from-beginning

장애 발생 테스트

본격적으로 카프카에 장애를 발생시켰을 떄의 동작을 확인해 보겠습니다.

  • 장애상황 : kafka 혹은 zookeeper 파드 1개 삭제

토픽을 생성합니다.

TOPICNAME=mytopic3 envsubst < mytopic.yaml | kubectl apply -f - -n kafka

테스트 편의를 위해  kafkacat 이라는 툴을 설치합니다.

apt install kafkacat -y

kafkacat 을 사용해 컨트롤러 브로커 위치를 확인 합니다.

메시지를 받는 터미널과 메시지를 반복해서 보내는 터미널 띄운 상태에서 브로커 파드를 강제 삭제했을 때 상황을 보겠습니다.

1. 메시지를 받는 터미널

2. 반복해서 메시지를 보내는 터미널

  • kafka 클러스터를 삭제 합니다.

카프카 클러스터를 삭제해도 메시지를 잘 받는것을 확인할 수 있습니다. 

  • 주키퍼를 삭제 합니다. 

주키퍼를 삭제해도 카프카 클러스터와 마찬가지로 메시지 전송에는 문제가 없습니다. 스테이트풀셋 이기 떄문에 1분안에 주키퍼가 재생성 되는 것을 확인할 수 있습니다. 

 

마무리

데이터 조직에서 카프카를 많이 사용한다는 것은 알았지만, 실제로 어떤 서비스인지 잘 몰랐는데, 이번 학습을 통해 kafka의 개념과 작동원리를 파악할 수 있어서 좋았다. 특히, operator 패턴을 사용해 쿠버네티스에서 다양한 데이터 관련 서비스를 운영하기가 상당히 편리하고, 여러 장애 케이스에서도 안전한 서비스를 할 수 있도록 준비가 되있다는 사실을 직접 테스트하면서 알수 있게 되었다. 앞으로 쿠버네티스에서 데이터베이스를 구축할 일이 있을 경우 배운 지식을 적극적으로 활용해야겠다 !