Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize schemas feature #400 #402

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 98 additions & 3 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ type Schema interface {
// String returns the canonical form of the schema.
String() string

// Resolve returns the resolved canonical form of the schema.
Resolve(cache *SchemaCache) string

// Fingerprint returns the SHA256 fingerprint of the schema.
Fingerprint() [32]byte

Expand Down Expand Up @@ -492,6 +495,11 @@ func (s *PrimitiveSchema) String() string {
return `{"type":"` + string(s.typ) + `",` + s.logical.String() + `}`
}

// Resolve returns the resolved form of the schema.
func (s *PrimitiveSchema) Resolve(_ *SchemaCache) string {
return s.String()
}

// MarshalJSON marshals the schema to json.
func (s *PrimitiveSchema) MarshalJSON() ([]byte, error) {
if s.logical == nil && len(s.props) == 0 {
Expand Down Expand Up @@ -609,8 +617,48 @@ func (s *RecordSchema) String() string {
if len(fields) > 0 {
fields = fields[:len(fields)-1]
}
if len(s.Namespace()) == 0 {
return `{"name":"` + s.FullName() + `","type":"` + typ + `","fields":[` + fields + `]}`
} else {
return `{"namespace":"` + s.Namespace() + `","name":"` + s.Name() + `","type":"` + typ + `","fields":[` + fields + `]}`
}
}

// Resolve returns the resolved form of the schema.
func (s *RecordSchema) Resolve(cache *SchemaCache) string {
typ := "record"
if s.isError {
typ = "error"
}

fields := ""
for _, f := range s.fields {
switch lookup := f.typ.(type) {
case *RefSchema:
switch found := lookup.actual.(type) {
case *RecordSchema:
if strings.Contains(found.full, ".") {
fields += `{"name":"` + f.Name() + `","type":` + found.Resolve(cache) + `},`
} else {
fields += f.String() + ","
}
default:
fields += f.String() + ","
}
default:
fields += f.String() + ","
}
}
if len(fields) > 0 {
fields = fields[:len(fields)-1]
}

if len(s.Namespace()) == 0 {
return `{"name":"` + s.FullName() + `","type":"` + typ + `","fields":[` + fields + `]}`
} else {
return `{"namespace":"` + s.Namespace() + `","name":"` + s.Name() + `","type":"` + typ + `","fields":[` + fields + `]}`
}

return `{"name":"` + s.FullName() + `","type":"` + typ + `","fields":[` + fields + `]}`
}

// MarshalJSON marshals the schema to json.
Expand Down Expand Up @@ -966,8 +1014,17 @@ func (s *EnumSchema) String() string {
if len(symbols) > 0 {
symbols = symbols[:len(symbols)-1]
}
if len(s.Namespace()) == 0 {
return `{"name":"` + s.FullName() + `","type":"enum","symbols":[` + symbols + `]}`
} else {
return `{"namespace":"` + s.Namespace() + `","name":"` + s.Name() + `","type":"enum","symbols":[` + symbols + `]}`
}

}

return `{"name":"` + s.FullName() + `","type":"enum","symbols":[` + symbols + `]}`
// Resolve returns the resolved form of the schema.
func (s *EnumSchema) Resolve(_ *SchemaCache) string {
return s.String()
}

// MarshalJSON marshals the schema to json.
Expand Down Expand Up @@ -1060,6 +1117,11 @@ func (s *ArraySchema) String() string {
return `{"type":"array","items":` + s.items.String() + `}`
}

// Resolve returns the resolved form of the schema.
func (s *ArraySchema) Resolve(_ *SchemaCache) string {
return s.String()
}

// MarshalJSON marshals the schema to json.
func (s *ArraySchema) MarshalJSON() ([]byte, error) {
buf := new(bytes.Buffer)
Expand Down Expand Up @@ -1130,6 +1192,11 @@ func (s *MapSchema) String() string {
return `{"type":"map","values":` + s.values.String() + `}`
}

// Resolve returns the resolved form of the schema.
func (s *MapSchema) Resolve(_ *SchemaCache) string {
return s.String()
}

// MarshalJSON marshals the schema to json.
func (s *MapSchema) MarshalJSON() ([]byte, error) {
buf := new(bytes.Buffer)
Expand Down Expand Up @@ -1242,6 +1309,19 @@ func (s *UnionSchema) String() string {
return `[` + types + `]`
}

// Resolve returns the resolved form of the schema.
func (s *UnionSchema) Resolve(cache *SchemaCache) string {
types := ""
for _, typ := range s.types {
types += typ.Resolve(cache) + ","
}
if len(types) > 0 {
types = types[:len(types)-1]
}

return `[` + types + `]`
}

// MarshalJSON marshals the schema to json.
func (s *UnionSchema) MarshalJSON() ([]byte, error) {
return jsoniter.Marshal(s.types)
Expand Down Expand Up @@ -1323,7 +1403,12 @@ func (s *FixedSchema) String() string {
logical = "," + s.logical.String()
}

return `{"name":"` + s.FullName() + `","type":"fixed","size":` + size + logical + `}`
return `{"namespace":"` + s.Namespace() + `","name":"` + s.Name() + `","type":"fixed","size":` + size + logical + `}`
}

// Resolve returns the resolved form of the schema.
func (s *FixedSchema) Resolve(_ *SchemaCache) string {
return s.String()
}

// MarshalJSON marshals the schema to json.
Expand Down Expand Up @@ -1386,6 +1471,11 @@ func (s *NullSchema) String() string {
return `"null"`
}

// Resolve returns the resolved form of the schema.
func (s *NullSchema) Resolve(_ *SchemaCache) string {
return s.String()
}

// MarshalJSON marshals the schema to json.
func (s *NullSchema) MarshalJSON() ([]byte, error) {
return []byte(`"null"`), nil
Expand Down Expand Up @@ -1433,6 +1523,11 @@ func (s *RefSchema) String() string {
return `"` + s.actual.FullName() + `"`
}

// Resolve returns the resolved form of the schema.
func (s *RefSchema) Resolve(cache *SchemaCache) string {
return s.actual.Resolve(cache)
}

// MarshalJSON marshals the schema to json.
func (s *RefSchema) MarshalJSON() ([]byte, error) {
return []byte(`"` + s.actual.FullName() + `"`), nil
Expand Down
14 changes: 14 additions & 0 deletions schema_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,21 @@ func Parse(schema string) (Schema, error) {
return ParseBytes([]byte(schema))
}

// Serialize serializes a schema object.
func Serialize(schema Schema) string {
return SerializeWithCache(schema, DefaultSchemaCache)
}

// ParseWithCache parses a schema string using the given namespace and schema cache.
func ParseWithCache(schema, namespace string, cache *SchemaCache) (Schema, error) {
return ParseBytesWithCache([]byte(schema), namespace, cache)
}

// SerializeWithCache serializes a schema using the given namespace and schema cache.
func SerializeWithCache(schema Schema, cache *SchemaCache) string {
return serializeType(schema, cache)
}

// MustParse parses a schema string, panicing if there is an error.
func MustParse(schema string) Schema {
parsed, err := Parse(schema)
Expand Down Expand Up @@ -75,6 +85,10 @@ func ParseBytesWithCache(schema []byte, namespace string, cache *SchemaCache) (S
return derefSchema(s), nil
}

func serializeType(schema Schema, cache *SchemaCache) string {
return schema.Resolve(cache)
}

func parseType(namespace string, v any, seen seenCache, cache *SchemaCache) (Schema, error) {
switch val := v.(type) {
case nil:
Expand Down
9 changes: 9 additions & 0 deletions schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ func TestParseFiles(t *testing.T) {
assert.Equal(t, avro.String, s.Type())
}

func TestSerialize(t *testing.T) {
s, err := avro.ParseFiles("testdata/superhero-part1.avsc", "testdata/superhero-part2.avsc")

data := avro.Serialize(s)
n, err := avro.Parse(data)
require.NoError(t, err)
assert.Equal(t, avro.Record, n.Type())
}

func TestParseFiles_FileDoesntExist(t *testing.T) {
_, err := avro.ParseFiles("test.something")

Expand Down