Kafka 集群迁移方案分析与实施

2022/09/07 Kafka 共 6442 字,约 19 分钟
欢迎

由于业务变更,需要将原有的 Kafka 集群物理搬迁至新地方,或者直接迁移至新机器上。在保证数据的一致性的基础上,根据集群情况不同选择了不同的方案。

方案一 先扩容后缩容

概述

Kafka 有自己独立使用的 Zookeeper 集群时使用此方案。Zookeeper 与 Kafka 部署在相同的三台机器上,分别名为 host01、host02、host03,Zookeeper 与 Kafka 各自的 id 也分别为 1、2、3。新机器分别为 host04、host05、host06。

整体思路为先将 Zookeeper 和 Kafka 扩容至 6 台机器,然后再缩容为 3 台。优点是对业务方友好,有较长时间段可以更新连接地址且感知较低;保存了所有的历史数据(包括 offset 信息)。缺点是维护方操作略复杂。可参考 Zookeeper 的扩容和缩容方案

过程

注意:每次启停一台机器的服务时,都检查一下集群,确保符合预期;启动 Zookeeper 时,任何时候都记住,原则是 “id 先小后大,最后 leader”

  1. 将 Zookeeper 扩容至 5 台。

    为了给业务方留出较长时间更新连接,同时保证 Zookeeper 的安全性(防止脑裂),先扩容至 5 台保证奇数。

    在 host04、05 上修改配置文件 zoo.cfg

     # 省略其他配置,以下 3 行为原机器信息
     server.1=host01:2888:3888
     server.2=host02:2888:3888
     server.3=host03:2888:3888
     # 以下两行为新增 2 台机器
     server.4=host04:2888:3888
     server.5=host05:2888:3888
     # 以下为新增的 host06 机器,在后续第 4 步中放开注释
     #server.6=host06:2888:3888
    

    先后启动 host04、05,然后依次重启 host01、03、02。此处假定 host02 为leader,因此最后重启,重启后 leader 应为 host05。

  2. 将 Kafka 扩容至 6 台。

    Kafka 正常启动即可,启动后查看应有 6 台 broker。此处新 Kafka 机器可以直接使用新 Zookeeper 的 3 台机器地址,避免后期还要更新参数重启。

    可选项为开启 JMX,方便以后监控相关指标。在 ./bin/kafka-server-start.sh最后一行之前添加以下:

     export LOG_DIR=/var/log/kafka
     export JMX_PORT=9999
     export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g -Xmn2g"
     export KAFKA_JVM_PERFORMANCE_OPTS="-XX:MetaspaceSize=96m -XX:MaxMetaspaceSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=20
     -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16m -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
     -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/var/log/kafka/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails
     -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=4 -XX:GCLogFileSize=50m"
    
  3. 迁移 Kafka 数据。

    使用 Kafka 自带的 kafka-reassign-partitions.sh 迁移数据,参考官方文档中的 Reassign Partitions Tool。也可使用 CMAK(曾用名 Kafka Manager),在图形化界面上方便地完成数据迁移。

  4. 添加最后一台新 Zookeeper 机器,并关闭旧服务。

    执行此步骤之前,必须确保所有生产/消费者都已经更新了连接地址。

    直接关闭 3 台旧的 Kafka 服务。

    将 host06 的 zoo.cfg 如第 1 步配置,同时放开其中 server.6 的注释。然后启动 host06,再依次重启 01-05(重启前 leader 为 05,重启后变为 06)。

    然后开始关闭 Zookeeper。为保证集群始终可用,先将 02-06 上 zoo.cfg 中的 server.1 注释掉,然后直接关闭 host01,其余 5 台依次重启,至此host01 成功关闭。依次对 02、03 重复此过程,再次强调 leader 一定要最后重启。

可能遇到的问题

  1. 整体下来未收到异常反馈,但在自己测试时,发现在集群外消费数据时,有时会报如下错误:

     Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    

    旧机器全部停掉后,该错误现象不再出现。若等不及,根据报错中的内容调整消费者或集群的对应参数即可。

  2. 迁移完成一段时间后收到反馈,某几分钟内集群无法连接,查看日志有以下信息:

     [2022-09-02 07:49:26,234] INFO [GroupCoordinator 6]: Member rdkafka-c33fe9e9-6a8a-4c6e-9538-b05520f7c5c1 in group event20220711 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
     [2022-09-02 07:49:26,234] INFO [GroupCoordinator 6]: Preparing to rebalance group event20220711 with old generation 82 (__consumer_offsets-0) (kafka.coordinator.group.GroupCoordinator)
    

    因此判断与迁移无关,是消费者失败导致的 rebalance 过程,可参考关于 Kafka Rebalance 的更多信息

方案二 新建独立集群

此方案适用于 Kafka 集群使用了公用的 Zookeeper,不便随意调整的情况。优点是负责人直接搭建集群即可,操作简单,安全性高;缺点是新旧集群完全独立,没有历史数据;需要业务方集中在短时间段内完成地址更新,且不可避免地有一定数据延迟。

  1. 搭建集群,较为简单不再赘述。

  2. 在新集群创建与原集群相同的 topic。

  3. 提前通知业务方,在短时间内集中更新连接地址。注意点有:

    • 先更新生产者。

    • 待消费者消费完原集群的数据后,更新消费者。

    • 消费者需要设置 auto.offset.reset=earliest 即当没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量(如数据已删除)时,自动将偏移量重置为最早偏移量,可以保证新集群从最开始消费数据。

方案三 物理搬迁

适用于运行服务的机器不改变,只是需要“关机-搬迁-开机”过程的情况。在数据有 n 副本的情况下,允许同时搬迁 n-1 台机器。

对于 3 副本,由于设置有 min.insync.replicas=2,对于消费者来说,控制消息至少要被写入到2个副本才算是“已提交”。如果 ISR中 的副本数少于 2,客户端会返回异常 org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required,因此需要临时设置该参数为1,搬迁后改回。由于 Kafka 无法对多个 topic 批量修改/重置参数(不支持正则匹配 topic),因此写了个简单 Shell 脚本,实现此功能。针对数据的可靠性,可以参阅 kafka 数据可靠性深度解读

#!/bin/bash
kf_home='your_kafka_home'
zk_address='your_zookeeper_address_port_path_for_kafka'

if [ ! -n "$1" ]; then
        echo "Please input one arg 1 or 0"
        exit 1
elif [ $1 = 0 ]; then
        cmd_tail='--alter --delete-config min.insync.replicas'
elif [ $1 = 1 ]; then
        cmd_tail="--alter --add-config min.insync.replicas=$1"
else
        echo "Input args only accept 1 or 0"
        exit 1
fi

topic_list=(`$kf_home/bin/kafka-topics.sh --zookeeper $zk_address --list`)
for topic in ${topic_list[@]}
do
        cmd="$kf_home/bin/kafka-configs.sh --zookeeper $zk_address --entity-type topics --entity-name $topic $cmd_tail"
        result=`$kf_home/bin/kafka-configs.sh --zookeeper $zk_address --entity-type topics --entity-name $topic $cmd_tail`
        echo $cmd
        echo $result
done

更多常用命令

echo stat|nc 127.0.0.1 2181     # 查看 zk 版本
echo mntr|nc 127.0.0.1 2181     # 查看 follower
./bin/zkServer.sh start         # 启动 zk,restart/status/start-foreground 同理
./bin/zkCli.sh -timeout 5000 -r -server your_zk_address:2181    # -r 表示即使 zk 集群挂掉半数以上,也给客户端提供读服务
ls zk_path_for_kf/brokers/ids   # 查看 kf 的 工作机器,zk_path_for_kf 表示 kf 中设置的 zk 路径

./bin/kafka-topics.sh --zookeeper your_zk_address --list    # 列出 topic
./bin/kafka-topics.sh --zookeeper your_zk_address --topic your_topic --describe             # 查看 topic,其中 your_topic 支持正则
./bin/kafka-topics.sh --zookeeper your_zk_address --describe --under-replicated-partitions  # 查看副本未同步完成的 topic
./bin/kafka-configs.sh --zookeeper your_zk_address --entity-type topics --entity-name your_topic --describe    # 查看指定 topic 参数,不支持正则
./bin/kafka-configs.sh --zookeeper your_zk_address --entity-type topics --entity-name your_topic --alter --add-config min.insync.replicas=2     # 修改指定 topic 参数,以最小同步副本数为例
./bin/kafka-configs.sh --zookeeper your_zk_address --entity-type topics --entity-name your_topic --alter --delete-config min.insync.replicas        # 修改指定 topic 参数,以最小同步副本数为例
./bin/kafka-consumer-groups.sh --bootstrap-server your_kf_address:9092 --list       # 列出所有消费者组
./bin/kafka-consumer-groups.sh --bootstrap-server your_kf_address:9092 --group your_consumer-group --describe       # 查看消费 lag 等详情
./bin/kafka-server-start.sh -daemon ./config/server.properties          # 启动 kf

查看新版及旧版消费者组信息的脚本:

#!/bin/bash
# Kafka 的安装目录
KAFKA_HOME='your_kafka_home'

# Kafka broker 地址
BROKER_LIST='your_kafka_server:9092'

# ZooKeeper 地址
ZOOKEEPER='your_zookeeper_address_port_path_for_kafka'

# 列出所有新版消费者API的消费者组并打印他们正在消费的topic信息
echo "========== 新版消费者API的消费者组 =========="
CONSUMER_GROUPS=$($KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server $BROKER_LIST --list)

for GROUP in $CONSUMER_GROUPS
do
    echo "----- 消费者组:$GROUP -----"
    $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server $BROKER_LIST --describe --group $GROUP
    echo "-------------------------------"
done

# 列出所有旧版消费者API的消费者组并打印他们正在消费的topic信息
echo "========== 旧版消费者API的消费者组 =========="
CONSUMER_GROUPS=$($KAFKA_HOME/bin/zookeeper-shell.sh $ZOOKEEPER ls /consumers | tail -n 1)

for GROUP in ${CONSUMER_GROUPS//,/ }
do
    GROUP=${GROUP//[[:space:]]/}
    if [[ ! -z "$GROUP" && "$GROUP" != "["* && "$GROUP" != "]"* ]]
    then
        echo "----- 消费者组:$GROUP -----"
        $KAFKA_HOME/bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper $ZOOKEEPER --describe --group $GROUP
        echo "-------------------------------"
    fi
done

文档信息

Search

    Table of Contents