一、部署准备
1.1 环境准备
我们这里使用vagrant创建了三台centos虚拟机,具体可参 《win10搭建Vagrant+VirtualBox环境》 。我们这里部署计划如下:
主机名 | IP地址 | 集群节点ID | 是否部署zookeeper | 是否部署kafka |
---|
node01 | 192.168.33.11 | 1 | 是 | 是 |
node02 | 192.168.33.12 | 2 | 是 | 是 |
node03 | 192.168.33.13 | 3 | 是 | 是 |
1.2 关闭防护墙
由于centos默认使用firewalld,使用如下命令关闭。
1 2
| systemctl stop firewalld systemctl disable firewalld
|
具体可参考 《firewalld常用命令总结》 。
如果使用iptables,可以在配置文件**/etc/sysconfig/iptables**增加如下内容:
1 2 3
| -A INPUT -s 192.168.33.11/32 -p tcp -m multiport --dports 0:65535 -j ACCEPT -A INPUT -s 192.168.33.12/32 -p tcp -m multiport --dports 0:65535 -j ACCEPT -A INPUT -s 192.168.33.13/32 -p tcp -m multiport --dports 0:65535 -j ACCEPT
|
然后执行下面命令使iptables生效
1
| service iptables restart
|
具体可参考 《iptables配置与常用示例》 。
1.3 编辑host文件
在**/etc/hosts**文件增加如下内容
1 2 3
| 192.168.33.11 node01 192.168.33.12 node02 192.168.33.13 node03
|
注意
:三个节点都要操作
二、部署zookeeper集群
2.1 下载zookeeper
执行下面命令下载:
1
| wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.9.1/apache-zookeeper-3.9.1-bin.tar.gz
|
也可以访问官网 [https://zookeeper.apache.org/releases.html] 进行下载。
2.2 解压并修改文件名
1 2
| tar xvf apache-zookeeper-3.9.1-bin.tar.gz mv apache-zookeeper-3.9.1-bin zookeeper
|
2.3 修改配置文件
1 2
| mv zookeeper/conf/zoo_sample.cfg zookeeper/conf/zoo.cfg vi zoo.cfg
|
修改内容如下
1 2 3 4 5 6
| dataDir=/opt/zookeeper/data dataLogDir=/opt/zookeeper/log
server.1=node01:2888:3888 server.2=node02:2888:3888 server.3=node03:2888:3888
|
配置参数解读
Server.A=B:C:D。
A是一个数字,表示这个是第几号服务器;
B是这个服务器的ip地址;
C是这个服务器与集群中的Leader服务器交换信息的端口;
D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
2.4 新增myid
这个时候我们需要创建数据和日志目录,在zookeeper下执行
集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
分别按照《1.1 环境准备》中规划的节点ID进行配置。
2.5 启动zookeeper
分别在三台主机上执行以上操作,注意myid的区别。然后在zookeeper目录执行下面命令启动
最后分别查看启动状态
三、部署kafka集群
3.1 下载kafka
1
| wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
|
3.2 解压并修改文件名
1 2
| tar xvf kafka_2.13-3.6.0.tgz mv kafka_2.13-3.6.0 kafka
|
3.3 修改配置文件
1 2
| cd kafka vi config/server.properties
|
修改如下内容:
1 2 3
| broker.id=1 log.dirs=/opt/kafka/kafka-logs zookeeper.connect=node01:2181,node02:2181,node03:2181/kafka
|
注意
: 定义broker id,不能重复,只能为数字,这里我们和每台节点上zookeeper的myid保持一致。
3.4 创建日志目录
3.5 启动kafka
分别在三台节点执行下面命令启动
1
| ./bin/kafka-server-start.sh ./config/server.properties &
|
四、测试验证
在其中一台上执行下面命令
4.1 查看主题
1
| ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
|
4.2 创建主题
1
| ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic --partitions 1
|
再次查看主题如下所示
4.3 增加topic分区数
1
| ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test-topic --partitions 2
|
4.4 添加配置
1
| ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name test-topic --alter --add-config x=y
|
4.5 删除配置
1
| ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name test-topic --alter --delete-config x
|
4.6 检查消费组offset
1
| ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
|
4.7 列出消费组
1
| ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
|
4.8 要手动删除一个或多个消费者组
1
| ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group
|
4.9 将消费组的offset 重置为最新
1
| ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
|
4.10 删除主题
1
| ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test-topic
|