0%

librdkafka相关

1.offset管理

kafka支持将consumer消费的offset存放到集群中,在0.8.2.2版本,默认存放到zookeeper中,在0.10.1.1中就默认存放到broker中的一个__consumer_offsets的topic中。
zookeeper并不是用来做大规模读写管理的,因此放到这里会对系统造成瓶颈。

librdkafka支持两种offset管理方式:

  1. 存放到本地文件
  2. 存放到集群中

librdkafka consumer的high level API支持存放到broker中,low level API支持存放到本地文件

存放到集群
影响的配置有这么几个:

  • group.id: 设置consumer组,因为offset是按组来管理的,所以必须有这个东西
  • enable.auto.commit: 设置是否自动保存,[true, false]
  • auto.commit.interval.ms: 自动保存的的时间,默认是5000(5s)

下面是一段示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf,"group.id","hehe",errstr,sizeof(errstr));
rd_kafka_conf_set(conf,"enable.auto.commit","true",errstr,sizeof(errstr));
rd_kafka_conf_set(conf,"auto.commit.interval.ms","1000",errstr,sizeof(errstr));
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,sizeof(errstr));

# 创建个topic_partition list
rtkp_list = rd_kafka_topic_partition_list_new(1);
# 将名字为topic_test的topic添加到rtkp_list中,并将offset设置为RD_KAFKA_OFFSET_STORED,也就是consumer上次存储在broker中的offset。rd_kafka_topic_partition_list_add()返回的是1个rd_kafka_topic_partition_t。
rd_kafka_topic_partition_list_add(rtkp_list,"topic_test",0)->offset = RD_KAFKA_OFFSET_STORED;

# 这里是确定订阅哪个topic了
rd_kafka_assign(rk, rtkp_list);
while(1){
msg = rd_kafka_consumer_poll(rk, 1000);
...
}

可以用这个去查看存储的offset到哪儿了

1
2
3
4
5
6
$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic topic_test  --group hehe

Group Topic Pid Offset logSize Lag Owner
hehe topic_test 0 6483667 1674985741 1668502074 none

# offset就是当前存储的offset,logSize是当前最大的offset,lag是还没消费的msg个数

存放到本地
诶,参考上边吧

2.high level API多topic操作

有时候需要在一个进程中订阅多个topic,并且每个topic需要单独操作,这时候就可以多申请几个rd_kafka_topic_partition_list_t,分别在每个rd_kafka_topic_partition_list_t中加入各自的topic,在订阅消息的时候,使用rd_kafka_assign(rk, rktp_list);来切换各个topic。就可以了。

3.high level API手动存储consumer的offset

先设置这几个参数

1
2
3
4
5
6
7
8
9
10
# rk_conf的配置
enable.auto.commit = false
enable.auto.offset.store = false
# 一般还会设置这个配置,指示在broker没有存储offset(最开始时候)或offset出现错误的时候,系统应该初始化的offset位置,默认是latest
auto.offset.reset = [earliest, latest, none]


# 开始时候topic_conf的几个配置会让人迷惑,auto.commit.enable 只针对 low level API, high level API需要使用rk_conf的全局配置,enable.autom.commit是auto.commit.enable的别名
auto.commit.enable = false
enable.auto.commit = false

操作的话,需要用到

1
2
3
4
5
6
7
// 这两个是实际将offset提交到broker的操作,async为false的时候,该操作阻塞。
// 两个的区别是,第一个可以控制提交的offset的具体值,第二个只能提交msg的offset,不能手动控制。
rd_kafka_resp_err_t rd_kafka_commit(
rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, int async);

rd_kafka_resp_err_t rd_kafka_commit_message(
rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, int async);

另外,还有个跟offset相关的函数

1
2
rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt,
int32_t partition, int64_t offset);

这个函数只是把offset提交到内存,并不是提交到broker存储起来,所以还是需要调用*commit()函数

注意:如果多topic切换的时候,需要先commit,再切换,否则切换回来还是会从原先的offset开始读取消息。

session.timeout.msmax.poll.interval.ms

心跳用于broker检测consumer是否断开, poll()超时用于判断处理线程是否断开. 在KIP-62之前, 只有session.timeout.ms, 心跳和poll()超时在一个线程; 从v0.10.1之后引入了max.poll.interval.ms, 把心跳和poll()放到两个线程, session.timeout.ms只用于心跳,max.poll.interval.ms 则用于处理线程超时检测.KIP-62允许处理时间(两次poll()间隔)比心跳更长.

假设处理1个msg用时1min,

  • KIP-62之前: 需要设置session.timeout.ms > 1min, 防止处理线程超时, 但如果consumer整个挂掉, broker需要>1min才能检测到超时;
  • KIP-62之后: 可以设置max.poll.interval.ms > 1min, 防止处理线程超时; 设置session.timeout.ms = 500ms, 让broker更快检测到consumer整个挂掉;

增加参数的整体思路是即使处理数据需要很长时间,也能更快检测到consumer挂掉.session.timeout.ms默认45000(45 seconds); max.poll.interval.ms默认300000ms(5min).


参考:

  1. kafka的offset管理
  2. librdkafka的一些说明
  3. 关于high level API手动管理offset的讨论