
Getting ComputeError: buffer's length is too small in mmap when concatenating 100s of parquet files #16227

Open
DeflateAwning opened this issue May 14, 2024 · 2 comments
Labels
bug (Something isn't working) · incomplete (Incomplete issue: needs MWE) · needs triage (Awaiting prioritization by a maintainer) · python (Related to Python Polars)

Comments

@DeflateAwning
Contributor

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

My apologies that I don't have a minimum reproducible example, as the failure seems somewhat random.

def concat_all_parquets_in_list_chunks(
	input_parquet_paths: list[Path],
	final_parquet_path: Path,
	data_schema: dict[str, pl.DataType],
	sort_cols: list[str],
	chunk_size: int = 35,
) -> None:
	
	accumulate_parquet_path_in = final_parquet_path.with_name(f"accumulate_in_{final_parquet_path.name}")
	accumulate_parquet_path_out = final_parquet_path.with_name(f"accumulate_out_{final_parquet_path.name}")

	input_parquet_paths_chunks = list(yield_chunks(input_parquet_paths, chunk_size=chunk_size))
	for chunk_num, input_parquet_paths_chunk in enumerate(tqdm(input_parquet_paths_chunks, unit='chunk_of_parquets')):
		logger.info(f"Starting parquet chunk {chunk_num:,}")
		scanned_parquets = [
			pl.scan_parquet(parquet_path)
			for parquet_path in input_parquet_paths_chunk
		]
		if chunk_num > 0:
			scanned_parquets.append(pl.scan_parquet(accumulate_parquet_path_in))
		
		pl.concat(scanned_parquets).cast(data_schema).sort(sort_cols).sink_parquet(accumulate_parquet_path_out, compression='uncompressed')
		logger.info(f"Finished concatenating parquet chunk {chunk_num:,}")

		# rename and overwrite
		if chunk_num > 0:
			accumulate_parquet_path_in.unlink()
		accumulate_parquet_path_out.rename(accumulate_parquet_path_in)
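
For completeness, the snippet above relies on a few helpers that aren't shown; a minimal sketch of what they are assumed to look like (these exact definitions are reconstructed from how they are used, not the original code):

# Assumed supporting imports and helper for the snippet above.
import logging
from pathlib import Path
from typing import Iterator

import polars as pl
from tqdm import tqdm

logger = logging.getLogger(__name__)

def yield_chunks(items: list, chunk_size: int) -> Iterator[list]:
	# Yield consecutive slices of at most `chunk_size` items.
	for i in range(0, len(items), chunk_size):
		yield items[i:i + chunk_size]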

Log output

Coming soon

Issue description

Calling that function succeeds for 20+ chunks (hundreds of files), and then fails with an error that gives no indication of what went wrong.

The error is: ComputeError: buffer's length is too small in mmap

Expected behavior

The function should complete without error.

Installed versions

--------Version info---------
Polars:               0.20.25
Index type:           UInt32
Platform:             Linux-6.5.0-1022-oem-x86_64-with-glibc2.35
Python:               3.9.19 (main, Apr  6 2024, 17:57:55) 
[GCC 11.4.0]

----Optional dependencies----
adbc_driver_manager:  0.11.0
cloudpickle:          3.0.0
connectorx:           0.3.3
deltalake:            0.17.4
fastexcel:            0.10.4
fsspec:               2024.3.1
gevent:               <not installed>
hvplot:               <not installed>
matplotlib:           3.8.4
nest_asyncio:         1.6.0
numpy:                1.26.4
openpyxl:             3.1.2
pandas:               1.5.3
pyarrow:              11.0.0
pydantic:             <not installed>
pyiceberg:            <not installed>
pyxlsb:               1.0.10
sqlalchemy:           1.4.52
torch:                <not installed>
xlsx2csv:             0.8.2
xlsxwriter:           3.0.9

@DeflateAwning DeflateAwning added bug Something isn't working needs triage Awaiting prioritization by a maintainer python Related to Python Polars labels May 14, 2024
@ritchie46
Member

Would really appreciate a repro here. This doesn't ring a bell.

@ritchie46 ritchie46 added the incomplete Incomplete issue: needs MWE label May 15, 2024
@DeflateAwning
Contributor Author

DeflateAwning commented May 20, 2024

The situation seems to be that, depending on the compute environment (amount of RAM), when the system starts running out of memory during the computation, it either runs out of memory outright or hits this bug and raises this error. I've been unable to create a reliable repro though; will keep trying.

Here's the repro I'm working with, but it doesn't fail reliably, and is highly dependent on available RAM:

from pathlib import Path
import polars as pl
import random

# generate 4000 parquets, each with 100k rows
file_count = 4000
(input_folder_path := Path('./temp_RERPO_16227')).mkdir(exist_ok=True, parents=True)
print(f"Writing to: {input_folder_path.absolute()}")
for file_num in range(file_count):
	row_count = 100_000
	df = pl.DataFrame(
		{
			"file_num": pl.Series([file_num] * row_count),
			"random_num": pl.Series([random.randint(1, 100_000) for _ in range(row_count)]),
			"random_num_1": pl.Series([random.randint(1, 100_000) for _ in range(row_count)]),
			"random_num_2": pl.Series([random.randint(1, 100_000) for _ in range(row_count)]),
			"random_num_3": pl.Series([random.randint(1, 100_000) for _ in range(row_count)]),
			"random_num_4": pl.Series([random.randint(1, 100_000) for _ in range(row_count)]),
			"random_num_5": pl.Series([random.randint(1, 100_000) for _ in range(row_count)]),
			'col1': pl.Series(["123", "abc", "xyz"]).sample(row_count, with_replacement=True),
			'col2': pl.Series(["123", "abc", "xyz"]).sample(row_count, with_replacement=True),
			'col3': pl.Series(["123", "abc", "xyz"]).sample(row_count, with_replacement=True),
			'col4': pl.Series(["123", "abc", "xyz"]).sample(row_count, with_replacement=True),
		}
	).with_row_index("orig_row_number")
	df.write_parquet(input_folder_path / f"in_file_{file_num}.parquet")
	print(f"Made parquet {file_num + 1}/{file_count}")

print(f"Made {file_count:,} parquets. Total size: {sum(f.stat().st_size for f in input_folder_path.glob('*.parquet')):,} bytes")

# then concat them all into one big parquet
(output_folder_path := Path('./temp_RERPO_16227_output')).mkdir(exist_ok=True, parents=True)
output_path = output_folder_path / "out_file.parquet"

dfs = [
	pl.scan_parquet(f)
	for f in input_folder_path.glob("*.parquet")
]
pl.concat(dfs).sort("random_num").sink_parquet(output_path)
print(f"Concatenated {file_count:,} parquets into one big parquet. Total size: {output_path.stat().st_size:,} bytes")
