123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- import os
- import gc
- import pandas as pd
- import numpy as np
- from typing import Tuple, List, Dict
- from io import BytesIO
- from PIL import Image
- from pathlib import Path
- from huggingface_hub import hf_hub_download
- from modules import shared
- from modules.deepbooru import re_special as tag_escape_pattern
- # i'm not sure if it's okay to add this file to the repository
- from . import dbimutils
# Select the device used for interrogation.
# CPU is forced when shared.cmd_opts.use_cpu contains 'all' or 'interrogate';
# otherwise '/gpu:0' is used unless shared.cmd_opts.device_id picks another GPU.
use_cpu = any(
    target in shared.cmd_opts.use_cpu
    for target in ('all', 'interrogate')
)

tf_device_name = '/cpu:0' if use_cpu else '/gpu:0'

if not use_cpu and shared.cmd_opts.device_id is not None:
    try:
        tf_device_name = f'/gpu:{int(shared.cmd_opts.device_id)}'
    except ValueError:
        # the default '/gpu:0' stays in effect when the id cannot be parsed
        print('--device-id is not a integer')
class Interrogator:
    """Base class for image taggers.

    Subclasses implement `load()` and `interrogate()`. After loading they are
    expected to expose the loaded model as `self.model` and its tag list as
    `self.tags`; `unload()` removes both attributes again.
    """

    @staticmethod
    def postprocess_tags(
        tags: Dict[str, float],
        threshold=0.35,
        additional_tags: List[str] = [],
        exclude_tags: List[str] = [],
        sort_by_alphabetical_order=False,
        add_confident_as_weight=False,
        replace_underscore=False,
        replace_underscore_excludes: List[str] = [],
        escape_tag=False
    ) -> Dict[str, float]:
        """Filter, order and format a tag -> confidence mapping.

        The input mapping is not modified; a new dict is returned.

        Args:
            tags: tag name -> confidence.
            threshold: tags with a confidence below this value are dropped.
            additional_tags: tags that are always added, with confidence 1.0.
            exclude_tags: tags that are always dropped (this also applies to
                entries of `additional_tags`).
            sort_by_alphabetical_order: order by tag name ascending when true,
                otherwise by confidence descending.
            add_confident_as_weight: format each key as `(tag:confidence)`.
            replace_underscore: replace `_` with a space in the tag name.
            replace_underscore_excludes: tags that keep their underscores
                even when `replace_underscore` is set.
            escape_tag: replace every match of `tag_escape_pattern` with a
                backslash followed by the match's first capture group.

        Returns:
            A dict of formatted tag -> confidence in the requested order.
        """
        # NOTE: the list defaults above are shared between calls; they are
        # only ever read here, never mutated.

        # work on a copy so the caller's dict is left untouched
        scored = dict(tags)
        for extra in additional_tags:
            scored[extra] = 1.0

        # build the lookup sets once instead of scanning the lists per tag
        excluded = set(exclude_tags)
        keep_underscore = set(replace_underscore_excludes)

        if sort_by_alphabetical_order:
            ordered = sorted(scored.items(), key=lambda item: item[0])
        else:
            ordered = sorted(
                scored.items(),
                key=lambda item: item[1],
                reverse=True
            )

        result = {}
        for tag, confidence in ordered:
            if not (confidence >= threshold and tag not in excluded):
                continue

            formatted = tag
            if replace_underscore and tag not in keep_underscore:
                formatted = formatted.replace('_', ' ')
            if escape_tag:
                formatted = tag_escape_pattern.sub(r'\\\1', formatted)
            if add_confident_as_weight:
                formatted = f'({formatted}:{confidence})'

            # when two tags format to the same key, the first one keeps its
            # position and the later confidence wins
            result[formatted] = confidence

        return result

    def __init__(self, name: str) -> None:
        """Store the display name used in log messages."""
        self.name = name

    def load(self):
        """Load the model and its tags; must be implemented by subclasses."""
        raise NotImplementedError()

    def unload(self) -> bool:
        """Drop the `model` and `tags` attributes if they are present.

        Returns:
            True when a loaded (non-None) model was removed, False otherwise.
        """
        unloaded = False

        if hasattr(self, 'model') and self.model is not None:
            del self.model
            unloaded = True
            print(f'Unloaded {self.name}')

        if hasattr(self, 'tags'):
            del self.tags

        return unloaded

    def interrogate(
        self,
        image: 'Image.Image'
    ) -> Tuple[
        Dict[str, float],  # rating confidents
        Dict[str, float]  # tag confidents
    ]:
        """Tag an image; must be implemented by subclasses.

        Returns:
            A `(ratings, tags)` pair, each mapping a name to its confidence.
        """
        raise NotImplementedError()
class DeepDanbooruInterrogator(Interrogator):
    """Interrogator that runs a DeepDanbooru project loaded from disk."""

    def __init__(self, name: str, project_path: os.PathLike) -> None:
        """Remember the display name and the DeepDanbooru project directory."""
        super().__init__(name)
        self.project_path = project_path

    def load(self) -> None:
        """Install deepdanbooru if needed, then load the model and its tags.

        The pip target can be overridden with the DEEPDANBOORU_PACKAGE
        environment variable.
        """
        print(f'Loading {self.name} from {str(self.project_path)}')

        # the deepdanbooru package is no longer shipped with the web UI,
        # so it is installed on demand
        # https://github.com/AUTOMATIC1111/stable-diffusion-webui/commit/c81d440d876dfd2ab3560410f37442ef56fc663
        from launch import is_installed, run_pip
        if not is_installed('deepdanbooru'):
            package = os.environ.get(
                'DEEPDANBOORU_PACKAGE',
                'git+https://github.com/KichangKim/DeepDanbooru.git@d91a2963bf87c6a770d74894667e9ffa9f6de7ff'
            )
            run_pip(
                f'install {package} tensorflow tensorflow-io',
                'deepdanbooru'
            )

        import tensorflow as tf

        # tensorflow maps nearly all vram by default, so enable memory growth
        # https://www.tensorflow.org/guide/gpu#limiting_gpu_memory_growth
        # TODO: only run on the first run
        for gpu in tf.config.experimental.list_physical_devices('GPU'):
            tf.config.experimental.set_memory_growth(gpu, True)

        with tf.device(tf_device_name):
            import deepdanbooru.project as ddp

            self.model = ddp.load_model_from_project(
                project_path=self.project_path,
                compile_model=False
            )
            print(f'Loaded {self.name} model from {str(self.project_path)}')

            self.tags = ddp.load_tags_from_project(
                project_path=self.project_path
            )

    def unload(self) -> bool:
        """Keep the model in memory and report that nothing was unloaded.

        Releasing a loaded Keras model is unreliable
        (https://github.com/keras-team/keras/issues/2102). The alternatives
        considered were:
          * super().unload() followed by tf.keras.backend.clear_session()
            and gc.collect();
          * downgrading to keras==2.1.6, which may break other packages;
          * running the model in a subprocess (as Automatic1111 did), which
            was judged too complex.
        Most users use the Waifu Diffusion model with onnx, so the model is
        simply left loaded for now.
        """
        return False

    def interrogate(
        self,
        image: Image
    ) -> Tuple[
        Dict[str, float],  # rating confidents
        Dict[str, float]  # tag confidents
    ]:
        """Run the model on `image` and return `(ratings, tags)`.

        `ratings` is always an empty dict here; `tags` maps every entry of
        `self.tags` to the confidence at the same index of the prediction.
        """
        # load the model on first use
        if getattr(self, 'model', None) is None:
            self.load()

        import deepdanbooru.data as ddd

        # hand the image to deepdanbooru as an in-memory PNG
        png_buffer = BytesIO()
        image.save(png_buffer, format='PNG')

        pixels = ddd.load_image_for_evaluate(
            png_buffer,
            self.model.input_shape[2],
            self.model.input_shape[1]
        )
        # add a leading batch axis of size 1
        batch = pixels.reshape((1, *pixels.shape[0:3]))

        # evaluate model
        prediction = self.model.predict(batch)
        confidents = prediction[0].tolist()

        tags = {
            tag: confidents[index]
            for index, tag in enumerate(self.tags)
        }
        ratings = {}

        return ratings, tags
class WaifuDiffusionInterrogator(Interrogator):
    """Interrogator that runs an ONNX tagger fetched with hf_hub_download."""

    def __init__(
        self,
        name: str,
        model_path='model.onnx',
        tags_path='selected_tags.csv',
        **kwargs
    ) -> None:
        """Store the file names to fetch; `kwargs` go to `hf_hub_download`."""
        super().__init__(name)
        self.model_path = model_path
        self.tags_path = tags_path
        self.kwargs = kwargs

    def download(self) -> Tuple[os.PathLike, os.PathLike]:
        """Fetch the model and tag files and return their local paths.

        `self.kwargs` must contain `repo_id`; it is read for the log message.
        """
        print(f"Loading {self.name} model file from {self.kwargs['repo_id']}")

        # model first, then tags
        model_file, tags_file = [
            Path(hf_hub_download(**self.kwargs, filename=remote_name))
            for remote_name in (self.model_path, self.tags_path)
        ]
        return model_file, tags_file

    def load(self) -> None:
        """Download the files, install onnxruntime if needed, open a session.

        The pip target can be overridden with the ONNXRUNTIME_PACKAGE
        environment variable.
        """
        model_path, tags_path = self.download()

        # only one onnxruntime flavour should be installed per environment
        # https://onnxruntime.ai/docs/get-started/with-python.html#install-onnx-runtime
        # TODO: remove old package when the environment changes?
        from launch import is_installed, run_pip
        if not is_installed('onnxruntime'):
            package = os.environ.get(
                'ONNXRUNTIME_PACKAGE',
                'onnxruntime-gpu'
            )
            run_pip(f'install {package}', 'onnxruntime')

        from onnxruntime import InferenceSession

        # https://onnxruntime.ai/docs/execution-providers/
        # https://github.com/toriato/stable-diffusion-webui-wd14-tagger/commit/e4ec460122cf674bbf984df30cdb10b4370c1224#r92654958
        if use_cpu:
            providers = ['CPUExecutionProvider']
        else:
            providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']

        self.model = InferenceSession(str(model_path), providers=providers)
        print(f'Loaded {self.name} model from {model_path}')

        self.tags = pd.read_csv(tags_path)

    def interrogate(
        self,
        image: Image
    ) -> Tuple[
        Dict[str, float],  # rating confidents
        Dict[str, float]  # tag confidents
    ]:
        """Run the model on `image` and return `(ratings, tags)`.

        The first four rows of the tag table are treated as ratings
        (general, sensitive, questionable, explicit); the rest are tags.
        """
        # load the model on first use
        if getattr(self, 'model', None) is None:
            self.load()

        # image conversion and model invocation follow SmilingWolf's demo
        # https://huggingface.co/spaces/SmilingWolf/wd-v1-4-tags/blob/main/app.py

        # the second input dimension is used as the target edge length
        # NOTE(review): presumably a square NHWC input - confirm
        _, target_size, _, _ = self.model.get_inputs()[0].shape

        # flatten transparency onto a white background
        rgba = image.convert('RGBA')
        canvas = Image.new('RGBA', rgba.size, 'WHITE')
        canvas.paste(rgba, mask=rgba)

        # PIL RGB to OpenCV BGR
        pixels = np.asarray(canvas.convert('RGB'))[:, :, ::-1]

        pixels = dbimutils.make_square(pixels, target_size)
        pixels = dbimutils.smart_resize(pixels, target_size)
        batch = np.expand_dims(pixels.astype(np.float32), 0)

        # evaluate model
        input_name = self.model.get_inputs()[0].name
        label_name = self.model.get_outputs()[0].name
        outputs = self.model.run([label_name], {input_name: batch})
        confidents = outputs[0]

        scored = self.tags[:][['name']]
        scored['confidents'] = confidents[0]

        # each row is a (name, confidence) pair
        pairs = scored.values
        ratings = dict(pairs[:4])
        tags = dict(pairs[4:])

        return ratings, tags
|