本篇文章給大家介紹PHP實現(xiàn)生產(chǎn)者與消費者,希望對需要的朋友有所幫助!
前言
PHP中使用Kafka需要RdKafka擴展,而RdKafka依賴于librdkafka,所以這兩個我們都需要安裝,具體安裝方法自行百度,本篇不做說明了。
生產(chǎn)者(測試)
創(chuàng)建消費者需要步驟:
- 生產(chǎn)者配置參數(shù)
- 創(chuàng)建生產(chǎn)者實例
- 創(chuàng)建主題實例(依賴生產(chǎn)者)
- 生產(chǎn)主題消息
- 推送消息
具體代碼如下:
$conf = new RdKafkaConf(); // 綁定服務(wù)節(jié)點 $conf->set('metadata.broker.list', '127.0.0.1:32772'); // 創(chuàng)建生產(chǎn)者 $kafka = new RdKafkaProducer($conf); // 創(chuàng)建主題實例 $topic = $kafka->newTopic('p1r1'); // 生產(chǎn)主題數(shù)據(jù),此時消息在緩沖區(qū)中,并沒有真正被推送 $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message'); // 阻塞時間(毫秒), 0為非阻塞 $kafka->poll(0); // 推送消息,如果不調(diào)用此函數(shù),消息不會被發(fā)送且會丟失 $result = $kafka->flush(5000); if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { throw new RuntimeException('Was unable to flush, messages might be lost!'); }
消費者
創(chuàng)建一個消費者需要幾個步驟:
- 消費者配置參數(shù)
- 應(yīng)用配置參數(shù)創(chuàng)建消費者實例
- 訂閱對應(yīng)主題
- 拉取數(shù)據(jù)
- 提交位移
具體代碼如下:
$conf = new RdKafkaConf(); // 綁定消費者組 $conf->set('group.id', 'ceshi'); // 綁定服務(wù)節(jié)點,多個用,分隔 $conf->set('metadata.broker.list', '127.0.0.1:32787'); // 設(shè)置自動提交為false $conf->set('enable.auto.commit', 'false'); // 設(shè)置當(dāng)前消費者拉取數(shù)據(jù)時的偏移量, 可選參數(shù): // earliest: 如果消費者組是新創(chuàng)建的,從頭開始消費,否則從消費者組當(dāng)前消費位移開始。 // latest:如果消費者組是新創(chuàng)建的,從最新偏移量開始,否則從消費者組當(dāng)前消費位移開始。 $conf->set('auto.offset.reset', 'earliest'); // 創(chuàng)建消費者實例 $consumer = new RdKafkaKafkaConsumer($conf); // 消費者訂閱主題,數(shù)組形式 $consumer->subscribe(['topic1','topic2']); while (true) { // 消費數(shù)據(jù),阻塞5秒(5秒內(nèi)有數(shù)據(jù)就消費,沒有數(shù)據(jù)等待5秒進入下一輪循環(huán)) $message = $consumer->consume(5000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: // 業(yè)務(wù)邏輯 var_dump($message); // 提交位移 $consumer->commit($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for moren"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed outn"; break; default: throw new Exception($message->errstr(), $message->err); break; } } // 關(guān)閉消費者(一般用在腳本中,不需要關(guān)閉) $conumser->close();
只消費指定分區(qū)中的數(shù)據(jù):
// 對消費者指定分區(qū),注意此方式不能與subscribe一同使用 $consumer->assign([ new RdKafkaTopicPartition("topic", 0), new RdKafkaTopicPartition("topic", 1), ]);