久久久久久久视色,久久电影免费精品,中文亚洲欧美乱码在线观看,在线免费播放AV片

<center id="vfaef"><input id="vfaef"><table id="vfaef"></table></input></center>

    <p id="vfaef"><kbd id="vfaef"></kbd></p>

    
    
    <pre id="vfaef"><u id="vfaef"></u></pre>

      <thead id="vfaef"><input id="vfaef"></input></thead>

    1. 站長資訊網(wǎng)
      最全最豐富的資訊網(wǎng)站

      詳解PHP實現(xiàn)生產(chǎn)者與消費者(Kafka應(yīng)用)

      本篇文章給大家介紹PHP實現(xiàn)生產(chǎn)者與消費者,希望對需要的朋友有所幫助!

      前言

      PHP中使用Kafka需要RdKafka擴展,而RdKafka依賴于librdkafka,所以這兩個我們都需要安裝,具體安裝方法自行百度,本篇不做說明了。

      生產(chǎn)者(測試)

      創(chuàng)建消費者需要步驟:

      • 生產(chǎn)者配置參數(shù)
      • 創(chuàng)建生產(chǎn)者實例
      • 創(chuàng)建主題實例(依賴生產(chǎn)者)
      • 生產(chǎn)主題消息
      • 推送消息

      具體代碼如下:

              $conf = new RdKafkaConf();         // 綁定服務(wù)節(jié)點         $conf->set('metadata.broker.list', '127.0.0.1:32772');          // 創(chuàng)建生產(chǎn)者         $kafka = new RdKafkaProducer($conf);          // 創(chuàng)建主題實例         $topic = $kafka->newTopic('p1r1');         // 生產(chǎn)主題數(shù)據(jù),此時消息在緩沖區(qū)中,并沒有真正被推送         $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message');         // 阻塞時間(毫秒), 0為非阻塞         $kafka->poll(0);           // 推送消息,如果不調(diào)用此函數(shù),消息不會被發(fā)送且會丟失         $result = $kafka->flush(5000);          if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {             throw new RuntimeException('Was unable to flush, messages might be lost!');         }

      消費者

      創(chuàng)建一個消費者需要幾個步驟:

      • 消費者配置參數(shù)
      • 應(yīng)用配置參數(shù)創(chuàng)建消費者實例
      • 訂閱對應(yīng)主題
      • 拉取數(shù)據(jù)
      • 提交位移

      具體代碼如下:

              $conf = new RdKafkaConf();         // 綁定消費者組         $conf->set('group.id', 'ceshi');         // 綁定服務(wù)節(jié)點,多個用,分隔         $conf->set('metadata.broker.list', '127.0.0.1:32787');         // 設(shè)置自動提交為false         $conf->set('enable.auto.commit', 'false');         // 設(shè)置當(dāng)前消費者拉取數(shù)據(jù)時的偏移量, 可選參數(shù):         // earliest: 如果消費者組是新創(chuàng)建的,從頭開始消費,否則從消費者組當(dāng)前消費位移開始。         // latest:如果消費者組是新創(chuàng)建的,從最新偏移量開始,否則從消費者組當(dāng)前消費位移開始。         $conf->set('auto.offset.reset', 'earliest');          // 創(chuàng)建消費者實例         $consumer = new RdKafkaKafkaConsumer($conf);         // 消費者訂閱主題,數(shù)組形式         $consumer->subscribe(['topic1','topic2']);         while (true) {             // 消費數(shù)據(jù),阻塞5秒(5秒內(nèi)有數(shù)據(jù)就消費,沒有數(shù)據(jù)等待5秒進入下一輪循環(huán))             $message = $consumer->consume(5000);             switch ($message->err) {                 case RD_KAFKA_RESP_ERR_NO_ERROR:                     // 業(yè)務(wù)邏輯                     var_dump($message);                      // 提交位移                     $consumer->commit($message);                     break;                 case RD_KAFKA_RESP_ERR__PARTITION_EOF:                     echo "No more messages; will wait for moren";                     break;                 case RD_KAFKA_RESP_ERR__TIMED_OUT:                     echo "Timed outn";                     break;                 default:                     throw new Exception($message->errstr(), $message->err);                     break;             }         }         // 關(guān)閉消費者(一般用在腳本中,不需要關(guān)閉)         $conumser->close();

      只消費指定分區(qū)中的數(shù)據(jù):

          // 對消費者指定分區(qū),注意此方式不能與subscribe一同使用     $consumer->assign([         new RdKafkaTopicPartition("topic", 0),         new RdKafkaTopicPartition("topic", 1),     ]);

      贊(0)
      分享到: 更多 (0)
      網(wǎng)站地圖   滬ICP備18035694號-2    滬公網(wǎng)安備31011702889846號