diff --git a/folder_paths.py b/folder_paths.py index 64c213ab..38fad623 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -1,14 +1,15 @@ +from __future__ import annotations + import os import time +import mimetypes import logging -import tempfile -import requests -from urllib.parse import urlparse -from typing import Any, IO +from typing import Set, List, Dict, Tuple, Literal +from collections.abc import Collection -supported_pt_extensions = set(['.ckpt', '.pt', '.bin', '.pth', '.safetensors', '.pkl']) +supported_pt_extensions: set[str] = {'.ckpt', '.pt', '.bin', '.pth', '.safetensors', '.pkl', '.sft'} -folder_names_and_paths = {} +folder_names_and_paths: dict[str, tuple[list[str], set[str]]] = {} base_path = os.path.dirname(os.path.realpath(__file__)) models_dir = os.path.join(base_path, "models") @@ -18,7 +19,7 @@ folder_names_and_paths["loras"] = ([os.path.join(models_dir, "loras")], supported_pt_extensions) folder_names_and_paths["vae"] = ([os.path.join(models_dir, "vae")], supported_pt_extensions) folder_names_and_paths["clip"] = ([os.path.join(models_dir, "clip")], supported_pt_extensions) -folder_names_and_paths["unet"] = ([os.path.join(models_dir, "unet")], supported_pt_extensions) +folder_names_and_paths["diffusion_models"] = ([os.path.join(models_dir, "unet"), os.path.join(models_dir, "diffusion_models")], supported_pt_extensions) folder_names_and_paths["clip_vision"] = ([os.path.join(models_dir, "clip_vision")], supported_pt_extensions) folder_names_and_paths["style_models"] = ([os.path.join(models_dir, "style_models")], supported_pt_extensions) folder_names_and_paths["embeddings"] = ([os.path.join(models_dir, "embeddings")], supported_pt_extensions) @@ -30,7 +31,7 @@ folder_names_and_paths["upscale_models"] = ([os.path.join(models_dir, "upscale_models")], supported_pt_extensions) -folder_names_and_paths["custom_nodes"] = ([os.path.join(base_path, "custom_nodes")], []) +folder_names_and_paths["custom_nodes"] = ([os.path.join(base_path, "custom_nodes")], set()) folder_names_and_paths["hypernetworks"] = ([os.path.join(models_dir, "hypernetworks")], supported_pt_extensions) @@ -43,7 +44,15 @@ input_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), "input") user_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), "user") -filename_list_cache = {} +filename_list_cache: dict[str, tuple[list[str], dict[str, float], float]] = {} + +extension_mimetypes_cache = { + "webp" : "image", +} + +def map_legacy(folder_name: str) -> str: + legacy = {"unet": "diffusion_models"} + return legacy.get(folder_name, folder_name) if not os.path.exists(input_directory): try: @@ -51,33 +60,40 @@ except: logging.error("Failed to create input directory") -def set_output_directory(output_dir): +def set_output_directory(output_dir: str) -> None: global output_directory output_directory = output_dir -def set_temp_directory(temp_dir): +def set_temp_directory(temp_dir: str) -> None: global temp_directory temp_directory = temp_dir -def set_input_directory(input_dir): +def set_input_directory(input_dir: str) -> None: global input_directory input_directory = input_dir -def get_output_directory(): +def get_output_directory() -> str: global output_directory return output_directory -def get_temp_directory(): +def get_temp_directory() -> str: global temp_directory return temp_directory -def get_input_directory(): +def get_input_directory() -> str: global input_directory return input_directory +def get_user_directory() -> str: + return user_directory + +def set_user_directory(user_dir: str) -> None: + global user_directory + user_directory = user_dir + #NOTE: used in http server so don't put folders that should not be accessed remotely -def get_directory_by_type(type_name): +def get_directory_by_type(type_name: str) -> str | None: if type_name == "output": return get_output_directory() if type_name == "temp": @@ -86,39 +102,32 @@ def get_directory_by_type(type_name): return get_input_directory() return None - -def is_valid_url(candidate_str: Any) -> bool: - if not isinstance(candidate_str, str): - return False - parsed = urlparse(candidate_str) - return parsed.scheme != "" and parsed.netloc != "" - -def _make_temp_file(file_path: str) -> IO: +def filter_files_content_types(files: List[str], content_types: Literal["image", "video", "audio"]) -> List[str]: """ - A utility function to write bytes to a temporary file. This is useful - if one needs to pass a file object to a function, but only has bytes. + Example: + files = os.listdir(folder_paths.get_input_directory()) + filter_files_content_types(files, ["image", "audio", "video"]) """ - # If the source is a valid url, we will download the content and return it. - try: - content = requests.get(file_path).content - except Exception: - raise ValueError(f"Failed to download content from url: {file_path}") - - _, extension = os.path.splitext(file_path) - - # get the temp file path - f = tempfile.NamedTemporaryFile(delete=False, suffix=extension) - f.write(content) - # Flush to make sure that the content is written. - f.flush() - # Seek to the beginning of the file so that the content can be read. - f.seek(0) - print(f'download url resources success to {f.name}') - return f + global extension_mimetypes_cache + result = [] + for file in files: + extension = file.split('.')[-1] + if extension not in extension_mimetypes_cache: + mime_type, _ = mimetypes.guess_type(file, strict=False) + if not mime_type: + continue + content_type = mime_type.split('/')[0] + extension_mimetypes_cache[extension] = content_type + else: + content_type = extension_mimetypes_cache[extension] + + if content_type in content_types: + result.append(file) + return result # determine base_dir rely on annotation if name is 'filename.ext [annotation]' format # otherwise use default_path as base_dir -def annotated_filepath(name): +def annotated_filepath(name: str) -> tuple[str, str | None]: if name.endswith("[output]"): base_dir = get_output_directory() name = name[:-9] @@ -134,16 +143,10 @@ def annotated_filepath(name): return name, base_dir -def get_annotated_filepath(name, default_dir=None): - # norm path +def get_annotated_filepath(name: str, default_dir: str | None=None) -> str: name, base_dir = annotated_filepath(name) if base_dir is None: - # find a https url - if is_valid_url(name): - src = _make_temp_file(name) - return src.name - if default_dir is not None: base_dir = default_dir else: @@ -152,7 +155,7 @@ def get_annotated_filepath(name, default_dir=None): return os.path.join(base_dir, name) -def exists_annotated_filepath(name): +def exists_annotated_filepath(name) -> bool: name, base_dir = annotated_filepath(name) if base_dir is None: @@ -162,17 +165,19 @@ def exists_annotated_filepath(name): return os.path.exists(filepath) -def add_model_folder_path(folder_name, full_folder_path): +def add_model_folder_path(folder_name: str, full_folder_path: str) -> None: global folder_names_and_paths + folder_name = map_legacy(folder_name) if folder_name in folder_names_and_paths: folder_names_and_paths[folder_name][0].append(full_folder_path) else: folder_names_and_paths[folder_name] = ([full_folder_path], set()) -def get_folder_paths(folder_name): +def get_folder_paths(folder_name: str) -> list[str]: + folder_name = map_legacy(folder_name) return folder_names_and_paths[folder_name][0][:] -def recursive_search(directory, excluded_dir_names=None): +def recursive_search(directory: str, excluded_dir_names: list[str] | None=None) -> tuple[list[str], dict[str, float]]: if not os.path.isdir(directory): return [], {} @@ -189,6 +194,10 @@ def recursive_search(directory, excluded_dir_names=None): logging.warning(f"Warning: Unable to access {directory}. Skipping this path.") logging.debug("recursive file list on directory {}".format(directory)) + dirpath: str + subdirs: list[str] + filenames: list[str] + for dirpath, subdirs, filenames in os.walk(directory, followlinks=True, topdown=True): subdirs[:] = [d for d in subdirs if d not in excluded_dir_names] for file_name in filenames: @@ -196,7 +205,7 @@ def recursive_search(directory, excluded_dir_names=None): result.append(relative_path) for d in subdirs: - path = os.path.join(dirpath, d) + path: str = os.path.join(dirpath, d) try: dirs[path] = os.path.getmtime(path) except FileNotFoundError: @@ -205,13 +214,14 @@ def recursive_search(directory, excluded_dir_names=None): logging.debug("found {} files".format(len(result))) return result, dirs -def filter_files_extensions(files, extensions): +def filter_files_extensions(files: Collection[str], extensions: Collection[str]) -> list[str]: return sorted(list(filter(lambda a: os.path.splitext(a)[-1].lower() in extensions or len(extensions) == 0, files))) -def get_full_path(folder_name, filename): +def get_full_path(folder_name: str, filename: str) -> str | None: global folder_names_and_paths + folder_name = map_legacy(folder_name) if folder_name not in folder_names_and_paths: return None folders = folder_names_and_paths[folder_name] @@ -225,7 +235,16 @@ def get_full_path(folder_name, filename): return None -def get_filename_list_(folder_name): + +def get_full_path_or_raise(folder_name: str, filename: str) -> str: + full_path = get_full_path(folder_name, filename) + if full_path is None: + raise FileNotFoundError(f"Model in folder '{folder_name}' with filename '{filename}' not found.") + return full_path + + +def get_filename_list_(folder_name: str) -> tuple[list[str], dict[str, float], float]: + folder_name = map_legacy(folder_name) global folder_names_and_paths output_list = set() folders = folder_names_and_paths[folder_name] @@ -235,11 +254,12 @@ def get_filename_list_(folder_name): output_list.update(filter_files_extensions(files, folders[1])) output_folders = {**output_folders, **folders_all} - return (sorted(list(output_list)), output_folders, time.perf_counter()) + return sorted(list(output_list)), output_folders, time.perf_counter() -def cached_filename_list_(folder_name): +def cached_filename_list_(folder_name: str) -> tuple[list[str], dict[str, float], float] | None: global filename_list_cache global folder_names_and_paths + folder_name = map_legacy(folder_name) if folder_name not in filename_list_cache: return None out = filename_list_cache[folder_name] @@ -258,7 +278,8 @@ def cached_filename_list_(folder_name): return out -def get_filename_list(folder_name): +def get_filename_list(folder_name: str) -> list[str]: + folder_name = map_legacy(folder_name) out = cached_filename_list_(folder_name) if out is None: out = get_filename_list_(folder_name) @@ -266,22 +287,30 @@ def get_filename_list(folder_name): filename_list_cache[folder_name] = out return list(out[0]) -def get_save_image_path(filename_prefix, output_dir, image_width=0, image_height=0): - def map_filename(filename): +def get_save_image_path(filename_prefix: str, output_dir: str, image_width=0, image_height=0) -> tuple[str, str, int, str, str]: + def map_filename(filename: str) -> tuple[int, str]: prefix_len = len(os.path.basename(filename_prefix)) prefix = filename[:prefix_len + 1] try: digits = int(filename[prefix_len + 1:].split('_')[0]) except: digits = 0 - return (digits, prefix) + return digits, prefix - def compute_vars(input, image_width, image_height): + def compute_vars(input: str, image_width: int, image_height: int) -> str: input = input.replace("%width%", str(image_width)) input = input.replace("%height%", str(image_height)) + now = time.localtime() + input = input.replace("%year%", str(now.tm_year)) + input = input.replace("%month%", str(now.tm_mon).zfill(2)) + input = input.replace("%day%", str(now.tm_mday).zfill(2)) + input = input.replace("%hour%", str(now.tm_hour).zfill(2)) + input = input.replace("%minute%", str(now.tm_min).zfill(2)) + input = input.replace("%second%", str(now.tm_sec).zfill(2)) return input - filename_prefix = compute_vars(filename_prefix, image_width, image_height) + if "%" in filename_prefix: + filename_prefix = compute_vars(filename_prefix, image_width, image_height) subfolder = os.path.dirname(os.path.normpath(filename_prefix)) filename = os.path.basename(os.path.normpath(filename_prefix)) diff --git a/poetry.lock b/poetry.lock index 90955a49..c75e9d3c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1497,6 +1497,19 @@ files = [ {file = "einops-0.8.0.tar.gz", hash = "sha256:63486517fed345712a8385c100cb279108d9d47e6ae59099b07657e983deae85"}, ] +[[package]] +name = "emoji" +version = "1.7.0" +description = "Emoji for Python" +optional = false +python-versions = "*" +files = [ + {file = "emoji-1.7.0.tar.gz", hash = "sha256:65c54533ea3c78f30d0729288998715f418d7467de89ec258a31c0ce8660a1d1"}, +] + +[package.extras] +dev = ["coverage", "coveralls", "pytest"] + [[package]] name = "enscons" version = "0.30.0" @@ -4594,6 +4607,24 @@ files = [ docs = ["furo", "olefile", "sphinx (>=2.4)", "sphinx-copybutton", "sphinx-inline-tabs", "sphinx-removed-in", "sphinxext-opengraph"] tests = ["check-manifest", "coverage", "defusedxml", "markdown2", "olefile", "packaging", "pyroma", "pytest", "pytest-cov", "pytest-timeout"] +[[package]] +name = "pilmoji" +version = "2.0.4" +description = "Pilmoji is an emoji renderer for Pillow, Python's imaging library." +optional = false +python-versions = ">=3.8.0" +files = [ + {file = "pilmoji-2.0.4-py3-none-any.whl", hash = "sha256:2fcb2116226ed8aa600fcf6e65d7693b5d08d60715f8c460553130081a5adc26"}, + {file = "pilmoji-2.0.4.tar.gz", hash = "sha256:3861c09999f36bb32dd559e181b63a8a9c8068b2cddcac32637581e48de2d11f"}, +] + +[package.dependencies] +emoji = "*" +Pillow = "*" + +[package.extras] +requests = ["requests"] + [[package]] name = "pkginfo" version = "1.11.1" @@ -8821,4 +8852,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = ">=3.10,<3.12" -content-hash = "00710a0626abe7b28296d6e6c69c3e45021676388681e38846e7cedbd8279b0c" +content-hash = "78fb6cdbf77ab406a729b3d3a0783308cb4c5ba5d82157f60f8ed37272c55fa5" diff --git a/proconfig/widgets/__init__.py b/proconfig/widgets/__init__.py index 8db850ce..2518cedf 100644 --- a/proconfig/widgets/__init__.py +++ b/proconfig/widgets/__init__.py @@ -8,6 +8,7 @@ import proconfig.widgets.imagen_widgets import proconfig.widgets.language_models import proconfig.widgets.tools +import proconfig.widgets.myshell_widgets # load custom widgets import os diff --git a/proconfig/widgets/myshell_widgets/__init__.py b/proconfig/widgets/myshell_widgets/__init__.py new file mode 100644 index 00000000..b162c806 --- /dev/null +++ b/proconfig/widgets/myshell_widgets/__init__.py @@ -0,0 +1 @@ +from proconfig.widgets.myshell_widgets.tools.image_text_fuser import ImageTextFuserWidget \ No newline at end of file diff --git a/proconfig/widgets/myshell_widgets/tools/fonts/Caveat-Regular.ttf b/proconfig/widgets/myshell_widgets/tools/fonts/Caveat-Regular.ttf new file mode 100644 index 00000000..1febd9f1 Binary files /dev/null and b/proconfig/widgets/myshell_widgets/tools/fonts/Caveat-Regular.ttf differ diff --git a/proconfig/widgets/myshell_widgets/tools/fonts/KronaOne-Regular.ttf b/proconfig/widgets/myshell_widgets/tools/fonts/KronaOne-Regular.ttf new file mode 100644 index 00000000..51725ff4 Binary files /dev/null and b/proconfig/widgets/myshell_widgets/tools/fonts/KronaOne-Regular.ttf differ diff --git a/proconfig/widgets/myshell_widgets/tools/fonts/NotoSansTC-Regular.ttf b/proconfig/widgets/myshell_widgets/tools/fonts/NotoSansTC-Regular.ttf new file mode 100644 index 00000000..e838ef54 Binary files /dev/null and b/proconfig/widgets/myshell_widgets/tools/fonts/NotoSansTC-Regular.ttf differ diff --git a/proconfig/widgets/myshell_widgets/tools/fonts/Oswald-Bold.ttf b/proconfig/widgets/myshell_widgets/tools/fonts/Oswald-Bold.ttf new file mode 100644 index 00000000..b9c6e376 Binary files /dev/null and b/proconfig/widgets/myshell_widgets/tools/fonts/Oswald-Bold.ttf differ diff --git a/proconfig/widgets/myshell_widgets/tools/fonts/Oswald-Regular.ttf b/proconfig/widgets/myshell_widgets/tools/fonts/Oswald-Regular.ttf new file mode 100644 index 00000000..5903df43 Binary files /dev/null and b/proconfig/widgets/myshell_widgets/tools/fonts/Oswald-Regular.ttf differ diff --git a/proconfig/widgets/myshell_widgets/tools/image_text_fuser.py b/proconfig/widgets/myshell_widgets/tools/image_text_fuser.py new file mode 100644 index 00000000..a112d5ef --- /dev/null +++ b/proconfig/widgets/myshell_widgets/tools/image_text_fuser.py @@ -0,0 +1,286 @@ +import os +from proconfig.widgets.base import BaseWidget, WIDGETS +from typing import Literal, Any, Dict +import torch +from proconfig.utils.misc import upload_file_to_myshell +from PIL import Image, ImageDraw, ImageFont +import json +import emoji +from pilmoji import Pilmoji +import tempfile +import requests +from io import BytesIO + +font_dir = os.path.abspath('proconfig/widgets/myshell_widgets/tools/fonts') + +def fit_text_to_box(draw, text, box_size, font_path, max_font_size, min_font_size=10, line_spacing_factor=0.2): + max_width, max_height = box_size + font_size = max_font_size + + while font_size >= min_font_size: + font = ImageFont.truetype(os.path.join(font_dir, font_path), font_size) + words = text.split() + lines = [] + current_line = [] + + for word in words: + current_line.append(word) + line_width = font.getlength(' '.join(current_line)) + if line_width > max_width: + if len(current_line) == 1: + current_line = [word] + continue # Try the next font size + lines.append(' '.join(current_line[:-1])) + current_line = [word] + + if current_line: + lines.append(' '.join(current_line)) + + # Calculate line height and add extra line spacing + line_height = font.getbbox('Ay')[3] - font.getbbox('Ay')[1] + line_spacing = line_height * line_spacing_factor + total_height = (line_height + line_spacing) * len(lines) - line_spacing + + if total_height <= max_height: + return lines, font, line_height + line_spacing + + font_size -= 1 + + return None, None, None # If it can't fit, return None + +def hex_to_rgb(hex_color): + """Convert hex color string to RGBA tuple""" + hex_color = hex_color.lstrip('#') + hex_color = ''.join(c for c in hex_color if c.isalnum()) # Remove all non-alphanumeric characters + + if len(hex_color) == 6: + rgb = tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4)) + return rgb + (255,) # Add fully opaque alpha value + elif len(hex_color) == 8: + return tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4, 6)) + else: + print(f"Warning: Invalid hex color format '{hex_color}'. Using black.") + return (0, 0, 0, 255) # Return black as default + +def add_outline_to_text(pilmoji_draw, text, x, y, font, text_color, outline_color, line_height): + # 绘制文本轮廓(不包含表情符号) + text_without_emoji = ''.join(char for char in text if char not in emoji.EMOJI_DATA) + pilmoji_draw.text((x-1, y-1), text_without_emoji, font=font, fill=outline_color) + pilmoji_draw.text((x+1, y-1), text_without_emoji, font=font, fill=outline_color) + pilmoji_draw.text((x-1, y+1), text_without_emoji, font=font, fill=outline_color) + pilmoji_draw.text((x+1, y+1), text_without_emoji, font=font, fill=outline_color) + + # Draw the text itself + pilmoji_draw.text((x, y), text, font=font, fill=text_color, node_spacing=1, emoji_position_offset=[0, int(0.25 * line_height)]) + +def resize_image(img, position, size, method='contain', rotation=0): + original_width, original_height = img.size + target_width, target_height = size + x, y = position + + # First, perform rotation + if rotation != 0: + img = img.rotate(rotation, expand=True, resample=Image.BICUBIC) + + if method == 'cover': + img.thumbnail((target_width * 2, target_height * 2), Image.LANCZOS) + img = img.crop(( + (img.width - target_width) // 2, + (img.height - target_height) // 2, + (img.width + target_width) // 2, + (img.height + target_height) // 2 + )) + return img, position + + elif method == 'contain': + img.thumbnail(size, Image.LANCZOS) + paste_x = x + (target_width - img.width) // 2 + paste_y = y + (target_height - img.height) // 2 + return img, (paste_x, paste_y) + + elif method == 'fill': + img = img.resize(size, Image.LANCZOS) + return img, position + + else: + raise ValueError("Invalid resize method. Choose 'cover', 'contain', or 'fill'.") + +def get_image(image_path): + if image_path.startswith('http'): + response = requests.get(image_path) + response.raise_for_status() + return Image.open(BytesIO(response.content)).convert('RGBA') + else: + return Image.open(image_path).convert('RGBA') +# ImageFont.truetype(os.path.join(font_dir, title['font']), title['font_size']) +def add_content_to_image(template_path, content): + image = get_image(template_path) + draw = ImageDraw.Draw(image, mode='RGBA') + pilmoji_draw = Pilmoji(image) + + layers = [] + default_z_index = 0 + + # Handle title + if 'title' in content: + title = content['title'] + title_font = ImageFont.truetype(os.path.join(font_dir, title['font']), title['font_size']) + title_box_size = (image.width, 50) # Adjust height as needed + title_lines, title_font, line_height = fit_text_to_box(draw, title['text'], title_box_size, title['font'], title['font_size']) + title_color = title.get('color', '000000FF') # default black + if isinstance(title_color, str): + title_color = hex_to_rgb(title_color) + if title_lines: + y = 50 # Adjust top margin as needed + for line in title_lines: + line_width = title_font.getlength(line) + x = (image.width - line_width) / 2 # Center horizontally + layers.append({ + 'type': 'text', + 'content': line, + 'position': (x, y), + 'font': title_font, + 'color': title_color, + 'z_index': title.get('z_index', default_z_index), + 'outline_color': title.get('outline_color'), + 'line_height': 0 + }) + y += line_height + default_z_index += 1 + + for box in content['boxes']: + position = tuple(box['position']) + size = tuple(box['size']) + z_index = box.get('z_index', default_z_index) + default_z_index += 1 + + if 'text' in box: + text = box['text'] + font_path = os.path.join(font_dir, box.get('font', content.get('font', 'Oswald-Regular.ttf'))) + font_size = box['font_size'] + color = box.get('color', '000000FF') + if isinstance(color, str): + color = hex_to_rgb(color) + + underline = box.get('underline', False) + text_align = box.get('text_align', 'center') + vertical_align = box.get('vertical_align', 'center') + outline_color = box.get('outline_color') + + lines, font, line_height = fit_text_to_box(draw, text, size, font_path, font_size) + if not lines: + print(f"Warning: Text '{text}' doesn't fit in the box even at 1pt size.") + continue + + x, y = position + box_width, box_height = size + text_height = len(lines) * line_height + + # Vertical alignment + if vertical_align == 'top': + y_start = y + elif vertical_align == 'bottom': + y_start = y + box_height - text_height + else: # center + y_start = y + (box_height - text_height) / 2 + + for line in lines: + line_width = font.getlength(line) + + # Horizontal alignment + if text_align == 'left': + x_aligned = x + elif text_align == 'right': + x_aligned = x + box_width - line_width + else: # center + x_aligned = x + (box_width - line_width) / 2 + + layers.append({ + 'type': 'text', + 'content': line, + 'position': (x_aligned, y_start), + 'font': font, + 'color': color, + 'underline': underline, + 'z_index': z_index, + 'outline_color': outline_color, + 'line_height': line_height + }) + y_start += line_height + + elif 'image' in box: + try: + box_image = get_image(box['image']) + + rotation = box.get('rotation', 0) + resize_method = box.get('resize_method', 'contain') + box_image, paste_position = resize_image(box_image, position, size, resize_method, rotation) + + layers.append({ + 'type': 'image', + 'content': box_image, + 'position': paste_position, + 'z_index': z_index + }) + except Exception as e: + print(f"Error processing image {box['image']}: {str(e)}") + + # Sort layers based on z_index, maintaining original order for equal z_index values + layers.sort(key=lambda x: x.get('z_index', 0)) + print([layer['z_index'] for layer in layers]) + + # Draw layers in order + for layer in layers: + print(layer) + if layer['type'] == 'text': + layer['position'] = [int(x) for x in layer['position']] + if layer.get('outline_color'): + outline_color = layer['outline_color'] + if isinstance(outline_color, str): + outline_color = hex_to_rgb(outline_color) + print(layer['content'], layer['color']) + add_outline_to_text(pilmoji_draw, layer['content'], layer['position'][0], layer['position'][1], + layer['font'], layer['color'], outline_color, layer['line_height']) + else: + pilmoji_draw.text(layer['position'], layer['content'], font=layer['font'], fill=layer['color'], node_spacing=1, emoji_position_offset=[0, int(0.25 * layer['line_height'])]) + + if layer.get('underline', False): + x, y = layer['position'] + line_width = layer['font'].getlength(layer['content']) + y_underline = y + layer['font'].getbbox('A')[3] + 2 + draw.line((x, y_underline, x + line_width, y_underline), fill=layer['color'], width=1) + elif layer['type'] == 'image': + temp = Image.new('RGBA', image.size, (0, 0, 0, 0)) + temp.paste(layer['content'], layer['position'], layer['content']) + image = Image.alpha_composite(image, temp) + draw = ImageDraw.Draw(image) + pilmoji_draw = Pilmoji(image) + + return image + +@WIDGETS.register_module() +class ImageTextFuserWidget(BaseWidget): + NAME = "Image Text Fuser" + CATEGORY = 'Myshell Widgets/Tools' + + class InputsSchema(BaseWidget.InputsSchema): + config: str = "" + + class OutputsSchema(BaseWidget.OutputsSchema): + url: str + + @torch.no_grad() + def execute(self, environ, config): + return_dict = {} + + parsed_config = json.loads(config.config) + template_path = parsed_config['template_path'] + + image = add_content_to_image(template_path, parsed_config) + with tempfile.NamedTemporaryFile(suffix=".png") as f: + image.save(f.name) + f.flush() + f.seek(0) + return_dict['url'] = upload_file_to_myshell(f.name) + + return return_dict \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 7545b7cb..d5262dd6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,6 +80,8 @@ torchscale = "0.3.0" requests = "2.31.0" wget = "^3.2" pilgram = "^1.2.1" +pilmoji = "^2.0.4" +emoji = "1.7.0" [build-system] diff --git a/servers/automata.py b/servers/automata.py index be3f4c0f..fbe95fbc 100644 --- a/servers/automata.py +++ b/servers/automata.py @@ -13,6 +13,7 @@ from filelock import FileLock import torch +import re from pydantic import BaseModel @@ -265,17 +266,8 @@ def generate(client_queue): continue client_queue = queue.Queue() - - # while True: - # if len(clients) == 0: - # clients.append(client_queue) - # break - # else: - # time.sleep(2) - # print("waiting...") threading.Thread(target=execute_automata, args=(task_id, client_queue, automata, environ, event_data.form_data, sess_id, sess_state)).start() - # return Response(generate(client_queue), mimetype='text/event-stream') resp = Response(generate(client_queue), mimetype='text/event-stream') headers = [ ('Connection', 'keep-alive'), @@ -335,7 +327,25 @@ def build_form_schema(target_inputs): canUploadFile=canUploadFile ) return json_schema, inputSetting - + + +def process_text_embeded_uri(text): + if text is None: + return "" + # Define a regular expression to find img, video, and audio tags with src attribute + def replace_src(match): + tag, attributes = match.groups() + # Find the src attribute and replace its value + new_attributes = re.sub(r'src=["\']([^"\']+)["\']', r'src="/api/files/\1"', attributes) + return f"<{tag} {new_attributes}>" + + # Regular expression to match ,