vrr-accountability/monitor.py

267 lines
8.7 KiB
Python
Executable File

#!/usr/bin/env python3
import configparser
import datetime
import json
import sys
from typing import List, Optional, Union
# from pprint import pprint, pformat

import mysql.connector
import pause
import requests
# DDL executed once at start-up by main(). Every row describes one scheduled
# departure, identified by line, direction, station and scheduled time (the
# composite primary key). delay_value has no NOT NULL constraint, so it can
# hold NULL when no delay figure is available.
TABLE = """
CREATE TABLE IF NOT EXISTS vrr (
line_code varchar(9) not null,
direction_code varchar(1) not null,
station_id int not null,
orig_datetime datetime not null,
status enum('on_time', 'early', 'late', 'cancelled', 'no_data') not null,
delay_value int,
primary key (line_code, direction_code, station_id, orig_datetime));
"""
class MOT:
    """
    Numeric codes for the modes of transport ("MOT") understood by the
    departure monitor request; make_request_data() sends them in the
    ``transport`` field.
    """

    LONG_DISTANCE_TRAIN = 0
    REGIONAL_TRAIN = 1
    COMMUTER_TRAIN = 2
    UNDERGROUND_TRAIN = 3
    TRAM = 4
    BUS = 15
    ELEVATED_TRAIN = 6

    # Every known mode. Doubles as the default for make_request_data() and as
    # the whitelist it validates against.
    ALL_MODES = [
        LONG_DISTANCE_TRAIN,
        REGIONAL_TRAIN,
        COMMUTER_TRAIN,
        UNDERGROUND_TRAIN,
        TRAM,
        BUS,
        ELEVATED_TRAIN,
    ]
# Parse the configuration file:
cfg = configparser.ConfigParser()
cfg.read('vrr.ini')

# Keyword arguments for mysql.connector.connect(); note that the ini file
# calls the password option "pass".
db_config = {
    'user': cfg['db']['user'],
    'password': cfg['db']['pass'],
    'host': cfg['db']['host'],
    'database': cfg['db']['database'],
}

# Sentinel meaning "do not filter by line". make_request_data() compares its
# ``lines`` argument against this object by identity.
ALL_LINES = []
# Delay value that marks a trip as cancelled (see is_cancelled()).
TRIP_CANCELLED = -9999


def _crawl_flag(option: str) -> bool:
    """
    Read a boolean switch from the [crawl] section of vrr.ini.

    configparser returns raw strings, so testing the value directly would
    treat "false", "no" and "0" as enabled. getboolean() interprets
    yes/no, true/false, on/off and 1/0 properly.

    :param option: name of the option inside [crawl]
    :return: the parsed flag; a missing or empty option counts as False
    """
    raw = cfg['crawl'].get(option, fallback='') or ''
    if not raw.strip():
        return False
    try:
        return cfg['crawl'].getboolean(option)
    except ValueError:
        sys.exit("Option {} in the [crawl] section of vrr.ini must be a boolean "
                 "(yes/no, true/false, on/off or 1/0)".format(option))


# Modes of transport enabled in the configuration.
# NOTE(review): there is no option for MOT.UNDERGROUND_TRAIN, and USE_MODES is
# not passed to make_request_data() anywhere in the visible code - confirm
# whether that is intended.
USE_MODES = [
    mode
    for option, mode in (
        ('use_long_distance', MOT.LONG_DISTANCE_TRAIN),
        ('use_regional_trains', MOT.REGIONAL_TRAIN),
        ('use_commuter_trains', MOT.COMMUTER_TRAIN),
        ('use_trams', MOT.TRAM),
        ('use_buses', MOT.BUS),
        ('use_elevated_trains', MOT.ELEVATED_TRAIN),
    )
    if _crawl_flag(option)
]

# A missing option raises KeyError on item access rather than yielding None,
# so look it up with a fallback to make the friendly error message reachable.
_station_id = (cfg['crawl'].get('station_id', fallback='') or '').strip()
if not _station_id:
    sys.exit("Please specify a station_id in the [crawl] section of vrr.ini")
try:
    USE_STATION_ID = int(_station_id)
except ValueError:
    sys.exit("The station_id in the [crawl] section of vrr.ini must be a number")

# Comma-separated line identifiers. Blank entries are dropped so that an empty
# option means "all lines" instead of a filter on the empty line id.
_lines = [
    line.strip()
    for line in (cfg['crawl'].get('use_lines', fallback='') or '').split(',')
]
_lines = [line for line in _lines if line]
USE_LINES = _lines if _lines else ALL_LINES
def make_request_data(station_id: int, result_count: int = 8, modes: List = MOT.ALL_MODES,
                      lines: List[str] = ALL_LINES) -> dict:
    """
    Prepare a request data dictionary to put into get_data()

    :param station_id: an EFA station ID
    :param result_count: how many departures to return
    :param modes: which modes of transport to use
    :param lines: which lines to use (line identifiers look like 'provider:line ID: :direction ID',
        e.g. 'rbg:70070: :H' for the Rheinbahn U70 to Düsseldorf Hbf. Passing the ALL_LINES object
        itself (the default) disables the filter.
    :return: a dictionary with the data necessary to make a request to the Abfahrtsmonitor API.
    :raises ValueError: if ``modes`` contains a value that is not in MOT.ALL_MODES, or if
        ``station_id`` cannot be converted to an integer
    """
    # The request data dictionary can have the following items (this function
    # only sets stationId, rowCount, transport, useAllLines and linesFilter):
    #   stationId: a numerical EFA station ID
    #   stationName: (optional) the station's name
    #   platformVisibility: (optional) ???
    #   transport: a comma-separated list of the modes of transport to be displayed.
    #       See the constants for values.
    #   useAllLines: display all available lines or filter them using the linesFilter
    #   linesFilter: a JSON array with the lines to be displayed.
    #   optimizedForStation: (optional) ???
    #   rowCount: the amount of results to be returned
    #   refreshInterval: (optional) (display parameter) refresh rate in seconds for the browser UI
    #   distance: (optional) (display parameter) distance from the monitor to the stop
    #   marquee: (optional) (display parameter) make the path text scroll sideways
    #   sortBy: (optional) ???
    request_data = {
        'stationId': int(station_id),
        'rowCount': result_count,
    }

    # sanity check: do the modes exist?
    for mode in modes:
        if mode not in MOT.ALL_MODES:
            raise ValueError("Unknown transport mode: {0}".format(mode))

    # Add the list to the data dictionary
    request_data['transport'] = ','.join(str(mode) for mode in modes)

    # Identity, not equality: only the ALL_LINES sentinel itself means "no filter".
    if lines is ALL_LINES:
        request_data['useAllLines'] = 1
    else:
        # Spaces inside a line identifier are sent as '+'.
        line_filter = [{'data': line.replace(' ', '+')} for line in lines]
        request_data['linesFilter'] = json.dumps(line_filter)
        request_data['useAllLines'] = 0

    # finally, add the HTML naming
    return {"table[departure][{0}]".format(key): value for key, value in request_data.items()}
def get_data(request_data: dict, headers: dict = None, cookies: dict = None) -> dict:
    """
    POST the prepared form data to the Abfahrtsmonitor backend.

    :param request_data: form fields, as produced by make_request_data()
    :param headers: optional extra HTTP headers
    :param cookies: optional cookies to send along
    :return: the decoded JSON body of the reply
    :raises requests.exceptions.HTTPError: if the server answers with an error status
    """
    endpoint = 'https://abfahrtsmonitor.vrr.de/backend/api/stations/table'
    response = requests.post(endpoint, data=request_data, headers=headers, cookies=cookies)
    response.raise_for_status()
    # Timing information goes to stderr so that it stays out of the trip listing on stdout.
    elapsed = str(response.elapsed)
    print('Request time elapsed: ' + elapsed, file=sys.stderr)
    return response.json()
def is_cancelled(trip: dict) -> bool:
    """
    Tell whether the trip's delay equals the TRIP_CANCELLED marker.

    :param trip: one entry of the reply's 'departureData' list
    :return: True if the delay is TRIP_CANCELLED; False otherwise, including
        when there is no delay value (None or the empty string)
    """
    delay = trip['delay']
    # '' is the raw "no real-time data" value (see has_realtime); None is what
    # fixup_data() replaces it with. Neither can be fed to int().
    if delay is None or delay == '':
        return False
    return int(delay) == TRIP_CANCELLED
def is_late(trip: dict) -> bool:
    """
    Tell whether the trip has a positive delay.

    :param trip: one entry of the reply's 'departureData' list
    :return: True if the delay is greater than zero; False otherwise, including
        when there is no delay value (None or the empty string)
    """
    delay = trip['delay']
    # '' is the raw "no real-time data" value (see has_realtime); None is what
    # fixup_data() replaces it with. Neither can be fed to int().
    if delay is None or delay == '':
        return False
    return int(delay) > 0
def is_early(trip: dict) -> bool:
    """
    Tell whether the trip has a negative delay that is not the cancellation marker.

    :param trip: one entry of the reply's 'departureData' list
    :return: True if the delay is below zero and differs from TRIP_CANCELLED;
        False otherwise, including when there is no delay value (None or the
        empty string)
    """
    delay = trip['delay']
    # '' is the raw "no real-time data" value (see has_realtime); None is what
    # fixup_data() replaces it with. Neither can be fed to int().
    if delay is None or delay == '':
        return False
    minutes = int(delay)
    # TRIP_CANCELLED is itself negative, so it has to be excluded explicitly.
    return minutes < 0 and minutes != TRIP_CANCELLED
def is_on_time(trip: dict) -> bool:
    """
    Tell whether the trip has a delay of exactly zero.

    :param trip: one entry of the reply's 'departureData' list
    :return: True if the delay is zero; False otherwise, including when there
        is no delay value (None or the empty string)
    """
    delay = trip['delay']
    # Same guard as is_late()/is_early()/is_cancelled(): a missing delay is
    # simply "not on time" instead of a TypeError/ValueError from int().
    if delay is None or delay == '':
        return False
    return int(delay) == 0
def has_realtime(trip: dict) -> bool:
    """
    Tell whether the trip carries a delay value at all.

    :param trip: one entry of the reply's 'departureData' list
    :return: False if the delay is the empty string (the raw reply) or None
        (the value fixup_data() substitutes for it), True otherwise
    """
    delay = trip['delay']
    return delay is not None and delay != ''
def fixup_data(d: dict) -> dict:
    """
    Normalise a reply in place: every empty-string delay becomes None.

    :param d: a reply dictionary with a 'departureData' list of trips
    :return: the very same dictionary object, after modification
    """
    without_delay = (entry for entry in d['departureData'] if entry['delay'] == '')
    for entry in without_delay:
        entry['delay'] = None
    return d
def print_trip(trip: dict) -> None:
    """
    Print a one-line, human-readable status for a trip to stdout.

    Nothing is printed if the trip has a delay value that none of the
    is_cancelled/is_late/is_early/is_on_time predicates accepts.

    :param trip: one entry of the reply's 'departureData' list
    """
    trip_part = "The {}:{} {} (???:{}: :{}) service to {} ".format(trip['orgHour'], trip['orgMinute'],
                                                                  trip['lineNumber'], trip['lineCode'],
                                                                  trip['directionCode'], trip['direction'])
    if not has_realtime(trip):
        print(trip_part + "has no real-time data.")
    elif is_cancelled(trip):
        print(trip_part + "is cancelled.")
    elif is_late(trip):
        print(trip_part + "is {} minutes late.".format(trip['delay']))
    elif is_early(trip):
        # The predicates convert the delay with int(), so it may arrive as a
        # string; convert here too before negating it.
        print(trip_part + "is {} minutes early.".format(-int(trip['delay'])))
    elif is_on_time(trip):
        print(trip_part + "is on time.")
def get_next_refresh(data: dict):
    """
    Work out when the departure data should be fetched again.

    The earliest scheduled or expected departure that is more than 30 seconds
    away decides: if it is more than 5 minutes away the refresh happens in
    5 minutes, otherwise exactly at that departure. Without such a departure
    (or without data) the refresh happens in 90 seconds.

    :param data: a reply dictionary with a 'departureData' list, or None if
        the last request failed
    :return: a Unix timestamp (int when it is a departure time, float otherwise)
    """
    # One clock reading for the whole decision, compared as epoch seconds.
    # Subtracting naive local datetimes would be off by an hour across a
    # daylight-saving change.
    now = datetime.datetime.now().timestamp()

    if data is not None:
        departures = []
        for trip in data['departureData']:
            for key in ('orgFullTime', 'fullTime'):
                raw = trip[key]
                # assumption: a timestamp may be blank just like 'delay' can
                # be - TODO confirm against real replies
                if raw is None or raw == '':
                    continue
                departures.append(int(raw))

        for departure in sorted(t for t in departures if t > now):
            remaining = departure - now
            if remaining > 30:
                if remaining > 5 * 60:
                    return now + 5 * 60
                return departure

    return now + 90
def _make_delay_value_for_sql(value: str or int or None) -> int or None:
if value == '':
value = None
else:
value = int(value)
return value
def _make_status_value_for_sql(trip: dict) -> str:
    """
    Map a trip onto one of the values of the table's ``status`` enum.

    :param trip: one entry of the reply's 'departureData' list
    :return: 'no_data', 'late', 'cancelled', 'early' or 'on_time'
    :raises ValueError: if the trip has a delay value that none of the
        predicates recognises
    """
    if not has_realtime(trip):
        return 'no_data'
    # The predicates are tried in this fixed order; the first match wins.
    if is_late(trip):
        return 'late'
    if is_cancelled(trip):
        return 'cancelled'
    if is_early(trip):
        return 'early'
    if is_on_time(trip):
        return 'on_time'
    raise ValueError("unknown delay value")
def update(station_id: int, lines: List[str]) -> Optional[dict]:
    """
    Fetch the next departures for a station and print each one.

    :param station_id: an EFA station ID
    :param lines: line identifiers to restrict the result to (see make_request_data())
    :return: the decoded reply, or None if the request failed or the reply
        could not be decoded
    :raises ValueError: if the request data cannot be built (propagated from
        make_request_data())
    """
    # Built outside the try block so that a bad argument still surfaces as an
    # error instead of being reported as a failed download.
    request_data = make_request_data(station_id, 8, lines=lines)
    try:
        reply_data = get_data(request_data)
    except (requests.exceptions.RequestException, ValueError) as e:
        # RequestException is the base class of HTTPError, ConnectionError and
        # Timeout; ValueError covers a body that is not valid JSON. None of
        # them should end the caller's polling loop.
        print("Could not get the data: " + str(e), file=sys.stderr)
        return None
    for trip in reply_data['departureData']:
        print_trip(trip)
    return reply_data
def wait(cxn: mysql.connector.MySQLConnection, station_id: int):
    """
    Poll the station forever and record every reported departure in the ``vrr`` table.

    Each round fetches the departures for USE_LINES, writes one row per trip
    (REPLACE, so a row with the same primary key is overwritten), commits, and
    then pauses until the time chosen by get_next_refresh(). A failed fetch
    skips the database step but still pauses. This function never returns.

    :param cxn: an open MySQL connection
    :param station_id: an EFA station ID
    """
    upsert = ('REPLACE INTO vrr '
              '(line_code, direction_code, station_id, orig_datetime, status, delay_value)'
              ' VALUES (%s, %s, %s, from_unixtime(%s), %s, %s)')
    db_cursor = cxn.cursor()
    while True:
        reply = update(station_id, USE_LINES)
        if reply is not None:
            for trip in reply['departureData']:
                row = (
                    trip['lineCode'],
                    trip['directionCode'],
                    station_id,
                    trip['orgFullTime'],
                    _make_status_value_for_sql(trip),
                    _make_delay_value_for_sql(trip['delay']),
                )
                db_cursor.execute(upsert, row)
            # One commit per poll, after all rows of the reply are written.
            cxn.commit()
        wake_at = get_next_refresh(reply)
        print("Sleeping until " + datetime.datetime.fromtimestamp(wake_at).isoformat(), file=sys.stderr)
        pause.until(wake_at)
def main():
    """
    Connect to the database, make sure the ``vrr`` table exists and start polling.

    wait() loops forever, so this only returns by way of an exception (for
    example KeyboardInterrupt); the connection is closed on the way out.
    """
    cxn = mysql.connector.connect(**db_config)
    try:
        cursor = cxn.cursor()
        try:
            cursor.execute(TABLE)
        finally:
            # wait() opens its own cursor; this one is only needed for the DDL.
            cursor.close()
        wait(cxn, USE_STATION_ID)
    finally:
        cxn.close()


# Only start polling when run as a script, so that importing the module (e.g.
# to reuse the helpers) does not enter the endless loop.
if __name__ == '__main__':
    main()