1、Python3 网络编程

作者: Brinnatt 分类: python 道 发布时间: 2023-05-21 09:35

1.1、Socket 介绍

Socket 套接字:Python 中提供 socket.py 标准库,非常底层的接口库。

Socket 是一种通用的网络编程接口,和网络层次没有一一对应的关系。

协议族:AF 表示 Address Family,用于 socket() 第一个参数。

名称 含义
AF_INET IPV4
AF_INET6 IPV6
AF_UNIX Unix Domain Socket,windows 没有

Socket 类型:

名称 含义
SOCK_STREAM 面向连接的流套接字。默认值,TCP协议。
SOCK_DGRAM 无连接的数据报文套接字。UDP 协议。

1.2、TCP 编程

Socket 编程,需要两端,一般来说需要一个服务端、一个客户端,服务端称为 Server,客户端称为 Client。

1.2.1、TCP 服务端编程

服务器端编程步骤:

  • 创建 Socket 对象。

  • 绑定 IP 地址 Address 和端口 Port。bind() 方法。

    IPv4 地址为一个二元组('IP地址字符串', Port)。

  • 开始监听,将在指定的 IP 的端口上监听。listen() 方法。

  • 获取用于传送数据的 Socket 对象。

    socket.accept() -> (socket object, address info)

    accept 方法阻塞等待客户端建立连接,返回一个新的 Socket 对象和客户端地址的二元组。

    地址是远程客户端的地址,IPv4 中它是一个二元组(clientaddr, port)。

    • 接收数据

    recv(bufsize[, flags]) 使用缓冲区接收数据。

    • 发送数据

    send(bytes) 发送数据。

python3_net_tcpsocket

问题:两次绑定同一个监听端口会怎么样?

import socket

s = socket.socket()  # 创建socket对象
s.bind(('127.0.0.1', 9999))  # 一个二元组

s.listen()  # 开始监听

# 开启一个连接
s1, info = s.accept()  # 阻塞直到和客户端成功建立连接,返回一个socket对象和客户端地址
data = s1.recv(1024)  # 使用缓冲区获取数据
print(1, '-->', data, info)
s1.send(b'www.brinnatt.com ack')

# 开启另外一个连接
s2, _ = s.accept()
data = s2.recv(1024)
print(2, '-->', data)
s2.send(b'hello python')

# 关闭套接字
s.close()

上例 accept 和 recv 是阻塞的,主线程经常被阻塞住而不能工作。怎么办?

1.2.2、群聊程序(服务端)

1.2.2.1、需求分析

聊天工具是 CS 程序,C 是每一个客户端,S 是服务器端。

服务器应该具有的功能:

  • 启动服务,包括绑定地址和端口,监听

  • 建立连接,能和多个客户端建立连接

  • 接收不同用户的信息

  • 分发,将接收的某个用户的信息转发到已连接的所有客户端

  • 停止服务

  • 记录连接的客户端

1.2.2.2、代码实现

服务端应该对应一个类:

import socket

class ChatServer:
    def __init__(self, ip, port):  # 启动服务
        self.sock = socket.socket()
        self.addr = (ip, port)

    def start(self):  # 启动监听
        pass

    def accept(self):  # 多人连接
        pass

    def recv(self):  # 接收客户端数据
        pass

    def stop(self):  # 停止服务
        pass

在此基础上,扩展完成:

import logging
import socket
import threading
import datetime

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")

class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}  # 客户端

    def start(self):  # 启动监听
        self.sock.bind(self.addr)  # 绑定
        self.sock.listen()  # 监听
        # accept会阻塞主线程,所以开一个新线程
        threading.Thread(target=self.accept).start()

    def accept(self):  # 多人连接
        while True:
            sock, client = self.sock.accept()  # 阻塞
            self.clients[client] = sock  # 添加到客户端字典
            # 准备接收数据,recv是阻塞的,开启新的线程
            threading.Thread(target=self.recv, args=(sock, client)).start()

    def recv(self, sock: socket.socket, client):  # 接收客户端数据
        while True:
            data = sock.recv(1024)  # 阻塞到数据到来
            msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client,
                                                            data.decode())
            logging.info(msg)
            msg = msg.encode()
            for s in self.clients.values():
                s.send(msg)

    def stop(self):  # 停止服务
        for s in self.clients.values():
            s.close()
        self.sock.close()

cs = ChatServer()
cs.start()

基本功能完成,但是有问题。使用 Event 改进。

import logging
import socket
import threading
import datetime

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")

class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}  # 客户端
        self.event = threading.Event()

    def start(self):  # 启动监听
        self.sock.bind(self.addr)  # 绑定
        self.sock.listen()  # 监听
        # accept会阻塞主线程,所以开一个新线程
        threading.Thread(target=self.accept).start()

    def accept(self):  # 多人连接
        while not self.event.is_set():
            sock, client = self.sock.accept()  # 阻塞
            self.clients[client] = sock  # 添加到客户端字典
            # 准备接收数据,recv是阻塞的,开启新的线程
            threading.Thread(target=self.recv, args=(sock, client)).start()

    def recv(self, sock: socket.socket, client):  # 接收客户端数据
        while not self.event.is_set():
            data = sock.recv(1024)  # 阻塞到数据到来
            msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client,
                                                            data.decode())
            logging.info(msg)
            msg = msg.encode()
            for s in self.clients.values():
                s.send(msg)

    def stop(self):  # 停止服务
        for s in self.clients.values():
            s.close()
        self.sock.close()
        self.event.set()

cs = ChatServer()
cs.start()

while True:
    cmd = input(">>").strip()
    if cmd == 'quit':
        cs.stop()
        threading.Event().wait(3)
        break

这一版基本能用了,测试通过。但是还有要完善的地方。例如各种异常的判断,客户端断开连接后字典中要移除客户端数据等。

客户端主动断开带来的问题:服务端知道自己何时断开,如果客户端断开,服务器不知道。

所以,好的做法是,客户端断开发出特殊消息通知服务器端断开连接。但是,如果客户端主动断开,服务端主动发
送一个空消息,超时返回异常,捕获异常并清理连接。

即使为客户端提供了断开命令,也不能保证客户端会使用它断开连接。但是还是要增加这个退出功能。

增加客户端退出命令:

import logging
import socket
import threading
import datetime

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")

class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}  # 客户端
        self.event = threading.Event()

    def start(self):  # 启动监听
        self.sock.bind(self.addr)  # 绑定
        self.sock.listen()  # 监听
        # accept会阻塞主线程,所以开一个新线程
        threading.Thread(target=self.accept).start()

    def accept(self):  # 多人连接
        while not self.event.is_set():
            sock, client = self.sock.accept()  # 阻塞
            self.clients[client] = sock  # 添加到客户端字典
            # 准备接收数据,recv是阻塞的,开启新的线程
            threading.Thread(target=self.recv, args=(sock, client)).start()

    def recv(self, sock: socket.socket, client):  # 接收客户端数据
        while not self.event.is_set():
            data = sock.recv(1024)  # 阻塞到数据到来
            msg = data.decode().strip()
            # 客户端退出命令
            if msg == 'quit':
                self.clients.pop(client)
                sock.close()
                logging.info('{} quits'.format(client))
                break
            msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client,
                                                            data.decode())
            logging.info(msg)
            msg = msg.encode()
            for s in self.clients.values():
                s.send(msg)

    def stop(self):  # 停止服务
        for s in self.clients.values():
            s.close()
        self.sock.close()
        self.event.set()

cs = ChatServer()
cs.start()

while True:
    cmd = input(">>").strip()
    if cmd == 'quit':
        cs.stop()
        threading.Event().wait(3)
        break
    logging.info(threading.enumerate())  # 用来观察断开后线程的变化

程序还有瑕疵,但是业务功能基本完成了。

1.2.3、socket 常用方法

名称 含义
socket.recv(bufsize[, flags]) 获取数据。默认是阻塞的方式。
socket.recvfrom(bufsize[, flags]) 获取数据,返回一个二元组(bytes, address)。
socket.recv_into(buffer[, nbytes[, flags]]) 获取到nbytes的数据后,存储到buffer中。如果nbytes没有指定或0,将buffer大小的数据存入buffer中。返回接收的字节数。
socket.recvfrom_into(buffer[, nbytes[, flags]]) 获取数据,返回一个二元组(bytes, address)到buffer中。
socket.send(bytes[, flags]) TCP发送数据。
socket.sendall(bytes[, flags]) TCP发送全部数据,成功返回None。
socket.sendto(string[, flag], address) UDP发送数据。
socket.sendfile(file, offset=0, count=None) 发送一个文件直到EOF,使用高性能的os.sendfile机制,返回发送的字节数。如果win下不支持sendfile,或者不是普通文件,使用send()发送文件。offset告诉起始位置。3.5版本开始。
socket.getpeername() 返回连接套接字的远程地址。返回值通常是元组(ipaddr,port)
socket.getsockname() 返回套接字自己的地址。通常是一个元组(ipaddr,port)
socket.setblocking(flag) 如果flag为0,则将套接字设为非阻塞模式,否则将套接字设为阻塞模式(默认值)。
非阻塞模式下,如果调用recv()没有发现任何数据,或send()调用无法立即发送数据,那么将引起socket.error异常。
socket.settimeout(value) 设置套接字操作的超时期,timeout是一个浮点数,单位是秒。
值为 None 表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如connect())。
socket.setsockopt(level,optname,value) 设置套接字选项的值。比如缓冲区大小。太多了,去看文档。
不同系统,不同版本都不尽相同。

1.2.4、MakeFile

socket.makefile(mode='r', buffering=None, *, encoding=None, errors=None, newline=None)

创建一个与该套接字相关连的文件对象,将 recv 方法看做读方法,将 send 方法看做写方法。

import socket

socketserver = socket.socket()
ip = '127.0.0.1'
port = 9999
addr = (ip, port)
socketserver.bind(addr)
socketserver.listen()
print('-' * 30)
s, _ = socketserver.accept()
print('-' * 30)
f = s.makefile(mode='rw')

line = f.read(10)  # 阻塞
print('-' * 30)
print(line)
f.write('Return your msg: {}'.format(line))
f.flush()

上例不能循环接收消息,修改一下:

import socket
import threading

sockserver = socket.socket()
ip = '127.0.0.1'
port = 9999
addr = (ip, port)
sockserver.bind(addr)
sockserver.listen()
print('-' * 30)
event = threading.Event()

def accept(sock: socket.socket, e: threading.Event):
    s, _ = sock.accept()
    print(s)
    f = s.makefile(mode='rw')
    while True:
        line = f.readline()  # 发送的数据要有回车换行符,才能显示
        print(line)
        if line.strip() == "quit":  # 注意要发quit\n
            break
        f.write('Return your msg: {}'.format(line))
        f.flush()
    f.close()
    sock.close()
    e.wait(3)

t = threading.Thread(target=accept, args=(sockserver, event))
t.start()
t.join()
print(sockserver)

1.2.4.1、makefile 练习

使用 makefile 改写群聊类:

import logging
import socket
import threading
import datetime

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(thread)d %(message)s')

class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}
        self.event = threading.Event()

    def start(self):
        self.sock.bind(self.addr)
        self.sock.listen()
        threading.Thread(target=self.accept).start()

    def accept(self):
        while not self.event.is_set():
            sock, client = self.sock.accept()
            f = sock.makefile('rw')
            self.clients[client] = f
            threading.Thread(target=self.recv, args=(f, client), name='recv').start()

    def recv(self, f, client):
        while not self.event.is_set():
            data = f.readline() # 发过来的信息要包含回车换行符
            msg = data.strip()

            if msg == 'quit':
                self.clients.pop(client)
                f.close()
                logging.info(f'{client} quit')
                break
            msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client,
                                                            data)
            logging.info(msg)

            for s in self.clients.values():
                s.write(msg)
                s.flush()

    def stop(self):  # 停止服务
        for s in self.clients.values():
            s.close()

        self.sock.close()
        self.event.set()

cs = ChatServer()
cs.start()

while True:
    cmd = input('>>').strip()
    if cmd == 'quit':
        cs.stop()
        threading.Event().wait(3)
        break
    logging.info(threading.enumerate())

上例完成了基本功能,但是,如果客户端主动断开,或者 readline 出现异常,就不会从 clients 中移除作废的 socket。可以使用异常处理解决这个问题。

1.2.4.2、ChatServer 实验完整代码

注意,这个代码为实验用,代码中瑕疵还有很多。Socket太底层了,实际开发中很少使用这么底层的接口。

增加一些异常处理。

import logging
import socket
import threading
import datetime

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")

class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}  # 客户端
        self.event = threading.Event()

    def start(self):  # 启动监听
        self.sock.bind(self.addr)  # 绑定
        self.sock.listen()  # 监听
        # accept会阻塞主线程,所以开一个新线程
        threading.Thread(target=self.accept).start()

    def accept(self):  # 多人连接
        while not self.event.is_set():
            sock, client = self.sock.accept()  # 阻塞
            # 准备接收数据,recv是阻塞的,开启新的线程
            f = sock.makefile('rw')  # 支持读写
            self.clients[client] = f  # 添加到客户端字典
            threading.Thread(target=self.recv, args=(f, client), name='recv').start()

    def recv(self, f, client):  # 接收客户端数据
        while not self.event.is_set():
            try:
                data = f.readline()  # 阻塞到换行符
            except Exception as e:
                logging.error(e)  # 有任何异常,退出
                data = 'quit'
            msg = data.strip()
            # 客户端退出命令
            if msg == 'quit':
                self.clients.pop(client)
                f.close()
                logging.info('{} quits'.format(client))
                break
            msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client,
                                                            data)
            logging.info(msg)
            for s in self.clients.values():
                s.write(msg)
                s.flush()

    def stop(self):  # 停止服务
        for s in self.clients.values():
            s.close()
        self.sock.close()
        self.event.set()

def main():
    cs = ChatServer()
    cs.start()
    while True:
        cmd = input('>>').strip()
        if cmd == 'quit':
            cs.stop()
            threading.Event().wait(3)
            break
        logging.info(threading.enumerate())  # 用来观察断开后线程的变化

if __name__ == '__main__':
    main()

1.2.5、TCP 客户端编程

客户端编程步骤:

  • 创建 Socket 对象

  • 连接到远端服务端的 ip 和 port,connect() 方法

  • 传输数据

    • 使用 send、recv 方法发送、接收数据
  • 关闭连接,释放资源

import socket

client = socket.socket()
ipaddr = ('127.0.0.1', 9999)
client.connect(ipaddr)

client.send(b'abcd\n')
data = client.recv(1024)
print(data)

client.close()

开始编写客户端类:

import socket
import threading
import datetime
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class ChatClient:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.event = threading.Event()

    def start(self):  # 启动对远端服务器的连接
        self.sock.connect(self.addr)
        self.send("I'm ready.")
        # 准备接收数据,recv是阻塞的,开启新的线程
        threading.Thread(target=self.recv, name="recv").start()

    def recv(self):  # 接收客户端的数据
        while not self.event.is_set():
            try:
                data = self.sock.recv(1024)  # 阻塞
            except Exception as e:
                logging.error(e)
                break
            msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *self.addr,
                                                            data.strip())
            logging.info(msg)

    def send(self, msg: str):
        data = "{}\n".format(msg.strip()).encode()  # 服务端需要一个换行符
        self.sock.send(data)

    def stop(self):
        self.sock.close()
        self.event.wait(3)
        self.event.set()
        logging.info('Client stops.')

def main():
    cc = ChatClient()
    cc.start()
    while True:
        cmd = input('>>>')
        if cmd.strip() == 'quit':
            cc.stop()
            break
        cc.send(cmd)  # 发送消息

if __name__ == '__main__':
    main()

同样,这样的客户端还是有些问题的,仅用于测试。

1.3、UDP 编程

测试命令:

$ netstat -anp udp | find "9988" # windows查找udp是否启动端口
$ echo "123abc" | nc -u 127.0.0.1 9988 # linux下发给服务端数据

1.3.1、UDP 服务端编程

python3_net_udp

  • 创建 socket 对象,socket.SOCK_DGRAM

  • 绑定 IP 和 Port,bind() 方法。

  • 传输数据

    • 接收数据,socket.recvfrom(bufsize[, flags]),获得一个二元组 (string, address)。

    • 发送数据,socket.sendto(string, address) 发给某地址某信息。

  • 释放资源

server = socket.socket(type=socket.SOCK_DGRAM)

server.bind(('0.0.0.0', 9999)) # 立即绑定一个udp端口
data = server.recv(1024) # 阻塞等待数据
data = server.recvfrom(1024) # 阻塞等待数据(value, (ip, port))
server.sendto(b'7', ('192.168.142.1', 10000))

server.close()

1.3.2、UDP 客户端编程

  • 创建 socket 对象,socket.SOCK_DGRAM

  • 发送数据,socket.sendto(string, address) 发给某地址某信息

  • 接收数据,socket.recvfrom(bufsize[, flags]),获得一个二元组 (string, address)。

  • 释放资源

client = socket.socket(type=socket.SOCK_DGRAM)
raddr = ('192.168.142.1', 10000)

client.connect(raddr)
client.sendto(b'8', raddr)
client.send(b'9')
data = client.recvfrom(1024) # 阻塞等待数据(value, (ip, port))
data = client.recv(1024) # 阻塞等待数据

client.close()

注意:UDP 是无连接协议,所以可以只有任何一端,例如客户端数据发往服务端,服务端存在与否无所谓。

UDP 编程中 bind、connect、send、sendto、recv、recvfrom 方法使用。

UDP 的 socket 对象创建后,是没有占用本地地址和端口的。

方法 说明
bind 方法 可以指定本地地址和端口 laddr,会立即占用。
connect 方法 可以立即占用本地地址和端口,填充远端地址和端口 raddr。
sendto 方法 可以立即占用本地地址和端口,并把数据发往指定远端。只有有了本地绑定端口,sendto 就可以向任何远端发送数据。
send 方法 需要和 connect 方法配合,可以使用已经从本地端口把数据发往 raddr 指定的远端。
recv 方法 要求一定要在占用了本地端口后,返回接收的数据。
recvfrom 方法 要求一定要占用了本地端口后,返回接收的数据和对端地址的二元组。

1.3.3、UDP 群聊服务端

# 服务端类的基本架构
class ChatUDPServer:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.addr = (ip, port)
        self.sock = socket.socket(type=socket.SOCK_DGRAM)

    def start(self):
        self.sock.bind(self.addr)  # 立即绑定
        self.sock.recvfrom(1024)  # 阻塞接收数据

    def stop(self):
        self.sock.close()

在上面代码的基础之上扩充:

import socket
import threading
import datetime
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class ChatUDPServer:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.addr = (ip, port)
        self.sock = socket.socket(type=socket.SOCK_DGRAM)
        self.clients = set()  # 记录客户端
        self.event = threading.Event()

    def start(self):
        self.sock.bind(self.addr)  # 立即绑定
        # 启动线程
        threading.Thread(target=self.recv, name='recv').start()

    def recv(self):
        while not self.event.is_set():
            data, raddr = self.sock.recvfrom(1024)  # 阻塞接收数据
            if data.strip() == b'quit':
                # 有可能发来数据的不在clients中
                if raddr in self.clients:
                    self.clients.remove(raddr)
                logging.info('{} leaving'.format(raddr))
                continue
            self.clients.add(raddr)
            msg = '{}. from {}:{}'.format(data.decode(), *raddr)
            logging.info(msg)
            msg = msg.encode()

            for c in self.clients:
                self.sock.sendto(msg, c)  # 不保证对方能够收到

    def stop(self):
        for c in self.clients:
            self.sock.sendto(b'bye', c)

        self.sock.close()
        self.event.set()

def main():
    cs = ChatUDPServer()
    cs.start()
    while True:
        cmd = input(">>>")
        if cmd.strip() == 'quit':
            cs.stop()
            break
        logging.info(threading.enumerate())
        logging.info(cs.clients)

if __name__ == '__main__':
    main()

1.3.4、UDP 群聊客户端

import threading
import socket
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class ChatUdpClient:
    def __init__(self, rip='127.0.0.1', rport=9999):
        self.sock = socket.socket(type=socket.SOCK_DGRAM)
        self.raddr = (rip, rport)
        self.event = threading.Event()

    def start(self):
        self.sock.connect(self.raddr)  # 占用本地地址和端口,设置远端地址和端口
        threading.Thread(target=self.recv, name='recv').start()

    def recv(self):
        while not self.event.is_set():
            data, raddr = self.sock.recvfrom(1024)
            msg = '{}. from {}:{}'.format(data.decode(), *raddr)
            logging.info(msg)

    def send(self, msg: str):
        self.sock.sendto(msg.encode(), self.raddr)

    def stop(self):
        self.sock.close()
        self.event.set()

def main():
    cc1 = ChatUdpClient()
    cc2 = ChatUdpClient()
    cc1.start()
    cc2.start()
    print(cc1.sock)
    print(cc2.sock)
    while True:
        cmd = input('Input your words >>')
        if cmd.strip() == 'quit':
            cc1.stop()
            cc2.stop()
            break
        cc1.send(cmd)
        cc2.send(cmd)

if __name__ == '__main__':
    main()

上面的例子并不完善,如果客户端断开了,服务端不知道。每一个服务端还需要对所有客户端发送数据,包括已经断开的客户端。

1.3.5、代码改进

1.3.5.1、服务端代码改进

加一个 ack 机制和心跳 heartbeat。 心跳,就是一端定时发往另一端的信息,一般每次数据越少越好。心跳时间间隔约定好就行。 ack 即响应,一端收到另一端的消息后,返回信息。

心跳机制:

  1. 一般来说是客户端定时发往服务端的,服务端并不需要 ack 回复客户端,只需要记录该客户端还活着就行了。
  2. 如果是服务端定时发往客户端的,一般需要客户端 ack 响应来表示活着,如果没有收到 ack 的客户端,服务端移除其信息。这种实现较为复杂,用的较少。
  3. 也可以双向都发心跳的,用的更少。

在服务器端代码中使用第一种机制改进:

import socket
import threading
import datetime
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class ChatUDPServer:
    def __init__(self, ip='127.0.0.1', port=9999, interval=10):
        self.addr = (ip, port)
        self.sock = socket.socket(type=socket.SOCK_DGRAM)
        self.clients = {}  # 记录客户端,改为字典
        self.event = threading.Event()
        self.interval = interval  # 默认10秒,超时就要移除对应的客户端

    def start(self):
        self.sock.bind(self.addr)  # 立即绑定
        # 启动线程
        threading.Thread(target=self.recv, name='recv').start()

    def recv(self):
        while not self.event.is_set():
            localset = set()  # 清理超时
            data, raddr = self.sock.recvfrom(1024)  # 阻塞接收数据
            current = datetime.datetime.now().timestamp()  # float

            if data.strip() == b'^hb^':  # 心跳信息
                print('year ^hb^', raddr)
                self.clients[raddr] = current
                continue

            elif data.strip() == b'quit':
                # 有可能发来数据的不在clients中
                self.clients.pop(raddr, None)
                logging.info('{} leaving'.format(raddr))
                continue

            # 有信息来就更新时间
            # 什么时候比较心跳时间呢? 发送信息的时候,反正要遍历一遍
            self.clients[raddr] = current

            msg = '{}. from {}:{}'.format(data.decode(), *raddr)
            logging.info(msg)
            msg = msg.encode()

            for c,stamp in self.clients.items():
                if current - stamp > self.interval:
                    localset.add(c)
                else:
                    self.sock.sendto(msg, c) # 不保证对方能收到
            for c in localset:
                self.clients.pop(c)

    def stop(self):
        for c in self.clients:
            self.sock.sendto(b'bye', c)

        self.sock.close()
        self.event.set()

def main():
    cs = ChatUDPServer()
    cs.start()
    while True:
        cmd = input(">>>")
        if cmd.strip() == 'quit':
            cs.stop()
            break
        logging.info(threading.enumerate())
        logging.info(cs.clients)

if __name__ == '__main__':
    main()

1.3.5.2、客户端代码改进

增加定时发送心跳代码:

import threading
import socket
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class ChatUdpClient:
    def __init__(self, rip='127.0.0.1', rport=9999):
        self.sock = socket.socket(type=socket.SOCK_DGRAM)
        self.raddr = (rip, rport)
        self.event = threading.Event()

    def start(self):
        self.sock.connect(self.raddr)  # 占用本地地址和端口,设置远端地址和端口

        threading.Thread(target=self._sendhb, name='heartbeat', daemon=True).start()
        threading.Thread(target=self.recv, name='recv').start()

    def _sendhb(self):  # 心跳
        while not self.event.wait(5):
            self.send('^hb^')

    def recv(self):
        while not self.event.is_set():
            data, raddr = self.sock.recvfrom(1024)
            msg = '{}. from {}:{}'.format(data.decode(), *raddr)
            logging.info(msg)

    def send(self, msg: str):
        self.sock.sendto(msg.encode(), self.raddr)

    def stop(self):
        self.send('quit')  # 通知服务端退出
        self.sock.close()
        self.event.set()

def main():
    cc1 = ChatUdpClient()
    cc2 = ChatUdpClient()
    cc1.start()
    cc2.start()
    print(cc1.sock)
    print(cc2.sock)
    while True:
        cmd = input('Input your words >>')
        if cmd.strip() == 'quit':
            cc1.stop()
            cc2.stop()
            break
        cc1.send(cmd)
        cc2.send(cmd)

if __name__ == '__main__':
    main()

1.3.5.3、UDP 协议应用

UDP 是无连接协议,它基于以下假设: 网络足够好,消息不会丢包,包不会乱序。

但是,即使是在局域网,也不能保证不丢包,而且包的到达不一定有序。

应用场景,视频、音频传输,一般来说,丢些包,问题不大,最多丢些图像、听不清话语,可以重新发话语来解决。

海量采集数据,例如传感器发来的数据,丢几十、几百条数据也没有关系。 DNS 协议,数据内容小,一个包就能
查询到结果,不存在乱序,丢包,重新请求解析。

一般来说,UDP 性能优于 TCP,但是可靠性要求高的场合的还是要选择 TCP 协议。

1.4、SocketServer

socket 编程过于底层,编程虽然有套路,但是想要写出健壮的代码还是比较困难的,所以很多语言都对 socket 底层 API 进行封装,Python 的封装就是 socketserver 模块。它是网络服务编程框架,便于企业级快速开发。

1.4.1、类的继承关系

+------------+
| BaseServer |
+------------+
      |
      v
+-----------+        +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+        +------------------+
      |
      v
+-----------+        +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+        +--------------------+

SocketServer 简化了网络服务器的编写。

它有 4 个同步类:TCPServer,UDPServer,UnixStreamServer,UnixDatagramServer。

2 个 Mixin 类:ForkingMixIn 和 ThreadingMixIn 类,用来支持异步。

class ForkingUDPServer(ForkingMixIn, UDPServer): pass

class ForkingTCPServer(ForkingMixIn, TCPServer): pass

class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass

class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

fork 是创建多进程,thread 是创建多线程。

1.4.2、编程接口

socketserver.BaseServer(server_address, RequestHandlerClass)

需要提供服务器绑定的地址信息,和用于处理请求的 RequestHandlerClass 类。

RequestHandlerClass 类必须是 BaseRequestHandler 类的子类,在 BaseServer 中代码如下:

class BaseServer:

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def finish_request(self, request, client_address):  # 处理请求的方法
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self) # RequestHandlerClass构造

BaseRequestHandler 类:

它是和用户连接的用户请求处理类的基类,定义为 BaseRequestHandler(request, client_address, server)。

服务端 Server 实例接收用户请求后,最后会实例化这个类。

它被初始化时,送入 3 个构造参数:request, client_address, server 自身。

以后就可以在 BaseRequestHandler 类的实例上使用以下属性:

self.request 是和客户端的连接的 socket 对象

self.server 是 TCPServer 本身

self.client_address 是客户端地址

这个类在初始化的时候,它会依次调用 3 个方法。子类可以覆盖这些方法。

# BaseRequestHandler 要子类覆盖的方法
class BaseRequestHandler:
    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

测试代码:

import threading
import socketserver

class MyHandler(socketserver.BaseRequestHandler):
    def handle(self) -> None:
        super().handle()  # 可以不调用,父类handle什么都没有做
        print('-' * 80)
        print(1, '-->', self.server)  # 服务
        print(2, '-->', self.request)  # 服务端负责客户端连接请求的socket对象
        print(3, '-->', self.client_address)  # 客户端地址
        print(4, '-->', self.__dict__)  # 能看到负责accept的socket

        print(5, '-->', threading.enumerate())
        print(6, '-->', threading.current_thread())
        print('-' * 80)

addr = ('10.47.76.53', 9999)
server = socketserver.ThreadingTCPServer(addr, MyHandler)
server.serve_forever()  # 永久

测试结果说明,handle 方法相当于 socket 的 recv 方法。

每个不同的连接上的请求过来后,生成这个连接的 socket 对象即 self.request,客户端地址 self.client_address。

问题:

测试过程中,上面代码,连接后立即断开了,为什么?

怎样才能客户端和服务器端长时间连接?

import logging
import threading
import socketserver

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class MyHandler(socketserver.BaseRequestHandler):
    def handle(self) -> None:
        super().handle()  # 可以不调用,父类handle什么都没有做
        print('-' * 80)
        print(1, '-->', self.server)  # 服务
        print(2, '-->', self.request)  # 服务端负责客户端连接请求的socket对象
        print(3, '-->', self.client_address)  # 客户端地址
        print(4, '-->', self.__dict__)  # 能看到负责accept的socket

        print(5, '-->', threading.enumerate())
        print(6, '-->', threading.current_thread())
        print('-' * 80)

        for i in range(3):
            data = self.request.recv(1024)
            logging.info(data)
        logging.info('======= end =======')

addr = ('10.47.76.53', 9999)
server = socketserver.ThreadingTCPServer(addr, MyHandler)
server.serve_forever()  # 永久

将 ThreadingTCPServer 换成 TCPServer,同时连接 2 个客户端观察效果。

ThreadingTCPServer 是异步的,可以同时处理多个连接。

TCPServer 是同步的,一个连接处理完了,即一个连接的 handle 方法执行完了,才能处理另一个连接,且只有主线程。

创建服务器需要几个步骤:

  1. 从 BaseRequestHandler 类派生出子类,并覆盖其 handle() 方法来创建请求处理程序类,此方法将处理
    传入请求。
  2. 实例化一个服务器类,传参服务器的地址和请求处理类。
  3. 调用服务器实例的 handle_request() 或 serve_forever() 方法。
  4. 调用 server_close() 关闭套接字。

1.4.3、实现 EchoServer

顾名思义,Echo,来什么消息回显什么消息。客户端发来什么信息,返回什么信息。

import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sys

class EchoHandler(BaseRequestHandler):
    def setup(self) -> None:
        super().setup()
        self.event = threading.Event()  # 初始工作

    def finish(self) -> None:
        super().finish()
        self.event.set()

    def handle(self) -> None:
        super().handle()

        while not self.event.is_set():
            data = self.request.recv(1024).decode()
            msg = "{} {}".format(self.client_address, data).encode()
            self.request.send(msg)
        print('End')

addr = ('0.0.0.0', 9999)
server = ThreadingTCPServer(addr, EchoHandler)

server_thread = threading.Thread(target=server.serve_forever, name='EchoServer', daemon=True)
server_thread.start()

try:
    while True:
        cmd = input('>>>')
        if cmd.strip() == 'quit':
            break
        print(threading.enumerate())
except Exception as e:
    print(e)
except KeyboardInterrupt:
    pass
finally:
    print('exit')
    sys.exit(0)

1.4.4、改写 ChatServer

使用 ThreadingTCPServer 改写 ChatServer:

import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sys
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class ChatHandler(BaseRequestHandler):
    CLIENTS = {}

    def setup(self) -> None:
        super().setup()
        self.event = threading.Event()  # 初始工作
        self.CLIENTS[self.client_address] = self.request

    def finish(self) -> None:
        super().finish()  # 清理工作
        self.CLIENTS.pop(self.client_address)  # 能执行到吗?
        self.event.set()

    def handle(self) -> None:
        super().handle()

        while not self.event.is_set():
            data = self.request.recv(1024).decode()
            if data == 'quit':
                break

            msg = "{} {}".format(self.client_address, data).encode()
            logging.info(msg)

            for c in self.CLIENTS.values():
                c.send(msg)
        print('End')

addr = ('0.0.0.0', 9999)
server = ThreadingTCPServer(addr, ChatHandler)

server_thread = threading.Thread(target=server.serve_forever, name='ChatHandler', daemon=True)
server_thread.start()

try:
    while True:
        cmd = input('>>>')
        if cmd.strip() == 'quit':
            break
        print(threading.enumerate())
except Exception as e:
    print(e)
except KeyboardInterrupt:
    pass
finally:
    print('exit')
    sys.exit(0)

问题:

上例 self.clients.pop(self.client_address) 能执行到吗?

如果连接的线程中 handle 方法中抛出异常,例如客户端主动断开导致的异常,线程崩溃,self.clients 的 pop 方法还能执行吗?

当然能执行,基类源码保证了即使异常,也能执行 finish 方法。但不代表不应该捕获客户端各种异常。

1.4.5、解决客户端主动连接断开问题

如果客户端主动断开,总是抛出一个异常。看看到底发生了什么,在 handle 方法中增加一些语句。

    def handle(self) -> None:
        super().handle()

        while not self.event.is_set():
            data = self.request.recv(1024).decode()
            print(data, '+++++++++++++++++++')  # 增加
            if data == 'quit':
                break

            msg = "{} {}".format(self.client_address, data).encode()
            logging.info(msg)

            for c in self.CLIENTS.values():
                print('+++++++++++++++++++')  # 增加
                c.send(msg)
        print('End')

通过打印可以看到,客户端主动断开,会导致 recv 方法立即返回一个空 bytes,并没有同时抛出异常。当循环回到 recv 这一句的时候就会抛出异常。所以,可以通过判断 data 数据是否为空来客户端是否断开。

    def handle(self) -> None:
        super().handle()

        while not self.event.is_set():
            data = self.request.recv(1024).decode()
            print(data, '+++++++++++++++++++')  # 增加
            if not data or data == 'quit':
                print("Broken pipe")
                break

            msg = "{} {}".format(self.client_address, data).encode()
            logging.info(msg)

            for c in self.CLIENTS.values():
                print('+++++++++++++++++++')  # 增加
                c.send(msg)
        print('End')

为每一个连接提供 RequestHandlerClass 类实例,依次调用 setup、handle、finish 方法,且使用了 try...finally 结构保证 finish 方法一定能被调用。

这些方法依次执行完成,如果想维持这个连接和客户端通信,就需要在 handle 函数中使用循环。

socketserver 模块提供不同的类,但是编程接口是一样的,即使是多进程、多线程的类也是一样,大大减少了编程的难度。

1.5、异步编程

1.5.1、同步、异步

函数或方法被调用的时候,调用者是否得到最终结果的。

  • 直接得到最终结果的,就是同步调用;

  • 不直接得到最终结果的,就是异步调用。

同步就是我让你打饭,你不打好我不走开,直到你打好饭给了我。

异步就是我让你打饭,你打着,我不等你,但是我会盯着你,你打完,我会过来拿走的。异步并不保证多长时间最终打完饭。

1.5.2、阻塞、非阻塞

函数或方法调用的时候,是否立刻返回。

  • 立即返回就是非阻塞调用;
  • 不立即返回就是阻塞调用。

1.5.3、区别

同步、异步,与阻塞、非阻塞不相关。

同步、异步强调的是,是否得到 最终的结果

阻塞、非阻塞强调是时间,是否 等待

同步与异步区别在于:调用者是否得到了想要的最终结果。

同步就是一直要执行到返回最终结果;

异步就是直接返回了,但是返回的不是最终结果。调用者不能通过这种调用得到结果,还要通过被调用者,使用其它方式通知调用者,来取回最终结果。

阻塞与非阻塞的区别在于,调用者是否还能干其他事。

阻塞,调用者就只能干等;

非阻塞,调用者可以先去忙会别的,不用一直等。

1.5.4、联系

同步阻塞,我啥事不干,就等你打饭给我。打到饭是结果,而且我啥事不干一直等,同步加阻塞。

同步非阻塞,我等着你打饭给我,但我可以玩会手机、看看电视。打饭是结果,但是我不一直等。

异步阻塞,我要打饭,你说等叫号,并没有返回饭给我,我啥事不干,就干等着饭好了你叫我。

异步非阻塞,我要打饭,你说等叫号,并没有返回饭给我,我在旁边看电视、玩手机,饭打好了叫我。

1.5.5、IO 两个阶段

IO 过程分两阶段:

  1. 数据准备阶段

  2. 内核空间复制回用户进程缓冲区阶段

发生 IO 的时候:

  1. 内核从输入设备读、写数据(淘米,把米放饭锅里煮饭)

  2. 进程从内核复制数据(盛饭,从内核这个饭锅里面把饭装到碗里来)

系统调用 --> read 函数。

1.5.6、同步 IO

同步 IO 模型包括:阻塞 IO、非阻塞 IO、IO 多路复用。

1.5.6.1、阻塞 IO

python3_blocking_IO

进程等待(阻塞),直到读写完成。(全程等待)

read/write 函数。

1.5.6.2、非阻塞 IO

python3_nonblocking_IO

进程调用 read 操作,如果 IO 设备没有准备好,立即返回 EWOULDBLOCK ERROR 异常,进程不阻塞。用户可以再次发起系统调用,如果内核已经准备好,就阻塞,然后复制数据到用户空间。

第一阶段数据没有准备好,就先忙别的,等会再来看看。检查数据是否准备好了的过程是非阻塞的。

第二阶段是阻塞的,即内核空间和用户空间之间复制数据是阻塞的。

淘米、蒸饭我不等,我去玩会,盛饭过程我等着你装好饭,但是要等到盛好饭才算完事,这是同步的,结果就是盛好饭。

read/write 函数。

1.5.6.3、IO 多路复用

所谓 IO 多路复用,就是同时监控多个 IO,有一个准备好了,就不需要等了,开始处理,提高了同时处理 IO 的能力。

select 几乎所有操作系统平台都支持,poll 是对 select 的升级。

epoll,Linux 系统内核 2.5+ 开始支持,对 select 和 poll 的增强,在监视的基础上,增加回调机制。BSD、Mac 平台有 kqueue,Windows 有 iocp。

python3_multiplexing_IO

以 select 为例,将关注的 IO 操作告诉 select 函数并调用,进程阻塞,内核 “监视” select 关注的文件描述符 fd,被关注的任何一个 fd 对应的 IO 准备好了数据,select 返回。再使用 read 将数据复制到用户进程。

select 举例,食堂供应很多菜(众多的 IO),你需要吃某三菜一汤,大师傅(操作系统)说要现做,需要等,你只好等待。其中一样菜好了,大师傅叫你过来说你点的菜有好的了,你得自己找找看哪一样菜好了,请服务员把做好的菜打给你。

epoll 是有菜准备好了,大师傅喊你去几号窗口直接打菜,不用自己找菜了。

一般情况下,select 最多能监听 1024 个 fd(可以修改,但不建议改),但是由于 select 采用轮询的方式,当管理的 IO 多了,每次都要遍历全部 fd,效率低下。

epoll 没有管理的 fd 的上限,且是回调机制,不需遍历,效率很高。

1.5.7、异步 IO

python3_async_IO

进程发起异步 IO 请求,立即返回。内核完成 IO 的两个阶段,内核给进程发一个信号。

举例,来打饭,跟大师傅说饭好了叫你,饭菜准备好了,窗口服务员把饭盛好了打电话叫你。两阶段都是异步的。

在整个过程中,进程都可以忙别的,等好了才过来。

举例,今天不想出去到饭店吃饭了,点外卖,饭菜在饭店做好了(第一阶段),快递员从饭店送到你家门口(第二
阶段)。

Linux 的 aio 的系统调用,内核从版本 2.6 开始支持。

1.6、Python3 中 IO 多路复用

IO 多路复用

  • 大多数操作系统都支持 select 和 poll

  • Linux 2.5+ 支持 epoll

  • BSD、Mac 支持 kqueue

  • Windows 的 IOCP

Python 的 select 库:

实现了 select、poll 系统调用,这个基本上操作系统都支持。部分实现了 epoll。

底层的 IO 多路复用模块。

开发中的选择:

1、完全跨平台,使用 select、poll。但是性能较差。

2、针对不同操作系统自行选择支持的技术,这样做会提高 IO 处理的性能。

1.6.1、selectors 库

3.4 版本提供这个库,高级 IO 复用库。

类层次结构︰
BaseSelector
+-- SelectSelector  实现select
+-- PollSelector    实现poll
+-- EpollSelector   实现epoll
+-- DevpollSelector 实现devpoll
+-- KqueueSelector  实现kqueue

selectors.DefaultSelector 返回当前平台最有效、性能最高的实现。

但是,由于没有实现 Windows 下的 IOCP,所以,只能退化为 select。

# 在selects模块源码最下面有如下代码
# Choose the best implementation, roughly:
# epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector

abstractmethod register(fileobj, events, data=None)

为 selector 注册一个文件对象,监视它的 IO 事件。

fileobj 被监视文件对象,例如 socket 对象。

events 事件,该文件对象必须等待的事件。

data 可选的与此文件对象相关联的不透明数据,例如,关联用来存储每个客户端的会话 ID,关联方法。通过这个参数在关注的事件产生后让 selector 干什么事。

Event 常量 含义
EVENT_READ 可读 0b01,内核已经准备好输入输出设备,可以开始读了
EVENT_WRITE 可写 0b10,内核准备好了,可以往里写了

官方样例:

import selectors
import socket

# 构造缺省性能最优selector
sel = selectors.DefaultSelector()

# 回调函数,自己定义形参
def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print(1, '-->', 'accepted', conn, 'from', addr)
    conn.setblocking(False)  # 非阻塞
    sel.register(conn, selectors.EVENT_READ, read)

# 回调函数
def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print(2, '-->', 'echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print(3, '-->', 'closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('0.0.0.0', 1234))
sock.listen(100)
sock.setblocking(False)  # 非阻塞

# 注册文件对象sock关注读事件,返回SelectorKey
# 将sock、关注事件、data都绑定到key实例属性上
sltkey = sel.register(sock, selectors.EVENT_READ, accept)
print(4, '-->', sltkey)

while True:
    # 开始监视,等到有文件对象监控事件产生,返回(key, mask)元组
    events = sel.select()
    for key, mask in events:
        callback = key.data  # 回调函数
        callback(key.fileobj, mask)

1.6.2、ChatServer IO 多路复用

将 ChatServer 改写成 IO 多路复用的方式,不需要启动多线程来执行 socket 的 accept、recv 方法了。

import socket
import threading
import datetime
import logging
import selectors

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class ChatServer:
    def __init__(self, ip='0.0.0.0', port=9999):
        self.sock = socket.socket()
        self.addr = (ip, port)

        self.event = threading.Event()
        self.selector = selectors.DefaultSelector()  # 创建selector

    def start(self):  # 启动监听
        self.sock.bind(self.addr)  # 绑定
        self.sock.listen()  # 监听
        self.sock.setblocking(False)  # 不阻塞

        # 注册
        self.selector.register(self.sock, selectors.EVENT_READ, self.accept)

        threading.Thread(target=self.select, name='selector', daemon=True).start()

    def select(self):  # 阻塞
        while not self.event.is_set():
            # 开始监视,等到某文件对象被监控的事件产生,返回(key, mask)元组
            events = self.selector.select()  # 阻塞,直到events
            print('-' * 30)
            for key, mask in events:
                logging.info(key)
                logging.info(mask)
                key.data(key.fileobj)  # 回调函数,注册时都绑定到了selectkey实例

    def accept(self, sock: socket.socket):  # 接收客户端连接
        conn, raddr = sock.accept()
        conn.setblocking(False)  # 非阻塞

        # 注册,监视每一个与客户端的连接的socket对象
        self.selector.register(conn, selectors.EVENT_READ, self.recv)

    def recv(self, sock: socket.socket):  # 接收客户端数据
        data = sock.recv(1024)
        if not data or data == b'quit':  # 客户端主动断开或退出,注销并关闭socket
            self.selector.unregister(sock)
            sock.close()
            return
        msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(),
                                                        *sock.getpeername(), data.decode())
        logging.info(msg)
        msg = msg.encode()

        # 群发
        for key in self.selector.get_map().values():
            if key.data == self.recv:  # 排除self.accept
                key.fileobj.send(msg)

    def stop(self):  # 停止服务
        self.event.set()
        fobjs = []
        for fd, key in self.selector.get_map().items():
            fobjs.append(key.fileobj)
        for fobj in fobjs:
            self.selector.unregister(fobj)
            fobj.close()
        self.selector.close()

def main():
    cs = ChatServer()
    cs.start()

    while True:
        cmd = input('>>').strip()
        if cmd == 'quit':
            cs.stop()
            threading.Event().wait(3)
            break
        logging.info(threading.enumerate())

if __name__ == '__main__':
    main()

进阶:

send 是写操作,也可以让 selector 监听,如何监听?

self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self.recv)

注册语句,要监听 selectors.EVENT_READ | selectors.EVENT_WRITE 读与写事件。

回调的时候,需要 mask 来判断究竟是读触发还是写触发了。所以,需要修改方法声明,增加 mask。

def recv(self, sock, mask) 但是由于 recv 方法处理读和写事件,所以叫 recv 不太合适,改名为 def handle(self, sock, mask)

注意读和写是分离的,那么 handle 函数应该写成下面这样:

def handle(self, sock:socket.socket, mask): # 接收客户端数据
    if mask & selectors.EVENT_READ:
        pass

    # 注意,这里是某一个socket的写操作
    if mask & selectors.EVENT_WRITE: # 写缓冲区准备好了,可以写入数据了
        pass

handle 方法里面处理读、写,mask 有可能是 0b01、0b10、0b11。

问题是,假设读取到了客户端发来的数据后,如何写出去?

为每一个与客户端连接的 socket 对象增加对应的队列。

与每一个客户端连接的 socket 对象,自己维护一个队列,某一个客户端收到信息后,会遍历发给所有客户端的队列。这里完成一对多,即一份数据放到了所有队列中。

与每一个客户端连接的 socket 对象,发现自己队列有数据,就发送给客户端。

import socket
import threading
import datetime
import logging
import selectors
from queue import Queue

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}
        self.event = threading.Event()
        self.selector = selectors.DefaultSelector()  # 创建selector

    def start(self):  # 启动监听
        self.sock.bind(self.addr)  # 绑定
        self.sock.listen()  # 监听
        self.sock.setblocking(False)  # 不阻塞
        # 注册
        self.selector.register(self.sock, selectors.EVENT_READ, self.accept)
        threading.Thread(target=self.select, name='selector', daemon=True).start()

    def select(self):  # 阻塞
        while not self.event.is_set():

            # 开始监视,等到某文件对象被监控的事件产生,返回(key, mask)元组
            events = self.selector.select()  # 阻塞,直到events
            for key, mask in events:
                if callable(key.data):
                    callback = key.data  # key对象的data属性,回调
                    callback(key.fileobj, mask)
                else:
                    callback = key.data[0]
                    callback(key, mask)

    def accept(self, sock: socket.socket, mask):  # 接收客户端连接
        conn, raddr = sock.accept()

        conn.setblocking(False)  # 非阻塞
        self.clients[raddr] = (self.handle, Queue())
        # 注册,监视每一个与客户端的连接的socket对象
        self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE,
                               self.clients[raddr])

    def handle(self, key: selectors.SelectorKey, mask):  # 接收客户端数据
        if mask & selectors.EVENT_READ:
            sock = key.fileobj

            raddr = sock.getpeername()
            data = sock.recv(1024)
            if not data or data == b'quit':
                self.selector.unregister(sock)
                sock.close()
                self.clients.pop(raddr)
                return
            msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *raddr, data.decode())
            logging.info(msg)
            msg = msg.encode()
            for k in self.selector.get_map().values():
                logging.info(k)
                if isinstance(k.data, tuple):
                    k.data[1].put(data)
        if mask & selectors.EVENT_WRITE:
            # 因为写一直就绪,mask为2,所以一直可以写,从而导致select()不断循环,如同不阻塞一样
            if not key.data[1].empty():
                key.fileobj.send(key.data[1].get())

    def stop(self):  # 停止服务
        self.event.set()
        fobjs = []
        for fd, key in self.selector.get_map().items():
            fobjs.append(key.fileobj)
        for fobj in fobjs:
            self.selector.unregister(fobj)
            fobj.close()
        self.selector.close()

def main():
    cs = ChatServer()
    cs.start()

    while True:
        cmd = input('>>').strip()
        if cmd == 'quit':
            cs.stop()
            threading.Event().wait(3)
            break
        logging.info('-' * 30)
        logging.info("{} {}".format(len(cs.clients), cs.clients))
        logging.info(list(map(lambda x: (x.fileobj.fileno(), x.data), cs.selector.get_map().values())))
        logging.info('-' * 30)
        logging.info(threading.enumerate())

if __name__ == '__main__':
    main()

这个程序最大的问题,在 select() 一直判断可写,几乎一直循环不停。所以对于写不频繁的情况下,就不要监听 EVENT_WRITE。

一般对于 Server 来说,更多的是等待对方发来数据后响应时才发出数据,而不是积极的等着发送数据。所以监听 EVENT_READ,收到数据之后再发送就可以了。

本例只完成基本功能,其他功能如有需要,请自行完成。

标签云