RabbitMQ Bridge後臺系統開發

語言: CN / TW / HK

​想了解更多關於開源的內容,請訪問:​

​51CTO 開源基礎軟體社群​

​https://ost.51cto.com​

前言

前面幾篇文章已經簡單寫了關於RabbitMQ安裝,使用,結合SpringBoot使用流程,有了前面的基礎知識了,我們現在開始開發一個完整,可以直接使用到生產上的MQBridge後臺系統,建立SpringBoot專案,這裡就不詳細說了,主要講解MQBridge專案的開發過程,我畫了一個流程圖,整體說明MQBridge的功能和流程。

第一步項 目依賴

專案依賴哪些第三方包,專案結構說明:

<dependencies>
        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

第二步 公共實體類和工具類

引數DefaultMessage說明:

@Data
@ToString
public class DefaultMessage implements Serializable {
    // 標識不同功能模組, 如: 訂單、物流
    private String source;
    // 標識不同路由資訊,如: 訂單,物流
    private String action;
    // 引數
    private Map<String, Object> data;
}

返回類StandardResponse說明:

@Data
public class StandardResponse<T> implements Serializable {
    // 狀態碼
    private String statusCode;
    // 狀態描述
    private String statusDesc;
    // 響應結果
    private Boolean success;
    // 返回內容
    private T data;
}

第三步 傳送Controller

  1. 對外提供傳送訊息Controller:
@RestController
@Slf4j
public class BridgeController {
    @Autowired
    BridgeService bridgeService;
    @PostMapping("/sendMessage")
    public StandardResponse sendMessage(@RequestBody DefaultMessage message) {
        log.info("[sendMessage] params: " + message);
        return bridgeService.sendMessage(message);
    }
}

2. MQBridge服務介面與實現類:

/**
 * Bridge橋接傳送訊息介面
 */
public interface BridgeService {
    // MQ橋接傳送訊息
    StandardResponse sendMessage(DefaultMessage message);
}
@Slf4j
@Service
public class BridgeServiceImpl implements BridgeService {
    /**
     * 快取RabbitMQ訊息生產者
     */
    @Autowired
    private Map<String, MessageSender> senderMap;

    @Override
    public StandardResponse sendMessage(DefaultMessage message) {
        log.info("[sendMessage] params: " + message);
        try {
            // 根據不同source呼叫不同的訊息生產者
            return senderMap.get(message.getSource()).send(message);
        }catch (Exception e) {
            e.printStackTrace();
            log.info("[sendMessage] Error: " + e.getMessage());
            StandardResponse standardResponse = new StandardResponse();
            standardResponse.setSuccess(false);
            standardResponse.setStatusCode("101");
            standardResponse.setStatusDesc("Not Find Service.");
            return standardResponse;
        }
    }
}

第四步 訊息生產者介面與父類實現

/**
 * RabbitMQ訊息生產介面
 */
public interface MessageSender {
    // 傳送RabbitMQ訊息
    StandardResponse send(DefaultMessage defaultMessage);
}
/**
 * RabbitMQ訊息生產父類
 */
@Slf4j
public class BaseMessageSender implements MessageSender {
    @Autowired
    RabbitTemplate rabbitTemplate;
    // 子類注入交換機名稱
    protected String exchange;
    @Override
    public StandardResponse send(DefaultMessage defaultMessage) {
        StandardResponse standardResponse = new StandardResponse();
        try {
            log.info("{} Sender...",defaultMessage.getAction());
            // 根據引數Action,傳送訊息(交換機,路由主鍵, 引數)
            rabbitTemplate.convertAndSend(exchange, defaultMessage.getAction(), defaultMessage);
            standardResponse.setSuccess(true);
            standardResponse.setStatusCode("100");
            standardResponse.setStatusDesc("Send Success");
            return standardResponse;
        }catch (Exception e) {
            e.printStackTrace();
            log.error("convertAndSend Error: " + e.getMessage());
            standardResponse.setSuccess(false);
            standardResponse.setStatusCode("102");
            standardResponse.setStatusDesc("Send Error.");
            return standardResponse;
        }
    }
}

訊息消費者介面:

/**
 * RabbitMQ訊息消費介面
 */
public interface MessageHandler {
    // 處理RabbitMQ訊息
    StandardResponse handle(DefaultMessage message);
}

第五步 訂單訊息

訂單交換機、佇列、路由主鍵之間關係使用:

1. 建立一個訂單交換機。

/**
 * 訂單交換機
 */
@Slf4j
@Configuration
public class OrderTopicRabbitConfig {
    // 通過application.yml資原始檔定義訂單交換機名稱,並引入
    @Value("${rabbitmq.exchange.order}")
    private String orderExchange;
    // 建立訂單交換機,在生產者,消費者使用
    @Bean(name = "${rabbitmq.exchange.order}")
    public TopicExchange OrderTopicExchange() {
        log.info("[MQBridge.Order]主題交換機{}", orderExchange);
        return new TopicExchange(orderExchange,true,false);
    }
}

2. 訂單訊息生產者類,例項訊息生產者類(測試使用)。

/**
 * 訂單訊息生產者
 */
@Slf4j
@Service("MQBridge.Order")
public class OrderSender extends BaseMessageSender {
    // 通過application.yml資原始檔定義訂單交換機名稱,並引入
    @Value("${rabbitmq.exchange.order}")
    private String orderExchange;
    /**
     * 服務啟動時,把訂單交換機名稱注入到父類變數
     */
    @PostConstruct
    public void init() {
        this.exchange = orderExchange;
    }
}
/**
 * 例項訊息生產者
 */
@Slf4j
@Service("MQBridge.Sample")
public class SampleSender extends BaseMessageSender {
    // 通過application.yml資原始檔定義訂單交換機名稱,並引入
    @Value("${rabbitmq.exchange.order}")
    private String orderExchange;
    /**
     * 服務啟動時,把訂單交換機名稱注入到父類變數
     */
    @PostConstruct
    public void init() {
        this.exchange = orderExchange;
    }
}

3. 訂單Email消費者,訂單公共消費者(此消費在這裡說明萬用字元交換機其中一使用)。

/**
 * 訂單訊息消費者
 */
@Slf4j
@Service
@RabbitListener(queues = "MQBridge_Order_Email")
public class OrderEmailHandler implements MessageHandler {
    // 定義訂單佇列名
    private static final String QUEUE_NAME = "MQBridge_Order_Email";
    // 建立訂單佇列物件
    @Bean(QUEUE_NAME)
    public Queue OrderTopicQueueEmail() {
        log.info("Order主題佇列名{}", QUEUE_NAME);
        return new Queue(QUEUE_NAME,true);
    }
    // 訂單佇列物件與訂單交換機繫結,路由主鍵為MQBridge.Order.Email
    @Bean
    Binding bindingOrderTopicEmail(@Qualifier(QUEUE_NAME) Queue queue,
                                   @Qualifier("${rabbitmq.exchange.order}") TopicExchange exchange) {
        log.info("Order主題佇列繫結到上交換機,佇列為{}", QUEUE_NAME);
        return BindingBuilder.bind(queue).to(exchange).with("MQBridge.Order.Email");
    }
    /**
     * 消費訂單Email通知訊息
     * @param message
     * @return
     */
    @RabbitHandler
    @Override
    public StandardResponse handle(DefaultMessage message) {
        log.info("[OrderEmailHandler] Queue: {}, RoutingKey: {}", QUEUE_NAME, "MQBridge.Order.Email");
        log.info("[MQBridge_Order]-[MQBridge.Order.Email]消費引數:{}", message);
        return null;
    }
}
/**
 * 訂單訊息消費者
 */
@Slf4j
@Service
@RabbitListener(queues = "MQBridge_Order_Common")
public class OrderCommonlHandler implements MessageHandler {
    // 定義訂單佇列名
    private static final String QUEUE_NAME = "MQBridge_Order_Common";
    // 建立訂單佇列物件
    @Bean(QUEUE_NAME)
    public Queue OrderTopicQueueCommon() {
        log.info("Order主題佇列名{}", QUEUE_NAME);
        return new Queue(QUEUE_NAME,true);
    }
    // 訂單佇列物件與訂單交換機繫結,路由主鍵為MQBridge.Order.Email
    @Bean
    Binding bindingOrderTopicCommon(@Qualifier(QUEUE_NAME) Queue queue,
                                   @Qualifier("${rabbitmq.exchange.order}") TopicExchange exchange) {
        log.info("Order主題佇列繫結到上交換機,佇列為{}", QUEUE_NAME);
        return BindingBuilder.bind(queue).to(exchange).with("MQBridge.Order.#");
    }
    /**
     * 消費訂單公共通知訊息
     * @param message
     * @return
     */
    @RabbitHandler
    @Override
    public StandardResponse handle(DefaultMessage message) {
        log.info("[OrderCommonlHandler] Queue: {}, RoutingKey: {}", QUEUE_NAME, "MQBridge.Order.#");
        log.info("[MQBridge_Order]-[MQBridge.Order.#]消費引數:{}", message);
        return null;
    }
}

第六步 物流訊息

物流交換機、佇列、路由主鍵之間關係使用:

1. 建立一個物流交換機。

/**
 * 物流交換機
 */
@Slf4j
@Configuration
public class LogisticsTopicRabbitConfig {
    // 通過application.yml資原始檔定義物流交換機名稱,並引入
    @Value("${rabbitmq.exchange.logistics}")
    private String logisticsExchange;
    // 建立物流交換機,在生產者,消費者使用
    @Bean(name = "${rabbitmq.exchange.logistics}")
    public TopicExchange LogisticsTopicExchange() {
        log.info("[MQBridge.Logistics]主題交換機{}", logisticsExchange);
        return new TopicExchange(logisticsExchange,true,false);
    }
}

2. 物流訊息生產者:

/**
 * 物流訊息生產者
 */
@Slf4j
@Service("MQBridge.Logistics")
public class LogisticsSender extends BaseMessageSender {
    // 通過application.yml資原始檔定義物流交換機名稱,並引入
    @Value("${rabbitmq.exchange.logistics}")
    private String logisticsExchange;
    /**
     * 服務啟動時,把物流交換機名稱注入到父類變數
     */
    @PostConstruct
    public void init() {
        this.exchange = logisticsExchange;
    }
}

3. 物流訊息消費者:

/**
 * 物流訊息消費者
 */
@Slf4j
@Service
@RabbitListener(queues = "MQBridge_Logistics_Email")
public class LogisticsEmailHandler implements MessageHandler {
    // 定義物流佇列名
    private static final String QUEUE_NAME = "MQBridge_Logistics_Email";
    // 建立物流佇列物件
    @Bean(QUEUE_NAME)
    public Queue LogisticsTopicQueue() {
        log.info("Logistics主題佇列名{}", QUEUE_NAME);
        return new Queue(QUEUE_NAME,true);
    }
    // 物流佇列物件與物流交換機繫結,路由主鍵為MQBridge.Logistics.Email
    @Bean
    Binding bindingLogisticsTopic(@Qualifier(QUEUE_NAME) Queue queue,
                                   @Qualifier("${rabbitmq.exchange.logistics}") TopicExchange exchange) {
        log.info("Logistics主題佇列繫結到上交換機,佇列為{}", QUEUE_NAME);
        return BindingBuilder.bind(queue).to(exchange).with("MQBridge.Logistics.#");
    }
    /**
     * 消費物流Email通知訊息
     * @param message
     * @return
     */
    @RabbitHandler
    @Override
    public StandardResponse handle(DefaultMessage message) {
        log.info("[LogisticsEmailHandler] Queue: {}, RoutingKey: {}", QUEUE_NAME, "MQBridge.Logistics.#");
        log.info("[MQBridge_Logistics_Email]消費引數:{}", message);
        return null;
    }
}

第七步 小結

從專案結構和程式碼可以看出,第五步和第六步是不同的業務功能,我們通過傳遞引數Source不同,呼叫不同的業務邏輯功能,之後如果新增新的模組,就可以參考第五步或第六步就可以實現,然後呼叫時,引數Source指定為新值就可以;這裡再說一下Source引數值與Service是怎麼對應的,在LogisticsSender類,OrderSender類,SampleSender類上都有一個註解@Service(“MQBridge.Order”), 括號裡的字串就是對應引數的Source值了。

第八步  除錯

1.  Source為MQBridge.Order呼叫。

2.  Source為MQBridge.Sample呼叫 。

3.  Source為MQBridge.Logistics呼叫 。

最後總結

從除錯和列印日誌可以看出MQBridge專案,可以很方便新增新功能,下圖是除錯的三次日誌。

​想了解更多關於開源的內容,請訪問:​

​51CTO 開源基礎軟體社群​

​https://ost.51cto.com​ ​。