16、Python3 日志处理

作者: Brinnatt 分类: python 术 发布时间: 2023-03-30 10:24

16.1、概述

生产中会生成大量的系统日志、应用程序日志、安全日志等等日志,通过对日志的分析可以了解服务器的负载、健康状况,可以分析客户的分布情况、客户的行为,甚至基于这些分析可以做出预测。

一般采集流程:
日志产出 -> 采集( Logstash、Flume、 Scribe ) -> 存储 -> 分析 -> 存储(数据库、NOSQL) -> 可视化。

开源实时日志分析 ELK 平台:

Logstash 收集日志,并存放到 ElasticSearch 集群中,Kibana 则从 ES 集群中查询数据生成图表,返回浏览器端。

16.2、半结构化数据

日志是半结构化数据,是有组织的,有格式的数据。可以分割成行和列,就可以当做表理解和处理了,当然也可以分析里面的数据。

16.2.1、文本分析

日志是文本文件,需要依赖文件 IO、字符串操作、正则表达式等技术。

通过这些技术就能够把日志中需要的数据提取出来。

123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"

这是最常见的日志,nginx、tomcat 等 WEB Server 都会产生这样的日志。如何提取出数据?

这里面每一段有效的数据对后期的分析都是必须的。

16.2.2、提取数据

16.2.3、空格分割

with open('access.log') as f:
    for line in f:
        for field in line.split():
            print(field)

缺点:
数据并没有按照业务分割好,比如时间就被分开了,URL 相关的也被分开了,User Agent 的空格最多,被分割了。

所以,定义的时候不选用这种在 filed 中出现的字符就可以省很多事,例如使用 \x01 这个不可见的 ASCIl,print('\x01')试一试。

能否依旧是空格分割,但是遇到双引号、中括号特殊处理一下?

思路 :

先按照空格切分,然后一个个字符迭代,但如果发现是 [ 或者 ",就不判断是否空格,直到 ] 或者 " 结尾,这个区间获取的就是整体数据。

fragment = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''

CHARSET = set(" \t")

def mkkey(line: str):
    start = 0
    skip = False
    for i, c in enumerate(line):
        if not skip and c in '"[':
            skip = True
            start = i + 1
        elif skip and c in '"]':
            skip = False
            yield line[start:i]
            start = i + 1
            continue

        if skip:
            continue

        if c in CHARSET:
            if start == i:
                start += 1
                continue
            yield line[start:i]
            start = i + 1
    else:
        if start < len(line):
            yield line[start:]

print(tuple(mkkey(fragment)))

16.2.3.1、类型转换

fields 中的数据是有类型的,例如时间、状态码等。对不同的 field 要做不同的类型转换,甚至是自定义的转换。

时间转换:

06/Apr/2017:18:09:25 +0800 对应格式是 %d/%b/%Y:%H:%M:%S %z

使用的函数是 datetime 类的 strptime 方法。

import datetime

def convert_time(timestr):
    return datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%M:%S %z')

# lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%M:%S %z')

状态码和字节数:都是整型,使用 int 函数转换。

16.2.3.2、请求信息的解析

GET / HTTP/1.1

method url protocol 三部分都非常重要

def get_request(request: str):
    return dict(zip(['method', 'url', 'protocol'], request.split()))

# lambda request: dict(zip(['method', 'url', 'protocol'], request.split()))

16.2.3.3、映射

对每一个字段命名,然后值和类型转换的方法对应。解析每一行是有顺序的。

import datetime

fragment = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''

CHARSET = set(" \t")

def mkkey(line: str):
    start = 0
    skip = False
    for i, c in enumerate(line):
        if not skip and c in '"[':
            skip = True
            start = i + 1
        elif skip and c in '"]':
            skip = False
            yield line[start:i]
            start = i + 1
            continue

        if skip:
            continue

        if c in CHARSET:
            if start == i:
                start += 1
                continue
            yield line[start:i]
            start = i + 1
    else:
        if start < len(line):
            yield line[start:]

names = ('remote', '', '', 'datetime', 'request', 'status', 'length', '', 'useragent')

ops = (None, None, None,
       lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
       lambda request: dict(zip(['method', 'url', 'protocol'], request.split())),
       int, int, None, None
       )

def extract(line: str):
    return dict(map(lambda item: (item[0], item[2](item[1]) if item[2] is not None else item[1]),
                    zip(names, mkkey(line), ops)))

print(extract(fragment))

16.2.4、正则表达式提取

构造一个正则表达式提取需要的字段,改造 extract 函数、names 和 ops。

names = ('remote', 'datetime', 'method', 'url', 'protocol', 'status', 'length', 'useragent')

ops = (None,
       lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
       None, None, None, int, int, None)

pattern = '''([\d.]{7,}) - - \[([/\w +:]+)\] "(\w+) (\S+) ([\w/\d.]+)" (\d+) (\d+) .+ "(.+)"'''

能使用命名分组吗?
进一步改造 pattern 为命名分组,ops 也就可以和名词对应了,names 就没有必要存在了。

ops = {'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
       'status': int,
       'length': int
       }

pattern = r'''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[/\w +:]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''

改造后的代码:

import datetime
import re

fragment = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''

CHARSET = set(" \t")

ops = {'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
       'status': int,
       'length': int
       }

pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[/\w +:]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''

regex = re.compile(pattern)

def extract(line: str) -> dict:
    matcher = regex.match(line)
    return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}

print(extract(fragment))

16.2.5、异常处理

日志中不免会出现一些不匹配的行,需要处理。

这里使用 re.match 方法,有可能匹配不上。所以要增加一个判断。

采用抛出异常的方式,让调用者获得异常并自行处理。

def extract(line: str) -> dict:
    """返回字段的字典,抛出异常说明匹配失败"""
    matcher = regex.match(line)
    if matcher:
        return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
    else:
        raise Exception("No match")

16.3、滑动窗口

16.3.1、数据载入

对于本项目来说,数据就是日志的一行行记录,载入数据就是文件 IO 的读取。将获取数据的方法封装成函数。

def load(path):
    with open(path) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                continue  # TODO 解析失败就抛弃,或者打印日志

16.3.2、时间窗口分析

16.3.2.1、概念

很多数据,例如日志,都是和时间相关的,都是按照时间顺序产生的。

产生的数据在分析的时候,要按照时间求值。

interval 表示每一次求值的时间间隔。

width 时间窗口宽度,指的一次求值的时间窗口宽度。

16.3.2.2、width > interval

python3_window

数据求值时会有重叠。

16.3.2.3、width = interval

python3_window1

数据求值时没有重叠。

16.3.2.4、width < interval

一般不采纳这种方案,会有数据丢失。

16.3.3、时序数据

运维环境中,日志、监控等产生的数据都是与时间相关的数据,按照时间先后产生并记录下来的数据,所以一般按照时间对数据进行分析。

16.3.4、数据分析基本程序结构

函数无限的生成随机数,产生时间相关的数据,返回时间和随机数的字典,每次取3个数据,求平均值。

import random
import datetime
import time

# 数据源函数,生成随机数
def source():
    while True:
        yield {'value': random.randint(1, 100), 'datetime': datetime.datetime.now()}
        time.sleep(1)

# 获取数据
src = source()
items = [next(src) for _ in range(3)]

# 处理函数
def handler(iterable):
    return sum(map(lambda item: item['value'], iterable)) / len(iterable)

print(items)
print('{:.2f}'.format(handler(items)))

上面代码模拟了,一段时间内产生了数据,等了一段固定的时间取数据来计算平均值。

16.3.5、窗口函数实现

将上面获取数据的程序扩展为 window 函数。使用重叠的方案。

import random
import datetime
import time

def source(second=1):
    """生成数据"""
    while True:
        yield {
            'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
            'value': random.randint(1, 100)
        }
        time.sleep(second)

# 处理函数
def handler(iterable):
    return sum(map(lambda item: item['value'], iterable)) / len(iterable)

def window(iterator, handler, width: int, interval: int):
    """
    窗口函数
    :param iterator: 数据源,生成器,用来拿数据
    :param handler: 数据处理函数
    :param width: 时间窗口宽度, 秒
    :param interval: 处理时间间隔,秒
    """
    start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
    current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')

    buffer = []
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        # 从数据源获取数据
        data = next(iterator)
        if data:

            buffer.append(data)  # 存入临时缓冲等待计算
            current = data['datetime']

        # 每隔interval计算buffer中的数据一次
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print('{:.2f}'.format(ret))
            start = current

            # 清除超出width的数据
            buffer = [x for x in buffer if x['datetime'] > current - delta]

window(source(), handler, 10, 5)

时间的计算:

python3_buffer_window

16.4、数据分发

16.4.1、生产者消费者模型

对于一个监控系统,需要处理很多数据,包括日志。对其中已有数据的采集、分析。

被监控对象就是数据的生产者 producer,数据的处理程序就是数据的消费者 consumer。

生产者消费者传统模型:

python3_prod_cons

传统的生产者消费者模型,生产者生产,消费者消费。但这种模型有些问题。

开发的代码耦合太高,如果生产规模扩大,不易扩展,生产和消费的速度很难匹配等。

解决的办法:使用队列 queue。作用是解耦、缓冲。

python3_prod_cons1

日志生产者往往会部署好几个程序,日志产生的也很多,而消费者也会有多个程序,去提取日志分析处理。

数据的生产是不稳定的,会造成短时间数据的“潮涌”,需要缓冲。

消费者消费能力不一样,有快有慢,消费者可以自己决定消费缓冲区中的数据。

单机可以使用 queue 内建的模块构建进程内的队列,满足多个线程间的生产消费需要。

大型系统可以使用第三方消息中间件:RabbitMQ、RocketMQ、Kafka。

16.4.2、queue 模块

queue 模块提供了一个先进先出的队列 Queue。

queue.Queue(maxsize=0):创建 FIFO 队列,返回 Queue 对象。maxsize 小于等于 0,队列长度没有限制。

Queue.get(block=True, timeout=None):从队列中移除元素并返回这个元素。block 为阻塞,timeout 为超时。

  • 如果 block 为 True,是阻塞,timeout 为 None 就是一直阻塞。
  • 如果 block 为 True 但是 timeout 有值,就阻塞到一定秒数抛出 Empty 异常。
  • block 为 False,是非阻塞,timeout 将被忽略,要么成功返回一个元素,要么抛出 empty 异常。

Queue.get_nowait():等价于 get(False),也就是说要么成功返回一个元素,要么抛出 empty 异常。但是 queue 的这种阻塞效果,在多线程的时候演示才明显。

Queue.put(item, block=True, timeout=None):把一个元素加入到队列中去。

  • block=True,timeout=None,一直阻塞直至有空位放元素。
  • block=True,timeout=5,阻塞 5 秒就抛出 Full 异常。
  • block=False,timeout 失效,立即返回,能塞进去就塞,不能则抛出 Full 异常。

Queue.put_nowait(item):等价于 put(item, False),也就是能塞进去就塞,不能则抛出 Full 异常。

from queue import Queue
import random

q = Queue()
q.put(random.randint(1, 100))
q.put(random.randint(1, 100))

print(q.get())
print(q.get())
# print(q.get()) # 阻塞
print(q.get(timeout=3))  # 阻塞3秒,抛出异常

16.4.3、分发器的实现

生产者(数据源)生产数据,缓冲到消息队列中。

数据处理流程:数据加载 --> 提取 --> 分析(滑动窗口函数)。

处理大量数据的时候,对于一个数据源来说,需要多个消费者处理。但是如何分配数据就是个问题了。

需要一个分发器(调度器),把数据分发给不同的消费者处理。

每一个消费者拿到数据后,有自己的处理函数。所以要有一种注册机制。

数据加载 --> 提取 --> 分发 --> 分析函数 1

​ --> 分析函数 2

分析函数 1 和分析函数 2 是不同的 handler,不同的窗口宽度、间隔时间。

如何分发?

  • 这里就简单一点,轮询策略。

  • 一对多的副本发送,一个数据通过分发器,发送到 n 个消费者。

消息队列?

  • 在生产者和消费者之间使用消息队列,那么所有消费者共用一个消息队列,还是各自拥有一个消息队列呢?
  • 共用一个消息队列也可以,但是需要解决争抢的问题。相对来说每一个消费者自己拥有一个队列较为容易。

如何注册?

  • 在调度器内部记录有哪些消费者,每一个消费者拥有自己的队列。

线程?

  • 由于一条数据会被多个不同的注册过的 handler 处理,所以最好的方式是多线程。

    import threading
    
    # 定义线程
    t = threading.Thread(target=window, args=(src, handler, width, interval))
    t.start()
import random
import datetime
import time
from queue import Queue
import threading

def source(second=1):
    """生成数据"""
    while True:
        yield {
            'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
            'value': random.randint(1, 100)
        }
        time.sleep(second)

# 处理函数
def handler(iterable):
    return sum(map(lambda item: item['value'], iterable)) / len(iterable)

def window(src: Queue, handler, width: int, interval: int):
    """
    窗口函数
    :param iterator: 数据源,生成器,用来拿数据
    :param handler: 数据处理函数
    :param width: 时间窗口宽度, 秒
    :param interval: 处理时间间隔,秒
    """
    start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
    current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')

    buffer = []
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        # 从数据源获取数据
        data = src.get()
        if data:
            buffer.append(data)  # 存入临时缓冲等待计算
            current = data['datetime']

        # 每隔interval计算buffer中的数据一次
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print('{:.2f}'.format(ret))
            start = current

            # 清除超出width的数据
            buffer = [x for x in buffer if x['datetime'] > current - delta]

def dispatcher(src):
    # 分发器中记录handler,同时保存各自的队列
    handlers = []
    queues = []

    def reg(handler, width: int, interval: int):
        """
        注册 窗口处理函数
        :param handler: 注册的数据处理函数
        :param width: 时间窗口宽度
        :param interval: 时间间隔
        """
        q = Queue()
        queues.append(q)

        h = threading.Thread(target=window, args=(q, handler, width, interval))
        handlers.append(h)

    def run():
        for t in handlers:
            t.start()
        for item in src:
            for q in queues:
                q.put(item)

    return reg, run

reg, run = dispatcher(source())

reg(handler, 10, 5)  # 注册
run()  # 运行

注意,以上代码也只是现阶段所学知识的一种实现,项目中建议使用消息队列服务的“订阅“模式,消费者各自消费自己的队列的数据。

16.5、文件加载

16.5.1、整合代码

load 函数就是从日志中提取合格数据的生成器函数。

它可以作为 dispatcher 函数的数据源。

原来写的 handler 函数处理一个字典的 datetime 字段,不能处理日志抽取函数 extract 返回的字典,提供一个新的函数。

import random
import datetime
import time
from queue import Queue
import threading
import re

PATTERN = r'''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[/\w +:]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''

regex = re.compile(PATTERN)

ops = {
    'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int
}

def extract(line: str) -> dict:
    matcher = regex.match(line)
    if matcher:
        return {
            name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()
        }

def load(path):
    """装载日志文件"""
    with open(path) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                continue  # TODO 解析失败则抛弃或者记录日志

# 数据处理
def source(second=1):
    """生成数据"""
    while True:
        yield {
            'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
            'value': random.randint(1, 100)
        }
        time.sleep(second)

def window(src: Queue, handler, width: int, interval: int):
    """
    注册 窗口处理函数
    :param handler: 注册的数据处理函数
    :param width: 时间窗口宽度
    :param interval: 时间间隔
    """

    start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
    current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')

    buffer = []
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        # 从数据源获取数据
        data = src.get()
        if data:
            buffer.append(data)  # 存入临时缓冲等待计算
            current = data['datetime']

        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print('{}'.format(ret))
            start = current

            buffer = [x for x in buffer if x['datetime'] > current - delta]

def handler(iterable):
    return sum(map(lambda x: x['value']), iterable) / len(iterable)

def donothing_handler(iterable):
    return iterable

def dispatcher(src):
    # 分发器中记录handler,同时保存各自的队列
    handlers = []
    queues = []

    def reg(handler, width: int, interval: int):
        """
        注册 窗口处理函数
        :param handler: 注册的数据处理函数
        :param width: 时间窗口宽度
        :param interval: 时间间隔
        """
        q = Queue()
        queues.append(q)

        h = threading.Thread(target=window, args=(q, handler, width, interval))
        handlers.append(h)

    def run():
        for t in handlers:
            t.start()
        for item in src:
            for q in queues:
                q.put(item)

    return reg, run

if __name__ == '__main__':
    import sys

    path = 'access.log'

    reg, run = dispatcher(load(path))
    reg(donothing_handler, 10, 5)
    run()

16.5.2、完成分析功能

分析日志很重要,通过海量数据分析就能够知道是否遭受了攻击,是否被爬取及爬取高峰期,是否有盗链等。

百度(Baidu) 爬虫名称(Baiduspider)

谷歌(Google) 爬虫名称(Googlebot)

16.5.2.1、状态码分析

状态码中包含了很多信息。例如:

  • 304,服务器收到客户端提交的请求参数,发现资源未变化,要求浏览器使用静态资源的缓存
  • 404,服务器找不到请求的资源。

304 占比大,说明静态缓存效果明显。404 占比大,说明网站出现了错误链接,或者尝试嗅探网站资源。

如果 400、500 占比突然开始增大,网站一定出问题了。

# 状态码占比
def status_handler(iterable):
    # 时间窗口内的一批数据
    status = {}

    for item in iterable:
        key = item['status']
        status[key] = status.get(key, 0) + 1

    total = len(iterable)
    return {k: status[k] / total for k, v in status.items()}

如果还需要什么分析,增加分析函数 handler 注册就行了。

16.5.2.2、日志文件的加载

目前实现的代码中,只能接受一个路径,修改为接受一批路径。

可以约定一下路径下文件的存放方式:

  • 如果送来的是一批路径,就迭代其中路径。

  • 如果路径是一个普通文件,就按照行读取内容。

  • 如果路径是一个目录,就遍历路径下所有普通文件,每一个文件按照行处理。不递归处理子目录。

from pathlib import Path

def load(*paths):
    for item in paths:
        p = Path(item)
        if not p.exists():
            continue
        if p.is_dir():
            for file in p.iterdir():
                if file.is_file():
                    pass  # 和下面处理一样
        elif p.is_file():
            with open(str(p)) as f:
                for line in f:
                    fields = extract(line)
                    if fields:
                        yield fields
                    else:
                        continue  # TODO 解析失败则抛弃或者记录日志

写的过程中发现重复的地方,把文件处理部分提出来写成函数。

from pathlib import Path

def openfile(path: str):
    with open(path) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                continue  # TODO 解析失败则抛弃或者记录日志

def load(*paths):
    for item in paths:
        p = Path(item)
        if not p.exists():
            continue
        if p.is_dir():
            for file in p.iterdir():
                if file.is_file():
                    yield from openfile(str(file))
        elif p.is_file():
            yield from openfile(str(p))

16.5.3、完整代码

import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path

PATTERN = r'''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[/\w +:]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''

regex = re.compile(PATTERN)

ops = {
    'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int
}

def extract(line: str) -> dict:
    matcher = regex.match(line)
    if matcher:
        return {
            name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()
        }

def openfile(path: str):
    with open(path) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                continue  # TODO 解析失败则抛弃或者记录日志

def load(*paths):
    for item in paths:
        p = Path(item)
        if not p.exists():
            continue
        if p.is_dir():
            for file in p.iterdir():
                if file.is_file():
                    yield from openfile(str(file))
        elif p.is_file():
            yield from openfile(str(p))

# 数据处理
def source(second=1):
    """生成数据"""
    while True:
        yield {
            'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
            'value': random.randint(1, 100)
        }
        time.sleep(second)

def window(src: Queue, handler, width: int, interval: int):
    """
    注册 窗口处理函数
    :param handler: 注册的数据处理函数
    :param width: 时间窗口宽度
    :param interval: 时间间隔
    """

    start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
    current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')

    buffer = []
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        # 从数据源获取数据
        data = src.get()
        if data:
            buffer.append(data)  # 存入临时缓冲等待计算
            current = data['datetime']

        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print('{}'.format(ret))
            start = current

            buffer = [x for x in buffer if x['datetime'] > current - delta]

def handler(iterable):
    return sum(map(lambda x: x['value']), iterable) / len(iterable)

def donothing_handler(iterable):
    return iterable

# 状态码占比
def status_handler(iterable):
    # 时间窗口内的一批数据
    status = {}

    for item in iterable:
        key = item['status']
        status[key] = status.get(key, 0) + 1

    total = len(iterable)
    return {k: status[k] / total for k, v in status.items()}

def dispatcher(src):
    # 分发器中记录handler,同时保存各自的队列
    handlers = []
    queues = []

    def reg(handler, width: int, interval: int):
        """
        注册 窗口处理函数
        :param handler: 注册的数据处理函数
        :param width: 时间窗口宽度
        :param interval: 时间间隔
        """
        q = Queue()
        queues.append(q)

        h = threading.Thread(target=window, args=(q, handler, width, interval))
        handlers.append(h)

    def run():
        for t in handlers:
            t.start()
        for item in src:
            for q in queues:
                q.put(item)

    return reg, run

if __name__ == '__main__':
    import sys

    path = 'access.log'

    reg, run = dispatcher(load(path))
    reg(donothing_handler, 10, 5)
    reg(status_handler, 10, 5)
    run()

到这里,一个离线日志分析项目基本完成。

  1. 可以指定文件或目录,对日志进行数据分析
  2. 分析函数可以动态注册
  3. 数据可以分发给不同的分析处理程序处理

16.6、浏览器分析

16.6.1、useragent

这里指的是,软件按照一定的格式向远端的服务器提供一个标识自己的字符串。

在 HTTP 协议中,使用 user-agent 字段传送这个字符串。

注意:这个值可以被修改

格式:

现在浏览器的user-agent值格式一般如下:
Mozilla/[version] ([system and browser information]) [platform] ([platform details])[extensions]

例如:
Chrome
Mozilla/5.0 (Window NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36

Firefox
Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0
Mozilla/5.0 (X11; Ubuntu; Linux x86 64; rv:52.0) Gecko/20100101 Firefox/52.0

IE
Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E)

16.6.2、信息提取

pyyaml、ua-parser、user-agents 模块。

# 安装
pip install pyyaml ua-parser user-agents

使用:

from user_agents import parse

useragents = [
    "Mozilla/5.0 (Window NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36",
    "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0",
    "Mozilla/5.0 (X11; Ubuntu; Linux x86 64; rv:52.0) Gecko/20100101 Firefox/52.0",
    "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E)"
]

for uastring in useragents:
    ua = parse(uastring)
    print(ua.browser, ua.browser.family, ua.browser.version, ua.browser.version_string)

运行结果:
Browser(family='Chrome', version=(57, 0, 2987), version_string='57.0.2987') Chrome (57, 0, 2987) 57.0.2987
Browser(family='Firefox', version=(56, 0), version_string='56.0') Firefox (56, 0) 56.0
Browser(family='Firefox', version=(52, 0), version_string='52.0') Firefox (52, 0) 52.0
Browser(family='IE', version=(10, 0), version_string='10.0') IE (10, 0) 10.0

ua.browser.familyua.browser.version_string 分别返回浏览器名称、版本号。

16.6.3、数据分析

ops 增加对 useragent 的处理:

from user_agents import parse

ops = {
    'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int,
    'useragent': lambda ua: parse(ua)
}

增加浏览器分析函数:

def browser_handler(iterable):
    browsers = {}
    for item in iterable:
        ua = item['useragent']

        key = (ua.browser.family, ua.browser.version_string)
        browsers[key] = browsers.get(key, 0) + 1
    return browsers

注册handler,注意时间窗口宽度:

reg(browser_handler, 5, 5)

问题:如果想知道所有浏览器的统计,怎么办?

allbrowsers = {}
def browser_handler(iterable):
    browsers = {}
    for item in iterable:
        ua = item['useragent']

        key = (ua.browser.family, ua.browser.version_string)
        browsers[key] = browsers.get(key, 0) + 1
        allbrowsers[key] = allbrowsers.get(key, 0) + 1
    print(sorted(allbrowsers.items(), key=lambda x:x[1], reverse=True)[:10])
    return browsers

16.6.4、完整代码

import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path
from user_agents import parse

PATTERN = r'''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[/\w +:]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''

regex = re.compile(PATTERN)

ops = {
    'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int,
    'useragent': lambda ua: parse(ua)
}

def extract(line: str) -> dict:
    matcher = regex.match(line)
    if matcher:
        return {
            name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()
        }

def openfile(path: str):
    with open(path) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                continue  # TODO 解析失败则抛弃或者记录日志

def load(*paths):
    for item in paths:
        p = Path(item)
        if not p.exists():
            continue
        if p.is_dir():
            for file in p.iterdir():
                if file.is_file():
                    yield from openfile(str(file))
        elif p.is_file():
            yield from openfile(str(p))

# 数据处理
def source(second=1):
    """生成数据"""
    while True:
        yield {
            'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
            'value': random.randint(1, 100)
        }
        time.sleep(second)

def window(src: Queue, handler, width: int, interval: int):
    """
    注册 窗口处理函数
    :param handler: 注册的数据处理函数
    :param width: 时间窗口宽度
    :param interval: 时间间隔
    """

    start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
    current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')

    buffer = []
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        # 从数据源获取数据
        data = src.get()
        if data:
            buffer.append(data)  # 存入临时缓冲等待计算
            current = data['datetime']

        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print('{}'.format(ret))
            start = current

            buffer = [x for x in buffer if x['datetime'] > current - delta]

def handler(iterable):
    return sum(map(lambda x: x['value']), iterable) / len(iterable)

def donothing_handler(iterable):
    return iterable

allbrowsers = {}

def browser_handler(iterable):
    browsers = {}
    for item in iterable:
        ua = item['useragent']

        key = (ua.browser.family, ua.browser.version_string)
        browsers[key] = browsers.get(key, 0) + 1
        allbrowsers[key] = allbrowsers.get(key, 0) + 1
    print(sorted(allbrowsers.items(), key=lambda x: x[1], reverse=True)[:10])
    return browsers

# 状态码占比
def status_handler(iterable):
    # 时间窗口内的一批数据
    status = {}

    for item in iterable:
        key = item['status']
        status[key] = status.get(key, 0) + 1

    total = len(iterable)
    return {k: status[k] / total for k, v in status.items()}

def dispatcher(src):
    # 分发器中记录handler,同时保存各自的队列
    handlers = []
    queues = []

    def reg(handler, width: int, interval: int):
        """
        注册 窗口处理函数
        :param handler: 注册的数据处理函数
        :param width: 时间窗口宽度
        :param interval: 时间间隔
        """
        q = Queue()
        queues.append(q)

        h = threading.Thread(target=window, args=(q, handler, width, interval))
        handlers.append(h)

    def run():
        for t in handlers:
            t.start()
        for item in src:
            for q in queues:
                q.put(item)

    return reg, run

if __name__ == '__main__':
    import sys

    path = 'access.log'

    reg, run = dispatcher(load(path))
    reg(status_handler, 10, 5)
    reg(browser_handler, 5, 5)
    run()
标签云