Apache Kafka 대충 사용해보기 (2)

카프카를 실제 사용해보기 위해 알아야할 추가 개념들에 대해 알아봅니다.


Offset Commit (컨슈머에 문제가 생겨 백업 컨슈머로 대체될 때 어떻게 Offset을 유지하는가?)

Kafka Offset Commit은 컨슈머 그룹 단위로 이루어진다.

컨슈머 그룹마다 각 토픽의 파티션에서 어느 오프셋까지 처리 완료했는지 정보를 기록한다.

오프셋 커밋은 처리 완료 여부를 메시지마다 기록하는 것이 아니라 처리를 완료한 메시지 중에서 최대의 오프셋을 기록하는 형태로 이루어진다. (카프카가 파티션 안의 메시지를 연속적으로 처리하는 것을 가정하고 있기 때문이다)

오프셋 커밋 정보에 의해 장애에 의한 비정상적인 재개 또는 계획된 정지 상황에서 재개할 때 메시지 재처리를 방지하고 문제 영향을 줄일 수 있다. (오프셋 커밋의 방법에 따라 영향의 정도가 달라진다.)

Offset Commit의 정보는 어디에 기록되는가?

커밋된 오프셋 정보는 __consumer_offsets라는 전용 토픽에 기록된다. 이 토픽은 일반 토픽처럼 파티션과 Replica의 구조를 하고 있다. 이 메커니즘에 의해 카프카 클러스터는 오프셋 커밋 처리를 분산할 수 있으며, 여러 대의 브로커가 정지해도 데이터 손실 없이 처리할 수 있다.

Offset Commit의 방법은?

오프셋 커밋의 방법에는 Auto Offset Commit과 Manual Offset Commit이 있다.

1. Auto Offset Commit

먼저 자동 오프셋 커밋은 일정 간격마다 자동으로 오프셋 커밋을 하는 방식이다. 컨슈머의 옵션 중 enable.auto.commit을 true로 설정하여 사용할 수 있다.

오프셋 커밋의 간격또한 auto.commit.interval.ms로 지정할 수 있으며, 기본값은 5초다. 설정된 타이밍에 카프카 클러스터에서 완료된 메시지에 대해 오프셋 커밋을 실행한다.

장점은 컨슈머 어플리케이션에서 오프셋 커밋을 명시적으로 실시할 필요가 없어 컨슈머 어플리케이션이 간결해진다.

단점은 컨슈머에 장애가 발생했을 때 메시지가 손실되거나, 여러 메시지의 재처리가 발생할 수 있다는 단점이 있다.

만약 5초동안 20개의 이벤트를 처리하고 있었는데 장애가 발생했다면 실제 20개의 이벤트에 대해 처리는 했지만 Offset Commit기록이 없어서 재처리를 하게 될 수 있고, 일부 이벤트에 대해 제대로 처리를 못했음에도 자동으로 Offset Commit이 추가되면서 손실이 되는 경우가 발생할 수도 있을 것이다.

또한 Auto Offset Commit을 하게 되면 매 번 폴링 즉, pool()을 호출할 때마다 commit할 시간이 되었는지 확인하기 때문에 쓸데없이 리소스를 먹게 된다.

2. Manual Offset Commit

수동 오프셋 커밋은 컨슈머 어플리케이션 내에서 KafkaConsumer의 commitSync & commitAsync라는 메서드를 통해 오프셋 커밋을 실행한다.

수동 오프셋 커밋의 장점은 구조를 이해하고 적절히 사용함으로써 메시지 손실 혹은 재처리에 대해 방지를 할 수 있다는 점이다. 언제라도 오프셋 커밋을 수행할 수 있음으로 인해 메시지 처리가 완료된 시점에서 커밋을 할 수 있다.

물론 이 구조도 생각을 해보자면 최악의 상황에서 최대 1건의 메시지 재처리는 이루어질 수 있다.

만약 1개의 이벤트에 대해 처리를 완료하고 커밋을 한다고 가정했을 때, 1개의 이벤트가 처리는 되었지만 커밋하는 과정에서 문제가 생긴다면 ..!

단점은 자동 오프셋 커밋에 비해 자주 커밋 처리를 실시하므로 카프카 클러스터 부하가 높아진다는 점에서 주의가 필요하다는 것이다. (하지만 메시지 양이 적다면 단점이라고 보긴 힘들다)

자동 오프셋 커밋과 수동 오프셋 커밋의 공통점으로는 카프카에 송신된 모든 메시지는 반드시 1회 이상 컨슈머에서 수신되는 것을 보장한다는 것이다.

이러한 성질을 카프카에서는 At Least Once라고 한다.

자동 오프셋 리셋

컨슈머는 이러한 오프셋 커밋을 토대로 메시지 처리를 시작할 오프셋을 결정한다.

그러나 시작할 때 오프셋 커밋 기록이 존재하지 않는 경우나 기록되어 있는 오프셋이 유효하지 않은 경우 (만약 장애가 발생해서 정지되었을 때 일정 시간이 지나 오래되어 사라지는 경우) 에는 지정된 정책에 따라 오프셋 초기화를 실시하여 메시지 처리를 시작할 오프셋을 새로 결정한다.

이 초기화 처리를 자동 오프셋 리셋이라고 한다.

자동 오프셋 리셋도 앞서 다른 설정들과 마찬가지로 컨슈머 어플리케이션에서 auto.offset.reset이라는 옵션을 통해 지정할 수 있다. 그 정책에는 아래 3가지가 있다.

  1. latest - 파티션의 가장 최근의 오프셋으로 초기화. 이미 존재하는 메시지는 처리할 수 없다
  2. earliest - 파티션의 가장 오래된 오프셋으로 초기화. 이미 존재하는 메시지 모두를 재처리한다
  3. none - 유효한 오프셋 커밋 정보가 없는 경우에 예외를 반환한다

명시적으로 지정을 원할 때 none으로 설정을 하고, 처리의 목적과 요구 사항을 감안해 적절한 정책을 설정하면 된다고 한다.

파티션 재배치

카프카에서의 파티션은 하나 이상의 Replica를 가지고, 카프카 클러스터 중 어느 하나의 브로커에 보관되어 있다고 하였다.

일반적으로 이 복제본은 작성된 때 배치된 브로커에서 계속 보관되지만 특정 이유로 배치를 변경하고 싶은 경우가 있다.

예를 들면 카프카 클러스터의 브로커 수를 증감시키는 경우이다. 카프카에서는 계획적인 정지나 장애로 인한 정지를 불문하고 브로커가 보유하고 있던 Replica를 다른 브로커로 자동으로 이동시키지 않는다.

따라서 운영 환경에서 브로커 수를 줄이려고 할 때에는 감소시킬 브로커가 보유하고 있는 Replica를 미리 다른 브로커로 이동시켜야 한다.

마찬가지로 브로커 수를 늘리려고 할 때에도 새로 추가할 브로커에 Replica를 배치하여 메시지의 송수신 부하를 균등하게 배분시켜야 한다.

이 작업을 파티션 재배치라고 하며, 파티션 재배치를 할 때에는 새로운 Replica를 배치할 브로커에 Replica를 생성하여 동기화시킨 후에 제거해야 한다.

주의해야할 점은 파티션 재배치를 하는 과정에서는 평소보다도 카프카 클러스터 부하가 늘어나는 것에 주의가 필요하다는 것이다. (파티션 재배치에 따른 동기화 처리에 사용되는 네트워크 대역폭에 제한을 걸 수 있다고 한다)

파티션 재배치 방법은?

파티션 재배치를 실행하려면 카프카에 부속된 스크립트인 kafka-reassign-partitions.sh 쉘을 통해 수행할 수 있다.

levi라는 토픽 내에 Partition 0의 Replica가 Broker 1, 2, 3에 보관되어 있는 상태에서 Broker 3, 4, 5에 보관되는 상태로 변경하고, Partition 1의 Replica가 Broker 4, 5, 6에 보관되어 있는 상태에서 Broker 1, 2, 3에 보관되는 상태로 서로 변경하는 경우를 예시로 들겠다.

{ 
    "version": 1,
    "partitions": [
        {
            "topic": "test1", "partition": 0, "replicas": [4, 5, 6]
        },
        {
            "topic": "test1", "partition": 1, "replicas": [1, 2, 3]
        }
    ]
}

위와 같은 json 파일을 카프카 클라이언트의 사용자 홈 디렉토리 아래에 reassignment.json이라는 파일명으로 지정하고 kafka-reassign-partitions.sh 쉘을 실행하면 재배치가 완료된다.

BackPressure

만약 컨슈머의 처리량에 비해 프로듀서가 매우 높은 비율로 메세지를 발행하는 상황이 생긴다면 어떻게 처리할 수 있을까?

조금 다른 예지만, 우리는 살면서 한번쯤은 액체가 역류하는 순간을 마주치게 된다. 역류하게 되는 원인은 바로 배압, Backpressure 때문이다.

Backpressure란 무엇인가? 파이프를 통한 유체 흐름에 반하는 저항, 힘을 말한다. 액체나 증기가 관을 통해 배출 될 때, 유체가 흐르는 방향과 반대 방향으로 작용하는 저항 압력이다.

예를 들면, 세면대에서 물을 사용할 때 배수관의 구부러진 곳에서 역방향의 저항이 발생하면서 물이 즉시 내려가는 게 아니라 멈췄다가 서서히 내려가는 현상도 배압의 증거라고 볼 수 있다. (배수관이 존재하는 이유도 냄새가 올라오지 못하게 하는 것이기 때문에 배압의 증거라고 볼 수 있음)

소프트웨어에서의 Backpressure는 내용물만 액체나 기체가 아닐 뿐이지만, 데이터로서 존재한다.

Backpressure가 필요한 이유는 뭘까? 만약 운영 환경에서 프로듀서가 컨슈머를 폭파시켜버릴 정도로 메세지를 발행시키면 그를 처리하는 컨슈머는 이런 경우를 대비해서 Backpressure를 통해 입력율을 줄여 처리량과 처리 시간을 줄일 수 있다.

Kafka는 Polling 구조

일반적으로 다른 메시징 큐는 메시지 큐에서 메시지를 Push 한다. 카프카 구성 요소로 예를 들면 브로커가 컨슈머로 메시지를 보내는 방식이다. 하지만 이런 Push 방식의 가장 큰 단점은 메시지 큐가 컨슈머 측의 처리 성능을 염두해야 한다.

즉, 메시지 큐가 컨슈머로 메시지를 Push 할 때, “컨슈머가 이 정도는 처리할 수 있겠지” 라고 컨슈머 환경을 고려해야 한다.

이와 반대로 카프카는 컨슈머가 브로커로부터 메시지를 요청하는 Polling 구조로 설계되었다. 즉, 컨슈머가 자신이 원하는 만큼 브로커로 메시지를 요청한다.

이러한 구조의 가장 큰 장점은 각 컨슈머가 자신의 환경에 메시지 구독 성능을 최적화할 수 있다는 것이다.

추가로 브로커는 컨슈머가 요청하는 것만큼 메시지를 전달해주기만 하면 되기 때문에 컨슈머의 환경을 고려할 필요가 없다. (마치 객체지향 SOLID 원칙 중 단일 책임 원칙처럼)

보통 메시징 큐 시스템에서 기본적으로 Producer가 폭발적인 속도로 메시지를 전송할 때 처리가 느린 Consumer에서 문제를 일으킬 수 있다.

image

이러한 상황에서 적절하게 리소스 문제를 방지해야 하는데, 위에서 살펴본 Polling구조로 인해 Consumer가 자신이 원하는 페이스대로 메시지를 처리함으로써 일반적인 Backpressure를 수행할 수 있다.

image

하지만 시스템 간 메시지의 동기 처리에 의존하는 이 접근법은 비동기로 빠르게 쳐내야할 경우에서는 비효율적일 수 있을 것으로 보이고 Consumer의 상태에 따라 동적으로 처리하기에는 아쉬움이 있는 듯 하다(?)

Categories:

Updated:

Leave a comment