티스토리 뷰

 

*본 포스팅은 인프런 [데브원영]님의 아파치 카프카 프로그래밍 강의 세션 4를 수강하고 이를 참고해 작성했습니다. 

 

[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지! |

데브원영 DVWY | 실전 환경에서 사용하는 아파치 카프카 애플리케이션 프로그래밍 지식들을 모았습니다! 데이터 파이프라인을 구축하는데 핵심이 되는 아파치 카프카의 각종 기능들을 살펴보고

www.inflearn.com

 

* 이전 포스팅: 로컬 카프카 실행 방법 - 섹션 4-1 포스팅

 

[Apache Kafka] 로컬 카프카 설치 및 실행 방법 | 주키퍼 실행 | 정상 실행 확인 방법 | 섹션 4-1 | 스터

*본 포스팅은 인프런 [데브원영]님의 아파치 카프카 프로그래밍 강의 세션 4를 수강하고 이를 참고해 작성했습니다.  [아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커

yuejeong.tistory.com


 

이번 강의에서는 Apache kafka_2.12-2.5.0 버전에서 제공하는 쉘스크립트를 직접 터미널에서 실행해 보며 실제 프로듀서와 컨슈머가 데이터를 처리하는 과정을 CLI 상에서 확인할 수 있었다.

 

 우선 본격적인 포스팅에 들어가기에 앞서서 나는 아파치 카프카 브로커 서버를 실행하려는 과정에서 다음과 같은 오류를 마주쳤다.

2024-05-14 22:37:36,660] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) kafka.common.InconsistentClusterIdException: The Cluster ID vF36pE_BS3OvSnVeQhHTlg doesn't match stored clusterId Some(NUrwMEh7R26EgD2xoKZh5w) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.

 

섹션 4를 나눠 들으면서 지난주에 브로커와 주키퍼를 실행시켰을 때는 문제없이 실행됐는데, 껐다 다시 키려니 카프카 브로커를 실행하는 명령어에서 다음과 같은 오류가 발생한 것이다. 

 

해결 원인을 찾아보니, 이전에 실행할 때 로그 메타데이터를 저장할 data 디렉토리를 생성해서 server.properties에 해당 디렉토리 경로를 적어줬는데, 이전에 실행했던 브로커 서버 아이디가 메타 데이터에 이미 저장됐기 때문에 새로 생성하려는 서버 아이디는 매치하지 않아서 발생한 문제였다.

그래서 기존에 생성해 뒀던 data 디렉토리는 'sudo rm -r data' 명령어를 사용해 강제 삭제하고, 'mkdir data' 명령어를 입력해 다시 생성하면 된다. 그리고 카프카 실행 명령어 'bin/kafka-server-start.sh config/server.properties'를 입력하면 이제 서버가 잘 올라간다.


kafka-topics.sh

해당 스크립트를 실행하면 클러스터 정보와 토픽 이름만으로 토픽을 생성할 수 있다. 이때 만들어진 토픽의 파티션 개수, 복제 개수 등과 같은 옵션은 따로 지정하지 않으면 브로커에 설정된 기본값으로 설정된다.

 

--create, --describe

▪️ '--create' 옵션을 사용해 로컬 서버에 hello.kafka를 이름으로 갖는 토픽을 생성했다.

▪️ '--describe' 옵션을 사용하면 방금 만든 토픽의 정보를 조회할 수 있다. 파티션, 레플리케이션 등은 기본값으로 설정되어 있는 것을 볼 수 있다.

 

 

이번에는 파티션 개수, 복제 개수, 토픽 데이터 유지 기간 옵션들을 지정하여 생성해 보자. 

 --partitions, --replication-factor, --config retention.ms 등의 옵션을 추가로 붙여서 토픽을 생성한 후, --describe로 조회했더니 다음과 같이 파티션이 성공적으로 0부터 9까지 10개 생성되어 있는 것을 볼 수 있다. 

 

 

이번에는 파티션 개수를 늘려보자. 파티션 개수를 늘리는 방법은 --alter를 사용하면 된다. 

--alter

▪️ 우선 test 토픽을 특별한 옵션을 지정하지 않은 채로 생성했다. describe로 조회해 보면 다음과 같이 파티션 개수는 기본값인 1개이다(1)

▪️ 이제 --alter 명령어로 --partitions 10으로 지정해 파티션 개수를 10개로 수정한 뒤(2), 다시 조회해 보면 파티션 개수가 10개로 늘어난 것을 확인할 수 있다(3)

 

 

그럼 파티션을 줄이는 것도 가능할까? 시도해 보자. 위에서 했던 것처럼 --alter 명령어에서 파티션 개수를 5로 지정해서 실행해 보았다. 

오류 발생

결과는 오류가 발생한다. 분산 시스템에서 이미 분산된 데이터를 줄이는 방법은 매우 복잡하다. 삭제 대상 파티션을 지정해야 할 뿐만 아니라 기존에 저장되어 있던 레코드를 분산하여 저장하는 로직이 필요하기 때문이다. 이 때문에 카프카에서는 파티션을 줄이는 로직을 제공하지 않는다. 

만약 파티션 개수를 꼭 줄여야 하는 상황이면 토픽을 지우고 새로 생성하는 편이 좋다.


kafka-configs.sh

토픽의 일부 옵션을 설정하기 위해서는 kafka-configs.sh 명령어를 사용해야 한다. 

데이터를 주고받을 때 안전성을 보장할 수 있는 min.insync.replicas 옵션은 다음과 같이 bin/kafka-configs.sh 스크립트로 --alter 옵션을 사용해 설정할 수 있다. 

bin/kafka-configs.sh

 

브로커에 설정된 기본값을 확인하려면 server.properties를 직접 확인할 수도 있지만 다음과 같이 --all -describe를 사용하면 브로커의 설정값을 모두 확인할 수 있다.

--all -describe


kafka-console-producer.sh

이제 본격적으로 토픽에 데이터를 넣어보자. kafka-console-producer.sh 명령어를 사용하면 키보드로 문자를 입력해서 토픽에 데이터를 저장할 수 있다.

key = null

로컬 브로커에 hello kafka 토픽에 데이터를 보내는 모습이다. >로 각 레코드가 구분돼서 총 9개의 레코드를, 키 없이 메시지 밸류만 전송했다.

 

이번에는 키를 지정해서 레코드를 전송해 보자. 키를 전송할 때는 --property 옵션으로 "parse.key=true", "key.separator=:"를 지정해 줬다. 이때 key.separator는 메시지 키와 밸류를 구분하는 구분자를 설정하는 것으로 원래 기본자는 '탭키(₩t)'이다. 그래서 메시지 키를 작성하고 ➡️ 탭키를 누른 뒤 ➡️ 메시지 값을 작성하고 ➡️ 엔터를 누르면 키를 명시한 레코드를 전송할 수 있는데 예제에서는 키와 밸류를 명시적으로 구분하기 위해 key.separator로 ':'를 설정해 줬다.

key 지정해서 전송하기

메시지 키 유무에 따른 파티셔닝 방법을 복습해 보자면,

메시지 키가 null인 경우, 프로듀서가 파티션으로 전송할 때 레코드 배치 단위(전송 묶음)로 라운드 로빈으로 전송한다.

메시지 키가 존재하는 경우에는 키의 해시값에 할당되는 파티션에 전송이 되는데, 이로 인해 메시지 키가 동일한 경우에는 모두 동일한 파티션으로 전송된다 ➡️ 덕분에 순서를 보장할 수 있는 것이다. 


kafka-console-consumer.sh

위에서 hello.kafka 토픽으로  프로듀서가 데이터를 보냈으니, 이제 컨슈머를 실행해서 데이터를 읽어와 보자.

이때 필수 옵션은 --bootstrap-server에 클러스터 정보, --topic에 토픽 이름, 그리고 추가로 가장 처음 데이터 즉 오래된 데이터부터 출력하기 위해서 --from-beginning 옵션을 사용할 수 있다.

kafka-console-consumer.sh

 

출력해 보면 데이터가 레코드의 값만 출력된 것을 볼 수 있다. 만약 레코드의 키도 함께 확인하고 싶다면 --property print.key=true를 추가하면 된다. 마찬가지로 명시적인 구분을 위해 --property key.separator="-" 옵션으로 키와 값을 구분해 줬다. 

 

 

만약 메시지 수를 지정해서 출력하고 싶다면, --max-messages 옵션을 사용하면 된다. 실무 상황에서는 데이터가 수백만 개가 들어있으므로 끝없이 출력될 것이니 특정 개수만 보고 싶을 때 유용하게 사용할 수 있다. 

--max-message

--max-message를 2로 설정했더니 hello, kafka 두 개의 레코드만 받아온 것을 확인할 수 있다. 

 

 

--partition 옵션을 사용하면 특정 파티션만 가져올 수 있다. 출력해 보면 현재 레코드는 모두 0번 파티션에만 들어있으므로 1번, 2번 파티션을 출력했을 때 아무것도 없는 것을 볼 수 있다. 

위 기능을 사용하면 메시지 키가 포함된 데이터를 전송할 때 특정 파티션에만 잘 쌓이고 있는지 확인할 수 있다.

 

이번에는 --group 옵션을 사용해 보자. group 옵션을 사용하면 kafka-console-consumer.sh가 컨슈머 그룹을 기반으로 동작한다. 이때 컨슈머 그룹은 '특정 목적을 가진 컨슈머들을 묶음으로 사용하는 것'으로 토픽의 레코드를 어디까지 읽었는지 즉 오프셋에 대한 데이터가 브로커에 저장된다. 

컨슈머 그룹은 따로 생성하는 것이 아니라 다음과 같이 컨슈머를 동작시킬 때 그룹 이름을 지정하면 생성된다.

다음과 같이 'hello-group' 그룹을 지정해서 데이터를 읽으면 마지막으로 읽은 레코드 'youjung'를 커밋해 둬서 다음에 읽어올 때는 그다음부터 읽어오게 된다. 커밋 데이터는 '__consumer_offsets'에 저장된다.

 

컨슈머 그룹에 대한 내용은 아래에서 더 자세히 다룬다.

다음 내용으로 넘어가기 전에 프로듀서와 컨슈머가 실시간으로 데이터를 보내고 가져오는 모습을 확인해 보자

 

다음과 같이 각 터미널에 하나는 데이터를 보낼 프로듀서 역할을 할 kafka-console-producer.sh을 실행하고 

하나는 데이터를 가져올 컨슈머 역할을 할 kafka-console-consumer.sh을 실행하면 프로듀서에서 보내는 레코드를 컨슈머가 바로바로 가져가는 모습을 실시간으로 확인할 수 있다.

 


kafka-consumer-groups.sh

지금까지 생성한 컨슈머 그룹의 리스트는 kafka-consumer-groups.sh 명령어에 --list 옵션을 붙여서 조회할 수 있다. 

여기서 --describe 옵션을 사용하면 컨슈머 그룹이 어떤 토픽을 대상으로 레코드를 가져갔는지 상태를 확인할 수 있다. 파티션 번호, 레코드의 오프셋, 파티션의 가장 최근 레코드의 오프셋, 컨슈머 렉(앞에 두 개의 차이), 컨슈머 ID, 호스트 등의 정보를 알 수 있기 때문에 컨슈머의 상태를 조회할 때 유용하다.

 

컨슈머 렉은 프로듀서가 보낸 가장 최신 레코드의 오프셋과 컨슈머가 현재 가져간 레코드의 오프셋의 차이이므로 파티션과 컨슈머 개수를 조정할 때 꼭 확인해야 하는 지표이다. 

 

 

컨슈머와 프로듀서의 동작에 따라 LAG 값이 변화하는 것을 직접 확인해 보자.

현재 위에 예시 화면에서 컨슈머 그룹을 지정해 토픽을 실행하고, 토픽의 레코드를 모두 확인한 뒤, --describe로 컨슈머 그룹 정보를 확인해 보자

(1) 현재 hello.kafka 토픽에는 14개의 데이터가 있고 컨슈머는 모든 데이터를 읽었으므로 CURRENT-OFFSETLOG-END-OFFSET이 일치한다. 그러므로 LAG은 0이다.

(2) 그런데 여기서 프로듀서가 데이터를 더 넣으면 어떻게 될까? 다음과 같이 kafka-console-producer.sh

명령어로 1부터 7까지 레코드를 더 전송한다면, 현재 토픽에는 7개의 데이터가 더 추가된다. 

(3) 이를 직접 확인해 보면 LOG-END-OFFSET이 21로 바뀐 것을 볼 수 있다. 하지만 컨슈머는 아직 새로운 데이터를 가져오지 않았으므로 CURRENT-OFFSET은 그대로이고 그러므로 LAG은 7로 바뀌어있다.

 

그럼 이제 LAG을 줄여보자!

(1) 우선 렉을 줄이기 전 컨슈머 그룹의 오프셋 데이터는 다음과 같다. 토픽에 있는 가장 최신 데이터의 오프셋은 21, 현재 컨슈머가 읽은 데이터의 오프셋은 14이므로 총 7의 지연이 발생했다.

(2) 그럼 kafka-console-consumer.sh를 실행해서 데이터를 읽어오자. 해당 명령어를 실행하니 우리가 새로 넣었던 레코드인 1부터 7까지의 데이터가 넘어왔다.

(3) 이후에 다시 kafka-consumer-groups.sh  --describe를 실행하면 컨슈머의 오프셋이 가장 최신 데이터의 오프셋과 일치하면서 지연이 사라진 것을 확인할 수 있다.

 


그렇다면 이 컨슈머의 오프셋을 사용자가 마음대로 변경할 수 있을까? ➡️ 가능하다.

kafka-consumer-groups.sh  명령어에 --reset-offsets 옵션을 붙여서 오프셋을 자유자재로 조정할 수 있다.  

kafka-consumer-groups.sh  --reset-offsets 옵션에 --to-earliest로 리셋함으로써 가장 오래된 레코드 즉 가장 낮은 숫자의 오프셋을 가지는 데이터부터 가져오도록 오프셋을 설정한다.

이후 kafka-console-consumer.sh 실행 시 이전에 이미 읽어왔던 레코드부터 모두 가져온 것을 확인할 수 있다. 

 

kafka-consumer-groups.sh 오프셋 리셋의 종류는 다음과 같다. 

--to-earliest: 가장 처음(작은) 오프셋으로 리셋
--to-latest: 가장 마지막(큰) 오프셋으로 리셋
--to-current: 현시점 기준 오프셋으로 리셋
--to-datetime {YYYY-MM-DDTHH:mmSS.sss}: 특정 일시로 오프셋 리셋 (레코드 타임스탬프 기준) 
--to-offset {long}: 특정 오프셋으로 리셋
--shift-by {+/- long}: 현재 컨슈머 오프셋에서 앞뒤로 옮겨서 리셋

 


그 외 커맨드 라인 툴

kafka-producer-perf-test.sh: 카프카 프로듀서로 퍼포먼스를 측정할 때 사용

 

 

kafka-consumer-perf-test.sh: 카프카 컨슈머로 퍼포먼스를 측정할 때 사용

 

 

kafka-reassign-partitions.sh: 리더 파티션과 팔로워 파티션의 위치를 변경할 수 있다.

만약 다음과 같이 하나의 브로커에 리더 파티션이 몰리는 경우, 해당 브로커에만 네트워크가 과부하되고 다른 브로커는 놀게 된다. 이렇게 특정 브로커에만 몰려있는 경우에 kafka-reassign-partitions.sh를 사용해서 리더 파티션을 리밸런싱 할 수 있다. 

 

카프카 브로커에는 auto.leader.rebalance.enable 옵션이 있는데 이 옵션의 기본값은 true로 클러스터 단위에서 리더 파티션을 자동 리밸런싱하도록 도와준다. 브로커의 백그라운드 스레드가 일정한 간격으로 리더의 위치를 파악하고 필요시 리더 리밸런싱을 통해 리더의 위치가 알맞게 배분된다.

 

 

끝.

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2024/09   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30
글 보관함