모종닷컴

SQS 메시지 Polling 컨트롤하기. 본문

Programming/Spring

SQS 메시지 Polling 컨트롤하기.

모종 2022. 12. 4. 11:07
반응형

혹시 AWS Simple Queue Service(SQS)를 활용하고 계신가요? 그리고 어떤 이유에 의해서 SQS에서 polling 해오는 부분을 임의로 중지 및 시작할 수 있도록 컨트롤하고 싶으신 적이 있으신가요? 그렇다면 이번 글이 도움이 될 수도 있을 것 같습니다.

저는 위와 같이 SQS를 임의로 중지하거나 시작하는 등의 컨트롤을 해야 하는 경우가 있었고 이 글이 저와 같은 고민을 하고 계신 분들께 도움이 될 수 있을 것 같아 이번 포스팅을 쓰게 되었습니다. 시작하기 전에 먼저 테스트 환경 및 코드를 먼저 맞춰보도록 하죠.

환경 세팅

IAM 사용자 생성

SQS를 사용할 수 있는 사용자 하나를 추가해보도록 하겠습니다. IAM 서비스에 들어가서 '액세스 관리'의 '사용자' 탭에 들어갑니다. 그러면 아래와 같은 화면이 보일 텐데 우측에 '사용자 추가' 버튼을 눌러줍니다.

 

사용자 이름은 원하시는대로 설정하시고, 액세스 유형은 '액세스 키 - 프로그래밍 액세스 방식'을 선택해주세요. 권한 설정에서 아래와 같이 권한을 선택해주세요.

이후 선택사항들에서는 저는 추가적인 세팅 없이 넘어갔습니다. 사용자 생성이 되면 액세스 키와 비밀 액세스 키를 받을 텐데 잘 복사해주세요. 추가적으로 생성된 사용자 정보에서 사용자 ARN도 복사해주세요.

큐 생성

먼저 제일 중요한 SQS 큐를 하나 생성을 해야 합니다. 저는 프리티어 기간이 아직 유효한 계정이 있어서 해당 계정으로 큐를 하나 생성하도록 하겠습니다. 생성 시 구성은 아래 같이 구성하였습니다.

  • 유형 : Standard
  • 큐 이름 : Monny-Test-Queue
  • 표시제한시간 : 3분
  • 메시지 보존 기간 : 1시간
  • 전송 지연시간 : 0초
  • 메시지 수신 대기 시간 : 20초
  • 암호화 : 비활성화
  • 액세스 정책 : 
    • 대기열로 메시지를 전송할 수 있는 사용자 정의 : 위에서 복사한 사용자 ARN
    • 대기열에서 메시지를 수신할 수 있는 사용자 정의 : 위에서 복사한 사용자 ARN

Gradle Dependency 설정

implementation("org.springframework.cloud:spring-cloud-aws-messaging:2.2.5.RELEASE")

application.properties or application.yml 프로퍼티 설정.

저는 yml 형식을 사용하고 있고 아래와 같이 설정해주었습니다. url에는 위에서 생성한 큐의 URL을 적어주시면 됩니다. maxNumberOfMessages는 큐에서 메시지를 폴링 해올 때 한 번에 몇 개를 들고 올지를 설정하는 부분이 있는데 해당 부분에 설정할 값입니다.

aws:
  credential:
    accessKey: {{위에서 생성한 사용자의 엑세스키}}
    secretKey: {{위에서 생성한 사용자의 시크릿키}}
my-queue:
  url: {{큐 생성 후 발급받은 URL}}
  maxNumberOfMessages: 1

SQS 설정 

SQS 설정 관련된 파일을 만들겠습니다.

@EnableSqs
@Configuration
class MyQueueConfig(
    @Value("\${my-queue.maxNumberOfMessages}") private val maxNumberOfMessages: Int,
    @Value("\${aws.credential.accessKey}") private val accessKey: String,
    @Value("\${aws.credential.secretKey}") private val secretKey: String,
) {

    @Primary
    @Bean
    fun amazonSQSAsync(): AmazonSQSAsync {
        val awsCreds = BasicAWSCredentials(accessKey, secretKey)

        return AmazonSQSAsyncClientBuilder.standard()
            .withRegion("ap-northeast-2")
            .withClientConfiguration(ClientConfiguration())
            .withCredentials(AWSStaticCredentialsProvider(awsCreds))
            .build()
    }
    /**
     * Publisher Config
     * */
    @Bean
    fun queueMessagingTemplate() = QueueMessagingTemplate(amazonSQSAsync())


    /**
     * Listener Config
     * */
    @Bean
    fun simpleMessageListenerContainerFactory(): SimpleMessageListenerContainerFactory {
        return SimpleMessageListenerContainerFactory().apply {
            setAmazonSqs(amazonSQSAsync())
            setMaxNumberOfMessages(maxNumberOfMessages)
        }
    }
}

테스트 컨트롤러 생성

private val log = KotlinLogging.logger {  }

@RestController
@RequestMapping("/api/sqs")
class SQSTestApiController(
    private val queueMessagingTemplate: QueueMessagingTemplate,
    @Value("\${my-queue.url}") private val queueUrl: String
) {

    @PostMapping("/sendMessage")
    fun sendMessage(@RequestBody wrapper: MessageWrapper) {
        log.info { "[SEND] message = ${wrapper.message}" }
        queueMessagingTemplate.convertAndSend(queueUrl, wrapper.message)
    }

    @SqsListener(value = ["\${my-queue.url}"], deletionPolicy = SqsMessageDeletionPolicy.ALWAYS)
    fun receiveMessage(message: String) {
        log.info { "[RECEIVE] message = $message" }
    }


    class MessageWrapper(val message: String = "Default Message")
}

SQS Publish & Subscribe 테스트

SQS 제대로 생성되었는지 위의 테스트 컨트롤러를 이용해서 확인해보도록 하겠습니다. /api/sqs/sendMessage 엔드포인트에 메시지를 전달해보겠습니다. 아래와 같은 스펙으로 요청을 날려보겠습니다.

POST http://localhost:{{포트번호}}/api/sqs/sendMessage
Content-Type: application/json

{
  "message": "Hello World!"
}

아래 사진과 같이 로그가 잘 찍히고 있습니다.

리스너 쪽 살펴보기 (SimpleMessageListenerContainer)

이제 본격적으로 큐로부터 메시지를 폴링 해오는 부분을 봐보겠습니다. 위에서 Sqs 설정하는 코드를 보면 SimpleMessageListenerContainerFactory를 빈으로 등록했었죠? 그리고 설정 클래스 상단에는 @EnableSqs 어노테이션을 설정했고요. 자세한 내용은 다루지 않겠습니다만 간단하게 설명하면 @EnableSqs가 세팅되면 SqsConfiguration 설정 클래스가 import 되고, 이 설정 파일의 내용을 보면 SimpleMessageListenerContainerFactory를 이용해서 SimpleMessageListenerContainer를 생성하고 이것을 스프링 빈으로 등록해줍니다. 

이렇게 생성된 SimpleMessageListenerContainer가 DefaultLifecycleProcessor에 의해 start()가 호출되고 SimpleMessageListenerContainer는 start 하면서 AsynchronousMessageListener라 불리는 task를 TaskExecutor에 등록해놓게 됩니다. AsynchronousMessageListener 클래스가 바로 큐로부터 메시지를 가져오는 역할을 합니다. 

이 task를 중지시키면 바로 폴링 태스크를 멈출 수 있는 거지요. SimpleMessageListenerContainer에는 stop과 start 메서드가 존재하는데 이 역할이 바로 폴링 태스크를 멈출지 시작할지를 의미합니다. 그럼 이제 이것을 구현해보도록 하겠습니다. 테스트 컨트롤러에 아래와 같이 추가를 해줍니다.

@PostMapping("/pollingStop")
fun pollingStop() {
    log.info { "[Polling Task] stop" }
    /**
     * stop() 메서드 호출시 polling task가 추가적으로 돌지 않도록 flag를 세우고, 현재 실행중인 태스크가 종료되길 기다린다(timeout : queueStopTimeout 초)
     * 만약 이 태스크가 큐로부터 메시지를 가져오는 중이고 가져올 메시지가 없다면 타임아웃(default: 20초)시간 동안 대기하게 된다.
     * 따라서 (실행중인 태스크가 종료되길 기다리는 시간 = queueStopTimeout) > (큐로부터 메시지를 가져오길 기다리는 시간 = DEFAULT_WAIT_TIME_IN_SECONDS)
     * @see org.springframework.cloud.aws.messaging.listener.AbstractMessageListenerContainer.DEFAULT_WAIT_TIME_IN_SECONDS
     * */
    simpleMessageListenerContainer.queueStopTimeout = 25 * 1000 // 이 부분이 상당히 재밌는데 궁금하다면 한번쯤 파보시죠
    simpleMessageListenerContainer.stop()
}

@PostMapping("/pollingStart")
fun pollingStart() {
    log.info { "[Polling Task] start" }
    simpleMessageListenerContainer.start()
}

 

테스트

위의 코드를 추가한 이후 다시 애플리케이션을 구동시킵니다. 그리고 /api/sqs/pollingStop 엔드포인트를 먼저 호출해서 polling task를 중지시킨 후 메시지 하나를 큐에 넣어봅시다. 이전과는 다르게 메시지를 큐에 쌓았지만 polling task가 중지되었기 때문에 메시지 소비는 하지 않고 있습니다.

이제 /api/sqs/pollingStart를 호출해서 다시 polling task를 실행시켜서 메시지를 소비해보도록 하겠습니다.

polling task를 실행시키니 바로 큐로부터 메시지를 소비를 하고 있네요.

포스팅 마무리

이번 글에서는 SQS를 스프링 부트에 적용했고 큐로부터 메시지를 가져오는 부분을 수동으로 컨트롤하는 법에 대해 배웠습니다. 혹시라도 저와 같이 헤매던 분들께 조금이라도 도움이 되었으면 좋겠네요. 오늘 포스팅은 여기까지입니다. 읽어주셔서 감사합니다.

반응형