Async Utilities

Utilities for running blocking functions without starving the 1kHz control loop. See Async Mode Guide for usage guidance.

Utilities for running blocking functions without starving the 1kHz control loop.

After controller.start(), the asyncio event loop must not be blocked for more than ~1ms. These helpers offload synchronous work to a thread executor so the real-time loop keeps running.

Usage:

from aiofranka import asyncify

# Decorator class MyPolicy:

@asyncify def get_action(self, obs):

return self.model(obs)

action = await policy.get_action(obs)

# Wrap an existing function model_async = asyncify(model) result = await model_async(input_tensor)

# Original sync function is accessible via .sync result = policy.get_action.sync(obs)

aiofranka.async_utils.asyncify(fn)[source]

Wrap a blocking function so it runs in a thread executor when awaited.

Works as a decorator or as a wrapper for existing functions:

@asyncify
def heavy_compute(x):
    return x ** 2

result = await heavy_compute(42)
result = heavy_compute.sync(42)  # original sync version
class aiofranka.async_utils.CudaInferenceThread[source]

Bases: object

A persistent thread for CUDA inference that avoids run_in_executor overhead.

run_in_executor dispatches to a thread pool, which causes ~40x slowdown for CUDA ops due to thread-pool dispatch and CUDA cross-thread synchronization. This class keeps a single dedicated thread alive with an initialized CUDA context, so the per-call cost is just a queue round-trip (~microseconds).

Usage:

infer = CudaInferenceThread()
infer.start()

# wrap any callable
action = await infer.run(policy.get_action, obs)

# or use as a decorator
@infer.wrap
def get_action(obs):
    return model(obs)

action = await get_action(obs)
action = get_action.sync(obs)  # original sync version

infer.stop()
__init__()[source]
start()[source]

Start the inference thread.

stop()[source]

Signal the thread to exit.

async run(fn, *args, **kwargs)[source]

Submit a callable and await the result.

wrap(fn)[source]

Decorator: like asyncify but runs on the persistent CUDA thread.

class aiofranka.async_utils.ProcessProxy(conn, process)[source]

Bases: object

Transparent proxy to an object living in a separate process.

All attribute access and method calls are forwarded to the child process via a pipe. This completely avoids GIL contention — the child has its own GIL, so a 1kHz control loop on the main process cannot interfere.

Created via mpify(), not directly.

__init__(conn, process)[source]
stop()[source]

Signal the child process to exit.

aiofranka.async_utils.mpify(factory_fn, *args, **kwargs)[source]

Spawn a child process, run factory_fn(*args, **kwargs) in it, and return a transparent async proxy to the created object.

The proxy forwards all method calls and attribute access to the child process. Use await on every access since it crosses a process boundary.

Usage:

def make_policy(checkpoint, device):
    model, config = load_model(checkpoint, device)
    return ACTInferencePolicy(model, config, device)

policy = mpify(make_policy, checkpoint, "cuda:0")

await policy.reset(initial_ee)
ee = await policy.get_action(ee, qpos, task_type)
timing = await policy._last_timing

policy.stop()  # clean shutdown (synchronous)
Parameters:
  • factory_fn – Callable that creates the object. Must be picklable (module-level function). Runs in the child process.

  • *args – Passed to factory_fn.

  • **kwargs

    Passed to factory_fn.

Returns:

ProcessProxy that forwards attribute/method access to the child.

async aiofranka.async_utils.async_input(prompt='')[source]

Async-safe replacement for built-in input().

Runs input() in a thread executor so the event loop (and the 1kHz control loop) keeps running while waiting for user input:

await async_input("Press Enter to start...")
Parameters:

prompt (str)

Return type:

str