Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add vector type added in MySQL 9.0 #142

Merged
merged 3 commits into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/binlog/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,8 @@ pub enum OptionalMetadataFieldType {
ENUM_AND_SET_COLUMN_CHARSET,
/// A flag that indicates column visibility attribute.
COLUMN_VISIBILITY,
/// Vector column dimensionality.
VECTOR_DIMENSIONALITY,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
Expand Down Expand Up @@ -605,6 +607,7 @@ impl TryFrom<u8> for OptionalMetadataFieldType {
10 => Ok(Self::ENUM_AND_SET_DEFAULT_CHARSET),
11 => Ok(Self::ENUM_AND_SET_COLUMN_CHARSET),
12 => Ok(Self::COLUMN_VISIBILITY),
13 => Ok(Self::VECTOR_DIMENSIONALITY),
x => Err(UnknownOptionalMetadataFieldType(x)),
}
}
Expand Down
62 changes: 62 additions & 0 deletions src/binlog/events/table_map_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,62 @@ impl MySerialize for GeometryTypes<'_> {
}
}

/// Contains the number of dimensions for every vector column.
///
/// The field value is stored in its raw, still-encoded form; it is decoded only
/// when iterated via [`VectorDimensionalities::iter_dimensionalities`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct VectorDimensionalities<'a> {
/// Raw bytes of the field value (one encoded integer per vector column).
dimensionalities: RawBytes<'a, EofBytes>,
}

impl<'a> VectorDimensionalities<'a> {
    /// Creates an iterator that decodes the stored dimensionalities one by one.
    ///
    /// Every item is an `io::Result`: a malformed field value is reported as an
    /// error, after which the iteration stops.
    pub fn iter_dimensionalities(&'a self) -> IterVectorDimensionalities<'a> {
        // Hand the raw payload to the iterator; nothing is parsed up front.
        let raw = self.dimensionalities.as_bytes();
        IterVectorDimensionalities { buf: ParseBuf(raw) }
    }
}

/// Iterator over the dimensionalities of a [`VectorDimensionalities`] field.
///
/// Yields `io::Result<u64>` items; see its `Iterator` implementation for the
/// error behaviour.
pub struct IterVectorDimensionalities<'a> {
/// Bytes of the field value that have not been parsed yet.
buf: ParseBuf<'a>,
}

impl Iterator for IterVectorDimensionalities<'_> {
    type Item = io::Result<u64>;

    /// Parses the next length-encoded integer from the remaining bytes.
    ///
    /// Returns `None` once the buffer is empty. On a parse error the error is
    /// yielded once and the buffer is cleared, so the following call returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        if self.buf.is_empty() {
            return None;
        }

        let parsed = self.buf.parse::<RawInt<LenEnc>>(());
        if parsed.is_err() {
            // Drop whatever is left so that iteration ends after this error.
            self.buf = ParseBuf(b"");
        }

        Some(parsed.map(|value| value.0))
    }
}

impl<'de> MyDeserialize<'de> for VectorDimensionalities<'de> {
    // The field has no fixed size.
    const SIZE: Option<usize> = None;
    type Ctx = ();

    /// Reads the raw field value from `buf` without decoding individual entries.
    ///
    /// Any error reported by the underlying parse is returned unchanged.
    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        buf.parse(())
            .map(|dimensionalities| Self { dimensionalities })
    }
}

impl MySerialize for VectorDimensionalities<'_> {
    /// Appends the raw field value to `buf` by delegating to the inner value.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self { dimensionalities } = self;
        dimensionalities.serialize(buf);
    }
}

/// Contains a sequence of PK column indexes where PK doesn't have a prefix.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SimplePrimaryKey<'a> {
Expand Down Expand Up @@ -1146,6 +1202,8 @@ pub enum OptionalMetadataField<'a> {
/// Flags indicating visibility for every numeric column.
&'a BitSlice<u8, Msb0>,
),
/// See [`OptionalMetadataFieldType::VECTOR_DIMENSIONALITY`].
Dimensionality(VectorDimensionalities<'a>),
}

/// Iterator over fields of an optional metadata.
Expand Down Expand Up @@ -1260,6 +1318,9 @@ impl<'a> Iterator for OptionalMetadataIter<'a> {
let flags = &flags[..num_columns];
Ok(OptionalMetadataField::ColumnVisibility(flags))
}
VECTOR_DIMENSIONALITY => {
Ok(OptionalMetadataField::Dimensionality(v.parse(())?))
}
},
Err(_) => Err(io::Error::new(
io::ErrorKind::InvalidData,
Expand Down Expand Up @@ -1325,6 +1386,7 @@ impl<'a> OptionalMetaExtractor<'a> {
this.enum_and_set_column_charset = Some(x);
}
OptionalMetadataField::ColumnVisibility(_) => (),
OptionalMetadataField::Dimensionality(_) => (),
}
}

Expand Down
94 changes: 91 additions & 3 deletions src/binlog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ impl ColumnType {
| Self::MYSQL_TYPE_TIME2
| Self::MYSQL_TYPE_DATETIME2
| Self::MYSQL_TYPE_TIMESTAMP2
| Self::MYSQL_TYPE_JSON => ptr.get(..1).map(|x| (x, 1)),
| Self::MYSQL_TYPE_JSON
| Self::MYSQL_TYPE_VECTOR => ptr.get(..1).map(|x| (x, 1)),
Self::MYSQL_TYPE_VARCHAR => {
if is_array {
ptr.get(..3).map(|x| (x, 3))
Expand All @@ -302,7 +303,21 @@ impl ColumnType {
.ok()?
.get_metadata(ptr.get(1..)?, true)
.map(|(x, n)| (x, n + 1)),
_ => Some((&[], 0)),
Self::MYSQL_TYPE_DECIMAL
| Self::MYSQL_TYPE_TINY
| Self::MYSQL_TYPE_SHORT
| Self::MYSQL_TYPE_LONG
| Self::MYSQL_TYPE_NULL
| Self::MYSQL_TYPE_TIMESTAMP
| Self::MYSQL_TYPE_LONGLONG
| Self::MYSQL_TYPE_INT24
| Self::MYSQL_TYPE_DATE
| Self::MYSQL_TYPE_TIME
| Self::MYSQL_TYPE_DATETIME
| Self::MYSQL_TYPE_YEAR
| Self::MYSQL_TYPE_NEWDATE
| Self::MYSQL_TYPE_UNKNOWN
| Self::MYSQL_TYPE_VAR_STRING => Some((&[], 0)),
}
}
}
Expand All @@ -311,6 +326,7 @@ impl ColumnType {
mod tests {
use std::{
collections::HashMap,
convert::TryFrom,
io,
iter::{once, repeat},
};
Expand All @@ -322,10 +338,14 @@ mod tests {
};

use crate::{
binlog::{events::RowsEventData, value::BinlogValue},
binlog::{
events::{OptionalMetadataField, RowsEventData},
value::BinlogValue,
},
collations::CollationId,
constants::ColumnFlags,
proto::MySerialize,
row::convert::from_row,
value::Value,
};

Expand Down Expand Up @@ -719,10 +739,12 @@ mod tests {
let file_data = std::fs::read(dbg!(&file_path))?;
let mut binlog_file = BinlogFile::new(BinlogVersion::Version4, &file_data[..])?;

let mut i = 0;
let mut ev_pos = 4;
let mut table_map_events = HashMap::new();

while let Some(ev) = binlog_file.next() {
i += 1;
let ev = ev?;
let _ = dbg!(ev.header().event_type());
let ev_end = ev_pos + ev.header().event_size() as usize;
Expand Down Expand Up @@ -832,6 +854,72 @@ mod tests {
}
}

if file_path.file_name().unwrap() == "vector.binlog" {
let event_data = ev.read_data().unwrap();
match event_data {
Some(EventData::TableMapEvent(ev)) => {
let optional_meta = ev.iter_optional_meta();
match ev.table_name().as_ref() {
"foo" => {
for meta in optional_meta {
match meta.unwrap() {
OptionalMetadataField::Dimensionality(x) => assert_eq!(
x.iter_dimensionalities()
.collect::<Result<Vec<_>, _>>()
.unwrap(),
vec![3],
),
_ => (),
}
}
}
"bar" => {
for meta in optional_meta {
match meta.unwrap() {
OptionalMetadataField::Dimensionality(x) => assert_eq!(
x.iter_dimensionalities()
.collect::<Result<Vec<_>, _>>()
.unwrap(),
vec![2, 4],
),
_ => (),
}
}
}
_ => (),
}
}
Some(EventData::RowsEvent(ev)) if i == 12 => {
let table_map_event =
binlog_file.reader().get_tme(ev.table_id()).unwrap();
let mut rows = ev.rows(table_map_event);

let (None, Some(after)) = rows.next().unwrap().unwrap() else {
panic!("Unexpected data");
};
let (id, vector_column): (u8, Vec<u8>) =
from_row(crate::Row::try_from(after).unwrap());
assert_eq!(id, 1);
assert_eq!(
vector_column,
vec![205, 204, 140, 63, 205, 204, 12, 64, 51, 51, 83, 64]
);

let (None, Some(after)) = rows.next().unwrap().unwrap() else {
panic!("Unexpected data");
};
let (id, vector_column): (u8, Vec<u8>) =
from_row(crate::Row::try_from(after).unwrap());
assert_eq!(id, 2);
assert_eq!(
vector_column,
vec![0, 0, 128, 63, 0, 0, 128, 191, 0, 0, 0, 0]
);
}
_ => (),
}
}

if file_path.file_name().unwrap() == "mysql-enum-string-set.000001" {
if let Some(EventData::RowsEvent(data)) = ev.read_data().unwrap() {
let table_map_event =
Expand Down
3 changes: 2 additions & 1 deletion src/binlog/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ impl<'de> MyDeserialize<'de> for BinlogValue<'de> {
| MYSQL_TYPE_MEDIUM_BLOB
| MYSQL_TYPE_LONG_BLOB
| MYSQL_TYPE_BLOB
| MYSQL_TYPE_GEOMETRY => {
| MYSQL_TYPE_GEOMETRY
| MYSQL_TYPE_VECTOR => {
let nbytes = match col_meta[0] {
1 => *buf.parse::<RawInt<u8>>(())? as usize,
2 => *buf.parse::<RawInt<LeU16>>(())? as usize,
Expand Down
6 changes: 6 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ pub enum ColumnType {
MYSQL_TYPE_DATETIME2,
MYSQL_TYPE_TIME2,
MYSQL_TYPE_TYPED_ARRAY, // Used for replication only
MYSQL_TYPE_VECTOR = 242,
MYSQL_TYPE_UNKNOWN = 243,
MYSQL_TYPE_JSON = 245,
MYSQL_TYPE_NEWDECIMAL = 246,
Expand Down Expand Up @@ -671,6 +672,10 @@ impl ColumnType {
pub fn is_geometry_type(&self) -> bool {
matches!(self, ColumnType::MYSQL_TYPE_GEOMETRY)
}

pub fn is_vector_type(&self) -> bool {
matches!(self, ColumnType::MYSQL_TYPE_VECTOR)
}
}

impl TryFrom<u8> for ColumnType {
Expand Down Expand Up @@ -698,6 +703,7 @@ impl TryFrom<u8> for ColumnType {
0x12_u8 => Ok(ColumnType::MYSQL_TYPE_DATETIME2),
0x13_u8 => Ok(ColumnType::MYSQL_TYPE_TIME2),
0x14_u8 => Ok(ColumnType::MYSQL_TYPE_TYPED_ARRAY),
0xf2_u8 => Ok(ColumnType::MYSQL_TYPE_VECTOR),
0xf3_u8 => Ok(ColumnType::MYSQL_TYPE_UNKNOWN),
0xf5_u8 => Ok(ColumnType::MYSQL_TYPE_JSON),
0xf6_u8 => Ok(ColumnType::MYSQL_TYPE_NEWDECIMAL),
Expand Down
3 changes: 3 additions & 0 deletions src/value/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ impl Value {
| ColumnType::MYSQL_TYPE_BIT
| ColumnType::MYSQL_TYPE_NEWDECIMAL
| ColumnType::MYSQL_TYPE_GEOMETRY
| ColumnType::MYSQL_TYPE_VECTOR
| ColumnType::MYSQL_TYPE_JSON => Ok(Bytes(
buf.checked_eat_lenenc_str()
.ok_or_else(unexpected_buf_eof)?
Expand Down Expand Up @@ -576,6 +577,7 @@ mod test {
Value::Bytes(b"MYSQL_TYPE_STRING".to_vec()),
Value::NULL,
Value::Bytes(b"MYSQL_TYPE_GEOMETRY".to_vec()),
Value::Bytes(b"MYSQL_TYPE_VECTOR".to_vec()),
];

let (body, _) = ComStmtExecuteRequestBuilder::new(0).build(&*values);
Expand Down Expand Up @@ -627,6 +629,7 @@ mod test {
Value::Bytes(b"MYSQL_TYPE_STRING".to_vec()),
Value::NULL,
Value::Bytes(b"MYSQL_TYPE_GEOMETRY".to_vec()),
Value::Bytes(b"MYSQL_TYPE_VECTOR".to_vec()),
];

let (body, _) = ComStmtExecuteRequestBuilder::new(0).build(&*values);
Expand Down
Binary file added test-data/binlogs/vector.binlog
Binary file not shown.