PHP實現非同步延遲訊息佇列(庫存歸還)

語言: CN / TW / HK

點選進入“PHP開源社群”    

免費獲取進階面試、文件、影片資源

一、前言

需求:電商秒殺場景中,如果使用者下單10分鐘未支付,需要進行庫存歸還

本篇是用PHP+Laravel+RabbitMQ來實現非同步延遲訊息佇列

二、場景

在電商專案中,當我們下單之後,一般需要 20 分鐘之內或者 30 分鐘之內付款,否則訂單就會進入異常處理邏輯中,被取消,那麼進入到異常處理邏輯中,就可以當成是一個延遲佇列

公司的會議預定系統,在會議預定成功後,會在會議開始前半小時通知所有預定該會議的使用者

安全工單超過 24 小時未處理,則自動拉企業微信群提醒相關責任人

使用者下單外賣以後,距離超時時間還有 10 分鐘時提醒外賣小哥即將超時

很多場景下我們都需要延遲佇列。

本文以 RabbitMQ 為例來和大家聊一聊延遲佇列的玩法。

使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 外掛來實現定時任務,這種方案較簡單。

三、安裝RabbitMQ延遲佇列外掛

官網外掛下載地址:https://www.rabbitmq.com/community-plugins.html

我這裡直接下載了最新版本,你們根據自己的rabbitmq版本號進行下載

把下載好的檔案移動到rabbitmq的外掛plugins下,以我自己的Mac為例子,放到了如下路徑

然後執行安裝外掛指令,如下

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

最後重啟rabbitmq服務,並重新整理檢視exchanges交換機有沒有該外掛

如上圖則延遲訊息佇列外掛安裝完成

四、在Laravel框架中進行使用

新建rabbitmq服務類,包含延遲訊息佇列生產訊息,和消費訊息,如下

程式碼如下:

<?php



namespace App\Http\Controllers\Service;



use PhpAmqpLib\Connection\AMQPStreamConnection;

use PhpAmqpLib\Message\AMQPMessage;

use PhpAmqpLib\Wire\AMQPTable;



class RabbitmqServer

{

private $host = "127.0.0.1";

private $port = 5672;

private $user = "guest";

private $password = "guest";



private $msg;

private $channel;

private $connection;



// 過期時間

const TIMEOUT_5_S = 5; // 5s

const TIMEOUT_10_S = 10; // 10s



private $exchange_logs = "logs";

private $exchange_direct = "direct";

private $exchange_delayed = "delayed";



private $queue_delayed = "delayedQueue";



const EXCHANGETYPE_FANOUT = "fanout";

const EXCHANGETYPE_DIRECT = "direct";

const EXCHANGETYPE_DELAYED = "x-delayed-message";



public function __construct($type = false)

{

$this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password);

$this->channel = $this->connection->channel();

// 宣告Exchange

$this->channel->exchange_declare($this->exchange_delayed, self::EXCHANGETYPE_DELAYED, false, true, false, false, false, new AMQPTable(["x-delayed-type" => self::EXCHANGETYPE_DIRECT]));

$this->channel->queue_declare($this->queue_delayed, false, true, false, false);

$this->channel->queue_bind($this->queue_delayed, $this->exchange_delayed, $this->queue_delayed);

}



/**

* delay creat message

*/

public function createMessageDelay($msg, $time)

{

$delayConfig = [

'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,

'application_headers' => new AMQPTable(['x-delay' => $time * 1000])

];

$msg = new AMQPMessage($msg, $delayConfig);

return $msg;

}



/**

* delay send message

*/

public function sendDelay($msg, $time = self::TIMEOUT_10_S)

{

$msg = $this->createMessageDelay($msg, $time);;

$this->channel->basic_publish($msg, $this->exchange_delayed, $this->queue_delayed);

$this->channel->close();

$this->connection->close();

}



/**

* delay consum

*/

public function consumDelay()

{

$callback = function ($msg) {

echo ' [x] ', $msg->body, "\n";

$this->channel->basic_ack($msg->delivery_info['delivery_tag'], false);

};

$this->channel->basic_qos(null, 1, null);

$this->channel->basic_consume($this->queue_delayed, '', false, false, false, false, $callback);

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

while (count($this->channel->callbacks)) {

$this->channel->wait();

}



$this->channel->close();

$this->connection->close();

}

}



比如新建QueueController控制器,進行測試生產訊息放到延遲訊息佇列中

程式碼如下:

<?php



namespace App\Http\Controllers\Api\v1;



use App\Http\Controllers\Controller;

use App\Http\Controllers\Service\RabbitmqServer;

use App\Jobs\Queue;

use Illuminate\Http\Request;



class QueueController extends Controller

{

//

public function index(Request $request)

{

//比如說現在是下訂單操作

//需求:如果使用者10分鐘之內不支付訂單就要取消訂單,並且庫存歸還

$msg = $request->post();

$Rabbit = new RabbitmqServer("x-delayed-message");

//第一個引數傳送的訊息,第二個引數延遲多少秒

$Rabbit->sendDelay(json_encode($msg),5);

}

}



至此通過介面除錯工具進行模擬生產訊息即可

訊息生產完畢要進行消費,這裡使用的是Laravel的任務排程,程式碼如下

<?php



namespace App\Console\Commands;



use App\Http\Controllers\Service\RabbitmqServer;

use Illuminate\Console\Command;

use PhpAmqpLib\Connection\AMQPStreamConnection;

use PhpAmqpLib\Message\AMQPMessage;



class RabbitmqConsumerCommand extends Command

{

/**

* The name and signature of the console command.

*

* @var string

*/

protected $signature = 'rabbitmq_consumer';//給消費者起個command名稱



/**

* The console command description.

*

* @var string

*/

protected $description = 'Command description';



/**

* Create a new command instance.

*

* @return void

*/

public function __construct()

{

parent::__construct();

}



/**

* Execute the console command.

* @return int

*/

public function handle()

{

$Rabbit = new RabbitmqServer("x-delayed-message");

$Rabbit->consumDelay();

}

}



五、執行生產訊息和消費訊息

用postman模擬生產訊息,效果如下:

然後消費訊息,用一下命令,如果延遲5秒執行消費則成功

至此,就完成了rabbitmq非同步延遲訊息佇列

*宣告:本文於網路整理,版權歸原作者所有,如來源資訊有誤或侵犯權益,請聯絡我們刪除或授權事宜

END

PHP開源社群

掃描關注  進入”PHP資料“

免費獲取進階

面試、文件、影片資源

點選“檢視原文”獲取更多