티스토리 뷰

*본 포스팅은 인프런 [데브원영]님의 아파치 카프카 프로그래밍 강의 섹션 6을 수강하고 이를 참고해 작성했습니다.

 

[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지! |

데브원영 DVWY | 실전 환경에서 사용하는 아파치 카프카 애플리케이션 프로그래밍 지식들을 모았습니다! 데이터 파이프라인을 구축하는데 핵심이 되는 아파치 카프카의 각종 기능들을 살펴보고

www.inflearn.com

 

카프카 컨슈머 개요

 

프로듀서가 전송한 데이터는 카프카 브로커에 적재된다. 컨슈머는 적재된 데이터를 사용하기 위해 브로커로부터 데이터를 가져와서 필요한 처리를 한다. 

 

컨슈머 내부 구조

- Fetcher 

리더 파티션으로부터 레코드들을 배치로 미리 가져와서 대기한다.

 

- Poll()

Fetcher에 있는 레코드들을 리턴하는 레코드. 

 

- ConsumerRecords

처리하고자 하는 레코드들의 모음. 리스트 형태로 정렬되어 있고 병렬로 처리된다. 오프셋이 포함되어 있다. 

 

 

컨슈머 그룹

- 컨슈머 그룹이란? 특정 토픽에 대해서 어떤 목적에 따라 데이터를 처리하는 컨슈머들을 묶은 그룹으로 같은 그룹이면 대체로 같은 로직을 가진다. 

- 컨슈머 그룹으로 묶인 컨슈머들은 토픽의 1개 이상 파티션들에 할당되어 데이터를 가져갈 수 있다. 즉 여러 개의 파티션에 할당 가능

- 컨슈머 그룹으로 묶인 컨슈머가 토픽을 구독해서 데이터를 가져갈 때, 1개의 파티션은 최대 1개의 컨슈머에 할당 가능하다. 

➡️ 파티션 : 토픽 = 1 : N 관계

- 그러므로 컨슈머 그룹의 컨슈머 개수는 토픽의 파티션 개수보다 같거나 작아야 한다. 

- 최대 성능을 내기 위해서는 파티션과 컨슈머가 일대일 매칭되는 것이 가장 좋다. 

 

컨슈머 그룹의 컨슈머가 파티션 개수보다 많을 경우

만약 4개의 컨슈머로 이뤄진 컨슈머 그룹으로 3개의 파티션을 가진 토픽을 일대일로 할당하면, 1개의 컨슈머는 파티션을 할당받지 못하고 IDLE 상태 즉 유휴 상태로 남게 된다. 

➡️ 유휴 상태의 컨슈머는 스레드만 차지하고 실질적인 데이터 처리를 하지 못하므로 애플리케이션 실행에 있어서 불필요한 스레드이다

그러므로, 컨슈머는 파티션 개수만큼만 띄우자

 

 

컨슈머 그룹을 활용하는 이유

CPU, 메모리 정보 등의 서버 주요 리소스 데이터를 수집하는 데이터 파이프라인을 구축한다고 가정해 보자. 

실시간 리소스를 시간순으로 확인하기 위해서 데이터를 엘라스틱 서치에 저장하고 이와 동시에 대용량 적재를 위해 하둡에 적재할 것이다. 

이 파이프라인을 카프카를 사용하지 않고 동기적으로 구현할 때 vs 카프카 컨슈머 그룹을 활용해서 구현할 때 두 상황을 각각 비교해 보자. 

 

- 카프카를 사용하지 않는 경우

리소스를 엘라스틱 서치와 하둡에 적재하기 위해 동기적으로 적재를 요청할 것이다. 그런데 서버 대수가 100대 이상 넘어가면 에이전트에서 수집하는 데이터 양은 방대한데 이 상황에서 엘라스틱이나 하둡 둘 중 하나에 장애가 발생하면 '강한 커플링'으로 인해 더는 적재가 불가능하다.

 

- 카프카 컨슈머 그룹을 사용하는 경우

카프카는 최종 적재되는 저장소의 장애에 유연하게 대응할 수 있도록 각기 다른 저장소에 저장하는 컨슈머를 다른 컨슈머 그룹으로 묶음으로써 각 저장소의 장애에 격리되어 운영할 수 있다. 예를 들어, 엘라스틱 서치에서 장애가 발생하더라도 하둡으로 데이터를 적재하는 데에는 문제가 없다. 엘라스틱 서치의 장애가 해소되면 엘라스틱 서치로 적재하는 컨슈머의 컨슈머 그룹은 마지막으로 적재 완료한 데이터 이후부터 다시 적재를 수행해 최종적으로 모두 정상화된다.

➡️ 강한 커플링을 끊고 유연한 구조를 구현할 수 있다. 더불어 컨슈머 수를 능동적으로 줄이고 늘려서 리소스 사용량을 조절할 수 있다

 

리밸런싱(Rebalancing)

리밸런싱(Rebalancing)이란? 컨슈머 그룹으로 이뤄진 컨슈머 중 일부 컨슈머에 장애가 발생했을 때 장애가 발생한 컨슈머에 할당된 파티션이 정상 작동하는 컨슈머에게 소유권이 넘어가도록 하는 과정

 

리밸런싱이 발생하는 상황 두 가지: [1] 컨슈머가 추가되는 경우 [2] 컨슈머가 제외되는 경우

이슈가 발생한 컨슈머를 컨슈머 그룹에서 제외하여 모든 파티션이 지속적으로 데이터를 처리할 수 있도록 가용성을 높여준다.

리밸런싱은 컨슈머가 데이터를 처리하는 도중 언제든지 발생할 수 있으므로 대응 코드를 작성하는 것이 필수!

 

커밋(Commit)

커밋(commit)이란? 카프카 브로커로부터 데이터를 어디까지 가져갔는지 기록하는 것

특정 파티션을 어떤 컨슈머 그룹이 몇 번째 가져갔는지 카프카 브로커 내부에서 사용되는 내부 토픽(_consumer_offsets)에 기록된다.

이때, 컨슈머 동작 이슈가 발생해 오프셋 커밋이 정상적으로 기록되지 않으면 데이터 처리의 중복이 발생할 수 있다.

➡️ 데이터 처리의 중복이 발생하지 않기 위해서 컨슈머 애플리케이션이 오프셋 커밋을 정상적으로 처리했는지 검증해야 함!

 

어사이너(Assignor)

어사이너(Assignor)란? 컨슈머와 파티션의 할당 정책을 결정하는 것

- RangeAssignor: 각 토픽에서 파티션을 숫자로 정렬, 컨슈머를 사전 순서로 정렬해 할당

- RoundRobinAssignor: 모든 파티션을 컨슈머에서 번갈아가며 할당

- StickyAssignor: 최대한 파티션을 균등하게 배분하면서 할당

카프카 2.5.0 버전에서는 RangeAssignor가 기본값으로 설정된다.

 

 

컨슈머 필수 옵션

디폴트 값이 없는 필수 옵션들

- bootstrap.servers: 프로듀서가 데이터를 전송할 대상 클러스터에 속한 브로커의 호스트 이름  ➡️ 두 개 이상 입력해 일부 브로커에 이슈가 발생하더라도 접속하는 데에 이슈가 없도록 설정 가능

- key.deserializer: 레코드의 메시지 키를 역직렬화하는 클래스 지정

- value.deserializer: 레코드의 메시지 값을 역직렬화하는 클래스 지정

이때 디시리얼라이저는 프로듀서 필수 옵션에서도 언급한 것처럼 프로듀서에서 설정하는 시리얼라이저와 일치시켜야 한다. 

 

 

컨슈머 선택 옵션

선택 옵션 설명 기본값
group.id 컨슈머 그룹 아이디를 지정한다. subsribe() 메서드로 토픽을 구독해 사용할 때는 필수, assign 사용해서 할당할 때는 선택 옵션이다. null
auto.offset.reset 컨슈머 그룹이 특정 파티션을 읽을 때 어느 오프셋부터 읽을지 선택하는 옵션.
이때, 이미 컨슈머 오프셋이 있다면 이 옵션값은 무시된다. 
latest
enable.auto.commit 자동 커밋 / 수동 커밋 true
auto.commit.interval.ms 자동 커밋일 경우 오프셋 커밋 간격을 지정 5000(5초)
max.poll.records poll() 메서드를 통해 반환되는 최대 레코드 개수 지정. 더 많은 데이터를 처리하고 싶다면 값을 늘린다. 500 
session.timeout.ms 컨슈머가 브로커와 연결이 끊기는 최대 시간. 해당 시간이 너무 길면 데이터 처리가 지연될 수 있고, 너무 짧으면 문제가 없음에도 리밸런싱이 자주 발생할 수 있음 10000(10초) 
heartbeat.interval.ms 하트비트를 전송하는 시간 간격. 여기서 하트비트는 컨슈머와 연결이 지속되고 있음을 확인하기 위해 전송하는 신호 3000(3초)
max.poll.interval.ms poll() 메서드를 호출하는 간격의 최대 시간. 레코드 하나당 데이터 처리 시간이 길다면 불필요한 리밸런싱 피하기 위해 최대 시간을 높이기. 짧다면 장애 상황을 빠르게 파악하기 위해 값을 낮추기. 300000(5분)
isolation.level 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용  

 

 

auto.offset.reset

컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택하는 옵션. 이전에 이미 커밋을 한 번이라도 한 경우 즉 커밋 기록이 이미 있다면 이 옵션값은 무시된다. latest, earliest, none 중 택 1. 기본값은 latest

- latest: 가장 높은(가장 최근에 넣은) 오프셋부터 읽기 시작

- earliest: 가장 낮은(가장 오래전에 넣은) 오프셋부터 읽기 시작

- none: 컨슈머 그룹이 커밋한 기록이 있는지 찾고 없으면 오류 반환. 있다면 기존 커밋 기록 이후 오프셋부터 읽기 시작한다. 

 

 

컨슈머 애플리케이션 개발

SimpleConsumer

public class SimpleConsumer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private final static String GROUP_ID = "test-group";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("record:{}", record);
            }
        }

    }
}

무한루프에서 poll을 실행시켜 레코드를 받아와 순차적으로 처리한다.

 

코드를 실행시키면 다음과 같이 컨슈머가 0번 파티션에 지정되어 정상적으로 실행되는 모습을 확인할 수 있다.

 

콘솔 프로듀서를 실행해서 hello, kafka 레코드를 전송해 보면 

 

다음과 같이 콘솔에서 보낸 hello와 kafka가 순차적으로 출력된 것을 확인할 수 있다.

 

 

수동 커밋 컨슈머 애플리케이션

동기 오프셋 커밋 컨슈머

poll() 메서드가 호출된 이후 commitSync() 메서드를 호출하여 오프셋 커밋을 명시적으로 수행할 수 있다. commitSync()는 poll() 메서드로 받은 가장 마지막 레코드의 오프셋을 기준으로 커밋한다. 동기 오프셋 커밋을 사용할 경우 모든 레코드 처리가 끝난 이후 commitSync() 메서드를 호출해야 함! 동기 오프셋 커밋을 사용할 경우 커밋 응답을 기다리는 동안 데이터 처리가 일시적으로 중단된다. 

	KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("record:{}", record);
            }
            consumer.commitSync();
        }

 

현재 소스코드는 '레코드별로' 커밋을 하도록 구현이 되어 있지만 커밋을 자주 하면 브로커에 부담이 가므로 다음과 같이 레코드마다 커밋을 진행하는 것은 흔하지 않다. 

 

 

비동기 오프셋 커밋 컨슈머

오프셋 커밋 응답을 기다리는 동안 데이터 처리가 일시적으로 중단되는 동기 오프셋 커밋과 달리 비동기적으로 처리해 더 많은 데이터를 처리할 수 있는 비동기 오프셋 커밋은 commitAsync()를 호출해 사용할 수 있다.

	KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("record:{}", record);
            }
            consumer.commitAsync(new OffsetCommitCallback() {
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                    if (e != null)
                        System.err.println("Commit failed");
                    else
                        System.out.println("Commit succeeded");
                    if (e != null)
                        logger.error("Commit failed for offsets {}", offsets, e);
                }
            });
        }

 

consumer.commitAsync()는 백그라운드에서 실행되므로 지속적으로 데이터 처리가 가능하다.

OffsetCommitCallback()을 사용해 커밋 완료 여부를 기록한다.

 

 

리밸런스 리스너를 가진 컨슈머 애플리케이션

리밸런스 발생 감지를 위해 카프카는 ConsumerRebalancedListener 인터페이스를 지원한다. ConsumerRebalanceListner 인터페이스로 구현된 클래스는 onPartitionAssigned(), onPartitionRevoked() 메서드로 이뤄진다.

- onPartitionAssigned(): 파티션 할당 / 리밸런스 끝난 뒤 파티션이 할당 완료되면 호출되는 메서드

- onPartitionRevoked(): 파티션 해제 / 리밸런스 시작되기 직전에 호출되는 메서드. 마지막으로 처리한 레코드를 기준으로 커밋을 하기 위해서는 리밸런스가 시작하기 직전에 커밋을 하면 되므로 onPartitionRevoked() 메서드에 커밋을 구현하여 처리할 수 있다.

 

RebalanceListener

public class RebalanceListener implements ConsumerRebalanceListener {
    private final static Logger logger = LoggerFactory.getLogger(RebalanceListener.class);

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        logger.warn("Partitions are assigned : " + partitions.toString());

    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        logger.warn("Partitions are revoked : " + partitions.toString());
    }
}

각 메서드에서 로그를 찍어 명시적으로 작동을 확인해 보자.

 

Consumer 코드

 public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME), new RebalanceListener());
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("{}", record);
            }
        }
    }

consumer subsribe에 rebalanceListener를 넣어 구독하고 있는 컨슈머 그룹이 어떻게 리밸런싱 하는지 확인한다.

그럼 코드를 실행하고 그 결과를 확인해 보자.

 

1. 먼저 컨슈머를 하나 실행하면 다음과 같이 0번 파티션에 할당된 것을 볼 수 있다.

1. 컨슈머 1번 실행 - 0번 파티션 할당

 

2. 이후 Enable multiple instance를 설정해 인스턴스를 하나 더 띄워주면, 새로 띄운 두 번째 컨슈머에는 파티션이 할당되지 않은 것을 볼 수 있다. 현재 토픽에 있는 파티션은 하나뿐이므로 두 번째 컨슈머에 할당될 파티션이 존재하지 않기 때문이다.

2. 컨슈머 2번 실행 - 할당할 파티션 없음

 

3. 두 번째 컨슈머가 실행된 시점에 기존 컨슈머에 대해 리밸런싱이 발생한다. 0번 revoke 하고 다시 0번을 그대로 assign 해준다.

컨슈머 1번 리밸런싱 발생 - 0번 파티션 그대로 가지고 있음

 

4. 그럼 이번에는 첫 번째 컨슈머를 종료시켜 보자.

컨슈머 1번 종료

 

ps -ef | grep ConsumerWithSyncOffsetCommit 명령어를 사용해 프로세스 번호를 확인하고

kill -term [프로세스 번호]를 입력해 실행 중이던 첫 번째 컨슈머를 삭제한다. 

 

5. 첫 번째 컨슈머를 종료시키고 두 번째 컨슈머의 로그를 확인해 보면, 컨슈머 삭제로 인해 리밸런싱이 발생하고, 첫 번째 컨슈머로부터 revoke 한 0번 파티션이 두 번째 컨슈머에게 할당된 것을 확인할 수 있다.

컨슈머 2번에 리밸런싱 발생 - 0번 파티션 할당

 

 컨슈머 애플리케이션의 안전한 종료

컨슈머 애플리케이션은 안전하게 종료되어야 한다. 정상적으로 종료되지 않은 컨슈머는 세션 타임아웃이 발생할 때까지 컨슈머 그룹에 남게 된다. 안전 종료를 위해서는 KafkaConsumer 클래스의 wakeup() 메서드를 사용한다.

[1] wakeup() 메서드가 실행

[2] poll() 메서드가 호출

[3] WakeupException 예외가 발생

[4] 예외 받으면 데이터 처리를 위해 사용한 자원들을 해제

 

코드를 실행해서 직접 확인해 보자.

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new ShutdownThread());

        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("{}", record);
                }
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            logger.warn("Wakeup consumer");
        } finally {
            logger.warn("Consumer close");
            consumer.close();
        }
    }

    static class ShutdownThread extends Thread {
        public void run() {
            logger.info("Shutdown hook");
            consumer.wakeup();
        }
    }

try catch문을 사용해서 오류가 발생하면 컨슈머가 안전하게 종료하기 위해 wakeup() 메서드를 호출하도록 코드를 구성했다.

 

실행 결과 다음과 같이 로그가 차례대로 출력된 것을 볼 수 있다.

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2024/09   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30
글 보관함