#!/usr/bin/env python3
from collections import Counter, defaultdict
import csv
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from logging import warning
from pathlib import Path
import re
from statistics import mean, median, stdev
from sys import argv
from urllib.parse import urlparse
from typing import Dict, List, Tuple
import user_agents
# Matches one combined-log-format line for a successful (200) GET request.
# Named groups (consumed by Access.from_log): address, date, resource,
# referer, useragent.  Query strings on resource/referrer are matched but
# not captured.  The original named groups were corrupted ("(?P\S+)" is a
# re.error); names are restored from the keys Access.from_log reads.
ACCESS_RE = re.compile(' '.join((
    r'(?P<address>\S+)',                           # client address
    r'\S+',                                        # identd (ignored)
    r'\S+',                                        # auth user (ignored)
    r'(?P<date>\[\S+ \S+\])',                      # bracketed timestamp
    r'"GET (?P<resource>[^ ?]+)(\?\S+)? [^"]+"',   # request line, GET only
    r'200 [0-9]+',                                 # status + size: 200 only
    r'"(?P<referer>[^"]+)(\?\S+)?"',               # referrer
    r'"(?P<useragent>[^"]+)"'                      # raw user-agent string
)))
# strptime format for the bracketed timestamp captured by ACCESS_RE.
DATE_FMT = '[%d/%b/%Y:%H:%M:%S %z]'
# Two accesses by the same visitor further apart than this open a new visit.
VISIT_MAX_DURATION = timedelta(hours=1)
# Domains considered "ours": referrers from these are reduced to their path.
DOMAINS = {
    'quatuorbellefeuille.com',
    'quatuorbellefeuille.fr',
    'klg.uber.space'
}
def normalize_path(p):
    """Map the bare root path to its canonical resource, '/index.html'."""
    return '/index.html' if p == '/' else p
@dataclass
class Access:
    """One interesting page access parsed from a single log line."""
    address: str    # client address as logged
    useragent: str  # raw user-agent string
    referrer: str   # normalized path for our DOMAINS, otherwise the domain
    time: datetime  # access timestamp parsed with DATE_FMT
    resource: str   # normalized path of the requested page

    @classmethod
    def from_log(cls, info):
        """Build an Access from an ACCESS_RE match (or any mapping with
        the same keys)."""
        referrer = urlparse(info['referer'])
        if referrer.netloc in DOMAINS:
            # Internal referrer: keep only its (normalized) path.
            ref = normalize_path(referrer.path)
        else:
            # External referrer: keep only its domain.
            ref = referrer.netloc
        return cls(
            address=info['address'],
            useragent=info['useragent'],
            referrer=ref,
            time=datetime.strptime(info['date'], DATE_FMT),
            resource=normalize_path(info['resource']),
        )
def interesting(resource):
    """Return True for resources worth counting: HTML pages and the root."""
    return resource == '/' or resource.endswith('.html')
def parse(logs_paths):
    """Read the given access-log files and return a tuple of Access
    objects for every line that matches ACCESS_RE and requests an
    interesting resource."""
    lines = []
    for path in logs_paths:
        with open(path) as f:
            lines.extend(f.read().splitlines())
    accesses = []
    for line in lines:
        match = ACCESS_RE.match(line)
        if match is None:
            continue
        if not interesting(match['resource']):
            continue
        accesses.append(Access.from_log(match))
    return tuple(accesses)
# A visit is a chronological list of accesses by one visitor in which
# consecutive accesses are less than VISIT_MAX_DURATION apart.
Visit = List[Access]
class UserAgentKind(Enum):
    """Coarse classification of a raw user-agent string."""
    PC = 'pc'
    MOBILE = 'mobile'
    TABLET = 'tablet'
    BOT = 'bot'
    NA = 'n/a'

    @classmethod
    def from_useragent(cls, ua_string):
        """Classify a raw user-agent string; logs a warning and returns
        NA when no predicate matches."""
        parsed = user_agents.parse(ua_string)
        # Bots are checked first: is_bot is not mutually exclusive with
        # the other is_* predicates.
        if parsed.is_bot:
            return cls.BOT
        for matches, kind in (
            (parsed.is_pc, cls.PC),
            (parsed.is_mobile, cls.MOBILE),
            (parsed.is_tablet, cls.TABLET),
        ):
            if matches:
                return kind
        warning(f'Unknown user agent kind: {ua_string}')
        return cls.NA

    def is_human(self):
        """Return True for kinds that correspond to a human-operated device."""
        human_kinds = (
            UserAgentKind.PC,
            UserAgentKind.MOBILE,
            UserAgentKind.TABLET,
        )
        return self in human_kinds
@dataclass
class Visitor:
    """A unique visitor, identified in sort_visits() by the pair
    (address, raw user-agent string)."""
    address: str              # client address from the log
    useragent: UserAgentKind  # classified user-agent kind
    referrers: List[str]      # referrer info recorded for this visitor
    visits: List[Visit]       # chronological visits, each a list of accesses
def sort_visits(accesses):
    """Group accesses into visitors and split each visitor's accesses
    into visits.

    Two accesses belong to the same visitor when they share both address
    and raw user-agent string; accesses are assumed to be in
    chronological order.  Consecutive accesses less than
    VISIT_MAX_DURATION apart belong to the same visit.

    Returns a dict mapping (address, useragent) to Visitor.
    """
    # Fixed: the annotation used Tuple(str, str) (a call) instead of the
    # subscription Tuple[str, str].
    visitors: Dict[Tuple[str, str], Visitor] = {}
    for a in accesses:
        key = (a.address, a.useragent)
        visitor = visitors.get(key)
        if visitor is None:
            # First access for this visitor: open their first visit.
            # referrers is declared List[str]; previously a bare str was
            # stored here and never extended.
            visitors[key] = Visitor(
                a.address,
                UserAgentKind.from_useragent(a.useragent),
                [a.referrer],
                [[a]]
            )
            continue
        last_visit = visitor.visits[-1]
        if a.time - last_visit[-1].time < VISIT_MAX_DURATION:
            # Close enough to the previous access: same visit.
            last_visit.append(a)
        else:
            # Gap too large: start a new visit and record its referrer.
            visitor.referrers.append(a.referrer)
            visitor.visits.append([a])
    return visitors
def datetime_day(dt):
    """Truncate a datetime to midnight of its day (timezone preserved).

    Also zeroes microseconds, which the original omitted: two times in
    the same day could otherwise truncate to distinct 'days'.
    """
    return dt.replace(hour=0, minute=0, second=0, microsecond=0)
def find_days(visits):
    """Return the set of days (midnight datetimes) on which any of the
    given visits started."""
    days = set()
    for visit in visits:
        days.add(datetime_day(visit[0].time))
    return days
def daily_visitors(visitors, output_path):
    """Write a CSV of unique visitors per day, broken down by user-agent
    kind, and echo the same table to stdout."""
    kinds = ('mobile', 'tablet', 'pc', 'bot', 'n/a')
    per_day: Dict[datetime, Counter] = defaultdict(Counter)
    print('Visitors:')
    # A visitor counts once for every day on which they made a visit.
    for visitor in visitors.values():
        for day in find_days(visitor.visits):
            per_day[day][visitor.useragent.value] += 1
    with open(output_path, 'w') as f:
        writer = csv.writer(f)
        header = ('day', 'total', *kinds)
        writer.writerow(header)
        print(*header, sep='\t')
        for day in sorted(per_day):
            counts = tuple(per_day[day][k] for k in kinds)
            row = (day.strftime('%F'), sum(counts), *counts)
            writer.writerow(row)
            print(*row, sep='\t')
def daily_visits(visitors, output_path):
    """Write a CSV of visit counts per day, broken down by user-agent
    kind, and echo the same table to stdout."""
    kinds = ('mobile', 'tablet', 'pc', 'bot', 'n/a')
    per_day: Dict[datetime, Counter] = defaultdict(Counter)
    print('Visits:')
    # Every visit counts once, on the day of its first access.
    for visitor in visitors.values():
        for visit in visitor.visits:
            start_day = datetime_day(visit[0].time)
            per_day[start_day][visitor.useragent.value] += 1
    with open(output_path, 'w') as f:
        writer = csv.writer(f)
        header = ('day', 'total', *kinds)
        writer.writerow(header)
        print(*header, sep='\t')
        for day in sorted(per_day):
            counts = tuple(per_day[day][k] for k in kinds)
            row = (day.strftime('%F'), sum(counts), *counts)
            writer.writerow(row)
            print(*row, sep='\t')
def daily_pages_per_visit(visitors, output_path):
    """Write a CSV of per-day statistics over pages viewed per visit
    (human visitors only), and echo the table to stdout.

    Fix: stdev() raises StatisticsError on fewer than two samples, so a
    day with exactly one visit used to crash the report; the deviation
    is now reported as 0.0 for such days.
    """
    days: Dict[datetime, list] = defaultdict(list)
    columns = ('min', 'max', 'med', 'avg', 'dev')
    print('Pages/visit:')
    for visitor in visitors.values():
        # Bots and unknown agents would skew the statistics.
        if not visitor.useragent.is_human():
            continue
        for visit in visitor.visits:
            day = datetime_day(visit[0].time)
            days[day].append(len(visit))
    with open(output_path, 'w') as f:
        out = csv.writer(f)
        out.writerow(('day', *columns))
        print('day', *columns, sep='\t')
        for day in sorted(days):
            view_counts = days[day]
            values = (
                day.strftime('%F'),
                min(view_counts),
                max(view_counts),
                median(view_counts),
                mean(view_counts),
                # Sample standard deviation is undefined for one sample.
                stdev(view_counts) if len(view_counts) > 1 else 0.0
            )
            out.writerow(values)
            # Only the float columns (avg, dev) get two-decimal formatting.
            print(*values[:4], *(f'{x:.2f}' for x in values[4:]), sep='\t')
def daily_stats(visitors, output_dir):
    """Generate every per-day CSV report under output_dir."""
    out = Path(output_dir)
    daily_visitors(visitors, out / 'dailyvisitors.csv')
    daily_visits(visitors, out / 'dailyvisits.csv')
    daily_pages_per_visit(visitors, out / 'dailypagespervisit.csv')
    # TODO: reports not implemented yet:
    # daily_page_hits(visitors, out / 'dailypagehits.csv')
    # daily_referrers(visitors, out / 'dailyreferrers.csv')
def global_stats(visitors, output_dir):
    """Whole-period aggregate statistics.  Placeholder: not implemented yet."""
    pass
def main(logs_paths, output_dir):
    """Parse the given access logs and write all statistics CSVs into
    output_dir."""
    visitors = sort_visits(parse(logs_paths))
    daily_stats(visitors, output_dir)
    global_stats(visitors, output_dir)
if __name__ == '__main__':
    # Usage: <script> LOGFILE [LOGFILE...] OUTPUT_DIR
    # All arguments but the last are log files; the last is the output dir.
    main(argv[1:-1], argv[-1])