The Kubernetes ecosystem keeps getting better, and many big data components now support running on k8s. Most companies still deploy and run their big data stacks on native environments, but if you want to cut deployment and operating costs further, it is worth trying big data on k8s for a new product or project. Below is a quick hands-on tour of Flink on k8s.

If you are interested, you can also go straight to the official Flink documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/resource-providers/standalone/kubernetes.html

Environment

All you really need is a working k8s cluster.

If you are just getting started with k8s, I strongly recommend https://www.sealyun.com/ , a tool that installs a k8s cluster quickly with one command.

Deployment

Let's walk through a deployment in Session Mode. A basic Flink Session cluster needs at least three components:

  1. a Deployment running the JobManager
  2. a Deployment running the TaskManagers
  3. a Service exposing the JobManager's REST API and Web UI

Create four YAML configuration files:

  1. flink-configuration-configmap.yaml

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        queryable-state.proxy.ports: 6125
        jobmanager.memory.process.size: 1600m
        taskmanager.memory.process.size: 1728m
        parallelism.default: 2
      log4j-console.properties: |+
        rootLogger.level = INFO
        rootLogger.appenderRef.console.ref = ConsoleAppender
        rootLogger.appenderRef.rolling.ref = RollingFileAppender
       
        logger.akka.name = akka
        logger.akka.level = INFO
        logger.kafka.name= org.apache.kafka
        logger.kafka.level = INFO
        logger.hadoop.name = org.apache.hadoop
        logger.hadoop.level = INFO
        logger.zookeeper.name = org.apache.zookeeper
        logger.zookeeper.level = INFO
       
        appender.console.name = ConsoleAppender
        appender.console.type = CONSOLE
        appender.console.layout.type = PatternLayout
        appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
       
        appender.rolling.name = RollingFileAppender
        appender.rolling.type = RollingFile
        appender.rolling.append = false
        appender.rolling.fileName = ${sys:log.file}
        appender.rolling.filePattern = ${sys:log.file}.%i
        appender.rolling.layout.type = PatternLayout
        appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        appender.rolling.policies.type = Policies
        appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
        appender.rolling.policies.size.size=100MB
        appender.rolling.strategy.type = DefaultRolloverStrategy
        appender.rolling.strategy.max = 10
       
        logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
        logger.netty.level = OFF
    
  2. jobmanager-service.yaml

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
    spec:
      type: ClusterIP
      ports:
      - name: rpc
        port: 6123
      - name: blob-server
        port: 6124
      - name: webui
        port: 8081
      selector:
        app: flink
        component: jobmanager
    
  3. jobmanager-session-deployment.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: apache/flink:1.12.0-scala_2.11
            args: ["jobmanager"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
    
  4. taskmanager-session-deployment.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: apache/flink:1.12.0-scala_2.11
            args: ["taskmanager"]
            ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
    

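A quick sanity check on capacity: taskmanager.numberOfTaskSlots: 2 in the ConfigMap times replicas: 2 in the TaskManager Deployment gives this session cluster 4 task slots, and parallelism.default: 2 means a job occupies 2 of them unless configured otherwise. If you need more slots later, you can scale the TaskManager Deployment without touching any YAML; a minimal sketch, assuming the resource names above:

    # 4 TaskManager pods x 2 slots each = 8 slots
    kubectl scale deployment flink-taskmanager --replicas=4
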
Apply the manifests with kubectl:

  1. Apply them one at a time:

    kubectl apply -f flink-configuration-configmap.yaml
    kubectl apply -f jobmanager-service.yaml
    kubectl apply -f jobmanager-session-deployment.yaml
    kubectl apply -f taskmanager-session-deployment.yaml
    
  2. Or apply them all at once (the four files must be in the same directory):

    kubectl apply -f .
    

Check whether the deployment succeeded:

kubectl get pod
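
The output should look roughly like this (illustrative only; pod-name suffixes and ages will differ on your cluster):

    NAME                                 READY   STATUS    RESTARTS   AGE
    flink-jobmanager-589967dcfc-m49xv    1/1     Running   0          3m
    flink-taskmanager-64847444ff-7rdl4   1/1     Running   0          3m
    flink-taskmanager-64847444ff-nnd6m   1/1     Running   0          3m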

If every pod shows a STATUS of Running, the deployment succeeded.

Tip:

If the pods fail to start, the most common cause is a failed image pull. You can pull the image elsewhere (for example, through a VPN) and then import it into the k8s cluster; for production environments, consider running a local container registry.
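
For example, one way to work around a blocked pull, assuming your k8s nodes use Docker as the container runtime:

    # on a machine that can reach Docker Hub:
    docker pull apache/flink:1.12.0-scala_2.11
    docker save apache/flink:1.12.0-scala_2.11 -o flink-1.12.0.tar
    # copy the tarball to each k8s node, then load it:
    docker load -i flink-1.12.0.tar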

Testing

Temporarily expose the port for testing by running:

kubectl port-forward --address=0.0.0.0 <jobmanager-pod-name> 8081:8081
# --address lets other machines on your LAN (e.g., your own computer) reach it
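
To find the actual JobManager pod name, filter by the labels set in the Deployment, or skip the lookup and forward the Service instead (both commands assume the manifests above):

    kubectl get pods -l app=flink,component=jobmanager
    # or forward through the Service rather than a specific pod:
    kubectl port-forward --address=0.0.0.0 service/flink-jobmanager 8081:8081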

Once the Web UI is reachable, you can happily run Flink jobs.
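
For example, you can submit the WordCount example that ships with the Flink binary distribution from your own machine; a minimal sketch, assuming a stock Flink 1.12 download and the port-forward above:

    # run from the root of a local Flink 1.12 distribution
    ./bin/flink run -m localhost:8081 ./examples/streaming/WordCount.jar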

Summary

Although this is only a simple example, it matters: with the k8s ecosystem and its excellent scheduling, you can already build a very stable, high-performance compute cluster. The benefit is most obvious when your architecture separates compute from storage.

Our goal is to simplify, and keep simplifying, the cost of deploying and running big data systems.

Some things you have to tinker with yourself; otherwise it is all hearsay. 💪