Tuesday, July 10, 2018

Spark samples (RDD, DataFrames, DSL)



SHARK: THE BEGINNING OF THE API
  • SQL using Spark execution engine
  • Evolved into Spark SQL in 1.0
SCHEMA RDD
  • RDD with schema information
  • For unit testing and debugging Spark SQL
  • Drew the attention of Spark developers
  • Released as DataFrame API in 1.3

SPARK SQL
  • Module for structured data processing
  • The schema gives more information about the data
  • With the schema available, Spark can perform more optimizations
  • Query it interactively with SQL (see the example below)
  • Works with Hive
  • Works with any data source compatible with Spark
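A minimal sketch of this, assuming an existing SparkSession named spark and the employee.csv file used later in this post; any Spark-compatible source (CSV, JSON, Hive tables, ...) works the same way:
# Read a structured file; Spark infers the schema from the data
df = spark.read.csv("./spark-warehouse/employee.csv", header=True, inferSchema=True)

# Register a temporary view and query it interactively with SQL
df.createOrReplaceTempView("tb_employee")
spark.sql("SELECT * FROM tb_employee LIMIT 5").show()
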
DATAFRAME
  • Distributed collection of Row objects
  • High-level API unified in version 2.0 as Dataset[Row]
  • Equivalent to a database table with rows, columns and a known schema
  • Similar to a pandas DataFrame in Python
  • We can work with structured and unstructured data; conversion to and from RDD is possible

QUERIES
  • Domain Specific Language (DSL)
  • Relational (SQL)
  • Both allow for optimizations (see the comparison below)
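As a rough comparison, assuming a DataFrame df with the fname and lname columns used later in this post (tb_people is just an illustrative view name), the same query in both styles:
# DSL (DataFrame API) query
df.select("fname", "lname").where(df.fname == "prathap").show()

# Equivalent relational (SQL) query against a temporary view
df.createOrReplaceTempView("tb_people")
spark.sql("SELECT fname, lname FROM tb_people WHERE fname = 'prathap'").show()
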
EXECUTION

SPARK SESSION
  • Entry point to spark SQL
  • Merges SQLContext, HiveContext and SparkContext
  • It is possible to have multiple Spark sessions
  • Independent SQLConf, UDFs and registered temporary views
  • Shared SparkContext and table cache
MULTIPLE SPARK SESSIONS
import findspark
findspark.init()

from pyspark.sql import SparkSession

#open first spark session
spark_one = SparkSession \
    .builder \
    .appName("First spark session") \
    .getOrCreate()

#open second spark session
#getOrCreate() would simply return the session above, so use newSession()
#to get a session that shares the SparkContext but has its own
#SQLConf, UDFs and registered temporary views
spark_two = spark_one.newSession()

print(spark_one)
print(spark_two)

WORD COUNT PROGRAM
#text="Sample Word count program. Sample program"
text_file=sc.textFile("./spark-warehouse/wordcount")
counts =text_file.flatMap(lambda line:line.split(" ")) \
                 .map(lambda word:(word,1)) \
                 .reduceByKey(lambda a,b:a + b)

counts.collect()

MANUALLY CREATE A DATAFRAME
qa_listDF=spark_one.createDataFrame([(1,'prathap'),(2,'sandeep'),(3,'john')])
print(type(qa_listDF))
qa_listDF

<class 'pyspark.sql.dataframe.DataFrame'>
DataFrame[_1: bigint, _2: string]

CREATE DATAFRAME WITH ROW OBJECTS
  • List of Row() objects
  • A Row represents a row of data in a DataFrame
  • Each Row object is a container for the field values, which can also be accessed as attributes
from pyspark.sql import Row
# create 3 rows
df_from_row=spark_one.createDataFrame([Row(1,'prathap'),Row(2,'sandeep'),Row(3,'john')])
df_from_row.collect() 

[Row(_1=1, _2='prathap'), Row(_1=2, _2='sandeep'), Row(_1=3, _2='john')]

ADD COLUMN NAMES
### Create DF with column names
df_from_row_column=df_from_row.toDF("user_id","Name")
df_from_row_column.show()

+-------+-------+
|user_id|   Name|
+-------+-------+
|      1|prathap|
|      2|sandeep|
|      3|   john|
+-------+-------+

DATAFRAME TO RDD AND VICE VERSA
  • We can create DataFrames from an RDD using toDF() or createDataFrame()
#Create a new rdd
rdd=spark_one.sparkContext.parallelize([(1,'prathap'),(2,'sandeep'),(3,'john')])
print(rdd) 
#Create a new DataFrame using toDF
dataFrame_rdd=rdd.toDF()
print(dataFrame_rdd)
#Create a DataFrame using createDataFrame
dataFrame_rdd_create=spark_one.createDataFrame(rdd)
print(dataFrame_rdd_create)
Note: both approaches produce the same DataFrame type.

FROM DATAFRAME GET RDD
Use the .rdd attribute on a DataFrame to get the underlying RDD.
rdd=dataFrame_rdd_create.rdd
rdd.collect()

GET SPARK UI MASTER ADDRESS AND APP NAME
#Get the spark UI address
print(sc.getConf().get('spark.driver.appUIAddress'))

#Get the master address
print(sc.master)

#Get app name
print(sc.appName)

LOAD DATAFRAME WITH CUSTOM SCHEMA (SCHEMA IN JSON FORMAT)
 [
     { 
        "column_name" :"first_name",
        "column_type" :"StringType",
        "pk"          :"",
        "cust_ind"    :"false",
        "desc"        :"first name",
        "values"      :"String value"
    },
    { 
        "column_name" :"last_name",
        "column_type" :"StringType",
        "pk"          :"true",
        "cust_ind"    :"false",
        "desc"        :"last name",
        "values"      :"string value"
    }
     
  ]
    

SPARK CODE
# Load the JSON file to read the schema.
# (self.__spark is the SparkSession held by the enclosing class.)
import json
from pyspark.sql.types import StructField, StructType

with open('cust_schema.json') as sch:
    forecast_cust_schema = json.load(sch)

# Generate StructFields based on the schema definition.
# get_data_type() maps the type name in the JSON (e.g. "StringType")
# to the corresponding pyspark.sql.types type.
fields = [StructField(field['column_name'],
                      get_data_type(field['column_type']))
          for field in forecast_cust_schema]

# Create a StructType from the list of StructFields
cust_schema = StructType(fields)
print(cust_schema)

# Load the data with the custom schema
interval_load = self.__spark.read.schema(cust_schema) \
                    .csv('./spark-warehouse/employee.csv')
interval_load.show()
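
get_data_type() is not defined in the snippet above; a minimal sketch of such a helper, assuming it only needs to map the type names used in the JSON schema, could look like this:
from pyspark.sql.types import StringType, IntegerType, DoubleType, DateType

def get_data_type(type_name):
    # Hypothetical helper: map a type name from the JSON schema
    # (e.g. "StringType") to a Spark SQL type; extend as needed.
    mapping = {
        "StringType": StringType(),
        "IntegerType": IntegerType(),
        "DoubleType": DoubleType(),
        "DateType": DateType(),
    }
    return mapping.get(type_name, StringType())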


MATRIX NOTATION
Get individual data from a cell using matrix notation
df.collect()[0][4]
df.collect()[0][1]
'01/01/2017'

EXTRACT SPECIFIC COLUMNS BY CONVERTING THE DATAFRAME TO AN RDD
 Here we can perform a map operation
df.rdd \
    .map(lambda x:(x.fname,x.lname))\
    .collect()

SELECT METHOD TO GET SPECIFIC COLUMN
df.select('fname','lname') \
        .show()
       

USING MAP OPERATION TO APPEND STRING
Note: DataFrames do not support the map operation; convert to an RDD first
df.rdd \
    .map(lambda x:(x.birthdate + "_Year_Date"))\
    .collect()
       

APPEND USING DATAFRAME
Note: here we compute the sum of two columns
df.select(
          "DAY1"
          , "DAY2"
            ) \
          .withColumn(
                      "Sum_DAY"
                       , df.DAY1 + df.DAY2
                     ).show()

RENAME INDIVIDUAL COLUMN USING WITHCOLUMNRENAMED
df.withColumnRenamed("fname","first_Name").show()

RENAME INDIVIDUAL COLUMN USING ALIAS
df.select(df.fname.alias("first_name")).show(2)

CONVERT SPARK DATAFRAMES TO PANDAS DATAFRAMES
Note: the pandas DataFrame is held in memory on a single machine (the driver)
df_pandas=df.toPandas()
df_pandas

CREATE SPARK DATAFRAME USING PANDAS DATAFRAMES
df_spark = spark.createDataFrame(df_pandas)
df_spark.show(2)

CONVERT COMPLEX DATA (RDD) INTO DATAFRAME
from pyspark.sql.types import Row

from datetime import datetime

# create rdd
rdd =sc.parallelize([Row(id=1
                            , fname="prathap"
                            , hobby=["tennis","soccer"]
                            , interest ={"math" : 50,"computers": 80}
                            , joined = datetime(2018,8,1,14,1,5)),
                        Row(id=2
                            , fname="Sandeep"
                            , hobby=["running","soccer"]
                            , interest ={"math" : 50,"computers": 80}
                            , joined = datetime(2017,8,1,14,1,5))
                       ])

#Convert RDD to DataFrame
rdd_df=rdd.toDF()
rdd_df.show()

# output
+-------+-----------------+---+--------------------+--------------------+
|  fname|            hobby| id|            interest|              joined|
+-------+-----------------+---+--------------------+--------------------+
|prathap| [tennis, soccer]|  1|Map(computers -> ...|2018-08-01 14:01:...|
|Sandeep|[running, soccer]|  2|Map(computers -> ...|2017-08-01 14:01:...|
+-------+-----------------+---+--------------------+--------------------+

ACCESS DATA FROM COMPLEX DATA TYPE
#Register a temporary view
rdd_df.createOrReplaceTempView("tb_hobby")

spark.sql('SELECT \
                    id \
                    , hobby[1] \
                    , interest["computers"] FROM tb_hobby').show()
# output
+---+--------+-------------------+
| id|hobby[1]|interest[computers]|
+---+--------+-------------------+
|  1|  soccer|                 80|
|  2|  soccer|                 80|
+---+--------+-------------------+

CREATE DATAFRAMES USING TUPLES
data =[ ('John',50),
        ('bob',80),
        ('Henry',90)
        ]
spark.createDataFrame(data).show()
+-----+---+
|   _1| _2|
+-----+---+
| John| 50|
|  bob| 80|
|Henry| 90|
+-----+---+

CREATE EMPTY DATAFRAMES
# schema is a StructType that describes the columns of the empty DataFrame
df = self.__spark.createDataFrame(self.__spark.sparkContext.emptyRDD(), schema)
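
A self-contained sketch of the same idea, assuming a plain SparkSession named spark and an illustrative two-column schema:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Illustrative schema; replace with your own columns
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("name", StringType(), True),
])

# Empty RDD + schema => an empty DataFrame with known columns
empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
empty_df.printSchema()
empty_df.show()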
          


WINDOW FUNCTION FOR THE RUNNING SUM BASED ON PARTITION
... over(partition by segment_id
         order by unique_id
         rows between unbounded preceding and current row) as total_loads, ...
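
A complete version of the query, assuming a hypothetical table tb_loads with columns segment_id, unique_id and load_amount:
# tb_loads, segment_id, unique_id and load_amount are assumed names for illustration
running_sum_df = spark.sql("""
    SELECT segment_id,
           unique_id,
           load_amount,
           SUM(load_amount) OVER (PARTITION BY segment_id
                                  ORDER BY unique_id
                                  ROWS BETWEEN UNBOUNDED PRECEDING
                                  AND CURRENT ROW) AS total_loads
    FROM tb_loads
""")
running_sum_df.show()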

CHANGE CONFIGURATION IN SPARK
# Set a runtime configuration value, e.g. the number of shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "10")

CAST TO DATE TIME USING DSL
df_cast=df.withColumn("login_date", (col("registered_date").cast("date"))) \
          .withColumn("login_date",df.registered_date)df_cust.show()

CONCAT WITH LITERAL
df_cast=df.withColumn("id", concat(df.id,lit("-"),df.registered_date))df_cast.show()

SUM MULTIPLE COLUMNS
df_cast = df.withColumn("sum_up", df.h1 + df.h2 + df.h24)

DATE FILTERS
In PySpark (Python), one option is to store the column in unix_timestamp format. We can convert a string to a unix_timestamp and specify the format as shown below. Note that we need to import the unix_timestamp function.
from pyspark.sql.functions import unix_timestamp, to_date, lit

df_cast = df.withColumn("tx_date",
                        to_date(unix_timestamp(df["date"], "MM/dd/yyyy").cast("timestamp")))
Now we can apply the filters
df_cast.filter(df_cast["tx_date"] >= lit('2017-01-01')) \
       .filter(df_cast["tx_date"] <= lit('2017-01-31')).show()
LAG USING DSL
from pyspark.sql.functions import lag
from pyspark.sql.window import Window

#Window ordered by unique_id
window_lag = Window.orderBy("unique_id")
#lag(..., 1) returns the previous row's total_load
df_final = df_total_loads.withColumn("prev_load", lag("total_load", 1) \
                                     .over(window_lag))

CREATE DATAFRAME FROM LIST
from pyspark.sql import Row

seg_ids = [0, 1, 2, 3]
print(seg_ids)

#Row factory with a single column name
row = Row("seg_id")
ids = spark_session.sparkContext \
                    .parallelize(seg_ids)

#Wrap each value in a Row, then convert to a DataFrame
df = ids.map(row).toDF()
df.show()

APPEND IN DATAFRAMES
firstDF = spark.range(3).toDF("myCol")
newRow = spark.createDataFrame([[20]])
appended = firstDF.union(newRow)
# display() works in Databricks notebooks; use appended.show() elsewhere
display(appended)
