Monday, July 9, 2018

Spark samples (Spark SQL, window functions, persist)


WRITE AS CSV
df_sample.write.csv("./spark-warehouse/SAMPLE.csv")
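write.csv produces a directory of part files, one per partition. If a single output file is wanted, one option (a sketch, not from the original post) is to coalesce to one partition first:

# Collapse to a single partition so the output directory holds one part file
df_sample.coalesce(1).write.csv("./spark-warehouse/SAMPLE_single.csv", mode="overwrite")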

WRITE AS CSV WITH HEADER
df_sample.write.csv("./spark-warehouse/SAMPLE_5.csv",header=True) 


DISPLAY ONLY THE COLUMN NAMES
#Load csv as dataframe
data = spark.read.csv("./spark-warehouse/LOADS.csv", header=True)

#Register temp view
data.createOrReplaceTempView("vw_data")

#load data based on the select query
load = spark.sql("Select * from vw_data limit 5")
load.columns
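load.columns returns a plain Python list of column names; printSchema shows the inferred types as well:

print(load.columns)   # the header names from the CSV
load.printSchema()    # every column is a string unless inferSchema=True is passed to read.csv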



SAVE DATAFRAME AS TABLE

#Save dataframe as table
df_sample.select("ID").write.saveAsTable("load")
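Re-running saveAsTable fails once the table already exists; the save mode can be set explicitly (a small sketch using the same dataframe):

# Overwrite the managed table if it already exists
df_sample.select("ID").write.mode("overwrite").saveAsTable("load")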

WRITE AND READ FROM HIVE METASTORE
#Save dataframe as table
df_sample.select("ID").write.saveAsTable("load")
#read from persisted table
readFromTable = spark.read.table("load")
readFromTable.show()

CONCAT TWO COLUMNS 
SELECT DISTINCT concat(l.id, '-', l.nrml_yyyymm) AS user_id
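In context, a complete statement might look like the sketch below; vw_data is reused from above and the id / nrml_yyyymm columns are assumed to exist there:

# Build a composite key from two columns (the view and column names are assumptions)
user_ids = spark.sql("SELECT DISTINCT concat(l.id, '-', l.nrml_yyyymm) AS user_id \
                      FROM vw_data l")
user_ids.show(5)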

FILTER BASED ON DATE
WHERE ID = 1 AND Date > cast('2017-01-31' as date)
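Putting the predicate into a full query (a sketch against the vw_data view registered earlier; the ID and Date columns are assumed):

# Keep only rows for a given ID after a cut-off date
filtered = spark.sql("SELECT * FROM vw_data \
                      WHERE ID = 1 AND Date > cast('2017-01-31' as date)")
filtered.show()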

CONVERT A DATE STRING TO A DATE (VIA UNIX TIMESTAMP)
TO_DATE(CAST(UNIX_TIMESTAMP(Date, 'MM/dd/yyyy') AS TIMESTAMP)) AS Date
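As a full statement (a sketch; assumes the Date column holds strings such as '01/31/2017'):

# Parse MM/dd/yyyy strings into a proper DateType column
parsed = spark.sql("SELECT TO_DATE(CAST(UNIX_TIMESTAMP(Date, 'MM/dd/yyyy') AS TIMESTAMP)) AS Date \
                    FROM vw_data")
parsed.printSchema()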

USING ROW_NUMBER
# vw_users is a placeholder view; the original snippet omits the source table
row_one = spark.sql("SELECT row_number() over (order by registered_date) as rnk \
                     FROM vw_users \
                     WHERE registered_date <= cast('2017-01-10' as date)")
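The same ranking can be written with the DataFrame API; a sketch assuming a dataframe df_users with a registered_date column (both names are hypothetical):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rank rows by registered_date, then keep only dates up to the cut-off
w = Window.orderBy("registered_date")
ranked = (df_users
          .withColumn("rnk", F.row_number().over(w))
          .where(F.col("registered_date") <= F.lit("2017-01-10").cast("date")))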

RUNNING SUM WITH A WINDOW FUNCTION
running_total = spark.sql("SELECT sum(salary) \
                                  over(order by unique_id rows \
                                       between unbounded preceding and current row) as total_salary \
                           FROM vw_employee \
                           WHERE credit_date <= cast('2017-01-20' as date)")

RUNNING SUM PARTITIONED BY ID (WINDOW FUNCTION)
... over(partition by id order by unique_id
         rows between unbounded preceding and current row) as total_profit
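A complete version of the partitioned running sum might look like the sketch below; vw_sales and the profit column are assumed names, not from the original snippet:

running_by_id = spark.sql("SELECT id, unique_id, \
                                  sum(profit) over(partition by id order by unique_id \
                                      rows between unbounded preceding and current row) as total_profit \
                           FROM vw_sales")
running_by_id.show()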

GET A RUNNING MAXIMUM OVER A WINDOW
# Running maximum of prev_total_loads, ordered by id
sample_data.createOrReplaceTempView("vw_split_row")
sample_data_row = spark.sql("SELECT max(prev_total_loads) over (order by id \
                                     rows unbounded preceding) as max_loads \
                             FROM vw_split_row")

FIND THE PREVIOUS VALUE WITH LAG
df_sample.createOrReplaceTempView("vw_split_month")
# Get the previous row's total_loads (lag by one row, ordered by id)
tier = spark.sql("SELECT lag(total_loads, 1) over (order by id) as prev_total_loads \
                  FROM vw_split_month")
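The same lag can be expressed with the DataFrame API (a sketch; assumes df_sample has the id and total_loads columns used in the SQL version):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# lag(1) pulls the previous row's total_loads; the first row gets null
w = Window.orderBy("id")
tier_df = df_sample.withColumn("prev_total_loads", F.lag("total_loads", 1).over(w))
tier_df.show()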
