From fa5e7734eda1f25b4622f2a5a764535ff50b4576 Mon Sep 17 00:00:00 2001 From: Martin Pitt Date: Tue, 30 Jul 2024 21:46:59 +0200 Subject: [PATCH] WIP test: Port from CDP to BiDi browser automation This slightly changes the API of `Browser.keys()` with modifiers. We only use that once inside of testlib.py and twice in check-system-terminal (and nowhere in external projects), so just adjust the three callers. --- test/common/bidi.py | 398 ++++++++++++++++++++++++++++++ test/common/storagelib.py | 2 +- test/common/testlib.py | 371 +++++++++++++++++----------- test/verify/check-system-terminal | 4 +- 4 files changed, 629 insertions(+), 146 deletions(-) create mode 100644 test/common/bidi.py diff --git a/test/common/bidi.py b/test/common/bidi.py new file mode 100644 index 000000000000..a113b641c8db --- /dev/null +++ b/test/common/bidi.py @@ -0,0 +1,398 @@ +# This file is part of Cockpit. +# +# Copyright (C) 2024 Red Hat, Inc. +# +# Cockpit is free software; you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation; either version 2.1 of the License, or +# (at your option) any later version. +# +# Cockpit is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Cockpit; If not, see . + +"""BiDi session/API driver + +This directly talks the https://w3c.github.io/webdriver-bidi/ protocol from async Python, +without any node/JS in between. The only dependencies are aiohttp, firefox, and/or +chromedriver+chromium (or headless_shell). +""" + +import asyncio +import json +import logging +import socket +import tempfile +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import aiohttp + +log_proto = logging.getLogger("bidi.proto") +log_command = logging.getLogger("bidi.command") + + +class WebdriverError(RuntimeError): + pass + + +class Error(RuntimeError): + def __init__(self, msg: str) -> None: + self.msg = msg + + def __str__(self) -> str: + return self.msg + + +@dataclass +class LogMessage: + level: str # like "info" + type: str # usually "console" + timestamp: int + args: list[object] + text: str + + def __init__(self, message_params): + self.level = message_params["level"] + self.type = message_params["type"] + self.timestamp = message_params["timestamp"] + self.args = message_params.get("args", []) + self.text = message_params["text"] + + def __str__(self): + return f"LogMessage: {self.type} {self.level} @{self.timestamp}: {self.text} {self.args}" + + +@dataclass +class BidiSession: + ws_url: str + session_url: str + process: asyncio.subprocess.Process + + +# Return port numbers that were free at the time of checking +# They might be in use again by the time the function returns... +def pick_ports(count: int) -> list[int]: + sockets: list[socket.socket] = [] + ports: list[int] = [] + + for _ in range(count): + sock = socket.socket() + sock.bind(('127.0.0.1', 0)) + sockets.append(sock) + ports.append(sock.getsockname()[1]) + + for s in sockets: + s.close() + + return ports + + +def jsquote(js: object) -> str: + return json.dumps(js) + + +class WebdriverBidi: + http_session: aiohttp.ClientSession + + def __init__(self, headless=False) -> None: + self.headless = headless + self.last_id = 0 + self.pending_commands: dict[int, asyncio.Future] = {} + self.logs: list[LogMessage] = [] + self.bidi_session: BidiSession | None = None + self.future_wait_page_load = None + self.top_context: str | None = None # top-level browsingContext + self.context: str | None # currently selected context (top or iframe) + + async def start_bidi_session(self) -> None: + raise NotImplementedError('must be implemented by concrete subclass') + + async def close_bidi_session(self) -> None: + raise NotImplementedError('must be implemented by concrete subclass') + + async def close(self) -> None: + assert self.bidi_session is not None + log_proto.debug("cleaning up webdriver") + + self.task_reader.cancel() + del self.task_reader + await self.ws.close() + await self.close_bidi_session() + self.bidi_session.process.terminate() + await self.bidi_session.process.wait() + self.bidi_session = None + await self.http_session.close() + + def ws_done_callback(self, future): + for fut in self.pending_commands.values(): + fut.set_exception(WebdriverError("websocket closed")) + if not future.cancelled(): + log_proto.error("ws_reader crashed: %r", future.result()) + + async def start_session(self) -> None: + self.http_session = aiohttp.ClientSession(raise_for_status=True) + await self.start_bidi_session() + assert self.bidi_session + self.ws = await self.http_session.ws_connect(self.bidi_session.ws_url) + self.task_reader = asyncio.create_task(self.ws_reader(self.ws), name="bidi_reader") + self.task_reader.add_done_callback(self.ws_done_callback) + + await self.bidi("session.subscribe", events=[ + "log.entryAdded", "browsingContext.domContentLoaded", + ]) + + # wait for browser to initialize default context + for _ in range(10): + realms = (await self.bidi("script.getRealms"))["realms"] + if len(realms) > 0: + self.top_context = realms[0]["context"] + self.context = self.top_context + break + await asyncio.sleep(0.5) + else: + raise WebdriverError("timed out waiting for default realm") + + async def __aenter__(self): + await self.start_session() + return self + + async def __aexit__(self, *_excinfo): + if self.bidi_session is not None: + await self.close() + + async def ws_reader(self, ws: aiohttp.client.ClientWebSocketResponse) -> None: + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + data = json.loads(msg.data) + log_proto.debug("ws TEXT → %r", data) + if "id" in data and data["id"] in self.pending_commands: + log_proto.debug("ws_reader: resolving pending command %i", data["id"]) + if data["type"] == "success": + result = data["result"] + if result.get("type") == "exception": + self.pending_commands[data["id"]].set_exception(Error(result["exceptionDetails"]["text"])) + else: + self.pending_commands[data["id"]].set_result(result) + else: + self.pending_commands[data["id"]].set_exception( + WebdriverError(f"{data['type']}: {data['message']}")) + del self.pending_commands[data["id"]] + continue + + if data["type"] == "event": + if data["method"] == "log.entryAdded": + log = LogMessage(data["params"]) + self.logs.append(log) + log_command.info(str(log)) + continue + if data["method"] == "browsingContext.domContentLoaded": + if self.future_wait_page_load: + log_command.debug("page loaded: %r, resolving wait page load future", data["params"]) + self.future_wait_page_load.set_result(data["params"]["url"]) + else: + log_command.debug("page loaded: %r (not awaited)", data["params"]) + continue + + log_proto.warning("ws_reader: unhandled message %r", data) + elif msg.type == aiohttp.WSMsgType.ERROR: + log_proto.error("BiDi failure: %s", msg) + break + + async def bidi(self, method, *, quiet: bool = False, **params) -> dict[str, Any]: + """Send a Webdriver BiDi command and return the JSON response""" + + payload = json.dumps({"id": self.last_id, "method": method, "params": params}) + log_command.info("← %s(%r)", method, 'quiet' if quiet else params) + await self.ws.send_str(payload) + future = asyncio.get_event_loop().create_future() + self.pending_commands[self.last_id] = future + self.last_id += 1 + # some calls can take very long (wait for condition); + # safe-guard timeout for avoiding eternally hanging tests + res = await asyncio.wait_for(future, timeout=60) + if not quiet: + log_command.info("→ %r", res) + return res + + # this is mostly unused; testlib uses ph_find() due to sizzle + async def locate(self, selector: str) -> str: + r = await self.bidi("browsingContext.locateNodes", context=self.context, + locator={"type": "css", "value": selector}) + nodes = r["nodes"] + if len(nodes) == 0: + raise Error(f"no element found for {selector}") + if len(nodes) > 1: + raise Error(f"selector {selector} is ambiguous: {nodes}") + log_command.info("locate(%s) = %r", selector, nodes[0]) + return nodes[0] + + async def switch_to_frame(self, name: str) -> None: + frame = await self.locate(f"iframe[name='{name}']") + cw = await self.bidi("script.callFunction", + functionDeclaration="f => f.contentWindow", + arguments=[frame], + awaitPromise=False, + target={"context": self.top_context}) + self.context = cw["result"]["value"]["context"] + log_command.info("← switch_to_frame(%s)", name) + + def switch_to_top(self) -> None: + self.context = self.top_context + log_command.info("← switch_to_top") + + +class ChromiumBidi(WebdriverBidi): + def __init__(self, headless=False) -> None: + super().__init__(headless) + self.cdp_ws: aiohttp.client.ClientWebSocketResponse | None = None + + async def start_bidi_session(self) -> None: + assert self.bidi_session is None + + chrome_binary = "/usr/lib64/chromium-browser/headless_shell" if self.headless else "/usr/bin/chromium-browser" + + session_args = {"capabilities": { + "alwaysMatch": { + "webSocketUrl": True, + "goog:chromeOptions": {"binary": chrome_binary}, + } + }} + + [webdriver_port] = pick_ports(1) + driver = await asyncio.create_subprocess_exec("chromedriver", "--port=" + str(webdriver_port)) + + wd_url = f"http://localhost:{webdriver_port}" + + # webdriver needs some time to launch + for retry in range(1, 10): + try: + async with self.http_session.post(f"{wd_url}/session", + data=json.dumps(session_args).encode()) as resp: + session_info = json.loads(await resp.text())["value"] + log_proto.debug("webdriver session request: %r %r", resp, session_info) + break + except (IOError, aiohttp.client.ClientResponseError) as e: + log_proto.debug("waiting for webdriver: %s", e) + await asyncio.sleep(0.1 * retry) + else: + raise WebdriverError("could not connect to chromedriver") + + self.cdp_address = session_info["capabilities"]["goog:chromeOptions"]["debuggerAddress"] + self.last_cdp_id = 0 + + self.bidi_session = BidiSession( + session_url=f"{wd_url}/session/{session_info['sessionId']}", + ws_url=session_info["capabilities"]["webSocketUrl"], + process=driver) + log_proto.debug("Established chromium session %r, CDP address %s", self.bidi_session, self.cdp_address) + + async def close_cdp_session(self) -> None: + if self.cdp_ws is not None: + await self.cdp_ws.close() + self.cdp_ws = None + + async def close_bidi_session(self) -> None: + assert self.bidi_session is not None + await self.close_cdp_session() + await self.http_session.delete(self.bidi_session.session_url) + + async def cdp(self, method, **params) -> dict[str, Any]: + """Send a Chrome DevTools command and return the JSON response + + This is currently *not* safe for enabling events! These should be handled via BiDi, + this is only an escape hatch for CDP specific functionality such as Profiler. + """ + if self.cdp_ws is None: + # unfortunately we have to hold on to the open ws after sending .enable() commands, + # otherwise they'll reset when closing and re-opening + self.cdp_ws = await self.http_session.ws_connect(f"ws://{self.cdp_address}/devtools/page/{self.top_context}") + + reply = None + payload = json.dumps({"id": self.last_cdp_id, "method": method, "params": params}) + log_proto.debug("CDP ← %r", payload) + await self.cdp_ws.send_str(payload) + async for msg in self.cdp_ws: + if msg.type == aiohttp.WSMsgType.TEXT: + reply = json.loads(msg.data) + if reply.get("id") == self.last_cdp_id: + break + else: + log_proto.debug("CDP message: %r", reply) + else: + log_proto.debug("CDP non-text message: %r", msg) + assert reply + log_proto.debug("CDP → %r", reply) + self.last_cdp_id += 1 + return reply + + +# We could do this with https://github.com/mozilla/geckodriver/releases with a similar protocol as ChromeBidi +# But let's use https://firefox-source-docs.mozilla.org/testing/marionette/Protocol.html directly, fewer moving parts +class FirefoxBidi(WebdriverBidi): + async def start_bidi_session(self) -> None: + [marionette_port, bidi_port] = pick_ports(2) + + self.homedir = tempfile.TemporaryDirectory(prefix="firefox-home-") + (Path(self.homedir.name) / 'download').mkdir() + self.profiledir = Path(self.homedir.name) / "profile" + self.profiledir.mkdir() + (self.profiledir / "user.js").write_text(f""" + user_pref("remote.enabled", true); + user_pref("remote.frames.enabled", true); + user_pref("app.update.auto", false); + user_pref("datareporting.policy.dataSubmissionEnabled", false); + user_pref("toolkit.telemetry.reportingpolicy.firstRun", false); + user_pref("dom.disable_beforeunload", true); + user_pref("browser.download.dir", "{self.homedir}/download"); + user_pref("browser.download.folderList", 2); + user_pref("signon.rememberSignons", false); + user_pref("dom.navigation.locationChangeRateLimit.count", 9999); + // HACK: https://bugzilla.mozilla.org/show_bug.cgi?id=1746154 + user_pref("fission.webContentIsolationStrategy", 0); + user_pref("fission.bfcacheInParent", false); + user_pref('marionette.port', {marionette_port}); + """) + + driver = await asyncio.create_subprocess_exec( + "firefox", "-profile", str(self.profiledir), "--marionette", "--no-remote", + f"--remote-debugging-port={bidi_port}", + *(["-headless"] if self.headless else []), "about:blank") + + # needs some time to launch + for _ in range(1, 30): + try: + # we must keep this socket open throughout the lifetime of that session + reader, self.writer_marionette = await asyncio.open_connection("127.0.0.1", marionette_port) + break + except ConnectionRefusedError as e: + log_proto.debug("waiting for firefox marionette: %s", e) + await asyncio.sleep(1) + else: + raise WebdriverError("could not connect to firefox marionette") + + reply = await reader.read(1024) + if b'"marionetteProtocol":3' not in reply: + raise WebdriverError(f"unexpected marionette reply: {reply.decode()}") + cmd = '[0,1,"WebDriver:NewSession",{"webSocketUrl":true}]' + self.writer_marionette.write(f"{len(cmd)}:{cmd}".encode()) + await self.writer_marionette.drain() + reply = await reader.read(1024) + # cut off length prefix + reply = json.loads(reply[reply.index(b":") + 1:].decode()) + if not isinstance(reply, list) or len(reply) != 4 or not isinstance(reply[3], dict): + raise WebdriverError(f"unexpected marionette session request reply: {reply!r}") + log_proto.debug("marionette session request reply: %s", reply) + + url = reply[3]["capabilities"]["webSocketUrl"] + self.bidi_session = BidiSession(session_url=url, ws_url=url, process=driver) + log_proto.debug("Established firefox session %r", self.bidi_session) + + async def close_bidi_session(self) -> None: + self.writer_marionette.close() + await self.writer_marionette.wait_closed() diff --git a/test/common/storagelib.py b/test/common/storagelib.py index 0ba9b4d13b48..71ea782f7137 100644 --- a/test/common/storagelib.py +++ b/test/common/storagelib.py @@ -241,7 +241,7 @@ def dialog_cancel(self): def dialog_wait_close(self): # file system operations often take longer than 10s - with self.browser.wait_timeout(max(self.browser.cdp.timeout, 60)): + with self.browser.wait_timeout(max(self.browser.timeout, 60)): self.browser.wait_not_present('#dialog') def dialog_check(self, expect): diff --git a/test/common/testlib.py b/test/common/testlib.py index 60bb2527fba3..61ba58fea309 100644 --- a/test/common/testlib.py +++ b/test/common/testlib.py @@ -18,6 +18,7 @@ """Tools for writing Cockpit test cases.""" import argparse +import asyncio import base64 import contextlib import errno @@ -25,6 +26,7 @@ import glob import io import json +import logging import os import re import shutil @@ -32,13 +34,15 @@ import subprocess import sys import tempfile +import threading import time import traceback import unittest from collections.abc import Collection, Container, Iterator, Mapping, Sequence +from pathlib import Path from typing import Any, Callable, ClassVar, Literal, Never, TypedDict, TypeVar -import cdp +import bidi from lcov import write_lcov from lib.constants import OSTREE_IMAGES from machine import testvm @@ -47,6 +51,8 @@ _T = TypeVar('_T') _FT = TypeVar("_FT", bound=Callable[..., Any]) +JsonObject = dict[str, Any] + BASE_DIR = os.path.realpath(f'{__file__}/../../..') TEST_DIR = f'{BASE_DIR}/test' BOTS_DIR = f'{BASE_DIR}/bots' @@ -92,6 +98,25 @@ opts.coverage = False +# https://w3c.github.io/webdriver/#keyboard-actions for encoding key names +WEBDRIVER_KEYS = { + "Backspace": "\uE003", + "Tab": "\uE004", + "Return": "\uE006", + "Enter": "\uE007", + "Shift": "\uE008", + "Control": "\uE009", + "Alt": "\uE00A", + "Escape": "\uE00C", + "ArrowLeft": "\uE012", + "ArrowUp": "\uE013", + "ArrowRight": "\uE014", + "ArrowDown": "\uE015", + "Insert": "\uE016", + "Delete": "\uE017", +} + + # Browser layouts # # A browser can be switched into a number of different layouts, such @@ -185,6 +210,7 @@ def unique_filename(base: str, ext: str) -> str: class Browser: + driver: bidi.WebdriverBidi layouts: Sequence[BrowserLayout] current_layout: BrowserLayout | None port: str | int @@ -211,34 +237,79 @@ def __init__( self.used_pixel_references = set[str]() self.coverage_label = coverage_label self.machine = machine - path = os.path.dirname(__file__) - sizzle_js = os.path.join(path, "../../node_modules/sizzle/dist/sizzle.js") - helpers = [os.path.join(path, "test-functions.js")] - if os.path.exists(sizzle_js): - helpers.append(sizzle_js) - self.cdp = cdp.CDP("C.utf8", verbose=opts.trace, trace=opts.trace, - inject_helpers=helpers, - start_profile=coverage_label is not None) + + headless = not bool(os.environ.get("TEST_SHOW_BROWSER", "")) + browser = os.environ.get("TEST_BROWSER", "chromium") + if browser == "chromium": + self.driver = bidi.ChromiumBidi(headless=headless) + elif browser == "firefox": + self.driver = bidi.FirefoxBidi(headless=headless) + else: + raise ValueError(f"unknown browser {browser}") + self.loop = asyncio.new_event_loop() + self.bidi_thread = threading.Thread(target=self.asyncio_loop_thread, args=(self.loop,)) + self.bidi_thread.start() + + asyncio.run_coroutine_threadsafe(self.driver.start_session(), self.loop).result() + + if opts.trace: + logging.basicConfig(level=logging.INFO) + bidi.log_command.setLevel(logging.INFO if opts.trace else logging.WARNING) + + test_functions = (Path(__file__).parent / "test-functions.js").read_text() + self.bidi("script.addPreloadScript", quiet=True, functionDeclaration=f"() => {{ {test_functions} }}") + + try: + sizzle_js = (Path(__file__).parent.parent.parent / "node_modules/sizzle/dist/sizzle.js").read_text() + # HACK: injecting sizzle fails on missing `document` in assert() + sizzle_js = sizzle_js.replace('function assert( fn ) {', 'function assert( fn ) { if (true) return true; else ') + # HACK: sizzle tracks document and when we switch frames, it sees the old document + # although we execute it in different context. + sizzle_js = sizzle_js.replace('context = context || document;', 'context = context || window.document;') + self.bidi("script.addPreloadScript", quiet=True, functionDeclaration=f"() => {{ {sizzle_js} }}") + except FileNotFoundError: + pass + self.password = "foobar" self.timeout_factor = int(os.getenv("TEST_TIMEOUT_FACTOR", "1")) + self.timeout = 15 self.failed_pixel_tests = 0 self.allow_oops = False - self.body_clip = None try: with open(f'{TEST_DIR}/browser-layouts.json') as fp: self.layouts = json.load(fp) except FileNotFoundError: self.layouts = default_layouts - # Firefox CDP does not support setting EmulatedMedia - # https://bugzilla.mozilla.org/show_bug.cgi?id=1549434 - if self.cdp.browser.name != "chromium": - self.layouts = [layout for layout in self.layouts if layout["theme"] != "dark"] self.current_layout = None + self.valid = True + + @staticmethod + def asyncio_loop_thread(loop: asyncio.AbstractEventLoop) -> None: + asyncio.set_event_loop(loop) + loop.run_forever() + + def kill(self) -> None: + asyncio.run_coroutine_threadsafe(self.driver.close(), self.loop).result() + self.loop.call_soon_threadsafe(self.loop.stop) + self.bidi_thread.join() + self.valid = False + + def bidi(self, method: str, **params: Any) -> JsonObject: + """Send a Webdriver BiDi command and return the JSON response""" + + return asyncio.run_coroutine_threadsafe(self.driver.bidi(method, **params), self.loop).result() + + def cdp_command(self, method: str, **params: Any) -> JsonObject: + """Send a Chrome DevTools Protocol command and return the JSON response""" + + if isinstance(self.driver, bidi.ChromiumBidi): + return asyncio.run_coroutine_threadsafe(self.driver.cdp(method, **params), self.loop).result() + else: + raise bidi.WebdriverError("CDP is only supported in Chromium") def allow_download(self) -> None: """Allow browser downloads""" - if self.cdp.browser.name == "chromium": - self.cdp.invoke("Page.setDownloadBehavior", behavior="allow", downloadPath=self.cdp.download_dir) + raise NotImplementedError def open(self, href: str, cookie: Mapping[str, str] | None = None, tls: bool = False) -> None: """Load a page into the browser. @@ -259,14 +330,14 @@ def open(self, href: str, cookie: Mapping[str, str] | None = None, tls: bool = F size = self.current_layout["shell_size"] self._set_window_size(size[0], size[1]) if cookie: - self.cdp.invoke("Network.setCookie", **cookie) + raise NotImplementedError self.switch_to_top() # Some browsers optimize this away if the current URL is already href # (e.g. in TestKeys.testAuthorizedKeys). Load the blank page first to always # force a load. - self.cdp.invoke("Page.navigate", url="about:blank") - self.cdp.invoke("Page.navigate", url=href) + self.bidi("browsingContext.navigate", context=self.driver.context, url="about:blank", wait="complete") + self.bidi("browsingContext.navigate", context=self.driver.context, url=href, wait="complete") def set_user_agent(self, ua: str) -> None: """Set the user agent of the browser @@ -274,7 +345,7 @@ def set_user_agent(self, ua: str) -> None: :param ua: user agent string :type ua: str """ - self.cdp.invoke("Emulation.setUserAgentOverride", userAgent=ua) + raise NotImplementedError def reload(self, ignore_cache: bool = False) -> None: """Reload the current page @@ -285,7 +356,7 @@ def reload(self, ignore_cache: bool = False) -> None: self.switch_to_top() self.wait_js_cond("window.ph_select('iframe.container-frame').every(e => e.getAttribute('data-loaded'))") - self.cdp.invoke("Page.reload", ignoreCache=ignore_cache) + self.bidi("browsingContext.reload", context=self.driver.context, ignoreCache=ignore_cache, wait="complete") self.machine.allow_restart_journal_messages() @@ -297,14 +368,17 @@ def switch_to_frame(self, name: str | None) -> None: :param name: frame name """ - self.cdp.set_frame(name) + if name is None: + self.switch_to_top() + else: + asyncio.run_coroutine_threadsafe(self.driver.switch_to_frame(name), self.loop).result() def switch_to_top(self) -> None: """Switch to the main frame Switch to the main frame from for example an iframe. """ - self.cdp.set_frame(None) + self.driver.switch_to_top() def upload_files(self, selector: str, files: Sequence[str]) -> None: """Upload a local file to the browser @@ -312,9 +386,7 @@ def upload_files(self, selector: str, files: Sequence[str]) -> None: The selector should select the element. Files is a list of absolute paths to files which should be uploaded. """ - r = self.cdp.invoke("Runtime.evaluate", expression='document.querySelector(%s)' % jsquote(selector)) - objectId = r["result"]["objectId"] - self.cdp.invoke("DOM.setFileInputFiles", files=files, objectId=objectId) + raise NotImplementedError def raise_cdp_exception( self, func: str, arg: str, details: Mapping[str, Any], trailer: str | None = None @@ -336,8 +408,10 @@ def inject_js(self, code: str) -> None: :param code: a string containing JavaScript code :type code: str """ - self.cdp.invoke("Runtime.evaluate", expression=code, trace=code, - silent=False, awaitPromise=True, returnByValue=False, no_trace=True) + raise NotImplementedError + # this feels redundant -- could this just merge with eval_js? there's no returnByValue in bidi + # self.cdp.invoke("Runtime.evaluate", expression=code, trace=code, + # silent=False, awaitPromise=True, returnByValue=False, no_trace=True) def eval_js(self, code: str, no_trace: bool = False) -> Any: """Execute JS code that returns something @@ -345,13 +419,12 @@ def eval_js(self, code: str, no_trace: bool = False) -> Any: :param code: a string containing JavaScript code :param no_trace: do not print information about unknown return values (default False) """ - result = self.cdp.invoke("Runtime.evaluate", expression=code, trace=code, - silent=False, awaitPromise=True, returnByValue=True, no_trace=no_trace) - if "exceptionDetails" in result: - self.raise_cdp_exception("eval_js", code, result["exceptionDetails"]) + result = self.bidi("script.evaluate", expression=code, + awaitPromise=True, target={"context": self.driver.context}) _type = result.get("result", {}).get("type") - if _type == 'object' and result["result"].get("subtype", "") == "error": - raise Error(result["result"]["description"]) + # TODO: do we need this? + # if _type == 'object' and result["result"].get("subtype", "") == "error": + # raise Error(result["result"]["description"]) if _type == "undefined": return None if _type and "value" in result["result"]: @@ -391,12 +464,7 @@ def cookie(self, name: str) -> Mapping[str, object] | None: :param name: the name of the cookie :type name: str """ - cookies = self.cdp.invoke("Network.getCookies") - for c in cookies["cookies"]: - assert isinstance(c, Mapping) - if c["name"] == name: - return c - return None + raise NotImplementedError def go(self, url_hash: str) -> None: self.call_js_func('window.ph_go', url_hash) @@ -427,14 +495,47 @@ def mouse( :param metaKey: press the meta key """ self.wait_visible(selector) - self.call_js_func('window.ph_mouse', selector, event, x, y, btn, ctrlKey, shiftKey, altKey, metaKey) + self.bidi("script.evaluate", expression=f"window.ph_find({jsquote(selector)}).scrollIntoViewIfNeeded()", + awaitPromise=False, target={"context": self.driver.context}) + + # HACK: Chromium mis-clicks to wrong position with iframes; use our old "synthesize MouseEvent" approach + # TODO: file/find bug + if isinstance(self.driver, bidi.ChromiumBidi): + self.call_js_func('window.ph_mouse', selector, event, x, y, btn, ctrlKey, shiftKey, altKey, metaKey) + return + + element = self.bidi("script.evaluate", expression=f"window.ph_find({jsquote(selector)})", + awaitPromise=False, target={"context": self.driver.context})["result"] + + actions = [{"type": "pointerMove", "x": 0, "y": 0, "origin": {"type": "element", "element": element}}] + down = {"type": "pointerDown", "button": btn} + up = {"type": "pointerUp", "button": btn} + if event == "click": + actions.extend([down, up]) + elif event == "dblclick": + actions.extend([down, up, down, up]) + elif event == "mouseeenter": + pass + else: + raise NotImplementedError(f"unknown event {event}") + if ctrlKey or shiftKey or altKey or metaKey: + raise NotImplementedError("mouse with modifier keys") + + self.bidi("input.performActions", context=self.driver.context, actions=[ + { + "id": f"pointer-{self.driver.last_id}", + "type": "pointer", + "parameters": {"pointerType": "mouse"}, + "actions": actions, + } + ]) def click(self, selector: str) -> None: """Click on a ui element :param selector: the selector to click on """ - self.mouse(selector + ":not([disabled]):not([aria-disabled=true])", "click", 0, 0, 0) + self.mouse(selector + ":not([disabled]):not([aria-disabled=true])", "click") def val(self, selector: str) -> Any: """Get the value attribute of a selector. @@ -497,8 +598,10 @@ def set_checked(self, selector: str, val: bool) -> None: :param selector: the selector :param val: boolean value to enable or disable checkbox """ - self.wait_visible(selector + ':not([disabled]):not([aria-disabled=true])') - self.call_js_func('window.ph_set_checked', selector, val) + # aavoid ph_set_checked, that doesn't use proper mouse emulation + checked = self.get_checked(selector) + if checked != val: + self.click(selector) def focus(self, selector: str) -> None: """Set focus on selected element. @@ -517,43 +620,37 @@ def blur(self, selector: str) -> None: self.call_js_func('window.ph_blur', selector) def input_text(self, text: str) -> None: - for char in text: - if char == "\n": - self.key("Enter") - else: - self.cdp.invoke("Input.dispatchKeyEvent", type="keyDown", text=char, key=char) - self.cdp.invoke("Input.dispatchKeyEvent", type="keyUp", text=char, key=char) - - def key(self, name: str, repeat: int = 1, modifiers: int = 0) -> None: + actions = [] + for c in text: + actions.append({"type": "keyDown", "value": c}) + actions.append({"type": "keyUp", "value": c}) + self.bidi("input.performActions", context=self.driver.context, actions=[ + {"type": "key", "id": "key-0", "actions": actions}]) + + def key(self, name: str, repeat: int = 1, modifiers: list[str] | None = None) -> None: """Press and release a named keyboard key. Use this function to input special characters or modifiers. - :param name: key name like "Enter", "Delete", or "ArrowLeft" + :param name: ASCII value or key name like "Enter", "Delete", or "ArrowLeft" (entry in WEBDRIVER_KEYS) :param repeat: number of times to repeat this key (default 1) - :param modifiers: bit field: Alt=1, Ctrl=2, Meta/Command=4, Shift=8 + :param modifiers: "Shift", "Control", "Alt" """ - args: dict[str, int | str] = {} - if self.cdp.browser.name == "chromium": - # HACK: chromium doesn't understand some key codes in some situations - win_keys = { - "Backspace": 8, - "Enter": 13, - "Escape": 27, - "ArrowDown": 40, - "Insert": 45, - } - if name in win_keys: - args["windowsVirtualKeyCode"] = win_keys[name] - if name == 'Enter': - args["text"] = '\r' - # HACK: chromium needs windowsVirtualKeyCode with modifiers - elif len(name) == 1 and name.isalnum() and modifiers != 0: - args["windowsVirtualKeyCode"] = ord(name.upper()) + actions = [] + actions_pre = [] + actions_post = [] + keycode = WEBDRIVER_KEYS.get(name, name) + + for m in (modifiers or []): + actions_pre.append({"type": "keyDown", "value": WEBDRIVER_KEYS[m]}) + actions_post.append({"type": "keyUp", "value": WEBDRIVER_KEYS[m]}) for _ in range(repeat): - self.cdp.invoke("Input.dispatchKeyEvent", type="keyDown", key=name, modifiers=modifiers, **args) - self.cdp.invoke("Input.dispatchKeyEvent", type="keyUp", key=name, modifiers=modifiers, **args) + actions.append({"type": "keyDown", "value": keycode}) + actions.append({"type": "keyUp", "value": keycode}) + + self.bidi("input.performActions", context=self.driver.context, actions=[ + {"type": "key", "id": "key-0", "actions": actions_pre + actions + actions_post}]) def select_from_dropdown(self, selector: str, value: object) -> None: """For an actual