|
| 1 | +import asyncio |
| 2 | +import logging |
| 3 | +import random |
| 4 | +from typing import Union, Coroutine, Optional, Dict, List |
| 5 | +from playwright.async_api import Page, ElementHandle, CDPSession |
| 6 | + |
| 7 | +from pyppeteer_ghost_cursor.shared.math import ( |
| 8 | + Vector, |
| 9 | + origin, |
| 10 | + overshoot, |
| 11 | +) |
| 12 | +from pyppeteer_ghost_cursor.shared.spoof import ( |
| 13 | + path, |
| 14 | + should_overshoot, |
| 15 | + get_random_box_point, |
| 16 | +) |
| 17 | + |
| 18 | + |
| 19 | +logger = logging.getLogger(__name__) |
| 20 | + |
| 21 | + |
| 22 | +class GhostCursor: |
| 23 | + def __init__(self, page: Page, start: Vector): |
| 24 | + self.page = page |
| 25 | + self.previous = start |
| 26 | + self.moving = False |
| 27 | + self.overshoot_spread = 10 |
| 28 | + self.overshoot_radius = 120 |
| 29 | + |
| 30 | + async def get_cdp_session(self) -> Coroutine[None, None, CDPSession]: |
| 31 | + if not hasattr(self, "cdp_session"): |
| 32 | + self.cdp_session = await self.page.context.new_cdp_session(self.page) |
| 33 | + return self.cdp_session |
| 34 | + |
| 35 | + async def get_random_page_point(self) -> Coroutine[None, None, Vector]: |
| 36 | + """Get a random point on a browser window""" |
| 37 | + target_id = self.page.target._targetId |
| 38 | + window = (await self.get_cdp_session()).send( |
| 39 | + "Browser.getWindowForTarget", {"targetId": target_id} |
| 40 | + ) |
| 41 | + return get_random_box_point( |
| 42 | + { |
| 43 | + "x": origin.x, |
| 44 | + "y": origin.y, |
| 45 | + "width": window["bounds"]["width"], |
| 46 | + "height": window["bounds"]["height"], |
| 47 | + } |
| 48 | + ) |
| 49 | + |
| 50 | + async def random_move(self): |
| 51 | + """Start random mouse movements. Function recursively calls itself""" |
| 52 | + try: |
| 53 | + if not self.moving: |
| 54 | + rand = await self.get_random_page_point() |
| 55 | + await self.trace_path(path(self.previous, rand), True) |
| 56 | + self.previous = rand |
| 57 | + await asyncio.sleep(random.random() * 2) |
| 58 | + asyncio.ensure_future( |
| 59 | + self.random_move() |
| 60 | + ) # fire and forget, recursive function |
| 61 | + except: |
| 62 | + logger.debug("Warning: stopping random mouse movements") |
| 63 | + |
| 64 | + async def trace_path(self, vectors: List[Vector], abort_on_move: bool = False): |
| 65 | + """Move the mouse over a number of vectors""" |
| 66 | + for v in vectors: |
| 67 | + try: |
| 68 | + # In case this is called from random mouse movements and the users wants to move the mouse, abort |
| 69 | + if abort_on_move and self.moving: |
| 70 | + return |
| 71 | + await self.page.mouse.move(v.x, v.y) |
| 72 | + self.previous = v |
| 73 | + except Exception as exc: |
| 74 | + # Exit function if the browser is no longer connected |
| 75 | + if not (await self.page.browser.is_connected()): |
| 76 | + return |
| 77 | + logger.debug("Warning: could not move mouse, error message: %s", exc) |
| 78 | + |
| 79 | + def toggle_random_move(self, random_: bool): |
| 80 | + self.moving = not random_ |
| 81 | + |
| 82 | + async def click( |
| 83 | + self, |
| 84 | + selector: Optional[Union[str, ElementHandle]], |
| 85 | + padding_percentage: Optional[float] = None, |
| 86 | + wait_for_selector: Optional[float] = None, |
| 87 | + wait_for_click: Optional[float] = None, |
| 88 | + ): |
| 89 | + self.toggle_random_move(False) |
| 90 | + if selector is not None: |
| 91 | + await self.move(selector, padding_percentage, wait_for_selector) |
| 92 | + self.toggle_random_move(False) |
| 93 | + |
| 94 | + try: |
| 95 | + await self.page.mouse.down() |
| 96 | + if wait_for_click is not None: |
| 97 | + await asyncio.sleep(wait_for_click / 1000) |
| 98 | + await self.page.mouse.up() |
| 99 | + except Exception as exc: |
| 100 | + logger.debug("Warning: could not click mouse, error message: %s", exc) |
| 101 | + |
| 102 | + await asyncio.sleep(random.random() * 2) |
| 103 | + self.toggle_random_move(True) |
| 104 | + |
| 105 | + async def move( |
| 106 | + self, |
| 107 | + selector: Union[str, ElementHandle], |
| 108 | + padding_percentage: Optional[float] = None, |
| 109 | + wait_for_selector: Optional[float] = None, |
| 110 | + ): |
| 111 | + self.toggle_random_move(False) |
| 112 | + elem = None |
| 113 | + if isinstance(selector, str): |
| 114 | + if wait_for_selector: |
| 115 | + await self.page.wait_for_selector(selector, timeout=wait_for_selector) |
| 116 | + elem = await self.page.query_selector(selector) |
| 117 | + if elem is None: |
| 118 | + raise Exception( |
| 119 | + 'Could not find element with selector "${}", make sure you\'re waiting for the elements with "puppeteer.wait_for_selector"'.format( |
| 120 | + selector |
| 121 | + ) |
| 122 | + ) |
| 123 | + else: # ElementHandle |
| 124 | + elem = selector |
| 125 | + |
| 126 | + # Make sure the object is in view |
| 127 | + await elem.scroll_into_view_if_needed() |
| 128 | + box = await elem.bounding_box() |
| 129 | + if box is None: |
| 130 | + raise Exception( |
| 131 | + "Could not find the dimensions of the element you're clicking on, this might be a bug?" |
| 132 | + ) |
| 133 | + destination = get_random_box_point(box, padding_percentage) |
| 134 | + dimensions = {"height": box["height"], "width": box["width"]} |
| 135 | + overshooting = should_overshoot(self.previous, destination) |
| 136 | + to = ( |
| 137 | + overshoot(destination, self.overshoot_radius) |
| 138 | + if overshooting |
| 139 | + else destination |
| 140 | + ) |
| 141 | + await self.trace_path(path(self.previous, to)) |
| 142 | + |
| 143 | + if overshooting: |
| 144 | + bounding_box = { |
| 145 | + "height": dimensions["height"], |
| 146 | + "width": dimensions["width"], |
| 147 | + "x": destination.x, |
| 148 | + "y": destination.y, |
| 149 | + } |
| 150 | + correction = path(to, bounding_box, self.overshoot_spread) |
| 151 | + await self.trace_path(correction) |
| 152 | + self.previous = destination |
| 153 | + self.toggle_random_move(True) |
| 154 | + |
| 155 | + async def move_to(self, destination: dict): |
| 156 | + destination_vector = Vector(destination["x"], destination["y"]) |
| 157 | + self.toggle_random_move(False) |
| 158 | + await self.trace_path(path(self.previous, destination_vector)) |
| 159 | + self.toggle_random_move(True) |
| 160 | + |
| 161 | + |
| 162 | +def create_cursor( |
| 163 | + page, start: Union[Vector, Dict] = origin, perform_random_moves: bool = False |
| 164 | +) -> GhostCursor: |
| 165 | + if isinstance(start, dict): |
| 166 | + start = Vector(**start) |
| 167 | + cursor = GhostCursor(page, start) |
| 168 | + if perform_random_moves: |
| 169 | + asyncio.ensure_future(cursor.random_move()) # fire and forget |
| 170 | + return cursor |
0 commit comments