5、Python3 任务调度系统
这个任务调度系统是从运维自动化的角度来设计,提高运维效率。运维管理大致分以下几个阶段:
-
人工阶段
人工盯着服务器,出了问题,跑到机器前,翻日志,查状态,手动操作。
-
脚本阶段
开始写一些自动化脚本,启动计划任务,自动启动服务,监控服务等。
-
工具阶段
脚本功能太弱,开发了大量工具,某种工具解决某个特定领域的问题,常用的有 ansible、puppet 等。
-
平台阶段
将工具整合,自主开发,实现标准化,实现自动化流程控制。
-
AI 智能化
结合大数据和人工智能平台自动计算、分析数据,智能告警,智能解决复杂问题。
5.1、调度设计
类似于 ansible 功能,更加简单,满足企业需求。
-
分发任务
分发脚本到目标节点上去执行。
-
控制
控制并发,控制多少个节点同时执行。
对错误做出响应。由用户设定,最多允许失败的比例或者数量,当超过范围时,需要终止任务执行。
可以终止正在执行的任务。
-
能跨机房部署
-
能对作业做版本控制,这个是辅助的需求,可以以后实现。
本项目的出发点,要求会使用 shel 脚本就可以了,而 ansible、salt 等需要学习特定的内部语言。
如果觉得 ansible 这样的工具不能满足需求,二次开发难度较高,其代码量不小。本身它们开发接口不完善,而且熟悉它的架构也比较难,就算开发出来维护也难。
从这些项目上二次开发,等于拉一个分支,如果主分支有了新的特性,想合并也很困难。
自己开发,量身定做,满足自己需求。代码规模可控,便于他人接手维护。
自己开发就是造轮子,造轮子不是不好,不要一上来就设计打造一个超级牛的轮子,结果能力不足,项目失败。
先构建一个适合的、能力所及、满足需求的轮子,后面再逐步完善。一般来说,越是自动化运维程度越高的公司,自己写的系统越多,因为满足他们需要的工具少。
5.2、分发任务设计
分为有 Agent、无 Agent。
-
有 Agent,被控节点需要安装或运行特殊的软件,和服务器端通信,服务器端把脚本、命令传给 Agent 端,由 Agent 来控制执行。
-
无 Agent,被控节点不需要安装或运行特殊的软件,例如通过 ssh。这其实也是有 Agent,不过不是自己写的程序。
-
通用、简单、易实现。但管理不善,容易出安全问题。
-
并行效率不高。有 Agent 并行执行任务,可以不和管理服务器通信,所以并发很高。ssh 执行要和 master 之间通信,耗费更多时间。
-
ssh 连接是有状态的。任务执行的时候,master 不能挂了,否则任务执行失败。
-
5.3、执行脚本
Python3 建议使用标准库 subprocess 模块,启动一个子进程。
from subprocess import Popen
p = Popen('echo hello', shell=True)
print(p)
class Popen:
def __init__(self, args, bufsize=-1, executable=None,
stdin=None, stdout=None, stderr=None,
preexec_fn=None, close_fds=True,
shell=False, cwd=None, env=None, universal_newlines=None,
startupinfo=None, creationflags=0,
restore_signals=True, start_new_session=False,
pass_fds=(), *, user=None, group=None, extra_groups=None,
encoding=None, errors=None, text=None, umask=-1, pipesize=-1):
- shell 为 True,则使用 shell 来执行 args,建议 args 是一个字符串。
- args 可以是执行脚本的字符串,功能很强大。
# 注意scripts目录在项目的根目录下 p = Popen('python scripts/handler.py', shell=True)
5.4、构建项目
项目名称 mschedule。
构建一个模块 agent,模块下建一个 executor.py。
import subprocess
class Executor:
def __init__(self, script):
self.script = script
def run(self):
p = subprocess.Popen(self.script, shell=True)
p.wait()
项目根目录下,建立一个 app.py。
from agent.executor import Executor
if __name__ == '__main__':
executor = Executor('echo "hello brinnatt"')
executor.run()
运行成功,看来可以成功运行脚本。
5.4.1、agent 设计
用户和 Master Server 通信,提交任务。
Master 按照用户要求将任务分发到指定的节点上,这些节点上需要有一个 Agent 和 Master 通信,接收 Master 发布的任务,并执行这些任务。
设计 Agent 时,应当注意,越简单越好,越简单 Bug 就越少,越稳定。
从本质上来说,Master、Agent 设计是典型的 CS 编程模式。
Master 作为 CS 中的 Server,Agent 作为 CS 中的 client。
-
注册信息
Agent 启动后,需要主动联系 Server,注册自己的信息。信息包括:
-
我是谁,hostname、UUID。UUID 保证唯一,因为主机名有可能重复。
-
来自哪里,IP 地址是多少。需要 Agent 主动在信息中告诉 Master。其他信息,这个根据情况而定。
-
-
心跳信息
Agent 定时向 Master 发送心跳包,包含 UUID 这个唯一标识,附带 hostname 和 ip。
hostname 和 ip 都可能变动,但是这个 Agent 不变,UUID 也就不变。
其他信息也可以附加,例如增加一个 flag,表示 Agent 上是否有任务在跑。
-
任务消息
Master 分派任务给 Agent,发送任务描述信息到 Agent。
注意脚本字符串使用 Base64 编码。
-
任务结果消息
当 Agent 执行完任务,返回给 Master 该任务的状态码和输出结果。
以上 Master、Agent 之间需要传送消息,消息采用 json 格式。
5.4.2、消息设计
注册消息:
{
"type": "register",
"payload": {
"id": "uuid",
"hostname": "xxx",
"ip": []
}
}
心跳消息:
{
"type": "heartbeat",
"payload": {
"id": "uuid",
"hostname": "xxx",
"ip": []
}
}
任务消息:
{
"type": "task",
"payload": {
"id": "task-uuid",
"script": "base64encode",
"timeout": 0,
"parallel": 1,
"fail_rate": 0,
"fail_count": -1
}
}
parallel:并行,表示同时执行的任务。
fail_rate:失败率,0 表示不允许失败。
fail_count:失败数, -1 不关心失败的数量和失败率。
执行结果消息:
{
"type": "result",
"payload": {
"id": "task-uuid",
"agent_id": "agent-uuid",
"code":0,
"output": "base64encode"
}
}
id:任务 id。
agent_id:Agent 是谁。
code:返回的状态码,0 正常,非零错误,和 linux 的命令返回值一样。
output:输出的结果。字符串,Base64 编码后返回。
5.5、代码实现
5.5.1、配置
agent.config 模块,配置信息放这里。
5.5.2、日志
项目根目录下 utils.py:
import logging
def getLogger(mod_name: str, filepath: str):
logger = logging.getLogger(mod_name)
logger.setLevel(logging.INFO) # 单独设置
logger.propagate = False
handler = logging.FileHandler(filepath)
handler.setLevel(logging.INFO)
formatter = logging.Formatter(fmt="%(asctime)s [%(name)s %(funcName)s] %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
5.5.3、通信模块
原生的 Socket 编程太过底层,少使用。任何一门语言都要避免直接使用 socket 库开发,太过底层,难写难维护。
这次使用一个非常轻巧的、跨语言的 RPC 通信模块 zerorpc。它基于 ZeroMQ 和 MessagePack。
$ pip install zerorpc
测试一下,Server 代码如下:
import zerorpc
class MyRPC:
def igetyou(self, text): # RPC对外的接口,其中处理客户端传来的数据
return f"send back {text}"
s = zerorpc.Server(MyRPC())
s.bind("tcp://0.0.0.0:9000") # 绑定
s.run() # 持久运行监听
测试一下,Client 代码如下:
import zerorpc
import threading
c = zerorpc.Client()
c.connect('tcp://127.0.0.1:9000')
i = 1
while not threading.Event().wait(3):
print(c.igetyou('hello server'), i)
i += 1
在 agent 模块下,创建 cm 模块,准备使用 zerorpc 实现网络连接。
注意:不要把 zerorpc 的方法随便的放到线程中,会抛异常。
5.5.4、注册消息实现
uuid
使用 uuid.uuid4().hex
获取一个 uuid。一个节点起始运行的时候是没有 uuid 的,一旦运行会生成一个 uuid,并持久化到一个文件中,下次运行先找这个文件,如果文件中有 uuid,就直接读取,没有 uuid,就重新生成并写入到该文件中。
hostname
windows 和 Linux 取主机名方式不一样。
可以在所有平台使用 socket.gethostname()
获取主机名。
ip 列表
$ pip install netifaces2
netifaces.interfaces()
返回接口列表。
netifaces.ifaddresses(interface)
取指定接口上的 IP 地址。
import netifaces
print('-->', netifaces.interfaces())
i = 1
for iface in netifaces.interfaces():
print(i, '-->', netifaces.ifaddresses(if_name=iface))
i += 1
返回信息如下:
--> ['VMware Virtual Ethernet Adapter for VMnet1', 'VMware Virtual Ethernet Adapter for VMnet8', 'Realtek USB GbE Family Controller', 'Intel(R) Dual Band Wireless-AC 8265', 'Microsoft Wi-Fi Direct Virtual Adapter', 'Microsoft Wi-Fi Direct Virtual Adapter #2']
1 --> {2: [{'mask': '255.255.255.0', 'addr': '192.168.187.1'}], -1000: [{'addr': '0:50:56:C0:0:1:0:0'}]}
2 --> {2: [{'addr': '192.168.136.1', 'mask': '255.255.255.0'}], -1000: [{'addr': '0:50:56:C0:0:8:0:0'}]}
3 --> {2: [{'addr': '0.0.0.0', 'mask': '0.0.0.0'}], -1000: [{'addr': '0:E0:4C:68:9:D0:0:0'}]}
4 --> {2: [{'mask': '255.255.255.0', 'addr': '10.47.76.58'}], -1000: [{'addr': '7C:76:35:5E:EC:46:0:0'}]}
5 --> {2: [{'mask': '0.0.0.0', 'addr': '0.0.0.0'}], -1000: [{'addr': '7C:76:35:5E:EC:47:0:0'}]}
6 --> {2: [{'mask': '0.0.0.0', 'addr': '0.0.0.0'}], -1000: [{'addr': '7E:76:35:5E:EC:46:0:0'}]}
这是一个字典,key 为 2 就是 ipv4 地址。
每一个接口返回的 ipv4 地址是一个列表,也就是说可以有多个,ipv4 地址描述是在 addr 上。
import netifaces
print('-->', netifaces.interfaces())
for iface in netifaces.interfaces():
if 2 in netifaces.ifaddresses(if_name=iface).keys():
for ip in netifaces.ifaddresses(if_name=iface)[2]:
print(ip['addr'])
# 运行结果
--> ['VMware Virtual Ethernet Adapter for VMnet1', 'VMware Virtual Ethernet Adapter for VMnet8', 'Realtek USB GbE Family Controller', 'Intel(R) Dual Band Wireless-AC 8265', 'Microsoft Wi-Fi Direct Virtual Adapter', 'Microsoft Wi-Fi Direct Virtual Adapter #2']
192.168.187.1
192.168.136.1
0.0.0.0
10.47.76.58
0.0.0.0
0.0.0.0
如何拿到自己想要的 ip,例如排除掉回环、多播等地址呢 ?
ipaddress 库:
import ipaddress
ips = ['127.0.0.1', '192.168.0.1', '169.254.122.2', '0.0.0.0', '239.268.0.255', '224.0.0.1', '8.8.8.8']
for ip in ips:
print(ip)
ip = ipaddress.ip_address(ip)
print(f"link_local {ip.is_link_local}")
print(f"回环 {ip.is_loopback}")
print(f"多播 {ip.is_multicast}")
print(f"公网 {ip.is_global}")
print(f"私有 {ip.is_private}")
print(f"保留 {ip.is_reserved}")
print(f"版本 {ip.version}")
print('-' * 30)
在 agent 模块下新建 msg.py:
import netifaces
import ipaddress
import os
import uuid
import socket
class Message:
def __init__(self, myidpath):
# 从文件中读取主机的UUID
if os.path.exists(myidpath):
with open(myidpath) as f:
self.id = f.readline().strip()
else:
self.id = uuid.uuid4().hex
with open(myidpath, 'w') as f:
f.write(self.id)
def _get_address(self):
"""获取主机上所有接口可用的IPV4地址"""
addresses = []
for iface in netifaces.interfaces():
if 2 in netifaces.ifaddresses(if_name=iface).keys():
for ip in netifaces.ifaddresses(if_name=iface)[2]:
ip = ipaddress.ip_address(ip['addr'])
if ip.version != 4:
continue
if ip.is_link_local:
continue
if ip.is_loopback:
continue
if ip.is_multicast:
continue
if ip.is_reserved:
continue
addresses.append(str(ip))
return addresses
def reg(self):
"""生成注册消息"""
return {
"type": "reg",
"payload": {
"id": self.id,
"hostname": socket.gethostname(),
"ip": self._get_address()
}
}
完成心跳信息的方法,它应该和 reg 是相似的。
def heartbeat(self):
"""生成心跳消息"""
return {
"type": "heartbeat",
"payload": {
"id": self.id,
"hostname": socket.gethostname(),
"ip": self._get_address()
}
}
问题?
心跳信息是非常频繁的发送,几秒一次的。
每一次都需要查询一下 IP 列表,把 IP 都发过去一次,浪费计算资源。
后期可以考虑,单独跑一个线程(进程)处理 IP 地址和主机名等反复使用的信息。
5.5.5、消息发送
完成 agent.cm 模块代码。
一旦 Agent 启动,就会尝试和 Master 建立 TCP 连接发送数据。假设 Master 有 2 个接口方法 reg、heartbeart 可供调用。
agent.config 模块:
MASTER_URL = "tcp://127.0.0.1:9000"
MYID_PATH = "d:/myid"
agent/cm.py 模块:
import zerorpc
import threading
from .msg import Message
class ConnectionManager:
def __init__(self, master_url, message: Message):
self.master_url = master_url
self.message = message
self.client = zerorpc.Client()
self.event = threading.Event()
def start(self, interval):
self.client.connect(self.master_url)
# 注册
self.client.reg(self.message.reg())
# 心跳
while not self.event.wait(interval):
self.client.heartbeat(self.message.heartbeat())
def shutdown(self):
self.event.set()
self.client.close()
def join(self): # 让主线程阻塞
self.event.wait()
特别注意这个 join 方法,让调用的主线程阻塞。
5.5.6、Agent 类
在 agent 模块的 __init__.py
中构建 Agent 类。
其作用是:管理连接,开启连接,负责重连,关闭连接。
注:一个包中有很多模块,但往往把包 __init__.py
作为入口很明智,逻辑清晰。
from .cm import ConnectionManager
from .msg import Message
from .config import MYID_PATH, MASTER_URL
import threading
class Agent:
def __init__(self):
self.msg = Message(MYID_PATH)
self.cm = ConnectionManager(MASTER_URL, self.msg)
self.event = threading.Event()
def start(self, interval):
# 注意:这里的self.event别当成ConnectionManager中的self.event
while not self.event.is_set(): # 重连
try:
self.cm.start(interval)
except Exception as e:
self.cm.shutdown()
self.event.wait(3)
def shutdown(self):
self.event.set()
self.cm.shutdown()
app.py 中测试。
from agent import Agent
if __name__ == '__main__':
agent = Agent()
try:
agent.start(3)
except KeyboardInterrupt:
agent.shutdown()
运行后发现未连接到服务器,超时抛异常。
5.5.7、ConnectionManager 重连机制实现
本来考虑使用多个接口实现 reg、heartbeat 等方法,从本质上来说,它们都是一个方法。
因此修改为使用同一个接口 sendmsg 方法和 Master 通信。
在 connect、sendmsg 方法调用的时候都有可能出现异常,一旦异常,就 set event。Agent 实例中知道不阻塞了,就可以重连了。直到连接上为止,或者直到 shutdown 为止。
agent/cm.py 修改:
import zerorpc
import threading
from .msg import Message
from utils import getLogger
logger = getLogger(__name__, 'd:/cm.log')
class ConnectionManager:
def __init__(self, master_url, message: Message):
self.master_url = master_url
self.message = message
self.client = zerorpc.Client()
self.event = threading.Event()
def start(self, interval=5):
try:
self.event.clear() # 重置event
# 连接
self.client.connect(self.master_url)
# 注册
self._send(self.message.reg())
# 心跳循环
while not self.event.wait(interval):
self._send(self.message.heartbeat())
except Exception as e:
logger.error('Failed to connect to master. Error:{}'.format(e))
raise e
def shutdown(self):
self.event.set()
self.client.close()
def _send(self, msg):
ack = self.client.sendmsg(msg)
logger.info(ack)
def join(self): # 让主线程阻塞
self.event.wait()
好,agent 端先告一段落,开始 Master 端开发,等两边调试通再做进一步开发。
思考:上面的 join 方法还有用吗?需不需要去除?理由是什么?
没啥用,心跳循环相当于无限循环,主线程本来就是阻塞的。
5.6、Master 设计
5.6.1、基本功能
-
TCP Server:绑定端口,启动监听,等待 Agent 连接。
-
信息存储:存储 Agent 列表;存储用户提交的 Task 列表。用户通过 WEB 提交的任务信息都存储下来。
-
接收注册:将注册信息写入 Agent 列表。
-
接收心跳信息:接收 Agent 发来的心跳信息。
-
派发任务:将用户提交的任务分配到 Agent 端。
5.6.2、代码实现
构建 master 模块,master.config 模块。
MASTER_URL = "tcp://0.0.0.0:9000"
master.cm 模块,负责 TCP 连接。
from utils import getLogger
logger = getLogger(__name__, 'd:/cm.log')
class ConnectionManager:
@staticmethod
def handler(msg):
logger.info(type(msg))
return f"{msg}"
sendmsg = handler
定义 Master 类,负责启动 TCP Server。
在 master 模块的 __init__.py
中构建 Master 类。
import zerorpc
from .cm import ConnectionManager
from .config import MASTER_URL
class Master:
def __init__(self):
self.tcpserver = zerorpc.Server(ConnectionManager())
self.tcpserver.bind(MASTER_URL)
def start(self):
self.tcpserver.run() # 循环阻塞的
def shutdown(self):
self.tcpserver.close()
项目根目录下构建一个 appserver.py 用于测试:
from master import Master
if __name__ == '__main__':
master = Master()
try:
master.start()
except KeyboardInterrupt:
master.shutdown()
查看日志,不断有数据写入,测试成功。
2023-09-01 16:05:26,976 [master.cm handler] <class 'dict'>
2023-09-01 16:05:26,978 [agent.cm _send] {'type': 'reg', 'payload': {'id': '5b5bef507bcb4c11a6888d7a4c7f064e', 'hostname': 'DESKTOP-MQ4C8ML', 'ip': ['192.168.187.1', '192.168.136.1', '0.0.0.0', '10.47.76.58', '0.0.0.0', '0.0.0.0']}}
2023-09-01 16:05:30,007 [master.cm handler] <class 'dict'>
2023-09-01 16:05:30,008 [agent.cm _send] {'type': 'heartbeat', 'payload': {'id': '5b5bef507bcb4c11a6888d7a4c7f064e', 'hostname': 'DESKTOP-MQ4C8ML', 'ip': ['192.168.187.1', '192.168.136.1', '0.0.0.0', '10.47.76.58', '0.0.0.0', '0.0.0.0']}}
2023-09-01 16:05:33,038 [master.cm handler] <class 'dict'>
2023-09-01 16:05:33,039 [agent.cm _send] {'type': 'heartbeat', 'payload': {'id': '5b5bef507bcb4c11a6888d7a4c7f064e', 'hostname': 'DESKTOP-MQ4C8ML', 'ip': ['192.168.187.1', '192.168.136.1', '0.0.0.0', '10.47.76.58', '0.0.0.0', '0.0.0.0']}}
2023-09-01 16:05:36,080 [master.cm handler] <class 'dict'>
2023-09-01 16:05:36,082 [agent.cm _send] {'type': 'heartbeat', 'payload': {'id': '5b5bef507bcb4c11a6888d7a4c7f064e', 'hostname': 'DESKTOP-MQ4C8ML', 'ip': ['192.168.187.1', '192.168.136.1', '0.0.0.0', '10.47.76.58', '0.0.0.0', '0.0.0.0']}}
2023-09-01 16:05:39,120 [master.cm handler] <class 'dict'>
2023-09-01 16:05:39,121 [agent.cm _send] {'type': 'heartbeat', 'payload': {'id': '5b5bef507bcb4c11a6888d7a4c7f064e', 'hostname': 'DESKTOP-MQ4C8ML', 'ip': ['192.168.187.1', '192.168.136.1', '0.0.0.0', '10.47.76.58', '0.0.0.0', '0.0.0.0']}}
经过观察发现,目前注册和心跳除了类型不同,其它都一样。可以这样认为,第一次心跳成功,就是注册。
5.6.3、Master 的数据设计
Master 端核心需要存储 2 种数据:Agent 端数据、用户提交的任务 Task。
构造一个数据结构,存储这些信息。
{
"agents": {
"agent_id": {
"heartbeat": "timestamp",
"busy": false,
"info": {
"hostname": "",
"ip": []
}
}
}
}
{
"tasks": {
"task_id": {
"script": "base64encode",
"targets": {
"agent_id": {
"state": "WAITING",
"output": ""
}
},
"state": "WAITING"
}
}
}
上面 2 个数据结构:
-
agents 里面记录了所有注册的 agent
-
agent_id,字典的 key,每一个 agent 有不同的 uuid,所以这个字典就是
item=uuid:{}
。 -
connection 给 agent 记录 master 和 agent 建立的连接,备用?TODO?还要不要了?
-
heartbeat 由于设计中并没有让 agent 端发送心跳时间,所以就在 master 端记录收到的时间。
-
busy 如果 agent 上有任务在跑。就会置这个值为 True。
-
info 记录 agent 上发过来的 hostname 和 ip 列表。
-
-
tasks 记录所有任务及其 target (agent) 的状态。
-
task_id,字典的 key 对应一个个 task,item 也是
taskid:{}
的结构。 -
task 任务,task.json 的 payload 信息。
-
targets 目标,用来执行 agent 的节点,记录 agent 上的 state 和输出 output。
- state 状态,单个 agent 上的执行状态。
-
state 这是一个 task 的状态,整个任务的状态,比如统计达到了 agent 失败上限了,这个 task 的 state 就置为失败。
-
状态常量:WAITING
,RUNNING
,SUCCEED
,FAILED
。
构建 master/state.py
WAITING = 'WAITING'
RUNNING = 'RUNNING'
SUCCEED = 'SUCCEED'
FAILED = 'FAILED'
5.6.4、agent 信息存储
构建 Storage 类。
master/storage.py:
import datetime
class Storage:
def __init__(self):
self.agents = {}
self.tasks = {}
def reg_hb(self, agent_id, info):
self.agents[agent_id] = {
'heartbeat': datetime.datetime.now(),
'info':info,
'busy':self.agents.get(agent_id, {}).get('busy', False)
}
# busy读不到置False,读的到不变。后期实现。
为什么 heartbeat 不用时间戳?
可以用,但是这里时间取的是 Master 的时间,而且不用 Master、Agent 间来回传输,是 Master 内部数据结构 Storage 的数据,所以没有用时间戳。
如果使用时间类型,且使用 json 序列化,就要注意数据类型了。zerorpc 内部使用了 messagepack,支持日期类型。
master/cm.py:
from utils import getLogger
from .storage import Storage
logger = getLogger(__name__, 'd:/cm.log')
class ConnectionManager:
def __init__(self):
self.store = Storage()
def handler(self, msg):
logger.info(type(msg))
try:
if msg['type'] in {"heartbeat", "reg"}:
payload = msg["payload"]
info = {"hostname": payload["hostname"], "ip": payload["ip"]}
self.store.reg_hb(payload['id'], info)
logger.info("{}".format(self.store.agents))
return "ack {}".format(msg)
except Exception as e:
logger.error("{}".format(e))
return "bad Request."
sendmsg = handler # zerorpc 接口
5.6.5、task 任务处理
用户通过WEB(HTTP)提交新的任务,任务json信息有:
1、任务脚本script,base64编码
2、超时时间timeout
3、并行度parallel
4、失败率failrate
5、失败次数fail count
6、targets 是跑任务的Agent的agent_id列表,这个目前也是在用户端选择好。比如用户需要在主机名叫做WEBServer-xxx的几台主机上运行脚本。为了用户方便,可以类似ansible的分组。
在Master端收到任务信息后,需要添加2个信息:
task_id是Master端新建任务时生成的uuid。
state 默认WAITING。
在WEB Server中最后将用户端发来的数据组成下面的字典
{
'task_id': t.id,
'script': t.script,
'timeout': t.timeout,
'parallel': t.parallel,
'fail_rate': t.fail_rate,
'fail_count': t.fail_count,
'state': t.state,
'targets': t.targets
}
将任务数据封装成任务对象,构建 Task 类。
master/task.py:
import uuid
from .state import *
class Task:
def __init__(self, task_id, script, targets, timeout=0,
parallel=1, fail_rate=0, fail_count=-1):
self.id = task_id
self.script = script
self.timeout = timeout
self.parallel = parallel
self.fail_rate = fail_rate
self.fail_count = fail_count
self.state = WAITING
self.targets = {
agent_id: {'state': WAITING, 'output': ""} for agent_id in targets
}
self.target_count = len(self.targets)
WEB Server 调用 Storage 中方法,将任务数据存入。
import datetime
from .task import Task
class Storage:
def __init__(self):
self.agents = {}
self.tasks = {}
def reg_hb(self, agent_id, info):
self.agents[agent_id] = {
'heartbeat': datetime.datetime.now(),
'info': info,
'busy': self.agents.get(agent_id, {}).get('busy', False)
}
def add_task(self, task: dict): # 从WEB Server来
t = Task(**task)
self.tasks[t.id] = t
return t.id
5.6.6、任务分派
分派方式:
任务在 Storage 中存储,一旦有了任务,需要将任务分派到指定节点执行,交给这些节点上的 Agent。
不过,目前使用 zerorpc,Master 是被动的接收 Agent 的数据并响应的。
所以,可以考虑一种 Agent 主动拉取机制,就是提供一个接口,让 Agent 访问。如果 Agent 处于空闲,就主动拉取任务,有任务就领走。
当 Agent 少的时候,Master 推送任务到 Agent 端,或者 Agent 端主动拉取任务都是可以的。但是如果考虑 Agent 多的时候,或许 Agent 拉模式是更好的选择。
本次采用 Agent 拉取模式实现,所以 Master 就不需要设计调度器了。
master/storage.py:
import datetime
from .task import Task
from .state import *
class Storage:
def __init__(self):
self.agents = {}
self.tasks = {}
def reg_hb(self, agent_id, info):
self.agents[agent_id] = {
'heartbeat': datetime.datetime.now(),
'info': info,
'busy': self.agents.get(agent_id, {}).get('busy', False)
}
def add_task(self, task: dict): # 从WEB Server来
t = Task(**task)
self.tasks[t.id] = t
return t.id
# 任务没有人领是WAITING,有一个人领算RUNNING
def iter_tasks(self, states={WAITING, RUNNING}):
yield from (task for task in self.tasks.values() if task.state in states)
def get_task_by_agentid(self, agent_id, state=WAITING):
for task in self.iter_tasks():
if agent_id in task.targets.keys():
t = task.targets.get(agent_id)
if t.get('state') == state:
return task, t # 为节点找到一个任务就返回
master/cm.py:
from utils import getLogger
from .storage import Storage
from .state import *
logger = getLogger(__name__, 'd:/cm.log')
class ConnectionManager:
def __init__(self):
self.store = Storage()
def handler(self, msg):
logger.info(type(msg))
try:
if msg['type'] in {"heartbeat", "reg"}:
payload = msg["payload"]
info = {"hostname": payload["hostname"], "ip": payload["ip"]}
self.store.reg_hb(payload['id'], info)
logger.info("{}".format(self.store.agents))
return "ack {}".format(msg)
except Exception as e:
logger.error("".format(e))
return "bad Request."
sendmsg = handler # zerorpc 接口
def take_task(self, agent_id): # 空闲Agent主动拉取任务
# 有任务则返回任务信息,否则返回 None
# {'id': task.id, 'script': task.script, 'timeout': task.timeout}
# 遍历状态是RUNNING或WAITING的任务,其中targets中agent_id是自己的且状态是WAITING的
info = self.store.get_task_by_agentid(agent_id)
# 找到了,就将WAITING任务转换为RUNNING,将agent自己的状态置为RUNNING
if info:
task, target = info
task.state = RUNNING
target['state'] = RUNNING
return {
'id': task.id,
'script': task.script,
'timeout': task.timeout
}
5.6.7、Agent 领取任务
Agent 领取任务,就是 client 去调用 take_task 接口。
执行流程:
放在心跳循环中,不过要增加状态,这个状态直接使用 master 中定义的状态文件。
如果在循环中,Agent 的当前状态是 WAITING,就可以去 Master 获取任务。
如果没有任务,就隔一段时间尝试再次取任务。如果获取到任务,就可以将状态置为 RUNNING 并开启新的进程执行脚本,直到脚本执行完,把状态码和输出结果封装成 result 消息返回给 Master。
agent/cm.py:
import zerorpc
import threading
from .msg import Message
from utils import getLogger
from .state import *
from .executor import Executor
logger = getLogger(__name__, 'd:/cm.log')
class ConnectionManager:
def __init__(self, master_url, message: Message):
self.master_url = master_url
self.message = message # 对象
self.client = zerorpc.Client()
self.event = threading.Event()
self.state = WAITING
self.executor = Executor()
def start(self, interval=5):
try:
self.event.clear() # 重置event
# 连接
self.client.connect(self.master_url)
# 注册
self._send(self.message.reg())
# 心跳循环
while not self.event.wait(interval):
self._send(self.message.heartbeat())
if self.state == WAITING:
self._get_task(self.message.id)
except Exception as e:
logger.error('Failed to connect to master. Error:{}'.format(e))
raise e
def shutdown(self):
self.event.set()
self.client.close()
def _send(self, msg):
try:
ack = self.client.sendmsg(msg)
logger.info(ack)
except Exception as e:
logger.error(f'Failed to connect to master. Error:{e}')
self.event.set()
def _get_task(self, agent_id):
task = self.client.take_task(self.message.id)
if task: # 拿回了任务
logger.info(f"{task}")
self.state = RUNNING
# 调用执行器,开启子进程
# {'id': task.id, 'script': task.script, 'timeout': task.timeout}
script = task['script'] # 注意为了简单测试,没有做base64编码,后期加上
code, output = self.executor.run(script)
self._send(self.message.result(task['id'], code, output))
self.state = WAITING
Executor 类,agent/executor.py:
from subprocess import Popen, PIPE
from utils import getLogger
logger = getLogger(__name__, 'd:/exec.log')
class Executor:
def run(self, script, timeout=None):
logger.info('Agent start exec~~~~~~~~~~~~~~~~~~~~~~')
proc = Popen(script, shell=True, stdout=PIPE)
code = proc.wait() # 阻塞返回状态码
output = proc.stdout.read() # 标准输出
logger.info(code)
logger.info(output)
return code, output
agent/msg.py:
import netifaces
import ipaddress
import os
import uuid
import socket
class Message:
def __init__(self, myidpath):
# 从文件中读取主机的UUID
if os.path.exists(myidpath):
with open(myidpath) as f:
self.id = f.readline().strip()
else:
self.id = uuid.uuid4().hex
with open(myidpath, 'w') as f:
f.write(self.id)
def _get_addresses(self):
"""获取主机上所有接口可用的IPV4地址"""
addresses = []
for iface in netifaces.interfaces():
if 2 in netifaces.ifaddresses(if_name=iface).keys():
for ip in netifaces.ifaddresses(if_name=iface)[2]:
ip = ipaddress.ip_address(ip['addr'])
if ip.version != 4:
continue
if ip.is_link_local:
continue
if ip.is_loopback:
continue
if ip.is_multicast:
continue
if ip.is_reserved:
continue
addresses.append(str(ip))
return addresses
def reg(self):
"""生成注册消息"""
return {
"type": "reg",
"payload": {
"id": self.id,
"hostname": socket.gethostname(),
"ip": self._get_addresses()
}
}
def heartbeat(self):
"""生成心跳消息"""
return {
"type": "heartbeat",
"payload": {
"id": self.id,
"hostname": socket.gethostname(),
"ip": self._get_addresses()
}
}
def result(self, task_id, code, output):
"""返回执行结果"""
return {
'type': 'result',
'payload': {
'id': task_id,
'agent_id': self.id,
'code': code,
'output': output
}
}
5.6.8、master 接收 result 消息
master/cm.py:
class ConnectionManager:
def __init__(self):
self.store = Storage()
def handler(self, msg):
logger.info(type(msg))
try:
if msg['type'] in {"heartbeat", "reg"}:
payload = msg["payload"]
info = {"hostname": payload["hostname"], "ip": payload["ip"]}
self.store.reg_hb(payload['id'], info)
logger.info("{}".format(self.store.agents))
return "ack {}".format(msg)
elif msg['type'] == 'result': # 处理result消息
payload = msg['payload']
agent_id = payload['agent_id']
task_id = payload['id']
state = SUCCEED if payload['code'] == 0 else FAILED
output = payload['output']
task = self.store.get_task_by_agentid(task_id)
t = task.targets[agent_id]
t.state = state
t.output = output
return 'ack result'
except Exception as e:
logger.error("".format(e))
return "bad Request."
sendmsg = handler # zerorpc 接口
master/storage.py:
import datetime
from .task import Task
from .state import *
class Storage:
def __init__(self):
self.agents = {}
self.tasks = {}
def reg_hb(self, agent_id, info):
self.agents[agent_id] = {
'heartbeat': datetime.datetime.now(),
'info': info,
'busy': self.agents.get(agent_id, {}).get('busy', False)
}
def add_task(self, task: dict): # 从WEB Server来
t = Task(**task)
self.tasks[t.id] = t
return t.id
def iter_tasks(self, states=None):
if states is None:
states = {WAITING, RUNNING}
yield from (task for task in self.tasks.values() if task.state in states)
def get_task_by_agentid(self, agent_id, state=WAITING):
for task in self.iter_tasks():
if agent_id in task.targets.keys():
t = task.targets.get(agent_id)
if t.get('state') == state:
return task, t # 为节点找到一个任务就返回
def get_task_by_id(self, task_id) -> Task:
return self.tasks.get(task_id)
5.7、前后端交互开发
5.7.1、提交任务
用户通过 WEB(HTTP) 提交新的任务,任务 json 信息有:
-
任务脚本 script,base64 编码。
-
超时时间 timeout。
-
并行度 parallel。
-
失败率 fail_rate。
-
失败次数 failcount。
-
targets 是跑任务的 Agent 的 agent_id 列表,可以让用户看到一个列表,勾选。
1-5 都是文本框填写信息即可;6 即 targets 列表需要从 Master 端获取。
5.7.2、targets 列表
master/storage.py:
class Storage:
def get_agents(self): # 返回所有agent_id
return list(self.agents.keys())
master/cm.py:
class ConnectionManager:
def get_agents(self): # 返回所有已经注册的agent_id
return self.store.get_agents()
项目根目录下构建一个 appwebserver.py:
from aiohttp import web, log
import zerorpc
client = zerorpc.Client()
client.connect('tcp://127.0.0.1:9000')
async def targetshandler(request: web.Request):
txt = client.get_agents()
return web.json_response(txt)
app = web.Application()
app.router.add_get('/task/targets', targetshandler)
web.run_app(app, host='0.0.0.0', port=9900)
5.7.3、提交任务
客户端 json 如下:
{
"script": "echo hello",
"timeout": 30,
"targets": []
}
appwebserver.py 增加路由和 handler:
from aiohttp import web, log
import zerorpc
client = zerorpc.Client()
client.connect('tcp://127.0.0.1:9000')
async def targetshandler(request: web.Request):
txt = client.get_agents()
return web.json_response(txt)
async def taskhandler(request: web.Request):
j = await request.json()
txt = client.add_task(j) # j为字典
return web.Response(text=txt, status=201)
app = web.Application()
app.router.add_get('/task/targets', targetshandler)
app.router.add_post('/task', taskhandler)
web.run_app(app, host='0.0.0.0', port=9900)
master/cm.py:
class ConnectionManager:
def add_task(self, task): # 添加任务的接口
task['task_id'] = uuid.uuid4().hex # 增加任务id
return self.store.add_task(task) # 成功则返回task_id
联调测试:
服务端先启动 appwebserver, appserver, app;
客户端通过 appwebserver 获取 agents 列表,如下:
拿到 agents 列表后,向列表中的 agents 发布任务: