Foreword
Security requirements now mandate upgrading Kafka to 3.9.x or 4.x. Against this backdrop, this post not only records the deployment process, but also walks through a common and efficient log collection architecture: the Java application pushes its logs to Kafka, Logstash consumes and processes them and writes them to Elasticsearch, and Kibana finally provides visual querying and analysis.
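To make the entry point of the pipeline concrete, here is a minimal smoke test, assuming a topic named logstash-app that matches the topics_pattern used later in this post; in a real application the same JSON event would be emitted by a Kafka appender in the logging framework rather than the console producer. Run it from one of the broker pods:

# Hypothetical smoke test: push one JSON log event into a topic matched by logstash-.*
echo '{"timestamp":"2025-06-18T11:00:00Z","level":"INFO","app":"my-app","message":"hello from the java app"}' | \
  /opt/kafka/bin/kafka-console-producer.sh \
    --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092 \
    --topic logstash-app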
1. Docker deployment
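A minimal single-node sketch, assuming the official apache/kafka image defaults (KRaft mode, a PLAINTEXT listener on 9092); this is for local testing only, not the three-broker production layout described below.

# Single-node Kafka for local testing, relying on the image's built-in KRaft defaults
docker run -d --name kafka-test -p 9092:9092 apache/kafka:3.9.1
# Quick check: list topics on the local broker
docker exec kafka-test /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --list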
2. Kubernetes deployment
2.1 statefulset
kind: StatefulSet
apiVersion: apps/v1
metadata:
  name: kafka
  namespace: bx
  annotations:
    kubesphere.io/creator: tanqidi
    kubesphere.io/description: 'apache/kafka:3.9.1'
spec:
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: kafka
      annotations:
        kubesphere.io/creator: admin
        kubesphere.io/imagepullsecrets: '{}'
        kubesphere.io/restartedAt: '2025-06-17T09:56:45.334Z'
        logging.kubesphere.io/logsidecar-config: '{}'
    spec:
      containers:
        - name: kafka
          image: 'apache/kafka:3.9.1'
          command:
            - sh
            - '-c'
            # Derive broker.id from the pod ordinal (kafka-0 -> 1, kafka-1 -> 2, ...).
            # A literal block (|) keeps each line a separate shell statement; a folded
            # block (>) would join them into one broken command line.
            - |
              BROKER_ID=$(( ${HOSTNAME##*-} + 1 ))
              echo "Using broker.id=$BROKER_ID"
              exec /opt/kafka/bin/kafka-server-start.sh \
                /opt/kafka/config/server.properties \
                --override broker.id=$BROKER_ID
          ports:
            - name: http-9092
              containerPort: 9092
              protocol: TCP
          env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.name
            - name: TZ
              value: Asia/Shanghai
          resources:
            limits:
              cpu: '1'
              memory: 3Gi
            requests:
              cpu: 40m
              memory: 1500Mi
          volumeMounts:
            - name: kafka-data
              mountPath: /data/kafka
            - name: kafka-logs
              mountPath: /opt/kafka/logs
          livenessProbe:
            tcpSocket:
              port: 9092
            timeoutSeconds: 2
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 3
          readinessProbe:
            tcpSocket:
              port: 9092
            timeoutSeconds: 2
            periodSeconds: 5
            successThreshold: 1
            failureThreshold: 3
          startupProbe:
            tcpSocket:
              port: 9092
            initialDelaySeconds: 120
            timeoutSeconds: 5
            periodSeconds: 5
            successThreshold: 1
            failureThreshold: 72
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      securityContext: {}
      imagePullSecrets:
        - name: harbor-bx
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - kafka
              topologyKey: kubernetes.io/hostname
      schedulerName: default-scheduler
  volumeClaimTemplates:
    - kind: PersistentVolumeClaim
      apiVersion: v1
      metadata:
        name: kafka-data
        creationTimestamp: null
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 10Gi
        volumeMode: Filesystem
      status:
        phase: Pending
    - kind: PersistentVolumeClaim
      apiVersion: v1
      metadata:
        name: kafka-logs
        creationTimestamp: null
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 1Gi
        volumeMode: Filesystem
      status:
        phase: Pending
  serviceName: kafka
  podManagementPolicy: OrderedReady
  updateStrategy:
    type: RollingUpdate
    rollingUpdate:
      partition: 0
  revisionHistoryLimit: 10
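After applying the manifest (assuming it is saved as kafka-statefulset.yaml), it is worth confirming that all three brokers come up in order:

kubectl apply -f kafka-statefulset.yaml
# podManagementPolicy: OrderedReady starts kafka-0, kafka-1, kafka-2 one after another
kubectl -n bx rollout status statefulset/kafka
kubectl -n bx get pods -l app=kafka -o wide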
2.2 service
kind: Service
apiVersion: v1
metadata:
  name: kafka
  namespace: bx
  annotations:
    kubesphere.io/creator: admin
spec:
  ports:
    - protocol: TCP
      port: 9092
      targetPort: 9092
  selector:
    app: kafka
  clusterIP: None
  clusterIPs:
    - None
  type: ClusterIP
  sessionAffinity: None
  ipFamilies:
    - IPv4
  ipFamilyPolicy: SingleStack
  internalTrafficPolicy: Cluster
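Because clusterIP is None, this is a headless Service: instead of one virtual IP, each pod gets a stable DNS record of the form kafka-<ordinal>.kafka.bx.svc.cluster.local, which is exactly what the bootstrap-server lists below rely on. A quick way to verify the per-pod records (the throwaway busybox pod is just for illustration):

# Resolve a per-pod record of the headless service from inside the cluster
kubectl -n bx run dns-test --rm -it --image=busybox:1.36 --restart=Never -- \
  nslookup kafka-0.kafka.bx.svc.cluster.local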
2.3 logstash
2.3.1 deployment
kind: Deployment
apiVersion: apps/v1
metadata:
  name: logstash
  namespace: bx
  annotations:
    deployment.kubernetes.io/revision: '3'
    kubesphere.io/creator: admin
spec:
  replicas: 1
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: logstash
      annotations:
        kubesphere.io/creator: admin
        kubesphere.io/restartedAt: '2025-06-18T04:36:04.784Z'
    spec:
      volumes:
        - name: config-volume
          configMap:
            name: logstash-config
            defaultMode: 420
      containers:
        - name: logstash
          image: 'docker.elastic.co/logstash/logstash:7.17.0'
          ports:
            - containerPort: 5044
              protocol: TCP
          resources: {}
          volumeMounts:
            - name: config-volume
              mountPath: /usr/share/logstash/pipeline
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      securityContext: {}
      schedulerName: default-scheduler
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 25%
      maxSurge: 25%
  revisionHistoryLimit: 10
  progressDeadlineSeconds: 600
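Once the Deployment is up, the Logstash log should show the pipeline starting and the Kafka consumer joining the logstash-g1 group:

kubectl -n bx rollout status deployment/logstash
# Look for "Pipeline started" and Kafka consumer-group join messages
kubectl -n bx logs deployment/logstash --tail=100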
2.3.2 service
kind: Service
apiVersion: v1
metadata:
  name: logstash
  namespace: bx
  annotations:
    kubesphere.io/creator: admin
spec:
  ports:
    - name: beats
      protocol: TCP
      port: 5044
      targetPort: 5044
  selector:
    app: logstash
  type: ClusterIP
  sessionAffinity: None
  ipFamilies:
    - IPv4
  ipFamilyPolicy: SingleStack
  internalTrafficPolicy: Cluster
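Note that the pipeline below defines only a kafka input, so nothing actually listens on 5044 (the usual Beats port); the Service is effectively a placeholder for a future beats input. Checking the endpoints at least confirms the selector matches the pod:

# The service should list the logstash pod IP as its endpoint
kubectl -n bx get endpoints logstash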
2.3.3 configmap
kind: ConfigMap
apiVersion: v1
metadata:
  name: logstash-config
  namespace: bx
  annotations:
    kubesphere.io/creator: admin
data:
  logstash.conf: |
    input {
      kafka {
        # Decode Kafka messages as JSON for structured processing
        codec => "json"
        # Match every Kafka topic starting with logstash-, e.g. logstash-app, logstash-nginx
        topics_pattern => "logstash-.*"
        # Kafka cluster address list (multiple brokers, comma-separated)
        bootstrap_servers => "kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092"
        # With no stored offset, start consuming from the newest messages (earliest or latest)
        auto_offset_reset => "latest"
        # Add Kafka metadata (topic, partition, etc.) to the event's [@metadata] field
        decorate_events => true
        # Kafka consumer group ID, used for offset management and to avoid duplicate consumption
        group_id => "logstash-g1"
      }
    }
    output {
      elasticsearch {
        # Elasticsearch node addresses; several can be listed for high availability
        hosts => ["http://elasticsearch.bx.svc.cluster.local:9200"]
        # Build the index name from the Kafka topic and split indices by day,
        # e.g. logstash-app-2025-06-18
        index => "%{[@metadata][kafka][topic]}-%{+YYYY-MM-dd}"
      }
      # Optional output: print the full event to the console for debugging
      # stdout {
      #   # rubydebug pretty-prints the event, handy for inspecting data structures
      #   codec => rubydebug
      # }
    }
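With the whole chain in place, an end-to-end check is to produce one event into a topic matched by logstash-.* and then confirm that a daily index appears in Elasticsearch; the topic name logstash-e2e is an example, and the exec path assumes the image layout used above.

# 1) Produce a test event into a matching topic
kubectl -n bx exec -it kafka-0 -- sh -c \
  'echo "{\"level\":\"INFO\",\"message\":\"e2e test\"}" | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic logstash-e2e'
# 2) An index named logstash-e2e-<date> should appear shortly afterwards
kubectl -n bx run curl-test --rm -it --image=curlimages/curl --restart=Never --command -- \
  curl -s 'http://elasticsearch.bx.svc.cluster.local:9200/_cat/indices/logstash-*'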
2.4 Common Kafka commands
# Create a topic named logstash-hello with 3 partitions and a replication factor of 3
./kafka-topics.sh \
  --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092 \
  --create --topic logstash-hello --partitions 3 --replication-factor 3
# Generate 10000 mock JSON log messages and pipe them into the console producer
seq 1 10000 | awk '
function rand_level() {
    levels[1]="INFO"; levels[2]="WARN"; levels[3]="ERROR";
    return levels[int(1+rand()*3)];
}
BEGIN { srand(); }
{
    level = rand_level()
    printf("{\"timestamp\":\"2025-06-18T11:%02d:%02dZ\",\"level\":\"%s\",\"app\":\"my-app\",\"host\":\"host1.example.com\",\"message\":\"Test log message %d\",\"user_id\":%d,\"trace_id\":\"trace-%d\"}\n", int(rand()*59), int(rand()*59), level, $1, 1000+$1, 100000+$1);
}' | ./kafka-console-producer.sh --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092 --topic logstash-hello
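To verify that the mock messages actually landed, read a few of them back with the console consumer:

# Consume 5 messages from the beginning of the topic, then exit
./kafka-console-consumer.sh \
  --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092 \
  --topic logstash-hello --from-beginning --max-messages 5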
# List topics
./kafka-topics.sh --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092 --list
# Delete a topic
./kafka-topics.sh \
  --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092 \
  --delete \
  --topic logstash-test1
2.5 Kafka test script
#!/bin/bash
# Kafka binary directory; adjust to your actual path
KAFKA_BIN_DIR="/opt/kafka/bin"
BOOTSTRAP_SERVERS="kafka-0.kafka.bx.svc.cluster.local:9092,kafka-1.kafka.bx.svc.cluster.local:9092,kafka-2.kafka.bx.svc.cluster.local:9092"
function list_topics() {
  echo "📜 Topic list:"
  topics=$($KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --list)
  if [[ -z "$topics" ]]; then
    echo "No topics found"
    return
  fi
  # Filter out the internal __consumer_offsets topic
  filtered_topics=""
  for topic in $topics; do
    if [[ "$topic" != "__consumer_offsets" ]]; then
      filtered_topics+=" $topic"
    fi
  done
  if [[ -z "$filtered_topics" ]]; then
    echo "No topics found other than __consumer_offsets"
    return
  fi
  for topic in $filtered_topics; do
    echo "🧩 $topic"
  done
}
function create_topic() {
  read -p "Topic name: " t
  read -p "Partitions (default 3): " p
  read -p "Replication factor (default 3): " r
  p=${p:-3}
  r=${r:-3}
  $KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --create --topic "$t" --partitions $p --replication-factor $r
  echo "Topic $t created"
}
function delete_topic() {
  read -p "Topic to delete: " t
  $KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --delete --topic "$t"
  echo "Delete request sent for topic $t"
}
function describe_topic() {
  read -p "Topic to describe: " t
  $KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe --topic "$t"
}
function send_complex_messages() {
  read -p "Topic name: " t
  read -p "Number of messages to send: " n
  echo "Sending complex JSON messages to $t"
  # Buffer all generated messages in a temporary file; clean it up on exit
  tmpfile=$(mktemp)
  trap 'rm -f "$tmpfile"' EXIT
  # Random business types
  business_types=("OrderPayment" "UserRegistration" "ProductView" "CartOperation" "OrderRefund" "StockUpdate" "CouponUse" "SearchQuery")
  payment_methods=("Alipay" "WeChatPay" "UnionPay" "CreditCard" "AccountBalance")
  product_categories=("Electronics" "Apparel" "HomeGoods" "FoodAndBeverage" "BooksAndMedia" "BeautyCare" "BabyProducts")
  user_roles=("RegularUser" "VIPMember" "PremiumMember" "EnterpriseUser")
  # Possible exception types and messages
  error_types=("SQLException" "NullPointerException" "TimeoutException" "ValidationException" "IOException" "BusinessException" "DataNotFoundException" "UnauthorizedException")
  error_messages=(
    "Database connection timed out"
    "Null pointer exception: parameter not initialized"
    "Service call timed out after 30 seconds"
    "Parameter validation failed: field must not be empty"
    "File I/O error: permission denied"
    "Business rule conflict: insufficient stock"
    "No matching data record found"
    "User not authorized, access denied"
  )
  for i in $(seq 1 $n); do
    timestamp=$(date +"%Y-%m-%dT%H:%M:%S.%3N%z")
    business_type=${business_types[$((RANDOM % ${#business_types[@]}))]}
    # 10% chance of an error log
    if (( RANDOM % 10 == 0 )); then
      level="ERROR"
      error_index=$((RANDOM % ${#error_types[@]}))
      error_type=${error_types[$error_index]}
      error_message=${error_messages[$error_index]}
      stacktrace=$(cat <<EOF | tr -d '\n'
[
  "at com.example.demo.service.${business_type// /}Service.process(${business_type// /}Service.java:$((100 + RANDOM % 200)))",
  "at com.example.demo.controller.${business_type// /}Controller.handleRequest(${business_type// /}Controller.java:$((50 + RANDOM % 100)))",
  "at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)",
  "at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)",
  "at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)",
  "at java.lang.reflect.Method.invoke(Method.java:498)"
]
EOF
)
    # 20% chance of a warning log
    elif (( RANDOM % 5 == 0 )); then
      level="WARN"
      random_delay=$((RANDOM % 5000))
      error_message="Performance warning: operation took a long time (${random_delay}ms)"
      error_type="PerformanceWarning"
      stacktrace="[]"
    else
      level="INFO"
      error_type=""
      error_message=""
      stacktrace="[]"
    fi
    # Build business-type-specific detail payloads
    case $business_type in
    "OrderPayment")
      amount=$(printf "%.2f" $(echo "scale=2; $((RANDOM % 10000)) / 100" | bc))
      payment_method=${payment_methods[$((RANDOM % ${#payment_methods[@]}))]}
      order_id="ORD-$((1000000 + RANDOM % 9000000))"
      products_count=$((1 + RANDOM % 5))
      # Build a random product list
      products_json=""
      for ((j=1; j<=products_count; j++)); do
        product_price=$(printf "%.2f" $(echo "scale=2; $((RANDOM % 1000)) / 100" | bc))
        product_name="Product$(printf "%03d" $((RANDOM % 1000)))"
        category=${product_categories[$((RANDOM % ${#product_categories[@]}))]}
        if [[ -n "$products_json" ]]; then
          products_json+=", "
        fi
        products_json+="{\"name\":\"$product_name\",\"category\":\"$category\",\"price\":$product_price,\"quantity\":$((1 + RANDOM % 5))}"
      done
      details=$(cat <<EOF | tr -d '\n'
{
  "orderId": "$order_id",
  "amount": $amount,
  "paymentMethod": "$payment_method",
  "discount": $(printf "%.2f" $(echo "scale=2; $((RANDOM % 100)) / 100" | bc)),
  "products": [$products_json],
  "shippingAddress": {
    "province": "Province$(printf "%02d" $((RANDOM % 34)))",
    "city": "City$(printf "%03d" $((RANDOM % 300)))",
    "district": "District$(printf "%03d" $((RANDOM % 3000)))",
    "address": "Street$(printf "%04d" $((RANDOM % 10000)))-$(printf "%04d" $((RANDOM % 10000)))"
  },
  "paymentTime": "$timestamp"
}
EOF
)
      ;;
    "UserRegistration")
      role=${user_roles[$((RANDOM % ${#user_roles[@]}))]}
      register_source=("Web" "App" "MiniProgram" "ThirdParty")
      source=${register_source[$((RANDOM % ${#register_source[@]}))]}
      details=$(cat <<EOF | tr -d '\n'
{
  "username": "user$((10000 + RANDOM % 90000))",
  "email": "user$((10000 + RANDOM % 90000))@example.com",
  "phone": "13${RANDOM:0:9}",
  "registerTime": "$timestamp",
  "source": "$source",
  "role": "$role",
  "referrer": "user$((10000 + RANDOM % 90000))"
}
EOF
)
      ;;
"商品浏览")
product_id="PRD-$((1000000 + RANDOM % 9000000))"
category=${product_categories[$((RANDOM % ${#product_categories[@]}))]}
price=$(printf "%.2f" $(echo "scale=2; $((RANDOM % 1000)) / 100" | bc))
# 修复用户代理生成的语法错误
chrome_version=$((80 + RANDOM % 10))
build_number=$((1000 + RANDOM % 1000))
patch_version=$((10 + RANDOM % 90))
details=$(cat <<EOF | tr -d '\n'
{
"productId": "$product_id",
"productName": "商品$(printf "%03d" $((RANDOM % 1000)))-$category",
"category": "$category",
"price": $price,
"brand": "品牌$(printf "%02d" $((RANDOM % 100)))",
"viewTime": "$timestamp",
"viewDuration": $((1 + RANDOM % 300)),
"fromPage": "category/$category",
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/${chrome_version}.0.${build_number}.${patch_version} Safari/537.36"
}
EOF
)
;;
"购物车操作")
action=("添加" "删除" "修改数量")
selected_action=${action[$((RANDOM % ${#action[@]}))]}
product_id="PRD-$((1000000 + RANDOM % 9000000))"
category=${product_categories[$((RANDOM % ${#product_categories[@]}))]}
details=$(cat <<EOF | tr -d '\n'
{
"action": "$selected_action",
"productId": "$product_id",
"productName": "商品$(printf "%03d" $((RANDOM % 1000)))-$category",
"category": "$category",
"quantity": $((1 + RANDOM % 10)),
"cartId": "CART-$((1000000 + RANDOM % 9000000))",
"operationTime": "$timestamp"
}
EOF
)
;;
"订单退款")
order_id="ORD-$((1000000 + RANDOM % 9000000))"
amount=$(printf "%.2f" $(echo "scale=2; $((RANDOM % 10000)) / 100" | bc))
reason=("商品质量问题" "尺寸不合适" "颜色不符" "重复购买" "其他原因")
refund_reason=${reason[$((RANDOM % ${#reason[@]}))]}
details=$(cat <<EOF | tr -d '\n'
{
"orderId": "$order_id",
"refundId": "REF-$((1000000 + RANDOM % 9000000))",
"amount": $amount,
"reason": "$refund_reason",
"refundTime": "$timestamp",
"status": "处理中",
"handler": "客服$(printf "%02d" $((RANDOM % 100)))"
}
EOF
)
;;
"库存更新")
product_id="PRD-$((1000000 + RANDOM % 9000000))"
category=${product_categories[$((RANDOM % ${#product_categories[@]}))]}
location=("仓库A" "仓库B" "仓库C" "门店$(printf "%02d" $((RANDOM % 50)))" "配送中心")
stock_location=${location[$((RANDOM % ${#location[@]}))]}
old_stock=$((RANDOM % 1000))
new_stock=$((RANDOM % 1000))
details=$(cat <<EOF | tr -d '\n'
{
"productId": "$product_id",
"productName": "商品$(printf "%03d" $((RANDOM % 1000)))-$category",
"location": "$stock_location",
"oldStock": $old_stock,
"newStock": $new_stock,
"changeReason": "库存盘点",
"updateTime": "$timestamp",
"operator": "员工$(printf "%04d" $((RANDOM % 10000)))"
}
EOF
)
;;
"优惠券使用")
coupon_id="COUPON-$((1000000 + RANDOM % 9000000))"
discount_type=("满减" "折扣" "无门槛")
selected_type=${discount_type[$((RANDOM % ${#discount_type[@]}))]}
amount=$(printf "%.2f" $(echo "scale=2; $((RANDOM % 100)) / 100" | bc))
details=$(cat <<EOF | tr -d '\n'
{
"couponId": "$coupon_id",
"discountType": "$selected_type",
"discountAmount": $amount,
"minAmount": $((RANDOM % 200)),
"orderId": "ORD-$((1000000 + RANDOM % 9000000))",
"userId": "user$((10000 + RANDOM % 90000))",
"useTime": "$timestamp",
"expireDate": "$(date -d "+$((RANDOM % 30)) days" +"%Y-%m-%d")"
}
EOF
)
;;
"搜索查询")
query_terms=("手机" "电视" "衣服" "鞋子" "笔记本电脑" "化妆品" "婴儿用品" "食品" "图书" "耳机" "相机")
query_term=${query_terms[$((RANDOM % ${#query_terms[@]}))]}
results_count=$((RANDOM % 100))
details=$(cat <<EOF | tr -d '\n'
{
"query": "$query_term",
"resultsCount": $results_count,
"searchTime": "$timestamp",
"searchType": "商品搜索",
"sort": "默认",
"filters": {
"priceRange": "$((RANDOM % 1000))-$((RANDOM % 1000 + 1000))",
"brand": "品牌$(printf "%02d" $((RANDOM % 100)))"
},
"userId": "user$((10000 + RANDOM % 90000))"
}
EOF
)
;;
esac
    # Adjust the message content according to the log level
    if [[ "$level" == "ERROR" ]]; then
      message="Error while processing $business_type"
      response_code=500
      response_message="Operation failed: $error_message"
      success=false
    elif [[ "$level" == "WARN" ]]; then
      message="Warning while processing $business_type"
      response_code=200
      response_message="Operation succeeded with warning: $error_message"
      success=true
    else
      message="$business_type processed successfully"
      response_code=200
      response_message="Operation succeeded"
      success=true
    fi
    # Assemble the full JSON message
    json=$(cat <<EOF | tr -d '\n'
{
  "timestamp": "$timestamp",
  "level": "$level",
  "thread": "http-nio-8080-exec-$((RANDOM%10+1))",
  "logger": "com.example.demo.service.${business_type// /}Service",
  "message": "$message",
  "requestId": "$(uuidgen 2>/dev/null || echo fallback-$(date +%s%N))",
  "businessType": "$business_type",
  "details": $details,
  "response": {
    "code": $response_code,
    "message": "$response_message",
    "success": $success,
    "timestamp": "$timestamp"
  },
  "error": {
    "type": "$error_type",
    "message": "$error_message",
    "stackTrace": $stacktrace
  }
}
EOF
)
    # Append the single-line JSON to the temporary file
    echo "$json" >> "$tmpfile"
  done
  # Send all buffered messages from the temporary file
  $KAFKA_BIN_DIR/kafka-console-producer.sh --bootstrap-server $BOOTSTRAP_SERVERS --topic "$t" < "$tmpfile"
  echo "Messages sent"
}
function consumer_group_lag() {
  read -p "Consumer group name: " cg
  $KAFKA_BIN_DIR/kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe --group "$cg"
}
function batch_delete_topics() {
  read -p "Regex for topics to delete (e.g. ^test.*): " pattern
  topics=$($KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --list | grep -E "$pattern")
  if [[ -z "$topics" ]]; then
    echo "No matching topics"
    return
  fi
  echo "The following topics will be deleted:"
  echo "$topics"
  for t in $topics; do
    $KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --delete --topic "$t"
    echo "Delete request sent: $t"
  done
}
function cluster_health() {
  echo "🔍 Kafka cluster leader/replica status:"
  $KAFKA_BIN_DIR/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe | grep -E "Topic|Leader|Partition|ISR"
}
function menu() {
  echo ""
  echo "==== Kafka utility script ===="
  echo "1) List all topics"
  echo "2) Create a topic"
  echo "3) Delete a topic"
  echo "4) Describe topic partitions"
  echo "5) Send complex JSON messages (simulated Spring Boot MVC logs)"
  echo "6) Show consumer group lag"
  echo "7) Batch-delete topics by pattern"
  echo "8) Cluster health check"
  echo "0) Exit"
  echo "=============================="
  read -p "Enter a number: " choice
  case $choice in
    1) list_topics ;;
    2) create_topic ;;
    3) delete_topic ;;
    4) describe_topic ;;
    5) send_complex_messages ;;
    6) consumer_group_lag ;;
    7) batch_delete_topics ;;
    8) cluster_health ;;
    0) exit 0 ;;
    *) echo "Invalid option" ;;
  esac
}
while true; do
  menu
done
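A possible way to use the script is to copy it into one of the broker pods, which already have the Kafka CLI tools on board; the file name kafka-tool.sh is just an example, and the script needs bash (not plain sh) for its arrays and read -p prompts:

# Copy the script into a broker pod and run it interactively (hypothetical file name)
kubectl -n bx cp kafka-tool.sh kafka-0:/tmp/kafka-tool.sh
kubectl -n bx exec -it kafka-0 -- bash /tmp/kafka-tool.sh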
3. Configuration tuning
My machines are fairly resource-constrained, and keeping log segments around for that long produces very large files, so adjust the retention parameters to your own situation.
############################# Server Basics #############################
# Unique ID of this Kafka broker (overridden by the startup arguments)
# broker.id=0
# listeners=PLAINTEXT://0.0.0.0:9092
# advertised.listeners=PLAINTEXT://kafka-0.kafka.bx.svc.cluster.local:9092
############################# Socket Server Settings #############################
# Number of network threads
num.network.threads=3
# Number of I/O threads (handle disk reads/writes)
num.io.threads=8
# Socket send buffer size (bytes)
socket.send.buffer.bytes=102400
# Socket receive buffer size (bytes)
socket.receive.buffer.bytes=102400
# Maximum size of a single request (bytes)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# Kafka data directories (can be overridden by the startup arguments)
log.dirs=/data/kafka/data
# Default number of partitions per topic
num.partitions=1
# Number of recovery threads per data directory
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# Replication factor for the consumer-offsets topic (consider 3 on a three-broker cluster)
offsets.topic.replication.factor=1
# Replication factor for the transaction state log
transaction.state.log.replication.factor=1
# Minimum in-sync replicas for the transaction log
transaction.state.log.min.isr=1
############################# Log Flush Policy (optional) #############################
# Flush to disk after this many messages
# log.flush.interval.messages=10000
# Maximum time (ms) a message may sit in memory before a forced flush
# log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# Retention time in hours; expired segments are cleaned up
log.retention.hours=168
# Roll a new segment file once it reaches this size (default 1 GB)
log.segment.bytes=1073741824
# Interval (ms) between retention checks
log.retention.check.interval.ms=300000
############################# ZooKeeper #############################
# ZooKeeper addresses (overridden by the startup arguments)
zookeeper.connect=zookeeper-0.zookeeper.bx.svc.cluster.local:2181,zookeeper-1.zookeeper.bx.svc.cluster.local:2181,zookeeper-2.zookeeper.bx.svc.cluster.local:2181
# ZooKeeper connection timeout (ms)
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# Delay (ms) before the initial consumer rebalance
group.initial.rebalance.delay.ms=0
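Besides lowering the broker-wide log.retention.hours, retention can also be tuned per topic, which is handy when only a few chatty topics eat the disk. A sketch using the standard kafka-configs.sh tool (the topic name is an example):

# Shorten retention for a single topic to 24 hours (86400000 ms)
/opt/kafka/bin/kafka-configs.sh \
  --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092 \
  --alter --entity-type topics --entity-name logstash-app \
  --add-config retention.ms=86400000
# Confirm the override took effect
/opt/kafka/bin/kafka-configs.sh \
  --bootstrap-server kafka-0.kafka.bx.svc.cluster.local:9092 \
  --describe --entity-type topics --entity-name logstash-app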