How can we join multiple coroutines? Await waits only on a single coroutine

:information_source: Attention Topic was automatically imported from the old Question2Answer platform.
:bust_in_silhouette: Asked By pseudonymous_person

Hi all,
I’m very excited about the new await keyword and coroutines.

When you need to fire off multiple coroutines, say to download 3 files, what’s the recommended pattern for running them simultaneously?

With just await, it has to be done sequentially:

await download_coroutine_1
await download_coroutine_2
await download_coroutine_3

Ideally, it would have a join feature like other languages do:

join(download_coroutine_1, download_coroutine_2, download_coroutine_3)

Or

await [download_coroutine_1, download_coroutine_2, download_coroutine_3]

Or even better, Hack’s concurrent block: Asynchronous Operations: Concurrent

concurrent {
  first = await download_coroutine_1
  second = await download_coroutine_2
  third = await download_coroutine_3
}

If this isn’t implemented yet, please share a code pointer or two to get me started and I’ll implement it for us.

:bust_in_silhouette: Reply From: kienhoang

Hi, I made it work by creating a small utility called Counter and using lambda functions to make different coroutine blocks:

class_name Counter

signal completed

var _counter: int
var _total: int


func _init(p_total: int):
    _total = p_total
    _counter = 0


func submit():
    _counter = _counter + 1
    if _counter == _total:
        completed.emit()

In my main function:

func main():
    var jobs = Counter.new(3) # I have 3 jobs here

    var job_1 = func():
        await download_coroutine_1
        jobs.submit()

    job_1.call() # invoked without await, so the job runs in the background

    var job_2 = func():
        await download_coroutine_2
        jobs.submit()

    job_2.call()

    var job_3 = func():
        await download_coroutine_3
        jobs.submit()
    
    job_3.call()

    await jobs.completed
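
If you also need the downloads’ return values, the same pattern extends naturally: capture an array in the lambdas and have each job write into its own slot. A minimal sketch under that assumption (the download coroutines and the results array are illustrative names):

func main_with_results():
    var jobs = Counter.new(3)
    var results = [null, null, null] # one slot per job, filled as each finishes

    var job_1 = func():
        results[0] = await download_coroutine_1
        jobs.submit()
    job_1.call()

    var job_2 = func():
        results[1] = await download_coroutine_2
        jobs.submit()
    job_2.call()

    var job_3 = func():
        results[2] = await download_coroutine_3
        jobs.submit()
    job_3.call()

    await jobs.completed
    return results

This works because the lambdas capture the surrounding locals, and GDScript arrays are passed by reference, so each job mutates the same results array.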

Thanks for this feedback. I think that could work.

In your example, don’t you still need to invoke the lambdas? And do you do that without awaits so you only block at the bottom?

job_1.call()
job_2.call()
job_3.call()
await jobs.completed

pseudonymous_person | 2023-02-26 14:21

Oh yes, that’s true. Thanks

kienhoang | 2023-02-26 14:46

:bust_in_silhouette: Reply From: jovialthunder

I liked kienhoang’s answer and made a version that fires off all of the jobs in quick succession after you queue them up. Create this object, queue up as many objects as you’d like (with their params, start method, and finished signal), and then run them all at once with run_all(). Await its completed signal for them all to finish.

If you want to do stuff with the objects’ return params as they complete, connect your own functions to their signals before calling run_all.

Usage:

func http_and_timer(http_request:HTTPRequest, timer:Timer):
	var parallel := ParallelCoroutines.new()
	parallel.append(http_request, ['https://www.google.com'], 'request', 'request_completed')
	parallel.append(timer, [1.0], 'start', 'timeout') # Timer.start() takes seconds
	parallel.run_all()
	await parallel.completed
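
To act on each result as it arrives (the “connect your own functions to their signals before calling run_all” suggestion above), you can hook up a handler alongside the queued completion signal. A sketch, where _on_request_completed is a hypothetical handler matching HTTPRequest’s request_completed signature:

func _on_request_completed(result: int, response_code: int, headers: PackedStringArray, body: PackedByteArray):
	print("Got %d bytes with response code %d" % [body.size(), response_code])

func http_with_handler(http_request: HTTPRequest, timer: Timer):
	var parallel := ParallelCoroutines.new()
	parallel.append(http_request, ['https://www.google.com'], 'request', 'request_completed')
	parallel.append(timer, [1.0], 'start', 'timeout')
	# Connect the custom handler before run_all() so it fires per request,
	# while parallel.completed still signals when everything is done.
	http_request.request_completed.connect(_on_request_completed)
	parallel.run_all()
	await parallel.completed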

Code:

extends Object
class_name ParallelCoroutines

signal completed

var queued_coroutines:Array[Dictionary]
var total_count: int
var completed_count: int

func append(object:Object, params:Array, call_method:StringName, complete_signal:StringName):
	queued_coroutines.append({'object': object, 'params': params, 'call_method': call_method, 'complete_signal': complete_signal})

func run_all():
	total_count = queued_coroutines.size()
	for routine in queued_coroutines:
		var object:Object = routine['object']
		var params:Array = routine['params']
		var call_method:StringName = routine['call_method']
		var complete_signal:StringName = routine['complete_signal']
		# Connect before starting the call so a signal that fires immediately isn't missed.
		object.connect(complete_signal, _on_completed)
		object.callv(call_method, params)

func _on_completed(a=null, b=null, c=null, d=null, e=null, f=null, g=null): # I don't know how else to allow but ignore parameters
	completed_count = completed_count + 1
	if completed_count == total_count:
		completed.emit()
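
One caveat, since the class extends Object rather than RefCounted: instances aren’t reference counted, so they should be freed once you no longer need them. A tiny sketch continuing the usage example above:

	await parallel.completed
	parallel.free() # not reference counted, so release it explicitly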

I had a need for lots of concurrent HTTP requests and felt the pain of not having an easy-to-use concurrency model for coroutines.

So I made a Concurrently utility. The usage looks like this:

The common case of “run a bunch of side effects together, but I need to await all of them before the next part of the code”:

await Concurrently.fire_all([some_coroutine, another_coroutine])

Another common case of “run these concurrently, but I want all the results back”:

var fired := await Concurrently.fire_all([some_coroutine, another_coroutine])
# Order is preserved from the array passed in to .fire_all()

var some_coroutine_result := fired[0].result
var another_coroutine_result := fired[1].result

Also a common case: “I need to run coroutines together whose arguments come from the scope I’m in” - here we can use lambdas:

await Concurrently.fire_all(
	[
		func (): return await some_coroutine(local_var_1, local_var_2),
		func (): return await other_coroutine(local_var_3)
	]
)

And finally, a rarer but VERY helpful case of “I want to fire them off concurrently, but I want to await their results at different times”. Instead of .fire_all(), we can use individual Concurrently instances. This also covers the even rarer but useful case of “I want to fire them off at different times too.”

# something that can run on the side and we want to fire as eagerly as possible
var concurrent_1 := Concurrently.new(some_independent_coroutine)

# regular ol' function here to get some data that's needed for next step
var required_data := some_immediately_needed_func()

var concurrent_2 := Concurrently.new(
    func(): return await some_dependent_coroutine(required_data)
)

#could continue to do other work as necessary in this function, then when needed, do:
await concurrent_2.is_done()
print(concurrent_2.result)

## could also await this other one right away if needed, or continue working first, then:
await concurrent_1.is_done()
print(concurrent_1.result)


# note that the .is_done() method also returns the result, as a convenience

If this is helpful for you and the API surface is to your liking, then to use this in your own projects just create a script file of your choosing (probably concurrently.gd), and here’s the (surprisingly lightweight) code:

class_name Concurrently extends RefCounted

signal _done_signal
var _is_complete: bool = false
var coroutine: Callable
var result: Variant

func fire() -> void:
	result = await coroutine.call()
	_is_complete = true
	_done_signal.emit()

func _init(incoming_coroutine: Callable) -> void:
	coroutine = incoming_coroutine
	# Start right away; fire() suspends at its first await, so _init returns immediately.
	fire()


func is_done() -> Variant:
	if not _is_complete:
		await _done_signal
	return result


static func fire_all(funcs: Array[Callable]) -> Array:
	var fired: Array[Concurrently]
	for this_func: Callable in funcs:
		var concurrent_func: Concurrently = Concurrently.new(this_func)
		fired.append(concurrent_func)
	# Every coroutine was already started in _init, so awaiting them in order
	# just waits for the slowest one to finish.
	for item: Concurrently in fired:
		await item.is_done()
	return fired
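
Since the original itch was lots of concurrent HTTP requests, here is a minimal sketch of wrapping HTTPRequest awaits into callables for fire_all(). It assumes the method lives on a Node that is inside the scene tree; fetch_all and urls are names introduced purely for illustration:

# Sketch only: assumes this method is on a Node in the scene tree.
func fetch_all(urls: Array) -> Array:
	var jobs: Array[Callable] = []
	for url in urls:
		var http := HTTPRequest.new()
		add_child(http)
		var job := func():
			http.request(url)
			# Awaiting the signal yields [result, response_code, headers, body].
			var response = await http.request_completed
			http.queue_free()
			return response
		jobs.append(job)
	var fired := await Concurrently.fire_all(jobs)
	# Results come back in the same order the callables were passed in.
	return fired.map(func(c): return c.result)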