Skip to content

Commit

Permalink
Use zero page "holes" to optimize sparse byte array usage (#108709)
Browse files Browse the repository at this point in the history
Add the notion of a "zero page" or "hole" to big arrays. We have some use cases where we run up byte arrays of hundreds of MB that are extremely sparse.
Each page starts out as a "hole" and only gets replaced by a real page from the pool on write similar to how FS holes work.
This change adds a small amount of overhead to the write side but is performance neutral or better on the read side (for sparse arrays we likely get a big improvement from using less CPU cache).

The only change outside of the array itself this needed was in CCR, see inline comments for that.
  • Loading branch information
original-brownbear committed May 17, 2024
1 parent befb6ff commit 8ff8eff
Show file tree
Hide file tree
Showing 14 changed files with 106 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ protected <T> T registerNewPage(Recycler.V<T> v, int page, int expectedSize) {

protected final void releasePage(int page) {
if (recycler != null) {
assert cache[page] != null;
cache[page].close();
cache[page] = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,31 @@

import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.recycler.Recycler;

import java.io.IOException;
import java.util.Arrays;

abstract class AbstractBigByteArray extends AbstractBigArray {

protected static final byte[] ZERO_PAGE = new byte[PageCacheRecycler.BYTE_PAGE_SIZE];

protected byte[][] pages;

protected AbstractBigByteArray(int pageSize, BigArrays bigArrays, boolean clearOnResize, long size) {
super(pageSize, bigArrays, clearOnResize);
this.size = size;
pages = new byte[numPages(size)][];
for (int i = 0; i < pages.length; ++i) {
pages[i] = newBytePage(i);
Arrays.fill(pages, ZERO_PAGE);
assert assertZeroPageClean();
}

private static boolean assertZeroPageClean() {
for (byte b : ZERO_PAGE) {
assert b == 0 : b;
}
return true;
}

/** Change the size of this array. Content between indexes <code>0</code> and <code>min(size(), newSize)</code> will be preserved. */
Expand All @@ -35,16 +45,17 @@ public void resize(long newSize) {
pages = Arrays.copyOf(pages, ArrayUtil.oversize(numPages, RamUsageEstimator.NUM_BYTES_OBJECT_REF));
}
for (int i = numPages - 1; i >= 0 && pages[i] == null; --i) {
pages[i] = newBytePage(i);
pages[i] = ZERO_PAGE;
}
for (int i = numPages; i < pages.length && pages[i] != null; ++i) {
assert pages[i] != ZERO_PAGE;
pages[i] = null;
releasePage(i);
}
this.size = newSize;
}

protected final byte[] newBytePage(int page) {
private byte[] newBytePage(int page) {
if (recycler != null) {
final Recycler.V<byte[]> v = recycler.bytePage(clearOnResize);
return registerNewPage(v, page, PageCacheRecycler.BYTE_PAGE_SIZE);
Expand All @@ -68,22 +79,40 @@ protected static void fillBySelfCopy(byte[] page, int fromBytes, int toBytes, in
/**
* Bulk copies array to paged array
*/
protected void set(long index, byte[] buf, int offset, int len, byte[][] pages, int shift) {
protected void set(long index, byte[] buf, int offset, int len, int shift) {
assert index + len <= size();
int pageIndex = pageIndex(index);
final int indexInPage = indexInPage(index);
if (indexInPage + len <= pageSize()) {
System.arraycopy(buf, offset << shift, pages[pageIndex], indexInPage << shift, len << shift);
System.arraycopy(buf, offset << shift, getPageForWriting(pageIndex), indexInPage << shift, len << shift);
} else {
int copyLen = pageSize() - indexInPage;
System.arraycopy(buf, offset << shift, pages[pageIndex], indexInPage, copyLen << shift);
System.arraycopy(buf, offset << shift, getPageForWriting(pageIndex), indexInPage, copyLen << shift);
do {
++pageIndex;
offset += copyLen;
len -= copyLen;
copyLen = Math.min(len, pageSize());
System.arraycopy(buf, offset << shift, pages[pageIndex], 0, copyLen << shift);
System.arraycopy(buf, offset << shift, getPageForWriting(pageIndex), 0, copyLen << shift);
} while (len > copyLen);
}
}

protected byte[] getPageForWriting(int pageIndex) {
byte[] foundPage = pages[pageIndex];
if (foundPage == ZERO_PAGE) {
foundPage = newBytePage(pageIndex);
pages[pageIndex] = foundPage;
}
return foundPage;
}

protected void readPages(StreamInput in) throws IOException {
int remainedBytes = in.readVInt();
for (int i = 0; i < pages.length && remainedBytes > 0; i++) {
int len = Math.min(remainedBytes, pages[0].length);
in.readBytes(getPageForWriting(i), 0, len);
remainedBytes -= len;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Streams;
import org.elasticsearch.indices.breaker.CircuitBreakerService;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

import static org.elasticsearch.common.util.BigDoubleArray.VH_PLATFORM_NATIVE_DOUBLE;
Expand Down Expand Up @@ -162,8 +164,8 @@ public BytesRef next() {
}

@Override
public void fillWith(StreamInput in) throws IOException {
in.readBytes(array, 0, Math.toIntExact(size()));
public void fillWith(InputStream in) throws IOException {
Streams.readFully(in, array, 0, Math.toIntExact(size()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Streams;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

import static org.elasticsearch.common.util.BigLongArray.writePages;
Expand Down Expand Up @@ -49,7 +50,7 @@ public byte get(long index) {
public byte set(long index, byte value) {
final int pageIndex = pageIndex(index);
final int indexInPage = indexInPage(index);
final byte[] page = pages[pageIndex];
final byte[] page = getPageForWriting(pageIndex);
final byte ret = page[indexInPage];
page[indexInPage] = value;
return ret;
Expand Down Expand Up @@ -90,16 +91,16 @@ public void set(long index, byte[] buf, int offset, int len) {
int pageIndex = pageIndex(index);
final int indexInPage = indexInPage(index);
if (indexInPage + len <= pageSize()) {
System.arraycopy(buf, offset, pages[pageIndex], indexInPage, len);
System.arraycopy(buf, offset, getPageForWriting(pageIndex), indexInPage, len);
} else {
int copyLen = pageSize() - indexInPage;
System.arraycopy(buf, offset, pages[pageIndex], indexInPage, copyLen);
System.arraycopy(buf, offset, getPageForWriting(pageIndex), indexInPage, copyLen);
do {
++pageIndex;
offset += copyLen;
len -= copyLen;
copyLen = Math.min(len, pageSize());
System.arraycopy(buf, offset, pages[pageIndex], 0, copyLen);
System.arraycopy(buf, offset, getPageForWriting(pageIndex), 0, copyLen);
} while (len > copyLen);
}
}
Expand All @@ -112,13 +113,13 @@ public void fill(long fromIndex, long toIndex, byte value) {
final int fromPage = pageIndex(fromIndex);
final int toPage = pageIndex(toIndex - 1);
if (fromPage == toPage) {
Arrays.fill(pages[fromPage], indexInPage(fromIndex), indexInPage(toIndex - 1) + 1, value);
Arrays.fill(getPageForWriting(fromPage), indexInPage(fromIndex), indexInPage(toIndex - 1) + 1, value);
} else {
Arrays.fill(pages[fromPage], indexInPage(fromIndex), pages[fromPage].length, value);
Arrays.fill(getPageForWriting(fromPage), indexInPage(fromIndex), pages[fromPage].length, value);
for (int i = fromPage + 1; i < toPage; ++i) {
Arrays.fill(pages[i], value);
Arrays.fill(getPageForWriting(i), value);
}
Arrays.fill(pages[toPage], 0, indexInPage(toIndex - 1) + 1, value);
Arrays.fill(getPageForWriting(toPage), 0, indexInPage(toIndex - 1) + 1, value);
}
}

Expand Down Expand Up @@ -153,11 +154,11 @@ public BytesRef next() {
}

@Override
public void fillWith(StreamInput in) throws IOException {
public void fillWith(InputStream in) throws IOException {
for (int i = 0; i < pages.length - 1; i++) {
in.readBytes(pages[i], 0, PAGE_SIZE_IN_BYTES);
Streams.readFully(in, getPageForWriting(i), 0, PAGE_SIZE_IN_BYTES);
}
in.readBytes(pages[pages.length - 1], 0, Math.toIntExact(size - (pages.length - 1L) * PAGE_SIZE_IN_BYTES));
Streams.readFully(in, getPageForWriting(pages.length - 1), 0, Math.toIntExact(size - (pages.length - 1L) * PAGE_SIZE_IN_BYTES));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.lang.invoke.VarHandle;
import java.nio.ByteOrder;

import static org.elasticsearch.common.util.BigLongArray.readPages;
import static org.elasticsearch.common.util.BigLongArray.writePages;
import static org.elasticsearch.common.util.PageCacheRecycler.DOUBLE_PAGE_SIZE;

Expand Down Expand Up @@ -46,7 +45,7 @@ public double get(long index) {
public double set(long index, double value) {
final int pageIndex = pageIndex(index);
final int indexInPage = indexInPage(index);
final byte[] page = pages[pageIndex];
final byte[] page = getPageForWriting(pageIndex);
final double ret = (double) VH_PLATFORM_NATIVE_DOUBLE.get(page, indexInPage << 3);
VH_PLATFORM_NATIVE_DOUBLE.set(page, indexInPage << 3, value);
return ret;
Expand All @@ -56,7 +55,7 @@ public double set(long index, double value) {
public double increment(long index, double inc) {
final int pageIndex = pageIndex(index);
final int indexInPage = indexInPage(index);
final byte[] page = pages[pageIndex];
final byte[] page = getPageForWriting(pageIndex);
final double newVal = (double) VH_PLATFORM_NATIVE_DOUBLE.get(page, indexInPage << 3) + inc;
VH_PLATFORM_NATIVE_DOUBLE.set(page, indexInPage << 3, newVal);
return newVal;
Expand All @@ -75,13 +74,13 @@ public void fill(long fromIndex, long toIndex, double value) {
final int fromPage = pageIndex(fromIndex);
final int toPage = pageIndex(toIndex - 1);
if (fromPage == toPage) {
fill(pages[fromPage], indexInPage(fromIndex), indexInPage(toIndex - 1) + 1, value);
fill(getPageForWriting(fromPage), indexInPage(fromIndex), indexInPage(toIndex - 1) + 1, value);
} else {
fill(pages[fromPage], indexInPage(fromIndex), pageSize(), value);
fill(getPageForWriting(fromPage), indexInPage(fromIndex), pageSize(), value);
for (int i = fromPage + 1; i < toPage; ++i) {
fill(pages[i], 0, pageSize(), value);
fill(getPageForWriting(i), 0, pageSize(), value);
}
fill(pages[toPage], 0, indexInPage(toIndex - 1) + 1, value);
fill(getPageForWriting(toPage), 0, indexInPage(toIndex - 1) + 1, value);
}
}

Expand All @@ -94,7 +93,7 @@ public static void fill(byte[] page, int from, int to, double value) {

@Override
public void fillWith(StreamInput in) throws IOException {
readPages(in, pages);
readPages(in);
}

/** Estimates the number of bytes that would be consumed by an array of the given size. */
Expand All @@ -104,7 +103,7 @@ public static long estimateRamBytes(final long size) {

@Override
public void set(long index, byte[] buf, int offset, int len) {
set(index, buf, offset, len, pages, 3);
set(index, buf, offset, len, 3);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ final class BigFloatArray extends AbstractBigByteArray implements FloatArray {
public float set(long index, float value) {
final int pageIndex = pageIndex(index);
final int indexInPage = indexInPage(index);
final byte[] page = pages[pageIndex];
final byte[] page = getPageForWriting(pageIndex);
final float ret = (float) VH_PLATFORM_NATIVE_FLOAT.get(page, indexInPage << 2);
VH_PLATFORM_NATIVE_FLOAT.set(page, indexInPage << 2, value);
return ret;
Expand All @@ -59,13 +59,13 @@ public void fill(long fromIndex, long toIndex, float value) {
final int fromPage = pageIndex(fromIndex);
final int toPage = pageIndex(toIndex - 1);
if (fromPage == toPage) {
fill(pages[fromPage], indexInPage(fromIndex), indexInPage(toIndex - 1) + 1, value);
fill(getPageForWriting(fromPage), indexInPage(fromIndex), indexInPage(toIndex - 1) + 1, value);
} else {
fill(pages[fromPage], indexInPage(fromIndex), pageSize(), value);
fill(getPageForWriting(fromPage), indexInPage(fromIndex), pageSize(), value);
for (int i = fromPage + 1; i < toPage; ++i) {
fill(pages[i], 0, pageSize(), value);
fill(getPageForWriting(i), 0, pageSize(), value);
}
fill(pages[toPage], 0, indexInPage(toIndex - 1) + 1, value);
fill(getPageForWriting(toPage), 0, indexInPage(toIndex - 1) + 1, value);
}
}

Expand All @@ -83,6 +83,6 @@ public static long estimateRamBytes(final long size) {

@Override
public void set(long index, byte[] buf, int offset, int len) {
set(index, buf, offset, len, pages, 2);
set(index, buf, offset, len, 2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.lang.invoke.VarHandle;
import java.nio.ByteOrder;

import static org.elasticsearch.common.util.BigLongArray.readPages;
import static org.elasticsearch.common.util.BigLongArray.writePages;
import static org.elasticsearch.common.util.PageCacheRecycler.INT_PAGE_SIZE;

Expand Down Expand Up @@ -50,7 +49,7 @@ public int get(long index) {
public int set(long index, int value) {
final int pageIndex = pageIndex(index);
final int indexInPage = indexInPage(index);
final byte[] page = pages[pageIndex];
final byte[] page = getPageForWriting(pageIndex);
final int ret = (int) VH_PLATFORM_NATIVE_INT.get(page, indexInPage << 2);
VH_PLATFORM_NATIVE_INT.set(page, indexInPage << 2, value);
return ret;
Expand All @@ -60,7 +59,7 @@ public int set(long index, int value) {
public int increment(long index, int inc) {
final int pageIndex = pageIndex(index);
final int indexInPage = indexInPage(index);
final byte[] page = pages[pageIndex];
final byte[] page = getPageForWriting(pageIndex);
final int newVal = (int) VH_PLATFORM_NATIVE_INT.get(page, indexInPage << 2) + inc;
VH_PLATFORM_NATIVE_INT.set(page, indexInPage << 2, newVal);
return newVal;
Expand All @@ -74,19 +73,19 @@ public void fill(long fromIndex, long toIndex, int value) {
final int fromPage = pageIndex(fromIndex);
final int toPage = pageIndex(toIndex - 1);
if (fromPage == toPage) {
fill(pages[fromPage], indexInPage(fromIndex), indexInPage(toIndex - 1) + 1, value);
fill(getPageForWriting(fromPage), indexInPage(fromIndex), indexInPage(toIndex - 1) + 1, value);
} else {
fill(pages[fromPage], indexInPage(fromIndex), pageSize(), value);
fill(getPageForWriting(fromPage), indexInPage(fromIndex), pageSize(), value);
for (int i = fromPage + 1; i < toPage; ++i) {
fill(pages[i], 0, pageSize(), value);
fill(getPageForWriting(i), 0, pageSize(), value);
}
fill(pages[toPage], 0, indexInPage(toIndex - 1) + 1, value);
fill(getPageForWriting(toPage), 0, indexInPage(toIndex - 1) + 1, value);
}
}

@Override
public void fillWith(StreamInput in) throws IOException {
readPages(in, pages);
readPages(in);
}

public static void fill(byte[] page, int from, int to, int value) {
Expand All @@ -108,6 +107,6 @@ public static long estimateRamBytes(final long size) {

@Override
public void set(long index, byte[] buf, int offset, int len) {
set(index, buf, offset, len, pages, 2);
set(index, buf, offset, len, 2);
}
}

0 comments on commit 8ff8eff

Please sign in to comment.