import org.apache.spark.sql.functions.rand

// Define the case class for PII
case class pii(firstName: String, lastName: String, gender: String, dob: String)

/*
The uploaded file contains:

firstName,lastName,gender,dob
Howard,Philips,M,1890-08-20
Edgar,Allan,M,1809-01-19
Arthur,Ignatius,M,1859-05-22
Mary,Wollstonecraft,F,1797-08-30
*/

// Read the data from a CSV file, shuffle the rows, then convert to a typed Dataset
// /FileStore/tables/demo_pii.txt was uploaded
val IDs = spark.read.option("header", "true").csv("/FileStore/tables/demo_pii.txt").orderBy(rand())
val ids = IDs.as[pii]
ids.show
+---------+--------------+------+----------+
|firstName| lastName|gender| dob|
+---------+--------------+------+----------+
| Mary|Wollstonecraft| F|1797-08-30|
| Edgar| Allan| M|1809-01-19|
| Arthur| Ignatius| M|1859-05-22|
| Howard| Philips| M|1890-08-20|
+---------+--------------+------+----------+
import org.apache.spark.sql.functions.rand
defined class pii
IDs: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [firstName: string, lastName: string ... 2 more fields]
ids: org.apache.spark.sql.Dataset[pii] = [firstName: string, lastName: string ... 2 more fields]
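As an aside, .as[pii] above relies on the CSV header names matching the case class fields, and orderBy(rand()) gives a different row order on every run. Below is a minimal sketch of a more explicit variant, assuming a plain Spark environment (outside a Databricks notebook you must import spark.implicits._ yourself); Encoders.product and the seeded rand(42) are standard Spark APIs, and the seed value is arbitrary:

import org.apache.spark.sql.Encoders
import spark.implicits._ // needed for .as[pii] outside notebook environments

val idsTyped = spark.read
  .option("header", "true")
  .schema(Encoders.product[pii].schema) // derive column names and types from the case class
  .csv("/FileStore/tables/demo_pii.txt")
  .orderBy(rand(42)) // fixed seed makes the shuffle reproducible across runs
  .as[pii]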
import java.security.MessageDigest
import java.util.Base64
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

// Perform SHA-256 hashing
def sha_256(in: String): String = {
  val md: MessageDigest = MessageDigest.getInstance("SHA-256") // Instantiate a MessageDigest with the SHA-256 algorithm
  new String(Base64.getEncoder.encode(md.digest(in.getBytes)), "UTF-8") // Encode the resulting digest bytes as a base64 string
}

// Generate a UDF from the above function (it is not evaluated until used in a query).
// Call sha_256 directly on the driver; use the udf() name sha__256 inside a transform.
val sha__256 = udf(sha_256 _)
import java.security.MessageDigest
import java.util.Base64
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
sha_256: (in: String)String
sha__256: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
val myStr = "Hello!" // Input is a string val myHash = sha_256(myStr) println(myHash) // The basic function takes a strings and outputs a string of bits in base64 ("=" represents padding becuse of the block based hashes)
M00Bb3Vc1txYxTqG4YOIL47BT1L7BTRYh8il7dQsh7c=
myStr: String = Hello!
myHash: String = M00Bb3Vc1txYxTqG4YOIL47BT1L7BTRYh8il7dQsh7c=
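To hash a whole column rather than a single driver-side string, apply the udf() name inside a transform. A minimal sketch (the alias firstNameHash is just illustrative):

ids.select('firstName, sha__256('firstName).as("firstNameHash")).show(false)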
// Concatenation with a pipe character
val p = lit("|")

// Define our serialization rules as a Column
val rules: Column = {
  // Use all of the PII for the pseudo ID
  concat(upper('firstName), p, upper('lastName), p, upper('gender), p, 'dob)
}
p: org.apache.spark.sql.Column = |
rules: org.apache.spark.sql.Column = concat(upper(firstName), |, upper(lastName), |, upper(gender), |, dob)
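Hashing the serialized rules column yields a pseudonymous ID. The psids column used by removePII below is presumably built this way; the definition and alias here are an assumption, not taken from the notebook:

// Assumed construction of the pseudo-ID column (alias "psid" is hypothetical)
val psids: Column = sha__256(rules).as("psid")
ids.select(psids).show(false)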
import javax.crypto.KeyGenerator
import javax.crypto.Cipher
import javax.crypto.spec.SecretKeySpec

var myKey: String = ""

// Perform AES-128 encryption
def encrypt(in: String): String = {
  val raw = KeyGenerator.getInstance("AES").generateKey.getEncoded() // Instantiate an AES key generator, generate a key, and return its encoded bytes
  myKey = new String(Base64.getEncoder.encode(raw), "UTF-8") // Keep a base64 string representation of the key for decryption
  val skeySpec = new SecretKeySpec(raw, "AES") // Create a secret key object from our generated key
  val cipher = Cipher.getInstance("AES") // Instantiate a cipher object of type AES
  cipher.init(Cipher.ENCRYPT_MODE, skeySpec) // Initialize the cipher with our secret key object, specifying encryption
  new String(Base64.getEncoder.encode(cipher.doFinal(in.getBytes)), "UTF-8") // Encode the resulting ciphertext as a base64 string
}

// Perform AES-128 decryption
def decrypt(in: String): String = {
  val k = new SecretKeySpec(Base64.getDecoder.decode(myKey.getBytes), "AES") // Decode the key from its base64 representation
  val cipher = Cipher.getInstance("AES") // Instantiate a cipher object of type AES
  cipher.init(Cipher.DECRYPT_MODE, k) // Initialize the cipher with our secret key object, specifying decryption
  new String(cipher.doFinal(Base64.getDecoder.decode(in)), "UTF-8") // Decode the base64 ciphertext and return the decrypted plaintext
}

val myEnc = udf(encrypt _)
val myDec = udf(decrypt _)
import javax.crypto.KeyGenerator
import javax.crypto.Cipher
import javax.crypto.spec.SecretKeySpec
myKey: String = ""
encrypt: (in: String)String
decrypt: (in: String)String
myEnc: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
myDec: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
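A quick driver-side round trip, sketched below, checks that the pair of functions invert each other. Note a caveat in this design: encrypt generates a fresh key on every call and overwrites myKey, so only the most recent ciphertext remains decryptable, and when myEnc runs on executors the driver's myKey is never updated at all.

val ct = encrypt("Hello!") // also stores this call's key in myKey
println(decrypt(ct))       // recovers "Hello!" only while myKey still holds the matching key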
// Here we define the PII transformation to be applied to our dataset, which outputs a DataFrame modified according to our specifications.
def removePII(ds: Dataset[_]): DataFrame = ds.toDF.select(quasiIdCols ++ Seq(psids): _*)
removePII: (ds: org.apache.spark.sql.Dataset[_])org.apache.spark.sql.DataFrame
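removePII compiles here because quasiIdCols and psids are defined earlier in the full notebook. Assuming quasiIdCols holds the quasi-identifier columns, a hypothetical setup and invocation might look like this (both names below are stand-ins for the notebook's real definitions, and would have to precede the removePII cell):

// Hypothetical quasi-identifiers: everything we keep besides the pseudo ID
val quasiIdCols: Seq[Column] = Seq(col("gender"), col("dob"))

removePII(ids).show(false) // returns gender, dob, and the hashed pseudo ID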