简单任务调度系统
一 概述
1 运维管理的阶段
1 人工阶段
人工盯着服务器,出了问题,到机器前面,翻日志,查状态,手动操作
创新互联公司从2013年成立,先为牡丹等服务建站,牡丹等地企业,进行企业商务咨询服务。为牡丹企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。
2 脚本阶段
开始写一些自动化脚本,启动计划任务,自动启动服务,监控服务等
3 工具阶段
脚本功能太弱,开发了大量工具,某种工具解决某个特定领域的问题,常用的有ansible,puppet等
4 平台阶段
将工具整合,自主研发,实现标准化,实现自动化流程控制,而今,平台已经开始迈向智能化的发展方向。
二 mschedule 设计
1 完整代码链接
https://gitee.com/ChangPaoZhe/mschedule
2要求
1 分发任务
分发脚本到目前节点上去执行
2 控制
控制并发,控制多少个节点同时执行
对错误做出响应,由用户设定,最多允许失败的比例或者数量,当超过范围时,需要终止任务执行
3 能跨机房部署
4 能对作业做版本控制,这是辅助功能,可过后实现
3 项目基本概述
1 基本概述
本项目的出发点,是只需要会使用shell脚本就可以了,可以通过使用shell脚本的方式来完成远程任务的下发和处理流程。
2 其他自动化工具二次开发缺点
ansible,salt等需要学习特定的内部语言,如果觉得ansible这样的工具不能满足需求,二次开发难度过高,代码量不小,本身它们开发接口不完善,而且熟悉它的叫也比较难,就算开发出来维护也难。
从这些项目上二次开发,等于拉一个分支,如果主分支有了新的特性,想要合并也是比较困难的。
自己开发,满足自己需求,完全适合自己需求,代码规模可控,便于他人接收维护。
3 项目初始版本目标
自己开发就是造轮子,造轮子不是不好,其起初要实现的功能应该是比较简单的。后面可以逐步进行完善操作。
4 项目基本架构图
浏览器端和webSERVER端交互是通过HTTP实现的,而WEB server和master server 是通过TCP链接来实现的,master server 和agent之间也是通过TCP 链接来实现的
4 分发任务设计
1 分发任务分类
1 有agent 类
有agent类,被控节点需要安装或运行特殊的软件,用于和服务器端进行通信,服务器端把脚本,命令传递给agent端,由agent端控制来执行
2 无agent类
被控节点不需要安装或者运行特殊软件,如通过SSH来实现,这其实也是有agent的,不过不是自己写的程序
优缺点
1 通用,简单,易实现,但管理不善,容易出现安全问题
2 并行效率不高,有agent的并行执行可以不和管理服务器通信,可以并发很高,ssh执行要和master之间通信
3 ssh链接是有状态的,任务执行的时候,master不能挂了,否则任务将执行失败。
5 执行脚本(subprocess)
python 中有很多运行进程的方式,不过都过时了。
建议使用标准库subprocess模块,启动一个子进程。
1 初始化类源码
def __init__(self, args, bufsize=-1, executable=None,
stdin=None, stdout=None, stderr=None,
preexec_fn=None, close_fds=_PLATFORM_DEFAULT_CLOSE_FDS,
shell=False, cwd=None, env=None, universal_newlines=False,
startupinfo=None, creationflags=0,
restore_signals=True, start_new_session=False,
pass_fds=()):
第一个是参数,后面是可选,但shell默认为False,可将其置为True, stdout 后面跟文件或管道
def wait(self, timeout=None, endtime=None):
"""Wait for child process to terminate. Returns returncode
attribute."""
if endtime is not None:
timeout = self._remaining_time(endtime)
if timeout is None:
timeout_millis = _winapi.INFINITE
else:
timeout_millis = int(timeout * 1000)
if self.returncode is None:
result = _winapi.WaitForSingleObject(self._handle,
timeout_millis)
if result == _winapi.WAIT_TIMEOUT:
raise TimeoutExpired(self.args, timeout)
self.returncode = _winapi.GetExitCodeProcess(self._handle)
return self.returncode
此处返回是状态,0为成功,其他为失败
stdout 方法调用的是一个文件,因此可使用文件的形式进行处理
if c2pread != -1:
self.stdout = io.open(c2pread, 'rb', bufsize)
if universal_newlines:
self.stdout = io.TextIOWrapper(self.stdout)
2 基本代码如下
#!/usr/bin/poython3.6
#conding:utf-8
import subprocess
from subprocess import Popen,PIPE
out=Popen("echo 'hello'",shell=True,stdout=PIPE)
code=out.wait(10)
txt=out.stdout.read()
print ("code={} txt={}".format(code,txt.decode()))
结果如下
6 项目基本构建
1 创建文件并添加虚拟环境
mkdir mschedule -p
cd mschedule/
pyenv virtualenv 3.5.3 msch
pyenv local msch
2 构建模块agent,并创建执行程序executor.py
#!/usr/bin/poython3.6
#conding:utf-8
from subprocess import PIPE,Popen
class Executor:
def run(self,script,timeout):
p=Popen(script,shell=True,stdout=PIPE)
code=p.wait(timeout=timeout)
txt=p.stdout.read()
return (code,txt)
if __name__ == "__main__":
exec=Executor()
print (exec.run("echo 'hello'",3))
结果如下
7 agent 和master设计
用户和master server 通信,提交任务,此处是通过HTTP的方式提交任务
master 按照用户要求将任务分发到指定的节点上,这些节点上需要有agent用于和master通信,接受master发布的任务,并执行这些任务
设计agent,越简单越好,越简单bug越少,越稳定。
从本质上来说,master,agent设计是典型的CS编程模式
master作为CS中的server,agent作为CS中的client
8 消息设计
1 注册信息
agent启动后,需要主动连接server,并注册自己
信息包括
hostname:报告自己的主机名称,此主机名称可能会重复UUID,用于唯一标识这台主机
IP: 用于更加方便的管理主机
其它相关信息视情况而定
{
"type": "register", # 此处用于定义消息类型
"payload":{
"id" : uuid, #用于唯一标识一台主机
"hostname": "xxxx", # 对应agent名称
"IP": [], # agent IP地址,其可能包含多个IP地址,因此此处使用列表进行存储
}
}
2 心跳信息
agent定时向master发送心跳包,包含UUID这个唯一标识,附带hostname和ip地址,hostname和ip都可能变动,但agent不变,其UUID便不会发生变化,其他相关信息科一附加, 如更加flag,用于标识agent是否有正在执行的任务。
{
"type": "heartbeat", # 此处用于定义消息类型
"payload":{
"id" : uuid, #用于唯一标识一台主机
"hostname": "xxxx", # 对应agent名称
"IP": [], # agent IP地址,其可能包含多个IP地址,因此此处使用列表进行存储
}
}
3 任务消息
master分派任务给agent,发送任务描述信息到agent。
注意脚本字符串使用base64编码
{
"type" :"task",
"payload" :{
"id" :"task-uuid", # 定义任务的唯一标识
"script" : "base64code", #定义执行任务的内容
"timeout" :0, # 定义超时时长
"parallel" :1, # 定义并行执行数
"fail_rate" :0, # 定义失败率,及百分比为多少代表失败
"fail_count" :-1 # 定义失败的次数为多少次表示失败,-1表示不关心
}
}
4 任务结果消息
当agent任务执行完成后,返回给master该任务执行的状态码和输出结果。
{
"type" :"result",
"payload" :{
"id": "task-uuid", # 定义任务唯一标识
"agent_id": "agent-uuid", #定义任务执行者
"code" : 0, #定义任务执行结果返回值。0 表示成功,其他表示失败
"output" :"base64encode" # 定义任务执行结果,及输出到控制台的结果
}
}
以上的master,agent之间需要传递消息,消息采用json格式。
三 agent端代码实现
1 日志实现
具体代码如下
#!/usr/bin/poython3.6
#conding:utf-8
import logging
def getlogger(mod_name:str,filepath:str='/var/log/mschedule'):
logger=logging.getLogger(mod_name) # 获取名字
logger.setLevel(logging.INFO) # 添加日志级别
logger.propagate=False # 配置不想上传递
handler=logging.FileHandler("{}/{}.log".format(filepath,mod_name))
fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s (%(filename)s:L%(lineno)d)",
datefmt='%Y-%m-%d %H:%M:%S')
handler.setFormatter(fmt)
logger.addHandler(handler)
return logger
if __name__ == "__main__":
log = getlogger('test')
log.info('13234545654')
结果如下
2 通信模块实现(zerorpc )
1 介绍和安装
原生的socket编程过于底层,很少使用,任何一门语言都要避开直接使用socket库开发,太过底层,难写难维护。
zeroprc 是基于 ZeroMQ和MessagePack 来实现的通信工具。
官网地址
http://www.zerorpc.io
安装
pip install zerorpc
2 基本代码实现
根目录创建app.py和appserver.py
server 端配置
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
class HelloRPC(object): #定义方法
def hello(self, name):
return "Hello, %s" % name
s = zerorpc.Server(HelloRPC()) # 方法注入
s.bind("tcp://0.0.0.0:8080") # 绑定方法
s.run() # 运行方法
client端配置
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
c = zerorpc.Client()
c.connect("tcp://127.0.0.1:8080")
print (c.hello("RPC"))
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
import threading
c = zerorpc.Client()
c.connect("tcp://127.0.0.1:8080")
e=threading.Event()
while not e.wait(3):
print(c.hello('test client'))
print ('```````````````')
结果如下
3 注册消息实现
1 uuid唯一主机标识
使用uuid.uuid4().hex 获取一个uuid,一个节点起始运行的时候是没有uuid的,一旦运行会生成一个uuid,并持久化到一个文件中,下次运行先找这个文件,如果文件中有uuid,就直接读取,没有uuid就重新生成并写入到该文件中。
#!/usr/bin/poython3.6
#conding:utf-8
#!/usr/bin/poython3.6
#conding:utf-8
import uuid
print (uuid.uuid4().hex)
print (uuid.uuid4().hex)
print (uuid.uuid4().hex)
结果如下
2 hostname
windows 和Linux 获取主机名称的方式是不同的
可以在所有平台上是使用socket.gethostname()获取主机名。
#!/usr/bin/poython3.6
#conding:utf-8
import socket
print (socket.gethostname())
3 ip 列表
pip install netifaces
netifaces.interfaces() 返回接口列表
netifaces.ifaddresss(interface) 获取指定接口的IP地址,返回相关信息
ip地址判断
#!/usr/bin/poython3.6
#conding:utf-8
import ipaddress
ips=['127.0.0.1','192.168.0.1','169.254.123.1','0.0.0.0','239.168.0.255','224.0.0.1','8.8.8.8']
for ip in ips:
print (ip)
ip=ipaddress.ip_address(ip)
print ('Linklocal {}'.format(ip.is_link_local)) # 169.254地址
print ('回环 {}'.format(ip.is_loopback)) # 回环
print ('多播 {}'.format(ip.is_multicast)) # 多播
print ('公网 {}'.format(ip.is_global)) # 公网,全球范围地址
print ('私有 {}'.format(ip.is_private)) # 私有地址
print ('保留 {}'.format(ip.is_reserved)) # 保留地址
print ('版本 {}'.format(ip.version)) #ipv4地址
print ('----------------------------')
结果如下
#!/usr/bin/poython3.6
#conding:utf-8
import netifaces
print (netifaces.interfaces()) # 获取所有的网卡接口
for i in netifaces.interfaces():
print ('i....',netifaces.ifaddresses(i)) # 使用ifaddress获取端口对应的IP地址
print ()
print ('------------------------------')
print ()
print ('[2]',netifaces.ifaddresses(i)[2]) # 获取字典key为2的对应的值
结果如下
其是一个字典,key为2就是ipv4地址
每一个接口返回的ipv4地址是一个列表,也就是说可以有多个,ipv4地址描述是在addr上
#!/usr/bin/poython3.6
#conding:utf-8
import netifaces
print (netifaces.interfaces()) # 获取所有的网卡接口
for i in netifaces.interfaces():
for p in netifaces.ifaddresses(i)[2]:
if p['addr']:
print ('ip',p['addr']) # 获取ip地址
结果如下
#!/usr/bin/poython3.6
#conding:utf-8
import netifaces
import ipaddress
print (netifaces.interfaces()) # 获取所有的网卡接口
for i in netifaces.interfaces():
for p in netifaces.ifaddresses(i)[2]:
if p['addr']:
ip=ipaddress.ip_address(p['addr']) #获取ip地址
if ip.is_loopback or ip.is_multicast or ip.is_link_local or ip.is_reserved: # 判断IP地址
continue
print (ip)
结果如下
4 注册信息和相关信息处理
在agent文件包中创建msg.py文件,用于存储相关主从信息和配置信息
#!/usr/bin/poython3.6
#conding:utf-8
import socket
import uuid
import netifaces
import ipaddress
import os
class Messgae:
def __init__(self,myidpath):
if os.path.exists(myidpath): # 如果存在
with open(myidpath) as f:
self.id=f.readline().strip()
else:
self.id=uuid.uuid4().hex
with open(myidpath,'w') as f:
f.write(self.id)
def get_ipaddress(self):
address=[]
for p in netifaces.interfaces(): # 获取网口列表
n=netifaces.ifaddresses(p) # 获取字典
if n.get(2): # 查看是否存在ipv4地址
for ip in n[2]: # 此处获取对应列表的值
if ip['addr']: # 查看ip地址是否存在
ip=ipaddress.ip_address(ip['addr'])
if ip.is_reserved or ip.is_multicast or ip.is_link_local or ip.is_loopback:
continue
address.append(str(ip))
return address
def hearbeat(self):
return {
"type" :"hearbeat",
"payload" :{
"ip" : self.get_ipaddress(),
"hostname" : socket.gethostname(),
"id" : self.id
}
}
def reg(self):
return {
"type" :"register",
"payload" :{
"ip" : self.get_ipaddress(),
"hostname" : socket.gethostname(),
"id" : self.id
}
}
if __name__ == "__main__":
msg=Messgae('/var/log/mschedule/uuid')
print (msg.reg())
测试结果如下
5 处理链接相关配置
agent中创建config模块用于添加相关链接服务端IP地址
agent中创建cm 模块用于处理链接相关配置
config.py 配置如下
#!/usr/bin/poython3.6
#conding:utf-8
CONN_URL="tcp://127.0.0.1:9000"
cm.py 模块配置如下
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc #添加模块
import threading # 用于处理中断相关
from .msg import Messgae # 获取消息
from .config import CONN_URL
from utils import getlogger
class Conn_Manager:
def __init__(self,timeout=3):
self.timeout=timeout
self.client=zerorpc.Client()
self.event=threading.Event()
self.message=Messgae('/var/log/mschedule/uuid') # 此处用于初始化消息
self.log=getlogger('agent') # 此处填写相关的log日志名称
def start(self):
self.client.connect(CONN_URL) # 链接处理
self.log.info('注册消息发送 {}'.format(self.client.send(self.message.reg()))) # 发送心跳信息
self.client.send(self.message.reg()) #处理注册消息
while not self.event.wait(self.timeout): # 等待的时间
self.log.info('心跳消息发送 {}'.format(self.client.send(self.message.hearbeat()))) # 发送心跳信息
def shutdown(self):
self.log.info("关闭操作")
self.client.close()
self.event.set()
agent 中 _init_.py 端配置
#!/usr/bin/poython3.6
#conding:utf-8
from .cm import Conn_Manager
class app:
def __init__(self,timeout):
self.conn=Conn_Manager(timeout)
def start(self):
self.conn.start()
def shutdown(self):
self.conn.shutdown()
全局根目录下 app.py 端配置如下
#!/usr/bin/poython3.6
#conding:utf-8
from agent import app
if __name__ == "__main__":
agent=app(3)
try:
agent.start()
except KeyboardInterrupt:
agent.shutdown()
服务端测试文件appserver 配置如下
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
class HelloRPC(object): #定义方法
def send(self, name):
return "Hello, %s" % name
s = zerorpc.Server(HelloRPC()) # 方法注入
s.bind("tcp://0.0.0.0:9000") # 绑定方法
s.run() # 运行方法
启动结果如下
日志结果如下
处理客户端重连机制
默认的,服务端关闭后,客户端结果如下
处理结果如下
cm.py如下
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc #添加模块
import threading # 用于处理中断相关
from .msg import Messgae # 获取消息
from .config import CONN_URL
from utils import getlogger
class Conn_Manager:
def __init__(self,timeout=3):
self.timeout=timeout
self.client=zerorpc.Client()
self.event=threading.Event()
self.message=Messgae('/var/log/mschedule/uuid') # 此处用于初始化消息
self.log=getlogger('agent') # 此处填写相关的log日志名称
def start(self):
try:
self.client.connect(CONN_URL) # 链接处理
self.log.info('注册消息发送 {}'.format(self.client.send(self.message.reg()))) # 发送心跳信息
self.client.send(self.message.reg()) #处理注册消息
while not self.event.wait(self.timeout): # 等待的时间
self.log.info('心跳消息发送 {}'.format(self.client.send(self.message.hreadbeat()))) # 发送心跳信息
except Exception as e:
print ('--------------------')
self.event.set()
raise e # 此处是抛出异常到上一级
def shutdown(self):
self.log.info("关闭操作")
self.client.close()
self.event.set()
agent._init_.py 结果如下
#!/usr/bin/poython3.6
#conding:utf-8
from .cm import Conn_Manager
import threading
class app:
def __init__(self,timeout):
self.conn=Conn_Manager(timeout)
self.event=threading.Event()
def start(self):
while not self.event.is_set():
try:
self.conn.start()
except Exception as e:
print('重连')
self.conn.shutdown()
self.event.wait(3)
def shutdown(self):
self.event.set()
self.conn.shutdown()
app.py 如下
#!/usr/bin/poython3.6
#conding:utf-8
from agent import app
if __name__ == "__main__":
agent=app(3)
try:
agent.start()
except KeyboardInterrupt:
agent.shutdown()
结果如下
四 master端实现
1 基本功能
1 TCP Server
绑定端口,启动监听,等待agent链接。
2 信息存储
存储agent列表
存储用户提交的Task列表,用户通过WEB提交的任务信息存储下来。
3 接受注册
将注册信息写入agent列表
接受心跳信息
接受agent端发送的心跳信息
4 派发任务
将用户提交的任务分配到agent端
2 基本代码实现
1 master.config 模块
用于指定服务端绑定IP地址和端口号
#!/usr/bin/poython3.6
#conding:utf-8
MASTER_URL="tcp://0.0.0.0:9000"
if __name__ == "__main__":
pass
2 master.handler 模块
主要负责客户端数据的调度
#!/usr/bin/poython3.6
#conding:utf-8
from utils import getlogger
log=getlogger('handler')
class Handler(object):
def send(self,msg): # 定义一个可调用的基础函数
log.info(" ack ok {}".format(msg))
return " ack ok {}".format(msg)
3 cm.py 模块
用于tcp 链接建立和关闭
#!/usr/bin/poython3.6
#conding:utf-8
from utils import getlogger
from .config import MASTER_URL
import zerorpc
from .handler import Handler
log=getlogger('server')
class Master_Listen:
def __init__(self):
self.server=zerorpc.Server(Handler())
def start(self):
self.server.bind(MASTER_URL)
log.info('Master 启动配置')
self.server.run()
def shutdown(self):
self.server.close()
4 master._init_.py 模块
#!/usr/bin/poython3.6
#conding:utf-8
from .cm import Master_Listen
class appserver:
def __init__(self):
self.appserver=Master_Listen()
def start(self):
self.appserver.start()
def shutdown(self):
self.appserver.shutdown()
5 appserver.py模块
#!/usr/bin/poython3.6
#conding:utf-8
from master import appserver
if __name__ == "__main__":
appserver=appserver()
try:
appserver.start()
except KeyboardInterrupt:
appserver.shutdown()
启动服务测试如下
结果如下
上述代码实现了基本的注册,心跳部分的功能
经观察可知,目前注册和心跳除了类型不同外,其可以认为第一次心跳成功就是注册。
3 master的数据设计
master端核心需要存储2中数据:agent端数据,用户客户端浏览器提交的任务Task,构造出一个数据结构,存储相关信息.具体数据结构如下
1 agent客户端数据存储结构
{
"agents" :{
"agent_id" :{
"heartbeat" :"timestamp",
"busy" :False,
"info" :{
"hostname" :"",
"ip" :[]
}
}
}
}
数据结构解释如下
1 agents里面记录了所有注册的agent
agent_id,字典的key,每一个agent 都有一个不同uuid,所以这个字典的键就是uuid,
heartbeat 由于设计中并没有让agent端发送心跳时间,所以就在master端记录了收到的时间
busy 如果agent 上有任务在执行。则此值表现为True
info 记录agent上发过来的hostname和ip列表
2 task数据存储结构
{
"tasks" :{
"task_id" :{
"script" :"base64encode",
"targets" :{
"agent_id" :{
"state":"WAITING",
"output" :""
}
},
"state" :"WAITING"
}
}
}
task 记录所有任务及target(agent)的状态
task_id ,字典的key对应一个一个task,item 也是taskid:{} 结构
task 任务,task.json 的payload信息
targets目标,用于指定agent的节点,记录agent上的state和输出output
state状态,单个agent上的执行状态state 这是一个task的状态,整个任务的状态,比如统计达到了agent失败上限了,这个task的state 就置为失败
状态常量
"WAITING" "RUNNING" "SUCCEED" "FAILED"
4 agent 端信息存储
创建 storage.py 模块
构建Storage 类,用于存储用户信息
#!/usr/bin/poython3.6
#conding:utf-8
import datetime
class Storage:
def __init__(self):
self.agents={} # 此处用于存储用户信息
self.tasks={} # 此处用于存储作业信息
def reg_hb(self,agent_id,info): # id 及就是客户端的id ,info 及就是host和ip地址
self.agents[agent_id] = {
'heaerbeat' : datetime.datetime.now(),
'info' :info,
'busy':self.agents.get(agent_id,{}).get('busy',False)
}
# busy 读不到置False,读到了不变
handler.py端配置如下
#!/usr/bin/poython3.6
#conding:utf-8
from utils import getlogger
from .storage import Storage
log=getlogger('handler')
class Handler(object):
def __init__(self):
self.store=Storage()
def send(self,msg): # 定义一个可调用的基础函数,此处的msg及就是对应的函数
log.info('客户端agent发送消息为:{}'.format(msg))
try:
if msg['type'] in {'hearbeat','register'}:
payload=msg['payload']
info={'hostname' :payload['hostname'],'ip' :payload['ip']}
self.store.reg_hb(payload['id'],info)
log.info("客户端数据列表为:{}".format(self.store.agents)) # 客户端的列表
return "agent信息为: {}".format(msg)
except Exception as e:
log.error("注册客户端信息错误为:{}".format(e))
return "Bad Request...."
运行结果如下
5 task 任务基本注册和创建
1 概述
用户通过WEB(HTTP)提交新的任务,任务json信息有:
1 任务脚本script,base64编码
2 超时时间timeout
3 并行度 parallel
4 失败率 fail_rate
5 失败次数fail_count
6 targets 是跑任务的Agent的agent_id列表,这个目前也是在用户端选好的,如yoghurt需要在主机名为webserver-xxxx的几台设备上运行脚本,为了用户方便,可以使用类似ansible的分组。在Master端受到信息后,需要添加2个信息
task_id 是Mater 端新建任务时生成的uuid
state 默认状态是WAITING
在WEB server 中最后将用户端发送来的数据组成下面的字典
task={
"task_id" :t.id,
"script" :t.script,
"timeout":t.timeout,
"parallel" :t.parallelm,
"fail_rate":t.fail_rate,
"fail_count":t.fail_count,
"state":t.state,
"targets":t.targets
}
2 构建state类
用于处理相关消息的类型
#!/usr/bin/poython3.6
#conding:utf-8
WAITING='WAITING'
RUNNING='RUNNING'
SUCCEED='SUCCEED'
FAILED='FAILED'
3 构建task类
创建master/task.py 类处理webserver端数据
\
#!/usr/bin/poython3.6
#conding:utf-8
import uuid # 获取唯一的task_id
from .state import *
class Task:
def __init__(self,task_id,script,targets,timeout=0,parallel=1,fail_rate=0,fail_count=-1):
self.id=task_id # task唯一标识,用于确定任务
self.script=script # 对应的脚本内容,客户端输入的脚本
self.timeout=timeout # 超时时间
self.parallel=parallel # 并行执行数量
self.fail_rate=fail_rate #失败率
self.fail_count=fail_count #失败数
self.state=WAITING # 对应的消息的状态
self.targets={agent_id:{'state' : WAITING,'output':''} for agent_id in targets} # 此处对应客户端列表
self.target_count=len(self.targets) # 此处对应客户端的数量
在master.storage.py模块中进行相关方法调用,并将其存储进入task中
#!/usr/bin/poython3.6
#conding:utf-8
import datetime
from .task import Task
class Storage:
def __init__(self):
self.agents={} # 此处用于存储用户信息
self.tasks={} # 此处用于存储作业信息
def reg_hb(self,agent_id,info): # id 及就是客户端的id ,info 及就是host和ip地址
self.agents[agent_id] = {
'heaerbeat' : datetime.datetime.now(),
'info' :info,
'busy':self.agents.get(agent_id,{}).get('busy',False)
}
# busy 读不到置False,读到了不变
def add_task(self,task:dict): # 此处用于从客户端获取相关的数据
t=Task(**task) # 此处进行参数解构
self.tasks[t.id]=t
return t.id # 此处用于获取处理id
在master/handler.py 中处理用于webservr调用相关配置
#!/usr/bin/poython3.6
#conding:utf-8
from utils import getlogger
from .storage import Storage
import uuid
log=getlogger('handler')
class Handler(object):
def __init__(self):
self.store=Storage()
def send(self,msg): # 定义一个可调用的基础函数,此处的msg及就是对应的函数
log.info('客户端agent发送消息为:{}'.format(msg))
try:
if msg['type'] in {'hearbeat','register'}:
payload=msg['payload']
info={'hostname' :payload['hostname'],'ip' :payload['ip']}
self.store.reg_hb(payload['id'],info)
log.info("客户端数据列表为:{}".format(self.store.agents)) # 客户端的列表
return "agent信息为: {}".format(msg)
except Exception as e:
log.error("注册客户端信息错误为:{}".format(e))
return "Bad Request...."
def add_task(self,task): # 此处用于在webserver 端创建的agent调用方法返回结果
task['task_id']=uuid.uuid4().hex # 用于生成相关的任务id
return self.store.add_task(task) # 此处用于调用相关配置
def get_agents(self):
return self.store.get_agents()
6 task 任务分派
1 任务分派方式
任务在Storage中存储,一旦有了任务,需要将任务分派到指定节点执行,交给这些节点上的agent
不过,目前使用zerorpc,master是被动的接受agent端的数据并进行相关的响应操作,所以可以考虑使用一种agent端主动拉取数据的机制,提供一个接口,让agent访问,如果agent处于空闲状态,则就主动拉取任务,有任务就领走。
当agent少的时候,master推送任务到agent端,或者agent端主动拉取任务都是可以的,但是如果考虑到agent多的时候,或许使用agent拉模式是一个更好的选择。本次采用agent拉取模式实现,所以master就不需要设计调度器了
2 客户端配置状态参数
agent/state.py
#!/usr/bin/poython3.6
#conding:utf-8
WAITING='WAITING'
RUNNING='RUNNING'
SUCCEED='SUCCEED'
FAILED='FAILED'
3 客户端添加消息类型result
用于返回至server端,用于最后返回至web浏览器端
#!/usr/bin/poython3.6
#conding:utf-8
import socket
import uuid
import netifaces
import ipaddress
import os
class Messgae:
def __init__(self,myidpath):
if os.path.exists(myidpath): # 如果存在
with open(myidpath) as f:
self.id=f.readline().strip()
else:
self.id=uuid.uuid4().hex
with open(myidpath,'w') as f:
f.write(self.id)
def get_ipaddress(self):
address=[]
for p in netifaces.interfaces(): # 获取网口列表
n=netifaces.ifaddresses(p) # 获取字典
if n.get(2): # 查看是否存在ipv4地址
for ip in n[2]: # 此处获取对应列表的值
if ip['addr']: # 查看ip地址是否存在
ip=ipaddress.ip_address(ip['addr'])
if ip.is_reserved or ip.is_multicast or ip.is_link_local or ip.is_loopback:
continue
address.append(str(ip))
return address
def hearbeat(self):
return {
"type" :"hearbeat",
"payload" :{
"ip" : self.get_ipaddress(),
"hostname" : socket.gethostname(),
"id" : self.id
}
}
def reg(self):
return {
"type" :"register",
"payload" :{
"ip" : self.get_ipaddress(),
"hostname" : socket.gethostname(),
"id" : self.id
}
}
def result(self,task_id,code,output): # 返回数据至web端,处理相关数据执行结果的返回
return {
"type" :"result",
"payload" :{
"id" : task_id, # 此处用于定义task_id 及任务id
"agent_id" :self.id, # 此处用于获取客户端id
"code" : code, # 此处用于对执行结果状态进行保存
"output" : output #此处用于对执行结果的输出信息进行保存,并进行相关配置
}
}
4 agent/cm.py模块
用于处理配置拉取相关事宜
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc #添加模块
import threading # 用于处理中断相关
from .msg import Messgae # 获取消息
from .state import *
from .config import CONN_URL
from .executor import Executor
from utils import getlogger
class Conn_Manager:
def __init__(self,timeout=3):
self.timeout=timeout
self.client=zerorpc.Client()
self.event=threading.Event()
self.message=Messgae('/var/log/mschedule/uuid') # 此处用于初始化消息
self.log=getlogger('agent') # 此处填写相关的log日志名称
self.state=WAITING
self.exec=Executor()
def start(self):
try:
self.event.clear()
self.client.connect(CONN_URL) # 链接处理
self.log.info('注册消息发送 {}'.format(self.client.send(self.message.reg()))) # 发送心跳信息
self.client.send(self.message.reg()) #处理注册消息
while not self.event.wait(self.timeout): # 等待的时间
self.log.info('心跳消息发送 {}'.format(self.client.send(self.message.hearbeat()))) # 发送心跳信息
task=self.client.get_task(self.message.id) # 此处返回三个参数,1 为taskid,二是script ,三是timeout
if task:
code,output=self.exec.run(task[1],task[2])
self.client.send(self.message.result(task[0],code,output))
else:
return "目前无消息"
except Exception as e:
self.event.set()
raise e # 此处是抛出异常到上一级
def shutdown(self):
self.log.info("关闭操作")
self.client.close()
self.event.set()
4 服务端相关task获取配置
master/storage.py 用于配置获取agent_id和task相关信息
#!/usr/bin/poython3.6
#conding:utf-8
import datetime
from .task import Task
from .state import *
class Storage:
def __init__(self):
self.agents={} # 此处用于存储用户信息
self.tasks={} # 此处用于存储作业信息
def reg_hb(self,agent_id,info): # id 及就是客户端的id ,info 及就是host和ip地址
self.agents[agent_id] = {
'heaerbeat' : datetime.datetime.now(),
'info' :info,
'busy':self.agents.get(agent_id,{}).get('busy',False)
}
# busy 读不到置False,读到了不变
def get_agents(self):
return self.agents
def add_task(self,task:dict): # 此处用于从客户端获取相关的数据
t=Task(**task) # 此处进行参数解构
self.tasks[t.id]=t
return t.id # 此处用于获取处理id
@property
def itme_task(self):
yield from (task for task in self.tasks.values()) # 此处返回task
def get_task(self,agent_id):
return [task.id,task.script,task.timeout]
master/handler.py 配置如下
#!/usr/bin/poython3.6
#conding:utf-8
from utils import getlogger
from .storage import Storage
import uuid
log=getlogger('handler')
class Handler(object):
def __init__(self):
self.store=Storage()
def send(self,msg): # 定义一个可调用的基础函数,此处的msg及就是对应的函数
log.info('客户端agent发送消息为:{}'.format(msg))
try:
if msg['type'] in {'hearbeat','register'}:
payload=msg['payload']
info={'hostname' :payload['hostname'],'ip' :payload['ip']}
self.store.reg_hb(payload['id'],info)
log.info("客户端数据列表为:{}".format(self.store.agents)) # 客户端的列表
return "agent信息为: {}".format(msg)
except Exception as e:
log.error("注册客户端信息错误为:{}".format(e))
return "Bad Request...."
def add_task(self,task): # 此处用于在webserver 端创建的agent调用方法返回结果
task['task_id']=uuid.uuid4().hex # 用于生成相关的任务id
return self.store.add_task(task) # 此处用于调用相关配置
def get_agents(self):
return self.store.get_agents()
def get_task(self,agent_id):
return self.store.get_task(agent_id)
5 处理服务端接受result 消息处理机制
master/handler.py中配置
#!/usr/bin/poython3.6
#conding:utf-8
from utils import getlogger
from .storage import Storage
import uuid
log=getlogger('handler')
class Handler(object):
def __init__(self):
self.store=Storage()
def send(self,msg): # 定义一个可调用的基础函数,此处的msg及就是对应的函数
log.info('客户端agent发送消息为:{}'.format(msg))
try:
if msg['type'] in {'hearbeat','register'}:
payload=msg['payload']
info={'hostname' :payload['hostname'],'ip' :payload['ip']}
self.store.reg_hb(payload['id'],info)
log.info("客户端数据列表为:{}".format(self.store.agents)) # 客户端的列表
return "agent信息为: {}".format(msg)
elif msg['type']=="result": # 此处用于处理相关返回信息
self.store.result(msg['payload']) # 调用对应方法
except Exception as e:
log.error("注册客户端信息错误为:{}".format(e))
return "Bad Request...."
def add_task(self,task): # 此处用于在webserver 端创建的agent调用方法返回结果
task['task_id']=uuid.uuid4().hex # 用于生成相关的任务id
return self.store.add_task(task) # 此处用于调用相关配置
def get_agents(self):
return self.store.get_agents()
def get_task(self,agent_id):
return self.store.get_task(agent_id)
def get_result(self,task_id): # 此处返回对应的值
return self.store.get_result(task_id)
master/stroage.py端配置
#!/usr/bin/poython3.6
#conding:utf-8
import datetime
from .task import Task
from .state import *
class Storage:
def __init__(self):
self.agents={} # 此处用于存储用户信息
self.tasks={} # 此处用于存储作业信息
self.result={} # 用于存储agent端返回的结果
def reg_hb(self,agent_id,info): # id 及就是客户端的id ,info 及就是host和ip地址
self.agents[agent_id] = {
'heaerbeat' : datetime.datetime.now().timestamp(),
'info' :info,
'busy':self.agents.get(agent_id,{}).get('busy',False)
}
# busy 读不到置False,读到了不变
def get_agents(self):
return self.agents
def add_task(self,task:dict): # 此处用于从客户端获取相关的数据
t=Task(**task) # 此处进行参数解构
self.tasks[t.id]=t
return t.id # 此处用于获取处理id
@property
def itme_task(self):
yield from (task for task in self.tasks.values()) # 此处返回task
def get_task(self,agent_id):
for task in self.itme_task:
if agent_id in task.targets: # 此处用于判断当前节点接入任务情况
return [task.id,task.script,task.timeout]
def add_result(self,payload:dict):
self.result[payload['id']]=payload # 此处以task_id 为键,以payload为值进行处理
def get_result(self,task_id:dict):
return self.result.get(task_id['task_id']) # task_id,获取对应的payload值
五 web端配置和处理
1 概述
用户通过WEB(HTTP)提交新的任务,任务json信息有:
1 任务脚本script,base64编码
2 超时时间timeout
3 并行度 parallel
4 失败率 fail_rate
5 失败次数 fail_count
6 targets 是跑在agent上的agent_id 列表,可以让用户看到一个列表,通过列表的勾选来完成相关的操作
2 代码实现
根目录创建appwebserver.py配置
1 获取agent相关列表
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
from aiohttp import request,web_response,web,log
CONN_URL="tcp://127.0.0.1:9000"
client=zerorpc.Client()
client.connect(CONN_URL)
async def targetshandler(request:web.Request):
txt=client.get_agents() #通过zerorpc调用master端接口
return web.json_response(txt) # 返回json端数据
app=web.Application()
app.router.add_get('/task/targets',targetshandler) # 使用get方法进行处理
2 提交任务端配置
1 客户端数据如下
{
"script" : "echo hello",
"timeout" :20,
"targets" :[]
}
2 添加提交数据接口
async def taskhandler(request:web.Request):
j = await request.json() # 获取post 提交的数据,用于task任务数据生成
txt=client.add_task(j)
return web.Response(text=txt,status=201)
app.router.add_post('/task',taskhandler)
3 添加获取执行结果配置
async def taskresult(request:web.Request):
j = await request.json()
txt =client.get_result(j)
return web.json_response(txt)
app.router.add_post('/result',taskresult)
4 整体代码如下
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
from aiohttp import request,web_response,web,log
CONN_URL="tcp://127.0.0.1:9000"
client=zerorpc.Client()
client.connect(CONN_URL)
async def targetshandler(request:web.Request):
txt=client.get_agents() #通过zerorpc调用master端接口
return web.json_response(txt) # 返回json端数据
app=web.Application()
app.router.add_get('/task/targets',targetshandler) # 使用get方法进行处理
async def taskhandler(request:web.Request):
j = await request.json()
txt=client.add_task(j)
return web.Response(text=txt,status=201)
app.router.add_post('/task',taskhandler)
async def taskresult(request:web.Request):
j = await request.json()
txt =client.get_result(j)
return web.json_response(txt)
app.router.add_post('/result',taskresult)
if __name__ == "__main__":
web.run_app(app,host='0.0.0.0',port=80)
3 测试结果如下
六 处理数据和节点状态
1 状态管理类型
1 节点状态
当节点在进行相关事件调度处理时,其状态应该是RUNNING状态,当处理完成后,其状态应该恢复称为WAITING状态。
2 task 任务状态
当当前agent下的所有该任务都执行完成时的状态,此处设计较为简单,只是全部执行就将其状态置位成功,否则为RUNNING状态或者WAITING,当有一个agent领取任务时,其状态将被置为RUNNING。
3 task中对应的agent的状态
及就是当前节点执行当前任务的状态,此状态保存在task中的targets字典中,用于对其客户端执行结果进行判断而获取其对应状态。
2 客户端调整代码
主要是cm.py调整如下
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc #添加模块
import threading # 用于处理中断相关
from .msg import Messgae # 获取消息
from .state import *
from .config import CONN_URL
from .executor import Executor
from utils import getlogger
class Conn_Manager:
def __init__(self,timeout=3):
self.timeout=timeout
self.client=zerorpc.Client()
self.event=threading.Event()
self.message=Messgae('/var/log/mschedule/uuid') # 此处用于初始化消息
self.log=getlogger('agent') # 此处填写相关的log日志名称
self.state=WAITING
self.exec=Executor()
def start(self):
try:
self.event.clear()
self.client.connect(CONN_URL) # 链接处理
self.log.info('注册消息发送 {}'.format(self.client.send(self.message.reg()))) # 发送心跳信息
self.client.send(self.message.reg()) #处理注册消息
while not self.event.wait(self.timeout): # 等待的时间
self.log.info('心跳消息发送 {}'.format(self.client.send(self.message.hearbeat()))) # 发送心跳信息
if self.state == WAITING: # 如果此处是空闲状态,则进行领任务处理
print('获取任务task')
task = self.client.get_task(self.message.id) # 此处返回三个参数,1 为taskid,二是script ,三是timeout
if task: # 领取成功,则进行执行相关任务.并上传至服务器端其状态
self.state = RUNNING # 此处任务成功的情况
code,output=self.exec.run(task[1],task[2])
self.client.send(self.message.result(task[0], code, output))
self.state=WAITING #状态更新为当前正常状态
else:
return "目前无消息"
except Exception as e:
self.event.set()
raise e # 此处是抛出异常到上一级
def shutdown(self):
self.log.info("关闭操作")
self.client.close()
self.event.set()
3 master端代码调整
master/storage.py
#!/usr/bin/poython3.6
#conding:utf-8
import datetime
from .task import Task
from .state import *
from utils import getlogger
log=getlogger('storage')
class Storage:
def __init__(self):
self.agents={} # 此处用于存储用户信息
self.tasks={} # 此处用于存储作业信息
self.result={} # 用于存储agent端返回的结果
self.task_state=0 # 用于处理当所有agent状态都修改为成功或失败时将task的状态也进行相关的修改
def reg_hb(self,agent_id,info): # id 及就是客户端的id ,info 及就是host和ip地址
self.agents[agent_id] = {
'heaerbeat' : datetime.datetime.now().timestamp(),
'info' :info,
'busy':self.agents.get(agent_id,{}).get('busy',False)
}
# busy 读不到置False,读到了不变
def get_agents(self):
return self.agents
def add_task(self,task:dict): # 此处用于从客户端获取相关的数据
t=Task(**task) # 此处进行参数解构
self.tasks[t.id]=t
return t.id # 此处用于获取处理id
@property
def itme_task(self):
yield from (task for task in self.tasks.values() if task.state in {WAITING,RUNNING}) # 此处返回task,当其中有成功或者失败时,则不用进行相关的操作处理
#当为WAITING或者RUNNING 时,则进行相关的操作,其他情况则不进行相关操作
def get_task(self,agent_id):
for task in self.itme_task:
if agent_id in task.targets: # 此处用于判断当前节点接入任务情况
if task.state==WAITING:
task.state=RUNNING #当前消息的状态
task.targets[agent_id]['state']=RUNNING # 此处是指此消息中的agent是否执行的状态的处理,若获取了,则此处的状态为RUNNING
return [task.id,task.script,task.timeout]
def add_result(self,payload:dict):
for task in self.itme_task:
if payload['code']==0:
task.targets[payload['agent_id']]['state']=SUCCEED # 此处是指对此消息进行处理,若code=0,则表示客户端执行成功,若为1,则表示失败
self.task_state+=1
else:
task.targets[payload['agent_id']]['state']= FAILED#
self.task_state+=1
if self.task_state==task.target_count:
task.state=SUCCEED
self.task_state=0
payload['agent_state']=task.targets[payload['agent_id']]['state']
log.info("当前消息内容为:{}".format(self.result))
self.result[payload['id']]=payload # 此处以task_id 为键,以payload为值进行处理
def get_result(self,task_id:dict):
task_id=task_id['task_id']
return self.result.get(task_id) # task_id,获取对应的payload值
4 webserver端代码调整如下
webappserver.py
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
from aiohttp import request,web_response,web,log
CONN_URL="tcp://127.0.0.1:9000"
client=zerorpc.Client()
client.connect(CONN_URL)
async def targetshandler(request:web.Request):
txt=client.get_agents() #通过zerorpc调用master端接口
return web.json_response(txt) # 返回json端数据
app=web.Application()
app.router.add_get('/task/targets',targetshandler) # 使用get方法进行处理
async def taskhandler(request:web.Request):
j = await request.json()
txt=client.add_task(j)
return web.Response(text=txt,status=201)
app.router.add_post('/task',taskhandler)
async def taskresult(request:web.Request):
j = await request.json()
txt =client.get_result(j)
if txt['code'] !=0:
txt['output']='参数不正确,请重新输入'
return web.json_response(txt)
app.router.add_post('/result',taskresult)
if __name__ == "__main__":
web.run_app(app,host='0.0.0.0',port=80)
5 结果如下
网站名称:简单任务调度系统
网站网址:http://scyanting.com/article/pssgdp.html