first-data-crawler-pdf/crawl.py

226 lines
6.7 KiB
Python
Raw Normal View History

2019-03-27 17:00:04 +01:00
#!/usr/bin/env python3
# pylint: disable=missing-docstring,invalid-name
# pylint: disable=no-member # mechanize.Browser has some lazy-loading methods that pylint doesn't see
# import logging
import argparse
import random
import sys
import os
import pathlib
from typing import BinaryIO, List
from datetime import date as DTDate
from datetime import datetime as DTDateTime
from datetime import timedelta as DTTimeDelta
from dateutil.relativedelta import FR, relativedelta
import appdirs
2019-05-03 23:16:00 +02:00
import utils
2019-03-27 17:00:04 +01:00
import mechanize as m
# Card selectors; _retrieve_file() assigns one of these as-is to the
# ``creditCardsRadio`` form field.
CARD_MASTERCARD = ['0']
CARD_VISA = ['1']

# Conversion directions understood by calc_result() and fmt_and_calc().
DIRECTION_TO_EUR = 0
DIRECTION_FROM_EUR = 1

## Argument parsing
parser = argparse.ArgumentParser(
    description='Currency conversion using First Data cards.')
parser.add_argument(
    '-t', '--card-type',
    choices=['VISA', 'MC'],
    default='VISA',
    dest='card_type',
    type=str,
    help='Card Type'
)
parser.add_argument(
    '-g', '--fetch-date',
    dest='date',
    type=str,
    help='Date to get values for (default: yesterday, Friday on Sat-Mon)'
)
parser.add_argument(
    '-r', '--direction',
    dest='reverse',
    action='store_true',
    help='Reverse direction (EUR -> currency)'
)
parser.add_argument(
    '--cache-dir',
    dest='cache_dir',
    help='Override the default cache directory with your own path'
)

exc_group = parser.add_mutually_exclusive_group()
exc_group.add_argument(
    '-i', '--interactive',
    dest='interactive',
    action='store_true',
    help='Calculate interactively on stdin'
)

# The positionals are attached to a group on the parser itself.  Creating an
# argument group *on* a mutually exclusive group is deprecated by argparse and
# never enforced any exclusivity; both positionals stay optional (nargs='?')
# so that ``--interactive`` can be used without them.
vals_group = parser.add_argument_group('conversion arguments')
vals_group.add_argument(
    'currency',
    type=str,
    help='Currency abbreviation to convert from/to (e.g. EUR)',
    nargs='?'
)
vals_group.add_argument(
    'amt',
    type=float,
    help='Amount',
    nargs='?'
)
2019-05-03 23:16:00 +02:00
def _process_stdin(argv: str, res: utils.CurrencyResult) -> None:
    """Interpret one line of interactive input and print the outcome.

    Recognised inputs (whitespace separated):

    * ``q`` / ``exit`` / ``quit`` -- terminate the program via ``sys.exit()``.
    * ``d`` / ``date`` -- print ``res.date``.
    * ``<amount> <currency>`` -- convert that amount of EUR into the currency.
    * ``<currency> <amount>`` -- convert that amount of the currency into EUR.

    Blank lines are ignored.  Anything else is answered with a
    "Not implemented" or "Too few arguments" message.

    :param argv: the raw line as typed by the user.
    :param res: parsed rates; ``res.date`` is read here and the object is
        handed on to :func:`fmt_and_calc`.
    """
    tokens = argv.split()
    if not tokens:
        # split() never yields None, so the former ``argv is None`` check could
        # not fire and an empty line was reported as "Too few arguments".
        return

    try:
        if tokens[0] in ['q', 'exit', 'quit']:
            sys.exit()
        elif tokens[0] in ['date', 'd']:
            print(res.date)
        elif len(tokens[0]) == 3 or len(tokens[1]) == 3:
            # more than 3 letter abbreviations are invalid
            if is_float(tokens[0]):
                # amount first -> convert to currency in tokens[1]
                print(fmt_and_calc(
                    cur=tokens[1].upper(),
                    amt=float(tokens[0]),
                    res=res,
                    direction=DIRECTION_FROM_EUR))
            elif is_float(tokens[1]):
                # currency first -> convert to EUR
                print(fmt_and_calc(
                    cur=tokens[0].upper(),
                    amt=float(tokens[1]),
                    res=res,
                    direction=DIRECTION_TO_EUR))
            else:
                # neither token is a number
                print("Not implemented: '" + " ".join(tokens) + "'")
        else:
            print("Not implemented: '" + " ".join(tokens) + "'")
    except IndexError:
        # A second token was needed but only one was supplied.
        print("Too few arguments: '" + " ".join(tokens) + "'")
def is_float(string: str) -> bool:
    """Return True when ``float(string)`` succeeds, False otherwise.

    Values that ``float()`` rejects with ``TypeError`` (for example ``None``)
    are reported as False as well instead of propagating the exception.
    """
    try:
        float(string)
    except (TypeError, ValueError):
        return False
    return True
def _retrieve_file(date: DTDate, card_type: List[str] = CARD_VISA) -> BinaryIO:  # pylint: disable=dangerous-default-value
    """Download the rates PDF for *date* and return it opened for reading.

    The first form on the First Data PDF page is filled in with the card
    selector and the requested date and then submitted through its
    ``submitButton``.

    :param date: day whose rates are requested; sent as ``YYYYMMDD,``.
    :param card_type: value for the ``creditCardsRadio`` field
        (``CARD_VISA`` or ``CARD_MASTERCARD``).  The default list is only
        read, never mutated.
    :return: the downloaded file, opened in binary read mode.  The caller is
        responsible for closing it.
    """
    # flush=True: without a newline the text would otherwise sit in the stdout
    # buffer and only show up once the download has already finished.
    print('Downloading newest rates...', end='', flush=True)

    browser = m.Browser()
    # Optional tweaks that were used while developing this function:
    #   Firefox 64 User-Agent:
    #     browser.set_header('User-Agent', 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:64.0) Gecko/20100101 Firefox/64.0')
    #   Ignore robots.txt:
    #     browser.set_handle_robots(False)
    #   Debugging flags:
    #     browser.set_debug_http(True)
    #     browser.set_debug_redirects(True)
    #     browser.set_debug_responses(True)

    # PDF URL
    browser.open('https://misc.firstdata.eu/CurrencyCalculator/fremdwaehrungskurse/pdf')
    form = browser.forms()[0]
    # Lift the read-only flag from every control so the fields below can be set.
    form.set_all_readonly(False)

    # Configure form
    form['creditCardsRadio'] = card_type
    form['selectedDatesString'] = date.strftime('%Y%m%d') + ','

    # Retrieve file using button click; the click position is randomised.
    request = form.click(
        name='submitButton',
        coord=(random.randint(1, 119), random.randint(1, 20)))
    request.add_header('Accept', '*/*')
    retrieved = browser.retrieve(request)
    print(' Done.')

    # The first element of the retrieve() result is used as a local file path.
    return open(retrieved[0], 'rb')
def _get_date(today: DTDate = None) -> DTDate:
    """Return the default day to fetch rates for.

    That is the day before *today*, except on Sunday and Monday where the
    preceding Friday is returned (on Saturday "yesterday" already is Friday).

    :param today: reference day; defaults to ``DTDate.today()``.  The current
        date is read only once so the result cannot straddle midnight.
    """
    if today is None:
        today = DTDate.today()
    # weekday(): Monday == 0 ... Sunday == 6
    days_back = {6: 2, 0: 3}.get(today.weekday(), 1)
    return today - DTTimeDelta(days=days_back)
def _parse_date_from_args(date_str: str) -> DTDate:
    """Convert the ``--fetch-date`` command line value into a date.

    Accepted notations are ``YYYY-MM-DD`` and ``YYYYMMDD``.

    :raises ValueError: if *date_str* matches neither notation.
    """
    # strptime() needs an explicit format; calling it with the string alone
    # always failed with TypeError.
    for fmt in ('%Y-%m-%d', '%Y%m%d'):
        try:
            return DTDateTime.strptime(date_str, fmt).date()
        except ValueError:
            continue
    raise ValueError(
        "Invalid date '%s' (expected YYYY-MM-DD or YYYYMMDD)" % date_str)
2019-05-03 23:16:00 +02:00
def calc_result(amt: float, rate: utils.Rate, direction: int, duty: float = 0) -> float:
    """Convert *amt* with the given rate.

    :param amt: amount to convert.
    :param rate: object providing ``ask`` (used for EUR -> currency) and
        ``bid`` (used for currency -> EUR).
    :param direction: ``DIRECTION_FROM_EUR`` or ``DIRECTION_TO_EUR``.
    :param duty: surcharge applied as the factor ``1 + duty``; presumably a
        fraction such as 0.02 for 2 % -- confirm with callers.  With the
        default of 0 the result is the plain conversion.
    :raises ValueError: for any other *direction*.
    """
    # The factor must be parenthesised: ``x / 1+duty`` parses as
    # ``(x / 1) + duty`` and would add the duty as an absolute amount.
    if direction == DIRECTION_FROM_EUR:
        return amt * rate.ask / (1 + duty)
    if direction == DIRECTION_TO_EUR:
        return amt / rate.bid * (1 + duty)
    raise ValueError('direction must be DIRECTION_FROM_EUR or DIRECTION_TO_EUR')
2019-05-03 23:16:00 +02:00
def fmt_and_calc(amt: float, cur: str, res: utils.CurrencyResult, direction: int) -> str:
    """Convert *amt* and render the conversion as one line of text.

    :param amt: amount to convert.
    :param cur: currency abbreviation; matched case-insensitively against the
        keys of ``res.rates``.
    :param res: parsed rates; only ``res.rates`` is read here.
    :param direction: ``DIRECTION_FROM_EUR`` or ``DIRECTION_TO_EUR`` (these
        are ints, which is why the annotation is ``int``).
    :return: ``"<from> <amount> = <to> <result>"`` with two decimals, or a
        "could not be found" message when *cur* has no rate.
    """
    cur = cur.upper()
    if cur not in res.rates:
        return 'Currency %s could not be found' % cur

    numeric_result = calc_result(amt, res.rates[cur], direction)
    if direction == DIRECTION_FROM_EUR:
        fmt_vals = ('EUR', round(amt, 2), cur, round(numeric_result, 2))
    else:
        fmt_vals = (cur, round(amt, 2), 'EUR', round(numeric_result, 2))
    return '%s %0.2f = %s %0.2f' % fmt_vals
2019-05-03 23:16:00 +02:00
# Script body: parse the command line, load the rates for the requested day
# (from the cache directory if present, otherwise by downloading them) and
# either answer one conversion or enter the interactive prompt.

# args = parser.parse_args('USD 1000'.split())
args = parser.parse_args()

# Both positionals are optional for the parser so that --interactive works
# without them; in one-shot mode they are required.  Check before any download.
if not args.interactive and (args.currency is None or args.amt is None):
    parser.error('currency and amt are required unless --interactive is used')

#logger = logging.getLogger('mechanize')
#logger.addHandler(logging.StreamHandler(sys.stdout))
#logger.setLevel(logging.DEBUG)

# determine card type
if args.card_type == 'VISA':
    use_card_type = CARD_VISA
elif args.card_type == 'MC':
    use_card_type = CARD_MASTERCARD
elif args.card_type is None:
    use_card_type = CARD_VISA
else:
    sys.exit('Unsupported card type ' + args.card_type)

if args.reverse:
    direction = DIRECTION_FROM_EUR
else:
    direction = DIRECTION_TO_EUR

if args.date:
    retrieve_date = _parse_date_from_args(args.date)
else:
    retrieve_date = _get_date()

if args.cache_dir is not None:
    filepath = pathlib.Path(os.path.abspath(args.cache_dir))
else:
    filepath = pathlib.Path(appdirs.user_cache_dir('FirstDataCrawler', 'iwonder'))
# exist_ok avoids the check-then-create race of a separate exists() test.
filepath.mkdir(parents=True, exist_ok=True)

# The download is requested per card type, so the cache must not hand the file
# of one card type to the other.  VISA (the default) keeps the historic file
# name so that existing caches stay valid.
cache_suffix = '' if use_card_type == CARD_VISA else '_MC'
filename = filepath / (retrieve_date.strftime('%Y%m%d') + cache_suffix + '.pdf')

if filename.exists():
    with open(filename, 'rb') as f:
        results = utils.get_results_from_pdf(f)
else:
    # ``with`` closes the downloaded file once it has been cached and parsed.
    with _retrieve_file(retrieve_date, card_type=use_card_type) as buf:
        with open(filename, 'wb') as f:
            f.write(buf.read())
        buf.seek(0)  # rewind: the copy above consumed the stream
        results = utils.get_results_from_pdf(buf)

#
# processing
#
if args.interactive:
    try:
        while True:
            _process_stdin(input('> '), results)
    except (KeyboardInterrupt, EOFError):
        sys.exit()
else:
    print(fmt_and_calc(args.amt, args.currency, results, direction))