1.offset管理
kafka支持将consumer消费的offset存放到集群中,在0.8.2.2版本,默认存放到zookeeper中,在0.10.1.1中就默认存放到broker中的一个__consumer_offsets
的topic中。
zookeeper并不是用来做大规模读写管理的,因此放到这里会对系统造成瓶颈。
librdkafka支持两种offset管理方式:
- 存放到本地文件
- 存放到集群中
librdkafka consumer的high level API支持存放到broker中,low level API支持存放到本地文件
存放到集群
影响的配置有这么几个:
- group.id: 设置consumer组,因为offset是按组来管理的,所以必须有这个东西
- enable.auto.commit: 设置是否自动保存,[true, false]
- auto.commit.interval.ms: 自动保存的的时间,默认是5000(5s)
下面是一段示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| conf = rd_kafka_conf_new(); rd_kafka_conf_set(conf,"group.id","hehe",errstr,sizeof(errstr)); rd_kafka_conf_set(conf,"enable.auto.commit","true",errstr,sizeof(errstr)); rd_kafka_conf_set(conf,"auto.commit.interval.ms","1000",errstr,sizeof(errstr)); rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,sizeof(errstr));
# 创建个topic_partition list rtkp_list = rd_kafka_topic_partition_list_new(1); # 将名字为topic_test的topic添加到rtkp_list中,并将offset设置为RD_KAFKA_OFFSET_STORED,也就是consumer上次存储在broker中的offset。rd_kafka_topic_partition_list_add()返回的是1个rd_kafka_topic_partition_t。 rd_kafka_topic_partition_list_add(rtkp_list,"topic_test",0)->offset = RD_KAFKA_OFFSET_STORED;
# 这里是确定订阅哪个topic了 rd_kafka_assign(rk, rtkp_list); while(1){ msg = rd_kafka_consumer_poll(rk, 1000); ... }
|
可以用这个去查看存储的offset到哪儿了
1 2 3 4 5 6
| $ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic topic_test --group hehe
Group Topic Pid Offset logSize Lag Owner hehe topic_test 0 6483667 1674985741 1668502074 none
# offset就是当前存储的offset,logSize是当前最大的offset,lag是还没消费的msg个数
|
存放到本地
诶,参考上边吧
2.high level API多topic操作
有时候需要在一个进程中订阅多个topic,并且每个topic需要单独操作,这时候就可以多申请几个rd_kafka_topic_partition_list_t
,分别在每个rd_kafka_topic_partition_list_t
中加入各自的topic,在订阅消息的时候,使用rd_kafka_assign(rk, rktp_list);
来切换各个topic。就可以了。
3.high level API手动存储consumer的offset
先设置这几个参数
1 2 3 4 5 6 7 8 9 10
| # rk_conf的配置 enable.auto.commit = false enable.auto.offset.store = false # 一般还会设置这个配置,指示在broker没有存储offset(最开始时候)或offset出现错误的时候,系统应该初始化的offset位置,默认是latest auto.offset.reset = [earliest, latest, none]
# 开始时候topic_conf的几个配置会让人迷惑,auto.commit.enable 只针对 low level API, high level API需要使用rk_conf的全局配置,enable.autom.commit是auto.commit.enable的别名 auto.commit.enable = false enable.auto.commit = false
|
操作的话,需要用到
1 2 3 4 5 6 7
| // 这两个是实际将offset提交到broker的操作,async为false的时候,该操作阻塞。 // 两个的区别是,第一个可以控制提交的offset的具体值,第二个只能提交msg的offset,不能手动控制。 rd_kafka_resp_err_t rd_kafka_commit( rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, int async);
rd_kafka_resp_err_t rd_kafka_commit_message( rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, int async);
|
另外,还有个跟offset相关的函数
1 2
| rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset);
|
这个函数只是把offset提交到内存,并不是提交到broker存储起来,所以还是需要调用*commit()函数
注意:如果多topic切换的时候,需要先commit,再切换,否则切换回来还是会从原先的offset开始读取消息。
参考:
- kafka的offset管理
- librdkafka的一些说明
- 关于high level API手动管理offset的讨论