web前端开发师培训,前端校招面试题及解析大全

基于axios封装请求

  返回  

SpringBoot+RabbitMQ实现消息队列的各种模式

2021/7/21 18:15:01 浏览:

文章目录

    • 依赖:
    • 配置文件:
  • 简单模式:
  • 工作队列模式;
  • 发布订阅模式
  • 路由Routing模式
  • 通配符模式

依赖:

 <dependencies>
        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置文件:

spring.rabbitmq.addresses=xxx.xxx.xxx.xxx
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

简单模式:

在这里插入图片描述
配置类:,因为只有一个队列,所以配置类中只写一个就行

@Component
@Slf4j
public class ConfirmConsumer {
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    @RabbitListener(queues =CONFIRM_QUEUE_NAME)
    public void receiveMsg(Message message){
        String msg=new String(message.getBody());
        log.info("接受到队列 confirm.queue 消息:{}",msg);
    }
}

消息生产者:

@RestController
@RequestMapping("/simple")
@Slf4j
public class Simple_Producer {

    private static final String QUEUE_NAME="simple.queue";

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public String send(@PathVariable("message")String message){
        log.info("准备发送罅隙:"+message);
        rabbitTemplate.convertAndSend(QUEUE_NAME,message.getBytes());
        return "消息已经发送成功";
    }
}

消息消费者:

@Component
@Slf4j
public class SimpleConsumer {

    @RabbitListener(queues = "simple.queue")
    public void receive(Message message){
        log.info("消费者收到了"+new String(message.getBody()));
    }
}

此时,当我们访问http://localhost:8080/simple/sendMessage/hello,rabbit 时:
在这里插入图片描述

工作队列模式;

在这里插入图片描述
无非就是两个消费者都接收同一个队列,只需要修改一下消费者端即可:

@Component
@Slf4j
public class Work_Consumer {

    @RabbitListener(queues = "work.queue")
    public void receive1(Message message){
        log.info("消费者1收到的消息是"+new String(message.getBody()));
    }
    @RabbitListener(queues = "work.queue")
    public void receive2(Message message){
        log.info("消费者2收到的消息是"+new String(message.getBody()));
    }
}

运行之后:
在这里插入图片描述
可以发现两个消费者是通过轮询来分配消息,而且每个消息只能消费一次

发布订阅模式

在这里插入图片描述
这个模式可以解决工作模式中消息只能消费一次的问题
配置类:

import org.springframework.amqp.core.*;


@Configuration
public class PSConfig {
    private static final String EXCHANGE_NAME ="ps.exchange";
    private static final String QUEUE_NAME1="ps1.queue";
    private static final String QUEUE_NAME2="ps2.queue";

    @Bean("PSExchange")
    public Exchange PSExchange(){
        return new FanoutExchange(EXCHANGE_NAME);
    }
    @Bean("q1")
    public Queue q1(){
        return QueueBuilder.durable(QUEUE_NAME1).build();
    }
    @Bean("q2")
    public Queue q2(){
        return QueueBuilder.durable(QUEUE_NAME2).build();
    }
    @Bean
    public Binding bind1(@Qualifier("PSExchange") Exchange exchange,@Qualifier("q1")Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();

    }
    @Bean
    public Binding bind2(@Qualifier("PSExchange") Exchange exchange,@Qualifier("q2")Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }
}

消息生产者:

@RestController
@RequestMapping("/ps")
@Slf4j
public class PS_Producer {
    private static final String EXCHANGE_NAME ="ps.exchange";

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public String send(@PathVariable("message")String message){
        rabbitTemplate.convertAndSend(EXCHANGE_NAME,"",message);
        log.info("消息: "+message+"  已经发送成功");
        return "消息已经发送";
    }
}

消息消费者:

import org.springframework.amqp.core.Message;

@Component
@Slf4j
public class PS_Consumer {

    @RabbitListener(queues = "ps1.queue")
    public void receive1(Message message){
        log.info("消费者1收到消息: "+new String(message.getBody()));
    }

    @RabbitListener(queues = "ps2.queue")
    public void receive2(Message message){
        log.info("消费者2收到消息: "+new String(message.getBody()));
    }
}

执行结果如下:
在这里插入图片描述

路由Routing模式

在这里插入图片描述
如图,我们可以发现,路由Routing模式和发布确认模式的区别就是使用了 Routing Keys ,而前面的发布确认的Routingkeys都设置成了 “” .

配置类,注意交换机要使用Direct类型的交换机

import org.springframework.amqp.core.*;
@Configuration
public class RoutConfig {
    private static final String EXCHANGE_NAME ="rout.exchange";
    private static final String QUEUE_NAME1="rout.queue";
    private static final String QUEUE_NAME2="rout.queue";

    @Bean("rq1")
    public Queue queue1(){
       return  QueueBuilder.durable(QUEUE_NAME1).build();
    }
    @Bean("rq2")
    public Queue queue2(){
       return  QueueBuilder.durable(QUEUE_NAME2).build();
    }
    @Bean("rexchange")
    public Exchange RExchange(){
        return new DirectExchange(EXCHANGE_NAME);
    }
    @Bean
    public Binding getbind1(@Qualifier("rq1")Queue queue,@Qualifier("rexchange")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("q1").noargs();
    }
    @Bean
    public Binding getbind2(@Qualifier("rq2")Queue queue,@Qualifier("rexchange")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("q2").noargs();
    }
}

消息生产者类:

@RestController
@RequestMapping("/Rout")
@Slf4j
public class RoutProducer {
    private static final String EXCHANGE_NAME ="rout.exchange";
    private static final String QUEUE_NAME1="rout.queue";
    private static final String QUEUE_NAME2="rout.queue";
    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public String send(@PathVariable("message")String message){
        if(message.equals("5")){
            log.info("消息是 "+message+"发送到消费者2");
            rabbitTemplate.convertAndSend(EXCHANGE_NAME,"q2",message);
        }else{
            log.info("消息是 "+message+"发送到消费者1");
            rabbitTemplate.convertAndSend(EXCHANGE_NAME,"q1",message);
        }
        return "消息已经发送";
    }
}

消息消费者类:

@Component
@Slf4j
public class RoutConsumer {
    private static final String QUEUE_NAME1="rout.queue";
    private static final String QUEUE_NAME2="rout.queue";


    @RabbitListener(queues = QUEUE_NAME1)
    public void listen1(Message message){
        log.info("消费者1收到消息 : "+new String(message.getBody()));
    }

    @RabbitListener(queues = QUEUE_NAME2)
    public void listen2(Message message){
        log.info("消费者2收到消息: "+new String(message.getBody()));
    }
}

我们配置的是如果消息是 5则发给消费者2,否则发给消费者1,运行如下:
在这里插入图片描述
如图,满足我们的要求

通配符模式

通配符模式和上面的路由模式差别不大,就是交换机的类型要变成Topic,然后绑定时的 RoutingKeys 可以设置 通配符:

*(星号)可以代替一个单词,#(井号)可以替代零个或多个单词

  • 中间带 orange 带 3 个单词的字符串(.orange.)
  • 最后一个单词是 rabbit 的 3 个单词(..rabbit)
  • 第一个单词是 lazy 的多个单词(lazy.#)

联系我们

如果您对我们的服务有兴趣,请及时和我们联系!

服务热线:18288888888
座机:18288888888
传真:
邮箱:888888@qq.com
地址:郑州市文化路红专路93号