Spring Boot Redis 消息订阅发布

levis
2024-06-24 / 0 评论 / 7 阅读 / 正在检测是否收录...

/**
 * Event types known to the application.
 *
 * <p>Every constant carries a machine-readable type code and a display name.
 *
 * <p>NOTE(review): the setters assign directly to the fields of the enum constant,
 * so a call changes the value seen by every user of that constant.
 */
public enum EventType {

    ORDER_CREATE("order_create", "订单创建"),
    ORDER_CANCEL("order_cancel", "订单取消");

    /** Machine-readable type code of the event. */
    private String type;

    /** Display name of the event. */
    private String name;

    /**
     * Creates a constant from its type code and display name.
     *
     * @param typeCode    machine-readable type code
     * @param displayName display name
     */
    EventType(String typeCode, String displayName) {
        type = typeCode;
        name = displayName;
    }

    /**
     * @return the type code of this event type
     */
    public String getType() {
        return this.type;
    }

    /**
     * Replaces the type code stored on this constant.
     *
     * @param type the new type code
     */
    public void setType(String type) {
        this.type = type;
    }

    /**
     * @return the display name of this event type
     */
    public String getName() {
        return this.name;
    }

    /**
     * Replaces the display name stored on this constant.
     *
     * @param name the new display name
     */
    public void setName(String name) {
        this.name = name;
    }
}
/**
 * An event together with its payload.
 *
 * <p>Accessors and the builder come from the Lombok annotations on the class; the
 * annotated two-argument constructor binds the JSON properties {@code eventType}
 * and {@code data}.
 */
@Data
@Builder
public class EventData implements Serializable {

    /** Type of the event. */
    private EventType eventType;

    /** Event payload; declared as {@code Object}, so no type is enforced here. */
    private Object data;

    /**
     * Creates an empty instance with both fields unset.
     */
    public EventData() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param type    value for {@code eventType}
     * @param payload value for {@code data}
     */
    @JsonCreator
    public EventData(@JsonProperty("eventType") EventType type,
                     @JsonProperty("data") Object payload) {
        this.eventType = type;
        this.data = payload;
    }
}
/**
 * Registry of event handlers, keyed by the event type code
 * ({@link EventType#getType()}).
 *
 * <p>Both methods are synchronized and lookups return a snapshot, so a handler can be
 * registered while another thread is iterating a previously returned list.
 * NOTE(review): lookups presumably run on Redis listener threads — confirm against
 * the listener container configuration.
 */
@Slf4j
@Component
public class EventConsumer {

    /** Type code -> handlers, in registration order. Guarded by {@code this}. */
    private final Map<String, List<Consumer<EventData>>> eventContent = new HashMap<>();

    /**
     * Registers a handler for an event type.
     *
     * @param eventType event type the handler is interested in; must not be null
     * @param consumer  handler to invoke for matching events; must not be null
     * @throws NullPointerException if either argument is null
     */
    public synchronized void addConsumer(EventType eventType, Consumer<EventData> consumer) {
        java.util.Objects.requireNonNull(eventType, "eventType");
        // Reject null handlers at registration instead of failing later during dispatch.
        java.util.Objects.requireNonNull(consumer, "consumer");
        log.info("添加消费者:{}", eventType.getType());
        eventContent.computeIfAbsent(eventType.getType(), k -> new ArrayList<>()).add(consumer);
    }

    /**
     * Returns the handlers registered for an event type.
     *
     * @param eventType event type to look up; may be null
     * @return an unmodifiable snapshot of the registered handlers in registration order,
     *         or an empty list when {@code eventType} is null or has no handlers
     */
    public synchronized List<Consumer<EventData>> getConsumers(EventType eventType) {
        if (eventType == null) {
            return Collections.emptyList();
        }
        List<Consumer<EventData>> registered = eventContent.get(eventType.getType());
        if (registered == null) {
            return Collections.emptyList();
        }
        // Copy so callers never see (or modify) the internal list.
        return Collections.unmodifiableList(new ArrayList<>(registered));
    }
}
/**
 * Publishes events through Redis.
 *
 * <p>The event itself is stored under a generated key via {@code RedisService}; only
 * that key is sent on the pub/sub channel, whose name is the event's type code.
 */
@Component
@Slf4j
public class PublishService {

    /**
     * Expiry passed to {@code redisService.set} for stored events.
     * NOTE(review): 24 * 60 * 60 is presumably one day in seconds — confirm the unit
     * expected by {@code RedisService.set}.
     */
    private static final long EXPIRE_TIME = 24 * 60 * 60;

    /** Prefix of the Redis keys under which events are stored. */
    private final static String MALL_MESSAGE_PREFIX = "mall:message:";

    @Resource
    StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedisService redisService;

    /**
     * Stores the event in Redis and announces its key on the channel named after the
     * event's type code.
     *
     * <p>The channel is resolved before anything is written, so an event without a
     * type fails without leaving a stored entry behind.
     *
     * @param eventData event to publish; it and its event type must not be null
     * @throws NullPointerException if {@code eventData}, its event type, or the type
     *                              code is null
     */
    public void publish(EventData eventData) {
        java.util.Objects.requireNonNull(eventData, "eventData");
        String channel = java.util.Objects.requireNonNull(
                java.util.Objects.requireNonNull(eventData.getEventType(), "eventData.eventType").getType(),
                "eventData.eventType.type");

        log.info("====发布Redis订阅:{}====", JSON.toJSONString(eventData));
        String redisKey = MALL_MESSAGE_PREFIX + IdUtil.fastSimpleUUID();
        redisService.set(redisKey, eventData, EXPIRE_TIME);
        stringRedisTemplate.convertAndSend(channel, redisKey);
    }
}
/**
 * Redis subscription wiring: a listener container subscribed to one topic per
 * {@link EventType}, the adapter that forwards messages to {@code SubscribeListener},
 * and the registration of the event handlers.
 */
@Configuration
public class RedisListenerConfig {

    @Resource
    EventConsumer eventConsumer;

    /**
     * Builds the listener container and subscribes the adapter to a topic for every
     * event type, using the type code as the topic.
     *
     * @param connectionFactory Redis connection factory used by the container
     * @param listenerAdapter   listener that receives the messages
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        for (EventType type : EventType.values()) {
            PatternTopic topic = new PatternTopic(type.getType());
            listenerContainer.addMessageListener(listenerAdapter, topic);
        }
        return listenerContainer;
    }

    /**
     * Wraps the subscribe listener in a {@link MessageListenerAdapter}.
     *
     * @param subscribeListener listener to delegate to
     * @return the adapter
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(SubscribeListener subscribeListener) {
        return new MessageListenerAdapter(subscribeListener);
    }

    /**
     * Registers the event handlers once this configuration bean has been constructed.
     */
    @PostConstruct
    public void init() {
        eventConsumer.addConsumer(EventType.ORDER_CREATE,
                event -> System.out.println("Handling order creation: " + event.getData()));

        eventConsumer.addConsumer(EventType.ORDER_CANCEL,
                event -> System.out.println("Handling order cancellation: " + event.getData()));
    }

}
/**
 * Redis message listener that turns a received message into an {@code EventData}
 * and hands it to the handlers registered in {@code EventConsumer}.
 *
 * <p>The message body is treated as the Redis key under which the event is stored;
 * the stored entry is deleted once the event has been dispatched.
 */
@Component
@Slf4j
public class SubscribeListener implements MessageListener {

    @Resource
    private EventConsumer eventConsumer;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedisService redisService;

    /**
     * Handles one pub/sub message. Any failure is logged and not propagated.
     *
     * @param message received message; its body is deserialized to the Redis key of the event
     * @param pattern not used by this implementation
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            String redisKey = stringRedisTemplate.getStringSerializer().deserialize(message.getBody());
            if (redisKey == null) {
                return;
            }
            dispatch(loadEvent(redisKey));
            // Handler failures are contained in dispatch(), so the stored entry is
            // removed even when a handler throws.
            redisService.del(redisKey);
        } catch (Exception e) {
            log.warn("处理订阅消息时出错: ", e);
        }
    }

    /** Reads the stored value for the key and converts it to an EventData (null if nothing usable is stored). */
    private EventData loadEvent(String redisKey) {
        // Round-trip through JSON to turn whatever RedisService returns into an EventData.
        String messageData = JSON.toJSONString(redisService.get(redisKey));
        EventData eventData = JSON.parseObject(messageData, EventData.class);
        log.info("====接收Redis订阅:{}====", JSON.toJSONString(eventData));
        return eventData;
    }

    /** Invokes every handler registered for the event's type; one failing handler does not stop the others. */
    private void dispatch(EventData eventData) {
        if (eventData == null) {
            return;
        }
        if (eventData.getEventType() == null) {
            log.warn("订阅消息缺少事件类型, 已跳过: {}", JSON.toJSONString(eventData));
            return;
        }
        for (Consumer<EventData> consumer : eventConsumer.getConsumers(eventData.getEventType())) {
            try {
                consumer.accept(eventData);
            } catch (RuntimeException e) {
                log.warn("事件消费者执行失败: {}", eventData.getEventType().getType(), e);
            }
        }
    }
}
0

评论 (0)

取消