MAP :
- map is a transformation operation in Spark, hence it is lazily evaluated
- It is a narrow operation, as it does not shuffle data from one partition to multiple partitions
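To see the lazy evaluation in practice, here is a minimal sketch (the sample data is illustrative only): the map call merely records the transformation in the RDD lineage, and nothing is computed until an action such as collect is invoked.

// Minimal sketch of lazy evaluation (sample data is illustrative only)
val words = sc.parallelize(List("spark", "rdd"))
val pairs = words.map(w => (w, 1))   // no Spark job is launched here
println(pairs.toDebugString)         // only prints the recorded lineage
pairs.collect()                      // the action triggers the actual computation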
scala> val x=sc.parallelize(List("spark","rdd","example","sample","example"),3) x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4]
at parallelize at <console>:27 scala> val y=x.map(x=>(x,1)) y: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5]
at map at <console>:29 scala> y.collect res0: Array[(String, Int)] = Array((spark,1), (rdd,1), (example,1),
(sample,1), (example,1))
The same RDD can be rewritten with shorter syntax in Scala:
scala> val y=x.map((_,1))
y: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at map at <console>:29

scala> y.collect
res3: Array[(String, Int)] = Array((spark,1), (rdd,1), (example,1), (sample,1), (example,1))
Mapping each element to a tuple of the element and its length:
scala> val y=x.map(x=>(x,x.length))
y: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[10] at map at <console>:29

scala> y.collect
res4: Array[(String, Int)] = Array((spark,5), (rdd,3), (example,7), (sample,6), (example,7))
FLATMAP:
- flatMap is a transformation operation in Spark, hence it is lazily evaluated
- It is a narrow operation, as it does not shuffle data from one partition to multiple partitions
- The output of flatMap is flattened
- The function passed to flatMap should return an array, list or sequence (any subtype of scala.TraversableOnce); see the sketch after this list
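As a quick illustration of the last point, here is a minimal sketch (the data is made up): the function passed to flatMap returns a collection per input element, and flatMap concatenates all of those collections into one flat RDD. Returning an Option also works, since it is treated as a collection of zero or one element.

// Minimal sketch: flatMap flattens the collections returned for each element
val nums = sc.parallelize(List(1, 2, 3))
val repeated = nums.flatMap(n => List.fill(n)(n))
repeated.collect()   // Array(1, 2, 2, 3, 3, 3)

// An Option works too: None contributes nothing, Some(x) contributes one element
val evens = nums.flatMap(n => if (n % 2 == 0) Some(n) else None)
evens.collect()      // Array(2)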
1) The map operation returns an Array of Arrays in the following case (check the type of res5)
2) The flatMap operation returns a flat Array of words (see the shorter-syntax example below)
scala> var x=sc.parallelize(List("spark rdd example","sample example"),2) x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14]
at parallelize at <console>:27 scala> val y=x.flatMap(x=>x.split(" ")) y: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[15]
at map at <console>:29 scala> val z=x.groupBy(_.charAt(0)) z: org.apache.spark.rdd.RDD[(Char, Iterable[String])] = ShuffledRDD[17]
at groupBy at <console>:29 scala> y.collect res5: Array[Array[String]] = Array(Array(spark, rdd, example), Array(sample, example)) scala> z.collect res6: Array[(Char, Iterable[String])]
= Array((s,CompactBuffer(spark rdd example, sample example)))
The flatMap RDD can be written with shorter syntax, and it returns the flat Array of words:

scala> val y=x.flatMap(_.split(" "))
y: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[19] at flatMap at <console>:29

scala> y.collect
res7: Array[String] = Array(spark, rdd, example, sample, example)
Another example of map and flatMap:
Map function
scala> val x=sc.parallelize(List("spark rdd example","sample example"),2) x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11]
at parallelize at <console>:27 scala> val y=x.map(x=>x.split("")) y: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[12]
at map at <console>:29 scala> y.collect res7: Array[Array[String]] = Array(Array("", s, p, a, r, k, " ",
r, d, d, " ", e, x, a, m, p, l, e), Array("", s, a, m, p, l, e, " ",
e, x, a, m, p, l, e))
flatMap function: the contents are flattened into a single array
scala> val z=x.flatMap(x=>x.split("")) z: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:29 scala> z.collect res8: Array[String] = Array("", s, p, a, r, k, " ", r, d, d,
" ", e, x, a, m, p, l, e, "", s, a, m, p, l, e, " ", e, x, a, m, p, l, e)
REDUCEBYKEY :
- reduceByKey is a transformation operation in Spark, hence it is lazily evaluated
- It is a wide operation, as it shuffles data across partitions and creates another RDD
- Before sending data across the partitions, it merges the data locally using the same associative function, which optimizes the data shuffling
- It can only be used with RDDs whose elements are key-value pairs
scala> val x=sc.parallelize(Array(("a",1),("b",1),("a",1),("a",1),
("b",1),("b",1),("b",1),("b",1)),3); x: org.apache.spark.rdd.RDD[(String, Int)] =
ParallelCollectionRDD[0] at parallelize at <console>:27 scala> val y=x.reduceByKey((accum,n)=>(accum+n)) y: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1] at
reduceByKey at <console>:29 scala> y.collect res0: Array[(String, Int)] = Array((a,3), (b,5))
Applying the associative function with shorter syntax:
scala> val y=x.reduceByKey(_ + _)
y: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at reduceByKey at <console>:29

scala> y.collect
res1: Array[(String, Int)] = Array((a,3), (b,5))
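Putting flatMap, map and reduceByKey together gives the classic word count. A minimal sketch, reusing the sample sentences from the flatMap section (the order of the collected pairs may vary):

val lines = sc.parallelize(List("spark rdd example", "sample example"), 2)
val wordCounts = lines
  .flatMap(_.split(" "))   // split each line into words
  .map((_, 1))             // pair each word with a count of 1
  .reduceByKey(_ + _)      // sum the counts per word
wordCounts.collect()       // Array((spark,1), (rdd,1), (example,2), (sample,1))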
FILTER:
- filter is a transformation operation in Spark, hence it is lazily evaluated
- It is a narrow operation, as it does not shuffle data from one partition to multiple partitions
- filter accepts a predicate as an argument and returns a new RDD containing only the elements of the source RDD that satisfy the predicate function
scala> val x=sc.parallelize(1 to 10,2)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:27

scala> val y=x.filter(e=>e%2==0)
y: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at filter at <console>:29

scala> y.collect
res2: Array[Int] = Array(2, 4, 6, 8, 10)
Shorter syntax:
scala> val y=x.filter(_ % 2 == 0)
y: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at <console>:29

scala> y.collect
res3: Array[Int] = Array(2, 4, 6, 8, 10)
GROUPBY:
- groupBy is a transformation operation in Spark, hence it is lazily evaluated
- It is a wide operation, as it shuffles data across partitions and creates another RDD
- It is a costly operation, as it does not use a combiner local to each partition to reduce the data transfer
- It is not recommended when you need to do further aggregation on the grouped data
This operation returns a new RDD made up of a key (the group) and the items belonging to that group (as an Iterable). The order of elements within a group may differ when the same operation is applied to the same RDD repeatedly. It is also very costly when further aggregation is performed on the grouped data.
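The transcript below does not show how x was created; judging from the output, it is presumably an RDD of first names, for example:

// Assumed input for the transcript below (reconstructed from the output)
val x = sc.parallelize(List("Tina", "Thomas", "Cory", "Christine",
  "Joseph", "Jimmy", "James", "Jackeline", "Juna"), 3)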
scala> val y=x.groupBy(_.charAt(0))
y: org.apache.spark.rdd.RDD[(Char, Iterable[String])] = ShuffledRDD[10] at groupBy at <console>:29

scala> y.collect
res5: Array[(Char, Iterable[String])] = Array((T,CompactBuffer(Tina, Thomas)), (C,CompactBuffer(Cory, Christine)), (J,CompactBuffer(Joseph, Jimmy, James, Jackeline, Juna)))
USING MAP AND FILTER ON ARRAY OF INTEGERS
scala> var num=Array(5,10,15,20,25)
num: Array[Int] = Array(5, 10, 15, 20, 25)

scala> val y=num.map(num=>num*num)
y: Array[Int] = Array(25, 100, 225, 400, 625)
The map above squares each element of num. filter, in turn, accepts a predicate and keeps only the elements that satisfy it: here we keep the numbers that are not equal to 15, so 15 is removed from 'num'.
scala> val y=num.filter(e=>e!=15)
y: Array[Int] = Array(5, 10, 20, 25)
READ FROM A TSV FILE AND IGNORE THE HEADER (SPARK 1.6 VERSION)
val sourceFile = "c:\\source.tsv" val input = sc.textFile(sourceFile) //Remove header val header =input.first() val rows = input.filter (line=>line != header)
STORE TEXT FILE AS PARQUET (SPARK 1.6)
def writeParquet(dfName: DataFrame, fileName: String): Unit = {
  dfName.write
    .option("header", "true")
    .mode("overwrite")
    .save("C:\\Boxes\\" + fileName)  // the default save format in Spark SQL is Parquet
}
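A usage sketch: assuming inputDF is a DataFrame built as in the schema-load snippet below, the helper can be called like this:

// inputDF is assumed to come from the "INCLUDE SCHEMA DURING DATA LOAD" snippet
writeParquet(inputDF, "purchase_order_parquet")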
INCLUDE SCHEMA DURING DATA LOAD
/* Read a text file from the specified location and add a schema to it */
import sqlContext.implicits._   // needed for the .toDF() call below

val sourceFile = "C:\\Boxes\\purchase_order_header.tsv"
val input = sc.textFile(sourceFile)
val inputDF = input.flatMap { line =>
  // split the line and assign it to a variable
  val record = line.split("\\t")
  // validate that the row has the expected number of columns
  if (record.length == 13) {
    // flatMap unboxes the Option, so only valid rows are kept
    Some(domain.PurchaseOrder(record(0), record(1), record(2), record(3), record(4), record(5),
      record(6), record(7), record(8), record(9), record(10), record(11), record(12)))
  } else None
}.toDF()
Specify the schema with case classes in a domain package object (see the lambda_architecture project on GitHub):
package object domain {
  case class Activity(
    timestamp_hour: Long,
    referrer: String,
    action: String,
    prevPage: String,
    visitor: String,
    page: String,
    product: String,
    inputProps: Map[String, String] = Map())

  case class PurchaseOrder(
    order_number: String,
    cosmos_customerid: String,
    storeid: String,
    purchase_channel: String,
    header_purchase_date: String,
    order_date: String,
    total_net_amount: String,
    total_gross_amount: String,
    total_gross_units: String,
    total_returned_units: String,
    total_discount_amount: String,
    total_return_sales: String,
    dt: String,
    inputProps: Map[String, String] = Map())
}
GET MONTH DIFFERENCE BETWEEN 2 SPECIFIED DATES
val parquetSourceFile = "C:\\Boxes\purchase_order_parquet" val parquetFile = sqlContext.read .option("mergeSchema","true") .parquet(parquetSourceFile) //Register temp table parquetFile.registerTempTable("purchase_order_tb") parquetFile.printSchema() val activeUsers=sqlContext.sql("""select order_number , cosmos_customerid , header_purchase_date , datediff(from_unixtime(unix_timestamp()), to_date(header_purchase_date)) as diff , months_between(from_unixtime(unix_timestamp()),
header_purchase_date) as month_diff from purchase_order_tb""") activeUsers.foreach(println)
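The query above measures the difference from the current time. For the difference between two explicitly specified dates, here is a minimal sketch using the DataFrame functions API (the literal dates are just placeholders):

import org.apache.spark.sql.functions.{months_between, to_date, lit}

// months_between(end, start): the placeholder dates below are assumptions
val diffDF = parquetFile.select(
  months_between(to_date(lit("2017-06-15")), to_date(lit("2017-01-31"))).as("month_diff"))
diffDF.show()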
ADD NEW COLUMN USING WITHCOLUMN
.withColumn("rolling_range",lit(range24))
USING AGGREGATE FUNCTION MAX AND WHERE WITH GROUP BY
val maxTransactionsOnline = loyalCustomers
  .where(upper(col("purchase_channel")) === "ONLINE")
  .groupBy("cosmos_customerid")
  .agg(max("header_purchase_date"))
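agg can also compute several aggregates in one pass; a small sketch (the count on order_number is an assumption):

import org.apache.spark.sql.functions.{col, count, max, upper}

val summary = loyalCustomers
  .where(upper(col("purchase_channel")) === "ONLINE")
  .groupBy("cosmos_customerid")
  .agg(max("header_purchase_date").as("last_purchase"),
       count("order_number").as("online_orders"))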
CHECK FOR NULL
sourceDF.withColumn("is_person_name_null", col("person_name").isNull).show()
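A related sketch: instead of adding a flag column, keep only the rows where the value is null (the column and DataFrame names follow the example above):

import org.apache.spark.sql.functions.col

val nullNames = sourceDF.filter(col("person_name").isNull)
nullNames.show()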
CREATE SPARK SESSION
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

omniTransactions.createOrReplaceTempView("omni")
val result = spark.sql("""select * from omni""")
result.show()
HASH A COLUMN USING REFLECT (SHA-256)

select reflect('org.apache.commons.codec.digest.DigestUtils', 'sha256Hex', column_to_be_hashed) as email_hash_sha256
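Spark also ships a built-in sha2 function, which should produce the same lowercase hex digest without relying on reflect; a minimal sketch (the DataFrame and column names are assumptions):

import org.apache.spark.sql.functions.{col, sha2}

val hashed = omniTransactions.withColumn("email_hash_sha256", sha2(col("email"), 256))
hashed.select("email_hash_sha256").show(1, false)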