direct 类型的交换机路由规则是完全匹配 BindingKey 和 RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。
topic 类型的交换机在匹配规则上进行了扩展,它与 direct 类型的交换机类似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:
模糊匹配举例:
Topic 模型如下图:
Topic 在 direct 模式上面进一步筛选
新建3个队列:topic_queue1、topic_queue2、topic_queue3
生产者
和路由模式不同:
public class Producer {
private static final String TOPIC_EXCHANGE_NAME = "code_topic_exchange";
public static void main(String[] args) throws Exception{
// 1. 获取连接
Connection connection = RabbitMqUtil.getConnection("生产者");
// 2. 通过连接获取通道 Channel
Channel channel = connection.createChannel();
// 3. 通过通道声明交换机,以及交换机类型为 direct
/**
* @param1:交换机名称
* @param2:交换机类型
*/
channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 4. 消息内容
String message = "Hello RabbitMQ Topic!!";
String routingKey = "com.rabbitmq.client";
// 5. 发送消息到交换机,并指定路由键 RoutingKey 为 com.rabbitmq.client
channel.basicPublish(TOPIC_EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println("消息发送完成~~~发送的消息为:" + message);
// 6. 关闭信道、连接
RabbitMqUtil.close(connection, channel);
}
}
消费者
和路由模式不同:
public class Consumer {
private static final String TOPIC_QUEUE_NAME = "topic_queue1";
private static final String TOPIC_EXCHANGE_NAME = "code_topic_exchange";
public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = RabbitMqUtil.getConnection("消费者");
// 获取通道
Channel channel = connection.createChannel();
String bindingKey = "*.rabbitmq.*";
// 绑定队列到交换机,并指定一个绑定键 BindingKey
channel.queueBind(TOPIC_QUEUE_NAME, TOPIC_EXCHANGE_NAME, bindingKey);
// 定义消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 消息体
String msg = new String(body,"utf-8");
System.out.println("收到消息:" + msg);
}
};
// 监听队列
channel.basicConsume(TOPIC_QUEUE_NAME, true, consumer);
System.out.println("开始接收消息~~~");
System.in.read();
// 关闭信道、连接
RabbitMqUtil.close(connection, channel);
}
}
public class Consumer2 {
private static final String TOPIC_QUEUE_NAME = "topic_queue2";
private static final String TOPIC_EXCHANGE_NAME = "code_topic_exchange";
public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = RabbitMqUtil.getConnection("消费者");
// 获取通道
Channel channel = connection.createChannel();
String bindingKey = "*.*.client";
// 绑定队列到交换机,并指定一个绑定键 BindingKey
channel.queueBind(TOPIC_QUEUE_NAME, TOPIC_EXCHANGE_NAME, bindingKey);
// 定义消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 消息体
String msg = new String(body,"utf-8");
System.out.println("收到消息:" + msg);
}
};
// 监听队列
channel.basicConsume(TOPIC_QUEUE_NAME, true, consumer);
System.out.println("开始接收消息~~~");
System.in.read();
// 关闭信道、连接
RabbitMqUtil.close(connection, channel);
}
}
因篇幅问题不能全部显示,请点此查看更多更全内容