基本概念介紹
在Kafka中有一些基本的概念,
Topic
-
簡介:Topic在Kafka中是一個(gè)抽象的概念,一個(gè)主題是已經(jīng)發(fā)布的記錄的種類。主題在Kafka中是可以被多重訂閱的,這就意味著一個(gè)主題可能有0個(gè)、一個(gè)、或者許多個(gè)消費(fèi)者去訂閱這個(gè)主題中的消息。
-
Partitions:在每一個(gè)topic在Kafka中可以有多個(gè)分區(qū),增加一個(gè)主題的分區(qū)可以提高Kafka的吞吐率,但是不是越多越好,因?yàn)槿绻謪^(qū)數(shù)量越多的話生產(chǎn)者插入的效率也會(huì)降低。所以真正到生產(chǎn)環(huán)境時(shí),需要權(quán)衡生產(chǎn)與消費(fèi)的一個(gè)平衡關(guān)系,消費(fèi)稍微大于生產(chǎn)者,不會(huì)產(chǎn)生消息的堆積,也能夠充分提高Kafka的效率。
-
Replication Factor:復(fù)制因子,是對(duì)于當(dāng)前的Topic是否需要副本。如果設(shè)置成1的話,代表當(dāng)前Topic在整個(gè)Kafka中只有一份。這里有個(gè)限制Topic的數(shù)量不能夠多于當(dāng)前Kafka的Broker數(shù)量。
-
存儲(chǔ)方式:在Kafka的配置中(Server.properties)有l(wèi)ogs.dir的配置,這個(gè)是Kafka存儲(chǔ)消息的位置。如果Topic復(fù)制因子是1分區(qū)是1的話,在對(duì)應(yīng)的文件夾下會(huì)有一個(gè)名稱為topicname的文件夾;如果復(fù)制因子是2分區(qū)是2,假設(shè)存在兩個(gè)Broker,在每個(gè)Broker中將會(huì)存在兩個(gè)文件夾分別為topicname_0 topicname_1的文件夾
-
Leader與Follower:由于每個(gè)topic如果存在副本的話,是對(duì)于partition進(jìn)行復(fù)制。這么多存在在不同的Broker上的副本,其中有一個(gè)partition是leader其他的是Followers,當(dāng)一個(gè)broker宕機(jī)會(huì)在副本中選擇一個(gè)充當(dāng)Leader。
Producer
生產(chǎn)者,顧明思議是生產(chǎn)消息,允許應(yīng)用發(fā)布一個(gè)流的消息到一個(gè)或者多個(gè)主題中,
Consumer
- 簡介:消費(fèi)者是訂閱某個(gè)topic消息。
- Group:每個(gè)消費(fèi)者都有個(gè)groupid 來標(biāo)定當(dāng)前消費(fèi)者屬于哪個(gè)group。Group的作用是,當(dāng)同一個(gè)group的兩個(gè)消費(fèi)者訂閱一個(gè)topic的時(shí)候,如果當(dāng)前topic沒有分區(qū)那么其中一個(gè)消費(fèi)者是獲得不了任何消息的;如果有分區(qū)的話,將會(huì)按照數(shù)量進(jìn)行負(fù)載均衡,每個(gè)消費(fèi)者獲得不同的分區(qū)的消息。
-
同一個(gè)Group下的消費(fèi)者不會(huì)同時(shí)訂閱一個(gè)主題下的同一個(gè)分區(qū),如果消費(fèi)者數(shù)量杜宇分區(qū)數(shù)量,則多出的消費(fèi)者是不會(huì)有任何消息獲得的。
Broker
Broker 是一個(gè)Kafka的Server,一臺(tái)單物理機(jī)或者集群都可以擁有多個(gè)broker一個(gè)broker可以容納多個(gè)主題,這個(gè)與復(fù)制因子、主題的分區(qū)都有關(guān)系。
Kafka單機(jī)配置,一個(gè)Broker
環(huán)境:
- win10物理機(jī)
- Wmare CentOS7虛擬機(jī)
- XShell 訪問虛擬機(jī)
配置zookeeper
- 下載
# zookeeper wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
- 解壓后進(jìn)入目錄
cd zookeeper-3.4.13/conf
- 復(fù)制zookeeper的配置文件
cp zoo_sample.cfg zoo.cfg
- 返回上級(jí)進(jìn)入bin目錄下,鍵入如下命令
./zkServer.sh start
- 查看是否成功開啟zookeeper服務(wù)
#注:這里提示一下開啟后提示的成功不一定是真的成功,所以需要查看一下 netstat -tunlp|egrep 2181 # 如果沒有結(jié)果查看統(tǒng)計(jì)目錄下的 zookeeper.out文件 查看log信息 # 使用jps命令查看 QuorumPeerMain是zookeeper的守護(hù)進(jìn)程 11089 QuorumPeerMain 11114 Jps
配置Kafka
- 下載安裝包
# Kafka wget http://mirror.bit.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
- 解壓后進(jìn)入文件夾下bin目錄下
# 第一個(gè)是start.sh位置第二個(gè)是server.rpoperties的位置,所以確認(rèn)好路徑的正確性 ./kafka-server-start.sh ./../config/server.properties & # 我們可以在Kafka的目錄下直接執(zhí)行,而不進(jìn)入到bin下,命令看著更舒服些 ./bin/kafka-server-start.sh ./config/server.properties &
- 查看是否開啟成功:默認(rèn)的Kafka端口是9092,zookeeper是2181
netstat -tunlp|egrep "(2181|9092)" # 結(jié)果如下 [root@localhost ~]# netstat -tunlp|egrep "(2181|9092)" tcp6 0 0 :::9092 :::* LISTEN 1877/java tcp6 0 0 :::2181 :::* LISTEN 1820/java # jps 查看 11089 QuorumPeerMain 11458 Kafka 11847 Jps
- 至此Kafka配置成功
使用Kafka
創(chuàng)建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test # 返回結(jié)果 Created topic "test"
在虛擬機(jī)用sh腳本上作為生產(chǎn)者生產(chǎn)消息
- 我們重新開一個(gè)Xshell窗口,CD到
Kafka目錄/bin下,我們先介紹這一節(jié)會(huì)使用到的 kafka-console-producer.sh
# 鍵入如下命令 ./kafka-console-producer.sh --broker-list localhost:9092 --topic test >today message > # 最近本的指定,broker-list與topic是必須的參數(shù) # 成功命令行會(huì)進(jìn)入一個(gè)>的情況,鍵入消息按回車鍵就是發(fā)送消息到Kafka了 # 發(fā)送一個(gè)【today message】
- kafka-console-producer.sh參數(shù)說明,運(yùn)行
./kafka-console-producer.sh --help可查看
在虛擬機(jī)上用sh腳本作為消費(fèi)者消費(fèi)消息
- 重新開另個(gè)一Xshell窗口CD到
Kafka目錄/bin下,我們先介紹這一節(jié)會(huì)使用到的 kafka-console-consumer.sh
# 鍵入如下命令 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning # 最近本的指定,bootstrap-server與topic/whitelist是必須的參數(shù) # 由于有 from-beginning 參數(shù) 會(huì)從頭load所有消息 # 消費(fèi)后返回如下 today message #在生產(chǎn)端鍵入消息后,消費(fèi)端會(huì)同步消息出現(xiàn)
- kafka-console-consumer.sh參數(shù)說明運(yùn)行
./kafka-console-consumer.sh --help可查看
使用Python作為生產(chǎn)者、消費(fèi)者
- 在物理機(jī)上寫一個(gè)Python生產(chǎn)者的腳本
from kafka.producer import KafkaProducer import time def send_data(data): producer = KafkaProducer(bootstrap_servers='192.168.233.138:9092') producer.send("test",b''+str(data)+'') producer.flush() print ("end") if __name__=="__main__": send_data("physics python message");
- 查看Xshell上消費(fèi)的命令行
[root@localhost ~]# /home/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.138:9092 --topic test --from-beginning 111 333 1 12 physics python message
- 在物理機(jī)上寫一個(gè)消費(fèi)者的腳本
from kafka import KafkaConsumer import time def get_data(data): consumer = KafkaConsumer('test',bootstrap_servers='192.168.233.138:9092', group_id='my_favorite_group') print ("end") for msg in consumer: print(msg) if __name__=="__main__": get_data();
- 物理機(jī)消費(fèi)者的結(jié)果
# 我這邊是先運(yùn)行的消費(fèi)者的腳本,所以實(shí)時(shí)接收到了物理機(jī)產(chǎn)生的消息 ConsumerRecord(topic=u'test', partition=0, offset=5, timestamp=1551762485911L, timestamp_type=0, key=None, value='physics python message', checksum=1520092583, serialized_key_size=-1, serialized_value_size=22)
- 測試使用虛擬機(jī)sh端的生產(chǎn)者發(fā)送123 消息,查看物理機(jī)消費(fèi)者結(jié)果
ConsumerRecord(topic=u'test', partition=0, offset=6, timestamp=1551762784609L, timestamp_type=0, key=None, value='123', checksum=1760815061, serialized_key_size=-1, serialized_value_size=3)
- 幾點(diǎn)注意
# 物理機(jī)連接時(shí)可能出現(xiàn)【kafka.errors.NoBrokersAvailable: NoBrokersAvailable】這個(gè)錯(cuò)誤按照如下順序依次更改 1. 查看虛擬機(jī)防火墻是否關(guān)閉 systemctl status firewalld systemctl stop firewalld 2. 更改kafka服務(wù)端的server.properties: 增加 [ listeners=PLAINTEXT://192.168.233.138:9092 ]這一行 3. 修改物理機(jī)的hosts文件 C:WindowsSystem32driversetchosts 增加 【虛擬機(jī)ip 虛擬機(jī)主機(jī)名】 Eg:[192.168.233.138 localhost]
使用Springboot 作為生產(chǎn)者、消費(fèi)者
注:我直接在我的一個(gè)寄存的Spring Boot Demo項(xiàng)目上更改
- 在pom.xml中添加kafka依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- 提示一件事情此處別指定version了,直接用最新的就可以,老的版本一些包找不到 -->
- 寫一個(gè)kafka 生產(chǎn)者配置類
package com.example.kane.config; import java.util.HashMap; import java.util.Map; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; @Configuration @EnableKafka public class kafka_config { public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.138:9092"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } }
- 創(chuàng)建一個(gè)生產(chǎn)數(shù)據(jù)的Controller
package com.example.kane.Controller; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @RestController @RequestMapping("/kafka") public class CollectController { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private KafkaTemplate kafkaTemplate; @RequestMapping(value = "/send", method = RequestMethod.GET) public void sendKafka(HttpServletRequest request, HttpServletResponse response) { try { String message = request.getParameter("message"); logger.info("kafka的消息={}", message); kafkaTemplate.send("test", "key", message); logger.info("發(fā)送kafka成功."); } catch (Exception e) { logger.error("發(fā)送kafka失敗", e); } } }
- 啟動(dòng)項(xiàng)目后,在瀏覽器訪問http://localhost:8080/kafka/send?message=url_producer
# 查看結(jié)果 2019-03-05 13:57:16.438 INFO 10208 --- [nio-8080-exec-1] c.e.kane.Controller.CollectController : 發(fā)送kafka成功. 2019-03-05 13:57:45.871 INFO 10208 --- [nio-8080-exec-5] c.e.kane.Controller.CollectController : kafka的消息=url_producer 2019-03-05 13:57:45.872 INFO 10208 --- [nio-8080-exec-5] c.e.kane.Controller.CollectController : 發(fā)送kafka成功. # 查看虛擬機(jī) Consumer結(jié)果 [root@localhost ~]# /home/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.138:9092 --topic test --from-beginning physics python message 123 null url_producer
- 增加消費(fèi)者的配置
package com.example.kane.config; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; import com.example.kane.service.kafka_listener; @Configuration @EnableKafka public class kafka_consumer_config { @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.138:9092"); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); return propsMap; } @Bean public kafka_listener listener() { return new kafka_listener(); } }
- 增加listener類
package com.example.kane.service; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; public class kafka_listener { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @KafkaListener(topics = {"test"}) public void listen(ConsumerRecord<?, ?> record) { logger.info(record.toString()); logger.info("kafka的key: " + record.key()); logger.info("kafka的value: " + record.value().toString()); } }
- 同樣我們用訪問http://localhost:8080/kafka/send?message=url_producer1重新發(fā)一個(gè)消息
# 結(jié)果 2019-03-05 14:31:04.787 INFO 10208 --- [nio-8080-exec-1] c.e.kane.Controller.CollectController : 發(fā)送kafka成功. 2019-03-05 14:31:04.848 INFO 10208 --- [ntainer#0-0-C-1] com.example.kane.service.kafka_listener : ConsumerRecord(topic = test, partition = 0, offset = 10, CreateTime = 1551767464787, serialized key size = 3, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = url_producer1) 2019-03-05 14:31:04.848 INFO 10208 --- [ntainer#0-0-C-1] com.example.kane.service.kafka_listener : kafka的key: key 2019-03-05 14:31:04.848 INFO 10208 --- [ntainer#0-0-C-1] com.example.kane.service.kafka_listener : kafka的value: url_producer1 # 查看虛擬機(jī) 消費(fèi)者信息 physics python message 123 null url_producer url_producer1 url_producer1
一些需要注意的問題
- 現(xiàn)在kafka官方提供自帶zookeeper版本,不建議使用自帶的,還是建議自己安裝zookeeper
- 物理機(jī)沒法訪問的時(shí)候,看文中的注意事項(xiàng),依次更改一定能訪問