首页 > 网络 > 云计算 >

kafka基础使用方法(java)

2017-08-05

kafka基础使用方法(java)。

kafka基础使用方法(java)。

操作步骤:

创建Topic

$ cd /opt/cloudera/parcels/KAFKA-2.1.1-1.2.1.1.p0.18 
$ bin/kafka-topics --create
--zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

这里写图片描述
分析:虽然kafka对外开放的api中没有建立topic的方法,但是实验证明,当producer发送一个新topic消息到broker时会自动建立一个对应的topic。
不过自动建立的topic使用的是默认配置,若有需要还需手动修改配置。

发送数据到kafka

$ bin/kafka-console-producer --broker-list xxx.xxx.xxx.xxx:9092 --topic test

这里写图片描述
(红框内的消息需要手动输入)
分析:kafka默认通过9092端口与producer和consumer进行数据交互。正式使用时该步骤由producer端的脚本替代。

建立Consumer消费数据

$ bin/kafka-console-consumer --zookeeper xxx.xxx.xxx.xxx:2181--bootstrap-server xxx.xxx.xxx.xxx:9092 --topic
test --from-beginning

这里写图片描述
分析:成功读出test中的数据。正式使用时该步骤由consumer端的脚本替代。


问题汇总:

别用localhost!!!

问题再现:最开始按照官方文档的指引,使用一下命令来建立一个测试用的producer

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

这里写图片描述
结果出现上图中的问题:
org.apache.kafka.common.errors.TimeoutException:
Failed to update metadata after 60000 ms.
对于该问题作出一下猜想:认为问题出在zookeeper对于某ip的权限限制上面,换句话说官方文档中的demo应该是在单机环境中的,此时zookeeper与kafka在同一机器上,zookeeper对该机器localhost有权限更新metadata,而在使用多节点集群时,zookeeper为了防止传输出现偏差,禁止了localhost的相应权限。
要验证以上猜想,需要在单机环境下进行测试,没时间做。再说吧。╮(╯_╰)╭
解决方法:改用特定ip即可,就这么简单,但是问题找了很久,一点办法都木有。
总结:别用localhost!!有必要熟悉一下zookeeper的原理。

相关文章
最新文章
热点推荐