Kafka Connect: Sink connector with data writers and converters #9466

Merged: 2 commits into apache:main on Feb 3, 2024

Conversation

@bryanck (Contributor) commented Jan 13, 2024

This PR is the next stage in submitting the Iceberg Kafka Connect sink connector, and is a follow-up to #8701. It includes the initial sink connector and configuration, along with the data writers and converters. In the interest of reducing the scope of the PR, it does not include the sink task, commit controller, integration tests, or docs.

For reference, the current sink implementation can be found at https://github.com/tabular-io/iceberg-kafka-connect.

}

public static Object extractFromRecordValue(Object recordValue, String fieldName) {
String[] fields = fieldName.split("\\.");
Contributor

We usually avoid using split like this because it breaks for field names that contain a dot (.).

To avoid this, we would normally index the schema to produce Accessor instances for fields, then look up the correct accessor by the full fieldName that is passed in. Is that something that we can do here?

Contributor Author

The field name here is a config used to look up a value in a Kafka record for certain purposes, like dynamic table routing (which is not part of this PR). One solution could be to escape the dots in the name when setting the config, though I felt that was not a common case, so I left it out in the interest of keeping things simpler.

}

public static TaskWriter<Record> createTableWriter(
Table table, String tableName, IcebergSinkConfig config) {
Contributor

Do we expect tableName to be something other than table.name() or table.toString()?

Contributor Author

It could be different. The tableName parameter is used to look up table-specific configuration parameters and is always namespace + name. table.name() can have the catalog name prepended in some cases.

public static TaskWriter<Record> createTableWriter(
Table table, String tableName, IcebergSinkConfig config) {
Map<String, String> tableProps = Maps.newHashMap(table.properties());
tableProps.putAll(config.writeProps());
Contributor

In other engines, we typically use shorter property names for overrides. For example, the table property for format is write.format.default but the write property to override it in Spark is write-format. That avoids some odd cases, like setting a default for a single write in this case.

Would it also make sense to do this for the KC sink?

Contributor Author

I suppose that is possible, though wouldn't we then need to maintain an additional set of properties? I.e., if a new table property is added, we would need to remember to add it to the KC sink as well.
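For illustration, a shorthand override along the lines the reviewer describes might be layered on top of the existing merge as in this sketch; the short key write-format is borrowed from Spark's write option and is not part of the PR's config:

import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

// IcebergSinkConfig is the connector config class from this PR.
static Map<String, String> writeProperties(Table table, IcebergSinkConfig config) {
  Map<String, String> tableProps = Maps.newHashMap(table.properties());
  tableProps.putAll(config.writeProps());

  // Hypothetical short-form override: "write-format" (mirroring Spark's
  // write-format option) overrides the write.format.default table property.
  String writeFormat = config.writeProps().get("write-format");
  if (writeFormat != null) {
    tableProps.put(TableProperties.DEFAULT_FILE_FORMAT, writeFormat);
  }
  return tableProps;
}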

return value;
}

Preconditions.checkState(value instanceof Struct, "Expected a struct type");
Contributor

Why can structs only contain structs or a value and not maps? And vice versa. Is that a KC guarantee?

Contributor Author

This method is used to extract a primitive value from a nested field, given a field name in dot notation. It is used for some configs like the route field name, e.g. home.address.city. Currently this expects the nested levels to be structs.
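For illustration, the dot-notation extraction described here might look like this sketch over Kafka Connect Structs (not the PR's exact code):

import org.apache.kafka.connect.data.Struct;

// Walks a field path such as "home.address.city" through nested Structs and
// returns the leaf value; intermediate levels are expected to be Structs.
static Object extractFromStruct(Struct struct, String fieldName) {
  Object value = struct;
  for (String part : fieldName.split("\\.")) {
    if (!(value instanceof Struct)) {
      return null; // the config points at a non-struct level
    }
    value = ((Struct) value).get(part); // throws if the schema has no such field
  }
  return value;
}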

OutputFileFactory fileFactory =
OutputFileFactory.builderFor(table, 1, System.currentTimeMillis())
.defaultSpec(table.spec())
.operationId(UUID.randomUUID().toString())
Contributor

Is there a reusable ID that we can supply here instead? In Spark this is the write's UUID, or for streaming it is the write's UUID and the epoch ID. If we have an equivalent in KC it would be nice to use it here. (Not a blocker)

Contributor Author

I can't think of anything we could use here instead that would be an improvement.

import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitionedFanoutWriter;

public class PartitionedAppendWriter extends PartitionedFanoutWriter<Record> {
Contributor

Should this live in data since it is using Iceberg's generic Record class?

Contributor Author

PartitionedFanoutWriter and UnpartitionedWriter are in core, perhaps it should sit alongside those?

Contributor

Yes, that sounds good to me. That should avoid people re-creating this implementation for other purposes.

Contributor Author

I attempted to move this, but there is a dependency on InternalRecordWrapper to extract the partition key, and that class unfortunately lives in data.
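For reference, a fanout append writer along these lines builds on PartitionedFanoutWriter and uses InternalRecordWrapper to derive the partition key; the following is a sketch consistent with the discussion, not necessarily the PR's exact class:

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitionedFanoutWriter;

public class PartitionedAppendWriter extends PartitionedFanoutWriter<Record> {

  private final PartitionKey partitionKey;
  private final InternalRecordWrapper wrapper;

  public PartitionedAppendWriter(
      PartitionSpec spec,
      FileFormat format,
      FileAppenderFactory<Record> appenderFactory,
      OutputFileFactory fileFactory,
      FileIO io,
      long targetFileSize,
      Schema schema) {
    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
    this.partitionKey = new PartitionKey(spec, schema);
    // InternalRecordWrapper converts generic Record values (e.g. timestamps)
    // into the internal representation expected by partition transforms.
    this.wrapper = new InternalRecordWrapper(schema.asStruct());
  }

  @Override
  protected PartitionKey partition(Record row) {
    partitionKey.partition(wrapper.wrap(row));
    return partitionKey;
  }
}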

private final IcebergSinkConfig config;
private final Map<Integer, Map<String, NestedField>> structNameMap = Maps.newHashMap();

public RecordConverter(Table table, IcebergSinkConfig config) {
Contributor

Looks like the purpose of this is to create a new Iceberg generic Record from Kafka's object model (Struct). Is that needed? In Flink and Spark, we use writers that are adapted to the in-memory object model of those engines to avoid copying a record and then writing. I'm not familiar enough with the KC object model to know whether this is needed.

Contributor Author

The main purpose of the converter is to convert the Kafka Connect record to an Iceberg row, and it is also used to detect schema changes for the purpose of schema evolution.
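For illustration, the core of such a conversion might look like this much-simplified sketch that copies only matching top-level fields; the PR's converter additionally detects schema changes for schema evolution:

import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;

// Copies matching top-level fields from a Kafka Connect Struct into an
// Iceberg generic Record; fields missing from the Struct are left null.
static Record convert(Struct struct, org.apache.iceberg.Schema schema) {
  GenericRecord record = GenericRecord.create(schema);
  for (NestedField field : schema.columns()) {
    Field kcField = struct.schema().field(field.name());
    if (kcField != null) {
      record.setField(field.name(), struct.get(kcField));
    }
  }
  return record;
}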

@@ -30,3 +30,30 @@ project(":iceberg-kafka-connect:iceberg-kafka-connect-events") {
useJUnitPlatform()
}
}

project(":iceberg-kafka-connect:iceberg-kafka-connect") {
Contributor

Why the duplicate name? Could this be :iceberg-kafka-connect:iceberg-sink?

Contributor Author

I was following the convention of the Spark projects to some extent, e.g. :iceberg-spark:iceberg-spark-<version>.

Contributor

Okay, so it sounds like the idea is to make the final artifact name iceberg-kafka-connect. That makes sense.

@rdblue (Contributor) left a comment

@bryanck thanks for submitting this! It looks really good after my first pass. Overall, I think it would be easier to get in with a couple of modifications, since this is a pretty large PR. First, I would like to separate some of the utils and tests out along with the changes that only add preconditions. Getting those easy changes in would help us focus time on validating the bigger changes. Second, I think focusing on the append use case would also help us move faster. That would mean adding half the writer classes and less config. Usually smaller PRs can have much faster review turn-around because there aren't as many updates and everyone loses less context between iterations.

@bryanck (Contributor Author) commented Jan 16, 2024

Sure, thanks, I'll scale down this PR.

@bryanck (Contributor Author) commented Jan 16, 2024

I stripped out the delta writers and record converter, along with related tests and config.

Comment on lines +87 to +97
} catch (Exception e) {
LOG.error(
"Unable to create partition spec {}, table {} will be unpartitioned",
partitionBy,
identifier,
e);
spec = PartitionSpec.unpartitioned();
}
Contributor

Why are we recovering from exceptions here?
Personally, I would prefer if the connector hard-failed so that I know I've done something wrong in my connector configuration and have to fix it, rather than having the connector silently default to unpartitioned. Or am I missing some nuance here?

Contributor Author

My thought was to have the sink be more permissive in this case, i.e. if the sink is fanning out to several different tables, don't error out if one of them can't be partitioned, as that can be difficult to recover from. There is room for improvement in error handling to help with recovery, e.g. adding DLQ support for records that can't be processed.

Contributor

I agree that we don't want to fail if the partitioning doesn't work, for example if you expect events to have an event_ts column but it isn't present in all event types. We should still try to make progress rather than ignoring the events or (worse) failing the operation.

Comment on lines 102 to 111
try {
result.set(catalog.loadTable(identifier));
} catch (NoSuchTableException e) {
result.set(
catalog.createTable(
identifier, schema, partitionSpec, config.autoCreateProps()));
}
Contributor

Hmmm, a little concerned about this. I'm worried that creating and evolving tables inside each task of a connector will result in contention issues, particularly for topics with many partitions. I'm curious whether you've seen any such issues.

Contributor

Bit of a crazy idea that I haven't fully thought through in terms of feasibility, but I'm curious whether you folks ever considered delaying committing schema updates until the moment we commit the corresponding data files (potentially even in the same transaction). That could help reduce any (potential) contention issues, because in effect only one task would be performing the schema updates (the same task that is committing data files).

Contributor Author

I considered that and I agree it would have been a nice solution to reduce the contention. The issue is that you need to have the schema's field IDs assigned when you write the data. You need some type of coordination to prevent different workers from using the same ID for different fields. The catalog is acting as that coordinator currently.

writer.write(row);
}
} catch (Exception e) {
throw new DataException(
Contributor

What is the result of throwing here? Does it cause the sink to stop?

Contributor Author

The sink won't stop immediately, but the task will fail. Depending on the platform (Strimzi, Confluent, etc.), the task will restart and the source topic partitions will be rebalanced across the tasks. If the failure is not recoverable, then after a number of retries the sink will fail.

.collect(toList());

// filter out columns that have already been made optional
List<MakeOptional> makeOptionals =
Contributor

You might consider moving these checks into SchemaUpdate.Consumer. You could have that update itself for the current table schema and then check empty().

Contributor Author

That would work, though I'd prefer to make that change when we introduce the record converter which is calling the consumer, to see what that ends up looking like.

update -> updateSchema.addColumn(update.parentName(), update.name(), update.type()));
updateTypes.forEach(update -> updateSchema.updateColumn(update.name(), update.type()));
makeOptionals.forEach(update -> updateSchema.makeColumnOptional(update.name()));
updateSchema.commit();
Contributor

Minor: this whole method can be retried since it calls refresh at the start.

Contributor Author

The whole method is being retried currently (unless I overlooked something).
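For reference, that retry could be expressed with Iceberg's Tasks utility roughly as below; applySchemaUpdates is a stand-in name for the method under discussion and the retry count is illustrative:

import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.util.Tasks;

// Retry the whole method on commit conflicts; the refresh() at the start of
// the method makes it safe to repeat.
Tasks.range(1)
    .retry(2)
    .onlyRetryOn(CommitFailedException.class)
    .run(notUsed -> applySchemaUpdates(table, updates));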


public static PartitionSpec createPartitionSpec(
org.apache.iceberg.Schema schema, List<String> partitionBy) {
if (partitionBy.isEmpty()) {
Contributor

Handle null here, too?

return field.isOptional();
}

public static PartitionSpec createPartitionSpec(
Contributor

This seems really useful. We may want to move it to core. We currently use engines for this but this is a good simple implementation!
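For illustration, a simple version of such a helper, handling null/empty input (per the comment above), identity partitioning, and the singular time transforms, might look like this sketch rather than the PR's exact code:

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;

static final Pattern TRANSFORM = Pattern.compile("(\\w+)\\((.+)\\)");

// Builds a spec from expressions such as "id" or "hour(ts)"; an unpartitioned
// spec is returned when no partitioning is configured.
static PartitionSpec createPartitionSpec(Schema schema, List<String> partitionBy) {
  if (partitionBy == null || partitionBy.isEmpty()) {
    return PartitionSpec.unpartitioned();
  }
  PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
  for (String expr : partitionBy) {
    Matcher matcher = TRANSFORM.matcher(expr);
    if (matcher.matches()) {
      String column = matcher.group(2).trim();
      switch (matcher.group(1)) {
        case "year":
          builder.year(column);
          break;
        case "month":
          builder.month(column);
          break;
        case "day":
          builder.day(column);
          break;
        case "hour":
          builder.hour(column);
          break;
        default:
          throw new UnsupportedOperationException("Unsupported transform: " + expr);
      }
    } else {
      builder.identity(expr);
    }
  }
  return builder.build();
}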

}
}

Optional<Type> inferIcebergType(Object value) {
Contributor

Style: Iceberg doesn't generally use Optional and just passes null instead.

Contributor Author

I changed these to do null checks instead.

} else if (value instanceof List) {
List<?> list = (List<?>) value;
if (list.isEmpty()) {
return null;
Contributor

Could this default to a list of strings?

Contributor Author

I felt it was better to skip adding it if we can't infer the element type, then let schema evolution add it when some data comes in.
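For illustration, a simplified sketch of inference in the null-returning style discussed here; the handled types and the recursive list case are assumptions for the example:

import java.util.List;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

// Returns the inferred Iceberg type, or null if the type cannot be determined
// (e.g. an empty list), in which case the field is skipped for now and added
// later by schema evolution.
static Type inferIcebergType(Object value) {
  if (value instanceof String) {
    return Types.StringType.get();
  } else if (value instanceof Boolean) {
    return Types.BooleanType.get();
  } else if (value instanceof Long || value instanceof Integer) {
    return Types.LongType.get();
  } else if (value instanceof Double || value instanceof Float) {
    return Types.DoubleType.get();
  } else if (value instanceof List) {
    List<?> list = (List<?>) value;
    if (list.isEmpty()) {
      return null;
    }
    Type elementType = inferIcebergType(list.get(0));
    // element ID 1 is a placeholder; IDs are reassigned when the column is added
    return elementType == null ? null : Types.ListType.ofOptional(1, elementType);
  }
  return null;
}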

@rdblue (Contributor) left a comment

I left a few comments, but there is nothing that I think is major and blocking. Thanks @bryanck! I'll leave it to you to reply and possibly update but I think we can merge this when you're ready.

@rdblue merged commit f4ba90d into apache:main on Feb 3, 2024

@rdblue (Contributor) commented Feb 3, 2024

Thanks, @bryanck! This is looking great and I'm excited to get the next steps in.

Also thanks to @fqaiser94 for reviewing!

@ajantha-bhat (Member) left a comment

I was out of office and couldn't review this earlier. Happy to see the progress.

I have a few comments that can be handled in a follow-up.

Overall LGTM.

ConfigDef.Type.STRING,
null,
Importance.MEDIUM,
"Coordinator threads to use for table commits, default is (cores * 2)");
Member

I think this definition is wrong.
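The fragment above declares what is documented as a thread count as a STRING with a null default, which is presumably the issue. A hedged sketch of a corrected definition follows; the key name and the surrounding configDef variable are illustrative, not the PR's actual names:

// Declare the coordinator thread count as an INT; a null default leaves the
// "cores * 2" fallback to be computed at runtime.
configDef.define(
    "iceberg.coordinator.threads", // hypothetical key name
    ConfigDef.Type.INT,
    null,
    Importance.MEDIUM,
    "Coordinator threads to use for table commits, default is (cores * 2)");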

notUsed -> {
try {
result.set(
catalog.createTable(
Member

We also have to create the namespace if it doesn't exist.

Many catalogs don't support implicit namespaces and expect the namespace to exist before table creation.
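For illustration, a namespace check could precede the create call for catalogs that implement SupportsNamespaces; a sketch assuming the catalog and identifier variables from the surrounding code:

import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.exceptions.AlreadyExistsException;

// Ensure the namespace exists before creating the table; many catalogs do not
// create namespaces implicitly.
Namespace namespace = identifier.namespace();
if (catalog instanceof SupportsNamespaces
    && !((SupportsNamespaces) catalog).namespaceExists(namespace)) {
  try {
    ((SupportsNamespaces) catalog).createNamespace(namespace);
  } catch (AlreadyExistsException e) {
    // another task created it concurrently; safe to ignore
  }
}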

String transform = matcher.group(1);
switch (transform) {
case "year":
case "years":
Member

I think the plural forms were a carry-over from the Spark transforms and were not per the spec, so we recently added the singular forms to the same Spark class: https://iceberg.apache.org/spec/#partition-transforms

I don't think we need to support the years, months, days, hours syntax, as it is not per the spec and this connector has nothing to do with Spark.
