Skip to content

Commit eae47a4

Browse files
authored
refactor: Remove new streaming old multiscan (#21300)
1 parent bb8efc5 commit eae47a4

File tree

8 files changed

+372
-319
lines changed

8 files changed

+372
-319
lines changed

crates/polars-plan/src/plans/conversion/scans.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,60 @@ pub(super) fn ipc_file_info(
131131
Ok((file_info, metadata))
132132
}
133133

134+
/// Builds the [`FileInfo`] for a single CSV source.
///
/// The source is loaded into memory (and passed through `maybe_decompress_bytes`),
/// schema inference is run on the resulting bytes, and `csv_options` is updated in
/// place with the inference result.
///
/// The returned [`FileInfo`] carries:
/// * the output schema: `csv_options.schema` if set, otherwise the inferred schema,
///   with the row-index column inserted at position 0 when
///   `file_options.row_index` is set;
/// * the reader schema: the same schema *without* the row-index column;
/// * the estimated number of rows reported by schema inference.
///
/// # Errors
///
/// Returns a `NoData` error when fewer than 2 bytes could be read from the source
/// and `csv_options.raise_if_empty` is set. Any error from loading the source,
/// decompressing it, obtaining the reader bytes, inferring the schema, or inserting
/// the row-index column is propagated to the caller.
#[cfg(feature = "csv")]
pub fn isolated_csv_file_info(
    source: ScanSourceRef,
    file_options: &FileScanOptions,
    csv_options: &mut CsvReadOptions,
    _cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<FileInfo> {
    use std::io::{Read, Seek};

    use polars_io::csv::read::schema_inference::SchemaInferenceResult;
    use polars_io::utils::get_reader_bytes;

    let run_async = source.run_async();

    let memslice = source.to_memslice_async_assume_latest(run_async)?;
    let owned = &mut vec![];
    let mut reader = std::io::Cursor::new(maybe_decompress_bytes(&memslice, owned)?);

    // Probe the first few bytes: a source with fewer than 2 bytes is treated as empty.
    if reader.read(&mut [0; 4])? < 2 && csv_options.raise_if_empty {
        polars_bail!(NoData: "empty CSV")
    }
    // Undo the probe so that inference sees the data from the start.
    reader.rewind()?;

    // Propagate failures instead of panicking: this function already returns a
    // `PolarsResult`, so the caller can handle the error like any other.
    let reader_bytes = get_reader_bytes(&mut reader)?;

    // TODO: this needs a way to estimate bytes per row.
    let si_result =
        SchemaInferenceResult::try_from_reader_bytes_and_options(&reader_bytes, csv_options)?;

    csv_options.update_with_inference_result(&si_result);

    // A user-provided schema takes precedence over the inferred one.
    let mut schema = csv_options
        .schema
        .clone()
        .unwrap_or_else(|| si_result.get_inferred_schema());

    // The reader schema never contains the row-index column; only the output schema does.
    let reader_schema = schema.clone();
    if let Some(rc) = &file_options.row_index {
        let mut output_schema = (*schema).clone();
        output_schema.insert_at_index(0, rc.name.clone(), IDX_DTYPE)?;
        schema = Arc::new(output_schema);
    }

    let estimated_n_rows = si_result.get_estimated_n_rows();

    Ok(FileInfo::new(
        schema,
        Some(Either::Right(reader_schema)),
        (None, estimated_n_rows),
    ))
}
187+
134188
#[cfg(feature = "csv")]
135189
pub fn csv_file_info(
136190
sources: &ScanSources,

crates/polars-plan/src/plans/ir/scan_sources.rs

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,21 +47,57 @@ pub enum ScanSourceRef<'a> {
4747
}
4848

4949
/// A single source to scan from
50-
#[derive(Debug)]
50+
#[derive(Debug, Clone)]
5151
pub enum ScanSource {
52-
Path(PathBuf),
53-
File(File),
52+
Path(Arc<Path>),
53+
File(Arc<File>),
5454
Buffer(MemSlice),
5555
}
5656

5757
impl ScanSource {
58+
pub fn from_sources(sources: ScanSources) -> Result<Self, ScanSources> {
59+
if sources.len() == 1 {
60+
match sources {
61+
ScanSources::Paths(ps) => Ok(Self::Path(ps.as_ref()[0].clone().into())),
62+
ScanSources::Files(fs) => {
63+
assert_eq!(fs.len(), 1);
64+
let ptr: *const File = Arc::into_raw(fs) as *const File;
65+
// SAFETY: A [T] with length 1 can be interpreted as T
66+
let f: Arc<File> = unsafe { Arc::from_raw(ptr) };
67+
68+
Ok(Self::File(f))
69+
},
70+
ScanSources::Buffers(bs) => Ok(Self::Buffer(bs.as_ref()[0].clone())),
71+
}
72+
} else {
73+
Err(sources)
74+
}
75+
}
76+
5877
pub fn into_sources(self) -> ScanSources {
5978
match self {
60-
ScanSource::Path(p) => ScanSources::Paths([p].into()),
61-
ScanSource::File(f) => ScanSources::Files([f].into()),
79+
ScanSource::Path(p) => ScanSources::Paths([p.to_path_buf()].into()),
80+
ScanSource::File(f) => {
81+
let ptr: *const [File] = std::ptr::slice_from_raw_parts(Arc::into_raw(f), 1);
82+
// SAFETY: A T can be interpreted as [T] with length 1.
83+
let fs: Arc<[File]> = unsafe { Arc::from_raw(ptr) };
84+
ScanSources::Files(fs)
85+
},
6286
ScanSource::Buffer(m) => ScanSources::Buffers([m].into()),
6387
}
6488
}
89+
90+
pub fn as_scan_source_ref(&self) -> ScanSourceRef {
91+
match self {
92+
ScanSource::Path(path) => ScanSourceRef::Path(path.as_ref()),
93+
ScanSource::File(file) => ScanSourceRef::File(file.as_ref()),
94+
ScanSource::Buffer(mem_slice) => ScanSourceRef::Buffer(mem_slice),
95+
}
96+
}
97+
98+
pub fn run_async(&self) -> bool {
99+
self.as_scan_source_ref().run_async()
100+
}
65101
}
66102

67103
/// An iterator for [`ScanSources`]
@@ -261,8 +297,15 @@ impl ScanSourceRef<'_> {
261297
// @TODO: I would like to remove this function eventually.
262298
pub fn into_owned(&self) -> PolarsResult<ScanSource> {
263299
Ok(match self {
264-
ScanSourceRef::Path(path) => ScanSource::Path(path.to_path_buf()),
265-
_ => ScanSource::Buffer(self.to_memslice()?),
300+
ScanSourceRef::Path(path) => ScanSource::Path((*path).into()),
301+
ScanSourceRef::File(file) => {
302+
if let Ok(file) = file.try_clone() {
303+
ScanSource::File(Arc::new(file))
304+
} else {
305+
ScanSource::Buffer(self.to_memslice()?)
306+
}
307+
},
308+
ScanSourceRef::Buffer(buffer) => ScanSource::Buffer((*buffer).clone()),
266309
})
267310
}
268311

@@ -335,6 +378,10 @@ impl ScanSourceRef<'_> {
335378
Self::Buffer(buff) => Ok(DynByteSource::from((*buff).clone())),
336379
}
337380
}
381+
382+
pub(crate) fn run_async(&self) -> bool {
383+
matches!(self, Self::Path(p) if polars_io::is_cloud_url(p) || polars_core::config::force_async())
384+
}
338385
}
339386

340387
impl<'a> Iterator for ScanSourceIter<'a> {

0 commit comments

Comments
 (0)