From 05ffbbe544dc8ccd6bd5abce1b309acfad694fe6 Mon Sep 17 00:00:00 2001 From: rickard Date: Fri, 18 Oct 2024 21:52:32 +0200 Subject: [PATCH 1/6] story context --- llm_config.yaml | 1 + stories/combat_sandbox/story.py | 2 +- stories/demo/story.py | 10 +- stories/prancingllama/story.py | 10 +- tale/cmds/normal.py | 2 +- tale/driver.py | 4 +- tale/dungeon/dungeon_generator.py | 2 +- tale/json_story.py | 2 +- tale/llm/contexts/BaseContext.py | 12 +- tale/llm/{llm_ext.py => dynamic_story.py} | 4 +- tale/llm/llm_cache.py | 4 +- tale/llm/llm_utils.py | 2 +- tale/llm/story_building.py | 4 + tale/llm/world_building.py | 2 +- tale/parse_utils.py | 17 +- tale/random_event.py | 2 +- tale/story.py | 32 +- tale/story_builder.py | 2 +- tale/tio/console_http_io.py | 189 ++++++++ tale/tio/if_browser_io_text.py | 561 ++++++++++++++++++++++ tale/web/resources/test.jpg | Bin 1419 -> 0 bytes tale/web/web_utils.py | 13 +- tale/zone.py | 6 +- tests/files/test_cache.json | 11 +- tests/test_llm_ext.py | 2 +- tests/test_mudobjects.py | 2 +- tests/test_normal_commands.py | 2 +- tests/test_parse_utils.py | 9 +- tests/test_quests.py | 2 +- tests/test_spells.py | 2 +- tests/test_story_builder.py | 2 +- tests/test_web_utils.py | 4 +- tests/test_wizard_commands.py | 2 +- tests/test_zone.py | 3 +- 34 files changed, 868 insertions(+), 56 deletions(-) rename tale/llm/{llm_ext.py => dynamic_story.py} (98%) create mode 100644 tale/tio/console_http_io.py create mode 100644 tale/tio/if_browser_io_text.py delete mode 100644 tale/web/resources/test.jpg diff --git a/llm_config.yaml b/llm_config.yaml index 745334be..299d69a3 100644 --- a/llm_config.yaml +++ b/llm_config.yaml @@ -46,3 +46,4 @@ REQUEST_FOLLOW_PROMPT: '{context}\n[USER_START]Act as as {cha DAY_CYCLE_EVENT_PROMPT: '{context}\n[USER_START] Write up to two sentences describing the transition from {from_time} to {to_time} in {location_name}, using the information supplied inside the tags.' NARRATIVE_EVENT_PROMPT: '{context}\n[USER_START] Write a narrative event that occurs in {location_name} using the information supplied inside the tags. The event should be related to the location and the characters present. Use up to 50 words.' RANDOM_SPAWN_PROMPT: '{context}\n[USER_START] An npc or a mob has entered {location_name}. Select either and fill in one of the following templates using the information supplied inside the tags. Respond using JSON in the following format: {npc_template}' +ADVANCE_STORY_PROMPT: '{context}\n[USER_START] Advance the story in {location_name}. Use the information supplied inside the tags to write a paragraph that progresses the story. Use up to 100 words.' \ No newline at end of file diff --git a/stories/combat_sandbox/story.py b/stories/combat_sandbox/story.py index 4a5e9473..ae37676e 100644 --- a/stories/combat_sandbox/story.py +++ b/stories/combat_sandbox/story.py @@ -7,7 +7,7 @@ from tale.base import Location from tale.driver import Driver from tale.json_story import JsonStory -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.main import run_from_cmdline from tale.player import Player, PlayerConnection from tale.charbuilder import PlayerNaming diff --git a/stories/demo/story.py b/stories/demo/story.py index ceda5d69..31f294ee 100644 --- a/stories/demo/story.py +++ b/stories/demo/story.py @@ -5,10 +5,12 @@ Copyright by Irmen de Jong (irmen@razorvine.net) """ import datetime +import pathlib import sys from typing import Optional, Generator from tale.driver import Driver +from tale.main import run_from_cmdline from tale.player import Player, PlayerConnection from tale.charbuilder import PlayerNaming from tale.story import * @@ -85,5 +87,9 @@ def goodbye(self, player: Player) -> None: if __name__ == "__main__": # story is invoked as a script, start it. - from tale.main import run_from_cmdline - run_from_cmdline(["--game", sys.path[0]]) + gamedir = pathlib.Path(__file__).parent + if gamedir.is_dir() or gamedir.is_file(): + cmdline_args = sys.argv[1:] + cmdline_args.insert(0, "--game") + cmdline_args.insert(1, str(gamedir)) + run_from_cmdline(cmdline_args) \ No newline at end of file diff --git a/stories/prancingllama/story.py b/stories/prancingllama/story.py index a73bb0db..48f1057b 100644 --- a/stories/prancingllama/story.py +++ b/stories/prancingllama/story.py @@ -6,7 +6,7 @@ from tale.base import Location from tale.cmds import spells from tale.driver import Driver -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.skills.magic import MagicType from tale.main import run_from_cmdline from tale.player import Player, PlayerConnection @@ -42,7 +42,7 @@ class Story(DynamicStory): def init(self, driver: Driver) -> None: """Called by the game driver when it is done with its initial initialization.""" self.driver = driver - self._zones = dict() # type: dict(str, Zone) + self._zones = dict() # type: {str, Zone} self._zones["The Prancing Llama"] = Zone("The Prancing Llama", description="A cold, craggy mountain range. Snow covered peaks and uncharted valleys hide and attract all manners of creatures.") import zones.prancingllama for location in zones.prancingllama.all_locations: @@ -96,13 +96,13 @@ def goodbye(self, player: Player) -> None: player.tell("Goodbye, %s. Please come back again soon." % player.title) player.tell("\n") - def races_for_zone(self, zone: str) -> [str]: + def races_for_zone(self, zone: str) -> list[str]: return self._catalogue._creatures - def items_for_zone(self, zone: str) -> [str]: + def items_for_zone(self, zone: str) -> list[str]: return self._catalogue._items - def zone_info(self, zone_name: str, location: str) -> dict(): + def zone_info(self, zone_name: str, location: str) -> dict: zone_info = super.zone_info(zone_name, location) zone_info['races'] = self.races_for_zone(zone_name) zone_info['items'] = self.items_for_zone(zone_name) diff --git a/tale/cmds/normal.py b/tale/cmds/normal.py index 8105bbcc..c4db5ed5 100644 --- a/tale/cmds/normal.py +++ b/tale/cmds/normal.py @@ -11,7 +11,7 @@ from typing import Iterable, List, Dict, Generator, Union, Optional from tale.llm.LivingNpc import LivingNpc -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.skills.skills import SkillType from . import abbreviations, cmd, disabled_in_gamemode, disable_notify_action, overrides_soul, no_soul_parse diff --git a/tale/driver.py b/tale/driver.py index a41f1292..c11f40ae 100644 --- a/tale/driver.py +++ b/tale/driver.py @@ -36,7 +36,7 @@ from .races import playable_races from .errors import StoryCompleted from tale.load_character import CharacterLoader, CharacterV2 -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.llm.llm_utils import LlmUtil from tale.web.web_utils import clear_resources, copy_web_resources @@ -933,7 +933,7 @@ def build_location(self, targetLocation: base.Location, zone: Zone, player: play # try to add location, and if it fails, remove exit to it result = dynamic_story.add_location(location, zone=zone.name) if not result: - for exit in exits: # type: Exit + for exit in exits: if exit.name == location.name: exits.remove(exit) for exit in exits: diff --git a/tale/dungeon/dungeon_generator.py b/tale/dungeon/dungeon_generator.py index 4b0a10aa..429a2316 100644 --- a/tale/dungeon/dungeon_generator.py +++ b/tale/dungeon/dungeon_generator.py @@ -5,7 +5,7 @@ from tale.coord import Coord from tale.item_spawner import ItemSpawner from tale.items.basic import Money -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.mob_spawner import MobSpawner from tale.zone import Zone diff --git a/tale/json_story.py b/tale/json_story.py index fe786755..2d3e53b9 100644 --- a/tale/json_story.py +++ b/tale/json_story.py @@ -2,7 +2,7 @@ from tale.day_cycle.day_cycle import DayCycle from tale.day_cycle.llm_day_cycle_listener import LlmDayCycleListener from tale.items import generic -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.player import Player from tale.random_event import RandomEvent from tale.story import GameMode, StoryConfig diff --git a/tale/llm/contexts/BaseContext.py b/tale/llm/contexts/BaseContext.py index c455454c..8f3ba10b 100644 --- a/tale/llm/contexts/BaseContext.py +++ b/tale/llm/contexts/BaseContext.py @@ -1,12 +1,16 @@ - - from abc import ABC, abstractmethod +from typing import Union + +from tale.story import StoryContext class BaseContext(ABC): - def __init__(self, story_context: str) -> None: - self.story_context = story_context + def __init__(self, story_context: Union[StoryContext, str]) -> None: + if isinstance(story_context, StoryContext): + self.story_context = story_context.to_context_with_past() + else: + self.story_context = story_context @abstractmethod def to_prompt_string(self) -> str: diff --git a/tale/llm/llm_ext.py b/tale/llm/dynamic_story.py similarity index 98% rename from tale/llm/llm_ext.py rename to tale/llm/dynamic_story.py index a1061016..e3e2f2d7 100644 --- a/tale/llm/llm_ext.py +++ b/tale/llm/dynamic_story.py @@ -10,7 +10,7 @@ from tale.llm.LivingNpc import LivingNpc from tale.quest import Quest, QuestType from tale.mob_spawner import MobSpawner -from tale.story import StoryBase +from tale.story import StoryBase, StoryContext from tale.zone import Zone import tale.llm.llm_cache as llm_cache @@ -21,6 +21,8 @@ def __init__(self) -> None: self._zones = dict() # type: dict[str, Zone] self._world = WorldInfo() self._catalogue = Catalogue() + if isinstance(self.config.context, str): + self.config.context = StoryContext(self.config.context) def get_zone(self, name: str) -> Zone: """ Find a zone by name.""" diff --git a/tale/llm/llm_cache.py b/tale/llm/llm_cache.py index f420fc59..945cccbb 100644 --- a/tale/llm/llm_cache.py +++ b/tale/llm/llm_cache.py @@ -21,7 +21,7 @@ def cache_event(event: str, event_hash: int = -1) -> int: event_cache[event_hash] = event return event_hash -def get_events(event_hashes: [int]) -> str: +def get_events(event_hashes: list[int]) -> str: """ Gets events from the cache. """ return "".join([event_cache.get(event_hash, '') for event_hash in event_hashes]) @@ -37,7 +37,7 @@ def cache_look(look: str, look_hash: int = -1) -> int: look_cache[look_hash] = look return look_hash -def get_looks(look_hashes: [int]) -> str: +def get_looks(look_hashes: list[int]) -> str: """ Gets an event from the cache. """ return ", ".join([look_cache.get(look_hash, '') for look_hash in look_hashes]) diff --git a/tale/llm/llm_utils.py b/tale/llm/llm_utils.py index 92cb11e2..5a970886 100644 --- a/tale/llm/llm_utils.py +++ b/tale/llm/llm_utils.py @@ -14,7 +14,7 @@ from tale.llm.contexts.EvokeContext import EvokeContext from tale.llm.contexts.FollowContext import FollowContext from tale.llm.contexts.WorldGenerationContext import WorldGenerationContext -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.llm.llm_io import IoUtil from tale.llm.contexts.DialogueContext import DialogueContext from tale.llm.quest_building import QuestBuilding diff --git a/tale/llm/story_building.py b/tale/llm/story_building.py index b76a6f49..a9632e18 100644 --- a/tale/llm/story_building.py +++ b/tale/llm/story_building.py @@ -11,6 +11,7 @@ def __init__(self, io_util: IoUtil, default_body: dict, backend: str = 'kobold_c self.io_util = io_util self.default_body = default_body self.story_background_prompt = llm_config.params['STORY_BACKGROUND_PROMPT'] # Type: str + self.advance_story_prompt = llm_config.params['ADVANCE_STORY_PROMPT'] # Type: str def generate_story_background(self, world_mood: int, world_info: str, story_type: str): prompt = self.story_background_prompt.format( @@ -19,4 +20,7 @@ def generate_story_background(self, world_mood: int, world_info: str, story_type world_info=world_info) request_body = self.default_body return self.io_util.synchronous_request(request_body, prompt=prompt) + + def advance_story_section(self): + prompt = self.advance_story_prompt.format() \ No newline at end of file diff --git a/tale/llm/world_building.py b/tale/llm/world_building.py index 21c00518..8257d6a8 100644 --- a/tale/llm/world_building.py +++ b/tale/llm/world_building.py @@ -10,7 +10,7 @@ from tale.llm import llm_config from tale.llm.contexts.DungeonLocationsContext import DungeonLocationsContext from tale.llm.contexts.WorldGenerationContext import WorldGenerationContext -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.llm.llm_io import IoUtil from tale.llm.requests.generate_zone import GenerateZone from tale.llm.requests.start_location import StartLocation diff --git a/tale/parse_utils.py b/tale/parse_utils.py index f165a994..c07b10f2 100644 --- a/tale/parse_utils.py +++ b/tale/parse_utils.py @@ -8,19 +8,16 @@ from tale.equip_npcs import equip_npc from tale.item_spawner import ItemSpawner from tale.items import generic -from tale.items.basic import Boxlike, Drink, Food, Health, Money, Note from tale.llm.LivingNpc import LivingNpc from tale.load_items import load_item from tale.skills.magic import MagicType from tale.npc_defs import StationaryMob, StationaryNpc, Trader from tale.races import BodyType, UnarmedAttack from tale.mob_spawner import MobSpawner -from tale.story import GameMode, MoneyType, TickMethod, StoryConfig -from tale.skills.weapon_type import WeaponSkills, WeaponType -from tale.wearable import WearLocation +from tale.story import GameMode, MoneyType, StoryContext, TickMethod, StoryConfig +from tale.skills.weapon_type import WeaponType import json import re -import sys import os @@ -236,7 +233,11 @@ def load_story_config(json_file: dict): config.server_mode = GameMode[json_file['server_mode']] config.npcs = json_file.get('npcs', '') config.items = json_file.get('items', '') - config.context = json_file.get('context', '') + context = json_file.get('context', '') + if isinstance(context, dict): + config.context = StoryContext().from_json(context) + else: + config.context = context config.type = json_file.get('type', '') config.world_info = json_file.get('world_info', '') config.world_mood = json_file.get('world_mood', config.world_mood) @@ -275,7 +276,7 @@ def save_story_config(config: StoryConfig) -> dict: json_file['type'] = config.type json_file['world_info'] = config.world_info json_file['world_mood'] = config.world_mood - json_file['context'] = config.context + json_file['context'] = config.context if isinstance(config.context, str) else config.context.to_json() json_file['custom_resources'] = config.custom_resources json_file['image_gen'] = config.image_gen json_file['epoch'] = 0 # TODO: fix later @@ -646,7 +647,7 @@ def load_stats(json_stats: dict) -> Stats: stats.skills[WeaponType(int_skill)] = json return stats -def save_items(items: List[Item]) -> []: +def save_items(items: List[Item]) -> list[dict]: json_items = [] for item in items: json_item = item.to_dict() diff --git a/tale/random_event.py b/tale/random_event.py index e3875d04..8e979d9f 100644 --- a/tale/random_event.py +++ b/tale/random_event.py @@ -3,7 +3,7 @@ import random from tale import _MudContext from tale.driver import Driver -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.player import PlayerConnection from tale.util import call_periodically diff --git a/tale/story.py b/tale/story.py index 2ccac42b..05bee55d 100644 --- a/tale/story.py +++ b/tale/story.py @@ -7,9 +7,10 @@ import datetime import enum -from typing import Optional, Any, List, Set, Generator +from typing import Optional, Any, List, Set, Generator, Union from packaging.version import Version + from . import __version__ as tale_version_str from .errors import StoryConfigError @@ -72,7 +73,7 @@ def __init__(self) -> None: self.server_mode = GameMode.IF # the actual game mode the server is operating in (will be set at startup time) self.items = "" # items to populate the world with. only used by json loading self.npcs = "" # npcs to populate the world with. only used by json loading - self.context = "" # context to giving background for the story. + self.context = "" # type: Union[str, StoryContext] # context to giving background for the story. self.type = "" # brief description of the setting and type of story, for LLM context self.world_info = "" # brief description of the world, for LLM context self.world_mood = 0 # how safe is the world? 5 is a happy place, -5 is nightmare mode. @@ -164,3 +165,30 @@ def _verify(self, driver) -> None: tale_version_required = Version(self.config.requires_tale) if tale_version < tale_version_required: raise StoryConfigError("This game requires tale " + self.config.requires_tale + ", but " + tale_version_str + " is installed.") + + +class StoryContext: + + def __init__(self, base_story: str = "") -> None: + self.base_story = base_story + self.current_section = "" + self.past_sections = [] + + def set_current_section(self, section: str) -> None: + self.past_sections.append(self.current_section) + self.current_section = section + + def to_context(self) -> str: + return f" Base plot: {self.base_story}; Active section: {self.current_section}" + + def to_context_with_past(self) -> str: + return f" Base plot: {self.base_story}; Past: {' '.join(self.past_sections) if self.past_sections else 'This is the beginning of the story'}; Active section:{self.current_section}" + + def from_json(self, data: dict) -> 'StoryContext': + self.base_story = data.get("base_story", "") + self.current_section = data.get("current_section", "") + self.past_sections = data.get("past_sections", []) + return self + + def to_json(self) -> dict: + return {"base_story": self.base_story, "current_section": self.current_section, "past_sections": self.past_sections} \ No newline at end of file diff --git a/tale/story_builder.py b/tale/story_builder.py index 560216c4..e80ff531 100644 --- a/tale/story_builder.py +++ b/tale/story_builder.py @@ -4,7 +4,7 @@ from tale import lang, load_items from tale.base import Location -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.llm.llm_utils import LlmUtil from tale.player import PlayerConnection from tale.quest import Quest, QuestType diff --git a/tale/tio/console_http_io.py b/tale/tio/console_http_io.py new file mode 100644 index 00000000..6c3ce4ad --- /dev/null +++ b/tale/tio/console_http_io.py @@ -0,0 +1,189 @@ +""" +Console-based input/output. + +'Tale' mud driver, mudlib and interactive fiction framework +Copyright by Irmen de Jong (irmen@razorvine.net) +""" +from http.server import BaseHTTPRequestHandler, HTTPServer +import os +import signal +import sys +import threading +from typing import Sequence, Tuple, Any, Optional, List + +from tale import lang +try: + import prompt_toolkit + from prompt_toolkit.contrib.completers import WordCompleter +except ImportError: + prompt_toolkit = None + +from . import colorama_patched as colorama +from . import styleaware_wrapper, iobase +from ..driver import Driver +from ..player import PlayerConnection, Player +from .. import mud_context + + +colorama.init() +assert type(colorama.Style.DIM) is str, "Incompatible colorama library installed. Please upgrade to a more recent version (0.3.6+)" + +__all__ = ["ConsoleIo"] + +style_words = { + "dim": colorama.Style.DIM, + "normal": colorama.Style.NORMAL, + "bright": colorama.Style.BRIGHT, + "ul": colorama.Style.UNDERLINED, + "it": colorama.Style.ITALIC, + "rev": colorama.Style.REVERSEVID, + "/": colorama.Style.RESET_ALL, + "location": colorama.Style.BRIGHT, + "clear": "\033[1;1H\033[2J", # ansi sequence to clear the console screen + "monospaced": "", # we assume the console is already monospaced font + "/monospaced": "" +} +assert len(set(style_words.keys()) ^ iobase.ALL_STYLE_TAGS) == 0, "mismatch in list of style tags" + +if os.name == "nt": + if not hasattr(colorama, "win32") or colorama.win32.windll is None: + style_words.clear() # running on windows without colorama ansi support + +class Server(BaseHTTPRequestHandler): + + def setMessage(self, message: str): + self.message = message + + def _set_headers(self): + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + + def do_HEAD(self): + self._set_headers() + + # GET sends back a Hello world message + def do_GET(self): + print('GET') + self._set_headers() + + global response_message + + with open(response_message) as data_file: + self.wfile.write(data_file.read().encode()) + + # POST echoes the message adding a JSON field + def do_POST(self): + print('POST') + # read the message and convert it into a python dictionary + length = int(self.headers.get('content-length', "0")) + message = json.loads(self.rfile.read(length)) + print(self.headers) + print(message) + # add a property to the object, just to mess with data + message['received'] = 'ok' + + global response_message + # send the message back + self._set_headers() + with open(response_message) as data_file: + self.wfile.write(data_file.read().encode()) + +class ConsoleHttpIo(iobase.IoAdapterBase): + """ + I/O adapter for the text-console (standard input/standard output). + """ + def __init__(self, player_connection: PlayerConnection, port: int) -> None: + super().__init__(player_connection) + try: + # try to output a unicode character such as smartypants uses for nicer formatting + encoding = getattr(sys.stdout, "encoding", sys.getfilesystemencoding()) + chr(8230).encode(encoding) + except (UnicodeEncodeError, TypeError): + self.supports_smartquotes = False + self.stop_main_loop = False + self.input_not_paused = threading.Event() + self.input_not_paused.set() + server_address = ('', port) + httpd = HTTPServer(server_address, Server) + print('Starting httpd on port %d...' % port) + httpd.serve_forever() + + def __repr__(self): + return "" % (id(self), os.getpid()) + + def singleplayer_mainloop(self, player_connection: PlayerConnection) -> None: + while not self.stop_main_loop: + try: + pass + except KeyboardInterrupt: + print("* break - stopping server loop") + if lang.yesno(input("Are you sure you want to exit the Tale driver, and kill the game? ")): + break + print("Game shutting down.") + + def render_output(self, paragraphs: Sequence[Tuple[str, bool]], **params: Any) -> str: + """ + Render (format) the given paragraphs to a text representation. + It doesn't output anything to the screen yet; it just returns the text string. + Any style-tags are still embedded in the text. + This console-implementation expects 2 extra parameters: "indent" and "width". + """ + if not paragraphs: + return "" + indent = " " * params["indent"] + wrapper = styleaware_wrapper.StyleTagsAwareTextWrapper(width=params["width"], fix_sentence_endings=True, + initial_indent=indent, subsequent_indent=indent) + output = [] + for txt, formatted in paragraphs: + if formatted: + txt = wrapper.fill(txt) + "\n" + else: + # unformatted output, prepend every line with the indent but otherwise leave them alone + txt = indent + ("\n" + indent).join(txt.splitlines()) + "\n" + assert txt.endswith("\n") + output.append(txt) + return self.smartquotes("".join(output)) + + def output(self, *lines: str) -> None: + """Write some text to the screen. Takes care of style tags that are embedded.""" + super().output(*lines) + for line in lines: + print(self._apply_style(line, self.do_styles)) + sys.stdout.flush() + + def output_no_newline(self, text: str, new_paragraph = True) -> None: + """Like output, but just writes a single line, without end-of-line.""" + if prompt_toolkit and self.do_prompt_toolkit: + self.output(text) + else: + super().output_no_newline(text, new_paragraph) + print(self._apply_style(text, self.do_styles), end="") + sys.stdout.flush() + + def write_input_prompt(self) -> None: + """write the input prompt '>>'""" + if not prompt_toolkit or not self.do_prompt_toolkit: + print(self._apply_style("\n>> ", self.do_styles), end="") + sys.stdout.flush() + + def _apply_style(self, line: str, do_styles: bool) -> str: + """Convert style tags to ansi escape sequences suitable for console text output""" + if "<" not in line: + return line + elif style_words and do_styles: + for tag, replacement in style_words.items(): + line = line.replace("<%s>" % tag, replacement) + return line + else: + return iobase.strip_text_styles(line) # type: ignore + + +if __name__ == "__main__": + def _main(): + co = ConsoleIo(PlayerConnection()) + lines = ["test Tale Console output", "'singlequotes'", '"double quotes"', "ellipsis...", "BRIGHT REVERSE NORMAL"] + paragraphs = [(text, False) for text in lines] + output = co.render_output(paragraphs, indent=2, width=50) + co.output(output) + _main() diff --git a/tale/tio/if_browser_io_text.py b/tale/tio/if_browser_io_text.py new file mode 100644 index 00000000..bd35956c --- /dev/null +++ b/tale/tio/if_browser_io_text.py @@ -0,0 +1,561 @@ +""" +Webbrowser based I/O for a single player ('if') story. + +'Tale' mud driver, mudlib and interactive fiction framework +Copyright by Irmen de Jong (irmen@razorvine.net) +""" +import json +import time +import socket +from socketserver import ThreadingMixIn +from email.utils import formatdate, parsedate +from hashlib import md5 +from html import escape as html_escape +from threading import Lock, Event +from typing import Iterable, Sequence, Tuple, Any, Optional, Dict, Callable, List +from urllib.parse import parse_qs +from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer + +from tale.web.web_utils import create_chat_container, dialogue_splitter + +from . import iobase +from .. import mud_context, vfs, lang +from .styleaware_wrapper import tag_split_re +from .. import __version__ as tale_version_str +from ..driver import Driver +from ..player import PlayerConnection + +__all__ = ["HttpIo", "TaleWsgiApp", "TaleWsgiAppBase", "WsgiStartResponseType"] + +WsgiStartResponseType = Callable[..., None] + + +style_tags_html = { + "": ("", ""), + "": ("", ""), + "": ("", ""), + "
    ": ("", ""), + "": ("", ""), + "": ("", ""), + "": None, + "": None, + "": ("", ""), + "": ("", "") +} + + +def squash_parameters(parameters: Dict[str, Any]) -> Dict[str, Any]: + """ + Makes a cgi-parsed parameter dictionary into a dict where the values that + are just a list of a single value, are converted to just that single value. + """ + for key, value in parameters.items(): + if isinstance(value, (list, tuple)) and len(value) == 1: + parameters[key] = value[0] + return parameters + + +class HttpIo(iobase.IoAdapterBase): + """ + I/O adapter for a http/browser based interface. + This doubles as a wsgi app and runs as a web server using wsgiref. + This way it is a simple call for the driver, it starts everything that is needed. + """ + def __init__(self, player_connection: PlayerConnection, wsgi_server: WSGIServer) -> None: + super().__init__(player_connection) + self.wsgi_server = wsgi_server + self.__html_to_browser = [] # type: List[str] # the lines that need to be displayed in the player's browser + self.__html_special = [] # type: List[str] # special out of band commands (such as 'clear') + self.__html_to_browser_lock = Lock() + self.__new_html_available = Event() + self.__data_to_browser = [] + + def destroy(self) -> None: + self.__new_html_available.set() + + def append_html_to_browser(self, text: str) -> None: + with self.__html_to_browser_lock: + self.__html_to_browser.append(text) + self.__new_html_available.set() + + def append_html_special(self, text: str) -> None: + with self.__html_to_browser_lock: + self.__html_special.append(text) + self.__new_html_available.set() + + def append_data_to_browser(self, data: str) -> None: + with self.__html_to_browser_lock: + self.__data_to_browser.append(data) + self.__new_html_available.set() + + def get_html_to_browser(self) -> List[str]: + with self.__html_to_browser_lock: + html, self.__html_to_browser = self.__html_to_browser, [] + return html + + def get_html_special(self) -> List[str]: + with self.__html_to_browser_lock: + special, self.__html_special = self.__html_special, [] + return special + + def get_data_to_browser(self) -> List[str]: + with self.__html_to_browser_lock: + data, self.__data_to_browser = self.__data_to_browser, [] + return data + + def wait_html_available(self, timeout: float=None) -> None: + self.__new_html_available.wait(timeout=timeout) + self.__new_html_available.clear() + + def singleplayer_mainloop(self, player_connection: PlayerConnection) -> None: + """mainloop for the web browser interface for single player mode""" + import webbrowser + from threading import Thread + protocol = "https" if self.wsgi_server.use_ssl else "http" + + if self.wsgi_server.address_family == socket.AF_INET6: + hostname, port, _, _ = self.wsgi_server.server_address + if hostname[0] != '[': + hostname = '[' + hostname + ']' + url = "%s://%s:%d/tale/" % (protocol, hostname, port) + print("Access the game on this web server url (ipv6): ", url, end="\n\n") + else: + hostname, port = self.wsgi_server.server_address + if hostname.startswith("127.0"): + hostname = "localhost" + url = "%s://%s:%d/tale/" % (protocol, hostname, port) + print("Access the game on this web server url (ipv4): ", url, end="\n\n") + t = Thread(target=webbrowser.open, args=(url, )) # type: ignore + t.daemon = True + t.start() + while not self.stop_main_loop: + try: + self.wsgi_server.handle_request() + except KeyboardInterrupt: + print("* break - stopping server loop") + if lang.yesno(input("Are you sure you want to exit the Tale driver, and kill the game? ")): + break + print("Game shutting down.") + + def pause(self, unpause: bool=False) -> None: + pass + + def clear_screen(self) -> None: + self.append_html_special("clear") + + def render_output(self, paragraphs: Sequence[Tuple[str, bool]], **params: Any) -> str: + if not paragraphs: + return "" + with self.__html_to_browser_lock: + for text, formatted in paragraphs: + # text = self.convert_to_html(text) + if text == "\n": + text = "
    " + if dialogue_splitter in text: + text = create_chat_container(text) + self.__html_to_browser.append("

    " + text + "

    \n") + elif formatted: + self.__html_to_browser.append("

    " + text + "

    \n") + else: + self.__html_to_browser.append("
    " + text + "
    \n") + self.__new_html_available.set() + return "" # the output is pushed to the browser via a buffer, rather than printed to a screen + + def output(self, *lines: str) -> None: + super().output(*lines) + with self.__html_to_browser_lock: + for line in lines: + self.output_no_newline(line) + self.__new_html_available.set() + + def output_no_newline(self, text: str, new_paragraph = True) -> None: + super().output_no_newline(text, new_paragraph) + if text == "\n": + text = "
    " + if new_paragraph: + self.__html_to_browser.append("

    " + text + "

    \n") + else: + self.__html_to_browser.append(text.replace("\\n", "
    ")) + self.__new_html_available.set() + + def send_data(self, data: str) -> None: + self.append_data_to_browser(data) + + +class TaleWsgiAppBase: + """ + Generic wsgi functionality that is not tied to a particular + single or multiplayer web server. + """ + def __init__(self, driver: Driver) -> None: + self.driver = driver + + def __call__(self, environ: Dict[str, Any], start_response: WsgiStartResponseType) -> Iterable[bytes]: + method = environ.get("REQUEST_METHOD") + path = environ.get('PATH_INFO', '').lstrip('/') + if not path: + return self.wsgi_redirect(start_response, "/tale/") + if path.startswith("tale/"): + if method in ("GET", "POST"): + if method == "POST": + clength = int(environ['CONTENT_LENGTH']) + if clength > 1e6: + raise ValueError('Maximum content length exceeded') + inputstream = environ['wsgi.input'] + qs = inputstream.read(clength).decode("utf-8") + elif method == "GET": + qs = environ.get("QUERY_STRING", "") + parameters = squash_parameters(parse_qs(qs, encoding="UTF-8")) + return self.wsgi_route(environ, path[5:], parameters, start_response) + else: + return self.wsgi_invalid_request(start_response) + return self.wsgi_not_found(start_response) + + def wsgi_route(self, environ: Dict[str, Any], path: str, parameters: Dict[str, str], + start_response: WsgiStartResponseType) -> Iterable[bytes]: + if not path or path == "start": + return self.wsgi_handle_start(environ, parameters, start_response) + elif path == "about": + return self.wsgi_handle_about(environ, parameters, start_response) + elif path == "story": + return self.wsgi_handle_story(environ, parameters, start_response) + elif path == "tabcomplete": + return self.wsgi_handle_tabcomplete(environ, parameters, start_response) + elif path == "input": + return self.wsgi_handle_input(environ, parameters, start_response) + elif path == "eventsource": + return self.wsgi_handle_eventsource(environ, parameters, start_response) + elif path.startswith("static/"): + return self.wsgi_handle_static(environ, path, start_response) + elif path == "quit": + return self.wsgi_handle_quit(environ, parameters, start_response) + return self.wsgi_not_found(start_response) + + def wsgi_invalid_request(self, start_response: WsgiStartResponseType) -> Iterable[bytes]: + """Called if invalid http method.""" + start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')]) + return [b'Error 405: Method Not Allowed'] + + def wsgi_not_found(self, start_response: WsgiStartResponseType) -> Iterable[bytes]: + """Called if Url not found.""" + start_response('404 Not Found', [('Content-Type', 'text/plain')]) + return [b'Error 404: Not Found'] + + def wsgi_redirect(self, start_response: Callable, target: str) -> Iterable[bytes]: + """Called to do a redirect""" + start_response('302 Found', [('Location', target)]) + return [] + + def wsgi_redirect_other(self, start_response: Callable, target: str) -> Iterable[bytes]: + """Called to do a redirect see-other""" + start_response('303 See Other', [('Location', target)]) + return [] + + def wsgi_not_modified(self, start_response: WsgiStartResponseType) -> Iterable[bytes]: + """Called to signal that a resource wasn't modified""" + start_response('304 Not Modified', []) + return [] + + def wsgi_internal_server_error(self, start_response: Callable, message: str="") -> Iterable[bytes]: + """Called when an internal server error occurred""" + start_response('500 Internal server error', []) + return [message.encode("utf-8")] + + def wsgi_internal_server_error_json(self, start_response: Callable, message: str="") -> Iterable[bytes]: + """Called when an internal server error occurred, returns json response rather than html""" + start_response('500 Internal server error', [('Content-Type', 'application/json; charset=utf-8')]) + message = '{"error": "%s"}' % message + return [message.encode("utf-8")] + + def wsgi_handle_about(self, environ: Dict[str, Any], parameters: Dict[str, str], + start_response: WsgiStartResponseType) -> Iterable[bytes]: + raise NotImplementedError("implement this in subclass") # about page + + def wsgi_handle_quit(self, environ: Dict[str, Any], parameters: Dict[str, str], + start_response: WsgiStartResponseType) -> Iterable[bytes]: + raise NotImplementedError("implement this in subclass") # quit/logged out page + + def wsgi_handle_start(self, environ: Dict[str, Any], parameters: Dict[str, str], + start_response: WsgiStartResponseType) -> Iterable[bytes]: + # start page / titlepage + headers = [('Content-Type', 'text/html; charset=utf-8')] + resource = vfs.internal_resources["web/index.html"] + etag = self.etag(id(self), time.mktime(self.driver.server_started.timetuple()), resource.mtime, "start") + if_none = environ.get('HTTP_IF_NONE_MATCH') + if if_none and (if_none == '*' or etag in if_none): + return self.wsgi_not_modified(start_response) + headers.append(("ETag", etag)) + start_response("200 OK", headers) + txt = resource.text.format(story_version=self.driver.story.config.version, + story_name=self.driver.story.config.name, + story_author=self.driver.story.config.author, + story_author_email=self.driver.story.config.author_address) + return [txt.encode("utf-8")] + + def wsgi_handle_story(self, environ: Dict[str, Any], parameters: Dict[str, str], + start_response: WsgiStartResponseType) -> Iterable[bytes]: + headers = [('Content-Type', 'text/html; charset=utf-8')] + resource = vfs.internal_resources["web/story.html"] + etag = self.etag(id(self), time.mktime(self.driver.server_started.timetuple()), 'resource.mtime', "story") + if_none = environ.get('HTTP_IF_NONE_MATCH') + if if_none and (if_none == '*' or etag in if_none): + return self.wsgi_not_modified(start_response) + headers.append(("ETag", etag)) + start_response('200 OK', headers) + txt = '' + txt = resource.text.format(story_version=self.driver.story.config.version, + story_name=self.driver.story.config.name, + story_author=self.driver.story.config.author, + story_author_email=self.driver.story.config.author_address) + txt = self.modify_web_page(environ["wsgi.session"]["player_connection"], txt) + return [txt.encode("utf-8")] + + def wsgi_handle_eventsource(self, environ: Dict[str, Any], parameters: Dict[str, str], + start_response: WsgiStartResponseType) -> Iterable[bytes]: + session = environ["wsgi.session"] + conn = session.get("player_connection") + if not conn: + return self.wsgi_internal_server_error_json(start_response, "not logged in") + start_response('200 OK', [('Content-Type', 'text/event-stream; charset=utf-8'), + ('Cache-Control', 'no-cache'), + # ('Transfer-Encoding', 'chunked'), not allowed by wsgi + ('X-Accel-Buffering', 'no') # nginx + ]) + yield (":" + ' ' * 2050 + "\n\n").encode("utf-8") # padding for older browsers + while self.driver.is_running(): + if conn.io and conn.player: + conn.io.wait_html_available(timeout=15) # keepalives every 15 sec + if not conn.io or not conn.player: + break + html = conn.io.get_html_to_browser() + special = conn.io.get_html_special() + data = conn.io.get_data_to_browser() + if html or special: + location = conn.player.location # type : Optional[Location] + if conn.io.dont_echo_next_cmd: + special.append("noecho") + npc_names = '' + items = '' + exits = '' + if location: + npc_names = ','.join([l.name for l in location.livings if l.alive and l.visible and l != conn.player]) + items = ','.join([i.name for i in location.items if i.visible]) + exits = ','.join(list(set([e.name for e in location.exits.values() if e.visible]))) + response = { + "text": "\n".join(html), + "special": special, + "turns": conn.player.turns, + "location": location.title if location else "???", + "location_image": location.avatar if location and location.avatar else "", + "npcs": npc_names if location else '', + "items": items if location else '', + "exits": exits if location else '', + } + result = "event: text\nid: {event_id}\ndata: {data}\n\n"\ + .format(event_id=str(time.time()), data=json.dumps(response)) + yield result.encode("utf-8") + elif data: + for d in data: + result = "event: data\nid: {event_id}\ndata: {data}\n\n"\ + .format(event_id=str(time.time()), data=d) + yield result.encode("utf-8") + else: + yield "data: keepalive\n\n".encode("utf-8") + + def wsgi_handle_tabcomplete(self, environ: Dict[str, Any], parameters: Dict[str, str], + start_response: WsgiStartResponseType) -> Iterable[bytes]: + session = environ["wsgi.session"] + conn = session.get("player_connection") + if not conn: + return self.wsgi_internal_server_error_json(start_response, "not logged in") + start_response('200 OK', [('Content-Type', 'application/json; charset=utf-8'), + ('Cache-Control', 'no-cache, no-store, must-revalidate'), + ('Pragma', 'no-cache'), + ('Expires', '0')]) + return [json.dumps(conn.io.tab_complete(parameters["prefix"], self.driver)).encode("utf-8")] + + def wsgi_handle_input(self, environ: Dict[str, Any], parameters: Dict[str, str], + start_response: WsgiStartResponseType) -> Iterable[bytes]: + session = environ["wsgi.session"] + conn = session.get("player_connection") + if not conn: + return self.wsgi_internal_server_error_json(start_response, "not logged in") + cmd = parameters.get("cmd", "") + if cmd and "autocomplete" in parameters: + suggestions = conn.io.tab_complete(cmd, self.driver) + if suggestions: + conn.io.append_html_to_browser("

    Suggestions:

    ") + conn.io.append_html_to_browser("

    " + "   ".join(suggestions) + "

    ") + else: + conn.io.append_html_to_browser("

    No matching commands.

    ") + else: + cmd = html_escape(cmd, False) + if cmd: + if conn.io.dont_echo_next_cmd: + conn.io.dont_echo_next_cmd = False + elif conn.io.echo_input: + conn.io.append_html_to_browser("%s" % cmd) + conn.player.store_input_line(cmd) + start_response('200 OK', [('Content-Type', 'text/plain')]) + return [] + + def wsgi_handle_license(self, environ: Dict[str, Any], parameters: Dict[str, str], + start_response: WsgiStartResponseType) -> Iterable[bytes]: + license = "The author hasn't provided any license information." + if self.driver.story.config.license_file: + license = self.driver.resources[self.driver.story.config.license_file].text + resource = vfs.internal_resources["web/about_license.html"] + headers = [('Content-Type', 'text/html; charset=utf-8')] + etag = self.etag(id(self), time.mktime(self.driver.server_started.timetuple()), resource.mtime, "license") + if_none = environ.get('HTTP_IF_NONE_MATCH') + if if_none and (if_none == '*' or etag in if_none): + return self.wsgi_not_modified(start_response) + headers.append(("ETag", etag)) + start_response("200 OK", headers) + txt = resource.text.format(license=license, + story_version=self.driver.story.config.version, + story_name=self.driver.story.config.name, + story_author=self.driver.story.config.author, + story_author_email=self.driver.story.config.author_address) + return [txt.encode("utf-8")] + + def wsgi_handle_static(self, environ: Dict[str, Any], path: str, start_response: WsgiStartResponseType) -> Iterable[bytes]: + path = path[len("static/"):] + if not self.wsgi_is_asset_allowed(path): + return self.wsgi_not_found(start_response) + try: + return self.wsgi_serve_static("web/" + path, environ, start_response) + except IOError: + return self.wsgi_not_found(start_response) + + def wsgi_is_asset_allowed(self, path: str) -> bool: + return path.endswith(".html") or path.endswith(".js") or path.endswith(".jpg") \ + or path.endswith(".png") or path.endswith(".gif") or path.endswith(".css") or path.endswith(".ico") + + def etag(self, *components: Any) -> str: + return '"' + md5("-".join(str(c) for c in components).encode("ascii")).hexdigest() + '"' + + def wsgi_serve_static(self, path: str, environ: Dict[str, Any], start_response: WsgiStartResponseType) -> Iterable[bytes]: + headers = [] + resource = vfs.internal_resources[path] + if resource.mtime: + mtime_formatted = formatdate(resource.mtime) + etag = self.etag(id(vfs.internal_resources), resource.mtime, path) + if_modified = environ.get('HTTP_IF_MODIFIED_SINCE') + if if_modified: + if parsedate(if_modified) >= parsedate(mtime_formatted): # type: ignore + # the resource wasn't modified since last requested + return self.wsgi_not_modified(start_response) + if_none = environ.get('HTTP_IF_NONE_MATCH') + if if_none and (if_none == '*' or etag in if_none): + return self.wsgi_not_modified(start_response) + headers.append(("ETag", etag)) + headers.append(("Last-Modified", formatdate(resource.mtime))) + if resource.is_text: + # text + headers.append(('Content-Type', resource.mimetype + "; charset=utf-8")) + data = resource.text.encode("utf-8") + else: + # binary + headers.append(('Content-Type', resource.mimetype)) + data = resource.data + start_response('200 OK', headers) + return [data] + + def modify_web_page(self, player_connection: PlayerConnection, html_content: str) -> None: + """Modify the html before it is sent to the browser.""" + if not "wizard" in player_connection.player.privileges: + html_content = html_content.replace('', '') + html_content = html_content.replace('', '') + return html_content + + + +class TaleWsgiApp(TaleWsgiAppBase): + """ + The actual wsgi app that the player's browser connects to. + Note that it is deliberatly simplistic and ony able to handle a single + player connection; it only works for 'if' single-player game mode. + """ + def __init__(self, driver: Driver, player_connection: PlayerConnection, + use_ssl: bool, ssl_certs: Tuple[str, str, str]) -> None: + super().__init__(driver) + self.completer = None + self.player_connection = player_connection # just a single player here + CustomWsgiServer.use_ssl = use_ssl + if use_ssl and ssl_certs: + CustomWsgiServer.ssl_cert_locations = ssl_certs + + @classmethod + def create_app_server(cls, driver: Driver, player_connection: PlayerConnection, *, + use_ssl: bool=False, ssl_certs: Tuple[str, str, str]=None) -> Callable: + wsgi_app = SessionMiddleware(cls(driver, player_connection, use_ssl, ssl_certs)) # type: ignore + wsgi_server = make_server(driver.story.config.mud_host, driver.story.config.mud_port, app=wsgi_app, + handler_class=CustomRequestHandler, server_class=CustomWsgiServer) + wsgi_server.timeout = 0.5 + return wsgi_server + + def wsgi_handle_quit(self, environ: Dict[str, Any], parameters: Dict[str, str], + start_response: WsgiStartResponseType) -> Iterable[bytes]: + # Quit/logged out page. For single player, simply close down the whole driver. + start_response('200 OK', [('Content-Type', 'text/html')]) + self.driver._stop_driver() + return [b"

    Tale game session ended.

    " + b"

    You may close this window/tab.

    "] + + def wsgi_handle_about(self, environ: Dict[str, Any], parameters: Dict[str, str], + start_response: WsgiStartResponseType) -> Iterable[bytes]: + # about page + if "license" in parameters: + return self.wsgi_handle_license(environ, parameters, start_response) + start_response("200 OK", [('Content-Type', 'text/html; charset=utf-8')]) + resource = vfs.internal_resources["web/about.html"] + txt = resource.text.format(tale_version=tale_version_str, + story_version=self.driver.story.config.version, + story_name=self.driver.story.config.name, + uptime="%d:%02d:%02d" % self.driver.uptime, + starttime=self.driver.server_started) + return [txt.encode("utf-8")] + + +class CustomRequestHandler(WSGIRequestHandler): + def log_message(self, format: str, *args: Any): + pass + + +class CustomWsgiServer(ThreadingMixIn, WSGIServer): + """ + A simple wsgi server with a modest request queue size, meant for single user access. + Set use_ssl to True to enable HTTPS mode instead of unencrypted HTTP. + """ + request_queue_size = 10 + use_ssl = False + ssl_cert_locations = ("./certs/localhost_cert.pem", "./certs/localhost_key.pem", "") # certfile, keyfile, certpassword + + def __init__(self, server_address, rh_class): + self.address_family = socket.AF_INET + if server_address[0][0] == '[' and server_address[0][-1] == ']': + self.address_family = socket.AF_INET6 + server_address = (server_address[0][1:-1], server_address[1], 0, 0) + super().__init__(server_address, rh_class) + + def server_bind(self): + if self.use_ssl: + print("\n\nUsing SSL\n\n") + import ssl + ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + ctx.load_cert_chain(self.ssl_cert_locations[0], self.ssl_cert_locations[1] or None, self.ssl_cert_locations[2] or None) + self.socket = ctx.wrap_socket(self.socket, server_side=True) + return super().server_bind() + + +class SessionMiddleware: + def __init__(self, app): + self.app = app + + def __call__(self, environ: Dict[str, Any], start_response: WsgiStartResponseType) -> None: + environ["wsgi.session"] = { + "id": None, + "player_connection": self.app.player_connection + } + return self.app(environ, start_response) diff --git a/tale/web/resources/test.jpg b/tale/web/resources/test.jpg deleted file mode 100644 index 35af9819df85a8e820e8a4ddf78e023dbb18aec1..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1419 zcmex=^(PF6}rMnOeST|r4lSw=>~TvNxu(8R<c1}I=;VrF4wW9Q)H;sz?% zD!{d!pzFb!U9xX3zTPI5o8roG<0MW4oqZMDikqloVbuf*=gfJ(V&YTRE(2~ znmD<{#3dx9RMpfqG__1j&CD$#!>#7W>CJDqJWWh_yXCEd!b$t7QCojJelU9h z%LY%rs&_qw@y1IQ|JI85v!T9%@4eUZ2&bOVciHCImGcub_6ygju0E$*a&xoci`fom zODEjfbIPM-{j%&Q`K^8{AMe&ZY-zE6qd?8#t_N$y-~OFbw!zEp)%2ppXSLp*iOuf( zIKQt(bh|3n!jF~HSHAw$cK@jtsxP`)AN4Cq^Ks_Xx{H$e4u@y`tbY7z!;h(I zF$QKAC-&W~Sa-*|=*o%Dk-BPKtTV53+|`bb`_Q-k;f|7! z6-Q3rem$!9bj&?}mCxqk#gju{EPHZmBk%Sdfv%!T`VK7zYit?p*9NZZJMO-Fdf1g5 zrm3}SC)FCyI+UFm7S^9EwD0N^b-A>4CFl7=os72iy=d!tQ^UFJhuODh&;JSYdR1#& z;a73tKU4dGw`7*;z3GVyv#wqHCKwxKSe$RN%DnZt{CrW1u2(%G+f7e@Fg|u=it=); zolir4Bu;yj+Qo6tQ{rKu$Rz&_BA2gx?0UaX_GNKo#p}b@IYMWJ^Ecl}c>ZOvaAe?8 zt+|uyi!5IRmZqQT6RNIUo0gHMRIxNpQYC1O-GVEnS8F_XSKsfLb?1D~-z5u}FFG*) n49ja?^q(Qe(;{7CquQIke^*0R6<_YTA2R=5+T%=<`u{fpStx9? diff --git a/tale/web/web_utils.py b/tale/web/web_utils.py index b47b4cc5..631f7509 100644 --- a/tale/web/web_utils.py +++ b/tale/web/web_utils.py @@ -71,7 +71,7 @@ def _find_image(image_name: str) -> str: def copy_web_resources(gamepath: str): # copy the resources folder to the resources folder in the web folder - shutil.copytree(os.path.join(gamepath, "resources"), os.path.join(web_resources_path, resource_folder), dirs_exist_ok=True) + shutil.copytree(os.path.join(gamepath, resource_folder), os.path.join(web_resources_path, resource_folder), dirs_exist_ok=True) def clear_resources(): resource_path = os.path.join(web_resources_path, resource_folder) @@ -84,8 +84,13 @@ def clear_resources(): if os.path.isfile(item_path): os.remove(item_path) -def copy_single_image(gamepath: str, image_name: str): - shutil.copy(os.path.join(gamepath, "resources", image_name), os.path.join(web_resources_path, resource_folder)) +def copy_single_image(gamepath: str, image_name: str) -> str: + from_path = os.path.join(gamepath, resource_folder, image_name) + if not os.path.exists(from_path): + return + to_path = os.path.join(web_resources_path, resource_folder) + return shutil.copy(from_path, to_path) def _check_file_exists(filename: str) -> bool: - return os.path.exists(os.path.join(web_resources_path, "resources", filename)) \ No newline at end of file + check_path = os.path.join(web_resources_path, resource_folder, filename) + return os.path.exists(check_path) \ No newline at end of file diff --git a/tale/zone.py b/tale/zone.py index 48488d53..ecc6319d 100644 --- a/tale/zone.py +++ b/tale/zone.py @@ -1,4 +1,3 @@ -import random from tale.base import Location from tale.coord import Coord @@ -17,6 +16,7 @@ def __init__(self, name: str, description: str = '') -> None: self.neighbors = dict() # type: dict[str, Zone] # north, east, south or west self.center = Coord(0,0,0) self.name = name + self.lore = "" def add_location(self, location: Location) -> bool: """ Add a location to the zone. Skip if location already exists.""" @@ -42,7 +42,8 @@ def get_info(self) -> dict: "races":self.races, "items":self.items, "size":self.size, - "center":self.center.as_tuple() + "center":self.center.as_tuple(), + "lore":self.lore, } def get_neighbor(self, direction: str) -> 'Zone': @@ -86,4 +87,5 @@ def from_json(data: dict) -> 'Zone': if data.get("center", None) is not None: center = data.get("center") zone.center = Coord(center[0], center[1], center[2]) + zone.lore = data.get("lore", "") return zone diff --git a/tests/files/test_cache.json b/tests/files/test_cache.json index c9e3a8c4..d64fc0be 100644 --- a/tests/files/test_cache.json +++ b/tests/files/test_cache.json @@ -1,11 +1,12 @@ { "events": { - "-8722945376858612228": "test event" + "-8722945376858612228": "test event", + "249076626384170890539394815371839499036": "test event", + "249076626384170890539394815371839499036": "test event" }, "looks": { - "5170880201559623883": "test look" - }, - "tells": { - "-3872932153149350210": "test tell" + "5170880201559623883": "test look", + "59748369602019872689181092956633477846": "test look", + "59748369602019872689181092956633477846": "test look" } } \ No newline at end of file diff --git a/tests/test_llm_ext.py b/tests/test_llm_ext.py index 3ba8f320..12693ac8 100644 --- a/tests/test_llm_ext.py +++ b/tests/test_llm_ext.py @@ -10,7 +10,7 @@ from tale.coord import Coord from tale.llm.LivingNpc import LivingNpc from tale.llm.item_handling_result import ItemHandlingResult -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.llm.llm_io import IoUtil from tale.llm.llm_utils import LlmUtil from tale.player import Player diff --git a/tests/test_mudobjects.py b/tests/test_mudobjects.py index f3d91bad..67b031fd 100644 --- a/tests/test_mudobjects.py +++ b/tests/test_mudobjects.py @@ -12,7 +12,7 @@ from tale.base import Location, Exit, Item, MudObject, Living, _limbo, Container, Weapon, Door, Key, ParseResult, MudObjRegistry from tale.demo.story import Story as DemoStory from tale.errors import ActionRefused, LocationIntegrityError, UnknownVerbException, TaleError -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.player import Player from tale.story import MoneyType, StoryBase from tale.shop import Shopkeeper diff --git a/tests/test_normal_commands.py b/tests/test_normal_commands.py index a63a84f4..e8f0fd0e 100644 --- a/tests/test_normal_commands.py +++ b/tests/test_normal_commands.py @@ -7,7 +7,7 @@ from tale.cmds import normal from tale.errors import ActionRefused, ParseError from tale.llm.LivingNpc import LivingNpc -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.llm.llm_utils import LlmUtil from tale.player import Player from tale.skills.skills import SkillType diff --git a/tests/test_parse_utils.py b/tests/test_parse_utils.py index a9ed7606..48195ff2 100644 --- a/tests/test_parse_utils.py +++ b/tests/test_parse_utils.py @@ -10,7 +10,7 @@ from tale.mob_spawner import MobSpawner from tale.npc_defs import Trader from tale.races import BodyType -from tale.story import GameMode, MoneyType +from tale.story import GameMode, MoneyType, StoryContext from tale.skills.weapon_type import WeaponType from tale.wearable import WearLocation from tale.zone import Zone @@ -46,6 +46,13 @@ def test_load_story_config(self): assert(config.money_type == MoneyType.NOTHING) assert(config.supported_modes == {GameMode.IF}) assert(config.zones == ["test zone"]) + + config.context = StoryContext().from_json({'base_story': 'Base context', 'current_context': 'Current context'}) + + stored_config = parse_utils.save_story_config(config) + + new_config = parse_utils.load_story_config(stored_config) + assert(isinstance(new_config.context, StoryContext)) def test_connect_location_to_exit(self): """ This simulates a room having been generated before""" diff --git a/tests/test_quests.py b/tests/test_quests.py index 714729f5..9b89ba32 100644 --- a/tests/test_quests.py +++ b/tests/test_quests.py @@ -2,7 +2,7 @@ from tale.base import Item, ParseResult from tale.driver import Driver from tale.llm.LivingNpc import LivingNpc -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.llm.llm_utils import LlmUtil from tale.quest import Quest, QuestStatus, QuestType from tale.story import StoryConfig diff --git a/tests/test_spells.py b/tests/test_spells.py index dde09566..5f73150e 100644 --- a/tests/test_spells.py +++ b/tests/test_spells.py @@ -4,7 +4,7 @@ from tale.cmds import spells from tale.errors import ActionRefused from tale.llm.LivingNpc import LivingNpc -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.llm.llm_utils import LlmUtil from tale.skills.magic import MagicType from tale.player import Player diff --git a/tests/test_story_builder.py b/tests/test_story_builder.py index 62601162..d8b63269 100644 --- a/tests/test_story_builder.py +++ b/tests/test_story_builder.py @@ -4,7 +4,7 @@ import pytest from tale import util, mud_context from tale.driver_if import IFDriver -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.llm.llm_utils import LlmUtil from tale.player import PlayerConnection from tale.story import StoryConfig diff --git a/tests/test_web_utils.py b/tests/test_web_utils.py index 4a5bd15c..f499f477 100644 --- a/tests/test_web_utils.py +++ b/tests/test_web_utils.py @@ -5,7 +5,6 @@ class TestWebUtils(): - def test_create_chat_container(self): result = create_chat_container("Bloated Murklin <:> Hello World!") @@ -16,4 +15,5 @@ def test_create_chat_container(self): def test_copy_single_image(self): web_utils.web_resources_path = "tale/web" copy_single_image("./tests/files", "test.jpg") - assert _check_file_exists("test.jpg") \ No newline at end of file + assert _check_file_exists("test.jpg") + web_utils.clear_resources() \ No newline at end of file diff --git a/tests/test_wizard_commands.py b/tests/test_wizard_commands.py index a37df8d4..98f3f11c 100644 --- a/tests/test_wizard_commands.py +++ b/tests/test_wizard_commands.py @@ -8,7 +8,7 @@ from tale.errors import ParseError from tale.items.basic import Food from tale.llm.LivingNpc import LivingNpc -from tale.llm.llm_ext import DynamicStory +from tale.llm.dynamic_story import DynamicStory from tale.llm.llm_utils import LlmUtil from tale.player import Player from tale.story import StoryConfig diff --git a/tests/test_zone.py b/tests/test_zone.py index b67eb796..fb737e17 100644 --- a/tests/test_zone.py +++ b/tests/test_zone.py @@ -52,7 +52,8 @@ def test_get_info(self): "items":['sword', 'shield', 'armor'], "center":(0, 0, 0), "level":2, - "size":5} + "size":5, + "lore":''} def test_get_neighbor(self): zone = Zone('test') From 684854864a957997e80c4acc755ababc8e1fd005 Mon Sep 17 00:00:00 2001 From: rickard Date: Sat, 19 Oct 2024 18:21:45 +0200 Subject: [PATCH 2/6] dynamic story context --- llm_config.yaml | 2 +- tale/driver.py | 5 +- tale/llm/contexts/AdvanceStoryContext.py | 12 +++ tale/llm/llm_utils.py | 4 + tale/llm/story_building.py | 9 ++- tale/story.py | 21 ++++- tale/story_builder.py | 6 +- tests/test_dynamic_story.py | 79 ++++++++++++++++++ tests/{test_llm_ext.py => test_living_npc.py} | 80 +------------------ tests/test_llm_utils.py | 12 ++- tests/test_story.py | 71 ++++++++++++++++ 11 files changed, 213 insertions(+), 88 deletions(-) create mode 100644 tale/llm/contexts/AdvanceStoryContext.py create mode 100644 tests/test_dynamic_story.py rename tests/{test_llm_ext.py => test_living_npc.py} (83%) create mode 100644 tests/test_story.py diff --git a/llm_config.yaml b/llm_config.yaml index 299d69a3..3e4646a2 100644 --- a/llm_config.yaml +++ b/llm_config.yaml @@ -46,4 +46,4 @@ REQUEST_FOLLOW_PROMPT: '{context}\n[USER_START]Act as as {cha DAY_CYCLE_EVENT_PROMPT: '{context}\n[USER_START] Write up to two sentences describing the transition from {from_time} to {to_time} in {location_name}, using the information supplied inside the tags.' NARRATIVE_EVENT_PROMPT: '{context}\n[USER_START] Write a narrative event that occurs in {location_name} using the information supplied inside the tags. The event should be related to the location and the characters present. Use up to 50 words.' RANDOM_SPAWN_PROMPT: '{context}\n[USER_START] An npc or a mob has entered {location_name}. Select either and fill in one of the following templates using the information supplied inside the tags. Respond using JSON in the following format: {npc_template}' -ADVANCE_STORY_PROMPT: '{context}\n[USER_START] Advance the story in {location_name}. Use the information supplied inside the tags to write a paragraph that progresses the story. Use up to 100 words.' \ No newline at end of file +ADVANCE_STORY_PROMPT: '{context}\n[USER_START] Advance the story in {location_name}. Use the information supplied inside the tags to write a paragraph that progresses the story. The story progress is a value between 0 and 10. Consider the pacing of the plot when writing the section. A value below 3 is considered early. An inciting event should happen during this phase. Between 3 and 8 is the middle of the story. A climactic event should happen at 8 or 9. Above 9 is considered to be the ending. Use up to 100 words.' \ No newline at end of file diff --git a/tale/driver.py b/tale/driver.py index c11f40ae..86d67305 100644 --- a/tale/driver.py +++ b/tale/driver.py @@ -31,7 +31,7 @@ from . import __version__ as tale_version_str, _check_required_libraries from . import mud_context, errors, util, cmds, player, pubsub, charbuilder, lang, verbdefs, vfs, base -from .story import TickMethod, GameMode, MoneyType, StoryBase +from .story import StoryContext, TickMethod, GameMode, MoneyType, StoryBase from .tio import DEFAULT_SCREEN_WIDTH from .races import playable_races from .errors import StoryCompleted @@ -535,6 +535,9 @@ def _server_tick(self) -> None: events, idle_time, subbers = topicinfo[topicname] if events == 0 and not subbers and idle_time > 30: pubsub.topic(topicname).destroy() + progress = self.story.increase_progress(0.01) + if progress: + self.llm_util.advance_story_section(self.story) def disconnect_idling(self, conn: player.PlayerConnection) -> None: raise NotImplementedError diff --git a/tale/llm/contexts/AdvanceStoryContext.py b/tale/llm/contexts/AdvanceStoryContext.py new file mode 100644 index 00000000..7792ee59 --- /dev/null +++ b/tale/llm/contexts/AdvanceStoryContext.py @@ -0,0 +1,12 @@ + + +from tale.llm.contexts.BaseContext import BaseContext + + +class AdvanceStoryContext(BaseContext): + + def __init__(self, story_context: str): + super().__init__(story_context) + + def to_prompt_string(self) -> str: + return f"{self.story_context}" \ No newline at end of file diff --git a/tale/llm/llm_utils.py b/tale/llm/llm_utils.py index 5a970886..bb1a5351 100644 --- a/tale/llm/llm_utils.py +++ b/tale/llm/llm_utils.py @@ -342,6 +342,10 @@ def set_story(self, story: DynamicStory): if story.config.image_gen: self._init_image_gen(story.config.image_gen) + def advance_story_section(self, story: DynamicStory): + """ Increase the story progress""" + self._story_building.advance_story_section(story or self.__story) + def _init_image_gen(self, image_gen: str): """ Initialize the image generator""" clazz = getattr(sys.modules['tale.image_gen.' + image_gen.lower()], image_gen) diff --git a/tale/llm/story_building.py b/tale/llm/story_building.py index a9632e18..7540bd8a 100644 --- a/tale/llm/story_building.py +++ b/tale/llm/story_building.py @@ -1,6 +1,8 @@ # This file contains the StoryBuilding class, which is responsible for generating the story background from tale import parse_utils from tale.llm import llm_config +from tale.llm.contexts.AdvanceStoryContext import AdvanceStoryContext +from tale.llm.dynamic_story import DynamicStory from tale.llm.llm_io import IoUtil @@ -21,6 +23,9 @@ def generate_story_background(self, world_mood: int, world_info: str, story_type request_body = self.default_body return self.io_util.synchronous_request(request_body, prompt=prompt) - def advance_story_section(self): - prompt = self.advance_story_prompt.format() + def advance_story_section(self, story: DynamicStory): + story_context = AdvanceStoryContext(story.config.context) + prompt = self.advance_story_prompt.format(context=story_context.to_prompt_string()) + request_body = self.default_body + return self.io_util.synchronous_request(request_body, prompt=prompt) \ No newline at end of file diff --git a/tale/story.py b/tale/story.py index 05bee55d..22fe2de4 100644 --- a/tale/story.py +++ b/tale/story.py @@ -7,10 +7,10 @@ import datetime import enum +import random from typing import Optional, Any, List, Set, Generator, Union from packaging.version import Version - from . import __version__ as tale_version_str from .errors import StoryConfigError @@ -149,6 +149,11 @@ def story_failure(self, player) -> None: player.tell("You have failed to complete the story.") player.tell("\n") + def increase_progress(self, amount: float = 1.0) -> bool: + if isinstance(self.config.context, StoryContext): + return self.config.context.increase_progress(amount) + return False + def _verify(self, driver) -> None: """verify correctness and compatibility of the story configuration""" if not isinstance(self.config, StoryConfig): @@ -173,9 +178,21 @@ def __init__(self, base_story: str = "") -> None: self.base_story = base_story self.current_section = "" self.past_sections = [] + self.progress = 0.0 + self.length = 10.0 + self.speed = 1.0 + + def increase_progress(self, amount: float = 1.0) -> bool: + """ increase the progress by the given amount, return True if the progress has changed past the integer value """ + start_progess = int(self.progress) + self.progress += random.random() * amount * self.speed + if self.progress >= self.length: + self.progress = self.length + return start_progess != int(self.progress) def set_current_section(self, section: str) -> None: - self.past_sections.append(self.current_section) + if self.current_section: + self.past_sections.append(self.current_section) self.current_section = section def to_context(self) -> str: diff --git a/tale/story_builder.py b/tale/story_builder.py index e80ff531..4847d9ed 100644 --- a/tale/story_builder.py +++ b/tale/story_builder.py @@ -8,6 +8,7 @@ from tale.llm.llm_utils import LlmUtil from tale.player import PlayerConnection from tale.quest import Quest, QuestType +from tale.story import StoryContext from tale.zone import Zone @@ -148,10 +149,11 @@ def apply_to_story(self, story: DynamicStory, llm_util: LlmUtil): story.config.world_info = self.story_info.world_info story.config.world_mood = self.story_info.world_mood self.connection.output("Generating story background...") - story.config.context = llm_util.generate_story_background(world_info=story.config.world_info, + background = llm_util.generate_story_background(world_info=story.config.world_info, world_mood=story.config.world_mood, story_type=story.config.type) - + story_context = StoryContext(base_story=background) + story.config.context = story_context items = self.generate_world_items(llm_util, story.config.context, story.config.type, story.config.world_info, story.config.world_mood) for item in items: story._catalogue.add_item(item) diff --git a/tests/test_dynamic_story.py b/tests/test_dynamic_story.py new file mode 100644 index 00000000..adb3a725 --- /dev/null +++ b/tests/test_dynamic_story.py @@ -0,0 +1,79 @@ +from tale.base import Location +from tale.coord import Coord +from tale.llm.dynamic_story import DynamicStory + + +class TestDynamicStory(): + + def test_add_location(self): + story = DynamicStory() + test_location = Location('test') + test_location.world_location = Coord(0,0,0) + story.add_location(test_location) + assert(story._world._locations['test'] == test_location) + assert(story._world._grid[(0,0,0)] == test_location) + + def test_add_location_duplicate(self): + story = DynamicStory() + test_location = Location('test') + test_location.world_location = Coord(0,0,0) + story.add_location(test_location) + assert(story._world._locations['test'] == test_location) + assert(story._world._grid[(0,0,0)] == test_location) + assert(not story.add_location(test_location)) + + def test_add_location_to_zone(self): + story = DynamicStory() + test_location = Location('test') + test_location.world_location = Coord(0,0,0) + test_zone = Zone('zone') + story.add_zone(test_zone) + story.add_location(test_location, 'zone') + assert(story._world._locations['test'] == test_location) + assert(story._world._grid[(0,0,0)] == test_location) + assert(story._zones['zone'].locations['test'] == test_location) + assert(story.get_location(zone='zone', name='test') == test_location) + + def test_find_location_in_zone(self): + story = DynamicStory() + test_location = Location('test') + test_location.world_location = Coord(0,0,0) + test_zone = Zone('zone') + story.add_zone(test_zone) + story.add_location(test_location, 'zone') + assert(story.find_location('test') == test_location) + + def test_neighbors_for_location(self): + story = DynamicStory() + story._locations = dict() + test_location = Location('test') + test_location.world_location = Coord(0,0,0) + north_location = Location('north') + north_location.world_location = Coord(0,1,0) + south_location = Location('south') + south_location.world_location = Coord(0,-1,0) + east_location = Location('east') + east_location.world_location = Coord(1,0,0) + west_location = Location('west') + west_location.world_location = Coord(-1,0,0) + + story.add_location(test_location) + story.add_location(north_location) + story.add_location(south_location) + story.add_location(east_location) + story.add_location(west_location) + + neighbors = story.neighbors_for_location(test_location) + assert(neighbors['north'] == north_location) + assert(neighbors['south'] == south_location) + assert(neighbors['east'] == east_location) + assert(neighbors['west'] == west_location) + + def test_check_setting(self): + story = DynamicStory() + assert(story.check_setting('fantasy') == 'fantasy') + assert(story.check_setting('modern') == 'modern') + assert(story.check_setting('sci-fi') == 'scifi') + assert(story.check_setting('steampunk') == '') + assert(story.check_setting('cyberpunk') == '') + assert(story.check_setting('western') == '') diff --git a/tests/test_llm_ext.py b/tests/test_living_npc.py similarity index 83% rename from tests/test_llm_ext.py rename to tests/test_living_npc.py index 12693ac8..ee656d25 100644 --- a/tests/test_llm_ext.py +++ b/tests/test_living_npc.py @@ -1,13 +1,10 @@ -from datetime import timedelta import json import responses from tale import mud_context -import tale from tale.llm import llm_cache from tale.base import Exit, Item, Living, Location, ParseResult, Weapon -from tale.coord import Coord from tale.llm.LivingNpc import LivingNpc from tale.llm.item_handling_result import ItemHandlingResult from tale.llm.dynamic_story import DynamicStory @@ -18,8 +15,8 @@ from tale.skills.skills import SkillType from tale.skills.weapon_type import WeaponType from tale.wearable import WearLocation -from tale.zone import Zone from tests.supportstuff import FakeDriver, MsgTraceNPC + class TestLivingNpc(): drink = Item("ale", "jug of ale", descr="Looks and smells like strong ale.") @@ -356,78 +353,3 @@ def test_search(self): assert(actions == '') assert not self.npc2.hidden assert ["test searches for something.", "test reveals actor"] == self.msg_trace_npc.messages - -class TestDynamicStory(): - - def test_add_location(self): - story = DynamicStory() - test_location = Location('test') - test_location.world_location = Coord(0,0,0) - story.add_location(test_location) - assert(story._world._locations['test'] == test_location) - assert(story._world._grid[(0,0,0)] == test_location) - - def test_add_location_duplicate(self): - story = DynamicStory() - test_location = Location('test') - test_location.world_location = Coord(0,0,0) - story.add_location(test_location) - assert(story._world._locations['test'] == test_location) - assert(story._world._grid[(0,0,0)] == test_location) - assert(not story.add_location(test_location)) - - def test_add_location_to_zone(self): - story = DynamicStory() - test_location = Location('test') - test_location.world_location = Coord(0,0,0) - test_zone = Zone('zone') - story.add_zone(test_zone) - story.add_location(test_location, 'zone') - assert(story._world._locations['test'] == test_location) - assert(story._world._grid[(0,0,0)] == test_location) - assert(story._zones['zone'].locations['test'] == test_location) - assert(story.get_location(zone='zone', name='test') == test_location) - - def test_find_location_in_zone(self): - story = DynamicStory() - test_location = Location('test') - test_location.world_location = Coord(0,0,0) - test_zone = Zone('zone') - story.add_zone(test_zone) - story.add_location(test_location, 'zone') - assert(story.find_location('test') == test_location) - - def test_neighbors_for_location(self): - story = DynamicStory() - story._locations = dict() - test_location = Location('test') - test_location.world_location = Coord(0,0,0) - north_location = Location('north') - north_location.world_location = Coord(0,1,0) - south_location = Location('south') - south_location.world_location = Coord(0,-1,0) - east_location = Location('east') - east_location.world_location = Coord(1,0,0) - west_location = Location('west') - west_location.world_location = Coord(-1,0,0) - - story.add_location(test_location) - story.add_location(north_location) - story.add_location(south_location) - story.add_location(east_location) - story.add_location(west_location) - - neighbors = story.neighbors_for_location(test_location) - assert(neighbors['north'] == north_location) - assert(neighbors['south'] == south_location) - assert(neighbors['east'] == east_location) - assert(neighbors['west'] == west_location) - - def test_check_setting(self): - story = DynamicStory() - assert(story.check_setting('fantasy') == 'fantasy') - assert(story.check_setting('modern') == 'modern') - assert(story.check_setting('sci-fi') == 'scifi') - assert(story.check_setting('steampunk') == '') - assert(story.check_setting('cyberpunk') == '') - assert(story.check_setting('western') == '') \ No newline at end of file diff --git a/tests/test_llm_utils.py b/tests/test_llm_utils.py index 8128a017..12b4866a 100644 --- a/tests/test_llm_utils.py +++ b/tests/test_llm_utils.py @@ -543,4 +543,14 @@ def test_generate_note_quest(self): assert(quest.reason == 'A test quest') assert(quest.target == 'Arto') - \ No newline at end of file +# class TestStoryBuilding(): + +# llm_util = LlmUtil(FakeIoUtil()) # type: LlmUtil + +# def test_advance_story_section(self): +# self.llm_util._story_building.io_util.response = "Chapter 2" +# story_context = StoryContext(name='Test', base_story='test') + +# result = self.llm_util._story_building.advance_story_section(story_context) + +# assert(result == 'Chapter 2') \ No newline at end of file diff --git a/tests/test_story.py b/tests/test_story.py new file mode 100644 index 00000000..c824c995 --- /dev/null +++ b/tests/test_story.py @@ -0,0 +1,71 @@ +import pytest +from tale.story import StoryContext + +class TestStoryContext: + + def test_initialization(self): + context = StoryContext(base_story="A hero's journey") + assert context.base_story == "A hero's journey" + assert context.current_section == "" + assert context.past_sections == [] + + def test_set_current_section(self): + context = StoryContext(base_story="A hero's journey") + context.set_current_section("Chapter 1") + assert context.current_section == "Chapter 1" + assert context.past_sections == [] + + context.set_current_section("Chapter 2") + assert context.current_section == "Chapter 2" + assert context.past_sections == ["Chapter 1"] + + def test_increase_progress(self): + context = StoryContext(base_story="A hero's journey") + result = context.increase_progress(0.01) + assert result == False + assert context.progress < 0.011 + context.increase_progress(0.01) + assert context.progress < 0.021 + + context.progress = 0.99999999999 + result = context.increase_progress(1) + assert result == True + assert context.progress > 1.0 + + context.progress = 9.9999999999999999999 + context.increase_progress(1) + assert context.progress == 10.0 + + def test_to_context(self): + context = StoryContext(base_story="A hero's journey") + context.set_current_section("Chapter 1") + assert context.to_context() == " Base plot: A hero's journey; Active section: Chapter 1" + + def test_to_context_with_past(self): + context = StoryContext(base_story="A hero's journey") + context.set_current_section("Chapter 1") + context.set_current_section("Chapter 2") + assert context.to_context_with_past() == " Base plot: A hero's journey; Past: Chapter 1; Active section:Chapter 2" + + def test_from_json(self): + data = { + "base_story": "A hero's journey", + "current_section": "Chapter 1", + "past_sections": ["Prologue"] + } + context = StoryContext().from_json(data) + assert context.base_story == "A hero's journey" + assert context.current_section == "Chapter 1" + assert context.past_sections == ["Prologue"] + + def test_to_json(self): + context = StoryContext(base_story="A hero's journey") + context.set_current_section("Prologue") + context.set_current_section("Chapter 1") + context.set_current_section("Chapter 2") + data = context.to_json() + assert data == { + "base_story": "A hero's journey", + "current_section": "Chapter 2", + "past_sections": ["Prologue", "Chapter 1"] + } \ No newline at end of file From 2ba587909a9ba452e5f3578497f23270b663876b Mon Sep 17 00:00:00 2001 From: rickard Date: Sat, 19 Oct 2024 21:45:52 +0200 Subject: [PATCH 3/6] fix context --- llm_config.yaml | 4 ++-- stories/prancingllama/story.py | 3 ++- tale/llm/llm_utils.py | 4 ++-- tale/llm/story_building.py | 8 +++++--- tale/story.py | 17 +++++++++++++---- tests/test_llm_utils.py | 26 ++++++++++++++++++-------- tests/test_story.py | 13 ++++++++++--- 7 files changed, 52 insertions(+), 23 deletions(-) diff --git a/llm_config.yaml b/llm_config.yaml index 3e4646a2..5b915c40 100644 --- a/llm_config.yaml +++ b/llm_config.yaml @@ -16,7 +16,7 @@ DUNGEON_LOCATION_TEMPLATE: '{"index": (int), "name": "", "description": 25 words CHARACTER_TEMPLATE: '{"name":"", "description": "50 words", "appearance": "25 words", "personality": "50 words", "money":(int), "level":"", "gender":"m/f/n", "age":(int), "race":"", "occupation":""}' FOLLOW_TEMPLATE: '{{"response":"yes or no", "reason":"50 words"}}' ITEM_TYPES: ["Weapon", "Wearable", "Health", "Money", "Trash"] -PRE_PROMPT: 'You are a creative game keeper for an interactive fiction story telling session. You craft detailed worlds and interesting characters with unique and deep personalities for the player to interact with. Do not acknowledge the task or speak directly to the user, or respond with anything besides the request..' +PRE_PROMPT: 'You are a creative game keeper for an interactive fiction story telling session. You craft detailed worlds and interesting characters with unique and deep personalities for the player to interact with. Always follow the instructions given, never acknowledge the task or speak directly to the user or respond with anything besides the request.' BASE_PROMPT: '{context}\n[USER_START] Rewrite [{input_text}] in your own words. The information inside the tags should be used to ensure it fits the story. Use about {max_words} words.' DIALOGUE_PROMPT: '{context}\nThe following is a conversation between {character1} and {character2}; {character2}s sentiment towards {character1}: {sentiment}. Write a single response as {character2} in third person pov, using {character2} description and other information found inside the tags. If {character2} has a quest active, they will discuss it based on its status. Respond in JSON using this template: """{dialogue_template}""". [USER_START]Continue the following conversation as {character2}: {previous_conversation}' COMBAT_PROMPT: '{context}\nThe following is a combat scene between {attackers} and {defenders} in {location}. [USER_START] Describe the following combat result in about 150 words in vivid language, using the characters weapons and their health status: 1.0 is highest, 0.0 is lowest. Combat Result: {input_text}' @@ -46,4 +46,4 @@ REQUEST_FOLLOW_PROMPT: '{context}\n[USER_START]Act as as {cha DAY_CYCLE_EVENT_PROMPT: '{context}\n[USER_START] Write up to two sentences describing the transition from {from_time} to {to_time} in {location_name}, using the information supplied inside the tags.' NARRATIVE_EVENT_PROMPT: '{context}\n[USER_START] Write a narrative event that occurs in {location_name} using the information supplied inside the tags. The event should be related to the location and the characters present. Use up to 50 words.' RANDOM_SPAWN_PROMPT: '{context}\n[USER_START] An npc or a mob has entered {location_name}. Select either and fill in one of the following templates using the information supplied inside the tags. Respond using JSON in the following format: {npc_template}' -ADVANCE_STORY_PROMPT: '{context}\n[USER_START] Advance the story in {location_name}. Use the information supplied inside the tags to write a paragraph that progresses the story. The story progress is a value between 0 and 10. Consider the pacing of the plot when writing the section. A value below 3 is considered early. An inciting event should happen during this phase. Between 3 and 8 is the middle of the story. A climactic event should happen at 8 or 9. Above 9 is considered to be the ending. Use up to 100 words.' \ No newline at end of file +ADVANCE_STORY_PROMPT: '{context}\n[USER_START] Advance the high-level plot outline in the current story, hidden to the player of this roleplaying game. The story progress is a value between 0 and 10. Consider the pacing of the plot when writing the section. A value below 3 is considered early. An inciting event should happen during this phase. Between 3 and 8 is the middle of the story. A climactic event should happen at 8 or 9. Above 9 is considered to be the ending. Use the information supplied inside the tags to write a short paragraph that progresses the plot outline. Use up to 100 words.' \ No newline at end of file diff --git a/stories/prancingllama/story.py b/stories/prancingllama/story.py index 48f1057b..e9b6acd2 100644 --- a/stories/prancingllama/story.py +++ b/stories/prancingllama/story.py @@ -14,6 +14,7 @@ from tale.skills.skills import SkillType from tale.story import * from tale.skills.weapon_type import WeaponType +from tale.story import StoryContext from tale.zone import Zone class Story(DynamicStory): @@ -34,7 +35,7 @@ class Story(DynamicStory): config.startlocation_player = "prancingllama.entrance" config.startlocation_wizard = "prancingllama.entrance" config.zones = ["prancingllama"] - config.context = "The final outpost high up in a cold, craggy mountain range. It's frequented by adventurers and those seeking to avoid attention." + config.context = StoryContext(base_story="The final outpost high up in a cold, craggy mountain range. A drama unfolds between those avoiding the cold and those seeking the cold. And what is lurking underneath the snow covered peaks and uncharted valleys?") config.type = "A low level fantasy adventure with focus of character building and interaction." config.custom_resources = True diff --git a/tale/llm/llm_utils.py b/tale/llm/llm_utils.py index bb1a5351..dd0b4ca3 100644 --- a/tale/llm/llm_utils.py +++ b/tale/llm/llm_utils.py @@ -342,9 +342,9 @@ def set_story(self, story: DynamicStory): if story.config.image_gen: self._init_image_gen(story.config.image_gen) - def advance_story_section(self, story: DynamicStory): + def advance_story_section(self, story: DynamicStory) -> str: """ Increase the story progress""" - self._story_building.advance_story_section(story or self.__story) + return self._story_building.advance_story_section(story or self.__story) def _init_image_gen(self, image_gen: str): """ Initialize the image generator""" diff --git a/tale/llm/story_building.py b/tale/llm/story_building.py index 7540bd8a..f9290215 100644 --- a/tale/llm/story_building.py +++ b/tale/llm/story_building.py @@ -15,7 +15,7 @@ def __init__(self, io_util: IoUtil, default_body: dict, backend: str = 'kobold_c self.story_background_prompt = llm_config.params['STORY_BACKGROUND_PROMPT'] # Type: str self.advance_story_prompt = llm_config.params['ADVANCE_STORY_PROMPT'] # Type: str - def generate_story_background(self, world_mood: int, world_info: str, story_type: str): + def generate_story_background(self, world_mood: int, world_info: str, story_type: str) -> str: prompt = self.story_background_prompt.format( story_type=story_type, world_mood=parse_utils.mood_string_from_int(world_mood), @@ -23,9 +23,11 @@ def generate_story_background(self, world_mood: int, world_info: str, story_type request_body = self.default_body return self.io_util.synchronous_request(request_body, prompt=prompt) - def advance_story_section(self, story: DynamicStory): + def advance_story_section(self, story: DynamicStory) -> str: story_context = AdvanceStoryContext(story.config.context) prompt = self.advance_story_prompt.format(context=story_context.to_prompt_string()) request_body = self.default_body - return self.io_util.synchronous_request(request_body, prompt=prompt) + result = self.io_util.synchronous_request(request_body, prompt=prompt) + story.config.context.set_current_section(result) + return result \ No newline at end of file diff --git a/tale/story.py b/tale/story.py index 22fe2de4..4b2916d5 100644 --- a/tale/story.py +++ b/tale/story.py @@ -7,6 +7,7 @@ import datetime import enum +import math import random from typing import Optional, Any, List, Set, Generator, Union from packaging.version import Version @@ -184,11 +185,11 @@ def __init__(self, base_story: str = "") -> None: def increase_progress(self, amount: float = 1.0) -> bool: """ increase the progress by the given amount, return True if the progress has changed past the integer value """ - start_progess = int(self.progress) + start_progess = math.floor(self.progress) self.progress += random.random() * amount * self.speed if self.progress >= self.length: self.progress = self.length - return start_progess != int(self.progress) + return start_progess != math.floor(self.progress) def set_current_section(self, section: str) -> None: if self.current_section: @@ -199,13 +200,21 @@ def to_context(self) -> str: return f" Base plot: {self.base_story}; Active section: {self.current_section}" def to_context_with_past(self) -> str: - return f" Base plot: {self.base_story}; Past: {' '.join(self.past_sections) if self.past_sections else 'This is the beginning of the story'}; Active section:{self.current_section}" + return f" Base plot: {self.base_story}; Past: {' '.join(self.past_sections) if self.past_sections else 'This is the beginning of the story'}; Active section:{self.current_section}; Progress: {self.progress}/{self.length};" def from_json(self, data: dict) -> 'StoryContext': self.base_story = data.get("base_story", "") self.current_section = data.get("current_section", "") self.past_sections = data.get("past_sections", []) + self.progress = data.get("progress", 0.0) + self.length = data.get("length", 10.0) + self.speed = data.get("speed", 1.0) return self def to_json(self) -> dict: - return {"base_story": self.base_story, "current_section": self.current_section, "past_sections": self.past_sections} \ No newline at end of file + return {"base_story": self.base_story, + "current_section": self.current_section, + "past_sections": self.past_sections, + "progress": self.progress, + "length": self.length, + "speed": self.speed} \ No newline at end of file diff --git a/tests/test_llm_utils.py b/tests/test_llm_utils.py index 12b4866a..4080d9af 100644 --- a/tests/test_llm_utils.py +++ b/tests/test_llm_utils.py @@ -5,6 +5,7 @@ from tale.llm.contexts.CharacterContext import CharacterContext from tale.llm.contexts.FollowContext import FollowContext from tale.llm.contexts.WorldGenerationContext import WorldGenerationContext +from tale.llm.dynamic_story import DynamicStory import tale.llm.llm_cache as llm_cache from tale import mud_context from tale import zone @@ -17,7 +18,7 @@ from tale.llm.responses.FollowResponse import FollowResponse from tale.player import Player, PlayerConnection from tale.races import UnarmedAttack -from tale.story import MoneyType +from tale.story import MoneyType, StoryConfig, StoryContext from tale.tio.console_io import ConsoleIo from tale.zone import Zone from tests.supportstuff import FakeIoUtil @@ -543,14 +544,23 @@ def test_generate_note_quest(self): assert(quest.reason == 'A test quest') assert(quest.target == 'Arto') -# class TestStoryBuilding(): +class TestStoryBuilding(): -# llm_util = LlmUtil(FakeIoUtil()) # type: LlmUtil + driver = IFDriver(screen_delay=99, gui=False, web=True, wizard_override=True) + driver.game_clock = util.GameDateTime(datetime.datetime(year=2023, month=1, day=1), 1) + + llm_util = LlmUtil(FakeIoUtil()) # type: LlmUtil + + story = JsonStory('tests/files/world_story/', parse_utils.load_story_config(parse_utils.load_json('tests/files/test_story_config_empty.json'))) + + story.init(driver) + + def test_advance_story_section(self): + self.llm_util._story_building.io_util.response = "Chapter 2" -# def test_advance_story_section(self): -# self.llm_util._story_building.io_util.response = "Chapter 2" -# story_context = StoryContext(name='Test', base_story='test') + self.story.config.context = StoryContext(base_story='test context') -# result = self.llm_util._story_building.advance_story_section(story_context) + result = self.llm_util._story_building.advance_story_section(self.story) -# assert(result == 'Chapter 2') \ No newline at end of file + assert(result == 'Chapter 2') + assert(self.story.config.context.current_section == 'Chapter 2') \ No newline at end of file diff --git a/tests/test_story.py b/tests/test_story.py index c824c995..268bb1cb 100644 --- a/tests/test_story.py +++ b/tests/test_story.py @@ -45,18 +45,22 @@ def test_to_context_with_past(self): context = StoryContext(base_story="A hero's journey") context.set_current_section("Chapter 1") context.set_current_section("Chapter 2") - assert context.to_context_with_past() == " Base plot: A hero's journey; Past: Chapter 1; Active section:Chapter 2" + assert context.to_context_with_past() == " Base plot: A hero's journey; Past: Chapter 1; Active section:Chapter 2; Progress: 0.0/10.0;" def test_from_json(self): data = { "base_story": "A hero's journey", "current_section": "Chapter 1", - "past_sections": ["Prologue"] + "past_sections": ["Prologue"], + "progress": 0.5, + "length": 10.0, } context = StoryContext().from_json(data) assert context.base_story == "A hero's journey" assert context.current_section == "Chapter 1" assert context.past_sections == ["Prologue"] + assert context.progress == 0.5 + assert context.length == 10.0 def test_to_json(self): context = StoryContext(base_story="A hero's journey") @@ -67,5 +71,8 @@ def test_to_json(self): assert data == { "base_story": "A hero's journey", "current_section": "Chapter 2", - "past_sections": ["Prologue", "Chapter 1"] + "past_sections": ["Prologue", "Chapter 1"], + "progress": 0.0, + "length": 10.0, + "speed": 1.0 } \ No newline at end of file From 56d9a28cd3e4e6fa1ee9ea28f2743c32465368b5 Mon Sep 17 00:00:00 2001 From: rickard Date: Sun, 20 Oct 2024 07:52:08 +0200 Subject: [PATCH 4/6] fix test --- tale/driver.py | 2 +- tale/tio/console_http_io.py | 189 ----------- tale/tio/if_browser_io_text.py | 561 --------------------------------- tests/files/test.jpg | Bin 0 -> 1415 bytes tests/test_dynamic_story.py | 1 + 5 files changed, 2 insertions(+), 751 deletions(-) delete mode 100644 tale/tio/console_http_io.py delete mode 100644 tale/tio/if_browser_io_text.py create mode 100644 tests/files/test.jpg diff --git a/tale/driver.py b/tale/driver.py index 86d67305..dc53759b 100644 --- a/tale/driver.py +++ b/tale/driver.py @@ -535,7 +535,7 @@ def _server_tick(self) -> None: events, idle_time, subbers = topicinfo[topicname] if events == 0 and not subbers and idle_time > 30: pubsub.topic(topicname).destroy() - progress = self.story.increase_progress(0.01) + progress = self.story.increase_progress(0.0001) if progress: self.llm_util.advance_story_section(self.story) diff --git a/tale/tio/console_http_io.py b/tale/tio/console_http_io.py deleted file mode 100644 index 6c3ce4ad..00000000 --- a/tale/tio/console_http_io.py +++ /dev/null @@ -1,189 +0,0 @@ -""" -Console-based input/output. - -'Tale' mud driver, mudlib and interactive fiction framework -Copyright by Irmen de Jong (irmen@razorvine.net) -""" -from http.server import BaseHTTPRequestHandler, HTTPServer -import os -import signal -import sys -import threading -from typing import Sequence, Tuple, Any, Optional, List - -from tale import lang -try: - import prompt_toolkit - from prompt_toolkit.contrib.completers import WordCompleter -except ImportError: - prompt_toolkit = None - -from . import colorama_patched as colorama -from . import styleaware_wrapper, iobase -from ..driver import Driver -from ..player import PlayerConnection, Player -from .. import mud_context - - -colorama.init() -assert type(colorama.Style.DIM) is str, "Incompatible colorama library installed. Please upgrade to a more recent version (0.3.6+)" - -__all__ = ["ConsoleIo"] - -style_words = { - "dim": colorama.Style.DIM, - "normal": colorama.Style.NORMAL, - "bright": colorama.Style.BRIGHT, - "ul": colorama.Style.UNDERLINED, - "it": colorama.Style.ITALIC, - "rev": colorama.Style.REVERSEVID, - "/": colorama.Style.RESET_ALL, - "location": colorama.Style.BRIGHT, - "clear": "\033[1;1H\033[2J", # ansi sequence to clear the console screen - "monospaced": "", # we assume the console is already monospaced font - "/monospaced": "" -} -assert len(set(style_words.keys()) ^ iobase.ALL_STYLE_TAGS) == 0, "mismatch in list of style tags" - -if os.name == "nt": - if not hasattr(colorama, "win32") or colorama.win32.windll is None: - style_words.clear() # running on windows without colorama ansi support - -class Server(BaseHTTPRequestHandler): - - def setMessage(self, message: str): - self.message = message - - def _set_headers(self): - self.send_response(200) - self.send_header('Content-type', 'application/json') - self.end_headers() - - def do_HEAD(self): - self._set_headers() - - # GET sends back a Hello world message - def do_GET(self): - print('GET') - self._set_headers() - - global response_message - - with open(response_message) as data_file: - self.wfile.write(data_file.read().encode()) - - # POST echoes the message adding a JSON field - def do_POST(self): - print('POST') - # read the message and convert it into a python dictionary - length = int(self.headers.get('content-length', "0")) - message = json.loads(self.rfile.read(length)) - print(self.headers) - print(message) - # add a property to the object, just to mess with data - message['received'] = 'ok' - - global response_message - # send the message back - self._set_headers() - with open(response_message) as data_file: - self.wfile.write(data_file.read().encode()) - -class ConsoleHttpIo(iobase.IoAdapterBase): - """ - I/O adapter for the text-console (standard input/standard output). - """ - def __init__(self, player_connection: PlayerConnection, port: int) -> None: - super().__init__(player_connection) - try: - # try to output a unicode character such as smartypants uses for nicer formatting - encoding = getattr(sys.stdout, "encoding", sys.getfilesystemencoding()) - chr(8230).encode(encoding) - except (UnicodeEncodeError, TypeError): - self.supports_smartquotes = False - self.stop_main_loop = False - self.input_not_paused = threading.Event() - self.input_not_paused.set() - server_address = ('', port) - httpd = HTTPServer(server_address, Server) - print('Starting httpd on port %d...' % port) - httpd.serve_forever() - - def __repr__(self): - return "" % (id(self), os.getpid()) - - def singleplayer_mainloop(self, player_connection: PlayerConnection) -> None: - while not self.stop_main_loop: - try: - pass - except KeyboardInterrupt: - print("* break - stopping server loop") - if lang.yesno(input("Are you sure you want to exit the Tale driver, and kill the game? ")): - break - print("Game shutting down.") - - def render_output(self, paragraphs: Sequence[Tuple[str, bool]], **params: Any) -> str: - """ - Render (format) the given paragraphs to a text representation. - It doesn't output anything to the screen yet; it just returns the text string. - Any style-tags are still embedded in the text. - This console-implementation expects 2 extra parameters: "indent" and "width". - """ - if not paragraphs: - return "" - indent = " " * params["indent"] - wrapper = styleaware_wrapper.StyleTagsAwareTextWrapper(width=params["width"], fix_sentence_endings=True, - initial_indent=indent, subsequent_indent=indent) - output = [] - for txt, formatted in paragraphs: - if formatted: - txt = wrapper.fill(txt) + "\n" - else: - # unformatted output, prepend every line with the indent but otherwise leave them alone - txt = indent + ("\n" + indent).join(txt.splitlines()) + "\n" - assert txt.endswith("\n") - output.append(txt) - return self.smartquotes("".join(output)) - - def output(self, *lines: str) -> None: - """Write some text to the screen. Takes care of style tags that are embedded.""" - super().output(*lines) - for line in lines: - print(self._apply_style(line, self.do_styles)) - sys.stdout.flush() - - def output_no_newline(self, text: str, new_paragraph = True) -> None: - """Like output, but just writes a single line, without end-of-line.""" - if prompt_toolkit and self.do_prompt_toolkit: - self.output(text) - else: - super().output_no_newline(text, new_paragraph) - print(self._apply_style(text, self.do_styles), end="") - sys.stdout.flush() - - def write_input_prompt(self) -> None: - """write the input prompt '>>'""" - if not prompt_toolkit or not self.do_prompt_toolkit: - print(self._apply_style("\n>> ", self.do_styles), end="") - sys.stdout.flush() - - def _apply_style(self, line: str, do_styles: bool) -> str: - """Convert style tags to ansi escape sequences suitable for console text output""" - if "<" not in line: - return line - elif style_words and do_styles: - for tag, replacement in style_words.items(): - line = line.replace("<%s>" % tag, replacement) - return line - else: - return iobase.strip_text_styles(line) # type: ignore - - -if __name__ == "__main__": - def _main(): - co = ConsoleIo(PlayerConnection()) - lines = ["test Tale Console output", "'singlequotes'", '"double quotes"', "ellipsis...", "BRIGHT REVERSE NORMAL"] - paragraphs = [(text, False) for text in lines] - output = co.render_output(paragraphs, indent=2, width=50) - co.output(output) - _main() diff --git a/tale/tio/if_browser_io_text.py b/tale/tio/if_browser_io_text.py deleted file mode 100644 index bd35956c..00000000 --- a/tale/tio/if_browser_io_text.py +++ /dev/null @@ -1,561 +0,0 @@ -""" -Webbrowser based I/O for a single player ('if') story. - -'Tale' mud driver, mudlib and interactive fiction framework -Copyright by Irmen de Jong (irmen@razorvine.net) -""" -import json -import time -import socket -from socketserver import ThreadingMixIn -from email.utils import formatdate, parsedate -from hashlib import md5 -from html import escape as html_escape -from threading import Lock, Event -from typing import Iterable, Sequence, Tuple, Any, Optional, Dict, Callable, List -from urllib.parse import parse_qs -from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer - -from tale.web.web_utils import create_chat_container, dialogue_splitter - -from . import iobase -from .. import mud_context, vfs, lang -from .styleaware_wrapper import tag_split_re -from .. import __version__ as tale_version_str -from ..driver import Driver -from ..player import PlayerConnection - -__all__ = ["HttpIo", "TaleWsgiApp", "TaleWsgiAppBase", "WsgiStartResponseType"] - -WsgiStartResponseType = Callable[..., None] - - -style_tags_html = { - "": ("", ""), - "": ("", ""), - "": ("", ""), - "
      ": ("", ""), - "": ("", ""), - "": ("", ""), - "": None, - "": None, - "": ("", ""), - "": ("", "") -} - - -def squash_parameters(parameters: Dict[str, Any]) -> Dict[str, Any]: - """ - Makes a cgi-parsed parameter dictionary into a dict where the values that - are just a list of a single value, are converted to just that single value. - """ - for key, value in parameters.items(): - if isinstance(value, (list, tuple)) and len(value) == 1: - parameters[key] = value[0] - return parameters - - -class HttpIo(iobase.IoAdapterBase): - """ - I/O adapter for a http/browser based interface. - This doubles as a wsgi app and runs as a web server using wsgiref. - This way it is a simple call for the driver, it starts everything that is needed. - """ - def __init__(self, player_connection: PlayerConnection, wsgi_server: WSGIServer) -> None: - super().__init__(player_connection) - self.wsgi_server = wsgi_server - self.__html_to_browser = [] # type: List[str] # the lines that need to be displayed in the player's browser - self.__html_special = [] # type: List[str] # special out of band commands (such as 'clear') - self.__html_to_browser_lock = Lock() - self.__new_html_available = Event() - self.__data_to_browser = [] - - def destroy(self) -> None: - self.__new_html_available.set() - - def append_html_to_browser(self, text: str) -> None: - with self.__html_to_browser_lock: - self.__html_to_browser.append(text) - self.__new_html_available.set() - - def append_html_special(self, text: str) -> None: - with self.__html_to_browser_lock: - self.__html_special.append(text) - self.__new_html_available.set() - - def append_data_to_browser(self, data: str) -> None: - with self.__html_to_browser_lock: - self.__data_to_browser.append(data) - self.__new_html_available.set() - - def get_html_to_browser(self) -> List[str]: - with self.__html_to_browser_lock: - html, self.__html_to_browser = self.__html_to_browser, [] - return html - - def get_html_special(self) -> List[str]: - with self.__html_to_browser_lock: - special, self.__html_special = self.__html_special, [] - return special - - def get_data_to_browser(self) -> List[str]: - with self.__html_to_browser_lock: - data, self.__data_to_browser = self.__data_to_browser, [] - return data - - def wait_html_available(self, timeout: float=None) -> None: - self.__new_html_available.wait(timeout=timeout) - self.__new_html_available.clear() - - def singleplayer_mainloop(self, player_connection: PlayerConnection) -> None: - """mainloop for the web browser interface for single player mode""" - import webbrowser - from threading import Thread - protocol = "https" if self.wsgi_server.use_ssl else "http" - - if self.wsgi_server.address_family == socket.AF_INET6: - hostname, port, _, _ = self.wsgi_server.server_address - if hostname[0] != '[': - hostname = '[' + hostname + ']' - url = "%s://%s:%d/tale/" % (protocol, hostname, port) - print("Access the game on this web server url (ipv6): ", url, end="\n\n") - else: - hostname, port = self.wsgi_server.server_address - if hostname.startswith("127.0"): - hostname = "localhost" - url = "%s://%s:%d/tale/" % (protocol, hostname, port) - print("Access the game on this web server url (ipv4): ", url, end="\n\n") - t = Thread(target=webbrowser.open, args=(url, )) # type: ignore - t.daemon = True - t.start() - while not self.stop_main_loop: - try: - self.wsgi_server.handle_request() - except KeyboardInterrupt: - print("* break - stopping server loop") - if lang.yesno(input("Are you sure you want to exit the Tale driver, and kill the game? ")): - break - print("Game shutting down.") - - def pause(self, unpause: bool=False) -> None: - pass - - def clear_screen(self) -> None: - self.append_html_special("clear") - - def render_output(self, paragraphs: Sequence[Tuple[str, bool]], **params: Any) -> str: - if not paragraphs: - return "" - with self.__html_to_browser_lock: - for text, formatted in paragraphs: - # text = self.convert_to_html(text) - if text == "\n": - text = "
      " - if dialogue_splitter in text: - text = create_chat_container(text) - self.__html_to_browser.append("

      " + text + "

      \n") - elif formatted: - self.__html_to_browser.append("

      " + text + "

      \n") - else: - self.__html_to_browser.append("
      " + text + "
      \n") - self.__new_html_available.set() - return "" # the output is pushed to the browser via a buffer, rather than printed to a screen - - def output(self, *lines: str) -> None: - super().output(*lines) - with self.__html_to_browser_lock: - for line in lines: - self.output_no_newline(line) - self.__new_html_available.set() - - def output_no_newline(self, text: str, new_paragraph = True) -> None: - super().output_no_newline(text, new_paragraph) - if text == "\n": - text = "
      " - if new_paragraph: - self.__html_to_browser.append("

      " + text + "

      \n") - else: - self.__html_to_browser.append(text.replace("\\n", "
      ")) - self.__new_html_available.set() - - def send_data(self, data: str) -> None: - self.append_data_to_browser(data) - - -class TaleWsgiAppBase: - """ - Generic wsgi functionality that is not tied to a particular - single or multiplayer web server. - """ - def __init__(self, driver: Driver) -> None: - self.driver = driver - - def __call__(self, environ: Dict[str, Any], start_response: WsgiStartResponseType) -> Iterable[bytes]: - method = environ.get("REQUEST_METHOD") - path = environ.get('PATH_INFO', '').lstrip('/') - if not path: - return self.wsgi_redirect(start_response, "/tale/") - if path.startswith("tale/"): - if method in ("GET", "POST"): - if method == "POST": - clength = int(environ['CONTENT_LENGTH']) - if clength > 1e6: - raise ValueError('Maximum content length exceeded') - inputstream = environ['wsgi.input'] - qs = inputstream.read(clength).decode("utf-8") - elif method == "GET": - qs = environ.get("QUERY_STRING", "") - parameters = squash_parameters(parse_qs(qs, encoding="UTF-8")) - return self.wsgi_route(environ, path[5:], parameters, start_response) - else: - return self.wsgi_invalid_request(start_response) - return self.wsgi_not_found(start_response) - - def wsgi_route(self, environ: Dict[str, Any], path: str, parameters: Dict[str, str], - start_response: WsgiStartResponseType) -> Iterable[bytes]: - if not path or path == "start": - return self.wsgi_handle_start(environ, parameters, start_response) - elif path == "about": - return self.wsgi_handle_about(environ, parameters, start_response) - elif path == "story": - return self.wsgi_handle_story(environ, parameters, start_response) - elif path == "tabcomplete": - return self.wsgi_handle_tabcomplete(environ, parameters, start_response) - elif path == "input": - return self.wsgi_handle_input(environ, parameters, start_response) - elif path == "eventsource": - return self.wsgi_handle_eventsource(environ, parameters, start_response) - elif path.startswith("static/"): - return self.wsgi_handle_static(environ, path, start_response) - elif path == "quit": - return self.wsgi_handle_quit(environ, parameters, start_response) - return self.wsgi_not_found(start_response) - - def wsgi_invalid_request(self, start_response: WsgiStartResponseType) -> Iterable[bytes]: - """Called if invalid http method.""" - start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')]) - return [b'Error 405: Method Not Allowed'] - - def wsgi_not_found(self, start_response: WsgiStartResponseType) -> Iterable[bytes]: - """Called if Url not found.""" - start_response('404 Not Found', [('Content-Type', 'text/plain')]) - return [b'Error 404: Not Found'] - - def wsgi_redirect(self, start_response: Callable, target: str) -> Iterable[bytes]: - """Called to do a redirect""" - start_response('302 Found', [('Location', target)]) - return [] - - def wsgi_redirect_other(self, start_response: Callable, target: str) -> Iterable[bytes]: - """Called to do a redirect see-other""" - start_response('303 See Other', [('Location', target)]) - return [] - - def wsgi_not_modified(self, start_response: WsgiStartResponseType) -> Iterable[bytes]: - """Called to signal that a resource wasn't modified""" - start_response('304 Not Modified', []) - return [] - - def wsgi_internal_server_error(self, start_response: Callable, message: str="") -> Iterable[bytes]: - """Called when an internal server error occurred""" - start_response('500 Internal server error', []) - return [message.encode("utf-8")] - - def wsgi_internal_server_error_json(self, start_response: Callable, message: str="") -> Iterable[bytes]: - """Called when an internal server error occurred, returns json response rather than html""" - start_response('500 Internal server error', [('Content-Type', 'application/json; charset=utf-8')]) - message = '{"error": "%s"}' % message - return [message.encode("utf-8")] - - def wsgi_handle_about(self, environ: Dict[str, Any], parameters: Dict[str, str], - start_response: WsgiStartResponseType) -> Iterable[bytes]: - raise NotImplementedError("implement this in subclass") # about page - - def wsgi_handle_quit(self, environ: Dict[str, Any], parameters: Dict[str, str], - start_response: WsgiStartResponseType) -> Iterable[bytes]: - raise NotImplementedError("implement this in subclass") # quit/logged out page - - def wsgi_handle_start(self, environ: Dict[str, Any], parameters: Dict[str, str], - start_response: WsgiStartResponseType) -> Iterable[bytes]: - # start page / titlepage - headers = [('Content-Type', 'text/html; charset=utf-8')] - resource = vfs.internal_resources["web/index.html"] - etag = self.etag(id(self), time.mktime(self.driver.server_started.timetuple()), resource.mtime, "start") - if_none = environ.get('HTTP_IF_NONE_MATCH') - if if_none and (if_none == '*' or etag in if_none): - return self.wsgi_not_modified(start_response) - headers.append(("ETag", etag)) - start_response("200 OK", headers) - txt = resource.text.format(story_version=self.driver.story.config.version, - story_name=self.driver.story.config.name, - story_author=self.driver.story.config.author, - story_author_email=self.driver.story.config.author_address) - return [txt.encode("utf-8")] - - def wsgi_handle_story(self, environ: Dict[str, Any], parameters: Dict[str, str], - start_response: WsgiStartResponseType) -> Iterable[bytes]: - headers = [('Content-Type', 'text/html; charset=utf-8')] - resource = vfs.internal_resources["web/story.html"] - etag = self.etag(id(self), time.mktime(self.driver.server_started.timetuple()), 'resource.mtime', "story") - if_none = environ.get('HTTP_IF_NONE_MATCH') - if if_none and (if_none == '*' or etag in if_none): - return self.wsgi_not_modified(start_response) - headers.append(("ETag", etag)) - start_response('200 OK', headers) - txt = '' - txt = resource.text.format(story_version=self.driver.story.config.version, - story_name=self.driver.story.config.name, - story_author=self.driver.story.config.author, - story_author_email=self.driver.story.config.author_address) - txt = self.modify_web_page(environ["wsgi.session"]["player_connection"], txt) - return [txt.encode("utf-8")] - - def wsgi_handle_eventsource(self, environ: Dict[str, Any], parameters: Dict[str, str], - start_response: WsgiStartResponseType) -> Iterable[bytes]: - session = environ["wsgi.session"] - conn = session.get("player_connection") - if not conn: - return self.wsgi_internal_server_error_json(start_response, "not logged in") - start_response('200 OK', [('Content-Type', 'text/event-stream; charset=utf-8'), - ('Cache-Control', 'no-cache'), - # ('Transfer-Encoding', 'chunked'), not allowed by wsgi - ('X-Accel-Buffering', 'no') # nginx - ]) - yield (":" + ' ' * 2050 + "\n\n").encode("utf-8") # padding for older browsers - while self.driver.is_running(): - if conn.io and conn.player: - conn.io.wait_html_available(timeout=15) # keepalives every 15 sec - if not conn.io or not conn.player: - break - html = conn.io.get_html_to_browser() - special = conn.io.get_html_special() - data = conn.io.get_data_to_browser() - if html or special: - location = conn.player.location # type : Optional[Location] - if conn.io.dont_echo_next_cmd: - special.append("noecho") - npc_names = '' - items = '' - exits = '' - if location: - npc_names = ','.join([l.name for l in location.livings if l.alive and l.visible and l != conn.player]) - items = ','.join([i.name for i in location.items if i.visible]) - exits = ','.join(list(set([e.name for e in location.exits.values() if e.visible]))) - response = { - "text": "\n".join(html), - "special": special, - "turns": conn.player.turns, - "location": location.title if location else "???", - "location_image": location.avatar if location and location.avatar else "", - "npcs": npc_names if location else '', - "items": items if location else '', - "exits": exits if location else '', - } - result = "event: text\nid: {event_id}\ndata: {data}\n\n"\ - .format(event_id=str(time.time()), data=json.dumps(response)) - yield result.encode("utf-8") - elif data: - for d in data: - result = "event: data\nid: {event_id}\ndata: {data}\n\n"\ - .format(event_id=str(time.time()), data=d) - yield result.encode("utf-8") - else: - yield "data: keepalive\n\n".encode("utf-8") - - def wsgi_handle_tabcomplete(self, environ: Dict[str, Any], parameters: Dict[str, str], - start_response: WsgiStartResponseType) -> Iterable[bytes]: - session = environ["wsgi.session"] - conn = session.get("player_connection") - if not conn: - return self.wsgi_internal_server_error_json(start_response, "not logged in") - start_response('200 OK', [('Content-Type', 'application/json; charset=utf-8'), - ('Cache-Control', 'no-cache, no-store, must-revalidate'), - ('Pragma', 'no-cache'), - ('Expires', '0')]) - return [json.dumps(conn.io.tab_complete(parameters["prefix"], self.driver)).encode("utf-8")] - - def wsgi_handle_input(self, environ: Dict[str, Any], parameters: Dict[str, str], - start_response: WsgiStartResponseType) -> Iterable[bytes]: - session = environ["wsgi.session"] - conn = session.get("player_connection") - if not conn: - return self.wsgi_internal_server_error_json(start_response, "not logged in") - cmd = parameters.get("cmd", "") - if cmd and "autocomplete" in parameters: - suggestions = conn.io.tab_complete(cmd, self.driver) - if suggestions: - conn.io.append_html_to_browser("

      Suggestions:

      ") - conn.io.append_html_to_browser("

      " + "   ".join(suggestions) + "

      ") - else: - conn.io.append_html_to_browser("

      No matching commands.

      ") - else: - cmd = html_escape(cmd, False) - if cmd: - if conn.io.dont_echo_next_cmd: - conn.io.dont_echo_next_cmd = False - elif conn.io.echo_input: - conn.io.append_html_to_browser("%s" % cmd) - conn.player.store_input_line(cmd) - start_response('200 OK', [('Content-Type', 'text/plain')]) - return [] - - def wsgi_handle_license(self, environ: Dict[str, Any], parameters: Dict[str, str], - start_response: WsgiStartResponseType) -> Iterable[bytes]: - license = "The author hasn't provided any license information." - if self.driver.story.config.license_file: - license = self.driver.resources[self.driver.story.config.license_file].text - resource = vfs.internal_resources["web/about_license.html"] - headers = [('Content-Type', 'text/html; charset=utf-8')] - etag = self.etag(id(self), time.mktime(self.driver.server_started.timetuple()), resource.mtime, "license") - if_none = environ.get('HTTP_IF_NONE_MATCH') - if if_none and (if_none == '*' or etag in if_none): - return self.wsgi_not_modified(start_response) - headers.append(("ETag", etag)) - start_response("200 OK", headers) - txt = resource.text.format(license=license, - story_version=self.driver.story.config.version, - story_name=self.driver.story.config.name, - story_author=self.driver.story.config.author, - story_author_email=self.driver.story.config.author_address) - return [txt.encode("utf-8")] - - def wsgi_handle_static(self, environ: Dict[str, Any], path: str, start_response: WsgiStartResponseType) -> Iterable[bytes]: - path = path[len("static/"):] - if not self.wsgi_is_asset_allowed(path): - return self.wsgi_not_found(start_response) - try: - return self.wsgi_serve_static("web/" + path, environ, start_response) - except IOError: - return self.wsgi_not_found(start_response) - - def wsgi_is_asset_allowed(self, path: str) -> bool: - return path.endswith(".html") or path.endswith(".js") or path.endswith(".jpg") \ - or path.endswith(".png") or path.endswith(".gif") or path.endswith(".css") or path.endswith(".ico") - - def etag(self, *components: Any) -> str: - return '"' + md5("-".join(str(c) for c in components).encode("ascii")).hexdigest() + '"' - - def wsgi_serve_static(self, path: str, environ: Dict[str, Any], start_response: WsgiStartResponseType) -> Iterable[bytes]: - headers = [] - resource = vfs.internal_resources[path] - if resource.mtime: - mtime_formatted = formatdate(resource.mtime) - etag = self.etag(id(vfs.internal_resources), resource.mtime, path) - if_modified = environ.get('HTTP_IF_MODIFIED_SINCE') - if if_modified: - if parsedate(if_modified) >= parsedate(mtime_formatted): # type: ignore - # the resource wasn't modified since last requested - return self.wsgi_not_modified(start_response) - if_none = environ.get('HTTP_IF_NONE_MATCH') - if if_none and (if_none == '*' or etag in if_none): - return self.wsgi_not_modified(start_response) - headers.append(("ETag", etag)) - headers.append(("Last-Modified", formatdate(resource.mtime))) - if resource.is_text: - # text - headers.append(('Content-Type', resource.mimetype + "; charset=utf-8")) - data = resource.text.encode("utf-8") - else: - # binary - headers.append(('Content-Type', resource.mimetype)) - data = resource.data - start_response('200 OK', headers) - return [data] - - def modify_web_page(self, player_connection: PlayerConnection, html_content: str) -> None: - """Modify the html before it is sent to the browser.""" - if not "wizard" in player_connection.player.privileges: - html_content = html_content.replace('', '') - html_content = html_content.replace('', '') - return html_content - - - -class TaleWsgiApp(TaleWsgiAppBase): - """ - The actual wsgi app that the player's browser connects to. - Note that it is deliberatly simplistic and ony able to handle a single - player connection; it only works for 'if' single-player game mode. - """ - def __init__(self, driver: Driver, player_connection: PlayerConnection, - use_ssl: bool, ssl_certs: Tuple[str, str, str]) -> None: - super().__init__(driver) - self.completer = None - self.player_connection = player_connection # just a single player here - CustomWsgiServer.use_ssl = use_ssl - if use_ssl and ssl_certs: - CustomWsgiServer.ssl_cert_locations = ssl_certs - - @classmethod - def create_app_server(cls, driver: Driver, player_connection: PlayerConnection, *, - use_ssl: bool=False, ssl_certs: Tuple[str, str, str]=None) -> Callable: - wsgi_app = SessionMiddleware(cls(driver, player_connection, use_ssl, ssl_certs)) # type: ignore - wsgi_server = make_server(driver.story.config.mud_host, driver.story.config.mud_port, app=wsgi_app, - handler_class=CustomRequestHandler, server_class=CustomWsgiServer) - wsgi_server.timeout = 0.5 - return wsgi_server - - def wsgi_handle_quit(self, environ: Dict[str, Any], parameters: Dict[str, str], - start_response: WsgiStartResponseType) -> Iterable[bytes]: - # Quit/logged out page. For single player, simply close down the whole driver. - start_response('200 OK', [('Content-Type', 'text/html')]) - self.driver._stop_driver() - return [b"

      Tale game session ended.

      " - b"

      You may close this window/tab.

      "] - - def wsgi_handle_about(self, environ: Dict[str, Any], parameters: Dict[str, str], - start_response: WsgiStartResponseType) -> Iterable[bytes]: - # about page - if "license" in parameters: - return self.wsgi_handle_license(environ, parameters, start_response) - start_response("200 OK", [('Content-Type', 'text/html; charset=utf-8')]) - resource = vfs.internal_resources["web/about.html"] - txt = resource.text.format(tale_version=tale_version_str, - story_version=self.driver.story.config.version, - story_name=self.driver.story.config.name, - uptime="%d:%02d:%02d" % self.driver.uptime, - starttime=self.driver.server_started) - return [txt.encode("utf-8")] - - -class CustomRequestHandler(WSGIRequestHandler): - def log_message(self, format: str, *args: Any): - pass - - -class CustomWsgiServer(ThreadingMixIn, WSGIServer): - """ - A simple wsgi server with a modest request queue size, meant for single user access. - Set use_ssl to True to enable HTTPS mode instead of unencrypted HTTP. - """ - request_queue_size = 10 - use_ssl = False - ssl_cert_locations = ("./certs/localhost_cert.pem", "./certs/localhost_key.pem", "") # certfile, keyfile, certpassword - - def __init__(self, server_address, rh_class): - self.address_family = socket.AF_INET - if server_address[0][0] == '[' and server_address[0][-1] == ']': - self.address_family = socket.AF_INET6 - server_address = (server_address[0][1:-1], server_address[1], 0, 0) - super().__init__(server_address, rh_class) - - def server_bind(self): - if self.use_ssl: - print("\n\nUsing SSL\n\n") - import ssl - ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - ctx.load_cert_chain(self.ssl_cert_locations[0], self.ssl_cert_locations[1] or None, self.ssl_cert_locations[2] or None) - self.socket = ctx.wrap_socket(self.socket, server_side=True) - return super().server_bind() - - -class SessionMiddleware: - def __init__(self, app): - self.app = app - - def __call__(self, environ: Dict[str, Any], start_response: WsgiStartResponseType) -> None: - environ["wsgi.session"] = { - "id": None, - "player_connection": self.app.player_connection - } - return self.app(environ, start_response) diff --git a/tests/files/test.jpg b/tests/files/test.jpg new file mode 100644 index 0000000000000000000000000000000000000000..81aad162c58540981087ecdbdb60c38c04570c2b GIT binary patch literal 1415 zcmex=^(PF6}rMnOeST|r4lSw=>~TvNxu(8R<c1}I=;VrF4wW9Q)H;sz?% zD!{d!pzFb!U9xX3zTPI5o8roG<0MW4oqZMDikqloVbuf*=gfJ(V&YTRE(2~ znmD<{#3dx9RMpfqG__1j&CD$#!>#7W@h!Srz9y!;-SSpJ;iUc4$gMvfK3F}0 zWy5K{s&_qw@uu$Tf4o)t6YMRl_iD{g$mR&_+PUV-_L*P*b$>KIRJG^Pb>I2Ma-Cav zu1E78KDVU9>sQ)7j~@}Q=2>mLV?4R*H=E&cuZL^J-|n4Lw!zEp)%2pprh|3nxypSU0KO`rj{o$ z*SdaY?mCy5%jdb)9Q^HgdiU{<>fd$4zph`pXr9^B@VA1~zVKMCpJ8XaTK~vOqlb;# zdiwMBgr~{w%u1=;e9K!#Q+u7@uCRT3AL>RQp78S0D?_)=6-S>(znNgEeLSY*m8SCE zt-CixN4gzjX6y~CV4Zt)%lcxO6Fq#4=NN3YBhK!=D;c!1{IqF$(F|jbGzNL?^7D-c zY*#Ual*arsO_TZC^@92M>4`tCI*09=eL5xM<*YTce7}^Jtypc7u5qTgIaa!2*YggZja;GM zwiUmeb4~iThib;RN#VCP_bfhh_1w%0D;7#dyO%h6H@P`H+Re}O@*n@6ojc|9t}Nf& z>+s3N;m`5=4A)&3-OZe4rMEhlbxOHePs~A`RpzbF<>!l9a7v&SxF{J#kRW#V76 literal 0 HcmV?d00001 diff --git a/tests/test_dynamic_story.py b/tests/test_dynamic_story.py index adb3a725..26409bf3 100644 --- a/tests/test_dynamic_story.py +++ b/tests/test_dynamic_story.py @@ -1,6 +1,7 @@ from tale.base import Location from tale.coord import Coord from tale.llm.dynamic_story import DynamicStory +from tale.zone import Zone class TestDynamicStory(): From 1a04e6c6fcbb6a697d60dda181dfc325fe44196c Mon Sep 17 00:00:00 2001 From: rickard Date: Sun, 20 Oct 2024 09:03:31 +0200 Subject: [PATCH 5/6] commenting test --- tale/parse_utils.py | 2 +- tests/test_web_utils.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tale/parse_utils.py b/tale/parse_utils.py index c07b10f2..5854481b 100644 --- a/tale/parse_utils.py +++ b/tale/parse_utils.py @@ -296,7 +296,7 @@ def _insert(new_item: Item, locations, location: str): loc.insert(new_item, None) def remove_special_chars(message: str): - re.sub('[^A-Za-z0-9 .,_\-\'\"]+', '', message) + re.sub('[^A-Za-z0-9 .,_\'\"]+', '', message) return message def trim_response(message: str): diff --git a/tests/test_web_utils.py b/tests/test_web_utils.py index f499f477..ff687e66 100644 --- a/tests/test_web_utils.py +++ b/tests/test_web_utils.py @@ -12,8 +12,8 @@ def test_create_chat_container(self): assert '
      ' in result assert '
      Hello World!
      ' in result - def test_copy_single_image(self): - web_utils.web_resources_path = "tale/web" - copy_single_image("./tests/files", "test.jpg") - assert _check_file_exists("test.jpg") - web_utils.clear_resources() \ No newline at end of file + # def test_copy_single_image(self): + # web_utils.web_resources_path = "tale/web" + # copy_single_image("./tests/files", "test.jpg") + # assert _check_file_exists("test.jpg") + # web_utils.clear_resources() \ No newline at end of file From d8fc9070320e50e61bd034ca171abe1360793b14 Mon Sep 17 00:00:00 2001 From: rickard Date: Sun, 20 Oct 2024 19:29:46 +0200 Subject: [PATCH 6/6] one more test --- tests/test_story_builder.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/test_story_builder.py b/tests/test_story_builder.py index d8b63269..7a0419d9 100644 --- a/tests/test_story_builder.py +++ b/tests/test_story_builder.py @@ -7,7 +7,7 @@ from tale.llm.dynamic_story import DynamicStory from tale.llm.llm_utils import LlmUtil from tale.player import PlayerConnection -from tale.story import StoryConfig +from tale.story import StoryConfig, StoryContext from tale.story_builder import StoryBuilder, StoryInfo from tests.supportstuff import FakeIoUtil @@ -74,7 +74,9 @@ def test_apply_to_story(self): builder.story_info.world_mood = 3 builder.story_info.start_location = "on a small road outside a village" - responses = ['In the peaceful land of Whimsy, where the sun always shines bright and the creatures are forever young, a darkness has begun to stir beneath the surface. A great evil, born from the twisted imagination of the Mad Hatter, threatens to consume the entire realm. This malevolent force seeks to plunge Whimsy into eternal night, and all who oppose it must band together if they hope to save their home from destruction. The players\' village is the first to fall under attack, forcing them to embark on a perilous journey across this magical landscape to rally allies for the impending war against the shadowy foe. Along the way, they discover hidden secrets about the nature of their world and their own roles within its grand narrative, ultimately deciding the fate of an enchanted civilization hanging precariously upon their shoulders.', + story_background = "In the peaceful land of Whimsy, where the sun always shines bright and the creatures are forever young, a darkness has begun to stir beneath the surface. A great evil, born from the twisted imagination of the Mad Hatter, threatens to consume the entire realm. This malevolent force seeks to plunge Whimsy into eternal night, and all who oppose it must band together if they hope to save their home from destruction. The players\' village is the first to fall under attack, forcing them to embark on a perilous journey across this magical landscape to rally allies for the impending war against the shadowy foe. Along the way, they discover hidden secrets about the nature of their world and their own roles within its grand narrative, ultimately deciding the fate of an enchanted civilization hanging precariously upon their shoulders." + + responses = [story_background, '{"items":[{"name":"Enchanted Petals", "type":"Health", "value": 20, "description": "A handful of enchanted petals that can be used to heal wounds and cure ailments."}]}', '{"creatures": [ { "name": "Whimsy Woozle", "description": "A gentle, ethereal creature with a penchant for gardening and poetry. They tend to the area\'s lush fields and forests, filling the air with sweet melodies and the scent of blooming wildflowers. They are friendly and welcoming to all visitors.", "level": 1 }, { "name": "Lunar Lopster", "description": "A mysterious crustacean with an affinity for the moon\'s gentle light. They roam the area at night, their glowing shells lighting the way through the darkness. They are neutral towards visitors, but may offer cryptic advice or guidance to those who seek it.", "level": 2 }, { "name": "Shadow Stag", "description": "A sleek and elusive creature with a mischievous grin. They roam the area\'s forests, their dark forms blending into the shadows. They are hostile towards intruders, and will not hesitate to attack those who threaten their home.", "level": 3 }, { "name": "Moonflower", "description": "A rare and beautiful flower that blooms only under the light of the full moon. They can be found in the area\'s forests, and are said to have powerful healing properties. They are very friendly towards visitors, and will offer their petals to those who show kindness and respect.", "level": 4 }, { "name": "Moonstone", "description": "A rare and valuable mineral that can be found in the area\'s mountains. It glows with a soft, ethereal light, and is said to have powerful magical properties. It is highly sought after by collectors, and can be found in both the earth and the water.", "level": 5 }]}', '{ "name": "Moonlit Meadows", "description": "A serene and mystical area nestled between two great mountains, where the moon\'s gentle light illuminates the lush fields and forests. The air is filled with the sweet scent of blooming wildflowers, and the gentle chirping of nocturnal creatures can be heard in the distance. The area is home to a diverse array of magical creatures, including the Whimsy Woozle, the Lunar Lopster, and the Shadow Stag."}', @@ -105,6 +107,8 @@ def test_apply_to_story(self): assert(story.config.world_info == "a mix between alice in wonderland and the wizard of oz with a dark element") assert(story.config.world_mood == 3) assert(story.config.startlocation_player == "Moonlit Meadows.Greenhaven") + assert(isinstance(story.config.context, StoryContext)) + assert(story.config.context.to_json() == StoryContext(base_story=story_background).to_json()) quest = None assert(start_location.livings)