Kafka

카프카에서 데이터를 읽는 것은 다른 메시지 전달 시트템에서 데이터를 읽는 것과 조금 다름

카프카 컨슈머

컨슈머, 컨슈머 그룹

  • 처리 속도로 인해 밀리지 않기 위해 데이터를 읽어 오는 작업을 확장할 수 있어야 함
  • 카프카 컨슈머는 보통 컨슈머 그룹의 일부로서 동작함
  • 일반적인 규모 확장 방식
    1. 하나의 컨슈머로 토픽에 들어오는 데이터의 속동를 감당할 수 없는 상황
    2. 컨슈머를 추가
    3. 단위 컨슈머가 처리하는 파티션과 메시지의 수를 분산
  • 여러 애플리케이션에서 동일한 토픽에서 데이터를 읽어와야 하는 경우
    1. 애플리케이션 별로 컨슈머 그룹 생성
    2. 컨슈머 추가

컨슈머 그룹과 파티션 리밸런스

  • 컨슈머 그룹에 속한 컨슈머들은 토픽의 파티션들에 대한 소유권을 공유
  • 리밸런스(rebalance): 컨슈머에 할당된 파티션을 다른 컨슈머에게 할당해주는 작업
  • 조급한 리밸런스 (eager rebalance)
    • 모든 컨슈머는 읽기 작업을 멈춤, 할당되었던 파티션 소유권 포기
    • 컨슈머 그룹에 다시 참여, 새로운 파티션 할당
  • 협력적 리밸런스 (cooperative rebalance)
    • 한 컨슈머에게 할당되어있던 파티션만 다른 컨슈머에게 재할당
    • 다른 컨슈머들은 하던일 방해받지 않음
    • 재할당할 파티션에 대한 읽기 작업을 멈춤
    • group coordinator 역할을 지정받은 브로커에 heartbeat를 전송하여 소유권 유지

정적 그룹 멤버십

  • 기본적으로 컨슈머의 그룹 멤버로서의 자격은 일시적임
  • 컨슈머에 고유한 group.instance.id 값을 부여하면 정적 멤버가됨
  • 컨슈머가 끝나면 자동으로 그룹을 떠나지 않고, 다시 조인하면 이전에 할당받았던 파티션 그대로 재할당
    • 파티션 할당을 캐시해두고 있어 리밸런싱을 발생시키지 않음

컨슈머 생성하기

  • bootstrap.servers: 카프카 클러스터로의 연결 문자열
  • key.deserialize, value.deserializer: 바이트 배열을 자바 객체로 변환
  • group.id: 컨슈머 그룹 지정, 필수는 아니지만 그룹에 속하지 않는 컨슈머를 생성하는 것은 일반적이지 않음

토픽 구독하기

  • 정규식을 이용하여 다수의 토픽을 구독하려하면 리밸런스가 발생

폴링 루프

  • 컨슈머 API의 핵심은 서버에 추가 데이터가 들어왔는지 폴링하는 단순한 루프
  • 일정시간(max.poll.interval.ms) 폴링되지 않으면 죽은 것으로 간주
  • 하나의 스레드당 하나의 컨슈머가 원칙

컨슈머 설정하기

  • fetch.min.bytes: 컨슈머가 브로커로부터 레코드를 얻어올 때 받는 데이터의 최소량
  • fetch.max.wait.ms: 얼마나 오래 기다릴 것인지 결정
  • fetch.max.bytes: 컨슈머가 브로커를 폴링할 때 카프카가 리턴하는 최대 바이트 수
  • max.poll.records: poll을 호출할 때마다 리턴되는 최대 레코드 수
  • max.partition.fetch.bytes: 서버가 파티션별로 리턴하는 최대 바이트 수
  • session.timeout.ms, heartbeat.interval.ms
  • max.poll.interval.ms: 컨슈머가 폴링 하지 않고도 죽은 것으로 판정되지 않을 수 있는 최대 시간