ballqs 님의 블로그
[Spring] Redis pub/sub란? 본문
Redis를 이래저래 공부를 해보았다가 다시 글을 쓰는건 오랜만인듯 하다.
무려... kafka , RabbitMQ 등을 공부하다가 Redis에도 Message Queue 라는 통신 방법이 있다는 것을 알고 검색해서 따라 해보았다.
이전 Redis 작성글 : https://ballqs.tistory.com/49
Message Queue란 무엇인가?
Message Queue는 애플리케이션 간 비동기 통신을 지원하는 시스템입니다. 보통 두 애플리케이션 사이에서 메시지를 보내고 받는 역할을 하며, 발신자는 메시지를 큐에 보내고 수신자는 이 큐에서 메시지를 가져가 처리합니다. 메시지 큐는 데이터를 임시로 저장하는 방식으로, 발신자와 수신자가 동시에 통신할 필요 없이 작업을 처리할 수 있게 해줍니다.
Message Queue의 구조와 자료구조
메시지 큐는 Queue 자료구조를 기반으로 합니다. Queue는 FIFO (First In First Out) 원칙을 따르며, 먼저 들어온 메시지가 먼저 처리됩니다. 이는 메시지의 순서 보장이 중요한 시스템에서 필수적입니다. 메시지 큐는 보통 발신자가 큐에 메시지를 넣고, 수신자가 이를 가져가는 방식으로 동작합니다.
Message Queue의 종류
- RabbitMQ: AMQP (Advanced Message Queuing Protocol)를 사용하는 메시지 브로커입니다. 메시지 전달 보장, 확장성, 유연한 라우팅 등을 지원하며, 복잡한 메시징 요구사항을 충족하는 데 적합합니다.
- Kafka: Apache Kafka는 분산 메시지 스트리밍 플랫폼입니다. 고성능, 대용량 데이터 처리에 특화되어 있으며, 로그 데이터를 처리하는 데 주로 사용됩니다. 메시지 큐라기보다는 로그 시스템에 가깝습니다.
- ActiveMQ: Java 기반의 메시징 시스템으로, JMS (Java Message Service) 표준을 지원합니다. 다중 프로토콜을 지원하며, 다양한 언어와 플랫폼에서 사용 가능합니다.
- Redis Message Queue (RMQ): Redis는 원래 인메모리 데이터 저장소이지만, Pub/Sub 및 Redis Streams 기능을 통해 메시지 큐처럼 사용할 수 있습니다. Redis를 메시지 큐로 사용하면 빠르고 가벼운 비동기 처리에 적합하며, 메시지를 실시간으로 처리할 수 있습니다.
Redis Message Queue
Redis Message Queue는 Redis의 Pub/Sub 또는 Redis Streams 기능을 통해 구현됩니다. Redis의 빠른 성능을 바탕으로, 실시간 메시징 시스템에서 많이 사용됩니다.
- Pub/Sub: 발행(Publish)자가 메시지를 특정 채널에 발행하면, 구독(Subscribe)한 수신자들이 이를 실시간으로 받아볼 수 있습니다. 발행과 동시에 메시지가 전달되므로 메시지 손실이 발생할 수 있다는 단점이 있습니다.
- Redis Streams: Redis Streams는 데이터 스트리밍 기능을 제공하며, Pub/Sub과 달리 메시지를 로그처럼 저장해두고 필요할 때 가져가는 방식입니다. 메시지 내구성이 강화되며, Kafka와 유사한 동작을 합니다.
Spring boot 에 적용 방법
※작성시 참조한 사이트 : 링크
dependencies 추가
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
RedisConfig.java 작성
@Configuration
public class RedisConfig {
@Bean
public RedisMessageListenerContainer redisMessageListener(
RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setEnableTransactionSupport(true);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
public RedisConnectionFactory redisConnectionFactory() {
RedisStandaloneConfiguration redisConfiguration = new RedisStandaloneConfiguration();
redisConfiguration.setHostName(host);
redisConfiguration.setPort(port);
return new LettuceConnectionFactory(redisConfiguration);
}
}
RedisPublisher.java 작성
@RequiredArgsConstructor
@Service
public class RedisPublisher {
private final RedisTemplate<String, Object> redisTemplate;
public void publish(ChannelTopic topic, String message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
RedisSubscriber.java 작성
@Slf4j
@RequiredArgsConstructor
@Service
public class RedisSubscriber implements MessageListener {
private final ObjectMapper objectMapper;
private final RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
try {
String channel = new String(pattern); // 구독한 채널 이름 확인
log.info("channel: {}", channel);
String msg = (String) redisTemplate.getStringSerializer()
.deserialize(message.getBody());
if (isValidJson(msg)) { // JSON인지 체크
Map<String , String> obj = objectMapper.readValue(msg, HashMap.class);
for (String key : obj.keySet()) {
System.out.println(obj.get(key));
}
} else {
System.out.println("Received message: " + msg); // JSON이 아니면 단순 출력
}
} catch (Exception e) {
e.printStackTrace();
}
}
// JSON 형식인지 확인하는 함수
private boolean isValidJson(String str) {
try {
objectMapper.readTree(str);
return true;
} catch (IOException e) {
return false;
}
}
}
PubSubController.java 작성
@RequiredArgsConstructor
@RequestMapping("/pubsub")
@RestController
public class PubSubController {
private final RedisMessageListenerContainer redisMessageListener;
private final RedisPublisher redisPublisher;
private final RedisSubscriber redisSubscriber;
private Map<String, ChannelTopic> channels;
@PostConstruct
public void init() {
channels = new HashMap<>();
}
// 토픽 목록
@GetMapping("/topics")
public Set<String> getTopicAll() {
return channels.keySet();
}
// 토픽 생성
@PutMapping("/topics/{name}")
public void createTopic(@PathVariable String name) {
ChannelTopic channel = new ChannelTopic(name);
redisMessageListener.addMessageListener(redisSubscriber, channel);
channels.put(name, channel);
}
// 메시지 발행
@PostMapping("/topics/{name}")
public void pushMessage(@PathVariable String name, @RequestParam String message) {
ChannelTopic channel = channels.get(name);
redisPublisher.publish(channel, message);
}
// 토픽 제거
@DeleteMapping("/topics/{name}")
public void deleteTopic(@PathVariable String name) {
ChannelTopic channel = channels.get(name);
redisMessageListener.removeMessageListener(redisSubscriber, channel);
channels.remove(name);
}
}
출력 결과
토픽 생성
토픽 목록
토픽 메세지 발행
Redis Message Queue Command
명령어 | 설명 |
PUBSUB channels | 채널 목록 조회 |
SUBSCRIBE channel [channel ...] | 지정된 채널을 구독합니다. |
PUBLISH channel message | 지정된 채널에 메시지를 발행합니다. |
UNSUBSCRIBE channel [channel ...] | 지정된 채널의 구독을 해지합니다. |
PSUBSCRIBE pattern [pattern ...] | 패턴 매칭을 통해 여러 채널을 동시에 구독합니다. |
PUNSUBSCRIBE pattern [pattern ...] | 패턴 매칭을 통해 여러 채널의 구독을 해지합니다. |
'코딩 공부 > Spring' 카테고리의 다른 글
[Spring] Redis Cluster localhost에 구현하여 적용 (0) | 2024.10.15 |
---|---|
[Spring] Redis 동시성 문제 Redisson으로 해결! (0) | 2024.10.08 |
[Spring] @Async 란? (0) | 2024.10.04 |
[Spring] JpaRepository 쿼리 사용 방법 및 효율적으로 사용 방법 (0) | 2024.09.26 |
[Spring] JPA 매핑 기능 (0) | 2024.09.26 |