Try Iceberg FastForward Procedure
As of Iceberg 1.4.0, the fast_forward procedure is supported (see the release notes). Combined with the branching feature, it lets you review table records before publishing them to data consumers. In this post, we look at when and how to run the procedure.
Scenario
Go through the following steps. The Iceberg configuration for Spark is described in Appendix 1 at the bottom.
- Create an Iceberg table with sample records
- Create a branch audit based on the current table records
- Switch the current branch from main to audit and add records to the Iceberg table
- Review the records in audit
- Run the fast_forward procedure
1. Create an Iceberg table with sample records
First, create an Iceberg table via CTAS:
val location = "s3://bucket/path"
spark.sql(s"""
CREATE TABLE hive_catalog.db.tbl
USING iceberg
LOCATION '$location'
PARTITIONED BY (dt)
AS SELECT id, name, dt FROM VALUES
(1, 'tom', '2024-01-28'), (2, 'tan', '2024-01-29'), (3, 'dev', '2024-01-29') AS rec(id, name, dt)
""")
spark.sql(s"SELECT * FROM hive_catalog.db.tbl").show(false)
/*
+---+----+----------+
|id |name|dt        |
+---+----+----------+
|3  |dev |2024-01-29|
|2  |tan |2024-01-29|
|1  |tom |2024-01-28|
+---+----+----------+
*/
// spark.sql("DESCRIBE EXTENDED hive_catalog.db.tbl").show(false)
/*
+----------------------------+----------------------------------------------------------------------------------------------------------------------+-------+
|col_name |data_type |comment|
+----------------------------+----------------------------------------------------------------------------------------------------------------------+-------+
|id |int |NULL |
|name |string |NULL |
|dt |string |NULL |
| | | |
|# Metadata Columns | | |
|_spec_id |int | |
|_partition |struct<> | |
|_file |string | |
|_pos |bigint | |
|_deleted |boolean | |
| | | |
|# Detailed Table Information| | |
|Name |hive_catalog.db.tbl | |
|Type |MANAGED | |
|Location |s3://bucket/path/ | |
|Provider |iceberg | |
|Owner |hadoop | |
|Table Properties |[current-snapshot-id=1760975580047100766,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]| |
+----------------------------+----------------------------------------------------------------------------------------------------------------------+-------+
*/
You can check where the table metadata file is stored via the metadata_log_entries metadata table:
spark.sql(s"SELECT * FROM hive_catalog.db.tbl.metadata_log_entries").show(false)
/*
+-----------------------+----------------------------------------------------------------------------------+-------------------+----------------+----------------------+
|timestamp              |file                                                                              |latest_snapshot_id |latest_schema_id|latest_sequence_number|
+-----------------------+----------------------------------------------------------------------------------+-------------------+----------------+----------------------+
|2024-01-30 05:04:04.189|s3://bucket/path/metadata/00000-3c3df3b3-e356-4bf7-8aec-85f3c35c474b.metadata.json|5729127839002563214|0               |1                     |
+-----------------------+----------------------------------------------------------------------------------+-------------------+----------------+----------------------+
*/
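Beyond metadata_log_entries, the snapshot history itself can be inspected through the snapshots metadata table. A quick sketch (the selected columns are just an illustrative subset; the table also exposes fields such as parent_id and summary):

// Inspect the snapshot history of the table
spark.sql("SELECT committed_at, snapshot_id, operation FROM hive_catalog.db.tbl.snapshots").show(false)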
The metadata file contains the following Iceberg table information:
// $ aws s3 cp s3://bucket/path/metadata/00000-3c3df3b3-e356-4bf7-8aec-85f3c35c474b.metadata.json -
{
  "format-version" : 2,
  "table-uuid" : "2dff40ad-cd3d-4880-8e91-8d0cbc9bd3fd",
  "location" : "s3://bucket/path",
  "last-sequence-number" : 1,
  "last-updated-ms" : 1706591044189,
  "last-column-id" : 3,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "id",
      "required" : false,
      "type" : "int"
    }, {
      "id" : 2,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 3,
      "name" : "dt",
      "required" : false,
      "type" : "string"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "dt",
      "transform" : "identity",
      "source-id" : 3,
      "field-id" : 1000
    } ]
  } ],
  "last-partition-id" : 1000,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "hadoop",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : 5729127839002563214,
  "refs" : {
    "main" : {
      "snapshot-id" : 5729127839002563214,
      "type" : "branch"
    }
  },
  "snapshots" : [ {
    "sequence-number" : 1,
    "snapshot-id" : 5729127839002563214,
    "timestamp-ms" : 1706591044189,
    "summary" : {
      "operation" : "append",
      "spark.app.id" : "application_1706583192068_0003",
      "added-data-files" : "2",
      "added-records" : "3",
      "added-files-size" : "1794",
      "changed-partition-count" : "2",
      "total-records" : "3",
      "total-files-size" : "1794",
      "total-data-files" : "2",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "s3://bucket/path/metadata/snap-5729127839002563214-1-5b5646a8-d7ae-440b-ba72-ffae2b3975c5.avro",
    "schema-id" : 0
  } ],
  "statistics" : [ ],
  "snapshot-log" : [ {
    "timestamp-ms" : 1706591044189,
    "snapshot-id" : 5729127839002563214
  } ],
  "metadata-log" : [ ]
}
2. Create an audit branch from the current table records
In this section, create a new branch from the current 3 records in the Iceberg table. To do this, the write.wap.enabled table property must be set to true.
spark.sql(s"ALTER TABLE hive_catalog.db.tbl SET TBLPROPERTIES ('write.wap.enabled'='true')")
Then create a new branch audit for reviewing new data that will be written into the Iceberg table. The branch itself is retained forever, while its snapshots are retained for 3 days.
spark.sql(s"ALTER TABLE hive_catalog.db.tbl CREATE BRANCH audit WITH SNAPSHOT RETENTION 3 DAYS")
3. Switch the current branch from main to audit and add records to the Iceberg table
Change the current branch to audit by setting spark.wap.branch. You can then check the current branch with SET spark.wap.branch as follows:
spark.sql("SET spark.wap.branch = audit")
// Check the current branch
spark.sql("SET spark.wap.branch").show(false)
/*
+----------------+-----+
|key             |value|
+----------------+-----+
|spark.wap.branch|audit|
+----------------+-----+
*/
Check the records in the audit branch. At this point, they are the same as in the main branch.
spark.sql(s"SELECT * FROM hive_catalog.db.tbl").show(false)
/*
+---+----+----------+
|id |name|dt        |
+---+----+----------+
|3  |dev |2024-01-29|
|2  |tan |2024-01-29|
|1  |tom |2024-01-28|
+---+----+----------+
*/
Add two new records to the table. Here, one of the records, (1000, 'Invalid value'), is assumed to be broken, while the other is a normal record.
spark.sql(s"INSERT INTO hive_catalog.db.tbl VALUES (4, 'iceberg', '2024-01-30'), (1000, 'Invalid value', '2024-01-30')")
spark.sql(s"SELECT * FROM hive_catalog.db.tbl").show(false)
/*
+----+-------------+----------+
|id  |name         |dt        |
+----+-------------+----------+
|4   |iceberg      |2024-01-30|
|1000|Invalid value|2024-01-30|
|3   |dev          |2024-01-29|
|2   |tan          |2024-01-29|
|1   |tom          |2024-01-28|
+----+-------------+----------+
*/
Review the difference in records between the branches. The main branch still has 3 records; the 2 new records are not visible there.
spark.sql("SET spark.wap.branch = main")
spark.sql(s"SELECT * FROM hive_catalog.db.tbl").show(false)
/*
+---+----+----------+
|id |name|dt        |
+---+----+----------+
|3  |dev |2024-01-29|
|2  |tan |2024-01-29|
|1  |tom |2024-01-28|
+---+----+----------+
*/
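Rather than comparing the two outputs by eye, the row-level difference can be computed directly. A minimal sketch combining the branch_<name> read syntax with EXCEPT:

// Rows that exist in audit but are not yet published to main
spark.sql("""
SELECT * FROM hive_catalog.db.tbl.branch_audit
EXCEPT
SELECT * FROM hive_catalog.db.tbl.branch_main
""").show(false)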
Review the S3 metadata
In “1. Create an Iceberg table with sample records”, only 00000-3c3df3b3-e356-4bf7-8aec-85f3c35c474b.metadata.json was created, but now there are 3 new metadata files:
00000-3c3df3b3-e356-4bf7-8aec-85f3c35c474b.metadata.json
00001-2549ebf9-a6db-470b-a129-7b4d63211006.metadata.json -> "write.wap.enabled" : "true" is added
00002-585ebf63-98f2-4ca8-ab35-d446cb362cb6.metadata.json -> "audit" is added to the "refs" section
00003-388f56e8-0949-484d-b842-cd79b70d9c20.metadata.json -> the new snapshot ID pointed to by audit is added
See the diff result between 00000 and 00003 in Appendix 2 at the bottom.
Check all branches
spark.sql(s"SELECT * FROM hive_catalog.db.tbl.refs").show(false)
/*
+-----+------+-------------------+-----------------------+---------------------+----------------------+
|name |type  |snapshot_id        |max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+-----+------+-------------------+-----------------------+---------------------+----------------------+
|main |BRANCH|5729127839002563214|NULL                   |NULL                 |NULL                  |
|audit|BRANCH|3060270616263082424|NULL                   |NULL                 |259200000             |
+-----+------+-------------------+-----------------------+---------------------+----------------------+
*/
4. Review the newly added records in audit
Before fast-forwarding main to audit, let’s review the records that were added in the previous section. Two records were added, and one of them is broken. In the WAP (Write-Audit-Publish) pattern, this review step is what ensures high-quality data delivery to data consumers. This post doesn’t focus on the data quality process itself; to keep the demonstration simple, we just check the new records manually (see the sketch below) and fix the broken record.
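Since the previous step switched spark.wap.branch back to main, first return to the audit branch; otherwise the fix below would be written straight to main. As a stand-in for a real data quality process, here is a minimal, hypothetical validation sketch (the rule that ids of 1000 or above are broken is an assumption made up for this example):

// Switch back to the audit branch so the review and the fix target audit
spark.sql("SET spark.wap.branch = audit")

// Hypothetical sanity check: treat ids >= 1000 as broken records
val invalid = spark.sql("SELECT * FROM hive_catalog.db.tbl WHERE id >= 1000")
if (invalid.count() > 0) {
  println(s"Found ${invalid.count()} broken record(s) in the audit branch")
  invalid.show(false)
}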
To fix the broken record, run the following UPDATE query:
spark.sql(s"UPDATE hive_catalog.db.tbl SET id = 5, name = 'icecube' WHERE id = 1000")
Then, check if the broken record is fixed.
spark.sql(s"SELECT * FROM hive_catalog.db.tbl").show(false)
/*
+---+-------+----------+
|id |name   |dt        |
+---+-------+----------+
|4  |iceberg|2024-01-30|
|5  |icecube|2024-01-30|
|3  |dev    |2024-01-29|
|2  |tan    |2024-01-29|
|1  |tom    |2024-01-28|
+---+-------+----------+
*/
5. Run the fast_forward procedure
Run the following procedure to fast-forward main to the current position of audit.
spark.sql(s"CALL hive_catalog.system.fast_forward(table => 'db.tbl', branch => 'main', to => 'audit')").show(false)
/*
+--------------+-------------------+-------------------+
|branch_updated|previous_ref       |updated_ref        |
+--------------+-------------------+-------------------+
|main          |5729127839002563214|5571823322290436651|
+--------------+-------------------+-------------------+
*/
Review the main branch records:
spark.sql("SET spark.wap.branch = main")
spark.sql(s"SELECT * FROM hive_catalog.db.tbl").show(false)
/*
+---+-------+----------+
|id |name   |dt        |
+---+-------+----------+
|4  |iceberg|2024-01-30|
|5  |icecube|2024-01-30|
|3  |dev    |2024-01-29|
|2  |tan    |2024-01-29|
|1  |tom    |2024-01-28|
+---+-------+----------+
*/
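After the fast-forward, the audit branch still points at the now-published snapshot. If it is no longer needed, it can be dropped; alternatively, the branch can be kept and reused for the next write-audit-publish cycle:

// Optionally clean up the audit branch after publishing
spark.sql("ALTER TABLE hive_catalog.db.tbl DROP BRANCH audit")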
Appendix
1. Iceberg configuration for Spark
Hive metastore is used as the catalog backend. Here’s the configuration passed to Spark:
$ spark-shell/spark-submit --master yarn --deploy-mode client \
--jars ./iceberg-aws-bundle-1.4.3.jar,./iceberg-spark-runtime-3.5_2.12-1.4.3.jar \
--conf spark.sql.catalog.hive_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.hive_catalog.type=hive \
--conf spark.sql.catalog.hive_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
2. Difference between the 00000 and 00003 metadata files
$ diff 00000-3c3df3b3-e356-4bf7-8aec-85f3c35c474b.metadata.json 00003-388f56e8-0949-484d-b842-cd79b70d9c20.metadata.json
5,6c5,6
<   "last-sequence-number" : 1,
<   "last-updated-ms" : 1706591044189,
---
>   "last-sequence-number" : 2,
>   "last-updated-ms" : 1706594326918,
46a47
>     "write.wap.enabled" : "true",
53a55,59
>     },
>     "audit" : {
>       "snapshot-id" : 3060270616263082424,
>       "type" : "branch",
>       "max-snapshot-age-ms" : 259200000
75a82,102
>   }, {
>     "sequence-number" : 2,
>     "snapshot-id" : 3060270616263082424,
>     "parent-snapshot-id" : 5729127839002563214,
>     "timestamp-ms" : 1706594326918,
>     "summary" : {
>       "operation" : "append",
>       "spark.app.id" : "application_1706583192068_0003",
>       "added-data-files" : "1",
>       "added-records" : "2",
>       "added-files-size" : "960",
>       "changed-partition-count" : "1",
>       "total-records" : "5",
>       "total-files-size" : "2754",
>       "total-data-files" : "3",
>       "total-delete-files" : "0",
>       "total-position-deletes" : "0",
>       "total-equality-deletes" : "0"
>     },
>     "manifest-list" : "s3://bucket/path/metadata/snap-3060270616263082424-1-6616ee87-3667-4b77-aef9-66cf35dacb0d.avro",
>     "schema-id" : 0
82c109,118
<   "metadata-log" : [ ]
---
>   "metadata-log" : [ {
>     "timestamp-ms" : 1706591044189,
>     "metadata-file" : "s3://bucket/path/metadata/00000-3c3df3b3-e356-4bf7-8aec-85f3c35c474b.metadata.json"
>   }, {
>     "timestamp-ms" : 1706591507126,
>     "metadata-file" : "s3://bucket/path/metadata/00001-2549ebf9-a6db-470b-a129-7b4d63211006.metadata.json"
>   }, {
>     "timestamp-ms" : 1706593145650,
>     "metadata-file" : "s3://bucket/path/metadata/00002-585ebf63-98f2-4ca8-ab35-d446cb362cb6.metadata.json"
>   } ]