033_OBO_xx2_OBOnlineExampleScala(Scala)


ScaDaMaLe Course site and book

Old Bailey Online Analysis - for time-varying Bayesian Binomial Partition Models

Benny Avelin and Raazesh Sainudiin

Analyse Data

Here, suppose you have an executable for a 64-bit Linux x86 processor, with all dependencies pre-compiled into a single executable.

Say this executable is IsIt1or2Coins.

This executable comes from the following dockerised build:

You can replace it with any other executable that has appropriate I/O.

Then you upload the executable to Databricks' FileStore.

%fs ls "/FileStore/tables/IsIt1or2Coins"
 
path                                  name           size
dbfs:/FileStore/tables/IsIt1or2Coins  IsIt1or2Coins  6548544

Showing all 1 rows.

dbutils.fs.cp("dbfs:/FileStore/tables/IsIt1or2Coins", "file:/tmp/IsIt1or2Coins")
res1: Boolean = true
%sh
chmod +x /tmp/IsIt1or2Coins
%py
dbutils.fs.put("dbfs:/tmp/suckAs.sh",
"""#!/bin/bash
# this script will simply evaluate the string 'sucked in' As is!
# see http://unix.stackexchange.com/questions/61183/bash-script-that-reads-filenames-from-a-pipe-or-from-command-line-args
#http://stackoverflow.com/questions/2355148/run-a-string-as-a-command-within-a-bash-script
#http://tldp.org/LDP/Bash-Beginners-Guide/html/sect_03_04.html#sect_03_04_07
#http://tldp.org/LDP/Bash-Beginners-Guide/html/sect_08_02.html
#USAGE: echo 'flow-cat bindata/S1.1 | ft2nfdump | nfdump -q -o "fmt: %ts, %sa, %da, %pkt, %byt, %fl"' | ./suckAs.sh
#OUTPUT: 2014-11-01 01:59:45.034,   203.35.135.168,   74.125.237.221,        1,       52,     1
IFS=$'\\n' read -d '' -r -a inputs
#echo "${inputs[0]}"
#echo "${inputs[1]}"
inpLen=${#inputs[@]}
for (( i=0; i<${inpLen}; i++));
do
    eval "${inputs[i]}"
done
#eval "${inputs[0]}"
""", True)
 
dbutils.fs.cp("dbfs:/tmp/suckAs.sh", "file:/tmp/suckAs.sh")
 
import os
import shutil

num_worker_nodes = 3

# Copy a file from DBFS (mounted at /dbfs on each node) to the identical local path
# and make it executable for the current user.
def copyFile(filepath):
  shutil.copyfile("/dbfs%s" % filepath, filepath)
  os.system("chmod u+x %s" % filepath)

# Launch more tasks than there are worker nodes so that, with high probability,
# every worker runs copyFile at least once for each file.
sc.parallelize(range(0, 2 * (1 + num_worker_nodes))).map(lambda s: copyFile("/tmp/IsIt1or2Coins")).count()
sc.parallelize(range(0, 2 * (1 + num_worker_nodes))).map(lambda s: copyFile("/tmp/suckAs.sh")).count()
Wrote 885 bytes. Out[2]: 8
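With the executable and the helper script now on every node, one way to drive the executable from Spark is RDD.pipe: each element of the RDD is written to the script's stdin, and suckAs.sh eval-uates it as a command. The sketch below is only illustrative; the placeholder arguments are hypothetical and are not the real command-line interface of IsIt1or2Coins.

// Minimal sketch (not the authors' exact call): pipe one hypothetical command string
// per partition into /tmp/suckAs.sh, which eval-uates each input line on a worker.
val cmds = sc.parallelize(Seq(
  "/tmp/IsIt1or2Coins <placeholder-arguments-1>",  // hypothetical arguments
  "/tmp/IsIt1or2Coins <placeholder-arguments-2>"   // hypothetical arguments
), 2)
val piped = cmds.pipe("/tmp/suckAs.sh")  // one output line per line printed by each command
piped.collect().foreach(println)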
val oboDF = spark.read.format("csv").option("header", "true").option("inferSchema","True").load("dbfs:/tmp/obo.csv")
oboDF: org.apache.spark.sql.DataFrame = [breakingPeace: int, corporal: int ... 18 more fields]
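As a quick sanity check (a sketch; the counts are not reproduced here), one can confirm the load before going further:

// Sketch: basic checks on the loaded DataFrame (output omitted here).
oboDF.count()          // number of Old Bailey trial records read from obo.csv
oboDF.columns.length   // should be 20, matching the schema printed below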
import org.apache.spark.sql.functions._
 
val oboDFWI = oboDF.withColumn("ID0",monotonically_increasing_id)
import org.apache.spark.sql.functions._
oboDFWI: org.apache.spark.sql.DataFrame = [breakingPeace: int, corporal: int ... 19 more fields]
display(oboDFWI)
 
breakingPeace | corporal | damage | death | deception | guilty | id | imprison | kill | miscPunish | miscVerdict | miscellaneous | noPunish | notGuilty | royalOffences | sexual | specialVerdict | theft | transport | violentTheft | ID0
0 | 0 | 0 | 1 | 0 | 1 | t18361024-2239 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0
0 | 0 | 0 | 1 | 0 | 2 | t18361024-2240 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1
0 | 0 | 0 | 0 | 0 | 1 | t18361024-2241 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 2
0 | 0 | 0 | 0 | 0 | 0 | t18361024-2242 | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 3
0 | 0 | 0 | 0 | 0 | 0 | t18361024-2243 | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 4
0 | 0 | 0 | 0 | 0 | 1 | t18361024-2244 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 5
0 | 0 | 0 | 0 | 0 | 1 | t18361024-2245 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 6
0 | 0 | 0 | 0 | 0 | 1 | t18361024-2246 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 7
0 | 0 | 0 | 0 | 0 | 1 | t18361024-2247 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 8
0 | 0 | 0 | 0 | 0 | 1 | t18361024-2248 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 1 | 1 | 0 | 9
0 | 0 | 0 | 0 | 0 | 0 | t18361024-2249 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 1 | 0 | 0 | 10
0 | 0 | 0 | 0 | 0 | 1 | t18361024-2250 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 11
0 | 0 | 0 | 0 | 0 | 1 | t18361024-2251 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 12
0 | 0 | 0 | 0 | 0 | 0 | t18361024-2252 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 1 | 0 | 0 | 13
0 | 0 | 0 | 0 | 0 | 1 | t18361024-2253 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 14
0 | 0 | 0 | 0 | 0 | 1 | t18361024-2254 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 15
0 | 0 | 0 | 0 | 0 | 1 | t18361024-2255 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 16

Truncated results, showing first 1000 rows.

oboDF.printSchema
root
 |-- breakingPeace: integer (nullable = true)
 |-- corporal: integer (nullable = true)
 |-- damage: integer (nullable = true)
 |-- death: integer (nullable = true)
 |-- deception: integer (nullable = true)
 |-- guilty: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- imprison: integer (nullable = true)
 |-- kill: integer (nullable = true)
 |-- miscPunish: integer (nullable = true)
 |-- miscVerdict: integer (nullable = true)
 |-- miscellaneous: integer (nullable = true)
 |-- noPunish: integer (nullable = true)
 |-- notGuilty: integer (nullable = true)
 |-- royalOffences: integer (nullable = true)
 |-- sexual: integer (nullable = true)
 |-- specialVerdict: integer (nullable = true)
 |-- theft: integer (nullable = true)
 |-- transport: integer (nullable = true)
 |-- violentTheft: integer (nullable = true)
// Goal: create a window function that takes blocks of time and produces a contingency table over death/!death and kill/!kill
// Step 1: Parse the timestamps so that we have a reasonable way of performing the window functions
display(oboDF.select($"id").groupBy($"id").count())
 
id              count
t18180218-30    1
t18180401-133   1
t18180617-160   1
t18180617-177   1
t18180909-249   1
t18180909-326   1
t18190217-99    1
t18190421-230   1
t18190526-128   1
t18190707-9     1
t18191027-112   1
t18191201-83    1
t18191201-188   1
t18200112-131   1
t18200217-139   1
t18200412-59    1
t18200517-40    1

Truncated results, showing first 1000 rows.
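The counts above suggest that each id occurs exactly once. A direct check of that (a sketch, output not reproduced here) could be:

// Sketch: confirm that trial ids are unique, i.e. no id has count > 1.
oboDF.groupBy($"id").count().filter($"count" > 1).count()  // expect 0 if ids are unique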

import org.apache.spark.sql.functions._
// This UDF removes the leading 't' in the id string
val removeT = udf((id:String) => id.slice(1,200))
// This UDF extracts the trailing per-session number from an id of the form tyyyyMMdd-number
val getID = udf((id:String) => id.split("-").last)
import org.apache.spark.sql.functions._
removeT: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
getID: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
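To make the parsing concrete, here is a small sketch (the sample id is taken from the table above) showing what the two UDFs return:

// Sketch: apply the UDFs to one sample id, e.g. "t18361024-2239".
// removeT -> "18361024-2239" (a yyyyMMdd date, a dash, then the per-session number)
// getID   -> "2239"
val sample = Seq("t18361024-2239").toDF("id")
display(sample.withColumn("dateString", removeT($"id")).withColumn("crimeNumber", getID($"id")))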
val killDeath = oboDFWI
        .filter($"guilty" === true)
        .withColumn("TimeOfEvent",to_timestamp(removeT($"id"),"yyyyMMdd"))
        .withColumn("death",when($"death" > 0, 1).otherwise(0))
        .withColumn("kill",when($"kill" > 0, 1).otherwise(0))
        .withColumn("CrimeID",$"ID0")
        .select($"TimeOfEvent",$"CrimeID",$"death",$"kill")
 
display(killDeath)
 
TimeOfEvent                    CrimeID  death  kill
1836-10-24T00:00:00.000+0000   0        1      0
1836-10-24T00:00:00.000+0000   2        0      0
1836-10-24T00:00:00.000+0000   5        0      0
1836-10-24T00:00:00.000+0000   6        0      0
1836-10-24T00:00:00.000+0000   7        0      0
1836-10-24T00:00:00.000+0000   8        0      0
1836-10-24T00:00:00.000+0000   9        0      0
1836-10-24T00:00:00.000+0000   11       0      0
1836-10-24T00:00:00.000+0000   12       0      0
1836-10-24T00:00:00.000+0000   14       0      0
1836-10-24T00:00:00.000+0000   15       0      0
1836-10-24T00:00:00.000+0000   16       0      0
1836-10-24T00:00:00.000+0000   17       0      0
1836-10-24T00:00:00.000+0000   18       0      0
1836-10-24T00:00:00.000+0000   19       0      0
1836-10-24T00:00:00.000+0000   21       0      0
1836-10-24T00:00:00.000+0000   22       0      0

Truncated results, showing first 1000 rows.
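The Goal comment above asks for contingency tables of death/!death and kill/!kill over blocks of time. One way to sketch such a table (assuming, purely for illustration, 10-year blocks; the actual binning used downstream may differ) is a group-by over a time window:

// Sketch: 2x2 contingency counts of (kill, death) per block of TimeOfEvent.
// The "3650 days" window width is an illustrative assumption only.
import org.apache.spark.sql.functions.window
val contingency = killDeath
  .groupBy(window($"TimeOfEvent", "3650 days"), $"kill", $"death")
  .count()
  .orderBy($"window")
display(contingency)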