Compare commits

...

8 Commits

4 changed files with 82 additions and 35 deletions

View File

@ -1,7 +1,7 @@
# CLI for First Data conversion rates # CLI for First Data conversion rates
## Requirements ## Requirements
* python3 with modules `PyPDF3`, `appdirs`, `mechanize` * python3 with modules `PyPDF3`, `appdirs`, `mechanize`, `dateutil`
## Usage: ## Usage:
`python3 crawl.py [-t {VISA,MC}] [-g ISO_DATE] [-r] {-i | CURRENCY AMOUNT}` `python3 crawl.py [-t {VISA,MC}] [-g ISO_DATE] [-r] {-i | CURRENCY AMOUNT}`
@ -10,6 +10,7 @@
#### `AMOUNT` #### `AMOUNT`
This must be a number. This must be a number.
#### `CURRENCY` #### `CURRENCY`
This must be the three-letter currency abbreviation, case is irrelevant. This must be the three-letter currency abbreviation, case is irrelevant.
@ -22,16 +23,29 @@ Format: ISO date
#### `-r`, `--direction` #### `-r`, `--direction`
Reverse conversion direction (EUR to specified currency, instead of specified currency to EUR) Reverse conversion direction (EUR to specified currency, instead of specified currency to EUR)
#### `-c`, `--csv`
Write the currency results to standard output, formatted as CSV:
|ISO4217 abbreviation|Full German name|Asking rate|Bidding rate|Date the rate was valid on|
|:---|:---|:---|:---|:---|
#### `-q`, `--quiet`
Do not output informational messages such as "Parsing..." or "Downloading..."
#### `-i`, `--interactive` #### `-i`, `--interactive`
Calculate interactively on stdin Calculate interactively on stdin
##### `q`, `exit`, `quit` ##### `q`, `exit`, `quit`
Quit the program. Quit the program.
##### `AMOUNT CURRENCY` ##### `AMOUNT CURRENCY`
Convert AMOUNT euros to CURRENCY. Convert AMOUNT euros to CURRENCY.
##### `CURRENCY AMOUNT` ##### `CURRENCY AMOUNT`
Convert AMOUNT CURRENCY to euros. Convert AMOUNT CURRENCY to euros.
##### `d`, `date` ##### `d`, `date`
Print the date the data is from. Print the date the data is from.

View File

@ -8,6 +8,7 @@ import pathlib
from datetime import date as DTDate from datetime import date as DTDate
from datetime import datetime as DTDateTime from datetime import datetime as DTDateTime
import appdirs import appdirs
import csv
import utils import utils
@ -15,7 +16,7 @@ import utils
DIRECTION_TO_EUR = 0 DIRECTION_TO_EUR = 0
DIRECTION_FROM_EUR = 1 DIRECTION_FROM_EUR = 1
## Argument parsing # Argument parsing
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='Currency conversion using First Data cards.') description='Currency conversion using First Data cards.')
parser.add_argument( parser.add_argument(
@ -38,6 +39,18 @@ parser.add_argument(
action='store_true', action='store_true',
help='Reverse direction (EUR -> currency)' help='Reverse direction (EUR -> currency)'
) )
parser.add_argument(
'-c', '--csv',
dest='csv',
action='store_true',
help='Write the results to stdout as CSV'
)
parser.add_argument(
'-q', '--quiet',
dest='quiet',
action='store_true',
help='Do not output the \'Downloading...\' and \'Parsing...\' messages.'
)
parser.add_argument( parser.add_argument(
'--cache-dir', '--cache-dir',
dest='cache_dir', dest='cache_dir',
@ -134,7 +147,8 @@ def is_float(string: str) -> bool:
def _parse_date_from_args(date_str: str) -> DTDate: def _parse_date_from_args(date_str: str) -> DTDate:
return DTDateTime.strptime(date_str).date() from dateutil.parser import isoparse
return isoparse(date_str).date()
def calc_result(amt: float, rate: utils.Rate, direction: int, duty: float = 0) -> float: def calc_result(amt: float, rate: utils.Rate, direction: int, duty: float = 0) -> float:
@ -143,7 +157,8 @@ def calc_result(amt: float, rate: utils.Rate, direction: int, duty: float = 0) -
elif direction == DIRECTION_TO_EUR: elif direction == DIRECTION_TO_EUR:
result = amt / rate.bid * 1+duty result = amt / rate.bid * 1+duty
else: else:
raise ValueError('direction must be DIRECTION_FROM_EUR or DIRECTION_TO_EUR') raise ValueError(
'direction must be DIRECTION_FROM_EUR or DIRECTION_TO_EUR')
return result return result
@ -159,6 +174,7 @@ def fmt_and_calc(amt: float, cur: str, res: utils.CurrencyResult, direction: str
else: else:
return 'Currency %s could not be found' % cur return 'Currency %s could not be found' % cur
# args = parser.parse_args('USD 1000'.split()) # args = parser.parse_args('USD 1000'.split())
args = parser.parse_args() args = parser.parse_args()
#logger = logging.getLogger('mechanize') #logger = logging.getLogger('mechanize')
@ -186,28 +202,36 @@ else:
if args.cache_dir is not None: if args.cache_dir is not None:
filepath = pathlib.Path(args.cache_dir).resolve() filepath = pathlib.Path(args.cache_dir).resolve()
else: else:
filepath = pathlib.Path(appdirs.user_cache_dir('FirstDataCrawler', 'iwonder')) filepath = pathlib.Path(appdirs.user_cache_dir(
'FirstDataCrawler', 'iwonder'))
if not filepath.exists(): if not filepath.exists():
filepath.mkdir(parents=True) filepath.mkdir(parents=True)
filename = filepath / utils.mk_filename(retrieve_date, use_card_type) filename = filepath / utils.mk_filename(retrieve_date, use_card_type)
if os.path.exists(filename): if os.path.exists(filename):
with open(filename, 'rb') as f: with open(filename, 'rb') as f:
results = utils.get_results_from_pdf(f) results = utils.get_results_from_pdf(f, quiet=args.quiet)
else: else:
buf = utils.get_fileio(retrieve_date, card_type=use_card_type) buf = utils.get_fileio(retrieve_date, card_type=use_card_type, quiet=args.quiet)
with open(filename, 'wb') as f: with open(filename, 'wb') as f:
f.write(buf.read()) f.write(buf.read())
buf.seek(0) buf.seek(0)
results = utils.get_results_from_pdf(buf) results = utils.get_results_from_pdf(buf, quiet=args.quiet)
# #
# processing # processing
# #
if args.interactive: if args.interactive:
try: try:
while True: while True:
_process_stdin(input('> '), results) _process_stdin(input('> '), results)
except (KeyboardInterrupt, EOFError): except (KeyboardInterrupt, EOFError):
sys.exit() sys.exit()
elif args.csv:
w = csv.writer(sys.stdout)
for rate in results.rates.values():
w.writerow([rate.abbr, rate.full_name, rate.ask, rate.bid, rate.date])
else: else:
if args.amt and args.currency:
print(fmt_and_calc(args.amt, args.currency, results, direction)) print(fmt_and_calc(args.amt, args.currency, results, direction))
else:
print("Missing arguments. Exiting", file=sys.stderr)
exit(1)

View File

@ -1,3 +1,4 @@
PyPDF3 PyPDF3
appdirs appdirs
mechanize mechanize
python-dateutil

View File

@ -7,22 +7,23 @@ from collections import namedtuple
from datetime import date as DTDate from datetime import date as DTDate
from datetime import timedelta as DTTimeDelta from datetime import timedelta as DTTimeDelta
from datetime import datetime as DTDateTime from datetime import datetime as DTDateTime
from typing import BinaryIO, List from typing import BinaryIO, List, Dict
from collections import OrderedDict
from sys import stderr
import mechanize as m import mechanize as m
import PyPDF3 import PyPDF3
from dateutil.relativedelta import FR, relativedelta from dateutil.relativedelta import FR, relativedelta
Rate = namedtuple('Rate', ['abbr', 'full_name', 'ask', 'bid']) Rate = namedtuple('Rate', ['abbr', 'full_name', 'ask', 'bid', 'date'])
# Constants # Constants
CARD_MASTERCARD = ['0'] CARD_MASTERCARD = ['0']
CARD_VISA = ['1'] CARD_VISA = ['1']
class CurrencyResult: class CurrencyResult:
def __init__(self): def __init__(self):
self.rates = list() self.rates = Dict[str, Rate]
self.card_type = str() self.card_type = str()
self.date = None self.date = None
@ -63,7 +64,7 @@ def _array_remove_empty(obj: list) -> List[str]:
return obj return obj
def _parse_line(line: str) -> Rate or None: def _parse_line(line: str, ctx: CurrencyResult) -> Rate or None:
arr = line.split(" ") # 3 spaces = minimum separation in PDF arr = line.split(" ") # 3 spaces = minimum separation in PDF
arr = _array_remove_empty(arr) arr = _array_remove_empty(arr)
# process currency name # process currency name
@ -72,13 +73,14 @@ def _parse_line(line: str) -> Rate or None:
abbr=names[0], abbr=names[0],
full_name=names[1].strip("()"), full_name=names[1].strip("()"),
ask=_parse_rate(arr[1]), ask=_parse_rate(arr[1]),
bid=_parse_rate(arr[2]) bid=_parse_rate(arr[2]),
date=ctx.date
) )
return rate return rate
def get_results_from_text(text: str, currency: str = None) -> CurrencyResult: def get_results_from_text(text: str, currency: str = None, quiet: bool = False) -> CurrencyResult:
rates = {} rates = OrderedDict()
result = CurrencyResult() result = CurrencyResult()
lines = text.splitlines() lines = text.splitlines()
# skip intro lines # skip intro lines
@ -92,43 +94,46 @@ def get_results_from_text(text: str, currency: str = None) -> CurrencyResult:
# now the rates begin # now the rates begin
if currency is None: if currency is None:
for line in lines: for line in lines:
line_result = _parse_line(line) line_result = _parse_line(line, result)
rates[line_result.abbr] = line_result rates[line_result.abbr] = line_result
else: else:
pattern = re.compile("^"+currency) pattern = re.compile("^"+currency)
for line in lines: for line in lines:
if pattern.match(line): if pattern.match(line):
line_result = _parse_line(line) line_result = _parse_line(line, result)
rates[line_result.abbr] = line_result rates[line_result.abbr] = line_result
result.rates = rates result.rates = rates
return result return result
def get_results_from_pdf(buf: BinaryIO or str, currency: str = None) -> CurrencyResult: def get_results_from_pdf(buf: BinaryIO or str, currency: str = None, quiet: bool = False) -> CurrencyResult:
print('Parsing data... ', end='') if not quiet:
print('Parsing data... ', end='', file=stderr)
reader = PyPDF3.PdfFileReader(buf) reader = PyPDF3.PdfFileReader(buf)
text = str() text = str()
for num in range(0, reader.getNumPages()-1): for num in range(0, reader.getNumPages()-1):
text += reader.getPage(num).extractText() text += reader.getPage(num).extractText()
print('Done.') if not quiet:
return get_results_from_text(text, currency=currency) print('Done.', file=stderr)
return get_results_from_text(text, currency=currency, quiet=quiet)
def get_fileio(date: DTDate, card_type: List[str] = CARD_VISA) -> BinaryIO: # pylint: disable=dangerous-default-value def get_fileio(date: DTDate, card_type: List[str] = CARD_VISA, quiet: bool = False) -> BinaryIO: # pylint: disable=dangerous-default-value
# pylint: disable=no-member # mechanize.Browser has some lazy-loading methods that pylint doesn't see # pylint: disable=no-member # mechanize.Browser has some lazy-loading methods that pylint doesn't see
print('Downloading rates for ' + date.strftime('%Y-%m-%d') + '... ', end='') if not quiet:
print('Downloading rates for ' + date.strftime('%Y-%m-%d') + '... ', end='', file=stderr)
b = m.Browser() b = m.Browser()
# Firefox 64 User-Agent # Firefox 64 User-Agent
# ua = 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:64.0) Gecko/20100101 Firefox/64.0' # ua = 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:64.0) Gecko/20100101 Firefox/64.0'
# b.set_header('User-Agent', ua) # b.set_header('User-Agent', ua)
# Ignore robots.txt # Ignore robots.txt
# b.set_handle_robots(False) b.set_handle_robots(False)
# Debugging flags # Debugging flags
# b.set_debug_http(True) # b.set_debug_http(True)
# b.set_debug_redirects(True) # b.set_debug_redirects(True)
# b.set_debug_responses(True) # b.set_debug_responses(True)
# PDF URL # PDF URL
b.open('https://misc.firstdata.eu/CurrencyCalculator/fremdwaehrungskurse/pdf') b.open('https://online.firstdata.com/CurrencyCalculator/fremdwaehrungskurse/pdf')
fm = b.forms()[0] fm = b.forms()[0]
# This must be done because I can't change the options otherwise # This must be done because I can't change the options otherwise
fm.set_all_readonly(False) fm.set_all_readonly(False)
@ -140,7 +145,8 @@ def get_fileio(date: DTDate, card_type: List[str] = CARD_VISA) -> BinaryIO: # py
rq = fm.click(name='submitButton', coord=(random.randint(1, 114), random.randint(1, 20))) rq = fm.click(name='submitButton', coord=(random.randint(1, 114), random.randint(1, 20)))
rq.add_header('Accept', '*/*') rq.add_header('Accept', '*/*')
rp = b.retrieve(rq) rp = b.retrieve(rq)
print(' Done.') if not quiet:
print(' Done.', file=stderr)
# Returns an open file-like object with the PDF as contents # Returns an open file-like object with the PDF as contents
return open(rp[0], 'rb') return open(rp[0], 'rb')
@ -164,3 +170,5 @@ def mk_filename(date: DTDate, card_type: List[str]) -> str:
else: else:
raise TypeError("not a valid card type") raise TypeError("not a valid card type")
return fn return fn