refactor: Remove new streaming old multiscan #21300

Merged
54 changes: 54 additions & 0 deletions crates/polars-plan/src/plans/conversion/scans.rs
@@ -131,6 +131,60 @@ pub(super) fn ipc_file_info(
Ok((file_info, metadata))
}

#[cfg(feature = "csv")]
pub fn isolated_csv_file_info(
source: ScanSourceRef,
file_options: &FileScanOptions,
csv_options: &mut CsvReadOptions,
_cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<FileInfo> {
use std::io::{Read, Seek};

use polars_io::csv::read::schema_inference::SchemaInferenceResult;
use polars_io::utils::get_reader_bytes;

let run_async = source.run_async();

let memslice = source.to_memslice_async_assume_latest(run_async)?;
let owned = &mut vec![];
let mut reader = std::io::Cursor::new(maybe_decompress_bytes(&memslice, owned)?);
if reader.read(&mut [0; 4])? < 2 && csv_options.raise_if_empty {
polars_bail!(NoData: "empty CSV")
}
reader.rewind()?;

let reader_bytes = get_reader_bytes(&mut reader).expect("could not mmap file");

// This needs a way to estimate bytes/rows.
let si_result =
SchemaInferenceResult::try_from_reader_bytes_and_options(&reader_bytes, csv_options)?;

csv_options.update_with_inference_result(&si_result);

let mut schema = csv_options
.schema
.clone()
.unwrap_or_else(|| si_result.get_inferred_schema());

let reader_schema = if let Some(rc) = &file_options.row_index {
let reader_schema = schema.clone();
let mut output_schema = (*reader_schema).clone();
output_schema.insert_at_index(0, rc.name.clone(), IDX_DTYPE)?;
schema = Arc::new(output_schema);
reader_schema
} else {
schema.clone()
};

let estimated_n_rows = si_result.get_estimated_n_rows();

Ok(FileInfo::new(
schema,
Some(Either::Right(reader_schema)),
(None, estimated_n_rows),
))
}

#[cfg(feature = "csv")]
pub fn csv_file_info(
sources: &ScanSources,
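The new `isolated_csv_file_info` rejects empty inputs by peeking at the first few bytes of the (possibly decompressed) source and rewinding before schema inference runs. A minimal sketch of just that peek-and-rewind step, using only the standard library and a hypothetical `looks_empty` helper (the real code additionally gates the error on `raise_if_empty`):

```rust
use std::io::{Cursor, Read, Seek};

// Read at most 4 bytes, treat fewer than 2 bytes as "no usable CSV content",
// and rewind so schema inference starts from the beginning of the stream.
fn looks_empty<R: Read + Seek>(reader: &mut R) -> std::io::Result<bool> {
    let n = reader.read(&mut [0u8; 4])?;
    reader.rewind()?;
    Ok(n < 2)
}

fn main() -> std::io::Result<()> {
    let mut empty = Cursor::new(Vec::<u8>::new());
    let mut csv = Cursor::new(b"a,b\n1,2\n".to_vec());
    assert!(looks_empty(&mut empty)?);
    assert!(!looks_empty(&mut csv)?);
    assert_eq!(csv.position(), 0); // cursors are rewound after the check
    Ok(())
}
```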
61 changes: 54 additions & 7 deletions crates/polars-plan/src/plans/ir/scan_sources.rs
@@ -47,21 +47,57 @@ pub enum ScanSourceRef<'a> {
}

/// A single source to scan from
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum ScanSource {
Path(PathBuf),
File(File),
Path(Arc<Path>),
File(Arc<File>),
Buffer(MemSlice),
}

impl ScanSource {
pub fn from_sources(sources: ScanSources) -> Result<Self, ScanSources> {
if sources.len() == 1 {
match sources {
ScanSources::Paths(ps) => Ok(Self::Path(ps.as_ref()[0].clone().into())),
ScanSources::Files(fs) => {
assert_eq!(fs.len(), 1);
let ptr: *const File = Arc::into_raw(fs) as *const File;
// SAFETY: A [T] with length 1 can be interpreted as T
let f: Arc<File> = unsafe { Arc::from_raw(ptr) };

Ok(Self::File(f))
},
ScanSources::Buffers(bs) => Ok(Self::Buffer(bs.as_ref()[0].clone())),
}
} else {
Err(sources)
}
}

pub fn into_sources(self) -> ScanSources {
match self {
ScanSource::Path(p) => ScanSources::Paths([p].into()),
ScanSource::File(f) => ScanSources::Files([f].into()),
ScanSource::Path(p) => ScanSources::Paths([p.to_path_buf()].into()),
ScanSource::File(f) => {
let ptr: *const [File] = std::ptr::slice_from_raw_parts(Arc::into_raw(f), 1);
// SAFETY: A T can be interpreted as [T] with length 1.
let fs: Arc<[File]> = unsafe { Arc::from_raw(ptr) };
ScanSources::Files(fs)
},
ScanSource::Buffer(m) => ScanSources::Buffers([m].into()),
}
}

pub fn as_scan_source_ref(&self) -> ScanSourceRef {
match self {
ScanSource::Path(path) => ScanSourceRef::Path(path.as_ref()),
ScanSource::File(file) => ScanSourceRef::File(file.as_ref()),
ScanSource::Buffer(mem_slice) => ScanSourceRef::Buffer(mem_slice),
}
}

pub fn run_async(&self) -> bool {
self.as_scan_source_ref().run_async()
}
}

/// An iterator for [`ScanSources`]
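Because `ScanSource::File` now holds an `Arc<File>` while `ScanSources::Files` holds an `Arc<[File]>`, `from_sources` and `into_sources` reinterpret a length-1 `Arc<[T]>` as an `Arc<T>` and back. A standalone sketch of that pointer trick, shown with `u64` instead of `File` and resting on the same layout assumption the SAFETY comments state:

```rust
use std::sync::Arc;

// Reinterpret a length-1 `Arc<[T]>` as `Arc<T>`, mirroring `ScanSource::from_sources`.
fn single_from_slice(xs: Arc<[u64]>) -> Arc<u64> {
    assert_eq!(xs.len(), 1);
    let ptr: *const u64 = Arc::into_raw(xs) as *const u64;
    // SAFETY: a `[T]` of length 1 can be reinterpreted as `T`.
    unsafe { Arc::from_raw(ptr) }
}

// The reverse direction, mirroring `ScanSource::into_sources`.
fn slice_from_single(x: Arc<u64>) -> Arc<[u64]> {
    let ptr: *const [u64] = std::ptr::slice_from_raw_parts(Arc::into_raw(x), 1);
    // SAFETY: a `T` can be reinterpreted as a `[T]` of length 1.
    unsafe { Arc::from_raw(ptr) }
}

fn main() {
    let slice: Arc<[u64]> = vec![42u64].into();
    let single = single_from_slice(slice);
    assert_eq!(*single, 42);
    let back = slice_from_single(single);
    assert_eq!(back.len(), 1);
    assert_eq!(back[0], 42);
}
```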
@@ -261,8 +297,15 @@ impl ScanSourceRef<'_> {
// @TODO: I would like to remove this function eventually.
pub fn into_owned(&self) -> PolarsResult<ScanSource> {
Ok(match self {
ScanSourceRef::Path(path) => ScanSource::Path(path.to_path_buf()),
_ => ScanSource::Buffer(self.to_memslice()?),
ScanSourceRef::Path(path) => ScanSource::Path((*path).into()),
ScanSourceRef::File(file) => {
if let Ok(file) = file.try_clone() {
ScanSource::File(Arc::new(file))
} else {
ScanSource::Buffer(self.to_memslice()?)
}
},
ScanSourceRef::Buffer(buffer) => ScanSource::Buffer((*buffer).clone()),
})
}
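`into_owned` now prefers duplicating the open file handle and only falls back to buffering the contents in memory when `try_clone` fails. A small sketch of that fallback, with a hypothetical `OwnedSource` and `to_owned_source` standing in for `ScanSource` and `into_owned`:

```rust
use std::fs::File;
use std::io::Read;
use std::sync::Arc;

// Hypothetical stand-in for `ScanSource`: either a shared file handle
// or the file's contents buffered in memory.
enum OwnedSource {
    File(Arc<File>),
    Buffer(Vec<u8>),
}

// Prefer the cheap path (duplicating the handle); fall back to reading
// everything into memory only when `try_clone` fails.
fn to_owned_source(file: &File) -> std::io::Result<OwnedSource> {
    match file.try_clone() {
        Ok(dup) => Ok(OwnedSource::File(Arc::new(dup))),
        Err(_) => {
            let mut buf = Vec::new();
            let mut reader = file; // `&File` also implements `Read`
            reader.read_to_end(&mut buf)?;
            Ok(OwnedSource::Buffer(buf))
        },
    }
}

fn main() -> std::io::Result<()> {
    let file = File::open("Cargo.toml")?; // any readable file
    let _owned = to_owned_source(&file)?;
    Ok(())
}
```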

@@ -335,6 +378,10 @@ impl ScanSourceRef<'_> {
Self::Buffer(buff) => Ok(DynByteSource::from((*buff).clone())),
}
}

pub(crate) fn run_async(&self) -> bool {
matches!(self, Self::Path(p) if polars_io::is_cloud_url(p) || polars_core::config::force_async())
}
}

impl<'a> Iterator for ScanSourceIter<'a> {