0%

librdkafka--kafka C api介绍

用到的
更新:2017年11月08日07:23:09

0. 总结

支持

  • High-level producer
  • High-level consumer
  • Simple (Low-level) consumer
  • 压缩:snappy, gzip, lz4
  • SSL
  • SASL

  consumer有两套API,高级(high-level)和简单(simple)的,取名叫简单的其实不是很准确,应该叫底层API或者低级API,它跟高级API的区别是没有自动负载均衡,而高级API会自动进行负载均衡–当然底层的适合折腾。我下边就直接叫高级和低级API了。


kafka的 C 客户端主要用途就是发数据收数据,为了收发数据就有了

  • producer
  • consumer

发数据(producer干的活儿)比较好办,可以选择发一条和发多条:

1
2
3
4
// 发一条
rd_kafka_produce();
// 发多条
rd_kafka_produce_batch();

收数据(consumer干的活儿)的选择比较多:

1
2
3
4
5
6
7
8
9
10
// 高级API
msg = rd_kafka_consumer_poll();

// 低级API
// 一次收一条
msg = rd_kafka_consume();
// 一次收多条
msg = rd_kafka_consume_batch();
// 使用回调处理收到的msg,速度最快了
rd_kafka_consume_callback();

收发数据之前至少要先有个统一的句柄,好让kafka内部准备好连接brokers集群、建立内部使用的各项结构之类的工作:

1
2
// 建立producer或者consumer都用这个
rd = rd_kafka_new();

建立的这个kafka句柄要知道连到哪个broker:

1
2
3
4
5
// 运行时动态添加brokers
rd_kafka_brokers_add();

// 使用配置项添加brokers
rd_kafka_conf_set(conf, "metadata.broker.list",brokers, errstr, sizeof(errstr);

发布消息用rd_kafka_topic_t;订阅消息有两套,一套对应高级API:rd_kafka_topic_partition_list_t,一套对应低级API:rd_kafka_topic_t(跟producer用的一样)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 对rd_kafka_topic_partition_list_t结构的操作
// 创建
rd_kafka_topic_partition_list_new();
// 增加元素
rd_kafka_topic_partition_list_add();
// 删除元素
rd_kafka_topic_partition_list_del();
// 查找元素
rd_kafka_topic_partition_list_find();

// 对rd_kafka_topic_t的操作
// 创建
rd_kafka_topic_new();
// 删除
rd_kafka_topic_destroy();
// 获取该topic的名字
rd_kafka_topic_name();
// 获取该topic传入的应用参数
rd_kafka_topic_opaque();

// 使用rd_kafka_topic_partition_list_t的时候,topic+partition是连在一起的,
// 所以给kafka句柄的时候只用一个参数就够了
// 订阅消息
rd_kafka_subscribe (rd_kafka_t *rk,
const rd_kafka_topic_partition_list_t *topics);
// 指定消费的partition,可以在运行时更换
rd_kafka_assign (rd_kafka_t *rk,
const rd_kafka_topic_partition_list_t *partitions);

// 用rd_kafka_topic_t比较麻烦,需要配合一个partition才行
// 直接启动consumer了
rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition,
int64_t offset);
// 每次接收也要带上partition
rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition,
int timeout_ms);


// 发送的时候使用 rd_kafka_topic_t
int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition,
int msgflags,
void *payload, size_t len,
const void *key, size_t keylen,
void *msg_opaque);

另外,对kafka各种事件进行响应的是:

1
2
int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms);
rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms);

第一个是主要的入口,可以用于producer和consumer,第二个是专门针对consumer的入口。

除了上边这些主要的,kafka正常运行还需要一些辅助的东西,比如配置,有两个配置项,一个是kafka本身的rd_kafka_conf_t,一个是针对topic的rd_kafka_topic_conf_t:

1
2
3
4
5
6
7
8
9
// 针对kafka本身的rd_kafka_conf_t在创建kafka句柄之前设置,在创建kafka句柄的时候用到,
// 用完就死掉了,不用单独针对rd_kafka_conf_t去销毁,一心想要销毁的还会出错
rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf,
char *errstr, size_t errstr_size);

// 针对topic的rd_kafka_topic_conf_t在创建rd_kafka_topic_t时候会用到,跟上一个的
// 命运一样,用完就死了
rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, const char *topic,
rd_kafka_topic_conf_t *conf);

比如队列queue,用来支持低级API把从几个topic收到的message汇聚到一起来处理的额外数据结构,高级api就没有这么自由了。
还比如错误处理,至关重要,但是比较琐碎。

1. 错误处理

相关数据结构

1
2
3
4
5
6
7
// kafka内部错误信息
typedef enum {
/* Internal errors to rdkafka: */
/** Begin internal error codes */
RD_KAFKA_RESP_ERR__BEGIN = -200,
...
} rd_kafka_resp_err_t;

获取可读的错误内容的字符串

1
const char *rd_kafka_err2str (rd_kafka_resp_err_t err);

获取错误名称的字符串

1
const char *rd_kafka_err2name (rd_kafka_resp_err_t err);

获取最近的错误

1
2
3
4
5
6
7
8
9
10
11
// 能产生错误的旧API
// rd_kafka_topic_new()
// rd_kafka_consume_start()
// rd_kafka_consume_stop()
// rd_kafka_consume()
// rd_kafka_consume_batch()
// rd_kafka_consume_callback()
// rd_kafka_consume_queue()
// rd_kafka_produce()

rd_kafka_resp_err_t rd_kafka_last_error (void);

2. topic_partition_list操作

相关数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 表示一个topic+partition
typedef struct rd_kafka_topic_partition_s {
char *topic; /* Topic name */
int32_t partition; /* Partition */
int64_t offset; /* Offset */
void *metadata; /* Metadata */
size_t metadata_size; /* Metadata size */
void *opaque; /* Application opaque */
rd_kafka_resp_err_t err; /* Error code, depending on use. */
void *_private; /* INTERNAL USE ONLY,
* INITIALIZE TO ZERO, DO NOT TOUCH */
} rd_kafka_topic_partition_t;

// 存放的list,一般是操作这个结构
typedef struct rd_kafka_topic_partition_list_s {
int cnt; /* Current number of elements */
int size; /* Current allocated size */
rd_kafka_topic_partition_t *elems; /* Element array[] */
} rd_kafka_topic_partition_list_t;

新建、销毁list

1
2
3
4
5
6
7
8
// 创建
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_new (int size);

// 销毁
void
rd_kafka_topic_partition_list_destroy (
rd_kafka_topic_partition_list_t *rkparlist);

添加新元素

1
2
3
4
5
6
7
8
9
10
11
// 添加1个元素,返回的element可用于填写其他field
rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add (
rd_kafka_topic_partition_list_t *rktparlist,
const char *topic, int32_t partition);

// 添加多个元素
void rd_kafka_topic_partition_list_add_range (
rd_kafka_topic_partition_list_t *rktparlist,
const char *topic,
int32_t start, int32_t stop);

删除元素

1
2
3
4
5
6
7
8
9
10
11
// 根据topic+partition删除
int
rd_kafka_topic_partition_list_del (
rd_kafka_topic_partition_list_t *rktparlist,
const char *topic, int32_t partition);

// 根据index删除
int
rd_kafka_topic_partition_list_del_by_idx (
rd_kafka_topic_partition_list_t *rktparlist,
int idx);

查找元素

1
2
3
4
rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_find (
rd_kafka_topic_partition_list_t *rktparlist,
const char *topic, int32_t partition);

3. message操作

相关数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
typedef struct rd_kafka_message_s {
rd_kafka_resp_err_t err; /* Non-zero for error signaling. */
rd_kafka_topic_t *rkt; /* Topic */
int32_t partition; /* Partition */
void *payload; /* Producer: original message payload.
* Consumer: Depends on the value of err :
* err==0: Message payload.
* err!=0: Error string */
size_t len; /* Depends on the value of err :
* err==0: Message payload length
* err!=0: Error string length */
void *key; /* Depends on the value of err :
* err==0: Optional message key */
size_t key_len; /* Depends on the value of err :
* err==0: Optional message key length*/
int64_t offset; /* Consume:
* Message offset (or offset for error
* if err!=0 if applicable).
* dr_msg_cb:
* Message offset assigned by broker.
* If produce.offset.report is set then
* each message will have this field set,
* otherwise only the last message in
* each produced internal batch will
* have this field set, otherwise 0. */
void *_private; /* Consume:
* rdkafka private pointer: DO NOT MODIFY
* dr_msg_cb:
* msg_opaque from produce() call */
} rd_kafka_message_t;

销毁message

1
void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage);

获取message中的错误信息

1
2
static const char *
rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage);

获取message中的时间戳

1
2
3
4
5
6
7
8
9
// 时间戳类型
typedef enum rd_kafka_timestamp_type_t {
RD_KAFKA_TIMESTAMP_NOT_AVAILABLE, /* Timestamp not available */
RD_KAFKA_TIMESTAMP_CREATE_TIME, /* Message creation time */
RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME /* Log append time */
} rd_kafka_timestamp_type_t;

int64_t rd_kafka_message_timestamp (const rd_kafka_message_t *rkmessage,
rd_kafka_timestamp_type_t *tstype);

4. conf操作

相关数据结构

1
2
3
4
5
6
// 配置时错误类型
typedef enum {
RD_KAFKA_CONF_UNKNOWN = -2, /**< Unknown configuration name. */
RD_KAFKA_CONF_INVALID = -1, /**< Invalid configuration value. */
RD_KAFKA_CONF_OK = 0 /**< Configuration okay */
} rd_kafka_conf_res_t;

创建、销毁、复制配置句柄

1
2
3
rd_kafka_conf_t *rd_kafka_conf_new(void);
void rd_kafka_conf_destroy(rd_kafka_conf_t *conf);
rd_kafka_conf_t *rd_kafka_conf_dup(const rd_kafka_conf_t *conf);

设置某个配置项,所有可以设置的配置项见 CONFIGURATION.md

1
2
3
4
rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,
const char *name,
const char *value,
char *errstr, size_t errstr_size);

设置回调,好多个回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// producer发送message后会不论成功或失败都会产生一个事件,该事件触发如下回调
// 这个不推荐使用了,参数比较多
void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf,
void (*dr_cb) (rd_kafka_t *rk,
void *payload, size_t len,
rd_kafka_resp_err_t err,
void *opaque, void *msg_opaque));
// 推荐使用这个,参数统一到了rd_kafka_message_t
void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf,
void (*consume_cb) (rd_kafka_message_t *
rkmessage,
void *opaque));

// consumer接收message后的回调,配合rd_kafka_consumer_poll()使用
void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf,
void (*consume_cb) (rd_kafka_message_t *
rkmessage,
void *opaque));

// 重新负载均衡的时候的回调
void rd_kafka_conf_set_rebalance_cb (
rd_kafka_conf_t *conf,
void (*rebalance_cb) (rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *partitions,
void *opaque));

// commit的时候的回调,跟rd_kafka_consumer_poll()配合使用
void rd_kafka_conf_set_offset_commit_cb (
rd_kafka_conf_t *conf,
void (*offset_commit_cb) (rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *offsets,
void *opaque));

// kafka内部发生严重错误的时候通知应用的回调
void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf,
void (*error_cb) (rd_kafka_t *rk, int err,
const char *reason,
void *opaque));

// producer发送消息或consumer取消息时遇到broker返回限流时间时的回调,不管是返回一段时间还是返回0(限流取消)
void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf,
void (*throttle_cb) (
rd_kafka_t *rk,
const char *broker_name,
int32_t broker_id,
int throttle_time_ms,
void *opaque));

// 设置日志输出的回调,默认是打印到stderr,内置的log_cb有rd_kafka_log_print(),
// rd_kafka_log_syslog()
void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf,
void (*log_cb) (const rd_kafka_t *rk, int level,
const char *fac, const char *buf));

// 统计时回调,由rd_kafka_poll()每隔statistics.interval.ms(需要单独设置)触发
void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf,
int (*stats_cb) (rd_kafka_t *rk,
char *json,
size_t json_len,
void *opaque));

5. topic操作

topic_conf创建、销毁、设置等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 创建
rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void);

// 备份
rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t
*conf);

// 销毁
void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf);

// 设置
rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf,
const char *name,
const char *value,
char *errstr, size_t errstr_size);

topic的各种操作

1
2
3
4
5
6
// 创建
rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, const char *topic,
rd_kafka_topic_conf_t *conf);

// 销毁
void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt);

6. kafka主操作句柄

rd_kafka_t的各种操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 创建
// kafka类型
typedef enum rd_kafka_type_t {
RD_KAFKA_PRODUCER, /**< Producer client */
RD_KAFKA_CONSUMER /**< Consumer client */
} rd_kafka_type_t;
rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf,
char *errstr, size_t errstr_size);

// 销毁
void rd_kafka_destroy(rd_kafka_t *rk);

// poll,能捕捉到的事件有:
// delivery report callbacks (if dr_cb/dr_msg_cb is configured) [producer]
// error callbacks (rd_kafka_conf_set_error_cb()) [all]
// stats callbacks (rd_kafka_conf_set_stats_cb()) [all]
// throttle callbacks (rd_kafka_conf_set_throttle_cb()) [all]
int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms);

// 取消掉该次事件,只能在各种callback中使用
void rd_kafka_yield (rd_kafka_t *rk);

// 暂停某些partitions发送或接收消息
rd_kafka_resp_err_t
rd_kafka_pause_partitions (rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *partitions);

// 恢复暂停的partitions的工作
rd_kafka_resp_err_t
rd_kafka_resume_partitions (rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *partitions);

// 获取当前partition的高低水位,阻塞
rd_kafka_resp_err_t
rd_kafka_query_watermark_offsets (rd_kafka_t *rk,
const char *topic, int32_t partition,
int64_t *low, int64_t *high, int timeout_ms);
// 获取当前partition的高低水位,非阻塞
rd_kafka_resp_err_t
rd_kafka_get_watermark_offsets (rd_kafka_t *rk,
const char *topic, int32_t partition,
int64_t *low, int64_t *high);

7. kafka_queue操作

kafka_queue能将来自几个不同的topic+partition的message汇总到一个queue中交给应用去处理
创建、销毁

1
2
rd_kafka_queue_t *rd_kafka_queue_new(rd_kafka_t *rk);
void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu);

重新路由的操作,不能对相同的topic+partition调用多次

1
2
int rd_kafka_consume_start_queue(rd_kafka_topic_t *rkt, int32_t partition,
int64_t offset, rd_kafka_queue_t *rkqu);

接收处理message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 接收一条message
rd_kafka_message_t *rd_kafka_consume_queue(rd_kafka_queue_t *rkqu,
int timeout_ms);

// 接收多条message
ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu,
int timeout_ms,
rd_kafka_message_t **rkmessages,
size_t rkmessages_size);

// 使用回调去处理接收到的消息,处理速度最快
int rd_kafka_consume_callback_queue(rd_kafka_queue_t *rkqu,
int timeout_ms,
void (*consume_cb) (rd_kafka_message_t
*rkmessage,
void *opaque),
void *opaque);

8. consumer操作

message的offset可以为绝对数或者以下逻辑值:

1
2
3
4
RD_KAFKA_OFFSET_BEGINNING
RD_KAFKA_OFFSET_END
RD_KAFKA_OFFSET_STORED
RD_KAFKA_OFFSET_TAIL

低级api

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 开始接收message
int rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition,
int64_t offset);

// 停止接收
int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition);

// 查找现在的offset
rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *rkt,
int32_t partition,
int64_t offset,
int timeout_ms);

// 接收一条message
rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition,
int timeout_ms);

// 接收多条message
ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt, int32_t partition,
int timeout_ms,
rd_kafka_message_t **rkmessages,
size_t rkmessages_size);

// 使用回调处理收到的message,速度最快
int rd_kafka_consume_callback(rd_kafka_topic_t *rkt, int32_t partition,
int timeout_ms,
void (*consume_cb) (rd_kafka_message_t
*rkmessage,
void *opaque),
void *opaque);

// 手动commit
rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt,
int32_t partition, int64_t offset);

高级api

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// 订阅topic
rd_kafka_resp_err_t
rd_kafka_subscribe (rd_kafka_t *rk,
const rd_kafka_topic_partition_list_t *topics);

// 取消订阅
rd_kafka_resp_err_t rd_kafka_unsubscribe (rd_kafka_t *rk);

// 获取当前的订阅
rd_kafka_resp_err_t
rd_kafka_subscription (rd_kafka_t *rk,
rd_kafka_topic_partition_list_t **topics);

// 消费message
rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms);

// 关闭consumer
rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk);

// 分派partition,跟rd_kafka_subscribe()的区别就是assign必须有partition才生效,
// 当你知道partition的时候,最好用这个,因为kafka不用再去找了,速度最快
rd_kafka_resp_err_t
rd_kafka_assign (rd_kafka_t *rk,
const rd_kafka_topic_partition_list_t *partitions);

// 获取当前分派的partition
rd_kafka_resp_err_t
rd_kafka_assignment (rd_kafka_t *rk,
rd_kafka_topic_partition_list_t **partitions);

// 手动commit,可以同步可以异步,异步的时候要设置rd_kafka_conf_set_offset_commit_cb()
rd_kafka_resp_err_t
rd_kafka_commit (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets,
int async);

// 手动commit
rd_kafka_resp_err_t
rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
int async);

// 获取当前commit的offset
rd_kafka_resp_err_t
rd_kafka_committed (rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *partitions,
int timeout_ms);

// 获取当前topic+partition的offset
rd_kafka_resp_err_t
rd_kafka_position (rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *partitions);

要调用rd_kafka_consumer_poll(),还需要一个高级API:

1
2
// 将接收到 queue 从rd_kafka_poll()重定向到rd_kafka_poll_set_consumer()
rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk);

9. producer操作

message flags

1
2
#define RD_KAFKA_MSG_F_FREE  0x1 /* Delegate freeing of payload to rdkafka. */
#define RD_KAFKA_MSG_F_COPY 0x2 /* rdkafka will make a copy of the payload. */

发送

1
2
3
4
5
6
7
8
9
10
11
// 发送一条
int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition,
int msgflags,
void *payload, size_t len,
const void *key, size_t keylen,
void *msg_opaque);

// 发送很多条
int rd_kafka_produce_batch(rd_kafka_topic_t *rkt, int32_t partition,
int msgflags,
rd_kafka_message_t *rkmessages, int message_cnt);

10.杂项

获取元信息(metadata,就是全局的一些信息,比如broker的地址啦,名称啦,topic的个数啦,都叫什么啦)

1
2
3
4
5
6
7
rd_kafka_resp_err_t
rd_kafka_metadata (rd_kafka_t *rk, int all_topics,
rd_kafka_topic_t *only_rkt,
const struct rd_kafka_metadata **metadatap,
int timeout_ms);

void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata);

groups信息

1
2
3
4
5
6
7
8
// 获取信息
rd_kafka_resp_err_t
rd_kafka_list_groups (rd_kafka_t *rk, const char *group,
const struct rd_kafka_group_list **grplistp,
int timeout_ms);

// 销毁获取的结构体
void rd_kafka_group_list_destroy (const struct rd_kafka_group_list *grplist);

添加brokers

1
int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist);

设置kafka内部日志等级

1
void rd_kafka_set_log_level(rd_kafka_t *rk, int level);

导出当前kafka的状态,只用于debug

1
void rd_kafka_dump(FILE *fp, rd_kafka_t *rk);

参考资料

  1. 针对kafka本身的所有的配置项CONFIGURATION.md
  2. kafka的C客户端github