Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature request (async): limit() to limit a function concurrency #4766

Open
guy-borderless opened this issue May 17, 2024 · 10 comments
Open

Comments

@guy-borderless
Copy link

guy-borderless commented May 17, 2024

Is your feature request related to a problem? Please describe.
I find p-limit helpful (APIs often limit user concurrency). The async module already has retry, debounce, and pooledMap which are similar.
An API like p-limit in deno-std will be great.

import pLimit from 'p-limit';

const limit = pLimit(10);

// only 10 concurrent requests
export const geocodeAddress = limit(async (address: string) => {...})
@iuioiua
Copy link
Collaborator

iuioiua commented May 19, 2024

Doesn't pooledMap() achieve essentially the same thing with its poolLimit parameter?

@guy-borderless
Copy link
Author

Doesn't pooledMap() achieve essentially the same thing with its poolLimit parameter?

Not easily I think, this scenario is for when we don't know the iterable up front. So we still have to create and manage an iterable here (and probably prefer to use some lib for it). How would you write the above code with pooledMap?

@BlackAsLight
Copy link
Contributor

With the above code, how do you imagine the iterable would be provided to it?

@kt3k
Copy link
Member

kt3k commented May 23, 2024

I'm not sure pLimit is common abstraction of that situation/requirement.

Could Semaphore class in this package ( https://jsr.io/@lambdalisue/async ) achieve that same thing?

This issue ( #4536 ) might be related to this topic.

@guy-borderless
Copy link
Author

guy-borderless commented May 23, 2024

Would that be:

import { Semaphore } from "@std/async";

const limiter = new Semaphore(10);

// only 10 concurrent requests
export const geocodeAddress = limiter.lock(async (address: string) => {...})

If it's about that, np.
Btw, how would you rate limit invocations? (max calls per duration)

@BlackAsLight
Copy link
Contributor

I imagine you could do something like this to accomplish the same thing you're after.

import { pooledMap } from '@std/async'

function pLimit(poolLimit: number) {
  return function <T, R>(iteratorFn: (data: T) => Promise<R>) {
    return function <T>(array: Iterable<T> | AsyncIterable<T>) {
      return pooledMap(poolLimit, array, iteratorFn)
    }
  }
}

const limit = pLimit(10)

const geocodeAddress = limit(async (address: string) => {...})

geocodeAddress(['an array of addresses'])

@guy-borderless
Copy link
Author

guy-borderless commented May 23, 2024

I imagine you could do something like this to accomplish the same thing you're after.

import { pooledMap } from '@std/async'

function pLimit(poolLimit: number) {
  return function <T, R>(iteratorFn: (data: T) => Promise<R>) {
    return function <T>(array: Iterable<T> | AsyncIterable<T>) {
      return pooledMap(poolLimit, array, iteratorFn)
    }
  }
}

const limit = pLimit(10)

const geocodeAddress = limit(async (address: string) => {...})

geocodeAddress(['an array of addresses'])
``

your code is not equivalent. it requires all addresses to be batched in the same lexical place. In practice, to be equivalent you need to introduce state.

@BlackAsLight
Copy link
Contributor

BlackAsLight commented May 23, 2024

your code is not equivalent. it requires all addresses to be batched in the same lexical place. In practice, to be equivalent you need to introduce state.

const { readable, writable } = new TranformStream<string, string>()
geocodeAddress(readable)

const writer = writable.getWriter()

// write an address in different places
writer.write('address')

// call at the very end when you know there is no more addresses
writer.close()

@BlackAsLight
Copy link
Contributor

import { pooledMap } from '@std/async'

const geocodeAddress = function() {
  const { readable, writable } = new TransformStream<string, string>()
  pooledMap(10, readable, async (address: string) => {
    ...
  })
  return writable.getWriter()
}()

geocodeAddress.write('address')

geocodeAddress.close()

@guy-borderless
Copy link
Author

guy-borderless commented May 26, 2024

by the "leakiness" and general friction seen here, it does seem a primitive is missing. I think it would be useful to have something like a semaphore in the standards library

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants