Introduction
Have you ever wondered how airlines know what we like, when we travel, or how they stay on schedule? In the fast-paced world of aviation, every flight, every reservation, and every passenger journey generates extensive data (big data) that is as endless as the sky itself.
Airline dataset analysis is a method of gaining insight from large airline datasets. These insights can enhance customer service, marketing initiatives, and airline operations. In this article, we'll briefly examine how to perform airline dataset analysis in big data to extract useful information.
Airline Dataset Analysis in Big Data
The process of collecting, preparing, and evaluating huge amounts of data about airlines, such as flight, passenger, and weather data, is known as airline dataset analysis. This information can be used to learn more about several topics, such as:
Customer experience
By personalising offers, predicting traveller behaviour, and responding to customer complaints more rapidly, airlines may use big data to enhance the customer experience.
Flight Operations
Big data can help airlines plan their flights more efficiently, reduce fuel consumption, and shorten delays.
Marketing and Sales
Airlines may use big data to target their advertising efforts more precisely and boost sales.
Risk Management
Airlines may utilise big data to recognise and reduce risks connected to machinery breakdowns, weather-related delays, and security concerns.
Technologies Used
The following Big Data technologies are frequently used in the analysis of airline datasets:
SQL: A relational database's data can be accessed and managed using SQL, a standard language.
Spark: Spark is a unified analytics engine for handling enormous amounts of data.
PySpark: PySpark is the Python API for Apache Spark, an open-source, distributed, general-purpose cluster computing system. PySpark allows the processing of massive amounts of data in real time across a distributed environment.
Parquet: Parquet is a columnar data storage format designed to support analytical queries (a short PySpark sketch after this list shows writing and reading Parquet data).
PyFlink: PyFlink is the Python API for Apache Flink, another unified analytics engine for processing enormous amounts of data.
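As a quick illustration of how these tools fit together, here is a minimal PySpark sketch that writes a small DataFrame to Parquet and reads it back. It assumes a SparkSession object named spark is already available (we create one later in this article), and the file path is only an example.
# Assumes an existing SparkSession called `spark`; the path below is just an example
sample_df = spark.createDataFrame([(1, "AA"), (2, "DL")], ["flight_id", "airline"])
# Write the DataFrame in the columnar Parquet format
sample_df.write.mode("overwrite").parquet("/tmp/sample_flights.parquet")
# Read it back for analytical queries
parquet_df = spark.read.parquet("/tmp/sample_flights.parquet")
parquet_df.show()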
Airline Dataset Analysis
We will cover the step-by-step process of airline dataset analysis in big data. This process includes the following steps:
Data Collection: Collecting data from diverse sources is the first stage. This may include flight schedules, ticket reservations, passenger information, and operational flight data. The data can come from many sources, including airline servers, government organisations, and third-party data suppliers.
Data Cleaning: The collected data is often messy and must be cleaned before it can be analysed. Cleaning removes errors, duplicates, and missing values, and formats the data in a way that makes analysis simple.
Data Preprocessing: The cleaned data is then processed to extract insights. This could involve aggregation, normalisation, and statistical analysis. The goal is to identify patterns and trends in the data that can be used to make informed decisions.
Exploratory Data Analysis (EDA): Exploratory analysis should be done to comprehend the properties of the data. Analyses of the data distribution, summary statistics, and visualisations can all be used to spot trends, patterns, and anomalies.
Analysis: Analyse flight data to learn more about the efficiency of operations. Determine the causes of delays, cancellations, and on-time performance.
Sharing Data: Stakeholders can be given access to the analysed data to support decision-making. These stakeholders may include travel agents, government representatives, and airline executives.
For this project on airline dataset analysis in big data, we are using 2015 flight delay and cancellation data. You can download the dataset from Kaggle. You can upload this file to your environment to start working on it.
We will start with importing important libraries and specifying our files' directories.
#Importing Libraries
import numpy as np
import pandas as pd
import os
# Specify the root directory where your data files are located
data_root = '/content/drive/MyDrive/Colab Notebooks' # You can update this path as per your folder structure
data_paths = {}
# Walk through the specified directory
for dirname, _, filenames in os.walk(data_root):
    for filename in filenames:
        data_paths[filename] = os.path.join(dirname, filename)
        print(os.path.join(dirname, filename))  ## Any output that you write is saved in the current directory.
To deal with large amounts of data, we need to install PySpark.
The command for installing PySpark is as follows:
!pip install pyspark
Output:
Using Spark in Python
The first step in using Spark in Python is connecting to a cluster. In practice, the cluster is usually hosted on a remote machine that is connected to all the other nodes. The cluster system consists of two major components:
Master: One machine, referred to as the master, will be in charge of managing the computations and data splitting.
Workers: The remaining computers in the cluster, the workers, are connected to the master. The workers receive data and instructions from the master, perform the calculations, and reply with the results.
It's easier to run a cluster locally while learning Spark. Therefore, our computations will be performed in a simulated local cluster rather than by connecting to a separate machine.
Simply creating an instance of the SparkContext class establishes the connection. The class constructor accepts a few optional arguments that let you specify the characteristics of the cluster you're connecting to.
These properties can be collected in an object created with the SparkConf() constructor. A SparkConf object is used to configure a Spark application, and to create one we use the SparkConf() constructor from the PySpark library.
The following is the syntax of the SparkConf() constructor:
SparkConf()
Note: This constructor creates a SparkConf object with the default parameters.
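For illustration, here is a minimal sketch of setting explicit properties with SparkConf; the application name and master URL are only example values, and the resulting object can be passed to the SparkContext constructor as SparkContext(conf=conf).
from pyspark import SparkConf
# Example configuration; the app name and master URL are placeholder values
conf = SparkConf().setAppName("AirlineAnalysis").setMaster("local[*]")
# Inspect the configured properties
print(conf.toDebugString())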
Here is the code for importing SparkContext and SparkConf and creating a connection to a local cluster (we rely on the default configuration here):
from pyspark import SparkContext, SparkConf
# Create a SparkContext that connects to a local cluster
sc = SparkContext('local')
# Verify the connection by printing the Spark version
print(sc.version)
Output:
Using Spark DataFrames
The Spark DataFrame is one of the most important parts of Apache Spark, an open-source framework for big data processing designed for scalability and speed. A Spark DataFrame is a distributed collection of data organised into named columns, and it was designed to behave very much like an SQL table.
You must first create a SparkSession object from your SparkContext to begin working with Spark DataFrames. Consider your connection to the cluster as the SparkContext and your interaction with it as the SparkSession.
Here is a minimal example of importing the SparkSession class and creating a session (the rest of this article assumes a SparkSession object named spark):
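from pyspark.sql import SparkSession
# Create (or retrieve) a SparkSession; `spark` is used throughout the rest of this article
spark = SparkSession.builder.getOrCreate()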
You can start exploring your cluster's data once a SparkSession has been created.
# Print the catalog's tables.
print(spark.catalog.listTables())
For now, we will get an empty list as output. To see tables listed here, we need to register data with the Spark catalog, for example by converting a pandas DataFrame into a Spark DataFrame.
Spark Cluster with a Pandas DataFrame
This method belongs to the SparkSession class. A pandas DataFrame is passed to the .createDataFrame() method, which returns a Spark DataFrame.
Here is the code:
# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))
# Creating the spark_temp from pd_temp
spark_temp = spark.createDataFrame(pd_temp)
# Examining all the tables in the catalog
print(spark.catalog.listTables())
# Adding spark_temp to the catalog
spark_temp.createOrReplaceTempView('temp')
# Examining the tables in the catalog again
print(spark.catalog.listTables())
Output:
Explanation:
To access the DataFrame through the catalog, you must register it as a temporary table. This can be done with the .createTempView() Spark DataFrame method, which only needs the name you want to give the temporary table as a parameter.
The .createOrReplaceTempView() method safely creates a new temporary table if none exists, and replaces an existing table with the same name if one does.
See the given diagram to understand the interaction between your Spark data structures.
Creating a DataFrame
The .read attribute of the SparkSession class provides multiple methods for reading data from various sources into Spark DataFrames. These let you create a DataFrame from a .csv file, much like you would with pandas.
# Read in the airports data
airports = spark.read.csv("airports.csv", header=True)
# Show the data
airports.show(10)
Output:
Now, we will read the flights data and print the table catalog. We also need to register the data as a Spark temporary view so we can analyse it later with SQL queries.
# Read in the flights data
flights = spark.read.csv("flights.csv", header=True)
# Show the data shape
print((flights.count(), len(flights.columns)))
# print the tables in catalog
print(spark.catalog.listTables())
# adding data into spark view for sql querying
flights.createOrReplaceTempView('flights')
# print the tables in catalog
print(spark.catalog.listTables())
Output:
You can also see all the columns in the data using this code:
# see all columns in the table
print(flights.columns)
Output:
Now, we will run SQL queries on the Spark DataFrame to view different results.
Query 1: To get the first five rows of the flights table.
query = "SELECT AIRLINE, FLIGHT_NUMBER, TAIL_NUMBER, ORIGIN_AIRPORT, DESTINATION_AIRPORT, SCHEDULED_DEPARTURE FROM flights LIMIT 5"
# Get the first 5 rows of flights
flights5 = spark.sql(query)
# Show the results
flights5.show()
Output:
Query 2: To count flights by origin and destination airport and convert the result to a pandas DataFrame.
query = "SELECT ORIGIN_AIRPORT, DESTINATION_AIRPORT, COUNT(*) as N FROM flights GROUP BY ORIGIN_AIRPORT, DESTINATION_AIRPORT"
# Run the query
flight_counts = spark.sql(query)
# Convert the results to a pandas DataFrame
pd_counts = flight_counts.toPandas()
# Print the head of pd_counts
print(pd_counts.head())
Output:
Column-wise Operations
The .withColumn() method is used in Spark to perform column-wise operations. It takes two arguments:
A string with the name of your new column
The new column itself
A Spark DataFrame is immutable, so updating one differs somewhat from working with a pandas DataFrame: columns cannot be altered in place.
As a result, each of these methods returns a new DataFrame. To overwrite the original DataFrame, you must reassign the returned DataFrame, as in the following example:
data = data.withColumn("newCol", data.oldCol + 1)
Here is the code for overwriting an existing column:
# Create the DataFrame flights
flights = spark.table("flights")
# Add hours_duration
flights = flights.withColumn('hours_duration', flights.AIR_TIME/60.)
# Show the head
flights.select('hours_duration').show(10)
Output:
To filter values when cleaning the data, here is an example:
# Flights can be filtered by passing a string
long_flights1 = flights.filter("DISTANCE > 1000")
# By passing a column of boolean values, filter flights
long_flights2 = flights.filter(flights.DISTANCE > 1000)
Preprocessing Data
Now, we are preprocessing data and selecting features to train our model.
# Selecting the first set of columns
selected1 = flights.select('TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT',)
# Selecting the second set of columns
temp = flights.select(flights.ORIGIN_AIRPORT, flights.DESTINATION_AIRPORT, flights.AIRLINE)
temp.show()
Output:
Applying filters to our DataFrame.
# Define first filter
filterA = flights.ORIGIN_AIRPORT == "SEA"
# Define second filter
filterB = flights.DESTINATION_AIRPORT == "PDX"
# Filter the data, first by filterA then by filterB
selected2 = temp.filter(filterA).filter(filterB)
Deriving a new column, average speed, for feature creation:
# Define avg_speed
avg_speed = (flights.DISTANCE/(flights.AIR_TIME/60)).alias("avg_speed")
# Select the correct columns
speed1 = flights.select('TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', avg_speed)
# Create the same table using a SQL expression
speed2 = flights.selectExpr('TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', "DISTANCE/(AIR_TIME/60) as avg_speed")
We need to convert the numeric columns, which are read from the CSV as strings, to integer or decimal types. Below is a minimal sketch of casting a few of them (the exact set of columns to cast depends on your analysis):
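# Cast a few string columns to numeric types (extend this list as needed)
flights = flights.withColumn("AIR_TIME", flights.AIR_TIME.cast("integer"))
flights = flights.withColumn("DISTANCE", flights.DISTANCE.cast("integer"))
flights = flights.withColumn("ARRIVAL_DELAY", flights.ARRIVAL_DELAY.cast("integer"))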
Feature engineering is the process of modifying existing data and generating new features to improve the performance of machine learning models.
Here, we are using aggregation methods.
# Shortest flight from PDX in terms of distance
flights.filter(flights.ORIGIN_AIRPORT == 'PDX').groupBy().min('DISTANCE').show()
# Longest flight from SEA in terms of air time
flights.filter(flights.ORIGIN_AIRPORT == 'SEA').groupBy().max('AIR_TIME').show()
Output:
Next, we apply the groupBy function to two columns.
# Import pyspark.sql.functions as Ps
import pyspark.sql.functions as Ps
# cast to integer value
flights = flights.withColumn("DEPARTURE_DELAY", flights.DEPARTURE_DELAY.cast("integer"))
# Group by month and dest
by_month_dest = flights.groupBy('MONTH', 'DESTINATION_AIRPORT')
# Average departure delay by month and destination
by_month_dest.avg('DEPARTURE_DELAY').show(10)
# Standard deviation of departure delay
by_month_dest.agg(Ps.stddev('DEPARTURE_DELAY')).show(10)
Output:
# Group by tail number
by_plane = flights.groupBy("TAIL_NUMBER")
# Number of flights made by each plane
by_plane.count().show(5)
# Group by origin
by_origin = flights.groupBy("ORIGIN_AIRPORT")
# Average duration of flights from PDX and SEA
by_origin.avg("AIR_TIME").show(5)
Output:
After modifying the data, it is good practice to inspect it. Use the following code to view the airports data.
print(airports.columns)
# Examine the data
print(airports.show(5))
Output:
Applying a left outer join on the DataFrames.
# Rename the column
airports = airports.withColumnRenamed("IATA_CODE", "DESTINATION_AIRPORT")
# Join the DataFrames
flights_with_airports = flights.join(airports , on = 'DESTINATION_AIRPORT', how = 'leftouter')
# Examine the new DataFrame
print(flights_with_airports.columns)
print(flights_with_airports.count())
This prints the columns of the joined DataFrame and its row count, i.e., 103220.
# Read in the airlines data
airlines = spark.read.csv("/airlines.csv", header=True)
# Show the data shape
print((airlines.count(), len(airlines.columns)))
airlines.show()
Output:
Machine Learning Models
Now, we will build a machine learning pipeline and a model to predict flight delays. The Transformer and Estimator classes are the foundation of the pyspark.ml module, and we will use both classes here.
The .transform() method: Transformer classes implement a .transform() method that takes a DataFrame and returns a new DataFrame, usually with one or more columns added.
The .fit() method: Estimator classes implement a .fit() method. It also takes a DataFrame, but instead of returning another DataFrame, it returns a model object.
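As a small illustration of this pattern, here is a sketch using the StringIndexer class that also appears in the pipeline below; it assumes the AIRLINE column has no missing values.
from pyspark.ml.feature import StringIndexer
# Estimator: .fit() learns the string-to-number mapping and returns a fitted model (a Transformer)
indexer = StringIndexer(inputCol="AIRLINE", outputCol="airline_index")
indexer_model = indexer.fit(flights)
# Transformer: .transform() returns a new DataFrame with the extra numeric column
indexed_flights = indexer_model.transform(flights)
indexed_flights.select("AIRLINE", "airline_index").show(5)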
# filtering columns for our model
data_model = flights.select('MONTH', 'DAY_OF_WEEK', 'AIRLINE', 'TAIL_NUMBER', 'DESTINATION_AIRPORT', 'AIR_TIME', 'DISTANCE', 'ARRIVAL_DELAY',)
# Removing missing values/ Data Cleaning
data_model = data_model.filter("ARRIVAL_DELAY is not NULL and AIRLINE is not NULL and AIR_TIME is not NULL and TAIL_NUMBER is not NULL")
# rows left
data_model.count()
Output:
# Create is_late (label) for our model
data_model = data_model.withColumn("is_late", data_model.ARRIVAL_DELAY > 0)
# casting to integer values
data_model = data_model.withColumn("is_late", data_model.is_late.cast("integer"))
# renaming the column
data_model = data_model.withColumnRenamed("is_late", 'label')
For our model to work properly, we need to encode the categorical data. This involves two basic steps:
Step 1: The first step in encoding a categorical feature is to create a StringIndexer. Instances of this class are Estimators that take a column of strings from a DataFrame and map each unique string to a number.
The Estimator then returns a Transformer, which takes a DataFrame, attaches the mapping as metadata, and returns a new DataFrame with a numeric column corresponding to the string column.
Step 2: The next step is to encode this numeric column as a one-hot vector using a OneHotEncoder. It works just like the StringIndexer: you first build an Estimator and then a Transformer. The categorical feature is finally encoded as a vector in a column that machine learning algorithms can use.
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
# Create a StringIndexer for our pipeline
airline_indexer = StringIndexer(inputCol="AIRLINE", outputCol="airline_index")
# Create a OneHotEncoder to map a column
airline_encoder = OneHotEncoder(inputCol="airline_index", outputCol="airline_fact")
# Create a StringIndexer
tail_indexer = StringIndexer(inputCol="TAIL_NUMBER", outputCol="tail_index")
# Create a OneHotEncoder
tail_encoder = OneHotEncoder(inputCol="tail_index", outputCol="tail_fact")
# Create a StringIndexer
dest_indexer = StringIndexer(inputCol="DESTINATION_AIRPORT", outputCol="dest_index")
# Create a OneHotEncoder
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")
The Pipeline's final step is to create a single column from all the columns that contain our features. Every Spark modelling function requires the data to be in this format, so this must be done before modelling can begin.
from pyspark.ml.feature import VectorAssembler
#This Transformer takes all of the columns you specify and combines them into a new vector column.
# Make a VectorAssembler of 'MONTH', 'DAY_OF_WEEK', 'AIR_TIME', 'DISTANCE', 'ARRIVAL_DELAY','AIRLINE', 'TAIL_NUMBER', 'DESTINATION_AIRPORT'
vec_assembler = VectorAssembler(inputCols=["MONTH", "DAY_OF_WEEK", "AIR_TIME", "DISTANCE", "airline_fact", "dest_fact", "tail_fact"], outputCol="features")
Creating Pipeline
The pyspark.ml module provides a class called Pipeline that combines all the Estimators and Transformers you've already created.
# Import Pipeline
from pyspark.ml import Pipeline
# Create the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, airline_indexer, airline_encoder, tail_indexer, tail_encoder, vec_assembler])
piped_data = flights_pipe.fit(data_model).transform(data_model)
# Split the data into 70% training and 30% testing sets
train_data, test_data = piped_data.randomSplit([.7, .3])
print('data points (rows) in train data:', train_data.count())
print('data points (rows) in test data:', test_data.count())
Output:
Creating Model
Now, we will create a machine learning model to do the prediction. We are using a logistic regression model. You can also choose any other model.
# Importing LogisticRegression
from pyspark.ml.classification import LogisticRegression
# Creating a LogisticRegression Estimator
model = LogisticRegression()
Creating a Grid
For cross-validation, you need to build a grid of hyperparameter values using the .addGrid() and .build() methods.
# Import the tuning submodule
import pyspark.ml.tuning as tune
# Create the parameter grid
grid = tune.ParamGridBuilder()
# Adding the hyperparameter
grid = grid.addGrid(model.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(model.elasticNetParam, [0, 1])
# Building the grid
grid = grid.build()
Cross-Validation
A statistical technique for assessing the efficiency of machine learning models is cross-validation. It is a resampling technique that uses several data subsets for model testing and training throughout various iterations. Here is the code:
# Import the evaluation submodule
import pyspark.ml.evaluation as ev
# Create a BinaryClassificationEvaluator
evaluator = ev.BinaryClassificationEvaluator(metricName="areaUnderROC")
Creating a Validator
The submodule pyspark.ml.tuning includes a class called CrossValidator that performs cross-validation. This Estimator takes as inputs the model you want to fit, the grid of hyperparameters you created, and the evaluator you want to use to compare your models.
# Create the CrossValidator
validator = tune.CrossValidator(estimator=model,
estimatorParamMaps=grid,
evaluator=evaluator)
Fit the Model
Now, the final step is to fit the cross-validation models on the training data and select the best one. The CrossValidator's .fit() method trains a model for every combination of hyperparameters in the grid and exposes the winner through its bestModel attribute.
# Fit cross-validation models on the training data
models = validator.fit(train_data)
# Extract the best model found during cross-validation
best_model = models.bestModel
# Use the best model to predict the test set
test_results = best_model.transform(test_data)
# Evaluate the predictions and print
print(evaluator.evaluate(test_results))
Output:
Frequently Asked Questions
How does Big Data affect the analysis of airline data?
Big Data improves airline data analysis by effectively managing massive datasets, revealing insights from flight operations, passenger behaviour, and safety measures, and ultimately improving operational effectiveness and consumer experiences.
How does using Big Data analytics improve the traveller experience?
A more personalised and effective travel experience is achieved by using Big Data analysis to assist airlines in optimising flight schedules, reducing delays, and customising services to client preferences.
What are typical difficulties observed while using big data to analyse airline data?
Difficulties include controlling real-time data volume and velocity, assuring data security and privacy, and creating algorithms for predictive maintenance and route optimisation.
Conclusion
In this article, we extensively discussed airline dataset analysis in big data. We created a logistic regression model to predict airline delays. Such predictions can help enhance customer service, marketing initiatives, and airline operations.
We hope this article helps you. To read more about big data analysis, you can explore our related articles.