Async Aio workflow

Ropendal’s async handles are non-blocking by default: submitting an operation returns immediately, and you then explicitly wait on or inspect the handle to obtain its result.

# Attach the package used throughout this walkthrough.
library(Ropendal)

# Build a scratch directory under the session's temporary directory.
root <- file.path(tempdir(), "ropendal-aio")
# Remove anything left over at that path so the example starts clean.
unlink(root, recursive = TRUE)
dir.create(root, recursive = TRUE)
# Create the `fs` object that every later call takes as its first argument;
# "fs" presumably selects the local-filesystem backend, rooted at `root`.
fs <- opendal("fs", root = root)

# Write the four raw bytes 01 02 03 04 to "value.bin" (a blocking call here,
# unlike the *_aio() variants used in the rest of the walkthrough).
fs_write(fs, "value.bin", as.raw(1:4))
#> [1] TRUE

Submit and wait

*_aio() variants return an OpendalAio handle.

# Submit the read; the result is a handle, not the file contents.
aio <- fs_read_aio(fs, "value.bin")
# Wait on the handle (presumably blocks until the read has resolved; confirm
# against the call_aio() documentation).
call_aio(aio)
# Pull the result out of the handle and print it; the bytes match what
# fs_write() stored in "value.bin" during setup.
val <- collect_aio(aio)
val
#> [1] 01 02 03 04

Poll or race

unresolved() is a lightweight predicate that tests whether a value is still pending. Local filesystem operations may already have completed by the time R inspects them, so a result of FALSE here is normal.

# Submit an existence check for the file written during setup.
aio2 <- fs_exists_aio(fs, "value.bin")
# Inspect the handle's `value` field without waiting on it; FALSE means the
# work is no longer pending.
unresolved(aio2$value)
#> [1] FALSE
# Race the handle with timeout = 100 (units are not shown here; presumably
# milliseconds, to be confirmed). The printed result carries $index, $name,
# $event and $aio; $name is "" here, presumably because the handle was passed
# without a name.
race_aio(aio2, timeout = 100)
#> $index
#> [1] 1
#> 
#> $name
#> [1] ""
#> 
#> $event
#> [1] "ready"
#> 
#> $aio
#> <opendal aio> ready 
#> 
#> attr(,"class")
#> [1] "opendalAioRace"
# Collect the result of the existence check: TRUE, the file is there.
collect_aio(aio2)
#> [1] TRUE

Coordinating several handles with condition variables

cv() is a lightweight wait gate that pairs with aio_monitor() for batched completion.

# Submit another read so there is a handle to monitor.
read_aio <- fs_read_aio(fs, "value.bin")
# Create the condition variable that is handed to the monitor below.
cv_gate <- cv()
# Monitor the handle under the name "read"; that name appears in the `name`
# column printed by read_monitor().
mon <- aio_monitor(list(read = read_aio), cv = cv_gate)
# Wait on the condition variable with a limit of 1000 (presumably
# milliseconds; confirm). TRUE presumably means it was signalled in time.
cv_until(cv_gate, 1000)
#> [1] TRUE
# Print what the monitor has recorded for the monitored handles.
read_monitor(mon)
#>   index name event    state
#> 1     1 read ready resolved
# The handle's result is still retrieved with collect_aio(), as before.
collect_aio(read_aio)
#> [1] 01 02 03 04