package pre.example.mq.config;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
@Configuration
public class AmqpConfig {
private final static Logger LOG = LoggerFactory.getLogger(AmqpConfig.class);
@Value("${rabbitmq.app.reciveQueue}")
private String recieveQueue;
@Value("${rabbitmq.ip}")
private String rabbitmqIp;
@Value("${rabbitmq.user}")
private String rabbitmqUser;
@Value("${rabbitmq.password}")
private String rabbitmqPassword;
@Value("${rabbitmq.virtualHost}")
private String virtualHost;
public static final String EXCHANGE = "spring-boot-exchange";
* 不管是发送端还是接收端,都需要创建这个
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(rabbitmqIp);
connectionFactory.setUsername(rabbitmqUser);
connectionFactory.setPassword(rabbitmqPassword);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
* 发送者发送消息需要RabbitTemplate提供api,此处消费者不需要
*/
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
* 针对消费者配置 1. 设置交换机类型 2. 将队列绑定到交换机 FanoutExchange:
* 将消息分发到所有的绑定队列,无routingkey的概念 HeadersExchange :通过添加属性key-value匹配
* DirectExchange:按照routingkey分发到指定队列 TopicExchange:多关键字匹配
*/
@Bean
public TopicExchange defaultExchange() {
return new TopicExchange(EXCHANGE);
}
@Bean
public Queue queue() {
return new Queue(recieveQueue, true);
}
* 这里有必要记录一下,这里queue跟exchange绑定了,
* 也就是生产者发消息到该exchange后,
* exchange会根据routingKey关键字发送到符合规则的queue中去,
* 如果不符合则丢弃,如果上rabbitmq的管理后台看的话可以看到即便项目关闭,
* exchang和queue还是存在绑定关系,看到defaultExchange()方法中,
* 创建的topicExchange使用了spring-boot-exchange这个名字,
* 如果临时想换个类型的交换机,则需要使用不同的exchang名字,
* 不然的话会报错,或者直接到管理后台把exchang或删了
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange())
.with("info.red.#");
}
* 消费者接收消息并作出应答,确保消息不会因为消费者的奔溃而被丢弃
*/
@Bean
public SimpleMessageListenerContainer messageContainer() {
LOG.info("recieveQueue:" + recieveQueue + "rabbitmqIp:" + rabbitmqIp);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(
connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel)
throws Exception {
byte[] body = message.getBody();
LOG.info("receive msg : " + new String(body));
System.out.println("receive msg : " + new String(body));
try {
channel.basicAck(message.getMessageProperties()
.getDeliveryTag(), false);
} catch (Exception e) {
LOG.error("消费队列失败:" + "", e);
channel.basicNack(message.getMessageProperties()
.getDeliveryTag(), false, false);
}
}
});
return container;
}
}