/**
 * Kinds of events that can be published and subscribed to.
 *
 * <p>Every constant carries a {@code type} string and a human-readable {@code name}.
 */
public enum EventType {

    ORDER_CREATE("order_create", "订单创建"),
    ORDER_CANCEL("order_cancel", "订单取消");

    /** Type string of the event. */
    private String type;

    /** Human-readable name of the event. */
    private String name;

    /**
     * Creates a constant with the given type string and display name.
     *
     * @param typeCode    type string of the event
     * @param displayName human-readable name of the event
     */
    EventType(String typeCode, String displayName) {
        type = typeCode;
        name = displayName;
    }

    /**
     * @return the type string of this event
     */
    public String getType() {
        return this.type;
    }

    /**
     * @return the human-readable name of this event
     */
    public String getName() {
        return this.name;
    }

    /**
     * Replaces the type string stored on this constant.
     *
     * @param type new type string
     */
    public void setType(String type) {
        this.type = type;
    }

    /**
     * Replaces the human-readable name stored on this constant.
     *
     * @param name new display name
     */
    public void setName(String name) {
        this.name = name;
    }
}
/**
 * Payload of an event: the event's type together with an arbitrary data object.
 */
@Data
@Builder
public class EventData implements Serializable {

    /** Type of the event. */
    private EventType eventType;

    /** Data object carried by the event. */
    private Object data;

    /**
     * Creates an event with the given type and data.
     *
     * <p>Annotated as the JSON creator, binding the {@code "eventType"} and {@code "data"}
     * properties to the two arguments.
     *
     * @param eventType type of the event
     * @param data      data object carried by the event
     */
    @JsonCreator
    public EventData(@JsonProperty("eventType") EventType eventType,
                     @JsonProperty("data") Object data) {
        this.data = data;
        this.eventType = eventType;
    }

    /**
     * No-argument constructor; both fields keep their default values.
     */
    public EventData() {
        super();
    }
}
/**
 * Registry of event consumers, grouped by event type.
 *
 * <p>Thread-safety: the registry is backed by a {@code ConcurrentHashMap} whose values are
 * {@code CopyOnWriteArrayList}s, so a consumer may be registered while another thread is
 * iterating a list obtained from {@link #getConsumers(EventType)}.
 *
 * <p>The {@code java.util.concurrent} types and {@code java.util.Objects} are referenced by
 * their fully-qualified names so that this class needs no additional import lines.
 */
@Slf4j
@Component
public class EventConsumer {

    /** Consumers keyed by the enum constant itself, so keys are never null. */
    private final Map<EventType, List<Consumer<EventData>>> eventContent =
            new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Registers a consumer for an event type.
     *
     * @param eventType event type the consumer is interested in
     * @param consumer  callback to register
     * @throws NullPointerException if {@code eventType} or {@code consumer} is null
     */
    public void addConsumer(EventType eventType, Consumer<EventData> consumer) {
        // Fail at registration time instead of when the first event is dispatched.
        java.util.Objects.requireNonNull(eventType, "eventType");
        java.util.Objects.requireNonNull(consumer, "consumer");
        log.info("添加消费者:{}", eventType.getType());
        eventContent
                .computeIfAbsent(eventType, k -> new java.util.concurrent.CopyOnWriteArrayList<>())
                .add(consumer);
    }

    /**
     * Returns the consumers registered for an event type.
     *
     * @param eventType event type to look up; may be null
     * @return a read-only view of the registered consumers (later registrations are visible
     *         through it), or an empty list when {@code eventType} is null or nothing is
     *         registered for it
     */
    public List<Consumer<EventData>> getConsumers(EventType eventType) {
        if (eventType == null) {
            return Collections.emptyList();
        }
        List<Consumer<EventData>> registered = eventContent.get(eventType);
        if (registered == null) {
            return Collections.emptyList();
        }
        // Never hand out the internal list: callers must go through addConsumer to mutate it.
        return Collections.unmodifiableList(registered);
    }
}
/**
 * Publishes events through Redis publish/subscribe.
 *
 * <p>The event payload is stored under a freshly generated key, and only that key is sent on
 * the channel named after the event type.
 */
@Component
@Slf4j
public class PublishService {

    /**
     * Expiry passed to {@code RedisService.set} for stored payloads.
     * 24 * 60 * 60 = 86400 — presumably seconds (24 hours); TODO confirm against RedisService.
     */
    private static final long EXPIRE_TIME = 24 * 60 * 60;

    /** Prefix of the keys under which payloads are stored. */
    private final static String MALL_MESSAGE_PREFIX = "mall:message:";

    @Resource
    StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedisService redisService;

    /**
     * Stores the event payload and announces its key on the channel of the event's type.
     *
     * <p>Arguments are validated before anything is written, so an invalid event no longer
     * leaves an orphaned payload key behind.
     *
     * @param eventData event to publish; must be non-null and have a non-null event type
     * @throws NullPointerException if {@code eventData} or its event type is null
     */
    public void publish(EventData eventData) {
        // Fully-qualified so that this block needs no additional import line.
        java.util.Objects.requireNonNull(eventData, "eventData");
        EventType eventType =
                java.util.Objects.requireNonNull(eventData.getEventType(), "eventData.eventType");

        log.info("====发布Redis订阅:{}====", JSON.toJSONString(eventData));
        String redisKey = MALL_MESSAGE_PREFIX + IdUtil.fastSimpleUUID();
        // Store first, then notify: a subscriber must be able to read the key it receives.
        redisService.set(redisKey, eventData, EXPIRE_TIME);
        stringRedisTemplate.convertAndSend(eventType.getType(), redisKey);
    }
}
/**
 * Spring configuration for the Redis subscription side: declares the listener container and
 * adapter beans and registers the consumers for each event type.
 */
@Configuration
public class RedisListenerConfig {

    @Resource
    EventConsumer eventConsumer;

    /**
     * Registers one consumer per event type once this bean has been constructed.
     * Each consumer prints the event's data to standard output.
     */
    @PostConstruct
    public void init() {
        eventConsumer.addConsumer(
                EventType.ORDER_CREATE,
                eventData -> System.out.println("Handling order creation: " + eventData.getData()));
        eventConsumer.addConsumer(
                EventType.ORDER_CANCEL,
                eventData -> System.out.println("Handling order cancellation: " + eventData.getData()));
    }

    /**
     * Wraps the subscribe listener in a {@link MessageListenerAdapter}.
     *
     * @param subscribeListener listener that handles incoming messages
     * @return adapter delegating to {@code subscribeListener}
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(SubscribeListener subscribeListener) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(subscribeListener);
        return adapter;
    }

    /**
     * Builds the listener container and subscribes the adapter to one topic per
     * {@link EventType}, using the type string as the topic.
     *
     * @param connectionFactory Redis connection factory used by the container
     * @param listenerAdapter   adapter that receives the messages
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);

        EventType[] eventTypes = EventType.values();
        for (int i = 0; i < eventTypes.length; i++) {
            PatternTopic topic = new PatternTopic(eventTypes[i].getType());
            listenerContainer.addMessageListener(listenerAdapter, topic);
        }
        return listenerContainer;
    }
}
/**
 * Redis message listener that turns a received payload key into an {@link EventData} and
 * hands it to every consumer registered for its event type.
 *
 * <p>The message body is the key under which the payload was stored. After the payload has
 * been read and dispatched, the key is deleted.
 *
 * <p>NOTE(review): because the key is deleted after the first successful read, any other
 * subscriber that receives the same message afterwards will find no payload — confirm this
 * is the intended delivery model.
 */
@Component
@Slf4j
public class SubscribeListener implements MessageListener {

    @Resource
    private EventConsumer eventConsumer;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedisService redisService;

    /**
     * Handles one pub/sub message. Never throws: any failure is logged at WARN level.
     *
     * @param message received message; its body is the payload key
     * @param pattern pattern that matched the channel (unused)
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            RedisSerializer<String> stringSerializer = stringRedisTemplate.getStringSerializer();
            String redisKey = stringSerializer.deserialize(message.getBody());
            if (redisKey == null) {
                return;
            }

            // Round-trip through JSON to convert whatever RedisService returns into EventData;
            // a missing payload serializes to "null" and parses back to null.
            String messageData = JSON.toJSONString(redisService.get(redisKey));
            EventData eventData = JSON.parseObject(messageData, EventData.class);
            log.info("====接收Redis订阅:{}====", JSON.toJSONString(eventData));

            if (eventData != null) {
                dispatch(eventData, redisKey);
            }
            // Reached even when a consumer failed, so the payload key is not left behind.
            redisService.del(redisKey);
        } catch (Exception e) {
            log.warn("处理订阅消息时出错: ", e);
        }
    }

    /**
     * Delivers the event to each registered consumer, isolating failures so that one
     * throwing consumer neither stops the remaining consumers nor prevents key cleanup.
     */
    private void dispatch(EventData eventData, String redisKey) {
        if (eventData.getEventType() == null) {
            // Without an event type there is nothing to look consumers up by.
            log.warn("事件数据缺少事件类型, redisKey={}", redisKey);
            return;
        }
        List<Consumer<EventData>> consumers = eventConsumer.getConsumers(eventData.getEventType());
        for (Consumer<EventData> consumer : consumers) {
            try {
                consumer.accept(eventData);
            } catch (Exception e) {
                log.warn("事件消费者执行失败: eventType={}", eventData.getEventType(), e);
            }
        }
    }
}
// Comments (0) — footer text carried over from the source web page; not part of the code.