#!/usr/bin/env python3
from collections import Counter, defaultdict
import csv
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from logging import warning
from pathlib import Path
import re
from statistics import mean, median, stdev
from sys import argv
from urllib.parse import unquote, urlparse
from typing import Dict, List, Tuple
import user_agents
# Matches one access-log line in combined log format, keeping only
# successful (200) GET requests.  Group names match the keys read by
# Access.from_log.  (The original regex had lost its group names to an
# encoding mishap — `(?P\S+)` is invalid and raises re.error.)
ACCESS_RE = re.compile(' '.join((
    r'(?P<address>\S+)',                              # client address
    r'\S+',                                           # identd (ignored)
    r'\S+',                                           # user (ignored)
    r'(?P<date>\[\S+ \S+\])',                         # [day/mon/year:time zone]
    r'"GET (?P<resource>[^ ?]+)(\?\S+)? [^"]+"',      # request line, query dropped
    r'200 [0-9]+',                                    # status + response size
    r'"(?P<referer>[^"]+)(\?\S+)?"',                  # referrer
    r'"(?P<useragent>[^"]+)"'                         # user agent
)))
# strptime format for the bracketed date group above.
DATE_FMT = '[%d/%b/%Y:%H:%M:%S %z]'
# Two accesses further apart than this start a new visit (see sort_visits).
VISIT_MAX_DURATION = timedelta(hours=1)
# Domains considered "internal": referrers from these hosts are recorded
# as site-local paths rather than external referrers.
DOMAINS = (
    'quatuorbellefeuille.com',
    'quatuorbellefeuille.fr',
    'klg.uber.space'
)
def normalize_path(p):
    """Canonicalize a request path.

    The empty path and the bare site root both map to '/index.html';
    anything else is returned percent-decoded.
    """
    return '/index.html' if p in ('', '/') else unquote(p)
@dataclass
class Access:
    """One successful GET request parsed from an access-log line."""
    address: str    # client address from the log line
    useragent: str  # raw User-Agent header string
    referrer: str   # internal path, external host name, or 'n/a'
    time: datetime  # request timestamp (timezone-aware, from DATE_FMT)
    resource: str   # normalized path of the requested resource

    @classmethod
    def from_log(cls, info):
        """Build an Access from an ACCESS_RE match.

        Referrers on one of our DOMAINS are reduced to their normalized
        local path, external referrers to their host name, and missing
        referrers to 'n/a'.
        """
        resource = normalize_path(info['resource'])
        referrer = urlparse(info['referer'])
        netloc = referrer.netloc
        # Accept only the exact domain or a subdomain of it; a bare
        # endswith(DOMAINS) would also match unrelated hosts such as
        # 'evilquatuorbellefeuille.com'.
        if any(netloc == d or netloc.endswith('.' + d) for d in DOMAINS):
            ref = normalize_path(referrer.path)
        elif netloc:
            ref = netloc
        else:
            ref = 'n/a'
        return cls(
            info['address'], info['useragent'], ref,
            datetime.strptime(info['date'], DATE_FMT), resource
        )
def interesting(resource):
    """Keep only page views: HTML documents and the bare site root."""
    if resource == '/':
        return True
    return resource.endswith('.html')
def parse(logs_paths):
    """Read the given log files and return a tuple of page-view Accesses.

    Lines that do not match ACCESS_RE or that request an uninteresting
    resource (images, stylesheets, ...) are discarded.
    """
    lines = []
    for path in logs_paths:
        with open(path) as log_file:
            lines.extend(log_file.read().splitlines())
    accesses = []
    for line in lines:
        match = ACCESS_RE.match(line)
        if match is None or not interesting(match['resource']):
            continue
        accesses.append(Access.from_log(match))
    return tuple(accesses)
# A visit is a chronological list of accesses by one visitor, each less
# than VISIT_MAX_DURATION after the previous one (built by sort_visits).
Visit = List[Access]
class UserAgentKind(Enum):
    """Coarse classification of a visitor's user agent."""
    PC = 'pc'
    MOBILE = 'mobile'
    TABLET = 'tablet'
    BOT = 'bot'
    NA = 'n/a'

    @classmethod
    def from_useragent(cls, ua_string):
        """Classify a raw User-Agent header string via the user_agents lib."""
        ua = user_agents.parse(ua_string)
        # Test is_bot first: it is not mutually exclusive with the other
        # is_* predicates.
        if ua.is_bot:
            return cls.BOT
        for kind, matches in (
            (cls.PC, ua.is_pc),
            (cls.MOBILE, ua.is_mobile),
            (cls.TABLET, ua.is_tablet),
        ):
            if matches:
                return kind
        warning(f'Unknown user agent kind: {ua_string}')
        return cls.NA

    def is_human(self):
        """True for kinds that correspond to a person browsing the site."""
        return self in (
            UserAgentKind.PC, UserAgentKind.MOBILE, UserAgentKind.TABLET
        )
@dataclass
class Visitor:
    """A unique (address, user agent) pair and the visits attributed to it."""
    address: str              # client address
    useragent: UserAgentKind  # classified user agent kind
    visits: List[Visit]       # chronological visits, each a list of accesses
def sort_visits(accesses):
    """Group accesses into visitors keyed by (address, raw user agent).

    Consecutive accesses by the same visitor belong to the same visit as
    long as each is less than VISIT_MAX_DURATION after the previous one;
    a larger gap starts a new visit.  Assumes *accesses* is in
    chronological order (log-file order).

    Returns a dict mapping (address, useragent) to Visitor.
    """
    # Note: the original annotation called Tuple(str, str), which is not
    # valid typing syntax — subscription is required.
    visitors: Dict[Tuple[str, str], Visitor] = {}
    for a in accesses:
        key = (a.address, a.useragent)
        visitor = visitors.get(key)
        if visitor is None:
            # First time we see this visitor: open its first visit.
            visitors[key] = Visitor(
                a.address,
                UserAgentKind.from_useragent(a.useragent),
                [[a]]
            )
            continue
        last_visit = visitor.visits[-1]
        if a.time - last_visit[-1].time < VISIT_MAX_DURATION:
            last_visit.append(a)
        else:
            visitor.visits.append([a])
    return visitors
def datetime_day(dt):
    """Truncate *dt* to midnight of its day (timezone preserved).

    Also zeroes microseconds (the original left them untouched), so two
    timestamps on the same day always truncate to an equal value.
    """
    return dt.replace(hour=0, minute=0, second=0, microsecond=0)
def find_days(visits):
    """Return the set of distinct days on which the given visits started."""
    days = set()
    for visit in visits:
        days.add(datetime_day(visit[0].time))
    return days
def find_pages(visitors):
    """Return the sorted list of distinct pages viewed by human visitors."""
    pages = set()
    for visitor in visitors:
        if not visitor.useragent.is_human():
            continue
        for visit in visitor.visits:
            pages.update(access.resource for access in visit)
    return sorted(pages)
def external_referrer(ref):
    """True when *ref* names an outside site.

    Internal referrers are stored as site-local paths (leading '/') and
    missing ones as 'n/a'; everything else is external.
    """
    if ref == 'n/a':
        return False
    return not ref.startswith('/')
def simplify_referrer(ref):
    """Reduce a referrer host to its main label.

    'www.google.com' -> 'google': leading labels (www., l., m., ...) and
    the extension (.com, .fr, ...) are dropped.  A host with no dot at
    all (e.g. 'localhost') is returned unchanged — the original raised
    IndexError on such input.
    """
    parts = ref.split('.')
    if len(parts) < 2:
        return ref
    return parts[-2]
def find_referrers(visitors):
    """Sorted list of simplified external referrer names seen in any access."""
    found = set()
    for visitor in visitors:
        for visit in visitor.visits:
            for access in visit:
                if external_referrer(access.referrer):
                    found.add(simplify_referrer(access.referrer))
    return sorted(found)
def daily_visitors(visitors, output_dir):
    """Write dailyvisitors.csv: unique visitors per day, by user agent kind.

    A visitor counts once for every day on which it has at least one
    visit.  The table is also echoed to stdout.  Returns a dict mapping
    each day to a Counter keyed by user agent kind value.
    """
    days: Dict[datetime, Counter] = defaultdict(Counter)
    columns = ('mobile', 'tablet', 'pc', 'bot', 'n/a')
    print('Visitors:')
    for v in visitors.values():
        for day in find_days(v.visits):
            days[day][v.useragent.value] += 1
    # newline='' is required by the csv module to avoid doubled line
    # endings on platforms that translate '\n'.
    with open(Path(output_dir, 'dailyvisitors.csv'), 'w', newline='') as f:
        out = csv.writer(f)
        out.writerow(('day', 'total', *columns))
        print('day', 'total', *columns, sep='\t')
        for day in sorted(days):
            counter = days[day]
            counts = tuple(counter[c] for c in columns)
            values = (day.strftime('%F'), sum(counts), *counts)
            out.writerow(values)
            print(*values, sep='\t')
    return days
def daily_visits(visitors, output_dir):
    """Write dailyvisits.csv: number of visits per day, by user agent kind.

    Unlike daily_visitors, every visit counts, so a visitor returning
    twice on the same day contributes two.  The table is also echoed to
    stdout.  Returns a dict mapping each day to a Counter keyed by user
    agent kind value.
    """
    days: Dict[datetime, Counter] = defaultdict(Counter)
    columns = ('mobile', 'tablet', 'pc', 'bot', 'n/a')
    print('Visits:')
    for v in visitors.values():
        for visit in v.visits:
            # A visit is attributed to the day of its first access.
            day = datetime_day(visit[0].time)
            days[day][v.useragent.value] += 1
    # newline='' is required by the csv module to avoid doubled line
    # endings on platforms that translate '\n'.
    with open(Path(output_dir, 'dailyvisits.csv'), 'w', newline='') as f:
        out = csv.writer(f)
        out.writerow(('day', 'total', *columns))
        print('day', 'total', *columns, sep='\t')
        for day in sorted(days):
            counter = days[day]
            counts = tuple(counter[c] for c in columns)
            values = (day.strftime('%F'), sum(counts), *counts)
            out.writerow(values)
            print(*values, sep='\t')
    return days
def daily_pages_per_visit(visitors, output_dir):
    """Write dailypagespervisit.csv: per-day stats of pages seen per visit.

    Only human visitors (pc/mobile/tablet) are considered.  The table is
    also echoed to stdout.  Returns a dict mapping each day to the list
    of visit lengths for that day.
    """
    days: Dict[datetime, list] = defaultdict(list)
    columns = ('min', 'max', 'med', 'avg', 'dev')
    print('Pages/visit:')
    for v in visitors.values():
        if not v.useragent.is_human():
            continue
        for visit in v.visits:
            day = datetime_day(visit[0].time)
            days[day].append(len(visit))
    # newline='' is required by the csv module to avoid doubled line
    # endings on platforms that translate '\n'.
    with open(Path(output_dir, 'dailypagespervisit.csv'), 'w',
              newline='') as f:
        out = csv.writer(f)
        out.writerow(('day', *columns))
        print('day', *columns, sep='\t')
        for day in sorted(days):
            view_counts = days[day]
            values = (
                day.strftime('%F'),
                min(view_counts),
                max(view_counts),
                median(view_counts),
                mean(view_counts),
                # stdev needs at least two data points: report 0.0 for a
                # single-visit day instead of raising StatisticsError.
                stdev(view_counts) if len(view_counts) > 1 else 0.0
            )
            out.writerow(values)
            print(*values[:4], *(f'{v:.2f}' for v in values[4:]), sep='\t')
    return days
def daily_page_hits(visitors, output_dir):
    """Write dailypagehits.csv: per-day hit counts for every page.

    Only human visitors are counted.  The five most viewed pages of each
    day are printed to stdout.  Returns a dict mapping each day to a
    Counter keyed by page path.
    """
    days: Dict[datetime, Counter] = defaultdict(Counter)
    columns = find_pages(visitors.values())
    print('Page hits:')
    for v in visitors.values():
        if not v.useragent.is_human():
            continue
        for visit in v.visits:
            day = datetime_day(visit[0].time)
            for access in visit:
                days[day][access.resource] += 1
    # newline='' is required by the csv module to avoid doubled line
    # endings on platforms that translate '\n'.
    with open(Path(output_dir, 'dailypagehits.csv'), 'w', newline='') as f:
        out = csv.writer(f)
        out.writerow(('day', *columns))
        for day in sorted(days):
            page_hits = days[day]
            values = (day.strftime('%F'),
                      *(page_hits[page] for page in columns))
            out.writerow(values)
            print(day.strftime('%F'))
            for page, hits in page_hits.most_common(5):
                print(hits, page, sep='\t')
    return days
def daily_referrers(visitors, output_dir):
    """Write dailyreferrers.csv: per-day counts of external referrers.

    All visitors (including bots) are counted; internal and missing
    referrers are skipped.  The table is also echoed to stdout.  Returns
    a dict mapping each day to a Counter keyed by simplified referrer.
    """
    days: Dict[datetime, Counter] = defaultdict(Counter)
    columns = find_referrers(visitors.values())
    print('Referrers:')
    for v in visitors.values():
        for visit in v.visits:
            day = datetime_day(visit[0].time)
            for access in visit:
                if not external_referrer(access.referrer):
                    continue
                days[day][simplify_referrer(access.referrer)] += 1
    # newline='' is required by the csv module to avoid doubled line
    # endings on platforms that translate '\n'.
    with open(Path(output_dir, 'dailyreferrers.csv'), 'w', newline='') as f:
        out = csv.writer(f)
        out.writerow(('day', *columns))
        print('day', *columns, sep='\t')
        for day in sorted(days):
            refcounts = days[day]
            values = (day.strftime('%F'), *(refcounts[ref] for ref in columns))
            out.writerow(values)
            print(*values, sep='\t')
    return days
def dump_stats(visitors, output_dir):
    """Write every per-day CSV plus global.csv with whole-period totals.

    The per-day tables are produced by the daily_* helpers; this function
    then aggregates them over the whole period into global.csv.
    """
    output_dir = Path(output_dir)
    daily_visitors(visitors, output_dir)
    visits_pday = daily_visits(visitors, output_dir)
    pagespervisit_pday = daily_pages_per_visit(visitors, output_dir)
    pagehits_pday = daily_page_hits(visitors, output_dir)
    referrers_pday = daily_referrers(visitors, output_dir)
    ua_values = tuple(ua.value for ua in UserAgentKind)
    # Whole-period aggregates of the per-day tables.
    nb_visitors = {
        ua.value: sum(1 for v in visitors.values() if v.useragent == ua)
        for ua in UserAgentKind
    }
    nb_visits = {
        ua: sum(visits_pday[day][ua] for day in visits_pday)
        for ua in ua_values
    }
    pages_per_visit = tuple(
        nb for day in pagespervisit_pday for nb in pagespervisit_pday[day]
    )
    hits_per_page = {
        page: sum(pagehits_pday[day][page] for day in pagehits_pday)
        for page in find_pages(visitors.values())
    }
    referrers = {
        ref: sum(referrers_pday[day][ref] for day in referrers_pday)
        for ref in find_referrers(visitors.values())
    }
    # newline='' is required by the csv module to avoid doubled line
    # endings on platforms that translate '\n'.
    with open(Path(output_dir, 'global.csv'), 'w', newline='') as f:
        out = csv.writer(f)
        out.writerow(('#visitors',))
        out.writerows(
            (ua, nb_visitors[ua]) for ua in ua_values
        )
        out.writerow(('total', sum(nb_visitors.values())))
        out.writerow(('#visits',))
        out.writerows(
            (ua, nb_visits[ua]) for ua in ua_values
        )
        out.writerow(('total', sum(nb_visits.values())))
        out.writerow(('#pages/visit',))
        out.writerows((
            ('min', min(pages_per_visit)),
            ('max', max(pages_per_visit)),
            ('med', median(pages_per_visit)),
            ('avg', mean(pages_per_visit)),
            # stdev needs at least two data points; 0.0 for a single visit.
            ('dev', stdev(pages_per_visit)
                    if len(pages_per_visit) > 1 else 0.0)
        ))
        out.writerow(('#views/page',))
        # Descending by count.
        lines = reversed(sorted(hits_per_page.items(), key=lambda kv: kv[1]))
        out.writerows(lines)
        out.writerow(('#referrers',))
        lines = reversed(sorted(referrers.items(), key=lambda kv: kv[1]))
        out.writerows(lines)
def main(logs_paths, output_dir):
    """Parse the given access logs and write all statistics to *output_dir*."""
    dump_stats(sort_visits(parse(logs_paths)), output_dir)
if __name__ == '__main__':
    # Usage: script LOGFILE... OUTPUT_DIR — every argument but the last
    # is a log file; the last is the output directory for the CSVs.
    main(argv[1:-1], argv[-1])