[KIP-460] Elect leader api implemented #2320

Merged
merged 13 commits into from
Oct 10, 2024

PratRanj07
Contributor

@PratRanj07 PratRanj07 commented Oct 3, 2024

Implemented the ElectLeaders API (KIP-460) and wrote unit tests for it.

The build will fail until the ElectLeaders API is merged in librdkafka.

Manually tested the following cases:

Single Partition:

dotnet run localhost:9092 elect-leaders 0 quickstart-events 0

One or more elect leaders operations failed.
An error occurred in elect leaders operation in Topic quickstart-events Partition [0]: Code: ElectionNotNeeded, Reason: Leader election not needed for topic partition.

Multiple Partitions:

dotnet run localhost:9092 elect-leaders 0 quickstart-events 0 first_topic 0

One or more elect leaders operations failed.
An error occurred in elect leaders operation in Topic quickstart-events Partition [0]: Code: ElectionNotNeeded, Reason: Leader election not needed for topic partition.
An error occurred in elect leaders operation in Topic first_topic Partition [0]: Code: ElectionNotNeeded, Reason: Leader election not needed for topic partition.

Empty partitions return an empty list.

Null partitions trigger elections for all partitions, but the results only contain the successful cases and the cases where the error is something other than ElectionNotNeeded. This is the desired result when leader election is not required for the partitions.

Unit tests also ran fine.

dotnet test --filter "FullyQualifiedName~Confluent.Kafka.UnitTests.ElectLeadersErrorTests"

Passed! - Failed: 0, Passed: 9, Skipped: 0, Total: 9, Duration: 3 s - Confluent.Kafka.UnitTests.dll (net6.0)

@PratRanj07 PratRanj07 requested review from a team as code owners October 3, 2024 18:05
@confluent-cla-assistant

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.


private ElectLeadersReport extractElectLeadersResults(IntPtr resultPtr)
{
IntPtr partitionsPtr = Librdkafka.ElectLeadersResult_partitions(resultPtr, out UIntPtr partitionsCountPtr);
Member

Rename to topicPartitionsPtr and topicPartitionsCountPtr

private ElectLeadersReport extractElectLeadersResults(IntPtr resultPtr)
{
IntPtr partitionsPtr = Librdkafka.ElectLeadersResult_partitions(resultPtr, out UIntPtr partitionsCountPtr);
IntPtr[] partitionsPtrArr = new IntPtr[(int)partitionsCountPtr];
Member

topicPartitionsPtrArr

@@ -634,6 +634,36 @@ private ListOffsetsReport extractListOffsetsReport(IntPtr resultPtr)
};
}

private Tuple<ErrorCode,List<TopicPartitionError>> extractTopicPartitionErrors(IntPtr[] topicPartitionErrorsPtr, int topicPartitionErrorsCount)
{
ErrorCode err = ErrorCode.NoError;
Member

Rename to overallError

List<TopicPartitionError> topicPartitionErrors = new List<TopicPartitionError>(topicPartitionErrorsCount);
for(int i=0;i<topicPartitionErrorsCount;i++){
var tpe = Marshal.PtrToStructure<rd_kafka_topic_partition_result>(topicPartitionErrorsPtr[i]);
if(tpe.errCode != ErrorCode.NoError){
Member

Do we need to check here that the overall err is still NoError before assigning? Otherwise every failing partition overwrites it, and we end up reporting the last error instead of the first.
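The guard being asked for can be sketched in isolation. This is a minimal, self-contained illustration and not the PR's code: the `ErrorCode` enum below is a hypothetical stand-in for `Confluent.Kafka.ErrorCode` (only the member names are borrowed), and `FirstErrorSketch.FirstError` is an invented helper name.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Confluent.Kafka.ErrorCode; numeric values are not meaningful here.
public enum ErrorCode { NoError, ElectionNotNeeded, UnknownTopicOrPart }

public static class FirstErrorSketch
{
    // Returns the first non-NoError code in the sequence, or NoError if there is none.
    public static ErrorCode FirstError(IEnumerable<ErrorCode> partitionErrors)
    {
        ErrorCode overallError = ErrorCode.NoError;
        foreach (var code in partitionErrors)
        {
            // Only assign while overallError is still NoError, so a later
            // failure cannot overwrite the first one that was seen.
            if (code != ErrorCode.NoError && overallError == ErrorCode.NoError)
            {
                overallError = code;
            }
        }
        return overallError;
    }
}
```

Without the `overallError == ErrorCode.NoError` condition the loop would return the last failing code rather than the first.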

@@ -634,6 +634,36 @@ private ListOffsetsReport extractListOffsetsReport(IntPtr resultPtr)
};
}

private Tuple<ErrorCode,List<TopicPartitionError>> extractTopicPartitionErrors(IntPtr[] topicPartitionErrorsPtr, int topicPartitionErrorsCount)
{
ErrorCode err = ErrorCode.NoError;
Member

Also add null checks during extraction: topicPartitionErrorsPtr must be non-null, topicPartitionErrorsCount must be > 0, and inside the loop topicPartitionErrorsPtr[i] must not be IntPtr.Zero.

private ElectLeadersReport extractElectLeadersResults(IntPtr resultPtr)
{
IntPtr partitionsPtr = Librdkafka.ElectLeadersResult_partitions(resultPtr, out UIntPtr partitionsCountPtr);
IntPtr[] partitionsPtrArr = new IntPtr[(int)partitionsCountPtr];
Member

Need a check that partitionsCountPtr > 0 before using the array.
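The null and count checks requested in the last two comments could look roughly like the sketch below. It is an illustration under assumptions, not the code that was merged: `PointerArraySketch` and `CopyPointerArray` are hypothetical names, and the helper only covers copying the native pointer array, not decoding each element.

```csharp
using System;
using System.Runtime.InteropServices;

public static class PointerArraySketch
{
    // Copies a native array of pointers into a managed IntPtr[].
    // Returns an empty array when the base pointer is null or the count is zero,
    // so callers never call Marshal.Copy with an invalid source.
    public static IntPtr[] CopyPointerArray(IntPtr basePtr, UIntPtr count)
    {
        int n = (int)count;
        if (basePtr == IntPtr.Zero || n <= 0)
        {
            return Array.Empty<IntPtr>();
        }

        var result = new IntPtr[n];
        Marshal.Copy(basePtr, result, 0, n);
        return result;
    }
}
```

A caller would still need to skip any element equal to `IntPtr.Zero` before passing it to `Marshal.PtrToStructure`.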

ErrorCode = result.Item1,
PartitionResults = result.Item2
};
}
Member

Both could be merged in a single method too, something like

private ElectLeadersReport ExtractElectLeadersReport(IntPtr resultPtr)
{
    IntPtr topicPartitionsPtr = Librdkafka.ElectLeadersResult_partitions(resultPtr, out UIntPtr topicPartitionsCountPtr);

    // Copy the native pointer array only when there is something to copy.
    IntPtr[] topicPartitionsPtrArr = new IntPtr[(int)topicPartitionsCountPtr];
    if ((int)topicPartitionsCountPtr > 0)
    {
        Marshal.Copy(topicPartitionsPtr, topicPartitionsPtrArr, 0, (int)topicPartitionsCountPtr);
    }

    // Keep the first per-partition error as the report-level error.
    ErrorCode reportErrorCode = ErrorCode.NoError;
    var topicPartitionErrors = topicPartitionsPtrArr.Select(partitionPtr =>
    {
        var tpe = Marshal.PtrToStructure<rd_kafka_topic_partition_result>(partitionPtr);
        if (tpe.errCode != ErrorCode.NoError && reportErrorCode == ErrorCode.NoError)
        {
            reportErrorCode = tpe.errCode;
        }
        return new TopicPartitionError(
            tpe.topic,
            new Partition(tpe.partition),
            new Error(tpe.errCode, tpe.errorStr));
    }).ToList();

    return new ElectLeadersReport
    {
        ErrorCode = reportErrorCode,
        PartitionResults = topicPartitionErrors
    };
}

/// </param>
public static Task<ElectLeadersResult> ElectLeadersAsync(
this IAdminClient adminClient,
ElectLeadersRequest electLeadersRequest,
Member

Why are we making the user pass an ElectLeadersRequest instead of asking for an ElectionType and a List<TopicPartition>? This is very different from the existing APIs.

@@ -0,0 +1,176 @@
// Copyright 2024 Confluent Inc.
Member

Please also add an integration test

};

[Fact]
public async void InvalidRequestTimeout()
Member

Could rewrite something like this


        [Theory]
        [InlineData(-1)]
        [InlineData(0)]
        public async Task ElectLeadersAsync_InvalidRequestTimeout_ThrowsArgumentException(int timeoutSeconds)
        {
            using var adminClient = new AdminClientBuilder(GetTestConfig()).Build();
            var options = new ElectLeadersOptions { RequestTimeout = TimeSpan.FromSeconds(timeoutSeconds) };

            var exception = await Assert.ThrowsAsync<ArgumentException>(() =>
                adminClient.ElectLeadersAsync(
                    new ElectLeadersRequest { ElectionType = ElectionType.Preferred, Partitions = new List<TopicPartition> { new TopicPartition("topic", 0) } },
                    options)
            );

            Assert.Contains("RequestTimeout", exception.Message);
        }

        [Theory]
        [InlineData(ElectionType.Preferred)]
        [InlineData(ElectionType.Unclean)]
        public async Task ElectLeadersAsync_EmptyPartitions_ThrowsArgumentException(ElectionType electionType)
        {
            using var adminClient = new AdminClientBuilder(GetTestConfig()).Build();

            var exception = await Assert.ThrowsAsync<ArgumentException>(() =>
                adminClient.ElectLeadersAsync(
                    new ElectLeadersRequest { ElectionType = electionType, Partitions = new List<TopicPartition>() },
                    new ElectLeadersOptions())
            );

            Assert.Contains("No partitions specified", exception.Message);
        }

        [Theory]
        [InlineData(ElectionType.Preferred)]
        [InlineData(ElectionType.Unclean)]
        public async Task ElectLeadersAsync_DuplicatePartitions_ThrowsArgumentException(ElectionType electionType)
        {
            using var adminClient = new AdminClientBuilder(GetTestConfig()).Build();

            var exception = await Assert.ThrowsAsync<ArgumentException>(() =>
                adminClient.ElectLeadersAsync(
                    new ElectLeadersRequest
                    {
                        ElectionType = electionType,
                        Partitions = new List<TopicPartition> { new TopicPartition("topic", 0), new TopicPartition("topic", 0) }
                    },
                    new ElectLeadersOptions())
            );

            Assert.Contains("Duplicate partitions specified", exception.Message);
        }

        [Theory]
        [InlineData(ElectionType.Preferred)]
        [InlineData(ElectionType.Unclean)]
        [Timeout(1000)]
        public async Task ElectLeadersAsync_ValidRequest_TimesOut(ElectionType electionType)
        {
            using var adminClient = new AdminClientBuilder(GetTestConfig()).Build();

            var exception = await Assert.ThrowsAsync<KafkaException>(() =>
                adminClient.ElectLeadersAsync(
                    new ElectLeadersRequest
                    {
                        ElectionType = electionType,
                        Partitions = new List<TopicPartition> { new TopicPartition("topic", 0) }
                    },
                    new ElectLeadersOptions())
            );

            Assert.Contains("Local: Timed out", exception.Message);
        }
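For context on what the `DuplicatePartitions` test expects, a duplicate check over (topic, partition) pairs can be sketched as follows. This is purely illustrative: the thread does not show where the real validation lives (in the .NET client or in librdkafka), `PartitionValidationSketch.EnsureNoDuplicates` is a hypothetical helper, and plain value tuples stand in for `TopicPartition`.

```csharp
using System;
using System.Collections.Generic;

public static class PartitionValidationSketch
{
    // Throws ArgumentException if the same (topic, partition) pair appears more than once.
    // Value tuples are used as a stand-in for TopicPartition (an assumption for this sketch).
    public static void EnsureNoDuplicates(IEnumerable<(string Topic, int Partition)> partitions)
    {
        var seen = new HashSet<(string, int)>();
        foreach (var tp in partitions)
        {
            // HashSet.Add returns false when the element is already present.
            if (!seen.Add((tp.Topic, tp.Partition)))
            {
                throw new ArgumentException("Duplicate partitions specified");
            }
        }
    }
}
```

The message text mirrors the string asserted in the suggested test; whether the real error is worded exactly this way is not confirmed by the thread.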

@PratRanj07 PratRanj07 requested a review from anchitj October 9, 2024 23:30
Comment on lines 327 to 329
if(args.Length == 1){
partitions = null;
Member

Put the brace on a new line and add a space after if.

namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents an error that occured during the ElectLeaders opperation.
Member

Typo in operation

namespace Confluent.Kafka.Admin
{
/// <summary>
/// Options for ElectLeaders method.
Member

Rewrite as: Options for the "AdminClient.ElectLeaders" method. This keeps it in sync with the other classes.

/// <summary>
/// Operational Error if any
/// </summary>
public ErrorCode ErrorCode { get; set; }
Member

Can we just return Error instead of ErrorCode? This would make sure the design aligns with other classes


/// <summary>
/// Individual partition results. Atleast one of these partitions
/// will be in error.
Member

Why will at least one be in error?

namespace Confluent.Kafka.Admin
{
/// <summary>
/// Enumerates the different types of ACL permission types.
Member

Need to correct this

setOption_RequestTimeout(optionsPtr, options.RequestTimeout);
setOption_OperationTimeout(optionsPtr, options.OperationTimeout);
setOption_completionSource(optionsPtr, completionSourcePtr);
if(partitions != null){
Member

formatting

{
string topic = topicPartitions.Topic;
Partition partition = topicPartitions.Partition;
IntPtr topic_partition = Librdkafka.topic_partition_list_add(topic_partition_list, topic, partition);
Member

The return value isn't being used, so the variable can be removed.

@PratRanj07 PratRanj07 requested a review from anchitj October 10, 2024 09:48

Member

@anchitj anchitj left a comment

LGTM

Contributor

@emasab emasab left a comment

Here are my proposed changes:

  • rename PartitionResults to TopicPartitions, as in Go
  • move ElectionType to Confluent.Kafka
  • style fixes and a refactor in the example

@@ -393,6 +393,31 @@ static bool ParseListConsumerGroupsArgs(string[] commandArgs,
}
}

static Tuple<ElectionType, List<TopicPartition>> ParseElectLeadersArgs(string[] args)
{
if ((args.Length -1 )%2 != 0)
Contributor

Suggested change
- if ((args.Length -1 )%2 != 0)
+ if ((args.Length -1 ) % 2 != 0)

{
if ((args.Length -1 )%2 != 0)
{
Console.WriteLine("usage: .. <bootstrapServers> elect-leaders electionType(0/1) <topic1> <partition1> ..");
Contributor

Suggested change
- Console.WriteLine("usage: .. <bootstrapServers> elect-leaders electionType(0/1) <topic1> <partition1> ..");
+ Console.WriteLine("usage: .. <bootstrapServers> elect-leaders <electionType> <topic1> <partition1> ..");

Comment on lines 431 to 438
static void PrintElectLeaderResults(ElectLeadersResult result)
{
Console.WriteLine(" ElectLeadersResult:");
foreach (var partitionResult in result.PartitionResults)
{
Console.WriteLine($"Election successful in {partitionResult.Topic} {partitionResult.Partition}");
}
}
Contributor

This can be common for the Result and Report by passing the partition list.

Suggested change
- static void PrintElectLeaderResults(ElectLeadersResult result)
- {
- Console.WriteLine(" ElectLeadersResult:");
- foreach (var partitionResult in result.PartitionResults)
- {
- Console.WriteLine($"Election successful in {partitionResult.Topic} {partitionResult.Partition}");
- }
- }
+ static void PrintElectLeaderResults(List<TopicPartitionError> topicPartitions)
+ {
+ Console.WriteLine($"ElectLeaders response has {topicPartitions.Count} partition(s):");
+ foreach (var partitionResult in topicPartitions)
+ {
+ if (!partitionResult.Error.IsError)
+ Console.WriteLine($"Election successful in {partitionResult.Topic} {partitionResult.Partition}");
+ else
+ Console.WriteLine($"Election failed in {partitionResult.Topic} {partitionResult.Partition}: " +
+ $"Code: {partitionResult.Error.Code}" +
+ $", Reason: {partitionResult.Error.Reason}");
+ }
+ }

{
if (commandArgs.Length < 3 && (commandArgs.Length - 1) % 2 != 0)
{
Console.WriteLine("usage: .. <bootstrapServers> elect-leaders electionType(0/1) <topic1> <partition1> ..");
Contributor

Suggested change
- Console.WriteLine("usage: .. <bootstrapServers> elect-leaders electionType(0/1) <topic1> <partition1> ..");
+ Console.WriteLine("usage: .. <bootstrapServers> elect-leaders <electionType> <topic1> <partition1> ..");

Comment on lines 1041 to 1053
for (int i = 0; i < e.Results.PartitionResults.Count; ++i)
{
var result = e.Results.PartitionResults[i];
if (!result.Error.IsError)
{
Console.WriteLine($"Elected leaders operation completed successfully for Topic {result.Topic} Partition {result.Partition}");
}
else
{
Console.WriteLine($"An error occurred in elect leaders operation in Topic {result.Topic} Partition {result.Partition}: Code: {result.Error.Code}" +
$", Reason: {result.Error.Reason}");
}
}
Contributor

Suggested change
- for (int i = 0; i < e.Results.PartitionResults.Count; ++i)
- {
- var result = e.Results.PartitionResults[i];
- if (!result.Error.IsError)
- {
- Console.WriteLine($"Elected leaders operation completed successfully for Topic {result.Topic} Partition {result.Partition}");
- }
- else
- {
- Console.WriteLine($"An error occurred in elect leaders operation in Topic {result.Topic} Partition {result.Partition}: Code: {result.Error.Code}" +
- $", Reason: {result.Error.Reason}");
- }
- }
+ PrintElectLeaderResults(e.Results.TopicPartitions);


using System;

namespace Confluent.Kafka.Admin
Contributor

Add one newline

Comment on lines 645 to 648
{
Error = new Error(ErrorCode.NoError),
PartitionResults = new List<TopicPartitionError>()
};
Contributor

Suggested change
- {
- Error = new Error(ErrorCode.NoError),
- PartitionResults = new List<TopicPartitionError>()
- };
+ return new ElectLeadersReport
+ {
+ TopicPartitions = new List<TopicPartitionError>()
+ };

Contributor Author

@PratRanj07 PratRanj07 Oct 10, 2024

This is essential: it tells us to report no error when we don't receive any topicPartitions in the result, so we don't have to raise an exception when an empty list is passed, or in the null-partitions case where no elections were required.

Contributor

oh right this is still needed for checking that no error occurred

Marshal.Copy(topicPartitionsPtr, topicPartitionsPtrArr, 0, (int)topicPartitionsCountPtr);

ErrorCode reportErrorCode = ErrorCode.NoError;
var partitionResults = topicPartitionsPtrArr.Select(ptr =>
Contributor

Suggested change
- var partitionResults = topicPartitionsPtrArr.Select(ptr =>
+ var topicPartitions = topicPartitionsPtrArr.Select(ptr =>

return new ElectLeadersReport
{
Error = new Error(reportErrorCode),
PartitionResults = partitionResults
Contributor

Suggested change
- PartitionResults = partitionResults
+ TopicPartitions = topicPartitions

Comment on lines 1320 to 1322
var result = new ElectLeadersResult(){
PartitionResults = report.PartitionResults
};
Contributor

Suggested change
- var result = new ElectLeadersResult(){
- PartitionResults = report.PartitionResults
- };
+ var result = new ElectLeadersResult()
+ {
+ TopicPartitions = report.TopicPartitions
+ };

@PratRanj07 PratRanj07 requested a review from emasab October 10, 2024 19:17
Contributor

@emasab emasab left a comment

LGTM! Thanks Pratyush!

@emasab emasab changed the title Elect leader api(KIP-460) implemented [KIP-460] Elect leader api implemented Oct 10, 2024
@emasab emasab merged commit 8212364 into master Oct 10, 2024
3 checks passed
@emasab emasab deleted the dev_electLeadersApi branch October 10, 2024 20:57