본문 바로가기

기타

[Kafka] SpringBoot 에서 Kafka 연동하기

회사에서 MQ(Message Queue)를 사용하여 요청을 순차적으로 처리하는 방법에 대해 알게 되었다.  알려진 MQ 오픈소스  중, 그동안 가장 많이 들어왔던 Kafka로 이벤트 스트리밍 환경을 구성해보고자 했다.

 

 

Kafka란?

Apache의 오픈 소스 분산 이벤트 스트리밍 플랫폼이다. 이벤트를 지속적으로 발행(publish-write), 구독(subscribe-read)하고 저장(store)할 수 있으며, 저장된 이벤트 스트림은 발생 직후 또는 이후에 처리(process)할 수 있다. 

 

메시지 큐를 사용함으로써, 발행된 메시지는 모두 큐에 저장되고 결국에 소비자에게 전달된다는 보장성을 가진다. 또한, 메시지 발행과 메시지 사용 단이 분리되어 발신자와 수신자 서로가 의존하지 않으므로, 독립적으로 확장될 수 있다는 장점이 있다.

 

 

- Kafka 구조

이미지 출처 : https://ifuwanna.tistory.com/487

  • 주키퍼 (ZooKeeper) : 카프카를 띄우기 위해 반드시 실행되어야함. 카프카의 메타데이터들을 저장한다.
  • 카프카 클러스터 (KafkaCluster) : 브로커들의 모임. 확장성과 고가용성을 위해 broker들을 클러스터로 구성.
  • 브로커 : 각각의 카프카 서버로, 동일 노드에 여러 브로커를 띄울 수 있다.
  • 프로듀서 : 메시지(이벤트)를 발행하여 생산하는 주체
  • 컨슈머 : 메시지(이벤트)를 구독하여 소비하는 주체

 

 

- 토픽(Topic)

이미지 출처 : https://ifuwanna.tistory.com/487

카프카에 저장되는 메시지는 TOPIC으로 분류되고, 토픽은 여러 개의 파티션으로 나누어진다.

 

  • Topic : 메시지를 구분하는 단위(메시지를 담는 폴더) 이다
  • Partition : 메시지를 저장하는 물리적인 파일. 추가만 가능하다.
  • offset : 파티션 내에 메시지가 저장된 위치이다. 메시지는 파티션의 맨 뒤에 추가되며 순차적으로 처리된다.

 

 

- Kafka 동작 과정

1. Producer가 메시지를 발행 후 전송한다. >> 브로커는 Topic에 메시지를 저장한다.

  • 메시지 전송 시, Topic을 지정한다.
  • 같은 키를 같는 메시지는 같은 파티션에 저장되며 순서가 유지된다.

 

2. Consumer가 메시지를 구독하고, 소비한다. 

  • 각 Consumer들은 Consumer Group이라는 논리적 그룹에 속한다.
  • Topic의 파티션은 컨슈머 그룹과 1:N 매칭 관계로 컨슈머 그룹 내 한 개의 컨슈머만 연결 가능하다.

 

 

SpringBoot와 Kafka 연동

- Kafka 실행 환경

Kafka 서버를 실행하기 위해서, zookeeper를 먼저 실행하고 kafka를 실행해야 한다.

 

로컬에서 Docker를 사용하여 zookeeper와 kafka 컨테이너를 띄웠다.

 

이미지 실행을 위해서 docker-compose를 사용하여 zookeeper가 실행된 뒤에 kafka가 실행되도록 구성했다.

 

docker.compose.yml

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

 

자세한 가이드라인이 필요하다면 아래 블로그를 참고하길 바란다.

 

[Docker] Kafka 환경 구축하기

이 글에서는 local 환경에서 Docker를 이용하여 Kafka 서버를 구축하는 방법에 대해 알아보겠습니다. 01. 도커 이미지 선택 현재(2020-08-26 기준) Kafka 이미지는 공식 버전이 없으므로 Star가 제일 많은 이

victorydntmd.tistory.com

 

 

- 프로젝트 구조

 

application.yml

spring:
  kafka:
    consumer:
      #카프카 클러스터에 연결에 사용할 호스트:포트
      bootstrap-servers: localhost:9092
      #컨슈머 그룹 식별자
      group-id: consumers

      #offset이 카프카 서버에 없을 때 수행할 작업.
      #오류 등으로 인해 offset 정보(다음에 소비할 메시지 위치)가 사라졌을 때 어떻게 다시 지정할 지 설정
      # auto-offset-reset - 가장 오래된 메시지로 offset reset
      auto-offset-reset: earliest

      # 데이터를 수신할 때, key/value를 역직렬화 해서 String 데이터로 받아온다. 데이터 타입에 따라 설정 가능
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    producer:
      # 컨슈머와 동일
      bootstrap-servers: localhost:9092
      # 데이터를 보낼 때 역직렬화
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

application.yml 에 consumer와 producer에 대한 설정을 해준다. 카프카 클러스터에 연결할 호스트:포트 를 적어주고, 컨슈머 그룹 식별자를 선언해준다.

 

 

KafkaController.java

package com.example.kafkatest.controller;

import com.example.kafkatest.service.KafkaConsumerService;
import com.example.kafkatest.service.KafkaProducerService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
@RequiredArgsConstructor
public class KafkaContoller {
    private final KafkaProducerService kafkaProducerService;

    // 메시지 생성
    @PostMapping( "/send")
    public String sendMessage(@RequestParam("message") String message){
        kafkaProducerService.sendMessage(message);

        return "success";
    }
}

컨트롤러에서는 producer에게 메시지를 보내라는 요청을 전달한다.

 

 

KafkaProducerService.java

package com.example.kafkatest.service.impl;

import com.example.kafkatest.service.KafkaProducerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducerServiceImpl implements KafkaProducerService {
    private static final String TOPIC = "exam";
    private final KafkaTemplate<String,String> kafkaTemplate;
    @Override
    public void sendMessage(String message) {
        // 템플릿으로 토픽 생성. 메시지 전달.
        log.info(String.format("Produce message : %s", message));
        kafkaTemplate.send(TOPIC, message);
    }
}

 

producer는 메시지를 저장할 Topic을 지정하고, KafkaTemplate에 토픽명과 메시지를 전달한다.

send() 가 실행되며, 데이터가 카프카 클러스터에 전달되어 "exam" 토픽에 메시지가 저장된다. 

 

 

KafkaConsumerService.java

package com.example.kafkatest.service.impl;

import com.example.kafkatest.service.KafkaConsumerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaConsumerServiceImpl implements KafkaConsumerService {
    @Override
    @KafkaListener(topics = "exam", groupId = "consumers")
    //메시지를 보내면, 해당 메시지의 토픽을 지정해둔 컨슈머에 메시지가 전달된다.
    public void consume(String message) {
        log.info(String.format("Consumed message : %s", message));
    }
}

메시지가 전달되면, @KafkaListener를 걸어둔 메서드가 실행된다. 메시지를 가져올 topic 명과, 컨슈머 그룹을 적어준다.  @KafkaListners 로 여러개의 @KafkaListener 를 걸어줄 수도 있다.

 

 

간단한 테스트 용이므로, 단일 브로커만 사용했고, 파티션도 따로 지정해주지 않았다. 좀 더 복잡한 구조로 메시지를 주고 받는 예제도 만들어봐야할 것 같다. 

 

 

- 실행 결과

스프링 부트를 실행하면, 아래와 같이 카프카 연결이 잘 된 것을 확인할 수 있다.

 

포스트맨으로 간단한 메시지를 작성하여 실행했다.

 

Postman

 

 

Console

 

포스트맨에서 sendmessage API를 호출하였고, Producer에서 kafka 클러스터에 메시지를 만들어 전달하자마자, Consumer가 topic에서 메시지를 가져와 consume() 을 실행하였다.

 

 

 

참고 블로그

 

[Kafka] 카프카란? 주요개념 및 용어 소개

카프카(Kafka)란? Apache Kafka는 고성능 데이터 파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위해 오픈 소스 분산 이벤트 스트리밍 플랫폼(distributed event streaming platform)

ifuwanna.tistory.com

 

[SpringBoot] Kafka 연동하기

이 글에서는 Springboot에 Kafka를 연동하는 방법에 대해 알아보겠습니다. Kafka가 궁금하시다면 이글을 참고해주세요! 이 글에서 다루는 내용은 다음과 같습니다. Springboot에서 Kafka의 특정 Topic에 메

victorydntmd.tistory.com