diff --git a/src/binlog/consts.rs b/src/binlog/consts.rs index 2447a21..99b526f 100644 --- a/src/binlog/consts.rs +++ b/src/binlog/consts.rs @@ -576,6 +576,8 @@ pub enum OptionalMetadataFieldType { ENUM_AND_SET_COLUMN_CHARSET, /// A flag that indicates column visibility attribute. COLUMN_VISIBILITY, + /// Vector column dimensionality. + VECTOR_DIMENSIONALITY, } #[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)] @@ -605,6 +607,7 @@ impl TryFrom for OptionalMetadataFieldType { 10 => Ok(Self::ENUM_AND_SET_DEFAULT_CHARSET), 11 => Ok(Self::ENUM_AND_SET_COLUMN_CHARSET), 12 => Ok(Self::COLUMN_VISIBILITY), + 13 => Ok(Self::VECTOR_DIMENSIONALITY), x => Err(UnknownOptionalMetadataFieldType(x)), } } diff --git a/src/binlog/events/table_map_event.rs b/src/binlog/events/table_map_event.rs index 84e6d78..1a25638 100644 --- a/src/binlog/events/table_map_event.rs +++ b/src/binlog/events/table_map_event.rs @@ -960,6 +960,62 @@ impl MySerialize for GeometryTypes<'_> { } } +/// Contains a number of dimensions for every vector column. +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct VectorDimensionalities<'a> { + dimensionalities: RawBytes<'a, EofBytes>, +} + +impl<'a> VectorDimensionalities<'a> { + /// Returns an iterator over dimensionalities. + /// + /// It will signal an error and stop iteration if field value is malformed. + pub fn iter_dimensionalities(&'a self) -> IterVectorDimensionalities<'a> { + IterVectorDimensionalities { + buf: ParseBuf(self.dimensionalities.as_bytes()), + } + } +} + +pub struct IterVectorDimensionalities<'a> { + buf: ParseBuf<'a>, +} + +impl<'a> Iterator for IterVectorDimensionalities<'a> { + type Item = io::Result; + + fn next(&mut self) -> Option { + if self.buf.is_empty() { + None + } else { + match self.buf.parse::>(()) { + Ok(x) => Some(Ok(x.0)), + Err(e) => { + self.buf = ParseBuf(b""); + Some(Err(e)) + } + } + } + } +} + +impl<'de> MyDeserialize<'de> for VectorDimensionalities<'de> { + const SIZE: Option = None; + type Ctx = (); + + fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result { + Ok(Self { + dimensionalities: buf.parse(())?, + }) + } +} + +impl MySerialize for VectorDimensionalities<'_> { + fn serialize(&self, buf: &mut Vec) { + self.dimensionalities.serialize(buf); + } +} + /// Contains a sequence of PK column indexes where PK doesn't have a prefix. #[derive(Debug, Clone, Eq, PartialEq)] pub struct SimplePrimaryKey<'a> { @@ -1146,6 +1202,8 @@ pub enum OptionalMetadataField<'a> { /// Flags indicating visibility for every numeric column. &'a BitSlice, ), + /// See [`OptionalMetadataFieldType::VECTOR_DIMENSIONALITY`]. + Dimensionality(VectorDimensionalities<'a>), } /// Iterator over fields of an optional metadata. @@ -1260,6 +1318,9 @@ impl<'a> Iterator for OptionalMetadataIter<'a> { let flags = &flags[..num_columns]; Ok(OptionalMetadataField::ColumnVisibility(flags)) } + VECTOR_DIMENSIONALITY => { + Ok(OptionalMetadataField::Dimensionality(v.parse(())?)) + } }, Err(_) => Err(io::Error::new( io::ErrorKind::InvalidData, @@ -1325,6 +1386,7 @@ impl<'a> OptionalMetaExtractor<'a> { this.enum_and_set_column_charset = Some(x); } OptionalMetadataField::ColumnVisibility(_) => (), + OptionalMetadataField::Dimensionality(_) => (), } } diff --git a/src/binlog/mod.rs b/src/binlog/mod.rs index c56e865..b1511a8 100644 --- a/src/binlog/mod.rs +++ b/src/binlog/mod.rs @@ -285,7 +285,8 @@ impl ColumnType { | Self::MYSQL_TYPE_TIME2 | Self::MYSQL_TYPE_DATETIME2 | Self::MYSQL_TYPE_TIMESTAMP2 - | Self::MYSQL_TYPE_JSON => ptr.get(..1).map(|x| (x, 1)), + | Self::MYSQL_TYPE_JSON + | Self::MYSQL_TYPE_VECTOR => ptr.get(..1).map(|x| (x, 1)), Self::MYSQL_TYPE_VARCHAR => { if is_array { ptr.get(..3).map(|x| (x, 3)) @@ -302,7 +303,21 @@ impl ColumnType { .ok()? .get_metadata(ptr.get(1..)?, true) .map(|(x, n)| (x, n + 1)), - _ => Some((&[], 0)), + Self::MYSQL_TYPE_DECIMAL + | Self::MYSQL_TYPE_TINY + | Self::MYSQL_TYPE_SHORT + | Self::MYSQL_TYPE_LONG + | Self::MYSQL_TYPE_NULL + | Self::MYSQL_TYPE_TIMESTAMP + | Self::MYSQL_TYPE_LONGLONG + | Self::MYSQL_TYPE_INT24 + | Self::MYSQL_TYPE_DATE + | Self::MYSQL_TYPE_TIME + | Self::MYSQL_TYPE_DATETIME + | Self::MYSQL_TYPE_YEAR + | Self::MYSQL_TYPE_NEWDATE + | Self::MYSQL_TYPE_UNKNOWN + | Self::MYSQL_TYPE_VAR_STRING => Some((&[], 0)), } } } @@ -311,6 +326,7 @@ impl ColumnType { mod tests { use std::{ collections::HashMap, + convert::TryFrom, io, iter::{once, repeat}, }; @@ -322,10 +338,14 @@ mod tests { }; use crate::{ - binlog::{events::RowsEventData, value::BinlogValue}, + binlog::{ + events::{OptionalMetadataField, RowsEventData}, + value::BinlogValue, + }, collations::CollationId, constants::ColumnFlags, proto::MySerialize, + row::convert::from_row, value::Value, }; @@ -719,10 +739,12 @@ mod tests { let file_data = std::fs::read(dbg!(&file_path))?; let mut binlog_file = BinlogFile::new(BinlogVersion::Version4, &file_data[..])?; + let mut i = 0; let mut ev_pos = 4; let mut table_map_events = HashMap::new(); while let Some(ev) = binlog_file.next() { + i += 1; let ev = ev?; let _ = dbg!(ev.header().event_type()); let ev_end = ev_pos + ev.header().event_size() as usize; @@ -832,6 +854,72 @@ mod tests { } } + if file_path.file_name().unwrap() == "vector.binlog" { + let event_data = ev.read_data().unwrap(); + match event_data { + Some(EventData::TableMapEvent(ev)) => { + let optional_meta = ev.iter_optional_meta(); + match ev.table_name().as_ref() { + "foo" => { + for meta in optional_meta { + match meta.unwrap() { + OptionalMetadataField::Dimensionality(x) => assert_eq!( + x.iter_dimensionalities() + .collect::, _>>() + .unwrap(), + vec![3], + ), + _ => (), + } + } + } + "bar" => { + for meta in optional_meta { + match meta.unwrap() { + OptionalMetadataField::Dimensionality(x) => assert_eq!( + x.iter_dimensionalities() + .collect::, _>>() + .unwrap(), + vec![2, 4], + ), + _ => (), + } + } + } + _ => (), + } + } + Some(EventData::RowsEvent(ev)) if i == 12 => { + let table_map_event = + binlog_file.reader().get_tme(ev.table_id()).unwrap(); + let mut rows = ev.rows(table_map_event); + + let (None, Some(after)) = rows.next().unwrap().unwrap() else { + panic!("Unexpected data"); + }; + let (id, vector_column): (u8, Vec) = + from_row(crate::Row::try_from(after).unwrap()); + assert_eq!(id, 1); + assert_eq!( + vector_column, + vec![205, 204, 140, 63, 205, 204, 12, 64, 51, 51, 83, 64] + ); + + let (None, Some(after)) = rows.next().unwrap().unwrap() else { + panic!("Unexpected data"); + }; + let (id, vector_column): (u8, Vec) = + from_row(crate::Row::try_from(after).unwrap()); + assert_eq!(id, 2); + assert_eq!( + vector_column, + vec![0, 0, 128, 63, 0, 0, 128, 191, 0, 0, 0, 0] + ); + } + _ => (), + } + } + if file_path.file_name().unwrap() == "mysql-enum-string-set.000001" { if let Some(EventData::RowsEvent(data)) = ev.read_data().unwrap() { let table_map_event = diff --git a/src/binlog/value.rs b/src/binlog/value.rs index 3d908e6..5ea56fc 100644 --- a/src/binlog/value.rs +++ b/src/binlog/value.rs @@ -208,7 +208,8 @@ impl<'de> MyDeserialize<'de> for BinlogValue<'de> { | MYSQL_TYPE_MEDIUM_BLOB | MYSQL_TYPE_LONG_BLOB | MYSQL_TYPE_BLOB - | MYSQL_TYPE_GEOMETRY => { + | MYSQL_TYPE_GEOMETRY + | MYSQL_TYPE_VECTOR => { let nbytes = match col_meta[0] { 1 => *buf.parse::>(())? as usize, 2 => *buf.parse::>(())? as usize, diff --git a/src/constants.rs b/src/constants.rs index 487082e..e800748 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -615,6 +615,7 @@ pub enum ColumnType { MYSQL_TYPE_DATETIME2, MYSQL_TYPE_TIME2, MYSQL_TYPE_TYPED_ARRAY, // Used for replication only + MYSQL_TYPE_VECTOR = 242, MYSQL_TYPE_UNKNOWN = 243, MYSQL_TYPE_JSON = 245, MYSQL_TYPE_NEWDECIMAL = 246, @@ -671,6 +672,10 @@ impl ColumnType { pub fn is_geometry_type(&self) -> bool { matches!(self, ColumnType::MYSQL_TYPE_GEOMETRY) } + + pub fn is_vector_type(&self) -> bool { + matches!(self, ColumnType::MYSQL_TYPE_VECTOR) + } } impl TryFrom for ColumnType { @@ -698,6 +703,7 @@ impl TryFrom for ColumnType { 0x12_u8 => Ok(ColumnType::MYSQL_TYPE_DATETIME2), 0x13_u8 => Ok(ColumnType::MYSQL_TYPE_TIME2), 0x14_u8 => Ok(ColumnType::MYSQL_TYPE_TYPED_ARRAY), + 0xf2_u8 => Ok(ColumnType::MYSQL_TYPE_VECTOR), 0xf3_u8 => Ok(ColumnType::MYSQL_TYPE_UNKNOWN), 0xf5_u8 => Ok(ColumnType::MYSQL_TYPE_JSON), 0xf6_u8 => Ok(ColumnType::MYSQL_TYPE_NEWDECIMAL), diff --git a/src/value/mod.rs b/src/value/mod.rs index 7f57a7b..3365980 100644 --- a/src/value/mod.rs +++ b/src/value/mod.rs @@ -414,6 +414,7 @@ impl Value { | ColumnType::MYSQL_TYPE_BIT | ColumnType::MYSQL_TYPE_NEWDECIMAL | ColumnType::MYSQL_TYPE_GEOMETRY + | ColumnType::MYSQL_TYPE_VECTOR | ColumnType::MYSQL_TYPE_JSON => Ok(Bytes( buf.checked_eat_lenenc_str() .ok_or_else(unexpected_buf_eof)? @@ -576,6 +577,7 @@ mod test { Value::Bytes(b"MYSQL_TYPE_STRING".to_vec()), Value::NULL, Value::Bytes(b"MYSQL_TYPE_GEOMETRY".to_vec()), + Value::Bytes(b"MYSQL_TYPE_VECTOR".to_vec()), ]; let (body, _) = ComStmtExecuteRequestBuilder::new(0).build(&*values); @@ -627,6 +629,7 @@ mod test { Value::Bytes(b"MYSQL_TYPE_STRING".to_vec()), Value::NULL, Value::Bytes(b"MYSQL_TYPE_GEOMETRY".to_vec()), + Value::Bytes(b"MYSQL_TYPE_VECTOR".to_vec()), ]; let (body, _) = ComStmtExecuteRequestBuilder::new(0).build(&*values); diff --git a/test-data/binlogs/vector.binlog b/test-data/binlogs/vector.binlog new file mode 100644 index 0000000..475932a Binary files /dev/null and b/test-data/binlogs/vector.binlog differ