- Why IEX?
- Why Parquet?
- System Outline
- Code
- Links

IEX is a relatively new exchange (founded in 2012). For our purposes, what makes them different from other exchanges is they provide a robust FREE API to query their stock exchange data. As a result we can leverage the pandas-datareader framework to query IEX data quite simply.

I don't use Hadoop, however Parquet is a great storage format within the pandas ecosystem as well. It is fast, stable, flexible, and comes with easy compression builtin. I originally learned about the format when some of my datasets were too large to fit in-memory and I started to use Dask as a drop-in replacement for Pandas. It blows away CSV's and I found it more stable and consistent than HDF5 files.

This system will query and store ~630 ETF symbol quotes every 30 seconds during market hours. To view the project setup visit the ** Github Repo.** First we start by outlining the system process.

- The system starts with the
script. This script will;**iex_downloader.py**- instantiate the logger,
- get today's market hours and date
- handle timezone conversions to confirm the script is only running during market hours
- if market hours it will query the IEX API format the data and write the data to an interim data storage location
- if not market hours no data is queried and a warning is issued.

- The second component is the
script. This script provides utility functions to format the response data and store it properly.**iex_downloader_utils.py** - The third component is the
script. This script's tasks are:*iex_eod_processor.py*- to run after the end of the market session
- read the single day's worth of intraday data collected, as a Pandas dataframe (if dataset is too big for memory can switch to Dask)
- drop any duplicates or NaN rows.
- store into a final `processed` data folder as a single compressed file containing one day's worth of compressed intraday quote data.
- delete the day's data stored `interim` folder to manage hard disk memory.

- The final component is the task scheduler. In Linux this is carried out using the `crontab` application. For Windows/Mac systems you will have to adapt the logic to your specific OS. In the `./src/data/iex_cronjob.txt` file I give a template of the tasks that need to be scheduled. These tasks are:
- Every minute, between 7am-2pm Mountain Time, Monday through Friday run the
script.*iex_downloader.py* - Every minute, wait 30 secs, between 7am-2pm Mountain Time, Monday through Friday run the
script. Note that crontab doesn't have resolution less than a minute so we can overcome that by using a timed delay and repeating a task.**iex_downloader.py** - 10 minutes after 2pm Mountain Time, Monday through Friday run the
script.**iex_eod_processor.py**

- Every minute, between 7am-2pm Mountain Time, Monday through Friday run the

First the ** iex_downloader.py **script.

```
from pathlib import PurePath, Path
import sys
import tzlocal # pip install
## get project dir
pdir = PurePath("/YOUR/DIRECTORY/iex_intraday_equity_downloader")
data_dir = pdir/'data'
script_dir = pdir /'src'/'data'
sys.path.append(script_dir.as_posix())
from iex_downloader_utils import split_timestamp, write_to_parquet
import pandas as pd
import pandas_datareader.data as web
pd.options.display.float_format = '{:,.4f}'.format
import numpy as np
import pandas_market_calendars as mcal # pip install
import pyarrow as pa
import pyarrow.parquet as pq
import logzero
from logzero import logger
#=============================================================================
# get current timestamp
now = pd.to_datetime('today')
#=============================================================================
## setup logger
logfile = PurePath(pdir/'logs'/'equity_downloader_logs'/f'iex_downloader_log_{now.date()}.log').as_posix()
log_format = '%(color)s[%(levelname)1.1s %(asctime)s.%(msecs)03d %(module)s:%(lineno)d]%(end_color)s %(message)s'
formatter = logzero.LogFormatter(fmt=log_format, datefmt='%Y-%m-%d %I:%M:%S')
logzero.setup_default_logger(logfile=logfile, formatter=formatter)
#=============================================================================
# confirm market hours
local_tz = tzlocal.get_localzone() # get local timezone via tzlocal package
now_local_tz = now.tz_localize(local_tz) # localize current timestamp
nyse = mcal.get_calendar('NYSE') # get NYSE calendar
nyseToday = nyse.schedule(start_date=now.date(), end_date=now.date())
mktOpen = nyseToday.market_open.iloc[0].tz_convert(local_tz)
mktClose = nyseToday.market_close.iloc[0].tz_convert(local_tz)
if mktOpen <= now_local_tz <= mktClose: # only run during market hours
#==========================================================================
# import symbols
logger.info('importing symbols...')
symfp = Path(data_dir/'external'/'ETFList.Options.Nasdaq__M.csv')
symbols = (pd.read_csv(symfp).Symbol).tolist()
#==========================================================================
# request data
logger.info('requesting data from iex...')
data = (web.DataReader(symbols,'iex-tops')
.assign(lastSaleTime=lambda df:pd.to_datetime(df.lastSaleTime,unit='ms'))
.assign(lastUpdated=lambda df:pd.to_datetime(df.lastUpdated,unit='ms'))
.pipe(split_timestamp, timestamp=now)
.dropna())
# force float conversion for the following columns
# this is due to a problem reading in the data when schema changes
# for example when these columns are populated the data is float, when not,
# value is 0, then int64 dtypes causes schema change and read error
to_float = ['askPrice','bidPrice','lastSalePrice','marketPercent']
data.loc[:,to_float] = data.loc[:,to_float].astype(float)
if data.empty: logger.warn('data df is empty!')
#==========================================================================
# store data
logger.info('storing data to interim intraday_store')
outfp = PurePath(data_dir/'interim'/'intraday_store').as_posix()
write_to_parquet(data, outfp, logger=logger)
else:
logger.warn('system outside of market hours, no data queried')
```

Next the ** iex_downloader_utils.py **script.

```
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
def split_timestamp(df, timestamp):
# use current timestamp
df = df.assign(queryTime=lambda df: timestamp,
year=lambda df: timestamp.year, # make year
month=lambda df: timestamp.month, # make month
day=lambda df: timestamp.day, # make day
time=lambda df: timestamp.strftime('%H:%M:%S')) # make time
return df
def write_to_parquet(df, root_path,
partition_cols=['year','month','day','time'],
logger=None):
"""
fn: wrapper for pyarrow write_to_dataset
Params
------
df : pd.DataFrame
formatted dataframe data
root_path : str, data store directory
partition_cols : list of columns (as str dtype) to partition parquet storage directory
logger : logger object
"""
if not logger: raise ValueError('must use logger object')
try:
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, root_path=root_path,
partition_cols=partition_cols)
except Exception as e:
logger.exception(e)
```

Next the ** iex_eod_processor.py** script.

```
from pathlib import PurePath, Path
import sys
import shutil
## get project dir
pdir = PurePath("/YOUR/DIRECTORY/iex_intraday_equity_downloader")
data_dir = pdir/'data'
script_dir = pdir /'src'/'data'
sys.path.append(script_dir.as_posix())
import pandas as pd
pd.options.display.float_format = '{:,.4f}'.format
import logzero
from logzero import logger
#=============================================================================
# get current timestamp
now = pd.to_datetime('today')
#=============================================================================
## setup logger
logfile = PurePath(pdir/'logs'/'equity_downloader_logs'/f'iex_downloader_log_{now.date()}.log').as_posix()
log_format = '%(color)s[%(levelname)1.1s %(asctime)s.%(msecs)03d %(module)s:%(lineno)d]%(end_color)s %(message)s'
formatter = logzero.LogFormatter(fmt=log_format, datefmt='%Y-%m-%d %I:%M:%S')
logzero.setup_default_logger(logfile=logfile, formatter=formatter)
#=============================================================================
# read intraday data into one dataframe
logger.info('reading all intraday data for today as dataframe...')
infp = PurePath(data_dir/'interim'/'intraday_store').as_posix()
try:
df = pd.read_parquet(infp).drop_duplicates().dropna()
if df.empty: logger.warn('empty dataframe for eod processing')
#==========================================================================
# store intraday data into one compressed dataframe
logger.info('storing all intraday data for today as compressed parquet file...')
outfp = PurePath(data_dir/'processed'/'intraday_store'/f'etf_intraday_data_{now.date()}.parq')
df.to_parquet(outfp, engine='fastparquet')
#==========================================================================
# delete interim store
logger.info('deleting all interim intraday data.')
rmfp = Path(data_dir/'interim'/'intraday_store'/f'year={now.year}')
shutil.rmtree(rmfp)
except Exception as e:
logger.error(f'{e}\tlikely no data today: {now.date()}')
```

Finally the cronjob task template.

* 7-14 * * mon-fri /YOUR/DIRECTORY/anaconda3/envs/iex_downloader_env/bin/python3.6 '/YOUR/DIRECTORY/iex_intraday_equity_downloader/src/data/iex_downloader.py' >> /YOUR/DIRECTORY/iex_intraday_equity_downloader/logs/equity_downloader_logs/iex_downloader_log.log * 7-14 * * mon-fri sleep 30; /YOUR/DIRECTORY/anaconda3/envs/iex_downloader_env/bin/python3.6 '/YOUR/DIRECTORY/iex_intraday_equity_downloader/src/data/iex_downloader.py' >> /YOUR/DIRECTORY/iex_intraday_equity_downloader/logs/equity_downloader_logs/iex_downloader_log.log 10 14 * * mon-fri /YOUR/DIRECTORY/anaconda3/envs/iex_downloader_env/bin/python3.6 '/YOUR/DIRECTORY/iex_intraday_equity_downloader/src/data/iex_eod_processor.py' >> /YOUR/DIRECTORY/iex_intraday_equity_downloader/logs/equity_downloader_logs/iex_downloader_log.log

When everything is running correctly you should see an example log file that looks like the image below.

The interim folder system will look something like the below image.

After the market closes and the eod processor script runs we can import the final dataset into a Jupyter notebook easily.

How many unique symbols?

**Introduction****Links****Embedded Notebook**

This post focuses on Chapter 3 in the new book Advances in Financial Machine Learning by Marcos Lopez De Prado. In this chapter De Prado demonstrates a workflow for improved return labeling for the purposes of supervised classification models. He introduces multiple concepts but focuses on the Triple-Barrier Labeling method, which incorporates profit-taking, stop-loss, and holding period information, and also meta-labeling which is a technique designed to address several issues. Those issues include how to improve the f1-scores and recall accuracy of a primary model such e.g. a moving average crossover model, and how to reduce the likelihood of overfitting a model by splitting up the decision of which side to trade from the decision to trade at all.

Please note that I am publishing my experimental results with the hope that errors/gaps in my understanding will be corrected by those with better comprehension of the material. Additionally note that, in his book, his example dataset is a long history of a continuous SP500 E-Mini futures time series with tick-level resolution, whereas mine is an admittedly, somewhat dirty, tick series of the IVE ETF.

Github Repo: https://github.com/BlackArbsCEO/Adv_Fin_ML_Exercises

Github Notebook: Link

Parquet dataset for download: Link

**Introduction****Links****Embedded Notebook**

This post explores a concept at the heart of quantitative financial research. Most qfin researchers utilize statistical techniques that require varying degrees of stationarity. As many of you are aware financial time series violate pretty much all the rules of stationarity and yet many researchers, including me, have applied or will apply techniques when not appropriate thereby calling into question many of the resulting conclusions.

In the new book Advances in Financial Machine Learning by Marcos Lopez De Prado he proposes that qfin researchers utilize a different type of price bar. His research has shown that by using alternatives to fixed time interval bars (minute, hour, day, week, etc.), the return series will exhibit better statistical properties. In other words using alternative bar types, the return series will better approximate normality/stationarity which will make our research and conclusions more robust.

In this post we will experiment with the following bar types: *Tick, Volume, Dollar Volume, and Dollar Volume Imbalance. *

Github Repo: https://github.com/BlackArbsCEO/Adv_Fin_ML_Exercises

Github Notebook: Link

**Recap****Chapter Goals and Outline****Links****Embedded Jupyter Notebook**

* See <Mixture Model Trading (Part 1, Part 2, Part 3, Part 4, Part 5, Github Repo)>*. This research demonstrates a systematic trading strategy development workflow from theory to implementation to testing. It focuses on the concept of using Gaussian Mixture Models as a method for return distribution prediction and then using a simple market timing strategy to take advantage of the predicted asset return outliers.

- Demonstrate how to extract algorithm portfolio equity from Quantconnect backtest
- Demonstrate how to predict future return paths using bayesian cones.
- Demonstrate how to estimate distribution of algorithm CAGRs.
- Demonstrate how to use model averaging to aid predictions.

- Read in Algorithm Portfolio Equity
- Choose the Best Algorithm Among 4 Variants
- Choose Best Bayesian Model of Algorithm Returns
- Compare Bayesian Cones for all Algos and all Return Models
- Compare Best Algo Predicted Portfolio Ending Values
- Compare Best Algo Predicted CAGR Distributions
- Model Averaging

**Chapter Goals and Outline****Links****Introduction****Mixture Model Trading Algorithm Outline****GMM Algorithm Implementation****Next Steps**

- Use Part 3 - strategy research as a basis for algorithmic trading strategy.
- Implement strategy using the Quantconnect platform.

This notebook will walkthrough the algorithm implementation process on the quantconnect platform. Please be advised that this notebook will not actually run the algorithm as I have not installed the quantconnect backtesting engine locally. This is a demonstration of the process. The script is available to copy and paste into the quantconnect environment within the ./scripts/ directory of the github repo.

- They use Python 2.7 and I do not know when/if Python 3 will be supported.
- There is no interactive debugger at this time. Troubleshooting can be difficult if your algorithm is not logically structured for modularity.
- There are some minor data issues that their team is hard at work correcting. At times there are trades that get filled that are in error so investigating the trade level data is important and fortunately straightforward to do.
- Calls to the History() function create major RAM/time penalties so it is important to code your algorithm to be efficient with its data requests.

The algorithm will use Gaussian Mixture Models (GMM) to determine return outliers. Based on outlier direction the algorithm will go long (or short) the ETF. Based on the research conducted in chapter 3 I determined one tradeable pattern to be a long-only strategy with a 63 day holding period, post outlier event. The basic structure of the algorithm is:

Check open orders:

- confirm all orders are filled
- track fill dates

Check if any current holdings meet liquidation criteria.

*In this implementation the only liquidation criteria is whether we have held the security for the 63 day period.*- check if today's date is greater than or equal to liquidation date.
- if so liquidate the position.

Run the main algorithm computation. In this implementation we use a lookback of 252 days or approximately 1 trading year.

- fit the GMM using N components.
- extract hidden states and their parameters
- sample from the chosen distribution using those parameters
- compute confidence intervals
- compare intervals with current return to identify outliers
- assess direction of outliers e.g. too_low or too_high
- assign securities to long (or short) based on direction of outliers

Use computed results to send orders.

- this implementation uses MarketOnOpenOrders. This means that market orders are sent for the next day's open after an outlier event is triggered.

First the Quantconnect algorithm imports

```
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Indicators")
AddReference("QuantConnect.Common")
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Indicators import *
import pandas as pd
import numpy as np
from math import ceil, floor
import scipy.stats as stats
import sklearn.mixture as mix
from datetime import datetime, timedelta
import time
import json
```

Next we setup a **PARAMETER_REGISTRY**. This helps associate the chosen set of parameters with each backtest. Without it there is no way to know what parameters were used with which backtest when you go to compare results at a later date. However by registering the parameters we can log them. These backtest logs are always available for download when you load the results of your backtest.

```
# ------------------------------------------------------------------------------
# setup parameter registry
# ------------------------------------------------------------------------------
PARAMETER_REGISTRY = {}
def register_param(name, value):
PARAMETER_REGISTRY[name] = value
return value
```

Next up we define and *register* the **global** parameters that the algorithm class will use. These parameters contain a flag which logs whether the strategy was implemented as long-only, the number of samples for our confidence interval sampling, the chosen distribution we are using, and the parameters for the sklearn GMM we will implement.

```
# strategy information
is_long_only = register_param('is_long_only', True)
N_SAMPLES = register_param('n samples (bootstrapping distr.)', 1000)
### choose distribution ###
sample_distr = register_param('sampling distr', 'normal distribution')
#sample_distr = register_param('sampling distr', 'laplace')
#sample_distr = register_param('sampling distr.', 'johnsonsu')
### if using jsu register a, b parameters ###
#a, b = register_param('a (jsu)', 0.2), register_param('b (jsu)', 0.9)
### gmm init variables ###
RANDOM_STATE = register_param('random state', 777)
ALPHA = register_param('alpha', 0.95) # for sampling confidence intervals
N_COMPONENTS = register_param('n components (GMM)', 4)
MAX_ITER = register_param('max iterations (GMM)', 100)
N_INIT = register_param('n inits (GMM)', 25)
```

Next we define a couple of **global** functions to make the algorithm computation a little simpler.

```
# ------------------------------------------------------------------------------
# global funcs
# ------------------------------------------------------------------------------
def make_gmm(n_components=N_COMPONENTS, max_iter=MAX_ITER,
n_init=N_INIT, random_state=RANDOM_STATE):
"""fn: create gmm object"""
model_kwds = dict(n_components=n_components,
max_iter=max_iter,
n_init=n_init,
init_params='random',
random_state=random_state)
gmm = mix.GaussianMixture(**model_kwds)
return gmm
def make_returns(df):
return np.log(df/df.shift(1)).dropna()
```

Now we define the algorithm class which will implement the strategy. In quantconnect all algorithms are a class with at least 2 functions defined: Initialize() and OnData().

Initialize contains the algorithm setup including universes, class level objects, brokerage models, and scheduled functions.

OnData is the event handler that is called at the resolution we choose e.g. minute, hour, daily. However because this algorithm uses scheduled functions this function is not needed and is simply pass(ed).

```
# ------------------------------------------------------------------------------
# algorithm
# ------------------------------------------------------------------------------
class TradingWithGMM(QCAlgorithm):
"""Algorithm which implements GMM framework"""
def Initialize(self):
'''All algorithms must initialized.'''
self.SetStartDate(2007,1,1) #Set Start Date
self.SetEndDate(2017,12,31) #Set End Date
self.SetCash(100000) #Set Strategy Cash
# -----------------------------------------------------------------------------
# init brokerage model, important for realistic slippage/commission modeling
# especially important if using leverage which requires margin account
# -----------------------------------------------------------------------------
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage,
AccountType.Margin)
# -----------------------------------------------------------------------------
# init custom universe
# -----------------------------------------------------------------------------
symbol_list = ["SPY", "QQQ", "DIA", "EFA", "EEM", "TLT", 'AGG', 'LQD', "GLD"]
self.symbols = register_param('symbols', symbol_list)
for sym in self.symbols: self.AddEquity(sym, Resolution.Minute)
# note that the `AddEquity` resolution is `Minute`
# this impacts how often `OnData` is called which determines whether
# scheduled functions are called by Minute, Hour, or Daily
# -----------------------------------------------------------------------------
# init placeholders
# -----------------------------------------------------------------------------
self.openMarketOnOpenOrders = []
self._longs = False
self._shorts = False
# -----------------------------------------------------------------------------
# other algo parameter settings
# -----------------------------------------------------------------------------
self.HOLDING_PERIOD = register_param('holding period (days)', 63)
self.LOOKBACK = register_param('lookback (days)', 252)
self.BET_SIZE = register_param('bet size', 0.05)
self.LEVERAGE = register_param('leverage', 1.)
# -----------------------------------------------------------------------------
# track RAM and computation time for main func, also leverage and cash
# -----------------------------------------------------------------------------
self.splotName = 'Strategy Info'
sPlot = Chart(self.splotName)
sPlot.AddSeries(Series('RAM', SeriesType.Line, 0))
sPlot.AddSeries(Series('Time', SeriesType.Line, 1))
sPlot.AddSeries(Series('Leverage', SeriesType.Line, 2))
sPlot.AddSeries(Series('Cash', SeriesType.Line, 3))
self.AddChart(sPlot)
self.time_to_run_main_algo = 0
# -----------------------------------------------------------------------------
# scheduled functions
# -----------------------------------------------------------------------------
# make buy list
self.Schedule.On(
self.DateRules.Every(DayOfWeek.Monday, DayOfWeek.Friday),
self.TimeRules.AfterMarketOpen("SPY", 10),
Action(self.run_main_algo))
# send orders
self.Schedule.On(
self.DateRules.Every(DayOfWeek.Monday, DayOfWeek.Friday),
self.TimeRules.AfterMarketOpen("SPY", 30),
Action(self.send_orders))
# check trade dates and liquidate if date condition
self.Schedule.On(
self.DateRules.Every(DayOfWeek.Monday, DayOfWeek.Friday),
self.TimeRules.AfterMarketOpen("SPY", 35),
Action(self.check_liquidate))
# plot RAM
self.Schedule.On(
self.DateRules.EveryDay(),
self.TimeRules.AfterMarketOpen("SPY", 40),
Action(self.CHART_RAM))
# -----------------------------------------------------------------------------
# initialize historical prices
# cache the price data so we don't have to request the entire df for
# every self.History() call
# -----------------------------------------------------------------------------
self.prices = (self.History(self.symbols, self.LOOKBACK, Resolution.Daily)
["close"]
.unstack(level=0)
.astype(np.float32))
# -----------------------------------------------------------------------------
# LOG PARAMETER REGISTRY
# this makes it easy to link backtest parameter settings with the saved results
# by logging/printing the information at the top of every backtest log
# -----------------------------------------------------------------------------
self.Debug('\n'+'-'*77+'\nPARAMETER REGISTRY\n{}...'.format(
json.dumps(PARAMETER_REGISTRY, indent=2)
))
```

The initialize function has a lot going on. In addition to setting the parameters we create the custom charts to track leverage, cash, RAM usage, and computation time.

A quick note on the schedule functions; The way to read it is that the main functions are scheduled twice weekly on Monday and Friday to run after the market opens for the SPY etf at the designated number of minutes afterwards. The Action is the function we want to run at that time.

Another important note is that we initialize our price history dataframe. We call it **once** here for the full 252 day lookback. Later we define a function called update_prices() which computes the number of additional days of history to request between the current date and the last date of our self.prices dataframe. Then it requests only that limited history, concatenates and cleans up the data so we only have data for the specified lookback period. This methodology saves massive RAM/time during the backtest runs.

```
def update_prices(self):
"""fn: to update prices in an efficient manner"""
# get last date of stored prices
most_recent_date = self.prices.index.max()
current_date = self.Time
# request only days that are missing from our dataset
days_to_request = (current_date - most_recent_date).days
# if prices up to date return
if days_to_request==0:
return
# get prices
new_prices = (self.History(self.symbols, days_to_request, Resolution.Daily)
["close"]
.unstack(level=0)
.astype(np.float32))
self.prices = pd.concat([self.prices, new_prices]) # combine datasets
# clean it up and keep only lookback period
self.prices = self.prices.drop_duplicates().sort_index().iloc[-self.LOOKBACK:]
return
```

Next we define the check_liquidate() function which implements numbers 1 and 2 from the algorithm outline specified above.

```
def check_liquidate(self):
"""fn: to check if todays date matches exit date and liquidate"""
self.Log('\n'+'-'*77+'\n[{}] checking liquidation status...'.format(self.UtcTime))
orders = self.Transactions.GetOrders(None)
if orders: pass
else: return
# current time is gt_eq order time + holding period
crit1 = lambda order: self.UtcTime >= (order.Time + timedelta(self.HOLDING_PERIOD))
# order time is within today - holding period window
# 7 day overlap between crit1 and crit2
crit2 = lambda order: order.Time >= (self.UtcTime - timedelta(self.HOLDING_PERIOD + 7))
for order in orders:
if crit1(order) & crit2(order):
if self.Portfolio[order.Symbol].Invested:
self.Liquidate(order.Symbol)
fmt_args = (self.UtcTime, order.Symbol, order.Time, self.UtcTime - order.Time)
self.Log('[{}] liquidating {}, order date: {}, time delta: {}'.format(*fmt_args))
```

Next we define two functions to implement the main algorithm computation. First we define the function compute() which takes a single symbol, fits the GMM, extracts the hidden states and their parameters and determines if any outlier events have occurred.

Then we define the run_main_algo() function which aggregates the compute() information into a dataframe from a list of rows **if** and only **if** outlier events have occurred. This is also to save RAM/time. This function constructs the long (and/or short) **numpy arrays** that will be sent to the send_orders() function.

```
def compute(self, sym):
"""fn: computation for bootstrapped confidence intervals for individual symbol"""
train_px = self.prices[sym]
train_df = make_returns(train_px)
tmp_x = train_df.reshape(-1, 1)
### fit GMM ###
gmm = make_gmm().fit(tmp_x)
hidden_states = gmm.predict(tmp_x)
### get last state estimate ###
last_state = hidden_states[-1]
last_mean = gmm.means_[last_state]
last_var = np.diag(gmm.covariances_[last_state])
### sample from distribution using last state parameters ###
### must match distribution selected in global parameter section ###
## normal distribution ##
rvs = stats.norm.rvs(loc=last_mean, scale=np.sqrt(last_var),
size=N_SAMPLES, random_state=RANDOM_STATE)
low_ci, high_ci = stats.norm.interval(alpha=ALPHA,
loc=np.mean(rvs), scale=np.std(rvs))
## laplace distribution ##
#rvs = stats.laplace.rvs(loc=last_mean, scale=np.sqrt(last_var),
# size=N_SAMPLES, random_state=RANDOM_STATE)
#low_ci, high_ci = stats.laplace.interval(alpha=ALPHA,
# loc=np.mean(rvs), scale=np.std(rvs))
## johnson su distribution ##
#rvs = stats.johnsonsu.rvs(a=a, b=b,
# loc=last_mean, scale=np.sqrt(last_var),
# size=N_SAMPLES, random_state=RANDOM_STATE)
#low_ci, high_ci = stats.johnsonsu.interval(alpha=ALPHA,
# a=a, b=b,
# loc=np.mean(rvs), scale=np.std(rvs))
## get current return ##
tmp_ret = np.log(float(self.Securities[sym].Price) / train_px.iloc[-1])
r_gt = (tmp_ret > high_ci)
r_lt = (tmp_ret < low_ci)
if r_gt: result_tag = 'too_high'
elif r_lt: result_tag = 'too_low'
else: result_tag = 'hit'
### row order: (symbol, low ci, high ci, current return, result_tag) ###
sym_row = (sym, low_ci, high_ci, tmp_ret, result_tag)
return sym_row
def run_main_algo(self):
"""fn: run main algorithm computation"""
start_time = time.time()
self.Log('\n'+'-'*77+'\n[{}] Begin main algo computation...'.format(self.UtcTime))
### set buy/sell lists to False to confirm no carryover ###
self._longs = False
self._shorts = False
### update prices ###
self.update_prices()
### compute data ###
tmp_data_list = [self.compute(asset)
for asset in self.prices.columns
if not self.Portfolio[asset].Invested]
### construct long/short arrays ###
if tmp_data_list:
cols = ['symbol', 'low_ci', 'high_ci', 'current_return', 'result_tag']
df = (pd.DataFrame(tmp_data_list, columns=cols))
self.Log('[{}] algo data:\n\t{}'.format(self.UtcTime, df))
### Choose between mean reversion algorithm ###
self._longs = np.asarray(df.query('result_tag=="too_low"')['symbol'].unique())
#self._shorts = np.asarray(df.query('result_tag=="too_high"')['symbol'].unique())
### or breakout strategy ###
#self._longs = np.asarray(df.query('result_tag=="too_high"')['symbol'].unique())
#self._shorts = np.asarray(df.query('result_tag=="too_low"')['symbol'].unique())
log_str = (self.UtcTime, self._longs, self._shorts)
self.Log('\n'+'-'*77+'\n[{0}] longs: {1}\n[{0}] shorts: {2}'.format(*log_str))
else:
self.Log('[{}] already fully invested, exiting...'.format(self.UtcTime))
self.time_to_run_main_algo = time.time() - start_time
return
```

Next we define the send_orders() function which is responsible for sending the orders and updating our list of order tickets contained in the self.openMarketOnOpenOrders list. It contains some checks for efficiency and error handling purposes.

```
def send_orders(self):
"""fn: send orders"""
self.Log('\n'+'-'*77+'\n[{}] checking L/S arrays to send orders...'.format(self.UtcTime))
### confirm lists are proper array datatype ###
if isinstance(self._shorts, np.ndarray):
if self._shorts.size: # confirm not empty
for sym in self._shorts:
if not self.Portfolio[sym].Invested: # only send order if not invested
self.Log('[{}] sending short order for {}...'.format(self.UtcTime, sym))
short_shares = self.CalculateOrderQuantity(sym, -self.LEVERAGE*self.BET_SIZE)
newTicket = self.MarketOnOpenOrder(sym, short_shares)
self.openMarketOnOpenOrders.append(newTicket) # track ticket
else:
self.Log('[{}] no shorts listed, no orders sent...'.format(self.UtcTime))
### confirm lists are proper array datatype ###
if isinstance(self._longs, np.ndarray):
if self._longs.size: # confirm not empty
for sym in self._longs:
if not self.Portfolio[sym].Invested: # only send order if not invested
self.Log('[{}] sending long order for {}...'.format(self.UtcTime, sym))
long_shares = self.CalculateOrderQuantity(sym, self.LEVERAGE*self.BET_SIZE)
newTicket = self.MarketOnOpenOrder(sym, long_shares)
self.openMarketOnOpenOrders.append(newTicket) # track ticket
else:
self.Log('[{}] no longs listed, no orders sent...'.format(self.UtcTime))
return
```

Finally we define our CHART_RAM() function which actually tracks RAM usage, computation time, leverage and cash. We also define the OnData() function which we simply pass as all functions are scheduled.

```
def CHART_RAM(self):
"""fn: to track Ram, Computation Time, Leverage, Cash"""
self.Plot(self.splotName,'RAM', OS.ApplicationMemoryUsed/1024.)
self.Plot(self.splotName,'Time', self.time_to_run_main_algo)
P = self.Portfolio
self.track_leverage = P.TotalAbsoluteHoldingsCost / P.TotalPortfolioValue
self.Plot(self.splotName, 'Leverage', float(self.track_account_leverage))
self.Plot(self.splotName, 'Cash', float(self.Portfolio.Cash))
def OnData(self, data):
'''OnData event is the primary entry point for your algorithm.
Each new data point will be pumped in here.
Not always necessary especially when using scheduled functions
'''
pass
```

Again the full script can be found in the ./scripts/ directory of the github repo. Sign up to Quantconnect.com and paste the script into the Algorithm Lab (backtesting) environment. Test the algorithm with various parameters and see what you discover.

In part 5 we will evaluate the results of my backtests using 1,2, and 4 GMM components

]]>**Introduction****Links****Notebook****Next Steps**

This is the beginning of a three part series that I completed towards the end of 2017 as a learning module for Quantinsti.com. **Th****e purpose of the series is to demonstrate a research workflow focused around the theory and application of mixture models as the core framework behind a algorithmic trading strategy. **Below is a quote taken from the README of the github repo:

“The primary goal of this repo is to demonstrate the workflow between research of a quantitative idea or theory to implementation as a potential live trading strategy. Unlike other finance based tutorials the results will not be cherry picked to show only the best of the best examples. Sometimes results are counterintuitive, sometimes they are conflicting. Real strategy development is often dirty, complex, full of starts and stops and requires us to use all of our skills to extract the signal from the noise. With that said I welcome interactive engagement, ideas, insight, and constructive criticism, especially if errors or bugs are found.”

— https://github.com/BlackArbsCEO/mixture_model_trading_public

I will be presenting each of the notebooks on the blog although you can feel free to read ahead by visiting the github repo directly.** What is new however is that at the end of three part series I will be publishing a Part 4 where I will describe an actual implementation of the strategy and release the code for the actual algorithm for my readers to dissect, alter, and experiment with on the Quantconnect.com platform. **

Be on the lookout for the brand new **Part 4 - Algorithm Implementation.**

**Introduction****Links****Notebook****Next Steps**

This is the beginning of a three part series that I completed towards the end of 2017 as a learning module for Quantinsti.com. **Th****e purpose of the series is to demonstrate a research workflow focused around the theory and application of mixture models as the core framework behind a algorithmic trading strategy. **Below is a quote taken from the README of the github repo:

“The primary goal of this repo is to demonstrate the workflow between research of a quantitative idea or theory to implementation as a potential live trading strategy. Unlike other finance based tutorials the results will not be cherry picked to show only the best of the best examples. Sometimes results are counterintuitive, sometimes they are conflicting. Real strategy development is often dirty, complex, full of starts and stops and requires us to use all of our skills to extract the signal from the noise. With that said I welcome interactive engagement, ideas, insight, and constructive criticism, especially if errors or bugs are found.”

— https://github.com/BlackArbsCEO/mixture_model_trading_public

I will be presenting each of the notebooks on the blog although you can feel free to read ahead by visiting the github repo directly.** What is new however is that at the end of three part series I will be publishing a Part 4 where I will describe an actual implementation of the strategy and release the code for the actual algorithm for my readers to dissect, alter, and experiment with on the Quantconnect.com platform. **

Be on the lookout for **Part 3 - Strategy Research.**

**Introduction****Links****Notebook****Next Steps**

This is the beginning of a three part series that I completed towards the end of 2017 as a learning module for Quantinsti.com. **Th****e purpose of the series is to demonstrate a research workflow focused around the theory and application of mixture models as the core framework behind a algorithmic trading strategy. **Below is a quote taken from the README of the github repo:

I will be presenting each of the notebooks on the blog although you can feel free to read ahead by visiting the github repo directly.** What is new however is that at the end of three part series I will be publishing a Part 4 where I will describe an actual implementation of the strategy and release the code for the actual algorithm for my readers to dissect, alter, and experiment with on the Quantconnect.com platform. **

Be on the lookout for **Part 2 - Gaussian Mixtures. **

**Introduction****Links + Datasets****Notebook****Next Steps**

This article series provides an opportunity to move towards more interactive analysis. My plan is to integrate more **Jupyter notebooks** and **Github repos** into my research/publishing workflow. For datasets that are too big to share through github I will provide a download link both here and in the github readme.

I will be posting the notebooks into this blog using iframes. If you experience any issues with formatting I recommend viewing the notebook at github directly. If you're using mobile, you will have to "request the desktop site" for the ipynb to render.

- Github Repo
- Notebook Link
- Raw Hourly Options Data from 2017-09-13 to 2017-10-18
- Processed Hourly Options Data from 2017-09-13 to 2017-10-18
****Note: please select download all**

The next step in this process will be analysis of the skew metric and how we might apply it to develop a trading strategy.

]]>**Purpose****Intuitive explanation****Code****Next Steps**

This is a simple reference article for readers that might wonder where I get/got my options data from. In this regard I would like to shout out the contributors to the pandas-datareader, without their efforts this process would be much more complex.

So this code consists of three components. The first is the actual script that wraps the pandas-datareader functions and downloads the options data. The second is a helper script to save the aggregated data to disk. The helper script which I call *file_handler* is designed to save the data in multiple formats in a structured file directory. Internally it checks to see if today's folder is created with a particular date and naming convention, if it isn't it will create the folder and then store all the data files there. What gives this code the ability to aggregate intraday data is the third component which simply requires making use of your system's task scheduler. For example, if you have Linux/Ubuntu you can package this script to run as a cronjob quite easily. After the code below I show an example cronjob template that works.

```
import sys
import os
import time
PROJECT_DIR = '/YOUR/CODE/DIR/option_skew_project/'
sys.path.append(PROJECT_DIR)
from pandas_datareader.data import Options
import pandas as pd
pd.options.display.float_format = '{:,.4f}'.format
import numpy as np
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")
from file_handler import file_handler
# -----------------------------------------------------------------------------
# import symbols
# -----------------------------------------------------------------------------
symbols = (pd.read_csv(PROJECT_DIR+'data/symbols.csv', header=None, index_col=False).rename(columns={0:'symbols'}))
# ------------------------------------------------------------------------------
# define conv. fn.
# ------------------------------------------------------------------------------
def cprint(df):
print('-'*50)
print(df.sample(5))
print()
print(df.info())
print()
def random_wait():
"""fn: randomly choose a wait time based on probability"""
wait_times = [0.2, 0.5, 1, 2]
probs = [0.3, 0.4, 0.2, 0.1 ]
choice = np.random.choice(wait_times, size=1, p=probs)
return choice
# ------------------------------------------------------------------------------
# init file handler
# ------------------------------------------------------------------------------
fh = file_handler(PROJECT_DIR)
# ------------------------------------------------------------------------------
# run aggregation func
# ------------------------------------------------------------------------------
errors = []
dfs_dict = {}
for sym in tqdm(symbols.symbols.values):
print('-'*50)
print('downloading {} ...'.format(sym))
try:
tmp_df = Options(sym, 'yahoo').get_all_data()
dfs_dict[sym] = tmp_df
except Exception as e:
errors.append(sym)
print('{} error: {}'.format(sym, e))
continue
else:
print('{} complete'.format(sym))
print()
time.sleep(random_wait())
# ------------------------------------------------------------------------------
# concat dfs drop unnecessary columns
# ------------------------------------------------------------------------------
data = (pd.concat(list(dfs_dict.values())).drop(['JSON'], axis=1))
error_series = pd.Series(errors)
cprint(data)
print(error_series)
# ------------------------------------------------------------------------------
# save data
# ------------------------------------------------------------------------------
fh.save_data(error_series, format='csv', resolution='date', errors=True)
try:
fh.save_data(data, format='parquet')
except Exception as e:
print(e)
fh.save_data(data, format='h5')
```

This is the code for the *file_handler* script. It can save in 1 of the following 4 formats: parquet, h5, feather, csv. I save the list of symbol errors as a CSV since this list is generally quite small. As seen above I save the options data in parquet format first, and a backup in the form of an h5 file. Generally I prefer to work with parquet files because the are compressed by default, contain metadata, and integrate better with the Dask. This code requires the installation of the pyarrow package.

```
import os
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
class file_handler:
'''
class for handling directory/folder creation + data saving
Attributes
project_dir : str(), main project directory
Methods
save_data : actual public save function
|__> _create_dir : internal fn to create dir if it does not exist
|_> __check_exists_or_create : private fn to check if file exists
|__> __create_date_str : private fn to create date str
|__> __create_timestamp_str : private fn to create timestamp str
'''
def __init__(self, project_dir):
self.project_dir = project_dir
def __check_exists_or_create(self, _dir):
"""fn: to check if file/path exists"""
if not os.path.exists(_dir):
try:
os.mkdir(_dir)
except Exception as e:
print(e)
return
def _create_dir(self):
"""fn: create daily directory if not already created"""
_dir = self.project_dir+'/Yahoo_Options_Data/'+str(pd.to_datetime('now').date())+'/'
self.__check_exists_or_create(_dir)
return _dir
def __create_timestamp_str(self):
"""fn: to create time stamp str"""
return str(pd.to_datetime('now').tz_localize('utc').tz_convert('US/Eastern')).replace(' ', '_').replace(':','.')
def __create_date_str(self):
"""fn: to create date str"""
return str(pd.to_datetime('now').date())
def save_data(self, data, format='parquet', resolution='time', errors=False):
"""fn: to save data to directory
Args
data : pd.DataFrame
format : str, ('parquet', 'h5', 'csv', 'feather')
resolution : str, date or time
if date uses default str format,
if time will use YYYY-MM-DD_HH.MM.SS
errors : bool,
if True change filepath name
if False use options data filepath name
"""
_dir = self._create_dir()
if resolution=='time':
_timestamp = self.__create_timestamp_str()
elif resolution=='date':
_timestamp = self.__create_date_str()
if errors:
_fp = _dir + f'yahoo_options_scraper_errors_{_timestamp}.{format}'
else:
_fp = _dir + f'yahoo_options_data_{_timestamp}.{format}'
if format=='parquet':
_table = pa.Table.from_pandas(data)
pq.write_table(_table, _fp)
elif format == 'h5': data.to_hdf(_fp, key='data')
elif format == 'csv': data.to_csv(_fp, index=False)
elif format == 'feather': data.to_feather(_fp)
return
```

Finally, below is an example of my cronjob. It is set to run Monday through Friday, hourly, from market open to close. Note the log directory and log file after the **">>"**; all the print statements contained in the script will output to that log file including any exceptions.

** 30 7-15 * * 1-5 /YOUR/CODE/DIR/option_skew_project/scripts/options_downloader.py' >> /YOUR/LOG/DIR/options_downloader_cronlog.log 2>&1**

The next article will document the code I refactored to calculate the option skew metric from the paper "What Does Individual Option Volatility Smirk Tell Us About Future Equity Returns?" by Yuhang Xing, Xiaoyan Zhang and Rui Zhao. If you have been a long time reader, you may recall I did a series where I tracked a theoretical ETF equity strategy that was based on this metric. Over time, people have asked how it is performing, and I did not have an answer because I stopped tracking it, as I have been busy with other projects. However, the strategy showed promise then and I wondered if it could be applied directly in options trading. My goal is to research the possibility of implementing this strategy live, and if the results show an edge, implementing it and tracking the results publicly.

To accomplish this task I first needed to gather data which this article shows. In the next article I make heavy use of Dask because the volume of intraday data aggregated over a month is over 14 million rows and operating on the dataframe in-memory is slow and/or unfeasible on most people's systems including mine.

Additionally the next article will be a jupyter notebook I will embed as a blog post here directly, but recommend it be viewed on the github repo I will make public.

]]>- Notes on Part-2
- The Data
- Bid-Ask Spread Analysis
- How Do Aggregate Bid-Ask Spreads Vary with Days To Expiration?
- How Do Bid-Ask Spreads Vary with Volume?
- How Do Bid-Ask Spreads Vary with Volatility?

- Summary Conclusions

Some astute readers in the comments noted that analysis based on the absolute difference in bid-ask price is not robust when considering the price of the underlying option and can lead to spurious conclusions. They recommended defining bid-ask spread as a percent of the option's spot price.

Additionally, I failed to constrain the analysis to include only options with a certain level of "moneyness". That is, options far away from the strike price behave differently than options that are closer, and the prior analysis failed to incorporate that understanding. In Part 2 of this exploration we re-examine the conclusions drawn in Part-1, after incorporating the aforementioned suggestions. With that said, this post will largely follow the format of Part-1, so if you feel you are missing context for this analysis start there.

The data is a cleaned **hdf5**/**.h5** file comprised of a collection of daily options data collected over the period of 05/17/2017 to 07/24/2017. By cleaned I mean I aggregated the daily data into one set, removed some unnecessary columns, cleaned up the data types and added the underlying ETF prices from Yahoo. I make no claims about the accuracy of the data itself, and I present it as is. It is approximately a 1 GB in size and I have made it available for download at the following link:

Options Data

To import the data into your python environment:

import pandas as pd; data = pd.read_hdf('option_data_2017-05-17_to_2017-07-24.h5', key='data')

```
%load_ext watermark
%watermark
import sys
import os
import pandas as pd
pd.options.display.float_format = '{:,.4f}'.format
import numpy as np
import scipy.stats as stats
import pymc3 as pm
from mpl_toolkits import mplot3d
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
plt.style.use('seaborn-muted')
import plotnine as pn
import mizani.breaks as mzb
import mizani.formatters as mzf
import seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")
p=print
p()
%watermark -p pymc3,pandas,pandas_datareader,numpy,scipy,matplotlib,seaborn,plotnine
```

```
# convenience functions
# add spread as percentage of spot
def create_spread_percent(df):
return (df.assign(spread_pct = lambda df: df.spread / df.askPrice))
# add instrinsic values
def create_intrinsic(df):
# create intrinsic value column
call_intrinsic = df.query('optionType == "Call"').loc[:, 'underlyingPrice']\
- df.query('optionType == "Call"').loc[:, 'strikePrice']
put_intrinsic = df.query('optionType == "Put"').loc[:, 'strikePrice']\
- df.query('optionType == "Put"').loc[:, 'underlyingPrice']
df['intrinsic_value'] = [np.nan] * df.shape[0]
(df.loc[df['optionType'] == "Call", ['intrinsic_value']]) = call_intrinsic
(df.loc[df['optionType'] == "Put", ['intrinsic_value']]) = put_intrinsic
return df
# fn: code adapted from https://github.com/jonsedar/pymc3_vs_pystan/blob/master/convenience_functions.py
def custom_describe(df, nidx=3, nfeats=20):
''' Concat transposed topN rows, numerical desc & dtypes '''
print(df.shape)
nrows = df.shape[0]
rndidx = np.random.randint(0,len(df),nidx)
dfdesc = df.describe().T
for col in ['mean','std']:
dfdesc[col] = dfdesc[col].apply(lambda x: np.round(x,2))
dfout = pd.concat((df.iloc[rndidx].T, dfdesc, df.dtypes), axis=1, join='outer')
dfout = dfout.loc[df.columns.values]
dfout.rename(columns={0:'dtype'}, inplace=True)
# add count nonNAN, min, max for string cols
nan_sum = df.isnull().sum()
dfout['count'] = nrows - nan_sum
dfout['min'] = df.min().apply(lambda x: x[:6] if type(x) == str else x)
dfout['max'] = df.max().apply(lambda x: x[:6] if type(x) == str else x)
dfout['nunique'] = df.apply(pd.Series.nunique)
dfout['nan_count'] = nan_sum
dfout['pct_nan'] = nan_sum / nrows
return dfout.iloc[:nfeats, :]
```

```
%%time
op_data = (pd.read_hdf('option_data_2017-05-17_to_2017-07-24.h5', key='data')
.dropna(subset=['underlyingPrice', 'spread', 'askPrice'])
.pipe(create_spread_percent)
.pipe(create_intrinsic)
.reset_index(drop=True))
### filter by moneyness
# within 20% of strike in either direction
def filter_by_moneyness(df, pct_cutoff=0.2):
crit1 = (1-pct_cutoff)*df.strikePrice < df.underlyingPrice
crit2 = df.underlyingPrice < (1+pct_cutoff)*df.strikePrice
return (df.loc[crit1 & crit2].reset_index(drop=True))
data = filter_by_moneyness(op_data)
data_describe = custom_describe(data)
data_describe
```

```
sprd_by_dtm = (data.groupby(['symbol', 'daysToExpiration', 'optionType'],
as_index=False)['spread_pct'].median()
.groupby(['daysToExpiration', 'optionType'], as_index=False).median()
.assign(bins = lambda x: pd.qcut(x.daysToExpiration, 10, labels=False)))
sprd_by_dtm.sample(5)
```

```
def plot_spread_dtm(sprd_by_dtm):
"""
given df plot scatter with regression line
# Params
df: pd.DataFrame()
# Returns
g: plotnine figure
"""
g = (pn.ggplot(sprd_by_dtm, pn.aes('daysToExpiration', 'spread_pct', color='factor(bins)'))
+ pn.geom_point(pn.aes(shape='factor(bins)'))
+ pn.stat_smooth(method='glm')
+ pn.scale_y_continuous(breaks=mzb.mpl_breaks(),
labels=mzf.percent_format(),
limits=(0, sprd_by_dtm.spread_pct.max()))
+ pn.scale_x_continuous(breaks=range(0, sprd_by_dtm.daysToExpiration.max(), 50),
limits=(0, sprd_by_dtm.daysToExpiration.max()))
+ pn.theme_linedraw()
+ pn.theme(figure_size=(12,6), panel_background=pn.element_rect(fill='black'),
axis_text_x=pn.element_text(rotation=50),)
+ pn.ylab('bid-ask spread')
+ pn.ggtitle('Option Spread by DTM'))
return g
# ------------------------------
# Example use of func for both calls and puts
g = plot_spread_dtm(sprd_by_dtm)
g.save(filename='call-put option bid-ask spreads - daysToExpiration scatter plot-PERCENT.png')
g.draw();
```

What jumps out at me is how large the spread is as a percentage of the option's ask price as you move closer to expiration. From ~220 days and below (or bin 4.5+) the pattern appears to show a a significant increase in spreads. With days to expiration longer than ~220 both calls and puts show a flattening.

My first guess as to what could cause this pattern is that, as the contract expiration approaches, the probability of being ITM is low for a vast majority of contracts. As a result the demand from market participants dries up so the cost to the market maker increases and to compensate spreads widen. I welcome any insight readers may have on this.

```
median_sprd = data.groupby(['symbol', 'daysToExpiration', 'optionType'],
as_index=False)['spread_pct'].median()
test_syms = ['SPY', 'DIA', 'QQQ', 'TLT', 'GLD', 'USO', 'SLV', 'XLF']
sel_med_sprd = median_sprd.query('symbol in @test_syms').dropna(subset=['spread_pct'])
# to plot symbols have to cast to type str
sel_med_sprd.symbol = sel_med_sprd.symbol.astype(str)
p(sel_med_sprd.head())
p()
p(sel_med_sprd.info())
```

```
def plot_boxplot(df, x, y, optionType='Call'):
"""given df plot boxplot
# Params
df: pd.DataFrame()
x: str(), column
y: str(), column
optionType: str()
# Returns
g: plotnine figure
"""
df = df.query('optionType == @optionType')
g = (pn.ggplot(df, pn.aes(x, y, color=f'factor({x})'))
+ pn.geom_boxplot()
+ pn.scale_y_continuous(breaks=mzb.minor_breaks(10),
labels=mzf.percent_format(),
limits=(0., 1.))
+ pn.theme_linedraw()
+ pn.theme(figure_size=(12,6), panel_background=pn.element_rect(fill='black'))
+ pn.ylab('bid-ask spread')
+ pn.ggtitle(f'Selected Symbol {optionType} Option Spreads'))
return g
# ------------------------------
# example of box plot function
g = plot_boxplot(sel_med_sprd, 'symbol', 'spread_pct')
g.save(filename='call-option bid-ask spreads - boxplot-PERCENT.png')
g.draw();
```

From these two plots we can see that the bulk of the bid-ask spreads are below 15% for both calls and puts. I find it interesting that for calls SLV, and XLF have more extreme tails than the others. DIA and XLF calls also appear to be priced consistently higher than the other symbols.

Looking at the put options we see DIA is more expensive with more extreme values than any other symbol. The tails for SPY, TLT, QQQ, and GLD are more extreme/dispersed than their call option counterparts.

```
grp_cols = ['symbol', 'daysToExpiration', 'optionType']
agg_cols = ['spread_pct', 'openInterest', 'volume', 'volatility', 'intrinsic_value']
median_sprd = data.groupby(grp_cols, as_index=False)[agg_cols].median()
test_syms = ['SPY', 'DIA', 'QQQ', 'TLT', 'GLD', 'USO', 'SLV', 'XLF']
sel_med_sprd = (median_sprd.query('symbol in @test_syms')
.dropna(subset=['spread_pct', 'openInterest']))
# to plot symbols have to cast to type str
sel_med_sprd.symbol = sel_med_sprd.symbol.astype(str)
p(sel_med_sprd.head())
p()
p(sel_med_sprd.info())
```

```
def plot_log_points(df, x, y, color='factor(symbol)', size='openInterest'):
g = (pn.ggplot(df, pn.aes(x, y, color=color))
+ pn.geom_point(pn.aes(size=size, shape='factor(symbol)'), alpha=0.75, stroke=.75)
+ pn.geom_hline(yintercept=pm.hpd(df[y]), size=2, color='red')
+ pn.scale_x_log10(breaks=[0,0.5,1,10,100,250,500,750,1_000])
+ pn.theme_linedraw()
+ pn.theme(figure_size=(12,6), panel_background=pn.element_rect(fill='black'),
axis_text_x=pn.element_text(rotation=50))
+ pn.scale_y_continuous(breaks=mzb.minor_breaks(10),
labels=mzf.percent_format(),
limits=(0., 1.))
+ pn.ylab('bid-ask spread'))
return g
# ------------------------------
df = sel_med_sprd.copy()
# example with both call and puts
g = plot_log_points(df, x='volume', y='spread_pct')
g.save(filename='call-put option bid-ask spreads - volume scatter plot-PERCENT.png')
g.draw();
```

The red lines indicate the 95% interval for the data. We can see that the two plots are very similar except for minor cosmetic differences. Looking at the puts It still appears that, as volume increases the spreads are compressed a bit more than the calls even though the 95% intervals are nearly identical. Looking at the calls, there appears to be more extreme values at lower volumes than the puts.

Furthermore it appears that in this admittedly small sampling, spreads decline as open-interest and volume increase. This should not be surprising to readers, but it is noteworthy. The hypothesized mechanism for this is simple, as volume/open-interest increase, it becomes less risky for market-makers to provide their services, thus lowering the overall cost to trade.

The following two plots make this point a little bit clearer...

```
def facet_plot_log_points(df, x, y, color='factor(symbol)', size='openInterest'):
g = (pn.ggplot(df, pn.aes(x, y, color=color))
+ pn.geom_point(pn.aes(size=size, shape='factor(symbol)'), alpha=0.75, stroke=.75)
+ pn.stat_smooth(method='loess')
+ pn.scale_x_log10(breaks=[0,0.5,1,10,100,250,500,750,1_000])
+ pn.theme_linedraw()
+ pn.theme(figure_size=(12,6), panel_background=pn.element_rect(fill='black'),
axis_text_x=pn.element_text(rotation=50))
+ pn.scale_y_continuous(breaks=mzb.minor_breaks(5),
labels=mzf.percent_format(),
limits=(0., 1.))
+ pn.facet_wrap('~symbol', ncol=2)
+ pn.ylab('bid-ask spread'))
return g
# ------------------------------
# example use
g = facet_plot_log_points(df.query('optionType=="Call"'), x='volume', y='spread_pct')
g.save(filename='FACET-call option bid-ask spreads - volume scatter plot-PERCENT.png')
g.draw();
```

You could argue that the above plots show that market makers overall are pretty good at keeping spreads low regardless of the volume.

Also notice how much volume/open-interest there is in USO; both calls and puts are traded at a sharply higher volume than the other symbols. Next closest appears to be SLV, with XLF having some very popular contracts functioning as outliers. DIA and TLT appear to be least traded however DIA appears to be priced most inefficiently compared to the other symbols.

```
def facet_plot_points(df, x, y, color='factor(symbol)', size='openInterest'):
g = (pn.ggplot(df, pn.aes(x, y, color=color))
+ pn.geom_point(pn.aes(size=size, shape='factor(symbol)'), alpha=0.75, stroke=.75)
+ pn.stat_smooth(method='loess')
+ pn.theme_linedraw()
+ pn.theme(figure_size=(12,6), panel_background=pn.element_rect(fill='black'),
axis_text_x=pn.element_text(rotation=50))
+ pn.scale_y_continuous(breaks=mzb.minor_breaks(5),
labels=mzf.percent_format(),
limits=(0., 1.))
+ pn.facet_wrap('~symbol', ncol=2)
+ pn.ylab('bid-ask spread'))
return g
# ------------------------------
# example use
g = facet_plot_points(df, 'volatility', 'spread_pct')
g.save(filename='FACET-call-put option bid-ask spreads - volatility scatter plot.png')
g.draw();
```

*In aggregate* it appears that there is some relationship between volatility and spreads, with DIA, SPY, USO, SLV, TLT, and XLF showing increases in spreads co-occurring with increases in volatility. However, the relationship looks more tenuous when we disaggregate the options into calls and puts. For example USO calls appear to show a relationship between spreads and volatility quite clearly, but USO puts show no relationship at all. The same can be said about SLV, and XLF.

- Spreads increase dramatically as the contract nears expiration. The exact cause of this is only speculative and worthy of more investigation.
- Examining selected symbols, it appears that most of the contracts are priced competitively with each other with DIA and XLF showing the most extreme outliers.
- USO options have high interest from market participants as both calls and puts are traded at a higher volume.
- The sample size is too small to conclude anything about volatility and spreads. This relationship needs to be researched further, as common wisdom suggests spreads get wider as volatility increases. Is that true in aggregate, for calls or puts? Is that relationship stronger intraday? Would it even show up in daily or weekly samplings?

- The Objective
- The Data
- Basic Data Analysis
- Bid-Ask Spread Analysis
- How Do Aggregate Bid-Ask Spreads Vary with Days To Expiration?
- How Do Bid-Ask Spreads Vary with Volume?
- How Do Bid-Ask Spreads Vary with Volatility?

- Summary Conclusions

Compared to the equity market, the options market is a level up in complexity. For each symbol there are multiple expiration dates, strike prices for each expiration date, implied volatilities, and that's before we get to the option greeks.

The increased complexity presents us with more opportunity. More complexity means less ground truth, more errors, more gaps, and more structural asymmetries. Consider that THE dominant factor underlying options pricing - implied volatility - cannot be directly measured only estimated! To estimate it requires other observable factors and a pricing **model. **We already know *"All models are wrong. Some are Useful" *thus there are opportunities to exploit the errors of others. To do that requires a better understanding than our competitors thus beginning our study of the options market.

This is the next step in the series for developing an options trading dashboard using Python and Python based tools. Thus far I have demonstrated two methods [1] [2] of scraping the necessary data. Now that the data has been collecting for a bit we can begin some initial exploratory analysis. As this is a purpose driven process we should set an objective for our study.

In this particular article I want to focus on exploring bid-ask spreads as that data is often unavailable for free.

The data is a cleaned **hdf5**/**.h5** file comprised of a collection of daily options data collected over the period of 05/17/2017 to 07/24/2017. By cleaned I mean I aggregated the daily data into one set, removed some unnecessary columns, cleaned up the data types and added the underlying ETF prices from Yahoo. I make no claims about the accuracy of the data itself, and I present it as is. It is approximately a 1 GB in size and I have made it available for download at the following link:

Options Data

To import the data into your python environment:

import pandas as pd; data = pd.read_hdf('option_data_2017-05-17_to_2017-07-24.h5', key='data')

First the package imports.

```
%load_ext watermark
%watermark
import sys
import os
import pandas as pd
pd.options.display.float_format = '{:,.4f}'.format
import numpy as np
import scipy.stats as stats
import pymc3 as pm
from mpl_toolkits import mplot3d
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
plt.style.use('seaborn-muted')
import plotnine as pn
import mizani.breaks as mzb
import mizani.formatters as mzf
import seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")
p=print
p()
%watermark -p pymc3,pandas,pandas_datareader,numpy,scipy,matplotlib,seaborn,plotnine
```

Some convenience functions...

```
# convenience functions
# add instrinsic values
def create_intrinsic(df):
# create intrinsic value column
call_intrinsic = df.query('optionType == "Call"').loc[:, 'underlyingPrice']\
- df.query('optionType == "Call"').loc[:, 'strikePrice']
put_intrinsic = df.query('optionType == "Put"').loc[:, 'strikePrice']\
- df.query('optionType == "Put"').loc[:, 'underlyingPrice']
df['intrinsic_value'] = [np.nan] * df.shape[0]
(df.loc[df['optionType'] == "Call", ['intrinsic_value']]) = call_intrinsic
(df.loc[df['optionType'] == "Put", ['intrinsic_value']]) = put_intrinsic
return df
# fn: code adapted from https://github.com/jonsedar/pymc3_vs_pystan/blob/master/convenience_functions.py
def custom_describe(df, nidx=3, nfeats=20):
''' Concat transposed topN rows, numerical desc & dtypes '''
print(df.shape)
nrows = df.shape[0]
rndidx = np.random.randint(0,len(df),nidx)
dfdesc = df.describe().T
for col in ['mean','std']:
dfdesc[col] = dfdesc[col].apply(lambda x: np.round(x,2))
dfout = pd.concat((df.iloc[rndidx].T, dfdesc, df.dtypes), axis=1, join='outer')
dfout = dfout.loc[df.columns.values]
dfout.rename(columns={0:'dtype'}, inplace=True)
# add count nonNAN, min, max for string cols
nan_sum = df.isnull().sum()
dfout['count'] = nrows - nan_sum
dfout['min'] = df.min().apply(lambda x: x[:6] if type(x) == str else x)
dfout['max'] = df.max().apply(lambda x: x[:6] if type(x) == str else x)
dfout['nunique'] = df.apply(pd.Series.nunique)
dfout['nan_count'] = nan_sum
dfout['pct_nan'] = nan_sum / nrows
return dfout.iloc[:nfeats, :]
```

Let's import the data and view some basic info...

```
sprd_by_dtm = (data.groupby(['symbol', 'daysToExpiration', 'optionType'],
as_index=False)['spread'].median()
.groupby(['daysToExpiration', 'optionType'], as_index=False).median()
.assign(bins = lambda x: pd.qcut(x.daysToExpiration, 10, labels=False)))
sprd_by_dtm.head()
```

Let's define a convenience function to plot the data.

```
def plot_spread_dtm(sprd_by_dtm):
"""given df plot scatter with regression line
# Params
df: pd.DataFrame()
# Returns
g: plotnine figure
"""
g = (pn.ggplot(sprd_by_dtm, pn.aes('daysToExpiration', 'spread', color='factor(bins)'))
+ pn.geom_point(pn.aes(shape='factor(bins)'))
+ pn.stat_smooth(method='lm')
+ pn.scale_y_continuous(breaks=range(0, int(sprd_by_dtm.spread.max()+2)),
labels=mzf.currency_format(), limits=(0, sprd_by_dtm.spread.max()))
+ pn.scale_x_continuous(breaks=range(0, sprd_by_dtm.daysToExpiration.max(), 50),
limits=(0, sprd_by_dtm.daysToExpiration.max()))
+ pn.theme_linedraw()
+ pn.theme(figure_size=(12,6), panel_background=pn.element_rect(fill='black'),
axis_text_x=pn.element_text(rotation=50),)
+ pn.ylab('bid-ask spread')
+ pn.ggtitle('Option Spread by DTM'))
return g
```

```
# Example use of func for both calls and puts
g = plot_spread_dtm(sprd_by_dtm)
g.save(filename='call-put option bid-ask spreads - daysToExpiration scatter plot.png')
g.draw();
```

Some things are interesting. From ~250 through ~600 days in both call and put options the bid-ask spreads are compressed towards zero. There also appears to be less dispersion in put bid-ask spreads overall.

We can look at a few select ETFs.

```
median_sprd = data.groupby(['symbol', 'daysToExpiration', 'optionType'],
as_index=False)['spread'].median()
test_syms = ['SPY', 'DIA', 'QQQ', 'TLT', 'GLD', 'USO', 'SLV', 'XLF']
sel_med_sprd = median_sprd.query('symbol in @test_syms').dropna(subset=['spread'])
# to plot symbols have to cast to type str
sel_med_sprd.symbol = sel_med_sprd.symbol.astype(str)
p(sel_med_sprd.head())
p(sel_med_sprd.info())
```

A convenience plotting function for boxplots.

```
def plot_boxplot(df, x, y, optionType='Call'):
"""given df plot boxplot
# Params
df: pd.DataFrame()
x: str(), column
y: str(), column
optionType: str()
# Returns
g: plotnine figure
"""
df = df.query('optionType == @optionType')
g = (pn.ggplot(df, pn.aes(x, y, color=f'factor({x})'))
+ pn.geom_boxplot()
+ pn.theme_linedraw()
+ pn.theme(figure_size=(12,6), panel_background=pn.element_rect(fill='black'))
+ pn.ylab('bid-ask spread')
+ pn.ggtitle(f'Selected Symbol {optionType} Option Spreads'))
return g
```

```
g = plot_boxplot(sel_med_sprd, 'symbol', 'spread')
g.save(filename='call-option bid-ask spreads - boxplot.pdf')
g.draw();
```

Looking at these plots we see further evidence of bid-ask spreads showing less dispersion across puts vs calls. Also it's surprising to see DIA options having such a wide range of values compared to SPY and QQQ; this is especially true for the call options.

```
grp_cols = ['symbol', 'daysToExpiration', 'optionType']
agg_cols = ['spread', 'openInterest', 'volume', 'volatility', 'intrinsic_value']
median_sprd = data.groupby(grp_cols, as_index=False)[agg_cols].median()
test_syms = ['SPY', 'DIA', 'QQQ', 'TLT', 'GLD', 'USO', 'SLV', 'XLF']
sel_med_sprd = (median_sprd.query('symbol in @test_syms')
.dropna(subset=['spread', 'openInterest']))
# to plot symbols have to cast to type str
sel_med_sprd.symbol = sel_med_sprd.symbol.astype(str)
p(sel_med_sprd.head())
p(sel_med_sprd.info())
```

A convenience function for plotting...

```
def plot_log_points(df, x, y, color='factor(symbol)', size='openInterest'):
g = (pn.ggplot(df, pn.aes(x, y, color=color))
+ pn.geom_point(pn.aes(size=size))
+ pn.scale_x_log10(breaks=[0,0.5,1,10,100,250,500,750,1_000])
+ pn.theme_linedraw()
+ pn.theme(figure_size=(12,6), panel_background=pn.element_rect(fill='black'),
axis_text_x=pn.element_text(rotation=50))
+ pn.scale_y_continuous(breaks=range(0, int(df.spread.max()+2)),
labels=mzf.currency_format(), limits=(0, df.spread.max()))
+ pn.ylab('bid-ask spread'))
return g
```

```
df = sel_med_sprd.copy()
# example with both call and puts
g = plot_log_points(df, x='volume', y='spread')
g.save(filename='call-put option bid-ask spreads - volume scatter plot.png')
g.draw();
```

Again we see put bid-ask spreads squeezed towards zero even as volume increases. We also see SPY and USO with small spreads as both volume and open interest increases. This suggests there are symbols/contracts with higher relative trading capacity.

```
# example with both call and puts
g = plot_log_points(df, 'volatility', 'spread')
g.save(filename='call-put option bid-ask spreads - volatility scatter plot.png')
g.draw();
```

Some notes. DIA again appears to have the highest dispersion in bid-ask spreads for both calls and puts. GLD is also notable. It is also somewhat surprising that for these selected ETFs increased volatility doesn't appear with increased bid-ask spreads.

- Put options have less overall dispersion in bid-ask spreads than calls relative to days to expiration, volume, and volatility.
- Bid-ask spreads have a major compression range between ~250 to ~600 days to maturity that appear smaller than all other buckets.
- Bid-ask spreads show greater dispersion at lower levels of implied volatility.
- DIA in particular shows the greatest variability in bid-ask spreads of the selected ETFs.
- SPY and USO show high capacity as bid-ask spreads remain near zero even at elevated volume and open interest levels.

**Recap****The Problem****The Solution****Barchart Scraper Class****Barchart Parser Class****Utility Functions****Putting it all together****The Simple Trick****Next Steps**

In the previous post I revealed a web scraping trick that allows us to defeat AJAX/JavaScript based web pages and extract the tables we need. We also covered how to use that trick to scrape a large volume of options prices quickly and asynchronously using the combination of **aiohttp** and** asyncio**.

It worked beautifully until... I told people about it. Shortly after publishing, my code stopped functioning. After investigating, it was clear no data was being returned during the aiohttp call to the Barchart server. I attempted to fix the code by adding the **semaphore** option to the asyncio call. Roughly speaking, in this context the semaphore option allows you to specify the max number of calls that can be made simultaneously. I tried, 100, 50, 10, 2 and they all failed.

I do not know what happened for sure, but if I had to guess, the increase in server loads per unit time measure, was significant enough for Barchart system/network staff to update their server settings and squash the multiple simultaneous calls.

We simply build a sequential scraper instead of an asynchronous one. To make it more robust we have to add a simple twist to the code that makes it more difficult to diagnose human vs automated traffic.

This class is similar to the previous version except asyncio is stripped out. It's main function is to create the POST url, call the server and return the response data. Please note, I tested this class with a dynamic referer symbol and random user agents and this simple hardcoded setup has worked most consistently for me.

```
import requests as r
class barchart_scraper:
def __init__(self, symbol):
self.__request_headers = {
"Accept":"application/json",
"Accept-Encoding":"gzip, deflate, sdch, br",
"Accept-Language":"en-US,en;q=0.8",
"Connection":"keep-alive",
"Host":"core-api.barchart.com",
"Origin":"https://www.barchart.com",
"Referer":"https://www.barchart.com/etfs-funds/quotes/SPY/options",
"User-Agent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.81 Safari/537.36",
}
self.__base_url_str = 'https://core-api.barchart.com/v1/options/chain?symbol={}&fields=strikePrice%2ClastPrice%2CpercentFromLast%2CbidPrice%2Cmidpoint%2CaskPrice%2CpriceChange%2CpercentChange%2Cvolatility%2Cvolume%2CopenInterest%2CoptionType%2CdaysToExpiration%2CexpirationDate%2CsymbolCode%2CsymbolType&groupBy=optionType&raw=1&meta=field.shortName%2Cfield.type%2Cfield.description'
self.__expiry_url_str = "https://core-api.barchart.com/v1/options/chain?symbol={}&fields=strikePrice%2ClastPrice%2CpercentFromLast%2CbidPrice%2Cmidpoint%2CaskPrice%2CpriceChange%2CpercentChange%2Cvolatility%2Cvolume%2CopenInterest%2CoptionType%2CdaysToExpiration%2CexpirationDate%2CsymbolCode%2CsymbolType&groupBy=optionType&expirationDate={}&raw=1&meta=field.shortName%2Cfield.type%2Cfield.description"
self.symbol = symbol
# ------------------------------------------------
def _construct_url(self):
return self.__base_url_str.format(self.symbol)
def _construct_expiry_url(self, expiry):
return self.__expiry_url_str.format(self.symbol, expiry)
# ------------------------------------------------
def post_url(self, expiry=None):
if not expiry:
return r.post(
url = self._construct_url(),
headers = self.__request_headers
)
else:
return r.post(
url = self._construct_expiry_url(expiry=expiry),
headers = self.__request_headers
)
# ------------------------------------------------
def get_expirys(self, response):
return response.json()['meta']['expirations']
```

This class is essentially identical to the previous parser class and simply extracts call/put data into pandas dataframes.

```
import pandas as pd
import numpy as np
class barchart_parser:
def __init__(self, symbol, response):
self.symbol = symbol
self.response = response
# ------------------------------------------------
# create call df
def create_call_df(self):
"""fn: to create call df"""
json_calls = self.response.json()['data']['Call']
list_dfs = []
for quote in json_calls:
list_dfs.append(pd.DataFrame.from_dict(quote['raw'], orient='index'))
df = (
pd.concat(list_dfs, axis=1).T.reset_index(drop=True)
.replace('NA', np.nan)
.apply(pd.to_numeric, errors='ignore')
.assign(expirationDate = lambda x: pd.to_datetime(x['expirationDate']))
)
df['symbol'] = [self.symbol] * len(df.index)
return df
# ------------------------------------------------
# create put df
def create_put_df(self):
"""fn: to create put df"""
json_puts = self.response.json()['data']['Put']
list_dfs = []
for quote in json_puts:
list_dfs.append(pd.DataFrame.from_dict(quote['raw'], orient='index'))
df = (
pd.concat(list_dfs, axis=1).T.reset_index(drop=True)
.replace('NA', np.nan)
.apply(pd.to_numeric, errors='ignore')
.assign(expirationDate = lambda x: pd.to_datetime(x['expirationDate']))
)
df['symbol'] = [self.symbol] * len(df.index)
return df
```

Next we devise 2 utility functions. The first function is simply a convenience function to run the first iteration of the scraper. We need to do that for each symbol in order to extract the expiration dates dynamically.

```
def get_first_data(symbol):
"""fn: to get first data and extract expiry dates"""
# scrape
scraper = barchart_scraper(symbol)
response = scraper.post_url()
expirys = scraper.get_expirys(response)
# parse response
parser = barchart_parser(symbol, response)
first_call_df = parser.create_call_df()
first_put_df = parser.create_put_df()
# merge calls + puts
first_concat = pd.concat([first_call_df, first_put_df], axis=0)
return first_concat, expirys
```

The second function is a little lambda function that gets the symbol's last daily price from Google Finance which we add to our dataset before saving to disk.

```
get_price = lambda symbol: web.DataReader(
symbol, 'google', today - 1*BDay(), today)['Close']
```

Next we can implement the main script body. Essentially it runs a main loop and an inner loop. For each symbol get the default first data, extract the expirys, and then for each expiration extract the data. At the end of the inner loop, all data for that symbol is concatenated and then appended to a list containing all the symbols' dataframes. Finally all the symbols dataframes are concatenated and saved to hdf.

```
import requests as r
import pandas as pd
import pandas_datareader.data as web
from pandas.tseries.offsets import BDay
import numpy as np
import time
from tqdm import tqdm
from barchart_scraper import barchart_scraper
from barchart_parser import barchart_parser
today = pd.datetime.today().date()
project_dir = '/YOUR/PROJECT/DIR'
# -----------------------------------------------------------------------------
# define utility functions
# -----------------------------------------------------------------------------
def get_first_data(symbol):
"""fn: to get first data and extract expiry dates"""
# scrape
scraper = barchart_scraper(symbol)
response = scraper.post_url()
expirys = scraper.get_expirys(response)
# parse response
parser = barchart_parser(symbol, response)
first_call_df = parser.create_call_df()
first_put_df = parser.create_put_df()
# merge calls + puts
first_concat = pd.concat([first_call_df, first_put_df], axis=0)
return first_concat, expirys
# function to get last daily close from Google Finance
get_price = lambda symbol: web.DataReader(
symbol, 'google', today - 1*BDay(), today)['Close']
# -----------------------------------------------------------------------------
# import symbols
# -----------------------------------------------------------------------------
FILE = project_dir + 'ETFList.Options.Nasdaq__M.csv'
ALL_ETFS = pd.read_csv(FILE)['Symbol']
drop_symbols = ['ADRE', 'AUNZ', 'CGW', 'DGT', 'DSI', \
'EMIF', 'EPHE', 'EPU', 'EUSA', 'FAN', \
'FDD', 'FRN', 'GAF', 'GII', 'GLDI', 'GRU', \
'GUNR', 'ICN', 'INXX', 'IYY', 'KLD', 'KWT', \
'KXI', 'MINT', 'NLR', 'PBP', 'PBS', 'PEJ', \
'PIO', 'PWB', 'PWV', 'SCHO', 'SCHR', 'SCPB', \
'SDOG', 'SHM', 'SHV', 'THRK', 'TLO', 'UHN', \
'USCI', 'USV', 'VCSH']
ETFS = [x for x in ALL_ETFS if x not in set(drop_symbols)]
# -----------------------------------------------------------------------------
# run main script body
#
# loop through all etfs
# loop through expirys for each etf
# -----------------------------------------------------------------------------
t0 = time.time()
all_etfs_data = []
error_symbols = []
for symbol in tqdm(ETFS):
print()
print('-'*79)
print('scraping: ', symbol)
try:
last_close_price = get_price(symbol).iloc[0]
first_concat, expirys = get_first_data(symbol)
list_dfs_by_expiry = []
list_dfs_by_expiry.append(first_concat)
for expiry in tqdm(expirys[1:]):
print()
print('scraping expiry: ', expiry)
scraper = barchart_scraper(symbol)
tmp_response = scraper.post_url(expiry=expiry)
print('parsing... ')
parser = barchart_parser(symbol, tmp_response)
call_df = parser.create_call_df()
put_df = parser.create_put_df()
concat = pd.concat([call_df, put_df], axis=0)
concat['underlyingPrice'] = [last_close_price] * concat.shape[0]
list_dfs_by_expiry.append(concat)
print('parsing complete')
random_wait = np.random.choice([1,1.25,2.5,3], p=[0.3,0.3,0.25,0.15])
time.sleep(random_wait)
all_etfs_data.append(pd.concat(list_dfs_by_expiry, axis=0))
except Exception as e:
error_symbols.append(symbol)
print(f'symbol: {symbol}\n error: {e}')
print()
continue
# -----------------------------------------------------------------------------
duration = time.time() - t0
print(f'script run time: ', pd.to_timedelta(duration, unit='s'))
dfx = pd.concat(all_etfs_data, axis=0)
print(dfx.head())
print(dfx.info())
print(f'error symbols:\n{error_symbols}')
# -----------------------------------------------------------------------------
# store table as hdf
# -----------------------------------------------------------------------------
today = pd.datetime.today().date()
file_ = project_dir + f'/Barchart_Options_Data/ETF_options_data_{today}.h5'
dfx.to_hdf(file_, key='data', format='table', mode='w')
# -----------------------------------------------------------------------------
# kill python process after running script to prevent leakage
# -----------------------------------------------------------------------------
time.sleep(5)
os.kill(os.getpid(), 9)
```

Did you notice the **random_wait** at the end of the inner loop? We simply pass an array of reasonable wait times *(measured in seconds)* and their probabilities to numpy's **random_choice()** and pass the result to the **time.sleep()** function before iterating to the next symbol. This isn't guaranteed to always work, but in cases where servers *may* be restricting traffic loads it makes it much harder to identify your traffic as automated.

Ultimately, it's also a respectful way to operate our scraper.

Next up in the series I plan to explore the data collected over the last 6 weeks I've been running this script. I hope to explore multiple angles and dynamics in the data.

Do you have any suggestions for exploration topics? If so, leave a comment or contact me via email or twitter.

]]>**Intro****Disclaimers****The Secret to Scraping AJAX Sites****The async_option_scraper script****first_async_scraper class****expirys class****xp_async_scraper class****last_price_scraper class**

**The option_parser Module****The Implementation Script****References**

This is Part 1 of a new series I'm doing in semi real-time to build a functional options data dashboard using Python. There are many underlying motivations to attempt this, and several challenges to implementing a tool like this from scratch.

- Where to get the data? Is it affordable? Easily accessible? API?
- How to parse the results?
- How to aggregate and organize the data for analysis?
- How to store the data? TXT, CSV, SQL database, HDF5??
- How often should it run?
- How to display the data? What dynamic graphic library to use? D3.js, MPL3d, Plotly, Bokeh, etc.?

These are some of the problems that need to be solved in order to create the tool.

In this post I show a current working solution to where to get the data, how to scrape it, how to parse it, and a storage method for fast read write access. We will scrape Barchart.com's basic option quotes using **aiohttp** and **asyncio**, both are included in Python 3.6 standard library. We will parse it using **Pandas** and **Numpy** and store the data in the **HDF5** file format.

This is primarily an academic exercise. I have no intent to harm or cause others to harm Barchart.com or its vendors. My belief is that, by facilitating knowledge sharing, we will increase the number of educated participants in the options markets; thereby increasing the total addressable market for businesses like Barchart and its vendors. By designing tools like this we improve our own understanding of the use cases and applications (option valuation and trading) and can provide better feedback to those in the product development process.

First let's create a mental model of what AJAX really is.

So looking at this, we can say AJAX is a set of web development techniques to increase the efficiency and user experience during website interaction. For example, you go to a website with cool data tables on it. You want to change one of the filters on the data so you select the option you want and click. What happens from there?

In simply designed or older websites your request would be sent to the server, then to update the data table with your selected filters would require the server response to reload the entire page. This is inefficient for many reasons but one is that, often the element in need of updating is only a fraction of the entire webpage.

AJAX allows websites to send requests to the server and update page elements on an element by element basis negating the need for reloading the entire page every time you interact with the page.

This improvement in efficiency comes at the added cost of complexity, for web designers and developers and for web scrapers. Generally speaking the url you use to go to an AJAX page is not the actual url that gets sent to the server to load the page you view.

To build this understanding, let's look at a sample option quote page using the following link <https://www.barchart.com/stocks/quotes/spy/options>.

Warning: To follow along with the rest of this example you need access to developer mode in Chrome or its equivalent in other browsers.

Let's look behind the curtain so to speak. Click anywhere in the page and click inspect. Navigate to the Network tab in Chrome developer tools.

We're going to press F5 to reload the page and look for the following: Request Headers, and the Request URL.

We will need the **Request URL **and the **Request Headers** in order to construct our calls to the server a little later. Simply put, this is the secret! We can replicate our browser's behavior when it requests data from the server if we know the *actual *request url and the request headers. This will be made clearer in the next section.

This is the key module for scraping the data. First the imports.

```
import asyncio
import aiohttp
```

If you noticed when the page loads, it loads the nearest expiration date by default.

We know there are generally multiple expiration dates per symbol. However, some ETFs have weekly contracts, monthly, and/or quarterly. Instead of guessing the expiration dates, the **first_async_scraper** class scrapes the default pages so we can later extract the expiration dates directly from the page's JSON/dict response.

This class takes no initialization parameters.

```
# ================================================
# for first run only
class first_async_scraper:
def __init__(self):
pass
async def _fetch(self, symbol, url, session, headers):
"""fn: to retrieve option quotes as JSON
Params:
symbol : str(), ETF
url : str(), request url
session : aiohttp.ClientSession() object
headers : dict() containing header info
Returns:
response : JSON/Python Dict
"""
async with session.post(url.format(symbol), headers=headers) as response:
return await response.json(content_type=None)
async def run(self, symbols, user_agent):
"""fn: to aggregate response option quotes
Params:
symbols : list of str(), ETF symbols
user_agent : str()
Returns:
responses : list of JSON
"""
url = 'https://core-api.barchart.com/v1/options/chain?symbol={}&fields=strikePrice%2ClastPrice%2CpercentFromLast%2CbidPrice%2Cmidpoint%2CaskPrice%2CpriceChange%2CpercentChange%2Cvolatility%2Cvolume%2CopenInterest%2CoptionType%2CdaysToExpiration%2CexpirationDate%2CsymbolCode%2CsymbolType&groupBy=optionType&raw=1&meta=field.shortName%2Cfield.type%2Cfield.description'
headers = {
"Accept":"application/json",
"Accept-Encoding":"gzip, deflate, sdch, br",
"Accept-Language":"en-US,en;q = 0.8",
"Connection":"keep-alive",
"Host":"core-api.barchart.com",
"Origin":"https://www.barchart.com",
"Referer":"https://www.barchart.com/etfs-funds/quotes/{}/options",
"User-Agent":user_agent,
}
tasks = []
async with aiohttp.ClientSession() as session:
for symbol in symbols:
headers['Referer'] = headers['Referer'].format(symbol)
task = asyncio.ensure_future(self._fetch(symbol, url, session, headers))
tasks.append(task)
# gather returns responses in original order not arrival order
# https://docs.python.org/3/library/asyncio-task.html#task-functions
responses = await asyncio.gather(*tasks)
return responses
```

The workhorse function is **run** which calls the internal function **_fetch**. Inside the run function I've hardcoded a request url similar to the one we found before. I've also hardcoded the headers we found earlier as well. Notice both objects are string formats which can be dynamically updated with our ETF symbol.

The **_fetch** function takes the ETF symbol, the url string, session object, and our request headers and makes the call to the server returning the response as a JSON /dict object.

The **run **function takes a list of symbols, and a user agent string - *more on this later.*

The aiohttp package has a very similar interface to the requests module. We first create a **ClientSession **object which acts like a context manager. After creating the session object, we loop through each symbol using the **asyncio.ensure_future** function to create and schedule the event task. The **gather** function executes the tasks asynchronously waiting until all tasks have completed. It returns a list of JSON responses, each representing one ETF.

Once we have the list of responses we need to extract the expiry dates from each page source, collecting them for later use. The class is initialized with two parameters - a list of ETF symbols, and the list of page responses from the first scrape job.

It uses two functions. The internal function **_get_dict_expiry **takes a single response object and returns the list of expirations for a single symbol. The exposed function **get_expirys** loops through the list of ETFs and responses aggregating them into a dictionary. The dictionary keys are the ETF symbols and the values are lists of expirations for that symbol.

```
# ================================================
class expirys:
def __init__(self, ETFS, first_future_result):
"""Class to extract expiration data from Dict
Params:
ETFS : list of ETF symbol str()
first_future_result : list of response objects (dict/JSON) from the first scraper
"""
self.ETFS = ETFS
self.first_future_result = first_future_result
def _get_dict_expiry(self, response):
"""fn: to get expirations from response dict
Params:
response : dict/JSON object
Returns:
list() of date str(), "YYYY-MM-DD"
"""
if response['count'] == 0:
return None
else:
return response['meta']['expirations']
def get_expirys(self):
"""fn: to create dict with k, v = symbol, list of expirys
we have to do this b/c JSON/dict response data doesn't
contain symbol identifier
Returns:
dict(symbol = list of expiry dates)
"""
from itertools import zip_longest
expirys = {}
for symbol, resp in zip_longest(self.ETFS, self.first_future_result):
# we can do this because results are in order of submission not arrival
# gather returns responses in original order not arrival order
# https://docs.python.org/3/library/asyncio-task.html#task-functions
expirys[symbol] = self._get_dict_expiry(resp)
return expirys
```

The final scraper class is nearly identical to the **first_async_scraper** except for some additional arguments for the functions **xp_run()**, and **_xp_fetch()** to accept the expiry dates. Also notice that the hard coded URL in the **xp_run** function is slightly different in that it is formatted to accept the ETF symbol and an expiration date.

```
# ================================================
# async by url + expirations
class xp_async_scraper:
def __init__(self):
pass
async def _xp_fetch(self, symbol, expiry, url, session, headers):
"""fn: to retrieve option quotes as JSON
Params:
symbol : str(), ETF
expiry : str(), "YYYY-MM-DD"
url : str(), request url
session : aiohttp.ClientSession() object
headers : dict() containing header info
Returns:
response : JSON/Python Dict
"""
async with session.post(url.format(symbol, expiry), headers=headers) as response:
return await response.json(content_type=None)
async def xp_run(self, symbol, expirys, user_agent):
"""fn: to aggregate response option quotes
Params:
symbol : str(), ETF
expirys : list of date str() "YYYY-MM-DD"
user_agent : str()
Returns:
responses : list of JSON
"""
url = "https://core-api.barchart.com/v1/options/chain?symbol={}&fields=strikePrice%2ClastPrice%2CpercentFromLast%2CbidPrice%2Cmidpoint%2CaskPrice%2CpriceChange%2CpercentChange%2Cvolatility%2Cvolume%2CopenInterest%2CoptionType%2CdaysToExpiration%2CexpirationDate%2CsymbolCode%2CsymbolType&groupBy=optionType&expirationDate={}&raw=1&meta=field.shortName%2Cfield.type%2Cfield.description"
headers = {
"Accept":"application/json",
"Accept-Encoding":"gzip, deflate, sdch, br",
"Accept-Language":"en-US,en;q=0.8",
"Connection":"keep-alive",
"Host":"core-api.barchart.com",
"Origin":"https://www.barchart.com",
"Referer":"https://www.barchart.com/etfs-funds/quotes/{}/options",
"User-Agent":user_agent,
}
tasks = []
async with aiohttp.ClientSession() as session:
for expiry in expirys:
headers['Referer'] = headers['Referer'].format(symbol)
task = asyncio.ensure_future(self._xp_fetch(symbol, expiry, url, session, headers))
tasks.append(task)
# gather returns responses in original order not arrival order
# https://docs.python.org/3/library/asyncio-task.html#task-functions
responses = await asyncio.gather(*tasks)
return responses
```

This class has the same structure and form as the other scraper classes except slightly simpler. The purpose of this class is to simply retrieve the basic html source for each ETF so that we can later extract the last quote price for the underlying equity.

```
# ================================================
# async get html page source
class last_price_scraper:
def __init__(self):
pass
async def _fetch(self, symbol, url, session):
"""fn: to retrieve option quotes as JSON
Params:
symbol : str(), ETF
url : str(), request url
session : aiohttp.ClientSession() object
Returns:
response : text object
"""
async with session.get(url.format(symbol)) as response:
return await response.text()
async def run(self, symbols):
"""fn: to aggregate response option quotes
Params:
symbols : list of str(), ETF symbols
Returns:
responses : list of text
"""
url = 'https://www.barchart.com/stocks/quotes/{}/options'
tasks = []
async with aiohttp.ClientSession() as session:
for symbol in symbols:
task = asyncio.ensure_future(self._fetch(symbol, url, session))
tasks.append(task)
# gather returns responses in original order not arrival order
# https://docs.python.org/3/library/asyncio-task.html#task-functions
responses = await asyncio.gather(*tasks)
return responses
```

Once we have all the data we need to be able to parse it for easy analysis and storage. Fortunately this is relatively simple to do with Pandas. The **option_parser.py **module contains one class-**option_parser**, and three functions-**extract_last_price(), ****create_call_df(), create_put_df()**.

The **option_parser** class is initialized with an ETF symbol and the appropriate response object. The create dataframe functions extract the call/put data from the JSON/dict response, then iterates through each quote combining them into dataframes taking care to clean the data set and change the datatypes from objects to numeric/datetime where appropriate. The **extract_last_price **function is used to get the underlying quote price from the basic html source.

```
import pandas as pd
import numpy as np
# ================================================
class option_parser:
def __init__(self, symbol, response):
self.symbol = symbol
self.response = response
# ------------------------------------------------
# extract last price from html
def extract_last_price(self, html_text):
"""fn: extract price from html"""
reg_exp = r'(?<="lastPrice":)(\d{1,3}.{1}\d{2})'
prices = re.findall(reg_exp, html_text)
if len(prices) < 1:
return np.nan
else:
return float(prices[0])
# ------------------------------------------------
# create call df
def create_call_df(self):
"""fn: to create call df"""
json_calls = self.response['data']['Call']
list_dfs = []
for quote in json_calls:
list_dfs.append(pd.DataFrame.from_dict(quote['raw'], orient='index'))
df = (
pd.concat(list_dfs, axis=1).T.reset_index(drop=True)
.replace('NA', np.nan)
.apply(pd.to_numeric, errors='ignore')
.assign(expirationDate = lambda x: pd.to_datetime(x['expirationDate']))
)
df['symbol'] = [self.symbol] * len(df.index)
return df
# ------------------------------------------------
# create put df
def create_put_df(self):
"""fn: to create put df"""
json_puts = self.response['data']['Put']
list_dfs = []
for quote in json_puts:
list_dfs.append(pd.DataFrame.from_dict(quote['raw'], orient='index'))
df = (
pd.concat(list_dfs, axis=1).T.reset_index(drop=True)
.replace('NA', np.nan)
.apply(pd.to_numeric, errors='ignore')
.assign(expirationDate = lambda x: pd.to_datetime(x['expirationDate']))
)
df['symbol'] = [self.symbol] * len(df.index)
return df
```

Finally we can combine the modules into a script and run it. Note that this script requires the **fake-useragent **package. This package has a nice feature where it generates a random user agent string on every call. We need to do this so our requests are not blocked by the server.

The script imports a list of ETF symbols originally sourced from Nasdaq. Some of these symbols don't have options data, so they are filtered out. The script runs in the following order: basic html scraper -> first async scraper -> extracts the expiry dates -> xp async scraper which aggregates all the option data -> parses the collected data into a dataframe format -> downloads and inserts any missing underlying prices -> then saves it to disk as an HDF5 file.

```
import os
import sys
import pandas as pd
import pandas_datareader.data as web
import numpy as np
import time
import asyncio
from fake_useragent import UserAgent
'''set path variables'''
project_dir = "YOUR/PROJECT/DIR"
sys.path.append(project_dir)
import async_option_scraper
import option_parser
# ================================================
today = pd.datetime.today().date()
# ================================================
file_start = time.time()
print('\nAsync Barchart Scraper starting...')
# --------------- \\\
# import symbols
FILE = project_dir + 'ETFList.Options.Nasdaq__M.csv'
ALL_ETFS = pd.read_csv(FILE)['Symbol']
drop_symbols = ['ADRE', 'AUNZ', 'CGW', 'DGT', 'DSI', 'EMIF', 'EPHE', 'EPU', 'EUSA', 'FAN', 'FDD', 'FRN', 'GAF', 'GII', 'GLDI', 'GRU', 'GUNR', 'ICN', 'INXX', 'IYY', 'KLD', 'KWT', 'KXI', 'MINT', 'NLR', 'PBP', 'PBS', 'PEJ', 'PIO', 'PWB', 'PWV', 'SCHO', 'SCHR', 'SCPB', 'SDOG', 'SHM', 'SHV', 'THRK', 'TLO', 'UHN', 'USCI', 'USV', 'VCSH']
ETFS = [x for x in ALL_ETFS if x not in set(drop_symbols)]
# ================================================
# GET HTML SOURCE FOR LAST SYMBOL EQUITY PRICE
# ================================================
t0_price = time.time()
# --------------- \\\
loop = asyncio.get_event_loop()
px_scraper = async_option_scraper.last_price_scraper()
px_run_future = asyncio.ensure_future(px_scraper.run(ETFS))
loop.run_until_complete(px_run_future)
px_run = px_run_future.result()
# ------------- ///
duration_price = time.time() - t0_price
print('\nprice scraper script run time: ',
pd.to_timedelta(duration_price, unit='s'))
# ------------- ///
# create price dictionary
px_dict = {}
for k, v in zip(ETFS, px_run):
px_dict[k] = v
# ================================================
# RUN FIRST ASYNC SCRAPER
# ================================================
t0_first = time.time()
# --------------- \\\
ua = UserAgent()
loop = asyncio.get_event_loop()
first_scraper = async_option_scraper.first_async_scraper()
first_run_future = asyncio.ensure_future(
first_scraper.run(ETFS, ua.random)
)
loop.run_until_complete(first_run_future)
first_run = first_run_future.result()
# ------------- ///
first_duration = time.time() - t0_first
print('\nfirst async scraper script run time: ',
pd.to_timedelta(first_duration, unit='s'))
# ================================================
# EXTRACT EXPIRYS FROM FIRST RUN SCRAPER
# ================================================
xp = async_option_scraper.expirys(ETFS, first_run)
expirys = xp.get_expirys()
# ================================================
# SCRAPE AND AGGREGATE ALL SYMBOLS BY EXPIRY
# ================================================
t0_xp = time.time()
# -------------- \\\
# dict key=sym, values=list of json data by expiry
# create helper logic to test if expirys is None before passing
sym_xp_dict = {}
ua = UserAgent()
xp_scraper = async_option_scraper.xp_async_scraper()
for symbol in ETFS:
print()
print('-'*50)
print('scraping: ', symbol)
if not expirys[symbol]:
print('symbol ' + symbol + ' missing expirys')
continue
try:
xp_loop = asyncio.get_event_loop()
xp_future = asyncio.ensure_future(
xp_scraper.xp_run(symbol, expirys[symbol], ua.random)
)
xp_loop.run_until_complete(xp_future)
sym_xp_dict[symbol] = xp_future.result()
except Exception as e:
print(symbol + ' error: ' + e)
# ------------- ///
duration_xp = time.time() - t0_xp
print('\nall async scraper script run time: ',
pd.to_timedelta(duration_xp, unit='s'))
# ================================================
# PARSE ALL COLLECTED DATA
# ================================================
t0_agg = time.time()
# -------------- \\\
all_etfs_data = []
for symbol, xp_list in sym_xp_dict.items():
print()
print('-'*50)
print('parsing: ', symbol)
list_dfs_by_expiry = []
try:
for i in range(len(xp_list)):
try:
parser = option_parser.option_parser(
symbol, xp_list[i])
call_df = parser.create_call_df()
put_df = parser.create_put_df()
concat = pd.concat([call_df, put_df], axis=0)
concat['underlyingPrice'] = np.repeat(
parser.extract_last_price(px_dict[symbol]),
len(concat.index))
list_dfs_by_expiry.append(concat)
except: continue
except Exception as e:
print(f'symbol: {symbol}\n error: {e}')
print()
continue
all_etfs_data.append(pd.concat(list_dfs_by_expiry, axis=0))
# ------------- ///
duration_agg = time.time() - t0_agg
print('\nagg parse data script run time: ',
pd.to_timedelta(duration_agg, unit='s'))
# -------------- \\\
dfx = pd.concat(all_etfs_data, axis=0).reset_index(drop=True)
print(dfx.info())
# ------------- ///
# ================================================
# GET ANY MISSING UNDERLYING PRICE
# ================================================
print('\nCollecting missing prices...')
grp = dfx.groupby(['symbol'])['underlyingPrice'].count()
missing_symbol_prices = grp[grp == 0].index
get_price = lambda symbol: web.DataReader(
symbol, 'google', today)['Close']
prices = []
for symbol in missing_symbol_prices:
px = get_price(symbol).iloc[0]
prices.append((symbol, px))
df_prices = pd.DataFrame(prices).set_index(0)
for symbol in df_prices.index:
(dfx.loc[dfx['symbol'] == symbol,
['underlyingPrice']]) = df_prices.loc[symbol].iloc[0]
dfx['underlyingPrice'] = dfx.underlyingPrice.astype(float)
print('\nmissing prices added')
# ================================================
# store dataframe as hdf
# ================================================
print(dfx.head(20))
print(dfx.info())
file_duration = time.time() - file_start
print('\nfile script run time: ', pd.to_timedelta(file_duration, unit='s'))
file_ = project_dir + f'/ETF_options_data_{today}.h5'
dfx.to_hdf(file_, key='data', mode='w')
# ================================================
# kill python process after running script
# ================================================
time.sleep(2)
os.kill(os.getpid(), 9)
```

Here's some sample output:

UPDATE: Here is the list of Nasdaq ETF symbols for download <ETF Symbol List CSV>

- Wikipedia.org - AJAX definition
- W3Schools.com - AJAX introduction
- Making 1 Million Requests with Python-aiohttp via https://pawelmhm.github.io - Great article on implementing asyncio with aiohttp
- Barchart.com - "Barchart, the leading provider of market data solutions for individuals and businesses."

**Recap****Webinar Hypothesis****Anaylsis/Conclusions****Jupyter (IPython) Notebook****Github Links and Resources**

Thus far in the series we've explored the idea of using Gaussian mixture models (GMM) to predict outlier returns. Specifically, we were measuring two things:

- The accuracy of the strategy implementation in predicting return distributions.
- The return pattern after an outlier event.

During the exploratory phase of this project there were some interesting results worthy of more investigation. The initial results implied that the strategy implementation was adaptable to changes in the means and volatilities of a small number of ETF's returns.

Recently I had the opportunity to present my first webinar with QuantInsti.com. I definitely have some areas for improvement, but the experience was great overall, and I learned a lot.

I chose this topic to present, and through the process I was able to refine the hypothesis, the code, and my thinking on the subject. The hypothesis is simple:

Can a GMM based strategy predict asset return distributions such that a strategy which "buys" the asset post an outlier event can "earn" a positive return?

There were a couple of takeaways from the project. Overall the strategy showed promise. What really impressed me was the difference in the sampled confidence intervals when using the Normal distribution vs. the JohnsonSU distribution. See the following example:

On the left, we have the same strategy except the sampled confidence intervals are drawn from a normal distribution. On the right we use the JohnsonSU distribution. In terms of predicted return distribution accuracy it's not even close-JohnsonSU is the clear winner, even showing an ability to adjust to periods of clustered volatility.

However note the equity curves in the example. The normal distribution wins handily but that is because the strategy is so inaccurate that it predicts outlier returns occurred ~97% of the time, so technically that would be a buy and hold strategy which benefits from the strong uptrend in SPY post 2009.

Another takeaway is that the model shows a bias towards US based ETFs. You can see that by examining the Seaborn facetgrid plots in the notebook I will share at the end. First, by aggregating the results in to a **tidy-data **format the analysis was rendered so simple, I kicked myself for not adhering to these principles sooner. In the examples I examine the strategy results according to median returns and the sum_ratio.

Median returns are simply the median returns of the strategy for that set of parameters. The sum_ratio is the sum of all strategy returns that ended positively divided by the sum of all returns that ended negatively for a set of parameters. A "successful" strategy should have a sum_ratio > 1 across multiple dimensions as well as consistent positive median returns.

In the analysis I look at the two metrics across different lookback periods (1 year, 3 year, and expanding), different numbers of mixture model components (k=2, 3, 5, 7, 9, 13, 17, 21) and across a number of holding periods in days (steps = 1, 2, 3, 5, 7, 10, 21).

When applied to SPY, QQQ, and TLT the strategy showed consistent positive results across a wide spectrum of parameter combinations whereas the application to GLD, EFA, and EEM were a little more mixed and definitely not as encouraging.

One theory I have for this result is that the factors I used as input to the GMM are US based interest rate spreads. These are likely to have a much stronger relationship to the behavior of SPY, QQQ, TLT vs the other ETFs. To improve performance I believe one would have to locate indicators based on the asset/ETF one wants to trade.

To sum up, I'm encouraged by the strategy framework, but would like to see a wider array of stocks, asset classes, and ETFs tested with various combinations of factors.

Here is a sample exploratory notebook I put together for the webinar that demonstrates the conclusions drawn above.

- Recap
- Model Update
- Model Testing
- Model Results
- Conclusions
- Code

In the previous post I gave a basic "proof" of concept, where we designed a trading strategy using Sklearn's implementation of Gaussian mixture models. The strategy attempts to predict an asset's return distribution such that returns that fall outside the predicted distribution are considered *outliers* and likely to mean revert. It showed some promise but had many areas in need of improvement.

In this version I've refactored a lot of the code into a more object oriented structure. Now the code uses three classes.

- ModelRunner() class - This is the class for executing the model and returning our prediction dataframe and some key parameters.
- ResultEval() class - This takes the data from the prediction dataframe and key parameters and outputs our strategy returns and summary information.
- ModelPlots() class - This takes our data and outputs key plots to help visualize the strategy performance.

I did this for several reasons.

- Reduce the likelihood of input errors by creating objects that share parameters.
- Increase the ease of model testing.
- Increase interpretability.

In this version, we are going to expand the analysis to include other, actively traded ETFs, and test the reproducibility of the results, and generalization ability of the model.

Here are the ETFs we will examine:

symbols = ['SPY', 'DIA', 'QQQ', 'GLD', 'TLT', 'EEM', 'ACWI']

Assuming the correct imports, with the refactored code we can run the model in the following fashion. We'll focus on the **TOO_LOW** events although I encourage readers to experiment with both.

```
# Project Directory
DIR = 'YOUR/PROJECT/DIRECTORY/'
# get fed data
f1 = 'TEDRATE' # ted spread
f2 = 'T10Y2Y' # constant maturity ten yer - 2 year
f3 = 'T10Y3M' # constant maturity 10yr - 3m
factors = [f1, f2, f3]
ft_cols = factors + ['lret']
start = pd.to_datetime('2002-01-01')
end = pd.to_datetime('2017-01-01')
symbols = ['SPY', 'DIA', 'QQQ', 'GLD', 'TLT', 'EEM', 'ACWI']
for mkt in symbols:
data = get_mkt_data(mkt, start, end, factors)
# Model Params
# ------------
a, b = (.2, .7) # found via coarse parameter search
alpha = 0.99
max_iter = 100
k = 2 # n_components
init = 'random' # or 'kmeans'
nSamples = 2_000
year = 2009 # cutoff
lookback = 1 # years
step_fwd = 5 # days
MR = ModelRunner(data, ft_cols, k, init, max_iter)
dct = MR.prediction_cycle(year, alpha, a, b, nSamples)
res = ResultEval(dct, step_fwd=step_fwd)
event_dict = res._get_event_states()
event = list(event_dict.keys())[1] # TOO_LOW
post_events = res.get_post_events(event_dict[event])
end_vals = res.get_end_vals(post_events)
smry = res.create_summary(end_vals)
p()
p('*'*25)
p(mkt, event.upper())
p(smry.T)
mp = ModelPlots(mkt, post_events, event, DIR, year)
mp.plot_pred_results(dct['pred'], dct['year'], dct['a'], dct['b'])
mp.plot_equity_timeline()
```

In this post I'm going to skip to the results and conclusions, and provide the refactored code at the end.

First let's look at the model results using SPY.

The first thing I noticed was that the confidence intervals were less responsive to increases in return volatility. The difference shows up in the reduction in accuracy. In Part 1, I believe the accuracy was ~71% whereas in the updated model the accuracy has dipped to ~68%! Does that hurt our strategy?

Judging by the equity curve, our strategy is not noticeably impacted by the reduced model accuracy!

The plotted equity curve is the cumulative sum of each event's returns assuming every event was a "trade". This should include overlapping events.

Let's look at the model results for the other ETFs.

The model has some interesting output. Notice that model accuracy ranges from ~57% (TLT) to ~83% (EEM). However, both of these equity curves end positively. GLD is distinctly volatile, and ends poorly, however the model was 75% accurate. DIA, QQQ, SPY, and ACWI all have stable sharply positive equity curves.

This supports my initial findings that model accuracy seems loosely, if at all, related to the strategy's equity curve. These results do indicate that the strategy is worth further evaluation but I'm hesitant to declare success.

I need to test the strategy over a longer period of time and make sure to include 2008/9. Also, I need to drill down into evaluating the strategy results vs the correlation of asset returns. For example, DIA, QQQ, and SPY are highly correlated, so we would expect the strategy to have similar results among those ETFs, but what about negatively and uncorrelated assets? TLT is generally negatively correlated with SPY while GLD is likely uncorrelated. Is the strategy performance for those two ETFs representative of other negatively/uncorrelated ETFs?

```
%load_ext watermark
%watermark
import pandas as pd
import pandas_datareader.data as web
import numpy as np
import sklearn.mixture as mix
import scipy.stats as scs
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
import missingno as msno
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")
import affirm
sns.set(font_scale=1.25)
style_kwds = {'xtick.major.size': 3, 'ytick.major.size': 3,
'font.family':u'courier prime code', 'legend.frameon': True}
sns.set_style('white', style_kwds)
p=print
p()
%watermark -p pandas,pandas_datareader,numpy,scipy,sklearn,matplotlib,seaborn
# **********************************************************************
def get_mkt_data(mkt, start, end, factors):
"""Function to get benchmark data from
Yahoo and Factor data from FRED
Params:
mkt : str(), symbol
start : pd.DateTime()
end : pd.DateTime()
factors : list() of str()
Returns:
data : pd.DataFrame()
"""
MKT = (web.DataReader([mkt], 'yahoo', start, end)['Adj Close']
.rename(columns={mkt:mkt})
.assign(lret=lambda x: np.log(x[mkt]/x[mkt].shift(1)))
.dropna())
data = (web.DataReader(factors, 'fred', start, end)
.join(MKT, how='inner')
.dropna())
return data
# **********************************************************************
class ModelRunner():
def __init__(self, *args, **kwargs):
"""Class to run mixture model model
Params:
data : pd.DataFrame()
ft_cols : list() of feature columns str()
k : int(), n_components
max_iter : int(), max iterations
init : str() {random, kmeans}
"""
self.data = data
self.ft_cols = ft_cols
self.k = k
self.max_iter = max_iter
self.init = init
np.random.seed(123457) # make results reproducible
def _run_model(self, bgm=None, **kwargs):
"""Function to run mixture model
Params:
data : pd.DataFrame()
ft_cols : list of str()
k : int(), n_components
max_iter : int()
init : str() {random, kmeans}
Returns:
model : sklearn model object
hidden_states : array-like, hidden states
"""
X = self.data[self.ft_cols].values
if bgm:
model = mix.BayesianGaussianMixture(n_components=self.k,
max_iter=self.max_iter,
init_params=self.init,
**kwargs,
).fit(X)
else:
model = mix.GaussianMixture(n_components=self.k,
max_iter=self.max_iter,
init_params=self.init,
**kwargs,
).fit(X)
hidden_states = model.predict(X)
return model, hidden_states
def _get_state_est(self, model, hidden_states):
"""Function to return estimated state mean and state variance
Params:
model : sklearn model object
hidden_states : {array-like}
Returns:
mr_i : mean return of last estimated state
mvar_i : model variance of last estimated state
"""
# get last state
last_state = hidden_states[-1]
# last value is mean return for ith state
mr_i = model.means_[last_state][-1]
mvar_i = np.diag(model.covariances_[last_state])[-1]
return mr_i, mvar_i
def _get_ci(self, mr_i, mvar_i, alpha, a, b, nSamples):
"""Function to sample confidence intervals
from the JohnsonSU distribution
Params:
mr_i : float()
mvar_i : float()
alpha : float()
a : float()
b : float()
nsamples : int()
Returns:
ci : tuple(float(), float()), (low_ci, high_ci)
"""
rvs_ = scs.johnsonsu.rvs(a, b, loc=mr_i, scale=mvar_i, size=nSamples)
ci = scs.johnsonsu.interval(alpha=alpha, a=a, b=b,
loc=np.mean(rvs_), scale=np.std(rvs_))
return ci
def prediction_cycle(self, *args, **kwargs):
"""Function to make walk forward predictions from cutoff year onwards
Params:
year : int(), cutoff year
alpha : float()
a : float()
b : float()
nsamples : int()
Returns:
dict() :
pred : pd.DataFrame()
year : str()
a, b : float(), float()
"""
cutoff = year
train_df = self.data.ix[str(cutoff - lookback):str(cutoff)].dropna()
oos = self.data.ix[str(cutoff+1):].dropna()
# confirm that train_df end index is different than oos start index
assert train_df.index[-1] != oos.index[0]
# create pred list to hold tuple rows
preds = []
for t in tqdm(oos.index):
if t == oos.index[0]:
insample = train_df
# run model func to return model object and hidden states using params
model, hstates = self._run_model(**kwargs)
# get hidden state mean and variance
mr_i, mvar_i = self._get_state_est(model, hstates)
# get confidence intervals from sampled distribution
low_ci, high_ci = self._get_ci(mr_i, mvar_i, alpha, a, b, nSamples)
# append tuple row to pred list
preds.append((t, hstates[-1], mr_i, mvar_i, low_ci, high_ci))
# increment insample dataframe
insample = data.ix[:t]
cols = ['ith_state', 'ith_ret', 'ith_var', 'low_ci', 'high_ci']
pred = (pd.DataFrame(preds, columns=['Dates']+cols)
.set_index('Dates').assign(tgt = oos['lret']))
# logic to see if error exceeds neg or pos CI
pred_copy = pred.copy().reset_index()
# Identify indices where target return falls between CI
win = pred_copy.query("low_ci < tgt < high_ci").index
# create list of binary variables representing in/out CI
in_rng_list = [1 if i in win else 0 for i in pred_copy.index]
# assign binary variables sequence to new column
pred['in_rng'] = in_rng_list
return {'pred':pred, 'year':year, 'a':a, 'b':b}
# **********************************************************************
class ResultEval():
def __init__(self, data, step_fwd):
"""Class to evaluate prediction results
Params:
data : dict() containing results of ModelRunner()
step_fwd : int(), number of days to evalute post event
"""
self.df = data['pred'].copy().reset_index()
self.step_fwd=step_fwd
def _get_event_states(self):
"""Function to get event indexes
Index bjects must be called 'too_high', 'too_low'
Returns:
dict() : values are index objects
"""
too_high = self.df.query("tgt > high_ci").index
too_low = self.df.query("tgt < low_ci").index
return {'too_high':too_high, 'too_low':too_low}
def get_post_events(self, event):
"""Function to return dictionary where key, value is integer
index, and Pandas series consisting of returns post event
Params:
df : pd.DataFrame(), prediction df
event : {array-like}, index of target returns that exceed CI high or low
step_fwd : int(), how many days to include after event
Returns:
after_event : dict() w/ values = pd.Series()
"""
after_event = {}
for i in range(len(event)):
tmp_ret = self.df.ix[event[i]:event[i]+self.step_fwd, ['Dates','tgt']]
# series of returns with date index
after_event[i] = tmp_ret.set_index('Dates', drop=True).squeeze()
return after_event
def get_end_vals(self, post_events):
"""Function to sum and agg each post events' returns"""
end_vals = []
for k in post_events.keys():
tmp = post_events[k].copy()
tmp.iloc[0] = 0 # set initial return to zero
end_vals.append(tmp.sum())
return end_vals
def create_summary(self, end_vals):
"""Function to take ending values and calculate summary
Will fail if count of ending values (>0) or (<0) is less than 1
"""
gt0 = [x for x in end_vals if x>0]
lt0 = [x for x in end_vals if x<0]
assert len(gt0) > 1
assert len(lt0) > 1
summary = (pd.DataFrame(index=['value'])
.assign(mean = f'{np.mean(end_vals):.4f}')
.assign(median = f'{np.median(end_vals):.4f}')
.assign(max_ = f'{np.max(end_vals):.4f}')
.assign(min_ = f'{np.min(end_vals):.4f}')
.assign(gt0_cnt = f'{len(gt0):d}')
.assign(lt0_cnt = f'{len(lt0):d}')
.assign(sum_gt0 = f'{sum(gt0):.4f}')
.assign(sum_lt0 = f'{sum(lt0):.4f}')
.assign(sum_ratio = f'{sum(gt0) / abs(sum(lt0)):.4f}')
.assign(gt_pct = f'{len(gt0) / (len(gt0) + len(lt0)):.4f}')
.assign(lt_pct = f'{len(lt0) / (len(gt0) + len(lt0)):.4f}')
)
return summary
# **********************************************************************
class ModelPlots():
def __init__(self, mkt, post_events, event_state, project_dir, year):
"""Class to visualize prediction results and summary
Params:
mkt : str(), symbol
post_events : dict() of pd.Series()
event_state : str(), 'too_high', 'too_low'
project_dir : str()
year : int(), cutoff year
"""
self.mkt = mkt
self.post_events = post_events
self.event_state = event_state
self.DIR = project_dir
self.year = year
def plot_equity_timeline(self):
"""Function to plot event timeline with equity curve second axis"""
agg_tmp = []
fig, ax = plt.subplots(figsize=(10, 7))
ax1 = ax.twinx()
ax.axhline(y=0, color='k', lw=3)
for k in self.post_events.keys():
tmp = self.post_events[k].copy()
tmp.iloc[0] = 0 # set initial return to zero
agg_tmp.append(tmp)
if tmp.sum() > 0: color = 'dodgerblue'
else: color = 'red'
ax.plot(tmp.index, tmp.cumsum(), color=color, alpha=0.5)
ax.set_xlim(pd.to_datetime(str(self.year) + '-12-31'), tmp.index[-1])
ax.set_xlabel('Dates')
ax.set_title(f"{self.mkt} {self.event_state.upper()}", fontsize=16)
#sns.despine(offset=2)
agg_df = pd.concat(agg_tmp).cumsum()
ax1.plot(agg_df.index, agg_df.values, color='k', lw=5)
ax.set_ylabel('Event Returns')
ax1.set_ylabel('Equity Curve')
fig.savefig(self.DIR + f'{self.mkt} {self.event_state.upper()} post events timeline {pd.datetime.today()}.png', dpi=300)
return
def plot_events_timeline(self):
"""Function to plot even timeline only"""
fig, ax = plt.subplots(figsize=(10, 7))
ax.axhline(y=0, color='k', lw=3)
for k in self.post_events.keys():
tmp = self.post_events[k].copy()
tmp.iloc[0] = 0 # set initial return to zero
if tmp.sum() > 0: color = 'dodgerblue'
else: color = 'red'
ax.plot(tmp.index, tmp.cumsum(), color=color, alpha=0.5)
ax.set_xlim(pd.to_datetime('2009-12-31'), tmp.index[-1])
ax.set_xlabel('Dates')
ax.set_title(f"{self.mkt} {self.event_state.upper()}", fontsize=16, fontweight='demi')
sns.despine(offset=2)
fig.savefig(self.DIR + f'{self.mkt} {self.event_state.upper()} post events timeline.png', dpi=300)
return
def plot_events_post(self):
"""Function to plot events from zero until n days after"""
fig, ax = plt.subplots(figsize=(10, 7))
ax.axhline(y=0, color='k', lw=3)
for k in self.post_events.keys():
tmp = self.post_events[k].copy()
tmp.iloc[0] = 0 # set initial return to zero
if tmp.sum() > 0: color = 'dodgerblue'
else: color = 'red'
tmp.cumsum().reset_index(drop=True).plot(color=color, alpha=0.5, ax=ax)
ax.set_xlabel('Days')
ax.set_title(f"{self.mkt} {self.event_state.upper()}", fontsize=16, fontweight='demi')
sns.despine(offset=2)
fig.savefig(self.DIR + f'{self.mkt} {self.event_state.upper()} post events.png', dpi=300)
return
def plot_distplot(self, ending_values, summary):
"""Function to plot histogram of ending values"""
colors = sns.color_palette('RdYlBu', 4)
fig, ax = plt.subplots(figsize=(10, 7))
sns.distplot(pd.DataFrame(ending_values), bins=15, color=colors[0],
kde_kws={"color":colors[3]}, hist_kws={"color":colors[3], "alpha":0.35}, ax=ax)
ax.axvline(x=float(summary['mean'][0]), label='mean', color='dodgerblue', lw=3, ls='-.')
ax.axvline(x=float(summary['median'][0]), label='median', color='red', lw=3, ls=':')
ax.axvline(x=0, color='black', lw=1, ls='-')
ax.legend(loc='best')
sns.despine(offset=2)
ax.set_title(f"{self.mkt} {self.event_state.upper()}", fontsize=16, fontweight='demi')
fig.savefig(self.DIR + f'{self.mkt} {self.event_state.upper()} distplot.png', dpi=300)
return
def plot_pred_results(self, df, year, a, b):
"""Function to plot prediction results and confidence intervals"""
# colorblind safe palette http://colorbrewer2.org/
colors = sns.color_palette('RdYlBu', 4)
fig, ax = plt.subplots(figsize=(10, 7))
ax.scatter(df.index, df.tgt, c=[colors[1] if x==1 else colors[0] for x in df['in_rng']], alpha=0.85)
df['high_ci'].plot(ax=ax, alpha=0.65, marker='.', color=colors[2])
df['low_ci'].plot(ax=ax, alpha=0.65, marker='.', color=colors[3])
ax.set_xlim(df.index[0], df.index[-1])
nRight = df.query('in_rng==1').shape[0]
accuracy = nRight / df.shape[0]
ax.set_title('{:^10}\ncutoff year: {} | accuracy: {:2.3%} | errors: {} | a={}, b={}'
.format(self.mkt, year, accuracy, df.shape[0] - nRight, a, b))
in_ = mpl.lines.Line2D(range(1), range(1), color="white", marker='o', markersize=10, markerfacecolor=colors[1])
out_ = mpl.lines.Line2D(range(1), range(1), color="white", marker='o', markersize=10, markerfacecolor=colors[0])
hi_ci = mpl.lines.Line2D(range(1), range(1), color="white", marker='.', markersize=15, markerfacecolor=colors[2])
lo_ci = mpl.lines.Line2D(range(1), range(1), color="white", marker='.', markersize=15, markerfacecolor=colors[3])
leg = ax.legend([in_, out_, hi_ci, lo_ci],["in", "out", 'high_ci', 'low_ci'],
loc = "center left", bbox_to_anchor = (1, 0.85), numpoints = 1)
sns.despine(offset=2)
file_str = self.DIR+f'{self.mkt} prediction success {pd.datetime.today()}.png'
fig.savefig(file_str, dpi=300, bbox_inches="tight")
return
```

]]>**Recap****Hypothesis****Strategy****Conclusion****Caveats and Areas of Exploration****References**

In Part 1 we learned about Hidden Markov Models and their application using a toy example involving a lazy pet dog. In Part 2 we learned about the expectation-maximization algorithm, K-Means, and how Mixture Models improve on K-Means weaknesses. If you still have some questions or fuzzy understanding about these topics, I would recommend reviewing the prior posts. In those posts I also provide links to resources that really helped my understanding.

Given what we know about Mixture Models and their ability to characterize general distributions, can we use it to model a return series, such that we can identify **outlier** returns that are likely to mean revert?

**This strategy attempts to predict an asset's return distribution**. Actual returns that fall outside the predicted confidence intervals are considered ** outliers** and likely to revert to the mean.

We first fit a Gaussian Mixture Model to the historical daily return series. We use the model's estimate of the hidden state's mean and variance as parameters to a random sampling from the JohnsonSU distribution. We then calculate confidence intervals from the sampled distribution.

From there we evaluate model accuracy and the n days cumulative returns after each outlier event. We compute some summary statistics and try to answer the hypothesis.

Searching the net I found a useful bit of code from this site. Instead of assuming our asset return distribution is normal, we can use Python and Scipy.stats to find the brute force answer. We can cycle through each continuous distribution and run a goodness-of-fit procedure called the KS-test. The KS-test is a non-parametric method which examines the distance between a *known *cumulative distribution function and the CDF of the your sample data. The KS-test outputs the probability that your sample data comes from the benchmark distribution.

```
# code sample from:
# http://www.aizac.info/simple-check-of-a-sample-against-80-distributions/
cdfs = [
"norm", #Normal (Gaussian)
"alpha", #Alpha
"anglit", #Anglit
"arcsine", #Arcsine
"beta", #Beta
"betaprime", #Beta Prime
"bradford", #Bradford
"burr", #Burr
"cauchy", #Cauchy
"chi", #Chi
"chi2", #Chi-squared
"cosine", #Cosine
"dgamma", #Double Gamma
"dweibull", #Double Weibull
"erlang", #Erlang
"expon", #Exponential
"exponweib", #Exponentiated Weibull
"exponpow", #Exponential Power
"fatiguelife", #Fatigue Life (Birnbaum-Sanders)
"foldcauchy", #Folded Cauchy
"f", #F (Snecdor F)
"fisk", #Fisk
"foldnorm", #Folded Normal
"frechet_r", #Frechet Right Sided, Extreme Value Type II
"frechet_l", #Frechet Left Sided, Weibull_max
"gamma", #Gamma
"gausshyper", #Gauss Hypergeometric
"genexpon", #Generalized Exponential
"genextreme", #Generalized Extreme Value
"gengamma", #Generalized gamma
"genlogistic", #Generalized Logistic
"genpareto", #Generalized Pareto
"genhalflogistic", #Generalized Half Logistic
"gilbrat", #Gilbrat
"gompertz", #Gompertz (Truncated Gumbel)
"gumbel_l", #Left Sided Gumbel, etc.
"gumbel_r", #Right Sided Gumbel
"halfcauchy", #Half Cauchy
"halflogistic", #Half Logistic
"halfnorm", #Half Normal
"hypsecant", #Hyperbolic Secant
"invgamma", #Inverse Gamma
"invnorm", #Inverse Normal
"invweibull", #Inverse Weibull
"johnsonsb", #Johnson SB
"johnsonsu", #Johnson SU
"laplace", #Laplace
"logistic", #Logistic
"loggamma", #Log-Gamma
"loglaplace", #Log-Laplace (Log Double Exponential)
"lognorm", #Log-Normal
"lomax", #Lomax (Pareto of the second kind)
"maxwell", #Maxwell
"mielke", #Mielke's Beta-Kappa
"nakagami", #Nakagami
"ncx2", #Non-central chi-squared
# "ncf", #Non-central F
"nct", #Non-central Student's T
"norm" # Normal
"pareto", #Pareto
"powerlaw", #Power-function
"powerlognorm", #Power log normal
"powernorm", #Power normal
"rdist", #R distribution
"reciprocal", #Reciprocal
"rayleigh", #Rayleigh
"rice", #Rice
"recipinvgauss", #Reciprocal Inverse Gaussian
"semicircular", #Semicircular
"t", #Student's T
"triang", #Triangular
"truncexpon", #Truncated Exponential
"truncnorm", #Truncated Normal
"tukeylambda", #Tukey-Lambda
"uniform", #Uniform
"vonmises", #Von-Mises (Circular)
"wald", #Wald
"weibull_min", #Minimum Weibull (see Frechet)
"weibull_max", #Maximum Weibull (see Frechet)
"wrapcauchy", #Wrapped Cauchy
"ksone", #Kolmogorov-Smirnov one-sided (no stats)
"kstwobign"] #Kolmogorov-Smirnov two-sided test for Large N
sample = data['lret'].values
for cdf in cdfs:
try:
#fit our data set against every probability distribution
parameters = eval("scs."+cdf+".fit(sample)");
#Applying the Kolmogorov-Smirnof one sided test
D, p = scs.kstest(sample, cdf, args=parameters);
#pretty-print the results
D = round(D, 5)
p = round(p, 5)
#pretty-print the results
print (cdf.ljust(16) + ("p: "+str(p)).ljust(25)+"D: "+str(D));
except: continue
```

After running this code you should see output similar to the below code. For simplicity sake, just remember the higher the p-value, the more confident the ks-test is that our data came from the given distribution.

I had never heard of the Johnson SU distribution before this code. I had to research it, and I found that the Johnson SU was developed to in order to apply the established methods and theory of the normal distribution to non-normal data sets. What gives it this flexibility is the two shape parameters, gamma and delta, or a, b in Scipy. For more information I recommend this Wolfram reference link and this Scipy.stats link.

After selecting the distribution we can code up the experiment.

```
# import packages
# code was done in Jupyter Notebook
%load_ext watermark
%watermark
import pandas as pd
import pandas_datareader.data as web
import numpy as np
import sklearn.mixture as mix
import scipy.stats as scs
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.dates import YearLocator, MonthLocator
%matplotlib inline
import seaborn as sns
import missingno as msno
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")
sns.set(font_scale=1.25)
style_kwds = {'xtick.major.size': 3, 'ytick.major.size': 3,
'font.family':u'courier prime code', 'legend.frameon': True}
sns.set_style('white', style_kwds)
p=print
p()
%watermark -p pandas,pandas_datareader,numpy,scipy,sklearn,matplotlib,seaborn
```

Now let's get some data.

```
# get fed data
f1 = 'TEDRATE' # ted spread
f2 = 'T10Y2Y' # constant maturity ten yer - 2 year
f3 = 'T10Y3M' # constant maturity 10yr - 3m
start = pd.to_datetime('2002-01-01')
end = pd.to_datetime('2017-01-01')
mkt = 'SPY'
MKT = (web.DataReader([mkt], 'yahoo', start, end)['Adj Close']
.rename(columns={mkt:mkt})
.assign(lret=lambda x: np.log(x[mkt]/x[mkt].shift(1)))
.dropna())
data = (web.DataReader([f1, f2, f3], 'fred', start, end)
.join(MKT, how='inner')
.dropna())
p(data.head())
# gives us a quick visual inspection of the data
msno.matrix(data)
```

Now we create our convenience functions. The first is the **run_model()** function which takes the data, feature columns, and Sklearn mixture parameters to produce a fitted model object and the predicted hidden states. Note that you can use a Bayesian Gaussian mixture if you so choose. The difference between the two models is that the Bayesian mixture model will try to derive the correct number of mixture components up to a chosen maximum. For more information on the Bayesian mixture model I recommend consulting the Sklearn docs.

```
def _run_model(df, ft_cols, k, max_iter, init, bgm=None, **kwargs):
"""Function to run mixture model
Params:
df : pd.DataFrame()
ft_cols : list of str()
k : int(), n_components
max_iter : int()
init : str() {random, kmeans}
Returns:
model : sklearn model object
hidden_states : array-like, hidden states
"""
X = df[ft_cols].values
if bgm:
model = mix.BayesianGaussianMixture(n_components=k,
max_iter=max_iter,
init_params=init,
**kwargs,
).fit(X)
else:
model = mix.GaussianMixture(n_components=k,
max_iter=max_iter,
init_params=init,
**kwargs,
).fit(X)
hidden_states = model.predict(X)
return model, hidden_states
```

The next function takes the model object and predicted hidden states and returns the estimated mean and variance of the last state.

```
def _get_state_est(model, hidden_states):
"""Function to return estimated state mean and state variance
Params:
model : sklearn model object
hidden_states : {array-like}
Returns:
mr_i : model mean return of last estimated state
mvar_i : model variance of last estimated state
"""
# get last state
last_state = hidden_states[-1]
# last value is mean return for ith state
mr_i = model.means_[last_state][-1]
mvar_i = np.diag(model.covariances_[last_state])[-1]
return mr_i, mvar_i
```

Now we take the estimated state mean and variance of the last predicted state and feed it into the **_get_ci() **function. This function takes the alpha and shape parameters, estimated mean and variance and randomly samples from the JohnsonSU distribution. From this distribution we derive confidence intervals.

```
def _get_ci(mr_i, mvar_i, alpha, a, b, nSamples):
"""Function to sample confidence intervals from the JohnsonSU distribution
Params:
mr_i : float()
mvar_i : float()
alpha : float()
a : float()
b : float()
nsamples : int()
Returns:
ci : tuple(float(), float()), (low_ci, high_ci)
"""
np.random.RandomState(0)
rvs_ = scs.johnsonsu.rvs(a, b, loc=mr_i, scale=mvar_i, size=nSamples)
ci = scs.johnsonsu.interval(alpha=alpha, a=a, b=b, loc=np.mean(rvs_), scale=np.std(rvs_))
return ci
```

The final function visualizes our model predictions, by highlighting target returns that fell inside and outside the confidence intervals, along with our predicted confidence intervals.

```
def plot_pred_success(df, year, a, b):
# colorblind safe palette http://colorbrewer2.org/
colors = sns.color_palette('RdYlBu', 4)
fig, ax = plt.subplots(figsize=(10, 7))
ax.scatter(df.index, df.tgt, c=[colors[1] if x==1 else colors[0] for x in df['in_rng']], alpha=0.85)
df['high_ci'].plot(ax=ax, alpha=0.65, marker='.', color=colors[2])
df['low_ci'].plot(ax=ax, alpha=0.65, marker='.', color=colors[3])
ax.set_xlim(df.index[0], df.index[-1])
nRight = df.query('in_rng==1').shape[0]
accuracy = nRight / df.shape[0]
ax.set_title(r'cutoff year: {} | accuracy: {:2.3%} | errors: {} | a={}, b={}'
.format(year, accuracy, df.shape[0] - nRight, a, b))
in_ = mpl.lines.Line2D(range(1), range(1), color="white", marker='o', markersize=10, markerfacecolor=colors[1])
out_ = mpl.lines.Line2D(range(1), range(1), color="white", marker='o', markersize=10, markerfacecolor=colors[0])
hi_ci = mpl.lines.Line2D(range(1), range(1), color="white", marker='.', markersize=15, markerfacecolor=colors[2])
lo_ci = mpl.lines.Line2D(range(1), range(1), color="white", marker='.', markersize=15, markerfacecolor=colors[3])
leg = ax.legend([in_, out_, hi_ci, lo_ci],["in", "out", 'high_ci', 'low_ci'],
loc = "center left", bbox_to_anchor = (1, 0.85), numpoints = 1)
sns.despine(offset=2)
plt.tight_layout()
return
```

Now we can run the model in a walk-forward fashion. The code uses a chosen lookback period up until the cutoff year to fit the model. From there, the code iterates refitting the model each day, outputting the predicted confidence intervals. The code is setup to run using successive cutoff years, however I will leave that to you readers to experiment with. In this demo we will break the loop after the first cutoff year.

```
%%time
# Model Params
# ------------
a, b = (.2, .7) # found via coarse parameter search
alpha = 0.99
max_iter = 100
k = 2
init = 'random' #'kmeans'
nSamples = 2_000
ft_cols = [f1, f2, f3, 'lret']
years = range(2009,2016)
lookback = 1 # chosen for ease of computation
# Iterate Model
# ------------
for year in years:
cutoff = year
train_df = data.ix[str(cutoff - lookback):str(cutoff)].dropna()
oos = data.ix[str(cutoff+1):].dropna()
# confirm that train_df end index is different than oos start index
assert train_df.index[-1] != oos.index[0]
# create pred list to hold tuple rows
preds = []
for t in tqdm(oos.index):
if t == oos.index[0]:
insample = train_df
# run model func to return model object and hidden states using params
model, hstates = _run_model(insample, ft_cols, k, max_iter, init, random_state=0)
# get hidden state mean and variance
mr_i, mvar_i = _get_state_est(model, hstates)
# get confidence intervals from sampled distribution
low_ci, high_ci = _get_ci(mr_i, mvar_i, alpha, a, b, nSamples)
# append tuple row to pred list
preds.append((t, hstates[-1], mr_i, mvar_i, low_ci, high_ci))
# increment insample dataframe
insample = data.ix[:t]
cols = ['ith_state', 'ith_ret', 'ith_var', 'low_ci', 'high_ci']
pred = (pd.DataFrame(preds, columns=['Dates']+cols)
.set_index('Dates').assign(tgt = oos['lret']))
# logic to see if error exceeds neg or pos CI
pred_copy = pred.copy().reset_index()
# Identify indices where target return falls between CI
win = pred_copy.query("low_ci < tgt < high_ci").index
# create list of binary variables representing in/out CI
in_rng_list = [1 if i in win else 0 for i in pred_copy.index]
# assign binary variables sequence to new column
pred['in_rng'] = in_rng_list
plot_pred_success(pred, year, a, b)
break
```

After that's complete we need to set up our analytics functions to evaluate the return patterns post each event. Recall that an **event** is an actual return that fell outside of our predicted confidence intervals.

```
def post_event(df, event, step_fwd=None):
"""Function to return dictionary where key, value is integer
index, and Pandas series consisting of returns post event
Params:
df : pd.DataFrame(), prediction df
event : {array-like}, index of target returns that exceed CI high or low
step_fwd : int(), how many days to include after event
Returns:
after_event : dict()
"""
after_event = {}
for i in range(len(event)):
tmp_ret = df.ix[event[i]:event[i]+step_fwd, ['Dates','tgt']]
# series of returns with date index
after_event[i] = tmp_ret.set_index('Dates', drop=True).squeeze()
return after_event
def plot_events_timeline(post_events, event_state):
fig, ax = plt.subplots(figsize=(10, 7))
ax.axhline(y=0, color='k', lw=3)
for k in post_events.keys():
tmp = post_events[k].copy()
tmp.iloc[0] = 0 # set initial return to zero
if tmp.sum() > 0: color = 'dodgerblue'
else: color = 'red'
ax.plot(tmp.index, tmp.cumsum(), color=color, alpha=0.5)
ax.set_xlim(pd.to_datetime('2009-12-31'), tmp.index[-1])
ax.set_xlabel('Dates')
ax.set_title(f"{mkt} {event_state.upper()}", fontsize=16, fontweight='demi')
sns.despine(offset=2)
return
def plot_events_post(post_events, event_state):
fig, ax = plt.subplots(figsize=(10, 7))
ax.axhline(y=0, color='k', lw=3)
for k in post_events.keys():
tmp = post_events[k].copy()
tmp.iloc[0] = 0 # set initial return to zero
if tmp.sum() > 0: color = 'dodgerblue'
else: color = 'red'
tmp.cumsum().reset_index(drop=True).plot(color=color, alpha=0.5, ax=ax)
ax.set_xlabel('Days')
ax.set_title(f"{mkt} {event_state.upper()}", fontsize=16, fontweight='demi')
sns.despine(offset=2)
return
def plot_distplot(ending_values, summary):
colors = sns.color_palette('RdYlBu', 4)
fig, ax = plt.subplots(figsize=(10, 7))
sns.distplot(pd.DataFrame(ending_values), bins=15, color=colors[0],
kde_kws={"color":colors[3]}, hist_kws={"color":colors[3], "alpha":0.35}, ax=ax)
ax.axvline(x=float(summary['mean'][0]), label='mean', color='dodgerblue', lw=3, ls='-.')
ax.axvline(x=float(summary['median'][0]), label='median', color='red', lw=3, ls=':')
ax.axvline(x=0, color='black', lw=1, ls='-')
ax.legend(loc='best')
sns.despine(offset=2)
ax.set_title(f"{mkt} {event_state.upper()}", fontsize=16, fontweight='demi')
return
def get_end_vals(post_events):
"""Function to sum and agg each post events' returns"""
end_vals = []
for k in post_events.keys():
tmp = post_events[k].copy()
tmp.iloc[0] = 0 # set initial return to zero
end_vals.append(tmp.sum())
return end_vals
def create_summary(end_vals):
gt0 = [x for x in end_vals if x>0]
lt0 = [x for x in end_vals if x<0]
assert len(gt0) > 1
assert len(lt0) > 1
summary = (pd.DataFrame(index=['value'])
.assign(mean = f'{np.mean(end_vals):.4f}')
.assign(median = f'{np.median(end_vals):.4f}')
.assign(max_ = f'{np.max(end_vals):.4f}')
.assign(min_ = f'{np.min(end_vals):.4f}')
.assign(gt0_cnt = f'{len(gt0):d}')
.assign(lt0_cnt = f'{len(lt0):d}')
.assign(sum_gt0 = f'{sum(gt0):.4f}')
.assign(sum_lt0 = f'{sum(lt0):.4f}')
.assign(sum_ratio = f'{sum(gt0) / abs(sum(lt0)):.4f}')
.assign(gt_pct = f'{len(gt0) / (len(gt0) + len(lt0)):.4f}')
.assign(lt_pct = f'{len(lt0) / (len(gt0) + len(lt0)):.4f}')
)
return summary
```

Now we can run the following code to extract the events, output the plots and view the summary.

```
df = pred.copy().reset_index()
too_high = df.query("tgt > high_ci").index
too_low = df.query("tgt < low_ci").index
step_fwd=5 # how many days to look forward
event_states = ['too_high', 'too_low']
for event in event_states:
after_event = post_event(df, eval(event), step_fwd=step_fwd)
ev = get_end_vals(after_event)
smry = create_summary(ev)
p()
p('*'*25)
p(mkt, event.upper())
p(smry.T)
plot_events_timeline(after_event, event)
plot_events_post(after_event, event)
plot_distplot(ev, smry, event)
```

To answer the original hypothesis about finding market bottoms, we can examine the returns after a *too low* event. Looking at the summary we can see that **the mean and median return are +62 and +82 bps respectively**. Looking at the **sum_ratio** we can see that that the sum of all positive return events is almost 2x the sum of all negative returns. We can also see that, given a *too low* event, after 5 days SPY had positive returns 65% of the time!

These are positive indicators that we may be able to predict market bottoms. However, I would emphasize more testing is needed.

- We don't consider market frictions such as commissions or slippage
- The daily prices may or may not represent actual traded values.
- I used a coarse search to find the JohnsonSU shape parameters, a and b. These may or may not be the best values. Just note that we can use these parameters to arbitrarily adjust the confidence intervals to be more or less conservative. I leave this for the reader to explore.
- In many cases both
*too high,*and*too low*events result in majority positive returns, this*could*be an indication of the overall bullishness of the sample period that may or may not affect model results in the future. - I chose k=2 components for computational simplicity, but there may be better values.
- I chose the lookback period for computational simplicity, but there may be better values.
- Varying the
**step_fwd**parameter may hurt or hinder the strategy. - What makes this approach particularly interesting, is that we don't
**want**anything close to 100% accuracy from our predicted confidence intervals, otherwise we won't have enough "trades".**This adds a level of artistry/complexity because the parameter values we choose should create predictable mean reversion opportunities, but the model accuracy is not a good indicator of this.**Testing the strategy with other assets shows "profitability" in some cases where the model accuracy is sub 60%.

**Please contact me if you find any errors. **

- Part 1 Recap
- Part 2 Goals
- Jupyter (IPython) Notebook
- References

In part 1 of this series we got a feel for Markov Models, Hidden Markov Models, and their applications. We went through the process of using a hidden Markov model to solve a toy problem involving a pet dog. We concluded the article by going through a high level quant finance application of Gaussian mixture models to detect historical regimes.

In this post, my goal is to impart a basic understanding of the expectation maximization algorithm which, not only forms the basis of several machine learning algorithms, including K-Means, and Gaussian mixture models, but also has lots of applications beyond finance. We will also cover the K-Means algorithm which is a form of EM, and its weaknesses. Finally we will discuss how Gaussian mixture models improve on several of K-Means weaknesses.

This post is structured as a Jupyter (IPython) Notebook. I used several different resources\references and tried to give proper credit. Please contact me if you find errors, have suggestions, or if any sources were not attributed correctly.

*Click here to view this notebook directly on NBviewer.jupyter.org*

- Who is Andrey Markov?
- What is the Markov Property?
- What is a Markov Model?
- What makes a Markov Model Hidden?
- A Hidden Markov Model for Regime Detection
- Conclusion
- References

Markov was a Russian mathematician best known for his work on stochastic processes. The focus of his early work was number theory but after 1900 he focused on probability theory, so much so that he taught courses after his official retirement in 1905 until his deathbed [2]. During his research Markov was able to extend the law of large numbers and the central limit theorem to apply to certain sequences of dependent random variables, now known as **Markov Chains** [1][2]. Markov chains are widely applicable to physics, economics, statistics, biology, etc. Two of the most well known applications were Brownian motion [3], and random walks.

"...a random process where the future is independent of the past given the present." [4]

Assume a simplified coin toss game with a fair coin. Suspend disbelief and assume that the Markov property is not yet known and we would like to predict the probability of flipping heads after 10 flips. Under the assumption of conditional dependence (the coin has memory of past states and the future state depends on the sequence of past states) we must record the specific sequence that lead up to the 11th flip and the joint probabilities of those flips. So imagine after 10 flips we have a random sequence of heads and tails. The joint probability of that sequence is 0.5^10 = 0.0009765625. Under conditional dependence, the probability of heads on the next flip is 0.0009765625 * 0.5 = 0.00048828125.

Is that the real probability of flipping heads on the 11th flip? Hell no!

We know that the event of flipping the coin does not depend on the result of the flip before it. The coin has no memory. The process of successive flips does not encode the prior results. Each flip is a unique event with equal probability of heads or tails, aka conditionally independent of past states. This is the Markov property.

A Markov chain (model) describes a stochastic process where the assumed probability of future state(s) depends only on the current process state and not on any the states that preceded it (*shocker*).

Let's get into a simple example. Assume you want to model the future probability that your dog is in one of three states given its current state. To do this we need to specify the state space, the initial probabilities, and the transition probabilities.

Imagine you have a very lazy fat dog, so we define the **state space **as sleeping, eating, or pooping. We will set the initial probabilities to 35%, 35%, and 30% respectively.

```
import numpy as np
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
%matplotlib inline
# create state space and initial state probabilities
states = ['sleeping', 'eating', 'pooping']
pi = [0.35, 0.35, 0.3]
state_space = pd.Series(pi, index=states, name='states')
print(state_space)
print(state_space.sum())
```

The next step is to define the transition probabilities. They are simply the probabilities of staying in the same state or moving to a different state given the current state.

```
# create transition matrix
# equals transition probability matrix of changing states given a state
# matrix is size (M x M) where M is number of states
q_df = pd.DataFrame(columns=states, index=states)
q_df.loc[states[0]] = [0.4, 0.2, 0.4]
q_df.loc[states[1]] = [0.45, 0.45, 0.1]
q_df.loc[states[2]] = [0.45, 0.25, .3]
print(q_df)
q = q_df.values
print('\n', q, q.shape, '\n')
print(q_df.sum(axis=1))
```

Now that we have the initial and transition probabilities setup we can create a Markov diagram using the **Networkx** package.

To do this requires a little bit of flexible thinking. Networkx creates *Graphs* that consist of *nodes *and *edges*. In our toy example the dog's possible states are the nodes and the edges are the lines that connect the nodes. The transition probabilities are the *weights. *They represent the probability of transitioning to a state given the current state.

Something to note is networkx deals primarily with dictionary objects. With that said, we need to create a dictionary object that holds our edges and their weights.

```
from pprint import pprint
# create a function that maps transition probability dataframe
# to markov edges and weights
def _get_markov_edges(Q):
edges = {}
for col in Q.columns:
for idx in Q.index:
edges[(idx,col)] = Q.loc[idx,col]
return edges
edges_wts = _get_markov_edges(q_df)
pprint(edges_wts)
```

Now we can create the graph. To visualize a Markov model we need to use *nx.MultiDiGraph().* A multidigraph is simply a directed graph which can have multiple arcs such that a single node can be both the origin and destination.

In the following code, we create the graph object, add our nodes, edges, and labels, then draw a bad networkx plot while outputting our graph to a dot file.

```
# create graph object
G = nx.MultiDiGraph()
# nodes correspond to states
G.add_nodes_from(states_)
print(f'Nodes:\n{G.nodes()}\n')
# edges represent transition probabilities
for k, v in edges_wts.items():
tmp_origin, tmp_destination = k[0], k[1]
G.add_edge(tmp_origin, tmp_destination, weight=v, label=v)
print(f'Edges:')
pprint(G.edges(data=True))
pos = nx.drawing.nx_pydot.graphviz_layout(G, prog='dot')
nx.draw_networkx(G, pos)
# create edge labels for jupyter plot but is not necessary
edge_labels = {(n1,n2):d['label'] for n1,n2,d in G.edges(data=True)}
nx.draw_networkx_edge_labels(G , pos, edge_labels=edge_labels)
nx.drawing.nx_pydot.write_dot(G, 'pet_dog_markov.dot')
```

Now a look at the dot file.

Not bad. If you follow the edges from any node, it will tell you the probability that the dog will transition to another state. For example, if the dog is sleeping, we can see there is a 40% chance the dog will keep sleeping, a 40% chance the dog will wake up and poop, and a 20% chance the dog will wake up and eat.

Consider a situation where your dog is acting strangely and you wanted to model the probability that your dog's behavior is due to sickness or simply quirky behavior when otherwise healthy.

In this situation the **true **state of the dog is *unknown*, thus **hidden** from you. One way to model this is to *assume* that the dog has **observable** behaviors that represent the true, hidden state. Let's walk through an example.

First we create our state space - healthy or sick. We assume they are equiprobable.

```
# create state space and initial state probabilities
hidden_states = ['healthy', 'sick']
pi = [0.5, 0.5]
state_space = pd.Series(pi, index=hidden_states, name='states')
print(state_space)
print('\n', state_space.sum())
```

Next we create our transition matrix for the hidden states.

```
# create hidden transition matrix
# a or alpha
# = transition probability matrix of changing states given a state
# matrix is size (M x M) where M is number of states
a_df = pd.DataFrame(columns=hidden_states, index=hidden_states)
a_df.loc[hidden_states[0]] = [0.7, 0.3]
a_df.loc[hidden_states[1]] = [0.4, 0.6]
print(a_df)
a = a_df.values
print('\n', a, a.shape, '\n')
print(a_df.sum(axis=1))
```

This is where it gets a little more interesting. Now we create the **emission or observation** probability matrix. This matrix is size M x O where M is the number of hidden states and O is the number of possible observable states.

The emission matrix tells us the probability the dog is in one of the hidden states, given the current, observable state.

Let's keep the same observable states from the previous example. The dog can be either sleeping, eating, or pooping. For now we make our best guess to fill in the probabilities.

```
# create matrix of observation (emission) probabilities
# b or beta = observation probabilities given state
# matrix is size (M x O) where M is number of states
# and O is number of different possible observations
observable_states = states
b_df = pd.DataFrame(columns=observable_states, index=hidden_states)
b_df.loc[hidden_states[0]] = [0.2, 0.6, 0.2]
b_df.loc[hidden_states[1]] = [0.4, 0.1, 0.5]
print(b_df)
b = b_df.values
print('\n', b, b.shape, '\n')
print(b_df.sum(axis=1))
```

Now we create the graph edges and the graph object.

```
# create graph edges and weights
hide_edges_wts = _get_markov_edges(a_df)
pprint(hide_edges_wts)
emit_edges_wts = _get_markov_edges(b_df)
pprint(emit_edges_wts)
```

```
# create graph object
G = nx.MultiDiGraph()
# nodes correspond to states
G.add_nodes_from(hidden_states)
print(f'Nodes:\n{G.nodes()}\n')
# edges represent hidden probabilities
for k, v in hide_edges_wts.items():
tmp_origin, tmp_destination = k[0], k[1]
G.add_edge(tmp_origin, tmp_destination, weight=v, label=v)
# edges represent emission probabilities
for k, v in emit_edges_wts.items():
tmp_origin, tmp_destination = k[0], k[1]
G.add_edge(tmp_origin, tmp_destination, weight=v, label=v)
print(f'Edges:')
pprint(G.edges(data=True))
pos = nx.drawing.nx_pydot.graphviz_layout(G, prog='neato')
nx.draw_networkx(G, pos)
# create edge labels for jupyter plot but is not necessary
emit_edge_labels = {(n1,n2):d['label'] for n1,n2,d in G.edges(data=True)}
nx.draw_networkx_edge_labels(G , pos, edge_labels=emit_edge_labels)
nx.drawing.nx_pydot.write_dot(G, 'pet_dog_hidden_markov.dot')
```

The hidden Markov graph is a little more complex but the principles are the same. For example, you would expect that if your dog is eating there is a high probability that it is healthy (60%) and a very low probability that the dog is sick (10%).

Now, what if you needed to discern the health of your dog over time given a sequence of observations?

```
# observation sequence of dog's behaviors
# observations are encoded numerically
obs_map = {'sleeping':0, 'eating':1, 'pooping':2}
obs = np.array([1,1,2,1,0,1,2,1,0,2,2,0,1,0,1])
inv_obs_map = dict((v,k) for k, v in obs_map.items())
obs_seq = [inv_obs_map[v] for v in list(obs)]
print( pd.DataFrame(np.column_stack([obs, obs_seq]),
columns=['Obs_code', 'Obs_seq']) )
```

Using the **Viterbi** algorithm we can identify the most likely sequence of hidden states given the sequence of observations.

High level, the Viterbi algorithm increments over each time step, finding the **maximum** probability of any path that gets to state **i**at time **t**, that ** also** has the correct observations for the sequence up to time

The algorithm also keeps track of the state with the highest probability at each stage. At the end of the sequence, the algorithm will iterate backwards selecting the state that "won" each time step, and thus creating the most likely path, or likely sequence of hidden states that led to the sequence of observations.

```
# define Viterbi algorithm for shortest path
# code adapted from Stephen Marsland's, Machine Learning An Algorthmic Perspective, Vol. 2
# https://github.com/alexsosn/MarslandMLAlgo/blob/master/Ch16/HMM.py
def viterbi(pi, a, b, obs):
nStates = np.shape(b)[0]
T = np.shape(obs)[0]
# init blank path
path = np.zeros(T)
# delta --> highest probability of any path that reaches state i
delta = np.zeros((nStates, T))
# phi --> argmax by time step for each state
phi = np.zeros((nStates, T))
# init delta and phi
delta[:, 0] = pi * b[:, obs[0]]
phi[:, 0] = 0
print('\nStart Walk Forward\n')
# the forward algorithm extension
for t in range(1, T):
for s in range(nStates):
delta[s, t] = np.max(delta[:, t-1] * a[:, s]) * b[s, obs[t]]
phi[s, t] = np.argmax(delta[:, t-1] * a[:, s])
print('s={s} and t={t}: phi[{s}, {t}] = {phi}'.format(s=s, t=t, phi=phi[s, t]))
# find optimal path
print('-'*50)
print('Start Backtrace\n')
path[T-1] = np.argmax(delta[:, T-1])
#p('init path\n t={} path[{}-1]={}\n'.format(T-1, T, path[T-1]))
for t in range(T-2, -1, -1):
path[t] = phi[path[t+1], [t+1]]
#p(' '*4 + 't={t}, path[{t}+1]={path}, [{t}+1]={i}'.format(t=t, path=path[t+1], i=[t+1]))
print('path[{}] = {}'.format(t, path[t]))
return path, delta, phi
path, delta, phi = viterbi(pi, a, b, obs)
print('\nsingle best state path: \n', path)
print('delta:\n', delta)
print('phi:\n', phi)
```

Let's take a look at the result.

```
state_map = {0:'healthy', 1:'sick'}
state_path = [state_map[v] for v in path]
(pd.DataFrame()
.assign(Observation=obs_seq)
.assign(Best_Path=state_path))
```

By now you're probably wondering how we can apply what we have learned about hidden Markov models to quantitative finance.

Consider that the largest hurdle we face when trying to apply predictive techniques to asset returns is nonstationary time series. In brief, this means that the expected mean and volatility of asset returns changes over time.

Most time series models assume that the data is stationary. This is a major weakness of these models.

Instead, let us frame the problem differently. We know that time series exhibit temporary periods where the expected means and variances are stable through time. These periods or *regimes* can be likened to *hidden states*.

If that's the case, then all we need are observable variables whose behavior allows us to infer the true hidden state(s). If we can better estimate an asset's most likely regime, including the associated means and variances, then our predictive models become more adaptable and will likely improve. We can also become better risk managers as the estimated regime parameters gives us a great framework for better scenario analysis.

In this example, the observable variables I use are: the underlying asset returns, the Ted Spread, the 10 year - 2 year constant maturity spread, and the 10 year - 3 month constant maturity spread.

```
import pandas as pd
import pandas_datareader.data as web
import sklearn.mixture as mix
import numpy as np
import scipy.stats as scs
import matplotlib as mpl
from matplotlib import cm
import matplotlib.pyplot as plt
from matplotlib.dates import YearLocator, MonthLocator
%matplotlib inline
import seaborn as sns
import missingno as msno
from tqdm import tqdm
p=print
```

Using pandas we can grab data from Yahoo Finance and FRED.

```
# get fed data
f1 = 'TEDRATE' # ted spread
f2 = 'T10Y2Y' # constant maturity ten yer - 2 year
f3 = 'T10Y3M' # constant maturity 10yr - 3m
start = pd.to_datetime('2002-01-01')
end = pd.datetime.today()
mkt = 'SPY'
MKT = (web.DataReader([mkt], 'yahoo', start, end)['Adj Close']
.rename(columns={mkt:mkt})
.assign(sret=lambda x: np.log(x[mkt]/x[mkt].shift(1)))
.dropna())
data = (web.DataReader([f1, f2, f3], 'fred', start, end)
.join(MKT, how='inner')
.dropna()
)
p(data.head())
# gives us a quick visual inspection of the data
msno.matrix(data)
```

Next we will use the **sklearn's GaussianMixture **to fit a model that estimates these regimes. We will explore *mixture models * in more depth in part 2 of this series. The important takeaway is that mixture models implement a closely related unsupervised form of density estimation. It makes use of the expectation-maximization algorithm to estimate the means and covariances of the hidden states (regimes). For now, it is ok to think of it as a magic button for guessing the transition and emission probabilities, and most likely path.

We have to specify the number of components for the mixture model to fit to the time series. In this example the components can be thought of as regimes. We will arbitrarily classify the regimes as High, Neutral and Low Volatility and set the number of components to three.

```
# code adapted from http://hmmlearn.readthedocs.io
# for sklearn 18.1
col = 'sret'
select = data.ix[:].dropna()
ft_cols = [f1, f2, f3, 'sret']
X = select[ft_cols].values
model = mix.GaussianMixture(n_components=3,
covariance_type="full",
n_init=100,
random_state=7).fit(X)
# Predict the optimal sequence of internal hidden state
hidden_states = model.predict(X)
print("Means and vars of each hidden state")
for i in range(model.n_components):
print("{0}th hidden state".format(i))
print("mean = ", model.means_[i])
print("var = ", np.diag(model.covariances_[i]))
print()
sns.set(font_scale=1.25)
style_kwds = {'xtick.major.size': 3, 'ytick.major.size': 3,
'font.family':u'courier prime code', 'legend.frameon': True}
sns.set_style('white', style_kwds)
fig, axs = plt.subplots(model.n_components, sharex=True, sharey=True, figsize=(12,9))
colors = cm.rainbow(np.linspace(0, 1, model.n_components))
for i, (ax, color) in enumerate(zip(axs, colors)):
# Use fancy indexing to plot data in each state.
mask = hidden_states == i
ax.plot_date(select.index.values[mask],
select[col].values[mask],
".-", c=color)
ax.set_title("{0}th hidden state".format(i), fontsize=16, fontweight='demi')
# Format the ticks.
ax.xaxis.set_major_locator(YearLocator())
ax.xaxis.set_minor_locator(MonthLocator())
sns.despine(offset=10)
plt.tight_layout()
fig.savefig('Hidden Markov (Mixture) Model_Regime Subplots.png')
```

In the above image, I've highlighted each regime's daily expected mean and variance of SPY returns. It appears the 1th hidden state is our low volatility regime. Note that the 1th hidden state has the largest expected return and the smallest variance.The 0th hidden state is the neutral volatility regime with the second largest return and variance. Lastly the 2th hidden state is high volatility regime. We can see the expected return is negative and the variance is the largest of the group.

```
sns.set(font_scale=1.5)
states = (pd.DataFrame(hidden_states, columns=['states'], index=select.index)
.join(select, how='inner')
.assign(mkt_cret=select.sret.cumsum())
.reset_index(drop=False)
.rename(columns={'index':'Date'}))
p(states.head())
sns.set_style('white', style_kwds)
order = [0, 1, 2]
fg = sns.FacetGrid(data=states, hue='states', hue_order=order,
palette=scolor, aspect=1.31, size=12)
fg.map(plt.scatter, 'Date', mkt, alpha=0.8).add_legend()
sns.despine(offset=10)
fg.fig.suptitle('Historical SPY Regimes', fontsize=24, fontweight='demi')
fg.savefig('Hidden Markov (Mixture) Model_SPY Regimes.png')
```

Here is the SPY price chart with the color coded regimes overlaid.

In this post we've discussed the concepts of the Markov property, Markov models and hidden Markov models. We used the networkx package to create Markov chain diagrams, and sklearn's GaussianMixture to estimate historical regimes. In part 2 we will discuss mixture models more in depth. For more detailed information I would recommend looking over the references. Setosa.io is especially helpful in covering any gaps due to the highly interactive visualizations.

- https://en.wikipedia.org/wiki/Andrey_Markov
- https://www.britannica.com/biography/Andrey-Andreyevich-Markov
- https://www.reddit.com/r/explainlikeimfive/comments/vbxfk/eli5_brownian_motion_and_what_it_has_to_do_with/
- http://www.math.uah.edu/stat/markov/Introduction.html
- http://setosa.io/ev/markov-chains/
- http://www.cs.jhu.edu/~langmea/resources/lecture_notes/hidden_markov_models.pdf
- https://github.com/alexsosn/MarslandMLAlgo/blob/master/Ch16/HMM.py
- http://hmmlearn.readthedocs.io

**Motivating the Journey****Where Do Edges Come From?****The Problem with Traditional Research****The Hidden Side**

**A Brief Description:****Part 1 - A Visual Introduction to Hidden Markov Models with Python****Part 2 - Exploring Mixture Models with Scikit-Learn and Python****Part 3 - Predicting Market Bottoms with Scikit-Learn and Python**

Edges come from superior ability to identify and execute profitable strategies.

You can see this simply by imagining the first strategy able to identify pricing errors on identical items in different markets. This knowledge is valuable in two scenarios: you can execute the transaction yourself or you know someone who can and will pay you for the "signal".

Abstractly, a signal can be thought of as a glitch in the matrix allowing us a view through a window into probabilistic future states. Signals can come from anywhere and are not always understood.

Our job is to find these signals, vet them, and implement them. This is difficult in practice. The competitive environment we seek to understand is dynamic with positive and negative feedback loops operating at various scales. The system processes are very noisy making signal extraction confusing and difficult. Competitors are always seeking strategies that "work" until they don't.

Generally profitable edges stop working when both your identification and execution strategies are well known. Thus a profit motive for secrecy and obfuscation exists among participants. If you are familiar with poker this will sound very familiar.

This also means that using well known identification techniques puts you at a strategic disadvantage because your competitors have likely incorporated knowledge of your methods into their own strategies.

Therefore we must continuously search for strategies that are not well understood, not well known, or otherwise difficult for our competitors to implement.

Too much published "research" focuses on using well known statistical tools to draw conclusions that do not improve the odds of profitable investment. Worse still, many research papers' results are not reproducible.

For periods of time, techniques involving technical analysis, regression, and simple correlations, were good enough to beat the market. This worked because the methodology was not well known or well understood. Times have changed.

These methods have been taught and promoted to generations of practitioners. These techniques form the foundation of many market participants investment strategies. Therefore the majority of well known strategies are already in use by the market.

This means sophisticated participants have had time and opportunity to develop counter strategies to take advantage of the limitations of publicly known methods.

Typical business finance teachings focus on the theory that stock values are directly tied to the expected value of net cash flows produced by the underlying operating business from now into some future period. Other research links stock prices to any number of other observable factors. My perception is that these well taught methods can bias our exploratory research when it comes to the art and science of **prediction.**

Successful prediction does not require understanding or logic. Prediction does not require expertise in the industry or business which generated the data. These things can help solidify our belief in the power of the prediction, however successful prediction methods only require a stable, positive payoff function relative to prediction accuracy over an expanding time period. Nothing more, nothing less.

Rigid knowledge structures can blind us to potential opportunities. Using statistics to explore observable factors only, ignores the entire spectrum of hidden, unobserved factors influencing asset returns.

By definition a hidden factor is not directly observable. Its presence or influence is detected by its effect on observable factor(s) or on a delayed basis.

Conceptualizing the influence of hidden factors is difficult for many decision makers to either understand or incorporate into already existing processes.

The combination of bias created by traditional finance and difficulty conceptualizing hidden factors, creates the barriers to entry we need for successful strategy development. We can reasonably assume this research pathway is still rich with profitable edges and worth pursuing.

In part 1, we will discuss Markov Models, Hidden Markov Models and a toy application for regime detection.

In part 2, we will explore the motivation behind mixture models and how they improve on the weaknesses of K-means algorithms. We will also discuss the connection between Mixture Models and Hidden Markov Models. Finally we will extend our toy regime detector to use a mixture model instead.

In part 3, we will implement a toy strategy using mixture models to predict market bottoms. The strategy assumes that we can calibrate a model to predict the market return distribution such that actual returns that fall below the confidence intervals are profitable long entries over short time periods.

Post thumbnail picture taken from Bayesian Intelligence Slideshare presentation.

]]>