Flink心跳服务流程-创新互联

之前了解到的 Flink 的心跳服务都比较浅显,只知道 在 Flink 中心跳服务是由 ReourceManager 发送给 JobMaster 和 TaskExecutor 以及 JobMaster 发送给 TaskExecutor。 然后 TaskExecutor 返回相关的Slot等数据给 ResouceManager。所以一直以为 心跳服务是 Akka 的 ask 进行传递的。 但是查看相关源码发现和我的理解有些出入。并且在最开始查看源码的时候发现,Flink 对心跳服务封装的比较好,定义的接口在很多地方都是匿名的实现,所以一开始看的时候很容易混淆,搞不清楚整个心跳的流程,下面用ResourceManager和TaskManager的心跳服务 来简单聊一聊 Flink 中心跳服务的流程。
下面是心跳服务类的继承关系
在这里插入图片描述

创新互联成立10年来,这条路我们正越走越好,积累了技术与客户资源,形成了良好的口碑。为客户提供网站设计、成都网站制作、网站策划、网页设计、域名与空间、网络营销、VI设计、网站改版、漏洞修补等服务。网站是否美观、功能强大、用户体验好、性价比高、打开快等等,这些对于网站建设都非常重要,创新互联通过对建站技术性的掌握、对创意设计的研究为客户提供一站式互联网解决方案,携手广大客户,共同发展进步。

最核心的类就是HeaderbeatManager接口的实现类HeartbeatManagerImpl类。其中实现了接收到心跳请求和接受到心跳的代码。它的子类HeartbeatManagerSendImpl继承了Runnable接口,用于定期触发心跳请求。
HeartbeatManagerImpl中有一个存放HeartbeatMonitor对象的 Map 集合。
HeartbeatMonitor类主要是记录心跳的时间,判断心跳是否超时。在构造HeartbeatMonitor的时候需要传入一个HeartbeatTarget接口的实现对象。
HeartbeatTarget接口定义的是接受到心跳请求后的操作和接收到心跳的操作。 该接口的实现类主要在两个地方,一个是在添加 Motitor 时的匿名对象,比如在RM添加对 TaskManager 监听时会传入一个实现了HeartbeatTarget 接口的匿名对象。一个是在HeartbeatManagerSendImpl中的实现。这个地方我最开始看源码时特别容易混淆。HeartbeatManagerSendImpl中的requestHeartbeat()方法是接收到心跳请求后的处理,receiveHeartbeat()是接收到心跳后的处理。 在匿名对象中的requestHeartbeat()是发送心跳请求的动作(e.g. RM向TM发送心跳请求)而receiveHeartbeat()则是实现了 接收到心跳请求后发送心跳的动作 (e.g. TM 就收到RM的心跳请求,向RM发送心跳及需要汇报的信息)

下面是ResourceManager 和 TaskExecutor 的心跳服务的流程
在这里插入图片描述

RM 心跳服务的创建与调度

在最开始,ResourceManager 服务启动的时会创建两个 心跳服务管理对象, RM用来管理TaskManager的心跳服务的对象名叫taskManagerHeartbeatManager

private void startHeartbeatServices() {taskManagerHeartbeatManager =
            heartbeatServices.createHeartbeatManagerSender(
                    resourceId,
                    new TaskManagerHeartbeatListener(),
                    getMainThreadExecutor(),
                    log);

    jobManagerHeartbeatManager =
            heartbeatServices.createHeartbeatManagerSender(
                    resourceId,
                    new JobManagerHeartbeatListener(),
                    getMainThreadExecutor(),
                    log);
}

一个用来管理 TaskManager 的心跳通信,一个用来管理 JobManager 的心跳通信。这两个对象都是HeartbeatManagerSenderImpl对象。在HeartbeatManagerSenderImpl的构造方法中就会启动定时任务。

public class HeartbeatManagerSenderImplextends HeartbeatManagerImplimplements Runnable {
    HeartbeatManagerSenderImpl(ScheduledExecutor mainThreadExecutor, ...) {super(heartbeatTimeout,...mainThreadExecutor);
    
            this.heartbeatPeriod = heartbeatPeriod;
        	// 开始任务调度
            mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
        }

    @Override
    public void run() {if (!stopped) {log.debug("Trigger heartbeat request.");
            for (HeartbeatMonitorheartbeatMonitor : getHeartbeatTargets().values()) {requestHeartbeat(heartbeatMonitor);
            }
        	// 设置新的任务调度
            getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
        }
    }
}

mainThreadExecutor是一个RpcEndpoint的静态内部类,这里使用它的schedule()方法来实现定时任务调度。schedule()接受一个Runnable接口的对象,而HeartbeatManagerSenderImpl就实现了Runnable接口,所以,在定时任务被触发时就会执行HeartbeatManagerSenderImpl#run()方法。 在run()方法中,会继续设置一个新的定时任务,这样不断地循环。这里默认的延迟时间为 10000 毫秒。

schedule()方法实现任务的延迟执行主要是通过给 Actor 发送一条异步任务的消息,该消息会带上延迟执行的时间。 在这里就是 ResourceManager 给自己的 Acotr 发送了一条延迟消息。

@Override
    public void scheduleRunAsync(Runnable runnable, long delayMillis) {if (isLocal) {// 计算任务调度的时间
            long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);
        	// 向自己发送一条 异步任务处理 的消息
            tell(new RunAsync(runnable, atTimeNanos));
        } 
    }

在 ResourceManager 的 Actor 接收到这条消息的时候,会判断任任务是否需要立即执行,如果是延迟执行,则会使用 Akka 的 ActorSystem.scheduler() 来定时执行该任务。

private void handleRunAsync(RunAsync runAsync) {final long timeToRun = runAsync.getTimeNanos();
        final long delayNanos;
    	// 如果接收到的任务已经到达任务的执行时间则立即执行
        if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime())<= 0) {// run immediately
            try {runAsync.getRunnable().run();
            } catch (Throwable t) {log.error("Caught exception while executing runnable in main thread.", t);
                ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
            }
            // 如果没有到达任务的执行时间,则发送一条新的延迟消息给自己
        } else {// schedule for later. send a new message after the delay, which will then be
            // immediately executed
            FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
            RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);

            final Object envelopedSelfMessage = envelopeSelfMessage(message);

            getContext()
                    .system()
                    .scheduler()
                    .scheduleOnce(
                            delay,
                            getSelf(),
                            envelopedSelfMessage,
                            getContext().dispatcher(),
                            ActorRef.noSender());
        }
    }
心跳监听对象的添加与触发

heartbeatMonitor 对象的添加是在 TaskManger 启动后,向 ResourceManager 注册时调用HeartbeatManagerSenderImpl#monitorTarget()方法添加的。 添加的时候会传入一个HeartbeatTarget 接口的匿名实现类。 该实现类就定义了触发心跳请求时的操作。下面代码中就定义了RM向TaskManager发送心跳时需要怎么做,但是接收心跳请求的方法

private RegistrationResponse registerTaskExecutorInternal(
        TaskExecutorGateway taskExecutorGateway,
		TaskExecutorRegistration taskExecutorRegistration) {// 向 RM的TaskManager心跳管理服务 添加心跳监听对象
    taskManagerHeartbeatManager.monitorTarget(
    taskExecutorResourceId,
    new HeartbeatTarget() {@Override
        public void receiveHeartbeat(ResourceID resourceID, Void payload) {// the ResourceManager will always send heartbeat requests to the
            // TaskManager
        }

        @Override
        public void requestHeartbeat(ResourceID resourceID, Void payload) {taskExecutorGateway.heartbeatFromResourceManager(resourceID);
        }
    });
}

HeartbeatManagerSenderImplrun()方法中,会遍历所有的正在监视的 heartbeatMonitor 对象,并调用 在添加监视时传入的heartbeatTarget匿名对象的requestHeartbeat()方法,就像上面代码一样。所以在RM向TaskManager 发送心跳请求的时候 是通过 调用taskExecutorGateway的heartbeatFromResourceManager() 发送了 RPC 请求

TaskManager 心跳服务创建与监听对象添加

在 TM 服务启动的时候同样也会创建一个心跳服务来管理与RM之间的心跳

  this.resourceManagerHeartbeatManager =
                createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
// ============

    publicHeartbeatManagercreateHeartbeatManager(
            ResourceID resourceId,
            HeartbeatListenerheartbeatListener,
            ScheduledExecutor mainThreadExecutor,
            Logger log) {return new HeartbeatManagerImpl<>(
                heartbeatTimeout, resourceId, heartbeatListener, mainThreadExecutor, log);
    }

在TM中创建的就是HeartbeatManagerImpl对象,因为TM并不需要发送心跳请求,所以不是创建HeartbeatManagerSenderImpl对象。

TM 向 RM 注册成功后,会添加一个对 RM 的监听对象

// monitor the resource manager as heartbeat target
        resourceManagerHeartbeatManager.monitorTarget(
                resourceManagerResourceId,
                new HeartbeatTarget() {@Override
                    public void receiveHeartbeat(
                            ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {resourceManagerGateway.heartbeatFromTaskManager(
                                resourceID, heartbeatPayload);
                    }

                    @Override
                    public void requestHeartbeat(
                            ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {// the TaskManager won't send heartbeat requests to the ResourceManager
                    }
                });

在这里,HeartbeatTarget 匿名对象中,receiveHeartbeat() 就是向RM 发送心跳并附带上汇报信息,而requestHeartbeat 是空的,因为 TM 不会向 RM 发送心跳请求。

TaskManager 接受心跳请求并发送心跳

回到之前,RM 调用taskExecutorGateway的heartbeatFromResourceManager方法,通过RPC方式发送了心跳请求。 在TaskExecutor类中的heartbeatFromResourceManager方法就会被调用。并传入了RM 的 resourceID。

@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}

resourceManagerHeartbeatManager 就是 TM 时创建的HeartbeatManagerImpl对象,所以这里调用的requestHeartbeat() 方法是HeartbeatManagerImpl中的方法。

public class HeartbeatManagerImplimplements HeartbeatManager{// 接受到RM的心跳请求
	@Override
    public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {if (!stopped) {log.debug("Received heartbeat request from {}.", requestOrigin);
            // 汇报心跳,清除HeartbeatMonitor中的超时Future
            final HeartbeatTargetheartbeatTarget = reportHeartbeat(requestOrigin);

            if (heartbeatTarget != null) {if (heartbeatPayload != null) {heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
                }

                heartbeatTarget.receiveHeartbeat(
                        getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
            }
        }
    }
}

在这个方法中 首先会调用reportHeartbeat方法.

HeartbeatTargetreportHeartbeat(ResourceID resourceID) {if (heartbeatTargets.containsKey(resourceID)) {// 通过 RM 的reosurceID 找到 TM对RM的监听器
            HeartbeatMonitorheartbeatMonitor = heartbeatTargets.get(resourceID);
            // 重新设置 监听器的超时时间
            heartbeatMonitor.reportHeartbeat();

            return heartbeatMonitor.getHeartbeatTarget();
        } else {return null;
        }
    }

之后就会调用之前创建监听器时的匿名对象的方法来通过RPC调用向RM发送心跳数据。

resourceManagerGateway.heartbeatFromTaskManager(resourceID, heartbeatPayload);

之后又回到了RM

RM 接受TM的心跳数据

在 TM 发送 RPC 请求后,ResourceManager 类中的heartbeatFromTaskManager()方法会被调用。该方法只有一行代码

@Override
public void heartbeatFromTaskManager(
        final ResourceID resourceID, final TaskExecutorHeartbeatPayload heartbeatPayload) {taskManagerHeartbeatManager.receiveHeartbeat(resourceID, heartbeatPayload);
}

所以在这里,会调用 RM 管理 TM 的心跳服务对象(HeartbeatManagerSenderImpl) 的receiveHeartbeat()方法。

@Override
public void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload) {if (!stopped) {log.debug("Received heartbeat from {}.", heartbeatOrigin);
        reportHeartbeat(heartbeatOrigin);

        if (heartbeatPayload != null) {heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
        }
    }
}

这里首先会调用reportHeartbeat()来重新设置 在 RM 中对 TM 的监听器的超时时间。 然后调用heartbeatListener来处理TM 传过来的数据。

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


当前标题:Flink心跳服务流程-创新互联
路径分享:http://scyanting.com/article/gdhpg.html