Spark — Write a single file per (Hive) partition

Sandipan Ghosh
6 min read · Jun 25, 2021

I have run into the problem of small files in Hadoop many times during Spark ETL processes that write data to partitioned Hive tables.

Having too many small files in HDFS (Hive) degrades overall Hadoop performance as well as query performance.

Here is a solution I have been using for some time.

To illustrate the problem, I used the data sets below, which I generated with the 'Generate_Data' script from the 'Get_test_data' section.

Record counts and sizes of the data sets:

Customer = 143,000,000 rows, 30 GB
Order = 429,010,505 rows, 14 GB
Product = 38,718 rows, 10 MB
order_product = 473,966,231 rows, 23 GB

I wanted to merge all the data sets into a single 'customer-order-product' table, a de-normalized table in Hive, and keep the data partitioned by the order_date column.

I am using a 6-node Hadoop cluster on GCP, with Hive 3.1.2, Spark 3.1.1, and Python 3.8.

Here are the schemas of the tables —

CREATE EXTERNAL TABLE IF NOT EXISTS CUSTOMER (
  `row_id` INT,
  `full_name` STRING,
  `name` STRING,
  `gender` STRING,
  `address` STRING,
  `email` STRING,
  `telephone` STRING,
  `city` STRING,
  `state` STRING,
  `birth_date` STRING,
  `cust_id` STRING,
  `user_create_date` STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION './data/spark_test_data_new/ghosh/customer';

CREATE EXTERNAL TABLE IF NOT EXISTS ORDERS (
  `order_id` STRING,
  `cust_id` STRING,
  `order_date` STRING,
  `shipping_date` STRING,
  `delivery_date` STRING,
  `shipping_company` STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION './data/spark_test_data_new/ghosh/order';

CREATE EXTERNAL TABLE IF NOT EXISTS ORDERS_PRODUCT (
  `order_id` STRING,
  `uniq_id` STRING,
  `quantity` INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION './data/spark_test_data_new/ghosh/order_product';

CREATE EXTERNAL TABLE IF NOT EXISTS PRODUCT (
  `uniq_id` STRING,
  `product_name` STRING,
  `manufacturer` STRING,
  `price` FLOAT,
  `description` STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION './data/spark_test_data_new/ghosh/product'
TBLPROPERTIES ('skip.header.line.count'='1');

-- CREATE FINAL TABLE
CREATE EXTERNAL TABLE IF NOT EXISTS CUSTOMER_ORDERS_PRODUCT (
  `row_id` INT,
  `full_name` STRING,
  `name` STRING,
  `gender` STRING,
  `address` STRING,
  `email` STRING,
  `telephone` STRING,
  `city` STRING,
  `state` STRING,
  `birth_date` STRING,
  `cust_id` STRING,
  `user_create_date` STRING,
  `order_id` STRING,
  `shipping_date` STRING,
  `delivery_date` STRING,
  `shipping_company` STRING,
  `uniq_id` STRING,
  `quantity` INT,
  `product_name` STRING,
  `manufacturer` STRING,
  `price` FLOAT,
  `description` STRING,
  `total_sales` FLOAT
)
PARTITIONED BY (`order_date` DATE)
STORED AS ORC
LOCATION './data/spark_test_data_new/ghosh/customer_orders_product'
TBLPROPERTIES ('orc.compress'='SNAPPY');

I have partitioned the final table by order_date because most of the reporting and ad-hoc queries will filter on the order date. Making it the partition column improves Hive query performance.

ORC is a columnar file format that is very efficient for reading, writing, and processing Hive/Hadoop data. To select a few columns, an ORC reader does not need to read entire rows; it can efficiently read just the columns mentioned in the query. This makes it very fast for de-normalized tables, where users typically select a subset of columns rather than the entire column set.
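As a hedged illustration of how this layout pays off (the query and the date are made up for this example, not from the original post), a typical report filters on order_date and touches only a few columns; partition pruning skips the other partition directories, and ORC reads only the referenced columns:

daily_sales = spark.sql("""
    SELECT product_name, SUM(total_sales) AS sales
    FROM customer_orders_product
    WHERE order_date = DATE '2020-01-01'   -- partition filter: only this directory is scanned
    GROUP BY product_name
""")
daily_sales.explain()  # the physical plan should show the pushed-down partition filter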

Spark Process

Here is the process in a nutshell.

1. Read the data —

product = spark.read.table("product")
orders_product = spark.read.table("orders_product")
orders = spark.read.table("orders")
customer = spark.read.table("customer")

2. Register them as temp tables —

product.createOrReplaceTempView("product")
orders_product.createOrReplaceTempView("orders_product")
orders.createOrReplaceTempView("orders")
customer.createOrReplaceTempView("customer")

3. Create the merged data set from the base tables —

merge_data = spark.sql("""
select
    cust.row_id,
    cust.full_name,
    cust.name,
    cust.gender,
    cust.address,
    cust.email,
    cust.telephone,
    cust.city,
    cust.state,
    cust.birth_date,
    cust.cust_id,
    cust.user_create_date,
    ord.order_id,
    ord.shipping_date,
    ord.delivery_date,
    ord.shipping_company,
    ordp.uniq_id,
    ordp.quantity,
    prod.product_name,
    prod.manufacturer,
    prod.price,
    prod.description,
    (prod.price * ordp.quantity) as total_sales,
    cast(ord.order_date as date) as order_date
FROM customer as cust
inner join orders as ord
    on cust.cust_id = ord.cust_id
inner join orders_product as ordp
    on ordp.order_id = ord.order_id
inner join product as prod
    on prod.uniq_id = ordp.uniq_id
""")

merge_data.createOrReplaceTempView("merge_data")

4. Set up Spark for dynamic partitioning —

spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

Without the settings above, Spark will not be able to insert data into the partitions dynamically.

5. Insert into the final table —

spark.sql("""
insert overwrite table CUSTOMER_ORDERS_PRODUCT partition(order_date)
select
    row_id,
    full_name,
    name,
    gender,
    address,
    email,
    telephone,
    city,
    state,
    birth_date,
    cust_id,
    user_create_date,
    order_id,
    shipping_date,
    delivery_date,
    shipping_company,
    uniq_id,
    quantity,
    product_name,
    manufacturer,
    price,
    description,
    total_sales,
    order_date
from merge_data
""")

Here is a capture of the number of partitions and the number of files per partition. This process created 511,400 files, almost 200 small files per partition.

[Screenshots: total number of partition directories (2,558) and files (511,400); number of files per partition (200); size of each file]

The data set has 2,557 unique order_date values, so the final table will have 2,557 partitions. I am using the default Spark shuffle partition count, which is 200. This means that almost every partition ends up with close to 200 files, and in total Spark generated 511,400 small files. This is really bad for the NameNode and for Hive/Spark processing.
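To see where the 200 comes from, here is a small sketch (not part of the original code) that inspects the relevant setting and the DataFrame's partition count:

# The join output keeps the default shuffle partition count, and the insert writes
# one file per Spark partition for every order_date value that partition contains.
print(spark.conf.get("spark.sql.shuffle.partitions"))  # '200' unless overridden
print(merge_data.rdd.getNumPartitions())               # typically 200 after the joins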

One way to solve the small-files problem is to write a single file per partition. That way we end up with only 2,557 files instead of 511,400.

To achieve this, we can repartition the final data (the merge_data DataFrame in the code) by the partition column. This forces Spark to re-shuffle the data and place all rows with the same order_date in the same executor/partition. During the write, each executor then writes its data to the corresponding Hive partition. The downside is that we are forcing Spark to shuffle the data, which can be time-consuming.

Here is the code.

merge_data_parti = merge_data.repartition("order_date")
merge_data_parti.createOrReplaceTempView("merge_data_parti")

spark.sql("""
insert overwrite table CUSTOMER_ORDERS_PRODUCT partition(order_date)
select
    row_id,
    full_name,
    name,
    gender,
    address,
    email,
    telephone,
    city,
    state,
    birth_date,
    cust_id,
    user_create_date,
    order_id,
    shipping_date,
    delivery_date,
    shipping_company,
    uniq_id,
    quantity,
    product_name,
    manufacturer,
    price,
    description,
    total_sales,
    order_date
from merge_data_parti
""")

[Screenshots: total number of partition directories (2,558) and files (2,557); one file per partition; size of each file (23 MB)]

Repartitioning by the partition column solves the problem! We cannot use coalesce, because keeping all of a partition's data together requires a shuffle, and coalesce does not shuffle. Also keep in mind that by pushing Spark to use one Spark partition per Hive partition, the performance of the Spark job will degrade. This gets worse if your data is skewed. Below is a capture of the Spark job from the SQL above with skewed data in the orders table for the date 2020-01-01, which holds 10 GB of orders data. The skew significantly slows down the Spark process and generates an enormous amount of shuffle files. On my cluster, the SQL above did not finish with the skewed data: it ran for 10 hours, created 300 GB of shuffle files, and eventually failed. In my next article, I will discuss how to optimize Spark for skewed data.

[Screenshots: enormous shuffle spill to memory and disk due to skewed data; job failure due to skewed data]
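To make the coalesce point concrete, here is a minimal sketch (not from the original post) of the difference between the two calls:

# coalesce(n) only merges existing partitions without a shuffle, so rows for the same
# order_date stay spread across many Spark partitions, and each of those partitions
# still writes its own file into that Hive partition.
fewer_tasks = merge_data.coalesce(50)

# repartition(col) hash-partitions the data by order_date, so each order_date value
# lands in exactly one Spark partition and produces exactly one output file.
one_file_per_date = merge_data.repartition("order_date")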

Please check out the Git repository for the updated code: https://github.com/ghoshm21/spark_book

Please email me with any feedback, comments, or questions: contact@sandipan.tech

