Horatiu George Todoran 2025-12-10 18:13:11 +07:00 committed by GitHub
commit d336cc1787
9 changed files with 921 additions and 170 deletions

@@ -129,12 +129,13 @@ COPY --from=builder-armnn \
FROM prod-cpu AS prod-rknn
# renovate: datasource=github-tags depName=airockchip/rknn-toolkit2
ARG RKNN_TOOLKIT_VERSION="v2.3.0"
ARG RKNN_TOOLKIT_VERSION="v2.3.2"
ENV LD_PRELOAD=/usr/lib/libmimalloc.so.2 \
MACHINE_LEARNING_MODEL_ARENA=false
ADD --checksum=sha256:73993ed4b440460825f21611731564503cc1d5a0c123746477da6cd574f34885 "https://github.com/airockchip/rknn-toolkit2/raw/refs/tags/${RKNN_TOOLKIT_VERSION}/rknpu2/runtime/Linux/librknn_api/aarch64/librknnrt.so" /usr/lib/
ADD --checksum=sha256:d31fc19c85b85f6091b2bd0f6af9d962d5264a4e410bfb536402ec92bac738e8 "https://github.com/airockchip/rknn-toolkit2/raw/refs/tags/${RKNN_TOOLKIT_VERSION}/rknpu2/runtime/Linux/librknn_api/aarch64/librknnrt.so" /usr/lib/
ADD --checksum=sha256:c48e11a6f41b451a5fd1e4ad774ea60252d3d94f78bee9b21ea3d21b21deba9a "https://github.com/airockchip/rknn-toolkit2/raw/refs/tags/${RKNN_TOOLKIT_VERSION}/rknpu2/runtime/Linux/librknn_api/include/rknn_api.h" /usr/include/
FROM prod-${DEVICE} AS prod
@@ -167,6 +168,21 @@ COPY --from=builder /opt/venv /opt/venv
COPY scripts/healthcheck.py .
COPY immich_ml immich_ml
RUN if [ "$DEVICE" = "rknn" ]; then \
apt-get update; \
apt-get install -y g++ libc6-dev; \
cd immich_ml/sessions/rknn/native; \
RKNN_HEADER=/usr/include/rknn_api.h \
RKNN_LIBRARY=/usr/lib/librknnrt.so \
./build-cross.sh; \
apt-get purge -y --auto-remove \
g++ \
libc6-dev; \
apt-get clean; \
rm -rf /var/lib/apt/lists/*; \
rm /usr/include/rknn_api.h; \
fi
ARG BUILD_ID
ARG BUILD_IMAGE
ARG BUILD_SOURCE_REF

@@ -1,76 +1,10 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, NamedTuple
import numpy as np
from numpy.typing import NDArray
from immich_ml.config import log, settings
from immich_ml.schemas import SessionNode
from .rknnpool import RknnPoolExecutor, is_available, soc_name
is_available = is_available and settings.rknn
model_prefix = Path("rknpu") / soc_name if is_available and soc_name is not None else None
def run_inference(rknn_lite: Any, input: list[NDArray[np.float32]]) -> list[NDArray[np.float32]]:
outputs: list[NDArray[np.float32]] = rknn_lite.inference(inputs=input, data_format="nchw")
return outputs
input_output_mapping: dict[str, dict[str, Any]] = {
"detection": {
"input": {"norm_tensor:0": (1, 3, 640, 640)},
"output": {
"norm_tensor:1": (12800, 1),
"norm_tensor:2": (3200, 1),
"norm_tensor:3": (800, 1),
"norm_tensor:4": (12800, 4),
"norm_tensor:5": (3200, 4),
"norm_tensor:6": (800, 4),
"norm_tensor:7": (12800, 10),
"norm_tensor:8": (3200, 10),
"norm_tensor:9": (800, 10),
},
},
"recognition": {"input": {"norm_tensor:0": (1, 3, 112, 112)}, "output": {"norm_tensor:1": (1, 512)}},
}
class RknnSession:
def __init__(self, model_path: Path) -> None:
self.model_type = "detection" if "detection" in model_path.parts else "recognition"
self.tpe = settings.rknn_threads
log.info(f"Loading RKNN model from {model_path} with {self.tpe} threads.")
self.rknnpool = RknnPoolExecutor(model_path=model_path.as_posix(), tpes=self.tpe, func=run_inference)
log.info(f"Loaded RKNN model from {model_path} with {self.tpe} threads.")
def get_inputs(self) -> list[SessionNode]:
return [RknnNode(name=k, shape=v) for k, v in input_output_mapping[self.model_type]["input"].items()]
def get_outputs(self) -> list[SessionNode]:
return [RknnNode(name=k, shape=v) for k, v in input_output_mapping[self.model_type]["output"].items()]
def run(
self,
output_names: list[str] | None,
input_feed: dict[str, NDArray[np.float32]] | dict[str, NDArray[np.int32]],
run_options: Any = None,
) -> list[NDArray[np.float32]]:
input_data: list[NDArray[np.float32]] = [np.ascontiguousarray(v) for v in input_feed.values()]
self.rknnpool.put(input_data)
res = self.rknnpool.get()
if res is None:
raise RuntimeError("RKNN inference failed!")
return res
class RknnNode(NamedTuple):
name: str | None
shape: tuple[int, ...]
__all__ = ["RknnSession", "RknnNode", "is_available", "soc_name", "model_prefix"]
from .immich_session import RknnPoolExecutor, RknnSession, is_available, model_prefix, run_inference, soc_name
__all__ = [
"RknnSession",
"RknnPoolExecutor",
"run_inference",
"is_available",
"soc_name",
"model_prefix",
]

@@ -0,0 +1,266 @@
from __future__ import annotations
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from pathlib import Path
from types import TracebackType
from typing import TYPE_CHECKING, Any, NamedTuple, Optional, Protocol, Sequence, cast
import numpy as np
from numpy.typing import NDArray
from immich_ml.config import log, settings
from immich_ml.models.constants import RKNN_SUPPORTED_SOCS
from .native import rknn_pool as _native_mod # pragma: no cover - compiled extension load
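# At type-check time NativeRKNNExecutor is described by a Protocol; at runtime the name is bound to the compiled pybind11 class.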
if TYPE_CHECKING:
class NativeRKNNExecutor(Protocol):
def infer(self, inputs: list[NDArray[np.float32]]) -> list[NDArray[np.float32]]: ...
def get_io_info(self) -> dict[str, Any]: ...
else:
NativeRKNNExecutor = _native_mod.NativeRKNNExecutor
__all__ = [
"RknnSession",
"RknnPoolExecutor",
"run_inference",
"is_available",
"soc_name",
"model_prefix",
]
def get_soc(device_tree_path: Path | str) -> str | None:
try:
with Path(device_tree_path).open() as f:
device_compatible_str = f.read().lower()
for soc in RKNN_SUPPORTED_SOCS:
if soc in device_compatible_str:
return soc
except OSError as exc:
log.debug("Could not read device tree %s: %s", device_tree_path, exc)
return None
soc_name = get_soc("/proc/device-tree/compatible")
is_available = soc_name is not None and settings.rknn
model_prefix = Path("rknpu") / soc_name if is_available and soc_name else None
class SessionNode(NamedTuple):
name: Optional[str]
shape: tuple[int, ...]
class RKNNInferenceResult(NamedTuple):
tag: Any
start_time: float
end_time: float
duration_s: float
outputs: list[NDArray[np.float32]]
class InferenceExecutor(Protocol):
def infer(self, inputs: list[NDArray[np.float32]]) -> list[NDArray[np.float32]]: ...
def run_inference(executor: InferenceExecutor, inputs: list[NDArray[np.float32]]) -> list[NDArray[np.float32]]:
return executor.infer(inputs)
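# The pool pairs a ThreadPoolExecutor with a single NativeRKNNExecutor; the native side holds one RKNN context per worker and hands them out as threads call infer().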
class RknnPoolExecutor:
def __init__(self, model_path: str | Path, tpes: int) -> None:
if tpes < 1:
raise ValueError("tpes must be >= 1")
model_path_str = Path(model_path).as_posix()
self._native = NativeRKNNExecutor(model_path_str, num_workers=tpes)
self._executor = ThreadPoolExecutor(max_workers=tpes, thread_name_prefix="rknn-worker")
self._closed = False
def _run_inference(self, inputs: list[NDArray[np.float32]], tag: Any) -> RKNNInferenceResult:
start = time.perf_counter()
outputs = self._native.infer(inputs)
end = time.perf_counter()
return RKNNInferenceResult(
tag=tag,
start_time=start,
end_time=end,
duration_s=end - start,
outputs=outputs,
)
def submit(self, inputs: Sequence[NDArray[np.float32]], *, tag: Any = None) -> Future[RKNNInferenceResult]:
if self._closed:
raise RuntimeError("Pool is closed")
return self._executor.submit(self._run_inference, list(inputs), tag)
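# put() keeps the name used by the previous RknnPoolExecutor API but now returns a Future instead of queueing internally.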
def put(self, inputs: Sequence[NDArray[np.float32]], *, tag: Any = None) -> Future[RKNNInferenceResult]:
return self.submit(inputs, tag=tag)
def close(self, *, wait: bool = True) -> None:
if self._closed:
return
self._closed = True
self._executor.shutdown(wait=wait)
@property
def executor(self) -> NativeRKNNExecutor:
return self._native
def __enter__(self) -> "RknnPoolExecutor":
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
self.close()
class RknnSession:
def __init__(
self,
model_path: Path | str,
*,
num_workers: Optional[int] = None,
logger: Any = None,
) -> None:
if not is_available:
raise RuntimeError("RKNN is not available on this device")
self.model_path = Path(model_path)
self.log = logger or log
default_workers = getattr(settings, "rknn_threads", 1)
self.tpe = num_workers or default_workers
if self.tpe < 1:
raise ValueError("num_workers must be >= 1")
self.log.info(
"Loading RKNN model from %s with %s worker(s).",
self.model_path,
self.tpe,
)
self.rknnpool = RknnPoolExecutor(self.model_path, self.tpe)
self._io_info = self._normalize_io_info(self.rknnpool.executor.get_io_info())
self._input_nodes = self._build_nodes("inputs")
self._output_nodes = self._build_nodes("outputs")
self.log.info("Loaded RKNN model from %s.", self.model_path)
@property
def io_info(self) -> dict[str, Any]:
return self._io_info
def get_inputs(self) -> list[SessionNode]:
return self._input_nodes
def get_outputs(self) -> list[SessionNode]:
return self._output_nodes
def run(
self,
_output_names: Sequence[str] | None,
input_feed: dict[str, NDArray[np.float32]] | dict[str, NDArray[np.int32]],
_run_options: Any = None,
) -> list[NDArray[np.float32]]:
return self.run_async(_output_names, input_feed, _run_options).result().outputs
def run_async(
self,
_output_names: Sequence[str] | None,
input_feed: dict[str, NDArray[np.float32]] | dict[str, NDArray[np.int32]],
_run_options: Any = None,
) -> Future[RKNNInferenceResult]:
inputs_list = list(input_feed.values())
if not inputs_list:
raise ValueError("input_feed must not be empty")
batch_sizes = {int(x.shape[0]) for x in inputs_list}
if len(batch_sizes) != 1:
raise ValueError(f"All inputs must have the same batch size, got {sorted(batch_sizes)}")
batch_size = batch_sizes.pop()
if batch_size <= 1:
return self.rknnpool.put(inputs_list)
# Split each input tensor into per-sample slices of shape (1, ...)
per_sample_inputs = [[inp[i : i + 1] for inp in inputs_list] for i in range(batch_size)]
sub_futures = [self.rknnpool.put(sample) for sample in per_sample_inputs]
parent_future: Future[RKNNInferenceResult] = Future()
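# Aggregate the per-sample futures on a helper thread and concatenate each output along the batch axis.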
def _aggregate() -> None:
try:
results = [f.result() for f in sub_futures]
num_outputs = len(results[0].outputs)
stacked_outputs = [np.concatenate([r.outputs[j] for r in results], axis=0) for j in range(num_outputs)]
start_time = min(r.start_time for r in results)
end_time = max(r.end_time for r in results)
parent_future.set_result(
RKNNInferenceResult(
tag=None,
start_time=start_time,
end_time=end_time,
duration_s=end_time - start_time,
outputs=stacked_outputs,
),
)
except Exception as exc: # noqa: BLE001
if not parent_future.done():
parent_future.set_exception(exc)
threading.Thread(target=_aggregate, daemon=True).start()
return parent_future
def close(self) -> None:
self.rknnpool.close()
def _build_nodes(self, key: str) -> list[SessionNode]:
nodes: list[SessionNode] = []
for entry in self._io_info.get(key, []):
shape = self._shape_from_entry(entry)
if key == "inputs" and shape:
# Represent the batch dimension symbolically for readability while
# keeping the static type compatible with the ModelSession protocol.
symbolic_shape_any: tuple[Any, ...] = ("batch", *shape[1:])
symbolic_shape = cast(tuple[int, ...], symbolic_shape_any)
else:
symbolic_shape = shape
nodes.append(SessionNode(name=entry.get("name"), shape=symbolic_shape))
return nodes
@staticmethod
def _shape_from_entry(entry: dict[str, Any]) -> tuple[int, ...]:
if dims := entry.get("dims"):
return tuple(int(dim) for dim in dims)
dyn = entry.get("dynamic", {})
ranges = dyn.get("ranges", [])
if ranges:
return tuple(int(dim) for dim in ranges[-1])
raise ValueError(f"Cannot determine shape from entry: {entry}")
def _normalize_io_info(self, info: dict[str, Any]) -> dict[str, Any]:
return {
**info,
"inputs": [self._normalize_tensor_desc(t) for t in info.get("inputs", [])],
"outputs": [self._normalize_tensor_desc(t) for t in info.get("outputs", [])],
}
@staticmethod
def _normalize_tensor_desc(tensor: dict[str, Any]) -> dict[str, Any]:
dims = list(RknnSession._shape_from_entry(tensor))
desc = {**tensor, "dims": dims, "n_dims": len(dims)}
# Force NCHW format if the runtime reports NHWC tensors
if tensor.get("fmt") == 1 and len(dims) == 4:
n, h, w, c = dims
desc["dims"] = [n, c, h, w]
desc["fmt"] = 0
dyn = desc.get("dynamic", {})
if "ranges" in dyn:
dyn["ranges"] = [
[shape[0], shape[3], shape[1], shape[2]] if len(shape) == 4 else shape for shape in dyn["ranges"]
]
return desc

@@ -0,0 +1,138 @@
import argparse
import time
from pathlib import Path
from typing import List
import numpy as np
import numpy.typing as npt
try:
from .immich_session import RknnSession
except ImportError:
from rknn_multi_executor.immich_session import RknnSession # type: ignore
def parse_shape(shape_str: str) -> List[int]:
parts = [p.strip() for p in shape_str.split(",") if p.strip()]
if not parts:
raise ValueError("Invalid shape string")
return [int(p) for p in parts]
def main() -> None:
parser = argparse.ArgumentParser(description="Minimal RKNN Native Executor usage")
parser.add_argument("--model", required=True, type=Path, help="Path to .rknn model")
parser.add_argument("--num-workers", type=int, default=3, help="Number of worker contexts")
parser.add_argument(
"--shape",
type=str,
default="1,3,640,640",
help="Input shape as comma-separated list, e.g. 1,3,640,640",
)
parser.add_argument(
"--dtype",
type=str,
default="float32",
choices=["float32", "float16", "int32", "int8", "uint8"],
help="Data type for randomly generated input tensor",
)
args = parser.parse_args()
shape = parse_shape(args.shape)
if len(shape) < 2:
raise ValueError("Shape must have at least 2 dims (e.g., NCHW)")
gen_t0 = time.perf_counter()
# Generate a random input tensor with the requested dtype
x: npt.NDArray[np.generic]
if args.dtype == "float32":
x = np.random.rand(*shape).astype(np.float32)
elif args.dtype == "float16":
x = np.random.rand(*shape).astype(np.float16)
elif args.dtype == "int32":
# Use a modest integer range; adjust as needed for your model (e.g., vocab size)
x = np.random.randint(0, 1000, size=tuple(shape), dtype=np.int32)
elif args.dtype == "int8":
x = np.random.randint(-128, 128, size=tuple(shape), dtype=np.int8)
elif args.dtype == "uint8":
x = np.random.randint(0, 256, size=tuple(shape), dtype=np.uint8)
else:
raise ValueError(f"Unsupported dtype: {args.dtype}")
gen_t1 = time.perf_counter()
print(f"[timing] generated random {args.dtype} tensor with shape {shape} " f"in {(gen_t1-gen_t0)*1000:.2f} ms")
time.sleep(1)
session_t0 = time.perf_counter()
session = RknnSession(args.model.as_posix(), num_workers=args.num_workers)
session_t1 = time.perf_counter()
print(session.get_inputs())
print(f"[timing] session init took {(session_t1-session_t0)*1000:.2f} ms")
try:
print("IO description:", session.io_info)
inputs = session.get_inputs()
if not inputs:
raise RuntimeError("Model exposes no inputs")
input_name = inputs[0].name or "input"
# Serial demo
for i in range(3):
t0 = time.perf_counter()
outs = session.run(None, {input_name: x})
t1 = time.perf_counter()
print(
f"[serial {i+1}] start={t0:.6f}s end={t1:.6f}s "
f"dur_ms={(t1-t0)*1000:.2f} shapes={[getattr(o, 'shape', None) for o in outs]}"
)
# Batch demo (single RKNN session call with batched input to exercise batch mode)
batch_repeats = 3
if shape[0] == 1:
batch_shape = [batch_repeats, *shape[1:]]
else:
batch_shape = shape
x_batch: npt.NDArray[np.generic]
if args.dtype == "float32":
x_batch = np.random.rand(*batch_shape).astype(np.float32)
elif args.dtype == "float16":
x_batch = np.random.rand(*batch_shape).astype(np.float16)
elif args.dtype == "int32":
x_batch = np.random.randint(0, 1000, size=tuple(batch_shape), dtype=np.int32)
elif args.dtype == "int8":
x_batch = np.random.randint(-128, 128, size=tuple(batch_shape), dtype=np.int8)
elif args.dtype == "uint8":
x_batch = np.random.randint(0, 256, size=tuple(batch_shape), dtype=np.uint8)
else:
raise ValueError(f"Unsupported dtype: {args.dtype}")
for i in range(3):
t0 = time.perf_counter()
outs = session.run(None, {input_name: x_batch})
t1 = time.perf_counter()
print(
f"[batch {i+1}] start={t0:.6f}s end={t1:.6f}s "
f"dur_ms={(t1-t0)*1000:.2f} shapes={[getattr(o, 'shape', None) for o in outs]}"
)
time.sleep(1)
# Parallel demo using Immich-style pool
total_requests = 5 * args.num_workers
print(f"[pool] submitting {total_requests} requests with {args.num_workers} worker contexts")
batch_t0 = time.perf_counter()
futures = []
for _ in range(total_requests):
futures.append(session.rknnpool.put([x]))
for idx, fut in enumerate(futures):
res = fut.result()
outs = res.outputs
lat_ms = res.duration_s * 1000.0
print(f"[parallel {idx+1}] dur_ms={lat_ms:.2f} shapes={[getattr(o, 'shape', None) for o in outs]}")
batch_t1 = time.perf_counter()
print(f"[parallel batch] total_ms={(batch_t1-batch_t0)*1000:.2f}")
time.sleep(1)
finally:
session.close()
if __name__ == "__main__":
main()

@@ -0,0 +1,7 @@
from __future__ import annotations
from importlib import import_module
__all__ = ["rknn_pool"]
rknn_pool = import_module(f"{__name__}.rknn_pool")

@@ -0,0 +1,64 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SRC_FILE="${SCRIPT_DIR}/rknn_pool.cpp"
CXX="${CXX:-g++}"
RKNN_HEADER="${RKNN_HEADER:-/usr/include/rknn_api.h}"
RKNN_LIBRARY="${RKNN_LIBRARY:-/usr/lib/librknnrt.so}"
RKNN_OUTPUT_DIR="${RKNN_OUTPUT_DIR:-$SCRIPT_DIR}"
if [[ $# -ge 1 ]]; then
RKNN_HEADER="$1"
fi
if [[ $# -ge 2 ]]; then
RKNN_LIBRARY="$2"
fi
if [[ $# -ge 3 ]]; then
RKNN_OUTPUT_DIR="$3"
fi
for file in "$SRC_FILE" "$RKNN_HEADER" "$RKNN_LIBRARY"; do
if [[ ! -f "$file" ]]; then
echo "Missing required file: $file" >&2
exit 1
fi
done
if ! command -v python3 >/dev/null 2>&1; then
echo "python3 is required to determine include paths." >&2
exit 1
fi
read -r -a PYBIND_FLAGS <<<"$(python3 -m pybind11 --includes)"
EXT_SUFFIX="$(python3 - <<'PY'
import sysconfig
print(sysconfig.get_config_var("EXT_SUFFIX") or ".so")
PY
)"
INCLUDE_DIR="$(dirname "$(realpath "$RKNN_HEADER")")"
LIB_DIR="$(dirname "$(realpath "$RKNN_LIBRARY")")"
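# Derive the -l<name> linker argument from the library filename, e.g. librknnrt.so -> rknnrt.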
LIB_BASE="$(basename "$RKNN_LIBRARY")"
LIB_NAME="${LIB_BASE#lib}"
LIB_NAME="${LIB_NAME%%.so*}"
mkdir -p "$RKNN_OUTPUT_DIR"
OUTPUT_PATH="${RKNN_OUTPUT_DIR}/rknn_pool${EXT_SUFFIX}"
echo "[build-cross] Building ${OUTPUT_PATH}"
"$CXX" "$SRC_FILE" \
-shared -o "$OUTPUT_PATH" \
-O3 -DNDEBUG -std=c++17 -fPIC \
-Wall -Wextra -Wno-unused-parameter \
-D_DEFAULT_SOURCE -D_GNU_SOURCE \
-Wl,-z,relro,-z,now \
-Wl,-rpath,"\$ORIGIN" -Wl,-rpath,"\$ORIGIN/." \
"${PYBIND_FLAGS[@]}" \
-I "$INCLUDE_DIR" \
-L"$LIB_DIR" -l"$LIB_NAME" \
-ldl -lpthread
echo "[build-cross] Done."

@@ -0,0 +1,414 @@
// RKNNLite-like minimal wrapper with correct input shape/type handling and dup_context sharing.
#include <pybind11/pybind11.h>
#include <pybind11/numpy.h>
#include <pybind11/stl.h>
#include <cstring>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <stdexcept>
#include <string>
#include <vector>
#include "rknn_api.h"
namespace py = pybind11;
using namespace py::literals;
// Helpers
static bool is_dims_known(const rknn_tensor_attr& a) {
for (uint32_t d = 0; d < a.n_dims; ++d) {
if (a.dims[d] == 0) return false;
}
return true;
}
static py::array ensure_dtype(py::array arr, rknn_tensor_type t) {
py::dtype target_dtype;
switch (t) {
case RKNN_TENSOR_FLOAT16:
target_dtype = py::dtype("float16");
break;
case RKNN_TENSOR_FLOAT32:
target_dtype = py::dtype::of<float>();
break;
case RKNN_TENSOR_UINT8:
target_dtype = py::dtype::of<uint8_t>();
break;
case RKNN_TENSOR_INT8:
target_dtype = py::dtype::of<int8_t>();
break;
default:
return arr;
}
if (!arr.dtype().is(target_dtype)) {
return py::array::ensure(arr.attr("astype")(target_dtype), py::array::c_style);
}
return arr;
}
struct __attribute__((visibility("hidden"))) PreparedInput {
rknn_input tensor{};
std::vector<uint32_t> shape;
py::array buffer;
};
static py::list dims_to_list(const uint32_t* dims, uint32_t n_dims) {
py::list lst(n_dims);
for (uint32_t k = 0; k < n_dims; ++k) lst[k] = dims[k];
return lst;
}
static py::dict make_tensor_info(uint32_t index, const rknn_tensor_attr& attr) {
return py::dict("index"_a=index, "name"_a=py::str(attr.name), "fmt"_a=static_cast<int>(attr.fmt),
"type"_a=static_cast<int>(attr.type), "n_dims"_a=attr.n_dims,
"dims"_a=dims_to_list(attr.dims, attr.n_dims));
}
static py::dict make_dynamic_dict(const rknn_input_range& rng) {
py::list ranges(rng.shape_number);
for (uint32_t s = 0; s < rng.shape_number; ++s) {
py::list single(rng.n_dims);
for (uint32_t d = 0; d < rng.n_dims; ++d) {
single[d] = rng.dyn_range[s][d];
}
ranges[s] = single;
}
return py::dict("shape_number"_a=rng.shape_number, "n_dims"_a=rng.n_dims,
"fmt"_a=static_cast<int>(rng.fmt), "name"_a=py::str(rng.name), "ranges"_a=ranges);
}
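// Transpose a 4-D input when its memory layout does not match the format the model expects (NHWC <-> NCHW).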
static py::array align_layout(py::array arr, const rknn_tensor_attr& attr) {
if (attr.n_dims != 4 || !is_dims_known(attr)) return arr;
py::buffer_info bi = arr.request();
if (bi.ndim != 4) return arr;
auto matches = [&](uint32_t d1, uint32_t d2, uint32_t d3) -> bool {
return bi.shape[1] == d1 && bi.shape[2] == d2 && bi.shape[3] == d3;
};
if (attr.fmt == RKNN_TENSOR_NHWC) {
if (matches(attr.dims[1], attr.dims[2], attr.dims[3])) return arr;
if (matches(attr.dims[3], attr.dims[1], attr.dims[2])) {
return py::array::ensure(arr.attr("transpose")(py::make_tuple(0, 2, 3, 1)), py::array::c_style);
}
} else if (attr.fmt == RKNN_TENSOR_NCHW) {
if (matches(attr.dims[1], attr.dims[2], attr.dims[3])) return arr;
if (matches(attr.dims[2], attr.dims[3], attr.dims[1])) {
return py::array::ensure(arr.attr("transpose")(py::make_tuple(0, 3, 1, 2)), py::array::c_style);
}
}
return arr;
}
static PreparedInput prepare_input_tensor(py::handle handle, const rknn_tensor_attr& attr, bool capture_shape) {
py::array arr = handle.cast<py::array>();
py::array contiguous = py::array::ensure(arr, py::array::c_style);
contiguous = align_layout(contiguous, attr);
contiguous = ensure_dtype(contiguous, attr.type);
auto bi = contiguous.request();
rknn_input tensor{};
tensor.index = attr.index;
tensor.type = attr.type;
tensor.fmt = attr.fmt;
tensor.size = static_cast<uint32_t>(contiguous.nbytes());
tensor.buf = const_cast<void*>(bi.ptr);
std::vector<uint32_t> shape;
if (capture_shape) {
shape.resize(bi.ndim);
for (ssize_t d = 0; d < bi.ndim; ++d) {
shape[d] = static_cast<uint32_t>(bi.shape[d]);
}
}
return {tensor, std::move(shape), contiguous};
}
static int find_matching_shape(const rknn_input_range& rng, const std::vector<uint32_t>& provided) {
if (provided.size() != rng.n_dims) return rng.shape_number - 1;
for (uint32_t s = 0; s < rng.shape_number; ++s) {
bool match = true;
for (uint32_t d = 0; d < rng.n_dims && match; ++d)
match = (rng.dyn_range[s][d] == provided[d]);
if (match) return s;
}
return rng.shape_number - 1;
}
static py::array make_output_array(const rknn_tensor_attr& attr, const rknn_output& out) {
std::vector<ssize_t> shape(attr.n_dims == 0 ? 1 : attr.n_dims);
if (attr.n_dims == 0) {
shape[0] = static_cast<ssize_t>(out.size / sizeof(float));
} else {
std::copy(attr.dims, attr.dims + attr.n_dims, shape.begin());
}
py::array arr(py::dtype::of<float>(), shape);
std::memcpy(arr.mutable_data(), out.buf, out.size);
return arr;
}
struct RknnCtx {
rknn_context ctx = 0;
rknn_input_output_num io_num{};
std::vector<rknn_tensor_attr> input_attrs;
std::vector<rknn_tensor_attr> output_attrs;
void query_io() {
if (rknn_query(ctx, RKNN_QUERY_IN_OUT_NUM, &io_num, sizeof(io_num)) != RKNN_SUCC)
throw std::runtime_error("rknn_query IN_OUT_NUM failed");
input_attrs.resize(io_num.n_input);
output_attrs.resize(io_num.n_output);
for (uint32_t i = 0; i < io_num.n_input; ++i) {
rknn_tensor_attr attr{};
attr.index = i;
if (rknn_query(ctx, RKNN_QUERY_INPUT_ATTR, &attr, sizeof(attr)) != RKNN_SUCC)
throw std::runtime_error("rknn_query INPUT_ATTR failed");
input_attrs[i] = attr;
}
for (uint32_t i = 0; i < io_num.n_output; ++i) {
rknn_tensor_attr attr{};
attr.index = i;
if (rknn_query(ctx, RKNN_QUERY_OUTPUT_ATTR, &attr, sizeof(attr)) != RKNN_SUCC)
throw std::runtime_error("rknn_query OUTPUT_ATTR failed");
output_attrs[i] = attr;
}
}
};
static void debug_print_io_info(const RknnCtx& ctx) {
std::cerr << "[rknn2] model: inputs=" << ctx.io_num.n_input
<< " outputs=" << ctx.io_num.n_output << std::endl;
std::cerr << "input tensors:" << std::endl;
for (uint32_t i = 0; i < ctx.io_num.n_input; ++i) {
const auto& a = ctx.input_attrs[i];
std::cerr << " in[" << i << "] fmt=" << a.fmt << " dims=[";
for (uint32_t d = 0; d < a.n_dims; ++d) {
std::cerr << a.dims[d] << (d + 1 < a.n_dims ? "," : "");
}
std::cerr << "]" << std::endl;
}
std::cerr << "output tensors:" << std::endl;
for (uint32_t i = 0; i < ctx.io_num.n_output; ++i) {
const auto& a = ctx.output_attrs[i];
std::cerr << " out[" << i << "] fmt=" << a.fmt << " dims=[";
for (uint32_t d = 0; d < a.n_dims; ++d) {
std::cerr << a.dims[d] << (d + 1 < a.n_dims ? "," : "");
}
std::cerr << "]" << std::endl;
}
}
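// For dynamic models the output shape depends on the shapes set for this run, so query RKNN_QUERY_CURRENT_OUTPUT_ATTR; otherwise reuse the attrs cached at init.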
static const rknn_tensor_attr& resolve_output_attr(bool is_dynamic, RknnCtx& ctx, uint32_t index, rknn_tensor_attr& scratch) {
if (!is_dynamic) return ctx.output_attrs[index];
scratch.index = index;
if (rknn_query(ctx.ctx, RKNN_QUERY_CURRENT_OUTPUT_ATTR, &scratch, sizeof(scratch)) == RKNN_SUCC) {
return scratch;
}
return ctx.output_attrs[index];
}
class NativeRKNNExecutor {
public:
explicit NativeRKNNExecutor(const std::string& model_path, int num_workers)
: rr_index_(0),
is_dynamic_model_(false) {
if (num_workers < 1) throw std::invalid_argument("num_workers must be >= 1");
if (num_workers > 3) throw std::invalid_argument("num_workers must be <= 3");
const bool debug_ctor = (std::getenv("RKNN_EXEC_DEBUG") != nullptr);
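// The master context is initialized from the model file; additional worker contexts share its weights via rknn_dup_context below.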
RknnCtx master;
if (rknn_init(&master.ctx, const_cast<char*>(model_path.c_str()), 0, 0, nullptr) != RKNN_SUCC)
throw std::runtime_error("rknn_init failed");
master.query_io();
if (debug_ctor) debug_print_io_info(master);
input_ranges_.resize(master.io_num.n_input);
for (uint32_t i = 0; i < master.io_num.n_input; ++i) {
rknn_input_range rng{};
rng.index = i;
if (rknn_query(master.ctx, RKNN_QUERY_INPUT_DYNAMIC_RANGE, &rng, sizeof(rng)) == RKNN_SUCC
&& rng.shape_number > 0 && rng.n_dims > 0) {
is_dynamic_model_ = true;
input_ranges_[i] = rng;
}
}
contexts_.push_back(std::move(master));
for (int i = 1; i < num_workers; ++i) {
RknnCtx child;
if (rknn_dup_context(&contexts_[0].ctx, &child.ctx) != RKNN_SUCC)
throw std::runtime_error("rknn_dup_context failed");
child.query_io();
contexts_.push_back(std::move(child));
}
ctx_busy_.assign(contexts_.size(), false);
}
~NativeRKNNExecutor() {
for (auto& c : contexts_) {
if (c.ctx) rknn_destroy(c.ctx);
}
}
py::dict get_io_info() const {
py::dict info;
const RknnCtx& master = contexts_.front();
info["is_dynamic"] = is_dynamic_model_;
py::list inputs(master.io_num.n_input);
for (uint32_t i = 0; i < master.io_num.n_input; ++i) {
py::dict desc = make_tensor_info(i, master.input_attrs[i]);
if (is_dynamic_model_ && i < input_ranges_.size()) {
const auto& rng = input_ranges_[i];
if (rng.shape_number > 0 && rng.n_dims > 0) {
desc["dynamic"] = make_dynamic_dict(rng);
}
}
inputs[i] = desc;
}
info["inputs"] = inputs;
py::list outputs(master.io_num.n_output);
for (uint32_t i = 0; i < master.io_num.n_output; ++i) {
outputs[i] = make_tensor_info(i, master.output_attrs[i]);
}
info["outputs"] = outputs;
return info;
}
py::list infer(const py::list& inputs) {
auto ctx_handle = acquire_ctx_();
auto& c = ctx_handle.ctx;
if (inputs.size() != c.io_num.n_input)
throw std::runtime_error("Input count mismatch");
std::vector<rknn_input> in(c.io_num.n_input);
std::vector<py::array> keep_alive;
keep_alive.reserve(c.io_num.n_input);
std::vector<std::vector<uint32_t>> input_shapes;
if (is_dynamic_model_) input_shapes.reserve(c.io_num.n_input);
for (uint32_t i = 0; i < c.io_num.n_input; ++i) {
PreparedInput prepared = prepare_input_tensor(inputs[i], c.input_attrs[i], is_dynamic_model_);
in[i] = prepared.tensor;
if (is_dynamic_model_) input_shapes.push_back(std::move(prepared.shape));
keep_alive.push_back(std::move(prepared.buffer));
}
if (is_dynamic_model_) set_dynamic_shapes(c, input_shapes);
std::vector<rknn_output> out(c.io_num.n_output);
{
py::gil_scoped_release nogil;
if (rknn_inputs_set(c.ctx, c.io_num.n_input, in.data()) != RKNN_SUCC)
throw std::runtime_error("rknn_inputs_set failed");
if (rknn_run(c.ctx, nullptr) != RKNN_SUCC)
throw std::runtime_error("rknn_run failed");
for (uint32_t i = 0; i < c.io_num.n_output; ++i) {
out[i] = {};
out[i].want_float = 1;
out[i].index = i;
}
if (rknn_outputs_get(c.ctx, c.io_num.n_output, out.data(), nullptr) != RKNN_SUCC)
throw std::runtime_error("rknn_outputs_get failed");
}
py::list result(c.io_num.n_output);
rknn_tensor_attr scratch{};
for (uint32_t i = 0; i < c.io_num.n_output; ++i) {
const auto& attr = resolve_output_attr(is_dynamic_model_, c, i, scratch);
result[i] = make_output_array(attr, out[i]);
}
{
py::gil_scoped_release nogil;
rknn_outputs_release(c.ctx, c.io_num.n_output, out.data());
}
return result;
}
private:
struct CtxHandle {
RknnCtx& ctx;
NativeRKNNExecutor& owner;
size_t index;
CtxHandle(RknnCtx& ctx_ref, NativeRKNNExecutor& parent, size_t idx)
: ctx(ctx_ref), owner(parent), index(idx) {}
~CtxHandle() {
owner.release_ctx_(index);
}
};
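// Wait until a context is free, claim it round-robin starting after the last handout, and return an RAII handle that releases it on destruction.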
CtxHandle acquire_ctx_() {
std::unique_lock<std::mutex> lock(ctx_mutex_);
ctx_cv_.wait(lock, [&]{
for (bool busy : ctx_busy_) {
if (!busy) return true;
}
return false;
});
size_t start = rr_index_;
for (size_t attempt = 0; attempt < ctx_busy_.size(); ++attempt) {
size_t idx = (start + attempt) % ctx_busy_.size();
if (!ctx_busy_[idx]) {
ctx_busy_[idx] = true;
rr_index_ = (idx + 1) % ctx_busy_.size();
return CtxHandle(contexts_[idx], *this, idx);
}
}
throw std::runtime_error("No RKNN context available");
}
void release_ctx_(size_t idx) {
std::lock_guard<std::mutex> lock(ctx_mutex_);
ctx_busy_[idx] = false;
ctx_cv_.notify_one();
}
void set_dynamic_shapes(RknnCtx& ctx, const std::vector<std::vector<uint32_t>>& input_shapes) const;
private:
std::mutex ctx_mutex_;
std::condition_variable ctx_cv_;
size_t rr_index_;
std::vector<RknnCtx> contexts_;
std::vector<bool> ctx_busy_;
bool is_dynamic_model_;
std::vector<rknn_input_range> input_ranges_;
};
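// Pick the declared dynamic-range entry matching the provided input shapes and apply it with rknn_set_input_shapes before running.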
void NativeRKNNExecutor::set_dynamic_shapes(RknnCtx& ctx, const std::vector<std::vector<uint32_t>>& input_shapes) const {
std::vector<rknn_tensor_attr> attrs = ctx.input_attrs;
for (uint32_t i = 0; i < ctx.io_num.n_input && i < input_ranges_.size() && i < input_shapes.size(); ++i) {
const auto& rng = input_ranges_[i];
if (rng.shape_number == 0 || rng.n_dims == 0) continue;
int match_idx = find_matching_shape(rng, input_shapes[i]);
auto& attr = attrs[i];
attr.n_dims = rng.n_dims;
for (uint32_t d = 0; d < rng.n_dims && d < RKNN_MAX_DIMS; ++d) {
attr.dims[d] = rng.dyn_range[match_idx][d];
}
if (rng.fmt == RKNN_TENSOR_NHWC || rng.fmt == RKNN_TENSOR_NCHW || rng.fmt == RKNN_TENSOR_UNDEFINED)
attr.fmt = rng.fmt;
attr.type = ctx.input_attrs[i].type;
}
if (rknn_set_input_shapes(ctx.ctx, ctx.io_num.n_input, attrs.data()) != RKNN_SUCC)
throw std::runtime_error("Failed to set input shapes for dynamic model");
}
PYBIND11_MODULE(rknn_pool, m) {
py::class_<NativeRKNNExecutor>(m, "NativeRKNNExecutor")
.def(py::init<const std::string&, int>(), py::arg("model_path"), py::arg("num_workers") = 1)
.def("set_core_mask_all",
[](NativeRKNNExecutor& /*self*/, int /*mask*/) {
// Placeholder for API compatibility; no-op.
},
py::arg("mask"),
"Set the NPU core mask for all contexts (no-op placeholder).")
.def("infer", &NativeRKNNExecutor::infer, py::arg("inputs"),
"Run inference with a list of numpy arrays, returns list of numpy arrays.")
.def("get_io_info", &NativeRKNNExecutor::get_io_info,
"Return a dict with model IO info, including dynamic input ranges when available.");
}

@@ -1,91 +0,0 @@
# This code is from leafqycc/rknn-multi-threaded
# Following Apache License 2.0
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from pathlib import Path
from queue import Queue
from typing import Callable
import numpy as np
from numpy.typing import NDArray
from immich_ml.config import log
from immich_ml.models.constants import RKNN_COREMASK_SUPPORTED_SOCS, RKNN_SUPPORTED_SOCS
def get_soc(device_tree_path: Path | str) -> str | None:
try:
with Path(device_tree_path).open() as f:
device_compatible_str = f.read()
for soc in RKNN_SUPPORTED_SOCS:
if soc in device_compatible_str:
return soc
log.warning("Device is not supported for RKNN")
except OSError as e:
log.warning(f"Could not read {device_tree_path}. Reason: %s", e)
return None
soc_name = None
is_available = False
try:
from rknnlite.api import RKNNLite
soc_name = get_soc("/proc/device-tree/compatible")
is_available = soc_name is not None
except ImportError:
log.debug("RKNN is not available")
def init_rknn(model_path: str) -> "RKNNLite":
if not is_available:
raise RuntimeError("rknn is not available!")
rknn_lite = RKNNLite()
rknn_lite.rknn_log.logger.setLevel(logging.ERROR)
ret = rknn_lite.load_rknn(model_path)
if ret != 0:
raise RuntimeError("Failed to load RKNN model")
if soc_name in RKNN_COREMASK_SUPPORTED_SOCS:
ret = rknn_lite.init_runtime(core_mask=RKNNLite.NPU_CORE_AUTO)
else:
ret = rknn_lite.init_runtime() # Please do not set this parameter on other platforms.
if ret != 0:
raise RuntimeError("Failed to initialize RKNN runtime environment")
return rknn_lite
class RknnPoolExecutor:
def __init__(
self,
model_path: str,
tpes: int,
func: Callable[["RKNNLite", list[NDArray[np.float32]]], list[NDArray[np.float32]]],
) -> None:
self.tpes = tpes
self.queue: Queue[Future[list[NDArray[np.float32]]]] = Queue()
self.rknn_pool = [init_rknn(model_path) for _ in range(tpes)]
self.pool = ThreadPoolExecutor(max_workers=tpes)
self.func = func
self.num = 0
def put(self, inputs: list[NDArray[np.float32]]) -> None:
self.queue.put(self.pool.submit(self.func, self.rknn_pool[self.num % self.tpes], inputs))
self.num += 1
def get(self) -> list[NDArray[np.float32]] | None:
if self.queue.empty():
return None
fut = self.queue.get()
return fut.result()
def release(self) -> None:
self.pool.shutdown()
for rknn_lite in self.rknn_pool:
rknn_lite.release()
def __del__(self) -> None:
self.release()

@@ -53,7 +53,10 @@ cpu = ["onnxruntime>=1.15.0,<2"]
cuda = ["onnxruntime-gpu>=1.17.0,<2"]
openvino = ["onnxruntime-openvino>=1.17.1,<1.19.0"]
armnn = ["onnxruntime>=1.15.0,<2"]
rknn = ["onnxruntime>=1.15.0,<2", "rknn-toolkit-lite2>=2.3.0,<3"]
rknn = [
"onnxruntime>=1.15.0,<2",
"pybind11>=2.11.0,<4.0",
]
rocm = []
[tool.uv]