diff --git a/Makefile.am b/Makefile.am
index 183c093085fd..230dfb918505 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -85,15 +85,12 @@ clean-local::
find $(builddir) -name '*.gc??' -delete
find $(srcdir) -name '*.pyc' -delete
-# required for running unit and integration tests; commander and ws are deps of chrome-remote-interface
+# required for running integration tests
node_modules/%: $(srcdir)/package-lock.json
@true
EXTRA_DIST += \
- node_modules/chrome-remote-interface \
- node_modules/commander \
node_modules/sizzle \
- node_modules/ws \
$(NULL)
check: export VERBOSE=1
diff --git a/pyproject.toml b/pyproject.toml
index 3a75f3c77d94..e9b00ff62166 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -66,6 +66,7 @@ module = [
# test/common
'cdp',
'testlib',
+ 'webdriver_bidi',
]
[tool.pylint]
diff --git a/test/common/cdp.py b/test/common/cdp.py
index 19a52ef8e4f3..143fcce47534 100644
--- a/test/common/cdp.py
+++ b/test/common/cdp.py
@@ -58,7 +58,7 @@ def path(self, show_browser: bool) -> str:
@abc.abstractmethod
def cmd(
- self, cdp_port: int, env: Mapping[str, str], show_browser: bool, browser_home: str, download_dir: str
+ self, cdp_port: int, env: Mapping[str, str], show_browser: bool, download_dir: str
) -> Sequence[str]:
pass
@@ -96,7 +96,7 @@ def _path(self, show_browser: bool) -> str | None:
@override
def cmd(
- self, cdp_port: int, env: Mapping[str, str], show_browser: bool, browser_home: str, download_dir: str
+ self, cdp_port: int, env: Mapping[str, str], show_browser: bool, download_dir: str
) -> Sequence[str]:
exe = self.path(show_browser)
@@ -110,65 +110,6 @@ def cmd(
"--v=0", f"--remote-debugging-port={cdp_port}", "about:blank"]
-class Firefox(Browser):
- NAME = "firefox"
- EXECUTABLES = ["firefox-developer-edition", "firefox-nightly", "firefox"]
- CDP_DRIVER_FILENAME = f"{TEST_DIR}/common/firefox-cdp-driver.js"
-
- @override
- def _path(self, show_browser: bool) -> str | None:
- """Return path to Firefox browser."""
- return self.find_exe()
-
- @override
- def cmd(
- self, cdp_port: int, env: Mapping[str, str], show_browser: bool, browser_home: str, download_dir: str
- ) -> Sequence[str]:
- exe = self.path(show_browser)
-
- subprocess.check_call([exe, "--headless", "--no-remote", "-CreateProfile", "blank"], env=env)
- profile = glob.glob(os.path.join(browser_home, ".mozilla/firefox/*.blank"))[0]
-
- with open(os.path.join(profile, "user.js"), "w") as f:
- f.write(f"""
- user_pref("remote.enabled", true);
- user_pref("remote.frames.enabled", true);
- user_pref("app.update.auto", false);
- user_pref("datareporting.policy.dataSubmissionEnabled", false);
- user_pref("toolkit.telemetry.reportingpolicy.firstRun", false);
- user_pref("dom.disable_beforeunload", true);
- user_pref("browser.download.dir", "{download_dir}");
- user_pref("browser.download.folderList", 2);
- user_pref("signon.rememberSignons", false);
- user_pref("dom.navigation.locationChangeRateLimit.count", 9999);
- // HACK: https://bugzilla.mozilla.org/show_bug.cgi?id=1746154
- user_pref("fission.webContentIsolationStrategy", 0);
- user_pref("fission.bfcacheInParent", false);
- """)
-
- with open(os.path.join(profile, "handlers.json"), "w") as f:
- f.write('{'
- '"defaultHandlersVersion":{"en-US":4},'
- '"mimeTypes":{"application/xz":{"action":0,"extensions":["xz"]}}'
- '}')
-
- cmd = [exe, "-P", "blank", f"--remote-debugging-port={cdp_port}", "--no-remote", "localhost"]
- if not show_browser:
- cmd.insert(3, "--headless")
- return cmd
-
-
-def get_browser(browser: str) -> Browser:
- browser_classes = [
- Chromium,
- Firefox,
- ]
- for cls in browser_classes:
- if browser == cls.NAME:
- return cls()
- raise SystemError(f"Unsupported browser: {browser}")
-
-
def jsquote(obj: object) -> str:
return json.dumps(obj)
@@ -195,7 +136,7 @@ def __init__(
self.trace = trace
self.inject_helpers = inject_helpers
self.start_profile = start_profile
- self.browser = get_browser(os.environ.get("TEST_BROWSER", "chromium"))
+ self.browser = Chromium()
self.show_browser = bool(os.environ.get("TEST_SHOW_BROWSER", ""))
self.download_dir = tempfile.mkdtemp()
self._driver = None
@@ -322,8 +263,7 @@ def start(self) -> None:
except KeyError:
pass
- cmd = self.browser.cmd(cdp_port, environ, self.show_browser,
- self._browser_home, self.download_dir)
+ cmd = self.browser.cmd(cdp_port, environ, self.show_browser, self.download_dir)
# sandboxing does not work in Docker container
self._browser = subprocess.Popen(
diff --git a/test/common/firefox-cdp-driver.js b/test/common/firefox-cdp-driver.js
deleted file mode 100755
index 1d7d78291b39..000000000000
--- a/test/common/firefox-cdp-driver.js
+++ /dev/null
@@ -1,397 +0,0 @@
-#!/usr/bin/env node
-
-/*
- * This file is part of Cockpit.
- *
- * Copyright (C) 2019 Red Hat, Inc.
- *
- * Cockpit is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or
- * (at your option) any later version.
- *
- * Cockpit is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with Cockpit; If not, see .
- */
-
-/* firefox-cdp-driver -- A command-line JSON input/output wrapper around
- * chrome-remote-interface (Chrome Debug Protocol).
- * See https://chromedevtools.github.io/devtools-protocol/
- * This needs support for protocol version 1.3.
- *
- * Set $TEST_CDP_DEBUG environment variable to enable additional
- * frame/execution context debugging.
- */
-
-import * as readline from 'readline';
-import CDP from 'chrome-remote-interface';
-
-let enable_debug = false;
-
-function debug(msg) {
- if (enable_debug)
- process.stderr.write("CDP: " + msg + "\n");
-}
-
-/**
- * Format response to the client
- */
-
-function fatal() {
- console.error.apply(console.error, arguments);
- process.exit(1);
-}
-
-// We keep sequence numbers so that we never get the protocol out of
-// synch with re-ordered or duplicate replies. This only matters for
-// duplicate replies due to destroyed contexts, but that is already so
-// hairy that this big hammer seems necessary.
-
-let cur_cmd_seq = 0;
-let next_reply_seq = 1;
-
-function fail(seq, err) {
- if (seq != next_reply_seq)
- return;
- next_reply_seq++;
-
- if (typeof err === 'undefined')
- err = null;
- process.stdout.write(JSON.stringify({ error: err }) + '\n');
-}
-
-function success(seq, result) {
- if (seq != next_reply_seq)
- return;
- next_reply_seq++;
-
- if (typeof result === 'undefined')
- result = null;
- process.stdout.write(JSON.stringify({ result }) + '\n');
-}
-
-/**
- * Record console.*() calls and Log messages so that we can forward them to
- * stderr and dump them on test failure
- */
-const messages = [];
-let logPromiseResolver;
-let nReportedLogMessages = 0;
-const unhandledExceptions = [];
-
-function clearExceptions() {
- unhandledExceptions.length = 0;
- return Promise.resolve();
-}
-
-function stringifyConsoleArg(arg) {
- if (arg.type === 'string')
- return arg.value;
- if (arg.type === 'object')
- return JSON.stringify(arg.value);
- return JSON.stringify(arg);
-}
-
-function setupLogging(client) {
- client.Runtime.enable();
-
- client.Runtime.consoleAPICalled(info => {
- const msg = info.args.map(stringifyConsoleArg).join(" ");
- messages.push([info.type, msg]);
- process.stderr.write("> " + info.type + ": " + msg + "\n");
-
- resolveLogPromise();
- });
-
- function processException(info) {
- let details = info.exceptionDetails;
- if (details.exception)
- details = details.exception;
-
- // don't log test timeouts, they already get handled
- if (details.className === "PhWaitCondTimeout")
- return;
-
- process.stderr.write(details.description || details.text || JSON.stringify(details) + "\n");
-
- unhandledExceptions.push(details.message ||
- details.description ||
- details.value ||
- JSON.stringify(details));
- }
-
- client.Runtime.exceptionThrown(info => processException(info));
-
- client.Log.enable();
- client.Log.entryAdded(entry => {
- // HACK: Firefox does not implement `Runtime.exceptionThrown` but logs it
- // Lets parse it to have at least some basic check that code did not throw
- // exception
- // https://bugzilla.mozilla.org/show_bug.cgi?id=1549528
-
- const msg = entry.entry;
- let text = msg.text;
- if (typeof text !== "string")
- if (text[0] && typeof text[0] === "string")
- text = text[0];
-
- if (msg.stackTrace !== undefined &&
- typeof text === "string" &&
- text.indexOf("Error: ") !== -1) {
- const trace = text.split(": ", 1);
- processException({
- exceptionDetails: {
- exception: {
- className: trace[0],
- message: trace.length > 1 ? trace[1] : "",
- stacktrace: msg.stackTrace,
- entry: msg,
- },
- }
- });
- } else {
- messages.push(["cdp", msg]);
- /* Ignore authentication failure log lines that don't denote failures */
- if (!(msg.url || "").endsWith("/login") || (text || "").indexOf("401") === -1) {
- process.stderr.write("CDP: " + JSON.stringify(msg) + "\n");
- }
- resolveLogPromise();
- }
- });
-}
-
-/**
- * Resolve the log promise created with waitLog().
- */
-function resolveLogPromise() {
- if (logPromiseResolver) {
- logPromiseResolver(messages.slice(nReportedLogMessages));
- nReportedLogMessages = messages.length;
- logPromiseResolver = undefined;
- }
-}
-
-/**
- * Returns a promise that resolves when log messages are available. If there
- * are already some unreported ones in the global messages variable, resolves
- * immediately.
- *
- * Only one such promise can be active at a given time. Once the promise is
- * resolved, this function can be called again to wait for further messages.
- */
-function waitLog() { // eslint-disable-line no-unused-vars
- console.assert(logPromiseResolver === undefined);
-
- return new Promise((resolve, reject) => {
- logPromiseResolver = resolve;
-
- if (nReportedLogMessages < messages.length)
- resolveLogPromise();
- });
-}
-
-/**
- * Frame tracking
- *
- * For tests to be able to select the current frame (by its name) and make
- * subsequent queries apply to that, we need to track frame name → frameId →
- * executionContextId. Frame and context IDs can even change through page
- * operations (e. g. in systemd/logs.js when reporting a crash is complete),
- * so we also need a helper function to explicitly wait for a particular frame
- * to load. This is very laborious, see this issue for discussing improvements:
- * https://github.com/ChromeDevTools/devtools-protocol/issues/72
- */
-const scriptsOnNewContext = [];
-const frameIdToContextId = {};
-const frameNameToFrameId = {};
-
-let pageLoadHandler = null;
-let currentExecId = null;
-
-function setupFrameTracking(client) {
- client.Page.enable();
-
- // map frame names to frame IDs; root frame has no name, no need to track that
- client.Page.frameNavigated(info => {
- if (info.frame?.url?.startsWith("about:")) {
- debug("frameNavigated: ignoring about: frame " + JSON.stringify(info));
- return;
- }
- debug("frameNavigated " + JSON.stringify(info));
- frameNameToFrameId[info.frame.name || "cockpit1"] = info.frame.id;
- });
-
- client.Page.loadEventFired(() => {
- if (pageLoadHandler) {
- debug("loadEventFired, calling pageLoadHandler");
- pageLoadHandler();
- } else {
- debug("loadEventFired, but no pageLoadHandler");
- }
- });
-
- // track execution contexts so that we can map between context and frame IDs
- client.Runtime.executionContextCreated(info => {
- debug("executionContextCreated " + JSON.stringify(info));
- frameIdToContextId[info.context.auxData.frameId] = info.context.id;
- scriptsOnNewContext.forEach(s => {
- client.Runtime.evaluate({ expression: s, contextId: info.context.id })
- .catch(ex => {
- // race condition with short-lived frames -- OK if the frame is already gone
- if (ex.response && ex.response.message && ex.response.message.indexOf("Cannot find context") >= 0)
- debug(`scriptsOnNewContext for context ${info.context.id} failed, ignoring: ${JSON.stringify(ex.response)}`);
- else
- throw ex;
- });
- });
- });
-
- client.Runtime.executionContextDestroyed(info => {
- debug("executionContextDestroyed " + info.executionContextId);
- for (const frameId in frameIdToContextId) {
- if (frameIdToContextId[frameId] == info.executionContextId) {
- delete frameIdToContextId[frameId];
- break;
- }
- }
-
- // Firefox does not report an error when the execution context
- // of a Runtime.evaluate call gets destroyed. It will never
- // ever resolve or be rejected. So let's provide the failure
- // reply from here.
- //
- // However, if the timing is just right, the context gets
- // destroyed before Runtime.evaluate has started the real
- // processing, and in that case it will return an error. Then
- // we would send the reply here, and would also send the
- // error. This would drive the protocol out of synch. Also, our driver
- // might immediately send more commands after seeing the first reply,
- // and the unwanted second reply might be triggered in the middle of one
- // of the next commands. To reliably suppress the second reply we have
- // the pretty general sequence number checks.
- //
- if (info.executionContextId == currentExecId) {
- currentExecId = null;
- fail(cur_cmd_seq, { response: { message: "Execution context was destroyed." } });
- }
- });
-}
-
-function setupLocalFunctions(client) {
- client.setupPageLoadHandler = timeout => {
- if (pageLoadHandler !== null)
- return Promise.reject("setupPageLoadHandler: already pending"); // eslint-disable-line prefer-promise-reject-errors
-
- client.pageLoadPromise = new Promise((resolve, reject) => {
- const timeout_timer = setTimeout(() => {
- pageLoadHandler = null;
- reject("Timeout waiting for page load"); // eslint-disable-line prefer-promise-reject-errors
- }, timeout * 1000);
-
- pageLoadHandler = () => {
- clearTimeout(timeout_timer);
- pageLoadHandler = null;
- resolve({});
- };
- });
-
- return Promise.resolve({});
- };
-}
-
-// helper functions for testlib.py which are too unwieldy to be poked in from Python
-function getFrameExecId(frame) { // eslint-disable-line no-unused-vars
- const frameId = frameNameToFrameId[frame || "cockpit1"];
- const execId = frameIdToContextId[frameId];
- if (execId !== undefined)
- currentExecId = execId;
- else
- debug(`WARNING: getFrameExecId: frame ${frame} ID ${frameId} has no known execution context`);
- return execId;
-}
-
-/**
- * Main input/process loop
- *
- * Read one line with a JS expression, eval() it, and respond with the result:
- * success
- * fail
- * EOF shuts down the client.
- */
-process.stdin.setEncoding('utf8');
-
-if (process.env.TEST_CDP_DEBUG)
- enable_debug = true;
-
-const options = { };
-if (process.argv.length >= 3) {
- options.port = parseInt(process.argv[2]);
- if (!options.port) {
- process.stderr.write("Usage: firefox-cdp-driver.js [port]\n");
- process.exit(1);
- }
-}
-
-// HACK: `addScriptToEvaluateOnNewDocument` is not implemented in Firefox
-// thus save all scripts in array and on each new context just execute these
-// scripts in them
-// https://bugzilla.mozilla.org/show_bug.cgi?id=1549465
-function addScriptToEvaluateOnNewDocument(script) { // eslint-disable-line no-unused-vars
- return new Promise((resolve, reject) => {
- scriptsOnNewContext.push(script.source);
- resolve();
- });
-}
-
-// This should work on different targets (meaning tabs)
-// CDP takes {target:target} so we can pick target
-// Problem is that CDP.New() which creates new target works only for chrome/ium
-// But we should be able to use CPD.List to list all targets and then pick one
-// Firefox just gives them ascending numbers, so we can pick the one with highest number
-// and if we feel fancy we can check that url is `about:newtab`.
-// That still though does not create new tab - but we can just call `firefox about:blank`
-// from cdline and since firefox would open it in the same browser, it should work.
-// This would work just fine in CI (as there would be only one browser) but on our machines it may
-// pick a wrong window (no idea if they can be somehow distinguish and execute it in a specific
-// one). But I guess we can live with it (and it seems it picks the last opened window anyway,
-// so having your own browser running should not interfere)
-//
-// Just calling executable to open another tab in the same browser works also for chromium, so
-// should be fine
-CDP(options)
- .then(client => {
- setupLogging(client);
- setupFrameTracking(client);
- setupLocalFunctions(client);
- // TODO: Security handling not yet supported in Firefox
-
- readline.createInterface(process.stdin)
- .on('line', command => {
- // HACKS: See description of related functions
- if (command.startsWith("client.Page.addScriptToEvaluateOnNewDocument"))
- command = command.substring(12);
-
- // run the command
- const seq = ++cur_cmd_seq;
- eval(command).then(reply => { // eslint-disable-line no-eval
- currentExecId = null;
- if (unhandledExceptions.length === 0) {
- success(seq, reply);
- } else {
- const message = unhandledExceptions[0];
- fail(seq, message.split("\n")[0]);
- clearExceptions();
- }
- }, err => {
- currentExecId = null;
- fail(seq, err);
- });
- })
- .on('close', () => process.exit(0));
- })
- .catch(fatal);
diff --git a/test/common/storagelib.py b/test/common/storagelib.py
index 38877302eae2..357b0651a4c5 100644
--- a/test/common/storagelib.py
+++ b/test/common/storagelib.py
@@ -241,7 +241,7 @@ def dialog_cancel(self):
def dialog_wait_close(self):
# file system operations often take longer than 10s
- with self.browser.wait_timeout(max(self.browser.cdp.timeout, 60)):
+ with self.browser.wait_timeout(max(self.browser.timeout, 60)):
self.browser.wait_not_present('#dialog')
def dialog_check(self, expect):
diff --git a/test/common/testlib.py b/test/common/testlib.py
index 7a87e5d47d7a..0aacc26fac8a 100644
--- a/test/common/testlib.py
+++ b/test/common/testlib.py
@@ -18,6 +18,7 @@
"""Tools for writing Cockpit test cases."""
import argparse
+import asyncio
import base64
import contextlib
import errno
@@ -25,6 +26,7 @@
import glob
import io
import json
+import logging
import os
import re
import shutil
@@ -32,13 +34,15 @@
import subprocess
import sys
import tempfile
+import threading
import time
import traceback
import unittest
-from collections.abc import Collection, Container, Iterator, Mapping, Sequence
-from typing import Any, Callable, ClassVar, Literal, Never, TypedDict, TypeVar
+from collections.abc import Collection, Container, Coroutine, Iterator, Mapping, Sequence
+from pathlib import Path
+from typing import Any, Callable, ClassVar, Literal, TypedDict, TypeVar
-import cdp
+import webdriver_bidi
from lcov import write_lcov
from lib.constants import OSTREE_IMAGES
from machine import testvm
@@ -47,6 +51,8 @@
_T = TypeVar('_T')
_FT = TypeVar("_FT", bound=Callable[..., Any])
+JsonObject = dict[str, Any]
+
BASE_DIR = os.path.realpath(f'{__file__}/../../..')
TEST_DIR = f'{BASE_DIR}/test'
BOTS_DIR = f'{BASE_DIR}/bots'
@@ -92,6 +98,25 @@
opts.coverage = False
+# https://w3c.github.io/webdriver/#keyboard-actions for encoding key names
+WEBDRIVER_KEYS = {
+ "Backspace": "\uE003",
+ "Tab": "\uE004",
+ "Return": "\uE006",
+ "Enter": "\uE007",
+ "Shift": "\uE008",
+ "Control": "\uE009",
+ "Alt": "\uE00A",
+ "Escape": "\uE00C",
+ "ArrowLeft": "\uE012",
+ "ArrowUp": "\uE013",
+ "ArrowRight": "\uE014",
+ "ArrowDown": "\uE015",
+ "Insert": "\uE016",
+ "Delete": "\uE017",
+}
+
+
# Browser layouts
#
# A browser can be switched into a number of different layouts, such
@@ -185,6 +210,8 @@ def unique_filename(base: str, ext: str) -> str:
class Browser:
+ driver: webdriver_bidi.WebdriverBidi
+ browser: str
layouts: Sequence[BrowserLayout]
current_layout: BrowserLayout | None
port: str | int
@@ -211,39 +238,97 @@ def __init__(
self.used_pixel_references = set[str]()
self.coverage_label = coverage_label
self.machine = machine
- path = os.path.dirname(__file__)
- sizzle_js = os.path.join(path, "../../node_modules/sizzle/dist/sizzle.js")
- helpers = [os.path.join(path, "test-functions.js")]
- if os.path.exists(sizzle_js):
- helpers.append(sizzle_js)
- self.cdp = cdp.CDP("C.utf8", verbose=opts.trace, trace=opts.trace,
- inject_helpers=helpers,
- start_profile=coverage_label is not None)
+
+ headless = not bool(os.environ.get("TEST_SHOW_BROWSER", ""))
+ self.browser = os.environ.get("TEST_BROWSER", "chromium")
+ if self.browser == "chromium":
+ self.driver = webdriver_bidi.ChromiumBidi(headless=headless)
+ elif self.browser == "firefox":
+ self.driver = webdriver_bidi.FirefoxBidi(headless=headless)
+ else:
+ raise ValueError(f"unknown browser {self.browser}")
+ self.loop = asyncio.new_event_loop()
+ self.bidi_thread = threading.Thread(target=self.asyncio_loop_thread, args=(self.loop,))
+ self.bidi_thread.start()
+
+ self.run_async(self.driver.start_session())
+
+ if opts.trace:
+ logging.basicConfig(level=logging.INFO)
+ webdriver_bidi.log_command.setLevel(logging.INFO if opts.trace else logging.WARNING)
+ # not appropriate for --trace, just enable for debugging low-level protocol with browser
+ # bidiwebdriver_.log_proto.setLevel(logging.DEBUG)
+
+ test_functions = (Path(__file__).parent / "test-functions.js").read_text()
+ # Don't redefine globals, this confuses Firefox
+ test_functions = "if (window.ph_select) return; " + test_functions
+ self.bidi("script.addPreloadScript", quiet=True, functionDeclaration=f"() => {{ {test_functions} }}")
+
+ try:
+ sizzle_js = (Path(__file__).parent.parent.parent / "node_modules/sizzle/dist/sizzle.js").read_text()
+ # HACK: injecting sizzle fails on missing `document` in assert()
+ sizzle_js = sizzle_js.replace('function assert( fn ) {',
+ 'function assert( fn ) { if (true) return true; else ')
+ # HACK: sizzle tracks document and when we switch frames, it sees the old document
+ # although we execute it in different context.
+ sizzle_js = sizzle_js.replace('context = context || document;', 'context = context || window.document;')
+ self.bidi("script.addPreloadScript", quiet=True, functionDeclaration=f"() => {{ {sizzle_js} }}")
+ except FileNotFoundError:
+ pass
+
+ if coverage_label:
+ self.cdp_command("Profiler.enable")
+ self.cdp_command("Profiler.startPreciseCoverage", callCount=False, detailed=True)
+
self.password = "foobar"
self.timeout_factor = int(os.getenv("TEST_TIMEOUT_FACTOR", "1"))
+ self.timeout = 15
self.failed_pixel_tests = 0
self.allow_oops = False
- self.body_clip = None
try:
with open(f'{TEST_DIR}/browser-layouts.json') as fp:
self.layouts = json.load(fp)
except FileNotFoundError:
self.layouts = default_layouts
- # Firefox CDP does not support setting EmulatedMedia
- # https://bugzilla.mozilla.org/show_bug.cgi?id=1549434
- if self.cdp.browser.name != "chromium":
- self.layouts = [layout for layout in self.layouts if layout["theme"] != "dark"]
self.current_layout = None
+ self.valid = True
- # we don't have a proper SSL certificate for our tests, ignore it
- # Firefox does not support this, tests which rely on it need to skip it
- if self.cdp.browser.name == "chromium":
- self.cdp.command("setSSLBadCertificateAction('continue')")
+ def run_async(self, coro: Coroutine[Any, Any, Any]) -> JsonObject:
+ """Run coro in main loop in our BiDi thread
- def allow_download(self) -> None:
- """Allow browser downloads"""
- if self.cdp.browser.name == "chromium":
- self.cdp.invoke("Page.setDownloadBehavior", behavior="allow", downloadPath=self.cdp.download_dir)
+ Wait for the result and return it.
+ """
+ return asyncio.run_coroutine_threadsafe(coro, self.loop).result()
+
+ @staticmethod
+ def asyncio_loop_thread(loop: asyncio.AbstractEventLoop) -> None:
+ asyncio.set_event_loop(loop)
+ loop.run_forever()
+
+ def kill(self) -> None:
+ if not self.valid:
+ return
+ self.run_async(self.driver.close())
+ self.loop.call_soon_threadsafe(self.loop.stop)
+ self.bidi_thread.join()
+ self.valid = False
+
+ def bidi(self, method: str, **params: Any) -> webdriver_bidi.JsonObject:
+ """Send a Webdriver BiDi command and return the JSON response"""
+
+ try:
+ return self.run_async(self.driver.bidi(method, **params))
+ except webdriver_bidi.WebdriverError as e:
+ raise Error(str(e)) from None
+
+ def cdp_command(self, method: str, **params: Any) -> webdriver_bidi.JsonObject:
+ """Send a Chrome DevTools Protocol command and return the JSON response"""
+
+ if self.browser == "chromium":
+ assert isinstance(self.driver, webdriver_bidi.ChromiumBidi)
+ return self.run_async(self.driver.cdp(method, **params))
+ else:
+ raise webdriver_bidi.WebdriverError("CDP is only supported in Chromium")
def open(self, href: str, cookie: Mapping[str, str] | None = None, tls: bool = False) -> None:
"""Load a page into the browser.
@@ -264,14 +349,15 @@ def open(self, href: str, cookie: Mapping[str, str] | None = None, tls: bool = F
size = self.current_layout["shell_size"]
self._set_window_size(size[0], size[1])
if cookie:
- self.cdp.invoke("Network.setCookie", **cookie)
+ c = {**cookie, "value": {"type": "string", "value": cookie["value"]}}
+ self.bidi("storage.setCookie", cookie=c)
self.switch_to_top()
# Some browsers optimize this away if the current URL is already href
# (e.g. in TestKeys.testAuthorizedKeys). Load the blank page first to always
# force a load.
- self.cdp.invoke("Page.navigate", url="about:blank")
- self.cdp.invoke("Page.navigate", url=href)
+ self.bidi("browsingContext.navigate", context=self.driver.context, url="about:blank", wait="complete")
+ self.bidi("browsingContext.navigate", context=self.driver.context, url=href, wait="complete")
def set_user_agent(self, ua: str) -> None:
"""Set the user agent of the browser
@@ -279,7 +365,10 @@ def set_user_agent(self, ua: str) -> None:
:param ua: user agent string
:type ua: str
"""
- self.cdp.invoke("Emulation.setUserAgentOverride", userAgent=ua)
+ if self.browser == "chromium":
+ self.cdp_command("Emulation.setUserAgentOverride", userAgent=ua)
+ else:
+ raise NotImplementedError
def reload(self, ignore_cache: bool = False) -> None:
"""Reload the current page
@@ -289,8 +378,15 @@ def reload(self, ignore_cache: bool = False) -> None:
"""
self.switch_to_top()
- self.wait_js_cond("ph_select('iframe.container-frame').every(function (e) { return e.getAttribute('data-loaded'); })")
- self.cdp.invoke("Page.reload", ignoreCache=ignore_cache)
+ self.wait_js_cond("ph_select('iframe.container-frame').every(e => e.getAttribute('data-loaded'))")
+ if self.browser == "firefox":
+ if ignore_cache:
+ webdriver_bidi.log_command.warning(
+ "Browser.reload(): ignore_cache==True not yet supported with Firefox, ignoring")
+ self.bidi("browsingContext.reload", context=self.driver.context, wait="complete")
+ else:
+ self.bidi("browsingContext.reload", context=self.driver.context, ignoreCache=ignore_cache,
+ wait="complete")
self.machine.allow_restart_journal_messages()
@@ -302,14 +398,23 @@ def switch_to_frame(self, name: str | None) -> None:
:param name: frame name
"""
- self.cdp.set_frame(name)
+ if name is None:
+ self.switch_to_top()
+ else:
+ self.run_async(self.driver.switch_to_frame(name))
def switch_to_top(self) -> None:
"""Switch to the main frame
Switch to the main frame from for example an iframe.
"""
- self.cdp.set_frame(None)
+ self.driver.switch_to_top()
+
+ def allow_download(self) -> None:
+ """Allow browser downloads"""
+ # this is only necessary for headless chromium
+ if self.browser == "chromium":
+ self.cdp_command("Page.setDownloadBehavior", behavior="allow", downloadPath=str(self.driver.download_dir))
def upload_files(self, selector: str, files: Sequence[str]) -> None:
"""Upload a local file to the browser
@@ -317,32 +422,8 @@ def upload_files(self, selector: str, files: Sequence[str]) -> None:
The selector should select the element.
Files is a list of absolute paths to files which should be uploaded.
"""
- r = self.cdp.invoke("Runtime.evaluate", expression='document.querySelector(%s)' % jsquote(selector))
- objectId = r["result"]["objectId"]
- self.cdp.invoke("DOM.setFileInputFiles", files=files, objectId=objectId)
-
- def raise_cdp_exception(
- self, func: str, arg: str, details: Mapping[str, Any], trailer: str | None = None
- ) -> Never:
- # unwrap a typical error string
- if details.get("exception", {}).get("type") == "string":
- msg = details["exception"]["value"]
- elif details.get("text", None):
- msg = details.get("text", None)
- else:
- msg = str(details)
- if trailer:
- msg += "\n" + trailer
- raise Error("%s(%s): %s" % (func, arg, msg))
-
- def inject_js(self, code: str) -> None:
- """Execute JS code that does not return anything
-
- :param code: a string containing JavaScript code
- :type code: str
- """
- self.cdp.invoke("Runtime.evaluate", expression=code, trace=code,
- silent=False, awaitPromise=True, returnByValue=False, no_trace=True)
+ element = self.eval_js(f"ph_find({jsquote(selector)})")
+ self.bidi("input.setFiles", context=self.driver.context, element=element, files=files)
def eval_js(self, code: str, no_trace: bool = False) -> Any:
"""Execute JS code that returns something
@@ -350,21 +431,8 @@ def eval_js(self, code: str, no_trace: bool = False) -> Any:
:param code: a string containing JavaScript code
:param no_trace: do not print information about unknown return values (default False)
"""
- result = self.cdp.invoke("Runtime.evaluate", expression=code, trace=code,
- silent=False, awaitPromise=True, returnByValue=True, no_trace=no_trace)
- if "exceptionDetails" in result:
- self.raise_cdp_exception("eval_js", code, result["exceptionDetails"])
- _type = result.get("result", {}).get("type")
- if _type == 'object' and result["result"].get("subtype", "") == "error":
- raise Error(result["result"]["description"])
- if _type == "undefined":
- return None
- if _type and "value" in result["result"]:
- return result["result"]["value"]
-
- if opts.trace:
- print("eval_js(%s): cannot interpret return value %s" % (code, result))
- return None
+ return self.bidi("script.evaluate", expression=code, quiet=no_trace,
+ awaitPromise=True, target={"context": self.driver.context})["result"]
def call_js_func(self, func: str, *args: object) -> Any:
"""Call a JavaScript function
@@ -396,11 +464,13 @@ def cookie(self, name: str) -> Mapping[str, object] | None:
:param name: the name of the cookie
:type name: str
"""
- cookies = self.cdp.invoke("Network.getCookies")
- for c in cookies["cookies"]:
- assert isinstance(c, Mapping)
- if c["name"] == name:
- return c
+ cookies = self.bidi("storage.getCookies", filter={"name": name})["cookies"]
+ if len(cookies) > 0:
+ c = cookies[0]
+ # if we ever need to handle "base64", add that
+ assert c["value"]["type"] == "string"
+ c["value"] = c["value"]["value"]
+ return c
return None
def go(self, url_hash: str) -> None:
@@ -439,7 +509,7 @@ def click(self, selector: str) -> None:
:param selector: the selector to click on
"""
- self.mouse(selector + ":not([disabled]):not([aria-disabled=true])", "click", 0, 0, 0)
+ self.mouse(selector + ":not([disabled]):not([aria-disabled=true])", "click")
def val(self, selector: str) -> Any:
"""Get the value attribute of a selector.
@@ -502,8 +572,10 @@ def set_checked(self, selector: str, val: bool) -> None:
:param selector: the selector
:param val: boolean value to enable or disable checkbox
"""
- self.wait_visible(selector + ':not([disabled]):not([aria-disabled=true])')
- self.call_js_func('ph_set_checked', selector, val)
+ # avoid ph_set_checked, that doesn't use proper mouse emulation
+ checked = self.get_checked(selector)
+ if checked != val:
+ self.click(selector)
def focus(self, selector: str) -> None:
"""Set focus on selected element.
@@ -522,43 +594,37 @@ def blur(self, selector: str) -> None:
self.call_js_func('ph_blur', selector)
def input_text(self, text: str) -> None:
- for char in text:
- if char == "\n":
- self.key("Enter")
- else:
- self.cdp.invoke("Input.dispatchKeyEvent", type="keyDown", text=char, key=char)
- self.cdp.invoke("Input.dispatchKeyEvent", type="keyUp", text=char, key=char)
-
- def key(self, name: str, repeat: int = 1, modifiers: int = 0) -> None:
+ actions = []
+ for c in text:
+ actions.append({"type": "keyDown", "value": c})
+ actions.append({"type": "keyUp", "value": c})
+ self.bidi("input.performActions", context=self.driver.context, actions=[
+ {"type": "key", "id": "key-0", "actions": actions}])
+
+ def key(self, name: str, repeat: int = 1, modifiers: list[str] | None = None) -> None:
"""Press and release a named keyboard key.
Use this function to input special characters or modifiers.
- :param name: key name like "Enter", "Delete", or "ArrowLeft"
+ :param name: ASCII value or key name like "Enter", "Delete", or "ArrowLeft" (entry in WEBDRIVER_KEYS)
:param repeat: number of times to repeat this key (default 1)
- :param modifiers: bit field: Alt=1, Ctrl=2, Meta/Command=4, Shift=8
+ :param modifiers: "Shift", "Control", "Alt"
"""
- args: dict[str, int | str] = {}
- if self.cdp.browser.name == "chromium":
- # HACK: chromium doesn't understand some key codes in some situations
- win_keys = {
- "Backspace": 8,
- "Enter": 13,
- "Escape": 27,
- "ArrowDown": 40,
- "Insert": 45,
- }
- if name in win_keys:
- args["windowsVirtualKeyCode"] = win_keys[name]
- if name == 'Enter':
- args["text"] = '\r'
- # HACK: chromium needs windowsVirtualKeyCode with modifiers
- elif len(name) == 1 and name.isalnum() and modifiers != 0:
- args["windowsVirtualKeyCode"] = ord(name.upper())
+ actions = []
+ actions_pre = []
+ actions_post = []
+ keycode = WEBDRIVER_KEYS.get(name, name)
+
+ for m in (modifiers or []):
+ actions_pre.append({"type": "keyDown", "value": WEBDRIVER_KEYS[m]})
+ actions_post.append({"type": "keyUp", "value": WEBDRIVER_KEYS[m]})
for _ in range(repeat):
- self.cdp.invoke("Input.dispatchKeyEvent", type="keyDown", key=name, modifiers=modifiers, **args)
- self.cdp.invoke("Input.dispatchKeyEvent", type="keyUp", key=name, modifiers=modifiers, **args)
+ actions.append({"type": "keyDown", "value": keycode})
+ actions.append({"type": "keyUp", "value": keycode})
+
+ self.bidi("input.performActions", context=self.driver.context, actions=[
+ {"type": "key", "id": "key-0", "actions": actions_pre + actions + actions_post}])
def select_from_dropdown(self, selector: str, value: object) -> None:
"""For an actual