Skip to content

Commit

Permalink
BatchCursor refactorings (#1246)
Browse files Browse the repository at this point in the history
- Added SingleBatchCursor
- QueryResult and QueryBatchCursor renaming
   - Renamed and moved QueryResult to CommandCursorResult
   - Renamed QueryBatchCursor to CommandBatchCursor
   - Renamed AsyncQueryBatchCursor to AsyncCommandBatchCursor
- Unified Async & Sync CommandBatchCursor testing
- Added a CursorResourceManager for both Async & Sync

JAVA-5159
  • Loading branch information
rozza committed Nov 9, 2023
1 parent 275dbc0 commit 540c612
Show file tree
Hide file tree
Showing 59 changed files with 3,502 additions and 3,151 deletions.
3 changes: 2 additions & 1 deletion .evergreen/run-load-balancer-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ echo $second
-Dorg.mongodb.test.uri=${SINGLE_MONGOS_LB_URI} \
-Dorg.mongodb.test.multi.mongos.uri=${MULTI_MONGOS_LB_URI} \
${GRADLE_EXTRA_VARS} --stacktrace --info --continue driver-core:test \
--tests QueryBatchCursorFunctionalSpecification
--tests CommandBatchCursorFunctionalTest \
--tests AsyncCommandBatchCursorFunctionalTest
third=$?
echo $third

Expand Down
6 changes: 0 additions & 6 deletions config/codenarc/codenarc.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@
<exclude name="ComparisonWithSelf"/>
</ruleset-ref>
<ruleset-ref path='rulesets/braces.xml'/>
<ruleset-ref path='rulesets/concurrency.xml'>
<rule-config name='BusyWait'>
<property name='doNotApplyToFileNames' value='AsyncQueryBatchCursorFunctionalSpecification.groovy'/>
</rule-config>

</ruleset-ref>
<ruleset-ref path='rulesets/convention.xml'>
<rule-config name='NoDef'>
<property name='doNotApplyToFilesMatching' value='.*Specification.groovy'/>
Expand Down
14 changes: 14 additions & 0 deletions driver-core/src/main/com/mongodb/assertions/Assertions.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.mongodb.lang.Nullable;

import java.util.Collection;
import java.util.function.Supplier;

/**
* <p>Design by contract assertions.</p> <p>This class is not part of the public API and may be removed or changed at any time.</p>
Expand Down Expand Up @@ -226,6 +227,19 @@ public static AssertionError fail(final String msg) throws AssertionError {
throw new AssertionError(assertNotNull(msg));
}

/**
* @param supplier the supplier to check
* @return {@code supplier.get()}
* @throws AssertionError If {@code supplier.get()} throws an exception
*/
public static <T> T doesNotThrow(final Supplier<T> supplier) throws AssertionError {
try {
return supplier.get();
} catch (Exception e) {
throw new AssertionError(e.getMessage(), e);
}
}

private Assertions() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.mongodb.internal.async;

import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;

Expand All @@ -25,8 +26,10 @@
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface AsyncAggregateResponseBatchCursor<T> extends AsyncBatchCursor<T> {
@Nullable
BsonDocument getPostBatchResumeToken();

@Nullable
BsonTimestamp getOperationTime();

boolean isFirstBatchEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.mongodb.internal.async;

import com.mongodb.internal.operation.BatchCursor;

import java.io.Closeable;
import java.util.List;

Expand All @@ -28,9 +30,9 @@
*/
public interface AsyncBatchCursor<T> extends Closeable {
/**
* Returns the next batch of results. A tailable cursor will block until another batch exists. After the last batch, the next call
* to this method will execute the callback with a null result to indicate that there are no more batches available and the cursor
* has been closed.
* Returns the next batch of results. A tailable cursor will block until another batch exists.
* Unlike the {@link BatchCursor} this method will automatically mark the cursor as closed when there are no more expected results.
* Care should be taken to check {@link #isClosed()} between calls.
*
* @param callback callback to receive the next batch of results
* @throws java.util.NoSuchElementException if no next batch exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@

import com.mongodb.MongoNamespace;
import com.mongodb.client.model.Collation;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.internal.client.model.AggregationLevel;
import com.mongodb.internal.connection.QueryResult;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
Expand All @@ -40,15 +38,13 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync;
import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToQueryResult;
import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand;
import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer;
import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead;
Expand Down Expand Up @@ -239,25 +235,16 @@ BsonDocument getCommand(final SessionContext sessionContext, final int maxWireVe
return commandDocument;
}

private QueryResult<T> createQueryResult(final BsonDocument result, final ConnectionDescription description) {
assertNotNull(result);
return cursorDocumentToQueryResult(result.getDocument(CURSOR), description.getServerAddress());
}

private CommandReadTransformer<BsonDocument, QueryBatchCursor<T>> transformer() {
return (result, source, connection) -> {
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
return new QueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder, comment,
source, connection, result);
};
private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer() {
return (result, source, connection) ->
new CommandBatchCursor<>(result, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
comment, source, connection);
}

private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {
return (result, source, connection) -> {
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
return new AsyncQueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
comment, source, connection, result);
};
return (result, source, connection) ->
new AsyncCommandBatchCursor<>(result, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
comment, source, connection);
}

interface AggregateTarget {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.internal.operation;

import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;

Expand All @@ -27,8 +28,10 @@
*/
@NotThreadSafe
public interface AggregateResponseBatchCursor<T> extends BatchCursor<T> {
@Nullable
BsonDocument getPostBatchResumeToken();

@Nullable
BsonTimestamp getOperationTime();

boolean isFirstBatchEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.mongodb.MongoException;
import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.lang.NonNull;
Expand Down Expand Up @@ -50,11 +51,11 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
* {@code wrapped} containing {@code null} and {@link #isClosed} being {@code false}.
* This represents a situation in which the wrapped object was closed by {@code this} but {@code this} remained open.
*/
private final AtomicReference<AsyncAggregateResponseBatchCursor<RawBsonDocument>> wrapped;
private final AtomicReference<AsyncCommandBatchCursor<RawBsonDocument>> wrapped;
private final AtomicBoolean isClosed;

AsyncChangeStreamBatchCursor(final ChangeStreamOperation<T> changeStreamOperation,
final AsyncAggregateResponseBatchCursor<RawBsonDocument> wrapped,
final AsyncCommandBatchCursor<RawBsonDocument> wrapped,
final AsyncReadBinding binding,
@Nullable final BsonDocument resumeToken,
final int maxWireVersion) {
Expand All @@ -68,13 +69,13 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
}

@NonNull
AsyncAggregateResponseBatchCursor<RawBsonDocument> getWrapped() {
AsyncCommandBatchCursor<RawBsonDocument> getWrapped() {
return assertNotNull(wrapped.get());
}

@Override
public void next(final SingleResultCallback<List<T>> callback) {
resumeableOperation((cursor, callback1) -> cursor.next(callback1), callback, false);
resumeableOperation(AsyncBatchCursor::next, callback, false);
}

@Override
Expand Down Expand Up @@ -129,15 +130,15 @@ private void nullifyAndCloseWrapped() {

/**
* This method guarantees that the {@code newValue} argument is closed even if
* {@link #setWrappedOrCloseIt(AsyncAggregateResponseBatchCursor)} is called concurrently with or after (in the happens-before order)
* {@code setWrappedOrCloseIt(AsyncCommandBatchCursor)} is called concurrently with or after (in the happens-before order)
* the method {@link #close()}.
*/
private void setWrappedOrCloseIt(final AsyncAggregateResponseBatchCursor<RawBsonDocument> newValue) {
private void setWrappedOrCloseIt(final AsyncCommandBatchCursor<RawBsonDocument> newValue) {
if (isClosed()) {
assertNull(this.wrapped.get());
assertNull(wrapped.get());
newValue.close();
} else {
assertNull(this.wrapped.getAndSet(newValue));
assertNull(wrapped.getAndSet(newValue));
if (isClosed()) {
nullifyAndCloseWrapped();
}
Expand All @@ -164,8 +165,8 @@ public int getMaxWireVersion() {
return maxWireVersion;
}

private void cachePostBatchResumeToken(final AsyncAggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
BsonDocument resumeToken = queryBatchCursor.getPostBatchResumeToken();
private void cachePostBatchResumeToken(final AsyncCommandBatchCursor<RawBsonDocument> cursor) {
BsonDocument resumeToken = cursor.getPostBatchResumeToken();
if (resumeToken != null) {
this.resumeToken = resumeToken;
}
Expand All @@ -182,13 +183,13 @@ private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResult
tryNext ? "tryNext()" : "next()")));
return;
}
AsyncAggregateResponseBatchCursor<RawBsonDocument> wrappedCursor = getWrapped();
AsyncCommandBatchCursor<RawBsonDocument> wrappedCursor = getWrapped();
asyncBlock.apply(wrappedCursor, (result, t) -> {
if (t == null) {
try {
List<T> convertedResults;
try {
convertedResults = convertAndProduceLastId(result, changeStreamOperation.getDecoder(),
convertedResults = convertAndProduceLastId(assertNotNull(result), changeStreamOperation.getDecoder(),
lastId -> resumeToken = lastId);
} finally {
cachePostBatchResumeToken(wrappedCursor);
Expand All @@ -215,14 +216,15 @@ private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallb
if (t != null) {
callback.onResult(null, t);
} else {
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion());
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken,
assertNotNull(source).getServerDescription().getMaxWireVersion());
source.release();
changeStreamOperation.executeAsync(binding, (result, t1) -> {
if (t1 != null) {
callback.onResult(null, t1);
} else {
try {
setWrappedOrCloseIt(((AsyncChangeStreamBatchCursor<T>) result).getWrapped());
setWrappedOrCloseIt(assertNotNull((AsyncChangeStreamBatchCursor<T>) result).getWrapped());
} finally {
try {
binding.release(); // release the new change stream batch cursor's reference to the binding
Expand Down

0 comments on commit 540c612

Please sign in to comment.