[Kernel][Writes] Add support of inserting data into tables #3030
04235fd
to
9a2e59c
Compare
    .collect(Collectors.toMap(e -> e.getKey().toLowerCase(), Map.Entry::getValue));
}

public static int findColIndex(StructType schema, String colNameLowerCase) {
Do we need this to avoid case sensitivity? Maybe comment that for the future?
Done, and moved this to SchemaUtils.
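For reference, a minimal sketch of a case-insensitive column lookup. The real helper takes a `StructType`; here a plain list of field names stands in for the schema, and the class name `ColIndexSketch` is hypothetical, so this illustrates the idea rather than the code that landed in SchemaUtils.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class ColIndexSketch {
    /**
     * Returns the index of the first field whose name matches colName
     * ignoring case, or -1 if there is no such field.
     * Assumption: the schema is modelled as a list of field names.
     */
    static int findColIndex(List<String> fieldNames, String colName) {
        String target = colName.toLowerCase(Locale.ROOT);
        for (int i = 0; i < fieldNames.size(); i++) {
            if (fieldNames.get(i).toLowerCase(Locale.ROOT).equals(target)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("id", "Part1", "value");
        System.out.println(findColIndex(fields, "part1"));
        System.out.println(findColIndex(fields, "missing"));
    }
}
```

Lower-casing both sides with `Locale.ROOT` keeps the comparison independent of the JVM's default locale.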

String targetDirectory = getTargetDirectory(
    getTableRoot(transactionState),
    toLowerCaseList(getPartitionColumnsList(transactionState)),
Are partition column names always lowercase in the file path? If so, this should be documented in the function as the expected input.
Refactored and fixed it to always preserve the case as given by the connector when the table is created.
    partitionValuesLowerCaseName);
return new DataWriteContext(
    targetDirectory,
    partitionValuesLowerCaseName,
Also, should the lower-case nature of this be documented somewhere? The keys here will differ from those returned by transaction.getPartitionColumns(), right?
This is no longer exposed to the connector.
Row addFileRow = AddFile.convertDataFileStatus(
    tableRoot,
    dataFileStatus,
    dataWriteContext.getPartitionValues(),
Aren't these now lower case keys? Isn't that incorrect or is that how they are stored?
I simplified this logic. Now it always preserves the case as given by the connector when the table is created.
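One way to read "preserves the case as given by the connector when the table is created" is sketched below: partition value keys are matched case-insensitively against the table's partition columns and then re-keyed with the table's own casing. The class and method names are hypothetical, and values are modelled as plain strings; the PR's actual refactoring may differ.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class PartitionKeyCaseSketch {
    /**
     * Re-keys the given partition values with the casing used by the table's
     * partition columns. Matching is case-insensitive; unknown keys are rejected.
     */
    static Map<String, String> toTableCase(
            List<String> partitionCols, Map<String, String> values) {
        Map<String, String> lowerToTableCase = new HashMap<>();
        for (String col : partitionCols) {
            lowerToTableCase.put(col.toLowerCase(Locale.ROOT), col);
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : values.entrySet()) {
            String tableCol = lowerToTableCase.get(e.getKey().toLowerCase(Locale.ROOT));
            if (tableCol == null) {
                throw new IllegalArgumentException(
                    "Unknown partition column: " + e.getKey());
            }
            result.put(tableCol, e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> values = new LinkedHashMap<>();
        values.put("PART1", "a");
        System.out.println(toTableCase(Arrays.asList("Part1"), values));
    }
}
```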
 * child paths.
 * @return
 */
public static Path relativizePath(Path child, URI root) {
Do we have a test that uses this? I didn't see one; can we add one?
I will add it in a follow-up PR. Given that this code calls another well-tested API, it should be OK.
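A rough idea of what such a follow-up test could exercise is sketched below. The thread does not name the underlying API, so this assumes `java.net.URI.relativize` and uses plain `URI` values instead of the `Path` type in the signature above; `RelativizeSketch` is a hypothetical name.

```java
import java.net.URI;

public class RelativizeSketch {
    /**
     * Returns the path of child relative to root. If child is not under
     * root, URI.relativize returns child unchanged, so its full path comes back.
     */
    static String relativize(URI child, URI root) {
        return root.relativize(child).getPath();
    }

    public static void main(String[] args) {
        URI root = URI.create("file:/tmp/table/");
        URI inside = URI.create("file:/tmp/table/part=1/data.parquet");
        URI outside = URI.create("file:/other/data.parquet");
        System.out.println(relativize(inside, root));
        System.out.println(relativize(outside, root));
    }
}
```

The second case, a child outside the table root, is the one worth covering explicitly, since the result is then not a relative path.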
if (partValue == null) {
  return new Tuple2<>(partColName, (String) null);
} else {
  return new Tuple2<>(partColName, serializePartitionValue(partValue));
}
Can we make sure we have tests with null partition values?
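A small sketch of the null case such a test would need to cover. `serializePartitionValue` here is a hypothetical stand-in that calls `String.valueOf`, not the Kernel implementation, and partition values are modelled as plain objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NullPartitionValueSketch {
    // Hypothetical stand-in for the real serializer.
    static String serializePartitionValue(Object value) {
        return String.valueOf(value);
    }

    /** Serializes each partition value, keeping null values as null entries. */
    static Map<String, String> serialize(Map<String, Object> partitionValues) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : partitionValues.entrySet()) {
            Object v = e.getValue();
            out.put(e.getKey(), v == null ? null : serializePartitionValue(v));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> values = new LinkedHashMap<>();
        values.put("a", 1);
        values.put("c", null);
        System.out.println(serialize(values));
    }
}
```

The explicit loop is deliberate: `Collectors.toMap` throws a `NullPointerException` when a mapped value is null, so a stream-based collection into a map cannot carry null partition values.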
6cc1a69
to
ffda586
Compare
863215c
to
f89a592
Compare
Minor comments. The partition name stuff is a lot cleaner thanks :)
@@ -104,7 +118,37 @@ static CloseableIterator<FilteredColumnarBatch> transformLogicalData(
       Row transactionState,
       CloseableIterator<FilteredColumnarBatch> dataIter,
       Map<String, Literal> partitionValues) {
-    throw new UnsupportedOperationException("Not implemented yet");
+    List<String> partitionColNames = getPartitionColumnsList(transactionState);
+    validatePartitionValues(partitionColNames, partitionValues);
Should we validate the types here too?
Actually, what do we even use the partitionValues for here? Isn't this param unused?
This is one of the discussions where we concluded that taking the partition values as input forces the connector to not pass data from multiple partitions.
Type validation makes sense. Adding those.
Can you maybe add a comment about why we have partitionValues there then? For future reference.
added.
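The kind of check discussed in this thread could look roughly like the sketch below: the supplied values must cover exactly the partition columns, and each non-null value must match the column's type. Everything here is an assumption for illustration: column types are modelled as Java classes rather than Kernel data types, values as plain objects rather than `Literal`, and key matching is exact-case.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionValueValidationSketch {
    /**
     * Throws IllegalArgumentException unless partitionValues has exactly the
     * partition columns as keys and every non-null value has the column's type.
     */
    static void validate(
            Map<String, Class<?>> partitionColTypes,
            Map<String, Object> partitionValues) {
        if (!partitionColTypes.keySet().equals(partitionValues.keySet())) {
            throw new IllegalArgumentException(
                "Expected values for exactly " + partitionColTypes.keySet()
                    + " but got " + partitionValues.keySet());
        }
        for (Map.Entry<String, Class<?>> col : partitionColTypes.entrySet()) {
            Object value = partitionValues.get(col.getKey());
            // Null is allowed for any partition column type.
            if (value != null && !col.getValue().isInstance(value)) {
                throw new IllegalArgumentException(
                    "Partition column " + col.getKey() + " expects "
                        + col.getValue().getSimpleName() + " but got "
                        + value.getClass().getSimpleName());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Class<?>> types = new LinkedHashMap<>();
        types.put("part1", Integer.class);
        Map<String, Object> values = new LinkedHashMap<>();
        values.put("part1", 7);
        validate(types, values); // passes silently
        System.out.println("valid");
    }
}
```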
- * <li>{@code map}: only a {@code map} with {@code string} key type is supported</li>
+ * <li>{@code map}: only a {@code map} with {@code string} key type is supported. If an
+ * entry value is {@code null}, it should be written to the file.</li>
Why do we need this change? How are partitionValues serialized in Delta Spark? Is this consistent with the other JSON serialization rules?
It turns out Spark writes the null values in partitionValues to the Delta log. Yeah, this is one of those undocumented details, which was found through testing.
How is this done though? Since I thought we saw that null values weren't written for maps some other way?
val addFile = AddFile(
path = "sdfsdf.parquet",
partitionValues = Map("a" -> "b", "c" -> null),
size = 12345,
modificationTime = 54321,
dataChange = true,
stats = null
)
val json = addFile.json
assert(json == "sdfsd")
returns {"add":{"path":"sdfsdf.parquet","partitionValues":{"a":"b","c":null},"size":12345,"modificationTime":54321,"dataChange":true}}
Basically, the ObjectMapper, when configured with .setSerializationInclusion(Include.NON_ABSENT), doesn't write any property (in the above example, stats) whose value is null, but nulls in the map are always written.
Applying that to Kernel: struct fields that are null are not written; nulls in a map are written out.
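The rule can be illustrated with a tiny hand-rolled writer. This is only a sketch of the null-handling rule, not Kernel's JSON serializer: the class and method names are hypothetical, struct fields are passed as already-serialized JSON fragments, and string escaping is omitted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class NullWritingSketch {
    /** Map entries with null values are kept and written as JSON null. */
    static String mapToJson(Map<String, String> map) {
        StringJoiner out = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> e : map.entrySet()) {
            String v = e.getValue() == null ? "null" : quote(e.getValue());
            out.add(quote(e.getKey()) + ":" + v);
        }
        return out.toString();
    }

    /** Struct fields (name -> serialized JSON) that are null are omitted. */
    static String structToJson(Map<String, String> fieldJson) {
        StringJoiner out = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> e : fieldJson.entrySet()) {
            if (e.getValue() != null) {
                out.add(quote(e.getKey()) + ":" + e.getValue());
            }
        }
        return out.toString();
    }

    // No escaping: good enough for a sketch, not for real data.
    static String quote(String s) {
        return "\"" + s + "\"";
    }

    public static void main(String[] args) {
        Map<String, String> partitionValues = new LinkedHashMap<>();
        partitionValues.put("a", "b");
        partitionValues.put("c", null);

        Map<String, String> add = new LinkedHashMap<>();
        add.put("path", quote("sdfsdf.parquet"));
        add.put("partitionValues", mapToJson(partitionValues));
        add.put("stats", null); // null struct field: dropped from the output
        System.out.println(structToJson(add));
        // prints {"path":"sdfsdf.parquet","partitionValues":{"a":"b","c":null}}
    }
}
```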

verifyCommitResult(commitResult0, expVersion = 0, expIsReadyForCheckpoint = false)
verifyCommitInfo(tblPath, version = 0, expPartCols, operation = WRITE)
verifyWrittenContent(tblPath, schema, expV0Data)
Is this enough to be sure the add.partitionValues has the right case sensitivity?
LGTM but could you follow up on the remaining question I had? Just for my understanding
Description
(Split from #2944)
Adds support for inserting data into the table.
How was this patch tested?
Added tests for inserting into partitioned and unpartitioned tables with various combinations of types, partition values, etc. Also tests whether the checkpoint is ready to be created.