最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

全球快资讯:springboot 中使用 RabbitMQ 配置使用优先级队列

来源:博客园


【资料图】

RabbitMQ 支持优先级队列,当工作中有一些任务需要紧急优先处理,此时可以使用优先级队列通过设置 MQ 的 x-max-priority 属性可以将对列设置为优先级队列

配置文件类

@Slf4j@Getter@Configurationpublic class RabbitMQConfig {    @Value("${spring.rabbitmq.host}")    private String host;    @Value("${spring.rabbitmq.port}")    private int port;    @Value("${spring.rabbitmq.username}")    private String username;    @Value("${spring.rabbitmq.password}")    private String password;    @Value("${spring.rabbitmq.virtual-host}")    private String virtualHost;    @Value("${spring.rabbitmq.cache.channel.size:100}")    private int channelCacheSize;       // 单条链接channel 缓存大小    @Value("${spring.rabbitmq.cache.channel.checkout-timeout:5000}")    private long channelCheckTimeout;    //获取channel等待时间,毫秒    @Value("${app.accept-text-queue-name}")    private String acceptTextQueueName;    @Bean    public MessageConverter jsonConverter() {        return new Jackson2JsonMessageConverter();    }    @Bean    public CachingConnectionFactory connectionFactory() {        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();        connectionFactory.setHost(host);        connectionFactory.setPort(port);        connectionFactory.setUsername(username);        connectionFactory.setPassword(password);        connectionFactory.setVirtualHost(virtualHost);        connectionFactory.setChannelCheckoutTimeout(channelCheckTimeout);        connectionFactory.setChannelCacheSize(channelCacheSize);        connectionFactory.setPublisherConfirms(true);        connectionFactory.setPublisherReturns(true);        return connectionFactory;    }    @Bean(name = "jsonRabbitTemplate")    public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {        RabbitTemplate template = new RabbitTemplate(connectionFactory);        template.setMandatory(true);        template.setMessageConverter(jsonConverter());        template.setConfirmCallback((correlationData, ack, cause) -> {            if (!ack) {                log.error("confirm message error! correlationData: {}, cause: {}", correlationData, cause);            }        });        return template;    }    @Bean(name = "acceptSseConvertQueue")    public Queue acceptSseConvertQueue() {        Map args = new HashMap<>();        args.put("x-max-priority", 100);// 设置优先级最高为 100,数字越大优先级越高        return new Queue(acceptTextQueueName, true, false, false, args);    }}

设置预取数

在消费者中默认 containerFactory 为 SimpleRabbitListenerContainerFactory ,其中默认预取数为 250,该配置在 AbstractMessageListenerContainer 中的 DEFAULT_PREFETCH_COUNT如果需要每次消费一个消息,就需要在配置文件中指定预取数为 1, 配置项为

// 设置预取数为 1spring.rabbitmq.listener.simple.prefetch = 1

设置并发消费数

在 @RabbitListener 中,配置项 concurrency 来配置并发消费数

关键词: