import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{Column, Row}

/** Reads the parquet file(s) matching the given path/pattern into a DataFrame.
  *
  * Relies on a `sqlContext` value being in scope (as provided by the notebook environment).
  *
  * @param InputDFAsParquetFilePatternString path or pattern handed to `sqlContext.read.parquet`
  * @return the DataFrame produced by the parquet reader
  */
def fromParquetFile2DF(InputDFAsParquetFilePatternString: String): DataFrame =
  sqlContext.read.parquet(InputDFAsParquetFilePatternString)

/** Parses a DataFrame holding one JSON string per row into a DataFrame of tweets.
  *
  * Every row must consist of exactly one non-null `String` column; any other row shape
  * does not match the pattern below and raises a `scala.MatchError` when the job runs.
  *
  * @param tweetsAsJsonStringInputDF single-column DataFrame of JSON strings
  * @return the DataFrame produced by `sqlContext.read.json` over those strings
  */
def tweetsJsonStringDF2TweetsDF(tweetsAsJsonStringInputDF: DataFrame): DataFrame =
  sqlContext.read.json(tweetsAsJsonStringInputDF.map { case Row(json: String) => json })

/** Parses a DataFrame of (Long, JSON string) rows into a DataFrame of tweets.
  *
  * The leading `Long` column is discarded; only the JSON string is parsed. Rows that are
  * not exactly a non-null `Long` followed by a non-null `String` raise a `scala.MatchError`
  * when the job runs.
  *
  * @param tweetsAsIDLong_JsonStringInputDF two-column DataFrame of (Long, JSON string)
  * @return the DataFrame produced by `sqlContext.read.json` over the JSON strings
  */
def tweetsIDLong_JsonStringPairDF2TweetsDF(tweetsAsIDLong_JsonStringInputDF: DataFrame): DataFrame =
  sqlContext.read.json(
    tweetsAsIDLong_JsonStringInputDF.map { case Row(_: Long, json: String) => json })

/** Internal helper: parses the string column at `path` with the pattern
  * `MMM dd, yyyy hh:mm:ss a`, casts the result to `TimestampType` and names it `alias`.
  */
def tttTimestamp(path: String, alias: String): Column =
  unix_timestamp(col(path), """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as(alias)

/** Internal helper: the projection shared by both TTT builders, in output-column order.
  *
  * `col(...)` is used instead of the `$"..."` interpolator so that building these columns
  * does not depend on `sqlContext.implicits._` being in scope.
  */
def tttBaseColumns: Seq[Column] = Seq(
  // Tweet ids and creation times: current tweet, retweeted original, quoted original, reply target.
  tttTimestamp("createdAt", "CurrentTweetDate"),
  col("id").as("CurrentTwID"),
  tttTimestamp("retweetedStatus.createdAt", "CreationDateOfOrgTwInRT"),
  col("retweetedStatus.id").as("OriginalTwIDinRT"),
  tttTimestamp("quotedStatus.createdAt", "CreationDateOfOrgTwInQT"),
  col("quotedStatus.id").as("OriginalTwIDinQT"),
  col("inReplyToStatusId").as("OriginalTwIDinReply"),
  // User ids.
  col("user.id").as("CPostUserId"),
  tttTimestamp("user.createdAt", "userCreatedAtDate"),
  col("retweetedStatus.user.id").as("OPostUserIdinRT"),
  col("quotedStatus.user.id").as("OPostUserIdinQT"),
  col("inReplyToUserId").as("OPostUserIdinReply"),
  // User names.
  col("user.name").as("CPostUserName"),
  col("retweetedStatus.user.name").as("OPostUserNameinRT"),
  col("quotedStatus.user.name").as("OPostUserNameinQT"),
  // User screen names.
  col("user.screenName").as("CPostUserSN"),
  col("retweetedStatus.user.screenName").as("OPostUserSNinRT"),
  col("quotedStatus.user.screenName").as("OPostUserSNinQT"),
  col("inReplyToScreenName").as("OPostUserSNinReply"),
  // Attributes of the posting user, kept under their original field names.
  col("user.favouritesCount"),
  col("user.followersCount"),
  col("user.friendsCount"),
  col("user.isVerified"),
  col("user.isGeoEnabled"),
  // Tweet text and mention entities (retweeted / quoted / current status).
  col("text").as("CurrentTweet"),
  col("retweetedStatus.userMentionEntities.id").as("UMentionRTiD"),
  col("retweetedStatus.userMentionEntities.screenName").as("UMentionRTsN"),
  col("quotedStatus.userMentionEntities.id").as("UMentionQTiD"),
  col("quotedStatus.userMentionEntities.screenName").as("UMentionQTsN"),
  col("userMentionEntities.id").as("UMentionASiD"),
  col("userMentionEntities.screenName").as("UMentionASsN")
)

/** Internal helper: appends the derived `TweetType`, `MentionType` and `Weight` columns.
  *
  * Expects `selected` to already carry the aliases produced by `tttBaseColumns`.
  */
def tttWithDerivedColumns(selected: DataFrame): DataFrame = {
  val rtMissing = col("OriginalTwIDinRT").isNull
  val rtPresent = col("OriginalTwIDinRT").isNotNull
  val qtMissing = col("OriginalTwIDinQT").isNull
  val qtPresent = col("OriginalTwIDinQT").isNotNull
  // The classification compares the reply id against -1 ("=== -1" vs "> -1"); a null or
  // smaller reply id satisfies neither comparison and ends up as "Unclassified".
  val notAReply = col("OriginalTwIDinReply") === -1
  val isAReply = col("OriginalTwIDinReply") > -1

  // NOTE(review): "Retweet of Quoted Rely Tweet" looks like a typo for "Reply"; the label is
  // kept byte-for-byte because consumers may filter on the emitted value.
  val tweetType =
    when(rtMissing && qtMissing && notAReply, "Original Tweet")
      .when(rtMissing && qtMissing && isAReply, "Reply Tweet")
      .when(rtPresent && qtMissing && notAReply, "ReTweet")
      .when(rtMissing && qtPresent && notAReply, "Quoted Tweet")
      .when(rtPresent && qtPresent && notAReply, "Retweet of Quoted Tweet")
      .when(rtPresent && qtMissing && isAReply, "Retweet of Reply Tweet")
      .when(rtMissing && qtPresent && isAReply, "Reply of Quoted Tweet")
      .when(rtPresent && qtPresent && isAReply, "Retweet of Quoted Rely Tweet")
      .otherwise("Unclassified")

  // Column names match the aliases exactly (UMentionRTiD / UMentionQTiD) so that resolution
  // also works when spark.sql.caseSensitive is enabled.
  val rtMentions = col("UMentionRTiD")
  val qtMentions = col("UMentionQTiD")

  // NOTE(review): the four cases below cover every null/non-null combination, so
  // "NoMention" can never be produced. Kept as-is to preserve the existing output.
  val mentionType =
    when(rtMentions.isNotNull && qtMentions.isNotNull, "RetweetAndQuotedMention")
      .when(rtMentions.isNotNull && qtMentions.isNull, "RetweetMention")
      .when(rtMentions.isNull && qtMentions.isNotNull, "QuotedMention")
      .when(rtMentions.isNull && qtMentions.isNull, "AuthoredMention")
      .otherwise("NoMention")

  selected
    .withColumn("TweetType", tweetType)
    .withColumn("MentionType", mentionType)
    .withColumn("Weight", lit(1L))
}

/** Projects a tweets DataFrame onto the TTT columns and adds `TweetType`, `MentionType`
  * and a constant `Weight` of `1L`.
  *
  * @param tweetsInputDF tweets DataFrame exposing the fields selected in `tttBaseColumns`
  * @return the projected DataFrame with the three derived columns appended
  */
def tweetsDF2TTTDF(tweetsInputDF: DataFrame): DataFrame =
  tttWithDerivedColumns(tweetsInputDF.select(tttBaseColumns: _*))

/** Same as `tweetsDF2TTTDF`, with two extra columns selected after the shared ones:
  * `urlEntities.expandedURL` as `URLs` and `hashtagEntities.text` as `hashTags`.
  *
  * @param tweetsInputDF tweets DataFrame exposing the shared fields plus `urlEntities`
  *                      and `hashtagEntities`
  * @return the projected DataFrame with `TweetType`, `MentionType` and `Weight` appended
  */
def tweetsDF2TTTDFWithURLsAndHastags(tweetsInputDF: DataFrame): DataFrame = {
  val urlAndHashtagColumns = Seq(
    col("urlEntities.expandedURL").as("URLs"),
    col("hashtagEntities.text").as("hashTags")
  )
  tttWithDerivedColumns(tweetsInputDF.select((tttBaseColumns ++ urlAndHashtagColumns): _*))
}

// Usage hint printed when the cell is run.
println(
  """USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
    |val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
    |""".stripMargin)
USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.DataFrame
fromParquetFile2DF: (InputDFAsParquetFilePatternString: String)org.apache.spark.sql.DataFrame
tweetsJsonStringDF2TweetsDF: (tweetsAsJsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsIDLong_JsonStringPairDF2TweetsDF: (tweetsAsIDLong_JsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDF: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
SDS-2.x, Scalable Data Engineering Science
Last refresh: Never