Async Utilities
Utilities for running blocking functions without starving the 1kHz control loop. See Async Mode Guide for usage guidance.
After controller.start(), the asyncio event loop must not be blocked for more than ~1ms. These helpers offload synchronous work to a thread executor so the real-time loop keeps running.
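The offloading pattern behind these helpers can be illustrated with a minimal sketch. This is not the library's implementation, just the standard asyncio technique it is built on (the name asyncify_sketch is hypothetical): dispatch the blocking call to the default thread executor so the event loop never waits on it.

```python
import asyncio
import functools

def asyncify_sketch(fn):
    """Hypothetical minimal sketch: run a blocking fn in the default
    thread executor so the event loop stays responsive."""
    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # Offload to a worker thread; the loop keeps serving other tasks.
        return await loop.run_in_executor(None, functools.partial(fn, *args, **kwargs))
    wrapper.sync = fn  # keep the original synchronous version reachable
    return wrapper

@asyncify_sketch
def heavy_compute(x):
    return x ** 2

result = asyncio.run(heavy_compute(7))
```

The real asyncify exposes the same two entry points shown in the usage below: an awaitable wrapper plus the original function via .sync.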
- Usage:

```python
from aiofranka import asyncify

# Use as a decorator
class MyPolicy:
    @asyncify
    def get_action(self, obs):
        return self.model(obs)

action = await policy.get_action(obs)

# Wrap an existing function
model_async = asyncify(model)
result = await model_async(input_tensor)

# The original sync function is accessible via .sync
result = policy.get_action.sync(obs)
```
- aiofranka.async_utils.asyncify(fn)[source]
Wrap a blocking function so it runs in a thread executor when awaited.
Works as a decorator or as a wrapper for existing functions:
```python
@asyncify
def heavy_compute(x):
    return x ** 2

result = await heavy_compute(42)
result = heavy_compute.sync(42)  # original sync version
```
- class aiofranka.async_utils.CudaInferenceThread[source]
Bases: object

A persistent thread for CUDA inference that avoids run_in_executor overhead.
run_in_executor dispatches to a thread pool, which causes ~40x slowdown for CUDA ops due to thread-pool dispatch and CUDA cross-thread synchronization. This class keeps a single dedicated thread alive with an initialized CUDA context, so the per-call cost is just a queue round-trip (~microseconds).
Usage:

```python
infer = CudaInferenceThread()
infer.start()

# wrap any callable
action = await infer.run(policy.get_action, obs)

# or use as a decorator
@infer.wrap
def get_action(obs):
    return model(obs)

action = await get_action(obs)
action = get_action.sync(obs)  # original sync version

infer.stop()
```
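The persistent-thread pattern described above can be sketched without CUDA. This hypothetical WorkerThread shows only the core idea: one long-lived thread pulls jobs from a queue, so each call costs a queue round-trip rather than a thread-pool dispatch. The real CudaInferenceThread additionally owns an initialized CUDA context and returns awaitables; both are omitted here.

```python
import queue
import threading

class WorkerThread:
    """Hypothetical sketch: a single dedicated worker thread serving
    jobs from a queue (the pattern behind CudaInferenceThread)."""
    def __init__(self):
        self._jobs = queue.Queue()
        self._thread = threading.Thread(target=self._loop, daemon=True)

    def start(self):
        self._thread.start()

    def _loop(self):
        # The thread stays alive between calls, so any per-thread
        # state (e.g. a CUDA context) is initialized exactly once.
        while True:
            job = self._jobs.get()
            if job is None:  # sentinel: shut down
                return
            fn, args, out = job
            out.put(fn(*args))

    def run(self, fn, *args):
        out = queue.Queue(maxsize=1)
        self._jobs.put((fn, args, out))
        return out.get()  # block until the worker replies

    def stop(self):
        self._jobs.put(None)
        self._thread.join()

w = WorkerThread()
w.start()
squared = w.run(lambda x: x * x, 6)
w.stop()
```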
- class aiofranka.async_utils.ProcessProxy(conn, process)[source]
Bases: object

Transparent proxy to an object living in a separate process.
All attribute access and method calls are forwarded to the child process via a pipe. This completely avoids GIL contention — the child has its own GIL, so a 1kHz control loop on the main process cannot interfere.
Created via mpify(), not directly.
- aiofranka.async_utils.mpify(factory_fn, *args, **kwargs)[source]
Spawn a child process, run factory_fn(*args, **kwargs) in it, and return a transparent async proxy to the created object.
The proxy forwards all method calls and attribute access to the child process. Use await on every access, since each one crosses a process boundary.

Usage:
```python
def make_policy(checkpoint, device):
    model, config = load_model(checkpoint, device)
    return ACTInferencePolicy(model, config, device)

policy = mpify(make_policy, checkpoint, "cuda:0")
await policy.reset(initial_ee)
ee = await policy.get_action(ee, qpos, task_type)
timing = await policy._last_timing
policy.stop()  # clean shutdown (synchronous)
```
- Parameters:
factory_fn – Callable that creates the object. Must be picklable (module-level function). Runs in the child process.
*args – Passed to factory_fn.
**kwargs – Passed to factory_fn.
- Returns:
ProcessProxy that forwards attribute/method access to the child.