消息隊列的概念、原理、實現(xiàn)方式
概念
- 隊列結(jié)構(gòu)的一個中間件
- 不需要立即消費消息
- 由消費者或者訂閱者進行按順序消費
基本的流程圖如下所示
- 流程
應用場景
- 冗余
- 解耦
- 流量削峰
- 異步通信
實現(xiàn)方式
- mysql:可靠、速度慢
- redis:速度快,對于大消息包處理較慢
- 消息系統(tǒng):可靠、專業(yè)性強
消息的觸發(fā)機制
- 死循環(huán)的方式,故障時無法及時恢復
- 定時任務:壓力均分、但是處理量有上限
- 守護進程的方式
解耦 (訂單和配送系統(tǒng))
-
架構(gòu)設計1 采用定時任務的方式
php入門到就業(yè)線上直播課:進入學習
Apipost = Postman + Swagger + Mock + Jmeter 超好用的API調(diào)試工具:點擊使用 -
使用配送處理系統(tǒng)進行處理時,將當前數(shù)據(jù)庫里需要處理的訂單狀態(tài)更新為2,待處理完成后將狀態(tài)設為1
-
可以每次指定更新多少條數(shù)據(jù)
流量削鋒 (redis實現(xiàn)秒殺)
-
使用隊列的數(shù)據(jù)結(jié)構(gòu)
- lpush/rpush 將數(shù)據(jù)放入列表中
- lpop/rpop 將數(shù)據(jù)移除列表并獲取到移除的值
- ltrim 保留指定區(qū)間內(nèi)的元素
- llen 獲取列表長度
- lset 通過索引設置列表的值
- lindex 通過索引獲取列表中的值
- lrange 獲取指定范圍的元素
-
圖示如下
-
代碼流程如下
-
秒殺程序?qū)⒄埱髮懭雛edis(uid,time)
-
檢查redis列表存放的長度,超過10個直接舍棄
-
通過死循環(huán)讀取redis數(shù)據(jù),并存入數(shù)據(jù)庫
// Spike.php 秒殺程序if(Redis::llen('lottery') < 10){ // 成功 Redis::lpush('lottery', $uid.'%'.microtime());}else{ // 失敗}
登錄后復制// Warehousing.php 入庫程序while(true){ $user = Redis::rpop('lottery'); if (!$user || $user == 'nil') { sleep(2); continue; } $user_arr = explode($user, '%'); $insert_user = [ 'uid' => $user_arr[0], 'time' => $user_arr[1] ]; $res = DB::table('lottery_queue')->insert($insert_user); if (!$res) { Redis::lpush('lottery', $user); }}
登錄后復制
-
-
上述代碼中假如并發(fā)過大的話會存在超賣的情況,此時可以使用文件鎖或者redis分布式鎖進行控制,先將商品放入redis list中 使用rpop進行取出,如果取不到則說明已經(jīng)賣完
-
具體的思路及偽代碼如下
// 先將商品放入redis中 $goods_id = 2; $sql = select id,num from goods where id = $goods_id; $res = DB::select($sql); if (!empty($res)) { // 也可以指定多少件 Redis::del('lottery_goods' . $goods_id); for($i=0;$i<$res['num'];$i++){ Redis::lpush('lottery_goods . $goods_id', $i); } LOG::info('商品存入隊列成功,數(shù)量:' . Redis::llen('lottery_goods . $goods_id')); } else { LOG::info($goods_id . '加入失敗'); }
登錄后復制// 開始秒殺 $count = Redis::rpop('lottery_goods' . $goods_id); if (!$count) { // 商品已搶完 ... } // 用戶搶購隊列 $user_list = 'user_goods_id_' . $goods_id; $user_status = Redis::sismember($user_list, $user_id); if ($user_status) { // 已搶過 ... } // 將搶到的放到列表中 Redis::sadd($user_list, $uid); $msg = '用戶:' . $uid . '順序' . $count; Log::info($msg); // 生成訂單等 ... // 減庫存 $sql = update goods set num = num -1 where id = $goods_id and num > 0; // 防止超賣 DB::update($sql) // 搶購成功
登錄后復制
rabbitmq
-
架構(gòu)及原理
其中P代表生產(chǎn)者,X為交換機(channal),C代表消費者 -
簡單使用
// Send.php require_once __DIR__.'/vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 創(chuàng)建通道 $channel = $connection->channel(); // 聲明一個隊列 $channel->queue_declare('user_email', false, false, false, false); // 制作消息 $msg = new AMQPMessage('send email'); // 將消息推送到隊列 $channel->basic_publish($msg, '', 'user_email'); echo '[x] send email'; $channel->close(); $connection->close();
登錄后復制// Receive.php require_once __DIR__.'/vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); //創(chuàng)建通道 $channel = $connection->channel(); $channel->queue_declare('user_email', false, false, false, false); // 當收到消息時的回調(diào)函數(shù) $callback = function($msg){ //發(fā)送郵件 echo 'Received '.$msg->body.'n'; }; $channel->basic_consume('user_email', '', false, true, false, false, $callback); // 保持監(jiān)聽狀態(tài) while($channel->is_open()){ $channel->wait(); }
登錄后復制