I. 部署

按照官方文档很容易部署,提几点比较重要的事:

  1. 默认情况下,kafka的进程都不是daemon进程,如果需要让启动的server转入后台运行,可以加上参数-daemon,比如:

    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    
  2. 配cluster的时候,除了需要修改server.properties里面的broker.id,还需要修改zookeeper.connect为启动zookeeper server进程的那台机器。

  3. 实际部署的时候一般只需要部署多个kafka server作为brokers,producer和consumer一般是自己写代码来实现(kaka server本身应该就是一个tcp server,我们程序中只要向它发送请求即可达到发送message和接收message的目的)。

II. 使用

我这里用的是kafka的python sdk,可以参考官方文档

  1. 使用KafkaProducer来发送消息,如果是json格式的,建议使用msgpack来进行打包发送,它比直接用json.dumps更高效。

  2. 使用KafkaConsumer来接收消息的话,最好指定一个group_id,否则每次默认都会使用同一个group_id。

  3. 每次使用一个新的group_id来consume brokers里面的消息时,Zookeeper都会自动创建并记录下这个group_id所抓消息的offset,当这次consumer断开连接下次再抓消息时,还会从上次断开的位置来拿数据,确保了数据不会丢失。

  4. 如上述所讲,如果你要从头接收brokers里面某个topic的所有消息,要么使用一个新的group_id,并设置KafkaConsumer中的参数auto_offset_reset='earliest';要么对一个已有的group_id进行reset offset(参照下一条)。

  5. 想要reset一个group的offset,方法一是在创建group的时候就告诉zookeeper不要记录它的offset:在代码里就是init KafkaConsumer的时候设置参数enable_auto_commit=False方法二是调用提供的api:在代码里就是KafkaConsumer.seek_to_beginning()方法(需要注意的是,如果你是初始化KafkaConsumer的时候指定的topic,然后执行KafkaConsumer.seek_to_beginning会报错(可能是个bug),必须在初始化之后调用KafkaConsumer.assign()方法来指定抓哪个topic的哪个partition里的数据,然后再调用seek_to_beginning方法)。

  6. 很多方法需要TopicPartition对象作为input,这个类型实际是由namedtuple生成的,有两个参数分别是topic和partition(一个整数,比如0)。

  7. 用初始化KafkaConsumer的方式(其实也是调用了subscribe()方法)或者调用KafkaConsumer.subscribe()方法是不需要指定partition的,只需要给定一个包含topic name的list即可,而调用KafkaConsumer.assign()方法是需要指定topic的partition的(subscribe()方法之所以不需要指定partition是因为它把这个任务交给了group coordinator来做了,实际运行时会通过它的算法动态选取topic中的某个partition)。官方文档也说了这两种方式是会冲突的,所以使用的时候选择某一个用即可。

  8. Kafka目前并没有提供删除某个group_id的api(Currently, the only way to remove a Kafka consumer group is manually deleting Zookeeper path /consumers/[group_id].)。

  9. kafka-python目前没有提供一个直接创建新的topic的api,想要创建新的topic,方法一是在server端使用kafka自带的shell脚本,比如:

    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    

    方法二是在server端设置里面auto.create.topics.enable=true(默认就是true),这样直接发送message到一个新的topic里面会自动创建这个topic。

  10. Kafka的topic默认是存储在/tmp/kafka-logs/路径下的(可以通过修改log.dirs(你没看错,它并不是设置log的路径的~)来修改这个存储的路径),而它产生的自己的log文件是在解压缩的kafka文件夹下的(和bin文件夹同目录)。

  11. 想要删除一个topic必须要先设置所有的kafka server.properties里面的delete.topic.enable=true(default为false),然后再执行其删除topic的shell才有用(否则只是标记了某些要删除的topic,实际并没有删除)。