Kafka 2.8.0 no longer requires ZooKeeper (KRaft mode, early access in this release); the deployment below still uses ZooKeeper:

Download Kafka:

https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz

Download ZooKeeper:

https://dlcdn.apache.org/zookeeper/zookeeper-3.5.10/apache-zookeeper-3.5.10-bin.tar.gz

Install ZooKeeper:

tar -xf /root/apache-zookeeper-3.5.10-bin.tar.gz -C /usr/local/
ln -sv /usr/local/apache-zookeeper-3.5.10-bin/ /usr/local/zookeeper

Edit the configuration:

cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
vim /usr/local/zookeeper/conf/zoo.cfg
dataDir=/data/zookeeper
server.1=192.168.199.35:2888:3888
server.2=192.168.199.36:2888:3888
server.3=192.168.199.38:2888:3888
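In replicated mode each node also needs a myid file under dataDir whose number matches its server.N line in zoo.cfg; the sample config does not create it. A minimal sketch, assuming the dataDir above (run on each node with that node's id):

```shell
# Create the ZooKeeper data dir and the myid file.
# The id must match this node's server.N line in zoo.cfg:
# 1 on 192.168.199.35, 2 on 192.168.199.36, 3 on 192.168.199.38.
ZK_DATA_DIR=/data/zookeeper
mkdir -p "$ZK_DATA_DIR"
echo 1 > "$ZK_DATA_DIR/myid"
```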

Connect to ZooKeeper:

]# bin/zkCli.sh -server 192.168.199.38:2181
ls /    # zkCli.sh has no --list option; list znodes with ls inside the shell

zookeeper.service:

cat > /usr/lib/systemd/system/zookeeper.service <<EOF
[Unit]
Description=zookeeper.service
After=network.target

[Service]
Type=forking
Environment=JAVA_HOME=/usr/local/jdk
# Use the standalone ZooKeeper installed above; zkServer.sh reads conf/zoo.cfg
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
Restart=always

[Install]
WantedBy=multi-user.target
EOF

Install Kafka:

tar -xf /root/kafka_2.12-2.8.0.tgz -C /usr/local/
ln -sv /usr/local/kafka_2.12-2.8.0 /usr/local/kafka

Configuration:

vim /usr/local/kafka/config/server.properties
broker.id=1 # must be unique on every node
listeners=PLAINTEXT://192.168.199.35:9092 # internal listen address
advertised.listeners=PLAINTEXT://192.168.199.35:9092 # address advertised to clients
log.dirs=/data/kafka # change the data directory; the default is under /tmp
zookeeper.connect=zookeeper01:2181,zookeeper02:2181,zookeeper03:2181/kafka # the /kafka chroot keeps all of Kafka's znodes in one place for easier management
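Since broker.id and the listener addresses must differ per node, a small sed sketch can stamp the per-node values into the file; NODE_ID, NODE_IP, and the /tmp working path are assumptions for illustration (the real file is /usr/local/kafka/config/server.properties):

```shell
# Per-node values -- assumptions; adjust for each host.
NODE_ID=2
NODE_IP=192.168.199.36
CONF=/tmp/server.properties

# Minimal template matching the settings above (node 1's values).
cat > "$CONF" <<'EOF'
broker.id=1
listeners=PLAINTEXT://192.168.199.35:9092
advertised.listeners=PLAINTEXT://192.168.199.35:9092
log.dirs=/data/kafka
zookeeper.connect=zookeeper01:2181,zookeeper02:2181,zookeeper03:2181/kafka
EOF

# Stamp in this node's id and IP.
sed -i "s/^broker.id=.*/broker.id=${NODE_ID}/" "$CONF"
sed -i "s#//192.168.199.35:#//${NODE_IP}:#g" "$CONF"
```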

kafka.service:

cat > /usr/lib/systemd/system/kafka.service <<EOF
[Unit]
Description=kafka.service
After=network.target
 
[Service]
Type=forking
Environment=JAVA_HOME=/usr/local/jdk
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=always
[Install]
WantedBy=multi-user.target
EOF

Common commands:

kafka-topics.sh

--bootstrap-server     connect to Kafka
--topic                topic to operate on
--create               create a topic
--delete               delete a topic
--alter                modify a topic
--list                 list all topics
--describe             show detailed topic information
--partitions           set the number of partitions
--replication-factor   set the number of replicas per partition
--config               override a default topic configuration


Connect to Kafka:

]# bin/kafka-topics.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --list

Create a topic with one partition and three replicas:

]# bin/kafka-topics.sh --bootstrap-server 192.168.199.35:9092 --topic first --create --partitions 1 --replication-factor 3

Describe the topic:

]# bin/kafka-topics.sh --bootstrap-server 192.168.199.35:9092 --topic first --describe
Topic: first	TopicId: AFrTNBEQQiWg6Yy-_4Jruw	PartitionCount: 1	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: first	Partition: 0	Leader: 1	Replicas: 1,3,2	Isr: 1,3,2

Describing via ZooKeeper also works (the --zookeeper flag is deprecated and was removed in Kafka 3.0):

]# bin/kafka-topics.sh --zookeeper 192.168.199.38:2181 --topic first --describe
Topic: first	TopicId: AFrTNBEQQiWg6Yy-_4Jruw	PartitionCount: 3	ReplicationFactor: 3	Configs: 
	Topic: first	Partition: 0	Leader: 3	Replicas: 1,3,2	Isr: 3,2
	Topic: first	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 2,3
	Topic: first	Partition: 2	Leader: 3	Replicas: 3,1,2	Isr: 3,2

Change the partition count (partitions can only be increased, never decreased):

]# bin/kafka-topics.sh --bootstrap-server 192.168.199.35:9092 --topic first --alter --partitions 3

Start a console producer:

]# bin/kafka-console-producer.sh --bootstrap-server 192.168.199.35:9092 --topic first
>hello

Start a console consumer:

]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.199.36:9092 --topic first --from-beginning
hello

Producer parameters:

batch.size: the sender transmits a batch once it reaches this size; default 16 KB

linger.ms: if batch.size is not reached, the batch is sent anyway after this delay; default 0 ms
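Both settings can be applied to the console producer through a properties file passed with --producer.config; the /tmp path and values below are illustrative (16384 bytes is the default batch.size, 5 ms is a small linger):

```shell
# Producer tuning file for kafka-console-producer.sh (example values).
cat > /tmp/producer.properties <<'EOF'
batch.size=16384
linger.ms=5
EOF
```

Then start the producer with: bin/kafka-console-producer.sh --bootstrap-server 192.168.199.35:9092 --topic first --producer.config /tmp/producer.properties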


Kafka partition reassignment


    The scenario: a three-node cluster where one node has failed. The original broker.ids were 1, 2, and 3; a new broker with id 4 has been added, and the data on the failed broker 3 needs to be moved off.

Create the list of topics to move (add one entry per topic):

]# cat topics-to-move.json 
{
  "topics": [
       {"topic": "first"}
     ],
  "version": 1
}

Generate the proposed partition assignment:

--broker-list "1,2,4" lists the target brokers: no matter which brokers the data lives on now, it will be reassigned onto these brokers.

]# bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --topics-to-move-json-file /root/topics-to-move.json --broker-list "1,2,4" --generate

Output of the command above:

Current partition replica assignment # save this block to rollback-cluster-reassignment.json; it is what the rollback below executes
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,3,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,3,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[3,1,2],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration # save this block to expand-cluster-reassignment.json; it is what the migration below executes
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,2,4],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[4,1,2],"log_dirs":["any","any","any"]}]}

Execute the reassignment (how long it takes depends on the data volume):

]# bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --reassignment-json-file /root/expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,3,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,3,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[3,1,2],"log_dirs":["any","any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-0,first-1,first-2

Verify the reassignment:

]# bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --reassignment-json-file /root/expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.

Clearing broker-level throttles on brokers 1,2,3,4
Clearing topic-level throttles on topic first

Check the result:

]# bin/kafka-topics.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --topic first --describe

Rollback (run this if you decide to undo the migration after it completes):

]# bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --reassignment-json-file /root/rollback-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,2,4],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[4,1,2],"log_dirs":["any","any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-0,first-1,first-2

Verify the rollback:

]# bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --reassignment-json-file /root/rollback-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.

Clearing broker-level throttles on brokers 1,2,3,4
Clearing topic-level throttles on topic first

Check the rollback result:

]# bin/kafka-topics.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --topic first --describe
Topic: first	TopicId: AFrTNBEQQiWg6Yy-_4Jruw	PartitionCount: 3	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: first	Partition: 0	Leader: 1	Replicas: 1,3,2	Isr: 2,1,3
	Topic: first	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 2,1,3
	Topic: first	Partition: 2	Leader: 3	Replicas: 3,1,2	Isr: 2,1,3

Reference:

garywu520.github.io/2018/12/06/KAFKA%E6%89%A9%E5%AE%B9%E8%8A%82%E7%82%B9%E5%92%8C%E5%88%86%E5%8C%BA%E8%BF%81%E7%A7%BB-CDH/


Increasing or decreasing the replication factor


Edit the reassignment JSON as follows, then run it with kafka-reassign-partitions.sh --execute:

{
  "version": 1,
  "partitions": [
    {
      "topic": "first",
      "partition": 0,
      "replicas": [1,3,2], # 表里是 broker id,第一个是leader
      "log_dirs": ["any","any","any"] # 这里log_dirs中any的数量要和replicas中broker id的数量匹配
    },
    {
      "topic": "first",
      "partition": 1,
      "replicas": [2,3,1], # 扩容或缩容直接去掉或增加broker id就行
      "log_dirs": ["any","any","any"] # log_dirs里面的值也要去掉或添加对应的数量
    },
    {
      "topic": "first",
      "partition": 2,
      "replicas": [3,1,2],
      "log_dirs": ["any","any","any"]
    }
  ]
}
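The # comments above are annotations only; the file passed to --execute must be strict JSON. A sketch that writes the comment-free version (to an assumed /tmp path) and checks that it parses before handing it to kafka-reassign-partitions.sh:

```shell
# Write the clean reassignment JSON and verify it parses.
cat > /tmp/increase-replication.json <<'EOF'
{
  "version": 1,
  "partitions": [
    {"topic": "first", "partition": 0, "replicas": [1,3,2], "log_dirs": ["any","any","any"]},
    {"topic": "first", "partition": 1, "replicas": [2,3,1], "log_dirs": ["any","any","any"]},
    {"topic": "first", "partition": 2, "replicas": [3,1,2], "log_dirs": ["any","any","any"]}
  ]
}
EOF
python3 -m json.tool /tmp/increase-replication.json > /dev/null && echo "JSON OK"
```

Then execute: bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.199.35:9092 --reassignment-json-file /tmp/increase-replication.json --execute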


Cross-path migration (between log directories)


log.dirs can specify multiple directories:

log.dirs=/data/kafka-logs-5,/data/kafka-logs-6,/data/kafka-logs-7,/data/kafka-logs-8

Example:

{
  "version": 1,
  "partitions": [
    {
      "topic": "first",
      "partition": 0,
      "replicas": [1,3,2],
      "log_dirs": ["/data/kafka-logs-5","/data/kafka-logs-6","/data/kafka-logs-7"] # 只要把any改成对应的目录即可
    },
    {
      "topic": "first",
      "partition": 1,
      "replicas": [2,3,1],
      "log_dirs": ["/data/kafka-logs-5","/data/kafka-logs-6","/data/kafka-logs-7"]
    },
    {
      "topic": "first",
      "partition": 2,
      "replicas": [3,1,2],
      "log_dirs": ["/data/kafka-logs-5","/data/kafka-logs-6","/data/kafka-logs-7"]
    }
  ]
}

Replica scaling and cross-path migration reference:

51cto.com/article/679893.html