This tutorial walks through a machine learning pipeline. The example excludes the Extract component of the often-referenced ETL (Extract, Transform, Learn) machine learning nomenclature. The overall goal of the analysis is to predict NDVI dynamics from soil properties and lagged precipitation, temperature, and vapor pressure deficit observations. The brief outline of the tutorial is: launch and scale a Dask cluster on SLURM; load the HLS NDVI, POLARIS soil, and PRISM weather data; transform and merge the datasets; tune and train a distributed XGBoost model; and interpret the fitted model with SHAP.
import xarray as xr
import numpy as np
import matplotlib.pyplot as plt
import dask_jobqueue as jq
import dask
from dask import dataframe as ddf
from dask import array as da
import os
from dask.distributed import Client, wait
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from tqdm.notebook import tqdm
We will use dask-jobqueue to launch and scale a cluster. For a more detailed example of how this works, please see the other tutorials in the SCINet Geospatial 2020 Workshop. For a quick review, the workflow for defining a cluster and scaling is:
- the SLURM partition(s) to submit the worker jobs to (partition)
- the number of processes (dask workers) per sbatch job (num_processes)
- the number of threads per process (num_threads_per_process)
- the memory per job (mem), which is spread evenly between the dask workers.

In this example, we define one process (dask worker) per sbatch script. Each process will have 40 CPUs (an entire node). We then scale the cluster to 9 workers, which together provide 360 threads and ~1.15 TB of memory.
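As a quick sanity check of those totals (a minimal sketch; the 3.2 GB-per-thread figure comes from the mem formula in the next cell, and the variable names here are only illustrative):

#Illustrative arithmetic check of the cluster totals quoted above
workers_planned = 9
threads_per_worker = 40
gb_per_thread = 3.2
print(workers_planned*threads_per_worker)                #360 threads
print(workers_planned*threads_per_worker*gb_per_thread)  #1152 GB, i.e. ~1.15 TB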
partition='short,brief-low'
num_processes = 1
num_threads_per_process = 40
mem = 3.2*num_processes*num_threads_per_process
n_cores_per_job = num_processes*num_threads_per_process
container = '/lustre/project/geospatial_tutorials/wg_2020_ws/data_science_im_rs_vSCINetGeoWS_2020.sif'
env = 'py_geo'
clust = jq.SLURMCluster(queue=partition,
processes=num_processes,
memory=str(mem)+'GB',
cores=n_cores_per_job,
interface='ib0',
local_directory='$TMPDIR',
death_timeout=30,
python="singularity exec {} /opt/conda/envs/{}/bin/python".format(container,env),
walltime='01:30:00',
job_extra=["--output=/dev/null","--error=/dev/null"])
cl=Client(clust)
cl
print('The Dask dashboard address is: /user/'+os.environ['USER']+'/proxy/'+cl.dashboard_link.split(':')[-1].split('/')[0]+'/status')
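Optionally (this step is not part of the original workflow), dask-jobqueue can print the sbatch script it will submit for each worker job, which is a handy way to double-check the resources requested in the SLURMCluster call above:

#Optional: inspect the generated sbatch job script
print(clust.job_script())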
View Cluster Dashboard: To view the cluster with the Dask dashboard interface, click the Dask icon on the left menu pane. Copy and paste the above dashboard address (in the form /user/{User.Name}/proxy/{port#}/status) into the address bar. Then click on the "Workers", "Progress", "Task Stream", and "CPU" tabs to open them, and drag them into a convenient layout on the right-hand side of the screen. Note that these panes should be mostly blank because we have not yet scaled the cluster, which is the next step below.
Dask Icon:
Scale the cluster to 9 workers (40 CPUs per worker). This may take 5-20 seconds to complete.
#scale the cluster
n_workers=9
clust.scale(n=n_workers*num_processes)
#Wait for the cluster to load, show progress bar.
from time import sleep
with tqdm(total=n_workers*num_processes) as pbar:
    while (cl.status == "running") and (len(cl.scheduler_info()["workers"]) < n_workers*num_processes):
        pbar.update(len(cl.scheduler_info()["workers"])-pbar.n)
        sleep(1)  #pause between checks so the loop does not busy-wait
    pbar.update(len(cl.scheduler_info()["workers"])-pbar.n)
cl
#Let's check that the workers are running in SLURM
me = os.environ['USER']
!squeue -u $me
Link to data repository: https://hls.gsfc.nasa.gov/
Workflow:
Notice that the data are not read into memory. The only information stored is the "task graph" and metadata describing the final result.
#Read the data with Xarray and rechunk
ndvi = xr.open_zarr('/lustre/project/geospatial_tutorials/wg_2020_ws/data/cper_hls_ndvi.zarr/').chunk({'x':20,'y':20,'date':-1})
ndvi
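As a quick illustration (a minimal sketch, not part of the original workflow) that the read above is lazy: the ndvi variable is backed by a dask array, so only metadata such as the chunk layout is available until we call .compute() or .persist().

#Illustrative check: the data are still lazy dask chunks, not values in memory
print(type(ndvi.ndvi.data))   #dask.array.core.Array
print(ndvi.ndvi.data.chunks)  #chunk layout only; nothing has been read from disk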
#Select relevant months and then convert to a dataframe
ndvi_df = ndvi.sel(date=ndvi['date.month'].isin([5,6,7,8,9])).to_dask_dataframe()
#Only include reasonable values (.1 < NDVI < 1.0) in the analysis
ndvi_df = ndvi_df[(ndvi_df.ndvi>.1)&(ndvi_df.ndvi<1.)]
print('There are '+f'{len(ndvi_df):,}'+' NDVI observations.')
ndvi_df
Paper Describing the Data: https://agupubs.onlinelibrary.wiley.com/doi/abs/10.1029/2018WR022797
Data Repository Source: http://hydrology.cee.duke.edu/POLARIS/PROPERTIES/v1.0/
Workflow:
soil = xr.open_zarr('/lustre/project/geospatial_tutorials/wg_2020_ws/data/polaris_soils.zarr/')
#Interpolate to the HLS NDVI grid
soil_df = soil.interp(x=ndvi.x,y=ndvi.y,method='linear').squeeze().to_dataframe().reset_index()
soil_df
PRISM Data in a CSV file. Note this data was queried at a single point at the center of CPER.
Workflow:
df_env = pd.read_csv('/lustre/project/geospatial_tutorials/wg_2020_ws/data/PRISM_ppt_tmin_tmean_tmax_tdmean_vpdmin_vpdmax_provisional_4km_20120101_20200101_40.8269_-104.7154.csv',
skiprows=10,
infer_datetime_format=True,
parse_dates = ['Date']).rename(columns={'Date':'date'}).set_index('date').sort_index(ascending=False)
df_env
Here we develop a class to merge the three datasets. Note that the most important code is in the transform method.
#Custom transformer in the scikit-learn API syntax
class merge_dsets(BaseEstimator, TransformerMixin):
    def __init__(self, df_soil, df_env, lag):
        self.soil = df_soil
        self.env = df_env
        self.lag = lag
        #self.lag_interv = lag_interval
    def fit(self, X, y=None):
        return self
    def transform(self, X, y=None):
        df = X.copy()
        #Merge the soil data on the x/y coordinates
        df = df.merge(self.soil, on=['x','y'])
        df_env_m = pd.DataFrame()
        #For each unique image date, aggregate the weather record into weekly lagged features
        for i,d in enumerate(df.date.unique()):
            df_env_temp = self.env[self.env.index<d+pd.Timedelta('1days')].resample('1W-'+d.day_name()[0:3].upper(),
                                   label='right').agg({'ppt (mm)':'sum',
                                                       'tmean (degrees C)':'mean',
                                                       'vpdmin (hPa)':'mean',
                                                       'vpdmax (hPa)':'mean'}).sort_index(ascending=False).iloc[0:self.lag].reset_index().reset_index().rename(columns={'index':'week'})
            df_env_temp = df_env_temp.drop(columns='date').melt(id_vars='week')
            df_env_temp['col'] = 'week'+df_env_temp.week.astype(str)+'_'+df_env_temp.variable.str.split(' ',expand=True).values[:,0]
            df_env_temp = df_env_temp.set_index('col').drop(columns=['week','variable']).T
            df_env_temp['date'] = d
            df_env_temp = df_env_temp.set_index('date',drop=True)
            df_env_m = df_env_m.append(df_env_temp)
        #Merge the weekly weather features onto the NDVI observations by date
        df = df.merge(df_env_m, left_on='date', right_index=True)
        df['DOY'] = df.date.dt.dayofyear
        return (df.drop(columns=['date','x','y','ndvi']), df[['ndvi']])
The "learn" portion in the ETL pipeline.
from sklearn.pipeline import Pipeline
import xgboost as xgb
#from dask_ml.xgboost import XGBRegressor as dask_XGBRegressor
from dask_ml.model_selection import train_test_split
from sklearn.metrics import r2_score
from dask_ml.model_selection import GridSearchCV
from sklearn.model_selection import GridSearchCV as sk_GridSearchCV
import joblib
Shuffle and subset the data to a manageable size (i.e., small enough to fit in memory when running 360 simultaneous models). We will use a grid search combined with 3-fold cross-validation to optimize the relevant hyperparameters (see table below).
Hyperparameter | Grid | n |
---|---|---|
n_estimators | [150, 250, 300, 350] | 4 |
learning_rate | [0.05, 0.1, 0.2, 0.3] | 4 |
max_depth | [5, 7, 9, 11] | 4 |
colsample_bytree | [.1, .2, .3] | 3 |
gamma | [.05, .1, .2] | 3 |
A total of 1728 models (4 × 4 × 4 × 3 × 3 hyperparameter combinations × 3 CV folds) will be fit. The hyperparameters associated with the best-scoring model (highest R2) will then be used to train on the remaining data.
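As a quick sanity check of that count (illustrative only):

#Illustrative check of the grid-search size quoted above
grid_sizes = [4, 4, 4, 3, 3]          #candidate values per hyperparameter (see table)
n_folds = 3
print(np.prod(grid_sizes)*n_folds)    #1728 model fits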
This search can take ~1-2 hours using 360 cores. To run the hyperparameter grid-search cross-validation, set the optimize_hyperparameters variable to True (see two cells below). If you leave it as False, we skip the hyperparameter calculations and simply use the hyperparameter values calculated previously.
X_train_hyp, X = train_test_split(ndvi_df,
test_size=0.97,
shuffle=True,
random_state=34)
X_train_hyp,Y_train_hyp = dask.compute(*merge_dsets(df_soil=soil_df,
df_env=df_env,
lag=26).transform(X_train_hyp))
X_train_hyp
# Set to True if you want to run the grid search. This can take >1.5 hrs. Therefore,
# if set to False, the best hyperparameters from a previous run of the model are
# hardcoded below instead.
optimize_hyperparameters = False
if optimize_hyperparameters:
#Define the grid search space
param_dist = {'n_estimators': [150,250,300,350],
'learning_rate': [0.05, 0.1, 0.2, 0.3],
'max_depth': [5, 7, 9, 11],
'colsample_bytree': [.1, .2, .3],
'gamma': [.05, .1, .2]}
#Define the XGBoost model
reg = xgb.XGBRegressor(n_jobs=1,verbosity=3)
#Setup the GridsearchCV function
gs = GridSearchCV(reg,param_dist,cv=3,scheduler=cl,refit=False,cache_cv=False)
#Fit all the models
gs.fit(X_train_hyp.values,Y_train_hyp.values)
#Get the best fitting parameters
df_params = pd.DataFrame(gs.cv_results_)
best_params = df_params[df_params.mean_test_score==df_params.mean_test_score.max()]
best_params = best_params.params.values[0]
print(best_params)
else:
#Best fit parameters from previous run
best_params = {'colsample_bytree': 0.2,
'gamma': 0.1,
'learning_rate': 0.05,
'max_depth': 7,
'n_estimators': 350}
print('Using the previously calculated parameters, which are:')
print(best_params)
Note that here we use dask.persist to trigger the calculation (rather than dask.compute), so that the merged training and testing data stay distributed across the cluster workers.
# Split the data
X_train, X_test = train_test_split(X,
test_size=0.2,
shuffle=True)
#Merge the weather/soil data and persist the data across the cluster
[X_train,Y_train],[X_test,Y_test] = dask.persist(*[merge_dsets(df_soil=soil_df,df_env=df_env,lag=26).transform(X_train),
merge_dsets(df_soil=soil_df,df_env=df_env,lag=26).transform(X_test)])
wait([X_train,X_test,Y_train,Y_test])
X_train
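As an aside, here is a minimal sketch (illustrative only; lazy_mean is a made-up name for this example) of the difference between dask.compute and dask.persist: compute pulls a concrete result back into the notebook's memory, while persist materializes the collection in worker memory and returns a still-distributed object.

#Illustrative only: contrast dask.compute with dask.persist on a small lazy result
lazy_mean = ndvi_df.ndvi.mean()               #a lazy dask computation
(local_val,) = dask.compute(lazy_mean)        #concrete value returned to the notebook
(persisted_mean,) = dask.persist(lazy_mean)   #same computation, kept on the workers
print(local_val, type(persisted_mean))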
#Setup the Distributed XGBoost model and train it on the "training" data
dtrain = xgb.dask.DaskDMatrix(cl, X_train, Y_train)
reg_b = xgb.dask.train(cl,
best_params,
dtrain,
num_boost_round=100,
evals=[(dtrain, 'train')])
print(reg_b)
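Note that the object returned by xgb.dask.train is a plain dictionary holding the trained booster (used for prediction below) and the per-round evaluation history; a quick illustrative check:

#Illustrative: the training output is a dict with the fitted booster and the eval history
print(reg_b.keys())            #expected: dict_keys(['booster', 'history'])
print(type(reg_b['booster']))  #expected: xgboost.core.Booster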
#Get the R2 results for the testing data
dtest = xgb.dask.DaskDMatrix(cl, X_test)
pred = xgb.dask.predict(cl, reg_b['booster'], dtest)
reg_r2 = r2_score(Y_test.ndvi.compute().values,pred)
print("The overall R2 is: "+str(reg_r2))
#Big Data Plotting Libraries
import datashader as ds
import holoviews as hv
from holoviews.operation.datashader import datashade, shade, dynspread, rasterize
hv.extension('bokeh')
#Plot the results
Y_plotting = Y_test.compute()
Y_plotting['pred']=pred.compute()
Y_plotting
#To plot all the points, we need to rasterize the data (aka a 2d histogram)
pts_res = hv.Points(Y_plotting.values,label="")
rasterize(pts_res).redim.range(Count=(10, 2000)).opts(cmap='viridis',
tools=['hover'],
xlim=(0.15,.6),
ylim=(0.15,.6),
clipping_colors={'min': 'transparent'},
xlabel='HLS NDVI',
ylabel='Predicted NDVI',
logz=True)
Use the SHAP (SHapley Additive exPlanations) package to interpret the model results and better understand the features "driving" NDVI dynamics.
SHAP Papers: https://www.nature.com/articles/s42256-019-0138-9 and http://papers.nips.cc/paper/7062-a-unified-approach-to-interpreting-model-predictions
SHAP Blog: https://towardsdatascience.com/interpretable-machine-learning-with-xgboost-9ec80d148d27
#Standard feature-importance approaches can yield different results
#Show the top 20 most important features as defined by the XGBoost model
xgb.plot_importance(reg_b['booster'],max_num_features=20,importance_type='weight')
xgb.plot_importance(reg_b['booster'],max_num_features=20,importance_type='gain')
xgb.plot_importance(reg_b['booster'],max_num_features=20,importance_type='cover')
#Import the SHAP libraries
import shap
import matplotlib.pyplot as plt
shap.initjs()
#Subset the data to a more manageable size
X_shap, _= train_test_split(X_test,test_size=0.95,shuffle=True)
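Before mapping the SHAP calculation across the cluster, here is a minimal local sketch (illustrative only; it reuses reg_b and X_shap from above, while sample and local_explainer are made-up names) of what the explainer does on a few rows:

#Illustrative: SHAP values for a handful of rows, computed locally
sample = X_shap.head(5)                            #small pandas DataFrame pulled from the dask DataFrame
local_explainer = shap.TreeExplainer(reg_b['booster'])
print(local_explainer.shap_values(sample).shape)   #(5, number of features): one SHAP value per feature per row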
Apply the SHAP model: below we split the data by month and examine the effect of the features on the model for each month.
#Day of Year for each month
months = {'May':[121,152],
'June':[153,182],
'July':[183,213],
'August':[214,244],
'September':[245,274]}
#Function for calculating SHAP values. We will map this function across the data on the cluster
def calc_shap_vals(block, explainer):
    if len(block)>0:
        block_vals = explainer.shap_values(block)
        return(block_vals)
    else:
        #Empty blocks must still return an array with the expected number of feature columns (184)
        #so that map_blocks can stitch the results back together
        return(np.empty((0,184)))
#Loop over each month and create plot
explainer = shap.TreeExplainer(reg_b['booster'])
for k in months.keys():
print(k)
start = months[k][0]
end = months[k][1]
#Select only the data in the month
X_shap1 = X_shap[(X_shap.DOY>=start)&(X_shap.DOY<=end)].repartition(npartitions=9).persist()
wait(X_shap1)
#Compute the SHAP values
shap_vals = X_shap1.to_dask_array(lengths=True).map_blocks(calc_shap_vals,explainer=explainer,dtype='float32').compute()
#Show the SHAP summary plots for each month
print('Using an array of size:' +str(shap_vals.shape))
plt.title(k)
shap.summary_plot(shap_vals, X_shap1.compute(),max_display=20,title=k)
shap_vals = X_shap.to_dask_array(lengths=True).map_blocks(calc_shap_vals,explainer=explainer,dtype='float32').compute()
shap_vals = shap_vals[~np.isnan(shap_vals).any(axis=1)]
shap.dependence_plot("week0_tmean", shap_vals, X_shap.compute(),interaction_index='DOY')
shap.dependence_plot("week0_ppt", shap_vals, X_shap.compute(),interaction_index='DOY')
shap.dependence_plot("week4_vpdmax", shap_vals, X_shap.compute(),interaction_index='DOY')