1、Python3 网络编程
1.1、Socket 介绍
Socket 套接字:Python 中提供 socket.py 标准库,非常底层的接口库。
Socket 是一种通用的网络编程接口,和网络层次没有一一对应的关系。
协议族:AF 表示 Address Family,用于 socket() 第一个参数。
名称 | 含义 |
---|---|
AF_INET | IPV4 |
AF_INET6 | IPV6 |
AF_UNIX | Unix Domain Socket,windows 没有 |
Socket 类型:
名称 | 含义 |
---|---|
SOCK_STREAM | 面向连接的流套接字。默认值,TCP协议。 |
SOCK_DGRAM | 无连接的数据报文套接字。UDP 协议。 |
1.2、TCP 编程
Socket 编程,需要两端,一般来说需要一个服务端、一个客户端,服务端称为 Server,客户端称为 Client。
1.2.1、TCP 服务端编程
服务器端编程步骤:
-
创建 Socket 对象。
-
绑定 IP 地址 Address 和端口 Port。bind() 方法。
IPv4 地址为一个二元组('IP地址字符串', Port)。
-
开始监听,将在指定的 IP 的端口上监听。listen() 方法。
-
获取用于传送数据的 Socket 对象。
socket.accept() -> (socket object, address info)
accept 方法阻塞等待客户端建立连接,返回一个新的 Socket 对象和客户端地址的二元组。
地址是远程客户端的地址,IPv4 中它是一个二元组(clientaddr, port)。
- 接收数据
recv(bufsize[, flags])
使用缓冲区接收数据。- 发送数据
send(bytes)
发送数据。
问题:两次绑定同一个监听端口会怎么样?
import socket
s = socket.socket() # 创建socket对象
s.bind(('127.0.0.1', 9999)) # 一个二元组
s.listen() # 开始监听
# 开启一个连接
s1, info = s.accept() # 阻塞直到和客户端成功建立连接,返回一个socket对象和客户端地址
data = s1.recv(1024) # 使用缓冲区获取数据
print(1, '-->', data, info)
s1.send(b'www.brinnatt.com ack')
# 开启另外一个连接
s2, _ = s.accept()
data = s2.recv(1024)
print(2, '-->', data)
s2.send(b'hello python')
# 关闭套接字
s.close()
上例 accept 和 recv 是阻塞的,主线程经常被阻塞住而不能工作。怎么办?
1.2.2、群聊程序(服务端)
1.2.2.1、需求分析
聊天工具是 CS 程序,C 是每一个客户端,S 是服务器端。
服务器应该具有的功能:
-
启动服务,包括绑定地址和端口,监听
-
建立连接,能和多个客户端建立连接
-
接收不同用户的信息
-
分发,将接收的某个用户的信息转发到已连接的所有客户端
-
停止服务
-
记录连接的客户端
1.2.2.2、代码实现
服务端应该对应一个类:
import socket
class ChatServer:
def __init__(self, ip, port): # 启动服务
self.sock = socket.socket()
self.addr = (ip, port)
def start(self): # 启动监听
pass
def accept(self): # 多人连接
pass
def recv(self): # 接收客户端数据
pass
def stop(self): # 停止服务
pass
在此基础上,扩展完成:
import logging
import socket
import threading
import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer:
def __init__(self, ip='127.0.0.1', port=9999): # 启动服务
self.sock = socket.socket()
self.addr = (ip, port)
self.clients = {} # 客户端
def start(self): # 启动监听
self.sock.bind(self.addr) # 绑定
self.sock.listen() # 监听
# accept会阻塞主线程,所以开一个新线程
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接
while True:
sock, client = self.sock.accept() # 阻塞
self.clients[client] = sock # 添加到客户端字典
# 准备接收数据,recv是阻塞的,开启新的线程
threading.Thread(target=self.recv, args=(sock, client)).start()
def recv(self, sock: socket.socket, client): # 接收客户端数据
while True:
data = sock.recv(1024) # 阻塞到数据到来
msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client,
data.decode())
logging.info(msg)
msg = msg.encode()
for s in self.clients.values():
s.send(msg)
def stop(self): # 停止服务
for s in self.clients.values():
s.close()
self.sock.close()
cs = ChatServer()
cs.start()
基本功能完成,但是有问题。使用 Event 改进。
import logging
import socket
import threading
import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer:
def __init__(self, ip='127.0.0.1', port=9999): # 启动服务
self.sock = socket.socket()
self.addr = (ip, port)
self.clients = {} # 客户端
self.event = threading.Event()
def start(self): # 启动监听
self.sock.bind(self.addr) # 绑定
self.sock.listen() # 监听
# accept会阻塞主线程,所以开一个新线程
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接
while not self.event.is_set():
sock, client = self.sock.accept() # 阻塞
self.clients[client] = sock # 添加到客户端字典
# 准备接收数据,recv是阻塞的,开启新的线程
threading.Thread(target=self.recv, args=(sock, client)).start()
def recv(self, sock: socket.socket, client): # 接收客户端数据
while not self.event.is_set():
data = sock.recv(1024) # 阻塞到数据到来
msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client,
data.decode())
logging.info(msg)
msg = msg.encode()
for s in self.clients.values():
s.send(msg)
def stop(self): # 停止服务
for s in self.clients.values():
s.close()
self.sock.close()
self.event.set()
cs = ChatServer()
cs.start()
while True:
cmd = input(">>").strip()
if cmd == 'quit':
cs.stop()
threading.Event().wait(3)
break
这一版基本能用了,测试通过。但是还有要完善的地方。例如各种异常的判断,客户端断开连接后字典中要移除客户端数据等。
客户端主动断开带来的问题:服务端知道自己何时断开,如果客户端断开,服务器不知道。
所以,好的做法是,客户端断开发出特殊消息通知服务器端断开连接。但是,如果客户端主动断开,服务端主动发
送一个空消息,超时返回异常,捕获异常并清理连接。
即使为客户端提供了断开命令,也不能保证客户端会使用它断开连接。但是还是要增加这个退出功能。
增加客户端退出命令:
import logging
import socket
import threading
import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer:
def __init__(self, ip='127.0.0.1', port=9999): # 启动服务
self.sock = socket.socket()
self.addr = (ip, port)
self.clients = {} # 客户端
self.event = threading.Event()
def start(self): # 启动监听
self.sock.bind(self.addr) # 绑定
self.sock.listen() # 监听
# accept会阻塞主线程,所以开一个新线程
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接
while not self.event.is_set():
sock, client = self.sock.accept() # 阻塞
self.clients[client] = sock # 添加到客户端字典
# 准备接收数据,recv是阻塞的,开启新的线程
threading.Thread(target=self.recv, args=(sock, client)).start()
def recv(self, sock: socket.socket, client): # 接收客户端数据
while not self.event.is_set():
data = sock.recv(1024) # 阻塞到数据到来
msg = data.decode().strip()
# 客户端退出命令
if msg == 'quit':
self.clients.pop(client)
sock.close()
logging.info('{} quits'.format(client))
break
msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client,
data.decode())
logging.info(msg)
msg = msg.encode()
for s in self.clients.values():
s.send(msg)
def stop(self): # 停止服务
for s in self.clients.values():
s.close()
self.sock.close()
self.event.set()
cs = ChatServer()
cs.start()
while True:
cmd = input(">>").strip()
if cmd == 'quit':
cs.stop()
threading.Event().wait(3)
break
logging.info(threading.enumerate()) # 用来观察断开后线程的变化
程序还有瑕疵,但是业务功能基本完成了。
1.2.3、socket 常用方法
名称 | 含义 |
---|---|
socket.recv(bufsize[, flags]) | 获取数据。默认是阻塞的方式。 |
socket.recvfrom(bufsize[, flags]) | 获取数据,返回一个二元组(bytes, address)。 |
socket.recv_into(buffer[, nbytes[, flags]]) | 获取到nbytes的数据后,存储到buffer中。如果nbytes没有指定或0,将buffer大小的数据存入buffer中。返回接收的字节数。 |
socket.recvfrom_into(buffer[, nbytes[, flags]]) | 获取数据,返回一个二元组(bytes, address)到buffer中。 |
socket.send(bytes[, flags]) | TCP发送数据。 |
socket.sendall(bytes[, flags]) | TCP发送全部数据,成功返回None。 |
socket.sendto(string[, flag], address) | UDP发送数据。 |
socket.sendfile(file, offset=0, count=None) | 发送一个文件直到EOF,使用高性能的os.sendfile机制,返回发送的字节数。如果win下不支持sendfile,或者不是普通文件,使用send()发送文件。offset告诉起始位置。3.5版本开始。 |
socket.getpeername() | 返回连接套接字的远程地址。返回值通常是元组(ipaddr,port) |
socket.getsockname() | 返回套接字自己的地址。通常是一个元组(ipaddr,port) |
socket.setblocking(flag) | 如果flag为0,则将套接字设为非阻塞模式,否则将套接字设为阻塞模式(默认值)。 非阻塞模式下,如果调用recv()没有发现任何数据,或send()调用无法立即发送数据,那么将引起socket.error异常。 |
socket.settimeout(value) | 设置套接字操作的超时期,timeout是一个浮点数,单位是秒。 值为 None 表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如connect())。 |
socket.setsockopt(level,optname,value) | 设置套接字选项的值。比如缓冲区大小。太多了,去看文档。 不同系统,不同版本都不尽相同。 |
1.2.4、MakeFile
socket.makefile(mode='r', buffering=None, *, encoding=None, errors=None, newline=None)
创建一个与该套接字相关连的文件对象,将 recv 方法看做读方法,将 send 方法看做写方法。
import socket
socketserver = socket.socket()
ip = '127.0.0.1'
port = 9999
addr = (ip, port)
socketserver.bind(addr)
socketserver.listen()
print('-' * 30)
s, _ = socketserver.accept()
print('-' * 30)
f = s.makefile(mode='rw')
line = f.read(10) # 阻塞
print('-' * 30)
print(line)
f.write('Return your msg: {}'.format(line))
f.flush()
上例不能循环接收消息,修改一下:
import socket
import threading
sockserver = socket.socket()
ip = '127.0.0.1'
port = 9999
addr = (ip, port)
sockserver.bind(addr)
sockserver.listen()
print('-' * 30)
event = threading.Event()
def accept(sock: socket.socket, e: threading.Event):
s, _ = sock.accept()
print(s)
f = s.makefile(mode='rw')
while True:
line = f.readline() # 发送的数据要有回车换行符,才能显示
print(line)
if line.strip() == "quit": # 注意要发quit\n
break
f.write('Return your msg: {}'.format(line))
f.flush()
f.close()
sock.close()
e.wait(3)
t = threading.Thread(target=accept, args=(sockserver, event))
t.start()
t.join()
print(sockserver)
1.2.4.1、makefile 练习
使用 makefile 改写群聊类:
import logging
import socket
import threading
import datetime
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(thread)d %(message)s')
class ChatServer:
def __init__(self, ip='127.0.0.1', port=9999):
self.sock = socket.socket()
self.addr = (ip, port)
self.clients = {}
self.event = threading.Event()
def start(self):
self.sock.bind(self.addr)
self.sock.listen()
threading.Thread(target=self.accept).start()
def accept(self):
while not self.event.is_set():
sock, client = self.sock.accept()
f = sock.makefile('rw')
self.clients[client] = f
threading.Thread(target=self.recv, args=(f, client), name='recv').start()
def recv(self, f, client):
while not self.event.is_set():
data = f.readline() # 发过来的信息要包含回车换行符
msg = data.strip()
if msg == 'quit':
self.clients.pop(client)
f.close()
logging.info(f'{client} quit')
break
msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client,
data)
logging.info(msg)
for s in self.clients.values():
s.write(msg)
s.flush()
def stop(self): # 停止服务
for s in self.clients.values():
s.close()
self.sock.close()
self.event.set()
cs = ChatServer()
cs.start()
while True:
cmd = input('>>').strip()
if cmd == 'quit':
cs.stop()
threading.Event().wait(3)
break
logging.info(threading.enumerate())
上例完成了基本功能,但是,如果客户端主动断开,或者 readline 出现异常,就不会从 clients 中移除作废的 socket。可以使用异常处理解决这个问题。
1.2.4.2、ChatServer 实验完整代码
注意,这个代码为实验用,代码中瑕疵还有很多。Socket太底层了,实际开发中很少使用这么底层的接口。
增加一些异常处理。
import logging
import socket
import threading
import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer:
def __init__(self, ip='127.0.0.1', port=9999): # 启动服务
self.sock = socket.socket()
self.addr = (ip, port)
self.clients = {} # 客户端
self.event = threading.Event()
def start(self): # 启动监听
self.sock.bind(self.addr) # 绑定
self.sock.listen() # 监听
# accept会阻塞主线程,所以开一个新线程
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接
while not self.event.is_set():
sock, client = self.sock.accept() # 阻塞
# 准备接收数据,recv是阻塞的,开启新的线程
f = sock.makefile('rw') # 支持读写
self.clients[client] = f # 添加到客户端字典
threading.Thread(target=self.recv, args=(f, client), name='recv').start()
def recv(self, f, client): # 接收客户端数据
while not self.event.is_set():
try:
data = f.readline() # 阻塞到换行符
except Exception as e:
logging.error(e) # 有任何异常,退出
data = 'quit'
msg = data.strip()
# 客户端退出命令
if msg == 'quit':
self.clients.pop(client)
f.close()
logging.info('{} quits'.format(client))
break
msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client,
data)
logging.info(msg)
for s in self.clients.values():
s.write(msg)
s.flush()
def stop(self): # 停止服务
for s in self.clients.values():
s.close()
self.sock.close()
self.event.set()
def main():
cs = ChatServer()
cs.start()
while True:
cmd = input('>>').strip()
if cmd == 'quit':
cs.stop()
threading.Event().wait(3)
break
logging.info(threading.enumerate()) # 用来观察断开后线程的变化
if __name__ == '__main__':
main()
1.2.5、TCP 客户端编程
客户端编程步骤:
-
创建 Socket 对象
-
连接到远端服务端的 ip 和 port,connect() 方法
-
传输数据
- 使用 send、recv 方法发送、接收数据
-
关闭连接,释放资源
import socket
client = socket.socket()
ipaddr = ('127.0.0.1', 9999)
client.connect(ipaddr)
client.send(b'abcd\n')
data = client.recv(1024)
print(data)
client.close()
开始编写客户端类:
import socket
import threading
import datetime
import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatClient:
def __init__(self, ip='127.0.0.1', port=9999):
self.sock = socket.socket()
self.addr = (ip, port)
self.event = threading.Event()
def start(self): # 启动对远端服务器的连接
self.sock.connect(self.addr)
self.send("I'm ready.")
# 准备接收数据,recv是阻塞的,开启新的线程
threading.Thread(target=self.recv, name="recv").start()
def recv(self): # 接收客户端的数据
while not self.event.is_set():
try:
data = self.sock.recv(1024) # 阻塞
except Exception as e:
logging.error(e)
break
msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *self.addr,
data.strip())
logging.info(msg)
def send(self, msg: str):
data = "{}\n".format(msg.strip()).encode() # 服务端需要一个换行符
self.sock.send(data)
def stop(self):
self.sock.close()
self.event.wait(3)
self.event.set()
logging.info('Client stops.')
def main():
cc = ChatClient()
cc.start()
while True:
cmd = input('>>>')
if cmd.strip() == 'quit':
cc.stop()
break
cc.send(cmd) # 发送消息
if __name__ == '__main__':
main()
同样,这样的客户端还是有些问题的,仅用于测试。
1.3、UDP 编程
测试命令:
$ netstat -anp udp | find "9988" # windows查找udp是否启动端口
$ echo "123abc" | nc -u 127.0.0.1 9988 # linux下发给服务端数据
1.3.1、UDP 服务端编程
-
创建 socket 对象,
socket.SOCK_DGRAM
。 -
绑定 IP 和 Port,bind() 方法。
-
传输数据
-
接收数据,socket.recvfrom(bufsize[, flags]),获得一个二元组 (string, address)。
-
发送数据,socket.sendto(string, address) 发给某地址某信息。
-
-
释放资源
server = socket.socket(type=socket.SOCK_DGRAM)
server.bind(('0.0.0.0', 9999)) # 立即绑定一个udp端口
data = server.recv(1024) # 阻塞等待数据
data = server.recvfrom(1024) # 阻塞等待数据(value, (ip, port))
server.sendto(b'7', ('192.168.142.1', 10000))
server.close()
1.3.2、UDP 客户端编程
-
创建 socket 对象,
socket.SOCK_DGRAM
。 -
发送数据,socket.sendto(string, address) 发给某地址某信息
-
接收数据,socket.recvfrom(bufsize[, flags]),获得一个二元组 (string, address)。
-
释放资源
client = socket.socket(type=socket.SOCK_DGRAM)
raddr = ('192.168.142.1', 10000)
client.connect(raddr)
client.sendto(b'8', raddr)
client.send(b'9')
data = client.recvfrom(1024) # 阻塞等待数据(value, (ip, port))
data = client.recv(1024) # 阻塞等待数据
client.close()
注意:UDP 是无连接协议,所以可以只有任何一端,例如客户端数据发往服务端,服务端存在与否无所谓。
UDP 编程中 bind、connect、send、sendto、recv、recvfrom 方法使用。
UDP 的 socket 对象创建后,是没有占用本地地址和端口的。
方法 | 说明 |
---|---|
bind 方法 | 可以指定本地地址和端口 laddr,会立即占用。 |
connect 方法 | 可以立即占用本地地址和端口,填充远端地址和端口 raddr。 |
sendto 方法 | 可以立即占用本地地址和端口,并把数据发往指定远端。只有有了本地绑定端口,sendto 就可以向任何远端发送数据。 |
send 方法 | 需要和 connect 方法配合,可以使用已经从本地端口把数据发往 raddr 指定的远端。 |
recv 方法 | 要求一定要在占用了本地端口后,返回接收的数据。 |
recvfrom 方法 | 要求一定要占用了本地端口后,返回接收的数据和对端地址的二元组。 |
1.3.3、UDP 群聊服务端
# 服务端类的基本架构
class ChatUDPServer:
def __init__(self, ip='127.0.0.1', port=9999):
self.addr = (ip, port)
self.sock = socket.socket(type=socket.SOCK_DGRAM)
def start(self):
self.sock.bind(self.addr) # 立即绑定
self.sock.recvfrom(1024) # 阻塞接收数据
def stop(self):
self.sock.close()
在上面代码的基础之上扩充:
import socket
import threading
import datetime
import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatUDPServer:
def __init__(self, ip='127.0.0.1', port=9999):
self.addr = (ip, port)
self.sock = socket.socket(type=socket.SOCK_DGRAM)
self.clients = set() # 记录客户端
self.event = threading.Event()
def start(self):
self.sock.bind(self.addr) # 立即绑定
# 启动线程
threading.Thread(target=self.recv, name='recv').start()
def recv(self):
while not self.event.is_set():
data, raddr = self.sock.recvfrom(1024) # 阻塞接收数据
if data.strip() == b'quit':
# 有可能发来数据的不在clients中
if raddr in self.clients:
self.clients.remove(raddr)
logging.info('{} leaving'.format(raddr))
continue
self.clients.add(raddr)
msg = '{}. from {}:{}'.format(data.decode(), *raddr)
logging.info(msg)
msg = msg.encode()
for c in self.clients:
self.sock.sendto(msg, c) # 不保证对方能够收到
def stop(self):
for c in self.clients:
self.sock.sendto(b'bye', c)
self.sock.close()
self.event.set()
def main():
cs = ChatUDPServer()
cs.start()
while True:
cmd = input(">>>")
if cmd.strip() == 'quit':
cs.stop()
break
logging.info(threading.enumerate())
logging.info(cs.clients)
if __name__ == '__main__':
main()
1.3.4、UDP 群聊客户端
import threading
import socket
import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatUdpClient:
def __init__(self, rip='127.0.0.1', rport=9999):
self.sock = socket.socket(type=socket.SOCK_DGRAM)
self.raddr = (rip, rport)
self.event = threading.Event()
def start(self):
self.sock.connect(self.raddr) # 占用本地地址和端口,设置远端地址和端口
threading.Thread(target=self.recv, name='recv').start()
def recv(self):
while not self.event.is_set():
data, raddr = self.sock.recvfrom(1024)
msg = '{}. from {}:{}'.format(data.decode(), *raddr)
logging.info(msg)
def send(self, msg: str):
self.sock.sendto(msg.encode(), self.raddr)
def stop(self):
self.sock.close()
self.event.set()
def main():
cc1 = ChatUdpClient()
cc2 = ChatUdpClient()
cc1.start()
cc2.start()
print(cc1.sock)
print(cc2.sock)
while True:
cmd = input('Input your words >>')
if cmd.strip() == 'quit':
cc1.stop()
cc2.stop()
break
cc1.send(cmd)
cc2.send(cmd)
if __name__ == '__main__':
main()
上面的例子并不完善,如果客户端断开了,服务端不知道。每一个服务端还需要对所有客户端发送数据,包括已经断开的客户端。
1.3.5、代码改进
1.3.5.1、服务端代码改进
加一个 ack 机制和心跳 heartbeat。 心跳,就是一端定时发往另一端的信息,一般每次数据越少越好。心跳时间间隔约定好就行。 ack 即响应,一端收到另一端的消息后,返回信息。
心跳机制:
- 一般来说是客户端定时发往服务端的,服务端并不需要 ack 回复客户端,只需要记录该客户端还活着就行了。
- 如果是服务端定时发往客户端的,一般需要客户端 ack 响应来表示活着,如果没有收到 ack 的客户端,服务端移除其信息。这种实现较为复杂,用的较少。
- 也可以双向都发心跳的,用的更少。
在服务器端代码中使用第一种机制改进:
import socket
import threading
import datetime
import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatUDPServer:
def __init__(self, ip='127.0.0.1', port=9999, interval=10):
self.addr = (ip, port)
self.sock = socket.socket(type=socket.SOCK_DGRAM)
self.clients = {} # 记录客户端,改为字典
self.event = threading.Event()
self.interval = interval # 默认10秒,超时就要移除对应的客户端
def start(self):
self.sock.bind(self.addr) # 立即绑定
# 启动线程
threading.Thread(target=self.recv, name='recv').start()
def recv(self):
while not self.event.is_set():
localset = set() # 清理超时
data, raddr = self.sock.recvfrom(1024) # 阻塞接收数据
current = datetime.datetime.now().timestamp() # float
if data.strip() == b'^hb^': # 心跳信息
print('year ^hb^', raddr)
self.clients[raddr] = current
continue
elif data.strip() == b'quit':
# 有可能发来数据的不在clients中
self.clients.pop(raddr, None)
logging.info('{} leaving'.format(raddr))
continue
# 有信息来就更新时间
# 什么时候比较心跳时间呢? 发送信息的时候,反正要遍历一遍
self.clients[raddr] = current
msg = '{}. from {}:{}'.format(data.decode(), *raddr)
logging.info(msg)
msg = msg.encode()
for c,stamp in self.clients.items():
if current - stamp > self.interval:
localset.add(c)
else:
self.sock.sendto(msg, c) # 不保证对方能收到
for c in localset:
self.clients.pop(c)
def stop(self):
for c in self.clients:
self.sock.sendto(b'bye', c)
self.sock.close()
self.event.set()
def main():
cs = ChatUDPServer()
cs.start()
while True:
cmd = input(">>>")
if cmd.strip() == 'quit':
cs.stop()
break
logging.info(threading.enumerate())
logging.info(cs.clients)
if __name__ == '__main__':
main()
1.3.5.2、客户端代码改进
增加定时发送心跳代码:
import threading
import socket
import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatUdpClient:
def __init__(self, rip='127.0.0.1', rport=9999):
self.sock = socket.socket(type=socket.SOCK_DGRAM)
self.raddr = (rip, rport)
self.event = threading.Event()
def start(self):
self.sock.connect(self.raddr) # 占用本地地址和端口,设置远端地址和端口
threading.Thread(target=self._sendhb, name='heartbeat', daemon=True).start()
threading.Thread(target=self.recv, name='recv').start()
def _sendhb(self): # 心跳
while not self.event.wait(5):
self.send('^hb^')
def recv(self):
while not self.event.is_set():
data, raddr = self.sock.recvfrom(1024)
msg = '{}. from {}:{}'.format(data.decode(), *raddr)
logging.info(msg)
def send(self, msg: str):
self.sock.sendto(msg.encode(), self.raddr)
def stop(self):
self.send('quit') # 通知服务端退出
self.sock.close()
self.event.set()
def main():
cc1 = ChatUdpClient()
cc2 = ChatUdpClient()
cc1.start()
cc2.start()
print(cc1.sock)
print(cc2.sock)
while True:
cmd = input('Input your words >>')
if cmd.strip() == 'quit':
cc1.stop()
cc2.stop()
break
cc1.send(cmd)
cc2.send(cmd)
if __name__ == '__main__':
main()
1.3.5.3、UDP 协议应用
UDP 是无连接协议,它基于以下假设: 网络足够好,消息不会丢包,包不会乱序。
但是,即使是在局域网,也不能保证不丢包,而且包的到达不一定有序。
应用场景,视频、音频传输,一般来说,丢些包,问题不大,最多丢些图像、听不清话语,可以重新发话语来解决。
海量采集数据,例如传感器发来的数据,丢几十、几百条数据也没有关系。 DNS 协议,数据内容小,一个包就能
查询到结果,不存在乱序,丢包,重新请求解析。
一般来说,UDP 性能优于 TCP,但是可靠性要求高的场合的还是要选择 TCP 协议。
1.4、SocketServer
socket 编程过于底层,编程虽然有套路,但是想要写出健壮的代码还是比较困难的,所以很多语言都对 socket 底层 API 进行封装,Python 的封装就是 socketserver 模块。它是网络服务编程框架,便于企业级快速开发。
1.4.1、类的继承关系
+------------+
| BaseServer |
+------------+
|
v
+-----------+ +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+ +------------------+
|
v
+-----------+ +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+ +--------------------+
SocketServer 简化了网络服务器的编写。
它有 4 个同步类:TCPServer,UDPServer,UnixStreamServer,UnixDatagramServer。
2 个 Mixin 类:ForkingMixIn 和 ThreadingMixIn 类,用来支持异步。
class ForkingUDPServer(ForkingMixIn, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass
class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
fork 是创建多进程,thread 是创建多线程。
1.4.2、编程接口
socketserver.BaseServer(server_address, RequestHandlerClass)
需要提供服务器绑定的地址信息,和用于处理请求的 RequestHandlerClass 类。
RequestHandlerClass 类必须是 BaseRequestHandler 类的子类,在 BaseServer 中代码如下:
class BaseServer:
timeout = None
def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__shutdown_request = False
def finish_request(self, request, client_address): # 处理请求的方法
"""Finish one request by instantiating RequestHandlerClass."""
self.RequestHandlerClass(request, client_address, self) # RequestHandlerClass构造
BaseRequestHandler 类:
它是和用户连接的用户请求处理类的基类,定义为 BaseRequestHandler(request, client_address, server)。
服务端 Server 实例接收用户请求后,最后会实例化这个类。
它被初始化时,送入 3 个构造参数:request, client_address, server 自身。
以后就可以在 BaseRequestHandler 类的实例上使用以下属性:
self.request 是和客户端的连接的 socket 对象
self.server 是 TCPServer 本身
self.client_address 是客户端地址
这个类在初始化的时候,它会依次调用 3 个方法。子类可以覆盖这些方法。
# BaseRequestHandler 要子类覆盖的方法
class BaseRequestHandler:
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
def setup(self):
pass
def handle(self):
pass
def finish(self):
pass
测试代码:
import threading
import socketserver
class MyHandler(socketserver.BaseRequestHandler):
def handle(self) -> None:
super().handle() # 可以不调用,父类handle什么都没有做
print('-' * 80)
print(1, '-->', self.server) # 服务
print(2, '-->', self.request) # 服务端负责客户端连接请求的socket对象
print(3, '-->', self.client_address) # 客户端地址
print(4, '-->', self.__dict__) # 能看到负责accept的socket
print(5, '-->', threading.enumerate())
print(6, '-->', threading.current_thread())
print('-' * 80)
addr = ('10.47.76.53', 9999)
server = socketserver.ThreadingTCPServer(addr, MyHandler)
server.serve_forever() # 永久
测试结果说明,handle 方法相当于 socket 的 recv 方法。
每个不同的连接上的请求过来后,生成这个连接的 socket 对象即 self.request,客户端地址 self.client_address。
问题:
测试过程中,上面代码,连接后立即断开了,为什么?
怎样才能客户端和服务器端长时间连接?
import logging
import threading
import socketserver
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class MyHandler(socketserver.BaseRequestHandler):
def handle(self) -> None:
super().handle() # 可以不调用,父类handle什么都没有做
print('-' * 80)
print(1, '-->', self.server) # 服务
print(2, '-->', self.request) # 服务端负责客户端连接请求的socket对象
print(3, '-->', self.client_address) # 客户端地址
print(4, '-->', self.__dict__) # 能看到负责accept的socket
print(5, '-->', threading.enumerate())
print(6, '-->', threading.current_thread())
print('-' * 80)
for i in range(3):
data = self.request.recv(1024)
logging.info(data)
logging.info('======= end =======')
addr = ('10.47.76.53', 9999)
server = socketserver.ThreadingTCPServer(addr, MyHandler)
server.serve_forever() # 永久
将 ThreadingTCPServer 换成 TCPServer,同时连接 2 个客户端观察效果。
ThreadingTCPServer 是异步的,可以同时处理多个连接。
TCPServer 是同步的,一个连接处理完了,即一个连接的 handle 方法执行完了,才能处理另一个连接,且只有主线程。
创建服务器需要几个步骤:
- 从 BaseRequestHandler 类派生出子类,并覆盖其 handle() 方法来创建请求处理程序类,此方法将处理
传入请求。- 实例化一个服务器类,传参服务器的地址和请求处理类。
- 调用服务器实例的 handle_request() 或 serve_forever() 方法。
- 调用 server_close() 关闭套接字。
1.4.3、实现 EchoServer
顾名思义,Echo,来什么消息回显什么消息。客户端发来什么信息,返回什么信息。
import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sys
class EchoHandler(BaseRequestHandler):
def setup(self) -> None:
super().setup()
self.event = threading.Event() # 初始工作
def finish(self) -> None:
super().finish()
self.event.set()
def handle(self) -> None:
super().handle()
while not self.event.is_set():
data = self.request.recv(1024).decode()
msg = "{} {}".format(self.client_address, data).encode()
self.request.send(msg)
print('End')
addr = ('0.0.0.0', 9999)
server = ThreadingTCPServer(addr, EchoHandler)
server_thread = threading.Thread(target=server.serve_forever, name='EchoServer', daemon=True)
server_thread.start()
try:
while True:
cmd = input('>>>')
if cmd.strip() == 'quit':
break
print(threading.enumerate())
except Exception as e:
print(e)
except KeyboardInterrupt:
pass
finally:
print('exit')
sys.exit(0)
1.4.4、改写 ChatServer
使用 ThreadingTCPServer 改写 ChatServer:
import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sys
import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatHandler(BaseRequestHandler):
CLIENTS = {}
def setup(self) -> None:
super().setup()
self.event = threading.Event() # 初始工作
self.CLIENTS[self.client_address] = self.request
def finish(self) -> None:
super().finish() # 清理工作
self.CLIENTS.pop(self.client_address) # 能执行到吗?
self.event.set()
def handle(self) -> None:
super().handle()
while not self.event.is_set():
data = self.request.recv(1024).decode()
if data == 'quit':
break
msg = "{} {}".format(self.client_address, data).encode()
logging.info(msg)
for c in self.CLIENTS.values():
c.send(msg)
print('End')
addr = ('0.0.0.0', 9999)
server = ThreadingTCPServer(addr, ChatHandler)
server_thread = threading.Thread(target=server.serve_forever, name='ChatHandler', daemon=True)
server_thread.start()
try:
while True:
cmd = input('>>>')
if cmd.strip() == 'quit':
break
print(threading.enumerate())
except Exception as e:
print(e)
except KeyboardInterrupt:
pass
finally:
print('exit')
sys.exit(0)
问题:
上例 self.clients.pop(self.client_address) 能执行到吗?
如果连接的线程中 handle 方法中抛出异常,例如客户端主动断开导致的异常,线程崩溃,self.clients 的 pop 方法还能执行吗?
当然能执行,基类源码保证了即使异常,也能执行 finish 方法。但不代表不应该捕获客户端各种异常。
1.4.5、解决客户端主动连接断开问题
如果客户端主动断开,总是抛出一个异常。看看到底发生了什么,在 handle 方法中增加一些语句。
def handle(self) -> None:
super().handle()
while not self.event.is_set():
data = self.request.recv(1024).decode()
print(data, '+++++++++++++++++++') # 增加
if data == 'quit':
break
msg = "{} {}".format(self.client_address, data).encode()
logging.info(msg)
for c in self.CLIENTS.values():
print('+++++++++++++++++++') # 增加
c.send(msg)
print('End')
通过打印可以看到,客户端主动断开,会导致 recv 方法立即返回一个空 bytes,并没有同时抛出异常。当循环回到 recv 这一句的时候就会抛出异常。所以,可以通过判断 data 数据是否为空来客户端是否断开。
def handle(self) -> None:
super().handle()
while not self.event.is_set():
data = self.request.recv(1024).decode()
print(data, '+++++++++++++++++++') # 增加
if not data or data == 'quit':
print("Broken pipe")
break
msg = "{} {}".format(self.client_address, data).encode()
logging.info(msg)
for c in self.CLIENTS.values():
print('+++++++++++++++++++') # 增加
c.send(msg)
print('End')
为每一个连接提供 RequestHandlerClass 类实例,依次调用 setup、handle、finish 方法,且使用了 try...finally 结构保证 finish 方法一定能被调用。
这些方法依次执行完成,如果想维持这个连接和客户端通信,就需要在 handle 函数中使用循环。
socketserver 模块提供不同的类,但是编程接口是一样的,即使是多进程、多线程的类也是一样,大大减少了编程的难度。
1.5、异步编程
1.5.1、同步、异步
函数或方法被调用的时候,调用者是否得到最终结果的。
-
直接得到最终结果的,就是同步调用;
-
不直接得到最终结果的,就是异步调用。
同步就是我让你打饭,你不打好我不走开,直到你打好饭给了我。
异步就是我让你打饭,你打着,我不等你,但是我会盯着你,你打完,我会过来拿走的。异步并不保证多长时间最终打完饭。
1.5.2、阻塞、非阻塞
函数或方法调用的时候,是否立刻返回。
- 立即返回就是非阻塞调用;
- 不立即返回就是阻塞调用。
1.5.3、区别
同步、异步,与阻塞、非阻塞不相关。
同步、异步强调的是,是否得到 最终的结果
;
阻塞、非阻塞强调是时间,是否 等待
。
同步与异步区别在于:调用者是否得到了想要的最终结果。
同步就是一直要执行到返回最终结果;
异步就是直接返回了,但是返回的不是最终结果。调用者不能通过这种调用得到结果,还要通过被调用者,使用其它方式通知调用者,来取回最终结果。
阻塞与非阻塞的区别在于,调用者是否还能干其他事。
阻塞,调用者就只能干等;
非阻塞,调用者可以先去忙会别的,不用一直等。
1.5.4、联系
同步阻塞,我啥事不干,就等你打饭给我。打到饭是结果,而且我啥事不干一直等,同步加阻塞。
同步非阻塞,我等着你打饭给我,但我可以玩会手机、看看电视。打饭是结果,但是我不一直等。
异步阻塞,我要打饭,你说等叫号,并没有返回饭给我,我啥事不干,就干等着饭好了你叫我。
异步非阻塞,我要打饭,你说等叫号,并没有返回饭给我,我在旁边看电视、玩手机,饭打好了叫我。
1.5.5、IO 两个阶段
IO 过程分两阶段:
-
数据准备阶段
-
内核空间复制回用户进程缓冲区阶段
发生 IO 的时候:
-
内核从输入设备读、写数据(淘米,把米放饭锅里煮饭)
-
进程从内核复制数据(盛饭,从内核这个饭锅里面把饭装到碗里来)
系统调用 --> read 函数。
1.5.6、同步 IO
同步 IO 模型包括:阻塞 IO、非阻塞 IO、IO 多路复用。
1.5.6.1、阻塞 IO
进程等待(阻塞),直到读写完成。(全程等待)
read/write 函数。
1.5.6.2、非阻塞 IO
进程调用 read 操作,如果 IO 设备没有准备好,立即返回 EWOULDBLOCK ERROR
异常,进程不阻塞。用户可以再次发起系统调用,如果内核已经准备好,就阻塞,然后复制数据到用户空间。
第一阶段数据没有准备好,就先忙别的,等会再来看看。检查数据是否准备好了的过程是非阻塞的。
第二阶段是阻塞的,即内核空间和用户空间之间复制数据是阻塞的。
淘米、蒸饭我不等,我去玩会,盛饭过程我等着你装好饭,但是要等到盛好饭才算完事,这是同步的,结果就是盛好饭。
read/write 函数。
1.5.6.3、IO 多路复用
所谓 IO 多路复用,就是同时监控多个 IO,有一个准备好了,就不需要等了,开始处理,提高了同时处理 IO 的能力。
select 几乎所有操作系统平台都支持,poll 是对 select 的升级。
epoll,Linux 系统内核 2.5+ 开始支持,对 select 和 poll 的增强,在监视的基础上,增加回调机制。BSD、Mac 平台有 kqueue,Windows 有 iocp。
以 select 为例,将关注的 IO 操作告诉 select 函数并调用,进程阻塞,内核 “监视” select 关注的文件描述符 fd,被关注的任何一个 fd 对应的 IO 准备好了数据,select 返回。再使用 read 将数据复制到用户进程。
select 举例,食堂供应很多菜(众多的 IO),你需要吃某三菜一汤,大师傅(操作系统)说要现做,需要等,你只好等待。其中一样菜好了,大师傅叫你过来说你点的菜有好的了,你得自己找找看哪一样菜好了,请服务员把做好的菜打给你。
epoll 是有菜准备好了,大师傅喊你去几号窗口直接打菜,不用自己找菜了。
一般情况下,select 最多能监听 1024 个 fd(可以修改,但不建议改),但是由于 select 采用轮询的方式,当管理的 IO 多了,每次都要遍历全部 fd,效率低下。
epoll 没有管理的 fd 的上限,且是回调机制,不需遍历,效率很高。
1.5.7、异步 IO
进程发起异步 IO 请求,立即返回。内核完成 IO 的两个阶段,内核给进程发一个信号。
举例,来打饭,跟大师傅说饭好了叫你,饭菜准备好了,窗口服务员把饭盛好了打电话叫你。两阶段都是异步的。
在整个过程中,进程都可以忙别的,等好了才过来。
举例,今天不想出去到饭店吃饭了,点外卖,饭菜在饭店做好了(第一阶段),快递员从饭店送到你家门口(第二
阶段)。
Linux 的 aio 的系统调用,内核从版本 2.6 开始支持。
1.6、Python3 中 IO 多路复用
IO 多路复用
-
大多数操作系统都支持 select 和 poll
-
Linux 2.5+ 支持 epoll
-
BSD、Mac 支持 kqueue
-
Windows 的 IOCP
Python 的 select 库:
实现了 select、poll 系统调用,这个基本上操作系统都支持。部分实现了 epoll。
底层的 IO 多路复用模块。
开发中的选择:
1、完全跨平台,使用 select、poll。但是性能较差。
2、针对不同操作系统自行选择支持的技术,这样做会提高 IO 处理的性能。
1.6.1、selectors 库
3.4 版本提供这个库,高级 IO 复用库。
类层次结构︰
BaseSelector
+-- SelectSelector 实现select
+-- PollSelector 实现poll
+-- EpollSelector 实现epoll
+-- DevpollSelector 实现devpoll
+-- KqueueSelector 实现kqueue
selectors.DefaultSelector 返回当前平台最有效、性能最高的实现。
但是,由于没有实现 Windows 下的 IOCP,所以,只能退化为 select。
# 在selects模块源码最下面有如下代码
# Choose the best implementation, roughly:
# epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
DefaultSelector = PollSelector
else:
DefaultSelector = SelectSelector
abstractmethod register(fileobj, events, data=None)
为 selector 注册一个文件对象,监视它的 IO 事件。
fileobj 被监视文件对象,例如 socket 对象。
events 事件,该文件对象必须等待的事件。
data 可选的与此文件对象相关联的不透明数据,例如,关联用来存储每个客户端的会话 ID,关联方法。通过这个参数在关注的事件产生后让 selector 干什么事。
Event 常量 | 含义 |
---|---|
EVENT_READ | 可读 0b01,内核已经准备好输入输出设备,可以开始读了 |
EVENT_WRITE | 可写 0b10,内核准备好了,可以往里写了 |
官方样例:
import selectors
import socket
# 构造缺省性能最优selector
sel = selectors.DefaultSelector()
# 回调函数,自己定义形参
def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
print(1, '-->', 'accepted', conn, 'from', addr)
conn.setblocking(False) # 非阻塞
sel.register(conn, selectors.EVENT_READ, read)
# 回调函数
def read(conn, mask):
data = conn.recv(1000) # Should be ready
if data:
print(2, '-->', 'echoing', repr(data), 'to', conn)
conn.send(data) # Hope it won't block
else:
print(3, '-->', 'closing', conn)
sel.unregister(conn)
conn.close()
sock = socket.socket()
sock.bind(('0.0.0.0', 1234))
sock.listen(100)
sock.setblocking(False) # 非阻塞
# 注册文件对象sock关注读事件,返回SelectorKey
# 将sock、关注事件、data都绑定到key实例属性上
sltkey = sel.register(sock, selectors.EVENT_READ, accept)
print(4, '-->', sltkey)
while True:
# 开始监视,等到有文件对象监控事件产生,返回(key, mask)元组
events = sel.select()
for key, mask in events:
callback = key.data # 回调函数
callback(key.fileobj, mask)
1.6.2、ChatServer IO 多路复用
将 ChatServer 改写成 IO 多路复用的方式,不需要启动多线程来执行 socket 的 accept、recv 方法了。
import socket
import threading
import datetime
import logging
import selectors
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatServer:
def __init__(self, ip='0.0.0.0', port=9999):
self.sock = socket.socket()
self.addr = (ip, port)
self.event = threading.Event()
self.selector = selectors.DefaultSelector() # 创建selector
def start(self): # 启动监听
self.sock.bind(self.addr) # 绑定
self.sock.listen() # 监听
self.sock.setblocking(False) # 不阻塞
# 注册
self.selector.register(self.sock, selectors.EVENT_READ, self.accept)
threading.Thread(target=self.select, name='selector', daemon=True).start()
def select(self): # 阻塞
while not self.event.is_set():
# 开始监视,等到某文件对象被监控的事件产生,返回(key, mask)元组
events = self.selector.select() # 阻塞,直到events
print('-' * 30)
for key, mask in events:
logging.info(key)
logging.info(mask)
key.data(key.fileobj) # 回调函数,注册时都绑定到了selectkey实例
def accept(self, sock: socket.socket): # 接收客户端连接
conn, raddr = sock.accept()
conn.setblocking(False) # 非阻塞
# 注册,监视每一个与客户端的连接的socket对象
self.selector.register(conn, selectors.EVENT_READ, self.recv)
def recv(self, sock: socket.socket): # 接收客户端数据
data = sock.recv(1024)
if not data or data == b'quit': # 客户端主动断开或退出,注销并关闭socket
self.selector.unregister(sock)
sock.close()
return
msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(),
*sock.getpeername(), data.decode())
logging.info(msg)
msg = msg.encode()
# 群发
for key in self.selector.get_map().values():
if key.data == self.recv: # 排除self.accept
key.fileobj.send(msg)
def stop(self): # 停止服务
self.event.set()
fobjs = []
for fd, key in self.selector.get_map().items():
fobjs.append(key.fileobj)
for fobj in fobjs:
self.selector.unregister(fobj)
fobj.close()
self.selector.close()
def main():
cs = ChatServer()
cs.start()
while True:
cmd = input('>>').strip()
if cmd == 'quit':
cs.stop()
threading.Event().wait(3)
break
logging.info(threading.enumerate())
if __name__ == '__main__':
main()
进阶:
send 是写操作,也可以让 selector 监听,如何监听?
self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self.recv)
注册语句,要监听 selectors.EVENT_READ | selectors.EVENT_WRITE
读与写事件。
回调的时候,需要 mask 来判断究竟是读触发还是写触发了。所以,需要修改方法声明,增加 mask。
def recv(self, sock, mask)
但是由于 recv 方法处理读和写事件,所以叫 recv 不太合适,改名为 def handle(self, sock, mask)
。
注意读和写是分离的,那么 handle 函数应该写成下面这样:
def handle(self, sock:socket.socket, mask): # 接收客户端数据
if mask & selectors.EVENT_READ:
pass
# 注意,这里是某一个socket的写操作
if mask & selectors.EVENT_WRITE: # 写缓冲区准备好了,可以写入数据了
pass
handle 方法里面处理读、写,mask 有可能是 0b01、0b10、0b11。
问题是,假设读取到了客户端发来的数据后,如何写出去?
为每一个与客户端连接的 socket 对象增加对应的队列。
与每一个客户端连接的 socket 对象,自己维护一个队列,某一个客户端收到信息后,会遍历发给所有客户端的队列。这里完成一对多,即一份数据放到了所有队列中。
与每一个客户端连接的 socket 对象,发现自己队列有数据,就发送给客户端。
import socket
import threading
import datetime
import logging
import selectors
from queue import Queue
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatServer:
def __init__(self, ip='127.0.0.1', port=9999):
self.sock = socket.socket()
self.addr = (ip, port)
self.clients = {}
self.event = threading.Event()
self.selector = selectors.DefaultSelector() # 创建selector
def start(self): # 启动监听
self.sock.bind(self.addr) # 绑定
self.sock.listen() # 监听
self.sock.setblocking(False) # 不阻塞
# 注册
self.selector.register(self.sock, selectors.EVENT_READ, self.accept)
threading.Thread(target=self.select, name='selector', daemon=True).start()
def select(self): # 阻塞
while not self.event.is_set():
# 开始监视,等到某文件对象被监控的事件产生,返回(key, mask)元组
events = self.selector.select() # 阻塞,直到events
for key, mask in events:
if callable(key.data):
callback = key.data # key对象的data属性,回调
callback(key.fileobj, mask)
else:
callback = key.data[0]
callback(key, mask)
def accept(self, sock: socket.socket, mask): # 接收客户端连接
conn, raddr = sock.accept()
conn.setblocking(False) # 非阻塞
self.clients[raddr] = (self.handle, Queue())
# 注册,监视每一个与客户端的连接的socket对象
self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE,
self.clients[raddr])
def handle(self, key: selectors.SelectorKey, mask): # 接收客户端数据
if mask & selectors.EVENT_READ:
sock = key.fileobj
raddr = sock.getpeername()
data = sock.recv(1024)
if not data or data == b'quit':
self.selector.unregister(sock)
sock.close()
self.clients.pop(raddr)
return
msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *raddr, data.decode())
logging.info(msg)
msg = msg.encode()
for k in self.selector.get_map().values():
logging.info(k)
if isinstance(k.data, tuple):
k.data[1].put(data)
if mask & selectors.EVENT_WRITE:
# 因为写一直就绪,mask为2,所以一直可以写,从而导致select()不断循环,如同不阻塞一样
if not key.data[1].empty():
key.fileobj.send(key.data[1].get())
def stop(self): # 停止服务
self.event.set()
fobjs = []
for fd, key in self.selector.get_map().items():
fobjs.append(key.fileobj)
for fobj in fobjs:
self.selector.unregister(fobj)
fobj.close()
self.selector.close()
def main():
cs = ChatServer()
cs.start()
while True:
cmd = input('>>').strip()
if cmd == 'quit':
cs.stop()
threading.Event().wait(3)
break
logging.info('-' * 30)
logging.info("{} {}".format(len(cs.clients), cs.clients))
logging.info(list(map(lambda x: (x.fileobj.fileno(), x.data), cs.selector.get_map().values())))
logging.info('-' * 30)
logging.info(threading.enumerate())
if __name__ == '__main__':
main()
这个程序最大的问题,在 select() 一直判断可写,几乎一直循环不停。所以对于写不频繁的情况下,就不要监听 EVENT_WRITE。
一般对于 Server 来说,更多的是等待对方发来数据后响应时才发出数据,而不是积极的等着发送数据。所以监听 EVENT_READ,收到数据之后再发送就可以了。
本例只完成基本功能,其他功能如有需要,请自行完成。