
Step 2: Read Data from Kafka with Vector

Deploy the Vector Aggregator

On the cluster where Kafka is deployed, run the following command to create the vector namespace and deploy vector-aggregator in that namespace.

Note
  • Do not deploy the vector aggregator in the kubesphere-logging-system namespace, to avoid conflicts with the vector aggregator built into KubeSphere.

  • Contact KubeSphere delivery service experts to obtain the Helm package for the vector aggregator.

helm install vector-aggregator aggregator-0.30.0.tgz -n vector --create-namespace --set vectorConfig.image.tag=v0.2.1 --set image.tag=0.36.0-debian

Required images:

docker.io/timberio/vector:0.36.0-debian
docker.io/kubesphere/kubectl:v1.26.13
docker.io/kubesphere/vector-config:v0.2.1
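
To confirm that the aggregator started successfully, you can list the pods in the vector namespace and wait for them to reach the Running state, for example:

kubectl -n vector get pods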

Obtain the Certificates

  1. Run the following commands on a node of the cluster where Kafka is deployed.

    Note

    kafka cluster is the name of the Kafka cluster, kafka namespace is the namespace where Kafka is deployed, and kafka user is the Kafka user created earlier.

    export kafka_cluster=< kafka cluster >
    export kafka_namespace=< kafka namespace >
    export kafka_user=< kafka user >
    echo -e "apiVersion: v1\ndata:" > kafka-aggregator-ca.yaml
    echo "  ca.crt: $(kubectl get secret -n $kafka_namespace ${kafka_cluster}-cluster-ca-cert  \
    -o jsonpath='{.data.ca\.crt}')" >> kafka-aggregator-ca.yaml
    echo -e "kind: Secret\nmetadata:\n  name: kafka-aggregator-cluster-ca\n  labels:\n    logging.whizard.io/certification: 'true'\n    logging.whizard.io/vector-role: Aggregator\n  \
    namespace: vector\ntype: Opaque" >> kafka-aggregator-ca.yaml
    echo "---" >> kafka-aggregator-ca.yaml
    echo -e "apiVersion: v1\ndata:" >> kafka-aggregator-ca.yaml
    echo "  user.p12: $(kubectl get secret -n $kafka_namespace ${kafka_user}  \
    -o jsonpath='{.data.user\.p12}')" >> kafka-aggregator-ca.yaml
    echo -e "kind: Secret\nmetadata:\n  name: kafka-aggregator-user-ca\n  labels:\n    logging.whizard.io/certification: 'true'\n    logging.whizard.io/vector-role: Aggregator\n  \
    namespace: vector\ntype: Opaque" >> kafka-aggregator-ca.yaml

    These commands generate a kafka-aggregator-ca.yaml file containing two Secrets, kafka-aggregator-cluster-ca and kafka-aggregator-user-ca, which hold the ca.crt and user.p12 data from the previous step respectively. An example is shown below, followed by a sketch of how to apply the file:

    apiVersion: v1
    data:
      ca.crt: xxx
    kind: Secret
    metadata:
      name: kafka-aggregator-cluster-ca
      labels:
        logging.whizard.io/certification: 'true'
        logging.whizard.io/vector-role: Aggregator
      namespace: vector
    type: Opaque
    ---
    apiVersion: v1
    data:
      user.p12: xxx
    kind: Secret
    metadata:
      name: kafka-aggregator-user-ca
      labels:
        logging.whizard.io/certification: 'true'
        logging.whizard.io/vector-role: Aggregator
      namespace: vector
    type: Opaque
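
    After reviewing the generated file, apply it so that both Secrets are created in the vector namespace, and confirm they exist. This is a minimal sketch; it assumes the vector namespace already exists from the aggregator deployment above:

    kubectl apply -f kafka-aggregator-ca.yaml
    kubectl -n vector get secrets kafka-aggregator-cluster-ca kafka-aggregator-user-ca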

Configure vector-aggregator to Send Messages to OpenSearch

Create the Vector configuration. Set bootstrap_servers to the address of your Kafka cluster, and set the endpoints under sinks:kafka_to_opensearch to the address of your OpenSearch cluster.

cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Secret
metadata:
  name: vector-aggregator-opensearch
  namespace: vector
  labels:
    logging.whizard.io/vector-role: Aggregator
    logging.whizard.io/enable: "true"
stringData:
  kafka-pipeline.yaml: >-
    sources:
      kafka_source:
        type: "kafka"
        group_id: "ks"
        topics: [ "^(vector)-.+" ]
        bootstrap_servers: "172.31.53.102:32476"
        librdkafka_options:
          security.protocol: "ssl"
          ssl.endpoint.identification.algorithm: "none"
          ssl.ca.location: "/etc/vector/custom/certification/ca.crt"
          ssl.keystore.location: "/etc/vector/custom/certification/user.p12"
          ssl.keystore.password: "yj5nwJLVqyII1ZHZCW2RQwJcyjKo3B9o"
          max.poll.interval.ms: "600000"
          partition.assignment.strategy: roundrobin
        decoding:
          codec: json
        session_timeout_ms: 20000
        socket_timeout_ms: 90000
    transforms:
      kafka_remapped:
        inputs:
        - kafka_source
        source: |-
          .event.original = encode_json(.)
          ts = parse_timestamp!(.timestamp, format: "%+")
          .timestamp = format_timestamp!(ts, format: "%+", timezone: "local")
          .topictime = to_unix_timestamp(ts, unit: "milliseconds")
          .logstamp = from_unix_timestamp!(.topictime, unit: "milliseconds")
          .logdate = .timestamp
          .idxdate = format_timestamp!(ts, format: "%Y.%m.%d", timezone: "local")
          tmp = split!(.topic, "-")
          .index = join!(remove!(tmp, [0]), "-")
        type: remap
    sinks:
      kafka_to_opensearch:
        api_version: v8
        auth:
          password: admin
          strategy: basic
          user: admin
        batch:
          timeout_secs: 5
        buffer:
          max_events: 10000
        endpoints:
        -  https://<opensearch-url>:<port>
        tls:
          verify_certificate: false
        type: elasticsearch
        inputs:
        - kafka_remapped
        bulk:
          index: "{{ .index }}-%Y.%m.%d"
        request:
          timeout_sec: 180
type: Opaque
EOF
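
The pipeline above references a PKCS#12 keystore password in ssl.keystore.password. Assuming the Kafka user was created by the Strimzi operator (as the secret layout in the previous step suggests), this password can be read from the user secret. After applying the configuration, you can also list the indices in OpenSearch to confirm that messages are flowing. The commands below are a sketch that reuses the placeholder address and the admin/admin credentials from the example above; adapt them to your environment:

# Read the keystore password to fill into ssl.keystore.password (assumes a Strimzi-managed user secret)
kubectl get secret -n ${kafka_namespace} ${kafka_user} -o jsonpath='{.data.user\.password}' | base64 -d

# List indices to confirm that data written by the pipeline is arriving
curl -k -u admin:admin "https://<opensearch-url>:<port>/_cat/indices?v"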
