ScaDaMaLe Course site and book

Intrusion Detection

Group Project Team:

  • MohamedReza Faridghasemnia
  • Javad Forough
  • Quantao Yang
  • Arman Rahbar

**Video **

https://chalmersuniversity.box.com/s/hbfurlolj3ax8aarxow0fdcuzjxeo206

**Dataset Source **

https://www.unsw.adfa.edu.au/unsw-canberra-cyber/cybersecurity/ADFA-NB15-Datasets/

Problem Definition

With the evolution and pervasive usage of the computer networks and cloud environments, Cyber-attacks such as Distributed Denial of Service (DDoS) become major threads for such environments. For an example, DDoS attacks can prohibit normal usage of the web services through saturating their underlying system’s resources and even in most recent type of attacks namely Low-rate DDoS attacks, they drop the Quality of Service (QoS) of the cloud service providers significantly and bypass the detection systems by behaving similar to the normal users. Modern networked business environments require a high level of security to ensure safe and trusted communication of information between various organizations and to counter such attacks. An Intrusion Detection System is the foremost important step against such threads happening in the network and act as an adaptable safeguard technology for system security after traditional technologies fail. As the network attacks become more sophisticated, it is crucial to equip the system with the state-of-the-art intrusion detection systems. In this project, we investigate different types of learning-based Intrusion detection systems and evaluate them based on different metrics on a large benchmark dataset in a distributed manner using Apache Spark, which is an open-source distributed general-purpose cluster-computing framework.

Loading and Preprocessing Data

For detecting intrusion in the network, we use a dataset named UNSW-NB15, a collection of network traffic data collected by Australian Centre for Cyber Security (ACCS).

UNSW-NB15 Testbed UNSW-NB15 Testbed. Image from UNSW-NB15 website

The raw data of UNSW-NB15 Dataset is a pcap file of network traffic with the size of 100gb, that 49 features (including labels) is extracted from the dataset using Argus, Bro-IDS tools and twelve other algorithms. The extracted features desription is given below.

No.NameTypeDescription
0srcipnominalSource IP address
1sportintegerSource port number
2dstipnominalDestination IP address
3dsportintegerDestination port number
4protonominalTransaction protocol
5statenominalIndicates to the state and its dependent protocol, e.g. ACC, CLO, CON, ECO, ECR, FIN, INT, MAS, PAR, REQ, RST, TST, TXD, URH, URN, and (-) (if not used state)
6durFloatRecord total duration
7sbytesIntegerSource to destination transaction bytes
8dbytesIntegerDestination to source transaction bytes
9sttlIntegerSource to destination time to live value
10dttlIntegerDestination to source time to live value
11slossIntegerSource packets retransmitted or dropped
12dlossIntegerDestination packets retransmitted or dropped
13servicenominalhttp, ftp, smtp, ssh, dns, ftp-data ,irc and (-) if not much used service
14SloadFloatSource bits per second
15DloadFloatDestination bits per second
16SpktsintegerSource to destination packet count
17DpktsintegerDestination to source packet count
18swinintegerSource TCP window advertisement value
19dwinintegerDestination TCP window advertisement value
20stcpbintegerSource TCP base sequence number
21dtcpbintegerDestination TCP base sequence number
22smeanszintegerMean of the ?ow packet size transmitted by the src
23dmeanszintegerMean of the ?ow packet size transmitted by the dst
24trans_depthintegerRepresents the pipelined depth into the connection of http request/response transaction
25resbdylenintegerActual uncompressed content size of the data transferred from the server’s http service.
26SjitFloatSource jitter (mSec)
27DjitFloatDestination jitter (mSec)
28StimeTimestamprecord start time
29LtimeTimestamprecord last time
30SintpktFloatSource interpacket arrival time (mSec)
31DintpktFloatDestination interpacket arrival time (mSec)
32tcprttFloatTCP connection setup round-trip time, the sum of ’synack’ and ’ackdat’.
33synackFloatTCP connection setup time, the time between the SYN and the SYN_ACK packets.
34ackdatFloatTCP connection setup time, the time between the SYN_ACK and the ACK packets.
35issmips_portsBinaryIf source (1) and destination (3)IP addresses equal and port numbers (2)(4) equal then, this variable takes value 1 else 0
36ctstatettlIntegerNo. for each state (6) according to specific range of values for source/destination time to live (10) (11).
37ctflwhttp_mthdIntegerNo. of flows that has methods such as Get and Post in http service.
38isftploginBinaryIf the ftp session is accessed by user and password then 1 else 0.
39ctftpcmdintegerNo of flows that has a command in ftp session.
40ctsrvsrcintegerNo. of connections that contain the same service (14) and source address (1) in 100 connections according to the last time (26).
41ctsrvdstintegerNo. of connections that contain the same service (14) and destination address (3) in 100 connections according to the last time (26).
42ctdstltmintegerNo. of connections of the same destination address (3) in 100 connections according to the last time (26).
43ctsrc ltmintegerNo. of connections of the same source address (1) in 100 connections according to the last time (26).
44ctsrcdport_ltmintegerNo of connections of the same source address (1) and the destination port (4) in 100 connections according to the last time (26).
45ctdstsport_ltmintegerNo of connections of the same destination address (3) and the source port (2) in 100 connections according to the last time (26).
46ctdstsrc_ltmintegerNo of connections of the same source (1) and the destination (3) address in in 100 connections according to the last time (26).
47attack_catnominalThe name of each attack category. In this data set , nine categories e.g. Fuzzers, Analysis, Backdoors, DoS Exploits, Generic, Reconnaissance, Shellcode and Worms
48Labelbinary0 for normal and 1 for attack records

The data is accessible from the dataset source.

We used Spark csv for reading the dataset. In the following cell a spark dataframe from csv is created. In this dataset, the last label is the label, indicating whether an attack happened or not. The problem is a binary classification problem, which the machine learning algorithm has to predict the attack record label.

# load csv with pyspark
# File location and type
file_location = "/FileStore/tables/IDdataset.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

Removing unnecessary data

In the dataaset, ip and port of source and destination are not useful, so we drop those columns.

#dropping ip and port of source and destination 
df= df.drop("_c0","_c1","_c2", "_c3", "_c47")

display(df)

Numerization

Now we have to change categorical data (that are columns 4,5,13) to number, that is called ordinal encoding, and we can do it by StringIndexer.

The next step is to convert all columns types to Double. It is neccesssary, as it seems pyspark returned a string dataframe from csv, and doesnot change data types for numbers.

#handling categorical data
from pyspark.ml.feature import StringIndexer


indexer = StringIndexer(inputCols=["_c4", "_c5", "_c13"], outputCols=["c4", "c5", "c13"])

dff = indexer.fit(df).transform(df)

dff = dff.drop("_c4", "_c5", "_c13")


#changing type to double
from pyspark.sql.types import DoubleType


for col in dff.columns:
  dff = dff.withColumn(col, dff[col].cast(DoubleType()))

Handling null values

Check which cells have null values

One another step in preprocessing is to check if the data has null values. For this, we use .isNull() over rows of each column, and count null values in each column.

#check if it has missing data

#showing number of null data  
from pyspark.sql.functions import isnan, when, count, col

for cl in dff.columns:
  dff.select([count(when(col(cl).isNull(),True))]).show()

Filling null values

We noticed that column 37(ctflwhttpmthd) has 1348145, column 38(isftplogin) has 1429879, and column 39(ctftp_cmd) has 1429879 null values. So we fill them by using Imputer function of pyspark. This function fill the missing values with the mean of the column.

#handling null data

from pyspark.ml.feature import Imputer

dff= Imputer(inputCols= dff.columns, outputCols=dff.columns).fit(dff).transform(dff)

#for cl in dff.columns:
#  dff.select([count(when(col(cl).isNull(),True))]).show()

Creating dataset

For creating dataset, we take the following steps:

Vectorizing

We firstly have to put all features into one big vector, using VectorAssembler. We take all the columns of data, except the first 4 columns(are irrelevant) an the last two (are the labels) into "features" column. Notice that VectorAssembler generates either Sparse, or Dense vectors, in favour of the memory.

Normalization

Next, we normalize data that is vectorized in one column. For this dataset, VEctorAssembler returned a sparse vector, and we chose a normalizer that is compatible with sparse vectors. So we used ml.feature.Normalizer for normalizing data.

Sparse to Dense

After normalization, we convert the sparse vectors of features to dense vectors. For this we defined a UDF function.

Selecting columns of features and labels

Finally, we select two columns in the dataframe to use in further steps, that are "labels", and "features".

from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.feature import MinMaxScaler


from pyspark.sql import functions as F
from pyspark.ml.linalg import SparseVector, DenseVector,VectorUDT, Vector
from pyspark.sql import types as T
from pyspark.ml.feature import Normalizer


numCols = ['c4', 'c5', '_c6', '_c7', '_c8', '_c9', '_c10', '_c11', '_c12', 'c13', '_c14', '_c15', '_c16', '_c17', '_c18', '_c19', '_c20', '_c21', '_c22', '_c23', '_c24', '_c25', '_c26', '_c27', '_c28', '_c29', '_c30', '_c31', '_c32', '_c33', '_c34', '_c35', '_c36', '_c37', '_c38', '_c39', '_c40', '_c41', '_c42', '_c43', '_c44', '_c45', '_c46']


dfff = VectorAssembler(inputCols=numCols, outputCol="features").transform(dff)


nrm = Normalizer(inputCol="features", outputCol="features_norm", p=1).transform(dfff)



def sparse_to_array(v):
  v = DenseVector(v)
  new_array = list([float(x) for x in v])
  return new_array


sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))
featArr = nrm.withColumn('featuresArray', sparse_to_array_udf('features_norm'))


featArr=featArr.withColumnRenamed("_c48", "labels")


trainSet = featArr.select('labels', "featuresArray")

display(trainSet)

Preparing the DataFrame for training classifers

In order to use a dataframe for training the classifiers in the Spark ML Library, we should have a particular format. Specifically, we need to have a single columns for all features and another column for the labels. In this section we first create the the desired format and then use the resulting dataframe for training different classifiers.

Based on the DataFrame created in the preprocessing step (trainSet), we first create an rdd from all available columns (featurestoassemble). To this end, we map all rows in trainSet using the method "functionForRdd".

features_to_assemble = []
for f in range(2,45):
  features_to_assemble.append('_'+str(f))
print(features_to_assemble)
['_2', '_3', '_4', '_5', '_6', '_7', '_8', '_9', '_10', '_11', '_12', '_13', '_14', '_15', '_16', '_17', '_18', '_19', '_20', '_21', '_22', '_23', '_24', '_25', '_26', '_27', '_28', '_29', '_30', '_31', '_32', '_33', '_34', '_35', '_36', '_37', '_38', '_39', '_40', '_41', '_42', '_43', '_44']
# random forest implementation
def functionForRdd(r):
  l = []
  l.append(r[0])
  l = l+list(r[1])
  return l

trainSetRdd = trainSet.rdd.map(functionForRdd)
randomForestDf = trainSetRdd.toDF()

Using VectorAssembler for creating the DataFrame

The best option to create the required format, is using VectorAssembler. Calling the transform function of the assembler object gives us a new DataFrame which includes a new columns called "features". This column together with the labels column ("_1") will be used for training. We also divide our data into train and test in the cell below.

assembler = VectorAssembler(
    inputCols=features_to_assemble,
    outputCol="features")

randomForestDf = assembler.transform(randomForestDf)
train, test = randomForestDf.randomSplit([0.7, 0.3], seed = 2018)
newtrainSet = train.sample(fraction=0.00001)

RandomForestClassifier

The first classifier we use is the RandomForestClassifier avaiable in Spark ML. As mentioned before this classifier requires a single columns for all attributes (features) and a label column (_1). We specify these columns before training and then we use the method fit to train the classifer.

from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = '_1')
rfModel = rf.fit(train)

Evaluation

In this part we use the BinaryClassificationMetrics and MulticlassMetrics for evaluating our classifiers. First we need to use our trained model to predict the labels in the test DataFrame. To this aim, we use the transform method. This method add some columns to the test DataFrame. We use the prediction columns which is the predicted classes for data pionts in our test set. BinaryClassificationMetrics and MulticlassMetrics require an rdd of (prediction, truelabel) tuples. We create this rdd using the map function on prediction (rdd) and then calculate different metrics.

from pyspark.mllib.evaluation import BinaryClassificationMetrics

prediction = rfModel.transform(test)
predictionAndLabels = prediction.rdd.map(lambda r: (r.prediction, r._1))

metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

from pyspark.mllib.evaluation import MulticlassMetrics
metrics2 = MulticlassMetrics(predictionAndLabels)
print("Precions = %s" % metrics2.precision(1.0))
print("Recall = %s" % metrics2.recall(1.0))
Area under ROC = 0.9893749378235688
Precions = 0.921533690334014
Recall = 0.9909789543458447
print("Accuracy = %s" % metrics2.accuracy)
Accuracy = 0.9881770066509254

Logistic Regression classifier

We have trained and tested the Logistic Regression classifer on our training and testing set respectively as follows:

#Logistic Regression Classification
from pyspark.ml.classification import LogisticRegression
logr = LogisticRegression(featuresCol = 'features', labelCol = '_1')
logrmodel = logr.fit(train)
from pyspark.mllib.evaluation import BinaryClassificationMetrics

lr_prediction = logrmodel.transform(test)

Logistic Regression classifier evaluation

We have evaluated the Logistic Regression classifier using binary and also multi-class evaluation metrics as follows:

#Logistic Regression Evaluation
lr_predictionAndLabels = lr_prediction.rdd.map(lambda r: (r.prediction, r._1))


metrics = BinaryClassificationMetrics(lr_predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

from pyspark.mllib.evaluation import MulticlassMetrics
metrics2 = MulticlassMetrics(lr_predictionAndLabels)
print("Precions = %s" % metrics2.precision(1.0))
print("Recall = %s" % metrics2.recall(1.0))
Area under ROC = 0.9814394501866226
Precions = 0.9305169153391484
Recall = 0.9734132902477421

Gradient-Boosted Trees (GBTs) classifier

We have trained and tested the GBTs classifer on our training and testing set respectively as follows:

from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(featuresCol = 'features', labelCol = '_1', maxDepth=5)
gbtmodel = gbt.fit(train)
from pyspark.mllib.evaluation import BinaryClassificationMetrics

gbt_prediction = gbtmodel.transform(test)

Gradient-Boosted Trees (GBTs) classifier evaluation

We have evaluated the GBTs classifier using binary and also multi-class evaluation metrics as follows:

gbt_predictionAndLabels = gbt_prediction.rdd.map(lambda r: (r.prediction, r._1))

metrics = BinaryClassificationMetrics(gbt_predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

from pyspark.mllib.evaluation import MulticlassMetrics
metrics2 = MulticlassMetrics(gbt_predictionAndLabels)
print("Precions = %s" % metrics2.precision(1.0))
print("Recall = %s" % metrics2.recall(1.0))
Area under ROC = 0.9801886786459773
Precions = 0.962630715074687
Recall = 0.9658111691109454
# import sys
# # !{sys.executable} -m pip install tensorflow
# # !{sys.executable} -m pip uninstall keras
# # !{sys.executable} -m pip install keras
# # !{sys.executable} -m pip install dist-keras
# # !{sys.executable} -m pip install elephas


# import keras
# from keras.optimizers import *
# from keras.models import Sequential
# from keras.layers import Dense, Dropout, Activation

# from distkeras.trainers import *
# from distkeras.predictors import *
# from distkeras.transformers import *
# from distkeras.evaluators import *
# from distkeras.utils import *

# from pyspark.ml.linalg import Vectors
# from elephas.utils.rdd_utils import to_simple_rdd
# from elephas.spark_model import SparkModel

# trainSett = trainSet.rdd.map(lambda row: Row(
#     labels=row["labels"], 
#     featuresArray=Vectors.dense(row["featuresArray"])
# )).toDF()

# inpDim= len(trainSett.select("featuresArray").first()[0])

# inpDim= len(train.select("features").first()[0])

# model = Sequential()
# model.add(Dense(128, input_dim = inpDim,activation='relu',use_bias=True))
# model.add(Dropout(0.5))
# model.add(Dense(1,activation='sigmoid',use_bias=True)) 
# model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
# print("compile done")
# # model.build()
# print(model.summary())
# trainer = SingleTrainer(keras_model=model, worker_optimizer='adam', loss='binary_crossentropy', num_epoch=1)
# print("trainer done")  
# trained_model = trainer.train(train)#newtrainSet
# print("training done")
# predictor = ModelPredictor(keras_model=trained_model)
# ff=predictor.predict(test.take(50)[15].features)

Multilayer Perceptron Classifier

MLPC consists of multiple layers of nodes. Each layer is fully connected to the next layer in the network. Following code snippet is the implementation of such a model in pyspark. The input layer and the output layer have the size of 43 and 2, with two hidden layers of 5 and 4 neurons respectively.

# Multilayer Perceptron Classifier
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# specify layers for the neural network:
# input layer of size 43 (features), two intermediate of size 5 and 4
# and output of size 2 (classes)
layers = [43, 5, 4, 2]

# create the trainer and set its parameters
mlpc = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234, featuresCol = 'features', labelCol = '_1')

# train the model
mlpcModel = mlpc.fit(train)

Finally, a trained MLPC model is returned that is ready to evaluate on the test data.

# compute accuracy on the test set
testLabeled = test.withColumnRenamed( '_1', "label")
mlpc_prediction = mlpcModel.transform(testLabeled)

The model is tested using MulticlassClassificationEvaluator with accuracy as an evaluation metric.

predictionAndLabels = mlpc_prediction.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
Test set accuracy = 0.9290082871081126

Decision Tree

The spark.ml implementation supports decision trees for binary and multiclass classification and for regression.

# Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier

# Train a DecisionTree model.
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = '_1')
dtModel = dt.fit(train)

The decision tree model is tested for ROC, Precision, and recall. The area under ROC is 0.9738, Precision is 0.9626 and recall is 0.9531.

# Decision Tree Metrics
prediction = dtModel.transform(test)
predictionAndLabels = prediction.rdd.map(lambda r: (r.prediction, r._1))

from pyspark.mllib.evaluation import BinaryClassificationMetrics
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

from pyspark.mllib.evaluation import MulticlassMetrics
metrics2 = MulticlassMetrics(predictionAndLabels)
print("Precisions = %s" % metrics2.precision(1.0))
print("Recall = %s" % metrics2.recall(1.0))
Area under ROC = 0.9738847246751252
Precisions = 0.9626855522893936
Recall = 0.9531237053608418