Kafka

카프카 프로듀서

  • 목적, 요구 조건이 다양함 -> 프로듀서 API를 사용하는 방식과 설정에 영향을 미침

메세지 쓰는 순서

  1. ProducerRecord 객체 생성
    • 필수: 레코드가 저장될 토픽, 밸류 지정
    • 선택: 키와 파티션 지정
  2. 전송 API 호출
    • 프로듀서는 키와 값 개체를 네트워크로 전송할 수 있도록 직렬화
    • 파티션을 지정하지 않았다면 파티셔너에게 보냄
    • 파티셔너에서 ProducerRecord 객체의 키 값을 기준으로 파티션 결정
  3. 같은 토픽, 파티션으로 전송될 레코드들 모음인 레코드 배치(record batch)에 추가
  4. 별도의 스레드가 레코드 배치를 적절한 카프카 브로커에게 전송
  5. 브로커가 메세지를 받으면 응답을 돌려줌
    • 성공적으로 저장: RecordMetadata(토픽, 파티션, 레코드 오프셋을 담음) 객체를 리턴
    • 실패: 에러 리턴

카프카 프로듀서 생성

  • 3개의 필수 속성 값을 가짐
  1. bootstrap.servers
    • 카프카 클러스터와 첫 연결을 생성하기 위해 프로듀서가 사용할 브로커의 host:port 목록
    • 브로커 중 하나가 작동을 정지하는 경우에도 프로듀서가 클러스터에 연결할 수 있도록 최소 2개 이상 지정 권장
  2. key.serializer
    • 카프카에 쓸 레코드의 키 값을 직렬화하기 위해 사용하는 시리얼라이저(serializer) 클래스의 이름
  3. value.serializer
    • 카프카에 쓸 레코드의 밸류값을 직렬화하기 위해 사용하는 시리얼라이즈 클래스의 이름

메시지 전송 방법

  1. 파이어 앤 포겟 (Fire and Forget)
    • 메시지를 서버에 전송마나하고 성공 혹은 실패 여부에는 신경 쓰지 않는다.
  2. 동기적 전송 (Synchronous Send)
    • 다음 메시지를 전송하기 전 get() 메서드를 호출해서 작업이 완료될 때까지 기다렸다가 실제 성공 여부를 확인
  3. 비동기적 전송 (Asynchronous send)
    • 콜백 함수와 함께 send() 메서드를 호출하면 응답을 받는 시점에서 자동으로 콜백 함수 호출
    • org.apache.kafka.clients.producer.Callback 인터페이스를 구현하는 클래스가 필요함
    • 콜백은 프로듀서의 메인 스레드에서 실행 -> 콜백이 충분히 빨라야함

프로듀서 설정하기

  1. client.id

    • 프로듀서와 그것을 사용하는 애플리케이션을 구분하기 위한 논리적 식별자
  2. acks

    • 프로듀서가 임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야 하는지 결정
    • 신뢰성과 프로듀서 지연 사이에는 트레이드 오프 관계가 있음
    • acks=0 : 프로듀서는 메시지가 성공적으로 전달되었다고 간주하고 브로커의 응답을 기다리지 않음
    • acks=1 : 프로듀서는 리더 레클리카가 메시지를 받는 순간 브로커로부터 성공했다는 응답을 받는다.
    • acks=all : 프로듀서는 메시지가 모든 인-싱크 레플리카 (in-sync replica)에 전달된 뒤에야 브로커로부터 성공했다는 응답을 받는다.
  3. 메시지 전달 시간

    • send()를 호출했을 때 성공 혹은 실패하기까지 시간
    • 현재 버전의 카프카에서 이 값들을 조정하는 것을 권장하지 않음
    • 크래시 난 브로커가 정상으로 돌아오기까지의 시간을 테스트한 뒤 delivery.timeout.ms 매개변수를 잡아주는 것을 권장
    1. max.block.ms: 메타데이터를 요청했을 때 프로듀서가 얼마나 오랫동안 블록되는지를 결정
    2. delivery.timeout.ms: 레코드 전송 준비가 완료된 시점에서부터 브로커의 응답을 받거나, 전송을 포기하게 되는 시점까지의 제한시간
    3. request.timeout.ms: 프로듀서가 데이터를 전송할 때 서버로부터 응답을 받기 위해 얼마나 기다릴 것인지를 결정
    4. retries: 메시지 재전송 횟수
    5. retry.backoff.ms: 재전송 간격 조정
  4. linger.ms: 현재 배치를 전송하기 전까지 대기하는 시간

  5. buffer.memory: 메시지를 대기시키는 버퍼의 크기

  6. compression.type: 매개변수 snappy, gzip, lz4, zstd 중 하나로 설정하면 해당 압축 알고리즘 사용

  7. batch.size: 각각의 배치에 사용될 메모리 양 결정

  8. max.in.flight.requests.per.connection: 프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지의 수

    • 원래 메시지 순서는 보존하지만 재시도하면서 순서가 뒤집어질 수 있음
  9. max.request.size: 전송하는 쓰기 요청의 크기

  10. receive.buffer.bytes, send.buffer.bytes: 데이터를 읽거나 쓸 때 소켓이 사용하는 TCP 송수신 버퍼의 크기를 결정

  11. enable.idempotence: 메시지 중복 저장을 방지

시리얼라이저

  • 모두 같은 로직을 가지고 있어야하고 변경이 힘들기 때문에 범용 라이브러리 사용을 권장
  • 아파치 에이브로
    • 데이터를 읽는 쪽 애플리케이션을 전부 변경하지 않고 스키마를 변경하더라도 예외나 에러가 발생하지 않음
    • 데이터를 쓸 때 사용하는 스키마와 읽을 떄 기대하는 스키마가 호환되어야 함
    • 역직렬화를 할 때는 데이터를 쓸 때 사용했던 스키마에 접근이 가능해야함

파티션

  • 접착성 처리: 요청의 수를 줄임

헤더

  • 레코드의 키/밸류를 건드리지 않고 추가 메타데이터를 심을 때 사용함

인터셉터

  • 클라이언트의 코드를 고치지 않으면서, 그 작동을 변경해야 하는 경우 사용
  • ProducerInterceptor
    • onSend, onAcknowledgment
  • 모니터링, 정보 추적, 표준 헤더 삽입
  • kafka-console-producer.sh 툴과 함께 사용하여 클라이언트 코드를 고치지 않고 인터셉터 적용

쿼터, 스로틀링

  • 쿼터(quota): 한도
    • 쓰기 쿼터 (produce quota), 읽기 쿼터 (consume quota), 요청 쿼터 (request quota)
    • 브로커 설정 파일에서 지정 가능, kafka-configs.sh
    • JMX 메트릭을 통해서 스로틀링 작동 여부를 확인
      • produce-throttle-time-avg
      • produce-throttle-time-max
      • fetch-throttle-time-avg
      • fetch-throttle-time-max