0%

librdkafka相关

1.offset管理

kafka支持将consumer消费的offset存放到集群中,在0.8.2.2版本,默认存放到zookeeper中,在0.10.1.1中就默认存放到broker中的一个__consumer_offsets的topic中。
zookeeper并不是用来做大规模读写管理的,因此放到这里会对系统造成瓶颈。

librdkafka支持两种offset管理方式:

  1. 存放到本地文件
  2. 存放到集群中

librdkafka consumer的high level API支持存放到broker中,low level API支持存放到本地文件

存放到集群
影响的配置有这么几个:

  • group.id: 设置consumer组,因为offset是按组来管理的,所以必须有这个东西
  • enable.auto.commit: 设置是否自动保存,[true, false]
  • auto.commit.interval.ms: 自动保存的的时间,默认是5000(5s)

下面是一段示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf,"group.id","hehe",errstr,sizeof(errstr));
rd_kafka_conf_set(conf,"enable.auto.commit","true",errstr,sizeof(errstr));
rd_kafka_conf_set(conf,"auto.commit.interval.ms","1000",errstr,sizeof(errstr));
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,sizeof(errstr));

# 创建个topic_partition list
rtkp_list = rd_kafka_topic_partition_list_new(1);
# 将名字为topic_test的topic添加到rtkp_list中,并将offset设置为RD_KAFKA_OFFSET_STORED,也就是consumer上次存储在broker中的offset。rd_kafka_topic_partition_list_add()返回的是1个rd_kafka_topic_partition_t。
rd_kafka_topic_partition_list_add(rtkp_list,"topic_test",0)->offset = RD_KAFKA_OFFSET_STORED;

# 这里是确定订阅哪个topic了
rd_kafka_assign(rk, rtkp_list);
while(1){
msg = rd_kafka_consumer_poll(rk, 1000);
...
}

可以用这个去查看存储的offset到哪儿了

1
2
3
4
5
6
$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic topic_test  --group hehe

Group Topic Pid Offset logSize Lag Owner
hehe topic_test 0 6483667 1674985741 1668502074 none

# offset就是当前存储的offset,logSize是当前最大的offset,lag是还没消费的msg个数

存放到本地
诶,参考上边吧

2.high level API多topic操作

有时候需要在一个进程中订阅多个topic,并且每个topic需要单独操作,这时候就可以多申请几个rd_kafka_topic_partition_list_t,分别在每个rd_kafka_topic_partition_list_t中加入各自的topic,在订阅消息的时候,使用rd_kafka_assign(rk, rktp_list);来切换各个topic。就可以了。

3.high level API手动存储consumer的offset

先设置这几个参数

1
2
3
4
5
6
7
8
9
10
# rk_conf的配置
enable.auto.commit = false
enable.auto.offset.store = false
# 一般还会设置这个配置,指示在broker没有存储offset(最开始时候)或offset出现错误的时候,系统应该初始化的offset位置,默认是latest
auto.offset.reset = [earliest, latest, none]


# 开始时候topic_conf的几个配置会让人迷惑,auto.commit.enable 只针对 low level API, high level API需要使用rk_conf的全局配置,enable.autom.commit是auto.commit.enable的别名
auto.commit.enable = false
enable.auto.commit = false

操作的话,需要用到

1
2
3
4
5
6
7
// 这两个是实际将offset提交到broker的操作,async为false的时候,该操作阻塞。
// 两个的区别是,第一个可以控制提交的offset的具体值,第二个只能提交msg的offset,不能手动控制。
rd_kafka_resp_err_t rd_kafka_commit(
rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, int async);

rd_kafka_resp_err_t rd_kafka_commit_message(
rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, int async);

另外,还有个跟offset相关的函数

1
2
rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt,
int32_t partition, int64_t offset);

这个函数只是把offset提交到内存,并不是提交到broker存储起来,所以还是需要调用*commit()函数

注意:如果多topic切换的时候,需要先commit,再切换,否则切换回来还是会从原先的offset开始读取消息。


参考:

  1. kafka的offset管理
  2. librdkafka的一些说明
  3. 关于high level API手动管理offset的讨论