Download Intraday Stock Data with IEX and Parquet

Post Outline

  • Why IEX?
  • Why Parquet?
  • System Outline
  • Code
  • Links

WHY IEX?

IEX is a relatively new exchange (founded in 2012). For our purposes, what sets it apart from other exchanges is that it provides a robust, free API for querying its stock exchange data. As a result, we can leverage the pandas-datareader framework to query IEX data quite simply.
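
To get a feel for the interface, here is a minimal sketch (the symbols are arbitrary examples) of querying IEX TOPS quotes through pandas-datareader:

import pandas_datareader.data as web

# query IEX TOPS (top-of-book) quotes for a couple of symbols;
# returns a dataframe of bid/ask/last-sale fields per symbol
quotes = web.DataReader(['SPY', 'QQQ'], 'iex-tops')
print(quotes[['bidPrice', 'askPrice', 'lastSalePrice']])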

WHY PARQUET?

Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.
— https://parquet.apache.org/

I don't use Hadoop; however, Parquet is a great storage format within the pandas ecosystem as well. It is fast, stable, flexible, and comes with easy compression built in. I originally learned about the format when some of my datasets were too large to fit in memory and I started using Dask as a drop-in replacement for pandas. It blows away CSVs, and I found it more stable and consistent than HDF5 files.
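
As a quick illustration of how simple the pandas interface is, here is a toy round trip (the file name and values are made up; pandas compresses with snappy by default):

import pandas as pd

# toy dataframe standing in for a batch of quotes
df = pd.DataFrame({'symbol': ['SPY', 'QQQ'],
                   'lastSalePrice': [270.51, 168.22]})

# write a compressed parquet file and read it straight back
df.to_parquet('quotes.parq')
back = pd.read_parquet('quotes.parq')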

SYSTEM OUTLINE

This system will query and store quotes for ~630 ETF symbols every 30 seconds during market hours. To view the project setup, visit the Github Repo. We start by outlining the system process.

  • The system starts with the iex_downloader.py script. This script will:
    • instantiate the logger,
    • get today's market hours and date,
    • handle timezone conversions to confirm the script only runs during market hours,
    • during market hours, query the IEX API, format the data, and write it to an interim data storage location,
    • outside of market hours, query no data and issue a warning.
  • The second component is the iex_downloader_utils.py script. This script provides utility functions to format the response data and store it properly.
  • The third component is the iex_eod_processor.py script. This script's tasks are to:
    • run after the end of the market session,
    • read the single day's worth of collected intraday data as a pandas dataframe (if the dataset is too big for memory we can switch to Dask),
    • drop any duplicate or NaN rows,
    • store the result in a final `processed` data folder as a single compressed file containing one day's worth of intraday quote data,
    • delete the day's data stored in the `interim` folder to manage hard disk space.
  • The final component is the task scheduler. On Linux this is carried out using the `crontab` application; for Windows/Mac systems you will have to adapt the logic to your specific OS. The `./src/data/iex_cronjob.txt` file gives a template of the tasks that need to be scheduled. These tasks are:
    • Every minute between 7am-2pm Mountain Time, Monday through Friday, run the iex_downloader.py script.
    • Every minute between 7am-2pm Mountain Time, Monday through Friday, wait 30 seconds and then run the iex_downloader.py script. crontab has no sub-minute resolution, so we overcome that by adding a timed delay to a repeated task.
    • At 10 minutes after 2pm Mountain Time, Monday through Friday, run the iex_eod_processor.py script.

CODE

First the iex_downloader.py script.


from pathlib import PurePath, Path
import sys
import tzlocal # pip install

## get project dir
pdir = PurePath("/YOUR/DIRECTORY/iex_intraday_equity_downloader")
data_dir = pdir/'data'
script_dir = pdir/'src'/'data'
sys.path.append(script_dir.as_posix())
from iex_downloader_utils import split_timestamp, write_to_parquet

import pandas as pd
import pandas_datareader.data as web
pd.options.display.float_format = '{:,.4f}'.format
import numpy as np
import pandas_market_calendars as mcal # pip install

import pyarrow as pa
import pyarrow.parquet as pq

import logzero
from logzero import logger
#=============================================================================
# get current timestamp

now = pd.to_datetime('today')
#=============================================================================
## setup logger

logfile = PurePath(pdir/'logs'/'equity_downloader_logs'/f'iex_downloader_log_{now.date()}.log').as_posix()
log_format = '%(color)s[%(levelname)1.1s %(asctime)s.%(msecs)03d %(module)s:%(lineno)d]%(end_color)s %(message)s'
formatter = logzero.LogFormatter(fmt=log_format, datefmt='%Y-%m-%d %I:%M:%S')
logzero.setup_default_logger(logfile=logfile, formatter=formatter)

#=============================================================================
# confirm market hours

local_tz = tzlocal.get_localzone() # get local timezone via tzlocal package
now_local_tz = now.tz_localize(local_tz) # localize current timestamp
nyse = mcal.get_calendar('NYSE') # get NYSE calendar

nyseToday = nyse.schedule(start_date=now.date(), end_date=now.date())
mktOpen = nyseToday.market_open.iloc[0].tz_convert(local_tz)
mktClose = nyseToday.market_close.iloc[0].tz_convert(local_tz)

if mktOpen <= now_local_tz <= mktClose:  # only run during market hours
    #==========================================================================
    # import symbols

    logger.info('importing symbols...')
    symfp = Path(data_dir/'external'/'ETFList.Options.Nasdaq__M.csv')
    symbols = (pd.read_csv(symfp).Symbol).tolist()

    #==========================================================================
    # request data

    logger.info('requesting data from iex...')
    data = (web.DataReader(symbols,'iex-tops')
            .assign(lastSaleTime=lambda df:pd.to_datetime(df.lastSaleTime,unit='ms'))
            .assign(lastUpdated=lambda df:pd.to_datetime(df.lastUpdated,unit='ms'))
            .pipe(split_timestamp, timestamp=now)
            .dropna())
    # force float conversion for the following columns.
    # when these columns are populated the values are floats, but when they
    # are not, the value is 0 and the column is read as int64; the resulting
    # schema change across partitions causes a read error later.
    to_float = ['askPrice','bidPrice','lastSalePrice','marketPercent']
    data.loc[:,to_float] = data.loc[:,to_float].astype(float)

    if data.empty: logger.warning('data df is empty!')
    #==========================================================================
    # store data

    logger.info('storing data to interim intraday_store')
    outfp = PurePath(data_dir/'interim'/'intraday_store').as_posix()
    write_to_parquet(data, outfp, logger=logger)
else:
    logger.warning('system outside of market hours, no data queried')
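
One caveat worth flagging: on a market holiday the NYSE calendar returns an empty schedule, so the `.iloc[0]` lookups above would raise an IndexError before the market-hours check even runs. A hypothetical guard (not part of the original script) could be placed right after the `nyse.schedule` call:

# hypothetical guard: nyse.schedule returns an empty frame on holidays,
# so exit cleanly before indexing into it
if nyseToday.empty:
    logger.warning('market holiday, no session today')
    sys.exit(0)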

Next the iex_downloader_utils.py script.


import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

def split_timestamp(df, timestamp):
    """fn: add the query timestamp and its partition columns"""
    df = df.assign(queryTime=timestamp,    # full query timestamp
                   year=timestamp.year,    # partition: year
                   month=timestamp.month,  # partition: month
                   day=timestamp.day,      # partition: day
                   time=timestamp.strftime('%H:%M:%S'))  # partition: time of day
    return df

def write_to_parquet(df, root_path,
                     partition_cols=['year','month','day','time'],
                     logger=None):
    """
    fn: wrapper for pyarrow write_to_dataset

    Params
    ------
    df : pd.DataFrame
        formatted dataframe data
    root_path : str, data store directory
    partition_cols : list of columns (as str dtype) to partition parquet storage directory
    logger : logger object
    """
    if not logger: raise ValueError('must use logger object')
    try:
        table = pa.Table.from_pandas(df)
        pq.write_to_dataset(table, root_path=root_path,
                            partition_cols=partition_cols)
    except Exception as e:
        logger.exception(e)
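
To see how the partition columns translate into a directory tree, here is a small usage sketch (it assumes a configured logzero `logger`; the path and values are placeholders). `pyarrow.parquet.write_to_dataset` nests the partitions as `year=2018/month=6/day=15/time=09:30:01/`, which is the layout the EOD processor later deletes by `year=` folder:

ts = pd.to_datetime('today')
sample = pd.DataFrame({'symbol': ['SPY'], 'lastSalePrice': [270.51]})

# add the partition columns, then append into the partitioned store
sample = split_timestamp(sample, timestamp=ts)
write_to_parquet(sample, '/tmp/intraday_store', logger=logger)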

Next the iex_eod_processor.py script.


from pathlib import PurePath, Path
import sys
import shutil

## get project dir
pdir = PurePath("/YOUR/DIRECTORY/iex_intraday_equity_downloader")
data_dir = pdir/'data'
script_dir = pdir/'src'/'data'
sys.path.append(script_dir.as_posix())

import pandas as pd
pd.options.display.float_format = '{:,.4f}'.format

import logzero
from logzero import logger
#=============================================================================
# get current timestamp

now = pd.to_datetime('today')

#=============================================================================
## setup logger

logfile = PurePath(pdir/'logs'/'equity_downloader_logs'/f'iex_downloader_log_{now.date()}.log').as_posix()
log_format = '%(color)s[%(levelname)1.1s %(asctime)s.%(msecs)03d %(module)s:%(lineno)d]%(end_color)s %(message)s'
formatter = logzero.LogFormatter(fmt=log_format, datefmt='%Y-%m-%d %I:%M:%S')
logzero.setup_default_logger(logfile=logfile, formatter=formatter)

#=============================================================================
# read intraday data into one dataframe

logger.info('reading all intraday data for today as dataframe...')
infp = PurePath(data_dir/'interim'/'intraday_store').as_posix()

try:
    df = pd.read_parquet(infp).drop_duplicates().dropna()
    if df.empty: logger.warning('empty dataframe for eod processing')
    #==========================================================================
    # store intraday data into one compressed dataframe

    logger.info('storing all intraday data for today as compressed parquet file...')
    outfp = PurePath(data_dir/'processed'/'intraday_store'/f'etf_intraday_data_{now.date()}.parq')
    df.to_parquet(outfp, engine='fastparquet')

    #==========================================================================
    # delete interim store

    logger.info('deleting all interim intraday data.')
    rmfp = Path(data_dir/'interim'/'intraday_store'/f'year={now.year}')
    shutil.rmtree(rmfp)

except Exception as e:
    logger.error(f'{e}\tlikely no data today: {now.date()}')
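
As mentioned in the outline, if a day's worth of quotes ever outgrows memory, the pandas read above can be swapped for Dask with minimal changes. A sketch, assuming Dask is installed:

import dask.dataframe as dd

# lazily read the partitioned interim store; nothing loads until compute()
ddf = dd.read_parquet(infp)
df = ddf.drop_duplicates().dropna().compute()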

Finally the cronjob task template.

* 7-14 * * mon-fri /YOUR/DIRECTORY/anaconda3/envs/iex_downloader_env/bin/python3.6 '/YOUR/DIRECTORY/iex_intraday_equity_downloader/src/data/iex_downloader.py' >> /YOUR/DIRECTORY/iex_intraday_equity_downloader/logs/equity_downloader_logs/iex_downloader_log.log

* 7-14 * * mon-fri sleep 30; /YOUR/DIRECTORY/anaconda3/envs/iex_downloader_env/bin/python3.6 '/YOUR/DIRECTORY/iex_intraday_equity_downloader/src/data/iex_downloader.py' >> /YOUR/DIRECTORY/iex_intraday_equity_downloader/logs/equity_downloader_logs/iex_downloader_log.log

10 14 * * mon-fri /YOUR/DIRECTORY/anaconda3/envs/iex_downloader_env/bin/python3.6 '/YOUR/DIRECTORY/iex_intraday_equity_downloader/src/data/iex_eod_processor.py' >> /YOUR/DIRECTORY/iex_intraday_equity_downloader/logs/equity_downloader_logs/iex_downloader_log.log
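
To install the schedule, edit the paths in the template and load it with `crontab /YOUR/DIRECTORY/iex_intraday_equity_downloader/src/data/iex_cronjob.txt`, or paste the three lines in via `crontab -e`.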

When everything is running correctly you should see an example log file that looks like the image below.

[Screenshot: example iex_downloader log file output]

The interim folder structure will look something like the image below.

[Screenshot: interim intraday_store partitioned folder structure]

After the market closes and the eod processor script runs, we can easily import the final dataset into a Jupyter notebook.

How many unique symbols?

[Screenshot: notebook output showing the number of unique ETF symbols]
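
A minimal sketch of that notebook step (the path and date are placeholders, and it assumes the `symbol` column from the TOPS response survives processing):

import pandas as pd

fp = '/YOUR/DIRECTORY/iex_intraday_equity_downloader/data/processed/intraday_store/etf_intraday_data_2018-06-15.parq'
df = pd.read_parquet(fp)

# count the distinct ETF symbols captured during the session
df.symbol.nunique()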