kafka Kraft模式k8s集群搭建
本教程可以帮助你使用k8s搭建一个三主的kafka集群
1. namespace
首先创建Kafka命名空间,使用istio的ambient模式
apiVersion: v1
kind: Namespace
metadata:
name: kafka
labels:
istio.io/dataplane-mode: ambient2. ConfigMap
这里使用SASL_PLAIN的加密方式
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-env
namespace: kafka
data:
# KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: "broker,controller" # 节点的角色,可以分开部署,为了方便还是一起吧
KAFKA_LISTENERS: "CLIENT://:9092,CONTROLLER://:9093" # CLIENT和CONTROLLER是KAFKA_LISTENER_SECURITY_PROTOCOL_MAP配置的映射
#KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://{{ $.Env.NODE_IP }}:9092"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER" #
KAFKA_INTER_BROKER_LISTENER_NAME: "CLIENT"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,CLIENT:SASL_PLAINTEXT" # 加密方式的映射
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-0.kafka-hl:9093,2@kafka-1.kafka-hl:9093,3@kafka-2.kafka-hl:9093" # 节点
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
KAFKA_LOG_DIRS: "/var/lib/kafka/data"
KAFKA_NUM_PARTITIONS: "3"
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
# KAFKA_SECURITY_INTER_BROKER_PROTOCOL: "CLIENT"
KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN"
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN"
---
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-conf
namespace: kafka
data:
kafka_server_jaas.conf: |
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="Kafka"
username="admin"
password="1qaz@2wsX"
user_admin="1qaz@2wsX"
user_user1="1qaz@2wsX";
};3. Service
创建的ClusterIP和NodePort,在这里值得注意的是 internalTrafficPolicy: Local externalTrafficPolicy: Local 这两个配置,因为Kafka有leader的概念,只有leader可以读写,如果不加上这两个配置,访问拿到的leader地址会被kube-proxy的负载均衡策略转发到其它pod上。
apiVersion: v1
kind: Service
metadata:
name: kafka-hl
namespace: kafka
labels:
app: kafka
spec:
type: ClusterIP
clusterIP: None
publishNotReadyAddresses: true
ports:
- name: tcp-client
port: 9092
protocol: TCP
targetPort: client
appProtocol: kafka
- name: tcp-controller
protocol: TCP
port: 9093
targetPort: controller
appProtocol: kafka
selector:
app: kafka
app.kubernetes.io/version: 4.1.0
---
apiVersion: v1
kind: Service
metadata:
name: kafka
namespace: kafka
labels:
app: kafka
spec:
type: NodePort
ports:
- name: tcp-client
port: 9092
protocol: TCP
nodePort: 9092
appProtocol: kafka
- name: tcp-interbroker
port: 9093
protocol: TCP
nodePort: 9093
appProtocol: kafka
selector:
app: kafka
app.kubernetes.io/version: 4.1.0
# 这两个配置非常重要
internalTrafficPolicy: Local
externalTrafficPolicy: Local4. TCPRoute
这里再创建一个istio的TCPRoute给外部访问,这一步可有可无
apiVersion: gateway.networking.k8s.io/v1alpha2
kind: TCPRoute
metadata:
name: kafka
namespace: kafka
spec:
parentRefs:
- name: istio-gateway
kind: Gateway
namespace: istio-system
sectionName: tcp-kafka
rules:
- backendRefs:
- name: kafka
port: 9092
namespace: kafka
kind: Service5. StatefulSet
创建StatefulSet,有三个副本也就是三个pod
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
namespace: kafka
labels:
app: kafka
spec:
podManagementPolicy: Parallel
replicas: 3
selector:
matchLabels:
app: kafka
app.kubernetes.io/version: 4.1.0
serviceName: kafka-hl
template:
metadata:
labels:
app: kafka
app.kubernetes.io/version: 4.1.0
spec:
containers:
- name: kafka
image: apache/kafka:4.1.0
imagePullPolicy: "IfNotPresent"
command:
- sh
- -c
- |
BASE_ID=$(echo "$KAFKA_META_NAME" | grep -oE '[0-9]+')
# +1 得到最终 Node ID
export KAFKA_ADVERTISED_LISTENERS="CLIENT://$NODE_IP:9092"
export KAFKA_NODE_ID=$((BASE_ID + 1))
echo "Pod Name = $KAFKA_META_NAME"
echo "BASE_ID = $BASE_ID"
echo "KAFKA_NODE_ID = $KAFKA_NODE_ID"
/etc/kafka/docker/run
env:
- name: KAFKA_META_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: NODE_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
envFrom:
- configMapRef:
name: kafka-env
ports:
- name: controller
containerPort: 9093
- name: client
containerPort: 9092
startupProbe:
failureThreshold: 3
initialDelaySeconds: 30
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 5
tcpSocket:
port: 9092
livenessProbe:
failureThreshold: 3
initialDelaySeconds: 30
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 5
tcpSocket:
port: 9092
volumeMounts:
- name: time
mountPath: /etc/localtime
- name: data
mountPath: /var/lib/kafka/data
- name: logs
mountPath: /opt/kafka/logs
- name: conf
mountPath: /opt/kafka/config/kafka_server_jaas.conf
subPath: kafka_server_jaas.conf
volumes:
- name: time
hostPath:
path: /etc/localtime
type: File
- name: data
hostPath:
path: /data/kafka/data
type: DirectoryOrCreate
- name: logs
hostPath:
path: /data/kafka/logs
type: DirectoryOrCreate
- name: conf
configMap:
name: kafka-conf
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- podAffinityTerm:
labelSelector:
matchLabels:
app: kafka
topologyKey: kubernetes.io/hostname
weight: 1
许可协议:
CC BY 4.0