#!/usr/bin/env python3
"""Compute daily visit statistics from web server access logs."""

from collections import Counter, defaultdict
import csv
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from logging import warning
from pathlib import Path
import re
from statistics import mean, median, stdev
from sys import argv
from urllib.parse import unquote, urlparse
from typing import Dict, List, Tuple

import user_agents

# Matches successful (200) GET requests in the combined log format; the group
# names are the keys read by Access.from_log() below.
ACCESS_RE = re.compile(' '.join((
    r'(?P<address>\S+)',
    r'\S+',
    r'\S+',
    r'(?P<date>\[\S+ \S+\])',
    r'"GET (?P<resource>[^ ?]+)(\?\S+)? [^"]+"',
    r'200 [0-9]+',
    r'"(?P<referer>[^"]+)(\?\S+)?"',
    r'"(?P<useragent>[^"]+)"'
)))
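# Illustrative log line this pattern is meant to match (hypothetical values,
# not taken from real logs):
#   203.0.113.7 - - [12/Mar/2023:14:05:32 +0100] "GET /concerts.html HTTP/1.1" 200 5123 "https://quatuorbellefeuille.com/index.html" "Mozilla/5.0 (X11; Linux x86_64; rv:110.0) Gecko/20100101 Firefox/110.0"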
DATE_FMT = '[%d/%b/%Y:%H:%M:%S %z]'
VISIT_MAX_DURATION = timedelta(hours=1)
DOMAINS = {
    'quatuorbellefeuille.com',
    'quatuorbellefeuille.fr',
    'klg.uber.space'
}


def normalize_path(p):
    if p == '/':
        return '/index.html'
    return unquote(p)


@dataclass
class Access:
    address: str
    useragent: str
    referrer: str
    time: datetime
    resource: str

    @classmethod
    def from_log(cls, info):
        resource = normalize_path(info['resource'])
        referrer = urlparse(info['referer'])
        if referrer.netloc in DOMAINS:
            # Internal referrer: keep the page path.
            ref = normalize_path(referrer.path)
        else:
            # External referrer: keep only the domain.
            ref = referrer.netloc
        return cls(
            info['address'],
            info['useragent'],
            ref,
            datetime.strptime(info['date'], DATE_FMT),
            resource
        )


def interesting(resource):
    return resource.endswith('.html') or resource == '/'


def parse(logs_paths):
    logs = []
    for lp in logs_paths:
        with open(lp) as logs_file:
            logs += logs_file.read().splitlines()
    matches = (ACCESS_RE.match(line) for line in logs)
    return tuple(
        Access.from_log(m) for m in matches
        if m is not None and interesting(m['resource'])
    )


Visit = List[Access]


class UserAgentKind(Enum):
    PC = 'pc'
    MOBILE = 'mobile'
    TABLET = 'tablet'
    BOT = 'bot'
    NA = 'n/a'

    @classmethod
    def from_useragent(cls, ua_string):
        ua = user_agents.parse(ua_string)
        # is_bot is not mutually exclusive with the other is_* predicates.
        if ua.is_bot:
            return cls.BOT
        if ua.is_pc:
            return cls.PC
        if ua.is_mobile:
            return cls.MOBILE
        if ua.is_tablet:
            return cls.TABLET
        warning(f'Unknown user agent kind: {ua_string}')
        return cls.NA

    def is_human(self):
        return self in {
            UserAgentKind.PC,
            UserAgentKind.MOBILE,
            UserAgentKind.TABLET
        }


@dataclass
class Visitor:
    address: str
    useragent: UserAgentKind
    referrers: List[str]
    visits: List[Visit]


def sort_visits(accesses):
    # One Visitor per (address, user agent string) pair; a gap of
    # VISIT_MAX_DURATION or more between consecutive accesses starts a new visit.
    visitors: Dict[Tuple[str, str], Visitor] = {}
    for a in accesses:
        key = (a.address, a.useragent)
        visitor = visitors.get(key)
        if visitor is None:
            visitor = Visitor(
                a.address,
                UserAgentKind.from_useragent(a.useragent),
                [a.referrer],
                [[a]]
            )
            visitors[key] = visitor
            continue
        last_visit = visitor.visits[-1]
        last_access = last_visit[-1].time
        if a.time - last_access < VISIT_MAX_DURATION:
            last_visit.append(a)
            continue
        visitor.visits.append([a])
    return visitors
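# Illustrative grouping (hypothetical times, same address and user agent):
# accesses at 10:00, 10:40 and 12:00 end up as two visits, [10:00, 10:40] and
# [12:00], because only the 10:40 -> 12:00 gap reaches VISIT_MAX_DURATION.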
def datetime_day(dt):
    return dt.replace(hour=0, minute=0, second=0)


def find_days(visits):
    return {datetime_day(v[0].time) for v in visits}


def find_pages(visitors):
    return sorted({
        access.resource
        for v in visitors
        for visit in v.visits
        for access in visit
        if v.useragent.is_human()
    })


def daily_visitors(visitors, output_path):
    # Count each visitor at most once per day, broken down by user agent kind.
    days: Dict[datetime, Counter] = defaultdict(Counter)
    columns = ('mobile', 'tablet', 'pc', 'bot', 'n/a')
    print('Visitors:')
    for v in visitors.values():
        for day in find_days(v.visits):
            days[day][v.useragent.value] += 1
    with open(output_path, 'w') as f:
        out = csv.writer(f)
        out.writerow(('day', 'total', *columns))
        print('day', 'total', *columns, sep='\t')
        for day in sorted(days):
            counter = days[day]
            counters = tuple(counter[c] for c in columns)
            values = (day.strftime('%F'), sum(counters), *counters)
            out.writerow(values)
            print(*values, sep='\t')


def daily_visits(visitors, output_path):
    # Count every visit, broken down by user agent kind.
    days: Dict[datetime, Counter] = defaultdict(Counter)
    columns = ('mobile', 'tablet', 'pc', 'bot', 'n/a')
    print('Visits:')
    for v in visitors.values():
        for visit in v.visits:
            day = datetime_day(visit[0].time)
            days[day][v.useragent.value] += 1
    with open(output_path, 'w') as f:
        out = csv.writer(f)
        out.writerow(('day', 'total', *columns))
        print('day', 'total', *columns, sep='\t')
        for day in sorted(days):
            counter = days[day]
            counters = tuple(counter[c] for c in columns)
            values = (day.strftime('%F'), sum(counters), *counters)
            out.writerow(values)
            print(*values, sep='\t')


def daily_pages_per_visit(visitors, output_path):
    # Distribution of pages viewed per visit, human visitors only.
    days: Dict[datetime, list] = defaultdict(list)
    columns = ('min', 'max', 'med', 'avg', 'dev')
    print('Pages/visit:')
    for v in visitors.values():
        if not v.useragent.is_human():
            continue
        for visit in v.visits:
            day = datetime_day(visit[0].time)
            days[day].append(len(visit))
    with open(output_path, 'w') as f:
        out = csv.writer(f)
        out.writerow(('day', *columns))
        print('day', *columns, sep='\t')
        for day in sorted(days):
            view_counts = days[day]
            values = (
                day.strftime('%F'),
                min(view_counts),
                max(view_counts),
                median(view_counts),
                mean(view_counts),
                # stdev() needs at least two samples; report 0 for days with a
                # single visit instead of raising StatisticsError.
                stdev(view_counts) if len(view_counts) > 1 else 0.0
            )
            out.writerow(values)
            print(*values[:4], *(f'{v:.2f}' for v in values[4:]), sep='\t')


def daily_page_hits(visitors, output_path):
    # Per-page hit counts, human visitors only; the console output lists the
    # five most visited pages of each day.
    days: Dict[datetime, Counter] = defaultdict(Counter)
    columns = find_pages(visitors.values())
    print('Page hits:')
    for v in visitors.values():
        if not v.useragent.is_human():
            continue
        for visit in v.visits:
            day = datetime_day(visit[0].time)
            for access in visit:
                days[day][access.resource] += 1
    with open(output_path, 'w') as f:
        out = csv.writer(f)
        out.writerow(('day', *columns))
        for day in sorted(days):
            page_hits = days[day]
            values = (day.strftime('%F'), *(page_hits[page] for page in columns))
            out.writerow(values)
            print(day.strftime('%F'))
            for page, hits in page_hits.most_common(5):
                print(hits, page, sep='\t')


def daily_stats(visitors, output_dir):
    output_dir = Path(output_dir)
    daily_visitors(visitors, output_dir.joinpath('dailyvisitors.csv'))
    daily_visits(visitors, output_dir.joinpath('dailyvisits.csv'))
    daily_pages_per_visit(visitors, output_dir.joinpath('dailypagespervisit.csv'))
    daily_page_hits(visitors, output_dir.joinpath('dailypagehits.csv'))
    # daily_referrers(visitors, output_dir.joinpath('dailyreferrers.csv'))


def global_stats(visitors, output_dir):
    pass


def main(logs_paths, output_dir):
    accesses = parse(logs_paths)
    visitors = sort_visits(accesses)
    daily_stats(visitors, output_dir)
    global_stats(visitors, output_dir)


if __name__ == '__main__':
    main(argv[1:-1], argv[-1])
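# Example invocation (script and log file names are placeholders): every
# argument except the last is an access log file, the last argument is the
# directory where the CSV reports are written.
#   python3 stats.py access.log access.log.1 reports/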