mirror of https://github.com/immich-app/immich.git
refactor(ml): modularization and styling (#2835)
* basic refactor and styling * removed batching * module entrypoint * removed unused imports * model superclass, model cache now in app state * fixed cache dir and enforced abstract method --------- Co-authored-by: Alex Tran <alex.tran1502@gmail.com>pull/2942/head
parent
837ad24f58
commit
a2f5674bbb
@ -1,119 +0,0 @@
|
|||||||
import torch
|
|
||||||
from insightface.app import FaceAnalysis
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
from transformers import pipeline, Pipeline
|
|
||||||
from sentence_transformers import SentenceTransformer
|
|
||||||
from typing import Any, BinaryIO
|
|
||||||
import cv2 as cv
|
|
||||||
import numpy as np
|
|
||||||
from PIL import Image
|
|
||||||
from config import settings
|
|
||||||
|
|
||||||
device = "cuda" if torch.cuda.is_available() else "cpu"
|
|
||||||
|
|
||||||
|
|
||||||
def get_model(model_name: str, model_type: str, **model_kwargs):
|
|
||||||
"""
|
|
||||||
Instantiates the specified model.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
model_name: Name of model in the model hub used for the task.
|
|
||||||
model_type: Model type or task, which determines which model zoo is used.
|
|
||||||
`facial-recognition` uses Insightface, while all other models use the HF Model Hub.
|
|
||||||
|
|
||||||
Options:
|
|
||||||
`image-classification`, `clip`,`facial-recognition`, `tokenizer`, `processor`
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
model: The requested model.
|
|
||||||
"""
|
|
||||||
|
|
||||||
cache_dir = _get_cache_dir(model_name, model_type)
|
|
||||||
match model_type:
|
|
||||||
case "facial-recognition":
|
|
||||||
model = _load_facial_recognition(
|
|
||||||
model_name, cache_dir=cache_dir, **model_kwargs
|
|
||||||
)
|
|
||||||
case "clip":
|
|
||||||
model = SentenceTransformer(
|
|
||||||
model_name, cache_folder=cache_dir, **model_kwargs
|
|
||||||
)
|
|
||||||
case _:
|
|
||||||
model = pipeline(
|
|
||||||
model_type,
|
|
||||||
model_name,
|
|
||||||
model_kwargs={"cache_dir": cache_dir, **model_kwargs},
|
|
||||||
)
|
|
||||||
|
|
||||||
return model
|
|
||||||
|
|
||||||
|
|
||||||
def run_classification(
|
|
||||||
model: Pipeline, image: Image, min_score: float | None = None
|
|
||||||
):
|
|
||||||
predictions: list[dict[str, Any]] = model(image) # type: ignore
|
|
||||||
result = {
|
|
||||||
tag
|
|
||||||
for pred in predictions
|
|
||||||
for tag in pred["label"].split(", ")
|
|
||||||
if min_score is None or pred["score"] >= min_score
|
|
||||||
}
|
|
||||||
|
|
||||||
return list(result)
|
|
||||||
|
|
||||||
|
|
||||||
def run_facial_recognition(
|
|
||||||
model: FaceAnalysis, image: bytes
|
|
||||||
) -> list[dict[str, Any]]:
|
|
||||||
file_bytes = np.frombuffer(image, dtype=np.uint8)
|
|
||||||
img = cv.imdecode(file_bytes, cv.IMREAD_COLOR)
|
|
||||||
height, width, _ = img.shape
|
|
||||||
results = []
|
|
||||||
faces = model.get(img)
|
|
||||||
|
|
||||||
for face in faces:
|
|
||||||
x1, y1, x2, y2 = face.bbox
|
|
||||||
|
|
||||||
results.append(
|
|
||||||
{
|
|
||||||
"imageWidth": width,
|
|
||||||
"imageHeight": height,
|
|
||||||
"boundingBox": {
|
|
||||||
"x1": round(x1),
|
|
||||||
"y1": round(y1),
|
|
||||||
"x2": round(x2),
|
|
||||||
"y2": round(y2),
|
|
||||||
},
|
|
||||||
"score": face.det_score.item(),
|
|
||||||
"embedding": face.normed_embedding.tolist(),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
return results
|
|
||||||
|
|
||||||
|
|
||||||
def _load_facial_recognition(
|
|
||||||
model_name: str,
|
|
||||||
min_face_score: float | None = None,
|
|
||||||
cache_dir: Path | str | None = None,
|
|
||||||
**model_kwargs,
|
|
||||||
):
|
|
||||||
if cache_dir is None:
|
|
||||||
cache_dir = _get_cache_dir(model_name, "facial-recognition")
|
|
||||||
if isinstance(cache_dir, Path):
|
|
||||||
cache_dir = cache_dir.as_posix()
|
|
||||||
if min_face_score is None:
|
|
||||||
min_face_score = settings.min_face_score
|
|
||||||
|
|
||||||
model = FaceAnalysis(
|
|
||||||
name=model_name,
|
|
||||||
root=cache_dir,
|
|
||||||
allowed_modules=["detection", "recognition"],
|
|
||||||
**model_kwargs,
|
|
||||||
)
|
|
||||||
model.prepare(ctx_id=0, det_thresh=min_face_score, det_size=(640, 640))
|
|
||||||
return model
|
|
||||||
|
|
||||||
|
|
||||||
def _get_cache_dir(model_name: str, model_type: str) -> Path:
|
|
||||||
return Path(settings.cache_folder, device, model_type, model_name)
|
|
||||||
@ -0,0 +1,3 @@
|
|||||||
|
from .clip import CLIPSTTextEncoder, CLIPSTVisionEncoder
|
||||||
|
from .facial_recognition import FaceRecognizer
|
||||||
|
from .image_classification import ImageClassifier
|
||||||
@ -0,0 +1,52 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from abc import abstractmethod, ABC
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from ..config import get_cache_dir
|
||||||
|
from ..schemas import ModelType
|
||||||
|
|
||||||
|
|
||||||
|
class InferenceModel(ABC):
|
||||||
|
_model_type: ModelType
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
model_name: str,
|
||||||
|
cache_dir: Path | None = None,
|
||||||
|
):
|
||||||
|
self.model_name = model_name
|
||||||
|
self._cache_dir = (
|
||||||
|
cache_dir
|
||||||
|
if cache_dir is not None
|
||||||
|
else get_cache_dir(model_name, self.model_type)
|
||||||
|
)
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def predict(self, inputs: Any) -> Any:
|
||||||
|
...
|
||||||
|
|
||||||
|
@property
|
||||||
|
def model_type(self) -> ModelType:
|
||||||
|
return self._model_type
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cache_dir(self) -> Path:
|
||||||
|
return self._cache_dir
|
||||||
|
|
||||||
|
@cache_dir.setter
|
||||||
|
def cache_dir(self, cache_dir: Path):
|
||||||
|
self._cache_dir = cache_dir
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_model_type(
|
||||||
|
cls, model_type: ModelType, model_name, **model_kwargs
|
||||||
|
) -> InferenceModel:
|
||||||
|
subclasses = {
|
||||||
|
subclass._model_type: subclass for subclass in cls.__subclasses__()
|
||||||
|
}
|
||||||
|
if model_type not in subclasses:
|
||||||
|
raise ValueError(f"Unsupported model type: {model_type}")
|
||||||
|
|
||||||
|
return subclasses[model_type](model_name, **model_kwargs)
|
||||||
@ -0,0 +1,37 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from PIL.Image import Image
|
||||||
|
from sentence_transformers import SentenceTransformer
|
||||||
|
|
||||||
|
from ..schemas import ModelType
|
||||||
|
from .base import InferenceModel
|
||||||
|
|
||||||
|
|
||||||
|
class CLIPSTEncoder(InferenceModel):
|
||||||
|
_model_type = ModelType.CLIP
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
model_name: str,
|
||||||
|
cache_dir: Path | None = None,
|
||||||
|
**model_kwargs,
|
||||||
|
):
|
||||||
|
super().__init__(model_name, cache_dir)
|
||||||
|
self.model = SentenceTransformer(
|
||||||
|
self.model_name,
|
||||||
|
cache_folder=self.cache_dir.as_posix(),
|
||||||
|
**model_kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
|
def predict(self, image_or_text: Image | str) -> list[float]:
|
||||||
|
return self.model.encode(image_or_text).tolist()
|
||||||
|
|
||||||
|
|
||||||
|
# stubs to allow different behavior between the two in the future
|
||||||
|
# and handle loading different image and text clip models
|
||||||
|
class CLIPSTVisionEncoder(CLIPSTEncoder):
|
||||||
|
_model_type = ModelType.CLIP_VISION
|
||||||
|
|
||||||
|
|
||||||
|
class CLIPSTTextEncoder(CLIPSTEncoder):
|
||||||
|
_model_type = ModelType.CLIP_TEXT
|
||||||
@ -0,0 +1,59 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import cv2
|
||||||
|
from insightface.app import FaceAnalysis
|
||||||
|
|
||||||
|
from ..config import settings
|
||||||
|
from ..schemas import ModelType
|
||||||
|
from .base import InferenceModel
|
||||||
|
|
||||||
|
|
||||||
|
class FaceRecognizer(InferenceModel):
|
||||||
|
_model_type = ModelType.FACIAL_RECOGNITION
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
model_name: str,
|
||||||
|
min_score: float = settings.min_face_score,
|
||||||
|
cache_dir: Path | None = None,
|
||||||
|
**model_kwargs,
|
||||||
|
):
|
||||||
|
super().__init__(model_name, cache_dir)
|
||||||
|
self.min_score = min_score
|
||||||
|
model = FaceAnalysis(
|
||||||
|
name=self.model_name,
|
||||||
|
root=self.cache_dir.as_posix(),
|
||||||
|
allowed_modules=["detection", "recognition"],
|
||||||
|
**model_kwargs,
|
||||||
|
)
|
||||||
|
model.prepare(
|
||||||
|
ctx_id=0,
|
||||||
|
det_thresh=self.min_score,
|
||||||
|
det_size=(640, 640),
|
||||||
|
)
|
||||||
|
self.model = model
|
||||||
|
|
||||||
|
def predict(self, image: cv2.Mat) -> list[dict[str, Any]]:
|
||||||
|
height, width, _ = image.shape
|
||||||
|
results = []
|
||||||
|
faces = self.model.get(image)
|
||||||
|
|
||||||
|
for face in faces:
|
||||||
|
x1, y1, x2, y2 = face.bbox
|
||||||
|
|
||||||
|
results.append(
|
||||||
|
{
|
||||||
|
"imageWidth": width,
|
||||||
|
"imageHeight": height,
|
||||||
|
"boundingBox": {
|
||||||
|
"x1": round(x1),
|
||||||
|
"y1": round(y1),
|
||||||
|
"x2": round(x2),
|
||||||
|
"y2": round(y2),
|
||||||
|
},
|
||||||
|
"score": face.det_score.item(),
|
||||||
|
"embedding": face.normed_embedding.tolist(),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
return results
|
||||||
@ -0,0 +1,40 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from PIL.Image import Image
|
||||||
|
from transformers.pipelines import pipeline
|
||||||
|
|
||||||
|
from ..config import settings
|
||||||
|
from ..schemas import ModelType
|
||||||
|
from .base import InferenceModel
|
||||||
|
|
||||||
|
|
||||||
|
class ImageClassifier(InferenceModel):
|
||||||
|
_model_type = ModelType.IMAGE_CLASSIFICATION
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
model_name: str,
|
||||||
|
min_score: float = settings.min_tag_score,
|
||||||
|
cache_dir: Path | None = None,
|
||||||
|
**model_kwargs,
|
||||||
|
):
|
||||||
|
super().__init__(model_name, cache_dir)
|
||||||
|
self.min_score = min_score
|
||||||
|
|
||||||
|
self.model = pipeline(
|
||||||
|
self.model_type.value,
|
||||||
|
self.model_name,
|
||||||
|
model_kwargs={"cache_dir": self.cache_dir, **model_kwargs},
|
||||||
|
)
|
||||||
|
|
||||||
|
def predict(self, image: Image) -> list[str]:
|
||||||
|
predictions = self.model(image)
|
||||||
|
tags = list(
|
||||||
|
{
|
||||||
|
tag
|
||||||
|
for pred in predictions
|
||||||
|
for tag in pred["label"].split(", ")
|
||||||
|
if pred["score"] >= self.min_score
|
||||||
|
}
|
||||||
|
)
|
||||||
|
return tags
|
||||||
Loading…
Reference in New Issue