import org.apache.commons.lang3.StringUtils
import java.net.HttpURLConnection
import java.net.URL
// function to expand a shortened URL by following a single redirect
def expandUrl(url: String): String = {
  var connection: HttpURLConnection = null
  try {
    connection = new URL(url)
      .openConnection
      .asInstanceOf[HttpURLConnection]
    // do not follow redirects automatically: we want the Location header itself
    connection.setInstanceFollowRedirects(false)
    connection.setUseCaches(false)
    connection.setRequestMethod("GET")
    connection.connect()
    val redirectedUrl = connection.getHeaderField("Location")
    if (StringUtils.isNotEmpty(redirectedUrl)) {
      redirectedUrl
    } else {
      url // no Location header: the URL is not a redirect
    }
  } catch {
    // on any non-fatal failure, fall back to the original URL
    case scala.util.control.NonFatal(_) => url
  } finally {
    if (connection != null)
      connection.disconnect()
  }
}
// function to keep expanding until the URL stops changing (no further redirects)
def expandUrluntilsame(url: String): String = {
  var expanded = expandUrl(url)
  var unexpanded = url
  while (!StringUtils.equals(expanded, unexpanded)) {
    unexpanded = expanded
    expanded = expandUrl(expanded)
  }
  expanded
}
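// a minimal usage sketch; the short link below is a placeholder, not a real
// endpoint (on network failure expandUrl simply returns its input unchanged)
val exampleShortUrl = "http://t.co/example"
val expandedOnce  = expandUrl(exampleShortUrl)           // follows at most one redirect
val fullyExpanded = expandUrluntilsame(exampleShortUrl)  // follows the whole redirect chain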
// the case class for the web article
// note: if more fields are added to Articles, getArticles below must be updated to match
case class Articles(
  title: String,
  body: String,
  domain: String,
  description: String,
  keywords: String,
  status: String
)
// the scraper: Goose and Configuration come from the gravity goose library
import com.gravity.goose.{Configuration, Goose}

def getArticles(url_article: String): Articles = {
  try {
    // configure goose to skip retrieving images
    val conf: Configuration = new Configuration
    conf.setEnableImageFetching(false)
    val goosy = new Goose(conf)
    val article = goosy.extractContent(url_article)
    Articles(
      article.title,
      article.cleanedArticleText,
      article.domain,
      article.metaDescription,
      article.metaKeywords,
      "found")
  } catch {
    // e.g. java.net.UnknownHostException for dead links: mark the article as not found
    case scala.util.control.NonFatal(_) => Articles("null", "null", "null", "", "", "not found")
  }
}
// user-defined function to be used with DataFrames
// caveat: this creates a new Goose instance for every record, which is slow and does not scale
val ArticleUserDefined = udf((s: String) => getArticles(expandUrluntilsame(s)))
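// a minimal usage sketch: a tiny one-column DataFrame of URLs, run through the UDF
// (the column name `url` and the placeholder link are illustrative assumptions)
import spark.implicits._
val urlsDF = Seq("http://t.co/example").toDF("url")
val withArticles = urlsDF.withColumn("article", ArticleUserDefined($"url"))
withArticles.select($"article.title", $"article.status").show(false)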
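// the per-record Goose construction above can be avoided; a sketch of one workaround
// (an assumption, not the notebook's approach): process a Dataset with mapPartitions
// so that each partition builds a single Goose instance and reuses it for all its records
def scrapePartition(urls: Iterator[String]): Iterator[Articles] = {
  val conf = new Configuration
  conf.setEnableImageFetching(false)
  val goosy = new Goose(conf) // one Goose per partition, shared across records
  urls.map { u =>
    try {
      val a = goosy.extractContent(expandUrluntilsame(u))
      Articles(a.title, a.cleanedArticleText, a.domain, a.metaDescription, a.metaKeywords, "found")
    } catch {
      case scala.util.control.NonFatal(_) => Articles("null", "null", "null", "", "", "not found")
    }
  }
}
// usage on a hypothetical Dataset[String] of URLs:
// val articlesDS = urlsDS.mapPartitions(scrapePartition)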
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
implicit class DataFrameFlattener(df: DataFrame) {
  // flatten all nested struct columns into top-level columns
  def flattenSchema: DataFrame = {
    df.select(flatten(Nil, df.schema): _*)
  }

  protected def flatten(path: Seq[String], schema: DataType): Seq[Column] = schema match {
    case s: StructType => s.fields.flatMap(f => flatten(path :+ f.name, f.dataType))
    // name the flattened column after the innermost field only; path.last is cheap
    // here since paths are at most a few elements deep
    // (the original kept the full dotted path: .as(path.mkString(".")))
    case other => col(path.map(n => s"`$n`").mkString(".")).as(path.last) :: Nil
  }
}
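// a minimal usage sketch, reusing the Articles case class as a nested struct column
// (illustrative data; flattenSchema pulls the struct fields up to the top level)
val nestedDF = Seq(("http://t.co/example", Articles("t", "b", "d", "", "", "found"))).toDF("url", "article")
nestedDF.flattenSchema.printSchema() // flat columns: url, title, body, domain, description, keywords, status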
SDS-2.2, Scalable Data Science