
AVRO schema_registry_decode issue #1161

Closed
mihaitodor opened this issue Mar 9, 2022 · 2 comments · Fixed by #1198
Labels
bug · processors (Any tasks or issues relating specifically to processors) · waiting for upstream (Blocked on changes needed in an upstream dependency)

Comments

@mihaitodor
Collaborator

mihaitodor commented Mar 9, 2022

I'm seeing some issues when decoding AVRO logical types. It looks like `codec.TextualFromNative` must be called here, but even then some issues remain.

To reproduce, create the following schema.avsc file:

{
  "type": "record",
  "name": "LongList2",
  "fields": [
    {
      "default": null,
      "name": "int_time_millis",
      "type": [
        "null",
        {
          "type": "int",
          "logicalType": "time-millis"
        }
      ]
    },
    {
      "default": null,
      "name": "long_time_micros",
      "type": [
        "null",
        {
          "type": "long",
          "logicalType": "time-micros"
        }
      ]
    },
    {
      "default": null,
      "name": "long_timestamp_micros",
      "type": [
        "null",
        {
          "type": "long",
          "logicalType": "timestamp-micros"
        }
      ]
    },
    {
      "default": null,
      "name": "pos_0_33333333",
      "type": [
        "null",
        {
          "logicalType": "decimal",
          "precision": 16,
          "scale": 2,
          "type": "bytes"
        }
      ]
    }
  ]
}

And the following message.json file:

{"int_time_millis":{"int": 35245000},"long_time_micros":{"long":20192000000000},"long_timestamp_micros":{"long":62135596800000000},"pos_0_33333333":{"bytes":"!"}}

Start Redpanda in a Docker container:

> docker run --rm -p 8081:8081 -p 8082:8082 -p 9092:9092 --name redpanda docker.vectorized.io/vectorized/redpanda redpanda start --smp 1 --overprovisioned --kafka-addr 0.0.0.0:9092 --advertise-kafka-addr host.docker.internal:9092 --pandaproxy-addr 0.0.0.0:8082 --advertise-pandaproxy-addr host.docker.internal:8082

Start a Kafka tools Docker container and produce the above message on a topic called test:

> docker run --rm -it -v$(pwd):/workspace confluentinc/cp-schema-registry bash
>> cd /workspace
>> kafka-avro-console-producer --broker-list host.docker.internal:9092 --topic test --property schema.registry.url=http://host.docker.internal:8081 --property value.schema.file=schema.avsc < message.json

Then create the following Benthos avro.yaml config:

input:
  kafka:
    addresses:
      - localhost:9092
    topics: [test]
    target_version: 1.0.0
    consumer_group: benthos_consumer_group
    client_id: benthos_kafka_input
    rack_id: ""
    start_from_oldest: true
    checkpoint_limit: 1
    commit_period: 1s
    max_processing_period: 100ms
    extract_tracing_map: ""
    group:
      session_timeout: 10s
      heartbeat_interval: 3s
      rebalance_timeout: 60s
    fetch_buffer_cap: 256

pipeline:
  threads: 1
  processors:
    - schema_registry_decode:
        url: "http://localhost:8081"

output:
  stdout:
    codec: lines

Running Benthos v3.65.0 produces this output:

{"int_time_millis":{"int.time-millis":35245000000000},"long_time_micros":{"long.time-micros":20192000000000000},"long_timestamp_micros":{"long.timestamp-micros":"3939-01-01T00:00:00Z"},"pos_0_33333333":{"bytes.decimal":"33/100"}}

And changing the code here to call `codec.TextualFromNative` produces this output:

{"pos_0_33333333":{"bytes.decimal":"!"},"int_time_millis":{"int.time-millis":35245000},"long_time_micros":{"long.time-micros":1358741504},"long_timestamp_micros":{"long.timestamp-micros":62135596800000000}}

While the change does improve things, there still seems to be an issue with long_time_micros, which will require a bit of digging into goavro.
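For what it's worth, the bad long_time_micros value is exactly what a 32-bit truncation would produce: 20192000000000 mod 2^32 = 1358741504. A quick check (this is only a hypothesis about where the value comes from; the actual cause still needs confirming in goavro):

```go
package main

import "fmt"

func main() {
	// Truncating the input value to 32 bits yields exactly the
	// 1358741504 seen in the output above.
	v := int64(20192000000000)
	fmt.Println(int64(int32(v))) // 1358741504
}
```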

@Jeffail added the bug, processors and waiting for upstream labels on Mar 9, 2022
@Jeffail
Collaborator

Jeffail commented Mar 9, 2022

Thanks @mihaitodor, I've marked this as waiting for upstream, as I'm guessing that if we can't solve this with `codec.TextualFromNative` then we're stuck without upstream fixes.

@mihaitodor
Collaborator Author

mihaitodor commented Mar 25, 2022

Opened a bug upstream: linkedin/goavro#242 and a potential fix: linkedin/goavro#243

mihaitodor added a commit to mihaitodor/connect that referenced this issue Apr 5, 2022
When dealing with logical types, we can't serialise the native
struct emitted by goavro directly to JSON, since that will discard
schema information that `codec.TextualFromNative` uses to produce
the expected JSON.

Also, the tip version of goavro is required because the fix for
this regression linkedin/goavro#242 was
merged, but there isn't a tagged release yet.

Fixes redpanda-data#1161.