diff --git a/nnll_05/__init__.py b/nnll_05/__init__.py index 6a3a01b9..9d78627a 100644 --- a/nnll_05/__init__.py +++ b/nnll_05/__init__.py @@ -13,32 +13,6 @@ mir_db = JSONCache(CONFIG_PATH_NAMED) -@debug_monitor -def label_key_prompt(message: dict, mode_in: str) -> str: - """Distil multi-prompt input streams into a single primary source\n - Use secondary streams for supplementary processes.\n - **TL;DR**: text prompts take precedence when available\n - :param message: User-supplied input data - :param source: User-supplied origin data state - :return: `str` Initial conversion state\n\n - ``` - dict medium data - ,-text str|dict - '-image array - message-'-speech array - '-video array - '-music array - ``` - """ - if not mode_in: - if not message["text"]: - mode_in = next(iter([mode_in for mode_in in message if mode_in is not None and len(mode_in) > 1])) - else: - mode_in = "text" - - return message.get(mode_in) - - @debug_monitor def split_sequence_by(delimiter: str = ".", sequence: str = "") -> tuple | None: """Divide a string into an import module and a function @@ -88,6 +62,32 @@ def pull_path_entries(nx_graph: nx.Graph, traced_path: list[tuple]) -> None: return registry_entries +# @debug_monitor +# def label_key_prompt(message: dict, mode_in: str) -> str: +# """Distil multi-prompt input streams into a single primary source\n +# Use secondary streams for supplementary processes.\n +# **TL;DR**: text prompts take precedence when available\n +# :param message: User-supplied input data +# :param source: User-supplied origin data state +# :return: `str` Initial conversion state\n\n +# ``` +# dict medium data +# ,-text str|dict +# '-image array +# message-'-speech array +# '-video array +# '-music array +# ``` +# """ +# if not mode_in: +# if not message["text"]: +# mode_in = next(iter([mode_in for mode_in in message if mode_in is not None and len(mode_in) > 1])) +# else: +# mode_in = "text" + +# return message.get(mode_in) + + # async def machine_intent(message: Any, registry_entries: dict, coordinates_path: dict) -> Any: # """Execute on instructions selected previously""" # from nnll_11 import chat diff --git a/nnll_10/__init__.py b/nnll_10/__init__.py index 49e90c17..4b2ce01c 100644 --- a/nnll_10/__init__.py +++ b/nnll_10/__init__.py @@ -20,9 +20,9 @@ class IntentProcessor: intent_graph: nx.Graph = None - coordinates_path: list[str] = None - registry_entries: list[dict[RegistryEntry]] = None - model_names: list[tuple[str]] = None + coord_path: list[str] = None + ckpts: list[dict[RegistryEntry]] = None + models: list[tuple[str]] = None # additional_model_names: dict = None def __init__(self): @@ -31,15 +31,26 @@ def __init__(self): """ @debug_monitor - def calculate_intent_graph(self) -> None: - """Generate and store the intent graph.""" - from nnll_14 import calculate_graph - - self.intent_graph = calculate_graph() + def calc_graph(self) -> None: + """Generate graph of coordinate pairs from valid conversions\n + Model libraries are auto-detected from cache loading\n + :param nx_graph: Preassembled graph of models to label + :return: Graph modeling all current ML/AI tasks appended with model data""" + from nnll_15 import VALID_CONVERSIONS, from_cache + + self.intent_graph = nx.MultiDiGraph() + self.intent_graph.add_nodes_from(VALID_CONVERSIONS) + registry_entries = from_cache() + if registry_entries: + for model in registry_entries: + self.intent_graph.add_edges_from(model.available_tasks, entry=model, weight=1.0) + else: + nfo("Registry error, graph attributes not applied") return self.intent_graph @debug_monitor - def confirm_available_graph(self) -> None: + def has_graph(self) -> None: + """Verify the graph has been created""" try: assert self.intent_graph is not None except AssertionError as error_log: @@ -48,108 +59,127 @@ def confirm_available_graph(self) -> None: return True @debug_monitor - def confirm_coordinates_path(self) -> None: + def has_path(self) -> None: + """Verify the path has been created""" try: - assert self.coordinates_path is not None + assert self.coord_path is not None except AssertionError as error_log: dbug(error_log) return False return True @debug_monitor - async def confirm_model_waypoints(self) -> None: + def has_ckpt(self) -> None: + """Verify the model checkpoints are known""" try: - assert self.registry_entries is not None + assert self.ckpts is not None except AssertionError as error_log: dbug(error_log) return False return True @debug_monitor - def derive_coordinates_path(self, mode_in: str, mode_out: str) -> None: + def set_path(self, mode_in: str, mode_out: str) -> None: """ - Derive the coordinates path based on traced objectives. - :param prompt_type: If provided, will use this. Otherwise, resolves from content. + Find a valid path from current state (mode_in) to designated state (mode_out)\n + :param mode_in: Input prompt type or starting state/states + :param mode_out: The user-selected ending-state + :return: An iterator for the edges forming a way towards the mode out, or Note """ - from nnll_14 import trace_objective - self.confirm_available_graph() - self.coordinates_path = trace_objective(self.intent_graph, mode_in=mode_in, mode_out=mode_out) + self.has_graph() + if nx.has_path(self.intent_graph, mode_in, mode_out): # Ensure path exists (otherwise 'bidirectional' may loop infinitely) + if mode_in == mode_out and mode_in != "text": + orig_mode_out = mode_out # Solve case of non-text self-loop edge being incomplete transformation + mode_out = "text" + self.coord_path = nx.bidirectional_shortest_path(self.intent_graph, mode_in, mode_out) + self.coord_path.append(orig_mode_out) + else: + self.coord_path = nx.bidirectional_shortest_path(self.intent_graph, mode_in, mode_out) + if len(self.coord_path) == 1: + self.coord_path.append(mode_out) # this behaviour likely to change in future @debug_monitor - def define_model_waypoints(self) -> None: + def set_ckpts(self) -> None: from nnll_05 import pull_path_entries - self.confirm_available_graph() - self.confirm_coordinates_path() - self.registry_entries = pull_path_entries(self.intent_graph, self.coordinates_path) - self.model_names = [] - self.registry_entries = sorted(self.registry_entries, key=lambda x: x["weight"]) - nfo([x["weight"] for x in self.registry_entries]) - for registry in self.registry_entries: - model_name = registry["entry"].model - if int(registry.get("weight")) == 0: - self.model_names.insert(0, (f"*{os.path.basename(model_name)}", model_name)) + self.has_graph() + self.has_path() + self.ckpts = pull_path_entries(self.intent_graph, self.coord_path) + self.models = [] + nfo(self.ckpts) + self.ckpts = sorted(self.ckpts, key=lambda x: x["weight"]) + nfo([x["weight"] for x in self.ckpts]) + for registry in self.ckpts: + model = registry["entry"].model + weight = registry.get("weight") + if weight != 1.0: + self.models.insert(0, (f"*{os.path.basename(model)}", model)) + nfo("adjusted model :", f"*{os.path.basename(model)}", weight) else: - self.model_names.append((os.path.basename(model_name), model_name)) + self.models.append((os.path.basename(model), model)) + # nfo("model : ", model, weight) + self.models = sorted(self.models, key=lambda x: "*" in x) @debug_monitor - async def toggle_weight(self, selection: str, base_weight=1.0, index_num=0) -> None: + def edit_weight(self, selection: str, mode_in: str, mode_out: str) -> None: """Determine entry edge, determine index, then adjust weight""" - entry = [reg_entry for reg_entry in self.intent_graph.edges(data=True) if selection in reg_entry[2].get("entry").model] - edge_data = self.intent_graph[entry[0][0]][entry[0][1]] - nfo("entry :", entry, "edge_data :", edge_data) - for num in edge_data: - if selection in edge_data[num].get("entry").model: - index_num = num - base_weight = entry[0][2].get("weight") - # model_name = entry[0][2].get("entry").model - if int(base_weight) == 0: - self.intent_graph[entry[0][0]][entry[0][1]][index_num]["weight"] = base_weight + 0.1 - else: - self.intent_graph[entry[0][0]][entry[0][1]][index_num]["weight"] = base_weight - 0.1 - # index_name = (os.path.basename(model_name), model_name) - # self.model_names.insert(self.model_names.index(index_name), (f"*{os.path.basename(model_name)}", model_name)) - nfo("Weight changed for: ", entry[0][2].get("entry").model, f"model # {index_num}") - dbug("Confirm :", self.intent_graph[entry[0][0]][entry[0][1]]) - - # [reg_id[2] for reg_id in self.intent_graph.edges(data=True) if selection in reg_id[2].get("entry").model] - # right = left - weight_value - # if weight_value - # pass - - # left = int(weight_value) - # right = left - weight_value - - # # def check_weights(self, entry: str) -> None: - - # registry_data = [reg_id[2].get('entry').model for reg_id in self.intent_graph.edges(data=True).model if 'ibm' in reg_id[2].get('entry').model] - - # def - # add weight - # check weight - - # async def walk_intent(self, send: bool = False, composer: Callable = None, processor: Callable = None) -> None: - # """Provided the coordinates in the intent processor, follow the list of in and out methods""" - # await self.confirm_available_graph() - # await self.confirm_coordinates_path() - # coordinates = self.coordinates_path - # if not coordinates: - # coordinates = ["text", "text"] - # hop_length = len(coordinates) - 1 - # for i in range(hop_length): - # if i + 1 < hop_length: - # await self.confirm_coordinates_path() - # await self.confirm_model_waypoints() - # if send: - # await processor(last_hop=False) - # composer(mode_in=coordinates[i + 1], mode_out=coordinates[i + 2]) - # else: - # old_model_names = self.model_names if self.model_names else [] - # composer(mode_in=coordinates[i + 1], mode_out=coordinates[i + 2], io_only=True) - # self.model_names.extend(old_model_names) - - # elif send: - # await self.confirm_coordinates_path() - # await self.confirm_model_waypoints() - # processor() + reg_entries = [nbrdict for n, nbrdict in self.intent_graph.adjacency()] + index = [x for x in reg_entries[0][mode_out] if selection in reg_entries[0][mode_out][x].get("entry").model] + model = reg_entries[0][mode_out][index[0]].get("entry").model + weight = reg_entries[0][mode_out][index[0]].get("weight") + nfo(model, index, weight) + if weight < 1.0: + self.intent_graph[mode_in][mode_out][index[0]]["weight"] = weight + 0.1 + else: + self.intent_graph[mode_in][mode_out][index[0]]["weight"] = weight - 0.1 + nfo("Weight changed for: ", self.intent_graph[mode_in][mode_out][index[0]]["entry"].model, f"model # {index[0]}") + self.set_ckpts() + dbug("Confirm :", self.intent_graph[mode_in][mode_out]) + + +# model_name = entry[2].get("entry").model +# index_name = (os.path.basename(model_name), model_name) +# self.model_names.insert(self.model_names.index(index_name), (f"*{os.path.basename(model_name)}", model_name)) +# what should happen next is setckpts + +# [reg_id[2] for reg_id in self.intent_graph.edges(data=True) if selection in reg_id[2].get("entry").model] +# right = left - weight_value +# if weight_value +# pass + +# left = int(weight_value) +# right = left - weight_value + +# # def check_weights(self, entry: str) -> None: + +# registry_data = [reg_id[2].get('entry').model for reg_id in self.intent_graph.edges(data=True).model if 'ibm' in reg_id[2].get('entry').model] + +# def +# add weight +# check weight + +# async def walk_intent(self, send: bool = False, composer: Callable = None, processor: Callable = None) -> None: +# """Provided the coordinates in the intent processor, follow the list of in and out methods""" +# await self.confirm_available_graph() +# await self.confirm_coordinates_path() +# coordinates = self.coordinates_path +# if not coordinates: +# coordinates = ["text", "text"] +# hop_length = len(coordinates) - 1 +# for i in range(hop_length): +# if i + 1 < hop_length: +# await self.confirm_coordinates_path() +# await self.confirm_model_waypoints() +# if send: +# await processor(last_hop=False) +# composer(mode_in=coordinates[i + 1], mode_out=coordinates[i + 2]) +# else: +# old_model_names = self.model_names if self.model_names else [] +# composer(mode_in=coordinates[i + 1], mode_out=coordinates[i + 2], io_only=True) +# self.model_names.extend(old_model_names) + +# elif send: +# await self.confirm_coordinates_path() +# await self.confirm_model_waypoints() +# processor() diff --git a/nnll_10/package/__main__.py b/nnll_10/package/__main__.py index c7d7e833..037794b7 100644 --- a/nnll_10/package/__main__.py +++ b/nnll_10/package/__main__.py @@ -3,7 +3,7 @@ # pylint: disable=missing-module-docstring, disable=missing-class-docstring -from textual import work, events +from textual import work, events, on from textual.app import App from textual.binding import Binding from textual.reactive import reactive @@ -15,8 +15,6 @@ class Combo(App): - """Machine Accelerated Intelligent Network""" - SCREENS = {"fold": Fold} CSS_PATH = "combo.tcss" BINDINGS = [Binding("escape", "safe_exit", "◼︎ / ⏏︎")] # Cancel response @@ -31,6 +29,7 @@ def on_mount(self) -> None: self.theme = "flexoki" @work(exit_on_error=True) + @on(events.Key) async def _on_key(self, event: events.Key): """Window for triggering key bindings""" if event.key not in ["escape", "ctrl+left_square_brace"]: diff --git a/nnll_10/package/combo.tcss b/nnll_10/package/combo.tcss index 5d8817ee..6fb93b9f 100644 --- a/nnll_10/package/combo.tcss +++ b/nnll_10/package/combo.tcss @@ -116,6 +116,25 @@ $selection-fg: $background; background: $selection-bg; } } + &.active { + Static#label { + &:blur { + color: $accent; + text-align: end; + + } + &:focus { + color: $accent; + } + &:hover { + color: $accent; + } + &.-selected { + color: $accent; + background: $selection-bg; + } + } + } SelectCurrent { background:$negative-space 7%; color: $sunken-text; @@ -143,7 +162,8 @@ $selection-fg: $background; SelectOverlay { text-wrap: nowrap; - overflow: hidden; + overflow-x: hidden; + overflow-y: scroll; text-align: end; color: $sunken-text; overlay: screen; @@ -282,12 +302,6 @@ SidebarLeft, SidebarRight { background: $top-surface; } } - &.active { - .datatable--cursor { - color: $foreground; - background: $top-surface; - } - } } #input_tag { diff --git a/nnll_10/package/input_tag.py b/nnll_10/package/input_tag.py index 6779f4dc..b5680a0c 100644 --- a/nnll_10/package/input_tag.py +++ b/nnll_10/package/input_tag.py @@ -13,10 +13,12 @@ class InputTag(Carousel): target_options: reactive[set] = reactive({}) def on_mount(self): - graph_edges = self.query_ancestor(Screen).intent_processor.intent_graph.edges - # note: Length sort on target_options kinda poor way to get text on top, works for the moment tho - self.target_options = sorted({edge[1] for edge in graph_edges}, key=len) - self.add_columns("0", "1", "2") - self.add_rows([self.up.strip(), row.strip(), self.dwn.strip()] for row in self.target_options) - self.cursor_foreground_priority = "css" - self.cursor_background_priority = "css" + scrn = self.query_ancestor(Screen) + if scrn.int_proc.has_graph(): + graph_edges = scrn.int_proc.intent_graph.edges + # note: Length sort on target_options kinda poor way to get text on top, works for the moment tho + self.target_options = sorted({edge[1] for edge in graph_edges}, key=len) + self.add_columns("0", "1", "2") + self.add_rows([self.up.strip(), row.strip(), self.dwn.strip()] for row in self.target_options) + self.cursor_foreground_priority = "css" + self.cursor_background_priority = "css" diff --git a/nnll_10/package/main_screen.py b/nnll_10/package/main_screen.py index a0b3d051..82f0969d 100644 --- a/nnll_10/package/main_screen.py +++ b/nnll_10/package/main_screen.py @@ -10,18 +10,20 @@ from textual.app import ComposeResult from textual.binding import Binding from textual.containers import Container - +from textual.message import Message from textual.reactive import reactive from textual.screen import Screen # from textual.widget import Widget -from textual.widgets import Static, ContentSwitcher, Select # , DataTable +from textual.widgets import Static, ContentSwitcher # , DataTable + from nnll_01 import debug_monitor, info_message as nfo, debug_message as dbug from nnll_10.package.message_panel import MessagePanel from nnll_10 import IntentProcessor from nnll_10.package.input_tag import InputTag from nnll_10.package.output_tag import OutputTag +from nnll_10.package.selectah import Selectah class Fold(Screen[bool]): @@ -40,8 +42,10 @@ class Fold(Screen[bool]): ] ui: dict = defaultdict(dict) - intent_processor: reactive[Callable] = reactive(None) - media_pkg: dict = {} + int_proc: reactive[Callable] = reactive(None) + # model_names = [("model", 0), ("x", 0)] + tx_data: dict = {} + counter = 0 input_map: dict = { "text": "message_panel", "image": "message_panel", @@ -57,8 +61,8 @@ def compose(self) -> ComposeResult: from nnll_10.package.response_panel import ResponsePanel from nnll_10.package.voice_panel import VoicePanel - self.intent_processor = IntentProcessor() - self.intent_processor.calculate_intent_graph() + self.int_proc = IntentProcessor() + self.int_proc.calc_graph() self.ready_tx(mode_in="text", mode_out="text") yield Footer(id="footer") with Horizontal(id="app-grid", classes="app-grid-horizontal"): @@ -71,12 +75,11 @@ def compose(self) -> ComposeResult: yield InputTag(id="input_tag", classes="input_tag") with Horizontal(id="seam"): yield DisplayBar(id="display_bar") # 1: - yield Select( + yield Selectah( id="selectah", classes="selectah", - allow_blank=False, - prompt="Model options:", - options=self.intent_processor.model_names, + prompt=os.path.basename(next(iter(self.int_proc.models))[0]), + options=self.int_proc.models, type_to_search=True, ) with Container(id="responsive_display"): # @@ -96,12 +99,14 @@ async def on_mount(self) -> None: self.ui["rp"] = self.query_one("#response_panel") self.ui["vp"] = self.query_one("#voice_panel") self.ui["sl"] = self.query_one("#selectah") + self.ready_tx() + self.walk_intent() # id_name = self.input_tag.highlight_link_id @work(exit_on_error=False) async def on_resize(self, event=events.Resize) -> None: """Textual API, scale/orientation screen responsivity""" - if self.confirm_widgets_ready(): + if self.is_ui_ready(): display = self.query_one("#app-grid") width = event.container_size.width height = event.container_size.height @@ -110,15 +115,9 @@ async def on_resize(self, event=events.Resize) -> None: elif width / 2 < height: # Screen is tall display.set_classes("app-grid-vertical") - @work(exclusive=True) - async def on_focus(self, event: events.Focus) -> None: - """Textual API event, refresh pathing""" - if event.control in ["input_tag", "output_tag", "panel_swap", "message_panel"]: - self.ready_tx() - self.walk_intent() - @debug_monitor - def confirm_widgets_ready(self): + def is_ui_ready(self): + """Confirm UI is active""" try: assert hasattr(self.ui["sl"], "is_mounted") except AssertionError as error_log: @@ -126,26 +125,17 @@ def confirm_widgets_ready(self): return False return True - @work(exclusive=True, group="on_change") - @on(Select.Changed) - async def select_changed(self, event: Select.Changed) -> None: - if self.confirm_widgets_ready() and self.ui["sl"].has_focus: - await self.intent_processor.toggle_weight(selection=os.path.basename(event.control.selection)) + @on(events.Focus) + def on_focus(self, event: events.Focus) -> None: + """Textual API event, refresh pathing""" + if self.ui["sl"].has_focus and self.ui["sl"].expanded: self.ready_tx() - if self.intent_processor.confirm_available_graph() and self.intent_processor.confirm_coordinates_path(): - self.walk_intent() - if await self.intent_processor.confirm_model_waypoints(): - self.ui["sl"].set_options(self.intent_processor.model_names) - - # # @work(exclusive=True) - # @on(events.Click, "#selectah") - # async def on_click(self) -> None: - # """Expand panel immediately when clicked in terminal""" - # if self.confirm_widgets_ready(): - # if not self.ui["sl"].expanded: - # self.ui["sl"].expanded = True - # else: - # self.ui["sl"].expanded = False + self.walk_intent() + self.ui["sl"].prompt = next(iter(self.int_proc.models))[0] + + # selection = next(iter(self.graph.models))[1] + # else: + # selection = self.selection @work(exclusive=True) async def _on_key(self, event: events.Key) -> None: @@ -153,21 +143,22 @@ async def _on_key(self, event: events.Key) -> None: if (hasattr(event, "character") and event.character == "`") or event.key == "grave_accent": event.prevent_default() self.ready_tx(io_only=False) - if self.intent_processor.confirm_available_graph() and self.intent_processor.confirm_coordinates_path(): + if self.int_proc.has_graph() and self.int_proc.has_path(): self.walk_intent(send=True) - elif event.key == "escape" and "active" in self.ui["ot"].classes: - self.cancel_generation() + elif event.key == "escape" and "active" in self.ui["sl"].classes: + Message.stop(True) + self.stop_gen() elif (hasattr(event, "character") and event.character == "\r") or event.key == "enter": - self.alternate_panel("voice_panel", 1) + self.flip_panel("voice_panel", 1) self.ui["vp"].record_audio() - self.tx_audio_to_tokenizer() + self.audio_to_token() elif (hasattr(event, "character") and event.character == " ") or event.key == "space": - self.alternate_panel("voice_panel", 1) + self.flip_panel("voice_panel", 1) self.ui["vp"].play_audio() elif (event.name) == "ctrl_w" or event.key == "ctrl+w": self.clear_input() elif not self.ui["rp"].has_focus and ((hasattr(event, "character") and event.character == "\x7f") or event.key == "backspace"): - self.alternate_panel("message_panel", 0) + self.flip_panel("message_panel", 0) @debug_monitor def _on_mouse_scroll_down(self, event: events.MouseScrollUp) -> None: @@ -181,11 +172,14 @@ def _on_mouse_scroll_down(self, event: events.MouseScrollUp) -> None: self.ui["ot"].emulate_scroll_down() elif self.ui["it"].has_focus: event.prevent_default() - mode_name = self.ui["it"].emulate_scroll_down() - self.ui["ps"].current = self.input_map.get(mode_name) + mode_in_name = self.ui["it"].emulate_scroll_down() + self.ui["ps"].current = self.input_map.get(mode_in_name) if scroll_delta != [self.ui["it"].current_cell, self.ui["ot"].current_cell]: self.ready_tx() self.walk_intent() + self.ui["sl"].mode_in = self.ui["it"].current_cell + self.ui["sl"].mode_out = self.ui["ot"].current_cell + self.ui["sl"].prompt = next(iter(self.int_proc.models))[0] @debug_monitor def _on_mouse_scroll_up(self, event: events.MouseScrollUp) -> None: @@ -204,17 +198,20 @@ def _on_mouse_scroll_up(self, event: events.MouseScrollUp) -> None: if scroll_delta != [self.ui["it"].current_cell, self.ui["ot"].current_cell]: self.ready_tx() self.walk_intent() + self.ui["sl"].mode_in = self.ui["it"].current_cell + self.ui["sl"].mode_out = self.ui["ot"].current_cell + self.ui["sl"].prompt = next(iter(self.int_proc.models))[0] # @work(exclusive=True) @on(MessagePanel.Changed, "#message_panel") - async def tx_text_to_tokenizer(self) -> None: + async def txt_to_token(self) -> None: """Transmit info to token calculation""" message = self.ui["mp"].text - next_model = next(iter(self.intent_processor.registry_entries)).get("entry") - self.ui["db"].calculate_tokens(next_model.model, message=message) + next_model = next(iter(self.int_proc.models))[1] + self.ui["db"].calculate_tokens(next_model, message=message) @work(exclusive=True) - async def tx_audio_to_tokenizer(self) -> None: + async def audio_to_token(self) -> None: """Transmit audio to sample length""" duration = self.ui["vp"].calculate_sample_length() self.ui["db"].calculate_audio(duration) @@ -226,10 +223,10 @@ def ready_tx(self, io_only: bool = True, mode_in: str = None, mode_out: str = No mode_in = self.ui["it"].get_cell_at((self.ui["it"].current_row, 1)) if not mode_out: mode_out = self.ui["ot"].get_cell_at((self.ui["ot"].current_row, 1)) - self.intent_processor.derive_coordinates_path(mode_in=mode_in, mode_out=mode_out) - self.intent_processor.define_model_waypoints() + self.int_proc.set_path(mode_in=mode_in, mode_out=mode_out) + self.int_proc.set_ckpts() if not io_only: - self.media_pkg = { + self.tx_data = { "text": self.ui["mp"].text, "audio": self.ui["vp"].audio, # "attachment": self.message_panel.file # drag and drop from external window @@ -239,48 +236,61 @@ def ready_tx(self, io_only: bool = True, mode_in: str = None, mode_out: str = No # @work(exclusive=True) def walk_intent(self, send=False) -> None: """Provided the coordinates in the intent processor, follow the list of in and out methods""" - coordinates = self.intent_processor.coordinates_path - if not coordinates: - coordinates = ["text", "text"] - hop_length = len(coordinates) - 1 - for i in range(hop_length): - if self.intent_processor.confirm_coordinates_path() and self.intent_processor.confirm_model_waypoints(): - if i + 1 < hop_length: - if send: - self.media_pkg = self.send_tx(last_hop=False) - self.ready_tx(mode_in=coordinates[i + 1], mode_out=coordinates[i + 2]) - else: - old_model_names = self.intent_processor.model_names if self.intent_processor.model_names else [] - # nfo(old_model_names, "walk_intent") - self.ready_tx(mode_in=coordinates[i + 1], mode_out=coordinates[i + 2]) - self.intent_processor.model_names.extend(old_model_names) - # nfo(self.intent_processor.model_names) - - elif send: - self.send_tx() + coords = self.int_proc.coord_path + if not coords: + coords = ["text", "text"] + hops = len(coords) - 1 + for i in range(hops): + if i + 1 < hops: + if send: + self.tx_data = self.send_tx(last_hop=False) + self.ready_tx(mode_in=coords[i + 1], mode_out=coords[i + 2]) + else: + old_models = self.int_proc.models if self.int_proc.models else [] + dbug(old_models, "walk_intent") + self.ready_tx(mode_in=coords[i + 1], mode_out=coords[i + 2]) + self.int_proc.models.extend(old_models) + self.model_names = self.int_proc.models + dbug(self.int_proc.models) + + elif send: + self.send_tx() @work(exclusive=True) async def send_tx(self, last_hop=True) -> None: """Transfer path and promptmedia to generative processing endpoint""" - from nnll_11 import ChatMachineWithMemory, BasicQAHistory + from nnll_11 import ChatMachineWithMemory, QASignature - current_coords = next(iter(self.intent_processor.registry_entries)).get("entry") - chat = ChatMachineWithMemory(memory_size=5, signature=BasicQAHistory) + ckpt = self.ui["sl"].selection + if ckpt is None: + ckpt = next(iter(self.int_proc.ckpts))[1] + chat = ChatMachineWithMemory(sig=QASignature) + self.ui["rp"].on_text_area_changed() self.ui["rp"].insert("\n---\n") - self.ui["ot"].add_class("active") + self.ui["sl"].add_class("active") if last_hop: - nfo(current_coords) - async for chunk in chat.forward(media_pkg=self.media_pkg, model=current_coords.model, library=current_coords.library, max_workers=8): + nfo(ckpt) + async for chunk in chat.forward( + tx_data=self.tx_data, + model=ckpt.model, + library=ckpt.library, + max_workers=8, + ): if chunk is not None: self.ui["rp"].insert(chunk) - self.ui["ot"].set_classes(["output_tag"]) + self.ui["sl"].set_classes(["selectah"]) else: - self.media_pkg = chat.forward(media_pkg=self.media_pkg, model=current_coords.model, library=current_coords.library, max_workers=8) + self.tx_data = chat.forward( + tx_data=self.tx_data, + model=ckpt.model, + library=ckpt.library, + max_workers=8, + ) @work(exclusive=True) - async def cancel_generation(self) -> None: - """Stop the processing of a model""" + async def stop_gen(self) -> None: + """Cancel the inference processing of a model""" self.ui["rp"].workers.cancel_all() self.ui["ot"].set_classes("output_tag") @@ -289,17 +299,17 @@ async def clear_input(self) -> None: """Clear the input on the focused panel""" if self.ui["vp"].has_focus: self.ui["vp"].erase_audio() - self.tx_audio_to_tokenizer() + self.audio_to_token() elif self.ui["mp"].has_focus: self.ui["mp"].erase_message() @work(exclusive=True) - async def alternate_panel(self, id_name: str, y_coordinate: int) -> None: + async def flip_panel(self, id_name: str, y_coord: int) -> None: """Switch between text input and audio input :param id_name: The panel to switch to :param y_coordinate: _description_ """ - self.ui["it"].scroll_to(x=1, y=y_coordinate, force=True, immediate=True, on_complete=self.ui["it"].refresh) + self.ui["it"].scroll_to(x=1, y=y_coord, force=True, immediate=True, on_complete=self.ui["it"].refresh) self.ui["ps"].current = id_name diff --git a/nnll_10/package/output_tag.py b/nnll_10/package/output_tag.py index 856e2f48..76417573 100644 --- a/nnll_10/package/output_tag.py +++ b/nnll_10/package/output_tag.py @@ -13,10 +13,12 @@ class OutputTag(Carousel): target_options: reactive[set] = reactive({}) def on_mount(self): - graph_edges = self.query_ancestor(Screen).intent_processor.intent_graph.edges + scrn = self.query_ancestor(Screen) + if scrn.int_proc.has_graph(): + graph_edges = scrn.int_proc.intent_graph.edges - self.target_options = sorted({edge[0] for edge in graph_edges}, key=len) - self.add_columns("0", "1", "2") - self.add_rows([self.up.strip(), row.strip(), self.dwn.strip()] for row in self.target_options) - self.cursor_foreground_priority = "css" - self.cursor_background_priority = "css" + self.target_options = sorted({edge[0] for edge in graph_edges}, key=len) + self.add_columns("0", "1", "2") + self.add_rows([self.up.strip(), row.strip(), self.dwn.strip()] for row in self.target_options) + self.cursor_foreground_priority = "css" + self.cursor_background_priority = "css" diff --git a/nnll_10/package/selectah.py b/nnll_10/package/selectah.py new file mode 100644 index 00000000..18d8e41f --- /dev/null +++ b/nnll_10/package/selectah.py @@ -0,0 +1,87 @@ +# # # +# # # + +# pylint: disable=import-outside-toplevel + +import os +import networkx as nx +from textual import on, events +from textual.reactive import reactive +from textual.screen import Screen +from textual.widgets import Select +from textual.widgets._select import SelectCurrent, SelectOverlay + +from nnll_01 import debug_message as dbug # , debug_monitor + + +class Selectah(Select): + models: reactive[list[tuple[str, str]]] = reactive([("", "")]) + graph: nx.Graph = None + mode_in: str = "text" + mode_out: str = "text" + + def on_mount(self) -> None: + # self.options = self.graph.models + self.graph = self.query_ancestor(Screen).int_proc + # self.prompt = os.path.basename(next(iter(self.graph.models))[0]) + + @on(events.Focus) + async def on_focus(self) -> None: + """Expand panel immediately when clicked in terminal""" + if SelectOverlay.has_focus: + self.set_options(self.graph.models) + + @on(Select.Changed) + def on_changed(self) -> None: # event: Select.Changed) -> None: + """Rearrange models""" + try: + assert self.query_one(SelectCurrent).has_value + except AssertionError as error_log: + dbug(error_log) + else: + self.graph.edit_weight(selection=self.value, mode_in=self.mode_in, mode_out=self.mode_out) + self.set_options(self.graph.models) + self.prompt = next(iter(self.graph.models))[0] + + +# @on(SelectOverlay.blur) +# def on_select_overlay_blur(self, event: events.Blur) -> None: +# from rich.text import Text + +# nfo(event.control.id, "blah blah blah") +# # if event.control == SelectOverlay: +# self.ui["sl"].prompt = next(iter(self.int_proc.models))[0] +# label = self.ui["sl"].query_one("#label", Static) +# try: +# assert label.renderable == next(iter(self.int_proc.models))[0] +# except AssertionError as error_log: +# dbug(error_log) +# self.ui["sl"].prompt = next(iter(self.int_proc.models))[0] + +# @on(OptionList.OptionSelected) +# def on_select_overlay_option_selected(self, event: OptionList.OptionSelected) -> None: +# """Textual API event, refresh pathing, Switch checkpoint assignments""" +# nfo(" test write") +# overlay = self.ui["sl"].query_one(SelectOverlay) +# mode_in = self.ui["it"].get_cell_at((self.ui["it"].current_row, 1)) +# mode_out = self.ui["ot"].get_cell_at((self.ui["ot"].current_row, 1)) +# if self.ui["sl"].selection == Select.BLANK or self.ui["sl"].selection is None: +# selection = next(iter(self.int_proc.models))[1] +# else: +# selection = self.ui["sl"].selection +# self.int_proc.edit_weight(selection=selection, mode_in=mode_in, mode_out=mode_out) +# self.ready_tx() +# self.walk_intent() +# overlay.recompose() +# # self.ui["sl"].set_options(self.int_proc.models) + +# self.ready_tx() +# if self.int_proc.has_graph() and self.int_proc.has_path(): +# self.walk_intent() +# if self.int_proc.has_ckpt(): + +# self.ui["sl"].set_options(options=self.int_proc.models) +# nfo(self.int_proc.has_ckpt()) +# self.ui["sl"].expanded = False + +# @work(exclusive=True) diff --git a/nnll_11/__init__.py b/nnll_11/__init__.py index f8337e4d..e6630bc8 100644 --- a/nnll_11/__init__.py +++ b/nnll_11/__init__.py @@ -1,34 +1,38 @@ # # # # # # +# pylint: disable=pointless-statement +from typing import Any import dspy from pydantic import BaseModel, Field +from sympy import Basic from nnll_01 import debug_monitor, debug_message as dbug # , info_message as nfo from nnll_15.constants import LibType -class PictureSignature(dspy.Signature): - """Output the dog breed of the dog in the image.""" - - image_1: dspy.Image = dspy.InputField(desc="An image of a dog") - answer: str = dspy.OutputField(desc="The dog breed of the dog in the image") - +ps_sysprompt = "Provide x for Y" +bqa_sysprompt = "Reply with short responses within 60-90 word/10k character code limits" +ps_infield_tag = "An image of x" +ps_outfield_tag = "The nature of the x in the image." +ps_edit_message = "Edited input image of the dog with a yellow hat." -class LLMInput(BaseModel): - """Simple input query fields for chat models including previous contexts""" - context: str = Field(description="The context for the question") - query: str = Field(description="The message to respond to") +class PictureSignature(dspy.Signature): + f"""{ps_sysprompt}""" + image_input: dspy.Image = dspy.InputField(desc=ps_infield_tag) + answer: str = dspy.OutputField(desc=ps_outfield_tag) + image_output: dspy.Image = dspy.OutputField(desc=ps_edit_message) -class BasicQAHistory(dspy.Signature): - """Reply with short responses within 60-90 word/10k character code limits""" +class QASignature(dspy.Signature): + f"""{bqa_sysprompt}""" - message: LLMInput = dspy.InputField() - response = dspy.OutputField(desc="Often between 60 and 90 words and limited to 10000 character code blocks") + message: str = dspy.InputField(desc="The message to respond to") + answer = dspy.OutputField(desc="Often between 60 and 90 words and limited to 10000 character code blocks") +# signature: dspy.Signature = BasicQAHistory @debug_monitor async def get_api(model: str, library: LibType) -> dict: """ @@ -46,29 +50,33 @@ async def get_api(model: str, library: LibType) -> dict: model = {"model": model} # api_base="https://localhost:xxxx/address:port/sdbx/placeholder"} # huggingface/ elif library == LibType.VLLM: model = {"model": model, "api_base": "http://localhost:8000/chat/completions"} # hosted_vllm/ + if library == LibType.LLAMAFILE: + model = {"model": model, "api_base": "http://localhost:8080/v1", "api_key": "sk-no-key-required"} return model +# fact_checking = dspy.ChainOfThought('claims -> verdicts: list[bool]') +# fact_checking(claims=["Python was released in 1991.", "Python is a compiled language."]) + + class ChatMachineWithMemory(dspy.Module): """Base module for Q/A chats using async and `dspy.Predict` List-based memory - Defaults to 5 question history, 4 max workers, and `BasicQAHistory` query""" + Defaults to 5 question history, 4 max workers, and `HistorySignature` query""" @debug_monitor - def __init__(self, memory_size: int = 5, signature: dspy.Signature = BasicQAHistory): + def __init__(self, sig: dspy.Signature = QASignature, streaming=True) -> None: """ Instantiate the module, setup parameters, create async streaming generator.\n Does not load any models until forward pass - :param memory_size: The length of the memory :param signature: The format of messages sent to the model """ super().__init__() - self.memory = [] - self.memory_size = memory_size - generator = dspy.asyncify(dspy.Predict(signature)) # this should only be used in the case of text + self.streaming = streaming + generator = dspy.asyncify(program=dspy.Predict(signature=sig)) # this should only be used in the case of text self.completion = dspy.streamify(generator) @debug_monitor - async def forward(self, media_pkg: str, model: str, library: LibType, max_workers=4): + async def forward(self, tx_data: str, model: str, library: LibType, max_workers=4) -> Any: """ Forward pass for LLM Chat process\n :param model: The library-specific arguments for the model configuration @@ -86,17 +94,14 @@ async def forward(self, media_pkg: str, model: str, library: LibType, max_worker api_kwargs = await get_api(model=model, library=library) model = dspy.LM(**api_kwargs) dspy.settings.configure(lm=model, async_max_workers=max_workers) - combined_context = " ".join(self.memory) - text = media_pkg["text"] - self.memory.append(media_pkg) - if len(self.memory) == self.memory_size: - self.memory.pop(0) - async for chunk in self.completion(message={"context": combined_context, "query": text}, stream=True): + async for chunk in self.completion(message=tx_data["text"], stream=self.streaming): try: if chunk is not None: if isinstance(chunk, dspy.Prediction): - pass - # yield str(chunk) + if not self.streaming: + yield chunk.answer # the final, processed output + else: + pass else: yield chunk["choices"][0]["delta"]["content"] except (GeneratorExit, RuntimeError, AttributeError) as error_log: diff --git a/nnll_14/__init__.py b/nnll_14/__init__.py index d94c9341..56e9d817 100644 --- a/nnll_14/__init__.py +++ b/nnll_14/__init__.py @@ -3,51 +3,35 @@ # pylint: disable=import-outside-toplevel -# from typing import NamedTuple -import networkx as nx -from nnll_01 import debug_monitor, info_message as nfo # , debug_message as dbug - - -@debug_monitor -def calculate_graph() -> nx.Graph: - """Create coordinate pairs from valid conversions then deploy as a graph - assign edge attributes\n - :param nx_graph: Preassembled graph of models to label - :param ollama: Whether to process ollama models - :param hf_hub: Whether to process hub models - :return: Graph modeling all current ML/AI tasks appended with model data""" - from nnll_15 import VALID_CONVERSIONS, from_cache - - nx_graph = nx.MultiDiGraph() - nx_graph.add_nodes_from(VALID_CONVERSIONS) - registry_entries = from_cache() - # nfo(registry_entries) - if registry_entries: - for model in registry_entries: - nx_graph.add_edges_from(model.available_tasks, entry=model, weight=1.0) - return nx_graph - else: - nfo("Registry error, graph attributes not applied") - return nx_graph - - -@debug_monitor -def trace_objective(nx_graph: nx.Graph, mode_in: str, mode_out: str): - """Find a valid path from current state (mode_in) to designated state (mode_out)\n - :param nx_graph: Model graph to use for tracking operation order - :param mode_in: Input prompt type or starting state/states - :param mode_out: The user-selected ending-state - :return: An iterator for the edges forming a way towards the mode out, or Note""" - - model_path = None - if nx.has_path(nx_graph, mode_in, mode_out): # Ensure path exists (otherwise 'bidirectional' may loop infinitely) - if mode_in == mode_out and mode_in != "text": - orig_mode_out = mode_out # Solve case of non-text self-loop edge being incomplete transformation - mode_out = "text" - model_path = nx.bidirectional_shortest_path(nx_graph, mode_in, mode_out) - model_path.append(orig_mode_out) - else: - model_path = nx.bidirectional_shortest_path(nx_graph, mode_in, mode_out) - if len(model_path) == 1: - model_path.append(mode_out) # this behaviour likely to change in future - return model_path + +class DualWeight: + def __init__(self, weight_1=0.0, weight_2=0.0): + self.combined_weight = weight_1 + (weight_2 * 0.01) + + def get_weight_1(self): + return int(self.combined_weight) # Extract integer part + + def get_weight_2(self): + return (self.combined_weight - int(self.combined_weight)) * 100 # Extract fractional part + + def increment_weight_1(self, value): + self.combined_weight += value + self.combined_weight = int(self.combined_weight) + (self.combined_weight - int(self.combined_weight)) # Ensure no overflow into + + def increment_weight_2(self, value): + fractional_part = (self.combined_weight - int(self.combined_weight)) * 100 + value + self.combined_weight = int(self.combined_weight) + (fractional_part / 100) + + def decrement_weight_1(self, value): + self.increment_weight_1(-value) + + def decrement_weight_2(self, value): + self.increment_weight_2(-value) + + +dual_weight = DualWeight(5.0, 75.0) +print("Initial Weights - Weight 1:", dual_weight.get_weight_1(), "Weight 2:", dual_weight.get_weight_2()) +dual_weight.increment_weight_1(3) +dual_weight.decrement_weight_2(10) + +print("adjusted weights- weight 1:", dual_weight.get_weight_1(), "weight 2:", dual_weight.get_weight_2()) diff --git a/nnll_15/constants.py b/nnll_15/constants.py index afec32fa..604fb018 100644 --- a/nnll_15/constants.py +++ b/nnll_15/constants.py @@ -15,7 +15,7 @@ mir_db = JSONCache(CONFIG_PATH_NAMED) -API_NAMES: list = ["ollama", "huggingface_hub", "lmstudio", "vllm"] +API_NAMES: list = ["ollama", "huggingface_hub", "lmstudio", "vllm", "llamafile"] @debug_monitor @@ -43,6 +43,7 @@ class LibType(Enum): HUB: int = (1, API_NAMES[1] in sys_modules) VLLM: int = (2, API_NAMES[2] in sys_modules) LM_STUDIO: int = (3, API_NAMES[3] in sys_modules) + LLAMAFILE: int = (4, API_NAMES[4] in sys_modules) # CORTEX : Identical to OpenAI, http://localhost:39281 # LLAMAFILE : "http://localhost:8080/v1 api_key = "sk-no-key-required" diff --git a/nnll_16/__init__.py b/nnll_16/__init__.py index ab813933..79c37337 100644 --- a/nnll_16/__init__.py +++ b/nnll_16/__init__.py @@ -11,7 +11,9 @@ def first_available(processor=None): if not processor: processor = reduce( - lambda acc, check: check() if acc == "cpu" else acc, [lambda: "cuda" if torch.cuda.is_available() else "cpu", lambda: "mps" if torch.backends.mps.is_available() else "cpu", lambda: "xpu" if torch.xpu.is_available() else "cpu"], "cpu" + lambda acc, check: check() if acc == "cpu" else acc, + [lambda: "cuda" if torch.cuda.is_available() else "cpu", lambda: "mps" if torch.backends.mps.is_available() else "cpu", lambda: "xpu" if torch.xpu.is_available() else "cpu"], + "cpu", ) return torch.device(processor) diff --git a/tests/test_05.py b/tests/test_05.py index e314d792..0b2e5f3c 100644 --- a/tests/test_05.py +++ b/tests/test_05.py @@ -1,63 +1,64 @@ -# # # -# # # - - -# import networkx as nx -import nnll_01 -from nnll_01 import debug_message as dbug, debug_monitor, info_message as nfo -from nnll_05 import lookup_function_for, label_key_prompt # , split_sequence_by, main -from nnll_14 import calculate_graph, trace_objective -from nnll_15.constants import LibType # , loop_in_feature_processes -from tests import test_14_draw_graph - - -@debug_monitor -def test_main(): - import sys - - if "pytest" not in sys.modules: - nx_graph = calculate_graph() - nnll_01.debug_message(f"graph : {nx_graph}") - # example user input - content = {"text": "Test Prompt"} - target = "text" - prompt_type = label_key_prompt(content, mode_in="text") # , aux_processes = - traced_path = trace_objective(nx_graph, prompt_type, target) - dbug(f"traced_path : {traced_path}") - if traced_path is not None: - # """add attribute to nx_graph?""" - # nx_graph = nx_graph.copy() - # if len(aux_processes) > 0: - # for process_type in aux_processes: - # nx_graph = loop_in_feature_processes(nx_graph, process_type, target) - prompt = content[prompt_type] - import importlib - - output_map = {0: prompt} - for i in range(len(traced_path) - 1): - nfo(nx_graph["text"]) - registry_entry = nx_graph[traced_path[i]][traced_path[i + 1]] - nfo(registry_entry) - current_entry = registry_entry[next(iter(registry_entry))] - current_model = current_entry.get("model_id") - nfo(f"current model : {current_model}") - if current_entry.get("library") == "hub": - operations = lookup_function_for(current_model) - import_name = next(iter(operations)) - module = importlib.import_module(import_name) - func = getattr(module, module[import_name]) - test_output = (func, current_model, output_map[i]) - nfo(test_output) - output_map.setdefault(i + 1, test_output) - elif current_entry.get("library") == "ollama": - test_output = ("chat_machine", current_model, output_map[i]) - output_map.setdefault(i + 1, test_output) - print(output_map) - - # return response - - -# response = main(nx.graph, {"text": "Test Prompt"}, target="text") - -if __name__ == "__main__": - test_main() +# # # # +# # # # + + +# # import networkx as nx +# import nnll_01 +# from nnll_01 import debug_message as dbug, debug_monitor, info_message as nfo +# # from nnll_05 import lookup_function_for, label_key_prompt # , split_sequence_by, main +# from nnll_10 import IntentProcessor +# # from nnll_15.constants import LibType # , loop_in_feature_processes +# # from tests import test_14_draw_graph + + +# @debug_monitor +# def test_main(): +# import sys + +# if "pytest" not in sys.modules: +# int_proc = IntentProcessor() +# nx_graph = int_proc.alc_ggraph() +# nnll_01.debug_message(f"graph : {nx_graph}") +# # example user input +# content = {"text": "Test Prompt"} +# target = "text" +# prompt_type = label_key_prompt(content, mode_in="text") # , aux_processes = +# traced_path = int_proc.set_path(nx_graph, prompt_type, target) +# dbug(f"traced_path : {traced_path}") +# if traced_path is not None: +# # """add attribute to nx_graph?""" +# # nx_graph = nx_graph.copy() +# # if len(aux_processes) > 0: +# # for process_type in aux_processes: +# # nx_graph = loop_in_feature_processes(nx_graph, process_type, target) +# prompt = content[prompt_type] +# import importlib + +# output_map = {0: prompt} +# for i in range(len(traced_path) - 1): +# nfo(nx_graph["text"]) +# registry_entry = nx_graph[traced_path[i]][traced_path[i + 1]] +# nfo(registry_entry) +# current_entry = registry_entry[next(iter(registry_entry))] +# current_model = current_entry.get("model_id") +# nfo(f"current model : {current_model}") +# if current_entry.get("library") == "hub": +# operations = lookup_function_for(current_model) +# import_name = next(iter(operations)) +# module = importlib.import_module(import_name) +# func = getattr(module, module[import_name]) +# test_output = (func, current_model, output_map[i]) +# nfo(test_output) +# output_map.setdefault(i + 1, test_output) +# elif current_entry.get("library") == "ollama": +# test_output = ("chat_machine", current_model, output_map[i]) +# output_map.setdefault(i + 1, test_output) +# print(output_map) + +# # return response + + +# # response = main(nx.graph, {"text": "Test Prompt"}, target="text") + +# if __name__ == "__main__": +# test_main() diff --git a/tests/test_10_weight.py b/tests/test_10_weight.py new file mode 100644 index 00000000..7e848d1d --- /dev/null +++ b/tests/test_10_weight.py @@ -0,0 +1,43 @@ +# import unittest +# from unittest.mock import MagicMock + +# # from nnll_15 import RegistryEntry + + +# class TestIntent(unittest.TestCase): +# def setUp(self): +# """Initialize necessary objects for testing""" +# from nnll_10 import IntentProcessor + +# self.ip = IntentProcessor() +# self.ip.intent_graph = MagicMock() + +# def test_edit_weight_zero_base_weight(self): +# """Test case when base_weight is zero""" +# selection = "test_selection" +# index_num = 1 +# self.ip.intent_graph.edges.return_value = [("node1", "node2", {"entry": MagicMock(model="test_model"), "entry": MagicMock(model="test_model")})] + +# self.ip.edit_weight( +# selection, +# base_weight=1.0, +# ) + +# # Assertions to check if weight was incremented by 0.1 +# self.assertEqual(self.ip.intent_graph["node1"]["node2"][index_num]["weight"], 0.1) + +# # def test_edit_weight_non_zero_base_weight(self): +# # """Test case when base_weight is non-zero""" +# # selection = "test_selection" +# # index_num = 0 +# # self.ip.intent_graph.edges.return_value = [("node1", "node2", {"entry": MagicMock(model="test_model")})] +# # base_weight = 5.0 +# # self.ip.edit_weight(selection, base_weight=base_weight, index_num=index_num) + +# # # Assertions to check if weight was decremented by 0.1 + +# # self.assertEqual(self.ip.intent_graph["node1"]["node2"][index_num]["weight"], base_weight - 0.1) + + +# if __name__ == "__main_": +# unittest.main() diff --git a/tests/test_14_draw_graph.py b/tests/test_14_draw_graph.py index e69b83b2..79d2c025 100644 --- a/tests/test_14_draw_graph.py +++ b/tests/test_14_draw_graph.py @@ -1,108 +1,108 @@ -import networkx as nx -from nnll_14 import calculate_graph -import matplotlib.pyplot as plt -import numpy as np - -from nnll_15.constants import LibType - - -def draw_matplot_labeled(nx_graph: nx.Graph) -> None: - nx_graph = calculate_graph() - path = nx.bidirectional_shortest_path(nx_graph, "text", "image") - path_edges = list(zip(path, path[1:])) - edge_colors = ["red" if edge in path_edges or tuple(reversed(edge)) in path_edges else "black" for edge in nx_graph.edges()] - pos = nx.spring_layout(nx_graph) - nx.draw_networkx_nodes(nx_graph, pos) - nx.draw_networkx_edges(nx_graph, pos, edge_color=edge_colors) - nx.draw_networkx_labels(nx_graph, pos) - # nx.draw_networkx_edges(nx_graph, pos) - # nx.draw_networkx_edges(nx_graph, pos, edge_labels={(u, v): d for u, v, d in nx_graph.edges(data=True)}) - nx.draw_networkx_edge_labels(nx_graph, pos, edge_labels={(u, v): d for u, v, d in nx_graph.edges(data=True)}, font_size=3, alpha=0.5, rotate=False) - plt.show() - - -def draw_matplot_circular() -> None: - nx_graph = calculate_graph() - path = nx.bidirectional_shortest_path(nx_graph, "image", "speech") - path_edges = list(zip(path, path[1:])) - edge_colors = ["red" if edge in path_edges or tuple(reversed(edge)) in path_edges else "black" for edge in nx_graph.edges()] - pos = nx.circular_layout(nx_graph) - nx.draw_networkx_nodes(nx_graph, pos) - nx.draw_networkx_edges(nx_graph, pos, edge_color=edge_colors) - nx.draw_networkx_labels(nx_graph, pos) - nx.draw_circular(nx_graph) - plt.show() - - -def draw_matplot_graphviz() -> None: - nx_graph = calculate_graph() - path = nx.bidirectional_shortest_path(nx_graph, "image", "speech") - path_edges = list(zip(path, path[1:])) - edge_colors = ["red" if edge in path_edges or tuple(reversed(edge)) in path_edges else "black" for edge in nx_graph.edges()] - pos = nx.nx_agraph.graphviz_layout(nx_graph, prog="twopi", root=0) - options = {"with_labels": False, "alpha": 0.5, "node_size": 15} - nx.draw(nx_graph, pos, node_color=edge_colors, **options) - plt.show() - - -def draw_matplot_weights() -> None: - nx_graph = calculate_graph() - pos = nx.spring_layout(nx_graph, scale=20, k=3 / np.sqrt(nx_graph.order())) - nx.draw(nx_graph, pos=pos, node_color="lightblue", with_labels=True, node_size=500) - labels = nx.get_edge_attributes(nx_graph, "weight") - nx.draw_networkx_edge_labels(nx_graph, pos, edge_labels=labels) - plt.show() - - -if __name__ == "__main__": - # draw_matplot_weights() - draw_matplot_circular() - - -# from textual.app import App, ComposeResult -# from textual.widgets import Static -# from textual_plot import HiResMode # , PlotWidget -# from textual_plotext import PlotextPlot -# from networkx import convert_node_labels_to_integers +# import networkx as nx +# from nnll_14 import calculate_graph +# import matplotlib.pyplot as plt # import numpy as np -# from typing import Sequence -# class MinimalApp(App[None]): -# def compose(self) -> ComposeResult: -# yield Static() -# # yield PlotextPlot(id="plotext") -# # yield PlotWidget(id="plot) -# def on_mount(self) -> None: +# from nnll_15.constants import LibType + + +# def draw_matplot_labeled(nx_graph: nx.Graph) -> None: +# nx_graph = calculate_graph() +# path = nx.bidirectional_shortest_path(nx_graph, "text", "image") +# path_edges = list(zip(path, path[1:])) +# edge_colors = ["red" if edge in path_edges or tuple(reversed(edge)) in path_edges else "black" for edge in nx_graph.edges()] +# pos = nx.spring_layout(nx_graph) +# nx.draw_networkx_nodes(nx_graph, pos) +# nx.draw_networkx_edges(nx_graph, pos, edge_color=edge_colors) +# nx.draw_networkx_labels(nx_graph, pos) +# # nx.draw_networkx_edges(nx_graph, pos) +# # nx.draw_networkx_edges(nx_graph, pos, edge_labels={(u, v): d for u, v, d in nx_graph.edges(data=True)}) +# nx.draw_networkx_edge_labels(nx_graph, pos, edge_labels={(u, v): d for u, v, d in nx_graph.edges(data=True)}, font_size=3, alpha=0.5, rotate=False) +# plt.show() + + +# def draw_matplot_circular() -> None: +# nx_graph = calculate_graph() +# path = nx.bidirectional_shortest_path(nx_graph, "image", "speech") +# path_edges = list(zip(path, path[1:])) +# edge_colors = ["red" if edge in path_edges or tuple(reversed(edge)) in path_edges else "black" for edge in nx_graph.edges()] +# pos = nx.circular_layout(nx_graph) +# nx.draw_networkx_nodes(nx_graph, pos) +# nx.draw_networkx_edges(nx_graph, pos, edge_color=edge_colors) +# nx.draw_networkx_labels(nx_graph, pos) +# nx.draw_circular(nx_graph) +# plt.show() + + +# def draw_matplot_graphviz() -> None: +# nx_graph = calculate_graph() +# path = nx.bidirectional_shortest_path(nx_graph, "image", "speech") +# path_edges = list(zip(path, path[1:])) +# edge_colors = ["red" if edge in path_edges or tuple(reversed(edge)) in path_edges else "black" for edge in nx_graph.edges()] +# pos = nx.nx_agraph.graphviz_layout(nx_graph, prog="twopi", root=0) +# options = {"with_labels": False, "alpha": 0.5, "node_size": 15} +# nx.draw(nx_graph, pos, node_color=edge_colors, **options) +# plt.show() + + +# def draw_matplot_weights() -> None: +# nx_graph = calculate_graph() +# pos = nx.spring_layout(nx_graph, scale=20, k=3 / np.sqrt(nx_graph.order())) +# nx.draw(nx_graph, pos=pos, node_color="lightblue", with_labels=True, node_size=500) +# labels = nx.get_edge_attributes(nx_graph, "weight") +# nx.draw_networkx_edge_labels(nx_graph, pos, edge_labels=labels) +# plt.show() -# nx_graph = label_edge_attrib_for(nx_graph) -# self.draw_matplot(nx_graph) -# # nx_graph_num = convert_node_labels_to_integers(nx_graph) -# # self.draw_textual_plotext(nx_graph_num) -# # self.draw_textual_plot(nx_graph_num) +# if __name__ == "__main__": +# # draw_matplot_weights() +# draw_matplot_circular() -# def draw_textual_plotext(self, nx_graph_num: nx.Graph) -> None: -# plot = self.query_one("#plotext") -# first_column = [item[0] for item in list(nx_graph_num.edges())] -# second_column = [item[1] for item in list(nx_graph_num.edges())] -# plot.plot(x=first_column, y=second_column, hires_mode=HiResMode.BRAILLE, line_style="purple") -# def draw_textual_plot(self, nx_graph_num: nx.Graph) -> None: -# plot = self.query_one("#plot") -# plot.plt.scatter(nx_graph_num.edges()) +# # from textual.app import App, ComposeResult +# # from textual.widgets import Static +# # from textual_plot import HiResMode # , PlotWidget +# # from textual_plotext import PlotextPlot +# # from networkx import convert_node_labels_to_integers +# # import numpy as np +# # from typing import Sequence +# # class MinimalApp(App[None]): +# # def compose(self) -> ComposeResult: +# # yield Static() +# # # yield PlotextPlot(id="plotext") +# # # yield PlotWidget(id="plot) +# # def on_mount(self) -> None: -# if __name__ == "__main__": -# MinimalApp().run() +# # nx_graph = label_edge_attrib_for(nx_graph) +# # self.draw_matplot(nx_graph) +# # # nx_graph_num = convert_node_labels_to_integers(nx_graph) +# # # self.draw_textual_plotext(nx_graph_num) +# # # self.draw_textual_plot(nx_graph_num) + + +# # def draw_textual_plotext(self, nx_graph_num: nx.Graph) -> None: +# # plot = self.query_one("#plotext") +# # first_column = [item[0] for item in list(nx_graph_num.edges())] +# # second_column = [item[1] for item in list(nx_graph_num.edges())] +# # plot.plot(x=first_column, y=second_column, hires_mode=HiResMode.BRAILLE, line_style="purple") + +# # def draw_textual_plot(self, nx_graph_num: nx.Graph) -> None: +# # plot = self.query_one("#plot") +# # plot.plt.scatter(nx_graph_num.edges()) + + +# # if __name__ == "__main__": +# # MinimalApp().run() -# data = dict(list(nx_graph_num.edges())) -# for each in list(nx_graph_num.edges()): +# # data = dict(list(nx_graph_num.edges())) +# # for each in list(nx_graph_num.edges()): -# # nx -# # plt.show() -# # # plot.set_xticks([]) -# # # plot.set_yticks([]) -# # # plot.set_xlabel("") -# # # plot.set_ylabel("") -# # # plot.set_styles("background: #1f1f1f;") +# # # nx +# # # plt.show() +# # # # plot.set_xticks([]) +# # # # plot.set_yticks([]) +# # # # plot.set_xlabel("") +# # # # plot.set_ylabel("") +# # # # plot.set_styles("background: #1f1f1f;") diff --git a/tests/test_14_graph.py b/tests/test_14_graph.py index f99baea6..6b806efe 100644 --- a/tests/test_14_graph.py +++ b/tests/test_14_graph.py @@ -1,339 +1,339 @@ -# # # -# # # - -# pylint:disable=redefined-outer-name -# pylint:disable=redefined-builtin - -import datetime -from pathlib import PosixPath -from unittest import mock - -# import matplotlib.pyplot as plt -import pytest -from nnll_14 import calculate_graph -from nnll_15.constants import VALID_CONVERSIONS - - -class Model: - """Mock ollama Model class""" - - def __init__(self, model=None, modified_at=None, digest=None, size=None, details=None): - self.model = model - self.modified_at = modified_at - self.digest = digest - self.size = size - self.details = details - - -class ModelDetails: - """Mock ollama ModelDetails class""" - - def __init__(self, parent_model=None, format=None, family=None, families=None, parameter_size=None, quantization_level=None): - self.parent_model = parent_model - self.format = format - self.family = family - self.families = families - self.parameter_size = parameter_size - self.quantization_level = quantization_level - - -class ListResponse: - """Mock ollama ListResponse class""" - - def __init__(self, models=None): - self.models = models - - -@pytest.fixture(scope="session") -def mock_ollama_data(): - """Mock ollama response""" - with mock.patch("ollama.list", new_callable=mock.MagicMock()) as mock_get_registry_data: - data = ListResponse( - models=[ - Model( - model="hf.co/unsloth/gemma-3-27b-it-GGUF:Q8_0", - modified_at=datetime.datetime(2025, 3, 19, 12, 21, 19, 112890, tzinfo=None), - digest="965289b1e3e63c66bfc018051b6a907b2f0b18620d5721dd1cdfad759b679a2c", - size=29565711760, - details=ModelDetails(parent_model="", format="gguf", family="gemma3", families=["gemma3"], parameter_size="27B", quantization_level="unknown"), - ), - Model( - model="hf.co/unsloth/gemma-3-27b-it-GGUF:Q5_K_M", - modified_at=datetime.datetime(2025, 3, 18, 12, 13, 57, 294851, tzinfo=None), - digest="82c7d241b764d0346f382a9059a7b08056075c7bc2d81ac21dfa20d525556b16", - size=20129415184, - details=ModelDetails(parent_model="", format="gguf", family="gemma3", families=["gemma3"], parameter_size="27B", quantization_level="unknown"), - ), - Model( - model="hf.co/bartowski/RekaAI_reka-flash-3-GGUF:Q5_K_M", - modified_at=datetime.datetime(2025, 3, 13, 18, 28, 57, 859962, tzinfo=None), - digest="43d35cd4e25e90f9cbb33585f60823450bd1f279c4703a1b2831a9cba73e60e4", - size=15635474582, - details=ModelDetails(parent_model="", format="gguf", family="llama", families=["llama"], parameter_size="20.9B", quantization_level="unknown"), - ), - ] - ) - mock_get_registry_data.return_value = data - yield mock_get_registry_data - - -class HFCacheInfo: - """Mock hub cache""" - - def __init__(self, size_on_disk, repos): - self.size_on_disk = size_on_disk - self.repos = repos - - -class CachedRepoInfo: - """Mock hub repo cache""" - - def __init__(self, repo_id, repo_type, repo_path, size_on_disk, nb_files, revisions, files, last_accessed, last_modified): - self.repo_id = repo_id - self.repo_type = repo_type - self.repo_path = repo_path - self.size_on_disk = size_on_disk - self.nb_files = nb_files - self.revisions = revisions - self.files = files - self.last_accessed = last_accessed - self.last_modified = last_modified - - -@pytest.fixture(scope="session") -def mock_hub_data(): - """Mock hub data""" - with mock.patch("huggingface_hub.scan_cache_dir", new_callable=mock.MagicMock()) as mock_get_registry_data: - data = HFCacheInfo( - size_on_disk=91018285403, - repos=frozenset( - { - CachedRepoInfo( - repo_id="parler-tts/parler-tts-large-v1", - repo_type="model", - repo_path=PosixPath("/Users/unauthorized/.cache/huggingface/hub/models--parler-tts--parler-tts-large-v1"), - size_on_disk=9335526346, - nb_files=14, - revisions=None, - files=None, - last_accessed=1741910585.3828554, - last_modified=1741908821.5103855, - ), - CachedRepoInfo( - repo_id="THUDM/CogView3-Plus-3B", - repo_type="model", - repo_path=PosixPath("/Users/unauthorized/.cache/huggingface/hub/models--THUDM--CogView3-Plus-3B"), - size_on_disk=25560123724, - nb_files=20, - revisions=None, - files=None, - last_accessed=1741827083.5111423, - last_modified=1741827083.4126444, - ), - } - ), - ) - mock_get_registry_data.return_value = data - yield mock_get_registry_data - - -def test_mocked_ollama(mock_ollama_data): - """Check if mocking ollama correctly""" - result = mock_ollama_data() - - assert len(result.models) == 3 - next_model = next(iter(result.models)) - assert next_model.model == "hf.co/unsloth/gemma-3-27b-it-GGUF:Q8_0" - assert next_model.size == 29565711760 - - -def test_mocked_hub(mock_hub_data): - """Check if mocking hub correctly. - `frozenset` is converted to a sorted list - Otherwise hashed return becomes unordered""" - result = mock_hub_data() - new_list = [] - assert len(result.repos) == 2 - next_model = [*result.repos] - for x in next_model: - new_list.append([x.repo_id, x.size_on_disk]) - new_list.sort(key=lambda x: x[1]) - assert new_list[0][0] == "parler-tts/parler-tts-large-v1" - assert new_list[0][1] == 9335526346 - - -def test_create_graph(mock_ollama_data, mock_hub_data): - """Run test of graph creation""" - nx_graph = calculate_graph() - - assert list(nx_graph) == VALID_CONVERSIONS - key_data = nx_graph.edges.data("key") - for edge in key_data: - if edge[2] is not None: - assert isinstance(edge[2], str) - else: - assert isinstance(edge[1], str) - - size_data = nx_graph.edges.data("size") - for edge in size_data: - if edge[2] is not None: - assert isinstance(edge[2], int) - else: - assert isinstance(edge[1], str) - - -# when user presses trigger : -# run shortest distance, then run operations identified on edges - -# seen = set() -# [e[1] for e in nx_graph.edges if e[1] not in seen and not seen.add(e[1])] - -# nx_graph['speech']['text’] get all paths towards -# get all size on graph -# nx_graph.edges.data(“keys”) get all model name on graph -# nx_graph.edges['text','speech',0]['key'] - -# nx_graph.out_degree('text') get number of edges/paths pointing away -# nx_graph.in_degree('text') get number of edges/paths pointing towards -# nx_graph.edges[‘text’, ‘image’][‘weight'] = 4.2 change attribute - - -# node_attrib = nx.get_node_attributes(nx_graph, “model”) -# node_attrib[‘text’] - - -# nx.draw_networkx -# adjacent_pairs = [(key, item) for key, value in VALID_CONVERSIONS.input.items() for item in (value.values if isinstance(value.values, tuple) else ())] -# from typing import Dict, Tuple -# from pydantic import BaseModel, Field - -# class ConversionValue(BaseModel): -# """(output_medium, more_output_medium)""" +# # # # +# # # # + +# # pylint:disable=redefined-outer-name +# # pylint:disable=redefined-builtin + +# import datetime +# from pathlib import PosixPath +# from unittest import mock + +# # import matplotlib.pyplot as plt +# import pytest +# from nnll_14 import calculate_graph +# from nnll_15.constants import VALID_CONVERSIONS + + +# class Model: +# """Mock ollama Model class""" + +# def __init__(self, model=None, modified_at=None, digest=None, size=None, details=None): +# self.model = model +# self.modified_at = modified_at +# self.digest = digest +# self.size = size +# self.details = details + + +# class ModelDetails: +# """Mock ollama ModelDetails class""" + +# def __init__(self, parent_model=None, format=None, family=None, families=None, parameter_size=None, quantization_level=None): +# self.parent_model = parent_model +# self.format = format +# self.family = family +# self.families = families +# self.parameter_size = parameter_size +# self.quantization_level = quantization_level + + +# class ListResponse: +# """Mock ollama ListResponse class""" + +# def __init__(self, models=None): +# self.models = models + + +# @pytest.fixture(scope="session") +# def mock_ollama_data(): +# """Mock ollama response""" +# with mock.patch("ollama.list", new_callable=mock.MagicMock()) as mock_get_registry_data: +# data = ListResponse( +# models=[ +# Model( +# model="hf.co/unsloth/gemma-3-27b-it-GGUF:Q8_0", +# modified_at=datetime.datetime(2025, 3, 19, 12, 21, 19, 112890, tzinfo=None), +# digest="965289b1e3e63c66bfc018051b6a907b2f0b18620d5721dd1cdfad759b679a2c", +# size=29565711760, +# details=ModelDetails(parent_model="", format="gguf", family="gemma3", families=["gemma3"], parameter_size="27B", quantization_level="unknown"), +# ), +# Model( +# model="hf.co/unsloth/gemma-3-27b-it-GGUF:Q5_K_M", +# modified_at=datetime.datetime(2025, 3, 18, 12, 13, 57, 294851, tzinfo=None), +# digest="82c7d241b764d0346f382a9059a7b08056075c7bc2d81ac21dfa20d525556b16", +# size=20129415184, +# details=ModelDetails(parent_model="", format="gguf", family="gemma3", families=["gemma3"], parameter_size="27B", quantization_level="unknown"), +# ), +# Model( +# model="hf.co/bartowski/RekaAI_reka-flash-3-GGUF:Q5_K_M", +# modified_at=datetime.datetime(2025, 3, 13, 18, 28, 57, 859962, tzinfo=None), +# digest="43d35cd4e25e90f9cbb33585f60823450bd1f279c4703a1b2831a9cba73e60e4", +# size=15635474582, +# details=ModelDetails(parent_model="", format="gguf", family="llama", families=["llama"], parameter_size="20.9B", quantization_level="unknown"), +# ), +# ] +# ) +# mock_get_registry_data.return_value = data +# yield mock_get_registry_data + + +# class HFCacheInfo: +# """Mock hub cache""" + +# def __init__(self, size_on_disk, repos): +# self.size_on_disk = size_on_disk +# self.repos = repos + + +# class CachedRepoInfo: +# """Mock hub repo cache""" + +# def __init__(self, repo_id, repo_type, repo_path, size_on_disk, nb_files, revisions, files, last_accessed, last_modified): +# self.repo_id = repo_id +# self.repo_type = repo_type +# self.repo_path = repo_path +# self.size_on_disk = size_on_disk +# self.nb_files = nb_files +# self.revisions = revisions +# self.files = files +# self.last_accessed = last_accessed +# self.last_modified = last_modified + + +# @pytest.fixture(scope="session") +# def mock_hub_data(): +# """Mock hub data""" +# with mock.patch("huggingface_hub.scan_cache_dir", new_callable=mock.MagicMock()) as mock_get_registry_data: +# data = HFCacheInfo( +# size_on_disk=91018285403, +# repos=frozenset( +# { +# CachedRepoInfo( +# repo_id="parler-tts/parler-tts-large-v1", +# repo_type="model", +# repo_path=PosixPath("/Users/unauthorized/.cache/huggingface/hub/models--parler-tts--parler-tts-large-v1"), +# size_on_disk=9335526346, +# nb_files=14, +# revisions=None, +# files=None, +# last_accessed=1741910585.3828554, +# last_modified=1741908821.5103855, +# ), +# CachedRepoInfo( +# repo_id="THUDM/CogView3-Plus-3B", +# repo_type="model", +# repo_path=PosixPath("/Users/unauthorized/.cache/huggingface/hub/models--THUDM--CogView3-Plus-3B"), +# size_on_disk=25560123724, +# nb_files=20, +# revisions=None, +# files=None, +# last_accessed=1741827083.5111423, +# last_modified=1741827083.4126444, +# ), +# } +# ), +# ) +# mock_get_registry_data.return_value = data +# yield mock_get_registry_data + + +# def test_mocked_ollama(mock_ollama_data): +# """Check if mocking ollama correctly""" +# result = mock_ollama_data() + +# assert len(result.models) == 3 +# next_model = next(iter(result.models)) +# assert next_model.model == "hf.co/unsloth/gemma-3-27b-it-GGUF:Q8_0" +# assert next_model.size == 29565711760 + + +# def test_mocked_hub(mock_hub_data): +# """Check if mocking hub correctly. +# `frozenset` is converted to a sorted list +# Otherwise hashed return becomes unordered""" +# result = mock_hub_data() +# new_list = [] +# assert len(result.repos) == 2 +# next_model = [*result.repos] +# for x in next_model: +# new_list.append([x.repo_id, x.size_on_disk]) +# new_list.sort(key=lambda x: x[1]) +# assert new_list[0][0] == "parler-tts/parler-tts-large-v1" +# assert new_list[0][1] == 9335526346 + + +# def test_create_graph(mock_ollama_data, mock_hub_data): +# """Run test of graph creation""" +# nx_graph = calculate_graph() + +# assert list(nx_graph) == VALID_CONVERSIONS +# key_data = nx_graph.edges.data("key") +# for edge in key_data: +# if edge[2] is not None: +# assert isinstance(edge[2], str) +# else: +# assert isinstance(edge[1], str) + +# size_data = nx_graph.edges.data("size") +# for edge in size_data: +# if edge[2] is not None: +# assert isinstance(edge[2], int) +# else: +# assert isinstance(edge[1], str) + + +# # when user presses trigger : +# # run shortest distance, then run operations identified on edges + +# # seen = set() +# # [e[1] for e in nx_graph.edges if e[1] not in seen and not seen.add(e[1])] + +# # nx_graph['speech']['text’] get all paths towards +# # get all size on graph +# # nx_graph.edges.data(“keys”) get all model name on graph +# # nx_graph.edges['text','speech',0]['key'] + +# # nx_graph.out_degree('text') get number of edges/paths pointing away +# # nx_graph.in_degree('text') get number of edges/paths pointing towards +# # nx_graph.edges[‘text’, ‘image’][‘weight'] = 4.2 change attribute + + +# # node_attrib = nx.get_node_attributes(nx_graph, “model”) +# # node_attrib[‘text’] + + +# # nx.draw_networkx +# # adjacent_pairs = [(key, item) for key, value in VALID_CONVERSIONS.input.items() for item in (value.values if isinstance(value.values, tuple) else ())] +# # from typing import Dict, Tuple +# # from pydantic import BaseModel, Field + +# # class ConversionValue(BaseModel): +# # """(output_medium, more_output_medium)""" -# values: Field() +# # values: Field() -# class ConversionMap(BaseModel): -# """{input_medium: (output_medium, more_output_medium)""" +# # class ConversionMap(BaseModel): +# # """{input_medium: (output_medium, more_output_medium)""" -# input: Dict[str, ConversionValue] +# # input: Dict[str, ConversionValue] -# from networkx import convert_node_labels_to_integers +# # from networkx import convert_node_labels_to_integers -# mllama (vllm), text-to-image, text-generation +# # mllama (vllm), text-to-image, text-generation -# 2. Add nodes for each unique data type and model combination (e.g., `['text']`, `['image']`, etc.) and edges representing the transformations between them using models. +# # 2. Add nodes for each unique data type and model combination (e.g., `['text']`, `['image']`, etc.) and edges representing the transformations between them using models. -# # Define a function to add edges based on input-output pairs -# def add_model_edges(G, input_type, model_dict, output_type): -# for model_name, model_path in model_dict.items(): -# G.add_edge(str(input_type), str(output_type), model_name=model_name, model_path=model_path) +# # # Define a function to add edges based on input-output pairs +# # def add_model_edges(G, input_type, model_dict, output_type): +# # for model_name, model_path in model_dict.items(): +# # G.add_edge(str(input_type), str(output_type), model_name=model_name, model_path=model_path) -# # Add the specified paths to your graph -# add_model_edges(G, ["text"], {"model1": "path1"}, ["text"]) -# add_model_edges(G, ["text", "image"], {"model2": "path2"}, ["text"]) -# add_model_edges(G, ["image"], {"model3": "path3"}, ["text"]) -# add_model_edges(G, ["text"], {"model4": "path4"}, ["image"]) -# add_model_edges(G, ["speech"], {"model5": "path5"}, ["text"]) -# add_model_edges(G, ["text"], {"model6": "path6"}, ["speech"]) +# # # Add the specified paths to your graph +# # add_model_edges(G, ["text"], {"model1": "path1"}, ["text"]) +# # add_model_edges(G, ["text", "image"], {"model2": "path2"}, ["text"]) +# # add_model_edges(G, ["image"], {"model3": "path3"}, ["text"]) +# # add_model_edges(G, ["text"], {"model4": "path4"}, ["image"]) +# # add_model_edges(G, ["speech"], {"model5": "path5"}, ["text"]) +# # add_model_edges(G, ["text"], {"model6": "path6"}, ["speech"]) -# # Example: Find all paths from ['text'] to ['image'] -# paths = list(nx.all_simple_paths(G, str(["text"]), str(["image"]))) -# print(paths) +# # # Example: Find all paths from ['text'] to ['image'] +# # paths = list(nx.all_simple_paths(G, str(["text"]), str(["image"]))) +# # print(paths) -# node would be format -# edge would be conversion model -# 'vllm' -# 'text-generation' +# # node would be format +# # edge would be conversion model +# # 'vllm' +# # 'text-generation' -# input #output -# node #edge #node -# {['text']} { model name: model path} } { ['text']] } -# [['text', 'image']: { model name: model path} } { ['text'] } -# {['image']: { model name: model path} } { ['text'] } -# {['text']: { model name: model path} } { ['image'] } -# {['speech']: { model name: model path} } { ['text'] } -# {['text']: { model name: model path} } { ['speech']} +# # input #output +# # node #edge #node +# # {['text']} { model name: model path} } { ['text']] } +# # [['text', 'image']: { model name: model path} } { ['text'] } +# # {['image']: { model name: model path} } { ['text'] } +# # {['text']: { model name: model path} } { ['image'] } +# # {['speech']: { model name: model path} } { ['text'] } +# # {['text']: { model name: model path} } { ['speech']} -# bidirectional_shortest_path(G, mode_in, mode_out) +# # bidirectional_shortest_path(G, mode_in, mode_out) -# G.add_edges_from[(2, 3, {"weight": 3.1415})] #add edge with attribute -# G.add_nodes_from([(4, {"color": "red"}), (5, {"color": "green"})]) #add node with attribute -# H = nx.path_graph(10) -# G.add_nodes_from(H) # import one graph into another -# G.add_node(H) # the entire graph as a node -# G.clear() +# # G.add_edges_from[(2, 3, {"weight": 3.1415})] #add edge with attribute +# # G.add_nodes_from([(4, {"color": "red"}), (5, {"color": "green"})]) #add node with attribute +# # H = nx.path_graph(10) +# # G.add_nodes_from(H) # import one graph into another +# # G.add_node(H) # the entire graph as a node +# # G.clear() -# class ConversionGraph: -# def __init__(self, graph): -# self.graph = graph # Graph where keys are formats, values are dict of {format: (steps, quality)} +# # class ConversionGraph: +# # def __init__(self, graph): +# # self.graph = graph # Graph where keys are formats, values are dict of {format: (steps, quality)} -# def manhattan_distance(self, node1, node2): -# return abs(node1[0] - node2[0]) + abs(node1[1] - node2[1]) +# # def manhattan_distance(self, node1, node2): +# # return abs(node1[0] - node2[0]) + abs(node1[1] - node2[1]) -# def find_conversion_path(self, initial_format, target_format): -# # Check if direct conversion is possible -# if target_format in self.graph[initial_format]: -# return [(initial_format, target_format)] +# # def find_conversion_path(self, initial_format, target_format): +# # # Check if direct conversion is possible +# # if target_format in self.graph[initial_format]: +# # return [(initial_format, target_format)] -# # Initialize variables for pathfinding -# queue = [[(initial_format, 0, float("inf"))]] # (format, steps, quality) -# visited = set() +# # # Initialize variables for pathfinding +# # queue = [[(initial_format, 0, float("inf"))]] # (format, steps, quality) +# # visited = set() -# while queue: -# path = queue.pop(0) -# current_node = path[-1][0] -# current_steps = path[-1][1] -# current_quality = path[-1][2] +# # while queue: +# # path = queue.pop(0) +# # current_node = path[-1][0] +# # current_steps = path[-1][1] +# # current_quality = path[-1][2] -# if current_node == target_format: -# return path +# # if current_node == target_format: +# # return path -# for neighbor, (steps, quality) in self.graph[current_node].items(): -# # Avoid backtracking and only move forward -# if neighbor not in visited: -# new_steps = current_steps + steps -# new_quality = min(current_quality, quality) -# distance_to_goal = self.manhattan_distance((new_steps, new_quality), (0, float("inf"))) +# # for neighbor, (steps, quality) in self.graph[current_node].items(): +# # # Avoid backtracking and only move forward +# # if neighbor not in visited: +# # new_steps = current_steps + steps +# # new_quality = min(current_quality, quality) +# # distance_to_goal = self.manhattan_distance((new_steps, new_quality), (0, float("inf"))) -# # Prioritize paths with fewer steps but consider higher quality nodes -# queue.append(path + [(neighbor, new_steps, new_quality)]) +# # # Prioritize paths with fewer steps but consider higher quality nodes +# # queue.append(path + [(neighbor, new_steps, new_quality)]) -# visited.add(current_node) -# queue.sort(key=lambda p: (len(p), -p[-1][2])) # Sort by path length and quality +# # visited.add(current_node) +# # queue.sort(key=lambda p: (len(p), -p[-1][2])) # Sort by path length and quality -# return None +# # return None -# # (steps, quality) -# graph = { -# "FormatA": {"FormatB": (1, 8), "FormatC": (2, 9)}, -# "FormatB": {"FormatD": (1, 7)}, -# "FormatC": {"FormatD": (3, 6), "FormatE": (2, 10)}, -# "FormatD": {"TargetFormat": (1, 5)}, -# "FormatE": {"TargetFormat": (1, 9)}, -# } +# # # (steps, quality) +# # graph = { +# # "FormatA": {"FormatB": (1, 8), "FormatC": (2, 9)}, +# # "FormatB": {"FormatD": (1, 7)}, +# # "FormatC": {"FormatD": (3, 6), "FormatE": (2, 10)}, +# # "FormatD": {"TargetFormat": (1, 5)}, +# # "FormatE": {"TargetFormat": (1, 9)}, +# # } -# if __name__ == "__main__": -# converter = ConversionGraph(graph) -# path = converter.find_conversion_path("FormatA", "TargetFormat") -# print("Conversion Path:", path) - - -# for model, details in ollama_models.items(): -# nx_graph.add_edges_from(details.available_tasks, key=model, weight=details.size) -# for model, details in hub_models.items(): -# nx_graph.add_edges_from(details.available_tasks, key=model, weight=details.size) -# nx_graph = build_conversion_graph() -# ollama_models = from_ollama_cache() -# hub_models = from_hf_hub_cache() -# for model, details in ollama_models.items(): -# nx_graph.add_edges_from(details.available_tasks, label=model, weight=details.size) -# for model, details in hub_models.items(): -# nx_graph.add_edges_from(details.available_tasks, label=model, weight=details.size) +# # if __name__ == "__main__": +# # converter = ConversionGraph(graph) +# # path = converter.find_conversion_path("FormatA", "TargetFormat") +# # print("Conversion Path:", path) + + +# # for model, details in ollama_models.items(): +# # nx_graph.add_edges_from(details.available_tasks, key=model, weight=details.size) +# # for model, details in hub_models.items(): +# # nx_graph.add_edges_from(details.available_tasks, key=model, weight=details.size) +# # nx_graph = build_conversion_graph() +# # ollama_models = from_ollama_cache() +# # hub_models = from_hf_hub_cache() +# # for model, details in ollama_models.items(): +# # nx_graph.add_edges_from(details.available_tasks, label=model, weight=details.size) +# # for model, details in hub_models.items(): +# # nx_graph.add_edges_from(details.available_tasks, label=model, weight=details.size)