Add support for custom logical type conversions to TextualFromNative #253

Open
mihaitodor opened this issue Jul 18, 2022 · 9 comments
@mihaitodor
Collaborator

While trying to replicate the Snowflake Kafka Connector functionality using Benthos, I noticed that its Java code calls genericData.addLogicalTypeConversion(new Conversions.DecimalConversion()) in addition to relying on the implicit conversions when converting Avro to JSON. I believe this is done so that a bytes value with logical type decimal is rendered as a number that reflects the scale and precision declared in the schema. For example, the following Java code:

// > export JAVA_HOME=/usr/local/opt/openjdk
// > export PATH="${JAVA_HOME}/bin:$PATH"
// > java --version
// openjdk 18.0.1 2022-04-19
// OpenJDK Runtime Environment Homebrew (build 18.0.1+0)
// OpenJDK 64-Bit Server VM Homebrew (build 18.0.1+0, mixed mode, sharing)
// > java -cp avro_1.11/avro-1.11.0.jar:avro_1.11/jackson-core-2.12.5.jar:avro_1.11/jackson-annotations-2.12.5.jar:avro_1.11/jackson-databind-2.12.5.jar:avro_1.11/slf4j-api-1.7.32.jar Main.java

import java.io.*;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;

public class Main {
    static byte[] fromJsonToAvro(String json, Schema schema) throws Exception {
        InputStream input = new ByteArrayInputStream(json.getBytes());
        DataInputStream din = new DataInputStream(input);

        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);

        DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
        Object datum = reader.read(null, decoder);

        GenericDatumWriter<Object>  w = new GenericDatumWriter<Object>(schema);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        Encoder e = EncoderFactory.get().binaryEncoder(outputStream, null);

        w.write(datum, e);
        e.flush();

        return outputStream.toByteArray();
    }

    static String schemaJSON = """
{
	"type": "record",
	"name": "bytesdecimal",
	"fields": [
		{
			"default": null,
			"name": "pos_0_33333333",
			"type": [
				"null",
				{
					"logicalType": "decimal",
					"precision": 16,
					"scale": 2,
					"type": "bytes"
				}
			]
		}
	]
}
""";

    static String inputJSON = """
{
	"pos_0_33333333": {
		"bytes": "!"
	}
}
""";

    public static void main(String[] args) {
        try {
            Schema schema = new Schema.Parser().parse(schemaJSON);

            byte[] avroByteArray = fromJsonToAvro(inputJSON, schema);

            final GenericData genericData = new GenericData();
            genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema, schema, genericData);

            Decoder decoder = DecoderFactory.get().binaryDecoder(avroByteArray, null);
            GenericRecord record = reader.read(null, decoder);
            System.out.println(record);
        }
        catch (Exception e) {
            System.out.println(e);
        }
    }
}

outputs {"pos_0_33333333": 0.33}, while this Go code:

package main

import (
	"testing"

	"github.com/linkedin/goavro/v2"
	"github.com/stretchr/testify/require"
)

func TestLogicalTypeToJSON(t *testing.T) {
	schema := `
{
	"type": "record",
	"name": "bytesdecimal",
	"fields": [
		{
			"default": null,
			"name": "pos_0_33333333",
			"type": [
				"null",
				{
					"logicalType": "decimal",
					"precision": 16,
					"scale": 2,
					"type": "bytes"
				}
			]
		}
	]
}`

	jsonString := `
{
	"pos_0_33333333": "!"
}`

	codec, err := goavro.NewCodecForStandardJSON(schema)
	require.NoError(t, err)

	native, _, err := codec.NativeFromTextual([]byte(jsonString))
	require.NoError(t, err)

	bs, err := codec.TextualFromNative(nil, native)
	require.NoError(t, err)

	require.Equal(t, `{"pos_0_33333333":{"bytes.decimal":"!"}}`, string(bs))
}

outputs {"pos_0_33333333":{"bytes.decimal":"!"}}. While this might be more intuitive, a consumer loses any information on precision and scale. It also exhibits the issue raised in #252 regarding bytes.decimal vs bytes.
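To make the 0.33 result concrete: Avro encodes a decimal as the big-endian two's-complement bytes of its unscaled integer, and the schema's scale says where the decimal point goes. The following is a minimal standard-library sketch of that decoding step, not goavro code; the helper name decimalFromBytes is made up for illustration.

```go
package main

import (
	"fmt"
	"math/big"
)

// decimalFromBytes is a hypothetical helper (not part of goavro). It reads b
// as a big-endian two's-complement unscaled integer, which is how Avro encodes
// the decimal logical type, and renders it with the given scale.
func decimalFromBytes(b []byte, scale int) string {
	unscaled := new(big.Int).SetBytes(b)
	// SetBytes treats b as unsigned, so subtract 2^(8*len(b)) when the sign
	// bit is set to recover the negative value.
	if len(b) > 0 && b[0]&0x80 != 0 {
		unscaled.Sub(unscaled, new(big.Int).Lsh(big.NewInt(1), uint(8*len(b))))
	}
	// value = unscaled / 10^scale
	denom := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil)
	return new(big.Rat).SetFrac(unscaled, denom).FloatString(scale)
}

func main() {
	// "!" is the single byte 0x21, i.e. the unscaled integer 33.
	fmt.Println(decimalFromBytes([]byte("!"), 2)) // prints 0.33
}
```

With scale 2 from the schema above, the byte "!" becomes 0.33, which matches the Java output.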

Note: There are other converters which can be added in Java, but this Snowflake connector only uses this DecimalConversion.

Would there be any interest in supporting this functionality? I'm happy to put in the work to write the new code with unit tests and whatever else is needed, but I'd first like to know your thoughts on this and how you'd expect the API to be extended for this purpose. Maybe by adding an extra method on the codec, like AddLogicalTypeConversion?

@xmcqueen
Contributor

This and the preceding issue #252 will require some more thinking over here. It sounds like it's functionally beneficial in that it helps preserve the scale and precision information from the incoming data. Is this upstream Java behaviour required by the spec and implemented in the Apache Avro Java library, or is it an enhancement created by the Benthos project?

@mihaitodor
Collaborator Author

> Is this upstream java change required by the specs and implemented in the apache avro java library or is it an enhancement created by the Benthos project?

I don't know whether a spec document covers it (or which one), but the Java Avro implementation exposes GenericData.addLogicalTypeConversion(Conversion<?> conversion) and, unfortunately, it's being used in the wild. In the case of Benthos, I have a requirement to match the JSON format emitted by the Snowflake Kafka Connector, which, under the hood, calls addLogicalTypeConversion. I think it would be best to have a generic mechanism that matches the functionality of this addLogicalTypeConversion Java method, but I'd be OK with a custom flag somewhere to enable this specific decimal conversion, if that makes more sense.

Unfortunately, right now, I think the only way to enable this specific conversion is to write my own implementation of TextualFromNative, which I'd very much rather avoid...

@xmcqueen
Contributor

@mihaitodor I'd be happy to see your contribution to solve these two issues, if you still have the enthusiasm to get that going!

Thanks

@mihaitodor
Collaborator Author

> @mihaitodor i'd be happy to see your contribution to solve these two issues, if you still have the enthusiasm to get that going!
>
> Thanks

Cool! Do you mean this one and #252? Happy to put some time into it. For this one, I think adding some extra API function(s) might suffice, but for #252 I think the best thing to do is a breaking change along with a major version bump, if you agree that the current behaviour isn't what the spec demands.

@xmcqueen
Contributor

xmcqueen commented Aug 22, 2022

Yes, I was thinking of #252. They seem related, and if you know the subject it would be great if you would do both of them. I do think the version bump on the other issue (#252) is appropriate. I think it would be best for me to cut a new version with the recent commits, and then a subsequent version with the bump to pull in the breaking change from #252. So I guess the priority is this issue (#253) and then a version release, and then #252 and the version bump.

@xmcqueen
Contributor

Hi @mihaitodor, I'm not clear from the comments whether this is waiting on me or not. It's been 5 months. It looks like the most recent release came after this, so I must have done the "new version with the recent commits". Is there any interest in getting these two tickets going?

@mihaitodor
Collaborator Author

Hey @xmcqueen, thanks for following up and reminding me to post an update! I started digging into this one, but it ended up a bit more involved than I expected and I had to put it on pause...

#252 should be a small code change + test updates, assuming you folks are still fine with a breaking change and a major version bump. After that's done, I guess this one (#253) can be implemented separately by exposing some extra API method without breaking any existing API. WDYT?

Long term, I'd like to see both of these issues addressed, but do let me know if you or anyone else is interested in tackling them in the next few months. Otherwise, I'll get to them, since some new users are going to have issues with adopting this sooner or later...

@xmcqueen
Contributor

Great, let's see if we get any other opinions. A breaking change sounds scary to me, though I know we will clearly indicate it with a major version bump.

I'll go post in the other ticket (#252).

@mihaitodor
Collaborator Author

The good news is that people will need to import github.com/linkedin/goavro/v3 instead of github.com/linkedin/goavro/v2 to get the updated logic after #252, so nobody picks up the change by accident. The bad news is that the JSON data they produced with github.com/linkedin/goavro/v2 will differ from the data they'll produce with github.com/linkedin/goavro/v3 if there are logical types in it, which might be an issue for some users. While one could maintain both versions in parallel by following this guide (either as duplicate code in a subfolder or as a separate branch), given that there aren't a lot of changes to this library, I think just publishing a new v3.0.0 tag and making any other new additions exclusively in v3 will be OK.
