Flink: Support writing shredded variant in Flink #15596

Merged
pvary merged 15 commits into apache:main from Guosmilesmile:flink_shredded_varisnt_fileformat
May 22, 2026

@Guosmilesmile
Contributor

@Guosmilesmile Guosmilesmile commented Mar 12, 2026

This PR adds Flink support for writing shredded variant data to Iceberg tables. It is based on #14297 and will be adjusted in sync with it.

@Guosmilesmile Guosmilesmile marked this pull request as draft March 12, 2026 08:09
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from 15ff223 to 5b448b9 Compare March 12, 2026 08:22
@github-actions github-actions Bot removed the ORC label Mar 12, 2026
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch 2 times, most recently from 8f6198a to b03caf6 Compare March 12, 2026 08:59
@Guosmilesmile Guosmilesmile changed the title Core,Flink: Support writing shredded variant in Flink Flink: Support writing shredded variant in Flink Mar 12, 2026
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch 3 times, most recently from 88045e1 to cbfa8c2 Compare March 13, 2026 07:17
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from fae2814 to f3a2fba Compare March 24, 2026 05:50
@github-actions github-actions Bot added the core label Mar 24, 2026
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch 4 times, most recently from b07b00b to c95d78f Compare March 24, 2026 08:36
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch 3 times, most recently from fc8c45a to b116f25 Compare April 1, 2026 01:50
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from b116f25 to 650cb7a Compare April 10, 2026 09:42
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from 770d9c4 to 7d48389 Compare May 7, 2026 06:55
@Guosmilesmile Guosmilesmile marked this pull request as ready for review May 7, 2026 08:34
@Guosmilesmile
Contributor Author

Hi @aihuaxu @nssalian @pvary @mxm. Since the Spark part has been merged, I have adjusted the Flink part accordingly. If you have time, please help review it.

Thanks!
GuoYu.

Comment on lines +270 to +271
.tableProperty(TableProperties.PARQUET_SHRED_VARIANTS)
.defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT)
Contributor

How will we handle when ORC supports shredding variants?

Contributor Author

Good catch. I renamed shred-variants to parquet-shred-variants to make it clear that this feature only supports Parquet. If ORC supports shredding later, we can add another config.

Contributor

Let's do parquet for now since we followed that pattern for the Spark implementation.
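
For readers following the thread, here is a minimal sketch of the fallback order implied by the `.tableProperty(...).defaultValue(...)` chain above: a per-write option wins, then the table property, then the default. The class and method names are hypothetical, the write option name follows the rename mentioned above, the table property name is the one listed in the docs table for this PR, and the `false` default is an assumption rather than a value confirmed in this conversation. Any intermediate Flink configuration lookup is left out.

```java
import java.util.Map;

public class ShredVariantsConfSketch {
  // Write option name taken from the rename discussed in this thread.
  static final String WRITE_OPTION = "parquet-shred-variants";
  // Table property name as listed in the docs table for this PR.
  static final String TABLE_PROPERTY = "write.parquet.shred-variants";
  // Assumption: the real default constant is not shown in this conversation.
  static final boolean ASSUMED_DEFAULT = false;

  // Resolve the flag: write option first, then table property, then the default.
  static boolean shredVariants(
      Map<String, String> writeOptions, Map<String, String> tableProperties) {
    String value = writeOptions.get(WRITE_OPTION);
    if (value == null) {
      value = tableProperties.get(TABLE_PROPERTY);
    }
    return value != null ? Boolean.parseBoolean(value) : ASSUMED_DEFAULT;
  }

  public static void main(String[] args) {
    Map<String, String> table = Map.of(TABLE_PROPERTY, "true");
    // The table property applies when the write does not override it.
    System.out.println(shredVariants(Map.of(), table));
    // A per-write option overrides the table property.
    System.out.println(shredVariants(Map.of(WRITE_OPTION, "false"), table));
  }
}
```

If ORC shredding arrives later, a second option and property pair could follow the same pattern without changing the Parquet one.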

FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant),
new FlinkVariantShreddingAnalyzer(),
(row, rowType) -> new RowDataSerializer(rowType).copy(row)));
Contributor

Isn't this costly to recreate every time when we copy a row?

Contributor Author

It does increase the cost, but without copying, the buffered rows can get corrupted. We ran into this during early development, and the unit tests can reproduce it.

Contributor

Can we reuse the RowDataSerializer?

Contributor

With the current BiFunction, (row, rowType) -> new RowDataSerializer(rowType).copy(row) creates a new RowDataSerializer for every buffered row (default buffer = 100). This construction is not free, as it involves walking rowType.getChildren(), building a TypeSerializer[] via InternalSerializers.create, a BinaryRowDataSerializer, and a RowData.FieldGetter[]. Since the engine schema is fixed for the entire file, a factory allows us to build it once and reuse it. Using the Factory Pattern, we can avoid recreating the serializer for a given table schema with every incoming record.

Contributor Author

Yes, we can use Function<S, UnaryOperator<D>> instead of BiFunction<D, S, D> to implement this.

Contributor

+1. We should be able to reuse the RowDataSerializer so we don't need to create a new instance for every row.
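
A minimal sketch of the difference between the two shapes discussed in this thread. It assumes a hypothetical `SchemaBoundCopier` as a stand-in for an expensive, schema-bound object such as `RowDataSerializer`, and uses plain Java lists in place of `RowData` and `RowType` so that it runs without Flink on the classpath. It is not the code from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.UnaryOperator;

public class CopyFuncFactorySketch {
  // Hypothetical stand-in for a copier that is costly to build for a given schema.
  static final class SchemaBoundCopier {
    static int constructions = 0;
    private final int fieldCount;

    SchemaBoundCopier(List<String> schema) {
      constructions++; // counts how often the "expensive" setup runs
      this.fieldCount = schema.size();
    }

    List<Object> copy(List<Object> row) {
      return new ArrayList<>(row.subList(0, fieldCount));
    }
  }

  // BiFunction<D, S, D> shape: a new copier is built for every row.
  static final BiFunction<List<Object>, List<String>, List<Object>> PER_ROW =
      (row, schema) -> new SchemaBoundCopier(schema).copy(row);

  // Function<S, UnaryOperator<D>> shape: the copier is built once per schema and reused.
  static final Function<List<String>, UnaryOperator<List<Object>>> FACTORY =
      schema -> {
        SchemaBoundCopier copier = new SchemaBoundCopier(schema);
        return copier::copy;
      };

  public static void main(String[] args) {
    List<String> schema = List.of("id", "payload");
    List<Object> row = List.of(1, "{\"a\": 1}");

    for (int i = 0; i < 100; i++) {
      PER_ROW.apply(row, schema);
    }
    System.out.println("per-row constructions: " + SchemaBoundCopier.constructions); // 100

    SchemaBoundCopier.constructions = 0;
    UnaryOperator<List<Object>> copyFunc = FACTORY.apply(schema);
    for (int i = 0; i < 100; i++) {
      copyFunc.apply(row);
    }
    System.out.println("factory constructions: " + SchemaBoundCopier.constructions); // 1
  }
}
```

With the factory shape, the cost of building the copier is paid once per file schema rather than once per buffered row, while each row is still copied.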

@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from 63ae5ae to 0f2ae10 Compare May 9, 2026 05:33
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from d0fe6b7 to 03fb902 Compare May 20, 2026 02:00
@Guosmilesmile
Contributor Author

@pvary @talatuyarer @nssalian @aihuaxu Hey all, I rebased main but ran into some CI failures. Looks like a new check was added recently that doesn't allow modifying the ParquetFormatModel parameter types directly.

As a workaround, I added a new method createWithCopyFuncFactory in ParquetFormatModel. The original create method now delegates to it, so the Spark code stays untouched, while FlinkFormatModels calls createWithCopyFuncFactory explicitly.

Would really appreciate it if you could help take another look at these changes. Thanks a lot!

java.method.parameterTypeChanged: The type of the parameter changed from 'java.util.function.UnaryOperator<D extends java.lang.Object>' to 'java.util.function.Function<S extends java.lang.Object, java.util.function.UnaryOperator<D extends java.lang.Object>>'.

old: parameter <D, S> org.apache.iceberg.parquet.ParquetFormatModel<D, S, org.apache.iceberg.parquet.ParquetValueReader<?>> org.apache.iceberg.parquet.ParquetFormatModel<D, S, R>::create(java.lang.Class<D>, java.lang.Class<S>, org.apache.iceberg.formats.BaseFormatModel.WriterFunction<org.apache.iceberg.parquet.ParquetValueWriter<?>, S, org.apache.parquet.schema.MessageType>, org.apache.iceberg.formats.BaseFormatModel.ReaderFunction<org.apache.iceberg.parquet.ParquetValueReader<?>, S, org.apache.parquet.schema.MessageType>, org.apache.iceberg.parquet.VariantShreddingAnalyzer<D, S>, ===java.util.function.UnaryOperator<D>===)
new: parameter <D, S> org.apache.iceberg.parquet.ParquetFormatModel<D, S, org.apache.iceberg.parquet.ParquetValueReader<?>> org.apache.iceberg.parquet.ParquetFormatModel<D, S, R>::create(java.lang.Class<D>, java.lang.Class<S>, org.apache.iceberg.formats.BaseFormatModel.WriterFunction<org.apache.iceberg.parquet.ParquetValueWriter<?>, S, org.apache.parquet.schema.MessageType>, org.apache.iceberg.formats.BaseFormatModel.ReaderFunction<org.apache.iceberg.parquet.ParquetValueReader<?>, S, org.apache.parquet.schema.MessageType>, org.apache.iceberg.parquet.VariantShreddingAnalyzer<D, S>, ===java.util.function.Function<S, java.util.function.UnaryOperator<D>>===)

https://github.com/apache/iceberg/actions/runs/26136650474/job/76873207059?pr=15596

@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from 0cdb80e to e8ad73e Compare May 20, 2026 07:45
@Guosmilesmile Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from e8ad73e to 9480782 Compare May 20, 2026 07:50
@Guosmilesmile
Contributor Author

After discussing with @pvary, we decided to keep the same create name and deprecate the old method instead.

}

/**
* @deprecated Will be removed in 1.12.0; use {@link #create(Class, Class, WriterFunction,
Contributor

Since 1.11.0 has already been released, this change will ship in 1.12.0 and we will remove the deprecated method in 1.13.0.

Please adjust the comment.

Contributor Author

Thanks for pointing it out. Adjusted it now.
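
A minimal sketch of the approach described in this thread: keep the old `create` overload, mark it deprecated, and have it delegate to the new factory-based overload. The `Model` class, its single-parameter signatures, and `copyFuncFor` are hypothetical simplifications; the real `ParquetFormatModel.create` takes more parameters, and this is not its actual code.

```java
import java.util.function.Function;
import java.util.function.UnaryOperator;

public class DeprecatedOverloadSketch {
  // Hypothetical, heavily simplified model holding only the copy function factory.
  static final class Model<D, S> {
    private final Function<S, UnaryOperator<D>> copyFuncFactory;

    private Model(Function<S, UnaryOperator<D>> copyFuncFactory) {
      this.copyFuncFactory = copyFuncFactory;
    }

    // New overload: takes a factory that builds the copy function for a schema.
    static <D, S> Model<D, S> create(Function<S, UnaryOperator<D>> copyFuncFactory) {
      return new Model<>(copyFuncFactory);
    }

    /**
     * @deprecated will be removed in 1.13.0; use the factory-based overload instead.
     */
    @Deprecated
    static <D, S> Model<D, S> create(UnaryOperator<D> copyFunc) {
      // An explicitly typed variable keeps overload resolution unambiguous.
      Function<S, UnaryOperator<D>> factory = schema -> copyFunc;
      return create(factory);
    }

    UnaryOperator<D> copyFuncFor(S schema) {
      return copyFuncFactory.apply(schema);
    }
  }

  public static void main(String[] args) {
    // Existing callers keep compiling against the old parameter type.
    UnaryOperator<String> legacyCopy = s -> new String(s);
    Model<String, Integer> legacy = Model.create(legacyCopy);
    System.out.println(legacy.copyFuncFor(1).apply("row"));

    // New callers pass a factory that can use the schema.
    Function<Integer, UnaryOperator<String>> factory = fields -> s -> s + ":" + fields;
    Model<String, Integer> current = Model.create(factory);
    System.out.println(current.copyFuncFor(2).apply("row"));
  }
}
```

Because the old signature still exists, a check that rejects changed parameter types sees an added method rather than a modified one.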

*/
private FileAppender<D> buildShreddedAppender() {
UnaryOperator<D> copyFunc = copyFuncFactory.apply(engineSchema);
Preconditions.checkState(copyFunc != null, "copyFunc must not return null");
Contributor

Checking only the copyFunc seems a bit odd to me. Should we check the factory first?

Contributor Author

OK, I added a check for copyFuncFactory first.
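
A minimal sketch of the check order agreed on here: validate the factory before applying it, then validate what it returns. `resolveCopyFunc` and the local `checkState` helper are hypothetical; the helper stands in for the `Preconditions.checkState` call in the snippet above so the example has no external dependencies, and the first message text is an assumption.

```java
import java.util.function.Function;
import java.util.function.UnaryOperator;

public class CopyFuncChecksSketch {
  // Resolve the per-schema copy function, failing fast with a specific message.
  static <D, S> UnaryOperator<D> resolveCopyFunc(
      Function<S, UnaryOperator<D>> copyFuncFactory, S engineSchema) {
    // Check the factory first so a missing factory is not reported as a NullPointerException.
    checkState(copyFuncFactory != null, "copyFuncFactory must not be null");
    UnaryOperator<D> copyFunc = copyFuncFactory.apply(engineSchema);
    checkState(copyFunc != null, "copyFunc must not return null");
    return copyFunc;
  }

  // Local stand-in for a precondition helper that throws IllegalStateException.
  static void checkState(boolean condition, String message) {
    if (!condition) {
      throw new IllegalStateException(message);
    }
  }

  public static void main(String[] args) {
    UnaryOperator<String> copy =
        CopyFuncChecksSketch.<String, String>resolveCopyFunc(schema -> s -> s, "schema");
    System.out.println(copy.apply("row"));

    try {
      CopyFuncChecksSketch.<String, String>resolveCopyFunc(null, "schema");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```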

@pvary
Contributor

pvary commented May 21, 2026

@nssalian: Any more comments?

Contributor

@nssalian nssalian left a comment

One nit in the documentation description; the rest looks good.

Comment thread docs/docs/flink-configuration.md Outdated
| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism |
| uid-suffix | As per table property | Overrides the uid suffix used in the underlying IcebergSink for this table |
| shred-variants | Table write.parquet.shred-variants | Overrides this table's shred variants for this write |
| variant-inference-buffer-size | Table write.parquet.variant-inference-buffer-size | Overrides this table's variant inference buffer size this write |
Contributor

Suggested change
| variant-inference-buffer-size | Table write.parquet.variant-inference-buffer-size | Overrides this table's variant inference buffer size this write |
| variant-inference-buffer-size | Table write.parquet.variant-inference-buffer-size | Overrides this table's variant inference buffer size for this write |

Contributor Author

Thanks. Added it now.

Contributor

@nssalian nssalian left a comment

Thanks for all the work @Guosmilesmile. lgtm

@Guosmilesmile
Contributor Author

This looks like an unrelated error in Kafka Connect. Retriggering CI.

TestIntegrationDynamicTable > testIcebergSink(String) > [1] null FAILED
    java.lang.AssertionError at IntegrationTestBase.java:237

https://github.com/apache/iceberg/actions/runs/26236304974/job/77210319934?pr=15596

@pvary pvary merged commit 10ba4ee into apache:main May 22, 2026
108 of 110 checks passed
@pvary
Contributor

pvary commented May 22, 2026

Merged to main.
Thanks @Guosmilesmile for the PR and @nssalian, @talatuyarer and @aihuaxu for the reviews!

@Guosmilesmile Guosmilesmile deleted the flink_shredded_varisnt_fileformat branch May 22, 2026 04:40

5 participants