Air Quality in India Part 2:
Predicting Air Quality of Most Polluted Indian Cities
(December 2020)
Exploring through big data
by Eugenio Cedric Corro and Chester Patalud
sc
The following packages need to be installed every time a new EMR cluster is created.
sc.install_pypi_package("pandas==0.25.1")
sc.install_pypi_package("matplotlib")
sc.install_pypi_package("seaborn")
sc.install_pypi_package("plotly.express")
sc.install_pypi_package("IPython")
import json
import numpy as np
import seaborn as sns
import pandas as pd
import plotly.express as px
from operator import itemgetter
import matplotlib.pyplot as plt
from datetime import datetime
from IPython.display import display
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import (StringIndexer,
VectorAssembler,
VectorIndexer)
from pyspark.ml.regression import (RandomForestRegressor,
LinearRegression,
GBTRegressor)
from IPython.display import HTML
HTML('''<script>
code_show=true;
function code_toggle() {
if (code_show){
$('div.input').hide();
} else {
$('div.input').show();
}
code_show = !code_show
}
$( document ).ready(code_toggle);
</script>
<form action="javascript:code_toggle()"><input type="submit" \
value="Click here to toggle on/off the raw code."></form>''')
An estimated seven million people are killed worldwide every year by air pollution, according to the World Health Organization (WHO). Most of these deaths are attributed to heart disease and lung cancer, as reported in a 2015 IQAir study. Knowing the air quality profile in advance is therefore important for avoiding its impact. This study aims to develop a regression model that predicts the short-term air quality of identified polluted cities in India, a country that dominates the world's top 30 polluted cities according to IQAir AirVisual's 2019 World Air Quality Report. The dataset was obtained from the AWS Registry of Open Data and comprises real-time physical air quality data from different cities in India over the period of October to December 2020. The study used the Amazon EMR service with the PySpark kernel.
The methodology involves data extraction, data preprocessing, exploratory data analysis, model development, and model evaluation. The top cities by average pollutant concentration were determined, and models were developed to predict pollutant concentration in these cities.
Based on the results of the exploratory data analysis, Visakhapatnam was selected for modeling, as it is the city with the highest CO pollutant concentration. Among the models developed, Random Forest gave better performance than Gradient Boosting and Linear Regression: it was the fastest in terms of runtime (approximately 12 minutes) and gave a better root mean squared error of $909.276$, compared to $1.0055\times10^{24}$ and $1.2186\times10^{24}$ for Linear Regression and Gradient Boosting, respectively.
We further recommend the following:
The model can also be tested on other cities and pollutants; this study focuses only on a predictive model for the CO pollutant in Visakhapatnam.
Add more features when training the models, as the features used were only time variables. Concentrations of other pollutants were not considered, since those variables would also need to be predicted. Features that contribute to the release of pollutants, such as the number of industries situated in the city and the number of cars per day, might be considered.
Other regression models, such as univariate time series models, can be used. Autoregressive Integrated Moving Average (ARIMA) can be considered for prediction, since it works with the concentration values alone.
Hyperparameter tuning can also be done to improve the models' performance.
According to the World Health Organization (WHO), air pollution kills an estimated seven million people worldwide every year. WHO data show that 9 out of 10 people breathe air that exceeds WHO guideline limits and contains high levels of pollutants, with low- and middle-income countries suffering the highest exposures $^1$.
An IQAir study shows that air pollution worldwide caused 24% of heart disease deaths, 23% of lung cancer deaths, 21% of stroke deaths, and 19% of cardiovascular deaths $^2$. Furthermore, outdoor air pollution is an important risk factor for neurodevelopmental disorders in children and neurodegenerative diseases in adults. Given these impacts, forecasting the air quality profile is important for lessening the effects and for developing policies and strategies that reduce the severity of local pollution levels.
This study aims to develop a regression model that predicts the short-term air quality of identified polluted cities in India. India was chosen as the country of analysis because 21 of the world's 30 most polluted cities in IQAir AirVisual's 2019 World Air Quality Report are located there $^3$.
Forecasting air quality will be of value to the following:
India Environmental Board. They will be able to proactively monitor air quality emissions in a given area. Predicting non-compliance with air quality parameters before it happens allows them to make the necessary adjustments and investigations to lessen the potential environmental and health impact. They can also use the forecasts to establish procedures that reduce the severity of local pollution levels.
Citizens. Early detection of pollutant non-compliance leads to early public awareness, so people can take the necessary precautions to lessen their exposure.
The dataset was sourced from the AWS Registry of Open Data, https://registry.opendata.aws/openaq/ $^4$. It contains globally aggregated physical air quality data from public sources, provided by governments, research-grade instruments, and other sources. In this study, the data used was from India over the period of October to December 2020. It contains values of different pollutant concentrations monitored in real time.
%%bash
aws s3 ls s3://openaq-fetches/realtime/ --recursive | grep -v -E "(Bucket: |Prefix: |LastWriteTime|^$|--)" | awk 'BEGIN {total=0}{total+=$3}END{print total/1024/1024" MB"}'
%%bash
aws s3 ls s3://openaq-fetches/realtime/ --recursive | grep -E "2020-1[012]" | awk 'BEGIN {total=0}{total+=$3}END{print total/1024/1024" MB"}'
To provide an overview of the overall process, the following steps were taken:
Data was extracted from the AWS registry using the Amazon EMR service with the PySpark kernel.
OpenAQ data from October, November, and December 2020 was obtained for use in the study.
octo = sc.textFile('s3://openaq-fetches/realtime/2020-10-*/*')
nov = sc.textFile('s3://openaq-fetches/realtime/2020-11-*/*')
dec = sc.textFile('s3://openaq-fetches/realtime/2020-12-*/*')
data = sc.union([octo, nov, dec])
data.count()
data.map(json.loads).take(1)
df = spark.read.json(data)
df.printSchema()
df.createOrReplaceTempView("df_air")
After getting the entire dataset for October to December 2020, we obtain only the data from India by selecting IN under the country column.
aq_india = spark.sql(
"""
SELECT date, city, parameter, value
FROM df_air
WHERE country == 'IN'
""")
aq_india.count()
aq_india.printSchema()
We then preprocessed the data by separating the timestamps into month, day, hour, and minute. The columns of the final dataset are described below:
city: the specific city location in India
parameter: the type of pollutant monitored
value: the pollutant concentration reading
date_ts: the timestamp at which the data was observed
month: the month it was monitored
day: the day it was monitored
hour: the hour it was monitored
minute: the minute it was monitored
# Extract timestamp
# Get only UTC time
aq_india = aq_india.withColumn('date', aq_india.date['utc'])
# Set `date_ts` as timestamp format of `date`
aq_india = aq_india.withColumn('date_ts',
to_timestamp("date",
"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
# Extract time information
aq_india = aq_india.withColumn('month', month('date'))
aq_india = aq_india.withColumn('day', dayofmonth('date'))
aq_india = aq_india.withColumn('hour', hour('date'))
aq_india = aq_india.withColumn('minute', minute('date'))
aq_india = aq_india.drop('date')
The final dataset is shown below:
aq_india.show(5)
Table 1. Final dataset.
We first assess which cities have the highest average concentration per pollutant. We do this by getting the average value per city per pollutant in the entire duration.
aq_india.createOrReplaceTempView("aq_india2")
avg_values = spark.sql(
"""
SELECT
city,
parameter,
AVG(value) as avg_value
FROM aq_india2
GROUP BY city, parameter
ORDER BY avg_value DESC
""").toPandas()
The top city per pollutant is shown in the table below:
top_cities = (avg_values.groupby(['parameter'])[['city','avg_value']]
.apply(lambda x: x.nlargest(1, 'avg_value'))
.reset_index()
.drop('level_1', axis=1))
top_cities
Table 2. Top cities per pollutant.
city_list = tuple(set(top_cities.city))
city_list
In preparation for model development, we keep only the data from the top polluted cities identified earlier. A sample of the dataset is shown below:
query = (
"SELECT * " +
"FROM aq_india2 " +
"WHERE city IN {} ".format(city_list) +
"ORDER BY date_ts")
aq_top = spark.sql(query)
aq_top.count()
aq_top.show()
Table 3. Top polluted cities dataset.
Below is a plot of the city with the highest average concentration for each type of pollutant:
def plot1(summary):
"""Plot graph of results summary."""
fig, ax = plt.subplots(2,3, figsize=(20,10), constrained_layout=True)
plt.suptitle("Indian Cities with Highest Pollutant Concentration", fontsize=25)
for pol, r, c in zip(summary.parameter.unique(), [0,1,0,1,0,1], [0,1,2,0,1,2]):
sns.barplot(x='avg_value', y='city', data=summary[summary.parameter==pol], ax=ax[r,c])
ax[r,c].set_title("Pollutant: {}".format(pol), fontsize=18)
ax[r,c].set_xlabel("Concentration (in µg/m³)", fontsize=15)
ax[r,c].set_ylabel("Indian City", fontsize=15)
return fig
ax_1 = plot1(top_cities)
display(ax_1)
A regression model will be developed to predict the air pollutant concentration in a specified city. The train_test_split function was used to separate the data into train and test sets: data from October and November 2020 was used as the training set, and the whole of December 2020 as the test set.
def train_test_split(data, city, pollutant):
"""Return train and test sets."""
data.createOrReplaceTempView("data1")
# Get data of city and pollutant.
query = (
"SELECT date_ts, month, day, hour, minute, value " +
"FROM data1 " +
"WHERE city = '{}' ".format(city) +
"AND parameter = '{}' ".format(pollutant) +
"ORDER BY date_ts"
)
# Remove duplicates.
df = spark.sql(query).dropDuplicates()
# Get train set (October and November).
train = df.filter(df.month.isin([10, 11]))
# Get test set (December).
test = df.filter(df.month.isin([12]))
return (train, test)
top_cities
Table 4. Top cities per pollutant.
For this study, we chose to build a predictive model for the CO pollutant concentration in the city of Visakhapatnam.
train, test = train_test_split(data=aq_top,
city='Visakhapatnam',
pollutant='co')
After getting the split, we use VectorAssembler to combine the list of feature columns into a single vector column via the data_indexing function.
def data_indexing(data):
"""Return feature indexer."""
# Index features.
feature_indexer = Pipeline(stages=[
VectorAssembler(inputCols=['month',
'day',
'hour',
'minute'],
outputCol='features')
])
feature_indexer = feature_indexer.fit(data)
return feature_indexer
#Combining train and test dataset
train_test = train.union(test)
feature_indexer = data_indexing(data=train_test)
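As an optional sanity check, the fitted indexer can be applied to the training set to confirm that the four time columns are assembled into a single vector (a minimal sketch; the features and value column names follow the data_indexing and train_test_split functions above):
# Verify that the pipeline adds a `features` vector column.
feature_indexer.transform(train).select('features', 'value').show(3, truncate=False)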
We use the model_predict function to train each model and make predictions. Three types of models were used in the study (Linear Regression, Random Forest, and Gradient Boosting), and each model's performance was evaluated via the root mean squared error (RMSE) through the get_rmse function.
def model_predict(train, test, feature_indexer, model='lr'):
"""Return dataframe with predicted values."""
# Set regression models.
if model == 'lr':
mod = LinearRegression(featuresCol='features',
labelCol='value',
maxIter=50,
regParam=0.3,
elasticNetParam=0.8)
if model == 'rf':
mod = RandomForestRegressor(featuresCol="features",
labelCol='value',
numTrees=5,
maxDepth=3,
maxBins=1000,
seed=42)
if model == 'gbt':
mod = GBTRegressor(featuresCol="features",
labelCol='value',
maxDepth=2,
maxBins=1000,
maxIter=50,
seed=42)
pipeline = Pipeline(stages=[feature_indexer,
mod])
model = pipeline.fit(train)
predictions = model.transform(test)
return predictions
def get_rmse(predictions):
"""Display RMSE of prediction."""
evaluator = RegressionEvaluator(
labelCol="value",
predictionCol="prediction",
metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
return rmse
The following shows the model implementation and evaluation for the three models:
lr_pred = model_predict(train=train,
test=test,
feature_indexer=feature_indexer,
model='lr')
# lr_pred.show()
# Model evaluation
lr_rmse = get_rmse(lr_pred)
# numTrees=5; maxDepth=3
rf_pred2 = model_predict(train=train,
test=test,
feature_indexer=feature_indexer,
model='rf')
rf_rmse2 = get_rmse(rf_pred2)
gbt_pred = model_predict(train=train,
test=test,
feature_indexer=feature_indexer,
model='gbt')
gbt_rmse = get_rmse(gbt_pred)
Comparing the results of the model evaluation, Random Forest gave the best performance of the three models: it had the fastest runtime and the lowest root mean squared error (RMSE), as shown in the table below. Although an RMSE of 909.276 is still high, it is far better than Linear Regression and Gradient Boosting, whose RMSE values with the base parameters were on the order of $10^{24}$.
| Model | Run Time (minutes) | Root Mean Squared Error |
|---|---|---|
| Linear Regression | approx. 36 | $1.0055\times10^{24}$ |
| Random Forest | approx. 12 | $909.2760$ |
| Gradient Boosting | approx. 18 | $1.2186\times10^{24}$ |

Table 5. Model comparison.
From these results, we identified runtime as a constraint on improving the model: both training and hyperparameter tuning take considerable time because of the huge amount of data.
Below is a summary of statistics for the PySpark dataframe containing the features used in the model, the true values of the target variable (the pollutant concentration, value), and the predicted concentration values.
pred_stats = rf_pred2.describe()
pred_stats.show()
Table 6. Statistics of Best Prediction.
Looking at the value (true value) column of the summary statistics, it has a mean of 1242.40, while our best RMSE was 909.276. One can think of the RMSE as a standard deviation of the predicted values from the actual values. Although this RMSE is not particularly good, the RF model's predictions are significantly better than those of the other two models, and the variation of the predictions relative to the mean of the actual values is less than 100%. Still, the model can be improved significantly.
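As a quick arithmetic check of that claim, the coefficient of variation implied by the figures above can be computed directly (a sketch using only the reported numbers):
# RMSE of the best (Random Forest) model relative to the mean true value.
rmse = 909.276
mean_value = 1242.40
cv = rmse / mean_value * 100
print("Coefficient of variation: {:.1f}%".format(cv))  # about 73.2%, i.e. below 100%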
Based on the results, we were able to develop a regression model that predicts the air quality concentration of a city in India, specifically for the CO pollutant in Visakhapatnam. The Random Forest model gave better performance than Gradient Boosting and Linear Regression: it was the fastest in terms of runtime (approximately 12 minutes) and gave a better root mean squared error of $909.276$, compared to $1.0055\times10^{24}$ and $1.2186\times10^{24}$ for Linear Regression and Gradient Boosting, respectively.
We further recommend the following:
The model can also be tested on other cities and pollutants; this study focuses only on a predictive model for the CO pollutant in Visakhapatnam.
Add more features when training the models, as the features used were only time variables. Concentrations of other pollutants were not considered, since those variables would also need to be predicted. Features that contribute to the release of pollutants, such as the number of industries situated in the city and the number of cars per day, might be considered (see the day-of-week sketch after this list for one simple extension).
Other regression models, such as univariate time series models, can be used. Autoregressive Integrated Moving Average (ARIMA) can be considered for prediction, since it works with the concentration values alone (see the ARIMA sketch below).
Hyperparameter tuning can also be done to improve the models' performance (see the cross-validation sketch below).
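As an illustration of the second recommendation, a minimal sketch for deriving one additional time feature is shown below. It assumes the aq_top dataframe built earlier and uses the dayofweek function from pyspark.sql.functions; the new column would then be added to the VectorAssembler input list in data_indexing.
# Derive day-of-week (1 = Sunday, ..., 7 = Saturday) from the existing timestamp.
from pyspark.sql.functions import dayofweek
aq_top_dow = aq_top.withColumn('day_of_week', dayofweek('date_ts'))
aq_top_dow.select('date_ts', 'day_of_week').show(3)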
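For the time series recommendation, a minimal ARIMA sketch is given below. It assumes statsmodels is installed on the cluster (e.g., via sc.install_pypi_package("statsmodels")) and collects the single-city, single-pollutant series to the driver, which is feasible at this scale; the ARIMA order (1, 1, 1) is an illustrative guess, not a tuned choice.
from statsmodels.tsa.arima.model import ARIMA
# Collect the univariate concentration series to the driver, in time order.
train_pd = train.orderBy('date_ts').toPandas()
test_pd = test.orderBy('date_ts').toPandas()
# Fit a simple ARIMA(1, 1, 1) on the training concentrations.
arima_fit = ARIMA(train_pd['value'], order=(1, 1, 1)).fit()
# Forecast the test horizon and compute the RMSE.
forecast = arima_fit.forecast(steps=len(test_pd))
arima_rmse = ((forecast.values - test_pd['value'].values) ** 2).mean() ** 0.5
print("ARIMA RMSE on the December test set: {:.3f}".format(arima_rmse))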
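For the hyperparameter tuning recommendation, one standard approach in PySpark is cross-validated grid search. The sketch below reuses the feature_indexer, train, and test objects from the study; the grid values are illustrative assumptions, not tuned choices.
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
rf = RandomForestRegressor(featuresCol='features', labelCol='value',
                           maxBins=1000, seed=42)
pipeline = Pipeline(stages=[feature_indexer, rf])
# Illustrative grid over the two main Random Forest knobs.
param_grid = (ParamGridBuilder()
              .addGrid(rf.numTrees, [5, 20, 50])
              .addGrid(rf.maxDepth, [3, 5, 8])
              .build())
evaluator = RegressionEvaluator(labelCol='value',
                                predictionCol='prediction',
                                metricName='rmse')
# 3-fold cross-validation on the training set only.
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=3,
                    seed=42)
cv_model = cv.fit(train)
print("Tuned RF RMSE:", evaluator.evaluate(cv_model.transform(test)))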
[1] World Health Organization. "Air pollution." Retrieved from: https://www.who.int/health-topics/air-pollution#tab=tab_1
[2] IQAir. "Can air pollution be predicted?" Retrieved from: https://www.iqair.com/us/blog/air-quality/can-air-pollution-be-predicted
[3] IQAir. "World's most polluted cities 2019 (PM2.5)." Retrieved from: https://www.iqair.com/world-most-polluted-cities
[4] AWS Registry of Open Data. "OpenAQ." Retrieved from: https://registry.opendata.aws/openaq/