SHARK: THE BEGINNING OF THE API
- SQL on top of the Spark execution engine
- Evolved into Spark SQL in 1.0
SCHEMA RDD
- RDD with schema information
- For unit testing and debugging Spark SQL
- Drew the attention of Spark developers
- Released as DataFrame API in 1.3
SPARK SQL
- Module for structured data processing
- The schema gives more information about the data
- Because the schema is known, Spark can perform more optimizations
- Can be queried interactively with SQL (see the example after this list)
- Works with Hive
- Any data source compatible with Spark
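A minimal sketch of the module in use, assuming a SparkSession named spark and a hypothetical people.json file:

# Read a structured source; Spark SQL infers the schema from the JSON (hypothetical file)
people = spark.read.json("people.json")

# Register a temporary view and query it interactively with SQL
people.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 21").show()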
DATAFRAME
- Distributed collection of Row objects
- The high-level API was unified in version 2.0 as Dataset[Row]
- Equivalent to a database table with rows, columns and a known schema (a quick sketch follows this list)
- Similar to a pandas DataFrame in Python
- We can work with structured and unstructured data; conversion to and from RDDs is possible
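A minimal sketch, assuming a SparkSession named spark, showing that a DataFrame carries its schema:

# Small DataFrame with explicit column names (illustrative data)
df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["id", "name"])

# The schema travels with the data
df.printSchema()
# root
#  |-- id: long (nullable = true)
#  |-- name: string (nullable = true)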
QUERIES
- Domain Specific Language (DSL)
- Relational
- Allows for optimizations (see the sketch after this list)
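A minimal sketch, reusing spark and df from above: the same relational query can be written with the DSL or with SQL, and explain() shows the plan the optimizer produces for either form:

from pyspark.sql.functions import col

# DSL: relational operators as methods
df.filter(col("id") > 1).select("name").explain()

# SQL over a temporary view; both forms go through the same optimizer
df.createOrReplaceTempView("users")
spark.sql("SELECT name FROM users WHERE id > 1").explain()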
EXECUTION
SPARK SESSION
- Entry point to Spark SQL
- Merges SQLContext, HiveContext and SparkContext
- It is possible to have multiple Spark sessions
- Independent SQLConf, UDFs and registered temporary views
- Shared SparkContext and table cache
MULTIPLE SPARK SESSIONS
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Open the first spark session
spark_one = SparkSession \
    .builder \
    .appName("First spark session") \
    .getOrCreate()

# builder.getOrCreate() would return the existing session again, so create a
# second, independent session (own SQLConf, UDFs, temp views) with newSession()
spark_two = spark_one.newSession()

print(spark_one)
print(spark_two)
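A quick check, building on the two sessions above, that the SparkContext is shared while temporary views stay private to each session:

# Both sessions share one SparkContext
print(spark_one.sparkContext is spark_two.sparkContext)    # True

# A temporary view registered in one session is not visible in the other
spark_one.createDataFrame([(1, 'a')], ['id', 'val']) \
    .createOrReplaceTempView("tb_demo")
print([t.name for t in spark_one.catalog.listTables()])    # includes 'tb_demo'
print([t.name for t in spark_two.catalog.listTables()])    # does not include 'tb_demo'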
WORD COUNT PROGRAM
# text = "Sample Word count program. Sample program"
text_file = sc.textFile("./spark-warehouse/wordcount")

counts = text_file.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

counts.collect()
MANUALLY CREATE A DATAFRAME
qa_listDF = spark_one.createDataFrame([(1, 'prathap'), (2, 'sandeep'), (3, 'john')])
print(type(qa_listDF))
qa_listDF

Output:
<class 'pyspark.sql.dataframe.DataFrame'>
DataFrame[_1: bigint, _2: string]
CREATE DATAFRAME WITH ROW OBJECTS
- List of Row() objects
- A Row represents a row of data in a DataFrame
- Each Row object is a container for the field values; named fields can be accessed as attributes
from pyspark.sql import Row

# Create 3 rows
df_from_row = spark_one.createDataFrame([Row(1, 'prathap'), Row(2, 'sandeep'), Row(3, 'john')])
df_from_row.collect()

Output:
[Row(_1=1, _2='prathap'), Row(_1=2, _2='sandeep'), Row(_1=3, _2='john')]
ADD COLUMN NAMES
# Create DF with column names
df_from_row_column = df_from_row.toDF("user_id", "Name")
df_from_row_column.show()

Output:
+-------+-------+
|user_id|   Name|
+-------+-------+
|      1|prathap|
|      2|sandeep|
|      3|   john|
+-------+-------+
DATA FRAME TO RDD AND VICE VERSA
- We can create DataFrames from an RDD using toDF() or createDataFrame()
# Create a new RDD
rdd = spark_one.sparkContext.parallelize([(1, 'prathap'), (2, 'sandeep'), (3, 'john')])
print(rdd)

# Create a new DataFrame using toDF()
dataFrame_rdd = rdd.toDF()
print(dataFrame_rdd)

# Create a DataFrame using createDataFrame()
dataFrame_rdd_create = spark_one.createDataFrame(rdd)
print(dataFrame_rdd_create)

Note: both approaches produce the same DataFrame type.
FROM DATAFRAME GET RDD
Use the .rdd attribute on a DataFrame to get the underlying RDD.
rdd = dataFrame_rdd_create.rdd
rdd.collect()
GET SPARK UI MASTER ADDRESS AND APP NAME
# Get the Spark UI address
print(sc.getConf().get('spark.driver.appUIAddress'))

# Get the master address
print(sc.master)

# Get the app name
print(sc.appName)
LOAD DATAFRAME WITH A CUSTOM SCHEMA (SCHEMA IN JSON FORMAT)
[
  {
    "column_name": "first_name",
    "column_type": "StringType",
    "pk": "",
    "cust_ind": "false",
    "desc": "first name",
    "values": "String value"
  },
  {
    "column_name": "last_name",
    "column_type": "StringType",
    "pk": "true",
    "cust_ind": "false",
    "desc": "last name",
    "values": "string value"
  }
]
SPARK CODE
import json
from pyspark.sql.types import StructField, StructType

# Load the JSON file to read the schema.
with open('cust_schema.json') as sch:
    forecast_cust_schema = json.load(sch)

# Generate StructFields based on the schema definition.
# get_data_type() maps the type name in the JSON to a Spark data type.
fields = [StructField(field['column_name'], \
          get_data_type(field['column_type'])) for field in forecast_cust_schema]

# Build a StructType from the list of StructFields
cust_schema = StructType(fields)
print(cust_schema)

# Load the data with the custom schema
interval_load = self.__spark.read.schema(cust_schema) \
    .csv('./spark-warehouse/employee.csv')
interval_load.show()
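The code above relies on a get_data_type() helper that is not shown here; a minimal sketch of what it could look like (the mapping and the fallback are assumptions):

from pyspark.sql.types import StringType, IntegerType, DoubleType, DateType

def get_data_type(type_name):
    # Map the type name stored in the JSON schema file to a Spark data type.
    # Illustrative mapping only; extend it for the types you actually use.
    mapping = {
        "StringType": StringType(),
        "IntegerType": IntegerType(),
        "DoubleType": DoubleType(),
        "DateType": DateType(),
    }
    return mapping.get(type_name, StringType())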
MATRIX NOTATION
Get individual data from a cell using matrix notation
df.collect()[0][4]
df.collect()[0][1]

Output:
'01/01/2017'
EXTRACT SPECIFIC COLUMNS BY CONVERTING THE DATAFRAME TO AN RDD
Here we can perform a map operation
df.rdd \
    .map(lambda x: (x.fname, x.lname)) \
    .collect()
SELECT METHOD TO GET SPECIFIC COLUMN
df.select('fname', 'lname') \
    .show()
USING MAP OPERATION TO APPEND STRING
Note: DataFrames do not support the map operation directly, so we convert to an RDD first.
df.rdd \
    .map(lambda x: (x.birthdate + "_Year_Date")) \
    .collect()
APPEND USING DATAFRAME
Note: Here we are computing the sum of two columns into a new column.
df.select("DAY1", "DAY2") \
    .withColumn("Sum_DAY", df.DAY1 + df.DAY2).show()
RENAME INDIVIDUAL COLUMN USING WITHCOLUMNRENAMED
df.withColumnRenamed("fname","first_Name").show()
RENAME INDIVIDUAL COLUMN USING ALIAS
df.select(df.fname.alias("first_name")).show(2)
CONVERT SPARK DATAFRAMES TO PANDAS DATAFRAMES
Note: the pandas DataFrame is held in memory on a single machine (the driver).
df_pandas = df.toPandas()
df_pandas
CREATE SPARK DATAFRAME USING PANDAS DATAFRAMES
df_spark = spark.createDataFrame(df_pandas)
df_spark.show(2)
CONVERT COMPLEX DATA (RDD) INTO DATAFRAME
from pyspark.sql.types import Row
from datetime import datetime

# Create an RDD with array, map and timestamp columns
rdd = sc.parallelize([
    Row(id=1, fname="prathap", hobby=["tennis", "soccer"],
        interest={"math": 50, "computers": 80},
        joined=datetime(2018, 8, 1, 14, 1, 5)),
    Row(id=2, fname="Sandeep", hobby=["running", "soccer"],
        interest={"math": 50, "computers": 80},
        joined=datetime(2017, 8, 1, 14, 1, 5))
])

# Convert the RDD to a DataFrame
rdd_df = rdd.toDF()
rdd_df.show()
Output:
+-------+-----------------+---+--------------------+--------------------+
|  fname|            hobby| id|            interest|              joined|
+-------+-----------------+---+--------------------+--------------------+
|prathap| [tennis, soccer]|  1|Map(computers -> ...|2018-08-01 14:01:...|
|Sandeep|[running, soccer]|  2|Map(computers -> ...|2017-08-01 14:01:...|
+-------+-----------------+---+--------------------+--------------------+
ACCESS DATA FROM COMPLEX DATA TYPE
# Register a temporary view
rdd_df.createOrReplaceTempView("tb_hobby")

spark.sql("""SELECT
       id
     , hobby[1]
     , interest["computers"]
FROM tb_hobby""").show()
Output:
+---+--------+-------------------+
| id|hobby[1]|interest[computers]|
+---+--------+-------------------+
|  1|  soccer|                 80|
|  2|  soccer|                 80|
+---+--------+-------------------+
CREATE DATAFRAMES USING TUPLES
data = [
    ('John', 50),
    ('bob', 80),
    ('Henry', 90)
]
spark.createDataFrame(data).show()
+-----+---+
|   _1| _2|
+-----+---+
| John| 50|
|  bob| 80|
|Henry| 90|
+-----+---+
CREATE EMPTY DATAFRAMES
# 'schema' is a StructType describing the columns of the empty DataFrame
df = self.__spark.createDataFrame(
    self.__spark.sparkContext.emptyRDD(), schema)
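For example, a sketch with an illustrative schema and a plain spark session in place of self.__spark:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Illustrative schema for the empty DataFrame
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("name", StringType(), True)
])

empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
empty_df.printSchema()
empty_df.show()   # prints only the header row, no data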
WINDOW FUNCTION FOR THE RUNNING SUM BASED ON PARTITION
over (partition by segment_id order by unique_id
      rows between unbounded preceding and current row) as total_loads,
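The fragment above is part of a larger SQL statement; a minimal DSL sketch of the same running sum, assuming a DataFrame df with hypothetical segment_id, unique_id and load columns:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Running sum per segment, ordered by unique_id, from the first row up to the current row
running_window = Window.partitionBy("segment_id") \
    .orderBy("unique_id") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_total_loads = df.withColumn("total_loads", F.sum("load").over(running_window))
df_total_loads.show()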
CHANGE CONFIGURATION IN SPARK
# e.g. set the number of shuffle partitions at runtime
spark.conf.set("spark.sql.shuffle.partitions", "10")
CAST TO DATE TIME USING DSL
from pyspark.sql.functions import col

df_cast = df.withColumn("login_date", col("registered_date").cast("date"))
df_cast.show()
CONCAT WITH LITERAL
from pyspark.sql.functions import concat, lit

df_cast = df.withColumn("id", concat(df.id, lit("-"), df.registered_date))
df_cast.show()
SUM COLUMNS
df_cast = df.withColumn("sum_up", df.h1 + df.h2 + df.h24)
DATE FILTERS
In PySpark (Python), one option is to keep the column in unix_timestamp format. We can convert a string to a unix_timestamp and specify the format as shown below. Note that we need to import the unix_timestamp function.
from pyspark.sql.functions import unix_timestamp, to_date, lit

df_cast = df.withColumn("tx_date",
    to_date(unix_timestamp(df["date"], "MM/dd/yyyy").cast("timestamp")))
Now we can apply the filters
df_cast.filter(df_cast["tx_date"] >= lit('2017-01-01')) \
    .filter(df_cast["tx_date"] <= lit('2017-01-31')).show()
LAG USING DSL (however has bugs)
from pyspark.sql.functions import lag
from pyspark.sql.window import Window

# Window for the lag; an offset of 1 returns the previous row's value
window_lag = Window.orderBy("unique_id")

df_final = df_total_loads.withColumn("prev_load", lag("total_load", 1) \
    .over(window_lag))
CREATE DATAFRAME FROM LIST
from pyspark.sql import Row

# Build a set of ids (any Python collection works with parallelize)
stack = {0}
stack.add(1)
stack.add(2)
stack.add(3)
print(stack)

# Row("seg_id") creates a Row class with a single field name
row = Row("seg_id")

ids = spark_session.sparkContext \
    .parallelize(stack)

df = ids.map(row).toDF()
df.show()
APPEND IN DATA FRAMES
firstDF = spark.range(3).toDF("myCol")
newRow = spark.createDataFrame([[20]])

# Append the new row with union (column counts must match)
appended = firstDF.union(newRow)

# display() is Databricks-specific; use appended.show() outside Databricks
display(appended)