본문 바로가기
Spring & JPA

[Spring Integration] Spring에서 MQTT 통신하기 01

by yhames 2024. 10. 12.
728x90

들어가기 전에

MQTT

MQTTMessage Queuing Telemetry Transport의 약자로, 보통 스마트 센서나 사물 인터넷(IoT) 등 낮은 대역폭과 리소스가 제한된 환경에서 안정적으로 통신할 수 있는 경량 메시징 프로토콜입니다. MQTT는 최소한의 리소스와 적은 전력을 소비하기 때문에 IoT와 같은 제한된 환경에서 사용하기 적합합니다.
 
MQTT는 AMQP와 같이 발행-구독 패턴을 사용하며, 메시지 브로커가 필요합니다. MQTT에 대한 이론적인 내용은 MQTT 공식 레퍼런스에서 가이드 받을 수 있으며, HiveMQ에서 무료 e-book을 배포하고 있으니 활용하시면 될 것 같습니다.
 

 

Spring Integration

스프링에서 MQTT 통신을 하기 위해서는 먼저 Spring Integration을 알아야합니다.
 
Spring Integration이란 스프링에서 메시지 기반 시스템을 통합하기 위한 프레임워크입니다. Spring Integration은 Kafka, AMQP(RabbitMQ), MQTT 등 다양한 메시지 기반 시스템을 지원합니다. Spring Integration에서는 메시지 기반 아키텍처를 Message, MessagingChannels, MessagingEndpoints 등 추상화된 개념을 사용하기 때문에 관련해서 배경 지식이 반드시 필요합니다. 저는 이론적인 배경 지식을 위해 공식 레퍼런스를 참고하였습니다.

Figure 4. Service Activator
Figure 5. An inbound channel adapter endpoint connects a source system to a MessageChannel.
Figure 6. An outbound channel adapter endpoint connects a MessageChannel to a target system.

 

Mosquitto

Mosquitto는 EPL/EDL 라이선스의 오픈소스 MQTT 메시지 브로커 입니다. 또한 mosquitto_pub, mosquitto_sub 등 간단한 Command Line MQTT clients를 지원합니다.
 

Mosquitto 설치

먼저 MQTT 메시지 브로커를 사용하기 위해서 Mosquitto를 설치합니다.
 

  • Ubuntu
sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
sudo apt-get update
sudo apt install mosquitto mosquitto-clients
sudo systemctl start mosquitto

 

  • MacOS
brew install mosquitto
brew services start mosquitto

 
Mosquitto 테스트

mosquitto-clients를 사용하여 설치된 mosquitto 메시지 브로커가 잘 동작하는지 간단하게 테스트를 할 수 있습니다. 별도의 설정을 하지 않으면 기본적으로 host는 localhost, port는 1883을 사용합니다.
 

  • Subscriber
mosquitto_sub -t "awesome"

 
mosquitto_sub를 통해 "awesome"이라는 토픽을 구독하는 명령어 입니다.
 

  • Publisher
mosquitto_pub -t "awesome" -m "Hello World!"

 
mosquitto_pub를 통해 해당 "awesome" 토픽에 "Hello World!" 메시지를 발행하는 명령어 입니다.
 

 

Spring MQTT

먼저 전체 흐름을 보면 다음과 같습니다.

  • Inbound

Spring Integration에서는 MqttPahoMessageDrivenAdapter를 사용해서 외부 시스템(MQTT Message Broker)과 MessageChannel을 연결합니다. MessageChannel을 통해 ServiceActivator에 메시지가 전달되고, 해당 메시지를 처리하는 MessageHandler가 호출됩니다.
 

  • Outbound

저는 MessagingGateway와 상호작용하는 MqttPublisher라는 엔드포인트를 만들어서 사용자가 토픽을 발행할 수 있게 구현했습니다. MessagingGateway에서 생성된 Message는 MessageChannel에 전달되고, MqttPahoMessageHandler 어댑터를 통해 외부 시스템(MQTT Message Broker)에 토픽을 발행합니다.
 

MQTT Subscriber 설정 

@Configuration
public class MqttInboundConfig {

    public static final String MQTT_USERNAME = "username";
    public static final String MQTT_PASSWORD = "password";
    public static final String MQTT_BROKER_URL = "tcp://localhost:1883";

    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{MQTT_BROKER_URL});
        options.setUserName(MQTT_USERNAME);
        options.setPassword(MQTT_PASSWORD.toCharArray());
        options.setAutomaticReconnect(true);
        return options;
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory(MqttConnectOptions mqttConnectOptions) {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(mqttConnectOptions);
        return factory;
    }

    // ...
}

 
먼저 MQTT 통신 연결하기위한 설정을 위해 MqttPahoClientFactory를 Bean으로 등록합니다. 공식 문서에서는 MqttConnectOptions 객체를 구성하여 팩토리에 주입하는 것을 권장하고 있습니다.

We recommend configuring an MqttConnectOptions object and injecting it into the factory, instead of setting the (deprecated) options on the factory itself.

 

@Configuration
public class MqttInboundConfig {
    // ...

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    
    @Bean
    public MessageProducerSupport inboundAdapter(MqttPahoClientFactory mqttClientFactory,
                                                 MessageChannel mqttInputChannel) {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                MqttClient.generateClientId(), mqttClientFactory, "topic");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel);   // mqttInputChannel에 연결
        return adapter;
    }

    // ...
}

 
다음으로 Inbound를 위한 MessageChannel과 MQTT Message Broker를 연결시키는 MqttPahoMessageDrivenChannelAdapter를 Bean으로 등록합니다. 이때 토픽 이름에 와일드 카드를 사용할 수 있습니다. '+'는 한 레벨의 토픽을 대체하고, '#' 여러 레벨의 토픽을 대체합니다.
 

@Configuration
public class MqttInboundConfig {
    // ...

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")   // mqttInputChannel에 연결
    public MessageHandler topicHandler() {
        return message -> {
            // 비즈니스 로직
            log.info("[Topic] Received message: {}", message.getPayload());
        };
    }
}

 
마지막으로 전달된 메시지를 처리하는 MessageHandler를 빈으로 등록합니다. 이때 @ServiceActivator 어노테이션을 사용하여 앞서 생성한 mqttInputChannel에 연결합니다.
 

MQTT Publisher 설정

@Configuration
public class MqttOutboundConfig {

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MqttPahoMessageHandler outboundAdapter(MqttPahoClientFactory mqttClientFactory) {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                MqttClient.generateClientId(), mqttClientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(1);
        return messageHandler;
    }
}

 
토픽을 발행하는 outbound에서는 위와 같이 MqttPahoMessageHandler 클래스의 어댑터를 빈으로 등록합니다. 이때 어댑터에 @ServiceActivator 어노테이션을 사용하여 mqttOutboundChannel에 연결했습니다.
 

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateWay {

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
}

 
다음으로 @MessageGateway 어노테이션을 활용하여 복잡한 구현 없이 추상화된 인터페이스를 바로 사용할 수 있습니다. 매개변수에 @Header 어노테이션을 통해 메시지 헤더에 토픽을 추가하여 특정 토픽 메시지를 발행할 수 있도록 구현했습니다.
 

@Service
@RequiredArgsConstructor
public class MqttPublisher {

    private final MqttGateWay mqttGateWay;

    public void publish(String topic, String message) {
        log.info("Publishing message: {} to topic: {}", message, topic);
        mqttGateWay.sendToMqtt(topic, message);
    }
}

 
마지막으로 MessageGateway 인터페이스를 사용할 수 있도록 MqttPublisher라는 엔트포인트를 구현했습니다.
 

마무리 

이번 글에서는 공식문서를 참고하여 어떤 식으로 Spring에서 MQTT 통신을 할 수 있는지 간단하게 알아보았습니다.
 
공식문서의 예제로 간단한 MQTT 통신을 할 수는 있지만, 몇 가지 부족한 점이 있는 것 같습니다. 먼저 위 코드는 오직 단일 토픽만 구독하고 있습니다. 다른 레퍼런스를 참고하면서 여러 토픽을 구독할 수 있는 방법을 찾아보면 좋을 것 같습니다. 또한 Configuration 클래스에 모든 코드가 밀집되어있기 때문에 가독성과 확장성이 부족하다는 생각을 했습니다. 특히 비즈니스 로직을 처리하는 MessageHandler가 설정 클래스에 같이 들어가 있는 것은 개선이 필요해보입니다.
 

참고자료

 
Spring Integration 공식 레퍼런스

Spring Integration :: Spring Integration

Welcome to the Spring Integration reference documentation!

docs.spring.io

Spring Integration 공식 레퍼런스 - 한글 번역

Contents

스프링 인티그레이션 공식 레퍼런스를 한글로 번역한 문서입니다. 버전은 5.5.15 기준입니다.

godekdls.github.io

반응형