링크드인은 왜 Kafka를 만들었을까?

링크드인이 2011년에 개발한 Kafka의 탄생 배경과 실현 목표, 그를 이루기 위해 어떻게 구현했는지에 대해 알아봅니다.


Kafka란?

여러 대의 분산 서버에서 대량의 데이터를 처리하는 분산 메시징 시스템

메시지를 받고, 보내기 위해 사용되며 여러 시스템과 장치를 연결하는 중요한 역할을 함

탄생 배경

2011년 미국 링크드인에서 웹사이트에서 생성되는 로그를 처리하여 활동을 추적하는 것을 목적으로 개발함.

빅데이터를 어떻게 활용할 것인가? → 대량의 로그를 빠르게 분석하여 사용자의 활동을 모니터링하고 서비스 개선에 활용

실현 목표

1. 높은 처리량으로 실시간 처리 (주요 목적)

방대한 데이터를 처리하는 목적 및 사용자의 활동에 따라 즉시 피드백하기 위한 목적

2. 임의의 타이밍에서 데이터를 읽음 (주요 목적)

기존 시스템에서 수집한 로그로 배치 처리 목적 및 방대한 데이터를 전달할 때 필요한 버퍼 목적

3. 다양한 제품과 시스템에 쉽게 연동

데이터 소스와 관련된 시스템이 하나가 아니므로 쉽게 연결할 수 있기 위한 목적 (하둡 외 데이터베이스, 데이터 웨어하우스)

4. 메시지를 잃지 않음

취급하는 메시지가 방대하더라도 메시지를 잃지 않아야 하지만 트랜잭션이 엄격해지면 오버헤드가 커지므로, ‘높은 처리량으로 실시간 처리’라는 요건과 균형을 가미하여 현실적으로 제거해도 좋은 것을 찾기 위한 목적

이전 제품을 선택하지 않고 새로 만든 이유

1. 강력한 트랜잭션의 오버 스펙 효과

엄격한 트랜잭션보다 높은 처리량의 실현의 우선순위가 더 높아야 하며, 송수신 보증 > 처리량은 바람직하지 않다

2. 메시지가 대량으로 쌓이는 경우를 예상하지 않음

메시지 큐는 즉시 이용되는 것으로 예상되었지, 장시간에 걸쳐 데이터를 축적하는 경우는 예상하지 않았음

따라서 실시간 처리 뿐만 아니라 배치 처리를 할 때 기존 메시지 큐로는 감당할 수 없음 (고가의 스토리지 시스템으로 데이터를 디스크에 축적한다면 영속성을 실현할 수 있지만, Kafka가 내세우는 취지와 다름)

실현 목표를 이루기 위한 Kafka의 실현 수단 및 메시징 모델

1. 메시징 모델과 스케일 아웃형 아키텍처

실현 목표 중 3가지의 요구 사항 해결을 위해 메시징 모델을 채용하였다. 일반적으로 메시징 모델은 다음 세 가지 요소로 구성된다.

Producer : 메시지 생산자

Broker : 메시지 수집 / 전달 역할

Consumer : 메시지 소비자

image

메시징 모델은 크게 2가지로 나뉘게 되는데, 큐잉 모델과 펍/섭 모델이 있다.

먼저, 큐잉 모델은 프로듀서의 메시지를 브로커의 큐에 저장해두고 그 큐에 있는 메시지를 여러 개의 컨슈머가 추출할 수 있어 병렬로 처리를 할 수 있고, 그렇게 추출된 메시지는 사라지므로 다른 컨슈머에서 처리할 수 없다는 특징이 있다.

i.e. 브로커의 메시지 입장에서 보면 처리 능력을 높이는 데에 장점이 있다.

그리고, 펍/섭 모델은 프로듀서를 퍼블리셔, 컨슈머를 섭스크라이버라고 하는데, 퍼블리셔는 브로커에 있는 토픽이라는 카테고리에 메시지를 등록해두고, 섭스크라이버는 구독하고 있는 토픽에 저장되는 메시지를 받게 되며 큐잉 모델과 달리 추출된 메시지를 다른 컨슈머에서도 처리할 수 있다는 특징이 있다.

i.e. 복수의 컨슈머에 메시지를 전달할 수 있는 장점이 있다.

둘 다 브로커를 사용한다는 공통점이 있다, 이는 변경에 강한 시스템 아키텍처를 만들기 위함이다

e.g. 프로듀서/컨슈머 역할을 하는 서버가 늘어나거나 줄어들 경우에도 유연하게 처리할 수 있다.

2. Kafka 메시징 모델

Kafka는 두 모델의 장점을 모두 실현하기 위해 컨슈머 그룹이라는 개념을 도입해 컨슈머를 확장 구성할 수 있도록 설계한다.

image

이는 결과적으로 프로듀서 ↔︎ 브로커 ↔︎ 컨슈머 가 아닌 브로커 내에 여러 개의 토픽을 만들어 프로듀서 ↔︎ 토픽 ↔︎ 컨슈머 관계가 될 수 있게 만들어준다. 또한 데이터를 쌓기 위해 여러 개의 브로커 구성으로 동작할 수 있게 되어 있다.

3. 디스크로의 데이터 영속화

Kafka는 다음 2가지 요구에 부응하기 위해 브로커로 전송된 메시지를 디스크에 영속화하고 있다.

  1. 임의의 타이밍에 데이터를 읽는다. (ex: 수집한 데이터를 통해 배치 처리를 할 때)

  2. 메시지를 잃지 않는다.

배치 처리의 경우 데이터를 일정 기간 모아야하기 때문에 메모리에서만 유지하는 것은 불가능하다.

따라서, 메시지 영속화는 디스크에서 이루어지고 디스크에 영속화함에도 불구하고 높은 처리량을 제공한다는 특징이 있다.

이를 통해 단일 브로커의 고장이 발생하더라도 즉시 데이터 손실로 이어지지 않도록 복제 구조를 갖추고 있다.

4. 이해하기 쉬운 API 제공

Kafka는 다양한 제품과 시스템에 쉽게 연동한다는 요구와 관련하여 데이터 출입을 쉽게 하는 Connect API를 제공한다.

이 API를 이용하여 각종 외부 시스템과 접속하며 카프카에 접속하기 위한 프레임워크로 Kafka Connect도 제공한다.

또한 Kafka에 존재하는 데이터를 스트림 처리하는 API를 라이브러리화한 Kafka Streams를 이용해서 자바 애플리케이션을 쉽게 만들고 작동시킬 수 있다.

5. 전달 보증

마지막으로 Kafka는 메시지를 잃지 않는다 라는 요구사항을 아래와 같이 세 가지 수준으로 전달을 보증한다.

종류 개요 비고
At Most Once 1회 전달 시도 메시지는 중복되지 않지만 상실될 수 있다
At Least Once 1회 전달 보증 메시지가 중복될 수도 있지만 상실되지는 않는다
Exactly Once 1회만 전달 메시지가 중복되지도, 상실되지도 않지만 성능이 느리다

데이터베이스 트랜잭션 격리 수준 처럼, 엄격해질수록 느려지고 느슨해질수록 빨라진다.

Kafka 개발 초기에는 성능을 중시하였기 때문에 Exactly Once는 고려하지 않았고, 최소한의 상실 방지를 위한 At Least Once를 택했다.

이를 실현하기 위해 Ack 와 Offset Commit라는 개념을 도입하였다.

  • Ack : 브로커가 프로듀서로부터 메시지를 정상 수신했다면 프로듀서에게 반환 (네트워크에서의 Ack와 같은 개념)

  • Offset commit : 컨슈머가 수신한 메시지를 정상 처리했다면 완료 기록을 브로커에 반환 (문제가 발생했을 때 어디부터 재전송하면 되는지 알 수 있음)

그 외에도 유용성이 높아지면서 트랜잭션 개념을 도입하여 At Most Once, Exactly Once와 같은 전달 보증 방식을 사용할 수 있다.

Kafka 환경 구축하기

Docker 이미지 찾아보기

먼저, Kafka를 로컬에 설치하고 환경을 구축하기 위한 작업을 단축시키기 위해 Docker 이미지를 찾아보았다.
image

공식 이미지는 없어서 가장 Star 개수가 많은 https://hub.docker.com/r/wurstmeister/kafka 을 살펴보기로 했지만, Kafka 자체가 Zookeeper 와 같이 실행해야 작동하므로, 하나의 컨테이너만으로 동작하는 게 아닌 docker-compose를 통해 모듈들을 각각 실행시켜야 했고 내가 원하는 취지와 맞지 않는 것 같아 로컬에 Kafka를 설치하기로 했다.

Kafka 다운로드 및 설치

https://kafka.apache.org/downloads 페이지에 접속하여 바이너리 압축 파일을 원하는 위치에 다운받고 압축을 풀어주기만 하면 바로 카프카를 실행할 수 있는 상태가 된다.

카프카는 스칼라로 구현되어 있어서 다운로드 페이지에서 스칼라 버전에 따라 각각 다른 빌드를 제공하는데, 스칼라를 사용하고 있지 않다면 2.12로 빌드된 압축 파일을 다운받으면 된다.

(나는 2.1.2로 빌드된 Kafka 2.5.0 버전을 설치했다)

압축을 풀게되면 아래와 같이 디렉토리들이 포함되어 있다.

image

  • bin : Zookeeper와 Kafka를 실행시킬수 있는 실행 파일 스크립트가 들어있다

  • config : Kafka를 실행시키는데 필요한 설정 파일들이 포함되어 있다.

Kafka 및 Zookeeper 실행

Kafka는 클러스터 노드 관리를 위해 Zookeeper를 사용한다. 따라서 Kafka를 실행하려면 Zookeeper를 먼저 실행해야 한다. 다운받은 파일에 Zookeeper를 실행하는 스크립트가 포함되어 있으므로 아래 명령어로 Zookeeper 서버를 Foreground로 실행할 수 있다.

bin/zookeeper-server-start.sh config/zookeeper.properties

하지만 Foreground로 실행하면 불편하니까 아래 명령어를 통해서 Background에서 Daemon으로 실행할 수 있다.

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

그 후, Kafka 서버를 마찬가지로 Daemon으로 실행한다.

bin/kafka-server-start.sh -daemon config/server.properties

현재 Listen 중인 포트를 출력해보면 Zookeeper의 기본 포트인 2181과 Kafka의 기본 포트인 9092가 출력된다.

image

Kafka Topic 생성하기

Zookeeper와 Kafka가 잘 실행 되었다면 다음은 Topic을 생성할 차례이다.

Partition 값이 1이고 Replication factor가 1인 “levi”라는 이름을 갖는 토픽을 아래 명령어로 생성할 것이다.

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic levi

그 후, Topic이 잘 생성되었는지는 아래 명령어를 통해 확인할 수 있다.

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Kafka Partition이란?

간단히 설명하자면, Topic은 지정한 Partition 개수만큼으로 나눠진다. Partition 내의 한 칸은 로그라고 불린다. 이는 배열과 비슷하며 데이터는 하나의 index에 순차적으로 append된다고 생각하면 된다.

그리고 메세지의 상대적인 위치를 나타내는게 offset이다, 이는 앞서 말한 offset commit과 관련이 있다.

하나의 Partition으로도 처리할 수 있지만, 처리할 메시지 양이 많아지면 부담이 되므로, 병렬 처리를 위해 여러 개의 Partition을 설정하는 것이다. (멀티쓰레드를 사용하는 것처럼?)

한 번 늘린 Partition은 절대 다시 줄일 수 없다고 하므로 충분히 고려하고 설정해야 한다고 주의를 한다.

Kafka Replication factor, Broker 성질

Replication factor은 복제를 몇 개 하는지에 대한 값이다.

Kafka는 반드시 하나의 Broker, Partition만 Leader가 될 수 있고, 나머지 Broker, Partition들은 Follower가 되는데, Replication factor는 몇 개의 Follower Broker에 Follower Partition을 복제를 해 둘 것인지에 대한 정보이므로 Broker 수보다 절대 값을 높일 수 없다.

무조건 Leader만이 메시지 송수신을 전적으로 담당하고, 만약 죽게될 경우 Follower 중 하나가 Leader가 된다.

이를 통해서 Kafka의 특징인 전달 보증을 확실하게 할 수 있고, 이는 앞서 말한 ack와 관련이 있으며 이 값을 어떻게 조정하냐에 따라 전달 보증 수준과 성능 비용을 고려해 원하는 값으로 설정할 수 있다.

Kafka Producer 생성하기

아래 명령어를 입력하게 되면, Producer 상태가 되어 메시지를 입력할 수 있는 상태가 된다.

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic levi

이 상태에서 메시지를 입력하게 되면, levi 토픽으로 메시지가 발행된다.

image

Kafka Consumer 생성하기

아래 명령어를 입력하게 되면, Consumer 상태가 되어 메시지를 입력할 수 있는 상태가 된다.

기본적으로 컨슈머는 실행 이후부터 메시지를 읽기 때문에 –from-beginning 옵션을 사용해야 이미 발행된 메시지를 읽을 수 있게 된다.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic levi --from-beginning

image

Spring Cloud Stream 기반 Kafka 모듈 연동하기

Spring Cloud Stream은 메시지 기반 마이크로서비스 애플리케이션을 만들기위한 프레임워크로 Spring Boot 기반의 DevOps 친화적인 마이크로서비스 애플리케이션을 만들고 메시지 브로커와의 연결을 제공한다.

높은 추상화를 통하여 어떤 인프라인지 알 필요 없이 애플리케이션 설정과 구현만으로 메시징 시스템을 구현할 수 있다.

즉, 동일한 인터페이스를 사용하므로 어떤 메시지 브로커를 선택하더라도 사용하는 입장에서는 사실상 차이가 없다.

image

공식 문서에 따르면

메시지 브로커의 독창적인 구성을 제공하여 여러 미들웨어 공급 업체에 pub/sub, 소비자 그룹 및 파티션 개념을 도입한다. 이 독창적인 구성은 스트림 처리 응용 프로그램을 만드는 기초를 제공한다. 응용 프로그램에 @EnableBinding을 추가하면 메시지 브로커에 즉시 연결되며 메서드에 @StreamListener를 추가하면 스트림 처리를 위한 이벤트가 수신된다.

라고 설명을 하고 있다.

Spring Cloud Stream 기반 프로젝트 생성하기

먼저 Spring Boot 기반 프로젝트를 생성하고 spring-cloud-stream-binder-kafka 의존성을 추가한다.

# build.gradle
plugins {
   id 'org.springframework.boot' version '2.4.13-SNAPSHOT'
   id 'io.spring.dependency-management' version '1.0.11.RELEASE'
   id 'java'
}

repositories {
   mavenCentral()
   maven { url 'https://repo.spring.io/milestone' }
   maven { url 'https://repo.spring.io/snapshot' }
}

dependencies {
   implementation group: 'org.springframework.cloud', name: 'spring-cloud-stream-binder-kafka', version: '3.0.6.RELEASE'
}

Spring Cloud Stream Topic 정의하기 (Output)

아래와 같이 interface를 만들면, 내부적으로는 아무 것도 없는 interface로 동작하게 된다.

public interface Producer {

    String OUTPUT = "levi-producer";

    @Output(Producer.OUTPUT)
    MessageChannel output();

}

하지만 이 형식을 따라서 Binding을 하게 되는데, 이 때 @Output 안의 문자열은 Topic이 된다.

(물론 RabbitMQ나 Kafka나 상관이 없으며, application.yaml에서 Topic 값을 수정하면 덮어씌워진다.)

그리고, @Output어노테이션이 붙게 된 함수 이름은 Bean으로 등록되고 이는 Binding 할 때 중요한 지표가 될 수 있다.

Spring Cloud Stream Topic 정의하기 (Input)

또한 마찬가지로 아래와 같이 interface를 만들면, 마찬가지로 내부적으로는 아무 것도 없는 interface가 생성된다.

public interface Consumer {

    String INPUT = "levi-consumer";

    @Input(Consumer.INPUT)
    SubscribableChannel input();

}

이도 @Output 과 마찬가지로 @Input 안의 문자열은 Topic이 되며, 위에서 생성한 Producer와 함께 실제로 메시지를 송수신 받을 수 있게 된다.

또한 함수 이름도 Bean으로 등록되고 Binding할 때 중요한 지표가 될 수 있다.

Spring Cloud Stream Message Binder 생성하기

위에서 생성한 Producer, Consumer로 실제 Topic에 메시지를 송수신하기 위해 Handler을 만들어야 한다.

Producer, Consumer을 묶어서 사용하기 위한 자식 인터페이스인 Binding 인터페이스를 먼저 생성했다.

public interface Binding extends Producer, Consumer {
}

그리고 클래스 단위에 만들어둔 Binding 인터페이스를 통해 바인딩 활성화를 하게 되면 @StreamListener, @SendTo를 사용할 수 있다.

클래스 단위에 @EnableBinding으로 Binding할 클래스를 활성화시키고, @StreamListener로 levi-consumer topic에 대한 Consumer역할을, @SendTo로 levi-producer topic에 대한 Producer역할을 할 수 있게끔 아래 메서드를 선언한다.

이 handler 메서드는 levi-consumer topic에 메시지가 송신되면 값을 출력하고 levi-producer topic에 메시지를 그대로 수신하는 역할을 하게 될 것이다.

@SpringBootApplication
@EnableBinding(Binding.class)
public class KafkaApplication {

   public static void main(String[] args) {
      SpringApplication.run(KafkaApplication.class, args);
   }

   @StreamListener(Binding.INPUT)
   @SendTo(Binding.OUTPUT)
   public String handle(String value) {
      System.out.println("Message: "+value);
      return value.toUpperCase();
   }

}
  • @StreamListener은 앞서 작성한 Consumer 인터페이스를 통해 동작하며 levi-consumer Topic에 메시지를 수신한다.

  • @SendTo는 앞서 작성한 Producer 인터페이스를 통해 동작하며 levi-producer Topic의 메시지를 송신한다.

Spring Cloud Stream으로 메시지 송수신하기

프로젝트를 실행하면, 인터페이스로 정의했던 levi-producer, levi-consumer Topic들이 생성된 걸 확인할 수 있다. image

먼저 levi-consumer가 잘 동작하는지 확인하기 위해서, Kafka Producer을 만들어 해당 Topic에 메시지를 보낸다. image

그러면 실제로 @StreamListener을 통해 levi-consumer topic에 대한 consumer 역할을 하게 되고, 수신된 메시지가 작성한 코드대로 출력이 된다. image

그렇다면 수신한 메시지가 levi-producer topic으로 producer 역할을 하여 메시지가 잘 수신되는지 확인하기 위해 Kafka Consumer을 만들어 해당 Topic의 메시지를 수신해본다. image

위와 같이 정상적으로 대문자로 모두 변형되어서 Topic에 메시지가 정상적으로 수신되었다는 것을 알 수 있다.

문자열이 아닌 객체로 메시지를 수신하는 방법

만약 메시지가 스트링이 아니라 객체로 받아야 한다면 그에 맞게 클래스를 만들어줘야 한다.

예를 들어, 축구 선수의 정보가 담긴 이름, 포지션, 등번호 에 대한 정보를 포함한 메시지라면 아래와 같이 정의한다.

package com.example.kafka.domain;

public class SoccerPlayer {
    private String name;
    private String position;
    private int backNumber;

    public String getName() { return this.name; }
    public String getPosition() { return this.position; }
    public int getBackNumber() { return this.backNumber; }
}

그리고 핸들러에서 받을 Parameter type을 SoccerPlayer로 변경한다

@StreamListener(Binding.INPUT)
@SendTo(Binding.OUTPUT)
public String handle(SoccerPlayer sp) {
   System.out.printf("Name : %s, Position : %s, BackNumber : %s\n", sp.getName(), sp.getPosition(), sp.getBackNumber());
   return sp.getName();
}

그 후, 메시지가 제대로 수신되는지 확인하기 위해서 levi-consumer Topic으로 객체를 전송해야 하는데, 이 때 무조건 JSON 양식으로 보내야 하며 JSON으로 보내면 알아서 객체에 매핑이 된다. image

이렇게 JSON 양식으로 메시지를 보내게 되면 아래와 같이 메시지를 출력할 수 있다. image

만약 JSON 양식으로 제대로 입력하지 않았다면, Could not read JSON으로 시작하는 오류가 뜨게 된다.

Spring Cloud Stream의 Binder와 Bindings

이때까지 기본적인 Binder와 input-binding, output-binding 을 사용해봤는데 좀 더 자세히 살펴보려고 한다.

먼저 Binder와 Bindings에 대한 정의는 아래와 같다

  • Binder : 메시지 시스템(kafka)과 내부 어플리케이션을 연결하는 접합부이다.

  • Bindings : 메시지 시스템(kafka)과 내부 어플리케이션의 연결이다. (Binder에 의해 만들어지는)

그리고 Spring Cloud Stream은 여러 Binder가 있는 상황을 고려하였기 때문에 여러 메시지 시스템과 매우 쉽게 연결할 수 있다.

기본적인 application.yaml 생성

위와 같이 Binder가 하나인 예제에서는, 자동으로 Kafka Binder에 붙을 수 있어 별다른 설정을 하지 않아도 된다.

하지만 Binder의 정보가 변경된다거나, Topic들에 대한 Consumer와 Producer 설정을 원하는대로 변경하려면 application.yaml 설정파일을 통해 값들을 지정해줘야 한다.

spring.cloud.stream:
  kafka:
    binder:
      brokers: localhost
      defaultBrokerPort: 9092
  bindings:
    input:
      destination: levi-consumer
      group: titan

spring.cloud.stream.bindings 아래에 있는 속성들을 확인해봐야 하는데, 먼저 input 속성은 이전에 @Input, @Output 어노테이션을 붙혀 생성한 함수는 Bean으로 등록된다고 말했었는데, 그 Bean의 이름이다. 즉 Topic명이거나 단순한 환경변수는 아니고, 함수명이 어떻게 되냐에 따라 달라질 수 있는 값이라는 것이다.

그리고 그 아래에 있는 destination속성은 Topic인데, 기본적으로는 지정하지 않아도 자동으로 매핑이 되지만 코드 상에서 String INPUT = “levi-consumer” 를 빼고 싶다거나, 덮어씌우고 싶을 때 값을 지정하면 된다.

여러 개의 메시지 시스템과 연결하기

만약 전혀 다른 Kafka 서버 2대와 연결한다거나, 또는 Kafka와 RabbitMQ를 모두 사용하고 싶다고 가정해본다.

앞서 작성한 application.yaml에 bindings 속성을 통해 Topic에 대한 설정을 할 수 있었는데, 마찬가지로 binders 속성이라는 것이 있다.

spring.cloud.stream:
  bindings:
    input:
      destination: levi-consumer
      group: titan
  binders:
    kafka1:
      type: kafka
      environment:
        spring.cloud.stream.kafka.binder:
           brokers: localhost
           defaultBrokerPort: 9092
    kafka2:
      type: kafka
      environment:
        spring.cloud.stream.kafka.binder:
           brokers: localhost
           defaultBrokerPort: 9093

binders 속성을 사용하게 되면 binder에 대한 속성을 1개가 아닌 배열처럼 여러개를 만들 수 있게 된다.

Categories:

Updated:

Leave a comment