kafka use
kafka是一种高吞吐量的分布式发布订阅消息系统,它有如下特性:
- 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
- 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
- 支持通过kafka服务器和消费机集群来分区消息。
- 支持Hadoop并行数据加载。
卡夫卡的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。



kafka是显式分布式架构,producer、broker(Kafka)和consumer都可以有多个。Kafka的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。几个基本概念:
(1)message(消息)是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。如果consumer订阅了这个主题,那么新发布的消息就会广播给这些consumer。
(2)Kafka是显式分布式的,多个producer、consumer和broker可以运行在一个大的集群上,作为一个逻辑整体对外提供服务。对于consumer,多个consumer可以组成一个group,这个message只能传输给某个group中的某一个consumer.
1、kafka安装
1 | http://kafka.apache.org/documentation.html#quickstart |
2、启动kafka
1 | #启动之前zookeeper必须启动!zookeeper请参考[zookeeper cluster deploy](https://www.itweet.cn/2015/07/12/zk-cluster-deploy/) |
3、创建一个topic
1 | [root@server1 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --create --zookeeper server1:2181,server2:2181,server3:2181 --replication-factor 3 --partitions 1 --topic test |
4、命令行查看topic
1 | [root@server2 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --list --zookeeper server1:2181,server2:2181,server3:2181 |
5、发送一些消息
1 | [root@server1 kafka_2.10-0.8.2-beta]# bin/kafka-console-producer.sh --broker-list server1:9092 --topic test |
6、开始消费信息
1 | [root@server2 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper server1:2181,server2:2181,server3:2181 --topic test --from-beginning |
8、查看集群topic详细信息
1 | [root@server2 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --describe --zookeeper server1:2181,server2:2181,server3:2181 --topic test |
9、删除topic
1 | $ kafka-topics.sh --zookeeper server1:2181,server2:2181,server3:2181 --delete --topic test |
10、kafka的webui
Kafka监控工具一
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32https://github.com/quantifind/KafkaOffsetMonitor
wget https://github.com/quantifind/KafkaOffsetMonitor/releases/download/v0.2.1/KafkaOffsetMonitor-assembly-0.2.1.jar
Running It
This is a small webapp, you can run it locally or on a server, as long as you have access to the ZooKeeper nodes controlling kafka.
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk zk-server1,zk-server2 \
--port 8080 \
--refresh 10.seconds \
--retain 2.days
The arguments are:
- zk the ZooKeeper hosts
- port on what port will the app be available
- refresh how often should the app refresh and store a point in the DB
- retain how long should points be kept in the DB
- dbName where to store the history (default 'offsetapp')
[root@server1 local]# mkdir kafka-offset-console
[root@server1 local]# cd kafka-offset-console/
[root@server1 kafka-offset-console]# cat mobile_start_en.sh
#!/bin/bash
cd /usr/local/kafka-offset-console
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp ./KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk server1,server2,server3/config/mobile/kafka-offset-console --port 9999 --refresh 10.seconds --retain 7.days 1>./stdout.log 2>./stderr.log &
[root@server1 kafka-offset-console]# chmod +x mobile_start_en.sh
[root@server1 kafka-offset-console]# sh mobile_start_en.sh
[root@server1 kafka-offset-console]# tail -f stdout.log
serving resources from: jar:file:/usr/local/kafka-offset-console/KafkaOffsetMonitor-assembly-0.2.1.jar!/offsetappKafka监控工具二
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19https://github.com/yahoo/kafka-manager
[root@server1 kafka-manager]# echo $JAVA_HOME
/usr/local/jdk1.7.0_45
[root@server1 sbt]# sbt -version
Getting org.scala-sbt sbt 0.13.8 ...
https://github.com/yahoo/kafka-manager
[root@server1 hsu]# yum install git -y
[root@server1 hsu]# git clone https://github.com/yahoo/kafka-manager.git
[root@server1 hsu]# cd kafka-manager/
[root@server1 kafka-manager]# sbt clean dist #这里生产zip包
Configuration
$ unzip kafka-manager-1.0-SNAPSHOT.zip
在kafka-manager安装包的conf目录下面的application.conf文件中进行配置
kafka-manager.zkhosts="server1:2181,server2:2181,server3:2181"
Starting the service
$ bin/kafka-manager -Dconfig.file=./conf/application.conf -Dhttp.port=8081 > manager-ui.log &

这个工具需要自己编译,也可以直接找我获取编译包!
- Kafka监控工具三
1
2
3
4https://github.com/shunfei/DCMonitor
druid:
https://github.com/alibaba/druid/wiki/%E9%85%8D%E7%BD%AE_StatViewServlet%E9%85%8D%E7%BD%AE
11、kafka和MQ区别
1 | https://github.com/alibaba/RocketMQ/wiki/rmq_vs_kafka |
- 性能对比
•Kafka单机写入TPS约在百万条/秒,消息大小10个字节
• RocketMQ单机写入TPS单实例约7万条/秒,单机部署3个Broker,可以跑到最高12万条/秒,消息大小10个字节
总结:Kafka的TPS跑到单机百万,主要是由于Producer端将多个小消息合并,批量发向Broker。
12、项目分析
1 | 数据源: |
ambari 自动安装Kafka启动报错
1、由于log.dirs目录下的meta.properties中配置的broker.id和配置目录下的server.properties中的broker.id不一致了,解决问题的方法是将两者修改一致后再重启。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17$ cat /var/log/kafka/server.log
[2016-06-02 05:08:59,151] FATAL Fatal error during KafkaServerStartable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
kafka.common.InconsistentBrokerIdException: Configured brokerId 2 doesn't match stored brokerId 1 in meta.properties
at kafka.server.KafkaServer.getBrokerId(KafkaServer.scala:630)
at kafka.server.KafkaServer.startup(KafkaServer.scala:175)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37)
at kafka.Kafka$.main(Kafka.scala:67)
at kafka.Kafka.main(Kafka.scala)
[2016-06-02 05:08:59,152] INFO shutting down (kafka.server.KafkaServer)
解决,如下可以看到broker.id不一致,修改后者重启即可:
[root@bigdata-hadoop-3 kafka-logs]# cat /etc/kafka/conf/server.properties |grep broker.id
broker.id=2
[root@bigdata-hadoop-3 kafka-logs]# cat /kafka-logs/meta.properties |grep id
broker.id=1
参考:
- 【0】kafka:https://kafka.apache.org/documentation.html#gettingStarted
- 【1】Kafka主页:http://sna-projects.com/kafka/design.php
- 【3】Kafka与Hadoop:http://sna-projects.com/sna/media/kafka_hadoop.pdf