#!/usr/bin/env python3
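"""Compute visitor statistics from web server access logs.

Usage: stats.py ACCESS_LOG [ACCESS_LOG ...] OUTPUT_DIR

Parses one or more access logs in the combined log format and writes
daily statistics as CSV files into OUTPUT_DIR.
"""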
from collections import Counter, defaultdict
import csv
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
import re
from sys import argv
from urllib.parse import urlparse
from typing import Dict, List, Tuple
from warnings import warn
import user_agents
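# Matches successful (status 200) GET requests in the combined log format.
# Illustrative example of a matching line (all values made up):
# 203.0.113.7 - - [10/Oct/2023:13:55:36 +0200] "GET /index.html HTTP/1.1" 200 2326 "https://quatuorbellefeuille.com/" "Mozilla/5.0 (X11; Linux x86_64)"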
ACCESS_RE = re.compile(' '.join((
    r'(?P<address>\S+)',
    r'\S+',
    r'\S+',
    r'(?P<date>\[\S+ \S+\])',
    r'"GET (?P<resource>[^ ?]+)(\?\S+)? [^"]+"',
    r'200 [0-9]+',
    r'"(?P<referer>[^"?]+)(\?\S+)?"',
    r'"(?P<useragent>[^"]+)"'
)))
DATE_FMT = '[%d/%b/%Y:%H:%M:%S %z]'
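# Two accesses by the same visitor further apart than this start a new visit.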
VISIT_MAX_DURATION = timedelta(hours=1)
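# Hostnames that count as this site: internal referrers are kept as paths,
# external referrers are reduced to their hostname.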
DOMAINS = {
'quatuorbellefeuille.com',
'quatuorbellefeuille.fr',
'klg.uber.space'
}
def normalize_path(p):
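    """Treat '/' as an alias for '/index.html' so both count as one page."""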
if p == '/':
return '/index.html'
return p
@dataclass
class Access:
address: str
useragent: str
referrer: str
time: datetime
resource: str
@classmethod
def from_log(cls, info):
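        """Build an Access from an ACCESS_RE match.

        Referrers pointing at one of our own DOMAINS are reduced to a
        normalized path; external referrers are reduced to their hostname.
        """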
resource = normalize_path(info['resource'])
referrer = urlparse(info['referer'])
if referrer.netloc in DOMAINS:
ref = normalize_path(referrer.path)
else:
ref = referrer.netloc
return cls(
info['address'], info['useragent'], ref,
datetime.strptime(info['date'], DATE_FMT), resource
)
def interesting(resource):
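    """Keep only page views (HTML pages), not assets such as images or CSS."""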
return resource.endswith('.html') or resource == '/'
def parse(logs_paths):
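    """Read every log file and return the matching page accesses as Access objects."""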
logs = []
for lp in logs_paths:
with open(lp) as logs_file:
logs += logs_file.read().splitlines()
matches = (ACCESS_RE.match(l) for l in logs)
return tuple(
Access.from_log(m) for m in matches
if (m is not None and interesting(m['resource']))
)
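# A visit is one consecutive series of accesses by a single visitor.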
Visit = List[Access]
@dataclass
class Visitor:
address: str
useragent: str
referrers: List[str]
visits: List[Visit]
def useragent_kind(ua_string):
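    """Classify a user agent string as 'pc', 'mobile', 'tablet', 'bot' or 'n/a'."""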
ua = user_agents.parse(ua_string)
if ua.is_pc:
return 'pc'
if ua.is_mobile:
return 'mobile'
if ua.is_tablet:
return 'tablet'
if ua.is_bot:
return 'bot'
warn(f'Unknown user agent kind: {ua_string}')
return 'n/a'
def sort_visits(accesses):
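    """Group accesses into Visitors keyed by (address, user agent string).

    Each visitor's accesses are split into visits: consecutive accesses
    less than VISIT_MAX_DURATION apart belong to the same visit. Note that
    the stored useragent field holds the classified kind, not the raw string.
    """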
    visitors: Dict[Tuple[str, str], Visitor] = {}
for a in accesses:
key = (a.address, a.useragent)
visitor = visitors.get(key)
if visitor is None:
            # First time we see this (address, user agent) pair.
            visitor = Visitor(
                a.address,
                useragent_kind(a.useragent),
                [a.referrer],  # referrers is a list; start it with this access's referrer
                [[a]]
            )
visitors[key] = visitor
continue
last_visit = visitor.visits[-1]
last_access = last_visit[-1].time
if a.time - last_access < VISIT_MAX_DURATION:
last_visit.append(a)
continue
        # Otherwise this access starts a new visit; record its referrer too.
        visitor.referrers.append(a.referrer)
        visitor.visits.append([a])
return visitors
def find_days(visits):
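    """Return the set of days (as midnight datetimes) on which visits started."""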
return {
v[0].time.replace(hour=0, minute=0, second=0)
for v in visits
}
def daily_visitors(visitors, output_path):
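    """Count distinct visitors per day, split by user agent kind; write CSV and print a table."""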
days: Dict[datetime, Counter] = defaultdict(Counter)
columns = ('mobile', 'tablet', 'pc', 'bot', 'n/a')
print('Visitors:')
for v in visitors.values():
for day in find_days(v.visits):
days[day][v.useragent] += 1
    # newline='' as recommended by the csv module, to avoid stray blank lines.
    with open(output_path, 'w', newline='') as f:
out = csv.writer(f)
out.writerow(('day', 'total', *columns))
print('day', 'total', *columns, sep='\t')
for day in sorted(days):
counter = days[day]
counters = tuple(counter[c] for c in columns)
            # '%Y-%m-%d' instead of '%F': the latter is not portable (e.g. Windows).
            values = (day.strftime('%Y-%m-%d'), sum(counters), *counters)
out.writerow(values)
print(*values, sep='\t')
def daily_stats(visitors, output_dir):
output_dir = Path(output_dir)
daily_visitors(visitors, output_dir.joinpath('dailyvisitors.csv'))
# daily_visits(visitors, output_dir.joinpath('dailyvisits.csv'))
# daily_pages_per_visit(visitors, output_dir.joinpath('dailypagespervisit.csv'))
# daily_page_hits(visitors, output_dir.joinpath('dailypagehits.csv'))
# daily_referrers(visitors, output_dir.joinpath('dailyreferrers.csv'))
def global_stats(visitors, output_dir):
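    # Placeholder: site-wide statistics are not implemented yet.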
pass
def main(logs_paths, output_dir):
accesses = parse(logs_paths)
visitors = sort_visits(accesses)
daily_stats(visitors, output_dir)
global_stats(visitors, output_dir)
if __name__ == '__main__':
    if len(argv) < 3:
        raise SystemExit(f'usage: {argv[0]} ACCESS_LOG [ACCESS_LOG ...] OUTPUT_DIR')
    main(argv[1:-1], argv[-1])