Spark — Writing a single file per (Hive) partition.
I have faced the small-files problem in Hadoop many times during Spark ETL processes that write data to partitioned Hive tables.
Having too many small files in HDFS (Hive) degrades overall Hadoop performance as well as query performance.
Here is a solution I have been using for some time.
To illustrate the problem, I used the data sets below, which I generated using the ‘Generate_Data’ script from the ‘Get_test_data’ section.
Record counts and sizes of the data sets:
- Customer: 143,000,000 records, 30 GB
- Order: 429,010,505 records, 14 GB
- Product: 38,718 records, 10 MB
- order_product: 473,966,231 records, 23 GB
I wanted to merge all the data sets into a ‘customer-order-product’ table, a de-normalized table in Hive, and keep the data partitioned by the order_date column.
I am using a 6-node Hadoop cluster on GCP, with Hive 3.1.2, Spark 3.1.1, and Python 3.8.
Here is the schema of the tables —
CREATE EXTERNAL TABLE IF NOT EXISTS CUSTOMER(
`row_id` INT,
`full_name` STRING,
`name` STRING,
`gender` STRING,
`address` STRING,
`email` STRING,
`telephone` STRING,
`city` STRING,
`state` STRING,
`birth_date` STRING,
`cust_id` STRING,
`user_create_date` STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION './data/spark_test_data_new/ghosh/customer'
;
CREATE EXTERNAL TABLE IF NOT EXISTS ORDERS(
`order_id` STRING,
`cust_id` STRING,
`order_date` STRING,
`shipping_date` STRING,
`delivery_date` STRING,
`shipping_company` STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION './data/spark_test_data_new/ghosh/order'
;
CREATE EXTERNAL TABLE IF NOT EXISTS ORDERS_PRODUCT(
`order_id` STRING,
`uniq_id` STRING,
`quantity` INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION './data/spark_test_data_new/ghosh/order_product'
;
CREATE EXTERNAL TABLE IF NOT EXISTS PRODUCT(
`uniq_id` STRING,
`product_name` STRING,
`manufacturer` STRING,
`price` FLOAT,
`description` STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION './data/spark_test_data_new/ghosh/product'
tblproperties('skip.header.line.count'='1')
;
-- CREATE FINAL TABLE
CREATE EXTERNAL TABLE IF NOT EXISTS CUSTOMER_ORDERS_PRODUCT(
`row_id` INT,
`full_name` STRING,
`name` STRING,
`gender` STRING,
`address` STRING,
`email` STRING,
`telephone` STRING,
`city` STRING,
`state` STRING,
`birth_date` STRING,
`cust_id` STRING,
`user_create_date` STRING,
`order_id` STRING,
`shipping_date` STRING,
`delivery_date` STRING,
`shipping_company` STRING,
`uniq_id` STRING,
`quantity` INT,
`product_name` STRING,
`manufacturer` STRING,
`price` FLOAT,
`description` STRING,
`total_sales` FLOAT
)
PARTITIONED BY (`order_date` DATE)
STORED AS ORC
LOCATION './data/spark_test_data_new/ghosh/customer_orders_product'
tblproperties('orc.compress'='SNAPPY');
I have kept the final table partitioned by order_date, as most reporting and ad-hoc queries are run based on the order date. Using this column as the partition column improves Hive query performance through partition pruning.
ORC is a columnar file format that is very efficient for reading, writing, and processing Hive/Hadoop data. In ORC, to select a few columns, the reader does not need to read the entire row; it can efficiently read only the individual columns mentioned in the query. This makes it very fast for de-normalized tables, where users typically select a subset of columns rather than the entire column set.
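The column-pruning benefit can be illustrated with a toy example in plain Python. This is only a conceptual sketch of row-oriented vs. column-oriented layouts, not the actual ORC reader:

```python
# Row-oriented layout: answering "sum of price" still touches
# every field of every row, because rows are stored contiguously.
rows = [
    {"cust_id": "c1", "state": "CA", "price": 10.0},
    {"cust_id": "c2", "state": "NY", "price": 20.0},
    {"cust_id": "c3", "state": "CA", "price": 30.0},
]

# Column-oriented (ORC-like) layout: each column is stored
# contiguously, so a query that needs only 'price' reads just
# that one column and skips the other two entirely.
columns = {
    "cust_id": ["c1", "c2", "c3"],
    "state":   ["CA", "NY", "CA"],
    "price":   [10.0, 20.0, 30.0],
}

total_price = sum(columns["price"])
print(total_price)  # 60.0
```

For a wide de-normalized table like CUSTOMER_ORDERS_PRODUCT (23 columns), a query touching 2 or 3 columns skips the vast majority of the bytes on disk.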
Spark Process
Here is the process in a nutshell.
1. Read the data —
product = spark.read.table("product")
orders_product = spark.read.table("orders_product")
orders = spark.read.table("orders")
customer = spark.read.table("customer")
2. Register as a temp table —
product.createOrReplaceTempView("product")
orders_product.createOrReplaceTempView("orders_product")
orders.createOrReplaceTempView("orders")
customer.createOrReplaceTempView("customer")
3. Create the merge data set from base tables —
merge_data = spark.sql("""select
cust.row_id ,
cust.full_name ,
cust.name ,
cust.gender ,
cust.address ,
cust.email ,
cust.telephone ,
cust.city ,
cust.state ,
cust.birth_date ,
cust.cust_id ,
cust.user_create_date ,
ord.order_id ,
ord.shipping_date ,
ord.delivery_date ,
ord.shipping_company ,
ordp.uniq_id ,
ordp.quantity ,
prod.product_name ,
prod.manufacturer ,
prod.price ,
prod.description ,
(prod.price * ordp.quantity) as total_sales ,
cast(ord.order_date as date) as order_date
FROM customer as cust
inner join orders as ord
on cust.cust_id = ord.cust_id
inner join orders_product as ordp
on ordp.order_id = ord.order_id
inner join product as prod
on prod.uniq_id = ordp.uniq_id
""")
merge_data.createOrReplaceTempView("merge_data")
4. Set the spark for dynamic partition —
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
Without these settings, the Spark process will not be able to insert data into the partitions dynamically.
5. Insert into the final table —
spark.sql("""insert overwrite table CUSTOMER_ORDERS_PRODUCT partition(order_date)
select
row_id,
full_name,
name,
gender,
address,
email,
telephone,
city,
state,
birth_date,
cust_id,
user_create_date,
order_id,
shipping_date,
delivery_date,
shipping_company,
uniq_id,
quantity,
product_name,
manufacturer,
price,
description,
total_sales,
order_date
from merge_data""")
Here is the capture of the number of partitions and the number of files per partition. This process created 511,400 files, almost 200 small files per partition.
The data set has 2,557 unique order_date values, so the final table has 2,557 partitions. I am using the default Spark shuffle partition count, which is 200. This means that in every Hive partition we end up with almost 200 files; in total, Spark generated 511,400 small files. This is really bad for the NameNode and for Hive/Spark processing.
One way to solve the small-files problem is to create a single file per partition. That way we will have only 2,557 files instead of 511,400.
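The file-count arithmetic above can be sketched directly (the numbers are from this run; the per-partition count assumes every shuffle task ends up holding rows for every order_date, which is what happens with enough evenly spread data):

```python
# Each of the 200 shuffle tasks writes its own file into every
# Hive partition it holds rows for, so in the worst case:
#   files = hive_partitions * shuffle_partitions
shuffle_partitions = 200   # spark.sql.shuffle.partitions default
hive_partitions = 2557     # unique order_date values in the data

total_files = hive_partitions * shuffle_partitions
print(total_files)  # 511400

# With one file per Hive partition instead:
files_after_fix = hive_partitions
print(files_after_fix)  # 2557
```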
To solve this problem, we can repartition the final data (the merge_data DataFrame in the code) by the partition column. This forces Spark to re-shuffle the data and place all rows with the same order_date in the same executor/partition. During the write operation, each task then writes its data to a single Hive partition. One downside is that forcing Spark to shuffle the data can be time-consuming.
Here is the code.
merge_data_parti = merge_data.repartition("order_date")
merge_data_parti.createOrReplaceTempView("merge_data_parti")
spark.sql("""insert overwrite table CUSTOMER_ORDERS_PRODUCT partition(order_date)
select
row_id,
full_name,
name,
gender,
address,
email,
telephone,
city,
state,
birth_date,
cust_id,
user_create_date,
order_id,
shipping_date,
delivery_date,
shipping_company,
uniq_id,
quantity,
product_name,
manufacturer,
price,
description,
total_sales,
order_date
from merge_data_parti""")
Repartitioning by the partition column solves the problem. We cannot use coalesce, because keeping all the data for a given order_date together requires a shuffle, and coalesce does not shuffle; it only merges existing partitions. Also, keep in mind that by forcing Spark to use one Spark partition per Hive partition, the performance of the Spark job will degrade, and this gets worse with skewed data. Below is the capture of the Spark job from the SQL above with skewed data in the orders table for the date 2020-01-01, which has 10 GB of orders data. The skew significantly slows down the Spark process and generates an enormous amount of shuffle files. On my cluster, with the skewed data, the SQL above did not complete: it ran for 10 hours, created 300 GB of shuffle files, and eventually failed. In my next article, I will discuss how to optimize Spark for skewed data.
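The grouping behavior of repartition("order_date") can be mimicked with plain-Python hash partitioning. This is a conceptual sketch of hash-based shuffling, not Spark's actual implementation:

```python
# Hash-partition rows by a key column: every row with the same key
# lands in the same partition, which is why repartition("order_date")
# lets a single task write each Hive partition as one file.
def hash_partition(rows, key, num_partitions):
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[hash(row[key]) % num_partitions].append(row)
    return parts

rows = [{"order_date": d, "qty": q}
        for d in ("2020-01-01", "2020-01-02", "2020-01-03")
        for q in range(4)]

parts = hash_partition(rows, "order_date", 8)

# Every non-empty partition holds rows for exactly one order_date,
# so writing it produces a single file in that Hive partition.
for p in parts:
    assert len({r["order_date"] for r in p}) <= 1
```

Coalesce, by contrast, only merges existing partitions without moving rows between them, so rows for the same order_date can stay scattered across several partitions and still produce multiple files. Note also the skew trade-off: a hot key like 2020-01-01 maps all of its rows to one partition, which is exactly why one task ends up with 10 GB.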
Please check out the Git repository for the updated code — https://github.com/ghoshm21/spark_book
Please email me with any feedback, comments, or questions — contact@sandipan.tech