Step 1: Use Vector to Write Data to Kafka

Deploy KSE and Install the Logging Components

Extensions to install in KubeSphere:

  • RadonDB DMP

  • OpenSearch Distributed Search and Analytics Engine

  • WhizardTelemetry Platform Service

  • WhizardTelemetry Data Pipeline

  • WhizardTelemetry Logging

  • WhizardTelemetry Auditing

  • WhizardTelemetry Notification

  • WhizardTelemetry Events

Disable the OpenSearch Sink

Before installing WhizardTelemetry Logging, WhizardTelemetry Auditing, WhizardTelemetry Events, and WhizardTelemetry Notification, you must disable the opensearch sink in the configuration of each of these extensions.

Taking the WhizardTelemetry Auditing extension as an example, set sinks.opensearch.enabled to false.

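A minimal sketch of the relevant part of the extension config (only the sink switch, derived from the sinks.opensearch.enabled key above, is shown; the surrounding keys come from the component's default configuration):

sinks:
  opensearch:
    enabled: false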

Configure Kafka

In KubeSphere, after installing the RadonDB DMP extension, click the grid icon in the top navigation bar and then click RadonDB DMP to enter the database management platform, where you can create a Kafka cluster for collecting logs.

Enable Automatic Topic Creation

Click the name of the Kafka cluster, go to the Parameter Management tab, and enable automatic topic creation.

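Under the hood, this switch corresponds to the standard Kafka broker option auto.create.topics.enable. Assuming the DMP-managed cluster is Strimzi-based (an assumption suggested by the secret naming used later in this guide), the equivalent setting in the Kafka resource would be:

spec:
  kafka:
    config:
      auto.create.topics.enable: "true"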

Note

The read and write addresses of Kafka are available on the left side of the Kafka cluster detail page.
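
If you prefer the command line, the bootstrap addresses are ultimately exposed as Kubernetes services; a hedged lookup, assuming the usual bootstrap naming convention:

kubectl get svc -n <kafka namespace> | grep -i bootstrap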

Create a Kafka User

  1. On the Kafka cluster detail page, go to the Kafka Users tab and click Create to create a Kafka user.

  2. Grant the user permission to read and write the topics that Vector will write to (see the sketch after this list).

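The console form presumably creates a KafkaUser resource behind the scenes. Assuming the DMP-managed cluster is Strimzi-based (suggested by the user.p12 and <cluster>-cluster-ca-cert secret naming used later in this guide), an equivalent definition might look like the sketch below; the names are placeholders and the ACLs should match whatever permissions you set in the console:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: <kafka user>
  namespace: <kafka namespace>
  labels:
    strimzi.io/cluster: <kafka cluster>
spec:
  authentication:
    type: tls            # yields the user.p12 keystore and user.password fields
  authorization:
    type: simple
    acls:
      # let the user create, describe, read, and write the vector-* topics
      - resource:
          type: topic
          name: vector-
          patternType: prefix
        operation: All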

Obtain the Certificates

View Certificate Information

To communicate with Kafka, the relevant certificates and files must be configured: the <cluster>-cluster-ca-cert secret, together with the user.p12 field and password of the user created in the previous step. All of them can be looked up in the KubeSphere web console as follows (equivalent kubectl lookups are sketched after this list).

  1. Click Workbench > Cluster Management at the top of the page and enter the host cluster.

  2. In the left navigation pane, select Configuration > Secrets.

  3. On the Secrets page, search for cluster-ca-cert, click the secret corresponding to the Kafka cluster to enter its detail page, and view the ca.crt field.

  4. On the Secrets page, search for the name of the Kafka user you created, click the corresponding secret to enter its detail page, and view the user.p12 and user.password fields.

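The same fields can also be read with kubectl, using the placeholders defined in the next section:

export kafka_namespace=<kafka namespace>
export kafka_cluster=<kafka cluster>
export kafka_user=<kafka user>

# CA certificate of the Kafka cluster, decoded to PEM
kubectl get secret -n $kafka_namespace ${kafka_cluster}-cluster-ca-cert \
  -o jsonpath='{.data.ca\.crt}' | base64 -d

# password protecting the user's PKCS#12 keystore (printed in plain text)
kubectl get secret -n $kafka_namespace $kafka_user \
  -o jsonpath='{.data.user\.password}' | base64 -d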

Generate the Certificates

  1. On a node of the cluster where Kafka is deployed, execute the following commands.

    Note

    kafka cluster is the name of the Kafka cluster, kafka namespace is the namespace where Kafka is deployed, and kafka user is the Kafka user created earlier.

    export kafka_cluster=<kafka cluster>
    export kafka_namespace=<kafka namespace>
    export kafka_user=<kafka user>
    # Secret 1: the cluster CA certificate (ca.crt) for the Vector agents
    echo -e "apiVersion: v1\ndata:" > kafka-ca.yaml
    echo "  ca.crt: $(kubectl get secret -n $kafka_namespace ${kafka_cluster}-cluster-ca-cert  \
    -o jsonpath='{.data.ca\.crt}')" >> kafka-ca.yaml
    echo -e "kind: Secret\nmetadata:\n  name: kafka-agent-cluster-ca\n  labels:\n    logging.whizard.io/certification: 'true'\n    logging.whizard.io/vector-role: Agent\n  \
    namespace: kubesphere-logging-system\ntype: Opaque" >> kafka-ca.yaml
    echo "---" >> kafka-ca.yaml
    # Secret 2: the user keystore (user.p12) for TLS client authentication
    echo -e "apiVersion: v1\ndata:" >> kafka-ca.yaml
    echo "  user.p12: $(kubectl get secret -n $kafka_namespace ${kafka_user}  \
    -o jsonpath='{.data.user\.p12}')" >> kafka-ca.yaml
    echo -e "kind: Secret\nmetadata:\n  name: kafka-agent-user-ca\n  labels:\n    logging.whizard.io/certification: 'true'\n    logging.whizard.io/vector-role: Agent\n  \
    namespace: kubesphere-logging-system\ntype: Opaque" >> kafka-ca.yaml

    These commands generate a kafka-ca.yaml file containing two secrets, kafka-agent-cluster-ca and kafka-agent-user-ca, which hold the ca.crt and user.p12 data from the previous step respectively. Example:

    apiVersion: v1
    data:
      ca.crt: xxx
    kind: Secret
    metadata:
      name: kafka-agent-cluster-ca
      labels:
        logging.whizard.io/certification: 'true'
        logging.whizard.io/vector-role: Agent
      namespace: kubesphere-logging-system
    type: Opaque
    ---
    apiVersion: v1
    data:
      user.p12: xxxx
    kind: Secret
    metadata:
      name: kafka-agent-user-ca
      labels:
        logging.whizard.io/certification: 'true'
        logging.whizard.io/vector-role: Agent
      namespace: kubesphere-logging-system
    type: Opaque
  2. Copy the kafka-ca.yaml file to a node of each cluster from which log data is to be collected, and execute the following command.

    kubectl apply -f kafka-ca.yaml

    This command creates the two secrets kafka-agent-cluster-ca and kafka-agent-user-ca in the kubesphere-logging-system project. vector-config automatically loads both secrets and configures the corresponding certificates in Vector.
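
    To verify, you can list the secrets by the labels the script sets:

    kubectl -n kubesphere-logging-system get secret \
      -l logging.whizard.io/certification=true,logging.whizard.io/vector-role=Agent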

Create the Kafka Log Receivers

Execute the following command to create the Kafka sink configurations for auditing, events, logs, and notification history:

cat <<EOF | kubectl apply -f -
kind: Secret
apiVersion: v1
metadata:
  name: vector-agent-auditing-sink-kafka
  namespace: kubesphere-logging-system
  labels:
    logging.whizard.io/component: auditing
    logging.whizard.io/enable: 'true'
    logging.whizard.io/vector-role: Agent
  annotations:
    kubesphere.io/creator: admin
stringData:
  sink.yaml: >-
    sinks:
      kafka_auditing:
        type: "kafka"
        topic: "vector-{{ .cluster }}-auditing"
        # Comma-separated Kafka bootstrap servers, e.g. "10.14.22.123:9092,10.14.23.332:9092"
        bootstrap_servers: "172.31.73.214:32239"
        librdkafka_options:
          security.protocol: "ssl"
          ssl.endpoint.identification.algorithm: "none"
          ssl.ca.location: "/etc/vector/custom/certification/ca.crt"
          ssl.keystore.location: "/etc/vector/custom/certification/user.p12"
          ssl.keystore.password: "yj5nwJLVqyII1ZHZCW2RQwJcyjKo3B9o"
        encoding:
          codec: "json"
        inputs:
        - auditing_remapped
        batch:
          max_events: 100
          timeout_secs: 10
type: Opaque

---
kind: Secret
apiVersion: v1
metadata:
  name: vector-agent-events-sink-kafka
  namespace: kubesphere-logging-system
  labels:
    logging.whizard.io/component: events
    logging.whizard.io/enable: 'true'
    logging.whizard.io/vector-role: Agent
  annotations:
    kubesphere.io/creator: admin
stringData:
  sink.yaml: >-
    sinks:
      kafka_events:
        type: "kafka"
        topic: "vector-{{ .cluster }}-events"
        bootstrap_servers: "172.31.73.214:32239"
        librdkafka_options:
          security.protocol: "ssl"
          ssl.endpoint.identification.algorithm: "none"
          ssl.ca.location: "/etc/vector/custom/certification/ca.crt"
          ssl.keystore.location: "/etc/vector/custom/certification/user.p12"
          ssl.keystore.password: "yj5nwJLVqyII1ZHZCW2RQwJcyjKo3B9o"
        encoding:
          codec: "json"
        inputs:
        - kube_events_remapped
        batch:
          max_events: 100
          timeout_secs: 10
type: Opaque

---
kind: Secret
apiVersion: v1
metadata:
  name: vector-agent-logs-sink-kafka
  namespace: kubesphere-logging-system
  labels:
    logging.whizard.io/component: logs
    logging.whizard.io/enable: 'true'
    logging.whizard.io/vector-role: Agent
  annotations:
    kubesphere.io/creator: admin
stringData:
  sink.yaml: >-
    sinks:
      kafka_logs:
        type: "kafka"
        topic: "vector-{{ .cluster }}-logs"
        bootstrap_servers: "172.31.73.214:32239"
        librdkafka_options:
          security.protocol: "ssl"
          ssl.endpoint.identification.algorithm: "none"
          ssl.ca.location: "/etc/vector/custom/certification/ca.crt"
          ssl.keystore.location: "/etc/vector/custom/certification/user.p12"
          ssl.keystore.password: "yj5nwJLVqyII1ZHZCW2RQwJcyjKo3B9o"
        encoding:
          codec: "json"
        inputs:
        - kube_logs_remapped
        - systemd_logs_remapped
        batch:
          max_events: 100
          timeout_secs: 10
type: Opaque

---
apiVersion: v1
kind: Secret
metadata:
  name: vector-aggregator-notification-history-sink-kafka
  namespace: kubesphere-logging-system
  labels:
    logging.whizard.io/component: "notification-history"
    logging.whizard.io/vector-role: Aggregator
    logging.whizard.io/enable: "true"
stringData:
  sink.yaml: >-
    sinks:
      kafka_notification_history:
        type: "kafka"
        topic: "vector-{{ .cluster }}-notification-history"
        bootstrap_servers: "172.31.73.214:32239"
        librdkafka_options:
          security.protocol: "ssl"
          ssl.endpoint.identification.algorithm: "none"
          ssl.ca.location: "/etc/vector/custom/certification/ca.crt"
          ssl.keystore.location: "/etc/vector/custom/certification/user.p12"
          ssl.keystore.password: "yj5nwJLVqyII1ZHZCW2RQwJcyjKo3B9o"
        encoding:
          codec: "json"
        inputs:
        - notification_history_remapped
        batch:
          max_events: 100
          timeout_secs: 10
type: Opaque
EOF
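
To confirm that data is flowing, you can consume a few messages from one of the topics. The sketch below is a hypothetical check with the stock Kafka CLI (Kafka 2.7 or later, for PEM truststore support); ca.crt, user.p12, and the keystore password are the items obtained earlier, and <cluster> stands for the cluster name substituted into {{ .cluster }}:

cat > client-ssl.properties <<EOF
security.protocol=SSL
ssl.endpoint.identification.algorithm=
ssl.truststore.type=PEM
ssl.truststore.location=/path/to/ca.crt
ssl.keystore.type=PKCS12
ssl.keystore.location=/path/to/user.p12
ssl.keystore.password=<user.password>
EOF

bin/kafka-console-consumer.sh \
  --bootstrap-server 172.31.73.214:32239 \
  --consumer.config client-ssl.properties \
  --topic vector-<cluster>-logs \
  --from-beginning --max-messages 5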
