1use crate::{
20 AvroResult, Error,
21 bigdecimal::{deserialize_big_decimal, serialize_big_decimal},
22 decimal::Decimal,
23 duration::Duration,
24 error::Details,
25 schema::{
26 DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, Precision, RecordField,
27 RecordSchema, ResolvedSchema, Scale, Schema, SchemaKind, UnionSchema,
28 },
29};
30use bigdecimal::BigDecimal;
31use log::{debug, error};
32use serde_json::{Number, Value as JsonValue};
33use std::{
34 borrow::Borrow,
35 collections::{BTreeMap, HashMap},
36 fmt::Debug,
37 hash::BuildHasher,
38 str::FromStr,
39};
40use uuid::Uuid;
41
42fn max_prec_for_len(len: usize) -> Result<usize, Error> {
44 let len = i32::try_from(len).map_err(|e| Details::ConvertLengthToI32(e, len))?;
45 Ok((2.0_f64.powi(8 * len - 1) - 1.0).log10().floor() as usize)
46}
47
/// In-memory representation of a single Avro datum.
///
/// The `strum` derive also generates a payload-free companion enum named
/// `ValueKind` with one variant per variant declared here.
#[derive(Clone, Debug, PartialEq, strum_macros::EnumDiscriminants)]
#[strum_discriminants(name(ValueKind))]
pub enum Value {
    /// A `null` value.
    Null,
    /// A `boolean` value.
    Boolean(bool),
    /// An `int` value (32-bit signed).
    Int(i32),
    /// A `long` value (64-bit signed).
    Long(i64),
    /// A `float` value (single precision).
    Float(f32),
    /// A `double` value (double precision).
    Double(f64),
    /// A `bytes` value.
    Bytes(Vec<u8>),
    /// A `string` value.
    String(String),
    /// A `fixed` value: the declared size followed by the raw bytes.
    Fixed(usize, Vec<u8>),
    /// An `enum` value: the position of the symbol in the schema's symbol
    /// list followed by the symbol itself.
    Enum(u32, String),
    /// A `union` value: the index of the selected variant in the union
    /// schema followed by the wrapped value.
    Union(u32, Box<Value>),
    /// An `array` of values.
    Array(Vec<Value>),
    /// A `map` from string keys to values.
    Map(HashMap<String, Value>),
    /// A `record`, stored as `(field name, field value)` pairs.
    Record(Vec<(String, Value)>),
    /// A logical `date` value, carried as an `i32`.
    Date(i32),
    /// A logical `decimal` value.
    Decimal(Decimal),
    /// A logical big-decimal value.
    BigDecimal(BigDecimal),
    /// A logical time-of-day value with millisecond precision, carried as an `i32`.
    TimeMillis(i32),
    /// A logical time-of-day value with microsecond precision, carried as an `i64`.
    TimeMicros(i64),
    /// A logical timestamp with millisecond precision, carried as an `i64`.
    TimestampMillis(i64),
    /// A logical timestamp with microsecond precision, carried as an `i64`.
    TimestampMicros(i64),
    /// A logical timestamp with nanosecond precision, carried as an `i64`.
    TimestampNanos(i64),
    /// A logical local timestamp with millisecond precision, carried as an `i64`.
    LocalTimestampMillis(i64),
    /// A logical local timestamp with microsecond precision, carried as an `i64`.
    LocalTimestampMicros(i64),
    /// A logical local timestamp with nanosecond precision, carried as an `i64`.
    LocalTimestampNanos(i64),
    /// A logical `duration` value.
    Duration(Duration),
    /// A logical `uuid` value.
    Uuid(Uuid),
}
129
130macro_rules! to_value(
131 ($type:ty, $variant_constructor:expr) => (
132 impl From<$type> for Value {
133 fn from(value: $type) -> Self {
134 $variant_constructor(value)
135 }
136 }
137 );
138);
139
140to_value!(bool, Value::Boolean);
141to_value!(i32, Value::Int);
142to_value!(i64, Value::Long);
143to_value!(f32, Value::Float);
144to_value!(f64, Value::Double);
145to_value!(String, Value::String);
146to_value!(Vec<u8>, Value::Bytes);
147to_value!(uuid::Uuid, Value::Uuid);
148to_value!(Decimal, Value::Decimal);
149to_value!(BigDecimal, Value::BigDecimal);
150to_value!(Duration, Value::Duration);
151
152impl From<()> for Value {
153 fn from(_: ()) -> Self {
154 Self::Null
155 }
156}
157
158impl From<usize> for Value {
159 fn from(value: usize) -> Self {
160 i64::try_from(value)
161 .expect("cannot convert usize to i64")
162 .into()
163 }
164}
165
166impl From<&str> for Value {
167 fn from(value: &str) -> Self {
168 Self::String(value.to_owned())
169 }
170}
171
172impl From<&[u8]> for Value {
173 fn from(value: &[u8]) -> Self {
174 Self::Bytes(value.to_owned())
175 }
176}
177
178impl<T> From<Option<T>> for Value
179where
180 T: Into<Self>,
181{
182 fn from(value: Option<T>) -> Self {
183 Self::Union(
185 value.is_some() as u32,
186 Box::new(value.map_or_else(|| Self::Null, Into::into)),
187 )
188 }
189}
190
191impl<K, V, S> From<HashMap<K, V, S>> for Value
192where
193 K: Into<String>,
194 V: Into<Self>,
195 S: BuildHasher,
196{
197 fn from(value: HashMap<K, V, S>) -> Self {
198 Self::Map(
199 value
200 .into_iter()
201 .map(|(key, value)| (key.into(), value.into()))
202 .collect(),
203 )
204 }
205}
206
/// Helper for building a record value field by field against a record schema.
#[derive(Debug, Clone)]
pub struct Record<'a> {
    /// The `(field name, value)` pairs collected so far, in schema order.
    pub fields: Vec<(String, Value)>,
    /// Borrowed from the schema: maps a field name to its position in `fields`.
    schema_lookup: &'a BTreeMap<String, usize>,
}
216
217impl Record<'_> {
218 pub fn new(schema: &Schema) -> Option<Record<'_>> {
222 match *schema {
223 Schema::Record(RecordSchema {
224 fields: ref schema_fields,
225 lookup: ref schema_lookup,
226 ..
227 }) => {
228 let mut fields = Vec::with_capacity(schema_fields.len());
229 for schema_field in schema_fields.iter() {
230 fields.push((schema_field.name.clone(), Value::Null));
231 }
232
233 Some(Record {
234 fields,
235 schema_lookup,
236 })
237 }
238 _ => None,
239 }
240 }
241
242 pub fn put<V>(&mut self, field: &str, value: V)
248 where
249 V: Into<Value>,
250 {
251 if let Some(&position) = self.schema_lookup.get(field) {
252 self.fields[position].1 = value.into()
253 }
254 }
255
256 pub fn get(&self, field: &str) -> Option<&Value> {
259 self.schema_lookup
260 .get(field)
261 .map(|&position| &self.fields[position].1)
262 }
263}
264
265impl<'a> From<Record<'a>> for Value {
266 fn from(value: Record<'a>) -> Self {
267 Self::Record(value.fields)
268 }
269}
270
271impl From<JsonValue> for Value {
272 fn from(value: JsonValue) -> Self {
273 match value {
274 JsonValue::Null => Self::Null,
275 JsonValue::Bool(b) => b.into(),
276 JsonValue::Number(ref n) if n.is_i64() => {
277 let n = n.as_i64().unwrap();
278 if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
279 Value::Int(n as i32)
280 } else {
281 Value::Long(n)
282 }
283 }
284 JsonValue::Number(ref n) if n.is_f64() => Value::Double(n.as_f64().unwrap()),
285 JsonValue::Number(n) => Value::Long(n.as_u64().unwrap() as i64), JsonValue::String(s) => s.into(),
287 JsonValue::Array(items) => Value::Array(items.into_iter().map(Value::from).collect()),
288 JsonValue::Object(items) => Value::Map(
289 items
290 .into_iter()
291 .map(|(key, value)| (key, value.into()))
292 .collect(),
293 ),
294 }
295 }
296}
297
298impl TryFrom<Value> for JsonValue {
300 type Error = crate::error::Error;
301 fn try_from(value: Value) -> AvroResult<Self> {
302 match value {
303 Value::Null => Ok(Self::Null),
304 Value::Boolean(b) => Ok(Self::Bool(b)),
305 Value::Int(i) => Ok(Self::Number(i.into())),
306 Value::Long(l) => Ok(Self::Number(l.into())),
307 Value::Float(f) => Number::from_f64(f.into())
308 .map(Self::Number)
309 .ok_or_else(|| Details::ConvertF64ToJson(f.into()).into()),
310 Value::Double(d) => Number::from_f64(d)
311 .map(Self::Number)
312 .ok_or_else(|| Details::ConvertF64ToJson(d).into()),
313 Value::Bytes(bytes) => Ok(Self::Array(bytes.into_iter().map(|b| b.into()).collect())),
314 Value::String(s) => Ok(Self::String(s)),
315 Value::Fixed(_size, items) => {
316 Ok(Self::Array(items.into_iter().map(|v| v.into()).collect()))
317 }
318 Value::Enum(_i, s) => Ok(Self::String(s)),
319 Value::Union(_i, b) => Self::try_from(*b),
320 Value::Array(items) => items
321 .into_iter()
322 .map(Self::try_from)
323 .collect::<Result<Vec<_>, _>>()
324 .map(Self::Array),
325 Value::Map(items) => items
326 .into_iter()
327 .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
328 .collect::<Result<Vec<_>, _>>()
329 .map(|v| Self::Object(v.into_iter().collect())),
330 Value::Record(items) => items
331 .into_iter()
332 .map(|(key, value)| Self::try_from(value).map(|v| (key, v)))
333 .collect::<Result<Vec<_>, _>>()
334 .map(|v| Self::Object(v.into_iter().collect())),
335 Value::Date(d) => Ok(Self::Number(d.into())),
336 Value::Decimal(ref d) => <Vec<u8>>::try_from(d)
337 .map(|vec| Self::Array(vec.into_iter().map(|v| v.into()).collect())),
338 Value::BigDecimal(ref bg) => {
339 let vec1: Vec<u8> = serialize_big_decimal(bg)?;
340 Ok(Self::Array(vec1.into_iter().map(|b| b.into()).collect()))
341 }
342 Value::TimeMillis(t) => Ok(Self::Number(t.into())),
343 Value::TimeMicros(t) => Ok(Self::Number(t.into())),
344 Value::TimestampMillis(t) => Ok(Self::Number(t.into())),
345 Value::TimestampMicros(t) => Ok(Self::Number(t.into())),
346 Value::TimestampNanos(t) => Ok(Self::Number(t.into())),
347 Value::LocalTimestampMillis(t) => Ok(Self::Number(t.into())),
348 Value::LocalTimestampMicros(t) => Ok(Self::Number(t.into())),
349 Value::LocalTimestampNanos(t) => Ok(Self::Number(t.into())),
350 Value::Duration(d) => Ok(Self::Array(
351 <[u8; 12]>::from(d).iter().map(|&v| v.into()).collect(),
352 )),
353 Value::Uuid(uuid) => Ok(Self::String(uuid.as_hyphenated().to_string())),
354 }
355 }
356}
357
358impl Value {
359 pub fn validate(&self, schema: &Schema) -> bool {
364 self.validate_schemata(vec![schema])
365 }
366
367 pub fn validate_schemata(&self, schemata: Vec<&Schema>) -> bool {
368 let rs = ResolvedSchema::try_from(schemata.clone())
369 .expect("Schemata didn't successfully resolve");
370 let schemata_len = schemata.len();
371 schemata.iter().any(|schema| {
372 let enclosing_namespace = schema.namespace();
373
374 match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
375 Some(reason) => {
376 let log_message =
377 format!("Invalid value: {self:?} for schema: {schema:?}. Reason: {reason}");
378 if schemata_len == 1 {
379 error!("{log_message}");
380 } else {
381 debug!("{log_message}");
382 };
383 false
384 }
385 None => true,
386 }
387 })
388 }
389
/// Merges two optional failure reasons.
///
/// When both are present they are joined with a newline, `accumulator`
/// first; otherwise whichever one is present (if any) is returned.
fn accumulate(accumulator: Option<String>, other: Option<String>) -> Option<String> {
    match (accumulator, other) {
        (Some(reason1), Some(reason2)) => Some(format!("{reason1}\n{reason2}")),
        (first, second) => first.or(second),
    }
}
398
399 pub(crate) fn validate_internal<S: std::borrow::Borrow<Schema> + Debug>(
401 &self,
402 schema: &Schema,
403 names: &HashMap<Name, S>,
404 enclosing_namespace: &Namespace,
405 ) -> Option<String> {
406 match (self, schema) {
407 (_, Schema::Ref { name }) => {
408 let name = name.fully_qualified_name(enclosing_namespace);
409 names.get(&name).map_or_else(
410 || {
411 Some(format!(
412 "Unresolved schema reference: '{:?}'. Parsed names: {:?}",
413 name,
414 names.keys()
415 ))
416 },
417 |s| self.validate_internal(s.borrow(), names, &name.namespace),
418 )
419 }
420 (&Value::Null, &Schema::Null) => None,
421 (&Value::Boolean(_), &Schema::Boolean) => None,
422 (&Value::Int(_), &Schema::Int) => None,
423 (&Value::Int(_), &Schema::Date) => None,
424 (&Value::Int(_), &Schema::TimeMillis) => None,
425 (&Value::Int(_), &Schema::Long) => None,
426 (&Value::Long(_), &Schema::Long) => None,
427 (&Value::Long(_), &Schema::TimeMicros) => None,
428 (&Value::Long(_), &Schema::TimestampMillis) => None,
429 (&Value::Long(_), &Schema::TimestampMicros) => None,
430 (&Value::Long(_), &Schema::LocalTimestampMillis) => None,
431 (&Value::Long(_), &Schema::LocalTimestampMicros) => None,
432 (&Value::TimestampMicros(_), &Schema::TimestampMicros) => None,
433 (&Value::TimestampMillis(_), &Schema::TimestampMillis) => None,
434 (&Value::TimestampNanos(_), &Schema::TimestampNanos) => None,
435 (&Value::LocalTimestampMicros(_), &Schema::LocalTimestampMicros) => None,
436 (&Value::LocalTimestampMillis(_), &Schema::LocalTimestampMillis) => None,
437 (&Value::LocalTimestampNanos(_), &Schema::LocalTimestampNanos) => None,
438 (&Value::TimeMicros(_), &Schema::TimeMicros) => None,
439 (&Value::TimeMillis(_), &Schema::TimeMillis) => None,
440 (&Value::Date(_), &Schema::Date) => None,
441 (&Value::Decimal(_), &Schema::Decimal { .. }) => None,
442 (&Value::BigDecimal(_), &Schema::BigDecimal) => None,
443 (&Value::Duration(_), &Schema::Duration) => None,
444 (&Value::Uuid(_), &Schema::Uuid) => None,
445 (&Value::Float(_), &Schema::Float) => None,
446 (&Value::Float(_), &Schema::Double) => None,
447 (&Value::Double(_), &Schema::Double) => None,
448 (&Value::Bytes(_), &Schema::Bytes) => None,
449 (&Value::Bytes(_), &Schema::Decimal { .. }) => None,
450 (&Value::String(_), &Schema::String) => None,
451 (&Value::String(_), &Schema::Uuid) => None,
452 (&Value::Fixed(n, _), &Schema::Fixed(FixedSchema { size, .. })) => {
453 if n != size {
454 Some(format!(
455 "The value's size ({n}) is different than the schema's size ({size})"
456 ))
457 } else {
458 None
459 }
460 }
461 (Value::Bytes(b), &Schema::Fixed(FixedSchema { size, .. })) => {
462 if b.len() != size {
463 Some(format!(
464 "The bytes' length ({}) is different than the schema's size ({})",
465 b.len(),
466 size
467 ))
468 } else {
469 None
470 }
471 }
472 (&Value::Fixed(n, _), &Schema::Duration) => {
473 if n != 12 {
474 Some(format!(
475 "The value's size ('{n}') must be exactly 12 to be a Duration"
476 ))
477 } else {
478 None
479 }
480 }
481 (&Value::Fixed(_n, _), &Schema::Decimal { .. }) => None,
483 (Value::String(s), Schema::Enum(EnumSchema { symbols, .. })) => {
484 if !symbols.contains(s) {
485 Some(format!("'{s}' is not a member of the possible symbols"))
486 } else {
487 None
488 }
489 }
490 (
491 &Value::Enum(i, ref s),
492 Schema::Enum(EnumSchema {
493 symbols, default, ..
494 }),
495 ) => symbols
496 .get(i as usize)
497 .map(|ref symbol| {
498 if symbol != &s {
499 Some(format!("Symbol '{s}' is not at position '{i}'"))
500 } else {
501 None
502 }
503 })
504 .unwrap_or_else(|| match default {
505 Some(_) => None,
506 None => Some(format!("No symbol at position '{i}'")),
507 }),
508 (&Value::Union(i, ref value), Schema::Union(inner)) => inner
510 .variants()
511 .get(i as usize)
512 .map(|schema| value.validate_internal(schema, names, enclosing_namespace))
513 .unwrap_or_else(|| Some(format!("No schema in the union at position '{i}'"))),
514 (v, Schema::Union(inner)) => {
515 match inner.find_schema_with_known_schemata(v, Some(names), enclosing_namespace) {
516 Some(_) => None,
517 None => Some("Could not find matching type in union".to_string()),
518 }
519 }
520 (Value::Array(items), Schema::Array(inner)) => items.iter().fold(None, |acc, item| {
521 Value::accumulate(
522 acc,
523 item.validate_internal(&inner.items, names, enclosing_namespace),
524 )
525 }),
526 (Value::Map(items), Schema::Map(inner)) => {
527 items.iter().fold(None, |acc, (_, value)| {
528 Value::accumulate(
529 acc,
530 value.validate_internal(&inner.types, names, enclosing_namespace),
531 )
532 })
533 }
534 (
535 Value::Record(record_fields),
536 Schema::Record(RecordSchema {
537 fields,
538 lookup,
539 name,
540 ..
541 }),
542 ) => {
543 let non_nullable_fields_count =
544 fields.iter().filter(|&rf| !rf.is_nullable()).count();
545
546 if record_fields.len() < non_nullable_fields_count {
548 return Some(format!(
549 "The value's records length ({}) doesn't match the schema ({} non-nullable fields)",
550 record_fields.len(),
551 non_nullable_fields_count
552 ));
553 } else if record_fields.len() > fields.len() {
554 return Some(format!(
555 "The value's records length ({}) is greater than the schema's ({} fields)",
556 record_fields.len(),
557 fields.len(),
558 ));
559 }
560
561 record_fields
562 .iter()
563 .fold(None, |acc, (field_name, record_field)| {
564 let record_namespace = if name.namespace.is_none() {
565 enclosing_namespace
566 } else {
567 &name.namespace
568 };
569 match lookup.get(field_name) {
570 Some(idx) => {
571 let field = &fields[*idx];
572 Value::accumulate(
573 acc,
574 record_field.validate_internal(
575 &field.schema,
576 names,
577 record_namespace,
578 ),
579 )
580 }
581 None => Value::accumulate(
582 acc,
583 Some(format!("There is no schema field for field '{field_name}'")),
584 ),
585 }
586 })
587 }
588 (Value::Map(items), Schema::Record(RecordSchema { fields, .. })) => {
589 fields.iter().fold(None, |acc, field| {
590 if let Some(item) = items.get(&field.name) {
591 let res = item.validate_internal(&field.schema, names, enclosing_namespace);
592 Value::accumulate(acc, res)
593 } else if !field.is_nullable() {
594 Value::accumulate(
595 acc,
596 Some(format!(
597 "Field with name '{:?}' is not a member of the map items",
598 field.name
599 )),
600 )
601 } else {
602 acc
603 }
604 })
605 }
606 (v, s) => Some(format!(
607 "Unsupported value-schema combination! Value: {v:?}, schema: {s:?}"
608 )),
609 }
610 }
611
612 pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
619 self.resolve_schemata(schema, Vec::with_capacity(0))
620 }
621
622 pub fn resolve_schemata(self, schema: &Schema, schemata: Vec<&Schema>) -> AvroResult<Self> {
629 let enclosing_namespace = schema.namespace();
630 let rs = if schemata.is_empty() {
631 ResolvedSchema::try_from(schema)?
632 } else {
633 ResolvedSchema::try_from(schemata)?
634 };
635 self.resolve_internal(schema, rs.get_names(), &enclosing_namespace, &None)
636 }
637
638 pub(crate) fn resolve_internal<S: Borrow<Schema> + Debug>(
639 mut self,
640 schema: &Schema,
641 names: &HashMap<Name, S>,
642 enclosing_namespace: &Namespace,
643 field_default: &Option<JsonValue>,
644 ) -> AvroResult<Self> {
645 if SchemaKind::from(&self) == SchemaKind::Union
647 && SchemaKind::from(schema) != SchemaKind::Union
648 {
649 let v = match self {
651 Value::Union(_i, b) => *b,
652 _ => unreachable!(),
653 };
654 self = v;
655 }
656 match *schema {
657 Schema::Ref { ref name } => {
658 let name = name.fully_qualified_name(enclosing_namespace);
659
660 if let Some(resolved) = names.get(&name) {
661 debug!("Resolved {name:?}");
662 self.resolve_internal(resolved.borrow(), names, &name.namespace, field_default)
663 } else {
664 error!("Failed to resolve schema {name:?}");
665 Err(Details::SchemaResolutionError(name.clone()).into())
666 }
667 }
668 Schema::Null => self.resolve_null(),
669 Schema::Boolean => self.resolve_boolean(),
670 Schema::Int => self.resolve_int(),
671 Schema::Long => self.resolve_long(),
672 Schema::Float => self.resolve_float(),
673 Schema::Double => self.resolve_double(),
674 Schema::Bytes => self.resolve_bytes(),
675 Schema::String => self.resolve_string(),
676 Schema::Fixed(FixedSchema { size, .. }) => self.resolve_fixed(size),
677 Schema::Union(ref inner) => {
678 self.resolve_union(inner, names, enclosing_namespace, field_default)
679 }
680 Schema::Enum(EnumSchema {
681 ref symbols,
682 ref default,
683 ..
684 }) => self.resolve_enum(symbols, default, field_default),
685 Schema::Array(ref inner) => {
686 self.resolve_array(&inner.items, names, enclosing_namespace)
687 }
688 Schema::Map(ref inner) => self.resolve_map(&inner.types, names, enclosing_namespace),
689 Schema::Record(RecordSchema { ref fields, .. }) => {
690 self.resolve_record(fields, names, enclosing_namespace)
691 }
692 Schema::Decimal(DecimalSchema {
693 scale,
694 precision,
695 ref inner,
696 }) => self.resolve_decimal(precision, scale, inner),
697 Schema::BigDecimal => self.resolve_bigdecimal(),
698 Schema::Date => self.resolve_date(),
699 Schema::TimeMillis => self.resolve_time_millis(),
700 Schema::TimeMicros => self.resolve_time_micros(),
701 Schema::TimestampMillis => self.resolve_timestamp_millis(),
702 Schema::TimestampMicros => self.resolve_timestamp_micros(),
703 Schema::TimestampNanos => self.resolve_timestamp_nanos(),
704 Schema::LocalTimestampMillis => self.resolve_local_timestamp_millis(),
705 Schema::LocalTimestampMicros => self.resolve_local_timestamp_micros(),
706 Schema::LocalTimestampNanos => self.resolve_local_timestamp_nanos(),
707 Schema::Duration => self.resolve_duration(),
708 Schema::Uuid => self.resolve_uuid(),
709 }
710 }
711
712 fn resolve_uuid(self) -> Result<Self, Error> {
713 Ok(match self {
714 uuid @ Value::Uuid(_) => uuid,
715 Value::String(ref string) => {
716 Value::Uuid(Uuid::from_str(string).map_err(Details::ConvertStrToUuid)?)
717 }
718 other => return Err(Details::GetUuid(other).into()),
719 })
720 }
721
722 fn resolve_bigdecimal(self) -> Result<Self, Error> {
723 Ok(match self {
724 bg @ Value::BigDecimal(_) => bg,
725 Value::Bytes(b) => Value::BigDecimal(deserialize_big_decimal(&b).unwrap()),
726 other => return Err(Details::GetBigDecimal(other).into()),
727 })
728 }
729
730 fn resolve_duration(self) -> Result<Self, Error> {
731 Ok(match self {
732 duration @ Value::Duration { .. } => duration,
733 Value::Fixed(size, bytes) => {
734 if size != 12 {
735 return Err(Details::GetDecimalFixedBytes(size).into());
736 }
737 Value::Duration(Duration::from([
738 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
739 bytes[8], bytes[9], bytes[10], bytes[11],
740 ]))
741 }
742 other => return Err(Details::ResolveDuration(other).into()),
743 })
744 }
745
746 fn resolve_decimal(
747 self,
748 precision: Precision,
749 scale: Scale,
750 inner: &Schema,
751 ) -> Result<Self, Error> {
752 if scale > precision {
753 return Err(Details::GetScaleAndPrecision { scale, precision }.into());
754 }
755 match inner {
756 &Schema::Fixed(FixedSchema { size, .. }) => {
757 if max_prec_for_len(size)? < precision {
758 return Err(Details::GetScaleWithFixedSize { size, precision }.into());
759 }
760 }
761 Schema::Bytes => (),
762 _ => return Err(Details::ResolveDecimalSchema(inner.into()).into()),
763 };
764 match self {
765 Value::Decimal(num) => {
766 let num_bytes = num.len();
767 if max_prec_for_len(num_bytes)? < precision {
768 Err(Details::ComparePrecisionAndSize {
769 precision,
770 num_bytes,
771 }
772 .into())
773 } else {
774 Ok(Value::Decimal(num))
775 }
776 }
778 Value::Fixed(_, bytes) | Value::Bytes(bytes) => {
779 if max_prec_for_len(bytes.len())? < precision {
780 Err(Details::ComparePrecisionAndSize {
781 precision,
782 num_bytes: bytes.len(),
783 }
784 .into())
785 } else {
786 Ok(Value::Decimal(Decimal::from(bytes)))
788 }
789 }
790 other => Err(Details::ResolveDecimal(other).into()),
791 }
792 }
793
794 fn resolve_date(self) -> Result<Self, Error> {
795 match self {
796 Value::Date(d) | Value::Int(d) => Ok(Value::Date(d)),
797 other => Err(Details::GetDate(other).into()),
798 }
799 }
800
801 fn resolve_time_millis(self) -> Result<Self, Error> {
802 match self {
803 Value::TimeMillis(t) | Value::Int(t) => Ok(Value::TimeMillis(t)),
804 other => Err(Details::GetTimeMillis(other).into()),
805 }
806 }
807
808 fn resolve_time_micros(self) -> Result<Self, Error> {
809 match self {
810 Value::TimeMicros(t) | Value::Long(t) => Ok(Value::TimeMicros(t)),
811 Value::Int(t) => Ok(Value::TimeMicros(i64::from(t))),
812 other => Err(Details::GetTimeMicros(other).into()),
813 }
814 }
815
816 fn resolve_timestamp_millis(self) -> Result<Self, Error> {
817 match self {
818 Value::TimestampMillis(ts) | Value::Long(ts) => Ok(Value::TimestampMillis(ts)),
819 Value::Int(ts) => Ok(Value::TimestampMillis(i64::from(ts))),
820 other => Err(Details::GetTimestampMillis(other).into()),
821 }
822 }
823
824 fn resolve_timestamp_micros(self) -> Result<Self, Error> {
825 match self {
826 Value::TimestampMicros(ts) | Value::Long(ts) => Ok(Value::TimestampMicros(ts)),
827 Value::Int(ts) => Ok(Value::TimestampMicros(i64::from(ts))),
828 other => Err(Details::GetTimestampMicros(other).into()),
829 }
830 }
831
832 fn resolve_timestamp_nanos(self) -> Result<Self, Error> {
833 match self {
834 Value::TimestampNanos(ts) | Value::Long(ts) => Ok(Value::TimestampNanos(ts)),
835 Value::Int(ts) => Ok(Value::TimestampNanos(i64::from(ts))),
836 other => Err(Details::GetTimestampNanos(other).into()),
837 }
838 }
839
840 fn resolve_local_timestamp_millis(self) -> Result<Self, Error> {
841 match self {
842 Value::LocalTimestampMillis(ts) | Value::Long(ts) => {
843 Ok(Value::LocalTimestampMillis(ts))
844 }
845 Value::Int(ts) => Ok(Value::LocalTimestampMillis(i64::from(ts))),
846 other => Err(Details::GetLocalTimestampMillis(other).into()),
847 }
848 }
849
850 fn resolve_local_timestamp_micros(self) -> Result<Self, Error> {
851 match self {
852 Value::LocalTimestampMicros(ts) | Value::Long(ts) => {
853 Ok(Value::LocalTimestampMicros(ts))
854 }
855 Value::Int(ts) => Ok(Value::LocalTimestampMicros(i64::from(ts))),
856 other => Err(Details::GetLocalTimestampMicros(other).into()),
857 }
858 }
859
860 fn resolve_local_timestamp_nanos(self) -> Result<Self, Error> {
861 match self {
862 Value::LocalTimestampNanos(ts) | Value::Long(ts) => Ok(Value::LocalTimestampNanos(ts)),
863 Value::Int(ts) => Ok(Value::LocalTimestampNanos(i64::from(ts))),
864 other => Err(Details::GetLocalTimestampNanos(other).into()),
865 }
866 }
867
868 fn resolve_null(self) -> Result<Self, Error> {
869 match self {
870 Value::Null => Ok(Value::Null),
871 other => Err(Details::GetNull(other).into()),
872 }
873 }
874
875 fn resolve_boolean(self) -> Result<Self, Error> {
876 match self {
877 Value::Boolean(b) => Ok(Value::Boolean(b)),
878 other => Err(Details::GetBoolean(other).into()),
879 }
880 }
881
882 fn resolve_int(self) -> Result<Self, Error> {
883 match self {
884 Value::Int(n) => Ok(Value::Int(n)),
885 Value::Long(n) => Ok(Value::Int(n as i32)),
886 other => Err(Details::GetInt(other).into()),
887 }
888 }
889
890 fn resolve_long(self) -> Result<Self, Error> {
891 match self {
892 Value::Int(n) => Ok(Value::Long(i64::from(n))),
893 Value::Long(n) => Ok(Value::Long(n)),
894 other => Err(Details::GetLong(other).into()),
895 }
896 }
897
898 fn resolve_float(self) -> Result<Self, Error> {
899 match self {
900 Value::Int(n) => Ok(Value::Float(n as f32)),
901 Value::Long(n) => Ok(Value::Float(n as f32)),
902 Value::Float(x) => Ok(Value::Float(x)),
903 Value::Double(x) => Ok(Value::Float(x as f32)),
904 Value::String(ref x) => match Self::parse_special_float(x) {
905 Some(f) => Ok(Value::Float(f)),
906 None => Err(Details::GetFloat(self).into()),
907 },
908 other => Err(Details::GetFloat(other).into()),
909 }
910 }
911
912 fn resolve_double(self) -> Result<Self, Error> {
913 match self {
914 Value::Int(n) => Ok(Value::Double(f64::from(n))),
915 Value::Long(n) => Ok(Value::Double(n as f64)),
916 Value::Float(x) => Ok(Value::Double(f64::from(x))),
917 Value::Double(x) => Ok(Value::Double(x)),
918 Value::String(ref x) => match Self::parse_special_float(x) {
919 Some(f) => Ok(Value::Double(f64::from(f))),
920 None => Err(Details::GetDouble(self).into()),
921 },
922 other => Err(Details::GetDouble(other).into()),
923 }
924 }
925
/// Maps the textual names of the special floats to their `f32` values.
///
/// Recognizes exactly `"NaN"`, `"INF"` / `"Infinity"` and `"-INF"` /
/// `"-Infinity"` (case-sensitive); every other string yields `None`.
fn parse_special_float(value: &str) -> Option<f32> {
    let special = if value == "NaN" {
        f32::NAN
    } else if matches!(value, "INF" | "Infinity") {
        f32::INFINITY
    } else if matches!(value, "-INF" | "-Infinity") {
        f32::NEG_INFINITY
    } else {
        return None;
    };
    Some(special)
}
936
937 fn resolve_bytes(self) -> Result<Self, Error> {
938 match self {
939 Value::Bytes(bytes) => Ok(Value::Bytes(bytes)),
940 Value::String(s) => Ok(Value::Bytes(s.into_bytes())),
941 Value::Array(items) => Ok(Value::Bytes(
942 items
943 .into_iter()
944 .map(Value::try_u8)
945 .collect::<Result<Vec<_>, _>>()?,
946 )),
947 other => Err(Details::GetBytes(other).into()),
948 }
949 }
950
951 fn resolve_string(self) -> Result<Self, Error> {
952 match self {
953 Value::String(s) => Ok(Value::String(s)),
954 Value::Bytes(bytes) | Value::Fixed(_, bytes) => Ok(Value::String(
955 String::from_utf8(bytes).map_err(Details::ConvertToUtf8)?,
956 )),
957 other => Err(Details::GetString(other).into()),
958 }
959 }
960
961 fn resolve_fixed(self, size: usize) -> Result<Self, Error> {
962 match self {
963 Value::Fixed(n, bytes) => {
964 if n == size {
965 Ok(Value::Fixed(n, bytes))
966 } else {
967 Err(Details::CompareFixedSizes { size, n }.into())
968 }
969 }
970 Value::String(s) => Ok(Value::Fixed(s.len(), s.into_bytes())),
971 Value::Bytes(s) => {
972 if s.len() == size {
973 Ok(Value::Fixed(size, s))
974 } else {
975 Err(Details::CompareFixedSizes { size, n: s.len() }.into())
976 }
977 }
978 other => Err(Details::GetStringForFixed(other).into()),
979 }
980 }
981
982 pub(crate) fn resolve_enum(
983 self,
984 symbols: &[String],
985 enum_default: &Option<String>,
986 _field_default: &Option<JsonValue>,
987 ) -> Result<Self, Error> {
988 let validate_symbol = |symbol: String, symbols: &[String]| {
989 if let Some(index) = symbols.iter().position(|item| item == &symbol) {
990 Ok(Value::Enum(index as u32, symbol))
991 } else {
992 match enum_default {
993 Some(default) => {
994 if let Some(index) = symbols.iter().position(|item| item == default) {
995 Ok(Value::Enum(index as u32, default.clone()))
996 } else {
997 Err(Details::GetEnumDefault {
998 symbol,
999 symbols: symbols.into(),
1000 }
1001 .into())
1002 }
1003 }
1004 _ => Err(Details::GetEnumDefault {
1005 symbol,
1006 symbols: symbols.into(),
1007 }
1008 .into()),
1009 }
1010 }
1011 };
1012
1013 match self {
1014 Value::Enum(_raw_index, s) => validate_symbol(s, symbols),
1015 Value::String(s) => validate_symbol(s, symbols),
1016 other => Err(Details::GetEnum(other).into()),
1017 }
1018 }
1019
1020 fn resolve_union<S: Borrow<Schema> + Debug>(
1021 self,
1022 schema: &UnionSchema,
1023 names: &HashMap<Name, S>,
1024 enclosing_namespace: &Namespace,
1025 field_default: &Option<JsonValue>,
1026 ) -> Result<Self, Error> {
1027 let v = match self {
1028 Value::Union(_i, v) => *v,
1030 v => v,
1032 };
1033 let (i, inner) = schema
1034 .find_schema_with_known_schemata(&v, Some(names), enclosing_namespace)
1035 .ok_or_else(|| Details::FindUnionVariant {
1036 schema: schema.clone(),
1037 value: v.clone(),
1038 })?;
1039
1040 Ok(Value::Union(
1041 i as u32,
1042 Box::new(v.resolve_internal(inner, names, enclosing_namespace, field_default)?),
1043 ))
1044 }
1045
1046 fn resolve_array<S: Borrow<Schema> + Debug>(
1047 self,
1048 schema: &Schema,
1049 names: &HashMap<Name, S>,
1050 enclosing_namespace: &Namespace,
1051 ) -> Result<Self, Error> {
1052 match self {
1053 Value::Array(items) => Ok(Value::Array(
1054 items
1055 .into_iter()
1056 .map(|item| item.resolve_internal(schema, names, enclosing_namespace, &None))
1057 .collect::<Result<_, _>>()?,
1058 )),
1059 other => Err(Details::GetArray {
1060 expected: schema.into(),
1061 other,
1062 }
1063 .into()),
1064 }
1065 }
1066
1067 fn resolve_map<S: Borrow<Schema> + Debug>(
1068 self,
1069 schema: &Schema,
1070 names: &HashMap<Name, S>,
1071 enclosing_namespace: &Namespace,
1072 ) -> Result<Self, Error> {
1073 match self {
1074 Value::Map(items) => Ok(Value::Map(
1075 items
1076 .into_iter()
1077 .map(|(key, value)| {
1078 value
1079 .resolve_internal(schema, names, enclosing_namespace, &None)
1080 .map(|value| (key, value))
1081 })
1082 .collect::<Result<_, _>>()?,
1083 )),
1084 other => Err(Details::GetMap {
1085 expected: schema.into(),
1086 other,
1087 }
1088 .into()),
1089 }
1090 }
1091
/// Resolves a map or record value into a [`Value::Record`] whose fields
/// follow the order of the schema `fields`.
///
/// For each schema field the provided entry is used when present; otherwise
/// the field's default is used, and a field with neither yields
/// [`Details::GetField`]. Provided entries that match no schema field are
/// dropped. Every chosen value is then resolved against the field's schema.
///
/// # Errors
///
/// Fails with [`Details::GetRecord`] when the value is neither a map nor a
/// record, with [`Details::GetField`] for a missing field without default,
/// or with the first error raised while resolving a field.
fn resolve_record<S: Borrow<Schema> + Debug>(
    self,
    fields: &[RecordField],
    names: &HashMap<Name, S>,
    enclosing_namespace: &Namespace,
) -> Result<Self, Error> {
    // Index the provided entries by field name so they can be taken one by one.
    let mut items = match self {
        Value::Map(items) => Ok(items),
        Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
        other => Err(Error::new(Details::GetRecord {
            expected: fields
                .iter()
                .map(|field| (field.name.clone(), field.schema.clone().into()))
                .collect(),
            other,
        })),
    }?;

    let new_fields = fields
        .iter()
        .map(|field| {
            let value = match items.remove(&field.name) {
                Some(value) => value,
                // Nothing provided for this field: fall back to its default.
                None => match field.default {
                    Some(ref value) => match field.schema {
                        Schema::Enum(EnumSchema {
                            ref symbols,
                            ref default,
                            ..
                        }) => Value::from(value.clone()).resolve_enum(
                            symbols,
                            default,
                            &field.default.clone(),
                        )?,
                        // A union default is interpreted against the first variant.
                        Schema::Union(ref union_schema) => {
                            let first = &union_schema.variants()[0];
                            match first {
                                Schema::Null => Value::Union(0, Box::new(Value::Null)),
                                _ => Value::Union(
                                    0,
                                    Box::new(Value::from(value.clone()).resolve_internal(
                                        first,
                                        names,
                                        enclosing_namespace,
                                        &field.default,
                                    )?),
                                ),
                            }
                        }
                        _ => Value::from(value.clone()),
                    },
                    None => {
                        return Err(Details::GetField(field.name.clone()).into());
                    }
                },
            };
            // Provided values and defaults alike are resolved against the field schema.
            value
                .resolve_internal(&field.schema, names, enclosing_namespace, &field.default)
                .map(|value| (field.name.clone(), value))
        })
        .collect::<Result<Vec<_>, _>>()?;

    Ok(Value::Record(new_fields))
}
1158
1159 fn try_u8(self) -> AvroResult<u8> {
1160 let int = self.resolve(&Schema::Int)?;
1161 if let Value::Int(n) = int {
1162 if n >= 0 && n <= i32::from(u8::MAX) {
1163 return Ok(n as u8);
1164 }
1165 }
1166
1167 Err(Details::GetU8(int).into())
1168 }
1169}
1170
1171#[cfg(test)]
1172mod tests {
1173 use super::*;
1174 use crate::{
1175 duration::{Days, Millis, Months},
1176 error::Details,
1177 schema::RecordFieldOrder,
1178 };
1179 use apache_avro_test_helper::{
1180 TestResult,
1181 logger::{assert_logged, assert_not_logged},
1182 };
1183 use num_bigint::BigInt;
1184 use pretty_assertions::assert_eq;
1185 use serde_json::json;
1186
1187 #[test]
1188 fn avro_3809_validate_nested_records_with_implicit_namespace() -> TestResult {
1189 let schema = Schema::parse_str(
1190 r#"{
1191 "name": "record_name",
1192 "namespace": "space",
1193 "type": "record",
1194 "fields": [
1195 {
1196 "name": "outer_field_1",
1197 "type": {
1198 "type": "record",
1199 "name": "middle_record_name",
1200 "namespace": "middle_namespace",
1201 "fields": [
1202 {
1203 "name": "middle_field_1",
1204 "type": {
1205 "type": "record",
1206 "name": "inner_record_name",
1207 "fields": [
1208 { "name": "inner_field_1", "type": "double" }
1209 ]
1210 }
1211 },
1212 { "name": "middle_field_2", "type": "inner_record_name" }
1213 ]
1214 }
1215 }
1216 ]
1217 }"#,
1218 )?;
1219 let value = Value::Record(vec![(
1220 "outer_field_1".into(),
1221 Value::Record(vec![
1222 (
1223 "middle_field_1".into(),
1224 Value::Record(vec![("inner_field_1".into(), Value::Double(1.2f64))]),
1225 ),
1226 (
1227 "middle_field_2".into(),
1228 Value::Record(vec![("inner_field_1".into(), Value::Double(1.6f64))]),
1229 ),
1230 ]),
1231 )]);
1232
1233 assert!(value.validate(&schema));
1234 Ok(())
1235 }
1236
    /// Table-driven check of `Value::validate_internal`.
    ///
    /// Each row holds a value, a schema, whether validation is expected to
    /// succeed, and (for failing rows) the exact text expected once the
    /// returned reason is wrapped in the
    /// `Invalid value: … for schema: … Reason: …` template built in the loop.
    #[test]
    fn validate() -> TestResult {
        // (value, schema, expected validity, expected full error message)
        let value_schema_valid = vec![
            // Primitive values, including Int against Long and Float against Double.
            (Value::Int(42), Schema::Int, true, ""),
            (Value::Int(43), Schema::Long, true, ""),
            (Value::Float(43.2), Schema::Float, true, ""),
            (Value::Float(45.9), Schema::Double, true, ""),
            (
                Value::Int(42),
                Schema::Boolean,
                false,
                "Invalid value: Int(42) for schema: Boolean. Reason: Unsupported value-schema combination! Value: Int(42), schema: Boolean",
            ),
            // Union values: the index selects the variant the inner value is checked against.
            (
                Value::Union(0, Box::new(Value::Null)),
                Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
                true,
                "",
            ),
            (
                Value::Union(1, Box::new(Value::Int(42))),
                Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
                true,
                "",
            ),
            // Index 0 is Double here, so a Null payload is rejected.
            (
                Value::Union(0, Box::new(Value::Null)),
                Schema::Union(UnionSchema::new(vec![Schema::Double, Schema::Int])?),
                false,
                "Invalid value: Union(0, Null) for schema: Union(UnionSchema { schemas: [Double, Int], variant_index: {Int: 1, Double: 0} }). Reason: Unsupported value-schema combination! Value: Null, schema: Double",
            ),
            (
                Value::Union(3, Box::new(Value::Int(42))),
                Schema::Union(UnionSchema::new(vec![
                    Schema::Null,
                    Schema::Double,
                    Schema::String,
                    Schema::Int,
                ])?),
                true,
                "",
            ),
            (
                Value::Union(1, Box::new(Value::Long(42i64))),
                Schema::Union(UnionSchema::new(vec![
                    Schema::Null,
                    Schema::TimestampMillis,
                ])?),
                true,
                "",
            ),
            // Index 2 does not exist in a two-variant union.
            (
                Value::Union(2, Box::new(Value::Long(1_i64))),
                Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
                false,
                "Invalid value: Union(2, Long(1)) for schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }). Reason: No schema in the union at position '2'",
            ),
            // Arrays: the reported reason is the one for the offending item.
            (
                Value::Array(vec![Value::Long(42i64)]),
                Schema::array(Schema::Long),
                true,
                "",
            ),
            (
                Value::Array(vec![Value::Boolean(true)]),
                Schema::array(Schema::Long),
                false,
                "Invalid value: Array([Boolean(true)]) for schema: Array(ArraySchema { items: Long, attributes: {} }). Reason: Unsupported value-schema combination! Value: Boolean(true), schema: Long",
            ),
            (
                Value::Record(vec![]),
                Schema::Null,
                false,
                "Invalid value: Record([]) for schema: Null. Reason: Unsupported value-schema combination! Value: Record([]), schema: Null",
            ),
            // Fixed against Duration: 12 bytes pass, 11 bytes fail.
            (
                Value::Fixed(12, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]),
                Schema::Duration,
                true,
                "",
            ),
            (
                Value::Fixed(11, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
                Schema::Duration,
                false,
                "Invalid value: Fixed(11, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) for schema: Duration. Reason: The value's size ('11') must be exactly 12 to be a Duration",
            ),
            // Record value naming a field the schema does not declare (empty lookup).
            (
                Value::Record(vec![("unknown_field_name".to_string(), Value::Null)]),
                Schema::Record(RecordSchema {
                    name: Name::new("record_name").unwrap(),
                    aliases: None,
                    doc: None,
                    fields: vec![RecordField {
                        name: "field_name".to_string(),
                        doc: None,
                        default: None,
                        aliases: None,
                        schema: Schema::Int,
                        order: RecordFieldOrder::Ignore,
                        position: 0,
                        custom_attributes: Default::default(),
                    }],
                    lookup: Default::default(),
                    attributes: Default::default(),
                }),
                false,
                r#"Invalid value: Record([("unknown_field_name", Null)]) for schema: Record(RecordSchema { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, aliases: None, default: None, schema: Int, order: Ignore, position: 0, custom_attributes: {} }], lookup: {}, attributes: {} }). Reason: There is no schema field for field 'unknown_field_name'"#,
            ),
            // Record field whose schema is a `Ref` to a name absent from the (empty) names map.
            (
                Value::Record(vec![("field_name".to_string(), Value::Null)]),
                Schema::Record(RecordSchema {
                    name: Name::new("record_name").unwrap(),
                    aliases: None,
                    doc: None,
                    fields: vec![RecordField {
                        name: "field_name".to_string(),
                        doc: None,
                        default: None,
                        aliases: None,
                        schema: Schema::Ref {
                            name: Name::new("missing").unwrap(),
                        },
                        order: RecordFieldOrder::Ignore,
                        position: 0,
                        custom_attributes: Default::default(),
                    }],
                    lookup: [("field_name".to_string(), 0)].iter().cloned().collect(),
                    attributes: Default::default(),
                }),
                false,
                r#"Invalid value: Record([("field_name", Null)]) for schema: Record(RecordSchema { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, aliases: None, default: None, schema: Ref { name: Name { name: "missing", namespace: None } }, order: Ignore, position: 0, custom_attributes: {} }], lookup: {"field_name": 0}, attributes: {} }). Reason: Unresolved schema reference: 'Name { name: "missing", namespace: None }'. Parsed names: []"#,
            ),
        ];

        for (value, schema, valid, expected_err_message) in value_schema_valid.into_iter() {
            // Validate with an empty names map and no enclosing namespace.
            let err_message =
                value.validate_internal::<Schema>(&schema, &HashMap::default(), &None);
            // `None` is treated as "valid"; `Some(reason)` carries the failure reason.
            assert_eq!(valid, err_message.is_none());
            if !valid {
                let full_err_message = format!(
                    "Invalid value: {:?} for schema: {:?}. Reason: {}",
                    value,
                    schema,
                    err_message.unwrap()
                );
                assert_eq!(expected_err_message, full_err_message);
            }
        }

        Ok(())
    }
1389
1390 #[test]
1391 fn validate_fixed() -> TestResult {
1392 let schema = Schema::Fixed(FixedSchema {
1393 size: 4,
1394 name: Name::new("some_fixed").unwrap(),
1395 aliases: None,
1396 doc: None,
1397 default: None,
1398 attributes: Default::default(),
1399 });
1400
1401 assert!(Value::Fixed(4, vec![0, 0, 0, 0]).validate(&schema));
1402 let value = Value::Fixed(5, vec![0, 0, 0, 0, 0]);
1403 assert!(!value.validate(&schema));
1404 assert_logged(
1405 format!(
1406 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1407 value, schema, "The value's size (5) is different than the schema's size (4)"
1408 )
1409 .as_str(),
1410 );
1411
1412 assert!(Value::Bytes(vec![0, 0, 0, 0]).validate(&schema));
1413 let value = Value::Bytes(vec![0, 0, 0, 0, 0]);
1414 assert!(!value.validate(&schema));
1415 assert_logged(
1416 format!(
1417 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1418 value, schema, "The bytes' length (5) is different than the schema's size (4)"
1419 )
1420 .as_str(),
1421 );
1422
1423 Ok(())
1424 }
1425
1426 #[test]
1427 fn validate_enum() -> TestResult {
1428 let schema = Schema::Enum(EnumSchema {
1429 name: Name::new("some_enum").unwrap(),
1430 aliases: None,
1431 doc: None,
1432 symbols: vec![
1433 "spades".to_string(),
1434 "hearts".to_string(),
1435 "diamonds".to_string(),
1436 "clubs".to_string(),
1437 ],
1438 default: None,
1439 attributes: Default::default(),
1440 });
1441
1442 assert!(Value::Enum(0, "spades".to_string()).validate(&schema));
1443 assert!(Value::String("spades".to_string()).validate(&schema));
1444
1445 let value = Value::Enum(1, "spades".to_string());
1446 assert!(!value.validate(&schema));
1447 assert_logged(
1448 format!(
1449 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1450 value, schema, "Symbol 'spades' is not at position '1'"
1451 )
1452 .as_str(),
1453 );
1454
1455 let value = Value::Enum(1000, "spades".to_string());
1456 assert!(!value.validate(&schema));
1457 assert_logged(
1458 format!(
1459 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1460 value, schema, "No symbol at position '1000'"
1461 )
1462 .as_str(),
1463 );
1464
1465 let value = Value::String("lorem".to_string());
1466 assert!(!value.validate(&schema));
1467 assert_logged(
1468 format!(
1469 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1470 value, schema, "'lorem' is not a member of the possible symbols"
1471 )
1472 .as_str(),
1473 );
1474
1475 let other_schema = Schema::Enum(EnumSchema {
1476 name: Name::new("some_other_enum").unwrap(),
1477 aliases: None,
1478 doc: None,
1479 symbols: vec![
1480 "hearts".to_string(),
1481 "diamonds".to_string(),
1482 "clubs".to_string(),
1483 "spades".to_string(),
1484 ],
1485 default: None,
1486 attributes: Default::default(),
1487 });
1488
1489 let value = Value::Enum(0, "spades".to_string());
1490 assert!(!value.validate(&other_schema));
1491 assert_logged(
1492 format!(
1493 "Invalid value: {:?} for schema: {:?}. Reason: {}",
1494 value, other_schema, "Symbol 'spades' is not at position '0'"
1495 )
1496 .as_str(),
1497 );
1498
1499 Ok(())
1500 }
1501
    /// Validates `Record`, `Map` and `Union` values against a three-field
    /// record schema (`a: long`, `b: string`, `c: ["null", "int"]` with a
    /// `null` default) and checks the log output of each rejected value.
    #[test]
    fn validate_record() -> TestResult {
        let schema = Schema::Record(RecordSchema {
            name: Name::new("some_record").unwrap(),
            aliases: None,
            doc: None,
            fields: vec![
                RecordField {
                    name: "a".to_string(),
                    doc: None,
                    default: None,
                    aliases: None,
                    schema: Schema::Long,
                    order: RecordFieldOrder::Ascending,
                    position: 0,
                    custom_attributes: Default::default(),
                },
                RecordField {
                    name: "b".to_string(),
                    doc: None,
                    default: None,
                    aliases: None,
                    schema: Schema::String,
                    order: RecordFieldOrder::Ascending,
                    position: 1,
                    custom_attributes: Default::default(),
                },
                RecordField {
                    name: "c".to_string(),
                    doc: None,
                    default: Some(JsonValue::Null),
                    aliases: None,
                    schema: Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
                    order: RecordFieldOrder::Ascending,
                    position: 2,
                    custom_attributes: Default::default(),
                },
            ],
            lookup: [
                ("a".to_string(), 0),
                ("b".to_string(), 1),
                ("c".to_string(), 2),
            ]
            .iter()
            .cloned()
            .collect(),
            attributes: Default::default(),
        });

        // Field "c" is omitted from the value; the record is still accepted.
        assert!(
            Value::Record(vec![
                ("a".to_string(), Value::Long(42i64)),
                ("b".to_string(), Value::String("foo".to_string())),
            ])
            .validate(&schema)
        );

        // Same fields listed in a different order than the schema declares them.
        let value = Value::Record(vec![
            ("b".to_string(), Value::String("foo".to_string())),
            ("a".to_string(), Value::Long(42i64)),
        ]);
        assert!(value.validate(&schema));

        // Field "a" carries a Boolean where the schema requires a Long.
        let value = Value::Record(vec![
            ("a".to_string(), Value::Boolean(false)),
            ("b".to_string(), Value::String("foo".to_string())),
        ]);
        assert!(!value.validate(&schema));
        assert_logged(
            r#"Invalid value: Record([("a", Boolean(false)), ("b", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Unsupported value-schema combination! Value: Boolean(false), schema: Long"#,
        );

        // Field "c" carries a String, which matches neither union variant.
        let value = Value::Record(vec![
            ("a".to_string(), Value::Long(42i64)),
            ("c".to_string(), Value::String("foo".to_string())),
        ]);
        assert!(!value.validate(&schema));
        assert_logged(
            r#"Invalid value: Record([("a", Long(42)), ("c", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Could not find matching type in union"#,
        );
        // The per-variant mismatch must not be logged on its own.
        assert_not_logged(
            r#"Invalid value: String("foo") for schema: Int. Reason: Unsupported value-schema combination"#,
        );

        // Field "d" is not declared by the schema.
        let value = Value::Record(vec![
            ("a".to_string(), Value::Long(42i64)),
            ("d".to_string(), Value::String("foo".to_string())),
        ]);
        assert!(!value.validate(&schema));
        assert_logged(
            r#"Invalid value: Record([("a", Long(42)), ("d", String("foo"))]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: There is no schema field for field 'd'"#,
        );

        // The value has four entries while the schema declares three fields.
        let value = Value::Record(vec![
            ("a".to_string(), Value::Long(42i64)),
            ("b".to_string(), Value::String("foo".to_string())),
            ("c".to_string(), Value::Null),
            ("d".to_string(), Value::Null),
        ]);
        assert!(!value.validate(&schema));
        assert_logged(
            r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null), ("d", Null)]) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: The value's records length (4) is greater than the schema's (3 fields)"#,
        );

        // A Map holding "a" and "b" is accepted against the record schema.
        assert!(
            Value::Map(
                vec![
                    ("a".to_string(), Value::Long(42i64)),
                    ("b".to_string(), Value::String("foo".to_string())),
                ]
                .into_iter()
                .collect()
            )
            .validate(&schema)
        );

        // A Map holding only "d" is rejected; the log names "a" and "b" as missing.
        assert!(
            !Value::Map(
                vec![("d".to_string(), Value::Long(123_i64)),]
                    .into_iter()
                    .collect()
            )
            .validate(&schema)
        );
        // The expected message spans two lines; the second starts at column 0.
        assert_logged(
            r#"Invalid value: Map({"d": Long(123)}) for schema: Record(RecordSchema { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, aliases: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, aliases: None, default: Some(Null), schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"a": 0, "b": 1, "c": 2}, attributes: {} }). Reason: Field with name '"a"' is not a member of the map items
Field with name '"b"' is not a member of the map items"#,
        );

        // The same record schema wrapped as the second variant of a nullable union.
        let union_schema = Schema::Union(UnionSchema::new(vec![Schema::Null, schema])?);

        assert!(
            Value::Union(
                1,
                Box::new(Value::Record(vec![
                    ("a".to_string(), Value::Long(42i64)),
                    ("b".to_string(), Value::String("foo".to_string())),
                ]))
            )
            .validate(&union_schema)
        );

        assert!(
            Value::Union(
                1,
                Box::new(Value::Map(
                    vec![
                        ("a".to_string(), Value::Long(42i64)),
                        ("b".to_string(), Value::String("foo".to_string())),
                    ]
                    .into_iter()
                    .collect()
                ))
            )
            .validate(&union_schema)
        );

        Ok(())
    }
1673
1674 #[test]
1675 fn resolve_bytes_ok() -> TestResult {
1676 let value = Value::Array(vec![Value::Int(0), Value::Int(42)]);
1677 assert_eq!(
1678 value.resolve(&Schema::Bytes)?,
1679 Value::Bytes(vec![0u8, 42u8])
1680 );
1681
1682 Ok(())
1683 }
1684
1685 #[test]
1686 fn resolve_string_from_bytes() -> TestResult {
1687 let value = Value::Bytes(vec![97, 98, 99]);
1688 assert_eq!(
1689 value.resolve(&Schema::String)?,
1690 Value::String("abc".to_string())
1691 );
1692
1693 Ok(())
1694 }
1695
1696 #[test]
1697 fn resolve_string_from_fixed() -> TestResult {
1698 let value = Value::Fixed(3, vec![97, 98, 99]);
1699 assert_eq!(
1700 value.resolve(&Schema::String)?,
1701 Value::String("abc".to_string())
1702 );
1703
1704 Ok(())
1705 }
1706
1707 #[test]
1708 fn resolve_bytes_failure() {
1709 let value = Value::Array(vec![Value::Int(2000), Value::Int(-42)]);
1710 assert!(value.resolve(&Schema::Bytes).is_err());
1711 }
1712
1713 #[test]
1714 fn resolve_decimal_bytes() -> TestResult {
1715 let value = Value::Decimal(Decimal::from(vec![1, 2, 3, 4, 5]));
1716 value.clone().resolve(&Schema::Decimal(DecimalSchema {
1717 precision: 10,
1718 scale: 4,
1719 inner: Box::new(Schema::Bytes),
1720 }))?;
1721 assert!(value.resolve(&Schema::String).is_err());
1722
1723 Ok(())
1724 }
1725
1726 #[test]
1727 fn resolve_decimal_invalid_scale() {
1728 let value = Value::Decimal(Decimal::from(vec![1, 2]));
1729 assert!(
1730 value
1731 .resolve(&Schema::Decimal(DecimalSchema {
1732 precision: 2,
1733 scale: 3,
1734 inner: Box::new(Schema::Bytes),
1735 }))
1736 .is_err()
1737 );
1738 }
1739
1740 #[test]
1741 fn resolve_decimal_invalid_precision_for_length() {
1742 let value = Value::Decimal(Decimal::from((1u8..=8u8).rev().collect::<Vec<_>>()));
1743 assert!(
1744 value
1745 .resolve(&Schema::Decimal(DecimalSchema {
1746 precision: 1,
1747 scale: 0,
1748 inner: Box::new(Schema::Bytes),
1749 }))
1750 .is_ok()
1751 );
1752 }
1753
1754 #[test]
1755 fn resolve_decimal_fixed() {
1756 let value = Value::Decimal(Decimal::from(vec![1, 2, 3, 4, 5]));
1757 assert!(
1758 value
1759 .clone()
1760 .resolve(&Schema::Decimal(DecimalSchema {
1761 precision: 10,
1762 scale: 1,
1763 inner: Box::new(Schema::Fixed(FixedSchema {
1764 name: Name::new("decimal").unwrap(),
1765 aliases: None,
1766 size: 20,
1767 doc: None,
1768 default: None,
1769 attributes: Default::default(),
1770 }))
1771 }))
1772 .is_ok()
1773 );
1774 assert!(value.resolve(&Schema::String).is_err());
1775 }
1776
1777 #[test]
1778 fn resolve_date() {
1779 let value = Value::Date(2345);
1780 assert!(value.clone().resolve(&Schema::Date).is_ok());
1781 assert!(value.resolve(&Schema::String).is_err());
1782 }
1783
1784 #[test]
1785 fn resolve_time_millis() {
1786 let value = Value::TimeMillis(10);
1787 assert!(value.clone().resolve(&Schema::TimeMillis).is_ok());
1788 assert!(value.resolve(&Schema::TimeMicros).is_err());
1789 }
1790
1791 #[test]
1792 fn resolve_time_micros() {
1793 let value = Value::TimeMicros(10);
1794 assert!(value.clone().resolve(&Schema::TimeMicros).is_ok());
1795 assert!(value.resolve(&Schema::TimeMillis).is_err());
1796 }
1797
1798 #[test]
1799 fn resolve_timestamp_millis() {
1800 let value = Value::TimestampMillis(10);
1801 assert!(value.clone().resolve(&Schema::TimestampMillis).is_ok());
1802 assert!(value.resolve(&Schema::Float).is_err());
1803
1804 let value = Value::Float(10.0f32);
1805 assert!(value.resolve(&Schema::TimestampMillis).is_err());
1806 }
1807
1808 #[test]
1809 fn resolve_timestamp_micros() {
1810 let value = Value::TimestampMicros(10);
1811 assert!(value.clone().resolve(&Schema::TimestampMicros).is_ok());
1812 assert!(value.resolve(&Schema::Int).is_err());
1813
1814 let value = Value::Double(10.0);
1815 assert!(value.resolve(&Schema::TimestampMicros).is_err());
1816 }
1817
1818 #[test]
1819 fn test_avro_3914_resolve_timestamp_nanos() {
1820 let value = Value::TimestampNanos(10);
1821 assert!(value.clone().resolve(&Schema::TimestampNanos).is_ok());
1822 assert!(value.resolve(&Schema::Int).is_err());
1823
1824 let value = Value::Double(10.0);
1825 assert!(value.resolve(&Schema::TimestampNanos).is_err());
1826 }
1827
1828 #[test]
1829 fn test_avro_3853_resolve_timestamp_millis() {
1830 let value = Value::LocalTimestampMillis(10);
1831 assert!(value.clone().resolve(&Schema::LocalTimestampMillis).is_ok());
1832 assert!(value.resolve(&Schema::Float).is_err());
1833
1834 let value = Value::Float(10.0f32);
1835 assert!(value.resolve(&Schema::LocalTimestampMillis).is_err());
1836 }
1837
1838 #[test]
1839 fn test_avro_3853_resolve_timestamp_micros() {
1840 let value = Value::LocalTimestampMicros(10);
1841 assert!(value.clone().resolve(&Schema::LocalTimestampMicros).is_ok());
1842 assert!(value.resolve(&Schema::Int).is_err());
1843
1844 let value = Value::Double(10.0);
1845 assert!(value.resolve(&Schema::LocalTimestampMicros).is_err());
1846 }
1847
1848 #[test]
1849 fn test_avro_3916_resolve_timestamp_nanos() {
1850 let value = Value::LocalTimestampNanos(10);
1851 assert!(value.clone().resolve(&Schema::LocalTimestampNanos).is_ok());
1852 assert!(value.resolve(&Schema::Int).is_err());
1853
1854 let value = Value::Double(10.0);
1855 assert!(value.resolve(&Schema::LocalTimestampNanos).is_err());
1856 }
1857
1858 #[test]
1859 fn resolve_duration() {
1860 let value = Value::Duration(Duration::new(
1861 Months::new(10),
1862 Days::new(5),
1863 Millis::new(3000),
1864 ));
1865 assert!(value.clone().resolve(&Schema::Duration).is_ok());
1866 assert!(value.resolve(&Schema::TimestampMicros).is_err());
1867 assert!(Value::Long(1i64).resolve(&Schema::Duration).is_err());
1868 }
1869
1870 #[test]
1871 fn resolve_uuid() -> TestResult {
1872 let value = Value::Uuid(Uuid::parse_str("1481531d-ccc9-46d9-a56f-5b67459c0537")?);
1873 assert!(value.clone().resolve(&Schema::Uuid).is_ok());
1874 assert!(value.resolve(&Schema::TimestampMicros).is_err());
1875
1876 Ok(())
1877 }
1878
1879 #[test]
1880 fn avro_3678_resolve_float_to_double() {
1881 let value = Value::Float(2345.1);
1882 assert!(value.resolve(&Schema::Double).is_ok());
1883 }
1884
1885 #[test]
1886 fn test_avro_3621_resolve_to_nullable_union() -> TestResult {
1887 let schema = Schema::parse_str(
1888 r#"{
1889 "type": "record",
1890 "name": "root",
1891 "fields": [
1892 {
1893 "name": "event",
1894 "type": [
1895 "null",
1896 {
1897 "type": "record",
1898 "name": "event",
1899 "fields": [
1900 {
1901 "name": "amount",
1902 "type": "int"
1903 },
1904 {
1905 "name": "size",
1906 "type": [
1907 "null",
1908 "int"
1909 ],
1910 "default": null
1911 }
1912 ]
1913 }
1914 ],
1915 "default": null
1916 }
1917 ]
1918 }"#,
1919 )?;
1920
1921 let value = Value::Record(vec![(
1922 "event".to_string(),
1923 Value::Record(vec![("amount".to_string(), Value::Int(200))]),
1924 )]);
1925 assert!(value.resolve(&schema).is_ok());
1926
1927 let value = Value::Record(vec![(
1928 "event".to_string(),
1929 Value::Record(vec![("size".to_string(), Value::Int(1))]),
1930 )]);
1931 assert!(value.resolve(&schema).is_err());
1932
1933 Ok(())
1934 }
1935
1936 #[test]
1937 fn json_from_avro() -> TestResult {
1938 assert_eq!(JsonValue::try_from(Value::Null)?, JsonValue::Null);
1939 assert_eq!(
1940 JsonValue::try_from(Value::Boolean(true))?,
1941 JsonValue::Bool(true)
1942 );
1943 assert_eq!(
1944 JsonValue::try_from(Value::Int(1))?,
1945 JsonValue::Number(1.into())
1946 );
1947 assert_eq!(
1948 JsonValue::try_from(Value::Long(1))?,
1949 JsonValue::Number(1.into())
1950 );
1951 assert_eq!(
1952 JsonValue::try_from(Value::Float(1.0))?,
1953 JsonValue::Number(Number::from_f64(1.0).unwrap())
1954 );
1955 assert_eq!(
1956 JsonValue::try_from(Value::Double(1.0))?,
1957 JsonValue::Number(Number::from_f64(1.0).unwrap())
1958 );
1959 assert_eq!(
1960 JsonValue::try_from(Value::Bytes(vec![1, 2, 3]))?,
1961 JsonValue::Array(vec![
1962 JsonValue::Number(1.into()),
1963 JsonValue::Number(2.into()),
1964 JsonValue::Number(3.into())
1965 ])
1966 );
1967 assert_eq!(
1968 JsonValue::try_from(Value::String("test".into()))?,
1969 JsonValue::String("test".into())
1970 );
1971 assert_eq!(
1972 JsonValue::try_from(Value::Fixed(3, vec![1, 2, 3]))?,
1973 JsonValue::Array(vec![
1974 JsonValue::Number(1.into()),
1975 JsonValue::Number(2.into()),
1976 JsonValue::Number(3.into())
1977 ])
1978 );
1979 assert_eq!(
1980 JsonValue::try_from(Value::Enum(1, "test_enum".into()))?,
1981 JsonValue::String("test_enum".into())
1982 );
1983 assert_eq!(
1984 JsonValue::try_from(Value::Union(1, Box::new(Value::String("test_enum".into()))))?,
1985 JsonValue::String("test_enum".into())
1986 );
1987 assert_eq!(
1988 JsonValue::try_from(Value::Array(vec![
1989 Value::Int(1),
1990 Value::Int(2),
1991 Value::Int(3)
1992 ]))?,
1993 JsonValue::Array(vec![
1994 JsonValue::Number(1.into()),
1995 JsonValue::Number(2.into()),
1996 JsonValue::Number(3.into())
1997 ])
1998 );
1999 assert_eq!(
2000 JsonValue::try_from(Value::Map(
2001 vec![
2002 ("v1".to_string(), Value::Int(1)),
2003 ("v2".to_string(), Value::Int(2)),
2004 ("v3".to_string(), Value::Int(3))
2005 ]
2006 .into_iter()
2007 .collect()
2008 ))?,
2009 JsonValue::Object(
2010 vec![
2011 ("v1".to_string(), JsonValue::Number(1.into())),
2012 ("v2".to_string(), JsonValue::Number(2.into())),
2013 ("v3".to_string(), JsonValue::Number(3.into()))
2014 ]
2015 .into_iter()
2016 .collect()
2017 )
2018 );
2019 assert_eq!(
2020 JsonValue::try_from(Value::Record(vec![
2021 ("v1".to_string(), Value::Int(1)),
2022 ("v2".to_string(), Value::Int(2)),
2023 ("v3".to_string(), Value::Int(3))
2024 ]))?,
2025 JsonValue::Object(
2026 vec![
2027 ("v1".to_string(), JsonValue::Number(1.into())),
2028 ("v2".to_string(), JsonValue::Number(2.into())),
2029 ("v3".to_string(), JsonValue::Number(3.into()))
2030 ]
2031 .into_iter()
2032 .collect()
2033 )
2034 );
2035 assert_eq!(
2036 JsonValue::try_from(Value::Date(1))?,
2037 JsonValue::Number(1.into())
2038 );
2039 assert_eq!(
2040 JsonValue::try_from(Value::Decimal(vec![1, 2, 3].into()))?,
2041 JsonValue::Array(vec![
2042 JsonValue::Number(1.into()),
2043 JsonValue::Number(2.into()),
2044 JsonValue::Number(3.into())
2045 ])
2046 );
2047 assert_eq!(
2048 JsonValue::try_from(Value::TimeMillis(1))?,
2049 JsonValue::Number(1.into())
2050 );
2051 assert_eq!(
2052 JsonValue::try_from(Value::TimeMicros(1))?,
2053 JsonValue::Number(1.into())
2054 );
2055 assert_eq!(
2056 JsonValue::try_from(Value::TimestampMillis(1))?,
2057 JsonValue::Number(1.into())
2058 );
2059 assert_eq!(
2060 JsonValue::try_from(Value::TimestampMicros(1))?,
2061 JsonValue::Number(1.into())
2062 );
2063 assert_eq!(
2064 JsonValue::try_from(Value::TimestampNanos(1))?,
2065 JsonValue::Number(1.into())
2066 );
2067 assert_eq!(
2068 JsonValue::try_from(Value::LocalTimestampMillis(1))?,
2069 JsonValue::Number(1.into())
2070 );
2071 assert_eq!(
2072 JsonValue::try_from(Value::LocalTimestampMicros(1))?,
2073 JsonValue::Number(1.into())
2074 );
2075 assert_eq!(
2076 JsonValue::try_from(Value::LocalTimestampNanos(1))?,
2077 JsonValue::Number(1.into())
2078 );
2079 assert_eq!(
2080 JsonValue::try_from(Value::Duration(
2081 [
2082 1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 12u8
2083 ]
2084 .into()
2085 ))?,
2086 JsonValue::Array(vec![
2087 JsonValue::Number(1.into()),
2088 JsonValue::Number(2.into()),
2089 JsonValue::Number(3.into()),
2090 JsonValue::Number(4.into()),
2091 JsonValue::Number(5.into()),
2092 JsonValue::Number(6.into()),
2093 JsonValue::Number(7.into()),
2094 JsonValue::Number(8.into()),
2095 JsonValue::Number(9.into()),
2096 JsonValue::Number(10.into()),
2097 JsonValue::Number(11.into()),
2098 JsonValue::Number(12.into()),
2099 ])
2100 );
2101 assert_eq!(
2102 JsonValue::try_from(Value::Uuid(Uuid::parse_str(
2103 "936DA01F-9ABD-4D9D-80C7-02AF85C822A8"
2104 )?))?,
2105 JsonValue::String("936da01f-9abd-4d9d-80c7-02af85c822a8".into())
2106 );
2107
2108 Ok(())
2109 }
2110
2111 #[test]
2112 fn test_avro_3433_recursive_resolves_record() -> TestResult {
2113 let schema = Schema::parse_str(
2114 r#"
2115 {
2116 "type":"record",
2117 "name":"TestStruct",
2118 "fields": [
2119 {
2120 "name":"a",
2121 "type":{
2122 "type":"record",
2123 "name": "Inner",
2124 "fields": [ {
2125 "name":"z",
2126 "type":"int"
2127 }]
2128 }
2129 },
2130 {
2131 "name":"b",
2132 "type":"Inner"
2133 }
2134 ]
2135 }"#,
2136 )?;
2137
2138 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2139 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2140 let outer = Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
2141 outer
2142 .resolve(&schema)
2143 .expect("Record definition defined in one field must be available in other field");
2144
2145 Ok(())
2146 }
2147
2148 #[test]
2149 fn test_avro_3433_recursive_resolves_array() -> TestResult {
2150 let schema = Schema::parse_str(
2151 r#"
2152 {
2153 "type":"record",
2154 "name":"TestStruct",
2155 "fields": [
2156 {
2157 "name":"a",
2158 "type":{
2159 "type":"array",
2160 "items": {
2161 "type":"record",
2162 "name": "Inner",
2163 "fields": [ {
2164 "name":"z",
2165 "type":"int"
2166 }]
2167 }
2168 }
2169 },
2170 {
2171 "name":"b",
2172 "type": {
2173 "type":"map",
2174 "values":"Inner"
2175 }
2176 }
2177 ]
2178 }"#,
2179 )?;
2180
2181 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2182 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2183 let outer_value = Value::Record(vec![
2184 ("a".into(), Value::Array(vec![inner_value1])),
2185 (
2186 "b".into(),
2187 Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2188 ),
2189 ]);
2190 outer_value
2191 .resolve(&schema)
2192 .expect("Record defined in array definition must be resolvable from map");
2193
2194 Ok(())
2195 }
2196
2197 #[test]
2198 fn test_avro_3433_recursive_resolves_map() -> TestResult {
2199 let schema = Schema::parse_str(
2200 r#"
2201 {
2202 "type":"record",
2203 "name":"TestStruct",
2204 "fields": [
2205 {
2206 "name":"a",
2207 "type":{
2208 "type":"record",
2209 "name": "Inner",
2210 "fields": [ {
2211 "name":"z",
2212 "type":"int"
2213 }]
2214 }
2215 },
2216 {
2217 "name":"b",
2218 "type": {
2219 "type":"map",
2220 "values":"Inner"
2221 }
2222 }
2223 ]
2224 }"#,
2225 )?;
2226
2227 let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
2228 let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
2229 let outer_value = Value::Record(vec![
2230 ("a".into(), inner_value1),
2231 (
2232 "b".into(),
2233 Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
2234 ),
2235 ]);
2236 outer_value
2237 .resolve(&schema)
2238 .expect("Record defined in record field must be resolvable from map field");
2239
2240 Ok(())
2241 }
2242
/// AVRO-3433: the record `Inner` is declared inline in field `a` and then
/// referenced by name from a field of a second inline record (`InnerWrapper`)
/// in field `b`. Resolving a matching value must succeed.
#[test]
fn test_avro_3433_recursive_resolves_record_wrapper() -> TestResult {
    let schema_json = r#"
    {
        "type":"record",
        "name":"TestStruct",
        "fields": [
            {
                "name":"a",
                "type":{
                    "type":"record",
                    "name": "Inner",
                    "fields": [ {
                        "name":"z",
                        "type":"int"
                    }]
                }
            },
            {
                "name":"b",
                "type": {
                    "type":"record",
                    "name": "InnerWrapper",
                    "fields": [ {
                        "name":"j",
                        "type":"Inner"
                    }]
                }
            }
        ]
    }"#;
    let schema = Schema::parse_str(schema_json)?;

    let plain_inner = Value::Record(vec![(String::from("z"), Value::Int(3))]);
    // The wrapper holds another `Inner` record under its single field `j`.
    let wrapped_inner = Value::Record(vec![(
        String::from("j"),
        Value::Record(vec![(String::from("z"), Value::Int(6))]),
    )]);

    let outer_value = Value::Record(vec![
        (String::from("a"), plain_inner),
        (String::from("b"), wrapped_inner),
    ]);
    let resolved = outer_value.resolve(&schema);
    resolved.expect("Record schema defined in field must be resolvable in Record schema defined in other field");

    Ok(())
}
2288
/// AVRO-3433: the record `Inner` is declared inline as the value type of the
/// map in field `a` and referenced by name as the item type of the array in
/// field `b`. Resolving a matching value must succeed.
#[test]
fn test_avro_3433_recursive_resolves_map_and_array() -> TestResult {
    let schema_json = r#"
    {
        "type":"record",
        "name":"TestStruct",
        "fields": [
            {
                "name":"a",
                "type":{
                    "type":"map",
                    "values": {
                        "type":"record",
                        "name": "Inner",
                        "fields": [ {
                            "name":"z",
                            "type":"int"
                        }]
                    }
                }
            },
            {
                "name":"b",
                "type": {
                    "type":"array",
                    "items":"Inner"
                }
            }
        ]
    }"#;
    let schema = Schema::parse_str(schema_json)?;

    let array_item = Value::Record(vec![(String::from("z"), Value::Int(3))]);
    let map_entry = Value::Record(vec![(String::from("z"), Value::Int(6))]);

    let field_a = Value::Map(std::iter::once((String::from("akey"), map_entry)).collect());
    let field_b = Value::Array(vec![array_item]);

    let resolved = Value::Record(vec![
        (String::from("a"), field_a),
        (String::from("b"), field_b),
    ])
    .resolve(&schema);
    resolved.expect("Record defined in map definition must be resolvable from array");

    Ok(())
}
2337
/// AVRO-3433: the record `Inner` is declared inline as a branch of the
/// `["null", Inner]` union in field `a` and referenced by name in field `b`.
/// Both a record and a null in field `a` must resolve.
#[test]
fn test_avro_3433_recursive_resolves_union() -> TestResult {
    let schema_json = r#"
    {
        "type":"record",
        "name":"TestStruct",
        "fields": [
            {
                "name":"a",
                "type":["null", {
                    "type":"record",
                    "name": "Inner",
                    "fields": [ {
                        "name":"z",
                        "type":"int"
                    }]
                }]
            },
            {
                "name":"b",
                "type":"Inner"
            }
        ]
    }"#;
    let schema = Schema::parse_str(schema_json)?;

    let record_for_a = Value::Record(vec![(String::from("z"), Value::Int(3))]);
    let record_for_b = Value::Record(vec![(String::from("z"), Value::Int(6))]);

    // First variant: the union field holds a record.
    let with_record = Value::Record(vec![
        (String::from("a"), record_for_a),
        (String::from("b"), record_for_b.clone()),
    ]);
    with_record
        .resolve(&schema)
        .expect("Record definition defined in union must be resolved in other field");

    // Second variant: the union field holds null.
    let with_null = Value::Record(vec![
        (String::from("a"), Value::Null),
        (String::from("b"), record_for_b),
    ]);
    with_null
        .resolve(&schema)
        .expect("Record definition defined in union must be resolved in other field");

    Ok(())
}
2381
/// AVRO-3461: only the outermost record declares a namespace (`space`); the
/// doubly nested `inner_record_name` is referenced from `outer_field_2` as
/// `space.inner_record_name`. All three shapes of `outer_field_1` must resolve.
#[test]
fn test_avro_3461_test_multi_level_resolve_outer_namespace() -> TestResult {
    let schema = Schema::parse_str(
        r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                        "null",
                        {
                            "type": "record",
                            "name": "middle_record_name",
                            "fields":[
                                {
                                    "name":"middle_field_1",
                                    "type":[
                                        "null",
                                        {
                                            "type":"record",
                                            "name":"inner_record_name",
                                            "fields":[
                                                {
                                                    "name":"inner_field_1",
                                                    "type":"double"
                                                }
                                            ]
                                        }
                                    ]
                                }
                            ]
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "space.inner_record_name"
            }
          ]
        }
        "#,
    )?;

    let leaf = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
    let null_branch = || Value::Union(0, Box::new(Value::Null));

    // Middle record with its union field set to null, then set to the leaf record.
    let middle_with_null = Value::Record(vec![("middle_field_1".into(), null_branch())]);
    let middle_with_leaf = Value::Record(vec![(
        "middle_field_1".into(),
        Value::Union(1, Box::new(leaf.clone())),
    )]);

    // Every outer record shares the same second field; only the first varies.
    let outer_with = |first_field: Value| {
        Value::Record(vec![
            ("outer_field_1".into(), first_field),
            ("outer_field_2".into(), leaf.clone()),
        ])
    };
    let variations = [
        outer_with(null_branch()),
        outer_with(Value::Union(1, Box::new(middle_with_null))),
        outer_with(Value::Union(1, Box::new(middle_with_leaf))),
    ];

    for variation in variations {
        variation
            .resolve(&schema)
            .expect("Should be able to resolve value to the schema that is it's definition");
    }

    Ok(())
}
2469
/// AVRO-3461: the middle record declares its own namespace
/// (`middle_namespace`) and the innermost record declares none; it is
/// referenced from `outer_field_2` as `middle_namespace.inner_record_name`.
/// All three shapes of `outer_field_1` must resolve.
#[test]
fn test_avro_3461_test_multi_level_resolve_middle_namespace() -> TestResult {
    let schema = Schema::parse_str(
        r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                        "null",
                        {
                            "type": "record",
                            "name": "middle_record_name",
                            "namespace":"middle_namespace",
                            "fields":[
                                {
                                    "name":"middle_field_1",
                                    "type":[
                                        "null",
                                        {
                                            "type":"record",
                                            "name":"inner_record_name",
                                            "fields":[
                                                {
                                                    "name":"inner_field_1",
                                                    "type":"double"
                                                }
                                            ]
                                        }
                                    ]
                                }
                            ]
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "middle_namespace.inner_record_name"
            }
          ]
        }
        "#,
    )?;

    let leaf = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
    let null_branch = || Value::Union(0, Box::new(Value::Null));

    // Middle record with its union field set to null, then set to the leaf record.
    let middle_with_null = Value::Record(vec![("middle_field_1".into(), null_branch())]);
    let middle_with_leaf = Value::Record(vec![(
        "middle_field_1".into(),
        Value::Union(1, Box::new(leaf.clone())),
    )]);

    // Every outer record shares the same second field; only the first varies.
    let outer_with = |first_field: Value| {
        Value::Record(vec![
            ("outer_field_1".into(), first_field),
            ("outer_field_2".into(), leaf.clone()),
        ])
    };
    let variations = [
        outer_with(null_branch()),
        outer_with(Value::Union(1, Box::new(middle_with_null))),
        outer_with(Value::Union(1, Box::new(middle_with_leaf))),
    ];

    for variation in variations {
        variation
            .resolve(&schema)
            .expect("Should be able to resolve value to the schema that is it's definition");
    }

    Ok(())
}
2558
/// AVRO-3461: the outer, middle and inner records each declare a different
/// namespace; the innermost record is referenced from `outer_field_2` as
/// `inner_namespace.inner_record_name`. All three shapes of `outer_field_1`
/// must resolve.
#[test]
fn test_avro_3461_test_multi_level_resolve_inner_namespace() -> TestResult {
    let schema = Schema::parse_str(
        r#"
        {
          "name": "record_name",
          "namespace": "space",
          "type": "record",
          "fields": [
            {
              "name": "outer_field_1",
              "type": [
                        "null",
                        {
                            "type": "record",
                            "name": "middle_record_name",
                            "namespace":"middle_namespace",
                            "fields":[
                                {
                                    "name":"middle_field_1",
                                    "type":[
                                        "null",
                                        {
                                            "type":"record",
                                            "name":"inner_record_name",
                                            "namespace":"inner_namespace",
                                            "fields":[
                                                {
                                                    "name":"inner_field_1",
                                                    "type":"double"
                                                }
                                            ]
                                        }
                                    ]
                                }
                            ]
                        }
                    ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_namespace.inner_record_name"
            }
          ]
        }
        "#,
    )?;

    let leaf = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
    let null_branch = || Value::Union(0, Box::new(Value::Null));

    // Middle record with its union field set to null, then set to the leaf record.
    let middle_with_null = Value::Record(vec![("middle_field_1".into(), null_branch())]);
    let middle_with_leaf = Value::Record(vec![(
        "middle_field_1".into(),
        Value::Union(1, Box::new(leaf.clone())),
    )]);

    // Every outer record shares the same second field; only the first varies.
    let outer_with = |first_field: Value| {
        Value::Record(vec![
            ("outer_field_1".into(), first_field),
            ("outer_field_2".into(), leaf.clone()),
        ])
    };
    let variations = [
        outer_with(null_branch()),
        outer_with(Value::Union(1, Box::new(middle_with_null))),
        outer_with(Value::Union(1, Box::new(middle_with_leaf))),
    ];

    for variation in variations {
        variation
            .resolve(&schema)
            .expect("Should be able to resolve value to the schema that is it's definition");
    }

    Ok(())
}
2649
/// AVRO-3460: field `b` refers by name to the record `Inner` declared inline
/// in field `a`. Validation must reject an outer record whose `b` value does
/// not match `Inner`, even though its `a` value does.
#[test]
fn test_avro_3460_validation_with_refs() -> TestResult {
    let schema_json = r#"
    {
        "type":"record",
        "name":"TestStruct",
        "fields": [
            {
                "name":"a",
                "type":{
                    "type":"record",
                    "name": "Inner",
                    "fields": [ {
                        "name":"z",
                        "type":"int"
                    }]
                }
            },
            {
                "name":"b",
                "type":"Inner"
            }
        ]
    }"#;
    let schema = Schema::parse_str(schema_json)?;

    let well_formed = Value::Record(vec![("z".into(), Value::Int(3))]);
    // `z` holds a null although the schema declares it as `int`.
    let null_for_int = Value::Record(vec![("z".into(), Value::Null)]);
    // The only field is named `a`, which `Inner` does not declare.
    let unknown_field = Value::Record(vec![("a".into(), Value::String("testing".into()))]);

    let outer_with_b = |b_value: Value| {
        Value::Record(vec![
            ("a".into(), well_formed.clone()),
            ("b".into(), b_value),
        ])
    };
    let rejected = [outer_with_b(null_for_int), outer_with_b(unknown_field)];

    for outer in &rejected {
        assert!(
            !outer.validate(&schema),
            "field b record is invalid against the schema"
        );
    }

    Ok(())
}
2701
/// AVRO-3460: same schema as the value-based test, but the values are
/// produced by serializing Rust structs. Each struct gives field `b` a type
/// other than the `Inner` record the schema requires, so validation must fail.
#[test]
fn test_avro_3460_validation_with_refs_real_struct() -> TestResult {
    use crate::ser::Serializer;
    use serde::Serialize;

    #[derive(Serialize, Clone)]
    struct TestInner {
        z: i32,
    }

    /// `b` is a string where the schema declares `Inner`.
    #[derive(Serialize)]
    struct TestRefSchemaStruct1 {
        a: TestInner,
        b: String,
    }

    /// `b` is an integer where the schema declares `Inner`.
    #[derive(Serialize)]
    struct TestRefSchemaStruct2 {
        a: TestInner,
        b: i32,
    }

    /// `b` is optional where the schema declares a plain `Inner`.
    #[derive(Serialize)]
    struct TestRefSchemaStruct3 {
        a: TestInner,
        b: Option<TestInner>,
    }

    let schema_json = r#"
    {
        "type":"record",
        "name":"TestStruct",
        "fields": [
            {
                "name":"a",
                "type":{
                    "type":"record",
                    "name": "Inner",
                    "fields": [ {
                        "name":"z",
                        "type":"int"
                    }]
                }
            },
            {
                "name":"b",
                "type":"Inner"
            }
        ]
    }"#;
    let schema = Schema::parse_str(schema_json)?;

    let inner = TestInner { z: 3 };
    let with_string = TestRefSchemaStruct1 {
        a: inner.clone(),
        b: "testing".into(),
    };
    let with_int = TestRefSchemaStruct2 {
        a: inner.clone(),
        b: 24,
    };
    let with_none = TestRefSchemaStruct3 { a: inner, b: None };

    // Each struct is serialized with its own fresh serializer.
    let serialized: [Value; 3] = [
        with_string.serialize(&mut Serializer::default())?,
        with_int.serialize(&mut Serializer::default())?,
        with_none.serialize(&mut Serializer::default())?,
    ];

    for candidate in &serialized {
        assert!(
            !candidate.validate(&schema),
            "field b record is invalid against the schema"
        );
    }

    Ok(())
}
2791
2792 fn avro_3674_with_or_without_namespace(with_namespace: bool) -> TestResult {
2793 use crate::ser::Serializer;
2794 use serde::Serialize;
2795
2796 let schema_str = r#"
2797 {
2798 "type": "record",
2799 "name": "NamespacedMessage",
2800 [NAMESPACE]
2801 "fields": [
2802 {
2803 "name": "field_a",
2804 "type": {
2805 "type": "record",
2806 "name": "NestedMessage",
2807 "fields": [
2808 {
2809 "name": "enum_a",
2810 "type": {
2811 "type": "enum",
2812 "name": "EnumType",
2813 "symbols": ["SYMBOL_1", "SYMBOL_2"],
2814 "default": "SYMBOL_1"
2815 }
2816 },
2817 {
2818 "name": "enum_b",
2819 "type": "EnumType"
2820 }
2821 ]
2822 }
2823 }
2824 ]
2825 }
2826 "#;
2827 let schema_str = schema_str.replace(
2828 "[NAMESPACE]",
2829 if with_namespace {
2830 r#""namespace": "com.domain","#
2831 } else {
2832 ""
2833 },
2834 );
2835
2836 let schema = Schema::parse_str(&schema_str)?;
2837
2838 #[derive(Serialize)]
2839 enum EnumType {
2840 #[serde(rename = "SYMBOL_1")]
2841 Symbol1,
2842 #[serde(rename = "SYMBOL_2")]
2843 Symbol2,
2844 }
2845
2846 #[derive(Serialize)]
2847 struct FieldA {
2848 enum_a: EnumType,
2849 enum_b: EnumType,
2850 }
2851
2852 #[derive(Serialize)]
2853 struct NamespacedMessage {
2854 field_a: FieldA,
2855 }
2856
2857 let msg = NamespacedMessage {
2858 field_a: FieldA {
2859 enum_a: EnumType::Symbol2,
2860 enum_b: EnumType::Symbol1,
2861 },
2862 };
2863
2864 let mut ser = Serializer::default();
2865 let test_value: Value = msg.serialize(&mut ser)?;
2866 assert!(test_value.validate(&schema), "test_value should validate");
2867 assert!(
2868 test_value.resolve(&schema).is_ok(),
2869 "test_value should resolve"
2870 );
2871
2872 Ok(())
2873 }
2874
/// AVRO-3674: runs the shared check with a schema that declares no namespace.
#[test]
fn test_avro_3674_validate_no_namespace_resolution() -> TestResult {
    let with_namespace = false;
    avro_3674_with_or_without_namespace(with_namespace)
}
2879
/// AVRO-3674: runs the shared check with a schema that declares a namespace.
#[test]
fn test_avro_3674_validate_with_namespace_resolution() -> TestResult {
    let with_namespace = true;
    avro_3674_with_or_without_namespace(with_namespace)
}
2884
2885 fn avro_3688_schema_resolution_panic(set_field_b: bool) -> TestResult {
2886 use crate::ser::Serializer;
2887 use serde::{Deserialize, Serialize};
2888
2889 let schema_str = r#"{
2890 "type": "record",
2891 "name": "Message",
2892 "fields": [
2893 {
2894 "name": "field_a",
2895 "type": [
2896 "null",
2897 {
2898 "name": "Inner",
2899 "type": "record",
2900 "fields": [
2901 {
2902 "name": "inner_a",
2903 "type": "string"
2904 }
2905 ]
2906 }
2907 ],
2908 "default": null
2909 },
2910 {
2911 "name": "field_b",
2912 "type": [
2913 "null",
2914 "Inner"
2915 ],
2916 "default": null
2917 }
2918 ]
2919 }"#;
2920
2921 #[derive(Serialize, Deserialize)]
2922 struct Inner {
2923 inner_a: String,
2924 }
2925
2926 #[derive(Serialize, Deserialize)]
2927 struct Message {
2928 field_a: Option<Inner>,
2929 field_b: Option<Inner>,
2930 }
2931
2932 let schema = Schema::parse_str(schema_str)?;
2933
2934 let msg = Message {
2935 field_a: Some(Inner {
2936 inner_a: "foo".to_string(),
2937 }),
2938 field_b: if set_field_b {
2939 Some(Inner {
2940 inner_a: "bar".to_string(),
2941 })
2942 } else {
2943 None
2944 },
2945 };
2946
2947 let mut ser = Serializer::default();
2948 let test_value: Value = msg.serialize(&mut ser)?;
2949 assert!(test_value.validate(&schema), "test_value should validate");
2950 assert!(
2951 test_value.resolve(&schema).is_ok(),
2952 "test_value should resolve"
2953 );
2954
2955 Ok(())
2956 }
2957
/// AVRO-3688: runs the shared check with `field_b` left as `None`.
#[test]
fn test_avro_3688_field_b_not_set() -> TestResult {
    let set_field_b = false;
    avro_3688_schema_resolution_panic(set_field_b)
}
2962
/// AVRO-3688: runs the shared check with `field_b` populated.
#[test]
fn test_avro_3688_field_b_set() -> TestResult {
    let set_field_b = true;
    avro_3688_schema_resolution_panic(set_field_b)
}
2967
/// AVRO-3764: a record schema references an enum that is defined in a
/// separate schema. Resolving succeeds when the enum schema is supplied via
/// `resolve_schemata` and fails with plain `resolve`.
#[test]
fn test_avro_3764_use_resolve_schemata() -> TestResult {
    let referenced_schema =
        r#"{"name": "enumForReference", "type": "enum", "symbols": ["A", "B"]}"#;
    let main_schema = r#"{"name": "recordWithReference", "type": "record", "fields": [{"name": "reference", "type": "enumForReference"}]}"#;

    let json_value: serde_json::Value = serde_json::from_str(
        r#"
        {
            "reference": "A"
        }
        "#,
    )?;
    let avro_value = Value::from(json_value);

    // The main schema is parsed first, so it is the head of the returned list.
    let schemas = Schema::parse_list([main_schema, referenced_schema])?;
    let (main_schema, remaining) = schemas.split_first().unwrap();
    let schemata: Vec<_> = remaining.iter().collect();

    let resolve_result = avro_value.clone().resolve_schemata(main_schema, schemata);
    assert!(
        resolve_result.is_ok(),
        "result of resolving with schemata should be ok, got: {resolve_result:?}"
    );

    // Without the extra schemata the enum reference cannot be looked up.
    let resolve_result = avro_value.resolve(main_schema);
    assert!(
        resolve_result.is_err(),
        "result of resolving without schemata should be err, got: {resolve_result:?}"
    );

    Ok(())
}
3004
/// AVRO-3767: the main record has a `["null", recordForReference]` union
/// whose record branch lives in a separate schema and itself references an
/// enum from a third schema. Resolving with the other schemata must succeed,
/// and the resolved value must validate against the full list.
#[test]
fn test_avro_3767_union_resolve_complex_refs() -> TestResult {
    let referenced_enum =
        r#"{"name": "enumForReference", "type": "enum", "symbols": ["A", "B"]}"#;
    let referenced_record = r#"{"name": "recordForReference", "type": "record", "fields": [{"name": "refInRecord", "type": "enumForReference"}]}"#;
    let main_schema = r#"{"name": "recordWithReference", "type": "record", "fields": [{"name": "reference", "type": ["null", "recordForReference"]}]}"#;

    let json_value: serde_json::Value = serde_json::from_str(
        r#"
        {
            "reference": {
                "refInRecord": "A"
            }
        }
        "#,
    )?;
    let avro_value = Value::from(json_value);

    // The main schema is parsed last; the two referenced schemas come first.
    let schemata = Schema::parse_list([referenced_enum, referenced_record, main_schema])?;
    let other_schemata: Vec<&Schema> = schemata.iter().take(2).collect();
    let main_schema = schemata.last().unwrap();

    let resolve_result = avro_value.resolve_schemata(main_schema, other_schemata);
    assert!(
        resolve_result.is_ok(),
        "result of resolving with schemata should be ok, got: {resolve_result:?}"
    );

    let every_schema: Vec<&Schema> = schemata.iter().collect();
    assert!(
        resolve_result?.validate_schemata(every_schema),
        "result of validation with schemata should be true"
    );

    Ok(())
}
3043
/// AVRO-3782: a decimal value built from the big-endian signed bytes of
/// 12345678 must resolve against a fixed(8) decimal schema with precision 8
/// and scale 0.
#[test]
fn test_avro_3782_incorrect_decimal_resolving() -> TestResult {
    let schema = Schema::parse_str(
        r#"{"name": "decimalSchema", "logicalType": "decimal", "type": "fixed", "precision": 8, "scale": 0, "size": 8}"#,
    )?;

    let unscaled_bytes = BigInt::from(12345678u32).to_signed_bytes_be();
    let avro_value = Value::Decimal(Decimal::from(unscaled_bytes));

    let resolve_result = avro_value.resolve(&schema);
    assert!(
        resolve_result.is_ok(),
        "resolve result must be ok, got: {resolve_result:?}"
    );

    Ok(())
}
3060
/// AVRO-3779: a `Value::BigDecimal` must resolve against a `bytes` schema
/// annotated with the `big-decimal` logical type.
#[test]
fn test_avro_3779_bigdecimal_resolving() -> TestResult {
    let schema = Schema::parse_str(
        r#"{"name": "bigDecimalSchema", "logicalType": "big-decimal", "type": "bytes" }"#,
    )?;

    let number = BigDecimal::from(12345678u32);
    let resolve_result: AvroResult<Value> = Value::BigDecimal(number).resolve(&schema);
    assert!(
        resolve_result.is_ok(),
        "resolve result must be ok, got: {resolve_result:?}"
    );

    Ok(())
}
3076
/// AVRO-3892: `Value::Bytes` resolves to `Value::Fixed` against a fixed
/// schema of size 3 only when it holds exactly three bytes; shorter and
/// longer inputs are rejected.
#[test]
fn test_avro_3892_resolve_fixed_from_bytes() -> TestResult {
    // One schema instance serves every case; `resolve` only borrows it.
    let fixed_of_three = Schema::Fixed(FixedSchema {
        name: "test".into(),
        aliases: None,
        doc: None,
        size: 3,
        default: None,
        attributes: Default::default(),
    });

    let exact_length = Value::Bytes(vec![97, 98, 99]);
    assert_eq!(
        exact_length.resolve(&fixed_of_three)?,
        Value::Fixed(3, vec![97, 98, 99])
    );

    // Two bytes is too short and four bytes is too long.
    for wrong_length in [vec![97, 99], vec![97, 98, 99, 100]] {
        let outcome = Value::Bytes(wrong_length).resolve(&fixed_of_three);
        assert!(outcome.is_err());
    }

    Ok(())
}
3122
/// AVRO-3928: checks the `From<serde_json::Value>` conversion into `Value`
/// for null, booleans, integers at the `i32`/`i64` boundary, floats, unsigned
/// extremes, strings, arrays and objects.
#[test]
fn avro_3928_from_serde_value_to_types_value() {
    let below_i32 = i64::from(i32::MIN) - 1;
    let above_i32 = i64::from(i32::MAX) + 1;

    // Each entry pairs a JSON input with the Avro value it must convert to.
    let cases: Vec<(serde_json::Value, Value)> = vec![
        (serde_json::Value::Null, Value::Null),
        (json!(true), Value::Boolean(true)),
        (json!(false), Value::Boolean(false)),
        (json!(0), Value::Int(0)),
        (json!(i32::MIN), Value::Int(i32::MIN)),
        (json!(i32::MAX), Value::Int(i32::MAX)),
        (json!(below_i32), Value::Long(below_i32)),
        (json!(above_i32), Value::Long(above_i32)),
        (json!(1.23), Value::Double(1.23)),
        (json!(-1.23), Value::Double(-1.23)),
        (json!(u64::MIN), Value::Int(u64::MIN as i32)),
        (json!(u64::MAX), Value::Long(u64::MAX as i64)),
        (json!("some text"), Value::String("some text".into())),
        (
            json!(["text1", "text2", "text3"]),
            Value::Array(vec![
                Value::String("text1".into()),
                Value::String("text2".into()),
                Value::String("text3".into()),
            ]),
        ),
        (
            json!({"key1": "value1", "key2": "value2"}),
            Value::Map(
                vec![
                    ("key1".into(), Value::String("value1".into())),
                    ("key2".into(), Value::String("value2".into())),
                ]
                .into_iter()
                .collect(),
            ),
        ),
    ];

    for (input, expected) in cases {
        assert_eq!(Value::from(input), expected);
    }
}
3167
/// AVRO-4024: resolving an arbitrary string against a `double` schema must
/// fail with `Details::GetDouble`, and its debug output must list the
/// accepted value kinds and special strings.
#[test]
fn avro_4024_resolve_double_from_unknown_string_err() -> TestResult {
    let schema = Schema::parse_str(r#"{"type": "double"}"#)?;
    let outcome = Value::String("unknown".to_owned())
        .resolve(&schema)
        .map_err(Error::into_details);

    match outcome {
        Err(err @ Details::GetDouble(_)) => assert_eq!(
            format!("{err:?}"),
            r#"Expected Value::Double, Value::Float, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: String("unknown")"#
        ),
        other => panic!("Expected Details::GetDouble, got {other:?}"),
    }

    Ok(())
}
3185
/// AVRO-4024: resolving an arbitrary string against a `float` schema must
/// fail with `Details::GetFloat`, and its debug output must list the
/// accepted value kinds and special strings.
#[test]
fn avro_4024_resolve_float_from_unknown_string_err() -> TestResult {
    let schema = Schema::parse_str(r#"{"type": "float"}"#)?;
    let outcome = Value::String("unknown".to_owned())
        .resolve(&schema)
        .map_err(Error::into_details);

    match outcome {
        Err(err @ Details::GetFloat(_)) => assert_eq!(
            format!("{err:?}"),
            r#"Expected Value::Float, Value::Double, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: String("unknown")"#
        ),
        other => panic!("Expected Details::GetFloat, got {other:?}"),
    }

    Ok(())
}
3203
/// AVRO-4029: table-driven check of the error text produced when a value of
/// an unsupported kind is resolved against a schema.
///
/// Each case is `(schema JSON, value to resolve, expected Display text of the
/// error)`. Every case must fail with exactly that message.
#[test]
fn avro_4029_resolve_from_unsupported_err() -> TestResult {
    let float_value = || Value::Float(123_f32);
    let bool_value = || Value::Boolean(false);

    let data: Vec<(&str, Value, &str)> = vec![
        (
            r#"{ "name": "NAME", "type": "int" }"#,
            float_value(),
            "Expected Value::Int, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "fixed", "size": 3 }"#,
            float_value(),
            "String expected for fixed, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "bytes" }"#,
            float_value(),
            "Expected Value::Bytes, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "string", "logicalType": "uuid" }"#,
            Value::String("abc-1234".into()),
            "Failed to convert &str to UUID: invalid group count: expected 5, found 2",
        ),
        (
            r#"{ "name": "NAME", "type": "string", "logicalType": "uuid" }"#,
            float_value(),
            "Expected Value::Uuid, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "bytes", "logicalType": "big-decimal" }"#,
            float_value(),
            "Expected Value::BigDecimal, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "fixed", "size": 12, "logicalType": "duration" }"#,
            float_value(),
            "Expected Value::Duration or Value::Fixed(12), got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3 }"#,
            float_value(),
            "Expected Value::Decimal, Value::Bytes or Value::Fixed, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "bytes" }"#,
            Value::Array(vec![Value::Long(256_i64)]),
            "Unable to convert to u8, got Int(256)",
        ),
        (
            r#"{ "name": "NAME", "type": "int", "logicalType": "date" }"#,
            float_value(),
            "Expected Value::Date or Value::Int, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "int", "logicalType": "time-millis" }"#,
            float_value(),
            "Expected Value::TimeMillis or Value::Int, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "long", "logicalType": "time-micros" }"#,
            float_value(),
            "Expected Value::TimeMicros, Value::Long or Value::Int, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "long", "logicalType": "timestamp-millis" }"#,
            float_value(),
            "Expected Value::TimestampMillis, Value::Long or Value::Int, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "long", "logicalType": "timestamp-micros" }"#,
            float_value(),
            "Expected Value::TimestampMicros, Value::Long or Value::Int, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "long", "logicalType": "timestamp-nanos" }"#,
            float_value(),
            "Expected Value::TimestampNanos, Value::Long or Value::Int, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "long", "logicalType": "local-timestamp-millis" }"#,
            float_value(),
            "Expected Value::LocalTimestampMillis, Value::Long or Value::Int, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "long", "logicalType": "local-timestamp-micros" }"#,
            float_value(),
            "Expected Value::LocalTimestampMicros, Value::Long or Value::Int, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "long", "logicalType": "local-timestamp-nanos" }"#,
            float_value(),
            "Expected Value::LocalTimestampNanos, Value::Long or Value::Int, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "null" }"#,
            float_value(),
            "Expected Value::Null, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "boolean" }"#,
            float_value(),
            "Expected Value::Boolean, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "int" }"#,
            float_value(),
            "Expected Value::Int, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "long" }"#,
            float_value(),
            "Expected Value::Long or Value::Int, got: Float(123.0)",
        ),
        (
            r#"{ "name": "NAME", "type": "float" }"#,
            bool_value(),
            r#"Expected Value::Float, Value::Double, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: Boolean(false)"#,
        ),
        (
            r#"{ "name": "NAME", "type": "double" }"#,
            bool_value(),
            r#"Expected Value::Double, Value::Float, Value::Int, Value::Long or Value::String ("NaN", "INF", "Infinity", "-INF" or "-Infinity"), got: Boolean(false)"#,
        ),
        (
            r#"{ "name": "NAME", "type": "string" }"#,
            bool_value(),
            "Expected Value::String, Value::Bytes or Value::Fixed, got: Boolean(false)",
        ),
        (
            r#"{ "name": "NAME", "type": "enum", "symbols": ["one", "two"] }"#,
            bool_value(),
            "Expected Value::Enum, got: Boolean(false)",
        ),
    ];

    for (schema_str, value, expected_error) in data {
        let schema = Schema::parse_str(schema_str)?;
        let outcome = value.resolve(&schema);
        match outcome {
            Err(error) => assert_eq!(error.to_string(), expected_error),
            other => panic!("Expected '{expected_error}', got {other:?}"),
        }
    }

    Ok(())
}
3352
/// avro-rs #130: `Record::get` returns a reference to the value previously
/// stored under a field name and `None` for a name that was never put.
#[test]
fn avro_rs_130_get_from_record() -> TestResult {
    let schema = Schema::parse_str(
        r#"
        {
            "type": "record",
            "name": "NamespacedMessage",
            "namespace": "space",
            "fields": [
                {
                    "name": "foo",
                    "type": "string"
                },
                {
                    "name": "bar",
                    "type": "long"
                }
            ]
        }
        "#,
    )?;

    let mut record = Record::new(&schema).unwrap();
    record.put("foo", "hello");
    record.put("bar", 123_i64);

    let expected_foo = Value::String("hello".to_string());
    assert_eq!(record.get("foo").unwrap(), &expected_foo);

    let expected_bar = Value::Long(123);
    assert_eq!(record.get("bar").unwrap(), &expected_bar);

    // `baz` is neither declared in the schema nor put into the record.
    assert_eq!(record.get("baz"), None);

    Ok(())
}
3389}