Integrating Flink Operator v1.10 with Huawei Cloud Object Storage (OBS)

2025/2/26 9:52:32

1 Overview

The Flink Operator and the Flink clusters it manages do not support Huawei Cloud OBS out of the box. To integrate with OBS, you need to place a jar package in the plugins directory of these Java programs and adjust the Flink configuration.
Related reference:
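Flink's plugin mechanism loads each filesystem from its own subdirectory under the plugins directory. After the OBS jar is staged (the jar name below is illustrative), the layout inside the container looks roughly like:

```
/opt/flink/plugins/
└── obs-fs-hadoop/
    └── flink-obs-fs-hadoop-<version>.jar
```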

https://support.huaweicloud.com/bestpractice-obs/obs_05_1516.html

2 Environment preparation

2.1 Huawei Cloud Kubernetes cluster

Prepare a Kubernetes cluster, as shown below:
(figure: Kubernetes cluster)

2.2 Flink Operator Helm chart download

https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-helm.tgz

2.3 cert-manager YAML download

https://github.com/jetstack/cert-manager/releases/download/v1.17.2/cert-manager.yaml

2.4 Prepare a sample Flink application

https://github.com/apache/flink/tree/master/flink-examples

Compile the official Flink example code into jar packages and upload them to OBS, as shown below:
(figure: example jar packages in the OBS bucket)
With the jars stored in Huawei Cloud OBS, the Flink Operator can pull them over the OBS protocol and submit them to the Flink cluster, and the cluster's JobManager and TaskManager can likewise read from and write to OBS.

3 Deployment

3.1 Install cert-manager

This component is a dependency of the Flink Operator webhook, so install it first.

cd /tmp
wget https://github.com/jetstack/cert-manager/releases/download/v1.17.2/cert-manager.yaml
kubectl apply -f cert-manager.yaml


3.2 Install the Helm binary

cd /tmp
wget https://get.helm.sh/helm-v3.16.2-linux-amd64.tar.gz
tar xf helm-v3.16.2-linux-amd64.tar.gz
cd linux-amd64
/bin/cp -f helm /usr/bin/
helm env

3.3 Deploy the Flink Operator

Download the Flink Operator Helm chart, unpack it, and deploy it into the flink namespace with Helm.

cd /tmp
wget https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-helm.tgz
tar xf flink-kubernetes-operator-1.10.0-helm.tgz

Edit the flink-kubernetes-operator/values.yaml file and add the following under the defaultConfiguration.flink-conf.yaml field:

defaultConfiguration:
  flink-conf.yaml: |+
    fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
    fs.obs.access.key: *********your-AK*********
    fs.obs.secret.key: *********your-SK*********
    fs.obs.endpoint: obs.cn-south-1.myhuaweicloud.com     # OBS endpoint; adjust to your region
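
These four fs.obs.* keys are all the OBS plugin needs. If you generate this fragment from a script, avoid echoing the credentials. A minimal shell sketch (placeholder AK/SK, temporary file) that renders the fragment and masks the secrets when printing:

```shell
# Render the fs.obs.* fragment for flink-conf.yaml from variables.
# The AK/SK values below are placeholders, not real credentials.
AK="your-ak-placeholder"
SK="your-sk-placeholder"
ENDPOINT="obs.cn-south-1.myhuaweicloud.com"
FRAG=$(mktemp)
cat > "$FRAG" <<EOF
fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
fs.obs.access.key: $AK
fs.obs.secret.key: $SK
fs.obs.endpoint: $ENDPOINT
EOF
# Print the fragment with the secrets masked.
sed -e "s/$AK/********/" -e "s/$SK/********/" "$FRAG"
```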

Deploy the Kubernetes resources with the following command:

helm upgrade --install flink-operator -n flink --create-namespace \
--set image.repository=swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator \
--set image.tag=1.10.0 \
./flink-kubernetes-operator/

I have placed the flink-obs jar packages in the image swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45. This image is public, so anyone can pull and use it.

Next, update the operator Deployment (it needs the initContainer and the obs-plugin volume mount). Simply kubectl apply the following content:

apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    meta.helm.sh/release-name: flink-operator
    meta.helm.sh/release-namespace: flink
  labels:
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: flink-kubernetes-operator
    app.kubernetes.io/version: 1.10.0
    helm.sh/chart: flink-kubernetes-operator-1.10.0
  name: flink-kubernetes-operator
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: flink-kubernetes-operator
  strategy:
    type: Recreate
  template:
    metadata:
      annotations:
        kubectl.kubernetes.io/default-container: flink-kubernetes-operator
      labels:
        app.kubernetes.io/name: flink-kubernetes-operator
    spec:
      initContainers:
      - image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45
        name: sidecar
        command: ["sh"]
        args: [
                "-c",
                "mkdir -p /opt/flink/plugins/obs-fs-hadoop && cp -f /opt/*.jar /opt/flink/plugins/obs-fs-hadoop/"
        ]
        volumeMounts:
        - name: obs-plugin
          mountPath: /opt/flink/plugins/obs-fs-hadoop
      containers:
      - command:
        - /docker-entrypoint.sh
        - operator
        env:
        - name: OPERATOR_NAMESPACE
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
        - name: HOST_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.hostIP
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        - name: OPERATOR_NAME
          value: flink-kubernetes-operator
        - name: FLINK_CONF_DIR
          value: /opt/flink/conf
        - name: FLINK_PLUGINS_DIR
          value: /opt/flink/plugins
        - name: LOG_CONFIG
          value: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties
        - name: JVM_ARGS
        image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator:1.10.0
        imagePullPolicy: IfNotPresent
        livenessProbe:
          failureThreshold: 3
          httpGet:
            path: /
            port: health-port
            scheme: HTTP
          initialDelaySeconds: 30
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 1
        name: flink-kubernetes-operator
        ports:
        - containerPort: 8085
          name: health-port
          protocol: TCP
        resources: {}
        securityContext: {}
        startupProbe:
          failureThreshold: 30
          httpGet:
            path: /
            port: health-port
            scheme: HTTP
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 1
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /opt/flink/conf
          name: flink-operator-config-volume
        - mountPath: /opt/flink/artifacts
          name: flink-artifacts-volume
        - name: obs-plugin
          mountPath: /opt/flink/plugins/obs-fs-hadoop
      - command:
        - /docker-entrypoint.sh
        - webhook
        env:
        - name: WEBHOOK_KEYSTORE_PASSWORD
          valueFrom:
            secretKeyRef:
              key: password
              name: flink-operator-webhook-secret
        - name: WEBHOOK_KEYSTORE_FILE
          value: /certs/keystore.p12
        - name: WEBHOOK_KEYSTORE_TYPE
          value: pkcs12
        - name: WEBHOOK_SERVER_PORT
          value: "9443"
        - name: LOG_CONFIG
          value: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties
        - name: JVM_ARGS
        - name: FLINK_CONF_DIR
          value: /opt/flink/conf
        - name: FLINK_PLUGINS_DIR
          value: /opt/flink/plugins
        - name: OPERATOR_NAMESPACE
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
        image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator:1.10.0
        imagePullPolicy: IfNotPresent
        name: flink-webhook
        resources: {}
        securityContext: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /certs
          name: keystore
          readOnly: true
        - mountPath: /opt/flink/conf
          name: flink-operator-config-volume
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        runAsGroup: 9999
        runAsUser: 9999
      serviceAccount: flink-operator
      serviceAccountName: flink-operator
      terminationGracePeriodSeconds: 30
      volumes:
      - configMap:
          defaultMode: 420
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-operator.properties
            path: log4j-operator.properties
          - key: log4j-console.properties
            path: log4j-console.properties
          name: flink-operator-config
        name: flink-operator-config-volume
      - emptyDir: {}
        name: flink-artifacts-volume
      - name: keystore
        secret:
          defaultMode: 420
          items:
          - key: keystore.p12
            path: keystore.p12
          secretName: webhook-server-cert
      - name: obs-plugin
        emptyDir: {}

3.4 Deploy a Flink session cluster

kubectl apply the following resource to deploy a Flink session cluster:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session-cluster
  namespace: flink
spec:
  image: swr.cn-south-1.myhuaweicloud.com/migrator/flink:1.19
  flinkVersion: v1_19
  flinkConfiguration:
    fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
    fs.obs.access.key: *********your-AK*********
    fs.obs.secret.key: *********your-SK*********
    fs.obs.endpoint: obs.cn-south-1.myhuaweicloud.com   # OBS endpoint; adjust to your region
  jobManager:
    resource:
      memory: "2048m"
      cpu: 2
  taskManager:
    resource:
      memory: "2048m"
      cpu: 2
  serviceAccount: flink
  podTemplate:
    spec:
      volumes:
      - name: obs-plugin
        emptyDir: {}
      containers:
      # Do not change the main container name
      - name: flink-main-container
        volumeMounts:
        - name: obs-plugin
          mountPath: /opt/flink/plugins/obs-fs-hadoop
      initContainers:
      - image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45
        name: sidecar
        command: ["sh"]
        args: [
                "-c",
                "mkdir -p /opt/flink/plugins/obs-fs-hadoop && cp -f /opt/*.jar /opt/flink/plugins/obs-fs-hadoop/"
        ]
        volumeMounts:
        - name: obs-plugin
          mountPath: /opt/flink/plugins/obs-fs-hadoop
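
The init container's only job is to stage the OBS jar into the shared emptyDir before the main Flink container starts. Its copy step can be sketched locally like this (temp directories stand in for the image's /opt and the obs-plugin volume; the jar name is illustrative):

```shell
# Emulate the init container's copy step locally.
SRC=$(mktemp -d)   # stands in for /opt inside the flink-obs-fs-hadoop image
DST=$(mktemp -d)   # stands in for the obs-plugin emptyDir volume
touch "$SRC/flink-obs-fs-hadoop.jar"   # illustrative jar name
mkdir -p "$DST/obs-fs-hadoop"
cp -f "$SRC"/*.jar "$DST/obs-fs-hadoop/"
ls "$DST/obs-fs-hadoop"
```

Because both containers mount the same emptyDir at /opt/flink/plugins/obs-fs-hadoop, the jar staged by the init container shows up to the JobManager and TaskManager as a regular Flink plugin.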


4 Submit a Flink job

kubectl apply the following resource:

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-example
  namespace: flink
spec:
  deploymentName: flink-session-cluster
  job:
    jarURI: obs://your-bucket/StateMachineExample.jar    # jar location; adjust to your bucket and path
    parallelism: 1
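
The jarURI's obs:// scheme is resolved by the staged OBS plugin. Purely for illustration (this is not operator code), such a URI splits into bucket and object key like this:

```shell
# Illustrative: split an obs:// URI into bucket and object key.
# "your-bucket" is a placeholder.
URI="obs://your-bucket/StateMachineExample.jar"
REST="${URI#obs://}"        # strip the scheme
BUCKET="${REST%%/*}"        # text before the first slash
KEY="${REST#*/}"            # text after the first slash
echo "bucket=$BUCKET key=$KEY"
```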

(figure: the FlinkSessionJob in RUNNING state)
The Flink job is in the RUNNING state, which shows that the Flink Operator pulled the jar from Huawei Cloud OBS and submitted it to the Flink cluster.
The operator log also contains OBS-related entries:
(figure: OBS entries in the Flink Operator log)

Summary

This article showed how to integrate the Flink Operator and the Flink clusters it manages with Huawei Cloud OBS. Once the integration is in place, you can store not only job jar packages in object storage, but also job state and job input/output data.

