RabbitMQ GCP 에 올리기
26 Jul 2021개요
RabbitMQ 란 메시지 브로커 방식의 메시지 큐 오픈소스입니다.
메시지 큐를 왜 사용하는지, 어떻게 설치하여 사용하는지 차근차근 살펴보겠습니다.
아래와 같은 준비물이 필요합니다.
- RabbitMQ
- SpringBoot
- GCE (구글 클라우드 플랫폼 - Google Compute Engine)
- Docker
RabbitMQ 란?
AMQP(Advanced Message Queuing Protocol)은 각기 다른 벤더들이 메시징을 통해 통신할 때 사용하는 응용 계층 프로토콜 입니다.
RabbitMQ는 얼랭(Erlang)으로 짜여진 AMQP 프로토콜의 구현체로 비동기 작업 큐에 주로 사용됩니다.
이러한 메시지 큐에는 RabbitMQ 외에도 Kafka, ActiveMQ, ZeroMQ 등 여러가지가 있습니다.
주로 RabbitMQ 와 Kafka 가 쓰이며, 비교적 가벼운 RabbitMQ 를 다뤄보도록 하겠습니다.
RabbitMQ 는 주로 복잡한 라우팅, 신속함이 요구되는 웹에 유리한 반면 Kafka 는 무거운 성능을 다루는데 유리합니다. 비교적 가벼운 작업 또는 안정적인 작업이 요구된다면 RabbitMQ 를 사용하는 것이 바람직해 보입니다.
라우팅 모델
AMQP 의 라우팅 모델은 Exchange
, Queue
, Binding
으로 구성됩니다.
-
Exchange
Producer 로부터 수신한 메시지를 큐에 분배하는 라우터 역할 -
Queue
메시지를 메모리/디스크에 저장했다가 Consumer 에게 전달하는 역할 -
Binding
Exchange
와Queue
의 관계를 정의한 것
Exchange Type
AMQP 는 메시지를 어떻게 분배(라우팅)할지에 대해 다음과 같이 타입을 나누었습니다.
-
Direct
Exchange
에 바인딩 된Queue
중 메시지의 라우팅 키와 매핑되어 있는Queue
로 메시지 전달.
1:1 관계로 Unicast 방식에 적합. -
Fanout
메시지의 라우팅 키를 무시하고 바인딩 된 모든Queue
에 메시지를 전달.
1:N 관계로 브로드캐스트 하는 용도로 사용. -
Topic
메시지의 라우팅 키를Queue
에 정의된 패턴과 대조하여 메시지를 전달.
Multicast 방식에 적합. -
Headers
라우팅 키를 사용하지 않고 메시지 헤더에 여러 속성들을 더해 속성이 매칭되는Queue
에 메시지 전달.
RabbitMQ 설치 (in GCP)
GCE 인스턴스 생성
GCE
인스턴스 생성이 처음이시라면 이곳을 참고해주세요.
생성된 인스턴스에 방화벽 규칙을 추가하여 RabbitMQ 사용 포트(5672
, 15672
)를 열어줍니다.

Docker 설치
RabbitMQ 를 간단하게 GCE 인스턴스위에 올리기 위해 컨테이너 기술을 사용합니다.
컨테이너 기술인 Docker
설명 및 설치에 관한 내용은 이곳에 정리해두었습니다.
RabbitMQ 컨테이너 생성
도커 명령어를 통해 RabbitMQ 이미지를 받아와 컨테이너화 합니다.
sudo docker run -d --name rabbitmq -p 5672:5672 -p 8080:15672 --restart=unless-stopped -e RABBITMQ_DEFAULT_USER= -e RABBITMQ_DEFAULT_PASS= rabbitmq:management
-
USERNAME
RabbitMQ 관리 모듈에 사용되는 아이디 설정 -
PASSWORD
RabbitMQ 관리 모듈에 사용되는 패스워드 설정
{외부 접속 IP}:8080
으로 접속하면 관리 페이지를 확인할 수 있습니다.

RabbitMQ 스프링 연동
간단한 스프링 부트 프로젝트를 생성합니다.
전체 코드는 이곳을 확인해주세요.
스프링 AMQP 사용을 위한 디펜던시를 추가해줍니다.
(데이터 바인딩을 위한 jackson 도 추가해줍니다)
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'com.fasterxml.jackson.core:jackson-databind'
}
application.yml
파일을 열어 rabbitmq 설정을 적어줍니다.
spring:
rabbitmq:
host:
port: 5672
username:
password:
# virtual-host: virtual host를 사용하는 경우 virtual host 이름
RabbitTemplate (Publisher)
RabbitTemplate
은 메시지 큐로 이벤트를 생산하는 Publisher 입니다.
아래와 같이 RabbitTemplate
빈을 설정합니다.
@Bean
public RabbitTemplate amqpTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
-
setConnectionFactory()
주입 받은 connectionFactory 를 설정합니다. -
setChannelTransacted()
위 설정이 true 라면, @Transaction 이 붙은 것처럼 처리됩니다. -
setReplyTimeout()
요청에 대한 타임아웃 시간을 설정합니다. -
setMessageConverter()
받아온 JSON 형태의 메시지를 객체로 바인딩하는 컨버터와 연결시켜줍니다.
SimpleRabbitListenerContainerFactory (Consumer)
구독한 메시지 큐를 통해 받아온 이벤트를 처리하는 Consumer 입니다.
아래와 같이 SimpleRabbitListenerContainerFactory
빈을 설정합니다.
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
final SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDefaultRequeueRejected(false);
factory.setMessageConverter(messageConverter());
factory.setChannelTransacted(true);
factory.setAdviceChain(RetryInterceptorBuilder
.stateless()
.maxAttempts(MAX_TRY_COUNT)
.recoverer(new RabbitMqExceptionHandler())
.backOffOptions(INITIAL_INTERVAL, MULTIPLIER, MAX_INTERVAL)
.build());
return factory;
}
-
setDefaultRequeueRejected()
true 시, 리스너에서 예외 발생하면 다시 큐로 돌아가 쌓이게 됩니다.
(예외 상황을 해제하기 전까지 무한 반복될 듯 합니다) -
setMessageConverter()
받아온 JSON 형태의 메시지를 객체로 바인딩하는 컨버터와 연결시켜줍니다. -
setChannelTransacted()
위 설정이 true 라면, @Transaction 이 붙은 것처럼 처리됩니다. -
maxAttempts()
예외가 발생했을 경우 몇 번을 더 재시도할 지를 정합니다. -
recoverer()
예외 핸들러를 추가합니다. -
backOffOptions()
재시도 횟수에 대한 옵션을 지정합니다.
ex) 3000, 3, 10000 으로 지정한 경우: 3초 간격으로 3으로 곱해서 최대 10초 까지 재시도
이벤트 이름 정의
각 이벤트 이름들에 대해 interface 형태로 관리합니다.
이곳에 정의된 이벤트 이름으로 라우팅하게 됩니다.
public interface RabbitMqEvent {
String PRODUCT_MESSAGE_EVENT = "product.message";
String PRODUCT_SAVE_EVENT = "product.save";
}
Queue 빈 등록
정의한 이벤트를 받아올 Queue 를 등록합니다.
각 Queue 는 Bean 으로 등록되고 서버에 자동으로 등록됩니다.
@Bean
public Queue productMessageEvent() {
return QueueBuilder
.durable(RabbitMqEvent.PRODUCT_MESSAGE_EVENT)
.build();
}
@Bean
public Queue productSaveEvent() {
return QueueBuilder
.durable(RabbitMqEvent.PRODUCT_SAVE_EVENT)
.build();
}
이벤트 정의
이벤트 데이터 모델을 정의합니다.
@Builder
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
@Getter
@ToString
public class ProductEvent {
private String name;
private int price;
}
- 기본 생성자의 접근지정자를 private 로 설정할 경우 오류가 발생합니다.
- jackson 데이터 바인딩 시 내부 필드를 읽을 수 있도록 fieldVisibility 를 열어줍니다.
이벤트 핸들러 클래스 정의
수신받은 이벤트를 처리할 핸들러를 정의합니다.
@Component
@AllArgsConstructor
public class ProductListener {
@RabbitListener(queues = RabbitMqEvent.PRODUCT_MESSAGE_EVENT)
public void productMessageHandler(final Message message) {
System.out.println("productMessageHandler");
System.out.println(message);
}
@RabbitListener(queues = RabbitMqEvent.PRODUCT_SAVE_EVENT)
public void productSaveHandler(final ProductEvent event) {
System.out.println("productSaveHandler");
System.out.println(event);
}
}
이제 스프링 프로젝트를 시작하면 수신부가 돌아가게 됩니다.
다음은 간단한 이벤트 메시지를 발신하는 발신부 코드입니다.
Sender 프로젝트
이 역시 스프링 프로젝트로 생성 후 윗 작업과 같은 반복을 해줍니다.
- spring amqp 디펜던시 추가
- application.yml 에 rabbitMQ 설정 추가
RabbitTemplate
빈을 등록해줍니다.
@Configuration
public class RabbitConfiguration {
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
MessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
@Bean
MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
메시지 발송
CommandLineRunner
를 통해 간단한 메시지를 발송합니다.
@Component
public class ProductSender implements CommandLineRunner {
private final RabbitTemplate rabbitTemplate;
public ProductSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void run(String... args) {
ProductEvent event = ProductEvent.builder()
.name("초코파이")
.price(3600)
.build();
System.out.println("Sending message...");
rabbitTemplate.convertAndSend(RabbitMqEvent.PRODUCT_MESSAGE_EVENT, "all message!!");
System.out.println("Sending my custom message");
rabbitTemplate.convertAndSend(RabbitMqEvent.PRODUCT_SAVE_EVENT, event);
}
}
참고 사이트
- https://cheese10yun.github.io/spring-rabbitmq/