티스토리 뷰
앞서 두 포스팅을 통해 Kafka(Zookeeper)의 아키텍처와 구축/기동 방법에 대해 살펴보았다.
Kafka(Zookeeper) 아키텍처
Kafka(Zookeeper) 구축
이번 포스팅에서는 실제 Kafka Message Broker를 활용한 Pub/Sub SpringBoot Application 개발 방법에 대해 알아보자.
Publisher Project 개발 가이드
Publisher 구현은 다음과 같은 과정을 거친다.
(build.gradle 구성 > @SpringBootApplication 구성 > Swagger 구성 > application.properties 정의 > KafkaPublisherConfig 정의 > domain(Sender & Reciever Message) 정의 > Controller 구성 > Service 정의)
1. build.gradle
gradle 빌드로 dependency를 정의한다. 기본 SpringBoot Application 이외에 Swagger, Testing, Kafka 등이 포함되어 있다.
plugins { id 'org.springframework.boot' version '2.2.6.RELEASE' id 'io.spring.dependency-management' version '1.0.9.RELEASE' id 'java' } group = 'kafka.publisher' version = '0.0.1-SNAPSHOT' sourceCompatibility = '1.8' repositories { mavenCentral() } dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-aop' implementation 'org.springframework.kafka:spring-kafka' compile group: 'io.springfox', name: 'springfox-swagger-ui', version: '2.6.1' compile group: 'io.springfox', name: 'springfox-swagger2', version: '2.6.1' testImplementation('org.springframework.boot:spring-boot-starter-test') { exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' } testImplementation 'org.springframework.kafka:spring-kafka-test' } test { useJUnitPlatform() } jar { enabled=true manifest { attributes 'Main-Class' : 'kafka.publisher.KafkaPublisherSpringBootApplication' } }
a. implementation
- spring-boot-starter-web / spring-boot-starter-aop
- spring-kafka
b. compile
- springfox-swagger-ui / springfox-swagger2
c. testImplementation
- spring-boot-starter-test
- spring-kafka-test
2. @SpringBootApplication 클래스 정의
3. Swagger 구성
Swagger 구성은 다음 URL을 참고한다.
Rest API 관리 (Spring Boot Application + Swagger)
4. application.properties 정의
SpringBoot Embedded Tomcat Port 8080과 kafka server 정보(kafka.bootstrap) 등을 정의한다.
또한 kafka server의 topic을 ping.topic.name이라는 필드로 정의하며 topic name은 hello이다. 이 정보는 이후 Subscriber가 offset을 통해 Listen하게 되는 topic 정보이다.
server.port=8080 kafka.bootstrap=localhost:9092 ping.topic.name=hello
5. KafkaPublisherConfig 정의
Producer가 Message를 Publishing하기 위한 ProducerConfig를 구성한다.
package kafka.publisher.config; import kafka.publisher.domain.Ping; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaPublisherConfig { @Value(value = "${kafka.bootstrap}") private String bootstrap; @Bean public ProducerFactory<String, Ping> pingProducerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, Ping> pingKafkaTemplate() { return new KafkaTemplate<>(pingProducerFactory()); } }
a. BOOTSTRAP_SERVERS_CONFIG : 연결할 Kafka Server 주소이다. Multi Cluster 환경일 경우 여러개를 나열하여 구성할 수 있다.
b. KEY_SERIALIZER_CLASS_CONFIG / VALUE_SERIALIZER_CLASS_CONFIG : key-value serializer로 각각 메시지를 serialize할 때 사용할 클래스를 지정한다. ByteArraySerializer, JsonSerializer, StringSerializer 등의 클래스를 지정할 수 있다.
6. domain 정의
domain은 송/수신 메시지를 저장하는 Object로써 정의된다. 실제 SpringBoot RestAPI를 정의할 경우 DTO to Domain Model 형태로 접근하지만, 본 가이드에서는 Domain Model만 사용하도록 한다. (View 계층이 없으므로..)
Ping은 전송할 메시지, Pong은 Return 메시지이다.
[Ping.java] package kafka.publisher.domain; import java.io.Serializable; public class Ping implements Serializable { private String msg; private String name; public Ping() { } public Ping(String msg, String name) { this.msg = msg; this.name = name; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return msg + ", " + name + "!"; } } [Pong.java] package kafka.publisher.domain; public class Pong { private String name; private String message; public Pong(String name, String message) { this.name = name; this.message = message; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
7. Controller 구성
RestContoller를 정의한다.
package kafka.publisher.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import kafka.publisher.domain.Pong; import kafka.publisher.domain.Ping; import kafka.publisher.service.PingService; @RestController @RequestMapping(value = "/kafka") public class KafkaPublisherController { @Autowired PingService pingService; @RequestMapping(value = "/publish", method=RequestMethod.POST) public Pong pingAndPong(@RequestBody final Ping ping) throws Exception { return pingService.pingAndPong(ping); } }
메시지를 발신하고 수신하는 /kafka/publish RequestMapping을 갖는 pingAndPong API를 생성한다.
Controller는 그 역할 그대로 PingService의 비즈니스 로직을 처리하기 위한 Mapping 역할을 한다.
8. Service 정의Service에서 실제 비즈니스 로직을 처리하도록 구현한다.
package kafka.publisher.service; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import kafka.publisher.domain.Pong; import kafka.publisher.domain.Ping; @Component public class PingService{ @Autowired private KafkaTemplate<String, Ping> pingKafkaTemplate; @Value(value = "${ping.topic.name}") private String pingTopicName; public Pong pingAndPong(Ping ping) throws Exception { ListenableFuture<SendResult<String, Ping>> future = pingKafkaTemplate.send(pingTopicName, ping); future.addCallback(new ListenableFutureCallback<SendResult<String, Ping>>() { @Override public void onSuccess(SendResult<String, Ping> result) { Ping g = result.getProducerRecord().value(); System.out.println("Sent message=[" + g.toString() + "] with offset=[" + result.getRecordMetadata().offset() + "]"); } @Override public void onFailure(Throwable ex) { System.out.println( "Unable to send message=[" + ping.toString() + "] due to : " + ex.getMessage()); } }); return new Pong("Son","Hello~!"); } }
위와 같이 Publisher를 정의하고 SpringBoot Application을 기동한다.
Subscriber Project 개발 가이드
Subscriber 구현은 다음과 같은 과정을 거친다.
(build.gradle 구성 > @SpringBootApplication 구성 > application.properties 정의 > KafkaSubscriberConfig 정의 > domain(Sender & Reciever Message) 정의 > Service 정의)
1. build.gradle
gradle 빌드로 dependency를 정의한다. 기본 SpringBoot Application 이외에 Swagger, Testing, Kafka 등이 포함되어 있다.
plugins { id 'org.springframework.boot' version '2.2.6.RELEASE' id 'io.spring.dependency-management' version '1.0.9.RELEASE' id 'java' } group = 'kafka.subscriber' version = '0.0.1-SNAPSHOT' sourceCompatibility = '1.8' repositories { mavenCentral() } dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.kafka:spring-kafka' testImplementation('org.springframework.boot:spring-boot-starter-test') { exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' } testImplementation 'org.springframework.kafka:spring-kafka-test' } test { useJUnitPlatform() } jar { enabled=true manifest { attributes 'Main-Class' : 'kafka.subscriber.KafkaSubscriberSpringBootApplication' } }
수신측에는 별도의 swagger 구성이 필요 없으며, 그외의 구성은 Publisher와 동일하다.
2. @SpringBootApplication 클래스 정의
3. application.properties 정의
SpringBoot Embedded Tomcat Port 8081과 kafka server 정보(kafka.bootstrap) 등을 정의한다.
앞서 Publisher가 정의한 topic name과 동일한 ping.topic.name으로 hello를 정의한다.
server.port=8081 kafka.bootstrap=localhost:9092 ping.topic.name=hello
4. KafkaSubscriberConfig 정의
Consumer(=Subscriber)가 Message를 Recieve하기 위한 ConsumerConfig를 구성한다.
package kafka.subscriber.config; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.support.serializer.JsonDeserializer; import kafka.subscriber.domain.Ping; @EnableKafka @Configuration public class KafkaSubscriberConfig { @Value(value = "${kafka.bootstrap}") private String bootstrap; public ConsumerFactory<String, Ping> pingConsumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); props.put(ConsumerConfig.GROUP_ID_CONFIG, "pong"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Ping.class, false)); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Ping> pingKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Ping> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); factory.setConsumerFactory(pingConsumerFactory()); return factory; } }
a. BOOTSTRAP_SERVERS_CONFIG : 연결할 Kafka Server 주소이다. Multi Cluster 환경일 경우 여러개를 나열하여 구성할 수 있다.
b. GROUP_ID_CONFIG : Consumer를 식별하는 고유 아이디이다. 메시지를 전송할 때 지정하는 Topic과는 다른 의미이다. Zookeeper에서는 각 그룹의 메시지 offset을 관리하는데 GROUP_ID가 같으면 offset 역시 공유된다.
c. KEY_SERIALIZER_CLASS_CONFIG / VALUE_SERIALIZER_CLASS_CONFIG : key-value serializer로 각각 메시지를 serialize할 때 사용할 클래스를 지정한다. ByteArraySerializer, JsonSerializer, StringSerializer 등의 클래스를 지정할 수 있다.
d. ENABLE_AUTO_COMMIT_CONFIG : Consumer가 반환한 메시지의 offset을 kafka에 주기적으로 커밋할지 여부를 결정한다. 커밋된 offset은 프로세스가 실패할 때 Consuming이 시작될 위치로 사용된다.
e. AUTO_OFFSET_RESET_CONFIG : Kafka에 초기 offset이 없거나, offset이 범위를 벗어난 경우 수행할 작업을 지정한다.
- earliest : 가장 빠른 오프셋으로 자동 재설정
- latest : 최신 오프셋으로 자동 재설정
- none : 이전 오프셋이 발견되지 않으면 컨슈머 그룹에 예외 발생
- anything else : 컨슈머에게 예외 발생
5. domain 정의
domain은 송/수신 메시지를 저장하는 Object로써 정의된다. 실제 SpringBoot RestAPI를 정의할 경우 DTO to Domain Model 형태로 접근하지만, 본 가이드에서는 Domain Model만 사용하도록 한다. (View 계층이 없으므로..)
[Ping.java]
package kafka.subscriber.domain; import java.io.Serializable; public class Ping implements Serializable { private String msg; private String name; public Ping() { } public Ping(String msg, String name) { this.msg = msg; this.name = name; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return msg + ", " + name + "!"; } }
6. Service 정의
Service에서 실제 비즈니스 로직을 처리하도록 구현한다.
package kafka.subscriber.service; import org.omg.CORBA.SystemException; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import kafka.subscriber.domain.Ping; @Component public class KafkaSubscriberService { @KafkaListener(topics = "${ping.topic.name}", containerFactory = "pingKafkaListenerContainerFactory") public void pingListener(Ping ping, Acknowledgment ack) { try { System.out.println("Recieved ping message: " + ping); ack.acknowledge(); } catch (Exception e) { String msg = "시스템에 예상치 못한 문제가 발생했습니다"; System.out.println("Recieved ping message: " + msg + e); } } }
이때 KafkaListener를 통해 hello(application.properties에 정의된 topic.name) topic을 구독하게 되며, offset에 따라 메시지를 수신한 후 비즈니스 로직을 처리한다.
# 중요 : ack.acknowledge()는 모든 비즈니스 로직이 처리된 이후 offset 값을 변경하도록 하여 실패에 대한 offset 변경이 발생하지 않도록 비즈니스 로직 제일 마지막에 처리한다.
Pub/Sub Test
실제 테스트는 API를 호출하는 Client로 부터 시작하지만, 현재 View가 별도로 구현되어 있지 않은 관계로 Publisher에 구성한 Swagger를 기반으로 테스트를 진행해 보도록하자.
1. Swagger UI 접속
http://localhost:8080/swagger-ui.html
2. API Test
위와 같이 Swagger-UI에 접근하여 아래와 같이 kafka-publisher-controller API를 테스트할 수 있다.
API Parameter에 message와 name을 다음과 같이 입력하고 Try it out! 버튼을 클릭하면 다음과 같이 결과를 Return 받을 수 있다.
3. 결과 확인
먼저 Publisher 측 Log를 확인한다.
Sent message=[Son, Hi!] with offset=[1] Sent message=[Son, Hi!] with offset=[2] Sent message=[Glad to see you, Nara!] with offset=[3]
로그는 위와 같이 message와 offset 정보를 출력하고 있다.
2번에서 발신한 Glad to see you, Nara!라는 메시지와 앞서 전송한 2번의 메시지가 있어 offset 3으로 메시지가 발생된 것을 확인할 수 있다.
이는 앞서 아키텍처에서도 확인했지만, Topic Name이 hello인 hello-0 partition에 메시지가 3번째 쌓이고 있으며, 이는 Subscriber가 수신할 수 있는 상태이다.
다음으로 Subscriber 측 Log를 확인해 보자.
Recieved ping message: Son, Hi! Recieved ping message: Son, Hi! Recieved ping message: Glad to see you, Nara!
다음과 같이 Subscriber 측에서 메시지를 수신한 것을 확인할 수 있다.
마지막으로 Subscriber를 다운한 상태로 메시지를 연속으로 발신한 후 한번에 수신하는지 여부를 확인해 보자.
Publisher는 다음과 같이 offset 10번까지 메시지를 발신하였고,
Sent message=[Son, Hi!] with offset=[1] Sent message=[Son, Hi!] with offset=[2] Sent message=[Glad to see you, Nara!] with offset=[3] Sent message=[Glad to see you, Nara!] with offset=[4] Sent message=[Glad to see you, Nara!] with offset=[5] Sent message=[Glad to see you, Nara!] with offset=[6] Sent message=[Glad to see you, Nara!] with offset=[7] Sent message=[Glad to see you, Nara!] with offset=[8] Sent message=[Glad to see you, Nara!] with offset=[9] Sent message=[Glad to see you, Nara!] with offset=[10]
Subscriber는 다운 상태이다.
Subscriber를 기동하고 상태를 확인해 보자.
기동 로그에서 다음과 같은 로그를 확인할 수 있을 것이다.
2020-09-30 00:19:00.775 INFO 326540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=pong] Setting offset for partition hello-0 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=DESKTOP-SDT0PVK:9092 (id: 0 rack: null), epoch=0}}
시작점인 offset이 5임을 알 수 있고 현재 Publisher가 offset 10번까지 메시지를 전송하였으니 이 정보를 기반으로 총 6개의 메시지가 수신될 것을 기대할 수 있다.
위와 같이 메시지가 총 6개 수신된 것을 확인할 수 있다.
여기서 기억해 두어야 할 점은 앞서 Kafka 아키텍처에서 살펴보았듯이, Kafka는 Group 당 하나의 메시지를 가져간다는 점이다. 이로 인해 동일 그룹의 Subscriber가 추가되면 메시지는 그 중 하나의 수신자에게만 전달된다.
public ConsumerFactory<String, Ping> pingConsumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); props.put(ConsumerConfig.GROUP_ID_CONFIG, "pongpong"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Ping.class, false)); }
다만 위와 같이 GROUP_ID_CONFIG를 변경하여 서로 다른 Group에 존재할 경우 각 Subscriber는 모두 메시지를 수신한다.
결론
이와 같이 Kafka Message Broker를 이용한 Pub/Sub 테스트를 진행하였다. Pub/Sub 구조의 가장 큰 특징은 Broker가 직접 Consumer에게 메시지를 전송하는 것이 아닌 Consumer가 Broker를 구독하고 있다가 offset이 update되면 직접 pulling한다는 점이다. 이로 인해 offset을 조정하여 원하는 데이터만 추출해 올 수 있도록 구현할 수도 있다는 특징이 있다.
Kafka가 RabbitMQ와 달리 AMQP 프로토콜이나 Java Message Service를 지원하지 않지만, TCP 기반의 단순한 발신으로 오버헤드를 감소 시켜 대용량 발신에 포커싱이 맞추어진 Message Broker라 할 수 있다.
Kafka와 같은 메시지 브로커를 통해 비동기 통신으로 마이크로서비스가 갖고 있는 수직적 구조가 갖고 있는 성능 제약사항을 일부 극복할 수 있지만 마찬가지로 고려해야 할 사항도 다수 존재한다.
예를 들어 Kafka가 대용량 처리에 적합한 메시지 브로커라고 소개했는데 이는 어디까지나 최적화된 아키텍처와 개발이 적용되었을때의 이야기이다.
대용량 데이터를 전송하기 위해서는 처리시간이 보장되는 안정된 성능을 확보할 수 있는 메시지 전송 구조가 확보되어야 하며, 무엇보다 데이터 전송에 발생하는 시스템 리소스와 네트워크 비용을 고려해야한다.
또한 데이터의 일관성과 데이터 순서보장 등에 대한 고려가 충분히 이루어져야 비로서 Kafka의 가치가 나타나게 될 것이다.
Kafka 테스트 코드는
github.com/sonnaraon/kafka-sample
에서 다운로드 받을 수 있다.
'③ 클라우드 > ⓜ MSA' 카테고리의 다른 글
성공적인 DevSecOps 구현하기 (0) | 2021.03.19 |
---|---|
Microservice Architecture API Composition (0) | 2020.10.04 |
Kafka(Zookeeper) 구축 (0) | 2020.09.29 |
Kafka(Zookeeper) 아키텍처 (0) | 2020.09.29 |
Event-Driven Architecture (0) | 2020.05.07 |
- Total
- Today
- Yesterday
- jeus
- wildfly
- aws
- TA
- openstack tenant
- OpenStack
- MSA
- JEUS7
- SWA
- nodejs
- openstack token issue
- kubernetes
- node.js
- JEUS6
- apache
- aa
- 오픈스택
- JBoss
- git
- Architecture
- webtob
- 마이크로서비스
- 쿠버네티스
- Docker
- 아키텍처
- Da
- k8s
- 마이크로서비스 아키텍처
- SA
- API Gateway
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |