1

轻松上手Kafka

效率能够让你做到你以前不曾做到的事情。

Productivity is being able to do things that you were never able to do before.

弗兰兹·卡夫卡

众所周知,开源大数据产品名字起得都比较呆萌,比如Hadoop(玩具大象的名字)、Mahout(看象人)、Pig(豕)、Hive(蜂巢)、Oozie(驯象人)、ZooKeeper(动物园管理员)、Falcon(猎鹰),好像掉进了西郊公园。相比之下,Kafka的名字却优雅得多。根据其作者Jay Kreps说,由于在设计上为写操作高度优化,想用作家的名字来命名,而大学文化课上Franz Kafka令他印象深刻,所以就起了Kafka这个名字。程序猿也可以很文艺 🙂

云上卡夫卡一文中我们介绍了百度Kafka背景,这次分享使用方法。

用户体验

百度Kafka是全托管大数据服务,提供了开箱即用的体验。登录百度云管理控制台后,选择“产品服务>百度Kafka服务”,点击“创建主题”并设置以下信息:

  1. 主题名称,比如“demo”,将会创建名为“<前缀>__demo”的主题。
  2. 分区个数,每个分区支持每秒写入1MB和读取2MB数据,如果你需要每秒上传10MB的数据那就选择10个分区。

确认之后便可以与该主题交互了。

百度Kafka支持基于X509证书的认证和授权,在管理安全的同时,通过SSL能够确保数据在传输时候不被监听或篡改。在管理控制台中打开“访问策略”标签,点击kafka-key.zip便能下载证书,之后请参照https://github.com/BCEBIGDATA/kafka-sample-java克隆Java源代码并配置妥当。

Windows的Maven环境请执行run.bat <topic_name>(Linux环境请换成sh run.sh <topic_name>),便能够看到消息正确地被发布和消费:

本质上,脚本创建了Application实例,依次创建Producer实例发布消息、Consumer实例订阅消息。

Producer核心示例代码如下:

Properties properties = new Properties();
properties.load(...);
properties.setProperty(...);
 
KafkaProducer&lt;byte[], byte[]&gt; producer
    = new KafkaProducer&lt;byte[], byte[]&gt;(properties);
 
byte[] key = Integer.toString(0).getBytes("UTF-8");
byte[] value = "hello, kafka".getBytes("UTF-8");
ProducerRecord&lt;byte[], byte[]&gt; record
    = new ProducerRecord&lt;byte[], byte[]&gt;(TOPIC, key, value);
producer.send(record);

Consumer核心示例代码如下:

Properties properties = new Properties();
properties.load(...);
properties.setProperty(...);
 
KafkaConsumer&lt;byte[], byte[]&gt; consumer
    = new KafkaConsumer&lt;byte[], byte[]&gt;(properties);
consumer.subscribe(Collections.singletonList(TOPIC));
 
ConsumerRecords&lt;byte[], byte[]&gt; records = consumer.poll(TIME_OUT_MS);
for (ConsumerRecord&lt;byte[], byte[]&gt; record : records) {
    String position = record.partition() + "-" + record.offset();
    String key = new String(record.key(), "UTF-8");
    String value = new String(record.value(), "UTF-8");
    logger.info(position + ": " + key + " " + value);
}

完整代码请参考GitHub,不再赘述。方便起见,还可以在管理控制台下载sample-with-kafka-key.zip,其中包含了完整的Java项目以及证书,可以直接编译运行。

数据集成

百度Kafka还为特定场景的数据集成进行了优化。

首先让我们来看一看实时日志实时分析场景。在大数据时代,企业的生产、销售、经营、管理逐渐由技术驱动转型为数据驱动,以便更全面地认知现状把握未来。其中,日志是其中最易获得、覆盖面最广同时也是最有价值的数据之一,比如用户行为、服务器运行、数据库改动等产生的时间序列数据都是典型的日志。

比如,为了监控服务器集群的运行状态,可以借助百度日志服务BLS汇总日志,对接Kafka作为消息队列缓存,再对接百度MapReduce服务上的Spark Streaming实时分析服务器是否宕机或者遇到了攻击等,以便输出报表或作出报警:

图中箭头处的连接器是百度云提供的数据集成特性,在创建日志传输任务的时候,可以直接选择一个百度Kafka主题进行可靠的海量数据转发,而无需任何编码:

除了外部客户,百度云本身很多服务就是用的这套架构做日志分析的。

让我们再来看一个物联网实时数据分析的例子。

为了支撑智能景观灯项目,主控设备通过物接入上传景观灯注册数据以及心跳数据,通过规则引擎进行过滤和变形,然后需要在云端分析处理。由于这是一个多租户的服务,考虑到数据量增长非常迅速,便借助百度Kafka来做消息队列缓存,然后通过多个云服务器组成的Kafka消费者组共同来处理消息,确保计算能力能够水平扩展。处理完的数据持久化在MySQL中并用缓存服务提高性能,最后显示在用户的移动设备上:

图中箭头处的连接器也是百度云内置的数据集成特性,在规则引擎中通过SQL将物联网数据进行过滤和变形,然后把数据转发到某个百度Kafka主题而无需编码。

此外,客户也非常满意百度Kafka的计费方式。之前需要在云服务器集群上部署运维Kafka,初期虽然数据量不大但仍然需要为整个集群付费,切换到百度Kafka后只需要为使用的资源买单,省却了运维工程师的人工,性价比非常突出。

总结

百度Kafka是百度云天算大数据平台推出的多租户托管服务,与自行部署并运维Kafka相比,有以下增强:

  1. 开箱即用:可以直接创建主题并使用Kafka服务,专注业务而不用花费精力去安装、部署、配置、调试和维护集群。
  2. 低廉价格:只需为创建的分区和收发的消息付费,而不是虚拟主机集群。
  3. 数据安全:支持SSL加密,保证数据在传输的过程中不被窃听或者篡改。
  4. 可靠耐用:独特的服务高可用性和数据高可靠性设计。

秉承开源开放的宗旨,百度Kafka与社区的Kafka完全兼容。众所周知,开源在大数据的传播过程中起到了至关重要的作用,使用主流开源软件不但具有很好的性价比,迁移成本低也不用担心被供应商绑定,市场也有人才储备。

欢迎使用百度Kafka!

 



张 琪

One Comment

发表评论

电子邮件地址不会被公开。 必填项已用*标注