Ropendal’s async handles are non-blocking by default: submitting an
operation returns a handle right away, and you wait on or inspect that
handle explicitly.
# Attach Ropendal, the package these examples demonstrate.
library(Ropendal)
# Work in a scratch directory under tempdir(). Remove anything left over
# from a previous run and recreate it so the example starts from an empty
# root.
root <- file.path(tempdir(), "ropendal-aio")
unlink(root, recursive = TRUE)
dir.create(root, recursive = TRUE)
# Create the `fs` object with the "fs" service name and the scratch
# directory as its root; every later snippet reuses it.
fs <- opendal("fs", root = root)
# Seed the directory with a four-byte file (01 02 03 04) that the async
# examples below read back.
fs_write(fs, "value.bin", as.raw(1:4))
#> [1] TRUE
Submit and wait
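The *_aio() variants return an OpendalAio handle instead of the result
itself; wait on the handle with call_aio(), then extract the value with collect_aio().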
# Submit the read; `aio` is the handle for the operation, not the bytes.
aio <- fs_read_aio(fs, "value.bin")
# Wait on the handle.
# NOTE(review): presumably blocks until the read has finished -- confirm
# against the call_aio() documentation.
call_aio(aio)
# Extract the result from the handle and print it: the four bytes written
# during setup.
val <- collect_aio(aio)
val
#> [1] 01 02 03 04
Poll or race
unresolved() is a small sentinel/predicate for unfinished work: applied to a
handle's value, it tests whether the result is still pending. Local filesystem
operations may already be ready by the time R inspects them, so FALSE here is normal.
# Submit an asynchronous existence check for the file written during setup.
aio2 <- fs_exists_aio(fs, "value.bin")
# Poll the handle's value. FALSE in this run means the check was no longer
# pending when it was inspected.
unresolved(aio2$value)
#> [1] FALSE
# Race the handle with a timeout of 100.
# NOTE(review): the timeout unit is not shown here; presumably
# milliseconds -- confirm.
# The printed result lists the index, name, event and handle. The name is
# empty in this run, presumably because the handle was passed unnamed.
race_aio(aio2, timeout = 100)
#> $index
#> [1] 1
#>
#> $name
#> [1] ""
#>
#> $event
#> [1] "ready"
#>
#> $aio
#> <opendal aio> ready
#>
#> attr(,"class")
#> [1] "opendalAioRace"
# Collect the result of the existence check (TRUE: the file exists).
collect_aio(aio2)
#> [1] TRUE
Coordinating several handles with condition variables
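cv() creates a lightweight wait gate (a condition variable) that pairs with
aio_monitor() for batched completion; wait on it with cv_until() and inspect the outcome with read_monitor().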
# Submit another read; this handle is tracked through a monitor instead of
# being waited on directly.
read_aio <- fs_read_aio(fs, "value.bin")
# Create the condition variable that the monitor is attached to.
cv_gate <- cv()
# Monitor a named list of handles. The list name ("read") appears in the
# `name` column printed by read_monitor() below.
mon <- aio_monitor(list(read = read_aio), cv = cv_gate)
# Wait on the condition variable with a limit of 1000.
# NOTE(review): presumably milliseconds, with TRUE meaning the gate was
# signalled before the limit elapsed -- confirm.
cv_until(cv_gate, 1000)
#> [1] TRUE
# Read what the monitor recorded; in this run there is a single row for
# the "read" handle.
read_monitor(mon)
#> index name event state
#> 1 1 read ready resolved
# The monitored handle is collected the same way as any other handle.
collect_aio(read_aio)
#> [1] 01 02 03 04