在項目中rabbitmq得到了廣泛的時候,這里對rabbitmq的常規(guī)功能做了一個簡單的總結(jié),并封裝成了composer包,composer包地址、github地址,歡迎fork,由于水平有限,難免存在bug,歡迎提出寶貴意見。
easy-rabbitmq 包簡介
對php-amqplib/php-amqplib包的二次封裝,為常見功能提供一套開箱即用的生產(chǎn)解決方案。目前支持的功能列表如下:
-
推送消息到直連交換機(含延遲消息)
-
推送消息到扇形交換機(含延遲消息)
-
推送消息到主題交換機(含延遲消息)
-
訂閱模式下的可靠消費, 消費者消費失敗后將會嘗試繼續(xù)消費,最多嘗試5次。
-
拉取模式下的可靠消費, 消費者消費失敗后將會嘗試繼續(xù)消費,最多嘗試5次。
如果還有其它場景,歡迎繼續(xù)補充,隨后進行迭代?。?/p>
要求
安裝包對PHP版本對要求主要取決于php-amqplib/php-amqplib包本身對要求,這里為了兼顧php5.0的使用者,我們使用了php-amqplib/php-amqplib包V2.9.0的版本。
具體的要求參照這里。
不過筆者推薦使用php7.0及其以上版本, 這個開發(fā)包也是在7.0這個版本上面開發(fā)完成的!
安裝
composer require maweibinguo/easyrabbitmq
使用
在這里我們推薦php腳本+supervisor結(jié)合使用,用以保證消費進程的可靠性、增強worker的消費能力! 如果你還沒有聽說過supervisor,可以點擊這里了解.
1、推送消息
1-1、推送消息到直連交換機
$config = [ "host" => "127.0.0.1", "port" => "5672", "user" => "guest", "password" => "guest", "vhost" => "/", "channel_max_num" => 10, ]; $instance = RabbitMq::getInstance($config); //延遲消息,30 秒中后才會到達指定的交換機 $instance->pushToDirect( $msg = time(), //消息體內(nèi)容 $exchange = "easy_direct_exchange", //交換機名稱 $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致 $delaySec = 30 //延遲秒數(shù) ); //無延遲,推入到指定到直鏈交換機 $instance->pushToDirect( $msg = time(), //消息體內(nèi)容 $exchange = "easy_direct_exchange", //交換機名稱 $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致 );
1-2、推送消息到扇形交換機
$config = [ "host" => "127.0.0.1", "port" => "5672", "user" => "guest", "password" => "guest", "vhost" => "/", "channel_max_num" => 10, ]; $instance = RabbitMq::getInstance($config); //延遲消息,30 秒中后才會到達指定的交換機 $instance->pushToFanout( $msg = time(), //消息體內(nèi)容 $exchange = "easy_fanout_exchange", //交換機名稱 $delaySec = 30 //延遲秒數(shù) ); //無延遲,推入到指定到直鏈交換機 $instance->pushToFanout( $msg = time(), //消息體內(nèi)容 $exchange = "easy_fanout_exchange" //交換機名稱 );
1-3、推送消息到主題交換機
$config = [ "host" => "127.0.0.1", "port" => "5672", "user" => "guest", "password" => "guest", "vhost" => "/", "channel_max_num" => 10, ]; $instance = RabbitMq::getInstance($config); //延遲消息,30 秒中后才會到達指定的交換機 $instance->pushToTopic( $msg = time(), //消息體內(nèi)容 $exchange = "easy_topic_exchange", //交換機名稱 /** * routingKey 要同consum(get)方法的bindingKey相匹配 * bindingKey支持兩種特殊的字符"*"、“#”,用作模糊匹配, 其中"*"用于匹配一個單詞、“#”用于匹配多個單詞(也可以是0個) * 無論是bindingKey還是routingKey, 被"."分隔開的每一段獨立的字符串就是一個單詞, easy.topic.queue, 包含三個單詞easy、topic、queue */ $routingKey = "easy.topic.queue", $delaySec = 30 //延遲秒數(shù) ); //無延遲,推入到指定到直鏈交換機 $instance->pushToTopic( $msg = time(), //消息體內(nèi)容 $exchange = "easy_topic_exchange", //交換機名稱 $routingKey = "easy.topic.queue" );
2、消費消息
消費支持自動重試,最多嘗試重試5次,每次消費失敗后該消息將會被重新投入到消費隊列中。重新的時間將會隨著失敗的次數(shù)增多逐漸推移,本客戶端支持的推移策略如下:
失敗1次(1秒鐘后會再被投遞), 失敗2次(2秒鐘后會再被投遞), 失敗3次(4秒鐘后會再被投遞), 失敗4次(8秒鐘后會再被投遞), 失敗5次(16秒鐘后會再被投遞)
2-1、訂閱模式
訂閱模式下的可靠消費
$config = [ "host" => "127.0.0.1", "port" => "5672", "user" => "guest", "password" => "guest", "vhost" => "/", "channel_max_num" => 10, ]; $instance = RabbitMq::getInstance($config); $instance->consume( $queueName = "direct_test_queue",//訂閱的隊列名稱 $consumerTag = "c1",//消費標記 $exchange = "easy_direct_exchange",//交換機名稱 $bindingKey = "direct_test_queue",//bindingkey,如果是直鏈交換機需要同routingKey保持一致 $callback = function($msg){ $body = $msg->body; file_put_contents("./test.log", "time => " . time() . "t" . " body => " . $body . PHP_EOL , FILE_APPEND); //如果返回結(jié)果不絕對等于(===)true,那么將觸發(fā)消息重試機制 return false; }, //5次消費消費失敗后,失敗消息將會投遞到的隊列名稱 $failedQueue = "easymq@failed" );
2-2、拉取模式
拉取模式下的可靠消費
$config = [ "host" => "127.0.0.1", "port" => "5672", "user" => "guest", "password" => "guest", "vhost" => "/", "channel_max_num" => 10, ]; $instance = RabbitMq::getInstance($config); $instance->get( $queue = "get_queue", $exchange = "easy_fanout_exchange", $bindingKey = "", $callback = function($msg){ $body = $msg->body; file_put_contents("./test.log", "time => " . time() . "t" . " body => " . $body . PHP_EOL , FILE_APPEND); //如果返回結(jié)果不絕對等于(===)true,那么將觸發(fā)消息重試機制 return false; }, //5次消費消費失敗后,失敗消息將會投遞到的隊列名稱 $failedQueue = 'easymq@failed' );
推薦學(xué)習(xí):php視頻教程