Flink: Support writing shredded variant in Flink#15596
Conversation
15ff223 to
5b448b9
Compare
8f6198a to
b03caf6
Compare
88045e1 to
cbfa8c2
Compare
fae2814 to
f3a2fba
Compare
b07b00b to
c95d78f
Compare
fc8c45a to
b116f25
Compare
b116f25 to
650cb7a
Compare
770d9c4 to
7d48389
Compare
| .tableProperty(TableProperties.PARQUET_SHRED_VARIANTS) | ||
| .defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT) |
There was a problem hiding this comment.
How will we handle when ORC supports shredding variants?
There was a problem hiding this comment.
Good catch . I rename shred-variants to parquet-shred-variants to clarify this feature is only support parquet . If orc support this, then we can add another config.
There was a problem hiding this comment.
Let's do parquet for now since we followed that pattern for the Spark implementation.
| FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant))); | ||
| FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant), | ||
| new FlinkVariantShreddingAnalyzer(), | ||
| (row, rowType) -> new RowDataSerializer(rowType).copy(row))); |
There was a problem hiding this comment.
Isn't this costly to recreate every time when we copy a row?
There was a problem hiding this comment.
It will increase the cost, but without copying, there would be issues with data corruption when buffer data. We ran into this during early development, and the unit tests can reproduce it.
There was a problem hiding this comment.
Can we reuse the RowDataSerializer?
There was a problem hiding this comment.
With the current BiFunction, (row, rowType) -> new RowDataSerializer(rowType).copy(row) creates a new RowDataSerializer for every buffered row (default buffer = 100). This construction is not free, as it involves walking rowType.getChildren(), building a TypeSerializer[] via InternalSerializers.create, a BinaryRowDataSerializer, and a RowData.FieldGetter[]. Since the engine schema is fixed for the entire file, a factory allows us to build it once and reuse it. Using the Factory Pattern, we can avoid recreating the serializer for a given table schema with every incoming record.
There was a problem hiding this comment.
Yes, we can use Function<S, UnaryOperator<D>> instead of BiFunction<D, S, D> to implement this.
There was a problem hiding this comment.
+1. We should be able to reuse RowDataSerializer so we don't need to create new instance for every row.
63ae5ae to
0f2ae10
Compare
d0fe6b7 to
03fb902
Compare
|
@pvary @talatuyarer @nssalian @aihuaxu Hey all, I rebased main but ran into some CI failures. Looks like a new check was added recently that doesn't allow modifying the ParquetFormatModel parameter types directly. As a workaround, I added a new method createWithCopyFuncFactory in ParquetFormatModel. The original create method now delegates to it, so the Spark code stays untouched, while FlinkFormatModels calls createWithCopyFuncFactory explicitly. Would really appreciate it if you could help take another look at these changes. Thanks a lot! https://github.com/apache/iceberg/actions/runs/26136650474/job/76873207059?pr=15596 |
0cdb80e to
e8ad73e
Compare
e8ad73e to
9480782
Compare
|
After discussing with @pvary , we decided to keep the same create name and go with deprecating the old method instead. |
| } | ||
|
|
||
| /** | ||
| * @deprecated Will be removed in 1.12.0; use {@link #create(Class, Class, WriterFunction, |
There was a problem hiding this comment.
Since the 1.11.0 is released now, this change will be in 1.12.0 and we will remove it in 1.13.0.
Please adjust the comment
There was a problem hiding this comment.
Thanks for pointing it out. Adjusted it now.
| */ | ||
| private FileAppender<D> buildShreddedAppender() { | ||
| UnaryOperator<D> copyFunc = copyFuncFactory.apply(engineSchema); | ||
| Preconditions.checkState(copyFunc != null, "copyFunc must not return null"); |
There was a problem hiding this comment.
Checking only the copyFunc seems a bit odd to me. Should we check the factory first?
There was a problem hiding this comment.
Ok, add a check for copyFuncFactory first.
|
@nssalian: Any more comments? |
nssalian
left a comment
There was a problem hiding this comment.
one nit in the documentation description. rest looks good.
| | write-parallelism | Upstream operator parallelism | Overrides the writer parallelism | | ||
| | uid-suffix | As per table property | Overrides the uid suffix used in the underlying IcebergSink for this table | | ||
| | shred-variants | Table write.parquet.shred-variants | Overrides this table's shred variants for this write | | ||
| | variant-inference-buffer-size | Table write.parquet.variant-inference-buffer-size | Overrides this table's variant inference buffer size this write | |
There was a problem hiding this comment.
| | variant-inference-buffer-size | Table write.parquet.variant-inference-buffer-size | Overrides this table's variant inference buffer size this write | | |
| | variant-inference-buffer-size | Table write.parquet.variant-inference-buffer-size | Overrides this table's variant inference buffer size for this write | |
There was a problem hiding this comment.
Thanks . Add it now.
nssalian
left a comment
There was a problem hiding this comment.
Thanks for all the work @Guosmilesmile. lgtm
|
Seem unrelate error in kafka connect . Retrigger CI. https://github.com/apache/iceberg/actions/runs/26236304974/job/77210319934?pr=15596 |
|
Merged to main. |
This PR is mainly to add support in Flink for writing shredding-variant data to Iceberg tables, based on #14297.
This PR is based on #14297 and will be adjusted in sync with it.