%py
# Write a small bash helper to DBFS, copy it to the driver's local /tmp, and then push
# it (and /tmp/IsIt1or2Coins, which must already exist on DBFS) onto the executors.
# WARNING: suckAs.sh `eval`s every line it reads from stdin; only pipe trusted commands into it.
dbutils.fs.put("dbfs:/tmp/suckAs.sh",
"""#!/bin/bash
# this script will simply evaluate the string 'sucked in' As is!
# see http://unix.stackexchange.com/questions/61183/bash-script-that-reads-filenames-from-a-pipe-or-from-command-line-args
#http://stackoverflow.com/questions/2355148/run-a-string-as-a-command-within-a-bash-script
#http://tldp.org/LDP/Bash-Beginners-Guide/html/sect_03_04.html#sect_03_04_07
#http://tldp.org/LDP/Bash-Beginners-Guide/html/sect_08_02.html
#http://tldp.org/LDP/Bash-Beginners-Guide/html/sect_08_02.html
#USAGE: echo 'flow-cat bindata/S1.1 | ft2nfdump | nfdump -q -o "fmt: %ts, %sa, %da, %pkt, %byt, %fl"' | ./suckAs.sh
#OUTPUT: 2014-11-01 01:59:45.034, 203.35.135.168, 74.125.237.221, 1, 52, 1
IFS=$'\\n' read -d '' -r -a inputs
#echo "${inputs[0]}"
#echo "${inputs[1]}"
inpLen=${#inputs[@]}
for (( i=0; i<${inpLen}; i++));
do
eval "${inputs[i]}"
done
#eval "${inputs[0]}"
""", True)
dbutils.fs.cp("dbfs:/tmp/suckAs.sh", "file:/tmp/suckAs.sh")
import os
import shutil
import stat

# NOTE(review): hard-coded cluster size. The jobs below launch 2 * (1 + num_worker_nodes)
# tasks in the hope that every node runs at least one of them; Spark does not guarantee
# that placement, so a node may end up without the local copy.
num_worker_nodes = 3


def copyFile(filepath):
  """Copy ``/dbfs<filepath>`` to the local path ``filepath`` and make it user-executable.

  Meant to be run inside Spark tasks so that each node gets its own local copy.

  :param filepath: absolute local path, e.g. ``"/tmp/suckAs.sh"``; the source is read
      from the same path under the ``/dbfs`` FUSE mount.
  :raises OSError: if the copy or the permission change fails.
  """
  shutil.copyfile("/dbfs%s" % filepath, filepath)
  # Equivalent of `chmod u+x`, done without spawning a shell: the path is never
  # shell-interpreted and a failure raises instead of being silently ignored.
  mode = os.stat(filepath).st_mode
  os.chmod(filepath, mode | stat.S_IXUSR)


# count() only forces the lazy map to run; its value is the number of tasks.
sc.parallelize(range(0, 2 * (1 + num_worker_nodes))).map(lambda s: copyFile("/tmp/IsIt1or2Coins")).count()
sc.parallelize(range(0, 2 * (1 + num_worker_nodes))).map(lambda s: copyFile("/tmp/suckAs.sh")).count()
Wrote 885 bytes.
Out[2]: 8
// Print the column names and types of oboDF (defined outside this section).
// printSchema() is side-effecting and declared with (), so it is called with parentheses
// rather than relying on deprecated auto-application.
oboDF.printSchema()
root
|-- breakingPeace: integer (nullable = true)
|-- corporal: integer (nullable = true)
|-- damage: integer (nullable = true)
|-- death: integer (nullable = true)
|-- deception: integer (nullable = true)
|-- guilty: integer (nullable = true)
|-- id: string (nullable = true)
|-- imprison: integer (nullable = true)
|-- kill: integer (nullable = true)
|-- miscPunish: integer (nullable = true)
|-- miscVerdict: integer (nullable = true)
|-- miscellaneous: integer (nullable = true)
|-- noPunish: integer (nullable = true)
|-- notGuilty: integer (nullable = true)
|-- royalOffences: integer (nullable = true)
|-- sexual: integer (nullable = true)
|-- specialVerdict: integer (nullable = true)
|-- theft: integer (nullable = true)
|-- transport: integer (nullable = true)
|-- violentTheft: integer (nullable = true)
import org.apache.spark.sql.functions._
// This UDF removes the leading character of the id string (the "t" in ids shaped like
// tyyyymmdd-id) and keeps the whole remainder, with no arbitrary length cap.
// Spark passes null into Scala UDFs whose parameter is a String, so a null id is mapped
// to null here instead of failing the task with a NullPointerException.
val removeT = udf((id: String) => Option(id).map(_.drop(1)).orNull)
// This UDF takes the remainder id after the time string, i.e. the "id" in tyyyymmdd-id:
// everything after the last '-', or the whole string when it contains no '-'.
// lastIndexOf/substring is used instead of split("-").last, which throws for "-" (split
// drops trailing empty strings, leaving an empty array) and returns the date part for ids
// ending in '-'. A null id is mapped to null instead of failing the task.
val getID = udf((id: String) => Option(id).map(s => s.substring(s.lastIndexOf('-') + 1)).orNull)
import org.apache.spark.sql.functions._
removeT: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
getID: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
// One row per guilty case: the event date, the crime id (taken from the ID0 column), and
// 0/1 flags for whether death / kill were recorded (> 0) for the case.
// NOTE(review): oboDFWI is defined outside this section; assumed to carry the oboDF columns
// plus "ID0" — confirm.
//
// - guilty is an integer column. Comparing it with `true` makes Spark coerce the literal
//   to 1 (guilty = 1), which silently drops rows whose guilty value is above 1. It is
//   therefore compared numerically, like the death/kill flags.
// - ids look like tyyyymmdd-id. Only the 8-digit date after the leading "t" is handed to
//   to_timestamp: the "-id" suffix is only tolerated by the lenient legacy parser, while
//   Spark 3's stricter parser fails or returns null on it.
val killDeath = oboDFWI
  .filter($"guilty" > 0)
  .withColumn("TimeOfEvent", to_timestamp(substring(removeT($"id"), 1, 8), "yyyyMMdd"))
  .withColumn("death", when($"death" > 0, 1).otherwise(0))
  .withColumn("kill", when($"kill" > 0, 1).otherwise(0))
  .withColumn("CrimeID", $"ID0")
  .select($"TimeOfEvent", $"CrimeID", $"death", $"kill")
display(killDeath)
ScaDaMaLe Course site and book