// This returns a DataFrameReader for reading from a data source
println(spark.read)

val df = spark.range(0, 10)

// This returns a DataFrameWriter for saving the DataFrame
println(df.write)
org.apache.spark.sql.DataFrameReader@6e36cd55
org.apache.spark.sql.DataFrameWriter@6bb686b5
df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
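Both objects are configured with builder-style calls before any I/O actually happens. A minimal sketch, assuming a hypothetical CSV file at /tmp/example.csv (not part of the original notebook):

// Configure the reader with an explicit format and per-source options, then load
val csvDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/tmp/example.csv")   // hypothetical input path

// Configure the writer the same way, then save
csvDF.write
  .format("parquet")
  .mode("overwrite")
  .save("/tmp/example_parquet")   // hypothetical output path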
// Saving Parquet table in Scala
val df_save = spark.table("social_media_usage").select("platform", "visits")
df_save.write.mode("overwrite").parquet("/tmp/platforms.parquet")

// Loading Parquet table in Scala
val df = spark.read.parquet("/tmp/platforms.parquet")
df.show(5)
+----------+------+
| platform|visits|
+----------+------+
| SMS| 61652|
| SMS| 44547|
| Flickr| null|
|Newsletter| null|
| Twitter| 389|
+----------+------+
only showing top 5 rows
df_save: org.apache.spark.sql.DataFrame = [platform: string, visits: int]
df: org.apache.spark.sql.DataFrame = [platform: string, visits: int]
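A possible variation on the write above (not from the original notebook): partitionBy lays the data out in one sub-directory per platform value; the output path here is illustrative only.

// Sketch: write the same DataFrame partitioned on disk by platform
df_save.write
  .mode("overwrite")
  .partitionBy("platform")
  .parquet("/tmp/platforms_by_platform.parquet")   // illustrative path

// Reading the partitioned directory back recovers "platform" as a partition column
spark.read.parquet("/tmp/platforms_by_platform.parquet").printSchema()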
// Saving JSON dataset in Scala
val df_save = spark.table("social_media_usage").select("platform", "visits")
df_save.write.json("/tmp/platforms.json")

// Loading JSON dataset in Scala
val df = spark.read.json("/tmp/platforms.json")
df.show(5)
+----------+------+
| platform|visits|
+----------+------+
| SMS| 61652|
| SMS| 44547|
| Flickr| null|
|Newsletter| null|
| Twitter| 389|
+----------+------+
only showing top 5 rows
df_save: org.apache.spark.sql.DataFrame = [platform: string, visits: int]
df: org.apache.spark.sql.DataFrame = [platform: string, visits: bigint]
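Note that the JSON round trip infers visits as bigint, whereas the Parquet round trip preserved int. One way to avoid relying on inference is to pass an explicit schema when reading; a sketch:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Supply an explicit schema so visits stays an int instead of being widened to bigint
val platformSchema = StructType(Seq(
  StructField("platform", StringType, nullable = true),
  StructField("visits", IntegerType, nullable = true)
))
val dfTyped = spark.read.schema(platformSchema).json("/tmp/platforms.json")
dfTyped.printSchema()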
val json = sqlContext.read.format("json").load("/tmp/platforms.json")
json.select("platform").show(10)

val parquet = sqlContext.read.format("parquet").load("/tmp/platforms.parquet")
parquet.select("platform").show(10)
+----------+
| platform|
+----------+
| SMS|
| SMS|
| Flickr|
|Newsletter|
| Twitter|
| Twitter|
| Android|
| Android|
| Android|
|Broadcastr|
+----------+
only showing top 10 rows
+----------+
| platform|
+----------+
| SMS|
| SMS|
| Flickr|
|Newsletter|
| Twitter|
| Twitter|
| Android|
| Android|
| Android|
|Broadcastr|
+----------+
only showing top 10 rows
json: org.apache.spark.sql.DataFrame = [platform: string, visits: bigint]
parquet: org.apache.spark.sql.DataFrame = [platform: string, visits: int]
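The generic interface works the same way for writing: format(...) followed by save(...). A minimal sketch (output paths are illustrative, not from the original notebook):

// Write back out through the generic interface
json.write.format("parquet").mode("overwrite").save("/tmp/platforms_from_json.parquet")
parquet.write.format("json").mode("overwrite").save("/tmp/platforms_from_parquet.json")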
// First, list the tables to see that the table we are about to create does not exist yet
spark.catalog.listTables.show()
+--------------------+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+--------------------+--------+-----------+---------+-----------+
| adult| default| null| EXTERNAL| false|
| business_csv_csv| default| null| EXTERNAL| false|
| checkin_table| default| null| MANAGED| false|
| diamonds| default| null| EXTERNAL| false|
| inventory| default| null| MANAGED| false|
|item_merchant_cat...| default| null| MANAGED| false|
| items_left_csv| default| null| EXTERNAL| false|
| logistic_detail| default| null| MANAGED| false|
| merchant_ratings| default| null| MANAGED| false|
| order_data| default| null| MANAGED| false|
| order_ids_left_csv| default| null| EXTERNAL| false|
| repeat_csv| default| null| MANAGED| false|
| review_2019_csv| default| null| EXTERNAL| false|
|sample_logistic_t...| default| null| EXTERNAL| false|
| sentimentlex_csv| default| null| EXTERNAL| false|
| social_media_usage| default| null| MANAGED| false|
| tip_json| default| null| EXTERNAL| false|
| tips_csv_csv| default| null| EXTERNAL| false|
| users_csv| default| null| EXTERNAL| false|
+--------------------+--------+-----------+---------+-----------+
val df = spark.range(0, 100)
df.write.saveAsTable("simple_range")

// Verify that the table is saved and marked as persistent (the "isTemporary" value should be "false")
spark.catalog.listTables.show()
+--------------------+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+--------------------+--------+-----------+---------+-----------+
| adult| default| null| EXTERNAL| false|
| business_csv_csv| default| null| EXTERNAL| false|
| checkin_table| default| null| MANAGED| false|
| diamonds| default| null| EXTERNAL| false|
| inventory| default| null| MANAGED| false|
|item_merchant_cat...| default| null| MANAGED| false|
| items_left_csv| default| null| EXTERNAL| false|
| logistic_detail| default| null| MANAGED| false|
| merchant_ratings| default| null| MANAGED| false|
| order_data| default| null| MANAGED| false|
| order_ids_left_csv| default| null| EXTERNAL| false|
| repeat_csv| default| null| MANAGED| false|
| review_2019_csv| default| null| EXTERNAL| false|
|sample_logistic_t...| default| null| EXTERNAL| false|
| sentimentlex_csv| default| null| EXTERNAL| false|
| simple_range| default| null| MANAGED| false|
| social_media_usage| default| null| MANAGED| false|
| tip_json| default| null| EXTERNAL| false|
| tips_csv_csv| default| null| EXTERNAL| false|
| users_csv| default| null| EXTERNAL| false|
+--------------------+--------+-----------+---------+-----------+
df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
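The persisted table can later be read back by name; a short sketch (the DROP TABLE statement is left commented out so the table is not removed by accident):

// Read the managed table back and peek at a few rows
val restored = spark.table("simple_range")
restored.show(3)

// spark.sql("DROP TABLE simple_range")   // uncomment to remove the managed table again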
// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("/tmp/platforms.parquet")

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
val platforms = sqlContext.sql("SELECT platform FROM parquetFile WHERE visits > 0")
platforms.distinct.map(t => "Name: " + t(0)).collect().foreach(println)
Name: Flickr
Name: iPhone
Name: YouTube
Name: WordPress
Name: SMS
Name: iPhone App
Name: Youtube
Name: Instagram
Name: iPhone app
Name: Linked-In
Name: Twitter
Name: TOTAL
Name: Tumblr
Name: Newsletter
Name: Pinterest
Name: Android
Name: Foursquare
Name: Google+
Name: Foursquare (Badge Unlock)
Name: Facebook
parquetFile: org.apache.spark.sql.DataFrame = [platform: string, visits: int]
platforms: org.apache.spark.sql.DataFrame = [platform: string]
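The same query can be expressed with the DataFrame API instead of SQL; a sketch equivalent to the statement above:

import org.apache.spark.sql.functions.col

// Filter, project and deduplicate directly on the DataFrame
val platformsApi = parquetFile
  .filter(col("visits") > 0)
  .select("platform")
  .distinct()
platformsApi.show(5)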
// Create a simple DataFrame, stored into a partition directory
val df1 = sc.parallelize(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.mode("overwrite").parquet("/tmp/data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.parallelize(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.mode("overwrite").parquet("/tmp/data/test_table/key=2")

// Read the partitioned table with schema merging enabled
val df3 = spark.read.option("mergeSchema", "true").parquet("/tmp/data/test_table")
df3.printSchema()

// The final schema consists of all three columns in the Parquet files together
// with the partitioning column that appears in the partition directory paths:
// root
// |-- single: integer (nullable = true)
// |-- double: integer (nullable = true)
// |-- triple: integer (nullable = true)
// |-- key: integer (nullable = true)
root
|-- single: integer (nullable = true)
|-- double: integer (nullable = true)
|-- triple: integer (nullable = true)
|-- key: integer (nullable = true)
df1: org.apache.spark.sql.DataFrame = [single: int, double: int]
df2: org.apache.spark.sql.DataFrame = [single: int, triple: int]
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields]
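A possible variation (not from the original notebook): instead of passing the option on every read, schema merging can be switched on session-wide through the spark.sql.parquet.mergeSchema configuration; a sketch:

// Enable schema merging globally, then read without the per-call option
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
spark.read.parquet("/tmp/data/test_table").printSchema()

// Reset to the default (false); without merging, the schema is taken from a single
// Parquet footer, so either "double" or "triple" would be missing from the result
spark.conf.set("spark.sql.parquet.mergeSchema", "false")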
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "/tmp/platforms.json"
val platforms = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method.
platforms.printSchema()
// root
// |-- platform: string (nullable = true)
// |-- visits: long (nullable = true)

// Register this DataFrame as a table.
platforms.createOrReplaceTempView("platforms")

// SQL statements can be run by using the sql method provided by spark.
val facebook = spark.sql("SELECT platform, visits FROM platforms WHERE platform like 'Face%k'")
facebook.show()

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val rdd = sc.parallelize("""{"name":"IWyn","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPlatforms = spark.read.json(rdd)
anotherPlatforms.show()
root
|-- platform: string (nullable = true)
|-- visits: long (nullable = true)
+--------+------+
|platform|visits|
+--------+------+
|Facebook| 3|
|Facebook| 36|
|Facebook| 47|
|Facebook| 90|
|Facebook| 105|
|Facebook| 123|
|Facebook| 119|
|Facebook| 148|
|Facebook| 197|
|Facebook| 179|
|Facebook| 200|
|Facebook| 200|
|Facebook| 183|
|Facebook| 190|
|Facebook| 227|
|Facebook| 194|
|Facebook| 243|
|Facebook| 237|
|Facebook| 234|
|Facebook| 238|
+--------+------+
only showing top 20 rows
+----------------+----+
| address|name|
+----------------+----+
|[Columbus, Ohio]|IWyn|
+----------------+----+
notebook:22: warning: method json in class DataFrameReader is deprecated: Use json(Dataset[String]) instead.
val anotherPlatforms = spark.read.json(rdd)
^
path: String = /tmp/platforms.json
platforms: org.apache.spark.sql.DataFrame = [platform: string, visits: bigint]
facebook: org.apache.spark.sql.DataFrame = [platform: string, visits: bigint]
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[27606] at parallelize at command-112937334110760:21
anotherPlatforms: org.apache.spark.sql.DataFrame = [address: struct<city: string, state: string>, name: string]
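The deprecation warning above suggests passing a Dataset[String] rather than an RDD[String]; a sketch of that variant:

import spark.implicits._

// Build a Dataset[String] of JSON objects and read it without the deprecated overload
val jsonDS = Seq("""{"name":"IWyn","address":{"city":"Columbus","state":"Ohio"}}""").toDS()
val viaDataset = spark.read.json(jsonDS)
viaDataset.show()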