16、Python3 日志处理
16.1、概述
生产中会生成大量的系统日志、应用程序日志、安全日志等等日志,通过对日志的分析可以了解服务器的负载、健康状况,可以分析客户的分布情况、客户的行为,甚至基于这些分析可以做出预测。
一般采集流程:
日志产出 -> 采集( Logstash、Flume、 Scribe ) -> 存储 -> 分析 -> 存储(数据库、NOSQL) -> 可视化。
开源实时日志分析 ELK 平台:
Logstash 收集日志,并存放到 ElasticSearch 集群中,Kibana 则从 ES 集群中查询数据生成图表,返回浏览器端。
16.2、半结构化数据
日志是半结构化数据,是有组织的,有格式的数据。可以分割成行和列,就可以当做表理解和处理了,当然也可以分析里面的数据。
16.2.1、文本分析
日志是文本文件,需要依赖文件 IO、字符串操作、正则表达式等技术。
通过这些技术就能够把日志中需要的数据提取出来。
123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
这是最常见的日志,nginx、tomcat 等 WEB Server 都会产生这样的日志。如何提取出数据?
这里面每一段有效的数据对后期的分析都是必须的。
16.2.2、提取数据
16.2.3、空格分割
with open('access.log') as f:
for line in f:
for field in line.split():
print(field)
缺点:
数据并没有按照业务分割好,比如时间就被分开了,URL 相关的也被分开了,User Agent 的空格最多,被分割了。
所以,定义的时候不选用这种在 filed 中出现的字符就可以省很多事,例如使用 \x01
这个不可见的 ASCIl,print('\x01')
试一试。
能否依旧是空格分割,但是遇到双引号、中括号特殊处理一下?
思路 :
先按照空格切分,然后一个个字符迭代,但如果发现是 [
或者 "
,就不判断是否空格,直到 ]
或者 "
结尾,这个区间获取的就是整体数据。
fragment = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''
CHARSET = set(" \t")
def mkkey(line: str):
start = 0
skip = False
for i, c in enumerate(line):
if not skip and c in '"[':
skip = True
start = i + 1
elif skip and c in '"]':
skip = False
yield line[start:i]
start = i + 1
continue
if skip:
continue
if c in CHARSET:
if start == i:
start += 1
continue
yield line[start:i]
start = i + 1
else:
if start < len(line):
yield line[start:]
print(tuple(mkkey(fragment)))
16.2.3.1、类型转换
fields 中的数据是有类型的,例如时间、状态码等。对不同的 field 要做不同的类型转换,甚至是自定义的转换。
时间转换:
06/Apr/2017:18:09:25 +0800
对应格式是 %d/%b/%Y:%H:%M:%S %z
。
使用的函数是 datetime 类的 strptime 方法。
import datetime
def convert_time(timestr):
return datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%M:%S %z')
# lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%M:%S %z')
状态码和字节数:都是整型,使用 int 函数转换。
16.2.3.2、请求信息的解析
GET / HTTP/1.1
method url protocol 三部分都非常重要
def get_request(request: str):
return dict(zip(['method', 'url', 'protocol'], request.split()))
# lambda request: dict(zip(['method', 'url', 'protocol'], request.split()))
16.2.3.3、映射
对每一个字段命名,然后值和类型转换的方法对应。解析每一行是有顺序的。
import datetime
fragment = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''
CHARSET = set(" \t")
def mkkey(line: str):
start = 0
skip = False
for i, c in enumerate(line):
if not skip and c in '"[':
skip = True
start = i + 1
elif skip and c in '"]':
skip = False
yield line[start:i]
start = i + 1
continue
if skip:
continue
if c in CHARSET:
if start == i:
start += 1
continue
yield line[start:i]
start = i + 1
else:
if start < len(line):
yield line[start:]
names = ('remote', '', '', 'datetime', 'request', 'status', 'length', '', 'useragent')
ops = (None, None, None,
lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
lambda request: dict(zip(['method', 'url', 'protocol'], request.split())),
int, int, None, None
)
def extract(line: str):
return dict(map(lambda item: (item[0], item[2](item[1]) if item[2] is not None else item[1]),
zip(names, mkkey(line), ops)))
print(extract(fragment))
16.2.4、正则表达式提取
构造一个正则表达式提取需要的字段,改造 extract 函数、names 和 ops。
names = ('remote', 'datetime', 'method', 'url', 'protocol', 'status', 'length', 'useragent')
ops = (None,
lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
None, None, None, int, int, None)
pattern = '''([\d.]{7,}) - - \[([/\w +:]+)\] "(\w+) (\S+) ([\w/\d.]+)" (\d+) (\d+) .+ "(.+)"'''
能使用命名分组吗?
进一步改造 pattern 为命名分组,ops 也就可以和名词对应了,names 就没有必要存在了。
ops = {'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'length': int
}
pattern = r'''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[/\w +:]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''
改造后的代码:
import datetime
import re
fragment = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''
CHARSET = set(" \t")
ops = {'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'length': int
}
pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[/\w +:]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(pattern)
def extract(line: str) -> dict:
matcher = regex.match(line)
return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
print(extract(fragment))
16.2.5、异常处理
日志中不免会出现一些不匹配的行,需要处理。
这里使用 re.match 方法,有可能匹配不上。所以要增加一个判断。
采用抛出异常的方式,让调用者获得异常并自行处理。
def extract(line: str) -> dict:
"""返回字段的字典,抛出异常说明匹配失败"""
matcher = regex.match(line)
if matcher:
return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
else:
raise Exception("No match")
16.3、滑动窗口
16.3.1、数据载入
对于本项目来说,数据就是日志的一行行记录,载入数据就是文件 IO 的读取。将获取数据的方法封装成函数。
def load(path):
with open(path) as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue # TODO 解析失败就抛弃,或者打印日志
16.3.2、时间窗口分析
16.3.2.1、概念
很多数据,例如日志,都是和时间相关的,都是按照时间顺序产生的。
产生的数据在分析的时候,要按照时间求值。
interval 表示每一次求值的时间间隔。
width 时间窗口宽度,指的一次求值的时间窗口宽度。
16.3.2.2、width > interval
数据求值时会有重叠。
16.3.2.3、width = interval
数据求值时没有重叠。
16.3.2.4、width < interval
一般不采纳这种方案,会有数据丢失。
16.3.3、时序数据
运维环境中,日志、监控等产生的数据都是与时间相关的数据,按照时间先后产生并记录下来的数据,所以一般按照时间对数据进行分析。
16.3.4、数据分析基本程序结构
函数无限的生成随机数,产生时间相关的数据,返回时间和随机数的字典,每次取3个数据,求平均值。
import random
import datetime
import time
# 数据源函数,生成随机数
def source():
while True:
yield {'value': random.randint(1, 100), 'datetime': datetime.datetime.now()}
time.sleep(1)
# 获取数据
src = source()
items = [next(src) for _ in range(3)]
# 处理函数
def handler(iterable):
return sum(map(lambda item: item['value'], iterable)) / len(iterable)
print(items)
print('{:.2f}'.format(handler(items)))
上面代码模拟了,一段时间内产生了数据,等了一段固定的时间取数据来计算平均值。
16.3.5、窗口函数实现
将上面获取数据的程序扩展为 window 函数。使用重叠的方案。
import random
import datetime
import time
def source(second=1):
"""生成数据"""
while True:
yield {
'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
'value': random.randint(1, 100)
}
time.sleep(second)
# 处理函数
def handler(iterable):
return sum(map(lambda item: item['value'], iterable)) / len(iterable)
def window(iterator, handler, width: int, interval: int):
"""
窗口函数
:param iterator: 数据源,生成器,用来拿数据
:param handler: 数据处理函数
:param width: 时间窗口宽度, 秒
:param interval: 处理时间间隔,秒
"""
start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')
buffer = []
delta = datetime.timedelta(seconds=width - interval)
while True:
# 从数据源获取数据
data = next(iterator)
if data:
buffer.append(data) # 存入临时缓冲等待计算
current = data['datetime']
# 每隔interval计算buffer中的数据一次
if (current - start).total_seconds() >= interval:
ret = handler(buffer)
print('{:.2f}'.format(ret))
start = current
# 清除超出width的数据
buffer = [x for x in buffer if x['datetime'] > current - delta]
window(source(), handler, 10, 5)
时间的计算:
16.4、数据分发
16.4.1、生产者消费者模型
对于一个监控系统,需要处理很多数据,包括日志。对其中已有数据的采集、分析。
被监控对象就是数据的生产者 producer,数据的处理程序就是数据的消费者 consumer。
生产者消费者传统模型:
传统的生产者消费者模型,生产者生产,消费者消费。但这种模型有些问题。
开发的代码耦合太高,如果生产规模扩大,不易扩展,生产和消费的速度很难匹配等。
解决的办法:使用队列 queue。作用是解耦、缓冲。
日志生产者往往会部署好几个程序,日志产生的也很多,而消费者也会有多个程序,去提取日志分析处理。
数据的生产是不稳定的,会造成短时间数据的“潮涌”,需要缓冲。
消费者消费能力不一样,有快有慢,消费者可以自己决定消费缓冲区中的数据。
单机可以使用 queue 内建的模块构建进程内的队列,满足多个线程间的生产消费需要。
大型系统可以使用第三方消息中间件:RabbitMQ、RocketMQ、Kafka。
16.4.2、queue 模块
queue 模块提供了一个先进先出的队列 Queue。
queue.Queue(maxsize=0):创建 FIFO 队列,返回 Queue 对象。maxsize 小于等于 0,队列长度没有限制。
Queue.get(block=True, timeout=None):从队列中移除元素并返回这个元素。block 为阻塞,timeout 为超时。
- 如果 block 为 True,是阻塞,timeout 为 None 就是一直阻塞。
- 如果 block 为 True 但是 timeout 有值,就阻塞到一定秒数抛出 Empty 异常。
- block 为 False,是非阻塞,timeout 将被忽略,要么成功返回一个元素,要么抛出 empty 异常。
Queue.get_nowait():等价于 get(False),也就是说要么成功返回一个元素,要么抛出 empty 异常。但是 queue 的这种阻塞效果,在多线程的时候演示才明显。
Queue.put(item, block=True, timeout=None):把一个元素加入到队列中去。
- block=True,timeout=None,一直阻塞直至有空位放元素。
- block=True,timeout=5,阻塞 5 秒就抛出 Full 异常。
- block=False,timeout 失效,立即返回,能塞进去就塞,不能则抛出 Full 异常。
Queue.put_nowait(item):等价于 put(item, False),也就是能塞进去就塞,不能则抛出 Full 异常。
from queue import Queue
import random
q = Queue()
q.put(random.randint(1, 100))
q.put(random.randint(1, 100))
print(q.get())
print(q.get())
# print(q.get()) # 阻塞
print(q.get(timeout=3)) # 阻塞3秒,抛出异常
16.4.3、分发器的实现
生产者(数据源)生产数据,缓冲到消息队列中。
数据处理流程:数据加载 --> 提取 --> 分析(滑动窗口函数)。
处理大量数据的时候,对于一个数据源来说,需要多个消费者处理。但是如何分配数据就是个问题了。
需要一个分发器(调度器),把数据分发给不同的消费者处理。
每一个消费者拿到数据后,有自己的处理函数。所以要有一种注册机制。
数据加载 --> 提取 --> 分发 --> 分析函数 1
--> 分析函数 2
分析函数 1 和分析函数 2 是不同的 handler,不同的窗口宽度、间隔时间。
如何分发?
-
这里就简单一点,轮询策略。
-
一对多的副本发送,一个数据通过分发器,发送到 n 个消费者。
消息队列?
- 在生产者和消费者之间使用消息队列,那么所有消费者共用一个消息队列,还是各自拥有一个消息队列呢?
- 共用一个消息队列也可以,但是需要解决争抢的问题。相对来说每一个消费者自己拥有一个队列较为容易。
如何注册?
- 在调度器内部记录有哪些消费者,每一个消费者拥有自己的队列。
线程?
-
由于一条数据会被多个不同的注册过的 handler 处理,所以最好的方式是多线程。
import threading # 定义线程 t = threading.Thread(target=window, args=(src, handler, width, interval)) t.start()
import random
import datetime
import time
from queue import Queue
import threading
def source(second=1):
"""生成数据"""
while True:
yield {
'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
'value': random.randint(1, 100)
}
time.sleep(second)
# 处理函数
def handler(iterable):
return sum(map(lambda item: item['value'], iterable)) / len(iterable)
def window(src: Queue, handler, width: int, interval: int):
"""
窗口函数
:param iterator: 数据源,生成器,用来拿数据
:param handler: 数据处理函数
:param width: 时间窗口宽度, 秒
:param interval: 处理时间间隔,秒
"""
start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')
buffer = []
delta = datetime.timedelta(seconds=width - interval)
while True:
# 从数据源获取数据
data = src.get()
if data:
buffer.append(data) # 存入临时缓冲等待计算
current = data['datetime']
# 每隔interval计算buffer中的数据一次
if (current - start).total_seconds() >= interval:
ret = handler(buffer)
print('{:.2f}'.format(ret))
start = current
# 清除超出width的数据
buffer = [x for x in buffer if x['datetime'] > current - delta]
def dispatcher(src):
# 分发器中记录handler,同时保存各自的队列
handlers = []
queues = []
def reg(handler, width: int, interval: int):
"""
注册 窗口处理函数
:param handler: 注册的数据处理函数
:param width: 时间窗口宽度
:param interval: 时间间隔
"""
q = Queue()
queues.append(q)
h = threading.Thread(target=window, args=(q, handler, width, interval))
handlers.append(h)
def run():
for t in handlers:
t.start()
for item in src:
for q in queues:
q.put(item)
return reg, run
reg, run = dispatcher(source())
reg(handler, 10, 5) # 注册
run() # 运行
注意,以上代码也只是现阶段所学知识的一种实现,项目中建议使用消息队列服务的“订阅“模式,消费者各自消费自己的队列的数据。
16.5、文件加载
16.5.1、整合代码
load 函数就是从日志中提取合格数据的生成器函数。
它可以作为 dispatcher 函数的数据源。
原来写的 handler 函数处理一个字典的 datetime
字段,不能处理日志抽取函数 extract 返回的字典,提供一个新的函数。
import random
import datetime
import time
from queue import Queue
import threading
import re
PATTERN = r'''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[/\w +:]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(PATTERN)
ops = {
'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'size': int
}
def extract(line: str) -> dict:
matcher = regex.match(line)
if matcher:
return {
name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()
}
def load(path):
"""装载日志文件"""
with open(path) as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue # TODO 解析失败则抛弃或者记录日志
# 数据处理
def source(second=1):
"""生成数据"""
while True:
yield {
'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
'value': random.randint(1, 100)
}
time.sleep(second)
def window(src: Queue, handler, width: int, interval: int):
"""
注册 窗口处理函数
:param handler: 注册的数据处理函数
:param width: 时间窗口宽度
:param interval: 时间间隔
"""
start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')
buffer = []
delta = datetime.timedelta(seconds=width - interval)
while True:
# 从数据源获取数据
data = src.get()
if data:
buffer.append(data) # 存入临时缓冲等待计算
current = data['datetime']
if (current - start).total_seconds() >= interval:
ret = handler(buffer)
print('{}'.format(ret))
start = current
buffer = [x for x in buffer if x['datetime'] > current - delta]
def handler(iterable):
return sum(map(lambda x: x['value']), iterable) / len(iterable)
def donothing_handler(iterable):
return iterable
def dispatcher(src):
# 分发器中记录handler,同时保存各自的队列
handlers = []
queues = []
def reg(handler, width: int, interval: int):
"""
注册 窗口处理函数
:param handler: 注册的数据处理函数
:param width: 时间窗口宽度
:param interval: 时间间隔
"""
q = Queue()
queues.append(q)
h = threading.Thread(target=window, args=(q, handler, width, interval))
handlers.append(h)
def run():
for t in handlers:
t.start()
for item in src:
for q in queues:
q.put(item)
return reg, run
if __name__ == '__main__':
import sys
path = 'access.log'
reg, run = dispatcher(load(path))
reg(donothing_handler, 10, 5)
run()
16.5.2、完成分析功能
分析日志很重要,通过海量数据分析就能够知道是否遭受了攻击,是否被爬取及爬取高峰期,是否有盗链等。
百度(Baidu) 爬虫名称(Baiduspider)
谷歌(Google) 爬虫名称(Googlebot)
16.5.2.1、状态码分析
状态码中包含了很多信息。例如:
- 304,服务器收到客户端提交的请求参数,发现资源未变化,要求浏览器使用静态资源的缓存
- 404,服务器找不到请求的资源。
304 占比大,说明静态缓存效果明显。404 占比大,说明网站出现了错误链接,或者尝试嗅探网站资源。
如果 400、500 占比突然开始增大,网站一定出问题了。
# 状态码占比
def status_handler(iterable):
# 时间窗口内的一批数据
status = {}
for item in iterable:
key = item['status']
status[key] = status.get(key, 0) + 1
total = len(iterable)
return {k: status[k] / total for k, v in status.items()}
如果还需要什么分析,增加分析函数 handler 注册就行了。
16.5.2.2、日志文件的加载
目前实现的代码中,只能接受一个路径,修改为接受一批路径。
可以约定一下路径下文件的存放方式:
-
如果送来的是一批路径,就迭代其中路径。
-
如果路径是一个普通文件,就按照行读取内容。
-
如果路径是一个目录,就遍历路径下所有普通文件,每一个文件按照行处理。不递归处理子目录。
from pathlib import Path
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
if p.is_dir():
for file in p.iterdir():
if file.is_file():
pass # 和下面处理一样
elif p.is_file():
with open(str(p)) as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue # TODO 解析失败则抛弃或者记录日志
写的过程中发现重复的地方,把文件处理部分提出来写成函数。
from pathlib import Path
def openfile(path: str):
with open(path) as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue # TODO 解析失败则抛弃或者记录日志
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
if p.is_dir():
for file in p.iterdir():
if file.is_file():
yield from openfile(str(file))
elif p.is_file():
yield from openfile(str(p))
16.5.3、完整代码
import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path
PATTERN = r'''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[/\w +:]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(PATTERN)
ops = {
'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'size': int
}
def extract(line: str) -> dict:
matcher = regex.match(line)
if matcher:
return {
name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()
}
def openfile(path: str):
with open(path) as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue # TODO 解析失败则抛弃或者记录日志
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
if p.is_dir():
for file in p.iterdir():
if file.is_file():
yield from openfile(str(file))
elif p.is_file():
yield from openfile(str(p))
# 数据处理
def source(second=1):
"""生成数据"""
while True:
yield {
'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
'value': random.randint(1, 100)
}
time.sleep(second)
def window(src: Queue, handler, width: int, interval: int):
"""
注册 窗口处理函数
:param handler: 注册的数据处理函数
:param width: 时间窗口宽度
:param interval: 时间间隔
"""
start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')
buffer = []
delta = datetime.timedelta(seconds=width - interval)
while True:
# 从数据源获取数据
data = src.get()
if data:
buffer.append(data) # 存入临时缓冲等待计算
current = data['datetime']
if (current - start).total_seconds() >= interval:
ret = handler(buffer)
print('{}'.format(ret))
start = current
buffer = [x for x in buffer if x['datetime'] > current - delta]
def handler(iterable):
return sum(map(lambda x: x['value']), iterable) / len(iterable)
def donothing_handler(iterable):
return iterable
# 状态码占比
def status_handler(iterable):
# 时间窗口内的一批数据
status = {}
for item in iterable:
key = item['status']
status[key] = status.get(key, 0) + 1
total = len(iterable)
return {k: status[k] / total for k, v in status.items()}
def dispatcher(src):
# 分发器中记录handler,同时保存各自的队列
handlers = []
queues = []
def reg(handler, width: int, interval: int):
"""
注册 窗口处理函数
:param handler: 注册的数据处理函数
:param width: 时间窗口宽度
:param interval: 时间间隔
"""
q = Queue()
queues.append(q)
h = threading.Thread(target=window, args=(q, handler, width, interval))
handlers.append(h)
def run():
for t in handlers:
t.start()
for item in src:
for q in queues:
q.put(item)
return reg, run
if __name__ == '__main__':
import sys
path = 'access.log'
reg, run = dispatcher(load(path))
reg(donothing_handler, 10, 5)
reg(status_handler, 10, 5)
run()
到这里,一个离线日志分析项目基本完成。
- 可以指定文件或目录,对日志进行数据分析
- 分析函数可以动态注册
- 数据可以分发给不同的分析处理程序处理
16.6、浏览器分析
16.6.1、useragent
这里指的是,软件按照一定的格式向远端的服务器提供一个标识自己的字符串。
在 HTTP 协议中,使用 user-agent 字段传送这个字符串。
注意:这个值可以被修改
格式:
现在浏览器的user-agent值格式一般如下:
Mozilla/[version] ([system and browser information]) [platform] ([platform details])[extensions]
例如:
Chrome
Mozilla/5.0 (Window NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36
Firefox
Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0
Mozilla/5.0 (X11; Ubuntu; Linux x86 64; rv:52.0) Gecko/20100101 Firefox/52.0
IE
Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E)
16.6.2、信息提取
pyyaml、ua-parser、user-agents 模块。
# 安装
pip install pyyaml ua-parser user-agents
使用:
from user_agents import parse
useragents = [
"Mozilla/5.0 (Window NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36",
"Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0",
"Mozilla/5.0 (X11; Ubuntu; Linux x86 64; rv:52.0) Gecko/20100101 Firefox/52.0",
"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E)"
]
for uastring in useragents:
ua = parse(uastring)
print(ua.browser, ua.browser.family, ua.browser.version, ua.browser.version_string)
运行结果:
Browser(family='Chrome', version=(57, 0, 2987), version_string='57.0.2987') Chrome (57, 0, 2987) 57.0.2987
Browser(family='Firefox', version=(56, 0), version_string='56.0') Firefox (56, 0) 56.0
Browser(family='Firefox', version=(52, 0), version_string='52.0') Firefox (52, 0) 52.0
Browser(family='IE', version=(10, 0), version_string='10.0') IE (10, 0) 10.0
ua.browser.family
和 ua.browser.version_string
分别返回浏览器名称、版本号。
16.6.3、数据分析
ops 增加对 useragent 的处理:
from user_agents import parse
ops = {
'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'size': int,
'useragent': lambda ua: parse(ua)
}
增加浏览器分析函数:
def browser_handler(iterable):
browsers = {}
for item in iterable:
ua = item['useragent']
key = (ua.browser.family, ua.browser.version_string)
browsers[key] = browsers.get(key, 0) + 1
return browsers
注册handler,注意时间窗口宽度:
reg(browser_handler, 5, 5)
问题:如果想知道所有浏览器的统计,怎么办?
allbrowsers = {}
def browser_handler(iterable):
browsers = {}
for item in iterable:
ua = item['useragent']
key = (ua.browser.family, ua.browser.version_string)
browsers[key] = browsers.get(key, 0) + 1
allbrowsers[key] = allbrowsers.get(key, 0) + 1
print(sorted(allbrowsers.items(), key=lambda x:x[1], reverse=True)[:10])
return browsers
16.6.4、完整代码
import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path
from user_agents import parse
PATTERN = r'''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[/\w +:]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(PATTERN)
ops = {
'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'size': int,
'useragent': lambda ua: parse(ua)
}
def extract(line: str) -> dict:
matcher = regex.match(line)
if matcher:
return {
name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()
}
def openfile(path: str):
with open(path) as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue # TODO 解析失败则抛弃或者记录日志
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
if p.is_dir():
for file in p.iterdir():
if file.is_file():
yield from openfile(str(file))
elif p.is_file():
yield from openfile(str(p))
# 数据处理
def source(second=1):
"""生成数据"""
while True:
yield {
'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
'value': random.randint(1, 100)
}
time.sleep(second)
def window(src: Queue, handler, width: int, interval: int):
"""
注册 窗口处理函数
:param handler: 注册的数据处理函数
:param width: 时间窗口宽度
:param interval: 时间间隔
"""
start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')
buffer = []
delta = datetime.timedelta(seconds=width - interval)
while True:
# 从数据源获取数据
data = src.get()
if data:
buffer.append(data) # 存入临时缓冲等待计算
current = data['datetime']
if (current - start).total_seconds() >= interval:
ret = handler(buffer)
print('{}'.format(ret))
start = current
buffer = [x for x in buffer if x['datetime'] > current - delta]
def handler(iterable):
return sum(map(lambda x: x['value']), iterable) / len(iterable)
def donothing_handler(iterable):
return iterable
allbrowsers = {}
def browser_handler(iterable):
browsers = {}
for item in iterable:
ua = item['useragent']
key = (ua.browser.family, ua.browser.version_string)
browsers[key] = browsers.get(key, 0) + 1
allbrowsers[key] = allbrowsers.get(key, 0) + 1
print(sorted(allbrowsers.items(), key=lambda x: x[1], reverse=True)[:10])
return browsers
# 状态码占比
def status_handler(iterable):
# 时间窗口内的一批数据
status = {}
for item in iterable:
key = item['status']
status[key] = status.get(key, 0) + 1
total = len(iterable)
return {k: status[k] / total for k, v in status.items()}
def dispatcher(src):
# 分发器中记录handler,同时保存各自的队列
handlers = []
queues = []
def reg(handler, width: int, interval: int):
"""
注册 窗口处理函数
:param handler: 注册的数据处理函数
:param width: 时间窗口宽度
:param interval: 时间间隔
"""
q = Queue()
queues.append(q)
h = threading.Thread(target=window, args=(q, handler, width, interval))
handlers.append(h)
def run():
for t in handlers:
t.start()
for item in src:
for q in queues:
q.put(item)
return reg, run
if __name__ == '__main__':
import sys
path = 'access.log'
reg, run = dispatcher(load(path))
reg(status_handler, 10, 5)
reg(browser_handler, 5, 5)
run()