PHP+Laravel+RabbitMQ實現非同步延遲訊息佇列(庫存歸還)

語言: CN / TW / HK

一、前言

需求:電商秒殺場景中,如果使用者下單10分鐘未支付,需要進行庫存歸還

本篇是用PHP+Laravel+RabbitMQ來實現非同步延遲訊息佇列

二、場景

  • 在電商專案中,當我們下單之後,一般需要 20 分鐘之內或者 30 分鐘之內付款,否則訂單就會進入異常處理邏輯中,被取消,那麼進入到異常處理邏輯中,就可以當成是一個延遲佇列
  • 公司的會議預定系統,在會議預定成功後,會在會議開始前半小時通知所有預定該會議的使用者
  • 安全工單超過 24 小時未處理,則自動拉企業微信群提醒相關責任人
  • 使用者下單外賣以後,距離超時時間還有 10 分鐘時提醒外賣小哥即將超時
  • ...

很多場景下我們都需要延遲佇列。\ 本文以 RabbitMQ 為例來和大家聊一聊延遲佇列的玩法。

使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 外掛來實現定時任務,這種方案較簡單。

三、安裝RabbitMQ延遲佇列外掛

官網外掛下載地址

image.png

我這裡直接下載了最新版本,你們根據自己的rabbitmq版本號進行下載

image.png

把下載好的檔案移動到rabbitmq的外掛plugins下,以我自己的Mac為例子,放到了如下路徑

image.png

然後執行安裝外掛指令,如下

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

image.png

最後重啟rabbitmq服務,並重新整理檢視exchanges交換機有沒有該外掛

image.png

如上圖則延遲訊息佇列外掛安裝完成

四、在Laravel框架中進行使用

新建rabbitmq服務類,包含延遲訊息佇列生產訊息,和消費訊息,如下

image.png

核心程式碼如下:

``` <?php

namespace App\Http\Controllers\Service;

use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable;

class RabbitmqServer { private $host = "127.0.0.1"; private $port = 5672; private $user = "guest"; private $password = "guest";

private $msg;
private $channel;
private $connection;

//  過期時間
const TIMEOUT_5_S = 5;     // 5s
const TIMEOUT_10_S = 10;    // 10s

private $exchange_logs = "logs";
private $exchange_direct = "direct";
private $exchange_delayed = "delayed";

private $queue_delayed = "delayedQueue";

const EXCHANGETYPE_FANOUT = "fanout";
const EXCHANGETYPE_DIRECT = "direct";
const EXCHANGETYPE_DELAYED = "x-delayed-message";

public function __construct($type = false)
{
    $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password);
    $this->channel = $this->connection->channel();
    // 宣告Exchange
    $this->channel->exchange_declare($this->exchange_delayed, self::EXCHANGETYPE_DELAYED, false, true, false, false, false, new AMQPTable(["x-delayed-type" => self::EXCHANGETYPE_DIRECT]));
    $this->channel->queue_declare($this->queue_delayed, false, true, false, false);
    $this->channel->queue_bind($this->queue_delayed, $this->exchange_delayed, $this->queue_delayed);
}

/**
 * delay creat message
 */
public function createMessageDelay($msg, $time)
{
    $delayConfig = [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        'application_headers' => new AMQPTable(['x-delay' => $time * 1000])
    ];
    $msg = new AMQPMessage($msg, $delayConfig);
    return $msg;
}

/**
 * delay send message
 */
public function sendDelay($msg, $time = self::TIMEOUT_10_S)
{
    $msg = $this->createMessageDelay($msg, $time);;
    $this->channel->basic_publish($msg, $this->exchange_delayed, $this->queue_delayed);
    $this->channel->close();
    $this->connection->close();
}

/**
 * delay consum
 */
public function consumDelay()
{
    $callback = function ($msg) {
        echo ' [x] ', $msg->body, "\n";
        $this->channel->basic_ack($msg->delivery_info['delivery_tag'], false);
    };
    $this->channel->basic_qos(null, 1, null);
    $this->channel->basic_consume($this->queue_delayed, '', false, false, false, false, $callback);
    echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
    while (count($this->channel->callbacks)) {
        $this->channel->wait();
    }

    $this->channel->close();
    $this->connection->close();
}

} ```

比如新建QueueController控制器,進行測試生產訊息放到延遲訊息佇列中

image.png 程式碼如下:

``` <?php

namespace App\Http\Controllers\Api\v1;

use App\Http\Controllers\Controller; use App\Http\Controllers\Service\RabbitmqServer; use App\Jobs\Queue; use Illuminate\Http\Request;

class QueueController extends Controller { // public function index(Request $request) { //比如說現在是下訂單操作 //需求:如果使用者10分鐘之內不支付訂單就要取消訂單,並且庫存歸還 $msg = $request->post(); $Rabbit = new RabbitmqServer("x-delayed-message"); //第一個引數傳送的訊息,第二個引數延遲多少秒 $Rabbit->sendDelay(json_encode($msg),5); } } ```

至此通過介面除錯工具進行模擬生產訊息即可

訊息生產完畢要進行消費,這裡使用的是Laravel的任務排程,程式碼如下

image.png ``` <?php

namespace App\Console\Commands;

use App\Http\Controllers\Service\RabbitmqServer; use Illuminate\Console\Command; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage;

class RabbitmqConsumerCommand extends Command { /* * The name and signature of the console command. * * @var string / protected $signature = 'rabbitmq_consumer';//給消費者起個command名稱

/**
 * The console command description.
 *
 * @var string
 */
protected $description = 'Command description';

/**
 * Create a new command instance.
 *
 * @return void
 */
public function __construct()
{
    parent::__construct();
}

/**
 * Execute the console command.
 * @return int
 */
public function handle()
{
    $Rabbit = new RabbitmqServer("x-delayed-message");
    $Rabbit->consumDelay();
}

} ```

五、執行生產訊息和消費訊息

用postman模擬生產訊息,效果如下:

image.png

然後消費訊息,用一下命令,如果延遲5秒執行消費則成功

image.png

至此,就完成了rabbitmq非同步延遲訊息佇列

六、總結

程式碼已上傳git倉庫