5、Python3 任务调度系统

作者: Brinnatt 分类: python 道 发布时间: 2023-09-24 23:28

这个任务调度系统是从运维自动化的角度来设计,提高运维效率。运维管理大致分以下几个阶段:

  1. 人工阶段

    人工盯着服务器,出了问题,跑到机器前,翻日志,查状态,手动操作。

  2. 脚本阶段

    开始写一些自动化脚本,启动计划任务,自动启动服务,监控服务等。

  3. 工具阶段

    脚本功能太弱,开发了大量工具,某种工具解决某个特定领域的问题,常用的有 ansible、puppet 等。

  4. 平台阶段

    将工具整合,自主开发,实现标准化,实现自动化流程控制。

  5. AI 智能化

    结合大数据和人工智能平台自动计算、分析数据,智能告警,智能解决复杂问题。

5.1、调度设计

类似于 ansible 功能,更加简单,满足企业需求。

  1. 分发任务

    分发脚本到目标节点上去执行。

  2. 控制

    控制并发,控制多少个节点同时执行。

    对错误做出响应。由用户设定,最多允许失败的比例或者数量,当超过范围时,需要终止任务执行。

    可以终止正在执行的任务。

  3. 能跨机房部署

  4. 能对作业做版本控制,这个是辅助的需求,可以以后实现。

本项目的出发点,要求会使用 shel 脚本就可以了,而 ansible、salt 等需要学习特定的内部语言。

如果觉得 ansible 这样的工具不能满足需求,二次开发难度较高,其代码量不小。本身它们开发接口不完善,而且熟悉它的架构也比较难,就算开发出来维护也难。

从这些项目上二次开发,等于拉一个分支,如果主分支有了新的特性,想合并也很困难。

自己开发,量身定做,满足自己需求。代码规模可控,便于他人接手维护。

自己开发就是造轮子,造轮子不是不好,不要一上来就设计打造一个超级牛的轮子,结果能力不足,项目失败。

先构建一个适合的、能力所及、满足需求的轮子,后面再逐步完善。一般来说,越是自动化运维程度越高的公司,自己写的系统越多,因为满足他们需要的工具少。

python3_mscheduler

5.2、分发任务设计

分为有 Agent、无 Agent。

  • 有 Agent,被控节点需要安装或运行特殊的软件,和服务器端通信,服务器端把脚本、命令传给 Agent 端,由 Agent 来控制执行。

  • 无 Agent,被控节点不需要安装或运行特殊的软件,例如通过 ssh。这其实也是有 Agent,不过不是自己写的程序。

    • 通用、简单、易实现。但管理不善,容易出安全问题。

    • 并行效率不高。有 Agent 并行执行任务,可以不和管理服务器通信,所以并发很高。ssh 执行要和 master 之间通信,耗费更多时间。

    • ssh 连接是有状态的。任务执行的时候,master 不能挂了,否则任务执行失败。

5.3、执行脚本

Python3 建议使用标准库 subprocess 模块,启动一个子进程。

from subprocess import Popen

p = Popen('echo hello', shell=True)
print(p)
class Popen:
    def __init__(self, args, bufsize=-1, executable=None,
                 stdin=None, stdout=None, stderr=None,
                 preexec_fn=None, close_fds=True,
                 shell=False, cwd=None, env=None, universal_newlines=None,
                 startupinfo=None, creationflags=0,
                 restore_signals=True, start_new_session=False,
                 pass_fds=(), *, user=None, group=None, extra_groups=None,
                 encoding=None, errors=None, text=None, umask=-1, pipesize=-1):
  • shell 为 True,则使用 shell 来执行 args,建议 args 是一个字符串。
  • args 可以是执行脚本的字符串,功能很强大。
    # 注意scripts目录在项目的根目录下
    p = Popen('python scripts/handler.py', shell=True)

5.4、构建项目

项目名称 mschedule。

构建一个模块 agent,模块下建一个 executor.py。

import subprocess

class Executor:
    def __init__(self, script):
        self.script = script

    def run(self):
        p = subprocess.Popen(self.script, shell=True)
        p.wait()

项目根目录下,建立一个 app.py。

from agent.executor import Executor

if __name__ == '__main__':
    executor = Executor('echo "hello brinnatt"')
    executor.run()

运行成功,看来可以成功运行脚本。

5.4.1、agent 设计

用户和 Master Server 通信,提交任务。

Master 按照用户要求将任务分发到指定的节点上,这些节点上需要有一个 Agent 和 Master 通信,接收 Master 发布的任务,并执行这些任务。

设计 Agent 时,应当注意,越简单越好,越简单 Bug 就越少,越稳定。

从本质上来说,Master、Agent 设计是典型的 CS 编程模式。

Master 作为 CS 中的 Server,Agent 作为 CS 中的 client。

  1. 注册信息

    Agent 启动后,需要主动联系 Server,注册自己的信息。信息包括:

    • 我是谁,hostname、UUID。UUID 保证唯一,因为主机名有可能重复。

    • 来自哪里,IP 地址是多少。需要 Agent 主动在信息中告诉 Master。其他信息,这个根据情况而定。

  2. 心跳信息

    Agent 定时向 Master 发送心跳包,包含 UUID 这个唯一标识,附带 hostname 和 ip。

    hostname 和 ip 都可能变动,但是这个 Agent 不变,UUID 也就不变。

    其他信息也可以附加,例如增加一个 flag,表示 Agent 上是否有任务在跑。

  3. 任务消息

    Master 分派任务给 Agent,发送任务描述信息到 Agent。

    注意脚本字符串使用 Base64 编码。

  4. 任务结果消息

    当 Agent 执行完任务,返回给 Master 该任务的状态码和输出结果。

以上 Master、Agent 之间需要传送消息,消息采用 json 格式。

5.4.2、消息设计

注册消息:

{
    "type": "register",
    "payload": {
        "id": "uuid",
        "hostname": "xxx",
        "ip": []
    }
}

心跳消息:

{
    "type": "heartbeat",
    "payload": {
        "id": "uuid",
        "hostname": "xxx",
        "ip": []
    }
}

任务消息:

{
    "type": "task",
    "payload": {
        "id": "task-uuid",
        "script": "base64encode",
        "timeout": 0,
        "parallel": 1,
        "fail_rate": 0,
        "fail_count": -1
    }
}

parallel:并行,表示同时执行的任务。

fail_rate:失败率,0 表示不允许失败。

fail_count:失败数, -1 不关心失败的数量和失败率。

执行结果消息:

{
    "type": "result",
    "payload": {
        "id": "task-uuid",
        "agent_id": "agent-uuid",
        "code":0,
        "output": "base64encode"
    }
}

id:任务 id。

agent_id:Agent 是谁。

code:返回的状态码,0 正常,非零错误,和 linux 的命令返回值一样。

output:输出的结果。字符串,Base64 编码后返回。

5.5、代码实现

5.5.1、配置

agent.config 模块,配置信息放这里。

5.5.2、日志

项目根目录下 utils.py:

import logging

def getLogger(mod_name: str, filepath: str):
    logger = logging.getLogger(mod_name)
    logger.setLevel(logging.INFO)  # 单独设置
    logger.propagate = False
    handler = logging.FileHandler(filepath)
    handler.setLevel(logging.INFO)
    formatter = logging.Formatter(fmt="%(asctime)s [%(name)s %(funcName)s] %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger

5.5.3、通信模块

原生的 Socket 编程太过底层,少使用。任何一门语言都要避免直接使用 socket 库开发,太过底层,难写难维护。

这次使用一个非常轻巧的、跨语言的 RPC 通信模块 zerorpc。它基于 ZeroMQ 和 MessagePack。

官网:https://www.zerorpc.io/

$ pip install zerorpc 

测试一下,Server 代码如下:

import zerorpc

class MyRPC:
    def igetyou(self, text):  # RPC对外的接口,其中处理客户端传来的数据
        return f"send back {text}"

s = zerorpc.Server(MyRPC())
s.bind("tcp://0.0.0.0:9000")  # 绑定
s.run()  # 持久运行监听

测试一下,Client 代码如下:

import zerorpc
import threading

c = zerorpc.Client()
c.connect('tcp://127.0.0.1:9000')

i = 1
while not threading.Event().wait(3):
    print(c.igetyou('hello server'), i)
    i += 1

在 agent 模块下,创建 cm 模块,准备使用 zerorpc 实现网络连接。

注意:不要把 zerorpc 的方法随便的放到线程中,会抛异常。

5.5.4、注册消息实现

uuid

使用 uuid.uuid4().hex 获取一个 uuid。一个节点起始运行的时候是没有 uuid 的,一旦运行会生成一个 uuid,并持久化到一个文件中,下次运行先找这个文件,如果文件中有 uuid,就直接读取,没有 uuid,就重新生成并写入到该文件中。

hostname

windows 和 Linux 取主机名方式不一样。

可以在所有平台使用 socket.gethostname() 获取主机名。

ip 列表

$ pip install netifaces2

netifaces.interfaces() 返回接口列表。

netifaces.ifaddresses(interface) 取指定接口上的 IP 地址。

import netifaces

print('-->', netifaces.interfaces())

i = 1
for iface in netifaces.interfaces():
    print(i, '-->', netifaces.ifaddresses(if_name=iface))
    i += 1

返回信息如下:

--> ['VMware Virtual Ethernet Adapter for VMnet1', 'VMware Virtual Ethernet Adapter for VMnet8', 'Realtek USB GbE Family Controller', 'Intel(R) Dual Band Wireless-AC 8265', 'Microsoft Wi-Fi Direct Virtual Adapter', 'Microsoft Wi-Fi Direct Virtual Adapter #2']
1 --> {2: [{'mask': '255.255.255.0', 'addr': '192.168.187.1'}], -1000: [{'addr': '0:50:56:C0:0:1:0:0'}]}
2 --> {2: [{'addr': '192.168.136.1', 'mask': '255.255.255.0'}], -1000: [{'addr': '0:50:56:C0:0:8:0:0'}]}
3 --> {2: [{'addr': '0.0.0.0', 'mask': '0.0.0.0'}], -1000: [{'addr': '0:E0:4C:68:9:D0:0:0'}]}
4 --> {2: [{'mask': '255.255.255.0', 'addr': '10.47.76.58'}], -1000: [{'addr': '7C:76:35:5E:EC:46:0:0'}]}
5 --> {2: [{'mask': '0.0.0.0', 'addr': '0.0.0.0'}], -1000: [{'addr': '7C:76:35:5E:EC:47:0:0'}]}
6 --> {2: [{'mask': '0.0.0.0', 'addr': '0.0.0.0'}], -1000: [{'addr': '7E:76:35:5E:EC:46:0:0'}]}

这是一个字典,key 为 2 就是 ipv4 地址。

每一个接口返回的 ipv4 地址是一个列表,也就是说可以有多个,ipv4 地址描述是在 addr 上。

import netifaces

print('-->', netifaces.interfaces())

for iface in netifaces.interfaces():
    if 2 in netifaces.ifaddresses(if_name=iface).keys():
        for ip in netifaces.ifaddresses(if_name=iface)[2]:
            print(ip['addr'])

# 运行结果
--> ['VMware Virtual Ethernet Adapter for VMnet1', 'VMware Virtual Ethernet Adapter for VMnet8', 'Realtek USB GbE Family Controller', 'Intel(R) Dual Band Wireless-AC 8265', 'Microsoft Wi-Fi Direct Virtual Adapter', 'Microsoft Wi-Fi Direct Virtual Adapter #2']
192.168.187.1
192.168.136.1
0.0.0.0
10.47.76.58
0.0.0.0
0.0.0.0

如何拿到自己想要的 ip,例如排除掉回环、多播等地址呢 ?

ipaddress 库:

import ipaddress

ips = ['127.0.0.1', '192.168.0.1', '169.254.122.2', '0.0.0.0', '239.268.0.255', '224.0.0.1', '8.8.8.8']

for ip in ips:
    print(ip)
    ip = ipaddress.ip_address(ip)
    print(f"link_local {ip.is_link_local}")
    print(f"回环 {ip.is_loopback}")
    print(f"多播 {ip.is_multicast}")
    print(f"公网 {ip.is_global}")
    print(f"私有 {ip.is_private}")
    print(f"保留 {ip.is_reserved}")
    print(f"版本 {ip.version}")
    print('-' * 30)

在 agent 模块下新建 msg.py:

import netifaces
import ipaddress
import os
import uuid
import socket

class Message:
    def __init__(self, myidpath):
        # 从文件中读取主机的UUID
        if os.path.exists(myidpath):
            with open(myidpath) as f:
                self.id = f.readline().strip()
        else:
            self.id = uuid.uuid4().hex
            with open(myidpath, 'w') as f:
                f.write(self.id)

    def _get_address(self):
        """获取主机上所有接口可用的IPV4地址"""
        addresses = []
        for iface in netifaces.interfaces():
            if 2 in netifaces.ifaddresses(if_name=iface).keys():
                for ip in netifaces.ifaddresses(if_name=iface)[2]:
                    ip = ipaddress.ip_address(ip['addr'])
                    if ip.version != 4:
                        continue
                    if ip.is_link_local:
                        continue
                    if ip.is_loopback:
                        continue
                    if ip.is_multicast:
                        continue
                    if ip.is_reserved:
                        continue
                    addresses.append(str(ip))
        return addresses

    def reg(self):
        """生成注册消息"""
        return {
            "type": "reg",
            "payload": {
                "id": self.id,
                "hostname": socket.gethostname(),
                "ip": self._get_address()
            }
        }

完成心跳信息的方法,它应该和 reg 是相似的。

    def heartbeat(self):
        """生成心跳消息"""
        return {
            "type": "heartbeat",
            "payload": {
                "id": self.id,
                "hostname": socket.gethostname(),
                "ip": self._get_address()
            }
        }

问题?

心跳信息是非常频繁的发送,几秒一次的。

每一次都需要查询一下 IP 列表,把 IP 都发过去一次,浪费计算资源。

后期可以考虑,单独跑一个线程(进程)处理 IP 地址和主机名等反复使用的信息。

5.5.5、消息发送

完成 agent.cm 模块代码。

一旦 Agent 启动,就会尝试和 Master 建立 TCP 连接发送数据。假设 Master 有 2 个接口方法 reg、heartbeart 可供调用。

agent.config 模块:

MASTER_URL = "tcp://127.0.0.1:9000"
MYID_PATH = "d:/myid"

agent/cm.py 模块:

import zerorpc
import threading
from .msg import Message

class ConnectionManager:
    def __init__(self, master_url, message: Message):
        self.master_url = master_url
        self.message = message
        self.client = zerorpc.Client()
        self.event = threading.Event()

    def start(self, interval):
        self.client.connect(self.master_url)
        # 注册
        self.client.reg(self.message.reg())
        # 心跳
        while not self.event.wait(interval):
            self.client.heartbeat(self.message.heartbeat())

    def shutdown(self):
        self.event.set()
        self.client.close()

    def join(self):  # 让主线程阻塞
        self.event.wait()

特别注意这个 join 方法,让调用的主线程阻塞。

5.5.6、Agent 类

在 agent 模块的 __init__.py 中构建 Agent 类。

其作用是:管理连接,开启连接,负责重连,关闭连接。

注:一个包中有很多模块,但往往把包 __init__.py 作为入口很明智,逻辑清晰。

from .cm import ConnectionManager
from .msg import Message
from .config import MYID_PATH, MASTER_URL
import threading

class Agent:
    def __init__(self):
        self.msg = Message(MYID_PATH)
        self.cm = ConnectionManager(MASTER_URL, self.msg)
        self.event = threading.Event()

    def start(self, interval):
        # 注意:这里的self.event别当成ConnectionManager中的self.event
        while not self.event.is_set(): # 重连
            try:
                self.cm.start(interval)
            except Exception as e:
                self.cm.shutdown()
            self.event.wait(3)

    def shutdown(self):
        self.event.set()
        self.cm.shutdown()

app.py 中测试。

from agent import Agent

if __name__ == '__main__':
    agent = Agent()
    try:
        agent.start(3)
    except KeyboardInterrupt:
        agent.shutdown()

运行后发现未连接到服务器,超时抛异常。

5.5.7、ConnectionManager 重连机制实现

本来考虑使用多个接口实现 reg、heartbeat 等方法,从本质上来说,它们都是一个方法。

因此修改为使用同一个接口 sendmsg 方法和 Master 通信。

在 connect、sendmsg 方法调用的时候都有可能出现异常,一旦异常,就 set event。Agent 实例中知道不阻塞了,就可以重连了。直到连接上为止,或者直到 shutdown 为止。

agent/cm.py 修改:

import zerorpc
import threading
from .msg import Message
from utils import getLogger

logger = getLogger(__name__, 'd:/cm.log')

class ConnectionManager:
    def __init__(self, master_url, message: Message):
        self.master_url = master_url
        self.message = message
        self.client = zerorpc.Client()
        self.event = threading.Event()

    def start(self, interval=5):
        try:
            self.event.clear() # 重置event

            # 连接
            self.client.connect(self.master_url)

            # 注册
            self._send(self.message.reg())

            # 心跳循环
            while not self.event.wait(interval):
                self._send(self.message.heartbeat())
        except Exception as e:
            logger.error('Failed to connect to master. Error:{}'.format(e))
            raise e

    def shutdown(self):
        self.event.set()
        self.client.close()

    def _send(self, msg):
        ack = self.client.sendmsg(msg)
        logger.info(ack)

    def join(self):  # 让主线程阻塞
        self.event.wait()

好,agent 端先告一段落,开始 Master 端开发,等两边调试通再做进一步开发。

思考:上面的 join 方法还有用吗?需不需要去除?理由是什么?

没啥用,心跳循环相当于无限循环,主线程本来就是阻塞的。

5.6、Master 设计

5.6.1、基本功能

  • TCP Server:绑定端口,启动监听,等待 Agent 连接。

  • 信息存储:存储 Agent 列表;存储用户提交的 Task 列表。用户通过 WEB 提交的任务信息都存储下来。

  • 接收注册:将注册信息写入 Agent 列表。

  • 接收心跳信息:接收 Agent 发来的心跳信息。

  • 派发任务:将用户提交的任务分配到 Agent 端。

5.6.2、代码实现

构建 master 模块,master.config 模块。

MASTER_URL = "tcp://0.0.0.0:9000"

master.cm 模块,负责 TCP 连接。

from utils import getLogger

logger = getLogger(__name__, 'd:/cm.log')

class ConnectionManager:
    @staticmethod
    def handler(msg):
        logger.info(type(msg))
        return f"{msg}"

    sendmsg = handler

定义 Master 类,负责启动 TCP Server。

在 master 模块的 __init__.py 中构建 Master 类。

import zerorpc
from .cm import ConnectionManager
from .config import MASTER_URL

class Master:
    def __init__(self):
        self.tcpserver = zerorpc.Server(ConnectionManager())
        self.tcpserver.bind(MASTER_URL)

    def start(self):
        self.tcpserver.run() # 循环阻塞的

    def shutdown(self):
        self.tcpserver.close()

项目根目录下构建一个 appserver.py 用于测试:

from master import Master

if __name__ == '__main__':
    master = Master()
    try:
        master.start()
    except KeyboardInterrupt:
        master.shutdown()

查看日志,不断有数据写入,测试成功。

2023-09-01 16:05:26,976 [master.cm handler] <class 'dict'>
2023-09-01 16:05:26,978 [agent.cm _send] {'type': 'reg', 'payload': {'id': '5b5bef507bcb4c11a6888d7a4c7f064e', 'hostname': 'DESKTOP-MQ4C8ML', 'ip': ['192.168.187.1', '192.168.136.1', '0.0.0.0', '10.47.76.58', '0.0.0.0', '0.0.0.0']}}
2023-09-01 16:05:30,007 [master.cm handler] <class 'dict'>
2023-09-01 16:05:30,008 [agent.cm _send] {'type': 'heartbeat', 'payload': {'id': '5b5bef507bcb4c11a6888d7a4c7f064e', 'hostname': 'DESKTOP-MQ4C8ML', 'ip': ['192.168.187.1', '192.168.136.1', '0.0.0.0', '10.47.76.58', '0.0.0.0', '0.0.0.0']}}
2023-09-01 16:05:33,038 [master.cm handler] <class 'dict'>
2023-09-01 16:05:33,039 [agent.cm _send] {'type': 'heartbeat', 'payload': {'id': '5b5bef507bcb4c11a6888d7a4c7f064e', 'hostname': 'DESKTOP-MQ4C8ML', 'ip': ['192.168.187.1', '192.168.136.1', '0.0.0.0', '10.47.76.58', '0.0.0.0', '0.0.0.0']}}
2023-09-01 16:05:36,080 [master.cm handler] <class 'dict'>
2023-09-01 16:05:36,082 [agent.cm _send] {'type': 'heartbeat', 'payload': {'id': '5b5bef507bcb4c11a6888d7a4c7f064e', 'hostname': 'DESKTOP-MQ4C8ML', 'ip': ['192.168.187.1', '192.168.136.1', '0.0.0.0', '10.47.76.58', '0.0.0.0', '0.0.0.0']}}
2023-09-01 16:05:39,120 [master.cm handler] <class 'dict'>
2023-09-01 16:05:39,121 [agent.cm _send] {'type': 'heartbeat', 'payload': {'id': '5b5bef507bcb4c11a6888d7a4c7f064e', 'hostname': 'DESKTOP-MQ4C8ML', 'ip': ['192.168.187.1', '192.168.136.1', '0.0.0.0', '10.47.76.58', '0.0.0.0', '0.0.0.0']}}

经过观察发现,目前注册和心跳除了类型不同,其它都一样。可以这样认为,第一次心跳成功,就是注册。

5.6.3、Master 的数据设计

Master 端核心需要存储 2 种数据:Agent 端数据、用户提交的任务 Task。

构造一个数据结构,存储这些信息。

{
    "agents": {
        "agent_id": {
            "heartbeat": "timestamp",
            "busy": false,
            "info": {
                "hostname": "",
                "ip": []
            }
        }
    }
}
{
    "tasks": {
        "task_id": {
            "script": "base64encode",
            "targets": {
                "agent_id": {
                    "state": "WAITING",
                    "output": ""
                }
            },
            "state": "WAITING"
        }
    }
}

上面 2 个数据结构:

  • agents 里面记录了所有注册的 agent

    • agent_id,字典的 key,每一个 agent 有不同的 uuid,所以这个字典就是 item=uuid:{}

    • connection 给 agent 记录 master 和 agent 建立的连接,备用?TODO?还要不要了?

    • heartbeat 由于设计中并没有让 agent 端发送心跳时间,所以就在 master 端记录收到的时间。

    • busy 如果 agent 上有任务在跑。就会置这个值为 True。

    • info 记录 agent 上发过来的 hostname 和 ip 列表。

  • tasks 记录所有任务及其 target (agent) 的状态。

    • task_id,字典的 key 对应一个个 task,item 也是 taskid:{} 的结构。

    • task 任务,task.json 的 payload 信息。

    • targets 目标,用来执行 agent 的节点,记录 agent 上的 state 和输出 output。

      • state 状态,单个 agent 上的执行状态。
    • state 这是一个 task 的状态,整个任务的状态,比如统计达到了 agent 失败上限了,这个 task 的 state 就置为失败。

状态常量:WAITINGRUNNINGSUCCEEDFAILED

构建 master/state.py

WAITING = 'WAITING'
RUNNING = 'RUNNING'
SUCCEED = 'SUCCEED'
FAILED = 'FAILED'

5.6.4、agent 信息存储

构建 Storage 类。

master/storage.py:

import datetime

class Storage:
    def __init__(self):
        self.agents = {}
        self.tasks = {}

    def reg_hb(self, agent_id, info):
        self.agents[agent_id] = {
            'heartbeat': datetime.datetime.now(),
            'info':info,
            'busy':self.agents.get(agent_id, {}).get('busy', False)
        }
        # busy读不到置False,读的到不变。后期实现。

为什么 heartbeat 不用时间戳?

可以用,但是这里时间取的是 Master 的时间,而且不用 Master、Agent 间来回传输,是 Master 内部数据结构 Storage 的数据,所以没有用时间戳。

如果使用时间类型,且使用 json 序列化,就要注意数据类型了。zerorpc 内部使用了 messagepack,支持日期类型。

master/cm.py:

from utils import getLogger
from .storage import Storage

logger = getLogger(__name__, 'd:/cm.log')

class ConnectionManager:
    def __init__(self):
        self.store = Storage()

    def handler(self, msg):
        logger.info(type(msg))
        try:
            if msg['type'] in {"heartbeat", "reg"}:
                payload = msg["payload"]
                info = {"hostname": payload["hostname"], "ip": payload["ip"]}
                self.store.reg_hb(payload['id'], info)

                logger.info("{}".format(self.store.agents))
                return "ack {}".format(msg)
        except Exception as e:
            logger.error("{}".format(e))
            return "bad Request."

    sendmsg = handler  # zerorpc 接口

5.6.5、task 任务处理

用户通过WEB(HTTP)提交新的任务,任务json信息有:
1、任务脚本script,base64编码
2、超时时间timeout
3、并行度parallel
4、失败率failrate
5、失败次数fail count
6、targets 是跑任务的Agent的agent_id列表,这个目前也是在用户端选择好。比如用户需要在主机名叫做WEBServer-xxx的几台主机上运行脚本。为了用户方便,可以类似ansible的分组。

在Master端收到任务信息后,需要添加2个信息:
    task_id是Master端新建任务时生成的uuid。
    state 默认WAITING。

在WEB Server中最后将用户端发来的数据组成下面的字典
{
    'task_id': t.id,
    'script': t.script,
    'timeout': t.timeout,
    'parallel': t.parallel,
    'fail_rate': t.fail_rate,
    'fail_count': t.fail_count,
    'state': t.state,
    'targets': t.targets
}

将任务数据封装成任务对象,构建 Task 类。

master/task.py:

import uuid
from .state import *

class Task:
    def __init__(self, task_id, script, targets, timeout=0,
                 parallel=1, fail_rate=0, fail_count=-1):
        self.id = task_id
        self.script = script
        self.timeout = timeout
        self.parallel = parallel
        self.fail_rate = fail_rate
        self.fail_count = fail_count
        self.state = WAITING
        self.targets = {
            agent_id: {'state': WAITING, 'output': ""} for agent_id in targets
        }
        self.target_count = len(self.targets)

WEB Server 调用 Storage 中方法,将任务数据存入。

import datetime
from .task import Task

class Storage:
    def __init__(self):
        self.agents = {}
        self.tasks = {}

    def reg_hb(self, agent_id, info):
        self.agents[agent_id] = {
            'heartbeat': datetime.datetime.now(),
            'info': info,
            'busy': self.agents.get(agent_id, {}).get('busy', False)
        }

    def add_task(self, task: dict):  # 从WEB Server来
        t = Task(**task)
        self.tasks[t.id] = t
        return t.id

5.6.6、任务分派

分派方式:

任务在 Storage 中存储,一旦有了任务,需要将任务分派到指定节点执行,交给这些节点上的 Agent。

不过,目前使用 zerorpc,Master 是被动的接收 Agent 的数据并响应的。

所以,可以考虑一种 Agent 主动拉取机制,就是提供一个接口,让 Agent 访问。如果 Agent 处于空闲,就主动拉取任务,有任务就领走。

当 Agent 少的时候,Master 推送任务到 Agent 端,或者 Agent 端主动拉取任务都是可以的。但是如果考虑 Agent 多的时候,或许 Agent 拉模式是更好的选择。

本次采用 Agent 拉取模式实现,所以 Master 就不需要设计调度器了。

master/storage.py:

import datetime
from .task import Task
from .state import *

class Storage:
    def __init__(self):
        self.agents = {}
        self.tasks = {}

    def reg_hb(self, agent_id, info):
        self.agents[agent_id] = {
            'heartbeat': datetime.datetime.now(),
            'info': info,
            'busy': self.agents.get(agent_id, {}).get('busy', False)
        }

    def add_task(self, task: dict):  # 从WEB Server来
        t = Task(**task)
        self.tasks[t.id] = t
        return t.id

    # 任务没有人领是WAITING,有一个人领算RUNNING
    def iter_tasks(self, states={WAITING, RUNNING}):
        yield from (task for task in self.tasks.values() if task.state in states)

    def get_task_by_agentid(self, agent_id, state=WAITING):
        for task in self.iter_tasks():
            if agent_id in task.targets.keys():
                t = task.targets.get(agent_id)
                if t.get('state') == state:
                    return task, t  # 为节点找到一个任务就返回

master/cm.py:

from utils import getLogger
from .storage import Storage
from .state import *

logger = getLogger(__name__, 'd:/cm.log')

class ConnectionManager:
    def __init__(self):
        self.store = Storage()

    def handler(self, msg):
        logger.info(type(msg))
        try:
            if msg['type'] in {"heartbeat", "reg"}:
                payload = msg["payload"]
                info = {"hostname": payload["hostname"], "ip": payload["ip"]}
                self.store.reg_hb(payload['id'], info)

                logger.info("{}".format(self.store.agents))
                return "ack {}".format(msg)
        except Exception as e:
            logger.error("".format(e))
            return "bad Request."

    sendmsg = handler  # zerorpc 接口

    def take_task(self, agent_id): # 空闲Agent主动拉取任务
        # 有任务则返回任务信息,否则返回 None
        # {'id': task.id, 'script': task.script, 'timeout': task.timeout}
        # 遍历状态是RUNNING或WAITING的任务,其中targets中agent_id是自己的且状态是WAITING的
        info = self.store.get_task_by_agentid(agent_id)
        # 找到了,就将WAITING任务转换为RUNNING,将agent自己的状态置为RUNNING
        if info:
            task, target = info
            task.state = RUNNING
            target['state'] = RUNNING
            return {
                'id': task.id,
                'script': task.script,
                'timeout': task.timeout
            }

5.6.7、Agent 领取任务

Agent 领取任务,就是 client 去调用 take_task 接口。

执行流程:

放在心跳循环中,不过要增加状态,这个状态直接使用 master 中定义的状态文件。

如果在循环中,Agent 的当前状态是 WAITING,就可以去 Master 获取任务。

如果没有任务,就隔一段时间尝试再次取任务。如果获取到任务,就可以将状态置为 RUNNING 并开启新的进程执行脚本,直到脚本执行完,把状态码和输出结果封装成 result 消息返回给 Master。

agent/cm.py:

import zerorpc
import threading
from .msg import Message
from utils import getLogger
from .state import *
from .executor import Executor

logger = getLogger(__name__, 'd:/cm.log')

class ConnectionManager:
    def __init__(self, master_url, message: Message):
        self.master_url = master_url
        self.message = message  # 对象
        self.client = zerorpc.Client()
        self.event = threading.Event()
        self.state = WAITING
        self.executor = Executor()

    def start(self, interval=5):
        try:
            self.event.clear()  # 重置event

            # 连接
            self.client.connect(self.master_url)

            # 注册
            self._send(self.message.reg())

            # 心跳循环
            while not self.event.wait(interval):
                self._send(self.message.heartbeat())
                if self.state == WAITING:
                    self._get_task(self.message.id)

        except Exception as e:
            logger.error('Failed to connect to master. Error:{}'.format(e))
            raise e

    def shutdown(self):
        self.event.set()
        self.client.close()

    def _send(self, msg):
        try:
            ack = self.client.sendmsg(msg)
            logger.info(ack)
        except Exception as e:
            logger.error(f'Failed to connect to master. Error:{e}')
            self.event.set()

    def _get_task(self, agent_id):
        task = self.client.take_task(self.message.id)
        if task:  # 拿回了任务
            logger.info(f"{task}")
            self.state = RUNNING
            # 调用执行器,开启子进程
            # {'id': task.id, 'script': task.script, 'timeout': task.timeout}
            script = task['script']  # 注意为了简单测试,没有做base64编码,后期加上
            code, output = self.executor.run(script)

            self._send(self.message.result(task['id'], code, output))
            self.state = WAITING

Executor 类,agent/executor.py:

from subprocess import Popen, PIPE
from utils import getLogger

logger = getLogger(__name__, 'd:/exec.log')

class Executor:
    def run(self, script, timeout=None):
        logger.info('Agent start exec~~~~~~~~~~~~~~~~~~~~~~')
        proc = Popen(script, shell=True, stdout=PIPE)
        code = proc.wait()  # 阻塞返回状态码
        output = proc.stdout.read()  # 标准输出
        logger.info(code)
        logger.info(output)
        return code, output

agent/msg.py:

import netifaces
import ipaddress
import os
import uuid
import socket

class Message:
    def __init__(self, myidpath):
        # 从文件中读取主机的UUID
        if os.path.exists(myidpath):
            with open(myidpath) as f:
                self.id = f.readline().strip()
        else:
            self.id = uuid.uuid4().hex
            with open(myidpath, 'w') as f:
                f.write(self.id)

    def _get_addresses(self):
        """获取主机上所有接口可用的IPV4地址"""
        addresses = []

        for iface in netifaces.interfaces():
            if 2 in netifaces.ifaddresses(if_name=iface).keys():
                for ip in netifaces.ifaddresses(if_name=iface)[2]:
                    ip = ipaddress.ip_address(ip['addr'])
                    if ip.version != 4:
                        continue
                    if ip.is_link_local:
                        continue
                    if ip.is_loopback:
                        continue
                    if ip.is_multicast:
                        continue
                    if ip.is_reserved:
                        continue
                    addresses.append(str(ip))
        return addresses

    def reg(self):
        """生成注册消息"""
        return {
            "type": "reg",
            "payload": {
                "id": self.id,
                "hostname": socket.gethostname(),
                "ip": self._get_addresses()
            }
        }

    def heartbeat(self):
        """生成心跳消息"""
        return {
            "type": "heartbeat",
            "payload": {
                "id": self.id,
                "hostname": socket.gethostname(),
                "ip": self._get_addresses()
            }
        }

    def result(self, task_id, code, output):
        """返回执行结果"""
        return {
            'type': 'result',
            'payload': {
                'id': task_id,
                'agent_id': self.id,
                'code': code,
                'output': output
            }
        }

5.6.8、master 接收 result 消息

master/cm.py:

class ConnectionManager:
    def __init__(self):
        self.store = Storage()

    def handler(self, msg):
        logger.info(type(msg))
        try:
            if msg['type'] in {"heartbeat", "reg"}:
                payload = msg["payload"]
                info = {"hostname": payload["hostname"], "ip": payload["ip"]}
                self.store.reg_hb(payload['id'], info)

                logger.info("{}".format(self.store.agents))
                return "ack {}".format(msg)
            elif msg['type'] == 'result':  # 处理result消息
                payload = msg['payload']
                agent_id = payload['agent_id']
                task_id = payload['id']
                state = SUCCEED if payload['code'] == 0 else FAILED
                output = payload['output']

                task = self.store.get_task_by_agentid(task_id)
                t = task.targets[agent_id]
                t.state = state
                t.output = output
                return 'ack result'

        except Exception as e:
            logger.error("".format(e))
            return "bad Request."

    sendmsg = handler  # zerorpc 接口

master/storage.py:

import datetime
from .task import Task
from .state import *

class Storage:
    def __init__(self):
        self.agents = {}
        self.tasks = {}

    def reg_hb(self, agent_id, info):
        self.agents[agent_id] = {
            'heartbeat': datetime.datetime.now(),
            'info': info,
            'busy': self.agents.get(agent_id, {}).get('busy', False)
        }

    def add_task(self, task: dict):  # 从WEB Server来
        t = Task(**task)
        self.tasks[t.id] = t
        return t.id

    def iter_tasks(self, states=None):
        if states is None:
            states = {WAITING, RUNNING}
        yield from (task for task in self.tasks.values() if task.state in states)

    def get_task_by_agentid(self, agent_id, state=WAITING):
        for task in self.iter_tasks():
            if agent_id in task.targets.keys():
                t = task.targets.get(agent_id)
                if t.get('state') == state:
                    return task, t  # 为节点找到一个任务就返回

    def get_task_by_id(self, task_id) -> Task:
        return self.tasks.get(task_id)

5.7、前后端交互开发

5.7.1、提交任务

用户通过 WEB(HTTP) 提交新的任务,任务 json 信息有:

  1. 任务脚本 script,base64 编码。

  2. 超时时间 timeout。

  3. 并行度 parallel。

  4. 失败率 fail_rate。

  5. 失败次数 failcount。

  6. targets 是跑任务的 Agent 的 agent_id 列表,可以让用户看到一个列表,勾选。

1-5 都是文本框填写信息即可;6 即 targets 列表需要从 Master 端获取。

5.7.2、targets 列表

master/storage.py:

class Storage:
    def get_agents(self):  # 返回所有agent_id
        return list(self.agents.keys())

master/cm.py:

class ConnectionManager:
    def get_agents(self):  # 返回所有已经注册的agent_id
        return self.store.get_agents()

项目根目录下构建一个 appwebserver.py:

from aiohttp import web, log
import zerorpc

client = zerorpc.Client()
client.connect('tcp://127.0.0.1:9000')

async def targetshandler(request: web.Request):
    txt = client.get_agents()
    return web.json_response(txt)

app = web.Application()
app.router.add_get('/task/targets', targetshandler)

web.run_app(app, host='0.0.0.0', port=9900)

5.7.3、提交任务

客户端 json 如下:

{
    "script": "echo hello",
    "timeout": 30,
    "targets": []
}

appwebserver.py 增加路由和 handler:

from aiohttp import web, log
import zerorpc

client = zerorpc.Client()
client.connect('tcp://127.0.0.1:9000')

async def targetshandler(request: web.Request):
    txt = client.get_agents()
    return web.json_response(txt)

async def taskhandler(request: web.Request):
    j = await request.json()
    txt = client.add_task(j)  # j为字典

    return web.Response(text=txt, status=201)

app = web.Application()
app.router.add_get('/task/targets', targetshandler)
app.router.add_post('/task', taskhandler)

web.run_app(app, host='0.0.0.0', port=9900)

master/cm.py:

class ConnectionManager:    
    def add_task(self, task):  # 添加任务的接口
        task['task_id'] = uuid.uuid4().hex  # 增加任务id
        return self.store.add_task(task)  # 成功则返回task_id

联调测试:

服务端先启动 appwebserver, appserver, app;

客户端通过 appwebserver 获取 agents 列表,如下:

python3_mscheduler_guests

拿到 agents 列表后,向列表中的 agents 发布任务:

python3_mscheduler_results

标签云