最后更新:2017年11月08日 07:23:09
0. 总结
librdkafka 支持以下功能:
High-level producer
High-level consumer
Simple (Low-level) consumer
压缩:snappy, gzip, lz4
SSL
SASL
consumer有两套API:高级(high-level)API和简单(simple)API。“简单”这个名字其实不太准确,应该叫底层API或低级API。它跟高级API的区别是没有自动负载均衡(自动分配partition),而高级API会自动进行负载均衡——当然,低级API更适合需要自己精细控制的场景。下文直接称为高级API和低级API。
kafka的C客户端主要用途就是发数据和收数据,为了收发数据,就有了producer和consumer:
发数据(producer干的活儿)比较好办,可以选择一次发一条或一次发多条:
// Produce a single message
rd_kafka_produce();

// Produce several messages in one call
rd_kafka_produce_batch();
收数据(consumer干的活儿)的选择比较多:
1 2 3 4 5 6 7 8 9 10 // 高级API msg = rd_kafka_consumer_poll(); // 低级API // 一次收一条 msg = rd_kafka_consume(); // 一次收多条 msg = rd_kafka_consume_batch(); // 使用回调处理收到的msg,速度最快了 rd_kafka_consume_callback();
收发数据之前至少要先有一个统一的句柄,好让kafka内部完成连接broker集群、建立内部数据结构等准备工作:
1 2 // 建立producer或者consumer都用这个 rd = rd_kafka_new();
建立的这个kafka句柄要知道连到哪个broker:
1 2 3 4 5 // 运行时动态添加brokers rd_kafka_brokers_add(); // 使用配置项添加brokers rd_kafka_conf_set(conf, "metadata.broker.list",brokers, errstr, sizeof(errstr);
发布消息用rd_kafka_topic_t;订阅消息有两套,一套对应高级API:rd_kafka_topic_partition_list_t,一套对应低级API:rd_kafka_topic_t(跟producer用的一样)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 // 对rd_kafka_topic_partition_list_t结构的操作 // 创建 rd_kafka_topic_partition_list_new(); // 增加元素 rd_kafka_topic_partition_list_add(); // 删除元素 rd_kafka_topic_partition_list_del(); // 查找元素 rd_kafka_topic_partition_list_find(); // 对rd_kafka_topic_t的操作 // 创建 rd_kafka_topic_new(); // 删除 rd_kafka_topic_destroy(); // 获取该topic的名字 rd_kafka_topic_name(); // 获取该topic传入的应用参数 rd_kafka_topic_opaque(); // 使用rd_kafka_topic_partition_list_t的时候,topic+partition是连在一起的, // 所以给kafka句柄的时候只用一个参数就够了 // 订阅消息 rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics); // 指定消费的partition,可以在运行时更换 rd_kafka_assign (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *partitions); // 用rd_kafka_topic_t比较麻烦,需要配合一个partition才行 // 直接启动consumer了 rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset); // 每次接收也要带上partition rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms); // 发送的时候使用 rd_kafka_topic_t int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t keylen, void *msg_opaque);
另外,对kafka各种事件进行响应的是:
// Main event entry point; usable by both producers and consumers
int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms);

// Consumer-only entry point
rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms);
第一个是主要的入口,可以用于producer和consumer,第二个是专门针对consumer的入口。
除了上边这些主要的,kafka正常运行还需要一些辅助的东西,比如配置。配置有两种:一种是kafka本身的rd_kafka_conf_t,一种是针对topic的rd_kafka_topic_conf_t:
1 2 3 4 5 6 7 8 9 // 针对kafka本身的rd_kafka_conf_t在创建kafka句柄之前设置,在创建kafka句柄的时候用到, // 用完就死掉了,不用单独针对rd_kafka_conf_t去销毁,一心想要销毁的还会出错 rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size); // 针对topic的rd_kafka_topic_conf_t在创建rd_kafka_topic_t时候会用到,跟上一个的 // 命运一样,用完就死了 rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf);
此外还有队列(queue):它是一个额外的数据结构,供低级API把从多个topic收到的message汇聚到一起处理,高级API就没有这么自由了。还有错误处理,至关重要,但比较琐碎。
1. 错误处理 相关数据结构
// Error codes used inside kafka (excerpt)
typedef enum {
        /* Internal errors to rdkafka: */
        /** Begin internal error codes */
        RD_KAFKA_RESP_ERR__BEGIN = -200,
        /* ... remaining internal and broker error codes omitted ... */
} rd_kafka_resp_err_t;
获取可读的错误内容的字符串
1 const char *rd_kafka_err2str (rd_kafka_resp_err_t err);
获取错误名称的字符串
1 const char *rd_kafka_err2name (rd_kafka_resp_err_t err);
获取最近的错误
1 2 3 4 5 6 7 8 9 10 11 // 能产生错误的旧API // rd_kafka_topic_new() // rd_kafka_consume_start() // rd_kafka_consume_stop() // rd_kafka_consume() // rd_kafka_consume_batch() // rd_kafka_consume_callback() // rd_kafka_consume_queue() // rd_kafka_produce() rd_kafka_resp_err_t rd_kafka_last_error (void);
2. topic_partition_list操作 相关数据结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 // 表示一个topic+partition typedef struct rd_kafka_topic_partition_s { char *topic; /* Topic name */ int32_t partition; /* Partition */ int64_t offset; /* Offset */ void *metadata; /* Metadata */ size_t metadata_size; /* Metadata size */ void *opaque; /* Application opaque */ rd_kafka_resp_err_t err; /* Error code, depending on use. */ void *_private; /* INTERNAL USE ONLY, * INITIALIZE TO ZERO, DO NOT TOUCH */ } rd_kafka_topic_partition_t; // 存放的list,一般是操作这个结构 typedef struct rd_kafka_topic_partition_list_s { int cnt; /* Current number of elements */ int size; /* Current allocated size */ rd_kafka_topic_partition_t *elems; /* Element array[] */ } rd_kafka_topic_partition_list_t;
新建、销毁list
1 2 3 4 5 6 7 8 // 创建 rd_kafka_topic_partition_list_t * rd_kafka_topic_partition_list_new (int size); // 销毁 void rd_kafka_topic_partition_list_destroy ( rd_kafka_topic_partition_list_t *rkparlist);
添加新元素
1 2 3 4 5 6 7 8 9 10 11 // 添加1个元素,返回的element可用于填写其他field rd_kafka_topic_partition_t * rd_kafka_topic_partition_list_add ( rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition); // 添加多个元素 void rd_kafka_topic_partition_list_add_range ( rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t start, int32_t stop);
删除元素
1 2 3 4 5 6 7 8 9 10 11 // 根据topic+partition删除 int rd_kafka_topic_partition_list_del ( rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition); // 根据index删除 int rd_kafka_topic_partition_list_del_by_idx ( rd_kafka_topic_partition_list_t *rktparlist, int idx);
查找元素
1 2 3 4 rd_kafka_topic_partition_t * rd_kafka_topic_partition_list_find ( rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition);
3. message操作 相关数据结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 typedef struct rd_kafka_message_s { rd_kafka_resp_err_t err; /* Non-zero for error signaling. */ rd_kafka_topic_t *rkt; /* Topic */ int32_t partition; /* Partition */ void *payload; /* Producer: original message payload. * Consumer: Depends on the value of err : * err==0: Message payload. * err!=0: Error string */ size_t len; /* Depends on the value of err : * err==0: Message payload length * err!=0: Error string length */ void *key; /* Depends on the value of err : * err==0: Optional message key */ size_t key_len; /* Depends on the value of err : * err==0: Optional message key length*/ int64_t offset; /* Consume: * Message offset (or offset for error * if err!=0 if applicable). * dr_msg_cb: * Message offset assigned by broker. * If produce.offset.report is set then * each message will have this field set, * otherwise only the last message in * each produced internal batch will * have this field set, otherwise 0. */ void *_private; /* Consume: * rdkafka private pointer: DO NOT MODIFY * dr_msg_cb: * msg_opaque from produce() call */ } rd_kafka_message_t;
销毁message
1 void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage);
获取message中的错误信息
1 2 static const char * rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage);
获取message中的时间戳
1 2 3 4 5 6 7 8 9 // 时间戳类型 typedef enum rd_kafka_timestamp_type_t { RD_KAFKA_TIMESTAMP_NOT_AVAILABLE, /* Timestamp not available */ RD_KAFKA_TIMESTAMP_CREATE_TIME, /* Message creation time */ RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME /* Log append time */ } rd_kafka_timestamp_type_t; int64_t rd_kafka_message_timestamp (const rd_kafka_message_t *rkmessage, rd_kafka_timestamp_type_t *tstype);
4. conf操作 相关数据结构
// Result codes returned when setting a configuration property
typedef enum {
        RD_KAFKA_CONF_UNKNOWN = -2, /**< Unknown configuration name. */
        RD_KAFKA_CONF_INVALID = -1, /**< Invalid configuration value. */
        RD_KAFKA_CONF_OK = 0        /**< Configuration okay */
} rd_kafka_conf_res_t;
创建、销毁、复制配置句柄
// Create a configuration object
rd_kafka_conf_t *rd_kafka_conf_new(void);
// Destroy a configuration object
void rd_kafka_conf_destroy(rd_kafka_conf_t *conf);
// Duplicate a configuration object
rd_kafka_conf_t *rd_kafka_conf_dup(const rd_kafka_conf_t *conf);
设置某个配置项,所有可以设置的配置项见 CONFIGURATION.md
1 2 3 4 rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size);
设置回调,好多个回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 // producer发送message后会不论成功或失败都会产生一个事件,该事件触发如下回调 // 这个不推荐使用了,参数比较多 void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf, void (*dr_cb) (rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err, void *opaque, void *msg_opaque)); // 推荐使用这个,参数统一到了rd_kafka_message_t void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf, void (*consume_cb) (rd_kafka_message_t * rkmessage, void *opaque)); // consumer接收message后的回调,配合rd_kafka_consumer_poll()使用 void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf, void (*consume_cb) (rd_kafka_message_t * rkmessage, void *opaque)); // 重新负载均衡的时候的回调 void rd_kafka_conf_set_rebalance_cb ( rd_kafka_conf_t *conf, void (*rebalance_cb) (rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque)); // commit的时候的回调,跟rd_kafka_consumer_poll()配合使用 void rd_kafka_conf_set_offset_commit_cb ( rd_kafka_conf_t *conf, void (*offset_commit_cb) (rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque)); // kafka内部发生严重错误的时候通知应用的回调 void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf, void (*error_cb) (rd_kafka_t *rk, int err, const char *reason, void *opaque)); // producer发送消息或consumer取消息时遇到broker返回限流时间时的回调,不管是返回一段时间还是返回0(限流取消) void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf, void (*throttle_cb) ( rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque)); // 设置日志输出的回调,默认是打印到stderr,内置的log_cb有rd_kafka_log_print(), // rd_kafka_log_syslog() void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf, void (*log_cb) (const rd_kafka_t *rk, int level, const char *fac, const char *buf)); // 统计时回调,由rd_kafka_poll()每隔statistics.interval.ms(需要单独设置)触发 void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf, int (*stats_cb) (rd_kafka_t *rk, char *json, size_t json_len, 
void *opaque));
5. topic操作 topic_conf创建、销毁、设置等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 // 创建 rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void); // 备份 rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t *conf); // 销毁 void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf); // 设置 rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size);
topic的各种操作
1 2 3 4 5 6 // 创建 rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf); // 销毁 void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt);
6. kafka主操作句柄 rd_kafka_t的各种操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 // 创建 // kafka类型 typedef enum rd_kafka_type_t { RD_KAFKA_PRODUCER, /**< Producer client */ RD_KAFKA_CONSUMER /**< Consumer client */ } rd_kafka_type_t; rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size); // 销毁 void rd_kafka_destroy(rd_kafka_t *rk); // poll,能捕捉到的事件有: // delivery report callbacks (if dr_cb/dr_msg_cb is configured) [producer] // error callbacks (rd_kafka_conf_set_error_cb()) [all] // stats callbacks (rd_kafka_conf_set_stats_cb()) [all] // throttle callbacks (rd_kafka_conf_set_throttle_cb()) [all] int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms); // 取消掉该次事件,只能在各种callback中使用 void rd_kafka_yield (rd_kafka_t *rk); // 暂停某些partitions发送或接收消息 rd_kafka_resp_err_t rd_kafka_pause_partitions (rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions); // 恢复暂停的partitions的工作 rd_kafka_resp_err_t rd_kafka_resume_partitions (rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions); // 获取当前partition的高低水位,阻塞 rd_kafka_resp_err_t rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms); // 获取当前partition的高低水位,非阻塞 rd_kafka_resp_err_t rd_kafka_get_watermark_offsets (rd_kafka_t *rk, const char *topic, int32_t partition, int64_t *low, int64_t *high);
7. kafka_queue操作
kafka_queue能将来自几个不同topic+partition的message汇总到一个queue中,交给应用统一处理。
创建、销毁
1 2 rd_kafka_queue_t *rd_kafka_queue_new(rd_kafka_t *rk); void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu);
开始消费并把某个topic+partition的消息路由到指定的queue;同一个topic+partition不能调用多次:
1 2 int rd_kafka_consume_start_queue(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset, rd_kafka_queue_t *rkqu);
接收处理message
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 // 接收一条message rd_kafka_message_t *rd_kafka_consume_queue(rd_kafka_queue_t *rkqu, int timeout_ms); // 接收多条message ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu, int timeout_ms, rd_kafka_message_t **rkmessages, size_t rkmessages_size); // 使用回调去处理接收到的消息,处理速度最快 int rd_kafka_consume_callback_queue(rd_kafka_queue_t *rkqu, int timeout_ms, void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque), void *opaque);
8. consumer操作 message的offset可以为绝对数或者以下逻辑值:
// Logical offsets accepted wherever an absolute offset is expected
#define RD_KAFKA_OFFSET_BEGINNING  (-2)    /* Start at the oldest available message */
#define RD_KAFKA_OFFSET_END        (-1)    /* Start at the next new message */
#define RD_KAFKA_OFFSET_STORED     (-1000) /* Start at the stored (committed) offset */
#define RD_KAFKA_OFFSET_TAIL_BASE  (-2000) /* Base value used by RD_KAFKA_OFFSET_TAIL() */
// Function-like macro: start CNT messages before the end of the partition
#define RD_KAFKA_OFFSET_TAIL(CNT)  (RD_KAFKA_OFFSET_TAIL_BASE - (CNT))
低级api
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 // 开始接收message int rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset); // 停止接收 int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition); // 查找现在的offset rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *rkt, int32_t partition, int64_t offset, int timeout_ms); // 接收一条message rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms); // 接收多条message ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms, rd_kafka_message_t **rkmessages, size_t rkmessages_size); // 使用回调处理收到的message,速度最快 int rd_kafka_consume_callback(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms, void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque), void *opaque); // 手动commit rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset);
高级api
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 // 订阅topic rd_kafka_resp_err_t rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics); // 取消订阅 rd_kafka_resp_err_t rd_kafka_unsubscribe (rd_kafka_t *rk); // 获取当前的订阅 rd_kafka_resp_err_t rd_kafka_subscription (rd_kafka_t *rk, rd_kafka_topic_partition_list_t **topics); // 消费message rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms); // 关闭consumer rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk); // 分派partition,跟rd_kafka_subscribe()的区别就是assign必须有partition才生效, // 当你知道partition的时候,最好用这个,因为kafka不用再去找了,速度最快 rd_kafka_resp_err_t rd_kafka_assign (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *partitions); // 获取当前分派的partition rd_kafka_resp_err_t rd_kafka_assignment (rd_kafka_t *rk, rd_kafka_topic_partition_list_t **partitions); // 手动commit,可以同步可以异步,异步的时候要设置rd_kafka_conf_set_offset_commit_cb() rd_kafka_resp_err_t rd_kafka_commit (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, int async); // 手动commit rd_kafka_resp_err_t rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, int async); // 获取当前commit的offset rd_kafka_resp_err_t rd_kafka_committed (rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions, int timeout_ms); // 获取当前topic+partition的offset rd_kafka_resp_err_t rd_kafka_position (rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions);
要使用rd_kafka_consumer_poll(),还需要调用一个高级API:
1 2 // 将接收到 queue 从rd_kafka_poll()重定向到rd_kafka_poll_set_consumer() rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk);
9. producer操作 message flags
// Payload ownership flags for rd_kafka_produce() msgflags.
// FREE and COPY are mutually exclusive. With neither set, the payload must
// stay valid until the message has been delivered.
#define RD_KAFKA_MSG_F_FREE 0x1 /* Delegate freeing of payload to rdkafka. */
#define RD_KAFKA_MSG_F_COPY 0x2 /* rdkafka will make a copy of the payload. */
发送
1 2 3 4 5 6 7 8 9 10 11 // 发送一条 int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t keylen, void *msg_opaque); // 发送很多条 int rd_kafka_produce_batch(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, rd_kafka_message_t *rkmessages, int message_cnt);
10.杂项 获取元信息(metadata,就是全局的一些信息,比如broker的地址啦,名称啦,topic的个数啦,都叫什么啦)
1 2 3 4 5 6 7 rd_kafka_resp_err_t rd_kafka_metadata (rd_kafka_t *rk, int all_topics, rd_kafka_topic_t *only_rkt, const struct rd_kafka_metadata **metadatap, int timeout_ms); void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata);
groups信息
1 2 3 4 5 6 7 8 // 获取信息 rd_kafka_resp_err_t rd_kafka_list_groups (rd_kafka_t *rk, const char *group, const struct rd_kafka_group_list **grplistp, int timeout_ms); // 销毁获取的结构体 void rd_kafka_group_list_destroy (const struct rd_kafka_group_list *grplist);
添加brokers
1 int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist);
设置kafka内部日志等级
1 void rd_kafka_set_log_level(rd_kafka_t *rk, int level);
导出当前kafka的状态,只用于debug
1 void rd_kafka_dump(FILE *fp, rd_kafka_t *rk);
参考资料
针对kafka本身的所有的配置项CONFIGURATION.md
kafka的C客户端github