Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BatchCursor refactorings #1246

Merged
merged 5 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion .evergreen/run-load-balancer-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ echo $second
-Dorg.mongodb.test.uri=${SINGLE_MONGOS_LB_URI} \
-Dorg.mongodb.test.multi.mongos.uri=${MULTI_MONGOS_LB_URI} \
${GRADLE_EXTRA_VARS} --stacktrace --info --continue driver-core:test \
--tests QueryBatchCursorFunctionalSpecification
--tests CommandBatchCursorFunctionalTest \
--tests AsyncCommandBatchCursorFunctionalTest
third=$?
echo $third

Expand Down
6 changes: 0 additions & 6 deletions config/codenarc/codenarc.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@
<exclude name="ComparisonWithSelf"/>
</ruleset-ref>
<ruleset-ref path='rulesets/braces.xml'/>
<ruleset-ref path='rulesets/concurrency.xml'>
<rule-config name='BusyWait'>
<property name='doNotApplyToFileNames' value='AsyncQueryBatchCursorFunctionalSpecification.groovy'/>
</rule-config>

</ruleset-ref>
<ruleset-ref path='rulesets/convention.xml'>
<rule-config name='NoDef'>
<property name='doNotApplyToFilesMatching' value='.*Specification.groovy'/>
Expand Down
14 changes: 14 additions & 0 deletions driver-core/src/main/com/mongodb/assertions/Assertions.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.mongodb.lang.Nullable;

import java.util.Collection;
import java.util.function.Supplier;

/**
* <p>Design by contract assertions.</p> <p>This class is not part of the public API and may be removed or changed at any time.</p>
Expand Down Expand Up @@ -226,6 +227,19 @@ public static AssertionError fail(final String msg) throws AssertionError {
throw new AssertionError(assertNotNull(msg));
}

/**
* @param supplier the supplier to check
* @return {@code supplier.get()}
* @throws AssertionError If {@code supplier.get()} throws an exception
*/
public static <T> T doesNotThrow(final Supplier<T> supplier) throws AssertionError {
try {
return supplier.get();
} catch (Exception e) {
throw new AssertionError(e.getMessage(), e);
}
}

private Assertions() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.mongodb.internal.async;

import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;

Expand All @@ -25,8 +26,10 @@
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface AsyncAggregateResponseBatchCursor<T> extends AsyncBatchCursor<T> {
@Nullable
BsonDocument getPostBatchResumeToken();

@Nullable
BsonTimestamp getOperationTime();

boolean isFirstBatchEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.mongodb.internal.async;

import com.mongodb.internal.operation.BatchCursor;

import java.io.Closeable;
import java.util.List;

Expand All @@ -28,9 +30,9 @@
*/
public interface AsyncBatchCursor<T> extends Closeable {
/**
* Returns the next batch of results. A tailable cursor will block until another batch exists. After the last batch, the next call
* to this method will execute the callback with a null result to indicate that there are no more batches available and the cursor
* has been closed.
* Returns the next batch of results. A tailable cursor will block until another batch exists.
* Unlike the {@link BatchCursor} this method will automatically mark the cursor as closed when there are no more expected results.
* Care should be taken to check {@link #isClosed()} between calls.
*
* @param callback callback to receive the next batch of results
* @throws java.util.NoSuchElementException if no next batch exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@

import com.mongodb.MongoNamespace;
import com.mongodb.client.model.Collation;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.internal.client.model.AggregationLevel;
import com.mongodb.internal.connection.QueryResult;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
Expand All @@ -40,15 +38,13 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync;
import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToQueryResult;
import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand;
import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer;
import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead;
Expand Down Expand Up @@ -239,25 +235,16 @@ BsonDocument getCommand(final SessionContext sessionContext, final int maxWireVe
return commandDocument;
}

private QueryResult<T> createQueryResult(final BsonDocument result, final ConnectionDescription description) {
assertNotNull(result);
return cursorDocumentToQueryResult(result.getDocument(CURSOR), description.getServerAddress());
}

private CommandReadTransformer<BsonDocument, QueryBatchCursor<T>> transformer() {
return (result, source, connection) -> {
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
return new QueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder, comment,
source, connection, result);
};
private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer() {
return (result, source, connection) ->
new CommandBatchCursor<>(result, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
comment, source, connection);
}

private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {
return (result, source, connection) -> {
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
return new AsyncQueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
comment, source, connection, result);
};
return (result, source, connection) ->
new AsyncCommandBatchCursor<>(result, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
comment, source, connection);
}

interface AggregateTarget {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.internal.operation;

import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;

Expand All @@ -27,8 +28,10 @@
*/
@NotThreadSafe
public interface AggregateResponseBatchCursor<T> extends BatchCursor<T> {
@Nullable
BsonDocument getPostBatchResumeToken();

@Nullable
BsonTimestamp getOperationTime();

boolean isFirstBatchEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.mongodb.MongoException;
import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.lang.NonNull;
Expand Down Expand Up @@ -50,11 +51,11 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
* {@code wrapped} containing {@code null} and {@link #isClosed} being {@code false}.
* This represents a situation in which the wrapped object was closed by {@code this} but {@code this} remained open.
*/
private final AtomicReference<AsyncAggregateResponseBatchCursor<RawBsonDocument>> wrapped;
private final AtomicReference<AsyncCommandBatchCursor<RawBsonDocument>> wrapped;
private final AtomicBoolean isClosed;

AsyncChangeStreamBatchCursor(final ChangeStreamOperation<T> changeStreamOperation,
final AsyncAggregateResponseBatchCursor<RawBsonDocument> wrapped,
final AsyncCommandBatchCursor<RawBsonDocument> wrapped,
final AsyncReadBinding binding,
@Nullable final BsonDocument resumeToken,
final int maxWireVersion) {
Expand All @@ -68,13 +69,13 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
}

@NonNull
AsyncAggregateResponseBatchCursor<RawBsonDocument> getWrapped() {
AsyncCommandBatchCursor<RawBsonDocument> getWrapped() {
return assertNotNull(wrapped.get());
}

@Override
public void next(final SingleResultCallback<List<T>> callback) {
resumeableOperation((cursor, callback1) -> cursor.next(callback1), callback, false);
resumeableOperation(AsyncBatchCursor::next, callback, false);
}

@Override
Expand Down Expand Up @@ -129,15 +130,15 @@ private void nullifyAndCloseWrapped() {

/**
* This method guarantees that the {@code newValue} argument is closed even if
* {@link #setWrappedOrCloseIt(AsyncAggregateResponseBatchCursor)} is called concurrently with or after (in the happens-before order)
* {@code setWrappedOrCloseIt(AsyncCommandBatchCursor)} is called concurrently with or after (in the happens-before order)
* the method {@link #close()}.
*/
private void setWrappedOrCloseIt(final AsyncAggregateResponseBatchCursor<RawBsonDocument> newValue) {
private void setWrappedOrCloseIt(final AsyncCommandBatchCursor<RawBsonDocument> newValue) {
if (isClosed()) {
assertNull(this.wrapped.get());
assertNull(wrapped.get());
newValue.close();
} else {
assertNull(this.wrapped.getAndSet(newValue));
assertNull(wrapped.getAndSet(newValue));
if (isClosed()) {
nullifyAndCloseWrapped();
}
Expand All @@ -164,8 +165,8 @@ public int getMaxWireVersion() {
return maxWireVersion;
}

private void cachePostBatchResumeToken(final AsyncAggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
BsonDocument resumeToken = queryBatchCursor.getPostBatchResumeToken();
private void cachePostBatchResumeToken(final AsyncCommandBatchCursor<RawBsonDocument> cursor) {
BsonDocument resumeToken = cursor.getPostBatchResumeToken();
if (resumeToken != null) {
this.resumeToken = resumeToken;
}
Expand All @@ -182,13 +183,13 @@ private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResult
tryNext ? "tryNext()" : "next()")));
return;
}
AsyncAggregateResponseBatchCursor<RawBsonDocument> wrappedCursor = getWrapped();
AsyncCommandBatchCursor<RawBsonDocument> wrappedCursor = getWrapped();
asyncBlock.apply(wrappedCursor, (result, t) -> {
if (t == null) {
try {
List<T> convertedResults;
try {
convertedResults = convertAndProduceLastId(result, changeStreamOperation.getDecoder(),
convertedResults = convertAndProduceLastId(assertNotNull(result), changeStreamOperation.getDecoder(),
lastId -> resumeToken = lastId);
} finally {
cachePostBatchResumeToken(wrappedCursor);
Expand All @@ -215,14 +216,15 @@ private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallb
if (t != null) {
callback.onResult(null, t);
} else {
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion());
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken,
assertNotNull(source).getServerDescription().getMaxWireVersion());
source.release();
changeStreamOperation.executeAsync(binding, (result, t1) -> {
if (t1 != null) {
callback.onResult(null, t1);
} else {
try {
setWrappedOrCloseIt(((AsyncChangeStreamBatchCursor<T>) result).getWrapped());
setWrappedOrCloseIt(assertNotNull((AsyncChangeStreamBatchCursor<T>) result).getWrapped());
} finally {
try {
binding.release(); // release the new change stream batch cursor's reference to the binding
Expand Down