카프카 컨슈머

카프카, 데이터 플랫폼의 최강자 의 5장 카프카 컨슈머 요약

컨슈머의 주요 기능

  • 특정 파티션을 관리하고 있는 파티션 리더에게 메시지를 가져오도록 요청
    • 각 요청은 로그의 오프셋을 명시
    • 그 위치로 부터 로그 메시지를 수신
    • 이를 통해 가져올 메시지의 위치를 조정, 필요한 경우 이미 가져온 데이터도 다시 수신 가능
  • 버그를 수정한 후 가져왔던 메시지들을 다시 가져올 수 있음
    • 래빗 엠큐와 같은 일반적인 메시지큐 솔루션에서는 제공하지 않는 기능

주키퍼 사용 여부에 따른 컨슈머 종류

  • 올드 컨슈머(deprecated)
    • 컨슈머의 오프셋을 주키퍼의 지노드에 저장하는 방식을 지원
  • 뉴 컨슈머
    • 오프셋 저장을 주키퍼가 아닌 카프카의 토픽에 저장하는 방식

컨슈머 주요 옵션

bootstrap.servers

  • 카프카 클러스터에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트 정보

fetch.min.bytes

  • 한 번에 가져올 수 있는 최소 데이터 사이즈
    • 만약 지정한 사이즈보다 작은 경우, 요청에 대해 응답하지 않고 데이터가 누적될 때까지 대기

group.id

  • 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자
  • 그룹 아이디는 매주 중요

enable.auto.commit

  • 백그라운드로 주기적으로 오프셋을 커밋

auto.offset.reset

  • 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않는 경우(데이터가 삭제)에 오프셋을 리셋
  • earliest
    • 가장 초기의 오프셋으로 설정
  • latest
    • 가장 마지막의 오프셋 값으로 설정
  • none
    • 이전 오프셋 값을 찾지 못하면 에러는 발생

request.timeout.ms

  • 요청에 대해 응답을 기다리는 최대 시간

session.timeout.ms

  • 컨슈머와 브로커 사이의 세션 타임 아웃 시간
    • 브로커가 컨슈머가 살아있는 것으로 판단하는 시간
    • 기본값 10초
  • 컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않고 session.timeout.ms이 지나면 해당 컨슈머는 종료되거나 장애가 발생한 것으로 판단
    • 컨슈머 그룹은 리밸런스를 시도
  • 하트비트 없이 얼마나 오랫동안 컨슈머가 있을 수 있는지를 제어
  • heartbeat.interval.ms와 밀접한 관련이 있음
    • 두 속성이 함께 수정
  • session.timeout.ms를 짧게 설정하면 실패를 빨리 감지
    • GC나 poll loop를 완료하는 시간이 길어지게 되면 원하지 않게 리밸런스가 발생
  • session.timeout.ms를 길게 설정
    • 리밸런스가 일어날 가능성은 줄지만 실제 오류를 감지하는 시간이 오래 소요

heartbeat.interval.ms

  • 그룹 코디네이터에게 얼마나 자주 KafkaConsumer poll() 메소드로 하트비트를 보낼 것인지 조정
  • session.timeout.ms 보다 낮아야 함
  • 일반적으로 1/3으로 설정

max.poll.records

  • 단일 호출 poll()에 대한 최대 레코드 수를 조정
  • 애플리케이션이 폴링 루프에서 데이터 양을 조정

max.poll.interval.ms

  • 컨슈머가 살아있는지 체크하기 위해 하트비트를 주기적으로 보내는데, 하트비트만 보내고 실제로 메시지를 가져가지 않는 경우
  • 컨슈머가 무한정 해당 파티션을 점유할 수 없도록 주기적으로 poll을 호출하지 않으면 장애라고 판단
    • 컨슈머 그룹에서 추방
    • 다른 컨슈머가 해당 파티션에서 메시지를 가져갈 수 있게 변경

카프카 컨슈밍 - 자바

  • 메시지를 가져오기 위해 카프카에 지속적으로 poll() 을 요청
  • 컨슈머는 카프카에 폴링하는 것을 계속 유지
    • 그렇지 않으면 종료된 것으로 간주
    • 컨슈머에게 할당된 파티션은 다른 컨슈머에게 전달
  • poll()은 타임 아웃 주기 이고, 데이터가 컨슈머 버퍼에 없다면 poll()은 얼마나 오랫동안 블럭할지를 조정
    • 만약 0으로 설정하면 poll()은 즉시 리턴하게 되고
    • 값을 입력하면 정해진 시간 동안만 대기
  • poll()은 레코드 전체를 리턴
    • 레코드에는 토픽, 파티션, 파티션의 오프셋, 키, 값을 포함
    • n개의 메시지 처리를 위해 반복문이 필요
  • close()를 통해서 네트워크 연결과 소켓을 종료
    • 컨슈머가 하트비트를 전송하지 않음
    • 그룹 코디네이터가 해당 컨슈머가 종료된 것으로 감지하는 것보다 빠르게 감지하게 되고 즉시 리밸런스가 발생

파티션 3개로 구성한 토픽과 메시지 순서

  • 환경
    • 파티션 3
    • 리플리케이션 팩터 1
  • 프로듀서가 메시지를 전달한 순서를 보장하지 않음
    • 지극히 정상임
  • 동일한 파티션 내에서는 프로듀서가 생성한 순서와 동일하게 처리
    • 파티션과 파티션 사이에서는 순서를 보장하지 않음

파티션 1개로 구성한 토픽과 메시지 순서

  • 메시지의 순서를 반드시 보장해야 하는 경우
  • 메시지의 순서는 보장되지만 파티션 수가 하나이기 때문에 분산 처리 불가능
  • 하나의 컨슈머에서 처리되기 때문에 처리량이 떨어짐

컨슈머 그룹

  • 하나의 토픽에 여러 컨슈머 그룹이 동시에 접속해 메시지를 가져올 수 있음
  • 하나의 데이터를 다양한 용도로 사용하는 요구가 많아졌음
  • 컨슈머가 메시지를 가져가는 속도보다 프로듀싱 속도가 빠르면?
    • 컨슈머가 처리하지 못한 메시지들이 점점 적재
    • 카프카로 메시지가 들어오는 시간과 메시지가 처리하는 시간 사이의 캡이 커짐
  • 단순하게 컨슈머만 확장하는 경우
    • 기존의 컨슈머 오프셋 정보와 새로 추가된 컨슈머의 오프셋 정보가 혼재
    • 동일한 토픽에 대해 여러 컨슈머가 메시지를 가져갈 수 있도록 컨슈머 그룹이라는 기능을 사용
  • 컨슈머 그룹을 통해서 컨슈머는 확장이 용이
    • 컨슈머의 장애에도 빠른 대처가 가능

컨슈머 그룹의 동작

  • 환경
    • 파티션 3개
    • 컨슈머 1개
  • 카프카에 메시지가 계속 쌓이게 됨
  • 컨슈머 그룹 내의 컨슈머들은 토픽의 파티션에 대해 소유권을 공유
  • 컨슈머 2개 추가
    • 파티션과 컨슈머가 1:1로 매칭
  • 리밸런스
    • 토픽의 파티션에 대한 소유권이 이동하는 것
    • 컨슈머를 쉽고 안전하게 추가/제거 가능
    • 높은 가용성과 확장성을 제공
  • 리밸런스의 단점
    • 일시적으로 컨슈머는 메시지 소비를 할 수 없음
    • 컨슈머 그룹 전체가 일시적으로 사용할 수 없음

파티션과 컨슈머

  • 토픽의 파티션에는 하나의 컨슈머만 연결 가능
    • 파티션 내에서는 메시지의 순서가 보장
    • 하나의 파티션에 두 개의 컨슈머가 연결되면 안정적으로 메시지 순서를 보장 할 수 없음
  • 토픽의 파티션도 늘려주고, 컨슈머 수도 같이 늘려줘야 함
  • 컨슈머 그룹 안에서 멤버로 유지하고 할당된 파티션의 소유권을 유지하는 방법
    • 하트비트를 보내는 것
    • 컨슈머가 일정한 주기로 하트비트를 보낸다는 사실은 해당 파티션의 메시지를 잘 처리하고 있다는 의미
  • 하트비트는 언제 나가나 ?
    • 컨슈머가 poll 할 때 가져간 메시지의 오프셋을 커밋할 때 보내짐

하나의 토픽(큐)에 대해 여러 용도로 사용 가능

  • 일반적인 메시지 큐 솔루션에서 특정 컨슈머가 메시지를 가져가면, 큐에서 메시지가 삭제
    • 다른 컨슈머는 가져갈 수 없음
  • 카프카에서는 컨슈머가 메시지를 가져가더라도 삭제하지 않음
    • 하나의 메시지를 여러 컨슈머가 다른 용도로 사용할 수 있도록 시스템을 구성
  • 여러 컨슈머 그룹들이 하나의 토픽에서 메시지를 가져갈 수 있는 이유는?
    • 컨슈머 그룹 마다 각자의 오프셋을 별도로 관리
  • 여러 컨슈머 그룹이 동시에 하나의 토픽의 메시지를 이용하는 경우
    • 컨슈머 그룹 아이디는 서로 중복되지 않아야 함
  • 다수의 컨슈머, 컨슈머 그룹 아이디, 오프셋은 밀접한 관계가 있음

커밋과 오프셋

  • 컨슈머가 poll()을 호출할 때마다 컨슈머 그룹은 카프카에 저장되어 있는 아직 읽지 않은 메시지를 가져옴
    • 컨슈머 그룹이 메시지를 어디까지 가져갔는지 알고 있음
    • 컨슈머 그룹의 컨슈머들은 각각의 파티션에 자신이 가져간 메시지의 위치정보(오프셋)을 기록
  • commit
    • 각 파티션에 대해 현재 위치를 업데이트하는 동작
  • 카프카는 각 컨슈머 그룹의 파티션 별로 오프셋 정보를 저장하기 위한 저장소가 별도로 필요
    • 카프카 내에서 별도로 사용하는 토픽(__consumer_offsets)을 만들고 그 토픽에 오프셋 정보를 저장
  • 리밸런스가 일어나면 각각의 컨슈머는 이전에 처리했던 토픽의 파티션이 나닌 다른 새로운 파티션에 할당
    • 새로운 파티션에 대해서 가장 최근 커밋된 오프셋을 읽고 그 이후부터 메시지들을 가져옴
    • 커밋된 오프셋이 컨슈머가 실제 마지막으로 처리한 오프셋보다 작으면 마지막 처리된 오프셋과 커밋된 오프셋 사이의 메시지는 중복으로 처리

자동 커밋

  • enable.auto.commit = true
  • auto.commit.invertal.ms 주기로 컨슈머는 poll() 을 호출할 때, 가장 마지막 오프셋을 커밋
    • poll 을 요청할 때마다 커밋할 시간인지 아닌지 체크
    • poll 요청으로 가져온 마지막 오프셋을 커밋
  • 중복이 발생할 수 있음
  • 최악의 경우
    • 데이터베이스에는 메시지가 처리되지 않은 채로 컨슈머 장애가 발생하면 메시지 손실이 발생

수동 커밋

  • 메시지 처리가 완료될 때까지 메시지를 가져온 것으로 간주되어서는 안 되는 경우 사용
  • 데이터베이스에 메시지를 저장한 후 커밋을 해야만 안전하게 메시지를 저장할 수 있음
  • 메시지를 가져온 것으로 간주되는 시점을 자유롭게 조정할 수 있음
  • 최악의 경우, 메시지 중복이 발생할 수 있음
    • 데이터베이스에 저장하는 도중에 실패

특정 파티션 할당

  • 특정 파티션에 대해 세밀하게 제어하기를 원하는 경우
    • 키-값의 형태로 파티션에 저장되어 있고, 특정 파티션에 대한 메시지들만 가져와야 하는 경우
    • 컨슈머 프로세스가 가용성이 높은 구성인 경우, 카프카가 컨슈머의 실패를 감지하고 재조정할 필요 없고 자동으로 컨슈머 프로세스가 다른 시스템에서 재시작되는 경우
  • 컨슈머 인스턴스마다 컨슈머 그룹 아이디를 다르게 할당
    • 동일한 컨슈머 그룹 아이디를 사용하게 되면 컨슈머마다 할당된 파티션에 대한 오프셋 정보를 서로 공유하기 때문에

특정 오프셋으로부터 메시지 가져오기

  • seek(파티션 번호, 오프셋 번호) 사용
  • consumer.seek
    • consumer.poll 의 위치를 지정할 수 있음
RequestBody의 내용을 로그로 남기고 싶다(2) 카프카 프로듀서

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×