发布/订阅模式:交换机类型为 fanout
,也称为广播模式,即:一个生产者对应多个消费者。它会把所有发送到该交换机的消息路由到所有与该交换机绑定的队列中(无视 BindingKey)
发布/订阅模式的简要说明如下:
发布/订阅模式结构如下图:
Exchange 类型有以下几种:
Exchange 类型详解请移步到之前的文章:
Exchange(交换机)只负责转发消息,不具备存储消息的能力。因此,如果没有任何队列与 Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
【注意】:路由键在 fanout 类型的交换机中不起作用
先添加两个队列 fanout_queue1
、fanout_queue2
,
再添加一个交换机 fanout_exchange,它的类型为 fanout。(相较于之前的两个模式,这里需要新建一个交换机,并不是使用的默认的交换机)
和前面两种模式不同:
public class Producer {
private static final String FANOUT_EXCHANGE_NAME = "code_fanout_exchange";
public static void main(String[] args) throws Exception{
// 1. 获取连接
Connection connection = RabbitMqUtil.getConnection("生产者");
// 2. 通过连接获取通道 Channel
Channel channel = connection.createChannel();
// 3. 通过通道声明交换机,以及交换机类型为 fanout
/**
* @param1:交换机名称
* @param2:交换机类型
*/
channel.exchangeDeclare(FANOUT_EXCHANGE_NAME, "fanout");
// 4. 消息内容
String message = "Hello RabbitMQ Fanout!!";
// 5. 发送消息到交换机
channel.basicPublish(FANOUT_EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("消息发送完成~~~发送的消息为:" + message);
// 6. 关闭信道、连接
RabbitMqUtil.close(connection, channel);
}
}
exchangeDeclare() 方法详解:声明交换机
exchangeDeclare() 方法有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成:
DeclareOk exchangeDeclare(String exchange,
String type, boolean durable,
boolean autoDelete, boolean internal,
Map<String, Object> arguments) throws IOException;
这个方法的返回值是 Exchange.DeclareOk
,用来标识成功声明了一个交换机。
各个参数详细说明如下:
消费者
和前面两种模式不同:
添加两个消费者,这两个消费者绑定不同的队列:fanout_queue1、fanout_queue2
public class Consumer {
private static final String FANOUT_QUEUE_NAME = "fanout_queue1";
private static final String FANOUT_EXCHANGE_NAME = "code_fanout_exchange";
public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = RabbitMqUtil.getConnection("消费者");
// 获取通道
Channel channel = connection.createChannel();
// 绑定队列到交换机
channel.queueBind(FANOUT_QUEUE_NAME, FANOUT_EXCHANGE_NAME, "");
// 定义消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 消息体
String msg = new String(body,"utf-8");
System.out.println("收到消息:" + msg);
}
};
// 监听队列
channel.basicConsume(FANOUT_QUEUE_NAME, true, consumer);
System.out.println("开始接收消息~~~");
System.in.read();
// 关闭信道、连接
RabbitMqUtil.close(connection, channel);
}
}
Consumer2 代码与 Consumer 代码一样,只不过需要将 FANOUT_QUEUE_NAME
稍作修改
queueBind() 方法详解:将交换机与队列进行绑定
Queue.BindOk queueBind(String queue,
String exchange, String bindingKey,
Map<String, Object> argument) throws IOException;
方法中的各参数含义如下:
不同:
相同:
因篇幅问题不能全部显示,请点此查看更多更全内容