vrr-accountability/monitor.py

308 lines
9.9 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import configparser
import datetime
import json
import sys
from typing import List, Optional, Union
# from pprint import pprint, pformat

import mysql.connector
import pause
import requests
# DDL executed once at start-up to make sure the result table exists.
# One row describes one scheduled departure; the primary key is the
# combination of line, direction, station and originally scheduled time.
# `status` is one of the five enum labels below and `delay_value` is nullable.
TABLE = """
CREATE TABLE IF NOT EXISTS vrr (
line_code varchar(9) not null,
direction_code varchar(1) not null,
station_id int not null,
orig_datetime datetime not null,
status enum('on_time', 'early', 'late', 'cancelled', 'no_data') not null,
delay_value int,
primary key (line_code, direction_code, station_id, orig_datetime));
"""
class MOT:
    """Numeric mode-of-transport codes placed in the ``transport`` request field."""

    LONG_DISTANCE_TRAIN = 0
    REGIONAL_TRAIN = 1
    COMMUTER_TRAIN = 2
    UNDERGROUND_TRAIN = 3
    TRAM = 4
    # NOTE(review): 15 breaks the otherwise consecutive numbering -- confirm
    # against the API before changing it.
    BUS = 15
    ELEVATED_TRAIN = 6

    # Every known code; used both as the default selection and as the
    # whitelist for validating caller-supplied modes.
    ALL_MODES = [
        LONG_DISTANCE_TRAIN,
        REGIONAL_TRAIN,
        COMMUTER_TRAIN,
        UNDERGROUND_TRAIN,
        TRAM,
        BUS,
        ELEVATED_TRAIN,
    ]
# Command line handling: the only switch is -s/--setup, which hands control to
# the local ``setup`` module and terminates the process afterwards.
parser = argparse.ArgumentParser()
parser.add_argument(
    "-s",
    "--setup",
    dest="setup",
    action="store_true",
    help="Run the setup routine.",
)
args = parser.parse_args()

if args.setup:
    # Imported here so the module is only loaded when setup was requested.
    import setup
    setup.setup()
    sys.exit(0)
# Parse the configuration file:
cfg = configparser.ConfigParser()
try:
    with open('vrr.ini') as f:
        cfg.read_file(f)
except IOError:
    sys.exit("Could not open the configuration file.")
except configparser.Error:
    # Malformed file (e.g. missing section header): report it like any other
    # configuration problem instead of dying with a traceback.
    sys.exit("There is something wrong with the configuration file. Exiting.")

try:
    # Keyword arguments for mysql.connector.connect().
    db_config = {
        'user': cfg['db']['user'],
        'password': cfg['db']['pass'],
        'host': cfg['db']['host'],
        'database': cfg['db']['database'],
    }

    # Collect the transport modes that are switched on in the [crawl] section.
    # getboolean() returns None for an absent option, so missing flags simply
    # count as "off".
    # NOTE(review): no option maps to MOT.UNDERGROUND_TRAIN, and USE_MODES is
    # not referenced by any other code visible in this file -- confirm whether
    # both are intentional.
    USE_MODES = []
    if cfg['crawl'].getboolean('use_long_distance'):
        USE_MODES.append(MOT.LONG_DISTANCE_TRAIN)
    if cfg['crawl'].getboolean('use_regional_trains'):
        USE_MODES.append(MOT.REGIONAL_TRAIN)
    if cfg['crawl'].getboolean('use_commuter_trains'):
        USE_MODES.append(MOT.COMMUTER_TRAIN)
    if cfg['crawl'].getboolean('use_trams'):
        USE_MODES.append(MOT.TRAM)
    if cfg['crawl'].getboolean('use_buses'):
        USE_MODES.append(MOT.BUS)
    if cfg['crawl'].getboolean('use_elevated_trains'):
        USE_MODES.append(MOT.ELEVATED_TRAIN)

    # .get() yields None for a missing option (plain indexing would raise
    # KeyError instead), which makes the dedicated error message reachable.
    if cfg['crawl'].get('station_id') is not None:
        USE_STATION_ID = cfg['crawl'].getint('station_id')
    else:
        sys.exit("Please specify a station_id in the [crawl] section of vrr.ini")

    USE_LINES = cfg['crawl']['use_lines'].split(',')
except (KeyError, ValueError, IndexError, configparser.Error):
    # KeyError: missing section/option when using mapping-style access.
    # ValueError: getint()/getboolean() could not convert the stored text.
    # configparser.Error: covers NoOptionError, NoSectionError and
    # interpolation problems.
    sys.exit("There is something wrong with the configuration file. Exiting.")
# Sentinel list: make_request_data() compares its `lines` argument to this
# object by identity to decide whether all lines are requested.
ALL_LINES = []
# Delay value that is_cancelled() interprets as "trip cancelled".
TRIP_CANCELLED = -9999
# Initialize Requests session
# (shared by every call to get_data())
HTTP = requests.session()
def make_request_data(station_id: int, result_count: int = 8, modes: List = MOT.ALL_MODES,
                      lines: List[str] = ALL_LINES) -> dict:
    """
    Prepare a request data dictionary to put into get_data()

    :param station_id: an EFA station ID
    :param result_count: how many departures to return
    :param modes: which modes of transport to use; every entry must be one of MOT.ALL_MODES
    :param lines: which lines to use (line identifiers look like 'provider:line ID: :direction ID',
                  e.g. 'rbg:70070: :H' for the Rheinbahn U70 to Düsseldorf Hbf). Leave at the default
                  (the ALL_LINES object itself) to request all lines.
    :return: a dictionary with the data necessary to make a request to the Abfahrtsmonitor API.
    :raises ValueError: if `modes` contains a value that is not in MOT.ALL_MODES
    """
    # The request data dictionary can have the following items:
    #   stationId: a numerical EFA station ID
    #   stationName: (optional) the station's name
    #   platformVisibility: (optional) ???
    #   transport: a comma-separated list of the modes of transport to be displayed.
    #              See the MOT constants for values.
    #   useAllLines: display all available lines or filter them using the linesFilter
    #   linesFilter: a JSON array with the lines to be displayed
    #   optimizedForStation: (optional) ???
    #   rowCount: the amount of results to be returned
    #   refreshInterval: (optional) (display parameter) refresh rate in seconds for the browser UI
    #   distance: (optional) (display parameter) distance from the monitor to the stop
    #   marquee: (optional) (display parameter) make the path text scroll sideways
    #   sortBy: (optional) ???
    request_data = {
        'stationId': int(station_id),
        'rowCount': result_count
    }

    # sanity check: do the modes exist?
    for mode in modes:
        if mode not in MOT.ALL_MODES:
            raise ValueError("Unknown transport mode: " + str(mode))

    # Add the list to the data dictionary
    request_data['transport'] = ','.join(str(n) for n in modes)

    # The default is recognised by identity, so an explicitly passed list is
    # always sent as a filter, even when it is empty.
    if lines is ALL_LINES:
        request_data['useAllLines'] = 1
    else:
        # Blanks inside a line identifier are sent as '+'.
        lines_dictarr = [{'data': v.replace(' ', '+')} for v in lines]
        request_data['linesFilter'] = json.dumps(lines_dictarr)
        request_data['useAllLines'] = 0

    # finally, add the HTML naming
    return {"table[departure][{0}]".format(k): v for k, v in request_data.items()}
def get_data(request_data: dict, headers: dict = None, cookies: dict = None) -> dict:
    """
    POST the form data to the departure table endpoint and return the decoded JSON reply.

    :param request_data: form fields, e.g. as produced by make_request_data()
    :param headers: optional extra HTTP headers
    :param cookies: optional cookies to send along
    :return: the JSON body of the reply
    """
    endpoint = 'https://abfahrtsmonitor.vrr.de/backend/api/stations/table'
    response = HTTP.post(endpoint, data=request_data, headers=headers, cookies=cookies)
    # Error status codes surface as an exception rather than a reply body.
    response.raise_for_status()
    elapsed = str(response.elapsed)
    print('Request time elapsed: ' + elapsed, file=sys.stderr)
    return response.json()
def is_cancelled(trip: dict) -> bool:
    """Tell whether the trip's delay equals the TRIP_CANCELLED marker (False when delay is None)."""
    delay = trip['delay']
    if delay is None:
        return False
    return int(delay) == TRIP_CANCELLED
def is_late(trip: dict) -> bool:
    """Tell whether the trip's delay is a positive number (False when delay is None)."""
    delay = trip['delay']
    return delay is not None and int(delay) > 0
def is_early(trip: dict) -> bool:
    """Tell whether the trip's delay is negative without being the TRIP_CANCELLED marker."""
    delay = trip['delay']
    if delay is None:
        return False
    minutes = int(delay)
    return minutes < 0 and minutes != TRIP_CANCELLED
def is_on_time(trip: dict) -> bool:
    """
    Tell whether the trip's delay is exactly zero.

    A delay of None (which is what fixup_data() stores for trips without
    real-time data) yields False, matching is_late()/is_early()/is_cancelled()
    instead of raising a TypeError.
    """
    if trip['delay'] is not None:
        return int(trip['delay']) == 0
    return False
def has_realtime(trip: dict) -> bool:
    """
    Tell whether the trip carries a delay value at all.

    Both spellings of "no data" are recognised: the empty string and the
    None that fixup_data() substitutes for it.
    """
    return trip['delay'] not in ('', None)
def fixup_data(d: dict) -> dict:
    """
    Replace empty-string delays with None, in place.

    :param d: a reply dictionary with a 'departureData' list of trips
    :return: the very same dictionary that was passed in
    """
    for entry in d['departureData']:
        if entry['delay'] != '':
            continue
        entry['delay'] = None
    return d
def print_trip(trip: dict, full_text: bool = False) -> None:
    """
    Print a one-line summary of a trip and its punctuality to stdout.

    :param trip: a trip dictionary; reads 'orgHour', 'orgMinute', 'lineNumber', 'direction' and 'delay'
    :param full_text: print a full sentence instead of the compact, column-aligned form
    """
    if full_text:
        fmt = {
            "cancelled": "is cancelled.",
            "late": "is {} min late.",
            "early": "is {} min early.",
            "on_time": "is on time.",
            "no_rt": "has no real-time data.",
            "trip": "The {}:{} {} service to {} "
        }
    else:
        fmt = {
            "cancelled": "cancelled",
            "late": "+{} min",
            "early": "-{} min",
            "on_time": "on time",
            "no_rt": "n/a",
            "trip": "{}:{} {:<6}-> {:<38}"
        }

    def fmt_trip(fmt_str: str, *fmt_args) -> str:
        # The trip prefix consumes the first four fields; any extra arguments
        # fill the placeholders of the status suffix.
        return (fmt['trip'] + fmt_str).format(trip['orgHour'], trip['orgMinute'], trip['lineNumber'],
                                              trip['direction'], *fmt_args)

    if not has_realtime(trip):
        print(fmt_trip(fmt['no_rt']))
    elif is_cancelled(trip):
        print(fmt_trip(fmt['cancelled']))
    elif is_late(trip):
        print(fmt_trip(fmt['late'], int(trip['delay'])))
    elif is_early(trip):
        # The delay may be a string; convert before negating so the sign can
        # be dropped (the format string supplies its own '-').
        print(fmt_trip(fmt['early'], -int(trip['delay'])))
    elif is_on_time(trip):
        print(fmt_trip(fmt['on_time']))
def get_next_refresh(data: dict) -> float:
    """
    Work out when the departure data should be fetched again.

    The result is the earliest scheduled or expected departure time that lies
    more than 30 seconds ahead, capped at five minutes from now. Without
    usable data (or without such a departure) the next refresh is 90 seconds
    from now.

    All arithmetic is done on epoch seconds with a single clock reading, so
    the result does not depend on the local time zone or on daylight saving
    transitions.

    :param data: a reply dictionary with a 'departureData' list, or None
    :return: a POSIX timestamp
    """
    now = datetime.datetime.now().timestamp()

    if data is not None:
        upcoming = []
        for trip in data['departureData']:
            for raw in (trip['orgFullTime'], trip['fullTime']):
                stamp = int(raw)
                if stamp > now:
                    upcoming.append(stamp)
        upcoming.sort()

        for stamp in upcoming:
            remaining = stamp - now
            if remaining > 30:
                if remaining > 5 * 60:
                    # Never sleep longer than five minutes.
                    return now + 5 * 60
                return stamp

    return now + 90
def _make_delay_value_for_sql(value: str or int or None) -> int or None:
if value == '':
value = None
else:
value = int(value)
return value
def _make_status_value_for_sql(trip: dict) -> str:
    """
    Map a trip onto one of the labels of the status column.

    :param trip: a trip dictionary with a 'delay' entry
    :return: 'no_data', 'late', 'cancelled', 'early' or 'on_time'
    :raises ValueError: if the trip has a delay that none of the checks accepts
    """
    if not has_realtime(trip):
        return 'no_data'
    # The checks run in the same fixed order: late, cancelled, early, on time.
    if is_late(trip):
        return 'late'
    if is_cancelled(trip):
        return 'cancelled'
    if is_early(trip):
        return 'early'
    if is_on_time(trip):
        return 'on_time'
    raise ValueError("unknown delay value")
def update(station_id: int, lines: List[str]) -> Optional[dict]:
    """
    Fetch the next eight departures for a station, print them and return the raw reply.

    :param station_id: the station to query
    :param lines: the line identifiers to restrict the query to
    :return: the decoded reply, or None if the request failed
    """
    try:
        reply_data = get_data(
            make_request_data(
                station_id,
                8,
                lines=lines
            )
        )
    except (requests.exceptions.RequestException, ValueError) as e:
        # RequestException is the common base of HTTPError, ConnectionError
        # and the other transport failures; ValueError covers a reply body
        # that is not valid JSON. Either way the caller simply retries later
        # instead of the monitor dying.
        print("Could not get the data: " + str(e), file=sys.stderr)
        return None

    for trip in reply_data['departureData']:
        print_trip(trip)
    return reply_data
def wait(cxn: mysql.connector.MySQLConnection, station_id: int):
    """
    Poll the station forever, storing every reported departure in the vrr table.

    Each round fetches the departures, writes one row per trip with
    REPLACE INTO, commits, and then sleeps until the time suggested by
    get_next_refresh(). This function does not return.

    :param cxn: an open database connection
    :param station_id: the station to monitor
    """
    statement = ('REPLACE INTO vrr '
                 '(line_code, direction_code, station_id, orig_datetime, status, delay_value)'
                 ' VALUES (%s, %s, %s, '
                 'from_unixtime(%s), %s, %s)')
    cursor = cxn.cursor()

    while True:
        snapshot = update(station_id, USE_LINES)

        if snapshot is not None:
            for departure in snapshot['departureData']:
                row = (
                    departure['lineCode'],
                    departure['directionCode'],
                    station_id,
                    departure['orgFullTime'],
                    _make_status_value_for_sql(departure),
                    _make_delay_value_for_sql(departure['delay']),
                )
                cursor.execute(statement, row)
            cxn.commit()

        wake_at = get_next_refresh(snapshot)
        print("Sleeping until " + datetime.datetime.fromtimestamp(wake_at).isoformat(), file=sys.stderr)
        pause.until(wake_at)
def main():
    """Connect to the database, make sure the table exists and start monitoring."""
    cxn = mysql.connector.connect(**db_config)
    try:
        cursor = cxn.cursor()
        try:
            cursor.execute(TABLE)
        finally:
            # This cursor is only needed for the DDL statement.
            cursor.close()
        wait(cxn, USE_STATION_ID)
    finally:
        # wait() only ends through an exception (e.g. Ctrl-C); release the
        # connection on the way out.
        cxn.close()


if __name__ == "__main__":
    main()