久久久久久久视色,久久电影免费精品,中文亚洲欧美乱码在线观看,在线免费播放AV片

<center id="vfaef"><input id="vfaef"><table id="vfaef"></table></input></center>

    <p id="vfaef"><kbd id="vfaef"></kbd></p>

    
    
    <pre id="vfaef"><u id="vfaef"></u></pre>

      <thead id="vfaef"><input id="vfaef"></input></thead>

    1. 站長資訊網(wǎng)
      最全最豐富的資訊網(wǎng)站

      如何基于Hyperf實(shí)現(xiàn)RabbitMQ+WebSocket消息推送

      介紹

      基于 Hyperf+ WebSocket +RabbitMQ 實(shí)現(xiàn)的一個(gè)簡單大屏幕的消息推送。

      思路

      利用 WebSocket 協(xié)議讓客戶端和服務(wù)器端保持有狀態(tài)的長鏈接,

      保存鏈接上來的客戶端 id。訂閱發(fā)布者發(fā)布的消息針對(duì)已保存的客戶端 id 進(jìn)行廣播消息。

      WebSocket 服務(wù)

      composer require hyperf/websocket-server

      配置文件 [config/autoload/server.php]

      <?php return [     'mode' => SWOOLE_PROCESS,     'servers' => [         [             'name' => 'http',             'type' => Server::SERVER_HTTP,             'host' => '0.0.0.0',             'port' => 11111,             'sock_type' => SWOOLE_SOCK_TCP,             'callbacks' => [                 SwooleEvent::ON_REQUEST => [HyperfHttpServerServer::class, 'onRequest'],             ],         ],         [             'name' => 'ws',             'type' => Server::SERVER_WEBSOCKET,             'host' => '0.0.0.0',             'port' => 12222,             'sock_type' => SWOOLE_SOCK_TCP,             'callbacks' => [                 SwooleEvent::ON_HAND_SHAKE => [HyperfWebSocketServerServer::class, 'onHandShake'],                 SwooleEvent::ON_MESSAGE => [HyperfWebSocketServerServer::class, 'onMessage'],                 SwooleEvent::ON_CLOSE => [HyperfWebSocketServerServer::class, 'onClose'],             ],         ],     ],

      WebSocket 服務(wù)器端代碼示例

      <?php declare(strict_types=1); /**  * This file is part of Hyperf.  *  * @link     https://www.hyperf.io  * @document https://doc.hyperf.io  * @contact  group@hyperf.io  * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE  */ namespace AppController; use HyperfContractOnCloseInterface; use HyperfContractOnMessageInterface; use HyperfContractOnOpenInterface; use SwooleHttpRequest; use SwooleServer; use SwooleWebsocketFrame; use SwooleWebSocketServer as WebSocketServer; class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface {     /**      * 發(fā)送消息      * @param WebSocketServer $server      * @param Frame $frame      */     public function onMessage(WebSocketServer $server, Frame $frame): void     {         //心跳刷新緩存         $redis = $this->container->get(Redis::class);         //獲取所有的客戶端id         $fdList = $redis->sMembers('websocket_sjd_1');         //如果當(dāng)前客戶端在客戶端集合中,就刷新         if (in_array($frame->fd, $fdList)) {             $redis->sAdd('websocket_sjd_1', $frame->fd);             $redis->expire('websocket_sjd_1', 7200);         }         $server->push($frame->fd, 'Recv: ' . $frame->data);     }     /**      * 客戶端失去鏈接      * @param Server $server      * @param int $fd      * @param int $reactorId      */     public function onClose(Server $server, int $fd, int $reactorId): void     {         //刪掉客戶端id         $redis = $this->container->get(Redis::class);         //移除集合中指定的value         $redis->sRem('websocket_sjd_1', $fd);         var_dump('closed');     }     /**      * 客戶端鏈接      * @param WebSocketServer $server      * @param Request $request      */     public function onOpen(WebSocketServer $server, Request $request): void     {         //保存客戶端id         $redis = $this->container->get(Redis::class);         $res1 = $redis->sAdd('websocket_sjd_1', $request->fd);         var_dump($res1);         $res = $redis->expire('websocket_sjd_1', 7200);         var_dump($res);         $server->push($request->fd, 'Opened');     } }

      WebSocket 前端代碼

          function WebSocketTest() {         if ("WebSocket" in window) {             console.log("您的瀏覽器支持 WebSocket!");             var num = 0             // 打開一個(gè) web socket             var ws = new WebSocket("ws://127.0.0.1:12222");             ws.onopen = function () {                 // Web Socket 已連接上,使用 send() 方法發(fā)送數(shù)據(jù)                 //alert("數(shù)據(jù)發(fā)送中...");                 //ws.send("發(fā)送數(shù)據(jù)");             };             window.setInterval(function () { //每隔5秒鐘發(fā)送一次心跳,避免websocket連接因超時(shí)而自動(dòng)斷開                 var ping = {"type": "ping"};                 ws.send(JSON.stringify(ping));             }, 5000);             ws.onmessage = function (evt) {                 var d = JSON.parse(evt.data);                 console.log(d);                 if (d.code == 300) {                     $(".address").text(d.address)                 }                 if (d.code == 200) {                     var v = d.data                     console.log(v);                     num++                     var str = `<div class="item">                                     <p>${v.recordOutTime}</p>                                     <p>${v.userOutName}</p>                                     <p>${v.userOutNum}</p>                                     <p>${v.doorOutName}</p>                                 </div>`                     $(".tableHead").after(str)                     if (num > 7) {                         num--                         $(".table .item:nth-last-child(1)").remove()                     }                 }             };             ws.error = function (e) {                 console.log(e)                 alert(e)             }             ws.onclose = function () {                 // 關(guān)閉 websocket                 alert("連接已關(guān)閉...");             };         } else {             alert("您的瀏覽器不支持 WebSocket!");         }     }

      AMQP 組件

      composer require hyperf/amqp

      配置文件 [config/autoload/amqp.php]

      <?php return [     'default' => [         'host' => 'localhost',         'port' => 5672,         'user' => 'guest',         'password' => 'guest',         'vhost' => '/',         'pool' => [             'min_connections' => 1,             'max_connections' => 10,             'connect_timeout' => 10.0,             'wait_timeout' => 3.0,             'heartbeat' => -1,         ],         'params' => [             'insist' => false,             'login_method' => 'AMQPLAIN',             'login_response' => null,             'locale' => 'en_US',             'connection_timeout' => 3.0,             'read_write_timeout' => 6.0,             'context' => null,             'keepalive' => false,             'heartbeat' => 3,         ],     ], ];

      MQ 消費(fèi)者代碼

      <?php declare(strict_types=1); namespace AppAmqpConsumer; use HyperfAmqpAnnotationConsumer; use HyperfAmqpMessageConsumerMessage; use HyperfAmqpResult; use HyperfServerServer; use HyperfServerServerFactory; /**  * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)  */ class DemoConsumer extends ConsumerMessage {     /**      * rabbmitMQ消費(fèi)端代碼      * @param $data      * @return string      */     public function consume($data): string     {         print_r($data);         //獲取集合中所有的value         $redis = $this->container->get(Redis::class);         $fdList=$redis->sMembers('websocket_sjd_1');         $server=$this->container->get(ServerFactory::class)->getServer()->getServer();         foreach($fdList as $key=>$v){             if(!empty($v)){                 $server->push((int)$v, $data);             }         }         return Result::ACK;     } }

      控制器代碼

          /**      * test      * @return array      */     public function test()     {         $data = array(             'code' => 200,             'data' => [                 'userOutName' => 'ccflow',                 'userOutNum' => '9999',                 'recordOutTime' => date("Y-m-d H:i:s", time()),                 'doorOutName' => '教師公寓',             ]         );         $data = GuzzleHttpjson_encode($data);         $message = new DemoProducer($data);         $producer = ApplicationContext::getContainer()->get(Producer::class);         $result = $producer->produce($message);         var_dump($result);         $user = $this->request->input('user', 'Hyperf');         $method = $this->request->getMethod();         return [             'method' => $method,             'message' => "{$user}.",         ];     }

      最終效果

      如何基于Hyperf實(shí)現(xiàn)RabbitMQ+WebSocket消息推送

      推薦:《PHP教程》

      贊(0)
      分享到: 更多 (0)
      網(wǎng)站地圖   滬ICP備18035694號(hào)-2    滬公網(wǎng)安備31011702889846號(hào)