使用Kafka实践心得
I. 部署
按照官方文档很容易部署,提几点比较重要的事:
-
默认情况下,kafka的进程都不是daemon进程,如果需要让启动的server转入后台运行,可以加上参数
-daemon
,比如:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
-
配cluster的时候,除了需要修改
server.properties
里面的broker.id
,还需要修改zookeeper.connect
为启动zookeeper server进程的那台机器。 -
实际部署的时候一般只需要部署多个kafka server作为brokers,producer和consumer一般是自己写代码来实现(kaka server本身应该就是一个tcp server,我们程序中只要向它发送请求即可达到发送message和接收message的目的)。
II. 使用
我这里用的是kafka的python sdk,可以参考官方文档。
-
使用
KafkaProducer
来发送消息,如果是json格式的,建议使用msgpack来进行打包发送,它比直接用json.dumps
更高效。 -
使用
KafkaConsumer
来接收消息的话,最好指定一个group_id,否则每次默认都会使用同一个group_id。 -
每次使用一个新的group_id来consume brokers里面的消息时,Zookeeper都会自动创建并记录下这个group_id所抓消息的offset,当这次consumer断开连接下次再抓消息时,还会从上次断开的位置来拿数据,确保了数据不会丢失。
-
如上述所讲,如果你要从头接收brokers里面某个topic的所有消息,要么使用一个新的group_id,并设置
KafkaConsumer
中的参数auto_offset_reset='earliest'
;要么对一个已有的group_id进行reset offset(参照下一条)。 -
想要reset一个group的offset,方法一是在创建group的时候就告诉zookeeper不要记录它的offset:在代码里就是init
KafkaConsumer
的时候设置参数enable_auto_commit=False
;方法二是调用提供的api:在代码里就是KafkaConsumer.seek_to_beginning()
方法(需要注意的是,如果你是初始化KafkaConsumer
的时候指定的topic,然后执行KafkaConsumer.seek_to_beginning
会报错(可能是个bug),必须在初始化之后调用KafkaConsumer.assign()
方法来指定抓哪个topic的哪个partition里的数据,然后再调用seek_to_beginning
方法)。 -
很多方法需要
TopicPartition
对象作为input,这个类型实际是由namedtuple生成的,有两个参数分别是topic和partition(一个整数,比如0)。 -
用初始化
KafkaConsumer
的方式(其实也是调用了subscribe()方法)或者调用KafkaConsumer.subscribe()方法是不需要指定partition的,只需要给定一个包含topic name的list即可,而调用KafkaConsumer.assign()方法是需要指定topic的partition的(subscribe()方法之所以不需要指定partition是因为它把这个任务交给了group coordinator来做了,实际运行时会通过它的算法动态选取topic中的某个partition)。官方文档也说了这两种方式是会冲突的,所以使用的时候选择某一个用即可。 -
Kafka目前并没有提供删除某个group_id的api(Currently, the only way to remove a Kafka consumer group is manually deleting Zookeeper path
/consumers/[group_id]
.)。 -
kafka-python目前没有提供一个直接创建新的topic的api,想要创建新的topic,方法一是在server端使用kafka自带的shell脚本,比如:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
方法二是在server端设置里面
auto.create.topics.enable=true
(默认就是true),这样直接发送message到一个新的topic里面会自动创建这个topic。 -
Kafka的topic默认是存储在/tmp/kafka-logs/路径下的(可以通过修改
log.dirs
(你没看错,它并不是设置log的路径的~)来修改这个存储的路径),而它产生的自己的log文件是在解压缩的kafka文件夹下的(和bin文件夹同目录)。 -
想要删除一个topic必须要先设置所有的kafka
server.properties
里面的delete.topic.enable=true
(default为false),然后再执行其删除topic的shell才有用(否则只是标记了某些要删除的topic,实际并没有删除)。
Comments