Monday, July 9, 2018

Spark samples (Spark SQL, window functions, persist)


WRITE AS CSV
df_sample.write.csv("./spark-warehouse/SAMPLE.csv")

WRITE AS CSV WITH HEADER
df_sample.write.csv("./spark-warehouse/SAMPLE_5.csv", header=True)
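
Note that Spark writes a directory of part files rather than a single CSV file. A minimal sketch (same df_sample assumed) that coalesces to one part file and tolerates re-runs:

# Coalesce to one partition so the output directory holds a single part file;
# mode="overwrite" lets the snippet be re-run without a "path already exists" error
df_sample.coalesce(1).write.csv("./spark-warehouse/SAMPLE_5.csv", header=True, mode="overwrite")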


DISPLAY ONLY COLUMNS 
#Load csv as dataframe
data = spark.read.csv("./spark-warehouse/LOADS.csv", header=True)

#Register temp view
data.createOrReplaceTempView("vw_data")

#load data based on the select query
load = spark.sql("Select * from vw_data limit 5")
load.columns
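
To display only a subset of columns rather than just the column names, select them in the query (the ID column is an assumption about LOADS.csv):

#Select a single column and show the first rows
subset = spark.sql("Select ID from vw_data limit 5")
subset.show()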



SAVE DATAFRAME AS TABLE

#Save dataframe as table
df_sample.select("ID").write.saveAsTable("load")

WRITE AND READ FROM HIVE METASTORE
#Save dataframe as table
df_sample.select("ID").write.saveAsTable("load")
#read from persisted table
readFromTable = spark.read.table("new_load")
readFromTable.show()
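
saveAsTable fails if the table already exists, so a re-runnable sketch sets an explicit save mode:

#Overwrite the managed table on re-runs instead of failing with an AnalysisException
df_sample.select("ID").write.mode("overwrite").saveAsTable("load")
readFromTable = spark.read.table("load")
readFromTable.show()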

CONCAT TWO COLUMNS
SELECT DISTINCT concat(l.id, '-', l.nrml_yyyymm) AS user_id
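
The line above is a fragment; a minimal runnable sketch (the vw_demo view and its columns are made up for illustration):

#Build a tiny DataFrame, then concat two columns into a single key
from pyspark.sql import Row
demo = spark.createDataFrame([Row(id='1', nrml_yyyymm='201701'),
                              Row(id='1', nrml_yyyymm='201701')])
demo.createOrReplaceTempView("vw_demo")
spark.sql("SELECT DISTINCT concat(id, '-', nrml_yyyymm) AS user_id FROM vw_demo").show()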

FILTER BASED ON DATE
WHERE ID = 1 AND Date > cast('2017-01-31' as date)
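
A self-contained form of the same filter, assuming the vw_data view registered above has ID and Date columns:

#Filter rows on an ID and a date boundary
filtered = spark.sql("SELECT * FROM vw_data \
                      WHERE ID = 1 AND Date > cast('2017-01-31' as date)")
filtered.show()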

CONVERT STRING TO DATE VIA UNIX TIMESTAMP
TO_DATE(CAST(UNIX_TIMESTAMP(Date, 'MM/dd/yyyy') AS TIMESTAMP)) AS Date
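
A minimal sketch showing the conversion end to end (vw_dates and its Date column are made up for illustration):

#Parse a MM/dd/yyyy string column into a proper DATE
spark.createDataFrame([("01/31/2017",)], ["Date"]).createOrReplaceTempView("vw_dates")
spark.sql("SELECT TO_DATE(CAST(UNIX_TIMESTAMP(Date, 'MM/dd/yyyy') AS TIMESTAMP)) AS Date \
           FROM vw_dates").show()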

USING ROW_NUMBER
#Rank rows by registered_date (the original omitted a FROM clause; vw_users is a placeholder view)
row_one = spark.sql("SELECT row_number() over (order by registered_date) as rnk \
                     FROM vw_users \
                     WHERE registered_date <= cast('2017-01-10' as date)")

WINDOW FUNCTION ON RUNNING SUM
spark.sql("SELECT  sum(salary) 
                  over(order by unique_id rows 
                        between unbounded preceding and current row) as total_salary\
                            FROM vw_employee  \
                                 WHERE credit_date <= cast('2017-01-20' as date)")
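
The same running sum can be expressed with the DataFrame API; a sketch assuming the same vw_employee columns:

#Running sum via a WindowSpec instead of raw SQL
from pyspark.sql import Window
from pyspark.sql import functions as F

w = Window.orderBy("unique_id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
running = spark.table("vw_employee") \
    .where(F.col("credit_date") <= "2017-01-20") \
    .withColumn("total_salary", F.sum("salary").over(w))
running.show()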

WINDOW FUNCTION ON RUNNING SUM BASED ON PARTITION
#Running sum per id (the profit column and vw_employee view are assumed for this sketch)
spark.sql("SELECT sum(profit) \
               over (partition by id order by unique_id \
                     rows between unbounded preceding and current row) as total_profit \
           FROM vw_employee")

GET RUNNING MAXIMUM OVER A WINDOW
#Fetch the running maximum of prev_total_loads across rows ordered by id
sample_data.createOrReplaceTempView("vw_split_row")
sample_data_row = spark.sql("SELECT max(prev_total_loads) \
                                 over (order by id rows unbounded preceding) as max \
                             FROM vw_split_row")

FIND PREVIOUS LAG
 df_sample.createOrReplaceTempView("vw_split_month")
  # Get the row number which needs to be considered 
    tier =spark.sql("SELECT lag(total_loads,1) over (order by id) as prev_total_loads \
                            FROM vw_split_month ")
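
The same lag can be written with the DataFrame API; a sketch assuming df_sample has total_loads and id columns:

#Previous row's value via functions.lag over an ordered window
from pyspark.sql import Window
from pyspark.sql import functions as F

tier_df = df_sample.withColumn(
    "prev_total_loads", F.lag("total_loads", 1).over(Window.orderBy("id")))
tier_df.show()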
