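Setting Up a Kafka Cluster in KRaft Mode on Kubernetes

Published yesterday, updated yesterday
Author: Administrator
Reading time: 17 to 22 minutes

This tutorial walks through deploying a three-node Kafka cluster on Kubernetes, in which every node acts as both broker and controller.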

1. Namespace

First create the kafka namespace and enroll it in Istio's ambient mode.

apiVersion: v1
kind: Namespace
metadata:
  name: kafka
  labels:
    istio.io/dataplane-mode: ambient

2. ConfigMap

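Client connections use SASL/PLAIN authentication over the SASL_PLAINTEXT protocol. This authenticates clients but does not encrypt the traffic.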

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-env
  namespace: kafka
data:
  # KAFKA_NODE_ID is not set here; the StatefulSet start script derives it per pod
  KAFKA_PROCESS_ROLES: "broker,controller" # node roles; they can be deployed separately, but are combined here for simplicity
  KAFKA_LISTENERS: "CLIENT://:9092,CONTROLLER://:9093" # CLIENT and CONTROLLER are the listener names mapped in KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
  # KAFKA_ADVERTISED_LISTENERS is also set per pod in the StatefulSet start script
  KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
  KAFKA_INTER_BROKER_LISTENER_NAME: "CLIENT"
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,CLIENT:SASL_PLAINTEXT" # listener name to security protocol mapping
  KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-0.kafka-hl:9093,2@kafka-1.kafka-hl:9093,3@kafka-2.kafka-hl:9093" # controller quorum members, as id@host:port
  # A value of 1 leaves the internal topics unreplicated; consider 3 on a three-broker cluster
  KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
  KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
  KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
  KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
  KAFKA_LOG_DIRS: "/var/lib/kafka/data"
  KAFKA_NUM_PARTITIONS: "3"
  KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
  # KAFKA_SECURITY_INTER_BROKER_PROTOCOL: "CLIENT"
  KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN"
  KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-conf
  namespace: kafka
data:
  kafka_server_jaas.conf: |
    KafkaServer {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      serviceName="Kafka"
      username="admin"
      password="1qaz@2wsX"
      user_admin="1qaz@2wsX"
      user_user1="1qaz@2wsX";
    };
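
A client that connects to the CLIENT listener needs SASL settings that match the broker configuration above. The sketch below packages them as a ConfigMap. The name kafka-client-conf and the key client.properties are made up for illustration, and the user1 credentials are the ones defined in the JAAS file above. It shows one plausible way to hand the settings to Kafka's command-line tools (for example through their --command-config option) and is not part of the original setup.

```yaml
# Hypothetical ConfigMap (name and key are assumptions) holding client-side
# settings that match the SASL_PLAINTEXT listener and the PLAIN mechanism.
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-client-conf
  namespace: kafka
data:
  client.properties: |
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user1" password="1qaz@2wsX";
```

Since the password appears in plain text, a Secret would likely be a better home for these values in anything beyond a test cluster.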

3. Service

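Create a headless ClusterIP Service and a NodePort Service. Two settings deserve attention here: internalTrafficPolicy: Local and externalTrafficPolicy: Local. Each Kafka partition has a leader, and only the leader serves reads and writes for it. Without these two settings, a connection to the leader address returned to the client may be load-balanced by kube-proxy to a pod on another node.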

apiVersion: v1
kind: Service
metadata:
  name: kafka-hl
  namespace: kafka
  labels:
    app: kafka
spec:
  type: ClusterIP
  clusterIP: None
  publishNotReadyAddresses: true
  ports:
    - name: tcp-client
      port: 9092
      protocol: TCP
      targetPort: client
      appProtocol: kafka
    - name: tcp-controller
      protocol: TCP
      port: 9093
      targetPort: controller
      appProtocol: kafka
  selector:
    app: kafka
    app.kubernetes.io/version: 4.1.0
---
apiVersion: v1
kind: Service
metadata:
  name: kafka
  namespace: kafka
  labels:
    app: kafka
spec:
  type: NodePort
  ports:
    - name: tcp-client
      port: 9092
      protocol: TCP
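      # 9092 and 9093 are outside the default NodePort range (30000-32767);
      # the API server's --service-node-port-range must include them
      nodePort: 9092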
      appProtocol: kafka
    - name: tcp-interbroker
      port: 9093
      protocol: TCP
      nodePort: 9093
      appProtocol: kafka
  selector:
    app: kafka
    app.kubernetes.io/version: 4.1.0
  # These two settings are essential
  internalTrafficPolicy: Local
  externalTrafficPolicy: Local
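
The nodePort values 9092 and 9093 are only accepted if the API server's NodePort range covers them. As a sketch, assuming the cluster is managed with kubeadm and uses the v1beta3 configuration format, the range could be widened as shown below. The range 9000-32767 is an illustrative choice, and other installers or newer kubeadm config versions express this setting differently.

```yaml
# Assumption: a kubeadm-managed cluster using the v1beta3 config format.
# Widens the NodePort range so that 9092 and 9093 become valid nodePort values.
apiVersion: kubeadm.k8s.io/v1beta3
kind: ClusterConfiguration
apiServer:
  extraArgs:
    service-node-port-range: "9000-32767"
```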

4. TCPRoute

Optionally, create a Gateway API TCPRoute attached to the Istio gateway for external access. This step can be skipped.

apiVersion: gateway.networking.k8s.io/v1alpha2
kind: TCPRoute
metadata:
  name: kafka
  namespace: kafka
spec:
  parentRefs:
    - name: istio-gateway
      kind: Gateway
      namespace: istio-system
      sectionName: tcp-kafka
  rules:
    - backendRefs:
        - name: kafka
          port: 9092
          namespace: kafka
          kind: Service
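
The route above attaches to a listener named tcp-kafka on a Gateway called istio-gateway in the istio-system namespace, which the original does not show. A minimal sketch of what that Gateway might look like follows. The gatewayClassName, the listener port, and the allowedRoutes policy are assumptions, and TCPRoute itself requires the experimental channel of the Gateway API CRDs to be installed.

```yaml
# Hypothetical Gateway matching the parentRefs of the TCPRoute above.
# Only the name, namespace and listener name come from the route;
# everything else is an assumption.
apiVersion: gateway.networking.k8s.io/v1
kind: Gateway
metadata:
  name: istio-gateway
  namespace: istio-system
spec:
  gatewayClassName: istio
  listeners:
    - name: tcp-kafka
      protocol: TCP
      port: 9092 # assumed external port
      allowedRoutes:
        namespaces:
          from: All # the route lives in the kafka namespace, not istio-system
        kinds:
          - kind: TCPRoute
```

Because the Gateway and the route are in different namespaces, the listener has to allow routes from outside its own namespace. A namespace selector would be a tighter alternative to from: All.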

5. StatefulSet

Create a StatefulSet with three replicas, that is, three pods.

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
  labels:
    app: kafka
spec:
  podManagementPolicy: Parallel
  replicas: 3
  selector:
    matchLabels:
      app: kafka
      app.kubernetes.io/version: 4.1.0
  serviceName: kafka-hl
  template:
    metadata:
      labels:
        app: kafka
        app.kubernetes.io/version: 4.1.0
    spec:
      containers:
        - name: kafka
          image: apache/kafka:4.1.0
          imagePullPolicy: "IfNotPresent"
          command:
            - sh
            - -c
            - |
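              # Pod ordinal: the suffix after the last "-" in the pod name (kafka-0 -> 0)
              BASE_ID="${KAFKA_META_NAME##*-}"

              # Ordinal + 1 gives the node ID, matching KAFKA_CONTROLLER_QUORUM_VOTERS
              export KAFKA_NODE_ID=$((BASE_ID + 1))
              # Advertise the host node's IP so clients connect through the NodePort
              export KAFKA_ADVERTISED_LISTENERS="CLIENT://$NODE_IP:9092"

              echo "Pod Name = $KAFKA_META_NAME"
              echo "BASE_ID = $BASE_ID"
              echo "KAFKA_NODE_ID = $KAFKA_NODE_ID"

              # exec so the start script replaces this shell and receives termination signals directly
              exec /etc/kafka/docker/run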
          env:
            - name: KAFKA_META_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: NODE_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          envFrom:
            - configMapRef:
                name: kafka-env
          ports:
            - name: controller
              containerPort: 9093
            - name: client
              containerPort: 9092
          startupProbe:
            failureThreshold: 3
            initialDelaySeconds: 30
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 5
            tcpSocket:
              port: 9092
          livenessProbe:
            failureThreshold: 3
            initialDelaySeconds: 30
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 5
            tcpSocket:
              port: 9092
          volumeMounts:
            - name: time
              mountPath: /etc/localtime
            - name: data
              mountPath: /var/lib/kafka/data
            - name: logs
              mountPath: /opt/kafka/logs
            - name: conf
              mountPath: /opt/kafka/config/kafka_server_jaas.conf
              subPath: kafka_server_jaas.conf
      volumes:
        - name: time
          hostPath:
            path: /etc/localtime
            type: File
        - name: data
          hostPath:
            path: /data/kafka/data
            type: DirectoryOrCreate
        - name: logs
          hostPath:
            path: /data/kafka/logs
            type: DirectoryOrCreate
        - name: conf
          configMap:
            name: kafka-conf
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - podAffinityTerm:
                labelSelector:
                  matchLabels:
                    app: kafka
                topologyKey: kubernetes.io/hostname
              weight: 1
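
Each pod stores its data in a hostPath directory and advertises its node's IP on port 9092, so two brokers scheduled to the same node would share one data directory and one advertised address. The preferred anti-affinity rule above only discourages that. A stricter variant, sketched here as a drop-in for the affinity block in the pod spec (an alternative, not the original author's setting), makes one broker per node a hard requirement:

```yaml
# Alternative to the preferred rule: the scheduler refuses to place two
# pods labeled app=kafka on the same node.
affinity:
  podAntiAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchLabels:
            app: kafka
        topologyKey: kubernetes.io/hostname
```

With this rule the cluster needs at least three schedulable nodes. On a smaller cluster the extra pods would stay Pending instead of doubling up.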

License: CC BY 4.0