Skip to content

Commit

Permalink
[Fix] Fix Bytes Schema (#1173)
Browse files Browse the repository at this point in the history
  • Loading branch information
petermnhull committed Feb 20, 2024
1 parent f476814 commit c2ca7e8
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pulsar/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func NewSchema(schemaType SchemaType, schemaData []byte, properties map[string]s
var schemaDef = string(schemaData)
var s Schema
switch schemaType {
case BYTES:
s = NewBytesSchema(properties)
case STRING:
s = NewStringSchema(properties)
case JSON:
Expand Down
64 changes: 64 additions & 0 deletions pulsar/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ package pulsar

import (
"context"
"fmt"
"log"
"testing"
"time"

pb "github.com/apache/pulsar-client-go/integration-tests/pb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type testJSON struct {
Expand Down Expand Up @@ -55,6 +58,67 @@ func createClient() Client {
return client
}

func TestBytesSchema(t *testing.T) {
client := createClient()
defer client.Close()

topic := newTopicName()

properties := make(map[string]string)
properties["pulsar"] = "hello"
producerSchemaBytes := NewBytesSchema(properties)
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Schema: producerSchemaBytes,
})
assert.NoError(t, err)

_, err = producer.Send(context.Background(), &ProducerMessage{
Value: []byte(`{"key": "value"}`),
})
require.NoError(t, err)
_, err = producer.Send(context.Background(), &ProducerMessage{
Value: []byte(`something else`),
})
require.NoError(t, err)
producer.Close()

// Create consumer
consumerSchemaBytes := NewBytesSchema(nil)
assert.NotNil(t, consumerSchemaBytes)
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "sub-1",
Schema: consumerSchemaBytes,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

// Receive first message
var out1 []byte
msg1, err := consumer.Receive(ctx)
assert.NoError(t, err)
err = msg1.GetSchemaValue(&out1)
assert.NoError(t, err)
assert.Equal(t, []byte(`{"key": "value"}`), out1)
consumer.Ack(msg1)
require.NoError(t, err)

// Receive second message
var out2 []byte
msg2, err := consumer.Receive(ctx)
fmt.Println(string(msg2.Payload()))
assert.NoError(t, err)
err = msg2.GetSchemaValue(&out2)
assert.NoError(t, err)
assert.Equal(t, []byte(`something else`), out2)

defer consumer.Close()
}

func TestJsonSchema(t *testing.T) {
client := createClient()
defer client.Close()
Expand Down
7 changes: 7 additions & 0 deletions pulsar/table_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ func TestTableViewSchemas(t *testing.T) {
expValueOut interface{}
valueCheck func(t *testing.T, got interface{}) // Overrides expValueOut for more complex checks
}{
{
name: "BytesSchema",
schema: NewBytesSchema(nil),
schemaType: []byte(`any`),
producerValue: []byte(`hello pulsar`),
expValueOut: []byte(`hello pulsar`),
},
{
name: "StringSchema",
schema: NewStringSchema(nil),
Expand Down

0 comments on commit c2ca7e8

Please sign in to comment.