import copy
import os
import shutil

import cv2
import gradio as gr
import numpy as np
from PIL import Image

import modules.scripts as scripts
from modules import images
from modules.processing import process_images
from modules.shared import opts

# Sub-directory (appended to the sample output path) where animations go.
_BASEDIR = "/controlnet-m2m"
# Base part of the generated gif filename: <seq>-<seed>-<_BASEFILE>.gif
_BASEFILE = "animation"


def get_all_frames(video_path):
    """Decode every frame of *video_path* and return them as RGB ndarrays.

    Returns None when no path is given or the video cannot be opened.
    """
    if video_path is None:
        return None
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None
    frame_list = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                return frame_list
            # OpenCV decodes BGR; the rest of the pipeline expects RGB.
            frame_list.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    finally:
        # Always release the capture handle (the original leaked it).
        cap.release()


def get_min_frame_num(video_list):
    """Return the smallest frame count among non-None entries, or -1 if none."""
    min_frame_num = -1
    for video in video_list:
        if video is None:
            continue
        frame_num = len(video)
        print(frame_num)
        if min_frame_num < 0 or frame_num < min_frame_num:
            min_frame_num = frame_num
    return min_frame_num


def pil2cv(image):
    """Convert a PIL image (or array-like) to OpenCV channel order.

    RGB -> BGR and RGBA -> BGRA; greyscale is returned unchanged.
    """
    new_image = np.array(image, dtype=np.uint8)
    if new_image.ndim == 2:
        pass  # greyscale: nothing to swap
    elif new_image.shape[2] == 3:
        new_image = new_image[:, :, ::-1]
    elif new_image.shape[2] == 4:
        new_image = new_image[:, :, [2, 1, 0, 3]]
    return new_image


def save_gif(path, image_list, name, duration):
    """Write *image_list* as an animated gif at {path}/controlnet-m2m/{name}.gif.

    Individual frames are also dumped (via images.save_image) into a
    throwaway {path}/tmp/ directory, which is recreated on every call.
    *duration* is the per-frame display time in milliseconds.
    """
    tmp_dir = path + "/tmp/"
    if os.path.isdir(tmp_dir):
        shutil.rmtree(tmp_dir)
    os.mkdir(tmp_dir)
    for i, image in enumerate(image_list):
        images.save_image(image, tmp_dir, f"output_{i}")
    os.makedirs(f"{path}{_BASEDIR}", exist_ok=True)
    image_list[0].save(
        f"{path}{_BASEDIR}/{name}.gif",
        save_all=True,
        append_images=image_list[1:],
        optimize=False,
        duration=duration,
        loop=0,
    )


class Script(scripts.Script):
    """Drive img2img frame-by-frame from per-ControlNet movie/image inputs."""

    def title(self):
        return "controlnet m2m"

    def show(self, is_img2img):
        return True

    def ui(self, is_img2img):
        # How the script is displayed in the UI. See
        # https://gradio.app/docs/#components for the different UI components
        # and how to create them. Most components return a value (e.g. a bool
        # for a checkbox); the returned values are passed to run() as args.
        ctrls_group = ()
        max_models = opts.data.get("control_net_max_models_num", 1)

        with gr.Group():
            with gr.Accordion("ControlNet-M2M", open=False):
                duration = gr.Slider(
                    label="Duration",
                    value=50.0,
                    minimum=10.0,
                    maximum=200.0,
                    step=10,
                    interactive=True,
                    elem_id='controlnet_movie2movie_duration_slider',
                )
                with gr.Tabs():
                    # One tab per configured ControlNet unit; each contributes
                    # (video, image, save_pre) to the flat ctrls tuple.
                    for i in range(max_models):
                        with gr.Tab(f"ControlNet-{i}"):
                            with gr.TabItem("Movie Input"):
                                ctrls_group += (gr.Video(format='mp4', source='upload', elem_id=f"video_{i}"),)
                            with gr.TabItem("Image Input"):
                                ctrls_group += (gr.Image(source='upload', brush_radius=20, mirror_webcam=False, type='numpy', tool='sketch', elem_id=f'image_{i}'),)
                            ctrls_group += (gr.Checkbox(label="Save preprocessed", value=False, elem_id=f"save_pre_{i}"),)

        # The shared duration slider is appended last, after all unit triples.
        ctrls_group += (duration,)
        return ctrls_group

    def run(self, p, *args):
        # This is where the additional processing is implemented. The
        # parameters include self, the model object "p" (a
        # StableDiffusionProcessing class, see processing.py), and the
        # parameters returned by the ui method. The return value is a
        # Processed object, as returned by process_images().
        #
        # *args layout: (video_0, image_0, save_pre_0, ..., video_{n-1},
        # image_{n-1}, save_pre_{n-1}, duration).
        contents_num = opts.data.get("control_net_max_models_num", 1)
        arg_num = 3  # (video, image, save_pre) per ControlNet unit
        item_list = []
        video_list = []
        unit_args = args[:contents_num * arg_num]
        for input_set in [tuple(unit_args[i:i + arg_num]) for i in range(0, len(unit_args), arg_num)]:
            if input_set[0] is not None:
                # Decode once and reuse (the original decoded each video twice).
                frames = get_all_frames(input_set[0])
                item_list.append([frames, "video"])
                video_list.append(frames)
            if input_set[1] is not None:
                item_list.append([cv2.cvtColor(pil2cv(input_set[1]["image"]), cv2.COLOR_BGRA2RGB), "image"])
        save_pre = list(args[2:contents_num * arg_num:3])
        item_num = len(item_list)
        duration, = args[contents_num * arg_num:]

        # The animation length is bounded by the shortest input video.
        frame_num = get_min_frame_num(video_list)
        if frame_num <= 0:
            # No usable video input: fall back to a single normal generation.
            return process_images(p)

        output_image_list = []
        pre_output_image_list = [[] for _ in range(item_num)]
        for frame in range(frame_num):
            copy_p = copy.copy(p)
            copy_p.control_net_input_image = []
            for item in item_list:
                if item[1] == "video":
                    copy_p.control_net_input_image.append(item[0][frame])
                elif item[1] == "image":
                    copy_p.control_net_input_image.append(item[0])
            proc = process_images(copy_p)
            img = proc.images[0]
            output_image_list.append(img)
            # Optionally collect the preprocessor ("detected map") outputs,
            # which process_images appends after the generated image.
            for i in range(len(save_pre)):
                if save_pre[i]:
                    try:
                        pre_output_image_list[i].append(proc.images[i + 1])
                    except IndexError:
                        print(f"proc.images[{i + 1}] failed")
            copy_p.close()

        # filename format is seq-seed-animation.gif; seq is 5 places left
        # filled with 0
        seq = images.get_next_sequence_number(f"{p.outpath_samples}{_BASEDIR}", "")
        filename = f"{seq:05}-{proc.seed}-{_BASEFILE}"
        save_gif(p.outpath_samples, output_image_list, filename, duration)
        # Point proc.images at the files actually written above (the original
        # reported paths that did not match the saved gif names).
        proc.images = [f"{p.outpath_samples}{_BASEDIR}/{filename}.gif"]
        for i in range(len(save_pre)):
            if save_pre[i]:
                # control files add -controlX.gif where X is the controlnet
                # number
                save_gif(p.outpath_samples, pre_output_image_list[i], f"{filename}-control{i}", duration)
                proc.images.append(f"{p.outpath_samples}{_BASEDIR}/{filename}-control{i}.gif")
        return proc