Analyzing Air Quality in India
(October 2020)
Exploring through big data
by Eugenio Cedric Corro and Chester Patalud
from IPython.display import HTML
HTML('''<script>
code_show=true;
function code_toggle() {
if (code_show){
$('div.input').hide();
} else {
$('div.input').show();
}
code_show = !code_show
}
$( document ).ready(code_toggle);
</script>
<form action="javascript:code_toggle()"><input type="submit" \
value="Click here to toggle on/off the raw code."></form>''')
import re
import json
import glob
import numpy as np
import seaborn as sns
import plotly.express as px
from operator import itemgetter
import matplotlib.pyplot as plt
from datetime import datetime
import dask.dataframe as dd
import dask.bag as db
import dask.array as da
Air pollution kills an estimated seven million people worldwide every year, according to the World Health Organization (WHO). This study aims to determine the air quality profile of cities in India, as the country dominates the world's 30 most polluted cities in IQAir AirVisual's 2019 World Air Quality Report. The dataset was obtained from the OpenAQ data source on the AWS Registry of Open Data. The subset used comprises real-time physical air quality data from different cities in India for the period of October 1-31, 2020.
The methodology involves data extraction, data preprocessing, and exploratory data analysis. The analysis includes visualizing the average monthly pollutant concentration of each Indian city, determining the top cities per pollutant type, and discovering hourly and daily concentration trends in the most polluted cities.
Through air quality profiling, we were able to derive the following conclusions:
We then further recommend the following:
This data would be valuable to India's environmental board, as it will help in developing strategies and policies regarding environmental impacts, and to investors, who can use it to manage their expectations and decide on possible business ventures.
According to the World Health Organization (WHO), air pollution kills an estimated seven million people worldwide every year. WHO data shows that 9 out of 10 people breathe air containing pollutant levels that exceed WHO guideline limits, with low- and middle-income countries suffering the highest exposures $^1$. Thus, monitoring air quality is important.
In IQAir AirVisual's 2019 World Air Quality Report, 21 of the world's 30 most polluted cities were located in India. This study therefore aims to determine the air quality profile of Indian cities based on real-time monitoring data from OpenAQ.
Knowing the air quality profile in each of the cities will bring value to the following stakeholders:
The dataset was sourced from the AWS Registry of Open Data, https://registry.opendata.aws/openaq/. $^2$ It contains global aggregated physical air quality data from public data sources provided by government, research-grade and other sources. In this study, the data used was from India for the period of October 1-31, 2020. It contains values of different pollutant concentrations, which were monitored in real time.
Below is the overall size of the realtime folder under the OpenAQ data in AWS.
! aws s3 ls s3://openaq-fetches/realtime/ --recursive | grep -v -E "(Bucket: |Prefix: |LastWriteTime|^$|--)" | awk 'BEGIN {total=0}{total+=$3}END{print total/1024/1024" MB"}'
We then selected only the October 2020 data from the global OpenAQ dataset, with an estimated total size of 24 GB, as shown in the cell below.
! aws s3 ls s3://openaq-fetches/realtime/ --recursive | grep "realtime/2020-10-" | awk 'BEGIN {total=0}{total+=$3}END{print total/1024/1024" MB"}'
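The same size calculation can be sketched in Python for readers who prefer to post-process a saved listing. This is a minimal sketch, assuming each listing line has the usual `aws s3 ls --recursive` layout of date, time, size in bytes, and key. The function name `total_size_mb` and the sample file names are hypothetical and used only for illustration.

```python
def total_size_mb(listing_lines, prefix="realtime/2020-10-"):
    """Sum object sizes (in MB) for keys that start with `prefix`."""
    total_bytes = 0
    for line in listing_lines:
        # Assumed layout: <date> <time> <size in bytes> <key>
        parts = line.split(None, 3)
        if len(parts) != 4:
            continue  # skip blank or summary lines
        _date, _time, size, key = parts
        if key.startswith(prefix) and size.isdigit():
            total_bytes += int(size)
    return total_bytes / 1024 / 1024


# Hypothetical listing lines, not actual bucket contents.
sample_listing = [
    "2020-10-01 00:10:02    1048576 realtime/2020-10-01/1601510400.ndjson",
    "2020-10-02 00:10:05    2097152 realtime/2020-10-02/1601596800.ndjson",
    "2020-09-30 23:50:01    4194304 realtime/2020-09-30/1601509800.ndjson",
]
print(total_size_mb(sample_listing), "MB")
```

Matching on the key prefix rather than on a loose pattern keeps objects from other months out of the total.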
To provide an overview of the overall process, the following steps were taken:
Data was extracted using the following client:
from distributed import Client
client = Client('127.0.0.1:8786')
client
The global air quality data was first obtained for the period of October 2020.
df = db.read_text('s3://openaq-fetches/realtime/2020-10-*/*').map(json.loads)
df.count().compute()
df.take(5)
After obtaining the global data for October 2020, we filtered for the records from India.
air_india = (df.filter(lambda x : x['country'] =='IN')
.pluck(['date', 'city', 'parameter', 'value', 'unit', 'coordinates'], {})
).to_dataframe(columns=['date', 'city', 'parameter', 'value', 'unit', 'coordinates'])
air_india.head()
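To make the filter-and-pluck step concrete, the sketch below applies the same logic to a few in-memory NDJSON lines using only the standard library. It is an illustration, not the Dask pipeline itself: the helper name `select_india` and the sample records are hypothetical, and missing fields default to an empty dict to mirror the `{}` default passed to `pluck` above.

```python
import json

FIELDS = ['date', 'city', 'parameter', 'value', 'unit', 'coordinates']


def select_india(ndjson_lines, country='IN'):
    """Keep records for `country` and retain only the fields of interest."""
    rows = []
    for line in ndjson_lines:
        record = json.loads(line)
        if record.get('country') != country:
            continue
        # Missing keys fall back to {} like the pluck default.
        rows.append({key: record.get(key, {}) for key in FIELDS})
    return rows


# Hypothetical records shaped like OpenAQ realtime entries.
sample_lines = [
    json.dumps({'country': 'IN', 'city': 'Delhi', 'parameter': 'pm25',
                'value': 110.0, 'unit': 'µg/m³',
                'date': {'utc': '2020-10-01T05:30:00.000Z'},
                'coordinates': {'latitude': 28.6, 'longitude': 77.2}}),
    json.dumps({'country': 'US', 'city': 'Boston', 'parameter': 'pm25',
                'value': 8.0, 'unit': 'µg/m³',
                'date': {'utc': '2020-10-01T05:30:00.000Z'},
                'coordinates': {'latitude': 42.4, 'longitude': -71.1}}),
    json.dumps({'country': 'IN', 'city': 'Mumbai', 'parameter': 'no2',
                'value': 35.0, 'unit': 'µg/m³',
                'date': {'utc': '2020-10-01T06:30:00.000Z'}}),
]
india_rows = select_india(sample_lines)
print([row['city'] for row in india_rows])
```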
The dataset was preprocessed using the clean_data function below. The final dataset has the following columns:
date: the date the data was observed
time: the time it was monitored
city: specific city location in India
parameter: the type of pollutant monitored
value: the pollutant concentration reading
unit: unit of pollutant concentration
latitude: latitude component of the location
longitude: longitude component of the location
def clean_data(data):
    """
    Return cleaned dask.dataframe with `time` and `date` extracted
    from current `date` column, and `latitude` and `longitude` from
    `coordinates` column.
    """
    # Extract time of the day in format HH:MM:SS
    # (meta dtype is `object` because the result is a datetime.time)
    data['time'] = (data['date']
                    .apply(lambda x:
                           datetime.strptime(x['utc'],
                                             '%Y-%m-%dT%H:%M:%S.%fZ')
                           .time(), meta=('time', 'object')))
    # Extract date in format yyyy-mm-dd
    # (meta dtype is `object` because the result is a datetime.date)
    data['date'] = (data['date']
                    .apply(lambda x:
                           datetime.strptime(x['utc'],
                                             '%Y-%m-%dT%H:%M:%S.%fZ')
                           .date(), meta=('date', 'object')))
    # Separate `coordinates` column to latitude and longitude columns
    data['latitude'] = data['coordinates'].apply(lambda x: x['latitude'],
                                                 meta=('latitude', 'f8'))
    data['longitude'] = data['coordinates'].apply(lambda x: x['longitude'],
                                                  meta=('longitude', 'f8'))
    data = data[['date', 'time', 'city', 'parameter', 'value',
                 'unit', 'latitude', 'longitude', 'coordinates']]
    return data
air_india = clean_data(air_india)
air_india.head()
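What the cleaning step does to one record can be shown without Dask. The sketch below is an assumption-laden illustration: `clean_record` is a hypothetical helper, and the sample record is made up in the shape described above, with a nested `date` dict holding a `utc` timestamp and a nested `coordinates` dict.

```python
from datetime import datetime

UTC_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'


def clean_record(record):
    """Split the nested timestamp and coordinates into flat fields."""
    stamp = datetime.strptime(record['date']['utc'], UTC_FORMAT)
    return {
        'date': stamp.date(),   # yyyy-mm-dd
        'time': stamp.time(),   # HH:MM:SS
        'city': record['city'],
        'parameter': record['parameter'],
        'value': record['value'],
        'unit': record['unit'],
        'latitude': record['coordinates']['latitude'],
        'longitude': record['coordinates']['longitude'],
    }


# Hypothetical record, used only to show the transformation.
raw = {
    'date': {'utc': '2020-10-01T05:30:00.000Z'},
    'city': 'Delhi',
    'parameter': 'pm25',
    'value': 110.0,
    'unit': 'µg/m³',
    'coordinates': {'latitude': 28.6, 'longitude': 77.2},
}
cleaned = clean_record(raw)
print(cleaned['date'], cleaned['time'], cleaned['latitude'])
```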
The mean concentration of each pollutant for the month of October was obtained for the different cities of India. It is plotted in the figure below to visualize the concentration of each pollutant across cities.
def data_for_map(data):
    """Calculate mean pollutant concentration in a month per pollutant per city."""
    data = (data.groupby(['city', 'parameter', 'latitude', 'longitude'])['value']
            .aggregate('mean')
            .reset_index())
    data.value = data.value.apply(lambda x: 0 if x < 0 else int(x), meta=('x', 'f8'))
    return data

def plot_map(data):
    """Show map visualization for pollutant concentration in cities in India."""
    fig = px.scatter_mapbox(data, lat="latitude", lon="longitude",
                            color="value", size='value', hover_name='city',
                            animation_frame='parameter', height=600, zoom=4,
                            color_continuous_scale='balance',
                            title="AIR QUALITY PROFILE OF INDIA")
    fig.update_mapboxes(style='open-street-map')
    fig.show()
    return None

# Calculate mean pollutant concentration in a month.
mean_data = data_for_map(air_india).compute()
# Show map visualization for pollutant concentration in cities in India.
plot_map(mean_data)
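The aggregation behind the map can be illustrated on a handful of rows. This is a simplified sketch: it groups by city and pollutant only (the notebook code also groups by latitude and longitude), the helper name `mean_by_city_parameter` is hypothetical, and the readings are made-up values.

```python
from collections import defaultdict


def mean_by_city_parameter(rows):
    """Return {(city, parameter): mean}, with negative means set to 0
    and the result truncated to an integer, as in data_for_map."""
    sums = defaultdict(lambda: [0.0, 0])  # key -> [running total, count]
    for row in rows:
        key = (row['city'], row['parameter'])
        sums[key][0] += row['value']
        sums[key][1] += 1
    result = {}
    for key, (total, count) in sums.items():
        mean = total / count
        result[key] = 0 if mean < 0 else int(mean)
    return result


# Made-up readings for illustration only.
rows = [
    {'city': 'Delhi', 'parameter': 'pm25', 'value': 100.5},
    {'city': 'Delhi', 'parameter': 'pm25', 'value': 120.5},
    {'city': 'Delhi', 'parameter': 'no2', 'value': -999.0},
    {'city': 'Delhi', 'parameter': 'no2', 'value': 10.0},
]
means = mean_by_city_parameter(rows)
print(means)
```

Note that the clipping is applied to the mean rather than to individual readings, so a single large negative reading can pull an otherwise valid average down to 0.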
To further assess which cities record the highest concentrations of each pollutant, we plot the top 5 cities per pollutant type in the figure below.
def avg_concentration(air_india, n=10):
    """
    Return as pandas dataframe top n Indian cities in terms of pollutant concentration.
    Columns are `parameter`, `city` and `value`.

    PARAMETERS
    ----------
    air_india : dask.DataFrame
        Air quality data of Indian cities
    n : int
        Top n cities per pollutant concentration

    RETURN
    ------
    air_aggn : pandas.DataFrame
    """
    # Calculate mean pollutant concentration over the month per city and parameter
    air_agg = air_india.groupby(['city', 'parameter'])['value'].aggregate('mean').reset_index()
    # Get cities in the list
    cities = air_agg.city.compute()
    # Get top n cities in terms of pollutant concentration
    air_aggn = (air_agg.groupby('parameter')['value']
                .apply(lambda x: x.nlargest(n), meta=('x', 'f8'))
                .reset_index()).compute()
    # `level_1` holds the row index of `air_agg`; map it back to the city name
    air_aggn = air_aggn.rename(columns={'level_1' : 'city', 'x' : 'value'})
    air_aggn.city = air_aggn.city.apply(lambda x: cities[x])
    return air_aggn

worst5 = avg_concentration(air_india, n=5)
#worst5.head(5)
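The top-n selection per pollutant can also be sketched with the standard library. This is not the notebook's Dask implementation but a small equivalent using `heapq.nlargest`. The function name `top_n_cities` and the mean values are hypothetical.

```python
import heapq
from collections import defaultdict


def top_n_cities(city_means, n=5):
    """Return {parameter: [cities ordered by descending mean value]}."""
    by_parameter = defaultdict(list)
    for (city, parameter), value in city_means.items():
        by_parameter[parameter].append((value, city))
    return {
        parameter: [city for _value, city in heapq.nlargest(n, pairs)]
        for parameter, pairs in by_parameter.items()
    }


# Made-up monthly means for illustration only.
city_means = {
    ('Delhi', 'pm25'): 110,
    ('Greater Noida', 'pm25'): 150,
    ('Mumbai', 'pm25'): 60,
    ('Delhi', 'no2'): 40,
    ('Greater Noida', 'no2'): 55,
}
top2 = top_n_cities(city_means, n=2)
print(top2)
```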
def plot1(summary):
    """Plot graph of results summary."""
    fig, ax = plt.subplots(2,3, figsize=(20,10), constrained_layout=True)
    plt.suptitle("Top 5 Indian Cities with Highest Pollutant Concentration", fontsize=25)
    for pol, r, c in zip(summary.parameter.unique(), [0,1,0,1,0,1], [0,1,2,0,1,2]):
        sns.barplot(x='value', y='city', data=summary[summary.parameter==pol], ax=ax[r,c])
        ax[r,c].set_title("Pollutant: {}".format(pol), fontsize=18)
        ax[r,c].set_xlabel("Concentration (in µg/m³)", fontsize=15)
        ax[r,c].set_ylabel("Indian City", fontsize=15)
    fig.show()
    return None

plot1(worst5);
Based on these figures, the city of Greater Noida appears among the top cities for 4 out of 6 pollutants. This suggests that the city should be an area of focus for air pollution reduction programs.
Each country also has national ambient air quality standards, set by its environmental board, that define the allowable pollutant concentrations for ambient air to be considered good. In India, these are set by the Ministry of Environment and Forests (MoEF) and are defined in the Revised National Ambient Air Quality Standards of 2009$^3$.
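A comparison against such standards could be sketched as follows. The annual limits in the dictionary are transcribed from memory of the 2009 standards for industrial, residential, rural and other areas and should be verified against reference [3] before use. The function name `exceedances` and the city means are hypothetical. A one-month mean is also only indicative when set against an annual limit.

```python
# Assumed annual-average limits in µg/m³; verify against reference [3].
ANNUAL_LIMITS_UG_M3 = {'pm25': 40, 'pm10': 60, 'no2': 40, 'so2': 50}


def exceedances(city_means, limits=ANNUAL_LIMITS_UG_M3):
    """Return {(city, parameter): ratio to limit} for means above the limit.

    Pollutants without an entry in `limits` are skipped.
    """
    flagged = {}
    for (city, parameter), value in city_means.items():
        limit = limits.get(parameter)
        if limit is not None and value > limit:
            flagged[(city, parameter)] = round(value / limit, 2)
    return flagged


# Made-up monthly means for illustration only.
city_means = {
    ('Greater Noida', 'pm25'): 150,
    ('Mumbai', 'so2'): 20,
    ('Delhi', 'o3'): 30,
}
flagged = exceedances(city_means)
print(flagged)
```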
Comparing these allowable ANNUAL AVERAGE standards with the results of the profiling, the following observations can be made:
After determining the top cities for each pollutant, we took the first-ranked city for each pollutant and defined these as the top polluted cities. We then observed the hourly trend of the different pollutants in these cities, as seen in the figure below.
def avg_hourly(data, ct_list):
    """
    Return as dask.dataframe the average hourly concentration per
    pollutant.
    """
    # Get data of 'cities'
    data = data.loc[data['city'].isin(ct_list)].copy()
    # Extract 'hour' from `time` column
    data.time = data.time.apply(lambda x: x.hour, meta=('x', 'f8'))
    # Calculate average hourly concentration per pollutant
    result = (data.groupby(['city', 'time', 'parameter'])['value']
              .aggregate('mean')
              .reset_index())
    return result

def get_top_n(worst5, top_n=3):
    """
    Return top top_n cities in the list of polluted cities
    based on any of the pollutants.
    """
    ct_list = (worst5.groupby('parameter')['city'].agg(list)
               .reset_index()
               .city
               .apply(lambda x: set(x[0:top_n])).tolist())
    print("Most polluted Indian cities")
    print("============================")
    for ct in ct_list:
        print(ct)
    return list(set.union(*ct_list))

def plot2(data):
    """Plot average hourly concentration for a list of cities."""
    g = sns.relplot(x="time", y="value", data=data, hue='city',
                    col="parameter", style='city', markers=True,
                    dashes=False, ci='sd', col_wrap=2, kind="line",
                    facet_kws={'sharey': False, 'sharex': True})
    g.fig.suptitle("Average Hourly Pollutant Concentration", size=20)
    (g.set_axis_labels("Time (from 0th to 23rd hour)",
                       "Pollutant Concentration (in µg/m³)", fontsize=14)
      .set_titles("Pollutant: {col_name}", size=18)
      .tight_layout(w_pad=10))
    g._legend.set_title('Indian City')
    plt.show()
    return None

# Get the unique first-ranked city of each pollutant from the list of most polluted cities
ct_list = get_top_n(worst5, top_n=1)
# Get average hourly concentration per pollutant.
hourly_data = avg_hourly(air_india, ct_list).compute()
# Plot hourly pollutant concentration.
plot2(hourly_data)
Comparing these results with the HOURLY AVERAGE allowable standards for each pollutant, as defined by the Revised National Ambient Air Quality Standards for industrial, residential and rural areas, the following can be observed:
def avg_daily(data, ct_list):
    """
    Return as dask.dataframe the average daily concentration per
    pollutant.
    """
    # Get data of 'cities'
    data = data.loc[data['city'].isin(ct_list)].copy()
    # Extract day of the month from date
    data['day'] = data.date.apply(lambda x: x.day, meta=('x', 'f8'))
    # Calculate average daily concentration per pollutant per city
    result = (data.groupby(['city', 'day', 'parameter'])['value']
              .aggregate('mean')
              .reset_index())
    return result

def plot3(data):
    """Plot average daily concentration for a list of cities."""
    g = sns.relplot(x="day", y="value", data=data, hue='city',
                    col="parameter", style='city', markers=True,
                    dashes=False, ci='sd', col_wrap=2, kind="line",
                    facet_kws={'sharey': False, 'sharex': True})
    g.fig.suptitle("Average Daily Pollutant Concentration", size=20)
    (g.set_axis_labels("Day of the Month",
                       "Pollutant Concentration (in µg/m³)", fontsize=14)
      .set_titles("Pollutant: {col_name}", size=18)
      .tight_layout(w_pad=10))
    g._legend.set_title('Indian City')
    plt.show()
    return None

# Get average daily concentration per pollutant per city.
daily_data = avg_daily(air_india, ct_list).compute()
# Plot daily pollutant concentration.
plot3(daily_data)
Comparing these results with the 24-HOUR AVERAGE allowable standards for each pollutant, as defined by the Revised National Ambient Air Quality Standards for industrial, residential, rural and other areas, the following can be observed:
Based on the results of the exploratory data analysis through air quality profiling, using the OpenAQ dataset for October 2020 for cities in India, we arrive at the following conclusions:
We further recommend the following:
[1] World Health Organization. Air pollution. Retrieved from: https://www.who.int/health-topics/air-pollution#tab=tab_1
[2] AWS Registry of Open Data. OpenAQ. Retrieved from: https://registry.opendata.aws/openaq/
[3] Revised National Ambient Air Quality Standards of India. Retrieved from: https://www.ksndmc.org/Uploads/Pollution.pdf
! jupyter nbconvert --to html mp2_openaq_final.ipynb