1.offset管理
kafka支持将consumer消费的offset存放到集群中,在0.8.2.2版本,默认存放到zookeeper中,在0.10.1.1中就默认存放到broker中的一个__consumer_offsets
的topic中。
zookeeper并不是用来做大规模读写管理的,因此放到这里会对系统造成瓶颈。
librdkafka支持两种offset管理方式:
- 存放到本地文件
- 存放到集群中
librdkafka consumer的high level API支持存放到broker中,low level API支持存放到本地文件
存放到集群
影响的配置有这么几个:
- group.id: 设置consumer组,因为offset是按组来管理的,所以必须有这个东西
- enable.auto.commit: 设置是否自动保存,[true, false]
- auto.commit.interval.ms: 自动保存的的时间,默认是5000(5s)
下面是一段示例代码
1 | conf = rd_kafka_conf_new(); |
可以用这个去查看存储的offset到哪儿了
1 | $ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic topic_test --group hehe |
存放到本地
诶,参考上边吧
2.high level API多topic操作
有时候需要在一个进程中订阅多个topic,并且每个topic需要单独操作,这时候就可以多申请几个rd_kafka_topic_partition_list_t
,分别在每个rd_kafka_topic_partition_list_t
中加入各自的topic,在订阅消息的时候,使用rd_kafka_assign(rk, rktp_list);
来切换各个topic。就可以了。
3.high level API手动存储consumer的offset
先设置这几个参数
1 | # rk_conf的配置 |
操作的话,需要用到
1 | // 这两个是实际将offset提交到broker的操作,async为false的时候,该操作阻塞。 |
另外,还有个跟offset相关的函数
1 | rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt, |
这个函数只是把offset提交到内存,并不是提交到broker存储起来,所以还是需要调用*commit()函数
注意:如果多topic切换的时候,需要先commit,再切换,否则切换回来还是会从原先的offset开始读取消息。
session.timeout.ms
和max.poll.interval.ms
心跳用于broker检测consumer是否断开, poll()
超时用于判断处理线程是否断开. 在KIP-62
之前, 只有session.timeout.ms
, 心跳和poll()
超时在一个线程; 从v0.10.1之后引入了max.poll.interval.ms
, 把心跳和poll()
放到两个线程, session.timeout.ms
只用于心跳,max.poll.interval.ms
则用于处理线程超时检测.KIP-62允许处理时间(两次poll()
间隔)比心跳更长.
假设处理1个msg用时1min,
- KIP-62之前: 需要设置
session.timeout.ms > 1min
, 防止处理线程超时, 但如果consumer整个挂掉, broker需要>1min
才能检测到超时; - KIP-62之后: 可以设置
max.poll.interval.ms > 1min
, 防止处理线程超时; 设置session.timeout.ms = 500ms
, 让broker更快检测到consumer整个挂掉;
增加参数的整体思路是即使处理数据需要很长时间,也能更快检测到consumer挂掉.session.timeout.ms
默认45000(45 seconds); max.poll.interval.ms
默认300000ms(5min).
参考: