Apache Kafka 대충 사용해보기 (1)

카프카를 대충 사용해보기 위해 알아야할 메세지 송수신 구조와 기본 용어를 설명합니다.


Kafka 소개

카프카란? 카프카는 자신을 분산 이벤트 스트리밍 플랫폼이라고 소개하고 있으며 고성능을 강조

여러 대의 분산 서버에서 대량의 데이터를 처리하는 분산 메시징 시스템

메시지를 받고, 보내기 위해 사용되며 여러 시스템과 장치를 연결하는 중요한 역할을 함

Kafka 구성 요소

카프카의 주요 구성 요소는 아래와 같다.

첫 번째는 카프카 클러스터 (메세지를 저장하는 저장소), 하나의 클러스터는 여러 개의 브로커로 구성이 됨. 브로커는 각각의 서버라고 보면 됨. 브로커는 메세지를 나눠서 저장하고 이중화 처리도 하고 장애가 나면 대체도 함.

두 번째는 주키퍼 클러스터, 카프카 클러스터를 관리하기 위해서 주키퍼 앙상블이 필요하고 주키퍼 속에 카프카 클러스터와 관련된 정보가 기록이 되고 관리가 됨.

그리고 프로듀서와 컨슈머, 프로듀서는 메세지를 카프카에 넣는 역할을 하고 컨슈머는 반대로 카프카에서 메세지를 읽어오는 역할을 함. image

마지막으로 토픽, 토픽은 메세지를 종류 별로 관리하는 스토리지 역할을 하며 브로커에 배치되어 관리됨. 특정 토픽을 지정하여 메세지를 송수신함으로써 단일 카프카 클러스터에서 여러 유형의 메세지를 중계 가능.

Kafka Topic

토픽은 위에서 말했듯이, 메세지를 구분하는 단위가 되고 이는 파일시스템의 폴더와 유사함.

예를 들면 로그용 토픽, 메타 연계용 토픽, 알림용 토픽 등 유형에 맞게 나누어 사용할 수 있음.

한 개의 토픽은 한 개 이상의 파티션으로 구성되는데 이 때 파티션은 메세지를 저장하는 물리적인 파일을 의미함.

따라서 프로듀서는 메세지를 카프카에 저장할 때 특정 토픽에 요청을 하는 것이고 컨슈머는 메세지를 특정 토픽에서 수신 요청을 하는 것임. 토픽을 기준으로 서로 송수신한다.

Kafka Partition

한 개의 토픽은 한 개 이상의 파티션으로 구성된다고 말했는데, 쉽게 말해 토픽이 폴더라면 파티션은 파일이라고 생각하면 된다.

파티션은 메세지를 저장하는 물리적인 파일이며 흔히 로그 파일을 쌓는 것처럼 파티션도 그와 같은 역할을 한다고 생각하면 된다.

단순히 Kafka를 프로듀싱, 컨슈밍 용으로 사용하는 입장이라면 각 파티션을 브로커에 어떻게 배치하는가에 대한 정보는 알 수 없다. 또한 프로듀서 및 컨슈머는 파티션들을 은폐해서 오로지 토픽만을 지정하여 통신하기 때문에 파티션을 의식할 필요도 없다. (파티션이 생소한 이유)

또한 파티션은 append-only 파일이기에 프로듀서가 넣은 메세지는 파티션의 맨 뒤에 추가가 되고 파티션에 적재된 메세지는 수동으로 삭제할 수가 없다. (설정에 따라 일정 시간이 지난 뒤에 삭제됨)

Kafka Offset

파티션에 각각의 메세지가 저장되는 위치를 offset이라고 한다.

프로듀서가 메세지를 카프카에 저장하면 각 파티션에서 수신한 메세지에는 차례대로 일련번호가 부여되어 있어 파티션 단위로 메세지 위치를 나타내는 offset이라는 관리 정보를 통해 컨슈머가 취득하는 메세지의 범위 및 재시도를 제어할 수 있다. 제어에 사용되는 offset에는 다음과 같은 종류가 있다.

  • Log-End-Offset(LEO): 파티션 데이터의 끝을 나타낸다 (파티션에 관한 정보로 업데이트)
  • Current Offset: 컨슈머가 어디까지 메세지를 읽었는가를 나타낸다. (컨슈머에서의 데이터 취득을 계기로 업데이트)
  • Commit Offset: 컨슈머가 어디까지 커밋했는지를 나타낸다. (컨슈머 그룹마다 보관되어 업데이트)

컨슈머는 offset 기준으로 메세지를 순서대로 읽는다. 섞어서 읽지 못하고 반드시 순서대로 읽는다.

여기서 궁금증이 생긴 게, 각각의 파티션은 append-Only 방식으로 메세지를 기록한다 했고, 내부적으로 offset 이라는 개념을 통해 순서대로 읽는다고 하는데 왜 메시지의 순서가 보장되지 않는 것일까?

그 이유는 하나의 토픽에는 여러 개의 파티션이 존재할 수 있기 때문이다. 카프카는 파티션 간의 순서는 보장하지 않는다.

아래 그림에서 볼 수 있듯, 메세지는 파티션 내에서 유의미한 순서를 가지고 이를 보장하고 있지만 파티션 간에는 보장하고 있지 않다.

R1280x0

그렇다면 순서가 완벽히 보장이 필요하다면 어떻게 할 수 있을까? (앞서 말씀드렸듯이, 파티션은 유의미한 순서를 가지기 때문에 1개의 파티션만을 사용하거나 또는 프로듀서가 메세지를 전송할 때 파티션의 키를 같이 전송해서 파티션을 지정할 수 있다)

하지만 키가 제대로 분산이 되지 않는 경우에는 브로커의 장애로 이어질 수 있다.

Kafka Consumer Group

컨슈머는 컨슈머 그룹이라는 곳에 속하게 된다. 컨슈머가 카프카 브로커에 연결할 때 어떤 컨슈머 그룹에 속하는지 지정하게 돼있다.

여기서 중요한 점은 한 개의 파티션(토픽 아님)은 해당 컨슈머 그룹의 한 개의 컨슈머에만 연결이 가능하다는 것이다.

이를 통해 한 컨슈머그룹 기준으로 파티션의 메세지는 순서대로 처리된다는 것을 보장할 수 있게 된다.

(아래 그림에서 볼 수 있듯이, 컨슈머그룹에 속한 각 컨슈머들은 동일한 파티션에 대해서 공유할 수가 없다.)

99EFEE475C64B58C27

여기서 나는, 컨슈머 그룹이라는 개념이 꼭 필요한지에 대해 궁금증이 생겼다.

컨슈머 그룹이 필요한 이유는 무엇일까? 이를 알기 위해서는 Kafka가 어떤 메세징 모델을 지원하는 지 알아야 한다.

Kafka가 지원하는 메세징 모델

메시징 모델은 크게 2가지로 나뉘게 되는데, 큐잉 모델과 펍/섭 모델이 있다.

먼저, 큐잉 모델은 프로듀서의 메시지를 브로커의 큐에 저장해두고 그 큐에 있는 메시지를 여러 개의 컨슈머가 추출할 수 있어 병렬로 처리를 할 수 있고, 그렇게 추출된 메시지는 사라지므로 다른 컨슈머에서 처리할 수 없다는 특징이 있다.

i.e. 브로커의 메시지 입장에서 보면 처리 능력을 높이는 데에 장점이 있다.

그리고, 펍/섭 모델은 프로듀서를 퍼블리셔, 컨슈머를 섭스크라이버라고 하는데, 퍼블리셔는 브로커에 있는 토픽이라는 카테고리에 메시지를 등록해두고, 섭스크라이버는 구독하고 있는 토픽에 저장되는 메시지를 받게 되며 큐잉 모델과 달리 추출된 메시지를 다른 컨슈머에서도 처리할 수 있다는 특징이 있다.

i.e. 복수의 컨슈머에 메시지를 전달할 수 있는 장점이 있다.

컨슈머 그룹이 필요한 이유는, 바로 펍/섭 모델과 관련이 있다.

실제 Production 환경에서 운영하는 도중에 컨슈머 인스턴스 A에 장애가 발생했다고 가정해보자. A는 제 역할을 못할 것이고 데이터를 수신하는 과정도 중단될 것이다.

만약 백업용 컨슈머 인스턴스 B가 A를 대체한다면, 기존에 A가 처리하고 있던 offset을 B가 이어받아서 처리할 것이고, A의 장애가 복구되더라도 offset은 A가 가장 마지막에 가지고 있던 offset이 아닌 B가 마지막에 가지고 있던 offset을 가지게 될 것이다.

그렇게 된다면 펍/섭 모델을 구현할 수가 없게 된다.

이 모델을 실현하기 위해서 컨슈머 그룹이라는 개념을 도입한 것이고, 같은 컨슈머 그룹 내에 백업용 컨슈머 인스턴스를 둠으로써 컨슈머 그룹 각각 동일한 파티션에서 메세지를 수신하더라도 다른 offset을 가지며 펍/섭 모델 식의 처리가 가능한 것이다.

kafka-architecture-kafka-consumer-groups

Kafka 성능

Kafka는 성능이 좋다. 그 성능이 왜 좋은지 이유를 살펴본다.

  1. 파티션 파일은 OS가 제공하는 페이지캐시를 사용한다.
    • 파티션에 대한 파일 IO를 메모리에서 처리되기 때문에 빨라짐
    • 서버에서 페이지캐시를 카프카만 사용해야 성능에 유리해짐
  2. Zero Copy라는 걸 사용한다.
    • 디스크에서 데이터를 읽어서 네트워크에 보내는 속도가 빠름
  3. 브로커가 컨슈머에 대해서 하는 일이 별로 없다.
    • 메세지를 필터링하거나 재전송하는 일을 브로커가 하지 않음 (프로듀서, 컨슈머가 직접 함)
  4. 프로듀싱, 컨슈밍 시 선택 옵션을 통해서 제어할 수 있는 값이 많다.
    • 프로듀서: 일정 크기만큼 모아서 전송할 수 있다, 브로커 상황에 따라 딜레이를 줄 수 있다.
    • 컨슈머: 일정 크기만큼 모아서 수신할 수 있다, 트랜잭션의 범위를 지정할 수 있다.
  5. 유연하게 브로커, 파티션, 컨슈머를 추가하여 처리량을 증대할 수 있다.

Kafka Replication

Kafka는 장애가 났을 때 이를 대처하기 위해서 레플리카를 사용한다.

파티션은 단일 또는 여러 개의 레플리카로 구성되어 토픽 단위로 레플리카 수를 지정할 수 있다. (토픽의 총 파티션 수 = 파티션 수 X 레플리카 수)

레플리카 중 하나는 Leader이며, 나머지는 모두 Follower라고 한다. Follower는 Leader로부터 메세지를 계속적으로 취득하여 복제를 유지하도록 동작하며 Leader가 속한 브로커가 장애가 생길 때 Follower 중 하나가 Leader가 된다.

프로듀서/컨슈머의 데이터 교환은 모두 Leader가 처리한다.

replication 004-600x450

Go로 Kafka Interface 만들기

Kafka만으로 topic, partition, producer, consumer 등 다양한 설정들 및 테스트를 할 수 있지만 Go로 Kafka Interface를 직접 만들면서 사용해보려고 한다.

1. Kafka 및 Zookeeper 실행

설치는 생략함(만약 도커를 사용하려 한다면, 카프카 외에 주키퍼가 필요해서 도커 컴퍼즈를 사용해야 하니 웬만하면 주키퍼가 포함된 기본 Kafka를 설치하는 것 추천)

Kafka를 설치한 경로에서 아래 명령어 각각 실행한다.

# Background로 Zookeeper Cluster 실행, 기본 포트는 2181임.
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

# Background로 Kafka Cluster 실행, 기본 포트는 9092임.
bin/kafka-server-start.sh -daemon config/server.properties

2. Kafka Interface Domain

package kafka_interface

import (
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/compress"
)

type brokers []string

type KafkaSetting struct {
	Network  string
	Url      string
	Broker   brokers
	Topic    *topic
	Producer *producer
	Consumer *consumer
}

type topic struct {
	Name              string
	NumPartition      int
	ReplicationFactor int
}

type producer struct {
	BatchSize   int 
	MaxAttempts int 
	Compression kafka.Compression
}

type consumer struct {
	MinBytes int
	MaxBytes int
}

func GetDefaultKafkaSetting() *KafkaSetting {
	return &KafkaSetting{
		"tcp",
		"localhost:9092",
		brokers{"localhost:9092"},
		&topic{
			"levi-topic",
			1,
			1,
		},
		&producer{
			16384,
			1,
			compress.Lz4,
		},
		&consumer{
			0,
			10e6,
		},
	}
}

3. Kafka Topic 생성

Topic을 생성하는 명령어와 로직이다.

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic levi-topic
func CreateTopic(kafkaSetting *KafkaSetting, autoCreateEnable bool) {
	if autoCreateEnable {
		_, err := kafka.DialLeader(context.Background(), kafkaSetting.Network, kafkaSetting.Url, kafkaSetting.Topic.Name, kafkaSetting.Topic.NumPartition)
		if err != nil {
			panic(err.Error())
		}
	} else {
		conn, err := kafka.Dial(kafkaSetting.Network, kafkaSetting.Url)
		if err != nil {
			panic(err.Error())
		}
		defer conn.Close()

		controller, err := conn.Controller()
		if err != nil {
			panic(err.Error())
		}
		var controllerConn *kafka.Conn
		controllerConn, err = kafka.Dial(kafkaSetting.Network, net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
		if err != nil {
			panic(err.Error())
		}
		defer controllerConn.Close()


		topicConfigs := []kafka.TopicConfig{
			{
				Topic:             kafkaSetting.Topic.Name,
				NumPartitions:     kafkaSetting.Topic.NumPartition,
				ReplicationFactor: kafkaSetting.Topic.ReplicationFactor,
			},
		}

		err = controllerConn.CreateTopics(topicConfigs...)
		if err != nil {
			panic(err.Error())
		}
	}
}

4. Kafka Topic List

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

func GetTopics(kafkaSetting *KafkaSetting) map[string]struct{}{
	conn, err := kafka.Dial(kafkaSetting.Network, kafkaSetting.Url)
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	partitions, err := conn.ReadPartitions()
	if err != nil {
		panic(err.Error())
	}

	m := map[string]struct{}{}

	for _, p := range partitions {
		m[p.Topic] = struct{}{}
	}

	return m
}

func main() {
    topic := kafka_interface.GetTopics(kafkaSetting)
	for k := range topic {
		fmt.Println(k)
    }
}

5. Kafka Consumer

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic levi-topic
func CreateConsumer(kafkaSetting *KafkaSetting) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{kafkaSetting.Url},
		Topic:     kafkaSetting.Topic.Name,
		Partition: 0,
		MinBytes:  kafkaSetting.Consumer.MinBytes, //10KB
		MaxBytes:  kafkaSetting.Consumer.MaxBytes, //10MB
	})
}

func main() {
    consumer := kafka_interface.CreateConsumer(kafkaSetting)
        consumer.SetOffset(0)
        defer consumer.Close()
        for {
            data, err := consumer.ReadMessage(context.Background())
            if err != nil {
                break
            }
            fmt.Printf("message at offset %d: %s = %s\n", data.Offset, string(data.Key), string(data.Value))
        }
}

6. Kafka Producer

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic levi-topic
func CreateProducer(kafkaSetting *KafkaSetting) *kafka.Writer {
	return &kafka.Writer{
		Addr:        kafka.TCP(kafkaSetting.Url),
		Topic:       kafkaSetting.Topic.Name,
		BatchSize:   kafkaSetting.Producer.BatchSize,
		MaxAttempts: kafkaSetting.Producer.MaxAttempts,
		Compression: kafkaSetting.Producer.Compression,
		Balancer:    &kafka.LeastBytes{},
	}
}

func main() {
    producer := kafka_interface.CreateProducer(kafkaSetting)
	defer producer.Close()
	err := producer.WriteMessages(context.Background(),
		kafka.Message{
			Value: []byte("Hello World!"),
		},
		kafka.Message{
			Value: []byte("One!"),
		},
		kafka.Message{
			Value: []byte("Two!"),
		},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}
}

Categories:

Updated:

Leave a comment