本文基于ComfyUI API编写了类似于webUI的Gradio交互式界面,支持文生图/图生图(SD1.x,SD2.x,SDXL,Stable Cascade),Lora,ControlNet,图生视频(SVD_xt_1_1),图像修复(FaceDetailer),图像放大(Extras),图片/API工作流信息读取(Info)。
1. 在线体验
本文代码已部署到百度飞桨AI Studio平台,以供大家在线体验Stable Diffusion ComfyUI/webUI 原版界面及自制Gradio界面。
项目链接:Stable Diffusion ComfyUI 在线体验
2. 自制Gradio界面展示
Stable Diffusion 文生图/图生图界面:
Stable Cascade 文生图/图生图界面:
Stable Video Diffusion 图生视频界面:
图片放大界面:
图片/API工作流信息读取界面:
3. Gradio界面设计及ComfyUI API调用源码
import io import json import os import random import re import subprocess import sys import urllib.parse import uuid sys.path.append("/home/aistudio/ComfyUI/venv/lib/python3.10/site-packages") import gradio as gr import requests import websocket from collections import OrderedDict, deque from PIL import Image class Default: # 1表示启用,0表示禁用 design_mode = 0 lora_weight = 0.8 controlnet_num = 5 controlnet_saveimage = 1 facedetailer_num = 3 prompt = "(best quality:1), (high quality:1), detailed/(extreme, highly, ultra/), realistic, 1girl/(beautiful, delicate, perfect/), " negative_prompt = "(worst quality:1), (low quality:1), (normal quality:1), lowres, signature, blurry, watermark, duplicate, bad link, plump, bad anatomy, extra arms, extra digits, missing finger, bad hands, bad feet, deformed, error, mutation, text" if design_mode == 1: width = 64 hight = 64 steps = 2 else: width = 512 hight = 768 steps = 20 class Initial: os.environ["GRADIO_ANALYTICS_ENABLED"] = "False" client_id = str(uuid.uuid4()) server_address = "127.0.0.1:8188" if Default.design_mode == 0: cmd = "ps -eo pid,args | grep 'export GRADIO_SERVER_PORT=' | awk '{print $8, $14}'" ps_output = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True).stdout.splitlines() for i in ps_output: if "/home/aistudio/ComfyUI.gradio.py" in i: port = i.split(" ")[0].split("=")[1] server_address = f"127.0.0.1:{port}" output_dir = os.path.join(os.getcwd(), "ComfyUI/output/") uploaded_image = {} class Choices: ws = websocket.WebSocket() ws.connect("ws://{}/ws?clientId={}".format(Initial.server_address, Initial.client_id)) object_info = requests.get(url="http://{}/object_info".format(Initial.server_address)).json() embedding = requests.get(url="http://{}/embeddings".format(Initial.server_address)).json() ws.close() ckpt = [] ckpt_list = {} ckpt_name = object_info["ImageOnlyCheckpointLoader"]["input"]["required"]["ckpt_name"][0] hidden_ckpt = ["stable_cascade_stage_c.safetensors", "stable_cascade_stage_b.safetensors", "svd_xt_1_1.safetensors", "control_v11p_sd15_canny.safetensors", "control_v11f1p_sd15_depth.safetensors", "control_v11p_sd15_openpose.safetensors"] for i in ckpt_name: path, file = os.path.split(i) if file not in hidden_ckpt: ckpt.append(file) ckpt_list[file] = i ckpt = sorted(ckpt) controlnet_model = [] controlnet_model_list = {} controlnet_name = object_info["ControlNetLoader"]["input"]["required"]["control_net_name"][0] for i in controlnet_name: path, file = os.path.split(i) controlnet_model.append(file) controlnet_model_list[file] = i controlnet_model = sorted(controlnet_model) preprocessor = ["Canny"] if "AIO_Preprocessor" in object_info: preprocessor = ["none", "Canny", "CannyEdgePreprocessor", "DepthAnythingPreprocessor", "DWPreprocessor", "OpenposePreprocessor"] for i in sorted(object_info["AIO_Preprocessor"]["input"]["optional"]["preprocessor"][0]): if i not in preprocessor: preprocessor.append(i) if "FaceDetailer" in object_info: facedetailer_detector_model = [] facedetailer_detector_model_list = {} facedetailer_detector_model_name = object_info["UltralyticsDetectorProvider"]["input"]["required"]["model_name"][0] for i in facedetailer_detector_model_name: path, file = os.path.split(i) facedetailer_detector_model.append(file) facedetailer_detector_model_list[file] = i facedetailer_detector_model = sorted(facedetailer_detector_model) lora = object_info["LoraLoader"]["input"]["required"]["lora_name"][0] sampler = object_info["KSampler"]["input"]["required"]["sampler_name"][0] scheduler = object_info["KSampler"]["input"]["required"]["scheduler"][0] upscale_method = object_info["ImageScaleBy"]["input"]["required"]["upscale_method"][0] upscale_model = object_info["UpscaleModelLoader"]["input"]["required"]["model_name"][0] vae = ["Automatic"] for i in sorted(object_info["VAELoader"]["input"]["required"]["vae_name"][0]): vae.append(i) class Function: def format_prompt(prompt): prompt = re.sub(r"\s+,", ",", prompt) prompt = re.sub(r"\s+", " ", prompt) prompt = re.sub(",,+", ",", prompt) prompt = re.sub(",", ", ", prompt) prompt = re.sub(r"\s+", " ", prompt) prompt = re.sub(r"^,", "", prompt) prompt = re.sub(r"^ ", "", prompt) prompt = re.sub(r" $", "", prompt) prompt = re.sub(r",$", "", prompt) prompt = re.sub(": ", ":", prompt) return prompt def get_model_path(model_name): return Choices.ckpt_list[model_name] def gen_seed(seed): seed = int(seed) if seed < 0: seed = random.randint(0, 18446744073709551615) if seed > 18446744073709551615: seed = 18446744073709551615 return seed def initialize(): Lora.cache = {} Upscale.cache = {} UpscaleWithModel.cache = {} ControlNet.cache = {} FaceDetailer.cache = {} def upload_image(image): buffer = io.BytesIO() image.save(buffer, format="png") image = buffer.getbuffer() image_hash = hash(image.tobytes()) if image_hash in Initial.uploaded_image: return Initial.uploaded_image[image_hash] image_name = str(uuid.uuid4()) + ".png" Initial.uploaded_image[image_hash] = image_name image_file = {"image": (image_name, image)} ws = websocket.WebSocket() ws.connect("ws://{}/ws?clientId={}".format(Initial.server_address, Initial.client_id)) requests.post(url="http://{}/upload/image".format(Initial.server_address), files=image_file) ws.close() return image_name def order_workflow(workflow): link_list = {} for node in workflow: node_link = [] for input in workflow[node]["inputs"]: if isinstance(workflow[node]["inputs"][input], list): node_link.append(workflow[node]["inputs"][input][0]) link_list[node] = node_link in_degree = {v: 0 for v in link_list} for node in link_list: for neighbor in link_list[node]: in_degree[neighbor] += 1 queue = deque([node for node in in_degree if in_degree[node] == 0]) order_list = [] while queue: node = queue.popleft() order_list.append(node) for neighbor in link_list[node]: in_degree[neighbor] -= 1 if in_degree[neighbor] == 0: queue.append(neighbor) order_list = order_list[::-1] max_nodes = 1000 new_node_id = max_nodes * 10 + 1 workflow_string = json.dumps(workflow) for node in order_list: workflow_string = workflow_string.replace(f'"{node}"', f'"{new_node_id}"') new_node_id += 1 workflow = json.loads(workflow_string) workflow = OrderedDict(sorted(workflow.items())) new_node_id = 1 workflow_string = json.dumps(workflow) for node in workflow: workflow_string = workflow_string.replace(f'"{node}"', f'"{new_node_id}"') new_node_id += 1 workflow = json.loads(workflow_string) for node in workflow: if "_meta" in workflow[node]: del workflow[node]["_meta"] return workflow def post_interrupt(): Initial.interrupt = True ws = websocket.WebSocket() ws.connect("ws://{}/ws?clientId={}".format(Initial.server_address, Initial.client_id)) requests.post(url="http://{}/interrupt".format(Initial.server_address)) ws.close() def add_embedding(embedding, negative_prompt): for i in Choices.embedding: negative_prompt = negative_prompt.replace(f"embedding:{i},", "") negative_prompt = Function.format_prompt(negative_prompt) for i in embedding[::-1]: negative_prompt = f"embedding:{i}, {negative_prompt}" return negative_prompt def gen_image(workflow, counter, batch_count, progress): if counter == 1: progress(0, desc="Processing...") if batch_count == 1: batch_info = "" else: batch_info = f"Batch {counter}/{batch_count}: " workflow = Function.order_workflow(workflow) current_progress = 0 Initial.interrupt = False ws = websocket.WebSocket() ws.connect("ws://{}/ws?clientId={}".format(Initial.server_address, Initial.client_id)) data = {"prompt": workflow, "client_id": Initial.client_id} prompt_id = requests.post(url="http://{}/prompt".format(Initial.server_address), json=data).json()["prompt_id"] while True: try: ws.settimeout(0.1) wsrecv = ws.recv() if isinstance(wsrecv, str): data = json.loads(wsrecv)["data"] if "node" in data: if data["node"] is not None: if "value" in data and "max" in data: if data["max"] > 1: current_progress = data["value"]/data["max"] progress(current_progress, desc=f"{batch_info}" + workflow[data["node"]]["class_type"] + " " + str(data["value"]) + "/" + str(data["max"])) else: progress(current_progress, desc=f"{batch_info}" + workflow[data["node"]]["class_type"]) if data["node"] is None and data["prompt_id"] == prompt_id: break else: continue except websocket.WebSocketTimeoutException: if Initial.interrupt is True: ws.close() return None, None history = requests.get(url="http://{}/history/{}".format(Initial.server_address, prompt_id)).json()[prompt_id] images = [] file_path = "" for node_id in history["outputs"]: node_output = history["outputs"][node_id] if "images" in node_output: for image in node_output["images"]: file_path = Initial.output_dir + image["filename"] data = {"filename": image["filename"], "subfolder": image["subfolder"], "type": image["type"]} url_values = urllib.parse.urlencode(data) image_data = requests.get("http://{}/view?{}".format(Initial.server_address, url_values)).content image = Image.open(io.BytesIO(image_data)) images.append(image) ws.close() return images, file_path def get_gallery_index(evt: gr.SelectData): return evt.index def get_image_info(image_pil): image_info=[] if image_pil is None: return for key, value in image_pil.info.items(): image_info.append(value) if image_info != []: image_info = image_info[0] if image_info == 0: image_info = "None" else: image_info = "None" return image_info def send_to(data, index): if data == [] or data is None: return None return data[index] class Lora: cache = {} def add_node(module, workflow, node_id, model_port, clip_port): for lora in Lora.cache[module]: strength_model = Lora.cache[module][lora] strength_clip = Lora.cache[module][lora] node_id += 1 workflow[str(node_id)] = {"inputs": {"lora_name": lora, "strength_model": strength_model, "strength_clip": strength_clip, "model": model_port, "clip": clip_port}, "class_type": "LoraLoader"} model_port = [str(node_id), 0] clip_port = [str(node_id), 1] return workflow, node_id, model_port, clip_port def update_cache(module, lora, lora_weight): if Initial.initialized is False: Function.initialize() if lora == []: Lora.cache[module] = {} return True, [], gr.update(value="", visible=False) lora_list = {} for i in lora_weight.split("<"): for j in i.split(">"): if j != "" and ":" in j: lora_name, weight = j.split(":") lora_list[lora_name] = weight lora_weight = "" Lora.cache[module] = {} for i in lora: if i in lora_list: weight = lora_list[i] else: weight = Default.lora_weight if lora.index(i) == 0: lora_weight = f"<{i}:{weight}>" else: lora_weight = f"{lora_weight}\n\n<{i}:{weight}>" if weight != "": weight = float(weight) Lora.cache[module][i] = weight return True, gr.update(), gr.update(value=lora_weight, visible=True) def blocks(module): module = gr.Textbox(value=module, visible=False) lora = gr.Dropdown(Choices.lora, label="Lora", multiselect=True, interactive=True) lora_weight = gr.Textbox(label="Lora weight | Lora 权重", visible=False) for gr_block in [lora, lora_weight]: gr_block.change(fn=Lora.update_cache, inputs=[module, lora, lora_weight], outputs=[Initial.initialized, lora, lora_weight]) class Upscale: cache = {} def add_node(module, workflow, node_id, image_port): upscale_method = Upscale.cache[module]["upscale_method"] scale_by = Upscale.cache[module]["scale_by"] node_id += 1 workflow[str(node_id)] = {"inputs": {"upscale_method": upscale_method, "scale_by": scale_by, "image": image_port}, "class_type": "ImageScaleBy"} image_port = [str(node_id), 0] return workflow, node_id, image_port def auto_enable(scale_by): if scale_by > 1: return True else: return False def update_cache(module, enable, upscale_method, scale_by): if Initial.initialized is False: Function.initialize() if module not in Upscale.cache: Upscale.cache[module] = {} if enable is True: Upscale.cache[module]["upscale_method"] = upscale_method Upscale.cache[module]["scale_by"] = scale_by else: del Upscale.cache[module] return True def blocks(module): module = gr.Textbox(value=module, visible=False) enable = gr.Checkbox(label="Enable(放大系数大于1后自动启用)") with gr.Row(): upscale_method = gr.Dropdown(Choices.upscale_method, label="Upscale method | 放大方法", value=Choices.upscale_method[-1]) scale_by = gr.Slider(minimum=1, maximum=8, step=1, label="Scale by | 放大系数", value=1) scale_by.release(fn=Upscale.auto_enable, inputs=[scale_by], outputs=[enable]) inputs = [module, enable, upscale_method, scale_by] for gr_block in inputs: if type(gr_block) is gr.components.slider.Slider: gr_block.release(fn=Upscale.update_cache, inputs=inputs, outputs=[Initial.initialized]) else: gr_block.change(fn=Upscale.update_cache, inputs=inputs, outputs=[Initial.initialized]) class UpscaleWithModel: cache = {} def add_node(module, workflow, node_id, image_port): upscale_model = UpscaleWithModel.cache[module]["upscale_model"] node_id += 1 workflow[str(node_id)] = {"inputs": {"model_name": upscale_model}, "class_type": "UpscaleModelLoader"} upscale_model_port = [str(node_id), 0] node_id += 1 workflow[str(node_id)] = {"inputs": {"upscale_model": upscale_model_port, "image": image_port}, "class_type": "ImageUpscaleWithModel"} image_port = [str(node_id), 0] return workflow, node_id, image_port def update_cache(module, enable, upscale_model): if Initial.initialized is False: Function.initialize() if module not in UpscaleWithModel.cache: UpscaleWithModel.cache[module] = {} if enable is True: UpscaleWithModel.cache[module]["upscale_model"] = upscale_model else: del UpscaleWithModel.cache[module] return True def blocks(module): module = gr.Textbox(value=module, visible=False) enable = gr.Checkbox(label="Enable") upscale_model = gr.Dropdown(Choices.upscale_model, label="Upscale model | 超分模型", value=Choices.upscale_model[0]) inputs = [module, enable, upscale_model] for gr_block in inputs: gr_block.change(fn=UpscaleWithModel.update_cache, inputs=inputs, outputs=[Initial.initialized]) class ControlNet: cache = {} model_preprocessor_list = { "control_v11e_sd15_ip2p.safetensors": [], "control_v11e_sd15_shuffle.safetensors": ["ShufflePreprocessor"], "control_v11f1e_sd15_tile.bin": ["TilePreprocessor", "TTPlanet_TileGF_Preprocessor", "TTPlanet_TileSimple_Preprocessor"], "control_v11f1p_sd15_depth.safetensors": ["DepthAnythingPreprocessor", "LeReS-DepthMapPreprocessor", "MiDaS-NormalMapPreprocessor", "MeshGraphormer-DepthMapPreprocessor", "MeshGraphormer+ImpactDetector-DepthMapPreprocessor", "MiDaS-DepthMapPreprocessor", "Zoe_DepthAnythingPreprocessor", "Zoe-DepthMapPreprocessor"], "control_v11p_sd15_canny.safetensors": ["Canny", "CannyEdgePreprocessor"], "control_v11p_sd15_inpaint.safetensors": [], "control_v11p_sd15_lineart.safetensors": ["LineArtPreprocessor", "LineartStandardPreprocessor"], "control_v11p_sd15_mlsd.safetensors": ["M-LSDPreprocessor"], "control_v11p_sd15_normalbae.safetensors": ["BAE-NormalMapPreprocessor", "DSINE-NormalMapPreprocessor"], "control_v11p_sd15_openpose.safetensors": ["DWPreprocessor", "OpenposePreprocessor", "DensePosePreprocessor"], "control_v11p_sd15_scribble.safetensors": ["ScribblePreprocessor", "Scribble_XDoG_Preprocessor", "Scribble_PiDiNet_Preprocessor", "FakeScribblePreprocessor"], "control_v11p_sd15_seg.safetensors": ["AnimeFace_SemSegPreprocessor", "OneFormer-COCO-SemSegPreprocessor", "OneFormer-ADE20K-SemSegPreprocessor", "SemSegPreprocessor", "UniFormer-SemSegPreprocessor"], "control_v11p_sd15_softedge.safetensors": ["HEDPreprocessor", "PiDiNetPreprocessor", "TEEDPreprocessor", "DiffusionEdge_Preprocessor"], "control_v11p_sd15s2_lineart_anime.safetensors": ["AnimeLineArtPreprocessor", "Manga2Anime_LineArt_Preprocessor"], "control_scribble.safetensors": ["BinaryPreprocessor"], "ioclab_sd15_recolor.safetensors": ["ImageLuminanceDetector", "ImageIntensityDetector"], "control_sd15_animal_openpose_fp16.pth": ["AnimalPosePreprocessor"], "controlnet_sd21_laion_face_v2.safetensors": ["MediaPipe-FaceMeshPreprocessor"] } def add_node(module, counter, workflow, node_id, positive_port, negative_port): for unit_id in ControlNet.cache[module]: preprocessor = ControlNet.cache[module][unit_id]["preprocessor"] model = ControlNet.cache[module][unit_id]["model"] input_image = ControlNet.cache[module][unit_id]["input_image"] resolution = ControlNet.cache[module][unit_id]["resolution"] strength = ControlNet.cache[module][unit_id]["strength"] start_percent = ControlNet.cache[module][unit_id]["start_percent"] end_percent = ControlNet.cache[module][unit_id]["end_percent"] node_id += 1 workflow[str(node_id)] = {"inputs": {"image": input_image, "upload": "image"}, "class_type": "LoadImage"} image_port = [str(node_id), 0] if preprocessor == "Canny": node_id += 1 workflow[str(node_id)] = {"inputs": {"low_threshold": 0.3, "high_threshold": 0.7, "image": image_port}, "class_type": "Canny"} image_port = [str(node_id), 0] else: node_id += 1 workflow[str(node_id)] = {"inputs": {"preprocessor": preprocessor, "resolution": resolution, "image": image_port}, "class_type": "AIO_Preprocessor"} image_port = [str(node_id), 0] if counter == 1 and Default.controlnet_saveimage == 1: node_id += 1 workflow[str(node_id)] = {"inputs": {"filename_prefix": "ControlNet", "images": image_port}, "class_type": "SaveImage"} node_id += 1 workflow[str(node_id)] = {"inputs": {"control_net_name": model}, "class_type": "ControlNetLoader"} control_net_port = [str(node_id), 0] node_id += 1 workflow[str(node_id)] = {"inputs": {"strength": strength, "start_percent": start_percent, "end_percent": end_percent, "positive": positive_port, "negative": negative_port, "control_net": control_net_port, "image": image_port}, "class_type": "ControlNetApplyAdvanced"} positive_port = [str(node_id), 0] negative_port = [str(node_id), 1] return workflow, node_id, positive_port, negative_port def auto_enable(): return True def auto_select_model(preprocessor): for model in Choices.controlnet_model: if model in ControlNet.model_preprocessor_list: if preprocessor in ControlNet.model_preprocessor_list[model]: return gr.update(value=model) return gr.update(value="未定义/检测到对应的模型,请自行选择!") def preprocess(unit_id, preview, preprocessor, input_image, resolution, progress=gr.Progress()): if preview is False or input_image is None: return input_image = Function.upload_image(input_image) workflow = {} node_id = 1 workflow[str(node_id)] = {"inputs": {"image": input_image, "upload": "image"}, "class_type": "LoadImage"} image_port = [str(node_id), 0] if preprocessor == "Canny": node_id += 1 workflow[str(node_id)] = {"inputs": {"low_threshold": 0.3, "high_threshold": 0.7, "image": image_port}, "class_type": "Canny"} image_port = [str(node_id), 0] else: node_id += 1 workflow[str(node_id)] = {"inputs": {"preprocessor": preprocessor, "resolution": resolution, "image": image_port}, "class_type": "AIO_Preprocessor"} image_port = [str(node_id), 0] node_id += 1 workflow[str(node_id)] = {"inputs": {"images": image_port}, "class_type": "PreviewImage"} output = Function.gen_image(workflow, 1, 1, progress)[0] if output is not None: output = output[0] return output def update_cache(module, unit_id, enable, preprocessor, model, input_image, resolution, strength, start_percent, end_percent): if Initial.initialized is False: Function.initialize() if module not in ControlNet.cache: ControlNet.cache[module] = {} ControlNet.cache[module][unit_id] = {} if input_image is None: del ControlNet.cache[module][unit_id] return True, False if model not in Choices.controlnet_model: del ControlNet.cache[module][unit_id] return True, False if enable is True: ControlNet.cache[module][unit_id]["preprocessor"] = preprocessor ControlNet.cache[module][unit_id]["model"] = Choices.controlnet_model_list[model] ControlNet.cache[module][unit_id]["input_image"] = Function.upload_image(input_image) ControlNet.cache[module][unit_id]["resolution"] = resolution ControlNet.cache[module][unit_id]["strength"] = strength ControlNet.cache[module][unit_id]["start_percent"] = start_percent ControlNet.cache[module][unit_id]["end_percent"] = end_percent else: del ControlNet.cache[module][unit_id] return True, gr.update() def unit(module, i): module = gr.Textbox(value=module, visible=False) unit_id = gr.Textbox(value=i, visible=False) with gr.Row(): enable = gr.Checkbox(label="Enable(上传图片后自动启用)") preview = gr.Checkbox(label="Preview") with gr.Row(): preprocessor = gr.Dropdown(Choices.preprocessor, label="Preprocessor", value="Canny") model = gr.Dropdown(Choices.controlnet_model, label="ControlNet model", value="control_v11p_sd15_canny.safetensors") with gr.Row(): input_image = gr.Image(type="pil") preprocess_preview = gr.Image(label="Preprocessor preview") with gr.Row(): resolution = gr.Slider(label="Resolution", minimum=64, maximum=2048, step=64, value=512) strength = gr.Slider(label="Strength", minimum=0, maximum=2, step=0.01, value=1) with gr.Row(): start_percent = gr.Slider(label="Start percent", minimum=0, maximum=1, step=0.01, value=0) end_percent = gr.Slider(label="End percent", minimum=0, maximum=1, step=0.01, value=1) input_image.upload(fn=ControlNet.auto_enable, inputs=None, outputs=[enable]) preprocessor.change(fn=ControlNet.auto_select_model, inputs=[preprocessor], outputs=[model]) for gr_block in [preview, preprocessor, input_image]: gr_block.change(fn=ControlNet.preprocess, inputs=[unit_id, preview, preprocessor, input_image, resolution], outputs=[preprocess_preview]) inputs = [module, unit_id, enable, preprocessor, model, input_image, resolution, strength, start_percent, end_percent] for gr_block in inputs: if type(gr_block) is gr.components.slider.Slider: gr_block.release(fn=ControlNet.update_cache, inputs=inputs, outputs=[Initial.initialized, enable]) else: gr_block.change(fn=ControlNet.update_cache, inputs=inputs, outputs=[Initial.initialized, enable]) def blocks(module): with gr.Tab(label="控制网络"): if Default.controlnet_num == 1: ControlNet.unit(module, 1) else: for i in range(Default.controlnet_num): with gr.Tab(label=f"ControlNet Unit {i + 1}"): ControlNet.unit(module, i + 1) class FaceDetailer: cache = {} def add_node(module, workflow, node_id, image_port, model_port, clip_port, vae_port, positive_port, negative_port, seed, steps, cfg, sampler_name, scheduler): for unit_id in FaceDetailer.cache[module]: model = Choices.facedetailer_detector_model_list[FaceDetailer.cache[module][unit_id]["model"]] node_id += 1 workflow[str(node_id)] = {"inputs": {"model_name": model}, "class_type": "UltralyticsDetectorProvider"} bbox_detector_port = [str(node_id), 0] segm_detector_opt_port = [str(node_id), 1] node_id += 1 workflow[str(node_id)] = {"inputs": {"model_name": "sam_vit_b_01ec64.pth", "device_mode": "AUTO"}, "class_type": "SAMLoader"} sam_model_opt_port = [str(node_id), 0] node_id += 1 workflow[str(node_id)] = {"inputs": {"guide_size": 384, "guide_size_for": "True", "max_size": 1024, "seed": seed, "steps": steps, "cfg": cfg, "sampler_name": sampler_name, "scheduler": scheduler, "denoise": 0.5, "feather": 5, "noise_mask": "True", "force_inpaint": "True", "bbox_threshold": 0.5, "bbox_dilation": 10, "bbox_crop_factor": 3, "sam_detection_hint": "center-1", "sam_dilation": 0, "sam_threshold": 0.93, "sam_bbox_expansion": 0, "sam_mask_hint_threshold": 0.7, "sam_mask_hint_use_negative": "False", "drop_size": 10, "wildcard": "", "cycle": 1, "inpaint_model": "False", "noise_mask_feather": 20, "image": image_port, "model": model_port, "clip": clip_port, "vae": vae_port, "positive": positive_port, "negative": negative_port, "bbox_detector": bbox_detector_port, "sam_model_opt": sam_model_opt_port, "segm_detector_opt": segm_detector_opt_port}, "class_type": "FaceDetailer"} image_port = [str(node_id), 0] return workflow, node_id, image_port def update_cache(module, unit_id, enable, model): if Initial.initialized is False: Function.initialize() if module not in FaceDetailer.cache: FaceDetailer.cache[module] = {} FaceDetailer.cache[module][unit_id] = {} if enable is True: FaceDetailer.cache[module][unit_id]["model"] = model else: del FaceDetailer.cache[module][unit_id] return True def unit(module, i): module = gr.Textbox(value=module, visible=False) unit_id = gr.Textbox(value=i, visible=False) enable = gr.Checkbox(label="Enable") if i == 1: model = gr.Dropdown(Choices.facedetailer_detector_model, label="Detector model", value="face_yolov8m.pt") if i == 2: model = gr.Dropdown(Choices.facedetailer_detector_model, label="Detector model", value="hand_yolov8s.pt") if i == 3: model = gr.Dropdown(Choices.facedetailer_detector_model, label="Detector model", value="person_yolov8m-seg.pt") inputs = [module, unit_id, enable, model] for gr_block in inputs: gr_block.change(fn=FaceDetailer.update_cache, inputs=inputs, outputs=[Initial.initialized]) def blocks(module): with gr.Tab(label="图像修复"): if Default.facedetailer_num == 1: FaceDetailer.unit(module, 1) else: with gr.Row(): for i in range(Default.facedetailer_num): with gr.Column(): with gr.Tab(label=f"FaceDetailer Unit {i + 1}"): FaceDetailer.unit(module, i + 1) if Default.facedetailer_num % 2 != 0: with gr.Column(): gr.HTML("") class Postprocess: def add_node(module, *args): if module == "SD": workflow, node_id, image_port, model_port, clip_port, vae_port, positive_port, negative_port, seed, steps, cfg, sampler_name, scheduler = args else: workflow, node_id, image_port = args if module in FaceDetailer.cache: workflow, node_id, image_port = FaceDetailer.add_node(module, workflow, node_id, image_port, model_port, clip_port, vae_port, positive_port, negative_port, seed, steps, cfg, sampler_name, scheduler) if module in Upscale.cache: workflow, node_id, image_port = Upscale.add_node(module, workflow, node_id, image_port) if module in UpscaleWithModel.cache: workflow, node_id, image_port = UpscaleWithModel.add_node(module, workflow, node_id, image_port) return workflow, node_id, image_port def blocks(module): if module == "SD": if "FaceDetailer" in Choices.object_info: FaceDetailer.blocks(module) with gr.Tab(label="图像放大"): with gr.Row(): with gr.Tab(label="算术放大"): Upscale.blocks(module) with gr.Row(): with gr.Tab(label="超分放大"): UpscaleWithModel.blocks(module) gr.HTML("注意:同时启用两种放大模式将先执行算术放大,再执行超分放大,最终放大倍数为二者放大倍数的乘积!") class SD: def generate(initialized, batch_count, ckpt_name, vae_name, clip_mode, clip_skip, width, height, batch_size, negative_prompt, positive_prompt, seed, steps, cfg, sampler_name, scheduler, denoise, input_image, progress=gr.Progress()): module = "SD" ckpt_name = Function.get_model_path(ckpt_name) seed = Function.gen_seed(seed) if input_image is not None: input_image = Function.upload_image(input_image) counter = 1 output_images = [] node_id = 0 while counter <= batch_count: workflow = {} node_id += 1 workflow[str(node_id)] = {"inputs": {"ckpt_name": ckpt_name}, "class_type": "CheckpointLoaderSimple"} model_port = [str(node_id), 0] clip_port = [str(node_id), 1] vae_port = [str(node_id), 2] if vae_name != "Automatic": node_id += 1 workflow[str(node_id)] = {"inputs": {"vae_name": vae_name}, "class_type": "VAELoader"} vae_port = [str(node_id), 0] if input_image is None: node_id += 1 workflow[str(node_id)] = {"inputs": {"width": width, "height": height, "batch_size": batch_size}, "class_type": "EmptyLatentImage"} latent_image_port = [str(node_id), 0] else: node_id += 1 workflow[str(node_id)] = {"inputs": {"image": input_image, "upload": "image"}, "class_type": "LoadImage"} pixels_port = [str(node_id), 0] node_id += 1 workflow[str(node_id)] = {"inputs": {"pixels": pixels_port, "vae": vae_port}, "class_type": "VAEEncode"} latent_image_port = [str(node_id), 0] node_id += 1 workflow[str(node_id)] = {"inputs": {"stop_at_clip_layer": -clip_skip, "clip": clip_port}, "class_type": "CLIPSetLastLayer"} clip_port = [str(node_id), 0] if initialized is True and module in Lora.cache: workflow, node_id, model_port, clip_port = Lora.add_node(module, workflow, node_id, model_port, clip_port) node_id += 1 if clip_mode == "ComfyUI": workflow[str(node_id)] = {"inputs": {"text": positive_prompt, "clip": clip_port}, "class_type": "CLIPTextEncode"} else: workflow[str(node_id)] = {"inputs": {"text": positive_prompt, "token_normalization": "none", "weight_interpretation": "A1111", "clip": clip_port}, "class_type": "BNK_CLIPTextEncodeAdvanced"} positive_port = [str(node_id), 0] node_id += 1 if clip_mode == "ComfyUI": workflow[str(node_id)] = {"inputs": {"text": negative_prompt, "clip": clip_port}, "class_type": "CLIPTextEncode"} else: workflow[str(node_id)] = {"inputs": {"text": negative_prompt, "token_normalization": "none", "weight_interpretation": "A1111", "clip": clip_port}, "class_type": "BNK_CLIPTextEncodeAdvanced"} negative_port = [str(node_id), 0] if initialized is True and module in ControlNet.cache: workflow, node_id, positive_port, negative_port = ControlNet.add_node(module, counter, workflow, node_id, positive_port, negative_port) node_id += 1 workflow[str(node_id)] = {"inputs": {"seed": seed, "steps": steps, "cfg": cfg, "sampler_name": sampler_name, "scheduler": scheduler, "denoise": denoise, "model": model_port, "positive": positive_port, "negative": negative_port, "latent_image": latent_image_port}, "class_type": "KSampler"} samples_port = [str(node_id), 0] node_id += 1 workflow[str(node_id)] = {"inputs": {"samples": samples_port, "vae": vae_port}, "class_type": "VAEDecode"} image_port = [str(node_id), 0] if initialized is True: workflow, node_id, image_port = Postprocess.add_node(module, workflow, node_id, image_port, model_port, clip_port, vae_port, positive_port, negative_port, seed, steps, cfg, sampler_name, scheduler) node_id += 1 workflow[str(node_id)] = {"inputs": {"filename_prefix": "ComfyUI", "images": image_port}, "class_type": "SaveImage"} images = Function.gen_image(workflow, counter, batch_count, progress)[0] if images is None: break for image in images: output_images.append(image) seed += 1 counter += 1 return output_images, output_images def blocks(): with gr.Row(): with gr.Column(): positive_prompt = gr.Textbox(placeholder="Positive prompt | 正向提示词", show_label=False, value=Default.prompt, lines=3) negative_prompt = gr.Textbox(placeholder="Negative prompt | 负向提示词", show_label=False, value=Default.negative_prompt, lines=3) with gr.Tab(label="基础设置"): with gr.Row(): ckpt_name = gr.Dropdown(Choices.ckpt, label="Ckpt name | Ckpt 模型名称", value=Choices.ckpt[0]) vae_name = gr.Dropdown(Choices.vae, label="VAE name | VAE 模型名称", value=Choices.vae[0]) if "BNK_CLIPTextEncodeAdvanced" in Choices.object_info: clip_mode = gr.Dropdown(["ComfyUI", "WebUI"], label="Clip 编码类型", value="ComfyUI") else: clip_mode = gr.Dropdown(["ComfyUI", "WebUI"], label="Clip 编码类型", value="ComfyUI", visible=False) clip_skip = gr.Slider(minimum=1, maximum=12, step=1, label="Clip 跳过", value=1) with gr.Row(): width = gr.Slider(minimum=64, maximum=2048, step=64, label="Width | 图像宽度", value=Default.width) batch_size = gr.Slider(minimum=1, maximum=8, step=1, label="Batch size | 批次大小", value=1) with gr.Row(): height = gr.Slider(minimum=64, maximum=2048, step=64, label="Height | 图像高度", value=Default.hight) batch_count = gr.Slider(minimum=1, maximum=100, step=1, label="Batch count | 生成批次", value=1) with gr.Row(): if Choices.lora != []: Lora.blocks("SD") if Choices.embedding != []: embedding = gr.Dropdown(Choices.embedding, label="Embedding", multiselect=True, interactive=True) embedding.change(fn=Function.add_embedding, inputs=[embedding, negative_prompt], outputs=[negative_prompt]) with gr.Row(): SD.input_image = gr.Image(value=None, type="pil") gr.HTML("<br>上传图片即自动转为图生图模式。<br><br>文生图、图生图模式共享设置参数。<br><br>图像宽度、图像高度、批次大小对图生图无效。") with gr.Tab(label="采样设置"): with gr.Row(): sampler_name = gr.Dropdown(Choices.sampler, label="Sampling method | 采样方法", value=Choices.sampler[12]) scheduler = gr.Dropdown(Choices.scheduler, label="Schedule type | 采样计划表类型", value=Choices.scheduler[1]) with gr.Row(): denoise = gr.Slider(minimum=0, maximum=1, step=0.05, label="Denoise | 去噪强度", value=1) steps = gr.Slider(minimum=1, maximum=100, step=1, label="Sampling steps | 采样次数", value=Default.steps) with gr.Row(): cfg = gr.Slider(minimum=0, maximum=20, step=0.1, label="CFG Scale | CFG权重", value=7) seed = gr.Slider(minimum=-1, maximum=18446744073709550000, step=1, label="Seed | 种子数", value=-1) if Choices.controlnet_model != []: ControlNet.blocks("SD") Postprocess.blocks("SD") with gr.Column(): with gr.Row(): btn = gr.Button("Generate | 生成", elem_id="button") btn2 = gr.Button("Interrupt | 终止") output = gr.Gallery(preview=True, height=600) with gr.Row(): SD.send_to_sd = gr.Button("发送图片至 SD") if SC.enable is True: SD.send_to_sc = gr.Button("发送图片至 SC") if SVD.enable is True: SD.send_to_svd = gr.Button("发送图片至 SVD") SD.send_to_extras = gr.Button("发送图片至 Extras") SD.send_to_info = gr.Button("发送图片至 Info") SD.data = gr.State() SD.index = gr.State() btn.click(fn=SD.generate, inputs=[Initial.initialized, batch_count, ckpt_name, vae_name, clip_mode, clip_skip, width, height, batch_size, negative_prompt, positive_prompt, seed, steps, cfg, sampler_name, scheduler, denoise, SD.input_image], outputs=[output, SD.data]) btn2.click(fn=Function.post_interrupt, inputs=None, outputs=None) output.select(fn=Function.get_gallery_index, inputs=None, outputs=[SD.index]) class SC: if Default.design_mode == 1: enable = True elif "stable_cascade_stage_c.safetensors" in Choices.ckpt_list and "stable_cascade_stage_b.safetensors" in Choices.ckpt_list: enable = True else: enable = False def generate(initialized, batch_count, positive_prompt, negative_prompt, width, height, batch_size, seed_c, steps_c, cfg_c, sampler_name_c, scheduler_c, denoise_c, seed_b, steps_b, cfg_b, sampler_name_b, scheduler_b, denoise_b, input_image, progress=gr.Progress()): module = "SC" ckpt_name_c = Function.get_model_path("stable_cascade_stage_c.safetensors") ckpt_name_b = Function.get_model_path("stable_cascade_stage_b.safetensors") seed_c = Function.gen_seed(seed_c) seed_b = Function.gen_seed(seed_b) if input_image is not None: input_image = Function.upload_image(input_image) counter = 1 output_images = [] while counter <= batch_count: workflow = { "1": {"inputs": {"ckpt_name": ckpt_name_c}, "class_type": "CheckpointLoaderSimple"}, "2": {"inputs": {"image": input_image, "upload": "image"}, "class_type": "LoadImage"}, "3": {"inputs": {"compression": 42, "image": ["2", 0], "vae": ["1", 2]}, "class_type": "StableCascade_StageC_VAEEncode"}, "4": {"inputs": {"text": negative_prompt, "clip": ["1", 1]}, "class_type": "CLIPTextEncode"}, "5": {"inputs": {"text": positive_prompt, "clip": ["1", 1]}, "class_type": "CLIPTextEncode"}, "6": {"inputs": {"seed": seed_c, "steps": steps_c, "cfg": cfg_c, "sampler_name": sampler_name_c, "scheduler": scheduler_c, "denoise": denoise_c, "model": ["1", 0], "positive": ["5", 0], "negative": ["4", 0], "latent_image": ["3", 0]}, "class_type": "KSampler"}, "7": {"inputs": {"conditioning": ["5", 0], "stage_c": ["6", 0]}, "class_type": "StableCascade_StageB_Conditioning"}, "8": {"inputs": {"ckpt_name": ckpt_name_b}, "class_type": "CheckpointLoaderSimple"}, "9": {"inputs": {"seed": seed_b, "steps": steps_b, "cfg": cfg_b, "sampler_name": sampler_name_b, "scheduler": scheduler_b, "denoise": denoise_b, "model": ["8", 0], "positive": ["7", 0], "negative": ["4", 0], "latent_image": ["3", 1]}, "class_type": "KSampler"}, "10": {"inputs": {"samples": ["9", 0], "vae": ["8", 2]}, "class_type": "VAEDecode"} } if input_image is None: del workflow["2"] workflow["3"] = {"inputs": {"width": width, "height": height, "compression": 42, "batch_size": batch_size}, "class_type": "StableCascade_EmptyLatentImage"} node_id = 10 image_port = [str(node_id), 0] if initialized is True: workflow, node_id, image_port = Postprocess.add_node(module, workflow, node_id, image_port) node_id += 1 workflow[str(node_id)] = {"inputs": {"filename_prefix": "ComfyUI", "images": image_port}, "class_type": "SaveImage"} images = Function.gen_image(workflow, counter, batch_count, progress)[0] if images is None: break for image in images: output_images.append(image) seed_c += 1 counter += 1 return output_images, output_images def blocks(): with gr.Row(): with gr.Column(): positive_prompt = gr.Textbox(placeholder="Positive prompt | 正向提示词", show_label=False, value=Default.prompt, lines=3) negative_prompt = gr.Textbox(placeholder="Negative prompt | 负向提示词", show_label=False, value=Default.negative_prompt, lines=3) with gr.Tab(label="基础设置"): with gr.Row(): width = gr.Slider(minimum=128, maximum=2048, step=128, label="Width | 图像宽度", value=1024) batch_size = gr.Slider(minimum=1, maximum=8, step=1, label="Batch size | 批次大小", value=1) with gr.Row(): height = gr.Slider(minimum=128, maximum=2048, step=128, label="Height | 图像高度", value=1024) batch_count = gr.Slider(minimum=1, maximum=100, step=1, label="Batch count | 生成批次", value=1) with gr.Row(): SC.input_image = gr.Image(value=None, type="pil") gr.HTML("<br>上传图片即自动转为图生图模式。<br><br>文生图、图生图模式共享设置参数。<br><br>图像宽度、图像高度、批次大小对图生图无效。") with gr.Tab(label="Stage C 采样设置"): with gr.Row(): sampler_name_c = gr.Dropdown(Choices.sampler, label="Sampling method | 采样方法", value=Choices.sampler[12]) scheduler_c = gr.Dropdown(Choices.scheduler, label="Schedule type | 采样计划表类型", value=Choices.scheduler[1]) with gr.Row(): denoise_c = gr.Slider(minimum=0, maximum=1, step=0.05, label="Denoise | 去噪强度", value=1) steps_c = gr.Slider(minimum=10, maximum=30, step=1, label="Sampling steps | 采样次数", value=20) with gr.Row(): cfg_c = gr.Slider(minimum=0, maximum=20, step=0.1, label="CFG Scale | CFG权重", value=4) seed_c = gr.Slider(minimum=-1, maximum=18446744073709550000, step=1, label="Seed | 种子数", value=-1) with gr.Tab(label="Stage B 采样设置"): with gr.Row(): sampler_name_b = gr.Dropdown(Choices.sampler, label="Sampling method | 采样方法", value=Choices.sampler[12]) scheduler_b = gr.Dropdown(Choices.scheduler, label="Schedule type | 采样计划表类型", value=Choices.scheduler[1]) with gr.Row(): denoise_b = gr.Slider(minimum=0, maximum=1, step=0.05, label="Denoise | 去噪强度", value=1) steps_b = gr.Slider(minimum=4, maximum=12, step=1, label="Sampling steps | 采样次数", value=10) with gr.Row(): cfg_b = gr.Slider(minimum=0, maximum=20, step=0.1, label="CFG Scale | CFG权重", value=1.1) seed_b = gr.Slider(minimum=-1, maximum=18446744073709550000, step=1, label="Seed | 种子数", value=-1) Postprocess.blocks("SC") with gr.Column(): with gr.Row(): btn = gr.Button("Generate | 生成", elem_id="button") btn2 = gr.Button("Interrupt | 终止") output = gr.Gallery(preview=True, height=600) with gr.Row(): SC.send_to_sd = gr.Button("发送图片至 SD") SC.send_to_sc = gr.Button("发送图片至 SC") if SVD.enable is True: SC.send_to_svd = gr.Button("发送图片至 SVD") SC.send_to_extras = gr.Button("发送图片至 Extras") SC.send_to_info = gr.Button("发送图片至 Info") SC.data = gr.State() SC.index = gr.State() btn.click(fn=SC.generate, inputs=[Initial.initialized, batch_count, positive_prompt, negative_prompt, width, height, batch_size, seed_c, steps_c, cfg_c, sampler_name_c, scheduler_c, denoise_c, seed_b, steps_b, cfg_b, sampler_name_b, scheduler_b, denoise_b, SC.input_image], outputs=[output, SC.data]) btn2.click(fn=Function.post_interrupt, inputs=None, outputs=None) output.select(fn=Function.get_gallery_index, inputs=None, outputs=[SC.index]) class SVD: if Default.design_mode == 1: enable = True elif "svd_xt_1_1.safetensors" in Choices.ckpt_list: enable = True else: enable = False def generate(input_image, width, height, video_frames, motion_bucket_id, fps, augmentation_level, min_cfg, seed, steps, cfg, sampler_name, scheduler, denoise, fps2, lossless, quality, method, progress=gr.Progress()): ckpt_name = Function.get_model_path("svd_xt_1_1.safetensors") seed = Function.gen_seed(seed) if input_image is None: return else: input_image = Function.upload_image(input_image) workflow = { "1": {"inputs": {"ckpt_name": ckpt_name}, "class_type": "ImageOnlyCheckpointLoader"}, "2": {"inputs": {"image": input_image, "upload": "image"}, "class_type": "LoadImage"}, "3": {"inputs": {"width": width, "height": height, "video_frames": video_frames, "motion_bucket_id": motion_bucket_id, "fps": fps, "augmentation_level": augmentation_level, "clip_vision": ["1", 1], "init_image": ["2", 0], "vae": ["1", 2]}, "class_type": "SVD_img2vid_Conditioning"}, "4": {"inputs": {"min_cfg": min_cfg, "model": ["1", 0]}, "class_type": "VideoLinearCFGGuidance"}, "5": {"inputs": {"seed": seed, "steps": steps, "cfg": cfg, "sampler_name": sampler_name, "scheduler": scheduler, "denoise": denoise, "model": ["4", 0], "positive": ["3", 0], "negative": ["3", 1], "latent_image": ["3", 2]}, "class_type": "KSampler"}, "6": {"inputs": {"samples": ["5", 0], "vae": ["1", 2]}, "class_type": "VAEDecode"}, "7": {"inputs": {"filename_prefix": "ComfyUI", "fps": fps2, "lossless": False, "quality": quality, "method": method, "images": ["6", 0]}, "class_type": "SaveAnimatedWEBP"} } return Function.gen_image(workflow, 1, 1, progress)[1] def blocks(): with gr.Row(): with gr.Column(): SVD.input_image = gr.Image(value=None, type="pil") with gr.Tab(label="基础设置"): with gr.Row(): width = gr.Slider(minimum=64, maximum=2048, step=64, label="Width | 图像宽度", value=512) video_frames = gr.Slider(minimum=1, maximum=25, step=1, label="Video frames | 视频帧", value=25) with gr.Row(): height = gr.Slider(minimum=64, maximum=2048, step=64, label="Height | 图像高度", value=512) fps = gr.Slider(minimum=1, maximum=30, step=1, label="FPS | 帧率", value=6) with gr.Row(): with gr.Column(): augmentation_level = gr.Slider(minimum=0, maximum=1, step=0.01, label="Augmentation level | 增强级别", value=0) motion_bucket_id = gr.Slider(minimum=1, maximum=256, step=1, label="Motion bucket id | 运动参数", value=127) with gr.Column(): min_cfg = gr.Slider(minimum=0, maximum=20, step=0.5, label="Min CFG | 最小CFG权重", value=1) with gr.Tab(label="采样设置"): with gr.Row(): sampler_name = gr.Dropdown(Choices.sampler, label="Sampling method | 采样方法", value=Choices.sampler[12]) scheduler = gr.Dropdown(Choices.scheduler, label="Schedule type | 采样计划表类型", value=Choices.scheduler[1]) with gr.Row(): denoise = gr.Slider(minimum=0, maximum=1, step=0.05, label="Denoise | 去噪强度", value=1) steps = gr.Slider(minimum=10, maximum=30, step=1, label="Sampling steps | 采样次数", value=20) with gr.Row(): cfg = gr.Slider(minimum=0, maximum=20, step=0.1, label="CFG Scale | CFG权重", value=2.5) seed = gr.Slider(minimum=-1, maximum=18446744073709550000, step=1, label="Seed | 种子数", value=-1) with gr.Tab(label="输出设置"): with gr.Row(): method = gr.Dropdown(["default", "fastest", "slowest"], label="Method | 输出方法", value="default") lossless = gr.Dropdown(["true", "false"], label="Lossless | 无损压缩", value="false") with gr.Row(): quality = gr.Slider(minimum=70, maximum=100, step=1, label="Quality | 输出质量", value=85) fps2 = gr.Slider(minimum=1, maximum=30, step=1, label="FPS | 帧率", value=10) with gr.Column(): with gr.Row(): btn = gr.Button("Generate | 生成", elem_id="button") btn2 = gr.Button("Interrupt | 终止") output = gr.Image(height=600) btn.click(fn=SVD.generate, inputs=[SVD.input_image, width, height, video_frames, motion_bucket_id, fps, augmentation_level, min_cfg, seed, steps, cfg, sampler_name, scheduler, denoise, fps2, lossless, quality, method], outputs=[output]) btn2.click(fn=Function.post_interrupt, inputs=None, outputs=None) class Extras: def generate(initialized, input_image, progress=gr.Progress()): module = "Extras" if input_image is None: return else: input_image = Function.upload_image(input_image) workflow = {} node_id = 1 workflow[str(node_id)] = {"inputs": {"image": input_image, "upload": "image"}, "class_type": "LoadImage"} image_port = [str(node_id), 0] if initialized is True: if module not in Upscale.cache and module not in UpscaleWithModel.cache: return if module in Upscale.cache: workflow, node_id, image_port = Upscale.add_node(module, workflow, node_id, image_port) if module in UpscaleWithModel.cache: workflow, node_id, image_port = UpscaleWithModel.add_node(module, workflow, node_id, image_port) else: return node_id += 1 workflow[str(node_id)] = {"inputs": {"filename_prefix": "ComfyUI", "images": image_port}, "class_type": "SaveImage"} output = Function.gen_image(workflow, 1, 1, progress)[0] if output is not None: output = output[0] return output def blocks(): with gr.Row(): with gr.Column(): Extras.input_image = gr.Image(value=None, type="pil") with gr.Row(): with gr.Tab(label="算术放大"): Upscale.blocks("Extras") with gr.Row(): with gr.Tab(label="超分放大"): UpscaleWithModel.blocks("Extras") gr.HTML("注意:同时启用两种放大模式将先执行算术放大,再执行超分放大,最终放大倍数为二者放大倍数的乘积!") with gr.Column(): with gr.Row(): btn = gr.Button("Generate | 生成", elem_id="button") btn2 = gr.Button("Interrupt | 终止") output = gr.Image(height=600) btn.click(fn=Extras.generate, inputs=[Initial.initialized, Extras.input_image], outputs=[output]) btn2.click(fn=Function.post_interrupt, inputs=None, outputs=None) class Info: def generate(image_info, progress=gr.Progress()): if not image_info or image_info is None or image_info == "仅支持API工作流!!!" or "Version:" in image_info or image_info == "None": return workflow = json.loads(image_info) return Function.gen_image(workflow, 1, 1, progress)[0] def order_workflow(workflow): if workflow is None: return gr.update(visible=False, value=None) workflow = json.loads(workflow) if "last_node_id" in workflow: return gr.update(show_label=False, visible=True, value="仅支持API工作流!!!", lines=1) workflow = Function.order_workflow(workflow) lines = len(workflow) + 5 workflow_string = "{" for node in workflow: workflow_string = workflow_string + "\n" + f'"{node}": {workflow[node]},' workflow_string = workflow_string + "\n}" workflow_string = workflow_string.replace(",\n}", "\n}") workflow_string = workflow_string.replace("'", '"') return gr.update(label="Ordered workflow_api", show_label=True, visible=True, value=workflow_string, lines=lines) def get_image_info(image_pil): if image_pil is None: return gr.update(visible=False, value=None) else: image_info = Function.get_image_info(image_pil) if image_info == "None": return gr.update(visible=False, value=None) if "Version:" in image_info: return gr.update(label="Image info", show_label=True, visible=True, value=image_info, lines=3) return Info.order_workflow(image_info) def hide_another_input(this_input): if this_input is None: return gr.update(visible=True) return gr.update(visible=False) def blocks(): with gr.Row(): with gr.Column(): Info.input_image = gr.Image(value=None, type="pil") workflow = gr.File(label="workflow_api.json", file_types=[".json"], type="binary") image_info = gr.Textbox(visible=False) with gr.Column(): with gr.Row(): btn = gr.Button("Generate | 生成", elem_id="button") btn2 = gr.Button("Interrupt | 终止") output = gr.Gallery(preview=True, height=600) btn.click(fn=Info.generate, inputs=[image_info], outputs=[output]) btn2.click(fn=Function.post_interrupt, inputs=None, outputs=None) Info.input_image.change(fn=Info.hide_another_input, inputs=[Info.input_image], outputs=[workflow]) Info.input_image.change(fn=Info.get_image_info, inputs=[Info.input_image], outputs=[image_info]) workflow.change(fn=Info.hide_another_input, inputs=[workflow], outputs=[Info.input_image]) workflow.change(fn=Info.order_workflow, inputs=[workflow], outputs=[image_info]) with gr.Blocks(css="#button {background: #FFE1C0; color: #FF453A} .block.padded:not(.gradio-accordion) {padding: 0 !important;} div.form {border-width: 0; box-shadow: none; background: white; gap: 1.15em;}") as demo: Initial.initialized = gr.Checkbox(value=False, visible=False) with gr.Tab(label="Stable Diffusion"): SD.blocks() if SC.enable is True: with gr.Tab(label="Stable Cascade"): SC.blocks() if SVD.enable is True: with gr.Tab(label="Stable Video Diffusion"): SVD.blocks() with gr.Tab(label="Extras"): Extras.blocks() with gr.Tab(label="Info"): Info.blocks() SD.send_to_sd.click(fn=Function.send_to, inputs=[SD.data, SD.index], outputs=[SD.input_image]) if SC.enable is True: SD.send_to_sc.click(fn=Function.send_to, inputs=[SD.data, SD.index], outputs=[SC.input_image]) if SVD.enable is True: SD.send_to_svd.click(fn=Function.send_to, inputs=[SD.data, SD.index], outputs=[SVD.input_image]) SD.send_to_extras.click(fn=Function.send_to, inputs=[SD.data, SD.index], outputs=[Extras.input_image]) SD.send_to_info.click(fn=Function.send_to, inputs=[SD.data, SD.index], outputs=[Info.input_image]) if SC.enable is True: SC.send_to_sd.click(fn=Function.send_to, inputs=[SC.data, SC.index], outputs=[SD.input_image]) SC.send_to_sc.click(fn=Function.send_to, inputs=[SC.data, SC.index], outputs=[SC.input_image]) if SVD.enable is True: SC.send_to_svd.click(fn=Function.send_to, inputs=[SC.data, SC.index], outputs=[SVD.input_image]) SC.send_to_extras.click(fn=Function.send_to, inputs=[SC.data, SC.index], outputs=[Extras.input_image]) SC.send_to_info.click(fn=Function.send_to, inputs=[SC.data, SC.index], outputs=[Info.input_image]) demo.queue(concurrency_count=100).launch(inbrowser=True, inline=False)
flowworkflowcliimutpupromptcontrolnetideclipunitctoappcomfyuiwebjsoncodecadsocurlgradio