kafka C++实现生产者

文章目录

  • 1 Kafka 生产者的逻辑
  • 2 Kafka 的C++ API
    • 2.1 RdKafka::Conf
    • 2.2 RdKafka::Message
    • 2.3 RdKafka::DeliveryReportCb
    • 2.4 RdKafka::Event
    • 2.5 RdKafka::EventCb
    • 2.6 RdKafka::PartitionerCb
    • 2.7 RdKafka::Topic
    • 2.8 RdKafka::Producer(核心)
  • 3 Kafka 生产者客户端开发
    • 3.1 必要的参数配置(bootstrap.servers)
    • 3.2 其他重要的生产者参数
      • 3.2.1 acks
      • 3.2.2 max.request.size
      • 3.2.3 retries 和 retry.backoff.ms
      • 3.2.4 compression.type
      • 3.2.5 connection.max.idle.ms
      • 3.2.6 linger.ms
      • 3.2.7 receive.buffer.bytes
      • 3.2.8 send.buffer.bytes
      • 3.2.9 request.timeout.ms
      • 3.2.10 client.id
      • 3.2.11 batch.size
    • 3.3 创建生产者实例
    • 3.4 消息发送
    • 3.5 完整示例代码
  • 4 总结

1 Kafka 生产者的逻辑

在这里插入图片描述
(1)配置生产者客户端参数。
(2)创建相应的生产者实例。
(3)构建待发送的消息。
(4)发送消息。
(5)关闭生产者实例。

2 Kafka 的C++ API

2.1 RdKafka::Conf

enum ConfType{ 
	CONF_GLOBAL, 	// 全局配置 
	CONF_TOPIC 		// Topic配置 
};
enum ConfResult{ 
	CONF_UNKNOWN = -2, 
	CONF_INVALID = -1, 
	CONF_OK = 0 
};
static Conf * create(ConfType type);
//创建配置对象。

Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr);
//设置配置对象的属性值,成功返回CONF_OK,错误时错误信息输出到errstr。

Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr);
//设置dr_cb属性值。

Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr);
//设置event_cb属性值。

Conf::ConfResult set(const std::string &name, const Conf *topic_conf, std::string &errstr);
//设置用于自动订阅Topic的默认Topic配置。

Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr);
//设置partitioner_cb属性值,配置对象必须是CONF_TOPIC类型。

Conf::ConfResult set(const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb,std::string &errstr);
//设置partitioner_key_pointer_cb属性值。

Conf::ConfResult set(const std::string &name, SocketCb *socket_cb, std::string &errstr);
//设置socket_cb属性值。

Conf::ConfResult set(const std::string &name, OpenCb *open_cb, std::string &errstr);
//设置open_cb属性值。

Conf::ConfResult set(const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr);
//设置rebalance_cb属性值。

Conf::ConfResult set(const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr);
//设置offset_commit_cb属性值。

Conf::ConfResult get(const std::string &name, std::string &value) const;
//查询单条属性配置值。

2.2 RdKafka::Message

Message表示一条消费或生产的消息,或是事件。

std::string errstr() const;
//如果消息是一条错误事件,返回错误字符串,否则返回空字符串。

ErrorCode err() const;
//如果消息是一条错误事件,返回错误代码,否则返回0。

Topic * topic() const;
//返回消息的Topic对象。如果消息的Topic对象没有显示使用RdKafka::Topic::create()创建,需要使用topic_name函数。

std::string topic_name() const;
//返回消息的Topic名称。

int32_t partition() const;
//如果分区可用,返回分区号。

void * payload() const;
//返回消息数据。

size_t len() const;
//返回消息数据的长度。

const std::string * key() const;
//返回字符串类型的消息key。

const void * key_pointer() const;
//返回void类型的消息key。

size_t key_len() const;
//返回消息key的二进制长度。

int64_t offset () const;
//返回消息或错误的位移。

void * msg_opaque() const;
//返回RdKafka::Producer::produce()提供的msg_opaque。

virtual MessageTimestamp timestamp() const = 0;
//返回消息时间戳。

virtual int64_t latency() const = 0;
//返回produce函数内生产消息的微秒级时间延迟,如果延迟不可用,返回-1。

virtual struct rd_kafka_message_s *c_ptr () = 0;
//返回底层数据结构的C rd_kafka_message_t句柄。

virtual Status status () const = 0;
//返回消息在Topic Log的持久化状态。

virtual RdKafka::Headers *headers () = 0;
//返回消息头。

virtual RdKafka::Headers *headers (RdKafka::ErrorCode *err) = 0;
//返回消息头,错误信息会输出到err。

2.3 RdKafka::DeliveryReportCb

每收到一条RdKafka::Producer::produce()函数生产的消息,调用一次投递报告回调函数,RdKafka::Message::err()将会标识Produce请求的结果。
为了使用队列化的投递报告回调函数,必须调用RdKafka::poll()函数。

virtual void dr_cb(Message &message)=0;

当一条消息成功生产或是rdkafka遇到永久失败或是重试次数耗尽,投递报告回调函数会被调用。

C++封装示例:

class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb
{
public:
	void dr_cb(RdKafka::Message &message)
	{
		if(message.err())
			std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
		else
		{
			// Message delivered to topic test [0] at offset 135000
			std::cerr << "Message delivered to topic " << message.topic_name()
				<< " [" << message.partition() << "] at offset "
				<< message.offset() << std::endl;
		}
	}
};

2.4 RdKafka::Event

enum Type{ 
	EVENT_ERROR, //错误条件事件 
	EVENT_STATS, // Json文档统计事件 
	EVENT_LOG, // Log消息事件 
	EVENT_THROTTLE // 来自Broker的throttle级信号事件 
};
virtual Type type() const =0;
//返回事件类型。
virtual ErrorCode err() const =0;
//返回事件错误代码。
virtual Severity severity() const =0;
//返回log严重级别。
virtual std::string fac() const =0;
//返回log基础字符串。
virtual std::string str () const =0;
//返回Log消息字符串。
virtual int throttle_time() const =0;
//返回throttle时间。
virtual std::string broker_name() const =0;
//返回Broker名称。
virtual int broker_id() const =0;
//返回Broker ID。

2.5 RdKafka::EventCb

事件是从RdKafka传递错误、统计信息、日志等消息到应用程序的通用接口。

virtual void event_cb(Event &event)=0; //  事件回调函数

C++封装示例:

class ProducerEventCb : public RdKafka::EventCb
{
public:
    void event_cb(RdKafka::Event &event)
    {
        switch(event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            std::cout << "RdKafka::Event::EVENT_ERROR: " << RdKafka::err2str(event.err()) << std::endl;
            break;
        case RdKafka::Event::EVENT_STATS:
            std::cout << "RdKafka::Event::EVENT_STATS: " << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_LOG:
            std::cout << "RdKafka::Event::EVENT_LOG " << event.fac() << std::endl;
            break;
        case RdKafka::Event::EVENT_THROTTLE:
            std::cout << "RdKafka::Event::EVENT_THROTTLE " << event.broker_name() << std::endl;
            break;
        }
    }
};

2.6 RdKafka::PartitionerCb

PartitionerCb用实现自定义分区策略,需要使用RdKafka::Conf::set()设置partitioner_cb属性。

virtual int32_t partitioner_cb(const Topic *topic, const std::string *key, int32_t partition_cnt,void *msg_opaque)=0;
//Partitioner回调函数

返回topic主题中使用key的分区,key可以是NULL或字符串。
partition_cnt表示该主题的分区数量(用于hash计算)
返回值必须在0到partition_cnt间,如果分区失败可能返回RD_KAFKA_PARTITION_UA (-1)。
msg_opaque与RdKafka::Producer::produce()调用提供的msg_opaque相同。

C++封装示例:

class HashPartitionerCb : public RdKafka::PartitionerCb
{
public:
    int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,
                            int32_t partition_cnt, void *msg_opaque)
    {
        char msg[128] = {0};
        int32_t partition_id = generate_hash(key->c_str(), key->size()) % partition_cnt;
        //                          [topic][key][partition_cnt][partition_id] 
        //                          :[test][6419][2][1]
        sprintf(msg, "HashPartitionerCb:topic:[%s], key:[%s]partition_cnt:[%d], partition_id:[%d]", topic->name().c_str(),       
                key->c_str(), partition_cnt, partition_id);
        std::cout << msg << std::endl;
        return partition_id;
    }
private:

    static inline unsigned int generate_hash(const char *str, size_t len)
    {
        unsigned int hash = 5381;
        for (size_t i = 0 ; i < len ; i++)
            hash = ((hash << 5) + hash) + str[i];
        return hash;
    }
};

2.7 RdKafka::Topic

static Topic * create(Handle *base, const std::string &topic_str, Conf *conf, std::string &errstr);
//使用conf配置创建名为topic_str的Topic句柄。

const std::string name ();
//获取Topic名称。

bool partition_available(int32_t partition) const;
//获取parition分区是否可用,只能在 RdKafka::PartitionerCb回调函数内被调用。

ErrorCode offset_store(int32_t partition, int64_t offset);
//存储Topic的partition分区的offset位移,只能用于RdKafka::Consumer,不能用于RdKafka::KafkaConsumer高级接口类。
//使用本接口时,auto.commit.enable参数必须设置为false。

virtual struct rd_kafka_topic_s *c_ptr () = 0;
//返回底层数据结构的rd_kafka_topic_t句柄,不推荐利用rd_kafka_topic_t句柄调用C API,但如果C++ API没有提供相应功能,
//可以直接使用C API和librdkafka核心交互。
static const int32_t PARTITION_UA = -1;		//未赋值分区
static const int64_t OFFSET_BEGINNING = -2;	//特殊位移,从开始消费
static const int64_t OFFSET_END = -1;		//特殊位移,从末尾消费
static const int64_t OFFSET_STORED = -1000;	//使用offset存储

2.8 RdKafka::Producer(核心)

static Producer * create(Conf *conf, std::string &errstr);
//创建一个新的Producer客户端对象,conf用于替换默认配置对象,本函数调用后conf可以重用。成功返回新的Producer客户端对象
//,失败返回NULL,errstr可读错误信息。

ErrorCode produce(Topic *topic, int32_t partition, int msgflags, void *payload, size_t len,
const std::string *key, void *msg_opaque);
//生产和发送单条消息到Broker。msgflags:可选项为RK_MSG_BLOCK、RK_MSG_FREE、RK_MSG_COPY。

在这里插入图片描述
返回错误码:
在这里插入图片描述

ErrorCode produce(Topic *topic, int32_t partition, int msgflags, void *payload, size_t len,const void *key, 
size_t key_len, void *msg_opaque);
//生产和发送单条消息到Broker,传递key数据指针和key长度。

ErrorCode produce(Topic *topic, int32_t partition, const std::vector< char > *payload, 
const std::vector< char > *key, void *msg_opaque);
//生产和发送单条消息到Broker,传递消息数组和key数组。接受数组类型的key和payload,数组会被复制。

//ErrorCode flush (int timeout_ms);
//等待所有未完成的所有Produce请求完成。为了确保所有队列和已经执行的Produce请求在中止前完成,flush操作优先于销毁生产者
//实例完成。本函数会调用Producer::poll()函数,因此会触发回调函数。

//ErrorCode purge (int purge_flags);
//清理生产者当前处理的消息。本函数调用时可能会阻塞一定时间,当后台线程队列在清理时。应用程序需要在调用poll或flush函数后
//,执行清理消息的dr_cb回调函数。

virtual Error *init_transactions (int timeout_ms) = 0;
//初始化Producer实例的事务。失败返回RdKafka::Error错误对象,成功返回NULL。
//通过调用RdKafka::Error::is_retriable()函数可以检查返回的错误对象是否有权限重试,调用
//RdKafka::Error::is_fatal()检查返回的错误对象是否是严重错误。返回的错误对象必须elete。

virtual Error *begin_transaction () = 0;
//启动事务。本函数调用前,init_transactions()函数必须被成功调用。
//成功返回NULL,失败返回错误对象。通过调用RdKafka::Error::is_fatal_error()函数可以检查是否是严重错误,返回的错误对象
//必须delete。

virtual Error send_offsets_to_transaction (const std::vector &offsets,const ConsumerGroupMetadata
 *group_metadata,int timeout_ms) = 0;
//发送TopicPartition位移链表到由group_metadata指定的Consumer Group协调器,如果事务提交成功,位移才会被提交。

virtual Error *commit_transaction (int timeout_ms) = 0;
//提交当前事务。在实际提交事务时,任何未完成的消息会被完成投递。
//成功返回NULL,失败返回错误对象。通过调用错误对象的方法可以检查是否有权限重试,是否是严重错误、可中止错误等。

virtual Error *abort_transaction (int timeout_ms) = 0;
//停止事务。本函数从非严重错误、可终止事务中用于恢复。未完成消息会被清理。

3 Kafka 生产者客户端开发

3.1 必要的参数配置(bootstrap.servers)

(1)指定连接 Kafka 集群所需要的 broker 地址清单,具体的内容格式为 host1:port1,host2:port2,可以设置一个或者多个地址,中间以逗号进行隔开,此参数的默认值为 “”。
(2)注意这里并非需要所有的 broker 地址,因为生产者会从给定的 broker 里查找其他 broker 的信息。
(3)过建议至少要设置两个以上的 broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka 集群上。

// 创建Kafka Conf对象
m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(m_config == NULL)
{
    std::cout << "Create RdKafka Conf failed." << std::endl;
}
// 创建Topic Conf对象
m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if(m_topicConfig == NULL)
{
    std::cout << "Create RdKafka Topic Conf failed." << std::endl;
}
// 设置Broker属性
RdKafka::Conf::ConfResult errCode;
m_dr_cb = new ProducerDeliveryReportCb;
std::string errorStr;
errCode = m_config->set("dr_cb", m_dr_cb, errorStr);
if(errCode != RdKafka::Conf::CONF_OK)
{
    std::cout << "Conf set failed:" << errorStr << std::endl;
}
m_event_cb = new ProducerEventCb;
errCode = m_config->set("event_cb", m_event_cb, errorStr);
if(errCode != RdKafka::Conf::CONF_OK)
{
    std::cout << "Conf set failed:" << errorStr << std::endl;
}

m_partitioner_cb = new HashPartitionerCb;
errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
if(errCode != RdKafka::Conf::CONF_OK)
{
    std::cout << "Conf set failed:" << errorStr << std::endl;
}

errCode = m_config->set("statistics.interval.ms", "10000", errorStr);
if(errCode != RdKafka::Conf::CONF_OK)
{
    std::cout << "Conf set failed:" << errorStr << std::endl;
}

errCode = m_config->set("message.max.bytes", "10240000", errorStr);
if(errCode != RdKafka::Conf::CONF_OK)
{
    std::cout << "Conf set failed:" << errorStr << std::endl;
}
errCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
if(errCode != RdKafka::Conf::CONF_OK)
{
    std::cout << "Conf set failed:" << errorStr << std::endl;
}

3.2 其他重要的生产者参数

3.2.1 acks

用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消 息是成功写入的。 acks
是生产者客户端中一个非常重要的参数 ,它涉及消息的可靠性和吞吐量之间的权衡。acks 参数有3种类
型的值(都是字符串类型)。

  • acks = 1。默认值即为 1。生产者发送消息之后,只要分区的 leader 副本成功写入消息,那么它就
    会收到来自服务端的成功响应。如果消息无法写入 leader 副本,比如在 leader 副本崩溃、重新选
    举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者
    可以选择重发消息。如果消息写入 leader 副本并返回成功给生产者,且在被其他 follower 副本拉
    取之前 leader 副本崩溃,那么此时消息还是会丢失,因为新选举的 leader 副本中并没有这条对应
    的消息。
    acks 设置为 1,是消息可靠性和吞吐量之间的折中方案。
  • acks = 0。生产者发送消息之后不需要等待任何服务端的响应。
    如果在消息从发送到写入 Kafka 的过程中出现了某些异常,导致 Kafka 并没有收到这条消息,那么
    生产者也无从得知,消息也就丢失了。
    在其他配置环境相同的情况下,acks 设置为 0 可以达到最大的吞吐量。
  • acks = -1 或 acks = all。生产者在消息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后
    才能够收到来自服务端的成功响应。
    在其他配置环境相同的情况下,acks 设置为 -1 可以达到最强的可靠性。
    但是并不意味着消息就一定可靠,因为 ISR 中可能只有 leader 副本,这样就退化成了 acks = 1 的
    情况。要获得更高的消息可靠性需要配合 min.insync.replicas 等参数的联动。

注意 acks 参数配置的值是一个字符串类型,而不是整数类型。

//范例:
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
ConfResult ret = conf->set("acks", "1", errstr);
ConfResult ret = conf->set("acks", "0", errstr);
ConfResult ret = conf->set("acks", "all", errstr);

3.2.2 max.request.size

这个参数用来限制生产者客户端能够发送的消息的最大值,默认值为 1048576 B,即 1 MB。
一般情况下,这个默认值就可以满足大多数的应用场景了。
不建议盲目地增大这个参数的配置值,尤其是在对 Kafka 整体脉络没有足够把控的时候。
因为这个参数还涉及一些其他参数的联动,比如 broker 端的 message.max.bytes 参数,如果配
置错误可能会引起一些不必要的一场。
比如讲 broker 端的 message.max.bytes 参数配置为 10, 而 max.request.size 参数配置为 20,
那么当我们发送一条消息大小为 15 的消息时,生产者客户端就会报出异常:
The reqeust included a message larger than the max message size the server will accept.

errCode = conf->set("message.max.bytes", "10240000", errorStr);

3.2.3 retries 和 retry.backoff.ms

retries 重试次数,默认0

retry.backoff.ms 重试间隔,默认100

  • retries 参数用来配置生产者重试的次数,默认值为 0,即发生异常的时候不进行任何的重试动作
  • 消息在从生产者发出道成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、Leader
    副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries 大于 0 的值,以此
    通过内部重试来恢复而不是一味的将异常抛给生产者的应用程序。
  • 如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。
  • 不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过 max.request.size 参数 配置的值时,这种方式就不行了。
  • 重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为 100,它用来设定两次重试之
    间的时间间隔,避免无效的频繁重试。
  • 在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,这样可以设定总
    的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。
  • Kafka 可以保证同一个分区中的消息时有序的。
  • 如果生产者按照一定的顺序发送消息,那么这些消息也会顺序的写入分区,进而消费者也可以按照 同样的顺序消费它们。
  • 对于某些应用来说,顺序性非常重要,比如 Mysql 的 binlog 传输,如果出现错误就会造成非常严 重的后果。
  • 如果讲 retries 参数设置为非零值,并且 max.in.flight.requests.per.connection 参数配置为大于
    1 的值,那么就会出现错序的现象:如果第一批次消息写入失败,而第二批次消息写入成功,那么生
    产者会重试发送第一批次的消息,此时如果第一批次的消息写入成功,那么这两个批次的消息就出 现了错序。
  • 一般而言,在需要保证顺序的场合建议把参数 max.in.flight.requests.per.connection 配置为 1,而不是把
    retries 配置为 0. 不过这样也会影响整体的吞吐。

max.in.flight.requests.per.connection = 1 限制客户端在单个连接上能够发送的未响应请求的个数。设
置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。注意:设置此参数
是为了避免消息乱序

3.2.4 compression.type

  • 这个参数用来指定消费的压缩方式,默认值为 “none”,即默认情况下,消息不会被压缩。
  • 该参数还可以配置为 “gzip”,“snappy”,“lz4”。
  • 对消息进行压缩可以极大地减少网络传输量、降低网络 I/O ,从而提高整体的性能。
  • 消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压
    缩。

3.2.5 connection.max.idle.ms

这个参数用来指定在多久之后关闭闲置的连接,默认值时 540000 ms,即 9 分钟。

3.2.6 linger.ms

  • 这个参数用来指定生产者发送 Producer Batch 之前等待更多消息(ProducerRecord)加入
    ProducerBatch 的时间,默认值为 0。

  • 生产者客户端会在 ProducerBatch 被填满或等待时间超过 linger.ms 值时发送出去。

  • 增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。

  • 这个 linger.ms 参数与 TCP 协议中的 Nagle 算法有异曲同工之妙。

3.2.7 receive.buffer.bytes

  • 这个参数用来设置 Socket 接受消息缓冲区(SO_RECBUF)的大小,默认值为 32768(B),即 32
    KB。

  • 如果设置为 -1,则使用操作系统的默认值。

  • 如果 Producer 与 Kafka 处于不同的机房,则可以适当调大这个参数值。

3.2.8 send.buffer.bytes

  • 这个参数用来设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为 131072 (B),即
    128 KB。
  • 与 receive.buffer.bytes 参数一样,如果设置为 -1 ,则使用操作系统默认值。

3.2.9 request.timeout.ms

  • 这个参数用来配置 Producer 等待请求响应的最长时间,默认值为 30000 ms。
  • 请求超时之后可以选择进行重试。
  • 注意这个参数需要比 broker 端参数 replica.lag.time.max.ms 的值要大,这样可以减少因客户端重
    试而引起的消息重复的概率。
  • 根据具体场景和需求,需要根据网络状况、Kafka集群负载和消息处理要求来调整该参数值。较低延迟要求的场景可以选择较小的值,而对于网络不稳定或处理压力较大的情况,可能需要适当增加该参数值。

3.2.10 client.id

用来设定 KafkaProducer 对应的客户端 id。默认值为 “”。

3.2.11 batch.size

batch.size 是 producer 最重要的参数之一 !它对于调优 producer 吞吐量和延时性能指标都有着非常
重要的作用

producer 会将发往同一分区的多条消息封装进一个 batch中,当 batch 满了的时候, producer 会发送
batch 中的所有消息 。不过, producer并不总是等待batch满了才发送消息,很有可能当batch还有很
多空闲空间时 producer 就发送该 batch 。显然,batch 的大小就显得非常重要 。
通常来说,一个小的 batch 中包含的消息数很少,因而一次发送请求能够写入的消息数也很少,所以
producer 的吞吐量会很低;一个 batch 非常之巨大,那么会给内存使用带来极大的压力,因为不管是
否能够填满,producer 都会为该batch 分配固定大小的内存。
因此batch.size 参数的设置其实是一种时间与空间权衡的体现 。batch.size 参数默认值是 16384 ,即
16KB 。这其实是一个非常保守的数字。 在实际使用过程中合理地增加该参数值,通常都会发现
producer 的吞吐量得到了相应的增加 。

3.3 创建生产者实例

生产者的相关配置和实例的创建可以在类的构造函数实现。比如Kafka Conf对象、Topic Conf对象、设置Broker属性、Producer、Topic对象等。

// 创建Producer
m_producer = RdKafka::Producer::create(m_config, errorStr);
if(m_producer == NULL)
{
    std::cout << "Create Producer failed:" << errorStr << std::endl;
}
// 创建Topic对象
m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr);
if(m_topic == NULL)
{
    std::cout << "Create Topic failed:" << errorStr << std::endl;
}

3.4 消息发送

librdkafka提供的异步的生产接口,异步的消费接口和同步的消息接口,没有同步的生产接口
同一个生产者可以发送多个主题的,在内部处理时根据传入的topic对象发送给对应的主题分区。

 RdKafka::ErrorCode errorCode = m_producer->produce(
 									m_topic, 
 									RdKafka::Topic::PARTITION_UA,
                                   	RdKafka::Producer::RK_MSG_COPY,
                                   	payload, 
                                   	len, 
                                   	&key, 
                                   	NULL);

3.5 完整示例代码

KafkaProducer.h

#ifndef KAFKAPRODUCER_H
#define KAFKAPRODUCER_H

#pragma once
#include <string>
#include <iostream>
#include "rdkafkacpp.h"

class KafkaProducer
{
public:
	/**
	* @brief KafkaProducer
	* @param brokers
	* @param topic
	* @param partition
	*/
	explicit KafkaProducer(const std::string& brokers, const std::string& topic, int partition);
	/**
	* @brief push Message to Kafka
	* @param str, message data
	*/
	void pushMessage(const std::string& str, const std::string& key);
	~KafkaProducer();

private:
	std::string m_brokers;			// Broker列表,多个使用逗号分隔
	std::string m_topicStr;			// Topic名称
	int m_partition;				// 分区

	RdKafka::Conf* m_config;        // Kafka Conf对象
	RdKafka::Conf* m_topicConfig;   // Topic Conf对象
	RdKafka::Topic* m_topic;		// Topic对象
	RdKafka::Producer* m_producer;	// Producer对象

	/*只要看到Cb 结尾的类,要继承它然后实现对应的回调函数*/
	RdKafka::DeliveryReportCb* m_dr_cb;
	RdKafka::EventCb* m_event_cb;
	RdKafka::PartitionerCb* m_partitioner_cb;
};

#endif

KafkaProducer.cpp

#include "KafkaProducer.h"

// call back
class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb
{
public:
	void dr_cb(RdKafka::Message &message)
	{
		if(message.err())
			std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
		else
		{
			// Message delivered to topic test [0] at offset 135000
			std::cerr << "Message delivered to topic " << message.topic_name()
				<< " [" << message.partition() << "] at offset "
				<< message.offset() << std::endl;
		}
	}
};

class ProducerEventCb : public RdKafka::EventCb
{
public:
	void event_cb(RdKafka::Event &event)
	{
		switch (event.type())
		{
		case RdKafka::Event::EVENT_ERROR:
			std::cout << "RdKafka::Event::EVENT_ERROR: " << RdKafka::err2str(event.err()) << std::endl;
			break;
		case RdKafka::Event::EVENT_STATS:
			std::cout << "RdKafka::Event::EVENT_STATS: " << event.str() << std::endl;
			break;
		case RdKafka::Event::EVENT_LOG:
			std::cout << "RdKafka::Event::EVENT_LOG " << event.fac() << std::endl;
			break;
		case RdKafka::Event::EVENT_THROTTLE:
			std::cout << "RdKafka::Event::EVENT_THROTTLE " << event.broker_name() << std::endl;
			break;
		}
	}
};

class HashPartitionerCb : public RdKafka::PartitionerCb
{
public:
	int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,
		int32_t partition_cnt, void *msg_opaque)
	{
		char msg[128] = { 0 };
		int32_t partition_id = generate_hash(key->c_str(), key->size()) % partition_cnt;
		//                          [topic][key][partition_cnt][partition_id] 
		//                          :[test][6419][2][1]
		sprintf(msg, "HashPartitionerCb:topic:[%s], key:[%s]partition_cnt:[%d], partition_id:[%d]", topic->name().c_str(),
			key->c_str(), partition_cnt, partition_id);
		std::cout << msg << std::endl;
		return partition_id;
	}
private:

	static inline unsigned int generate_hash(const char *str, size_t len)
	{
		unsigned int hash = 5381;
		for (size_t i = 0; i < len; i++)
			hash = ((hash << 5) + hash) + str[i];
		return hash;
	}
};


KafkaProducer::KafkaProducer(const std::string& brokers, const std::string& topic, int partition)
{
	m_brokers = brokers;
	m_topicStr = topic;
	m_partition = partition;

	/* 创建Kafka Conf对象 */
	m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
	if(m_config==NULL)
		std::cout << "Create RdKafka Conf failed." << std::endl;

	/* 创建Topic Conf对象 */
	m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
	if (m_topicConfig == NULL)
		std::cout << "Create RdKafka Topic Conf failed." << std::endl;

	/* 设置Broker属性 */
	RdKafka::Conf::ConfResult errCode;
	std::string errorStr;
	m_dr_cb = new ProducerDeliveryReportCb;
	// 设置dr_cb属性值
	errCode = m_config->set("dr_cb", m_dr_cb, errorStr);
	if (errCode != RdKafka::Conf::CONF_OK)
	{
		std::cout << "Conf set failed:" << errorStr << std::endl;
	}
	// 设置event_cb属性值
	m_event_cb = new ProducerEventCb;
	errCode = m_config->set("event_cb", m_event_cb, errorStr);
	if (errCode != RdKafka::Conf::CONF_OK)
	{
		std::cout << "Conf set failed:" << errorStr << std::endl;
	}
	// 自定义分区策略
	m_partitioner_cb = new HashPartitionerCb;
	errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
	if (errCode != RdKafka::Conf::CONF_OK)
	{
		std::cout << "Conf set failed:" << errorStr << std::endl;
	}
	// 设置配置对象的属性值
	errCode = m_config->set("statistics.interval.ms", "10000", errorStr);
	if (errCode != RdKafka::Conf::CONF_OK)
	{
		std::cout << "Conf set failed:" << errorStr << std::endl;
	}
	errCode = m_config->set("message.max.bytes", "10240000", errorStr);
	if (errCode != RdKafka::Conf::CONF_OK)
	{
		std::cout << "Conf set failed:" << errorStr << std::endl;
	}
	errCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
	if (errCode != RdKafka::Conf::CONF_OK)
	{
		std::cout << "Conf set failed:" << errorStr << std::endl;
	}

	/* 创建Producer */
	m_producer = RdKafka::Producer::create(m_config, errorStr);
	if (m_producer == NULL)
	{
		std::cout << "Create Producer failed:" << errorStr << std::endl;
	}

	/* 创建Topic对象 */
	m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr);
	if (m_topic == NULL)
	{
		std::cout << "Create Topic failed:" << errorStr << std::endl;
	}
}

KafkaProducer::~KafkaProducer()
{
	while (m_producer->outq_len() > 0)
	{
		std::cerr << "Waiting for " << m_producer->outq_len() << std::endl;
		m_producer->flush(5000);
	}
	delete m_config;
	delete m_topicConfig;
	delete m_topic;
	delete m_producer;
	delete m_dr_cb;
	delete m_event_cb;
	delete m_partitioner_cb;
}

void KafkaProducer::pushMessage(const std::string& str, const std::string& key)
{
	int32_t len = str.length();
	void* payload = const_cast<void*>(static_cast<const void*>(str.data()));
	RdKafka::ErrorCode errorCode = m_producer->produce(
		m_topic,
		RdKafka::Topic::PARTITION_UA,
		RdKafka::Producer::RK_MSG_COPY,
		payload,
		len,
		&key,
		NULL);
	m_producer->poll(0);
	if (errorCode != RdKafka::ERR_NO_ERROR)
	{
		std::cerr << "Produce failed: " << RdKafka::err2str(errorCode) << std::endl;
		if (errorCode == RdKafka::ERR__QUEUE_FULL)
		{
			m_producer->poll(100);
		}
	}
}

CMakeLists.txt

cmake_minimum_required(VERSION 2.8)

project(KafkaProducer)

set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_COMPILER "g++")
set(CMAKE_CXX_FLAGS "-std=c++11 ${CMAKE_CXX_FLAGS}")
set(CMAKE_INCLUDE_CURRENT_DIR ON)

# Kafka头文件路径
include_directories(/usr/local/include/librdkafka)
# Kafka库路径
link_directories(/usr/local/lib)

aux_source_directory(. SOURCE)

add_executable(${PROJECT_NAME} ${SOURCE})
TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)

测试文件main.cpp

#include <iostream>
#include "KafkaProducer.h"
using namespace std;

int main()
{
    // 创建Producer
    // KafkaProducer producer("127.0.0.1:9092,192.168.2.111:9092", "test", 0);
    KafkaProducer producer("127.0.0.1:9092", "test", 0);
    for(int i = 0; i < 10000; i++)
    {
        char msg[64] = {0};
        sprintf(msg, "%s%4d", "Hello RdKafka ", i);
        // 生产消息
        char key[8] = {0};      // 主要用来做负载均衡
        sprintf(key, "%d", i);
        producer.pushMessage(msg, key);  
    }
    RdKafka::wait_destroyed(5000);
}

编译:

mkdir build
cd build
cmake ..
make

4 总结

Kafka Producer使用流程:

  • 创建Kafka配置实例。
  • 创建Topic配置实例。
  • 设置Kafka配置实例Broker属性。
  • 设置Topic配置实例属性。
  • 注册回调函数(分区策略回调函数需要注册到Topic配置实例)。
  • 创建Kafka Producer客户端实例。
  • 创建Topic实例。
  • 阻塞等待Producer生产消息完成。
  • 等待Produce请求完成。
  • 销毁Kafka Producer客户端实例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/201200.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

合阔智云:实现API无代码开发,连接ERP系统和CRM系统提高运营效率

概述 合阔智云&#xff0c;一家成立于2011年的科技公司&#xff0c;核心业务是提供云原生和移动化设计的新一代全渠道“云端一体”履约中台和去中心化模式智能门店供应链业务中台。他们的系统可以无需API开发即可实现电商系统和客服系统的连接和集成&#xff0c;大大提高了企业…

【机器学习 | 可视化】回归可视化方案

&#x1f935;‍♂️ 个人主页: AI_magician &#x1f4e1;主页地址&#xff1a; 作者简介&#xff1a;CSDN内容合伙人&#xff0c;全栈领域优质创作者。 &#x1f468;‍&#x1f4bb;景愿&#xff1a;旨在于能和更多的热爱计算机的伙伴一起成长&#xff01;&#xff01;&…

pkpmbs 建设工程质量监督系统 文件上传漏洞复现

0x01 产品简介 pkpmbs 建设工程质量监督系统是湖南建研信息技术股份有限公司一个与工程质量检测管理系统相结合的&#xff0c;B/S架构的检测信息监管系统。 0x02 漏洞概述 pkpmbs 建设工程质量监督系统 FileUpOrDown.aspx、/Platform/System/FileUpload.ashx、接口处存在任意文…

hive里如何高效生成唯一ID

常见的方式&#xff1a; hive里最常用的方式生成唯一id&#xff0c;就是直接使用 row_number() 来进行&#xff0c;这个对于小数据量是ok的&#xff0c;但是当数据量大的时候会导致&#xff0c;数据倾斜&#xff0c;因为最后生成全局唯一id的时候&#xff0c;这个任务是放在一个…

鸿蒙4.0开发笔记之ArkTS装饰器语法基础@Extend扩展组件样式与stateStyles多态样式(十一)

一、Extend扩展组件样式 1、作用 前文提到可以使用Styles用于样式的扩展&#xff0c;在Styles的基础上&#xff0c;ArkTS语法还提供了Extend&#xff0c;⽤于扩展原生组件样式&#xff0c;包括Text、Button等等。 2、定义语法 Extend(UIComponentName) function functionNam…

Linux详解——安装JDK

目录 一、下载jdk 二、tar包安装 三、rpm包安装 一、下载jdk 1.下载jdk https://www.oracle.com/technetwork/java/javase/downloads/index.html 2.通过CRT|WinSCP工具将jdk上传到linux系统中 二、tar包安装 # 1.将JDK解压缩到指定目录 tar -zxvf jdk-8u171-linux…

配置自动化部署Jenkins和Gitea

配置自动化部署 这里使用的是JenkinsGitea 如果不知道怎么安装Jenkins和Gitea可以参考下面文章 https://blog.csdn.net/weixin_46533577/article/details/134644144 我的另一篇文章 介绍 前端 先说下自己的情况&#xff0c;因为自己服务器原因&#xff0c;使用的服务器内…

Win10系统无法登录Xbox live的四种解决方法

在Win10系统中&#xff0c;用户可以登录Xbox live平台&#xff0c;畅玩自己喜欢的游戏。但是&#xff0c;有用户却遇到了无法登录Xbox live的问题。接下来小编给大家详细介绍四种简单的解决方法&#xff0c;解决后用户在Win10电脑上就能成功登录上Xbox live平台。 Win10系统无法…

短 URL 生成器设计:百亿短 URL 怎样做到无冲突?

Java全能学习面试指南&#xff1a;https://javaxiaobear.cn 我们先来看看&#xff0c;当高并发遇到海量数据处理时的架构。在社交媒体上&#xff0c;人们经常需要分享一些 URL&#xff0c;但是有些 URL 可能会很长&#xff0c;比如&#xff1a; https://time.geekbang.org/hyb…

水离子水壁炉的科技创新与时尚家居潮流

近年来&#xff0c;水离子水壁炉作为家居装饰的新宠儿&#xff0c;正在以其独特的科技创新和时尚设计引领家居潮流。这一新型壁炉不仅注重外观美感&#xff0c;更借助先进科技实现了温馨的火焰效果&#xff0c;成为现代家居中的独特亮点。 水离子水壁炉的科技创新主要体现在其采…

【Mysql学习笔记】3 - 本章作业

1.判断 1. 这句话表示ename as name 可以不要这个as&#xff0c;同理后面的sal salary也是别名&#xff0c;而选项D的Annual Salary中间也有空格&#xff0c;程序会判断为as 但as不能连用&#xff0c;所以错误&#xff0c;选D 2.选B&#xff0c;因为null不能加上判断符号<&…

Stable Diffusion绘画系列【7】:极致东方美学

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能AI、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推荐--…

leetCode 131.分割回文串 + 回溯算法 + 图解 + 笔记

131. 分割回文串 - 力扣&#xff08;LeetCode&#xff09; 给你一个字符串 s&#xff0c;请你将 s 分割成一些子串&#xff0c;使每个子串都是 回文串 。返回 s 所有可能的分割方案。回文串 是正着读和反着读都一样的字符串 示例 1&#xff1a; 输入&#xff1a;s "aa…

RabbitMQ消息模型之Work Queues

Work Queues Work Queues&#xff0c;也被称为&#xff08;Task Queues&#xff09;&#xff0c;任务模型&#xff0c;也是官网给出的第二个模型&#xff0c;使用的交换机类型是直连direct&#xff0c;也是默认的交换机类型。当消息处理比较耗时的时候&#xff0c;可能生产消息…

F. Magic Will Save the World

首先积攒了能量打了怪再积攒是没有意义的&#xff0c;可以直接积攒好&#xff0c;然后一次性进行攻击 那么怎么进行攻击了&#xff1f;可以尽量的多选怪物使用水魔法攻击剩余的再用火魔法进行攻击&#xff0c; 也就是只要存在合法的体积&#xff08;即装入背包的怪物的体积之…

Docker、Kubernetes、OCI、CRI-O、containerd、runc 之间的关系以及它们是如何一起工作的?

最近网上看到一张图片&#xff0c;能够很清晰地展现出 Docker、Kubernetes、OCI、CRI-O、containerd、runc 之间的关系以及它们是如何在一起工作的&#xff0c;如下&#xff1a; 本文可以作为之前一篇文章&#xff08;《K8s、Docker、CRI、OCI 之间的爱恨情仇》&#xff09;的…

Echarts的引入使用

ECharts文档 1.下载并引入Echarts 2.准备一个具备大小的DOM容器 3.初始化echarts实例对象 4.指定配置项和数据(option) 5.将配置项设置给echarts实例对象 最后是一个js文件 echarts的引入 1.引入echarts - js 文件 <script src"js/echarts.min.js"></scri…

【新手解答1】深入探索 C 语言:变量名、形参 + 主调函数、被调函数 + 类和对象 + 源文件(.c 文件)、头文件(.h 文件)+ 库

C语言的相关问题解答 写在最前面目录 问题1变量名与变量的关系与区别变量和数据类型形参&#xff08;形式参数&#xff09;的概念 问题2解析&#xff1a;主调函数和被调函数延伸解析&#xff1a;主调函数对于多文件程序的理解总结 问题3类和对象变量和数据类型变量是否为抽象的…

YOLOv5算法进阶改进(6)— 更换主干网络之ResNet18

前言:Hello大家好,我是小哥谈。ResNet18是ResNet系列中最简单的一个模型,由18个卷积层和全连接层组成,其中包含了多个残差块。该模型在ImageNet数据集上取得了很好的表现,成为了深度学习领域的经典模型之一。ResNet18的优点是可以解决深度神经网络中梯度消失的问题,使得性…

第一个C代码讲解

文章目录 编写C文件创建文本文件编写代码修改文件后缀切换文件路径 编译代码打开命令行使用gcc编译代码运行程序双击运行使用命令行运行 代码分析编译过程 编写C文件 编辑C代码文件的工具有很多&#xff0c;为了让大家初学的时候摆脱编译软件的干扰&#xff0c;更容易理解编译过…