#!/usr/bin/env python3
"""Crawl the VRR "Abfahrtsmonitor" departure board and log delays to MySQL.

The script polls the departure table of one station, prints every departure
to stdout and stores its delay status in the ``vrr`` table. The mapping from
line codes to human-readable line numbers is stored in ``vrr_lines``.
"""
import requests
import json
from typing import List, Dict, Optional, Union
from io import BytesIO
# from pprint import pprint, pformat
import datetime
import pause
import sys
import mysql.connector
import configparser
import argparse
from cachecontrol import CacheControl
from cachecontrol.caches.file_cache import FileCache

VRR_TABLE = """
CREATE TABLE IF NOT EXISTS vrr (
  line_code varchar(9) not null,
  direction_code varchar(1) not null,
  station_id int not null,
  orig_datetime datetime not null,
  status enum('on_time', 'early', 'late', 'cancelled', 'no_data') not null,
  delay_value int,
  primary key (line_code, direction_code, station_id, orig_datetime));
"""

VRR_LINES_TABLE = """
CREATE TABLE IF NOT EXISTS vrr_lines (
  line_code varchar(9) not null primary key,
  line_number varchar(6) not null);
"""


class MOT:
    """Numeric identifiers for the modes of transport used in requests."""
    LONG_DISTANCE_TRAIN = 0
    REGIONAL_TRAIN = 1
    COMMUTER_TRAIN = 2
    UNDERGROUND_TRAIN = 3
    TRAM = 4
    BUS = 15
    ELEVATED_TRAIN = 6
    ALL_MODES = [LONG_DISTANCE_TRAIN, REGIONAL_TRAIN, COMMUTER_TRAIN,
                 UNDERGROUND_TRAIN, TRAM, BUS, ELEVATED_TRAIN]


class Line:
    """A line code together with its display name."""
    line_code: str
    line_name: str

    def __init__(self, code, name):
        self.line_code = code
        self.line_name = name


# The most recent HTTP reply; main() dumps it to fault.json when parsing fails.
last_reply = None

parser = argparse.ArgumentParser()
parser.add_argument("-s", "--setup", help="Run the setup routine.",
                    action="store_true", dest="setup")
parser.add_argument('-c', '--config', help='A configuration file',
                    action='store', default='vrr.ini', dest='config')
args = parser.parse_args()

if args.setup:
    import setup
    setup.setup()
    sys.exit(0)

# Parse the configuration file:
cfg = configparser.ConfigParser()
try:
    with open(args.config) as f:
        cfg.read_file(f)
except IOError:
    sys.exit("Could not open the configuration file.")

try:
    db_config = {
        'user': cfg['db']['user'],
        'password': cfg['db']['pass'],
        'host': cfg['db']['host'],
        'database': cfg['db']['database'],
    }
    crawl_cfg = cfg['crawl']

    # NOTE(review): USE_MODES is collected here but never handed to
    # make_request_data(), so requests still ask for all modes. There is also
    # no switch for MOT.UNDERGROUND_TRAIN. Confirm the intended behaviour
    # before wiring it in.
    USE_MODES = []
    if crawl_cfg.getboolean('use_long_distance'):
        USE_MODES.append(MOT.LONG_DISTANCE_TRAIN)
    if crawl_cfg.getboolean('use_regional_trains'):
        USE_MODES.append(MOT.REGIONAL_TRAIN)
    if crawl_cfg.getboolean('use_commuter_trains'):
        USE_MODES.append(MOT.COMMUTER_TRAIN)
    if crawl_cfg.getboolean('use_trams'):
        USE_MODES.append(MOT.TRAM)
    if crawl_cfg.getboolean('use_buses'):
        USE_MODES.append(MOT.BUS)
    if crawl_cfg.getboolean('use_elevated_trains'):
        USE_MODES.append(MOT.ELEVATED_TRAIN)

    # .get() returns None for a missing option; crawl_cfg['station_id'] would
    # raise KeyError and make the helpful message below unreachable.
    if crawl_cfg.get('station_id') is None:
        sys.exit('Please specify a station_id in the [crawl] section of {}'.format(args.config))
    USE_STATION_ID = crawl_cfg.getint('station_id')

    USE_LINES = crawl_cfg['use_lines'].split(',')
except (KeyError, ValueError, IndexError,
        configparser.NoOptionError, configparser.NoSectionError):
    # Mapping-style access raises KeyError for missing sections/options, and
    # getint()/getboolean() raise ValueError for malformed values.
    sys.exit("There is something wrong with the configuration file. Exiting.")

ALL_LINES = []
TRIP_CANCELLED = -9999

# Seconds to wait for the API before giving up, so a stalled connection cannot
# block the crawler forever.
REQUEST_TIMEOUT = 30

# Initialize Requests session
HTTP = CacheControl(requests.session(), cache=FileCache('.cache'))


def make_request_data(station_id: int, result_count: int = 8, modes: List = MOT.ALL_MODES,
                      lines: List[str] = ALL_LINES) -> dict:
    """
    Prepare a request data dictionary to put into get_data()

    :param station_id: an EFA station ID
    :param result_count: how many departures to return
    :param modes: which modes of transport to use
    :param lines: which lines to use (line identifiers look like
        'provider:line ID: :direction ID', e.g. 'rbg:70070: :H' for the
        Rheinbahn U70 to Düsseldorf Hbf). Passing the module-level ALL_LINES
        object itself (the default) requests all lines.
    :return: a dictionary with the data necessary to make a request to the
        Abfahrtsmonitor API.
    :raises ValueError: if ``modes`` contains a value not in MOT.ALL_MODES.
    """
    # The request data dictionary can have the following items:
    #   stationId: a numerical EFA station ID
    #   stationName: (optional) the station's name
    #   platformVisibility: (optional) ???
    #   transport: a comma-separated list of the modes of transport to be
    #       displayed. See the constants for values.
    #   useAllLines: display all available lines or filter them using the
    #       linesFilter
    #   linesFilter: a JSON array with the lines to be displayed.
    #   optimizedForStation: (optional) ???
    #   rowCount: the amount of results to be returned
    #   refreshInterval: (optional) (display parameter) refresh rate in
    #       seconds for the browser UI
    #   distance: (optional) (display parameter) distance from the monitor to
    #       the stop
    #   marquee: (optional) (display parameter) make the path text scroll
    #       sideways
    #   sortBy: (optional) ???
    request_data = {
        'stationId': int(station_id),
        'rowCount': result_count
    }

    # sanity check: do the modes exist?
    for mode in modes:
        if mode not in MOT.ALL_MODES:
            raise ValueError("Unknown transport mode: {}".format(mode))

    # Add the list to the data dictionary
    request_data['transport'] = ','.join(str(n) for n in modes)

    # ALL_LINES is compared by identity on purpose: it acts as a sentinel.
    if lines is ALL_LINES:
        request_data['useAllLines'] = 1
    else:
        lines_dictarr = [{'data': v.replace(' ', '+')} for v in lines]
        request_data['linesFilter'] = json.dumps(lines_dictarr)
        request_data['useAllLines'] = 0

    # finally, add the HTML naming
    return {"table[departure][{0}]".format(k): v for k, v in request_data.items()}


def get_data(request_data: dict, headers: Optional[dict] = None,
             cookies: Optional[dict] = None) -> dict:
    """
    POST the request data to the Abfahrtsmonitor API and return the decoded JSON.

    The raw reply is stored in the module-level ``last_reply`` so that main()
    can dump it when later processing fails.

    :raises requests.exceptions.RequestException: on network/HTTP errors
    :raises ValueError: if the reply body is not valid JSON
    """
    # Without this declaration the assignment below would only create a local
    # variable and the module-level last_reply would stay None forever.
    global last_reply
    url = 'https://abfahrtsmonitor.vrr.de/backend/api/stations/table'
    reply = HTTP.post(url, data=request_data, headers=headers, cookies=cookies,
                      timeout=REQUEST_TIMEOUT)
    last_reply = reply
    reply.raise_for_status()
    print('Request time elapsed: ' + str(reply.elapsed), file=sys.stderr)
    return reply.json()


def has_realtime(trip: dict) -> bool:
    """Return True if the trip carries a delay value ('' and None mean no data)."""
    return trip['delay'] is not None and trip['delay'] != ''


def is_cancelled(trip: dict) -> bool:
    """Return True if the trip's delay equals the TRIP_CANCELLED marker."""
    if has_realtime(trip):
        return int(trip['delay']) == TRIP_CANCELLED
    return False


def is_late(trip: dict) -> bool:
    """Return True if the trip has a positive delay."""
    if has_realtime(trip):
        return int(trip['delay']) > 0
    return False


def is_early(trip: dict) -> bool:
    """Return True if the trip has a negative delay that is not the cancel marker."""
    if has_realtime(trip):
        delay = int(trip['delay'])
        return delay < 0 and delay != TRIP_CANCELLED
    return False


def is_on_time(trip: dict) -> bool:
    """Return True if the trip has real-time data and a delay of exactly 0."""
    if has_realtime(trip):
        return int(trip['delay']) == 0
    return False


def fixup_data(d: dict) -> dict:
    """Replace empty-string delays with None (in place) and return ``d``."""
    for trip in d['departureData']:
        if trip['delay'] == '':
            trip['delay'] = None
    return d


def make_linecode_table(data: dict) -> Dict[str, str]:
    """
    Build a mapping from lineCode to lineNumber for all departures in ``data``.

    If a line code shows up with two different line numbers, a notice is
    printed and the later one wins.
    """
    codes = {}
    for trip in data['departureData']:
        code = trip['lineCode']
        number = trip['lineNumber']
        if code in codes and number != codes[code]:
            print('lineCode {} already present as {}! Replacing it with {}.'.format(
                code, codes[code], number)
            )
        codes[code] = number
    return codes


def print_trip(trip: dict, full_text: bool = False) -> None:
    """
    Print one departure to stdout.

    :param trip: one entry of the reply's 'departureData' list
    :param full_text: print a full sentence instead of the compact table row
    """
    if full_text:
        fmt = {
            "cancelled": "is cancelled.",
            "late": "is {} min late.",
            "early": "is {} min early.",
            "on_time": "is on time.",
            "no_rt": "has no real-time data.",
            "trip": "The {}:{} {} service to {} "
        }
    else:
        fmt = {
            "cancelled": "cancelled",
            "late": "+{} min",
            "early": "-{} min",
            "on_time": "on time",
            "no_rt": "n/a",
            "trip": "{}:{} {:<6}-> {:<38}"
        }

    def fmt_trip(fmt_str: str, fmt_args=()) -> str:
        return (fmt['trip'] + fmt_str).format(trip['orgHour'], trip['orgMinute'],
                                              trip['lineNumber'], trip['direction'],
                                              *fmt_args)

    if has_realtime(trip):
        if is_cancelled(trip):
            print(fmt_trip(fmt['cancelled']))
        elif is_late(trip):
            print(fmt_trip(fmt['late'], fmt_args=[trip['delay']]))
        elif is_early(trip):
            # The delay may be a string; coerce it before negating.
            print(fmt_trip(fmt['early'], fmt_args=[-int(trip['delay'])]))
        elif is_on_time(trip):
            print(fmt_trip(fmt['on_time']))
    else:
        print(fmt_trip(fmt['no_rt']))


def get_next_refresh(data: Optional[dict]):
    """
    Work out when to poll the API next.

    Picks the earliest scheduled or actual departure time that lies more than
    30 seconds in the future, capped at five minutes from now. If there is no
    such time (or ``data`` is None), the next refresh is in 90 seconds.

    :return: a POSIX timestamp
    """
    now = datetime.datetime.now()
    now_ts = now.timestamp()

    upcoming = []
    if data is not None:
        for trip in data['departureData']:
            for raw in (trip['orgFullTime'], trip['fullTime']):
                # Skip blank timestamps instead of crashing on int('').
                if raw is None or raw == '':
                    continue
                stamp = int(raw)
                if stamp > now_ts:
                    upcoming.append(stamp)
    upcoming.sort()

    for stamp in upcoming:
        remaining = stamp - now_ts
        if remaining > 30:
            if remaining > 5 * 60:
                return (now + datetime.timedelta(minutes=5)).timestamp()
            return stamp
    return (now + datetime.timedelta(seconds=90)).timestamp()


def _make_delay_value_for_sql(value: Union[str, int, None]) -> Optional[int]:
    """Convert a raw delay value to an int, mapping '' and None to None (SQL NULL)."""
    if value is None or value == '':
        return None
    return int(value)


def _make_status_value_for_sql(trip: dict) -> str:
    """
    Map a trip to one of the values of the ``status`` enum column.

    :raises ValueError: if the delay cannot be classified
    """
    if not has_realtime(trip):
        return 'no_data'
    if is_late(trip):
        return 'late'
    if is_cancelled(trip):
        return 'cancelled'
    if is_early(trip):
        return 'early'
    if is_on_time(trip):
        return 'on_time'
    raise ValueError("unknown delay value")


def update(station_id: int, lines: List[str]) -> Optional[dict]:
    """
    Fetch the current departures, print them and return the reply data.

    :return: the decoded reply, or None if the request failed
    """
    try:
        reply_data = get_data(
            make_request_data(
                station_id,
                8,
                lines=lines
            )
        )
    except (requests.exceptions.HTTPError,
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout) as e:
        print("Could not get the data: " + str(e), file=sys.stderr)
        return None
    for trip in reply_data['departureData']:
        print_trip(trip)
    return reply_data


def wait(station_id: int):
    """
    Poll the station forever and store every departure in the database.

    After each poll the function sleeps until the time computed by
    get_next_refresh(). The database connection is closed when the loop is
    left through an exception.
    """
    cxn = mysql.connector.MySQLConnection(**db_config)
    try:
        cur = cxn.cursor()
        try:
            while True:
                data = update(station_id, USE_LINES)
                if data is not None:
                    line_codes = make_linecode_table(data)
                    for t in data['departureData']:
                        cur.execute('REPLACE INTO vrr '
                                    '(line_code, direction_code, station_id, orig_datetime, status, delay_value)'
                                    ' VALUES (%s, %s, %s, '
                                    'from_unixtime(%s), %s, %s)',
                                    (t['lineCode'], t['directionCode'], station_id,
                                     t['orgFullTime'],
                                     _make_status_value_for_sql(t),
                                     _make_delay_value_for_sql(t['delay'])))
                    for k, v in line_codes.items():
                        cur.execute('REPLACE INTO vrr_lines '
                                    '(line_code, line_number) '
                                    'VALUES (%s, %s)', (k, v))
                    cxn.commit()
                next_refresh = get_next_refresh(data)
                print("Sleeping until " + datetime.datetime.fromtimestamp(next_refresh).isoformat(),
                      file=sys.stderr)
                pause.until(next_refresh)
        finally:
            cur.close()
    finally:
        cxn.close()


def main():
    """Create the tables if necessary, then run the polling loop."""
    cxn = mysql.connector.connect(**db_config)
    try:
        cursor = cxn.cursor()
        try:
            cursor.execute(VRR_TABLE)
            cursor.execute(VRR_LINES_TABLE)
        finally:
            cursor.close()
    finally:
        cxn.close()

    try:
        wait(USE_STATION_ID)
    except (KeyError, ValueError, TypeError):
        # Keep the reply that could not be processed for later inspection.
        if last_reply is not None:
            with open("fault.json", "wb") as o:
                o.write(last_reply.content)  # requests.Response.content is bytes
        raise


if __name__ == "__main__":
    main()