티스토리 뷰

728x90
SMALL

앞서 두 포스팅을 통해 Kafka(Zookeeper)의 아키텍처와 구축/기동 방법에 대해 살펴보았다.

 

Kafka(Zookeeper) 아키텍처
Kafka(Zookeeper) 구축

 

이번 포스팅에서는 실제 Kafka Message Broker를 활용한 Pub/Sub SpringBoot Application 개발 방법에 대해 알아보자.

Publisher Project 개발 가이드

Publisher 구현은 다음과 같은 과정을 거친다.

(build.gradle 구성 > @SpringBootApplication 구성 > Swagger 구성 > application.properties 정의 > KafkaPublisherConfig 정의 > domain(Sender & Reciever Message) 정의 > Controller 구성 > Service 정의)

 

1. build.gradle

gradle 빌드로 dependency를 정의한다. 기본 SpringBoot Application 이외에 Swagger, Testing, Kafka 등이 포함되어 있다.

plugins {
	id 'org.springframework.boot' version '2.2.6.RELEASE'
	id 'io.spring.dependency-management' version '1.0.9.RELEASE'
	id 'java'
}

group = 'kafka.publisher'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
	mavenCentral()
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-web'
	implementation 'org.springframework.boot:spring-boot-starter-aop'
	implementation 'org.springframework.kafka:spring-kafka'

	compile group: 'io.springfox', name: 'springfox-swagger-ui', version: '2.6.1'
	compile group: 'io.springfox', name: 'springfox-swagger2', version: '2.6.1'

	testImplementation('org.springframework.boot:spring-boot-starter-test') {
		exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
	}

	testImplementation 'org.springframework.kafka:spring-kafka-test'
}

test {
	useJUnitPlatform()
}

jar {
	enabled=true
	manifest {
		attributes 'Main-Class' : 'kafka.publisher.KafkaPublisherSpringBootApplication'
	}
}

a. implementation

- spring-boot-starter-web / spring-boot-starter-aop

- spring-kafka

b. compile

- springfox-swagger-ui / springfox-swagger2

c. testImplementation

- spring-boot-starter-test

- spring-kafka-test

 

2. @SpringBootApplication 클래스 정의

 

3. Swagger 구성

Swagger 구성은 다음 URL을 참고한다.

Rest API 관리 (Spring Boot Application + Swagger)

 

4. application.properties 정의

SpringBoot Embedded Tomcat Port 8080과 kafka server 정보(kafka.bootstrap) 등을 정의한다.

또한 kafka server의 topic을 ping.topic.name이라는 필드로 정의하며 topic name은 hello이다. 이 정보는 이후 Subscriber가 offset을 통해 Listen하게 되는 topic 정보이다.

server.port=8080
kafka.bootstrap=localhost:9092
ping.topic.name=hello

 

5. KafkaPublisherConfig 정의

Producer가 Message를 Publishing하기 위한 ProducerConfig를 구성한다.

package kafka.publisher.config;

import kafka.publisher.domain.Ping;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaPublisherConfig {

	@Value(value = "${kafka.bootstrap}")
	private String bootstrap;

	@Bean
	public ProducerFactory<String, Ping> pingProducerFactory() {
		Map<String, Object> configProps = new HashMap<>();
		configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
		configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
		return new DefaultKafkaProducerFactory<>(configProps);
	}

	@Bean
	public KafkaTemplate<String, Ping> pingKafkaTemplate() {
		return new KafkaTemplate<>(pingProducerFactory());
	}
}

a. BOOTSTRAP_SERVERS_CONFIG : 연결할 Kafka Server 주소이다. Multi Cluster 환경일 경우 여러개를 나열하여 구성할 수 있다.

b. KEY_SERIALIZER_CLASS_CONFIG / VALUE_SERIALIZER_CLASS_CONFIG : key-value serializer로 각각 메시지를 serialize할 때 사용할 클래스를 지정한다. ByteArraySerializer, JsonSerializer, StringSerializer 등의 클래스를 지정할 수 있다.

 

6. domain 정의

domain은 송/수신 메시지를 저장하는 Object로써 정의된다. 실제 SpringBoot RestAPI를 정의할 경우 DTO to Domain Model 형태로 접근하지만, 본 가이드에서는 Domain Model만 사용하도록 한다. (View 계층이 없으므로..)

Ping은 전송할 메시지, Pong은 Return 메시지이다.

[Ping.java]
package kafka.publisher.domain;

import java.io.Serializable;

public class Ping implements Serializable {

    private String msg;
    private String name;

    public Ping() {

    }

    public Ping(String msg, String name) {
        this.msg = msg;
        this.name = name;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return msg + ", " + name + "!";
    }
}

[Pong.java]
package kafka.publisher.domain;

public class Pong {
    private String name;
    private String message;

    public Pong(String name, String message) {
        this.name = name;
        this.message = message;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

 

7. Controller 구성

RestContoller를 정의한다.

package kafka.publisher.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import kafka.publisher.domain.Pong;
import kafka.publisher.domain.Ping;
import kafka.publisher.service.PingService;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaPublisherController {

    @Autowired
    PingService pingService;

    @RequestMapping(value = "/publish", method=RequestMethod.POST)
    public Pong pingAndPong(@RequestBody final Ping ping) throws Exception {
        return pingService.pingAndPong(ping);
    }
}

메시지를 발신하고 수신하는 /kafka/publish RequestMapping을 갖는 pingAndPong API를 생성한다.

Controller는 그 역할 그대로 PingService의 비즈니스 로직을 처리하기 위한 Mapping 역할을 한다.

 

8. Service 정의Service에서 실제 비즈니스 로직을 처리하도록 구현한다.

package kafka.publisher.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import kafka.publisher.domain.Pong;
import kafka.publisher.domain.Ping;

@Component
public class PingService{
	@Autowired
    private KafkaTemplate<String, Ping> pingKafkaTemplate;

    @Value(value = "${ping.topic.name}")
    private String pingTopicName;
    
    public Pong pingAndPong(Ping ping) throws Exception {
    	ListenableFuture<SendResult<String, Ping>> future = pingKafkaTemplate.send(pingTopicName, ping);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Ping>>() {
            @Override
            public void onSuccess(SendResult<String, Ping> result) {
                Ping g = result.getProducerRecord().value();
                System.out.println("Sent message=[" + g.toString() + "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println( "Unable to send message=[" + ping.toString() + "] due to : " + ex.getMessage());
            }
        });
        return new Pong("Son","Hello~!");
    }
}

위와 같이 Publisher를 정의하고 SpringBoot Application을 기동한다.

728x90

Subscriber Project 개발 가이드

Subscriber 구현은 다음과 같은 과정을 거친다.

(build.gradle 구성 > @SpringBootApplication 구성 > application.properties 정의 > KafkaSubscriberConfig 정의 > domain(Sender & Reciever Message) 정의 > Service 정의)

 

1. build.gradle

gradle 빌드로 dependency를 정의한다. 기본 SpringBoot Application 이외에 Swagger, Testing, Kafka 등이 포함되어 있다.

plugins {
	id 'org.springframework.boot' version '2.2.6.RELEASE'
	id 'io.spring.dependency-management' version '1.0.9.RELEASE'
	id 'java'
}

group = 'kafka.subscriber'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
	mavenCentral()
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-web'
	implementation 'org.springframework.kafka:spring-kafka'
	testImplementation('org.springframework.boot:spring-boot-starter-test') {
		exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
	}
	testImplementation 'org.springframework.kafka:spring-kafka-test'
}

test {
	useJUnitPlatform()
}

jar {
	enabled=true
	manifest {
		attributes 'Main-Class' : 'kafka.subscriber.KafkaSubscriberSpringBootApplication'
	}
}

수신측에는 별도의 swagger 구성이 필요 없으며, 그외의 구성은 Publisher와 동일하다.

 

2. @SpringBootApplication 클래스 정의

 

3. application.properties 정의

SpringBoot Embedded Tomcat Port 8081과 kafka server 정보(kafka.bootstrap) 등을 정의한다.

앞서 Publisher가 정의한 topic name과 동일한 ping.topic.name으로 hello를 정의한다.

server.port=8081
kafka.bootstrap=localhost:9092
ping.topic.name=hello

 

4. KafkaSubscriberConfig 정의

Consumer(=Subscriber)가 Message를 Recieve하기 위한 ConsumerConfig를 구성한다.

package kafka.subscriber.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import kafka.subscriber.domain.Ping;

@EnableKafka
@Configuration
public class KafkaSubscriberConfig {

	@Value(value = "${kafka.bootstrap}")
	private String bootstrap;

	public ConsumerFactory<String, Ping> pingConsumerFactory() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "pong");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

		return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
				new JsonDeserializer<>(Ping.class, false));
	}

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, Ping> pingKafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, Ping> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
		factory.setConsumerFactory(pingConsumerFactory());
		return factory;
	}
}

a. BOOTSTRAP_SERVERS_CONFIG : 연결할 Kafka Server 주소이다. Multi Cluster 환경일 경우 여러개를 나열하여 구성할 수 있다.

b. GROUP_ID_CONFIG : Consumer를 식별하는 고유 아이디이다. 메시지를 전송할 때 지정하는 Topic과는 다른 의미이다. Zookeeper에서는 각 그룹의 메시지 offset을 관리하는데 GROUP_ID가 같으면 offset 역시 공유된다.

c. KEY_SERIALIZER_CLASS_CONFIG / VALUE_SERIALIZER_CLASS_CONFIG : key-value serializer로 각각 메시지를 serialize할 때 사용할 클래스를 지정한다. ByteArraySerializer, JsonSerializer, StringSerializer 등의 클래스를 지정할 수 있다.

d. ENABLE_AUTO_COMMIT_CONFIG : Consumer가 반환한 메시지의 offset을 kafka에 주기적으로 커밋할지 여부를 결정한다. 커밋된 offset은 프로세스가 실패할 때 Consuming이 시작될 위치로 사용된다.

e. AUTO_OFFSET_RESET_CONFIG : Kafka에 초기 offset이 없거나, offset이 범위를 벗어난 경우 수행할 작업을 지정한다.

- earliest : 가장 빠른 오프셋으로 자동 재설정
- latest : 최신 오프셋으로 자동 재설정
- none : 이전 오프셋이 발견되지 않으면 컨슈머 그룹에 예외 발생
- anything else : 컨슈머에게 예외 발생

 

5. domain 정의

domain은 송/수신 메시지를 저장하는 Object로써 정의된다. 실제 SpringBoot RestAPI를 정의할 경우 DTO to Domain Model 형태로 접근하지만, 본 가이드에서는 Domain Model만 사용하도록 한다. (View 계층이 없으므로..)

[Ping.java]

 

package kafka.subscriber.domain;

import java.io.Serializable;

public class Ping implements Serializable {

    private String msg;
    private String name;

    public Ping() {

    }

    public Ping(String msg, String name) {
        this.msg = msg;
        this.name = name;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return msg + ", " + name + "!";
    }
}

 

6. Service 정의

Service에서 실제 비즈니스 로직을 처리하도록 구현한다.

package kafka.subscriber.service;

import org.omg.CORBA.SystemException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import kafka.subscriber.domain.Ping;

@Component
public class KafkaSubscriberService {

    @KafkaListener(topics = "${ping.topic.name}", containerFactory = "pingKafkaListenerContainerFactory")
    public void pingListener(Ping ping, Acknowledgment ack) {
        try {
            System.out.println("Recieved ping message: " + ping);
            ack.acknowledge();
        } catch (Exception e) {
        	String msg = "시스템에 예상치 못한 문제가 발생했습니다";
        	System.out.println("Recieved ping message: " + msg + e);
        }
    }
}

이때 KafkaListener를 통해 hello(application.properties에 정의된 topic.name) topic을 구독하게 되며, offset에 따라 메시지를 수신한 후 비즈니스 로직을 처리한다.

# 중요 : ack.acknowledge()는 모든 비즈니스 로직이 처리된 이후 offset 값을 변경하도록 하여 실패에 대한 offset 변경이 발생하지 않도록 비즈니스 로직 제일 마지막에 처리한다.

728x90

Pub/Sub Test

실제 테스트는 API를 호출하는 Client로 부터 시작하지만, 현재 View가 별도로 구현되어 있지 않은 관계로 Publisher에 구성한 Swagger를 기반으로 테스트를 진행해 보도록하자.

 

1. Swagger UI 접속

http://localhost:8080/swagger-ui.html

 

 

 

2. API Test

위와 같이 Swagger-UI에 접근하여 아래와 같이 kafka-publisher-controller API를 테스트할 수 있다.

 

 

API Parameter에 message와 name을 다음과 같이 입력하고 Try it out! 버튼을 클릭하면 다음과 같이 결과를 Return 받을 수 있다.

 

 

 

3. 결과 확인

먼저 Publisher 측 Log를 확인한다.

Sent message=[Son, Hi!] with offset=[1]
Sent message=[Son, Hi!] with offset=[2]
Sent message=[Glad to see you, Nara!] with offset=[3]

로그는 위와 같이 message와 offset 정보를 출력하고 있다.

2번에서 발신한 Glad to see you, Nara!라는 메시지와 앞서 전송한 2번의 메시지가 있어 offset 3으로 메시지가 발생된 것을 확인할 수 있다.

이는 앞서 아키텍처에서도 확인했지만, Topic Name이 hello인 hello-0 partition에 메시지가 3번째 쌓이고 있으며, 이는 Subscriber가 수신할 수 있는 상태이다.

다음으로 Subscriber 측 Log를 확인해 보자.

Recieved ping message: Son, Hi!
Recieved ping message: Son, Hi!
Recieved ping message: Glad to see you, Nara!

다음과 같이 Subscriber 측에서 메시지를 수신한 것을 확인할 수 있다.

마지막으로 Subscriber를 다운한 상태로 메시지를 연속으로 발신한 후 한번에 수신하는지 여부를 확인해 보자.

Publisher는 다음과 같이 offset 10번까지 메시지를 발신하였고,

Sent message=[Son, Hi!] with offset=[1]
Sent message=[Son, Hi!] with offset=[2]
Sent message=[Glad to see you, Nara!] with offset=[3]
Sent message=[Glad to see you, Nara!] with offset=[4]
Sent message=[Glad to see you, Nara!] with offset=[5]
Sent message=[Glad to see you, Nara!] with offset=[6]
Sent message=[Glad to see you, Nara!] with offset=[7]
Sent message=[Glad to see you, Nara!] with offset=[8]
Sent message=[Glad to see you, Nara!] with offset=[9]
Sent message=[Glad to see you, Nara!] with offset=[10]

Subscriber는 다운 상태이다.

 

 

Subscriber를 기동하고 상태를 확인해 보자.

기동 로그에서 다음과 같은 로그를 확인할 수 있을 것이다.

2020-09-30 00:19:00.775  INFO 326540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=pong] Setting offset for partition hello-0 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=DESKTOP-SDT0PVK:9092 (id: 0 rack: null), epoch=0}}

시작점인 offset이 5임을 알 수 있고 현재 Publisher가 offset 10번까지 메시지를 전송하였으니 이 정보를 기반으로 총 6개의 메시지가 수신될 것을 기대할 수 있다.

 

 

위와 같이 메시지가 총 6개 수신된 것을 확인할 수 있다.

여기서 기억해 두어야 할 점은 앞서 Kafka 아키텍처에서 살펴보았듯이, Kafka는 Group 당 하나의 메시지를 가져간다는 점이다. 이로 인해 동일 그룹의 Subscriber가 추가되면 메시지는 그 중 하나의 수신자에게만 전달된다.

public ConsumerFactory<String, Ping> pingConsumerFactory() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "pongpong");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

		return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
				new JsonDeserializer<>(Ping.class, false));
	}

다만 위와 같이 GROUP_ID_CONFIG를 변경하여 서로 다른 Group에 존재할 경우 각 Subscriber는 모두 메시지를 수신한다.

728x90

결론

이와 같이 Kafka Message Broker를 이용한 Pub/Sub 테스트를 진행하였다. Pub/Sub 구조의 가장 큰 특징은 Broker가 직접 Consumer에게 메시지를 전송하는 것이 아닌 Consumer가 Broker를 구독하고 있다가 offset이 update되면 직접 pulling한다는 점이다. 이로 인해 offset을 조정하여 원하는 데이터만 추출해 올 수 있도록 구현할 수도 있다는 특징이 있다.

Kafka가 RabbitMQ와 달리 AMQP 프로토콜이나 Java Message Service를 지원하지 않지만, TCP 기반의 단순한 발신으로 오버헤드를 감소 시켜 대용량 발신에 포커싱이 맞추어진 Message Broker라 할 수 있다.
Kafka와 같은 메시지 브로커를 통해 비동기 통신으로 마이크로서비스가 갖고 있는 수직적 구조가 갖고 있는 성능 제약사항을 일부 극복할 수 있지만 마찬가지로 고려해야 할 사항도 다수 존재한다.
예를 들어 Kafka가 대용량 처리에 적합한 메시지 브로커라고 소개했는데 이는 어디까지나 최적화된 아키텍처와 개발이 적용되었을때의 이야기이다.
대용량 데이터를 전송하기 위해서는 처리시간이 보장되는 안정된 성능을 확보할 수 있는 메시지 전송 구조가 확보되어야 하며, 무엇보다 데이터 전송에 발생하는 시스템 리소스와 네트워크 비용을 고려해야한다.
또한 데이터의 일관성과 데이터 순서보장 등에 대한 고려가 충분히 이루어져야 비로서 Kafka의 가치가 나타나게 될 것이다.

 

Kafka 테스트 코드는

github.com/sonnaraon/kafka-sample

sonnaraon/kafka-sample

Son.Nara Kafka Sample (SpringBootApplication). Contribute to sonnaraon/kafka-sample development by creating an account on GitHub.

github.com

에서 다운로드 받을 수 있다.

728x90
LIST
댓글
댓글쓰기 폼