关于jobslot的信息

http://www.itjxue.com  2023-02-21 13:49  来源:未知  点击次数: 

Flink 作为一套分布式执行框架,计算资源可以不断的扩展。

不同的任务类型,可以控制需要的计算资源。在flink整个runtime的模型中

并行度是一个很重要的概念,通过设置并行度可以为认为分配合理的计算资源,

做到资源的合理配置。

整个flink的架构简单的说是 中心控制(jobManager)+ 多点分布执行(taskManager)

弹性的资源分配主要来自于taskManager的有效管理和配置。

在启动flink 之前,在核心的配置文件里面,需要指定两个参数。

taskmanager.numberOfTaskSlots 和 parallelism.default。

首先需要明白slot的概念。对于 taskManager,他其实是一个 JVM 程序。

这个JVM 可以同时执行多个task,每个task 需要使用本机的硬件资源。

slot 的属于 jvm 管理的 一些列资源卡槽。 每个slot 只能执行一个task。

每个slot分配有固定的内存资源,但是不做cpu的隔离。 JVM管理一个 slot的pool,

用来执行相应的task。taskmanager.numberOfTaskSlots = 10,则理论上可以同时执行10个子任务。

那么对于1个5节点,numberOfTaskSlots= 6的集群来说,那么就有30个slot可以使用。

对于具体的一个job来说,他会贪婪的使用所有的 slot吗?

使用多少slot 是由parallelism.default 决定的。如果是 5, 那么对于一个job 他最多同时使用5个slot。

这个配置对于多job平台的集群是很有必要的。

那么给定一个stream api 编写的flink 程序,被分解的task是否和map 到slot 上执行的呢?

flink 有几个经典的graph, stream-api对应的stream_graph- job_graph-execution_graph-物理执行图。

execution_graph 基本就决定了如何分布执行。

我们知道一个 stream-api, 主要有 source, operate, sink 这几部分。那么我们可以从source开始看 并行的控制。

source 有并行source和 非并行。我们主要看并行,想类似与kafka 这种生成消费者模式的数据源,能够 并行消费source是非常重要的。

所以可以看到kafka,FlinkKafkaConsumerBaseT extends RichParallelSourceFunctionT,可以充分利用并行度,大大提高吞吐量。

对应到具体的物理执行上,就是多个 source task 任务执行,他们属于一个kafka group同时消费 不同的partition。

对于parallelSource,默认使用cpu 核心做并行度。我们可以通过api进行设置。

接下来是 operate,每个operate都可以设置parallel,如果没有设置将会使用其他层次的设置,比如env,flink.conf中的配置,parallelism.default。

比如 source. map1().map2().grouby(key).sink()

这样一个程序,默认,source和 map1,map2有同样的parallel,上游的output 可以直接one-one forwarding.

在flink 的 优化中,甚至可以把这些 one-one 的operate 合成一个,避免转发,线程切换,网络通信开销。

对于groupby 这样的算子,则属于另外的一类。上游的output 需要 partion 到下游的不同的节点,而不能做位一个chain。

由于operate可以设置独自的parallel,如果与上游不一致。上游的output必然需要某种partion策略来 rebalnce数据。kafka有很多策略来处理这个细节。

对于partion放在专门的章节来说明。

对于sink,则可以理解位一个特定的operate,目前看没什么特殊处理逻辑。

Flink集群架构

Flink采用Master-Slave架构,其中JobManager作为集群Master节点,主要负责任务协调和资源分配,TaskWorker作为Salve节点,用于执行流task。除了JobManager和TaskManager,还有一个重要的角色就是Client。Client虽然不是Flink Cluster 运行态的一部分,但也是Flink重要组件之一,用来提交流任务。

Flink集群之间的通信,是通过Akka Actor System来进行管控通信的。包括Client-JobManager和JobManager-TaskManager,而TaskManager之间的数据交换,是基于Netty实现的。

Client主要作用是将批或流应用程序编译成Dataflow Graph(也就是JobGraph),然后将其提交给JobManager。详细来看,Client主要功能如下:

JobManager负责协调Flink application的分布式执行,比如task调度、Checkpoint协调、Failover协调等等。具体功能如下:

其中JobManager内部主要功能组件如下:

一个Flink Cluster至少有一个JobManager,在高可用部署模式下,可以有多个JobManager,但是只能有一个JobManager为leader,其它都为standby。

TaskManager主要用于执行Dataflow的task,并且缓冲和交换数据流。TaskManager中的task slot是集群的最小资源调度单位。TaskManager中的task slot数量,代表了该TaskManager所能并发处理的task数量。

TaskManager的主要功能如下:

上面的Client、JobManager和TaskManager中都有一个相同的组件,就是Actor System。Akka Actor System用于节点之间消息传输。

对于分布式任务执行,Flink会将能够chain到一起的operator放到一个Task中来执行,每个Task由一个Thread来执行。

将可以chain到一起的operator放到一个task执行,是一种非常有效的优化手段。因为它能够减少线程到线程的切换开销和缓存开销,能够降低延迟的同时增加吞吐量。

上图是Application Dataflow的JobGraph,最上面是Dataflow的逻辑视图JobGraph,下面是带有并发语义的JobGraph。Task代表Dataflow中operator执行任务,而SubTask代表同一Operator(或Chain operator)的并发任务,比如上面的source-map chain operator代表一个task,source-map[1]代表该task的subtask。

Flink集群中的每个TaskManager是一个JVM进程,TaskManager能够执行一个或多个task。而TaskManager能够执行多少task,就是通过task slot来表示的。

每个task slot代表TaskManager中的固定资源子集,比如TaskManager中有3个task slot,则每个task slot所分配的资源为TaskManager所管理内存的1/3。需要注意的是,这里只隔离了内存,像CPU、I/O等资源都没有做隔离。

如果一个TaskManger只有一个task slot的话,意味着每个task group(之所以称为组,是因为task slot会被共享)是JVM进程级别的隔离。而一个TaskManager如果有多个Task slot,则这些task之间能够共享JVM资源,比如TPC链接、心跳信息等;同时也可以共享数据集和数据结构,从而减少每个task的负载。

对于默认情况下,Flink是允许不同task的subtask共享slot的,只要它们属于同一job即可。通过共享slot,一个slot就可以容纳一个job的整个pipeline,比如下面第一个TaskManager中的第一个Task Slot,被source-map[1]、keyby-window[1]和sink[1]整个pipeline所共享,这样整个最大限度的减少数据跨线程/进程的数据通信。

共享slot除了可能执行整个pipline外,还有以下两个优点:

Flink源码4-Slot分配和Task执行

接上期:——》JobMaster#startJobExecution()

resetAndStartScheduler();

——》JobMaster#resetAndStartScheduler

schedulerAssignedFuture.thenRun(this::startScheduling);

——》JobMaster#startScheduling()

schedulerNG.startScheduling();

SchedulerBase# startScheduling();

startAllOperatorCoordinators();

* 注释: 开始调度

*/

startSchedulingInternal();

DefaultScheduler#startSchedulingInternal

schedulingStrategy.startScheduling();

LazyFromSourcesSchedulingStrategy#startScheduling();

* 注释: 申请 Slot 并且开始 部署 ExecutionVertices

*/

allocateSlotsAndDeployExecutionVertices(schedulingTopology.getVertices());

——》LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices

* 注释: 申请 slot 和 部署

* schedulerOperations = DefaultScheduler

*/

schedulerOperations.allocateSlotsAndDeploy(Collections.

DefaultScheduler#allocateSlotsAndDeploy()

*******来到正式入口:DefaultScheduler#allocateSlotsAndDeploy()***********

流程:

1、JobMaster 发送请求申请 slot

2、ResourceManager 接收到请求,执行 slot请求处理

3、TaskManager 处理 ResourceManager 发送过来的 Slot 请求

4、JobMaster 接收到 TaskManager 发送过来的 Slot 申请处理结果

—— 5 》DefaultScheduler#allocateSlotsAndDeploy()

* 1 注释: 申请Slot

final ListSlotExecutionVertexAssignment slotExecutionVertexAssignments = allocateSlots(executionVertexDeploymentOptions);

.............

* 2 注释: 部署运行

*/

waitForAllSlotsAndDeploy(deploymentHandles);

——先走 1 》DefaultScheduler#allocateSlots() 0:21

* 注释: 申请Slot

*/

final ListSlotExecutionVertexAssignment

slotExecutionVertexAssignments = allocateSlots(executionVertexDeploymentOptions);

——》DefaultScheduler#allocateSlots()

——4 》DefaultExecutionSlotAllocator#allocateSlotsFor

OptionalConsumer.of(findMatchingSlot(resourceProfile))

.ifPresent(

101:) taskManagerSlot - allocateSlot(taskManagerSlot, pendingSlotRequest))

.ifNotPresent(() - fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));

pendingTaskManagerSlotOptional = allocateResource(resourceProfile);

——》 SlotManagerImpl# allocateResource();

if(!resourceActions.allocateResource(defaultWorkerResourceSpec))

ResourceManager#ResourceActionsImpl#allocateResource()

// 这里2个实现 ,第一个 StandaloneResourceManager 直接返回false

// 第二个 startNewWorker ,申请新的 YarnContainer

return startNewWorker(workerResourceSpec);

返回上面101:) taskManagerSlot - allocateSlot

—— 》 SlotManagerImpl#allocateSlot() 1:18:00

* 注释: 申请 slot

CompletableFutureAcknowledge requestFuture = gateway

.requestSlot(slotId, pendingSlotRequest.getJobId(),

102:) TaskExecutor#requestSlot()

allocateSlot(slotId, jobId, allocationId, resourceProfile);

——》 TaskExecutor#allocateSlot

if(taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId

TaskSlotTableImpl#allocateSlot()

slots.add(allocationId);

▲▲▲▲回到上面102:)行方法 ▲▲▲▲▲▲▲▲

——》 TaskExecutor#offerSlotsToJobManager

—— 》 TaskExecutor# internalOfferSlotsToJobManager

acceptedSlotsFuture .whenCompleteAsync(handleAcceptedSlotOffers

——》 TaskExecutor#handleAcceptedSlotOffers()

▲▲▲▲回到上面 ——4 》▲▲ 1:48

——4 》DefaultExecutionSlotAllocator#allocateSlotsFor()

入口:

—— 5 》DefaultScheduler#waitForAllSlotsAndDeploy()

—— 》DefaultScheduler#deployAll()

—— 》DefaultScheduler#deployOrHandleError()

CompletableFuture.supplyAsync(() - taskManagerGateway.submitTask

(deployment, rpcTimeout), executor).

thenCompose(Function.identity())

—— 》RpcTaskManagerGateway#submitTask()

—— 》TaskExecutor#submitTask()

* 注释: 提交 Task 让 TaskManager 启动

* 第一个参数: TaskDeploymentDescriptor 包含启动当前这个 Task 所需要的一切信息

* 注释: 构建 Task

* 内部会初始化一个执行线程。一个Task 是线程级别的执行粒度

*/

Task task = new Task(jobInformation, taskInformation, tdd.getExecutionAttemptId(),

Flink运行模式

? ??在idea中运行Flink程序的方式就是开发模式。

? ??Flink中的Local-cluster(本地集群)模式,单节点运行,主要用于测试, 学习。

????????独立集群模式,由Flink自身提供计算资源。

把Flink应用提交给Yarn的ResourceManager

Flink会根据运行在JobManger上的job的需要的slot的数量动态的分配TaskManager资源

Yarn又分3种模式

Session-Cluster模式需要先启动Flink集群,向Yarn申请资源。以后提交任务都向这里提交。

这个Flink集群会常驻在yarn集群中,除非手工停止。

在向Flink集群提交Job的时候, 如果资源被用完了,则新的Job不能正常提交.

缺点: 如果提交的作业中有长时间执行的大作业, 占用了该Flink集群的所有资源, 则后续无法提交新的job.

所以, Session-Cluster适合那些需要频繁提交的多个小Job, 并且执行时间都不长的Job.

一个Job会对应一个Flink集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。

每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。

Per-job模式执行结果,一个job对应一个Application

Application Mode会在Yarn上启动集群, 应用jar包的main函数(用户类的main函数)将会在JobManager上执行. 只要应用程序执行结束, Flink集群会马上被关闭. 也可以手动停止集群.

与Per-Job-Cluster的区别:就是Application Mode下, 用户的main函数式在集群中执行的,并且当一个application中有多个job的话,per-job模式则是一个job对应一个yarn中的application,而Application Mode则这个application中对应多个job。

Application Mode模式执行结果,多个job对应一个Application

官方建议:

出于生产的需求, 我们建议使用Per-job or Application Mode,因为他们给应用提供了更好的隔离!

0.Flink任务提交后,Client向HDFS上传Flink的Jar包和配置

1.向Yarn ResourceManager提交任务,ResourceManager分配Container资源

2.通知对应的NodeManager启动ApplicationMaster,ApplicationMaster启动后加载Flink的Jar包和配置构建环境,然后启动JobManager(Dispatcher)

2.1.Dispatcher启动JobMaster

3.JobMaster向ResourceManager(Flink)申请资源

4.ResourceManager(Flink)向ResourceManager(Yarn)申请资源启动TaskManager

5.ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager

6.TaskManager注册Slot

7.发出提供Slot命令

8.TaskManager向JobMaster提供Slot

9.JobMaster提交要在Slot中执行的任务

毛线小精灵slot是什么意思

《毛线小精灵(Unravel)》是由独立工作室Coldwood制作的一款动作冒险游戏,在游戏中,玩家要操控这个用毛线编织而成的小人,在探索和解谜的过程中,保证其不至于松散开来,并用毛线将游戏当中的物件重新连接一起。

slot就是槽的意思,是一个资源单位,只有给task分配了一个slot之后,这个task才可以运行。slot分两种,map slot沪蓉reduce slot。另外,slot是一个逻辑概念,一个数据节点的slots数量既不是CPU的核数,也不是memory chip,一个节点的slot数量用来表示此节点的资源容量或是计算计算能力的大小,也就是说slot是hadoop的资源单位。

2)

系统中map slots总数与reduce slots总数的计算方式如下:

map slots 总数=集群节点数*mapred.tasktracker.map.tasks.maximum(默认是2);

reduce slots 总数=集群节点数*mapred.tasktracker.reduce.tasks.maximum(默认是2);

当初以为slots就是节点上cpu的核数,一直搞不懂三个数据节点,总共5个cpu核,为什么会有6个map slots 、6个reduce slots:

默认一个节点上有2个map slots,2个reduce slots。而一个slot对应一个task,所以,map task capacity、reduce task capacity的值是6。Avg.Tasks/Node就是一个节点的平均任务数量,2个map+2个reduce就是4个。

运行的job包含42个map,由于集群总的map slots是6,所以会有22等待的map任务。

(责任编辑:IT教学网)

更多

推荐Mail服务器文章