본문 바로가기
Spring & JPA

[Spring Integration] Spring에서 MQTT 통신하기 02

by yhames 2024. 10. 13.
728x90

들어가기 전에

지난 글에서 공식문서 예제에 다음과 같은 문제점이 있다고 생각을 했습니다.
 

  1. 단일 토픽
  2. 설정 클래스에 너무 많은 로직

 
이번 글에서는 여러가지 고민 끝에 생각한 구조를 공유해보려고합니다. 나름대로 확장성과 성능을 고려해보려고 했지만, 아직 많이 부족하니 감안하고 봐주시면 감사하겠습니다.
 
모든 코드는 github에 올려두었으니 참고하시면 될 것 같습니다.
 

아키텍처

먼저 어떤 방식으로 여러 토픽을 구독할 수 있는지 찾아보았습니다. 하지만 Spring Integration에 관련된 레퍼런스나 Spring에서 MQTT 통신하는 예제가 많이 없어서, 여러 토픽을 구독할 수 있는 아키텍처를 직접 생각을 해야했습니다. 그 중에서 StackOverflow에서 저와 비슷한 고민을 했던 질문이 있어서 해당 내용을 참고했습니다.
 
Paho MQTT Client: Best Practice for subscribing to multiple Topics

Paho MQTT Client: Best Practice for subscribing to multiple Topics

Our usecase is that multiple assets send different types of data (let's say status, temp and data) to a MQTT broker. Because message brokers are very good at routing, handling topics, etc., the ass...

stackoverflow.com

 

Message Router를 사용하는 방법

 
먼저 Message Router를 사용하여 적절한 채널에 라우팅하는 구조입니다. 특징으로는 하나의 Channel Adapter를 사용하기 때문에 자원을 효율적으로 사용할 수 있고, 와일드카드('#', '+')를 사용하여 여러 토픽들을 계층적으로 처리할 수 있을 것 같습니다. 하지만 구조가 복잡하고, 여전히 설정 클래스에 너무 많은 로직이 들어가야한다고 생각해습니다.
 

Channel Adapter를 사용하는 방법

 
다음으로는 여러 개의 Channel Adapter를 사용하여 여러 토픽을 구독하는 방법입니다. MqttPahoMessageDrivenAdapter가 여러번 중복되지만, 구조가 단순하고 다른 토픽에 영향을 받지 않는다는 특징이 있습니다.
 

Channel Adapter를 사용하는 방법을 선택한 이유

결론적으로 여러개의 Channel Adapter를 사용하여 여러 토픽을 구독하는 방법을 사용했습니다. 구조가 훨씬 간단해서 가독성이 좋았고, 채널, 어댑터 등을 분리하여 레이어드 아키텍처와 유사하게 관리하기 용이했습니다. 결정적으로 하나의 토픽에 너무 많은 트래픽이 발생하는 경우 다른 토픽에 영향을 줄 수 있기 때문에 Message Router를 사용해서 모든 토픽을 구독하는 것은 좋지 않다고 판단했습니다.
 

 
하지만 와일드카드를 사용하여 하위 토픽을 계층적으로 처리하는 것은 효율적으로 토픽을 처리하는 방법인 것 같습니다. 따라서 토픽이 계층 구조로 되어있다면 최상위 토픽은 Channel Adapter를 사용하여 구독하고, 하위 토픽은 Message Router를 사용하여 라우팅하는 방식으로 통합하여 활용할 수 있을 것 같습니다.
 

MQTT Inbound

 

@Configuration
public class MqttChannel {

    public static final String TOPIC1_CHANNEL = "topic1Channel";
    public static final String TOPIC2_CHANNEL = "topic2Channel";

    @Bean
    public MessageChannel topic1Channel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel topic2Channel() {
        return new DirectChannel();
    }

    // ...
}

 
먼저 MqttChannel 설정 클래스에서 Mqtt에서 사용할 채널을 모두 생성합니다. 또한 문자열 리터럴을 사용하지 않도록 채널명을 static 변수로 선언하였습니다.
 

@Configuration
public class MqttAdapter {

    @Bean
    public MessageProducerSupport topic1Adapter(MqttPahoClientFactory mqttClientFactory,
                                                MessageChannel topic1Channel) {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                MqttClient.generateClientId(), mqttClientFactory, MqttTopics.TOPIC1);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(topic1Channel);
        return adapter;
    }

    @Bean
    public MessageProducerSupport topic2Adapter(MqttPahoClientFactory mqttClientFactory,
                                                MessageChannel topic2Channel) {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                MqttClient.generateClientId(), mqttClientFactory, MqttTopics.TOPIC2);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(topic2Channel);
        return adapter;
    }

    // ...
}

 
다음으로 MqttAdapter 클래스에서 각 채널과 Mqtt Message Broker를 연결하는 어댑터를 생성합니다. MessageChannel은 빈을 주입받아서 사용하고, MqttTopics 클래스를 통해 토픽 이름을 따로 관리해주었습니다.
 

@Slf4j
@Component
public class MqttTopic1Handler {

    @ServiceActivator(inputChannel = MqttChannel.TOPIC1_CHANNEL)
    public void handleMessage(Message<?> message) {
        message.getHeaders().forEach((k, v) -> log.info("Header: {}={}", k, v));
        log.info("Received message in topic1: {}", message.getPayload());
    }
}
@Slf4j
@Component
public class MqttTopic2Handler {

    @ServiceActivator(inputChannel = MqttChannel.TOPIC2_CHANNEL)
    public void handleMessage(String payload) {
        log.info("Received message in topic2: {}", payload);
    }
}

 
마지막으로 MessageHandler의 경우 비즈니스 로직이 들어가야하기 때문에 설정 클래스가 아닌 별도 클래스로 선언하고, 컴포넌트 스캔 방식으로 사용했습니다. 이때 Message<?>가 아니라 String을 매개변수로 받으면, payload를 바로 String 타입으로 변환하여 사용할 수 있습니다. 상황에 따라서 메시지 헤더가 필요한 상황이면 Message<?> 타입으로 선언하면 될 것 같습니다.
 

MQTT Outbound

@Configuration
public class MqttChannel {

    public static final String OUTBOUND_CHANNEL = "mqttOutboundChannel";

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
    
    // ...
}

 
Outbound의 경우에도 동일하게 MqttChannel 설정 클래스에 선언합니다.
 

@Configuration
public class MqttAdapter {
    //...

    @Bean
    @ServiceActivator(inputChannel = MqttChannel.OUTBOUND_CHANNEL)
    public MqttPahoMessageHandler outboundAdapter(MqttPahoClientFactory mqttClientFactory) {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                MqttClient.generateClientId(), mqttClientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(1);
        return messageHandler;
    }
}

 
outboundAdapter의 경우에는 어댑터에 @ServiceActivator를 사용해야 채널에 연결할 수 있습니다.
 

@MessagingGateway(defaultRequestChannel = MqttChannel.OUTBOUND_CHANNEL)
public interface MqttGateWay {

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
}
@Service
@RequiredArgsConstructor
public class MqttPublisher {

    private final MqttGateWay mqttGateWay;

    public void publish(String topic, String message) {
        log.info("Publishing message: {} to topic: {}", message, topic);
        mqttGateWay.sendToMqtt(topic, message);
    }
}

 
이전 예제와 동일하게 MessagingGateway 인터페이스를 선언하고, Publisher 클래스를 통해 메시지를 발행하는 엔드포인트를 구현했습니다.
 

참고자료

 
Paho MQTT Client: Best Practice for subscribing to multiple Topics

Paho MQTT Client: Best Practice for subscribing to multiple Topics

Our usecase is that multiple assets send different types of data (let's say status, temp and data) to a MQTT broker. Because message brokers are very good at routing, handling topics, etc., the ass...

stackoverflow.com

Spring Integration - Router Implementations

Router Implementations :: Spring Integration

Spring Integration also provides a special type-based router called ErrorMessageExceptionTypeRouter for routing error messages (defined as messages whose payload is a Throwable instance). ErrorMessageExceptionTypeRouter is similar to the PayloadTypeRouter.

docs.spring.io

 
 

반응형