写在最前
出于安全要求,现需将 Kafka 升级至 3.9.x 或 4.x 版本。在此背景下,本文不仅记录部署过程,还介绍一种常见且高效的日志采集方案:Java 应用将日志推送至 Kafka,由 Logstash 消费并处理后写入 Elasticsearch,最终通过 Kibana 实现可视化查询与分析。
1. docker 部署
2. kubernetes 部署
2.1 statefulset
# Three-replica Kafka 3.9.1 StatefulSet that connects to an external
# ZooKeeper ensemble. Each broker derives its broker.id from the pod ordinal.
kind: StatefulSet
apiVersion: apps/v1
metadata:
  name: kafka
  namespace: bx
  annotations:
    kubesphere.io/creator: tanqidi
    kubesphere.io/description: 'apache/kafka:3.9.1'
spec:
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
      annotations:
        kubesphere.io/creator: admin
        kubesphere.io/imagepullsecrets: '{}'
        kubesphere.io/restartedAt: '2025-06-17T09:56:45.334Z'
        logging.kubesphere.io/logsidecar-config: '{}'
    spec:
      containers:
        - name: kafka
          image: 'apache/kafka:3.9.1'
          command:
            - sh
            - '-c'
            # Literal block scalar (|): every line break below reaches the
            # shell unchanged. A folded scalar (>) would join the statements
            # into one line and break the BROKER_ID assignment.
            - |
              # Pod ordinal (kafka-0, kafka-1, ...) + 1 becomes the broker id.
              BROKER_ID=$(( ${HOSTNAME##*-} + 1 ))
              echo "Using broker.id=$BROKER_ID"
              # $(POD_NAME) is substituted by Kubernetes from the env entry
              # below before the shell runs.
              exec /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties \
                --override broker.id=$BROKER_ID \
                --override zookeeper.connect=zookeeper-0.zookeeper.bx.svc.cluster.local:2181,zookeeper-1.zookeeper.bx.svc.cluster.local:2181,zookeeper-2.zookeeper.bx.svc.cluster.local:2181 \
                --override listeners=PLAINTEXT://0.0.0.0:9092 \
                --override advertised.listeners=PLAINTEXT://$(POD_NAME).kafka.bx.svc.cluster.local:9092 \
                --override log.dirs=/data/kafka/data
          ports:
            - name: http-9092
              containerPort: 9092
              protocol: TCP
          env:
            # Pod name, used to build the per-broker advertised listener.
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.name
            - name: TZ
              value: Asia/Shanghai
          resources:
            limits:
              cpu: '1'
              memory: 3Gi
            requests:
              cpu: 40m
              memory: 1500Mi
          volumeMounts:
            # log.dirs (/data/kafka/data) lives on this volume.
            - name: kafka-data
              mountPath: /data/kafka
            - name: kafka-logs
              mountPath: /opt/kafka/logs
          livenessProbe:
            tcpSocket:
              port: 9092
            timeoutSeconds: 2
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 3
          readinessProbe:
            tcpSocket:
              port: 9092
            timeoutSeconds: 2
            periodSeconds: 5
            successThreshold: 1
            failureThreshold: 3
          # Startup budget: 120s initial delay + 72 failures x 5s = 480s.
          startupProbe:
            tcpSocket:
              port: 9092
            initialDelaySeconds: 120
            timeoutSeconds: 5
            periodSeconds: 5
            successThreshold: 1
            failureThreshold: 72
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      securityContext: {}
      imagePullSecrets:
        - name: harbor-bx
      # Hard anti-affinity: never schedule two brokers on the same node.
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - kafka
              topologyKey: kubernetes.io/hostname
      schedulerName: default-scheduler
  volumeClaimTemplates:
    # Message data.
    - kind: PersistentVolumeClaim
      apiVersion: v1
      metadata:
        name: kafka-data
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 10Gi
        volumeMode: Filesystem
    # Broker application logs (/opt/kafka/logs).
    - kind: PersistentVolumeClaim
      apiVersion: v1
      metadata:
        name: kafka-logs
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 1Gi
        volumeMode: Filesystem
  # Must match the headless Service name so pods get stable DNS records.
  serviceName: kafka
  podManagementPolicy: OrderedReady
  updateStrategy:
    type: RollingUpdate
    rollingUpdate:
      partition: 0
  revisionHistoryLimit: 10
2.2 service
# Headless Service (clusterIP: None) selecting the Kafka pods on port 9092.
apiVersion: v1
kind: Service
metadata:
  namespace: bx
  name: kafka
  annotations:
    kubesphere.io/creator: admin
spec:
  type: ClusterIP
  clusterIP: None
  clusterIPs:
    - None
  selector:
    app: kafka
  ports:
    - port: 9092
      targetPort: 9092
      protocol: TCP
  sessionAffinity: None
  internalTrafficPolicy: Cluster
  ipFamilyPolicy: SingleStack
  ipFamilies:
    - IPv4
2.3 logstash
2.3.1 deployment
# Single-replica Logstash 7.17.0 Deployment; its pipeline directory is
# populated from the logstash-config ConfigMap.
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: bx
  name: logstash
  annotations:
    deployment.kubernetes.io/revision: "3"
    kubesphere.io/creator: admin
spec:
  replicas: 1
  revisionHistoryLimit: 10
  progressDeadlineSeconds: 600
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      creationTimestamp: null
      annotations:
        kubesphere.io/creator: admin
        kubesphere.io/restartedAt: "2025-06-18T04:36:04.784Z"
      labels:
        app: logstash
    spec:
      schedulerName: default-scheduler
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      securityContext: {}
      containers:
        - name: logstash
          image: "docker.elastic.co/logstash/logstash:7.17.0"
          imagePullPolicy: IfNotPresent
          ports:
            - protocol: TCP
              containerPort: 5044
          resources: {}
          volumeMounts:
            # Mounted over the image's pipeline directory.
            - mountPath: /usr/share/logstash/pipeline
              name: config-volume
          terminationMessagePolicy: File
          terminationMessagePath: /dev/termination-log
      volumes:
        - name: config-volume
          configMap:
            name: logstash-config
            # 420 decimal == 0644 octal.
            defaultMode: 420
2.3.2 service
# ClusterIP Service exposing port 5044 ("beats") of the Logstash pods.
apiVersion: v1
kind: Service
metadata:
  namespace: bx
  name: logstash
  annotations:
    kubesphere.io/creator: admin
spec:
  type: ClusterIP
  selector:
    app: logstash
  ports:
    - name: beats
      port: 5044
      targetPort: 5044
      protocol: TCP
  sessionAffinity: None
  internalTrafficPolicy: Cluster
  ipFamilyPolicy: SingleStack
  ipFamilies:
    - IPv4
2.3.3 configmap
# Logstash pipeline: consume JSON events from Kafka and index them into
# Elasticsearch, one index per topic per day.
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: bx
  name: logstash-config
  annotations:
    kubesphere.io/creator: admin
data:
  logstash.conf: |
    input {
      kafka {
        # Kafka brokers to bootstrap from (comma-separated list).
        bootstrap_servers => "kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092"
        # Subscribe to every topic whose name starts with "logstash-",
        # e.g. logstash-app, logstash-nginx.
        topics_pattern => "logstash-.*"
        # Consumer group id; offsets are tracked per group.
        group_id => "logstash-g1"
        # When the group has no committed offset, start from the newest
        # message (allowed values: earliest or latest).
        auto_offset_reset => "latest"
        # Decode each Kafka message as JSON for structured processing.
        codec => "json"
        # Add Kafka metadata (topic, partition, ...) under [@metadata].
        decorate_events => true
      }
    }
    output {
      elasticsearch {
        # Elasticsearch endpoint(s); list several for high availability.
        hosts => ["http://elasticsearch.bx.svc.cluster.local:9200"]
        # Index name = Kafka topic + date, split daily,
        # e.g. logstash-app-2025-06-18.
        index => "%{[@metadata][kafka][topic]}-%{+YYYY-MM-dd}"
      }
      # Optional debugging output: print every event to the console.
      # stdout {
      #   # rubydebug pretty-prints the event structure.
      #   codec => rubydebug
      # }
    }
2.4 kafka 常用命令
# Create the topic "logstash-hello" with 3 partitions and replication factor 3.
./kafka-topics.sh \
  --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092 \
  --create --topic logstash-hello --partitions 3 --replication-factor 3

# Produce 10000 simulated JSON log lines (adjust the seq upper bound as needed).
seq 1 10000 | awk '
function rand_level() {
  levels[1] = "INFO"; levels[2] = "WARN"; levels[3] = "ERROR"
  return levels[int(1 + rand() * 3)]
}
BEGIN { srand() }
{
  level = rand_level()
  # Date and hour are fixed; minute and second are random in 00-59.
  printf("{\"timestamp\":\"2025-06-18T11:%02d:%02dZ\",\"level\":\"%s\",\"app\":\"my-app\",\"host\":\"host1.example.com\",\"message\":\"Test log message %d\",\"user_id\":%d,\"trace_id\":\"trace-%d\"}\n", int(rand() * 60), int(rand() * 60), level, $1, 1000 + $1, 100000 + $1)
}' | ./kafka-console-producer.sh --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092 --topic logstash-hello

# List all topics.
./kafka-topics.sh --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092 --list

# Delete the topic created above.
./kafka-topics.sh \
  --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092 \
  --delete \
  --topic logstash-hello
2.5 kafka 测试脚本
#!/bin/bash
# Connection settings shared by every command in this script. Both values
# can be overridden from the environment; the defaults match the original
# hard-coded values.

# Directory that contains the Kafka CLI scripts (kafka-topics.sh, ...).
KAFKA_BIN_DIR="${KAFKA_BIN_DIR:-/opt/kafka/bin}"
# Comma-separated broker list passed to --bootstrap-server.
BOOTSTRAP_SERVERS="${BOOTSTRAP_SERVERS:-kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092}"
# Print every topic except __consumer_offsets, one per line.
function list_topics() {
  local listing name
  local -a visible=()

  echo "📜 所有 Topic 列表:"
  listing=$($KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --list)
  if [[ -z "$listing" ]]; then
    echo "没有找到任何Topic"
    return
  fi

  # Collect everything that is not the internal offsets topic.
  for name in $listing; do
    [[ "$name" == "__consumer_offsets" ]] || visible+=("$name")
  done

  if (( ${#visible[@]} == 0 )); then
    echo "除了__consumer_offsets外没有找到其他Topic"
    return
  fi
  printf '🧩 %s\n' "${visible[@]}"
}
# Prompt for a topic name, partition count and replication factor (both
# default to 3), then create the topic.
# Returns 1 on invalid input or when kafka-topics.sh fails.
function create_topic() {
  local t p r
  read -r -p "请输入Topic名称: " t
  read -r -p "请输入分区数(默认3): " p
  read -r -p "请输入副本因子(默认3): " r
  p=${p:-3}
  r=${r:-3}

  if [[ -z "$t" ]]; then
    echo "Topic名称不能为空"
    return 1
  fi
  if [[ ! "$p" =~ ^[1-9][0-9]*$ || ! "$r" =~ ^[1-9][0-9]*$ ]]; then
    echo "分区数和副本因子必须是正整数"
    return 1
  fi

  # Only report success when the CLI actually succeeded.
  if "$KAFKA_BIN_DIR/kafka-topics.sh" --bootstrap-server "$BOOTSTRAP_SERVERS" \
      --create --topic "$t" --partitions "$p" --replication-factor "$r"; then
    echo "Topic $t 创建完成"
  else
    echo "Topic $t 创建失败"
    return 1
  fi
}
# Prompt for a topic name and request its deletion.
# Returns 1 on an empty name or when kafka-topics.sh fails.
function delete_topic() {
  local t
  read -r -p "请输入要删除的Topic名称: " t
  if [[ -z "$t" ]]; then
    echo "Topic名称不能为空"
    return 1
  fi

  if "$KAFKA_BIN_DIR/kafka-topics.sh" --bootstrap-server "$BOOTSTRAP_SERVERS" --delete --topic "$t"; then
    echo "Topic $t 删除请求已发送"
  else
    echo "Topic $t 删除失败"
    return 1
  fi
}
# Prompt for a topic name and show its partition, leader and replica layout.
function describe_topic() {
  local topic_name
  read -p "请输入要查看分区状态的Topic名称: " topic_name
  $KAFKA_BIN_DIR/kafka-topics.sh \
    --bootstrap-server $BOOTSTRAP_SERVERS \
    --describe \
    --topic "$topic_name"
}
# Store a random amount below $2 cents, formatted as D.CC, in the variable
# named by $1. Pure bash: no bc fork and no locale-dependent decimal point.
function _rand_money() {
  local _cents=$((RANDOM % $2))
  printf -v "$1" '%d.%02d' $((_cents / 100)) $((_cents % 100))
}

# Prompt for a topic and a message count, generate that many simulated
# Spring Boot style JSON log events (one per line) into a temp file, and
# publish the file with kafka-console-producer.sh.
# Returns 1 on invalid input, mktemp failure or producer failure.
# Requires GNU date (%3N, -d); uuidgen is optional.
function send_complex_messages() {
  local t n
  read -r -p "请输入Topic名称: " t
  read -r -p "请输入发送消息数: " n
  if [[ -z "$t" ]]; then
    echo "Topic名称不能为空"
    return 1
  fi
  if [[ ! "$n" =~ ^[1-9][0-9]*$ ]]; then
    echo "发送消息数必须是正整数"
    return 1
  fi
  echo "开始发送复杂JSON消息到 $t"

  # All messages are staged in a temp file and sent in one producer run.
  local tmpfile
  tmpfile=$(mktemp) || return 1
  # The path is expanded now, so the trap still works after this function
  # returns; it only matters if the script exits mid-generation.
  trap "rm -f -- '$tmpfile'" EXIT

  # Value pools for the random business payloads.
  local -a business_types=("订单支付" "用户注册" "商品浏览" "购物车操作" "订单退款" "库存更新" "优惠券使用" "搜索查询")
  local -a payment_methods=("支付宝" "微信支付" "银联" "信用卡" "余额支付")
  local -a product_categories=("电子产品" "服装鞋帽" "家居用品" "食品饮料" "图书音像" "美妆个护" "母婴用品")
  local -a user_roles=("普通用户" "VIP会员" "高级会员" "企业用户")
  local -a register_source=("网站" "APP" "小程序" "第三方平台")
  local -a action=("添加" "删除" "修改数量")
  local -a reason=("商品质量问题" "尺寸不合适" "颜色不符" "重复购买" "其他原因")
  local -a discount_type=("满减" "折扣" "无门槛")
  local -a query_terms=("手机" "电视" "衣服" "鞋子" "笔记本电脑" "化妆品" "婴儿用品" "食品" "图书" "耳机" "相机")
  local -a location
  # error_types[i] and error_messages[i] belong together.
  local -a error_types=("SQLException" "NullPointerException" "TimeoutException" "ValidationException" "IOException" "BusinessException" "DataNotFoundException" "UnauthorizedException")
  local -a error_messages=(
    "数据库连接超时"
    "空指针异常,参数未初始化"
    "服务调用超时,等待超过30秒"
    "参数验证失败:字段不能为空"
    "文件读写错误,权限不足"
    "业务规则冲突:库存不足"
    "未找到匹配的数据记录"
    "用户未授权,访问被拒绝"
  )

  local i j timestamp business_type svc level
  local error_index error_type error_message stacktrace svc_line ctl_line random_delay
  local amount discount payment_method order_id products_count products_json
  local product_price product_name category role source phone
  local product_id price chrome_version build_number patch_version
  local selected_action refund_reason store stock_location old_stock new_stock
  local coupon_id selected_type expire_date query_term results_count
  local details message response_code response_message success request_id
  local rc

  for ((i = 1; i <= n; i++)); do
    timestamp=$(date +"%Y-%m-%dT%H:%M:%S.%3N%z")
    business_type=${business_types[RANDOM % ${#business_types[@]}]}
    # Class-name prefix used in logger names and stack frames.
    svc=${business_type// /}

    if ((RANDOM % 10 == 0)); then
      # 10% of events are errors with a synthetic stack trace.
      level="ERROR"
      error_index=$((RANDOM % ${#error_types[@]}))
      error_type=${error_types[error_index]}
      error_message=${error_messages[error_index]}
      svc_line=$((100 + RANDOM % 200))
      ctl_line=$((50 + RANDOM % 100))
      stacktrace="[\"at com.example.demo.service.${svc}Service.process(${svc}Service.java:${svc_line})\""
      stacktrace+=",\"at com.example.demo.controller.${svc}Controller.handleRequest(${svc}Controller.java:${ctl_line})\""
      stacktrace+=',"at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)"'
      stacktrace+=',"at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)"'
      stacktrace+=',"at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)"'
      stacktrace+=',"at java.lang.reflect.Method.invoke(Method.java:498)"]'
    elif ((RANDOM % 5 == 0)); then
      # One in five of the remaining events (18% overall) is a warning.
      level="WARN"
      random_delay=$((RANDOM % 5000))
      error_message="性能警告:操作耗时较长(${random_delay}ms)"
      error_type="PerformanceWarning"
      stacktrace="[]"
    else
      level="INFO"
      error_type=""
      error_message=""
      stacktrace="[]"
    fi

    # Build the business-specific "details" JSON object.
    case $business_type in
      "订单支付")
        _rand_money amount 10000
        _rand_money discount 100
        payment_method=${payment_methods[RANDOM % ${#payment_methods[@]}]}
        order_id="ORD-$((1000000 + RANDOM % 9000000))"
        products_count=$((1 + RANDOM % 5))
        # Comma-joined list of 1-5 random product objects.
        products_json=""
        for ((j = 1; j <= products_count; j++)); do
          _rand_money product_price 1000
          printf -v product_name '商品%03d' $((RANDOM % 1000))
          category=${product_categories[RANDOM % ${#product_categories[@]}]}
          if [[ -n "$products_json" ]]; then
            products_json+=", "
          fi
          products_json+="{\"name\":\"$product_name\",\"category\":\"$category\",\"price\":$product_price,\"quantity\":$((1 + RANDOM % 5))}"
        done
        printf -v details '{"orderId":"%s","amount":%s,"paymentMethod":"%s","discount":%s,"products":[%s],"shippingAddress":{"province":"省份%02d","city":"城市%03d","district":"区县%03d","address":"街道%04d-%04d"},"paymentTime":"%s"}' \
          "$order_id" "$amount" "$payment_method" "$discount" "$products_json" \
          $((RANDOM % 34)) $((RANDOM % 300)) $((RANDOM % 3000)) \
          $((RANDOM % 10000)) $((RANDOM % 10000)) "$timestamp"
        ;;
      "用户注册")
        role=${user_roles[RANDOM % ${#user_roles[@]}]}
        source=${register_source[RANDOM % ${#register_source[@]}]}
        # RANDOM has at most 5 digits, so combine two draws to get the
        # 9 digits that follow the "13" prefix.
        printf -v phone '13%09d' $(((RANDOM * 32768 + RANDOM) % 1000000000))
        printf -v details '{"username":"user%d","email":"user%d@example.com","phone":"%s","registerTime":"%s","source":"%s","role":"%s","referrer":"user%d"}' \
          $((10000 + RANDOM % 90000)) $((10000 + RANDOM % 90000)) "$phone" \
          "$timestamp" "$source" "$role" $((10000 + RANDOM % 90000))
        ;;
      "商品浏览")
        product_id="PRD-$((1000000 + RANDOM % 9000000))"
        category=${product_categories[RANDOM % ${#product_categories[@]}]}
        _rand_money price 1000
        chrome_version=$((80 + RANDOM % 10))
        build_number=$((1000 + RANDOM % 1000))
        patch_version=$((10 + RANDOM % 90))
        printf -v details '{"productId":"%s","productName":"商品%03d-%s","category":"%s","price":%s,"brand":"品牌%02d","viewTime":"%s","viewDuration":%d,"fromPage":"category/%s","userAgent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/%d.0.%d.%d Safari/537.36"}' \
          "$product_id" $((RANDOM % 1000)) "$category" "$category" "$price" \
          $((RANDOM % 100)) "$timestamp" $((1 + RANDOM % 300)) "$category" \
          "$chrome_version" "$build_number" "$patch_version"
        ;;
      "购物车操作")
        selected_action=${action[RANDOM % ${#action[@]}]}
        product_id="PRD-$((1000000 + RANDOM % 9000000))"
        category=${product_categories[RANDOM % ${#product_categories[@]}]}
        printf -v details '{"action":"%s","productId":"%s","productName":"商品%03d-%s","category":"%s","quantity":%d,"cartId":"CART-%d","operationTime":"%s"}' \
          "$selected_action" "$product_id" $((RANDOM % 1000)) "$category" "$category" \
          $((1 + RANDOM % 10)) $((1000000 + RANDOM % 9000000)) "$timestamp"
        ;;
      "订单退款")
        order_id="ORD-$((1000000 + RANDOM % 9000000))"
        _rand_money amount 10000
        refund_reason=${reason[RANDOM % ${#reason[@]}]}
        printf -v details '{"orderId":"%s","refundId":"REF-%d","amount":%s,"reason":"%s","refundTime":"%s","status":"处理中","handler":"客服%02d"}' \
          "$order_id" $((1000000 + RANDOM % 9000000)) "$amount" "$refund_reason" \
          "$timestamp" $((RANDOM % 100))
        ;;
      "库存更新")
        product_id="PRD-$((1000000 + RANDOM % 9000000))"
        category=${product_categories[RANDOM % ${#product_categories[@]}]}
        printf -v store '门店%02d' $((RANDOM % 50))
        location=("仓库A" "仓库B" "仓库C" "$store" "配送中心")
        stock_location=${location[RANDOM % ${#location[@]}]}
        old_stock=$((RANDOM % 1000))
        new_stock=$((RANDOM % 1000))
        printf -v details '{"productId":"%s","productName":"商品%03d-%s","location":"%s","oldStock":%d,"newStock":%d,"changeReason":"库存盘点","updateTime":"%s","operator":"员工%04d"}' \
          "$product_id" $((RANDOM % 1000)) "$category" "$stock_location" \
          "$old_stock" "$new_stock" "$timestamp" $((RANDOM % 10000))
        ;;
      "优惠券使用")
        coupon_id="COUPON-$((1000000 + RANDOM % 9000000))"
        selected_type=${discount_type[RANDOM % ${#discount_type[@]}]}
        _rand_money amount 100
        expire_date=$(date -d "+$((RANDOM % 30)) days" +"%Y-%m-%d")
        printf -v details '{"couponId":"%s","discountType":"%s","discountAmount":%s,"minAmount":%d,"orderId":"ORD-%d","userId":"user%d","useTime":"%s","expireDate":"%s"}' \
          "$coupon_id" "$selected_type" "$amount" $((RANDOM % 200)) \
          $((1000000 + RANDOM % 9000000)) $((10000 + RANDOM % 90000)) \
          "$timestamp" "$expire_date"
        ;;
      "搜索查询")
        query_term=${query_terms[RANDOM % ${#query_terms[@]}]}
        results_count=$((RANDOM % 100))
        printf -v details '{"query":"%s","resultsCount":%d,"searchTime":"%s","searchType":"商品搜索","sort":"默认","filters":{"priceRange":"%d-%d","brand":"品牌%02d"},"userId":"user%d"}' \
          "$query_term" "$results_count" "$timestamp" \
          $((RANDOM % 1000)) $((RANDOM % 1000 + 1000)) $((RANDOM % 100)) \
          $((10000 + RANDOM % 90000))
        ;;
    esac

    # Message text and response fields depend on the log level.
    if [[ "$level" == "ERROR" ]]; then
      message="处理$business_type时发生错误"
      response_code=500
      response_message="操作失败:$error_message"
      success=false
    elif [[ "$level" == "WARN" ]]; then
      message="处理$business_type时出现警告"
      response_code=200
      response_message="操作成功,但有警告:$error_message"
      success=true
    else
      message="$business_type 业务处理完成"
      response_code=200
      response_message="操作成功"
      success=true
    fi

    request_id=$(uuidgen 2>/dev/null || echo "fallback-$(date +%s%N)")

    # Emit the complete event as one JSON line (captured by the loop redirect).
    printf '{"timestamp":"%s","level":"%s","thread":"http-nio-8080-exec-%d","logger":"com.example.demo.service.%sService","message":"%s","requestId":"%s","businessType":"%s","details":%s,"response":{"code":%d,"message":"%s","success":%s,"timestamp":"%s"},"error":{"type":"%s","message":"%s","stackTrace":%s}}\n' \
      "$timestamp" "$level" $((RANDOM % 10 + 1)) "$svc" "$message" "$request_id" \
      "$business_type" "$details" "$response_code" "$response_message" "$success" \
      "$timestamp" "$error_type" "$error_message" "$stacktrace"
  done > "$tmpfile"

  # Publish every staged line in a single producer invocation.
  "$KAFKA_BIN_DIR/kafka-console-producer.sh" --bootstrap-server "$BOOTSTRAP_SERVERS" --topic "$t" < "$tmpfile"
  rc=$?
  # Remove the staging file now instead of waiting for script exit.
  rm -f -- "$tmpfile"
  if ((rc != 0)); then
    echo "消息发送失败"
    return 1
  fi
  echo "消息发送完成"
}
# Prompt for a consumer group and print the kafka-consumer-groups.sh
# --describe output for it.
function consumer_group_lag() {
  local group_name
  read -p "请输入消费者组名称: " group_name
  $KAFKA_BIN_DIR/kafka-consumer-groups.sh \
    --bootstrap-server $BOOTSTRAP_SERVERS \
    --describe \
    --group "$group_name"
}
# Prompt for an extended regular expression and delete every topic whose
# name matches it. An empty pattern is rejected (it would match every
# topic), and Kafka's internal topics are never deleted.
# Returns 1 on an empty pattern.
function batch_delete_topics() {
  local pattern topics t
  read -r -p "请输入匹配删除Topic的正则表达式(例如 ^test.*): " pattern
  if [[ -z "$pattern" ]]; then
    echo "正则表达式不能为空"
    return 1
  fi

  # -e guards patterns that start with "-"; the second grep drops the
  # internal topics even when the pattern matches them.
  topics=$("$KAFKA_BIN_DIR/kafka-topics.sh" --bootstrap-server "$BOOTSTRAP_SERVERS" --list \
    | grep -E -e "$pattern" \
    | grep -v -x -E '__(consumer_offsets|transaction_state)')
  if [[ -z "$topics" ]]; then
    echo "没有匹配的Topic"
    return
  fi

  echo "将删除以下Topics:"
  echo "$topics"
  for t in $topics; do
    if "$KAFKA_BIN_DIR/kafka-topics.sh" --bootstrap-server "$BOOTSTRAP_SERVERS" --delete --topic "$t"; then
      echo "删除请求已发送: $t"
    else
      echo "删除失败: $t"
    fi
  done
}
# Describe all topics and keep only the lines that mention topic, leader,
# partition or ISR information.
function cluster_health() {
  printf '%s\n' "🔍 Kafka 集群 Leader/副本状态:"
  $KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe \
    | grep -e "Topic" -e "Leader" -e "Partition" -e "ISR"
}
# Print the main menu, read one choice and dispatch to the matching action.
# Exits the script on option 0 and on end of input (for example Ctrl-D or
# a closed pipe), so the surrounding loop cannot spin forever.
function menu() {
  local choice
  printf '%s\n' \
    "" \
    "==== Kafka 工具脚本 ====" \
    "1) 列出所有 Topic" \
    "2) 创建 Topic" \
    "3) 删除 Topic" \
    "4) 查看 Topic 分区状态" \
    "5) 发送复杂 JSON 消息 (模拟 SpringBoot MVC 日志)" \
    "6) 查看消费者组 Lag" \
    "7) 批量删除符合模式 Topic" \
    "8) 集群健康检查" \
    "0) 退出" \
    "======================="

  # read fails at EOF; leave instead of redrawing the menu endlessly.
  if ! read -r -p "请输入编号: " choice; then
    echo
    exit 0
  fi

  case "$choice" in
    1) list_topics ;;
    2) create_topic ;;
    3) delete_topic ;;
    4) describe_topic ;;
    5) send_complex_messages ;;
    6) consumer_group_lag ;;
    7) batch_delete_topics ;;
    8) cluster_health ;;
    0) exit 0 ;;
    *) echo "无效选项" ;;
  esac
}
# Keep showing the menu until one of its actions exits the script.
while :; do
  menu
done
3. 配置优化
由于我的机器配置比较吃紧,日志若保留太久,数据文件会占用大量磁盘空间,因此请视实际情况调整以下参数。
# Keep log data for at most 1 hour; older segments become eligible for deletion.
log.retention.hours=1
# Check every 60 seconds for segments to clean up (the default is 5 minutes;
# a shorter interval makes testing easier).
log.retention.check.interval.ms=60000
# Roll to a new segment file once the current one reaches 100 MB.
log.segment.bytes=104857600
# Remove expired segment files (the default cleanup policy).
log.cleanup.policy=delete