Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 32 additions & 13 deletions parq/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,6 @@ def split(
# Split into subdirectory
parq split data.parquet -f 3 -n "output/part-%02d.parquet"
"""
# {{CHENGQI:
# Action: Added; Timestamp: 2025-10-14 21:32:00 +08:00;
# Reason: Add CLI command for split functionality;
# Principle_Applied: Consistent error handling pattern, User-friendly output
# }}
# {{START MODIFICATIONS}}
try:
# Validate mutually exclusive parameters
if file_count is None and record_count is None:
Expand All @@ -226,14 +220,41 @@ def split(
# Start timer
start_time = time.time()

# Create reader and perform split
# Create reader
reader = ParquetReader(str(file))
output_files = reader.split_file(
output_pattern=name_format,
file_count=file_count,
record_count=record_count,

# Setup progress bar
from rich.progress import (
BarColumn,
Progress,
SpinnerColumn,
TextColumn,
TimeRemainingColumn,
)

from parq.output import console
Comment on lines +227 to +235
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better code style and maintainability, it's recommended to move imports to the top of the file, as per PEP 8 guidelines.1 While local imports can sometimes be useful for lazy loading or avoiding circular dependencies, in this case, rich is a core part of the CLI and moving these imports to the top would improve consistency and readability. I've suggested removing them here so you can add them at the top of the file.

Style Guide References

Footnotes

  1. PEP 8: Imports are always put at the top of the file, just after any module comments and docstrings, and before global variables and constants.


with Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeRemainingColumn(),
console=console,
) as progress:
task = progress.add_task(f"[cyan]Splitting {file.name}...", total=reader.num_rows)

def update_progress(current: int, total: int):
progress.update(task, completed=current)

# Perform split with progress callback
output_files = reader.split_file(
output_pattern=name_format,
file_count=file_count,
record_count=record_count,
progress_callback=update_progress,
)

# Calculate elapsed time
elapsed_time = time.time() - start_time

Expand All @@ -255,8 +276,6 @@ def split(
formatter.print_error(f"Failed to split Parquet file: {e}")
raise typer.Exit(code=1)

# {{END MODIFICATIONS}}


if __name__ == "__main__":
app()
35 changes: 18 additions & 17 deletions parq/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,27 +103,36 @@ def print_table(arrow_table: pa.Table, title: str = "Data Preview") -> None:
"""
Print PyArrow table as a Rich table.

Optimized to avoid pandas conversion, directly reading from PyArrow table
for better performance and reduced memory usage.

Args:
arrow_table: PyArrow table to display
title: Title for the table
"""
# Convert to pandas for easier display
df = arrow_table.to_pandas()

table = Table(
title=f"[bold blue]📄 {title}[/bold blue]",
box=box.ROUNDED,
show_header=True,
header_style="bold magenta",
padding=(0, 1),
show_lines=True,
)

# Add columns
for col in df.columns:
table.add_column(str(col), style="cyan")
# Add columns directly from PyArrow schema
for col_name in arrow_table.column_names:
table.add_column(str(col_name), style="cyan")

# Add rows
for _, row in df.iterrows():
table.add_row(*[str(val) for val in row])
# Add rows using columnar access for better performance
# Convert columns to Python lists first, leveraging PyArrow's optimized operations
columns_data = [arrow_table[col_name].to_pylist() for col_name in arrow_table.column_names]

# Transpose and iterate over rows
for row_idx in range(arrow_table.num_rows):
row_values = [
str(columns_data[col_idx][row_idx]) for col_idx in range(len(columns_data))
]
table.add_row(*row_values)
Comment on lines +131 to +135
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This implementation iterates through the table cell by cell, which can be inefficient. A more performant and Pythonic approach is to convert columns to Python lists first and then iterate over the transposed data. This leverages PyArrow's optimized columnar operations.

Suggested change
for row_idx in range(arrow_table.num_rows):
row_values = []
for col_name in arrow_table.column_names:
value = arrow_table[col_name][row_idx].as_py()
row_values.append(str(value))
table.add_row(*row_values)
# Add rows by transposing columns for efficiency
column_data = [col.to_pylist() for col in arrow_table.columns]
for row in zip(*column_data):
# Convert all values in the row to string before adding
table.add_row(*[str(item) for item in row])


console.print(table)

Expand Down Expand Up @@ -179,12 +188,6 @@ def print_split_result(
total_rows: Total number of rows in source file
elapsed_time: Time taken to split in seconds
"""
# {{CHENGQI:
# Action: Added; Timestamp: 2025-10-14 21:35:00 +08:00;
# Reason: Add output formatter for split command results;
# Principle_Applied: Consistent output formatting, User-friendly display
# }}
# {{START MODIFICATIONS}}

# Calculate statistics
num_files = len(output_files)
Expand Down Expand Up @@ -233,5 +236,3 @@ def print_split_result(
table.add_row(str(idx), str(file_path), size)

console.print(table)

# {{END MODIFICATIONS}}
98 changes: 79 additions & 19 deletions parq/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
import pyarrow as pa
import pyarrow.parquet as pq

# Heuristic multiplier for determining when to use fast path vs optimized path
# Files with <= n * FAST_PATH_ROWS_MULTIPLIER rows use full read + slice (simpler)
# Larger files use row group optimization to reduce I/O
FAST_PATH_ROWS_MULTIPLIER = 10


class ParquetReader:
"""Parquet file reader with metadata inspection capabilities."""
Expand Down Expand Up @@ -116,27 +121,87 @@ def read_head(self, n: int = 5) -> pa.Table:
"""
Read first n rows.

Optimized to only read necessary row groups for large files,
significantly reducing memory usage and improving performance.

Args:
n: Number of rows to read
n: Number of rows to read (must be >= 0)

Returns:
PyArrow table with first n rows

Raises:
ValueError: If n < 0
"""
table = self._parquet_file.read()
return table.slice(0, min(n, self.num_rows))
# Input validation
if n < 0:
raise ValueError(f"n must be non-negative, got {n}")

# Guard: zero rows requested - return empty table with correct schema
if n == 0:
return pa.table({field.name: pa.array([], type=field.type) for field in self.schema})

# Fast path: small files or single row group
# Avoids overhead of row group calculation
if self.num_rows <= n * FAST_PATH_ROWS_MULTIPLIER or self.num_row_groups == 1:
table = self._parquet_file.read()
return table.slice(0, min(n, self.num_rows))

# Optimized path: only read necessary row groups
# For large files, this can reduce I/O by 10-100x
rows_read = 0
row_groups = []
for i in range(self.num_row_groups):
row_groups.append(i)
rows_read += self.metadata.row_group(i).num_rows
if rows_read >= n:
break

table = self._parquet_file.read_row_groups(row_groups)
return table.slice(0, n)

def read_tail(self, n: int = 5) -> pa.Table:
"""
Read last n rows.

Optimized to only read necessary row groups from the end of the file,
significantly reducing memory usage and improving performance for large files.

Args:
n: Number of rows to read
n: Number of rows to read (must be >= 0)

Returns:
PyArrow table with last n rows

Raises:
ValueError: If n < 0
"""
table = self._parquet_file.read()
start = max(0, self.num_rows - n)
# Input validation
if n < 0:
raise ValueError(f"n must be non-negative, got {n}")

# Guard: zero rows requested - return empty table with correct schema
if n == 0:
return pa.table({field.name: pa.array([], type=field.type) for field in self.schema})

# Fast path: small files or single row group
if self.num_rows <= n * FAST_PATH_ROWS_MULTIPLIER or self.num_row_groups == 1:
table = self._parquet_file.read()
start = max(0, self.num_rows - n)
return table.slice(start, n)

# Optimized path: read from end, only necessary row groups
# Start from last row group and work backwards
rows_needed = n
row_groups = []
for i in range(self.num_row_groups - 1, -1, -1):
row_groups.insert(0, i) # Maintain order
rows_needed -= self.metadata.row_group(i).num_rows
if rows_needed <= 0:
break

table = self._parquet_file.read_row_groups(row_groups)
start = max(0, len(table) - n)
return table.slice(start, n)

def read_columns(self, columns: Optional[List[str]] = None) -> pa.Table:
Expand All @@ -156,6 +221,7 @@ def split_file(
output_pattern: str,
file_count: Optional[int] = None,
record_count: Optional[int] = None,
progress_callback: Optional[callable] = None,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The type hint callable is quite generic. For better type safety and clarity, you can use typing.Callable to specify the signature of the callback function. You will need to add from typing import Callable at the top of the file.

Suggested change
progress_callback: Optional[callable] = None,
progress_callback: Optional["Callable[[int, int], None]"] = None,

) -> List[Path]:
"""
Split parquet file into multiple files.
Expand All @@ -164,6 +230,7 @@ def split_file(
output_pattern: Output file name pattern (e.g., 'result-%06d.parquet')
file_count: Number of output files (mutually exclusive with record_count)
record_count: Number of records per file (mutually exclusive with file_count)
progress_callback: Optional callback function(current, total) for progress updates

Returns:
List of created file paths
Expand All @@ -172,12 +239,6 @@ def split_file(
ValueError: If both or neither of file_count/record_count are provided
IOError: If file write fails
"""
# {{CHENGQI:
# Action: Added; Timestamp: 2025-10-14 21:30:00 +08:00;
# Reason: Implement split command functionality;
# Principle_Applied: SOLID-S (Single Responsibility), DRY, Error handling
# }}
# {{START MODIFICATIONS}}

# Validate parameters
if file_count is None and record_count is None:
Expand Down Expand Up @@ -236,6 +297,7 @@ def split_file(
# Read and distribute data in batches
current_file_idx = 0
current_file_rows = 0
rows_processed = 0

# Use batch reader for memory efficiency
batch_size = min(10000, rows_per_file) # Read in chunks
Expand All @@ -259,6 +321,11 @@ def split_file(

batch_offset += rows_to_write
current_file_rows += rows_to_write
rows_processed += rows_to_write

# Report progress
if progress_callback:
progress_callback(rows_processed, total_rows)

# Move to next file if current is full
if current_file_rows >= rows_per_file and current_file_idx < num_files - 1:
Expand All @@ -271,8 +338,6 @@ def split_file(
if writer:
writer.close()

# {{END MODIFICATIONS}}

return output_files

def _get_compression_type(self) -> str:
Expand All @@ -282,11 +347,6 @@ def _get_compression_type(self) -> str:
Returns:
Compression type string (e.g., 'SNAPPY', 'GZIP', 'NONE')
"""
# {{CHENGUI:
# Action: Added; Timestamp: 2025-10-14 21:30:00 +08:00;
# Reason: Helper method to extract compression type for split files;
# Principle_Applied: DRY, Encapsulation
# }}
if self.num_row_groups > 0:
compression = self.metadata.row_group(0).column(0).compression
return compression
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ dependencies = [
"pyarrow>=18.0.0",
"typer>=0.15.0",
"rich>=13.0.0",
"pandas>=2.0.0",
]
authors = [
{name = "SimonSun", email = "sjf19981112@gmail.com"}
Expand Down
36 changes: 32 additions & 4 deletions tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,20 @@ def test_read_head(self, sample_parquet_file):
table = reader.read_head(10)
assert len(table) == 5

def test_read_head_edge_cases(self, sample_parquet_file):
"""Test read_head with edge cases."""
reader = ParquetReader(str(sample_parquet_file))

# Test n=0 returns empty table with correct schema
table = reader.read_head(0)
assert len(table) == 0
assert table.num_columns == reader.num_columns
assert table.column_names == reader.schema.names

# Test negative n raises ValueError
with pytest.raises(ValueError, match="must be non-negative"):
reader.read_head(-1)

def test_read_tail(self, sample_parquet_file):
"""Test reading last N rows."""
reader = ParquetReader(str(sample_parquet_file))
Expand All @@ -71,10 +85,24 @@ def test_read_tail(self, sample_parquet_file):
table = reader.read_tail(2)
assert len(table) == 2

# Verify it's the last rows
df = table.to_pandas()
assert df.iloc[0]["id"] == 4
assert df.iloc[1]["id"] == 5
# Verify it's the last rows (using PyArrow's idiomatic to_pylist method)
id_values = table["id"].to_pylist()
assert id_values[0] == 4
assert id_values[1] == 5

def test_read_tail_edge_cases(self, sample_parquet_file):
"""Test read_tail with edge cases."""
reader = ParquetReader(str(sample_parquet_file))

# Test n=0 returns empty table with correct schema
table = reader.read_tail(0)
assert len(table) == 0
assert table.num_columns == reader.num_columns
assert table.column_names == reader.schema.names

# Test negative n raises ValueError
with pytest.raises(ValueError, match="must be non-negative"):
reader.read_tail(-5)

def test_read_columns(self, sample_parquet_file):
"""Test reading specific columns."""
Expand Down
Loading