Sunday, June 24, 2018

Simple transformations in Spark



MAP :
  • map is a transformation operation in Spark, hence it is lazily evaluated
  • It is a narrow operation as it does not shuffle data from one partition to multiple partitions
Note: In this example map takes each value from the list and pairs it with 1, producing (element, 1) tuples.
scala> val x=sc.parallelize(List("spark","rdd","example","sample","example"),3)
x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4]  
at parallelize at <console>:27

scala> val y=x.map(x=>(x,1))
y: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] 
at map at <console>:29

scala> y.collect
res0: Array[(String, Int)] = Array((spark,1), (rdd,1), (example,1), 
(sample,1), (example,1))





The same RDD can be rewritten with shorter syntax in Scala as:

scala> val y=x.map((_,1))
y: org.apache.spark.rdd.RDD[(String, Int)] =  
MapPartitionsRDD[9] at map at <console>:29

scala> y.collect
res3: Array[(String, Int)] = Array((spark,1), (rdd,1),  
(example,1), (sample,1), (example,1))

Mapping each element to a tuple of the element and its length:
scala> val y=x.map(x=>(x,x.length))
y: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[10]  
at map at <console>:29

scala> y.collect
res4: Array[(String, Int)] = Array((spark,5), (rdd,3), (example,7),  
(sample,6), (example,7))

FLATMAP:
  • flatMap is a transformation operation in Spark, hence it is lazily evaluated
  • It is a narrow operation as it does not shuffle data from one partition to multiple partitions
  • The output of flatMap is flattened
  • The function passed to flatMap should return an array, list or sequence (any subtype of scala.TraversableOnce)
Note
1) The map operation returns an Array of Arrays in the following case: check the type of res5
2) The flatMap operation returns an Array of words, as shown further below


scala> var x=sc.parallelize(List("spark rdd example","sample example"),2)
x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14]  
at parallelize at <console>:27

scala> val y=x.map(x=>x.split(" "))
y: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[15]  
at map at <console>:29

scala> val z=x.groupBy(_.charAt(0))
z: org.apache.spark.rdd.RDD[(Char, Iterable[String])] = ShuffledRDD[17]  
at groupBy at <console>:29

scala> y.collect
res5: Array[Array[String]] = Array(Array(spark, rdd, example), Array(sample, example))

scala> z.collect
res6: Array[(Char, Iterable[String])] 
= Array((s,CompactBuffer(spark rdd example, sample example)))

These transformations can also be written with the shorter underscore syntax, as shown further below.

Another example for map and flatMap:

Map function

scala> val x=sc.parallelize(List("spark rdd example","sample example"),2)
x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11]  
at parallelize at <console>:27

scala> val y=x.map(x=>x.split(""))
y: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[12] 
at map at <console>:29

scala> y.collect
res7: Array[Array[String]] = Array(Array("", s, p, a, r, k, " ",
  r, d, d, " ", e, x, a, m, p, l, e), Array("", s, a, m, p, l, e, " ", 
 e, x, a, m, p, l, e))

flatMap function:

Contents are flattened into a single array
scala> val z=x.flatMap(x=>x.split(""))
z: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:29

scala> z.collect
res8: Array[String] = Array("", s, p, a, r, k, " ", r, d, d,  
" ", e, x, a, m, p, l, e, "", s, a, m, p, l, e, " ", e, x, a, m, p, l, e)

The flatMap above can also be written with shorter syntax:

scala> val y=x.flatMap(_.split(" "))
y: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[19] at flatMap at 
<console>:29

scala> y.collect
res7: Array[String] = Array(spark, rdd, example, sample, example)

REDUCEBYKEY:
  • reduceByKey is a transformation operation in Spark, hence it is lazily evaluated
  • It is a wide operation as it shuffles data from multiple partitions and creates another RDD
  • Before sending data across partitions, it merges the data locally using the same associative function, which reduces the amount of data shuffled
  • It can only be used with RDDs whose elements are key-value pairs

scala> val x=sc.parallelize(Array(("a",1),("b",1),("a",1),("a",1),
("b",1),("b",1),("b",1),("b",1)),3);
x: org.apache.spark.rdd.RDD[(String, Int)] = 
ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val y=x.reduceByKey((accum,n)=>(accum+n))
y: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1] at 
 reduceByKey at <console>:29

scala> y.collect
res0: Array[(String, Int)] = Array((a,3), (b,5))        

Applying the associative function with shorter syntax:
scala> val y=x.reduceByKey(_ + _)
y: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at 
reduceByKey at <console>:29

scala> y.collect
res1: Array[(String, Int)] = Array((a,3), (b,5))

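Putting map, flatMap and reduceByKey together, a word count over the earlier sample sentences can be sketched as follows (a sketch: the res numbers and the ordering of the result will vary):

scala> val sentences = sc.parallelize(List("spark rdd example","sample example"),2)

scala> val counts = sentences.flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_ + _)

scala> counts.collect
// expected contents (order may vary): Array((example,2), (spark,1), (rdd,1), (sample,1))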

FILTER:
  • filter is a transformation operation in Spark, hence it is lazily evaluated
  • It is a narrow operation as it does not shuffle data from one partition to multiple partitions
  • filter accepts a predicate as an argument and keeps only the elements of the source RDD that satisfy the predicate; elements that do not satisfy it are dropped
scala> val x=sc.parallelize(1 to 10,2)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at 
parallelize at <console>:27

scala> val y=x.filter(e=>e%2==0)
y: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at filter at <console>:29

scala> y.collect
res2: Array[Int] = Array(2, 4, 6, 8, 10)


Shorter syntax:
scala> val y=x.filter(_ % 2 == 0)
y: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at <console>:29

scala> y.collect
res3: Array[Int] = Array(2, 4, 6, 8, 10)

GROUPBY:
  • groupBy is a transformation operation in Spark, hence it is lazily evaluated
  • It is a wide operation as it shuffles data from multiple partitions and creates another RDD
  • It is a costly operation as it doesn't use a combiner local to a partition to reduce the data transfer
  • It is not recommended when you need to do further aggregation on the grouped data
This operation returns a new RDD made up of a key (the group) and the items belonging to that group (as an Iterable). The order of elements within a group may not be the same when you apply the same operation on the same RDD repeatedly. Because all the values of a group are shuffled and collected together, this operation is considered very costly on large data sets.
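The groupBy example below assumes x was re-created as an RDD of names matching the output shown, for example (the exact list order and partition count are assumptions):

scala> val x=sc.parallelize(List("Joseph","Jimmy","Tina","Thomas","James","Cory","Christine","Jackeline","Juna"),3)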
scala> val y=x.groupBy(_.charAt(0))
y: org.apache.spark.rdd.RDD[(Char, Iterable[String])] = ShuffledRDD[10] at 
groupBy at <console>:29

scala> y.collect
res5: Array[(Char, Iterable[String])] =  
Array((T,CompactBuffer(Tina, Thomas)), (C,CompactBuffer(Cory, Christine)),  
(J,CompactBuffer(Joseph, Jimmy, James, Jackeline, Juna)))

USING MAP AND FILTER ON ARRAY OF INTEGERS
scala> var num =Array(5,10,15,20,25)
num: Array[Int] = Array(5, 10, 15, 20, 25)

scala> val y=num.map(num=>num*num)
y: Array[Int] = Array(25, 100, 225, 400, 625)

map and filter also work on plain Scala collections such as arrays. (flatMap, in contrast, expects its argument function to return an array, list or sequence and flattens the resulting elements into a single collection.) In the filter example below we keep only the numbers that are not equal to 15, so 15 is dropped from 'num'.

scala> val y=num.filter(e=>e!=15)
y: Array[Int] = Array(5, 10, 20, 25)


READ A TEXT FILE AND IGNORE THE HEADER (SPARK 1.6 VERSION)
val sourceFile = "c:\\source.tsv"
val input = sc.textFile(sourceFile)

//Remove the header by filtering out the first line
val header = input.first()
val rows = input.filter(line => line != header)

STORE A DATAFRAME AS PARQUET (SPARK 1.6)
def writeParquet(dfName: DataFrame, fileName: String): Unit = {
  dfName.write
    .option("header", "true")
    .mode("overwrite")
    .save("C:\\Boxes\\" + fileName)   // the default data source in Spark 1.6 is parquet
}
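A hypothetical call, using for example the inputDF DataFrame built in the next section:

writeParquet(inputDF, "purchase_order_parquet")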

INCLUDE SCHEMA DURING DATA LOAD
/****
 Read the text file from the specified location.
 Add schema to the file.
 */
import sqlContext.implicits._   // needed for toDF() at the end of this block (spark-shell provides sqlContext)

val sourceFile = "C:\\Boxes\\purchase_order_header.tsv"
val input = sc.textFile(sourceFile)

    val inputDF = input.flatMap { line =>
      //split and assign it to a variable
      val record = line.split("\\t")
      //Validate if the actual input is of the actual length
      if (record.length == 13){
        //flatMap would unbox and return the activity
        Some(domain.PurchaseOrder(record(0), record(1), record(2), record(3), record(4), 
record(5), record(6),record(7), record(8), record(9), record(10), record(11), 
 record(12)))
      }
      else
        None
    }.toDF()

The schema is specified through case classes in a package object (link to github: lambda_architecture):
package object domain {
  case class Activity(timestamp_hour:Long,
                      referrer:String,
                      action:String,
                      prevPage:String,
                      visitor:String,
                      page:String,
                      product:String,
                      inputProps:Map[String,String]=Map()
                     )
  case class PurchaseOrder(order_number: String,
                           cosmos_customerid: String,
                           storeid: String,
                           purchase_channel: String,
                           header_purchase_date: String,
                           order_date: String,
                           total_net_amount: String,
                           total_gross_amount: String,
                           total_gross_units: String,
                           total_returned_units: String,
                           total_discount_amount: String,
                           total_return_sales: String,
                           dt: String,
                           inputProps: Map[String,String] = Map()
                          )
}

GET MONTH DIFFERENCE BETWEEN 2 SPECIFIED DATES
val parquetSourceFile = "C:\\Boxes\\purchase_order_parquet"
    val parquetFile = sqlContext.read
      .option("mergeSchema","true")
      .parquet(parquetSourceFile)
    //Register temp table
    parquetFile.registerTempTable("purchase_order_tb")
    parquetFile.printSchema()
    val activeUsers=sqlContext.sql("""select order_number
                                  , cosmos_customerid
                                  , header_purchase_date
                                  , datediff(from_unixtime(unix_timestamp()), 
                                    to_date(header_purchase_date)) as diff
                                  , months_between(from_unixtime(unix_timestamp()),
                                    header_purchase_date) as month_diff
                                   from purchase_order_tb""")
    activeUsers.foreach(println)   // or activeUsers.show() to print on the driver
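For reference, the same calculation can be sketched with the DataFrame API using the built-in date functions (a minimal sketch, assuming the same parquetFile DataFrame):

import org.apache.spark.sql.functions.{col, current_timestamp, datediff, months_between, to_date}

// select the same columns and compute the day and month differences from the current time
val activeUsersDF = parquetFile.select(
  col("order_number"),
  col("cosmos_customerid"),
  col("header_purchase_date"),
  datediff(current_timestamp(), to_date(col("header_purchase_date"))).as("diff"),
  months_between(current_timestamp(), col("header_purchase_date")).as("month_diff"))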

ADD A NEW COLUMN USING withColumn
.withColumn("rolling_range",lit(range24))
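The line above is only a fragment; a minimal self-contained sketch of the pattern (the DataFrame, column names and range24 value are illustrative assumptions) could look like:

import org.apache.spark.sql.functions.lit
import sqlContext.implicits._

// hypothetical DataFrame and constant, just to show withColumn + lit
val someDF = Seq(("c1", 10), ("c2", 20)).toDF("id", "amount")
val range24 = 24
val withRange = someDF.withColumn("rolling_range", lit(range24))
withRange.show()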

USING THE AGGREGATE FUNCTION MAX AND WHERE WITH GROUP BY
import org.apache.spark.sql.functions.{col, max, upper}

val maxTransactionsOnline = loyalCustomers
  .where(upper(col("purchase_channel")) === "ONLINE")
  .groupBy("cosmos_customerid")
  .agg(max("header_purchase_date"))

CHECK FOR NULL
sourceDF.withColumn(
  "is_person_name_null",
  col("person_name").isNull
).show()
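A related pattern is keeping only the rows where the column is populated, using the built-in isNotNull (same sourceDF as above):

import org.apache.spark.sql.functions.col

// keep only the rows where person_name is not null
sourceDF.filter(col("person_name").isNotNull).show()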

CREATE SPARK SESSION
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

omniTransactions.createOrReplaceTempView("omni")
val result= spark.sql(""" select *
                    from omni""") 

result.show()
SHA-256 HASH
select reflect('org.apache.commons.codec.digest.DigestUtils', 'sha256Hex',
'COLUMN_TO_BE_HASHED') as email_hash_sha256
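In the DataFrame API, the built-in sha2 function can produce the same kind of hash (the DataFrame and column name below are illustrative assumptions):

import org.apache.spark.sql.functions.{col, sha2}
import spark.implicits._

// hypothetical DataFrame with an email column; 256 selects the SHA-256 variant
val someDF = Seq("a@example.com", "b@example.com").toDF("email")
val hashed = someDF.withColumn("email_hash_sha256", sha2(col("email"), 256))
hashed.show(false)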
 
