Tuesday, July 10, 2018

Examples of compression and file formats in Spark

PARQUET
  • Design based on Google's Dremel paper
  • Schema segregated into footer
  • Column-major format with row groups (Parquet's analogue of ORC's stripes)
  • Simpler type model with logical types
  • All data pushed to leaves of the tree


ORC
  • Originally part of Hive to replace RCFile
  • Schema segregated into footer
  • Column-major format with stripes
  • Rich type model, stored top-down
  • Integrated compression, indexes & stats
Note:
Parquet has been aggressively promoted by Cloudera and ORC by Hortonworks.



Comparing Parquet vs ORC

- There is not much storage savings when using ORC and Parquet with the same compression codec, for example `SNAPPY vs SNAPPY` or `ZLIB vs GZIP`.

- The time taken to convert from CSV to ORC and from CSV to Parquet is very close; there is not much difference in the total conversion time between the two formats.
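
A rough way to measure those conversion times is to wrap each write in a timer. The sketch below is illustrative only: the CSV path `/tmp/flights.csv`, the inferred schema, and `local` mode are assumptions, not part of the original benchmark.

import org.apache.spark.sql.SparkSession

object ConversionTiming {

  // Returns the elapsed time of a block of work in seconds.
  def time[T](block: => T): Double = {
    val start = System.nanoTime()
    block
    (System.nanoTime() - start) / 1e9
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("csv-to-columnar-timing").master("local").getOrCreate()

    // Hypothetical CSV input; replace with a real dataset such as the airline on-time data.
    val csv = spark.read.option("header", "true").option("inferSchema", "true").csv("/tmp/flights.csv")

    val parquetSecs = time { csv.write.mode("overwrite").parquet("/tmp/flights_parq") }
    val orcSecs     = time { csv.write.mode("overwrite").orc("/tmp/flights_orc") }

    println(f"CSV -> Parquet: $parquetSecs%.1f s, CSV -> ORC: $orcSecs%.1f s")
    spark.stop()
  }
}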

- The Hortonworks blog says that the ORC format provides a much better compression ratio than Parquet. This is a bit misleading, because the comparison uses the default properties: ZLIB for ORC and SNAPPY for Parquet. Once both formats use the same compression codec, there is no significant difference in the compression ratio, as shown in the matrix above. So it is better to focus on the features.
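
To put both formats on the same codec without repeating the option on every write, the compression can also be set once at the session level. A minimal spark-shell sketch, assuming a Spark version where the `spark.sql.parquet.compression.codec` and `spark.sql.orc.compression.codec` settings are available:

// Override the defaults so both formats compress with the same codec.
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
spark.conf.set("spark.sql.orc.compression.codec", "snappy")

// Subsequent writes pick up the session-level codec automatically, e.g.:
// df.write.mode("overwrite").parquet("/tmp/out_parq")
// df.write.mode("overwrite").orc("/tmp/out_orc")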

- For aggregation queries like `the total time for the delayed flights`, there is no drastic difference. Both the ORC and Parquet formats perform considerably better than the CSV format.
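
A spark-shell sketch of that kind of aggregation over the converted copies written by the timing example above; the `AirTime` and `ArrDelay` column names are assumptions about the flight dataset:

// Register the ORC and Parquet copies as temporary views.
spark.read.orc("/tmp/flights_orc").createOrReplaceTempView("flights_orc")
spark.read.parquet("/tmp/flights_parq").createOrReplaceTempView("flights_parq")

// Total air time for delayed flights; run against both views and compare the wall-clock time.
spark.sql("SELECT SUM(AirTime) AS total_air_time FROM flights_orc WHERE ArrDelay > 0").show()
spark.sql("SELECT SUM(AirTime) AS total_air_time FROM flights_parq WHERE ArrDelay > 0").show()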

- While fetching all the columns for a single row using a condition like "where origin = 'LNY' and AirTime = 16;", ORC has an edge over Parquet because the ORC format carries a lightweight index along with each file. By using these indexes, the underlying MapReduce or Spark job can avoid reading the entire block.
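
A spark-shell sketch of that point lookup against the ORC copy, with predicate pushdown turned on so the reader can use the stripe-level statistics and indexes to skip data; the `Origin` and `AirTime` columns are again assumed from the flight dataset:

// Enable ORC predicate pushdown (off by default in some Spark versions).
spark.conf.set("spark.sql.orc.filterPushdown", "true")

// The filter is pushed to the ORC reader, which can skip stripes whose
// statistics show they cannot contain matching rows.
spark.read.orc("/tmp/flights_orc")
  .filter("Origin = 'LNY' AND AirTime = 16")
  .show()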

- The indexing in ORC seems to be a good differentiator. Although ORC has to build the index while creating the files, there is no significant difference in the conversion time, and the file sizes are also similar for both formats.

import org.apache.spark.sql.SparkSession

object FileCompression {

  // Small sample dataset: a place, its region, and a landmark.
  case class DataFrameSample(name: String, region: String, landmark: String)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("Spark File Compression Handling")
      .master("local")
      .getOrCreate()

    // Build a small in-memory DataFrame and cache it so each write below reuses the same data.
    val df = spark.createDataFrame(
      DataFrameSample("California", "West coast", "Golden gate bridge") ::
        DataFrameSample("New York", "East coast", "Statue of Liberty") ::
        DataFrameSample("Washington", "East coast", "White house") ::
        DataFrameSample("Boston", "East coast", "Harvard") ::
        Nil).cache()


    df.write.mode("overwrite").format("parquet").option("compression", "none").mode("overwrite").save("/tmp/file_no_compression_parq")
    df.write.mode("overwrite").format("parquet").option("compression", "gzip").mode("overwrite").save("/tmp/file_with_gzip_parq")
    df.write.mode("overwrite").format("parquet").option("compression", "snappy").mode("overwrite").save("/tmp/file_with_snappy_parq")
    //lzo - requires a different method in terms of implementation.

    df.write.mode("overwrite").format("orc").option("compression", "none").mode("overwrite").save("/tmp/file_no_compression_orc")
    df.write.mode("overwrite").format("orc").option("compression", "snappy").mode("overwrite").save("/tmp/file_with_snappy_orc")
    df.write.mode("overwrite").format("orc").option("compression", "zlib").mode("overwrite").save("/tmp/file_with_zlib_orc")
  }

}
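
To check the storage claim from the comparison above, the directories written by this snippet can be compared by total size. A minimal sketch using the Hadoop FileSystem API against the local paths written above:

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object OutputSizes {

  // Sums the lengths of the files directly under a directory.
  def dirSize(fs: FileSystem, dir: String): Long =
    fs.listStatus(new Path(dir)).filter(_.isFile).map(_.getLen).sum

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new URI("file:///"), new Configuration())
    Seq("/tmp/file_with_snappy_parq", "/tmp/file_with_snappy_orc",
        "/tmp/file_with_gzip_parq", "/tmp/file_with_zlib_orc").foreach { dir =>
      println(s"$dir -> ${dirSize(fs, dir)} bytes")
    }
  }
}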
