Databricks Liquid Clustering
What is Liquid clustering?
Delta Lake liquid clustering replaces table partitioning and ZORDER to simplify data layout decisions and optimize query performance. Liquid clustering provides the flexibility to redefine clustering keys without rewriting existing data, allowing the data layout to evolve alongside analytic needs over time. Liquid clustering accomplishes this by using a tree-based algorithm to incrementally map the data layout, maintaining the associated metadata as part of the table's Delta Lake logs.
Why Liquid Clustering?
Before diving deep into Liquid Clustering, let's understand the limitations of Hive-style partitioning and Z-order indexing.
Hive-style partitioning
Hive-style partitioning clusters data such that every file contains exactly one distinct combination of partition column values. Although Hive-style partitioning works well if tuned correctly, there are limitations:
- Since partition values are physical boundaries for files, Hive-style partitioning by high cardinality columns will create many small files that cannot be combined, resulting in poor scan performance.
- In Spark/Delta, once a table is partitioned, its partitioning strategy cannot be changed, so the layout cannot adapt to new use cases such as changing query patterns.
Z-Order clustering
ZORDER is a multi-dimensional clustering technique used in Delta tables. The OPTIMIZE ZORDER BY command applies ZORDER clustering and improves the performance of queries that utilize ZORDER BY columns in their predicates. However, it has the following limitations:
- OPTIMIZE ZORDER BY is an operation that rewrites all data in the table, resulting in high write amplification. Also, no partial results are saved when execution fails.
- ZORDER BY columns are not persisted and the user is required to remember the previous ZORDER BY columns, often causing user errors.
- The Z-order curve preserves locality imperfectly: consecutive points on the curve can be far apart in the multi-dimensional space, which weakens data skipping.
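The locality issue in the last point can be seen with a small simulation. The snippet below is purely illustrative plain Python, not Delta internals; the 8x8 grid size and the `morton_index` helper are our own choices. It encodes grid cells in Z-order and measures how far apart consecutive curve positions land in 2-D space:

```python
def morton_index(x, y, bits=3):
    """Interleave the bits of x and y to get the Z-order (Morton) index."""
    d = 0
    for i in range(bits):
        d |= ((x >> i) & 1) << (2 * i + 1)  # x bits go to odd positions
        d |= ((y >> i) & 1) << (2 * i)      # y bits go to even positions
    return d

def morton_decode(d, bits=3):
    """Inverse of morton_index: recover (x, y) from the curve position."""
    x = y = 0
    for i in range(bits):
        x |= ((d >> (2 * i + 1)) & 1) << i
        y |= ((d >> (2 * i)) & 1) << i
    return x, y

# Walk the curve and measure the Manhattan distance between consecutive cells.
jumps = [
    sum(abs(a - b) for a, b in zip(morton_decode(d), morton_decode(d + 1)))
    for d in range(63)
]
print("largest jump between consecutive Z-order cells:", max(jumps))
```

On this grid the largest jump spans most of the grid, so a file holding consecutive Z-order points can end up with wide min/max ranges on each column.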
Now that we understand the limitations of Hive-style partitioning and Z-order indexing, it's time to dive deep into Liquid clustering!
How does Liquid clustering work in the background?
Note: You can skip ahead to the usage section if you are only interested in how to use liquid clustering and its benefits.
Liquid clustering uses the Hilbert curve, a continuous fractal space-filling curve, as its multi-dimensional clustering technique, which significantly improves data skipping over ZORDER.
Similar to how ZORDER works, the Hilbert curve maps multi-dimensional data onto a 1D space by fitting points onto the curve. It preserves locality very well: points that are close in the 1D space are also close in the multi-dimensional space. We can leverage this property for efficient data skipping. Take the following example of a simple two-column table with 64 distinct records, where each file contains 4 records.
In this example, Liquid clustering first translates the two columns A and B into a numeric range of [0, 7] by range-partitioning the value distribution. The Hilbert curve has the nice property that adjacent points on the curve are always at distance 1. To exploit this, we partition the data by points on the curve and pack nearby points into well-sized files. Each file then contains points that are close to each other on the curve, so they have tight min/max ranges for each clustering dimension.
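The mapping above can be sketched with a short, purely illustrative script (plain Python, not Delta internals; the 8x8 grid and 4 records per file mirror the example, and `hilbert_index` is the classic xy2d algorithm):

```python
def hilbert_index(n, x, y):
    """Map grid cell (x, y) on an n x n grid (n a power of two) to its
    1-D position along the Hilbert curve (the classic xy2d algorithm)."""
    d = 0
    s = n // 2
    while s > 0:
        rx = 1 if x & s else 0
        ry = 1 if y & s else 0
        d += s * s * ((3 * rx) ^ ry)
        if ry == 0:  # rotate/flip the quadrant so the sub-curve lines up
            if rx == 1:
                x, y = n - 1 - x, n - 1 - y
            x, y = y, x
        s //= 2
    return d

# Sort the 64 cells by their Hilbert position, then pack 4 consecutive
# curve points into each "file": every file gets tight min/max ranges.
cells = [(x, y) for x in range(8) for y in range(8)]
cells.sort(key=lambda c: hilbert_index(8, *c))
files = [cells[i:i + 4] for i in range(0, 64, 4)]
for f in files[:3]:
    xs, ys = zip(*f)
    print(f, "A-range:", (min(xs), max(xs)), "B-range:", (min(ys), max(ys)))
```

Because consecutive points on the Hilbert curve are always grid-adjacent, each simulated file covers a small, compact region of the (A, B) space.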
Liquid also provides incremental clustering, which allows users to run OPTIMIZE without rewriting all data. OPTIMIZE completes in batches, where each batch produces a single OPTIMIZE commit, so that not all progress is lost when something goes wrong.
Incremental clustering is built around the concept of ZCubes. A ZCube is a group of files produced by the same OPTIMIZE job. Since Liquid clustering only wants to rewrite fresh/unoptimized files, it distinguishes already-optimized files (those that are part of some ZCube) from unoptimized files using the ZCUBE_ID tag in AddFile, generating a unique UUID-based ZCUBE_ID for each new ZCube.
Liquid Clustering introduces two configs to provide flexibility for controlling which files to consider for clustering:
- MIN_CUBE_SIZE: the ZCube size beyond which new data will no longer be merged into that ZCube during incremental OPTIMIZE. Defaults to 100 GB.
- TARGET_CUBE_SIZE: the target size of newly created ZCubes. This is not a hard maximum; files keep being added to a ZCube until their combined size exceeds this value. It must be greater than or equal to MIN_CUBE_SIZE. Defaults to 150 GB.
Any ZCube with a size below MIN_CUBE_SIZE is a partial ZCube. Incremental OPTIMIZE considers all new files as well as the files in existing partial ZCubes. Once a partial ZCube accumulates enough data to cross the MIN_CUBE_SIZE threshold, it becomes a stable ZCube and is no longer considered for rewriting.
Stable ZCubes may become partial again if DML operations delete too many of their files, at which point their files are once again considered for clustering.
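The file-selection logic above can be sketched as a small simulation. This is illustrative plain Python under our own assumptions (the `AddFile` dataclass with a `size_gb` field and the helper names are hypothetical, not Delta's actual internals):

```python
from dataclasses import dataclass
from typing import Optional
from uuid import uuid4

MIN_CUBE_SIZE_GB = 100     # default MIN_CUBE_SIZE (100 GB)
TARGET_CUBE_SIZE_GB = 150  # default TARGET_CUBE_SIZE (150 GB)

@dataclass
class AddFile:
    path: str
    size_gb: int
    zcube_id: Optional[str] = None  # None => never clustered

def select_files_for_optimize(files):
    """Pick unoptimized files plus all files from partial ZCubes."""
    by_cube, candidates = {}, []
    for f in files:
        if f.zcube_id is None:
            candidates.append(f)
        else:
            by_cube.setdefault(f.zcube_id, []).append(f)
    for cube_files in by_cube.values():
        if sum(f.size_gb for f in cube_files) < MIN_CUBE_SIZE_GB:
            candidates.extend(cube_files)  # partial ZCube: rewrite it too
    return candidates

def plan_new_zcubes(candidates):
    """Greedily fill new ZCubes until each exceeds the soft target size."""
    cubes, current, size = [], [], 0
    for f in candidates:
        current.append(f)
        size += f.size_gb
        if size >= TARGET_CUBE_SIZE_GB:  # soft target, not a hard max
            cubes.append((str(uuid4()), current))
            current, size = [], 0
    if current:
        cubes.append((str(uuid4()), current))
    return cubes

new_files = [AddFile("new1", 60),
             AddFile("old", 120, "stable-cube"),
             AddFile("small", 40, "partial-cube")]
picked = select_files_for_optimize(new_files)
print([f.path for f in picked])  # stable-cube's file is left alone
```

Note how the 120 GB stable ZCube is skipped entirely, which is what keeps incremental OPTIMIZE cheap compared to a full ZORDER rewrite.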
When to use liquid clustering?
- Tables often filtered by high cardinality columns.
- Tables with significant skew in data distribution.
- Tables that grow quickly and require maintenance and tuning effort.
- Tables with concurrent write requirements.
- Tables with access patterns that change over time.
- Tables where a typical partition key could leave the table with too many or too few partitions.
What happens to tables that are already Z-ordered? Can you still apply liquid clustering?
Yes, you can apply liquid clustering to tables that currently use a Z-order index. Use the same columns you used for Z-ordering (up to four), and liquid clustering will cluster the data on the specified columns.
Where/How to use liquid clustering?
To enable liquid clustering, add the CLUSTER BY clause when creating a table, or call clusterBy() when saving a DataFrame as a table:
-- SQL
-- Create an empty table
CREATE TABLE Country(country_id int, country_name string) CLUSTER BY (country_id);
-- Using a CTAS statement
CREATE EXTERNAL TABLE Country CLUSTER BY (country_id) -- specify clustering after table name, not in subquery
LOCATION 'path/to/location'
AS SELECT * FROM Geo;
# PySpark
df = spark.read.table("non_cluster_country")
df.write.clusterBy("country_id").saveAsTable("Country")
Choose clustering keys
Databricks recommends choosing clustering keys based on the columns most frequently used in query filters. Clustering keys can be defined in any order. If two columns are highly correlated, you only need to include one of them as a clustering key.
You can specify up to four clustering keys. For smaller tables (under 10 TB), using more clustering keys (for example, four) can degrade performance when filtering on a single column compared to using fewer clustering keys (for example, two). However, as table size increases, the performance difference with using more clustering keys for single-column queries becomes negligible.
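Because clustering keys are metadata rather than physical file boundaries, they can also be changed later without rewriting existing data. A sketch, reusing the Country table created earlier:

```sql
-- Change the clustering keys; existing files are not rewritten until a
-- subsequent OPTIMIZE run lays data out by the new keys.
ALTER TABLE Country CLUSTER BY (country_name, country_id);

-- Turn clustering off entirely
ALTER TABLE Country CLUSTER BY NONE;
```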
Trigger clustering table
To trigger clustering, you must use Databricks Runtime 13.3 LTS or above. Use the OPTIMIZE command on your table.
spark.sql("OPTIMIZE Country")
We used liquid clustering for one of our jobs and compared the monthly cost before and after enabling it. We were able to save about $900 per month, i.e., ~$11K per year.
Summary
In this post we discussed what Liquid Clustering is, why we need it, how it works in the background, and how we can leverage it to cluster data and simplify data layouts.
Now we know how to implement liquid clustering. With it, we can improve the performance of our data pipelines and reduce overall job duration.
Thanks for reading.
If you like this article, please hold the clap button!
Happy Clustering !!!