< img height="1" width="1" style="display:none;" alt="" src="https://px.ads.linkedin.com/collect/?pid=3131724&fmt=gif" />

步骤 2:使用 Vector 从 Kafka 读取数据

部署 vector aggregator

在 Kafka 所在集群,执行以下命令创建 vector namespace,并在该 namespace 下部署 vector-aggregator。

说明
  • 请勿将 vector aggregator 部署到 kubesphere-logging-system namespace, 以免与 KubeSphere 内置的 vector aggregator 冲突。

  • 请联系 KubeSphere 交付服务专家获取 vector aggregator 的 helm 包。

helm install vector-aggregator aggregator-0.30.0.tgz -n vector --create-namespace --set vectorConfig.image.tag=v0.2.1 --set image.tag=0.36.0-debian

所需镜像:

docker.io/timberio/vector:0.36.0-debian docker.io/kubesphere/kubectl:v1.26.13 docker.io/kubesphere/vector-config:v0.2.1

获取证书

  1. 在 Kafka 所在集群的节点上,执行以下命令。

    说明

    kafka cluster 为 Kafka 集群的名称,kafka namespace 为 Kafka 所在的 namespace,kafka user 为之前创建的 Kafka 用户。

    export kafka_cluster=< kafka cluster > export kafka_namespace=< kafka namespace > export kafka_user=< kafka user > echo -e "apiVersion: v1\ndata:" > kafka-aggregator-ca.yaml echo " ca.crt: $(kubectl get secret -n $kafka_namespace ${kafka_cluster}-cluster-ca-cert \ -o jsonpath='{.data.ca\.crt}')" >> kafka-aggregator-ca.yaml echo -e "kind: Secret\nmetadata:\n name: kafka-aggregator-cluster-ca\n labels:\n logging.whizard.io/certification: 'true'\n logging.whizard.io/vector-role: Aggregator\n \ namespace: vector\ntype: Opaque" >> kafka-aggregator-ca.yaml echo "---" >> kafka-aggregator-ca.yaml echo -e "apiVersion: v1\ndata:" >> kafka-aggregator-ca.yaml echo " user.p12: $(kubectl get secret -n $kafka_namespace ${kafka_user} \ -o jsonpath='{.data.user\.p12}')" >> kafka-aggregator-ca.yaml echo -e "kind: Secret\nmetadata:\n name: kafka-aggregator-user-ca\n labels:\n logging.whizard.io/certification: 'true'\n logging.whizard.io/vector-role: Aggregator\n \ namespace: vector\ntype: Opaque" >> kafka-aggregator-ca.yaml

    此命令会生成 kafka-aggregator-ca.yaml 文件,包含 kafka-aggregator-cluster-ca 以及 kafka-aggregator-user-ca 两个 secret 文件,分别含有上一个步骤中的 ca.crt 以及 user.p12 信息。 示例如下:

    apiVersion: v1 data: ca.crt: xxx kind: Secret metadata: name: kafka-aggregator-cluster-ca labels: logging.whizard.io/certification: 'true' logging.whizard.io/vector-role: Aggregator namespace: vector type: Opaque --- apiVersion: v1 data: user.p12: xxx kind: Secret metadata: name: kafka-aggregator-user-ca labels: logging.whizard.io/certification: 'true' logging.whizard.io/vector-role: Aggregator namespace: vector type: Opaque

配置 vector-aggregator,将消息发送至 OpenSearch

创建 vector 配置,在 bootstrap_servers 填入相应的 Kafka 集群地址,在 sink:kafka_to_opensearch:endpoints 填入相应的 OpenSearch 地址。

cat <<EOF | kubectl apply -f - apiVersion: v1 kind: Secret metadata: name: vector-aggregator-opensearch namespace: vector labels: logging.whizard.io/vector-role: Aggregator logging.whizard.io/enable: "true" stringData: kafka-pipeline.yaml: >- sources: kafka_source: type: "kafka" group_id: "ks" topics: [ "^(vector)-.+" ] bootstrap_servers: "172.31.53.102:32476" librdkafka_options: security.protocol: "ssl" ssl.endpoint.identification.algorithm: "none" ssl.ca.location: "/etc/vector/custom/certification/ca.crt" ssl.keystore.location: "/etc/vector/custom/certification/user.p12" ssl.keystore.password: "yj5nwJLVqyII1ZHZCW2RQwJcyjKo3B9o" max.poll.interval.ms: "600000" partition.assignment.strategy: roundrobin decoding: codec: json session_timeout_ms: 20000 socket_timeout_ms: 90000 transforms: kafka_remapped: inputs: - kafka_source source: |- .event.original = encode_json(.) ts = parse_timestamp!(.timestamp, format: "%+") .timestamp = format_timestamp!(ts, format: "%+", timezone: "local") .topictime = to_unix_timestamp(ts, unit: "milliseconds") .logstamp = from_unix_timestamp!(.topictime, unit: "milliseconds") .logdate = .timestamp .idxdate = format_timestamp!(ts, format: "%Y.%m.%d", timezone: "local") tmp = split!(.topic, "-") .index = join!(remove!(tmp, [0]), "-") type: remap sinks: kafka_to_opensearch: api_version: v8 auth: password: admin strategy: basic user: admin batch: timeout_secs: 5 buffer: max_events: 10000 endpoints: - https://<opensearch-url>:<port> tls: verify_certificate: false type: elasticsearch inputs: - kafka_remapped bulk: index: "{{ .index }}-%Y.%m.%d" request: timeout_sec: 180 type: Opaque EOF

Receive the latest news, articles and updates from KubeSphere