关于jobslot的信息
flink 并行度
Flink 作为一套分布式执行框架,计算资源可以不断的扩展。
不同的任务类型,可以控制需要的计算资源。在flink整个runtime的模型中
并行度是一个很重要的概念,通过设置并行度可以为认为分配合理的计算资源,
做到资源的合理配置。
整个flink的架构简单的说是 中心控制(jobManager)+ 多点分布执行(taskManager)
弹性的资源分配主要来自于taskManager的有效管理和配置。
在启动flink 之前,在核心的配置文件里面,需要指定两个参数。
taskmanager.numberOfTaskSlots 和 parallelism.default。
首先需要明白slot的概念。对于 taskManager,他其实是一个 JVM 程序。
这个JVM 可以同时执行多个task,每个task 需要使用本机的硬件资源。
slot 的属于 jvm 管理的 一些列资源卡槽。 每个slot 只能执行一个task。
每个slot分配有固定的内存资源,但是不做cpu的隔离。 JVM管理一个 slot的pool,
用来执行相应的task。taskmanager.numberOfTaskSlots = 10,则理论上可以同时执行10个子任务。
那么对于1个5节点,numberOfTaskSlots= 6的集群来说,那么就有30个slot可以使用。
对于具体的一个job来说,他会贪婪的使用所有的 slot吗?
使用多少slot 是由parallelism.default 决定的。如果是 5, 那么对于一个job 他最多同时使用5个slot。
这个配置对于多job平台的集群是很有必要的。
那么给定一个stream api 编写的flink 程序,被分解的task是否和map 到slot 上执行的呢?
flink 有几个经典的graph, stream-api对应的stream_graph- job_graph-execution_graph-物理执行图。
execution_graph 基本就决定了如何分布执行。
我们知道一个 stream-api, 主要有 source, operate, sink 这几部分。那么我们可以从source开始看 并行的控制。
source 有并行source和 非并行。我们主要看并行,想类似与kafka 这种生成消费者模式的数据源,能够 并行消费source是非常重要的。
所以可以看到kafka,FlinkKafkaConsumerBaseT extends RichParallelSourceFunctionT,可以充分利用并行度,大大提高吞吐量。
对应到具体的物理执行上,就是多个 source task 任务执行,他们属于一个kafka group同时消费 不同的partition。
对于parallelSource,默认使用cpu 核心做并行度。我们可以通过api进行设置。
接下来是 operate,每个operate都可以设置parallel,如果没有设置将会使用其他层次的设置,比如env,flink.conf中的配置,parallelism.default。
比如 source. map1().map2().grouby(key).sink()
这样一个程序,默认,source和 map1,map2有同样的parallel,上游的output 可以直接one-one forwarding.
在flink 的 优化中,甚至可以把这些 one-one 的operate 合成一个,避免转发,线程切换,网络通信开销。
对于groupby 这样的算子,则属于另外的一类。上游的output 需要 partion 到下游的不同的节点,而不能做位一个chain。
由于operate可以设置独自的parallel,如果与上游不一致。上游的output必然需要某种partion策略来 rebalnce数据。kafka有很多策略来处理这个细节。
对于partion放在专门的章节来说明。
对于sink,则可以理解位一个特定的operate,目前看没什么特殊处理逻辑。

Flink集群架构
Flink采用Master-Slave架构,其中JobManager作为集群Master节点,主要负责任务协调和资源分配,TaskWorker作为Salve节点,用于执行流task。除了JobManager和TaskManager,还有一个重要的角色就是Client。Client虽然不是Flink Cluster 运行态的一部分,但也是Flink重要组件之一,用来提交流任务。
Flink集群之间的通信,是通过Akka Actor System来进行管控通信的。包括Client-JobManager和JobManager-TaskManager,而TaskManager之间的数据交换,是基于Netty实现的。
Client主要作用是将批或流应用程序编译成Dataflow Graph(也就是JobGraph),然后将其提交给JobManager。详细来看,Client主要功能如下:
JobManager负责协调Flink application的分布式执行,比如task调度、Checkpoint协调、Failover协调等等。具体功能如下:
其中JobManager内部主要功能组件如下:
一个Flink Cluster至少有一个JobManager,在高可用部署模式下,可以有多个JobManager,但是只能有一个JobManager为leader,其它都为standby。
TaskManager主要用于执行Dataflow的task,并且缓冲和交换数据流。TaskManager中的task slot是集群的最小资源调度单位。TaskManager中的task slot数量,代表了该TaskManager所能并发处理的task数量。
TaskManager的主要功能如下:
上面的Client、JobManager和TaskManager中都有一个相同的组件,就是Actor System。Akka Actor System用于节点之间消息传输。
对于分布式任务执行,Flink会将能够chain到一起的operator放到一个Task中来执行,每个Task由一个Thread来执行。
将可以chain到一起的operator放到一个task执行,是一种非常有效的优化手段。因为它能够减少线程到线程的切换开销和缓存开销,能够降低延迟的同时增加吞吐量。
上图是Application Dataflow的JobGraph,最上面是Dataflow的逻辑视图JobGraph,下面是带有并发语义的JobGraph。Task代表Dataflow中operator执行任务,而SubTask代表同一Operator(或Chain operator)的并发任务,比如上面的source-map chain operator代表一个task,source-map[1]代表该task的subtask。
Flink集群中的每个TaskManager是一个JVM进程,TaskManager能够执行一个或多个task。而TaskManager能够执行多少task,就是通过task slot来表示的。
每个task slot代表TaskManager中的固定资源子集,比如TaskManager中有3个task slot,则每个task slot所分配的资源为TaskManager所管理内存的1/3。需要注意的是,这里只隔离了内存,像CPU、I/O等资源都没有做隔离。
如果一个TaskManger只有一个task slot的话,意味着每个task group(之所以称为组,是因为task slot会被共享)是JVM进程级别的隔离。而一个TaskManager如果有多个Task slot,则这些task之间能够共享JVM资源,比如TPC链接、心跳信息等;同时也可以共享数据集和数据结构,从而减少每个task的负载。
对于默认情况下,Flink是允许不同task的subtask共享slot的,只要它们属于同一job即可。通过共享slot,一个slot就可以容纳一个job的整个pipeline,比如下面第一个TaskManager中的第一个Task Slot,被source-map[1]、keyby-window[1]和sink[1]整个pipeline所共享,这样整个最大限度的减少数据跨线程/进程的数据通信。
共享slot除了可能执行整个pipline外,还有以下两个优点:
Flink源码4-Slot分配和Task执行
接上期:——》JobMaster#startJobExecution()
resetAndStartScheduler();
——》JobMaster#resetAndStartScheduler
schedulerAssignedFuture.thenRun(this::startScheduling);
——》JobMaster#startScheduling()
schedulerNG.startScheduling();
↓
SchedulerBase# startScheduling();
▼
startAllOperatorCoordinators();
* 注释: 开始调度
*/
startSchedulingInternal();
↓
DefaultScheduler#startSchedulingInternal
schedulingStrategy.startScheduling();
↓
LazyFromSourcesSchedulingStrategy#startScheduling();
▼
* 注释: 申请 Slot 并且开始 部署 ExecutionVertices
*/
allocateSlotsAndDeployExecutionVertices(schedulingTopology.getVertices());
——》LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices
▼
* 注释: 申请 slot 和 部署
* schedulerOperations = DefaultScheduler
*/
schedulerOperations.allocateSlotsAndDeploy(Collections.
↓
DefaultScheduler#allocateSlotsAndDeploy()
*******来到正式入口:DefaultScheduler#allocateSlotsAndDeploy()***********
流程:
1、JobMaster 发送请求申请 slot
2、ResourceManager 接收到请求,执行 slot请求处理
3、TaskManager 处理 ResourceManager 发送过来的 Slot 请求
4、JobMaster 接收到 TaskManager 发送过来的 Slot 申请处理结果
—— 5 》DefaultScheduler#allocateSlotsAndDeploy()
▼
* 1 注释: 申请Slot
final ListSlotExecutionVertexAssignment slotExecutionVertexAssignments = allocateSlots(executionVertexDeploymentOptions);
.............
* 2 注释: 部署运行
*/
waitForAllSlotsAndDeploy(deploymentHandles);
——先走 1 》DefaultScheduler#allocateSlots() 0:21
* 注释: 申请Slot
*/
final ListSlotExecutionVertexAssignment
slotExecutionVertexAssignments = allocateSlots(executionVertexDeploymentOptions);
——》DefaultScheduler#allocateSlots()
↓
——4 》DefaultExecutionSlotAllocator#allocateSlotsFor
▼
OptionalConsumer.of(findMatchingSlot(resourceProfile))
.ifPresent(
101:) taskManagerSlot - allocateSlot(taskManagerSlot, pendingSlotRequest))
.ifNotPresent(() - fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
——》 SlotManagerImpl# allocateResource();
if(!resourceActions.allocateResource(defaultWorkerResourceSpec))
↓
ResourceManager#ResourceActionsImpl#allocateResource()
// 这里2个实现 ,第一个 StandaloneResourceManager 直接返回false
// 第二个 startNewWorker ,申请新的 YarnContainer
return startNewWorker(workerResourceSpec);
返回上面101:) taskManagerSlot - allocateSlot
—— 》 SlotManagerImpl#allocateSlot() 1:18:00
▼
* 注释: 申请 slot
CompletableFutureAcknowledge requestFuture = gateway
.requestSlot(slotId, pendingSlotRequest.getJobId(),
↓
102:) TaskExecutor#requestSlot()
▼
allocateSlot(slotId, jobId, allocationId, resourceProfile);
——》 TaskExecutor#allocateSlot
if(taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId
↓
TaskSlotTableImpl#allocateSlot()
slots.add(allocationId);
▲▲▲▲回到上面102:)行方法 ▲▲▲▲▲▲▲▲
——》 TaskExecutor#offerSlotsToJobManager
—— 》 TaskExecutor# internalOfferSlotsToJobManager
▼
acceptedSlotsFuture .whenCompleteAsync(handleAcceptedSlotOffers
——》 TaskExecutor#handleAcceptedSlotOffers()
▲▲▲▲回到上面 ——4 》▲▲ 1:48
——4 》DefaultExecutionSlotAllocator#allocateSlotsFor()
▼
入口:
—— 5 》DefaultScheduler#waitForAllSlotsAndDeploy()
—— 》DefaultScheduler#deployAll()
▼
—— 》DefaultScheduler#deployOrHandleError()
▼
CompletableFuture.supplyAsync(() - taskManagerGateway.submitTask
(deployment, rpcTimeout), executor).
thenCompose(Function.identity())
↓
—— 》RpcTaskManagerGateway#submitTask()
▼
—— 》TaskExecutor#submitTask()
▼
* 注释: 提交 Task 让 TaskManager 启动
* 第一个参数: TaskDeploymentDescriptor 包含启动当前这个 Task 所需要的一切信息
* 注释: 构建 Task
* 内部会初始化一个执行线程。一个Task 是线程级别的执行粒度
*/
Task task = new Task(jobInformation, taskInformation, tdd.getExecutionAttemptId(),
Flink运行模式
? ??在idea中运行Flink程序的方式就是开发模式。
? ??Flink中的Local-cluster(本地集群)模式,单节点运行,主要用于测试, 学习。
????????独立集群模式,由Flink自身提供计算资源。
把Flink应用提交给Yarn的ResourceManager
Flink会根据运行在JobManger上的job的需要的slot的数量动态的分配TaskManager资源
Yarn又分3种模式
Session-Cluster模式需要先启动Flink集群,向Yarn申请资源。以后提交任务都向这里提交。
这个Flink集群会常驻在yarn集群中,除非手工停止。
在向Flink集群提交Job的时候, 如果资源被用完了,则新的Job不能正常提交.
缺点: 如果提交的作业中有长时间执行的大作业, 占用了该Flink集群的所有资源, 则后续无法提交新的job.
所以, Session-Cluster适合那些需要频繁提交的多个小Job, 并且执行时间都不长的Job.
一个Job会对应一个Flink集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。
每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。
Per-job模式执行结果,一个job对应一个Application
Application Mode会在Yarn上启动集群, 应用jar包的main函数(用户类的main函数)将会在JobManager上执行. 只要应用程序执行结束, Flink集群会马上被关闭. 也可以手动停止集群.
与Per-Job-Cluster的区别:就是Application Mode下, 用户的main函数式在集群中执行的,并且当一个application中有多个job的话,per-job模式则是一个job对应一个yarn中的application,而Application Mode则这个application中对应多个job。
Application Mode模式执行结果,多个job对应一个Application
官方建议:
出于生产的需求, 我们建议使用Per-job or Application Mode,因为他们给应用提供了更好的隔离!
0.Flink任务提交后,Client向HDFS上传Flink的Jar包和配置
1.向Yarn ResourceManager提交任务,ResourceManager分配Container资源
2.通知对应的NodeManager启动ApplicationMaster,ApplicationMaster启动后加载Flink的Jar包和配置构建环境,然后启动JobManager(Dispatcher)
2.1.Dispatcher启动JobMaster
3.JobMaster向ResourceManager(Flink)申请资源
4.ResourceManager(Flink)向ResourceManager(Yarn)申请资源启动TaskManager
5.ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager
6.TaskManager注册Slot
7.发出提供Slot命令
8.TaskManager向JobMaster提供Slot
9.JobMaster提交要在Slot中执行的任务
毛线小精灵slot是什么意思
《毛线小精灵(Unravel)》是由独立工作室Coldwood制作的一款动作冒险游戏,在游戏中,玩家要操控这个用毛线编织而成的小人,在探索和解谜的过程中,保证其不至于松散开来,并用毛线将游戏当中的物件重新连接一起。
slot就是槽的意思,是一个资源单位,只有给task分配了一个slot之后,这个task才可以运行。slot分两种,map slot沪蓉reduce slot。另外,slot是一个逻辑概念,一个数据节点的slots数量既不是CPU的核数,也不是memory chip,一个节点的slot数量用来表示此节点的资源容量或是计算计算能力的大小,也就是说slot是hadoop的资源单位。
2)
系统中map slots总数与reduce slots总数的计算方式如下:
map slots 总数=集群节点数*mapred.tasktracker.map.tasks.maximum(默认是2);
reduce slots 总数=集群节点数*mapred.tasktracker.reduce.tasks.maximum(默认是2);
当初以为slots就是节点上cpu的核数,一直搞不懂三个数据节点,总共5个cpu核,为什么会有6个map slots 、6个reduce slots:
默认一个节点上有2个map slots,2个reduce slots。而一个slot对应一个task,所以,map task capacity、reduce task capacity的值是6。Avg.Tasks/Node就是一个节点的平均任务数量,2个map+2个reduce就是4个。
运行的job包含42个map,由于集群总的map slots是6,所以会有22等待的map任务。