카프카, 데이터 플랫폼의 최강자 의 5장 카프카 컨슈머 요약
컨슈머의 주요 기능
- 특정 파티션을 관리하고 있는 파티션 리더에게 메시지를 가져오도록 요청
- 각 요청은 로그의 오프셋을 명시
- 그 위치로 부터 로그 메시지를 수신
- 이를 통해 가져올 메시지의 위치를 조정, 필요한 경우 이미 가져온 데이터도 다시 수신 가능
- 버그를 수정한 후 가져왔던 메시지들을 다시 가져올 수 있음
- 래빗 엠큐와 같은 일반적인 메시지큐 솔루션에서는 제공하지 않는 기능
주키퍼 사용 여부에 따른 컨슈머 종류
- 올드 컨슈머(deprecated)
- 컨슈머의 오프셋을 주키퍼의 지노드에 저장하는 방식을 지원
- 뉴 컨슈머
- 오프셋 저장을 주키퍼가 아닌 카프카의 토픽에 저장하는 방식
컨슈머 주요 옵션
bootstrap.servers
- 카프카 클러스터에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트 정보
fetch.min.bytes
- 한 번에 가져올 수 있는 최소 데이터 사이즈
- 만약 지정한 사이즈보다 작은 경우, 요청에 대해 응답하지 않고 데이터가 누적될 때까지 대기
group.id
- 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자
- 그룹 아이디는 매주 중요
enable.auto.commit
- 백그라운드로 주기적으로 오프셋을 커밋
auto.offset.reset
- 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않는 경우(데이터가 삭제)에 오프셋을 리셋
- earliest
- 가장 초기의 오프셋으로 설정
- latest
- 가장 마지막의 오프셋 값으로 설정
- none
- 이전 오프셋 값을 찾지 못하면 에러는 발생
request.timeout.ms
- 요청에 대해 응답을 기다리는 최대 시간
session.timeout.ms
- 컨슈머와 브로커 사이의 세션 타임 아웃 시간
- 브로커가 컨슈머가 살아있는 것으로 판단하는 시간
- 기본값 10초
- 컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않고 session.timeout.ms이 지나면 해당 컨슈머는 종료되거나 장애가 발생한 것으로 판단
- 컨슈머 그룹은 리밸런스를 시도
- 하트비트 없이 얼마나 오랫동안 컨슈머가 있을 수 있는지를 제어
- heartbeat.interval.ms와 밀접한 관련이 있음
- 두 속성이 함께 수정
- session.timeout.ms를 짧게 설정하면 실패를 빨리 감지
- GC나 poll loop를 완료하는 시간이 길어지게 되면 원하지 않게 리밸런스가 발생
- session.timeout.ms를 길게 설정
- 리밸런스가 일어날 가능성은 줄지만 실제 오류를 감지하는 시간이 오래 소요
heartbeat.interval.ms
- 그룹 코디네이터에게 얼마나 자주 KafkaConsumer poll() 메소드로 하트비트를 보낼 것인지 조정
- session.timeout.ms 보다 낮아야 함
- 일반적으로 1/3으로 설정
max.poll.records
- 단일 호출 poll()에 대한 최대 레코드 수를 조정
- 애플리케이션이 폴링 루프에서 데이터 양을 조정
max.poll.interval.ms
- 컨슈머가 살아있는지 체크하기 위해 하트비트를 주기적으로 보내는데, 하트비트만 보내고 실제로 메시지를 가져가지 않는 경우
- 컨슈머가 무한정 해당 파티션을 점유할 수 없도록 주기적으로 poll을 호출하지 않으면 장애라고 판단
- 컨슈머 그룹에서 추방
- 다른 컨슈머가 해당 파티션에서 메시지를 가져갈 수 있게 변경
카프카 컨슈밍 - 자바
- 메시지를 가져오기 위해 카프카에 지속적으로 poll() 을 요청
- 컨슈머는 카프카에 폴링하는 것을 계속 유지
- 그렇지 않으면 종료된 것으로 간주
- 컨슈머에게 할당된 파티션은 다른 컨슈머에게 전달
- poll()은 타임 아웃 주기 이고, 데이터가 컨슈머 버퍼에 없다면 poll()은 얼마나 오랫동안 블럭할지를 조정
- 만약 0으로 설정하면 poll()은 즉시 리턴하게 되고
- 값을 입력하면 정해진 시간 동안만 대기
- poll()은 레코드 전체를 리턴
- 레코드에는 토픽, 파티션, 파티션의 오프셋, 키, 값을 포함
- n개의 메시지 처리를 위해 반복문이 필요
- close()를 통해서 네트워크 연결과 소켓을 종료
- 컨슈머가 하트비트를 전송하지 않음
- 그룹 코디네이터가 해당 컨슈머가 종료된 것으로 감지하는 것보다 빠르게 감지하게 되고 즉시 리밸런스가 발생
파티션 3개로 구성한 토픽과 메시지 순서
- 환경
- 파티션 3
- 리플리케이션 팩터 1
- 프로듀서가 메시지를 전달한 순서를 보장하지 않음
- 지극히 정상임
- 동일한 파티션 내에서는 프로듀서가 생성한 순서와 동일하게 처리
- 파티션과 파티션 사이에서는 순서를 보장하지 않음
파티션 1개로 구성한 토픽과 메시지 순서
- 메시지의 순서를 반드시 보장해야 하는 경우
- 메시지의 순서는 보장되지만 파티션 수가 하나이기 때문에 분산 처리 불가능
- 하나의 컨슈머에서 처리되기 때문에 처리량이 떨어짐
컨슈머 그룹
- 하나의 토픽에 여러 컨슈머 그룹이 동시에 접속해 메시지를 가져올 수 있음
- 하나의 데이터를 다양한 용도로 사용하는 요구가 많아졌음
- 컨슈머가 메시지를 가져가는 속도보다 프로듀싱 속도가 빠르면?
- 컨슈머가 처리하지 못한 메시지들이 점점 적재
- 카프카로 메시지가 들어오는 시간과 메시지가 처리하는 시간 사이의 캡이 커짐
- 단순하게 컨슈머만 확장하는 경우
- 기존의 컨슈머 오프셋 정보와 새로 추가된 컨슈머의 오프셋 정보가 혼재
- 동일한 토픽에 대해 여러 컨슈머가 메시지를 가져갈 수 있도록 컨슈머 그룹이라는 기능을 사용
- 컨슈머 그룹을 통해서 컨슈머는 확장이 용이
- 컨슈머의 장애에도 빠른 대처가 가능
컨슈머 그룹의 동작
- 환경
- 파티션 3개
- 컨슈머 1개
- 카프카에 메시지가 계속 쌓이게 됨
- 컨슈머 그룹 내의 컨슈머들은 토픽의 파티션에 대해 소유권을 공유
- 컨슈머 2개 추가
- 파티션과 컨슈머가 1:1로 매칭
- 리밸런스
- 토픽의 파티션에 대한 소유권이 이동하는 것
- 컨슈머를 쉽고 안전하게 추가/제거 가능
- 높은 가용성과 확장성을 제공
- 리밸런스의 단점
- 일시적으로 컨슈머는 메시지 소비를 할 수 없음
- 컨슈머 그룹 전체가 일시적으로 사용할 수 없음
파티션과 컨슈머
- 토픽의 파티션에는 하나의 컨슈머만 연결 가능
- 파티션 내에서는 메시지의 순서가 보장
- 하나의 파티션에 두 개의 컨슈머가 연결되면 안정적으로 메시지 순서를 보장 할 수 없음
- 토픽의 파티션도 늘려주고, 컨슈머 수도 같이 늘려줘야 함
- 컨슈머 그룹 안에서 멤버로 유지하고 할당된 파티션의 소유권을 유지하는 방법
- 하트비트를 보내는 것
- 컨슈머가 일정한 주기로 하트비트를 보낸다는 사실은 해당 파티션의 메시지를 잘 처리하고 있다는 의미
- 하트비트는 언제 나가나 ?
- 컨슈머가 poll 할 때 가져간 메시지의 오프셋을 커밋할 때 보내짐
하나의 토픽(큐)에 대해 여러 용도로 사용 가능
- 일반적인 메시지 큐 솔루션에서 특정 컨슈머가 메시지를 가져가면, 큐에서 메시지가 삭제
- 다른 컨슈머는 가져갈 수 없음
- 카프카에서는 컨슈머가 메시지를 가져가더라도 삭제하지 않음
- 하나의 메시지를 여러 컨슈머가 다른 용도로 사용할 수 있도록 시스템을 구성
- 여러 컨슈머 그룹들이 하나의 토픽에서 메시지를 가져갈 수 있는 이유는?
- 컨슈머 그룹 마다 각자의 오프셋을 별도로 관리
- 여러 컨슈머 그룹이 동시에 하나의 토픽의 메시지를 이용하는 경우
- 컨슈머 그룹 아이디는 서로 중복되지 않아야 함
- 다수의 컨슈머, 컨슈머 그룹 아이디, 오프셋은 밀접한 관계가 있음
커밋과 오프셋
- 컨슈머가 poll()을 호출할 때마다 컨슈머 그룹은 카프카에 저장되어 있는 아직 읽지 않은 메시지를 가져옴
- 컨슈머 그룹이 메시지를 어디까지 가져갔는지 알고 있음
- 컨슈머 그룹의 컨슈머들은 각각의 파티션에 자신이 가져간 메시지의 위치정보(오프셋)을 기록
- commit
- 각 파티션에 대해 현재 위치를 업데이트하는 동작
- 카프카는 각 컨슈머 그룹의 파티션 별로 오프셋 정보를 저장하기 위한 저장소가 별도로 필요
- 카프카 내에서 별도로 사용하는 토픽(__consumer_offsets)을 만들고 그 토픽에 오프셋 정보를 저장
- 리밸런스가 일어나면 각각의 컨슈머는 이전에 처리했던 토픽의 파티션이 나닌 다른 새로운 파티션에 할당
- 새로운 파티션에 대해서 가장 최근 커밋된 오프셋을 읽고 그 이후부터 메시지들을 가져옴
- 커밋된 오프셋이 컨슈머가 실제 마지막으로 처리한 오프셋보다 작으면 마지막 처리된 오프셋과 커밋된 오프셋 사이의 메시지는 중복으로 처리
자동 커밋
- enable.auto.commit = true
- auto.commit.invertal.ms 주기로 컨슈머는 poll() 을 호출할 때, 가장 마지막 오프셋을 커밋
- poll 을 요청할 때마다 커밋할 시간인지 아닌지 체크
- poll 요청으로 가져온 마지막 오프셋을 커밋
- 중복이 발생할 수 있음
- 최악의 경우
- 데이터베이스에는 메시지가 처리되지 않은 채로 컨슈머 장애가 발생하면 메시지 손실이 발생
수동 커밋
- 메시지 처리가 완료될 때까지 메시지를 가져온 것으로 간주되어서는 안 되는 경우 사용
- 데이터베이스에 메시지를 저장한 후 커밋을 해야만 안전하게 메시지를 저장할 수 있음
- 메시지를 가져온 것으로 간주되는 시점을 자유롭게 조정할 수 있음
- 최악의 경우, 메시지 중복이 발생할 수 있음
- 데이터베이스에 저장하는 도중에 실패
특정 파티션 할당
- 특정 파티션에 대해 세밀하게 제어하기를 원하는 경우
- 키-값의 형태로 파티션에 저장되어 있고, 특정 파티션에 대한 메시지들만 가져와야 하는 경우
- 컨슈머 프로세스가 가용성이 높은 구성인 경우, 카프카가 컨슈머의 실패를 감지하고 재조정할 필요 없고 자동으로 컨슈머 프로세스가 다른 시스템에서 재시작되는 경우
- 컨슈머 인스턴스마다 컨슈머 그룹 아이디를 다르게 할당
- 동일한 컨슈머 그룹 아이디를 사용하게 되면 컨슈머마다 할당된 파티션에 대한 오프셋 정보를 서로 공유하기 때문에
특정 오프셋으로부터 메시지 가져오기
- seek(파티션 번호, 오프셋 번호) 사용
- consumer.seek
- consumer.poll 의 위치를 지정할 수 있음
Comments