SpringMVC消費RabbitMQ隊列

語言: CN / TW / HK

1.環境介紹

  • Jdk1.8.0_202
  • Spring 4.3.9.RELEASE
  • spring-webmvc 4.3.9.RELEASE

2.添加Maven依賴

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>5.6.0</version>
</dependency>
<dependency>
   <groupId>org.springframework.amqp</groupId>
   <artifactId>spring-amqp</artifactId>
   <version>2.0.12.RELEASE</version>
</dependency>
<dependency>
   <groupId>org.springframework.amqp</groupId>
   <artifactId>spring-rabbit</artifactId>
   <version>2.0.12.RELEASE</version>
</dependency>
<dependency>
   <groupId>org.springframework.retry</groupId>
   <artifactId>spring-retry</artifactId>
   <version>1.2.4.RELEASE</version>
</dependency>

3.添加RabbitMQ配置

在jdbc.properties文件中加入RabbitMQ地址端口用户密碼等信息
#RabbitMQ地址
mq_host=127.0.0.1
#RabbitMQ端口
mq_port=5672
#RabbitMQ虛擬主機
mq_virtual_host=/
#RabbitMQ用户名
mq_username=admin
#RabbitMQ密碼
mq_password=your rabbitmq password

4.創建RabbitMQ配置文件

新建 applicationContext-rabbitmq.xml

內容如下
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
 http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 http://www.springframework.org/schema/rabbit
 http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd" >
    <description>RabbitMQ 連接服務配置</description>
    <!-- 連接配置 -->
    <rabbit:connection-factory id="connectionFactory" host="${mq_host}" username="${mq_username}" password="${mq_password}" port="${mq_port}" virtual-host="${mq_virtual_host}"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- rabbit template聲明-->
    <rabbit:template exchange="capture_exchanges" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
    <!-- 消息對象json轉換類 -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
    <!-- 申明消息隊列Queue -->
    <!--
     durable:是否持久化
     exclusive: 僅創建者可以使用的私有隊列,斷開後自動刪除
     auto_delete: 當所有消費客户端連接斷開後,是否自動刪除隊列
     ignore-declaration-exceptions : [非常重要] 當前隊列已經創建,則忽略;消費已有隊列時需增加此配置,否則啟動報錯
     -->
    <!--<rabbit:queue id="mail" name="mail" durable="true" auto-delete="false" exclusive="false" />-->
    <rabbit:queue id="capture-000001" name="capture-000001" durable="true" ignore-declaration-exceptions="true" auto-delete="false" exclusive="false" />
    <!-- 交換機定義 -->
    <!--
     rabbit:direct-exchange:定義exchange模式為direct,意思就是消息與一個特定的路由鍵完全匹配,才會轉發。
     rabbit:binding:設置消息queue匹配的key
     -->
    <rabbit:direct-exchange name="capture_exchanges" durable="true" auto-delete="false"  ignore-declaration-exceptions="true" id="capture_exchanges">
        <rabbit:bindings>
            <!--<rabbit:binding queue="mail" key="mail"/>-->
            <rabbit:binding queue="capture-000001" key="000001"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 配置監聽
       acknowledeg = "manual" 設置手動應答 當消息處理失敗時:會一直重發 直到消息處理成功,需要自己手動提交;否則不會繼續消費;Auto自動應答,則會繼續往下消費
       prefetch ="1" 設置消息消費數量,每次從隊列中取1條消息;多個監聽器公平分發,同一時刻僅有一條消息處理
       concurrency = "1" 設置每個listener併發的消費者個數
    -->
    <rabbit:listener-container connection-factory="connectionFactory" transaction-size="1" prefetch="1" concurrency="1" acknowledge="auto" message-converter="jsonMessageConverter" monitor-interval="5000">
        <!-- 配置監聽器
          queues:監聽的隊列,多個的話用逗號(,)分隔
          ref:監聽器,需實現監聽器接口,@Component加入Spring管理
        -->
        <!--<rabbit:listener queues="mail" ref="mailListener"/>-->
        <rabbit:listener queues="capture-000001" ref="plateNumRabbitConsumer" />
    </rabbit:listener-container>

</beans>

5.創建監聽器消費者

創建與ref="plateNumRabbitConsumer"對應的消費者類,必須添加註解@Component加入Spring管理,實例名可自定義,但必須與ref關聯的名字一致;消費者類必須實現MessageListener,監聽隊列的變化,實現實時消費;

遇到的問題

  • 同一個項目中若同時存在生產者和消費者,applicationContext-rabbitmq.xml中聲明的工廠,路由,隊列都無特別注意的地方;生產者需要使用到rabbit:template,消費者不需要此配置即可行
  • 若只需消費隊列,在聲明隊列時,若rabbitmq服務中已創建過同名隊列,則必須配置ignore-declaration-exceptions="true",否則項目啟動時將報錯
  • 監聽容器的配置,需注意prefetch="1" 和 acknowledge="auto",prefetch表示單次從隊列中消費的數量,prefetch=”1”,説明單次消費1條消息,若容器中配置多個監聽器消費,則會公平分發;acknowledge="auto" 表示自動應答,消費完成後回覆確認,改消息將從rabbitmq服務中刪除,並且會自動消費下一條消息,acknowledge=" manual"表示手動應答 當消息處理失敗時:會一直重發 直到消息處理成功,需要自己手動提交;否則不會繼續消費;手動應答方式需要配置template的confirm-callback回調方法;