fix: Simplified RequestQueueV2 implementation #2775
Conversation
Nice 💪
I would do some testing myself, but first: what about some unit tests, did you consider adding some? There are none -> https://github.com/apify/crawlee/blob/03951bdba8fb34f6bed00d1b68240ff7cd0bacbf/test/core/storages/request_queue.test.ts
Honestly, we have been dealing with various bugs over time and we still do not have any tests for these features.
The build did not finish, can you check @janbuchar?
I can, but only later this week - I have different stuff to finish first.
@drobnikj the unit tests are now passing so you should be able to build. I'm still working on some e2e tests, if you have any ideas for scenarios to test (e2e, unit, doesn't matter), I'd love to hear those.
Looks good, I did not find any issue, even during testing.
I have a few more comments, can you check pls? @janbuchar
@@ -361,7 +430,8 @@ export class RequestQueue extends RequestProvider {
I cannot comment on it below, but during code review I see that we are removing locks one by one in `_clearPossibleLocks`, see:

```ts
while ((requestId = this.queueHeadIds.removeFirst()) !== null) {
```

There is a 200 rps rate limit. I would remove the locks in batches of maybe 10 to speed it up.
What do you mean? I don't think there is a batch unlock endpoint. Launching those requests in parallel surely won't help against rate limiting, either.
I mean to unlock in some batches, like:

```ts
protected async _clearPossibleLocks() {
    this.queuePausedForMigration = true;
    let requestId: string | null;
    const batchSize = 10;
    const deleteRequests: Promise<void>[] = [];

    // eslint-disable-next-line no-cond-assign
    while ((requestId = this.queueHeadIds.removeFirst()) !== null) {
        deleteRequests.push(
            this.client.deleteRequestLock(requestId).catch(() => {
                // We don't have the lock, or the request was never locked. Either way it's fine
            }),
        );

        if (deleteRequests.length >= batchSize) {
            // Process the batch of `batchSize` deletions in parallel
            await Promise.all(deleteRequests);
            deleteRequests.length = 0; // Reset the array for the next batch
        }
    }

    // Process any remaining requests that didn't form a full batch
    if (deleteRequests.length > 0) {
        await Promise.all(deleteRequests);
    }
}
```
I see. However, I still doubt that there will be any measurable benefit - this code is only executed on migration and there shouldn't be more than ~25 requests in the queue head.
@barjin I gave the forefront handling a makeover. If you could check that out, I'd be super grateful.
Looking good to me 👍🏽 I remember reversing the forefront array somewhere already (likely …)
Looks like almost all my notes were addressed, and I commented on the rest.
Co-authored-by: Vlad Frangu <me@vladfrangu.dev>
lgtm once the format is fixed (woops, sorryy ;w;)
few comments from my end, nothing really blocking so approving
```diff
@@ -673,7 +673,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
     this.requestQueue.internalTimeoutMillis = this.internalTimeoutMillis;
     // for request queue v2, we want to lock requests by the timeout that would also account for internals (plus 5 seconds padding), but
     // with a minimum of a minute
-    this.requestQueue.requestLockSecs = Math.max(this.internalTimeoutMillis / 1000 + 5, 60);
+    this.requestQueue.requestLockSecs = Math.max(this.requestHandlerTimeoutMillis / 1000 + 5, 60);
```
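The changed line's arithmetic can be sanity-checked in isolation. A minimal sketch (the standalone function name and the sample values are hypothetical; only the `Math.max` formula mirrors the diff):

```typescript
// Hypothetical standalone version of the lock-duration formula above:
// request handler timeout in ms, plus 5 s of padding, floored at one minute.
function requestLockSecs(requestHandlerTimeoutMillis: number): number {
    return Math.max(requestHandlerTimeoutMillis / 1000 + 5, 60);
}

console.log(requestLockSecs(30_000)); // 35 s would be too short, so the 60 s floor applies
console.log(requestLockSecs(120_000)); // 125 s: handler timeout plus padding wins
```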
the comment still mentions the internal timeout
I'm honestly not sure what it was trying to say, so I reworded it.
```ts
this.inProgressRequestBatches.push(promise);
void promise.finally(() => {
    this.inProgressRequestBatches = this.inProgressRequestBatches.filter((it) => it !== promise);
});
```
How many items do we expect in that array in a high-concurrency run? This solution is not the best one, but if the size won't be large, we can keep it.
How is this different from a simple integer counter? That would be the most performant approach - just increment instead of push, and decrement in the finally block.
The integer counter was in fact the previous implementation. However, it could not work with multiple clients, and we cannot reliably detect that - the `queueHadMultipleClients` flag is set even if the other client was a pre-migration instance of the same run, if that makes sense.
You are right that each forefront request might make us lock 25 more requests, and that could unbalance parallel instances quite a bit. Maybe we should give up "excess" requests after we're done checking for forefront requests.
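The "give up excess requests" idea could look roughly like this. All names here are hypothetical illustrations rather than the actual Crawlee API; the lock deletion is passed in as a callback to stand in for `client.deleteRequestLock`:

```typescript
// Hypothetical sketch: after the forefront check, keep only the requests
// this client actually needs and release the locks on the rest, so other
// parallel clients can pick them up.
async function giveUpExcessLocks(
    lockedIds: string[],
    needed: number,
    deleteRequestLock: (id: string) => Promise<void>,
): Promise<string[]> {
    const keep = lockedIds.slice(0, needed);
    const excess = lockedIds.slice(needed);
    await Promise.all(
        excess.map((id) =>
            deleteRequestLock(id).catch(() => {
                // We no longer hold the lock - fine either way
            }),
        ),
    );
    return keep;
}
```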
Hmm, not sure I follow why the counter wouldn't be enough - how is this better? Each client will have its own local cache (this new variable). You store values in an array and wipe them based on identity, but the promises are not really used anywhere. My suggestion is doing the same, just without the memory/perf overhead.
Just to be sure, this is what I meant - it still uses the `promise.finally`:
```ts
this.inProgressRequestBatches++;
void promise.finally(() => {
    this.inProgressRequestBatches--;
});
```
Oh damn, I'm sorry. I thought you were commenting on a different part of the code - the one that handles forefront requests. If it's any help, you pushed me to tie up a loose end that I forgot about.
Regarding the batches, you're probably right 😁
```ts
if (this.queueHeadIds.length() > 0) {
    return false;
}
```
I guess the duplication (same check 5 lines later) here is for performance reasons?
Yup. If `queueHeadIds` is non-empty, we return immediately; otherwise we try to fetch something from the upstream queue, which may take time. I'll add a comment.
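That fast-path/slow-path check could be sketched like this; the class and its internals are hypothetical simplifications, not the actual Crawlee implementation:

```typescript
// Hypothetical sketch of the double check discussed above: the cheap local
// check runs first, then again after a potentially slow upstream fetch.
class QueueHeadSketch {
    private queueHeadIds: string[] = [];

    constructor(private upstream: string[]) {}

    async isFinished(): Promise<boolean> {
        // Fast path: a non-empty local head cache means we are not
        // finished, with no network call needed.
        if (this.queueHeadIds.length > 0) {
            return false;
        }
        // Slow path: refill the head from the upstream queue; this is the
        // part that "may take time".
        await this.fetchHeadFromUpstream();
        // Same check again: the fetch may have produced new items.
        return this.queueHeadIds.length === 0;
    }

    private async fetchHeadFromUpstream(): Promise<void> {
        // Simplified stand-in for the API call; takes up to 25 items.
        this.queueHeadIds.push(...this.upstream.splice(0, 25));
    }
}
```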
This PR ports over the changes from apify/crawlee#2775. Key changes:
- tracking of "locked" or "in progress" requests was moved from `storages.RequestQueue` to the request storage client implementations
- the queue head cache gets invalidated after we enqueue a new forefront request (before that, it would only be processed after the current head cache was consumed)
- the `RequestQueue.is_finished` function has been rewritten to avoid race conditions
- I tried running the SDK integration tests with these changes and they passed
`queueHasLockedRequests` to simplify RequestQueue v2 #2767