文章目录
- 1.Kafka简介
- 2.环境准备
- 3.启动集群
- 4.生产者使用
- 5.消费者使用
1.Kafka简介
Kafka是门及一种高吞吐量的分布式发布订阅消息系统,使用Scala编写。集群
kafka是环境一个分布式的,分区的搭建消息(官方称之为commit log)服务。它提供一个消息系统应该具备的门及功能,但是集群确有着独特的设计。
Kafka基础的环境消息相关术语:
- Topic:Kafka按照Topic分类来维护消息
- Producer:我们将发布(publish)消息到Topic的进程称之为生产者(producer)
- Consumer:我们将订阅(subscribe)Topic并且处理Topic中消息的进程称之为消费者
- Broker:Kafka以集群的方式运行,集群中的搭建每一台服务器称之为一个代理(broker)。
因此,门及从一个较高的集群层面上来看,producers通过网络发送消息到Kafka集群,环境然后consumers来进行消费,搭建如下图:
阿里开源的门及消息中间件RocketMQ就是借鉴了Kafka设计思路。
2.环境准备
节点类型 | 节点主机名 | 节点IP地址 |
---|---|---|
主节点 | master | 192.168.239.128 |
从节点 | slave1 | 192.168.239.129 |
从节点 | slave2 | 192.168.239.130 |
修改3台机器的集群/etc/hosts文件
在启动集群时,主节点需要通过主机名远程访问从节点,环境所以需要让主节点能够识别从节点的主机名。
vi /etc/hosts192.168.239.128 master192.168.239.129 slave1192.168.239.130 slave2
Kafka官网下载地址: 点我跳转
本文下载的是2.12-3.2.1版本,下载完上传文件到master节点服务器上面的/home/soft目录下并解压。
tar -zxvf /home/soft/kafka_2.12-3.2.1.tgz
修改Kafka配置文件
修改server.properties文件
主要配置下面三个参数
- broker.id: 节点的Id #第24行
- log.dirs: 日志存放路径 #第62行
- zookeeper.connect: zk集群的地址,多个以逗号分开 #第125行
vi /home/soft/kafka_2.12-3.2.1/config/server.propertiesbroker.id=0 log.dirs=/home/soft/kafka_2.12-3.2.1/logszookeeper.connect=master:2181,slave1:2181,slave2:2181
复制安装包到从节点并修改broker.id,从节点的/home/soft目录必须存在
scp -rq /home/soft/kafka_2.12-3.2.1 slave1:/home/softscp -rq /home/soft/kafka_2.12-3.2.1 slave2:/home/soft
slave1机器修改id为1
vi /home/soft/kafka_2.12-3.2.1/config/server.propertiesbroker.id=1
slave2机器修改id为2
vi /home/soft/kafka_2.12-3.2.1/config/server.propertiesbroker.id=2
3.启动集群
在3个节点分别执行下面命令启动kafka
cd /home/soft/kafka_2.12-3.2.1bin/kafka-server-start.sh -daemon config/server.properties
执行完使用jps查看kafka是否启动成功
4.生产者使用
bin/kafka-topics.sh --create --bootstrap-server master:9092 --partitions 5 --replication-factor 3 --topic test
参数 | 说明 |
---|---|
--create | 创建topic队列 |
--topic | 指定队列名称 |
--bootstrap-server | 指定Kafka集群的地址,指定1个或多个都可以,多个用逗号隔开,Kafka默认端口是9092,低版本是使用--zookeeper指定zk集群的地址 |
--partitions | 指定topic分区数量 |
--replication-factor | 指定topic中分区的副本因子,参数值需要<=broker数量。 |
运行完可以看到已创建好名称为test的队列
查看所有队列信息
bin/kafka-topics.sh --list --bootstrap-server master:9092
通过kafka提供的测试脚本向队列里生产数据
bin/kafka-console-producer.sh --broker-list master:9092 --topic test
参数 | 说明 |
---|---|
--broker-list | 指定Kafka集群地址,指定1个或多个都可以,多个用逗号隔开,Kafka默认端口是9092 |
--topic | 指定队列名称 |
运行完再控制台输入一行hello word然后回车。
5.消费者使用
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic test
参数 | 说明 |
---|---|
--bootstrap-serve | 指定Kafka集群的地址,指定1个或多个都可以,多个用逗号隔开,Kafka默认端口是9092 |
--topic | 指定队列名称 |
--from-beginning | 重队列头开始消费消息 |
启动消费者后发现消费不了数据,因为Kafka消费者默认消费最新的消息,如果想消费之前生产的数据,则需要添加参数 –from-beginning。
添加参数执行后可以看到控制台已消费hello word条数据。