Preface

Due to security requirements, our Kafka now has to be upgraded to 3.9.x or 4.x. Against that background, this article not only records the deployment process but also walks through a common and effective log collection pipeline: Java applications push logs to Kafka, Logstash consumes and processes them and writes them to Elasticsearch, and Kibana provides visual querying and analysis on top.
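
To make the flow concrete, here is a minimal sketch of one event travelling through the pipeline built below. The topic name logstash-app is only an example (it just has to match the logstash-* pattern that Logstash subscribes to), and the broker and Elasticsearch addresses assume the in-cluster DNS names used throughout this article.

# Simulate what the Java application's Kafka appender would publish: one structured JSON log event
echo '{"timestamp":"2025-06-18T11:00:00Z","level":"INFO","app":"my-app","message":"hello kafka"}' | \
  /opt/kafka/bin/kafka-console-producer.sh \
    --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092 \
    --topic logstash-app

# After Logstash has consumed the event, it should be searchable in the per-day index
curl -s 'http://elasticsearch.bx.svc.cluster.local:9200/logstash-app-*/_search?q=message:hello'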

1. docker deployment
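
This article focuses on the Kubernetes deployment, so only a quick single-node sketch is given here for local testing. The official apache/kafka image starts a standalone KRaft broker with default settings; the container name and port mapping below are arbitrary.

# Single-node Kafka 3.9.1 for a quick local test (data is lost when the container is removed)
docker run -d --name kafka -p 9092:9092 apache/kafka:3.9.1

# Smoke test: list topics against the local broker
docker exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list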

2. kubernetes deployment

2.1 statefulset

kind: StatefulSet
apiVersion: apps/v1
metadata:
  name: kafka
  namespace: bx
  annotations:
    kubesphere.io/creator: tanqidi
    kubesphere.io/description: 'apache/kafka:3.9.1'
spec:
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
      annotations:
        kubesphere.io/creator: admin
        kubesphere.io/imagepullsecrets: '{}'
        logging.kubesphere.io/logsidecar-config: '{}'
    spec:
      containers:
        - name: kafka
          image: 'apache/kafka:3.9.1'
          command:
            - sh
            - '-c'
            - >
              BROKER_ID=$(( ${HOSTNAME##*-} + 1 ))

              echo "Using broker.id=$BROKER_ID"

              exec /opt/kafka/bin/kafka-server-start.sh
              /opt/kafka/config/server.properties \
                --override broker.id=$BROKER_ID \
                --override zookeeper.connect=zookeeper-0.zookeeper.bx.svc.cluster.local:2181,zookeeper-1.zookeeper.bx.svc.cluster.local:2181,zookeeper-2.zookeeper.bx.svc.cluster.local:2181 \
                --override listeners=PLAINTEXT://0.0.0.0:9092 \
                --override advertised.listeners=PLAINTEXT://$(POD_NAME).kafka.bx.svc.cluster.local:9092 \
                --override log.dirs=/data/kafka/data
          ports:
            - name: tcp-9092
              containerPort: 9092
              protocol: TCP
          env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.name
            - name: TZ
              value: Asia/Shanghai
          resources:
            limits:
              cpu: '1'
              memory: 3Gi
            requests:
              cpu: 40m
              memory: 1500Mi
          volumeMounts:
            - name: kafka-data
              mountPath: /data/kafka
            - name: kafka-logs
              mountPath: /opt/kafka/logs
          livenessProbe:
            tcpSocket:
              port: 9092
            timeoutSeconds: 2
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 3
          readinessProbe:
            tcpSocket:
              port: 9092
            timeoutSeconds: 2
            periodSeconds: 5
            successThreshold: 1
            failureThreshold: 3
          startupProbe:
            tcpSocket:
              port: 9092
            initialDelaySeconds: 120
            timeoutSeconds: 5
            periodSeconds: 5
            successThreshold: 1
            failureThreshold: 72
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      securityContext: {}
      imagePullSecrets:
        - name: harbor-bx
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - kafka
              topologyKey: kubernetes.io/hostname
      schedulerName: default-scheduler
  volumeClaimTemplates:
    - kind: PersistentVolumeClaim
      apiVersion: v1
      metadata:
        name: kafka-data
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 10Gi
        volumeMode: Filesystem
    - kind: PersistentVolumeClaim
      apiVersion: v1
      metadata:
        name: kafka-logs
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 1Gi
        volumeMode: Filesystem
  serviceName: kafka
  podManagementPolicy: OrderedReady
  updateStrategy:
    type: RollingUpdate
    rollingUpdate:
      partition: 0
  revisionHistoryLimit: 10
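
The startup command derives broker.id from the pod ordinal in the hostname and advertises each broker under its per-pod DNS name, so the headless service in 2.2 must exist alongside the StatefulSet; it also assumes a 3-node ZooKeeper ensemble is already reachable at zookeeper-{0,1,2}.zookeeper.bx.svc.cluster.local:2181. A quick sanity check after applying the manifest (the file name is just an example):

kubectl apply -f kafka-statefulset.yaml
kubectl -n bx rollout status statefulset/kafka
kubectl -n bx get pods -l app=kafka -o wide

# Cheap connectivity check: ask a broker for the API versions it supports
kubectl -n bx exec kafka-0 -- /opt/kafka/bin/kafka-broker-api-versions.sh \
  --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092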

2.2 service

kind: Service
apiVersion: v1
metadata:
  name: kafka
  namespace: bx
  annotations:
    kubesphere.io/creator: admin
spec:
  ports:
    - protocol: TCP
      port: 9092
      targetPort: 9092
  selector:
    app: kafka
  clusterIP: None
  clusterIPs:
    - None
  type: ClusterIP
  sessionAffinity: None
  ipFamilies:
    - IPv4
  ipFamilyPolicy: SingleStack
  internalTrafficPolicy: Cluster
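
Because clusterIP is None this is a headless service: each StatefulSet pod gets a stable DNS record of the form kafka-N.kafka.bx.svc.cluster.local, which is exactly what the advertised.listeners override in 2.1 relies on. A quick resolution check (a sketch; it assumes nslookup exists in the image, otherwise run it from any debug pod):

# Per-pod record for broker 0
kubectl -n bx exec kafka-0 -- nslookup kafka-0.kafka.bx.svc.cluster.local

# The service name itself resolves to all broker pod IPs
kubectl -n bx exec kafka-0 -- nslookup kafka.bx.svc.cluster.local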

2.3 logstash

2.3.1 deployment

kind: Deployment
apiVersion: apps/v1
metadata:
  name: logstash
  namespace: bx
  annotations:
    kubesphere.io/creator: admin
spec:
  replicas: 1
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      labels:
        app: logstash
      annotations:
        kubesphere.io/creator: admin
    spec:
      volumes:
        - name: config-volume
          configMap:
            name: logstash-config
            defaultMode: 420
      containers:
        - name: logstash
          image: 'docker.elastic.co/logstash/logstash:7.17.0'
          ports:
            - containerPort: 5044
              protocol: TCP
          resources: {}
          volumeMounts:
            - name: config-volume
              mountPath: /usr/share/logstash/pipeline
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      securityContext: {}
      schedulerName: default-scheduler
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 25%
      maxSurge: 25%
  revisionHistoryLimit: 10
  progressDeadlineSeconds: 600
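
The pipeline definition is mounted from the logstash-config ConfigMap (2.3.3) into /usr/share/logstash/pipeline. After the rollout, the Logstash log should show the Kafka input starting up and joining the logstash-g1 consumer group; a rough check (sketch):

kubectl -n bx rollout status deployment/logstash
kubectl -n bx logs deploy/logstash --tail=200 | grep -iE 'kafka|pipeline'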

2.3.2 service

kind: Service
apiVersion: v1
metadata:
  name: logstash
  namespace: bx
  annotations:
    kubesphere.io/creator: admin
spec:
  ports:
    - name: beats
      protocol: TCP
      port: 5044
      targetPort: 5044
  selector:
    app: logstash
  type: ClusterIP
  sessionAffinity: None
  ipFamilies:
    - IPv4
  ipFamilyPolicy: SingleStack
  internalTrafficPolicy: Cluster

2.3.3 configmap

kind: ConfigMap
apiVersion: v1
metadata:
  name: logstash-config
  namespace: bx
  annotations:
    kubesphere.io/creator: admin
data:
  logstash.conf: |
    input {
      kafka {
        # Decode Kafka messages as JSON for structured processing
        codec => "json"

        # Match every Kafka topic that starts with "logstash-", e.g. logstash-app, logstash-nginx
        topics_pattern => "logstash-.*"

        # Kafka broker list (multiple brokers, comma-separated)
        bootstrap_servers => "kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092"

        # When no committed offset exists, start consuming from the latest messages (allowed values: earliest or latest)
        auto_offset_reset => "latest"

        # Add Kafka metadata (topic, partition, etc.) to the event's [@metadata] field
        decorate_events => true

        # Kafka consumer group ID, used for offset management and to avoid duplicate consumption
        group_id => "logstash-g1"
      }
    }

    output {
      elasticsearch {
        # Elasticsearch node address; several can be listed for high availability
        hosts => ["http://elasticsearch.bx.svc.cluster.local:9200"]

        # Build the index name from the Kafka topic and split it by day, e.g. logstash-app-2025-06-18
        index => "%{[@metadata][kafka][topic]}-%{+YYYY-MM-dd}"
      }

      # Optional output for debugging: print the full event to the console
      # stdout {
      #   # rubydebug pretty-prints the event, handy for inspecting the data structure
      #   codec => rubydebug
      # }
    }
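
Once Logstash is consuming, two things are easy to verify: the logstash-g1 consumer group should show little or no lag on the logstash-* topics, and Elasticsearch should start creating the per-day indices named after the topics. A rough check (a sketch; run the first command from a broker pod and the second from any pod with curl):

# Consumer group offsets and lag for the Logstash pipeline
/opt/kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092 \
  --describe --group logstash-g1

# Per-day indices created by the elasticsearch output
curl -s 'http://elasticsearch.bx.svc.cluster.local:9200/_cat/indices/logstash-*?v'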

2.4 common kafka commands

# Create a topic named logstash-hello with 3 partitions and a replication factor of 3
./kafka-topics.sh \
  --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092 \
  --create --topic logstash-hello --partitions 3 --replication-factor 3

# Generate 10,000 simulated JSON log messages and pipe them into the topic
seq 1 10000 | awk '
  function rand_level() {
    levels[1]="INFO"; levels[2]="WARN"; levels[3]="ERROR"; 
    return levels[int(1+rand()*3)];
  }
  BEGIN { srand(); }
  {
    # Minute and second are randomized in the printf below; adjust the fixed date/hour as needed
    level = rand_level()
    printf("{\"timestamp\":\"2025-06-18T11:%02d:%02dZ\",\"level\":\"%s\",\"app\":\"my-app\",\"host\":\"host1.example.com\",\"message\":\"Test log message %d\",\"user_id\":%d,\"trace_id\":\"trace-%d\"}\n", int(rand()*59), int(rand()*59), level, $1, 1000+$1, 100000+$1);
  }' | ./kafka-console-producer.sh --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092 --topic logstash-hello
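
To confirm the messages actually landed, a console consumer can read a few of them back; this sketch uses the same broker list.

# Consume a few messages from the beginning of the topic
./kafka-console-consumer.sh \
  --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092 \
  --topic logstash-hello --from-beginning --max-messages 5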


# List topics
./kafka-topics.sh --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092 --list

# Delete a topic
./kafka-topics.sh \
  --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092 \
  --delete \
  --topic logstash-test1

2.5 kafka test script

#!/bin/bash

# Kafka bin directory; change it to your actual path as needed
KAFKA_BIN_DIR="/opt/kafka/bin"

BOOTSTRAP_SERVERS="kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092"

function list_topics() {
  echo "📜 所有 Topic 列表:"
  topics=$($KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --list)
  if [[ -z "$topics" ]]; then
    echo "没有找到任何Topic"
    return
  fi
  
  # Filter out the internal __consumer_offsets topic
  filtered_topics=""
  for topic in $topics; do
    if [[ "$topic" != "__consumer_offsets" ]]; then
      filtered_topics+=" $topic"
    fi
  done
  
  if [[ -z "$filtered_topics" ]]; then
    echo "除了__consumer_offsets外没有找到其他Topic"
    return
  fi
  
  for topic in $filtered_topics; do
    echo "🧩 $topic"
  done
}

function create_topic() {
  read -p "请输入Topic名称: " t
  read -p "请输入分区数(默认3): " p
  read -p "请输入副本因子(默认3): " r
  p=${p:-3}
  r=${r:-3}
  $KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --create --topic "$t" --partitions $p --replication-factor $r
  echo "Topic $t 创建完成"
}

function delete_topic() {
  read -p "请输入要删除的Topic名称: " t
  $KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --delete --topic "$t"
  echo "Topic $t 删除请求已发送"
}

function describe_topic() {
  read -p "请输入要查看分区状态的Topic名称: " t
  $KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe --topic "$t"
}

function send_complex_messages() {
  read -p "请输入Topic名称: " t
  read -p "请输入发送消息数: " n
  echo "开始发送复杂JSON消息到 $t"

  # Create a temp file to hold all the messages
  tmpfile=$(mktemp)
  trap 'rm -f "$tmpfile"' EXIT

  # Random business types
  business_types=("订单支付" "用户注册" "商品浏览" "购物车操作" "订单退款" "库存更新" "优惠券使用" "搜索查询")
  payment_methods=("支付宝" "微信支付" "银联" "信用卡" "余额支付")
  product_categories=("电子产品" "服装鞋帽" "家居用品" "食品饮料" "图书音像" "美妆个护" "母婴用品")
  user_roles=("普通用户" "VIP会员" "高级会员" "企业用户")

  # Possible exception types and messages
  error_types=("SQLException" "NullPointerException" "TimeoutException" "ValidationException" "IOException" "BusinessException" "DataNotFoundException" "UnauthorizedException")
  error_messages=(
    "数据库连接超时"
    "空指针异常,参数未初始化"
    "服务调用超时,等待超过30秒"
    "参数验证失败:字段不能为空"
    "文件读写错误,权限不足"
    "业务规则冲突:库存不足"
    "未找到匹配的数据记录"
    "用户未授权,访问被拒绝"
  )

  for i in $(seq 1 $n); do
    timestamp=$(date +"%Y-%m-%dT%H:%M:%S.%3N%z")
    business_type=${business_types[$((RANDOM % ${#business_types[@]}))]}

    # ~10% of messages become error logs
    if (( RANDOM % 10 == 0 )); then
      level="ERROR"
      error_index=$((RANDOM % ${#error_types[@]}))
      error_type=${error_types[$error_index]}
      error_message=${error_messages[$error_index]}
      stacktrace=$(cat <<EOF | tr -d '\n'
[
  "at com.example.demo.service.${business_type// /}Service.process(${business_type// /}Service.java:$((100 + RANDOM % 200))",
  "at com.example.demo.controller.${business_type// /}Controller.handleRequest(${business_type// /}Controller.java:$((50 + RANDOM % 100))",
  "at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)",
  "at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)",
  "at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)",
  "at java.lang.reflect.Method.invoke(Method.java:498)"
]
EOF
)
    # ~20% of the remaining messages become warning logs
    elif (( RANDOM % 5 == 0 )); then
      level="WARN"
      random_delay=$((RANDOM % 5000))
      error_message="性能警告:操作耗时较长(${random_delay}ms)"
      error_type="PerformanceWarning"
      stacktrace="[]"
    else
      level="INFO"
      error_type=""
      error_message=""
      stacktrace="[]"
    fi

    # Generate a different detail payload depending on the business type
    case $business_type in
      "订单支付")
        amount=$(printf "%.2f" $(echo "scale=2; $((RANDOM % 10000)) / 100" | bc))
        payment_method=${payment_methods[$((RANDOM % ${#payment_methods[@]}))]}
        order_id="ORD-$((1000000 + RANDOM % 9000000))"
        products_count=$((1 + RANDOM % 5))

        # Generate a random product list
        products_json=""
        for ((j=1; j<=products_count; j++)); do
          product_price=$(printf "%.2f" $(echo "scale=2; $((RANDOM % 1000)) / 100" | bc))
          product_name="商品$(printf "%03d" $((RANDOM % 1000)))"
          category=${product_categories[$((RANDOM % ${#product_categories[@]}))]}

          if [[ -n "$products_json" ]]; then
            products_json+=", "
          fi
          products_json+="{\"name\":\"$product_name\",\"category\":\"$category\",\"price\":$product_price,\"quantity\":$((1 + RANDOM % 5))}"
        done

        details=$(cat <<EOF | tr -d '\n'
{
  "orderId": "$order_id",
  "amount": $amount,
  "paymentMethod": "$payment_method",
  "discount": $(printf "%.2f" $(echo "scale=2; $((RANDOM % 100)) / 100" | bc)),
  "products": [$products_json],
  "shippingAddress": {
    "province": "省份$(printf "%02d" $((RANDOM % 34)))" ,
    "city": "城市$(printf "%03d" $((RANDOM % 300)))" ,
    "district": "区县$(printf "%03d" $((RANDOM % 3000)))" ,
    "address": "街道$(printf "%04d" $((RANDOM % 10000)))-$(printf "%04d" $((RANDOM % 10000)))"
  },
  "paymentTime": "$timestamp"
}
EOF
)
        ;;

      "用户注册")
        role=${user_roles[$((RANDOM % ${#user_roles[@]}))]}
        register_source=("网站" "APP" "小程序" "第三方平台")
        source=${register_source[$((RANDOM % ${#register_source[@]}))]}

        details=$(cat <<EOF | tr -d '\n'
{
  "username": "user$((10000 + RANDOM % 90000))",
  "email": "user$((10000 + RANDOM % 90000))@example.com",
  "phone": "13${RANDOM:0:9}",
  "registerTime": "$timestamp",
  "source": "$source",
  "role": "$role",
  "referrer": "user$((10000 + RANDOM % 90000))"
}
EOF
)
        ;;

      "商品浏览")
        product_id="PRD-$((1000000 + RANDOM % 9000000))"
        category=${product_categories[$((RANDOM % ${#product_categories[@]}))]}
        price=$(printf "%.2f" $(echo "scale=2; $((RANDOM % 1000)) / 100" | bc))

        # Random Chrome version numbers for the user-agent string
        chrome_version=$((80 + RANDOM % 10))
        build_number=$((1000 + RANDOM % 1000))
        patch_version=$((10 + RANDOM % 90))

        details=$(cat <<EOF | tr -d '\n'
{
  "productId": "$product_id",
  "productName": "商品$(printf "%03d" $((RANDOM % 1000)))-$category",
  "category": "$category",
  "price": $price,
  "brand": "品牌$(printf "%02d" $((RANDOM % 100)))",
  "viewTime": "$timestamp",
  "viewDuration": $((1 + RANDOM % 300)),
  "fromPage": "category/$category",
  "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/${chrome_version}.0.${build_number}.${patch_version} Safari/537.36"
}
EOF
)
        ;;

      "购物车操作")
        action=("添加" "删除" "修改数量")
        selected_action=${action[$((RANDOM % ${#action[@]}))]}
        product_id="PRD-$((1000000 + RANDOM % 9000000))"
        category=${product_categories[$((RANDOM % ${#product_categories[@]}))]}

        details=$(cat <<EOF | tr -d '\n'
{
  "action": "$selected_action",
  "productId": "$product_id",
  "productName": "商品$(printf "%03d" $((RANDOM % 1000)))-$category",
  "category": "$category",
  "quantity": $((1 + RANDOM % 10)),
  "cartId": "CART-$((1000000 + RANDOM % 9000000))",
  "operationTime": "$timestamp"
}
EOF
)
        ;;

      "订单退款")
        order_id="ORD-$((1000000 + RANDOM % 9000000))"
        amount=$(printf "%.2f" $(echo "scale=2; $((RANDOM % 10000)) / 100" | bc))
        reason=("商品质量问题" "尺寸不合适" "颜色不符" "重复购买" "其他原因")
        refund_reason=${reason[$((RANDOM % ${#reason[@]}))]}

        details=$(cat <<EOF | tr -d '\n'
{
  "orderId": "$order_id",
  "refundId": "REF-$((1000000 + RANDOM % 9000000))",
  "amount": $amount,
  "reason": "$refund_reason",
  "refundTime": "$timestamp",
  "status": "处理中",
  "handler": "客服$(printf "%02d" $((RANDOM % 100)))"
}
EOF
)
        ;;

      "库存更新")
        product_id="PRD-$((1000000 + RANDOM % 9000000))"
        category=${product_categories[$((RANDOM % ${#product_categories[@]}))]}
        location=("仓库A" "仓库B" "仓库C" "门店$(printf "%02d" $((RANDOM % 50)))" "配送中心")
        stock_location=${location[$((RANDOM % ${#location[@]}))]}
        old_stock=$((RANDOM % 1000))
        new_stock=$((RANDOM % 1000))

        details=$(cat <<EOF | tr -d '\n'
{
  "productId": "$product_id",
  "productName": "商品$(printf "%03d" $((RANDOM % 1000)))-$category",
  "location": "$stock_location",
  "oldStock": $old_stock,
  "newStock": $new_stock,
  "changeReason": "库存盘点",
  "updateTime": "$timestamp",
  "operator": "员工$(printf "%04d" $((RANDOM % 10000)))"
}
EOF
)
        ;;

      "优惠券使用")
        coupon_id="COUPON-$((1000000 + RANDOM % 9000000))"
        discount_type=("满减" "折扣" "无门槛")
        selected_type=${discount_type[$((RANDOM % ${#discount_type[@]}))]}
        amount=$(printf "%.2f" $(echo "scale=2; $((RANDOM % 100)) / 100" | bc))

        details=$(cat <<EOF | tr -d '\n'
{
  "couponId": "$coupon_id",
  "discountType": "$selected_type",
  "discountAmount": $amount,
  "minAmount": $((RANDOM % 200)),
  "orderId": "ORD-$((1000000 + RANDOM % 9000000))",
  "userId": "user$((10000 + RANDOM % 90000))",
  "useTime": "$timestamp",
  "expireDate": "$(date -d "+$((RANDOM % 30)) days" +"%Y-%m-%d")"
}
EOF
)
        ;;

      "搜索查询")
        query_terms=("手机" "电视" "衣服" "鞋子" "笔记本电脑" "化妆品" "婴儿用品" "食品" "图书" "耳机" "相机")
        query_term=${query_terms[$((RANDOM % ${#query_terms[@]}))]}
        results_count=$((RANDOM % 100))

        details=$(cat <<EOF | tr -d '\n'
{
  "query": "$query_term",
  "resultsCount": $results_count,
  "searchTime": "$timestamp",
  "searchType": "商品搜索",
  "sort": "默认",
  "filters": {
    "priceRange": "$((RANDOM % 1000))-$((RANDOM % 1000 + 1000))",
    "brand": "品牌$(printf "%02d" $((RANDOM % 100)))"
  },
  "userId": "user$((10000 + RANDOM % 90000))"
}
EOF
)
        ;;
    esac

    # Adjust the message content according to the log level
    if [[ "$level" == "ERROR" ]]; then
      message="处理$business_type时发生错误"
      response_code=500
      response_message="操作失败:$error_message"
      success=false
    elif [[ "$level" == "WARN" ]]; then
      message="处理$business_type时出现警告"
      response_code=200
      response_message="操作成功,但有警告:$error_message"
      success=true
    else
      message="$business_type 业务处理完成"
      response_code=200
      response_message="操作成功"
      success=true
    fi

    # Assemble the complete JSON message
    json=$(cat <<EOF | tr -d '\n'
{
  "timestamp": "$timestamp",
  "level": "$level",
  "thread": "http-nio-8080-exec-$((RANDOM%10+1))",
  "logger": "com.example.demo.service.${business_type// /}Service",
  "message": "$message",
  "requestId": "$(uuidgen 2>/dev/null || echo fallback-$(date +%s%N))",
  "businessType": "$business_type",
  "details": $details,
  "response": {
    "code": $response_code,
    "message": "$response_message",
    "success": $success,
    "timestamp": "$timestamp"
  },
  "error": {
    "type": "$error_type",
    "message": "$error_message",
    "stackTrace": $stacktrace
  }
}
EOF
)

    # Append the single-line JSON to the temp file
    echo "$json" >> "$tmpfile"
  done

  # Send all messages from the temp file in a single producer run
  $KAFKA_BIN_DIR/kafka-console-producer.sh --bootstrap-server $BOOTSTRAP_SERVERS --topic "$t" < "$tmpfile"
  echo "消息发送完成"
}

function consumer_group_lag() {
  read -p "请输入消费者组名称: " cg
  $KAFKA_BIN_DIR/kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe --group "$cg"
}

function batch_delete_topics() {
  read -p "请输入匹配删除Topic的正则表达式(例如 ^test.*): " pattern
  topics=$($KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --list | grep -E "$pattern")
  if [[ -z "$topics" ]]; then
    echo "没有匹配的Topic"
    return
  fi
  echo "将删除以下Topics:"
  echo "$topics"
  for t in $topics; do
    $KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --delete --topic "$t"
    echo "删除请求已发送: $t"
  done
}

function cluster_health() {
  echo "🔍 Kafka 集群 Leader/副本状态:"
  $KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe | grep -E "Topic|Leader|Partition|ISR"
}

function menu() {
  echo ""
  echo "==== Kafka 工具脚本 ===="
  echo "1) 列出所有 Topic"
  echo "2) 创建 Topic"
  echo "3) 删除 Topic"
  echo "4) 查看 Topic 分区状态"
  echo "5) 发送复杂 JSON 消息 (模拟 SpringBoot MVC 日志)"
  echo "6) 查看消费者组 Lag"
  echo "7) 批量删除符合模式 Topic"
  echo "8) 集群健康检查"
  echo "0) 退出"
  echo "======================="
  read -p "请输入编号: " choice
  case $choice in
    1) list_topics ;;
    2) create_topic ;;
    3) delete_topic ;;
    4) describe_topic ;;
    5) send_complex_messages ;;
    6) consumer_group_lag ;;
    7) batch_delete_topics ;;
    8) cluster_health ;;
    0) exit 0 ;;
    *) echo "无效选项" ;;
  esac
}

while true; do
  menu
done
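
One way to use the script is to copy it into a broker pod and run it there, so both the Kafka CLI tools and the in-cluster DNS names are available (the file name kafka-tool.sh is just an example, and this assumes bash exists in the image):

kubectl -n bx cp kafka-tool.sh kafka-0:/tmp/kafka-tool.sh
kubectl -n bx exec -it kafka-0 -- bash /tmp/kafka-tool.sh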

3. configuration tuning

My machines are fairly tight on resources, and keeping log segments for the default retention period would take up a lot of disk space, so I use the settings below; adjust the parameters to your situation.

# Use the time-based (delete) cleanup policy, the default, which removes expired segment files
log.cleanup.policy=delete
# Keep logs for at most 1 hour (segments older than this are marked for cleanup)
log.retention.hours=1
# Each log segment file is at most 100 MB; a new segment is created once it is exceeded
log.segment.bytes=104857600
# Check for expired logs every 60 seconds (the default is 5 minutes; faster here to make testing easier)
log.retention.check.interval.ms=60000
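
Since the brokers in 2.1 are started through kafka-server-start.sh with --override flags rather than a custom server.properties file, one way to apply these settings is to append them to the startup command in the StatefulSet; a sketch of the extra flags:

  --override log.cleanup.policy=delete \
  --override log.retention.hours=1 \
  --override log.segment.bytes=104857600 \
  --override log.retention.check.interval.ms=60000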