Kafka安装

Kafka安装需要有Java和zookeeper环境,如何配置可参考本站文章《Java_sdk安装》和《zookeeper安装》。

1.本地安装

1.1.解压安装

本地安装,适用于本地开发调试使用。自动化安装脚本如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/bin/bash
# kafka下载地址
kafka_addr=""

# kafka的安装路径
kafka_home="/test/kafka"


# 创建安装目录
mkdir -p $kafka_home
if [ "$?" != "0" ]; then
echo 'create file failed!'
exit 1
fi

# 下载安装包
wget $kafka_addr
if [ "$?" != "0" ]; then
echo 'download failed!'
exit 1
fi

# 解压安装包
name=$(ls | grep zookeeper)
tar -zxvf $name -C $kafka_home
if [ "$?" != "0" ]; then
echo 'decompression file failed!'
exit 1
fi

echo 'install successful!'

1.2.调整配置

进入conf目录下确认server.properties配置是否符合预期,常见的配置项如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# kafka server配置 kafka最为重要三个配置依次为:broker.id、log.dir、zookeeper.connect

# 每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况
broker.id=0

# broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置
host.name=10.0.0.1

# broker server服务端口
port =9092

# broker处理消息的最大线程数,一般情况下数量为cpu核数
num.network.threads=4

# broker处理磁盘IO的线程数,数值为cpu核数2倍
num.io.threads=8

# socket的发送缓冲区,socket的调优参数SO_SENDBUFF
socket.send.buffer.bytes=1048576

# socket的接受缓冲区,socket的调优参数SO_RECVBUFF
socket.receive.buffer.bytes=1048576

# socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes=104857600

# 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
num.partitions=2

# 数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略,log.retention.bytes和log.retention.minutes或log.retention.hours任意一个达到要求,都会执行删除
# 有2删除数据文件方式: 按照文件大小删除:log.retention.bytes 按照2中不同时间粒度删除:分别为分钟,小时
log.retention.hours=168

# topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes=536870912

# 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=60000

# 是否开启日志清理
log.cleaner.enable=false

# zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect=10.0.0.1:2181

# ZooKeeper的连接超时时间
zookeeper.connection.timeout.ms=60000

# ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不应过大
zookeeper.session.timeout.ms=6000

# ZooKeeper集群中leader和follower之间的同步时间
zookeeper.sync.time.ms =2000

# kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能 /data/kafka-logs-1,/data/kafka-logs-2
log.dirs=/tmp/kafka.log

1.3.启停服务

进入bin目录

1
2
3
4
5
# 启动服务
./kafka-server-start.sh -daemon ../config/server.properties

# 停止服务
./kafka-server-stop.sh

1.4.验证功能

1
2
3
4
5
6
7
8
9
10
11
# 创建并验证主题
./kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic test --bootstrap-server localhost:9092

# 查看主题
./kafka-topics.sh --list --bootstrap-server localhost:9092

# 向测试主题上发布消息
./kafka-console-producer.sh --topic test --bootstrap-server localhost:9092

# 从测试主题上读取消息
./kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092