[Python] Quadratic memory usage of Table.to_pandas with nested data #20512

Closed
asfimport opened this issue Nov 24, 2022 · 22 comments · Fixed by #15210

Comments
Reading nested Parquet data and then converting it to a Pandas DataFrame shows quadratic memory usage and will eventually run out of memory for reasonably small files. I had initially thought this was a regression since 7.0.0, but it looks like 7.0.0 has similar quadratic memory usage that kicks in at higher row counts.

Example code to generate nested Parquet data:

import numpy as np
import random
import string
import pandas as pd

_characters = string.ascii_uppercase + string.digits + string.punctuation

def make_random_string(N=10):
    return ''.join(random.choice(_characters) for _ in range(N))

nrows = 1_024_000
filename = 'nested.parquet'

arr_len = 10
nested_col = []
for i in range(nrows):
    nested_col.append(np.array(
            [{
                'a': None if i % 1000 == 0 else np.random.choice(10000, size=3).astype(np.int64),
                'b': None if i % 100 == 0 else random.choice(range(100)),
                'c': None if i % 10 == 0 else make_random_string(5)
            } for i in range(arr_len)]
        ))
df = pd.DataFrame({'c1': nested_col})
df.to_parquet(filename)

And then read into a DataFrame with:

import pyarrow.parquet as pq
table = pq.read_table(filename)
df = table.to_pandas()

Only reading into an Arrow table isn't a problem; it's the to_pandas method that exhibits the large memory usage. I haven't tested generating nested Arrow data in memory without writing Parquet from pandas, but I assume the problem probably isn't Parquet specific.

Memory usage I see when reading different sized files on a machine with 64 GB RAM:

|Num rows|Memory used with 10.0.1 (MB)|Memory used with 7.0.0 (MB)|
|-|-|-|
|32,000|362|361|
|64,000|531|531|
|128,000|1,152|1,101|
|256,000|2,888|1,402|
|512,000|10,301|3,508|
|1,024,000|38,697|5,313|
|2,048,000|OOM|20,061|
|4,096,000| |OOM|



With Arrow 10.0.1, memory usage approximately quadruples when row count doubles above 256k rows. With Arrow 7.0.0 memory usage is more linear but then quadruples from 1024k to 2048k rows.



PyArrow 8.0.0 shows similar memory usage to 10.0.1 so it looks like something changed between 7.0.0 and 8.0.0.
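One way to capture peak memory for a run like this (a minimal sketch only; the file name comes from the generation script above, and ru_maxrss is reported in kilobytes on Linux):

import resource

import pyarrow.parquet as pq

table = pq.read_table('nested.parquet')
df = table.to_pandas()

# peak resident set size of this process (kilobytes on Linux)
peak_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
print(f"peak RSS: {peak_kb / 1024:.0f} MB")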

Environment: Python 3.10.8 on Fedora Linux 36. AMD Ryzen 9 5900 X with 64 GB RAM
Reporter: Adam Reeve / @adamreeve
Assignee: Will Jones / @wjones127

Note: This issue was originally created as ARROW-18400. Please see the migration documentation for further details.


Antoine Pitrou / @pitrou:
@AlenkaF @milesgranger This seems like something we'd like to fix.


Alenka Frim / @AlenkaF:
I think it is Parquet specific. There seems to be an issue when data is read from the .parquet format and then converted to pandas, which doesn't happen when converting a pyarrow table built in memory to a pandas dataframe. This is the code I am working with at the moment:

import numpy as np
import random
import string

_characters = string.ascii_uppercase + string.digits + string.punctuation

def make_random_string(N=10):
    return ''.join(random.choice(_characters) for _ in range(N))

nrows = 1_024_000
filename = 'nested_pandas.parquet'

arr_len = 10
nested_col = []
for i in range(nrows):
    nested_col.append(np.array(
            [{
                'a': None if i % 1000 == 0 else np.random.choice(10000, size=3).astype(np.int64),
                'b': None if i % 100 == 0 else random.choice(range(100)),
                'c': None if i % 10 == 0 else make_random_string(5)
            } for i in range(arr_len)]
        ))

# tracemalloc is needed for the get_traced_memory() call at the end
import tracemalloc
tracemalloc.start()

import pyarrow as pa
import pyarrow.parquet as pq
table = pa.table({'c1': nested_col})

# Works correctly
table.to_pandas()
#                                                         c1
# 0        [{'a': None, 'b': None, 'c': None}, {'a': [399...
# 1        [{'a': None, 'b': None, 'c': None}, {'a': [832...
# 2        [{'a': None, 'b': None, 'c': None}, {'a': [731...
# 3        [{'a': None, 'b': None, 'c': None}, {'a': [589...
# 4        [{'a': None, 'b': None, 'c': None}, {'a': [159...
# ...                                                    ...
# 1023995  [{'a': None, 'b': None, 'c': None}, {'a': [922...
# 1023996  [{'a': None, 'b': None, 'c': None}, {'a': [865...
# 1023997  [{'a': None, 'b': None, 'c': None}, {'a': [222...
# 1023998  [{'a': None, 'b': None, 'c': None}, {'a': [143...
# 1023999  [{'a': None, 'b': None, 'c': None}, {'a': [287...

# [1024000 rows x 1 columns]

# Writing to .parquet and loading it into arrow again
pq.write_table(table, filename)
table_from_parquet = pq.read_table(filename)

# Kill - converting to pandas
table_from_parquet.to_pandas()
print(tracemalloc.get_traced_memory())
# zsh: killed     python memory_usage.py

I still have to look into what is causing it but there has to be some extra information being passed from parquet to arrow and then to pandas that is triggering this. Will research further next week.

 


Weston Pace / @westonpace:
It might be a good idea to test if the memory usage still happens with use_legacy_dataset=True. These version numbers might line up to when we switched that default.


Alenka Frim / @AlenkaF:
You are correct @westonpace

...

import tracemalloc
tracemalloc.start()

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({'c1': nested_col})
pq.write_table(table, filename)
table_from_parquet = pq.read_table(filename, use_legacy_dataset=True)
table_from_parquet.to_pandas()
#                                                         c1
# 0        [{'a': None, 'b': None, 'c': None}, {'a': [248...
# 1        [{'a': None, 'b': None, 'c': None}, {'a': [626...
# 2        [{'a': None, 'b': None, 'c': None}, {'a': [148...
# 3        [{'a': None, 'b': None, 'c': None}, {'a': [399...
# 4        [{'a': None, 'b': None, 'c': None}, {'a': [253...
# ...                                                    ...
# 1023995  [{'a': None, 'b': None, 'c': None}, {'a': [779...
# 1023996  [{'a': None, 'b': None, 'c': None}, {'a': [309...
# 1023997  [{'a': None, 'b': None, 'c': None}, {'a': [376...
# 1023998  [{'a': None, 'b': None, 'c': None}, {'a': [391...
# 1023999  [{'a': None, 'b': None, 'c': None}, {'a': [815...

# [1024000 rows x 1 columns]

print(tracemalloc.get_traced_memory()) # in bytes
# (23087183, 4328216706)

tracemalloc.stop()

Weston, do you think this is something on the C++ side of the Dataset API or should I look at the Python/Cython implementation?

@adamreeve could you also try the same memory profiling as you did, with use_legacy_dataset=True added to pq.read_table?


Adam Reeve / @adamreeve:
Hi @AlenkaF, I tested switching to use_legacy_dataset=True with PyArrow 10.0.1 and the memory usage is mostly equivalent to version 7.0.0 up until 1,024,000 rows. Beyond that point memory usage only increases linearly, so it performs much better than 7.0.0. Reading the 2,048,000 row file uses 10,522 MB and reading the 4,096,000 row file uses 20,925 MB rather than being killed for using too much memory.


Anja Boskovic / @anjakefala:
The generic dataset API currently supports Parquet, Arrow IPC, and CSV. I thought Arrow IPC made a natural comparison point to see if this issue was Parquet-specific.

I found that the quadratic memory jump kicks in at larger file sizes for Arrow IPC than it does for Parquet, but it still happens.

 

|Num rows|Peak Memory Usage Parquet (MB)|Peak Memory Usage Arrow IPC (MB)|
|-|-|-|
|32,000|129|129|
|64,000|258|258|
|128,000|516|516|
|256,000|2,036|1,033|
|512,000|14,300|2,065|
|1,024,000|OOM (on my machine)|14,200|
|2,048,000| |OOM (on my machine)|



Here is the code that I used to run these tests. I am using the generic dataset API directly.



 

import pyarrow.dataset as ds
import tracemalloc

filename = "nested"
form = "feather"
dataset = ds.dataset(f"{filename}.{form}", format=f"{form}")
table = dataset.to_table()
tracemalloc.start()
df = table.to_pandas()
stats = tracemalloc.take_snapshot().statistics("lineno")[:10]
for stat in stats:
    print(stat)
tracemalloc.stop()

My conclusion is that this is a problem that affects the new dataset API in general. Though it is potentially interesting that it begins to happen at higher row counts for Arrow IPC.


Will Jones / @wjones127:
Under the hood, pyarrow.parquet.read_table is using Dataset. Has anyone looked at the effect of changing batch_readahead and fragment_readahead? (They can be passed as kwargs to .to_table()) (docs)
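As a minimal sketch (the file name and readahead values are only placeholders), those kwargs can be passed straight to Dataset.to_table():

import pyarrow.dataset as ds

dataset = ds.dataset("nested.parquet", format="parquet")
# lower readahead limits how many record batches / fragments are buffered ahead of time
table = dataset.to_table(batch_readahead=1, fragment_readahead=1)
df = table.to_pandas()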


Anja Boskovic / @anjakefala:
Ooo, I will try those options out! Thanks!


Anja Boskovic / @anjakefala:
Passing lower batch_readahead and fragment_readahead for parquet files of 256,000 rows and 512,000 rows did not have an effect!


Alenka Frim / @AlenkaF:
Joris advised me to compare the in-memory pa.Table when the file is read with the legacy implementation versus the new dataset implementation. Doing that, I found that the issue comes from the number of chunks and the concatenation on the pandas side (as Joris suggested).

The old legacy implementation reads the data into a table with one chunk, whereas the new dataset implementation reads it into a table with multiple chunks. When converting to pandas the chunks have to be concatenated, and that is causing the high memory usage.

The solution would be to concatenate a table before transforming it into a pandas dataframe:

table_legacy = pq.read_table(filename, use_legacy_dataset=True)
table_dataset = pq.read_table(filename)

col_legacy = table_legacy[0]
col_legacy.num_chunks
# 1

col_dataset = table_dataset[0]
col_dataset.num_chunks
# 8

table_dataset.combine_chunks().to_pandas()
#                                                         c1
# 0        [{'a': None, 'b': None, 'c': None}, {'a': [248...
# 1        [{'a': None, 'b': None, 'c': None}, {'a': [626...
# 2        [{'a': None, 'b': None, 'c': None}, {'a': [148...
# 3        [{'a': None, 'b': None, 'c': None}, {'a': [399...
# 4        [{'a': None, 'b': None, 'c': None}, {'a': [253...
# ...                                                    ...
# 1023995  [{'a': None, 'b': None, 'c': None}, {'a': [779...
# 1023996  [{'a': None, 'b': None, 'c': None}, {'a': [309...
# 1023997  [{'a': None, 'b': None, 'c': None}, {'a': [376...
# 1023998  [{'a': None, 'b': None, 'c': None}, {'a': [391...
# 1023999  [{'a': None, 'b': None, 'c': None}, {'a': [815...

# [1024000 rows x 1 columns]

table_dataset.to_pandas()
# zsh: killed     python

I plan to make a PR at the beginning of next week.


Joris Van den Bossche / @jorisvandenbossche:
While combining the chunks before converting to pandas is a useful workaround, that still seems to point to a bug or inefficiency in the pyarrow.Table->pandas.DataFrame conversion for nested columns.

The main conversion logic for this lives in arrow_to_pandas.cc (ConvertStruct and ConvertListLike). For structs, it seems to iterate over the chunks of the ChunkedArray, and in that loop, iterate over the fields to convert each field array to a numpy array and then create python dictionaries combining those fields. And those dictionaries are inserted into the numpy object dtype array for the full column (allocated in advance, for all chunks).
So from that quick look, I don't directly see why combining the chunks in advance would help / why having multiple chunks results in a higher memory usage.
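As a rough Python-level sketch of the flow described above (illustrative only; the real implementation is the C++ code in arrow_to_pandas.cc, and the function name here is made up):

import numpy as np
import pyarrow as pa

def convert_struct_column(chunked_arr: pa.ChunkedArray) -> np.ndarray:
    # object-dtype result array allocated in advance, for all chunks
    out = np.empty(len(chunked_arr), dtype=object)
    pos = 0
    for chunk in chunked_arr.chunks:
        field_names = [chunk.type.field(i).name for i in range(chunk.type.num_fields)]
        # each struct field of the chunk is converted to a numpy array once ...
        field_values = [chunk.field(name).to_numpy(zero_copy_only=False) for name in field_names]
        null_mask = chunk.is_null().to_numpy(zero_copy_only=False)
        # ... and the fields are then recombined row by row into Python dicts
        for i in range(len(chunk)):
            out[pos] = None if null_mask[i] else {
                name: values[i] for name, values in zip(field_names, field_values)
            }
            pos += 1
    return out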


Alenka Frim / @AlenkaF:
I did some more testing with new suggestions from Joris and found that:

  • The issue of high memory usage only happens with the Parquet format and a chunked table.

  • If I write the same table to the Feather file format and read it with the Feather or dataset API, the table is also constructed as chunked but converts to pandas normally.

  • If I read a Parquet file with the Parquet API using the new dataset implementation, or with the dataset API directly, I get a segfault.

  • The memory the tables are holding is equal for all three ways of reading the Parquet file (Parquet API new and legacy, dataset API), though the docstring for pyarrow.Table.get_total_buffer_size states:

    "If a buffer is referenced multiple times then it will only be counted once."

    Could the table be referencing a buffer multiple times, and can that cause an issue when converting it to pandas? If that is the case, it would explain why combining chunks solves the issue, as it creates a copy of the table.

    I am trying to attach a file I am experimenting with, hope Jira will play nice =)


Joris Van den Bossche / @jorisvandenbossche:
Using Alenka's script, I explored it a bit further and noticed some things. When reading the parquet file using the dataset reader, converting to pandas uses more memory, but based on the memray profiler it didn't seem to take any different code path; all the code paths just allocate twice the memory (twice in my case, but for a larger dataset it might be x4 or x8 etc). So there needed to be something different about the table created from reading the parquet file with the legacy API vs the dataset API. It turns out that with the dataset API, multiple chunks are returned, but each of those chunks is actually a slice of a single buffer. And in the conversion layer, something is not taking this offset into the buffer into account.

Illustrating this, reading the parquet file in two ways (I have been using nrows = 1_024_000 // 4, so the file is a bit smaller, with fewer chunks):

import pyarrow.parquet as pq
import pyarrow.dataset as ds

table1 = pq.read_table("memory_testing.parquet", use_legacy_dataset=True)
dataset = ds.dataset("memory_testing.parquet", format="parquet")
table2 = dataset.to_table()

Table 1 has a single chunk, while table 2 (from reading with dataset API) has two chunks:

>>> table1["c1"].num_chunks
1
>>> table2["c1"].num_chunks
2

Taking the first chunk of each of those, and then looking at those arrays:

arr1 = table1["c1"].chunk(0)
arr2 = table2["c1"].chunk(0)

>>> len(arr1)
256000
>>> len(arr2)  # around half the number of rows (since there are two chunks in this table)
131072
>>> arr1.get_total_buffer_size()
110624012
>>> arr2.get_total_buffer_size()  # but still using the same total memory!
110624012

So the smaller chunk of table2 is not using less memory. That is because the two chunks of table2 are actually each a slice into the same underlying buffers:

>>> table2["c1"].chunk(0).buffers()[1]
<pyarrow.Buffer address=0x7fc5cc907340 size=1024004 is_cpu=True is_mutable=True>
>>> table2["c1"].chunk(1).buffers()[1]  # second chunk points to same memory address and has same size as first chunk
<pyarrow.Buffer address=0x7fc5cc907340 size=1024004 is_cpu=True is_mutable=True>
>>> table2["c1"].chunk(1).offset  # and the second chunk has an offset to account for that
131072

And somehow the conversion code for ListArray to numpy (which creates a numpy array of numpy arrays, by first creating one numpy array of the flat values, and then creating slices into that flat array) doesn't seem to take into account this offset, and ends up converting the full parent buffer twice (in my case twice, because of having 2 chunks, but this can grow quadratically).

The reason this happens for parquet and not for feather in this case, is because the Parquet file actually consists of a single row group (and I assume the dataset API will therefore still read that in one go, and then slice output batches from that to return the expected batch size in the dataset API), while the feather file already consists of multiple batches on disk (and thus doesn't result in sliced batches in memory).
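A small sketch to confirm this layout difference (the file names are the ones used in the snippets above; "nested.feather" is what Anja's IPC test would have written):

import pyarrow as pa
import pyarrow.parquet as pq

# the Parquet file in this reproduction contains a single row group ...
print(pq.ParquetFile("memory_testing.parquet").metadata.num_row_groups)

# ... while a Feather (Arrow IPC) file can already be split into multiple record batches on disk
reader = pa.ipc.open_file("nested.feather")
print(reader.num_record_batches)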


Joris Van den Bossche / @jorisvandenbossche:
A small reproducible example to illustrate my explanation above:

import pyarrow as pa

# creating a chunked list array that consists of two chunks that are both slices into the same parent array
arr = pa.array([[1, 2], [3, 4, 5], [6], [7, 8]])
chunked_arr = pa.chunked_array([arr.slice(0, 2), arr.slice(2, 2)])

# converting this chunked array to numpy
np_arr = chunked_arr.to_numpy()

# the list array gets converted to a numpy array of numpy arrays. Each element (the nested numpy array) is
# a slice of a numpy array of the flat values. We can get this parent flat numpy array through the .base property
>>> np_arr[0].base
array([[1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8]])

# the flat values are included twice. Comparing to the correct behaviour with original non-chunked array:
>>> arr.to_numpy(zero_copy_only=False)[0].base
array([[1, 2, 3, 4, 5, 6, 7, 8]])


Weston Pace / @westonpace:

The reason this happens for parquet and not for feather in this case, is because the Parquet file actually consists of a single row group (and I assume the dataset API will therefore still read that in one go, and then slice output batches from that to return the expected batch size in the dataset API), while the feather file already consists of multiple batches on disk (and thus doesn't result in sliced batches in memory).

Yes. The dataset API processes input in fairly small batches (32ki rows). Partly because this is cache friendly but also because some of the hash-join code uses 16-bit signed integers for row indices. The scanner does not support partial reading of row groups from parquet files (I would very much like to support this someday) and so it reads the entire row group in one chunk. Then it slices that chunk.

It sounds like this numpy conversion bug should be fixed regardless.

I wonder if we also want to someday support better output batch size controls as well. I'll create an issue for it.


Will Jones / @wjones127:
Took a look at the issue in Joris' last repro. It seems to stem from the fact that the ListArray.values() method in C++ doesn't account for slices. I think if it did, the numpy conversion issue would be solved. Created a repro in a PR here: #15210

Do you agree with that assessment?


Joris Van den Bossche / @jorisvandenbossche:
Yes, I think it has to use Flatten() instead of values(); that is the proper method to get the 'flat' values taking the offset etc. into account (i.e. one that does not just return the underlying memory).

 

The .values() is really an easy way to shoot yourself in the foot ... I remember similar issues related to pyarrow, as we expose those two methods on pyarrow.ListArray as well.
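To make the footgun concrete, a small sketch at the Python level (using the same toy array as the reproducible example above):

import pyarrow as pa

arr = pa.array([[1, 2], [3, 4, 5], [6], [7, 8]])
sliced = arr.slice(2, 2)  # logically [[6], [7, 8]]

# .values ignores the slice offset and exposes the full underlying child array
print(sliced.values)     # [1, 2, 3, 4, 5, 6, 7, 8]

# .flatten() takes the offset and length of the slice into account
print(sliced.flatten())  # [6, 7, 8]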

@asfimport asfimport added this to the 11.0.0 milestone Jan 11, 2023

raulcd commented Jan 11, 2023

@wjones127 should this be a blocker for the release?

@raulcd raulcd removed this from the 11.0.0 milestone Jan 16, 2023
@assignUser assignUser added this to the 11.0.0 milestone Jan 17, 2023
@assignUser assignUser added the Priority: Blocker Marks a blocker for the release label Jan 17, 2023
@assignUser
Member

I added the blocker label to keep track of this issue, as most other blockers are now closed and including this fix seemed potentially important, imo.

assignUser pushed a commit that referenced this issue Jan 17, 2023
…set (#15210)

* Closes: #20512

Lead-authored-by: Will Jones <willjones127@gmail.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Jacob Wujciak-Jens <jacob@wujciak.de>
raulcd pushed a commit that referenced this issue Jan 18, 2023
…set (#15210)

* Closes: #20512

Lead-authored-by: Will Jones <willjones127@gmail.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Jacob Wujciak-Jens <jacob@wujciak.de>

bielanm commented Jun 27, 2023

As I understand it, this issue was completely resolved, but I couldn't find it in the release notes for the past few versions. Am I missing something? Does anybody have an estimate of when the fix will be released? Thanks a lot!

@jorisvandenbossche
Member

This was included in the pyarrow 11.0.0 release, which already came out in January of this year. It was actually included in the release notes, but you might need to know the details about the fix to find it ... Quoting from https://arrow.apache.org/blog/2023/01/25/11.0.0-release/ in the Python bug fixes section:

Numpy conversion for ListArray is improved taking into account sliced offset, avoiding increased memory usage (GH-20512)
