Compare commits

..

16 Commits

Author SHA1 Message Date
ilike2burnthing
6dc279a9d3 Bump version 3.0.3 2023-03-06 13:59:20 +00:00
Artemiy Ryabinkov
96fcd21174 Update undetected_chromedriver version to 3.4.6 (#715)
Co-authored-by: ilike2burnthing <59480337+ilike2burnthing@users.noreply.github.com>
2023-03-06 13:57:38 +00:00
ngosang
3a6e8e0f92 Update GitHub bug report template 2023-01-28 18:00:57 +01:00
ilike2burnthing
2d97f88276 Update README.md 2023-01-09 21:51:49 +00:00
ngosang
ac5c64319e Bump version 3.0.2 2023-01-08 20:48:20 +01:00
ngosang
c93834e2f0 Check Chrome / Chromium web browser is installed correctly 2023-01-08 20:46:11 +01:00
ngosang
e3b4200d94 Detect Cloudflare blocked access 2023-01-08 20:40:10 +01:00
ngosang
0941861f80 Update changelog 2023-01-06 18:49:05 +01:00
ngosang
8a10eb27a6 Bump version 3.0.1 2023-01-06 18:33:02 +01:00
ngosang
e9c08c84ef Update GitHub actions 2023-01-06 18:32:34 +01:00
ngosang
2aa1744476 Add more selectors to detect blocked access 2023-01-06 18:12:26 +01:00
ngosang
a89679a52d Disable Zygote sandbox in Chromium browser 2023-01-06 18:05:23 +01:00
ngosang
410ee7981f Apply undetected-chromedriver patches
* Hide Chrome window in Windows/NT
* Not use subprocess by default (independent process)
* Kill Chromium processes properly to avoid defunct/zombie processes
2023-01-06 17:50:52 +01:00
ngosang
e163019f28 Update undetected-chromedriver 2023-01-06 17:19:11 +01:00
ngosang
7d84f1b663 Kill Chromium processes properly to avoid defunct/zombie processes 2023-01-06 13:58:24 +01:00
ngosang
4807e9dbe2 Include procps (ps), curl and vim packages in the Docker image 2023-01-05 13:25:45 +01:00
21 changed files with 1850 additions and 1858 deletions

View File

@@ -32,7 +32,8 @@ body:
- Operating system: - Operating system:
- Are you using Docker: [yes/no] - Are you using Docker: [yes/no]
- FlareSolverr User-Agent (see log traces or / endpoint): - FlareSolverr User-Agent (see log traces or / endpoint):
- Are you using a proxy or VPN: [yes/no] - Are you using a VPN: [yes/no]
- Are you using a Proxy: [yes/no]
- Are you using Captcha Solver: [yes/no] - Are you using Captcha Solver: [yes/no]
- If using captcha solver, which one: - If using captcha solver, which one:
- URL to test this issue: - URL to test this issue:

View File

@@ -11,7 +11,7 @@ jobs:
steps: steps:
- -
name: Checkout name: Checkout
uses: actions/checkout@v2 uses: actions/checkout@v3
- -
name: Auto Tag name: Auto Tag
uses: Klemensas/action-autotag@stable uses: Klemensas/action-autotag@stable

View File

@@ -11,39 +11,39 @@ jobs:
steps: steps:
- -
name: Checkout name: Checkout
uses: actions/checkout@v2 uses: actions/checkout@v3
- -
name: Downcase repo name: Downcase repo
run: echo REPOSITORY=$(echo ${{ github.repository }} | tr '[:upper:]' '[:lower:]') >> $GITHUB_ENV run: echo REPOSITORY=$(echo ${{ github.repository }} | tr '[:upper:]' '[:lower:]') >> $GITHUB_ENV
- -
name: Docker meta name: Docker meta
id: docker_meta id: docker_meta
uses: crazy-max/ghaction-docker-meta@v1 uses: crazy-max/ghaction-docker-meta@v3
with: with:
images: ${{ env.REPOSITORY }},ghcr.io/${{ env.REPOSITORY }} images: ${{ env.REPOSITORY }},ghcr.io/${{ env.REPOSITORY }}
tag-sha: false tag-sha: false
- -
name: Set up QEMU name: Set up QEMU
uses: docker/setup-qemu-action@v1.0.1 uses: docker/setup-qemu-action@v2
- -
name: Set up Docker Buildx name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1 uses: docker/setup-buildx-action@v2
- -
name: Login to DockerHub name: Login to DockerHub
uses: docker/login-action@v1 uses: docker/login-action@v2
with: with:
username: ${{ secrets.DOCKERHUB_USERNAME }} username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }} password: ${{ secrets.DOCKERHUB_TOKEN }}
- -
name: Login to GitHub Container Registry name: Login to GitHub Container Registry
uses: docker/login-action@v1 uses: docker/login-action@v2
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.repository_owner }} username: ${{ github.repository_owner }}
password: ${{ secrets.GH_PAT }} password: ${{ secrets.GH_PAT }}
- -
name: Build and push name: Build and push
uses: docker/build-push-action@v2 uses: docker/build-push-action@v3
with: with:
context: . context: .
file: ./Dockerfile file: ./Dockerfile

View File

@@ -11,12 +11,12 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v2 uses: actions/checkout@v3
with: with:
fetch-depth: 0 # get all commits, branches and tags (required for the changelog) fetch-depth: 0 # get all commits, branches and tags (required for the changelog)
- name: Setup Node - name: Setup Node
uses: actions/setup-node@v2 uses: actions/setup-node@v3
with: with:
node-version: '16' node-version: '16'

View File

@@ -1,5 +1,18 @@
# Changelog # Changelog
## v3.0.2 (2023/01/08)
* Detect Cloudflare blocked access
* Check Chrome / Chromium web browser is installed correctly
## v3.0.1 (2023/01/06)
* Kill Chromium processes properly to avoid defunct/zombie processes
* Update undetected-chromedriver
* Disable Zygote sandbox in Chromium browser
* Add more selectors to detect blocked access
* Include procps (ps), curl and vim packages in the Docker image
## v3.0.0 (2023/01/04) ## v3.0.0 (2023/01/04)
* This is the first release of FlareSolverr v3. There are some breaking changes * This is the first release of FlareSolverr v3. There are some breaking changes

View File

@@ -29,7 +29,8 @@ RUN dpkg -i /libgl1-mesa-dri.deb \
&& dpkg -i /adwaita-icon-theme.deb \ && dpkg -i /adwaita-icon-theme.deb \
# Install dependencies # Install dependencies
&& apt-get update \ && apt-get update \
&& apt-get install -y --no-install-recommends chromium chromium-common chromium-driver xvfb \ && apt-get install -y --no-install-recommends chromium chromium-common chromium-driver xvfb dumb-init \
procps curl vim \
# Remove temporary files and hardware decoding libraries # Remove temporary files and hardware decoding libraries
&& rm -rf /var/lib/apt/lists/* \ && rm -rf /var/lib/apt/lists/* \
&& rm -f /usr/lib/x86_64-linux-gnu/libmfxhw* \ && rm -f /usr/lib/x86_64-linux-gnu/libmfxhw* \
@@ -52,6 +53,9 @@ COPY package.json ../
EXPOSE 8191 EXPOSE 8191
# dumb-init avoids zombie chromium processes
ENTRYPOINT ["/usr/bin/dumb-init", "--"]
CMD ["/usr/local/bin/python", "-u", "/app/flaresolverr.py"] CMD ["/usr/local/bin/python", "-u", "/app/flaresolverr.py"]
# Local build # Local build

View File

@@ -64,14 +64,11 @@ Remember to restart the Docker daemon and the container after the update.
### Precompiled binaries ### Precompiled binaries
This is the recommended way for Windows users. Precompiled binaries are not currently available for v3. Please see https://github.com/FlareSolverr/FlareSolverr/issues/660 for updates,
* Download the [FlareSolverr zip](https://github.com/FlareSolverr/FlareSolverr/releases) from the release's assets. It is available for Windows and Linux. or below for instructions of how to build FlareSolverr from source code.
* Extract the zip file. FlareSolverr executable and firefox folder must be in the same directory.
* Execute FlareSolverr binary. In the environment variables section you can find how to change the configuration.
### From source code ### From source code
This is the recommended way for macOS users and for developers.
* Install [Python 3.10](https://www.python.org/downloads/). * Install [Python 3.10](https://www.python.org/downloads/).
* Install [Chrome](https://www.google.com/intl/en_us/chrome/) or [Chromium](https://www.chromium.org/getting-involved/download-chromium/) web browser. * Install [Chrome](https://www.google.com/intl/en_us/chrome/) or [Chromium](https://www.chromium.org/getting-involved/download-chromium/) web browser.
* (Only in Linux / macOS) Install [Xvfb](https://en.wikipedia.org/wiki/Xvfb) package. * (Only in Linux / macOS) Install [Xvfb](https://en.wikipedia.org/wiki/Xvfb) package.

View File

@@ -1,7 +1,7 @@
{ {
"name": "flaresolverr", "name": "flaresolverr",
"version": "3.0.0", "version": "3.0.3",
"description": "Proxy server to bypass Cloudflare protection", "description": "Proxy server to bypass Cloudflare protection",
"author": "Diego Heras (ngosang / ngosang@hotmail.es)", "author": "Diego Heras (ngosang / ngosang@hotmail.es)",
"license": "MIT" "license": "MIT"
} }

View File

@@ -1,9 +1,9 @@
bottle==0.12.23 bottle==0.12.23
waitress==2.1.2 waitress==2.1.2
selenium==4.4.3 selenium==4.7.2
func-timeout==4.3.5 func-timeout==4.3.5
# required by undetected_chromedriver # required by undetected_chromedriver
requests==2.28.1 requests==2.28.1
websockets==10.3 websockets==10.4
# only required for linux # only required for linux
xvfbwrapper==0.2.9 xvfbwrapper==0.2.9

View File

@@ -1,4 +1,5 @@
import logging import logging
import sys
import time import time
from urllib.parse import unquote from urllib.parse import unquote
@@ -13,11 +14,19 @@ from dtos import V1RequestBase, V1ResponseBase, ChallengeResolutionT, ChallengeR
HealthResponse, STATUS_OK, STATUS_ERROR HealthResponse, STATUS_OK, STATUS_ERROR
import utils import utils
ACCESS_DENIED_TITLES = [
# Cloudflare
'Access denied',
# Cloudflare http://bitturk.net/ Firefox
'Attention Required! | Cloudflare'
]
ACCESS_DENIED_SELECTORS = [ ACCESS_DENIED_SELECTORS = [
# Cloudflare # Cloudflare
'div.cf-error-title span.cf-code-label span' 'div.cf-error-title span.cf-code-label span',
# Cloudflare http://bitturk.net/ Firefox
'#cf-error-details div.cf-error-overview h1'
] ]
CHALLENGE_TITLE = [ CHALLENGE_TITLES = [
# Cloudflare # Cloudflare
'Just a moment...', 'Just a moment...',
# DDoS-GUARD # DDoS-GUARD
@@ -34,6 +43,21 @@ SHORT_TIMEOUT = 10
def test_browser_installation(): def test_browser_installation():
logging.info("Testing web browser installation...") logging.info("Testing web browser installation...")
chrome_exe_path = utils.get_chrome_exe_path()
if chrome_exe_path is None:
logging.error("Chrome / Chromium web browser not installed!")
sys.exit(1)
else:
logging.info("Chrome / Chromium path: " + chrome_exe_path)
chrome_major_version = utils.get_chrome_major_version()
if chrome_major_version == '':
logging.error("Chrome / Chromium version not detected!")
sys.exit(1)
else:
logging.info("Chrome / Chromium major version: " + chrome_major_version)
user_agent = utils.get_user_agent() user_agent = utils.get_user_agent()
logging.info("FlareSolverr User-Agent: " + user_agent) logging.info("FlareSolverr User-Agent: " + user_agent)
logging.info("Test successful") logging.info("Test successful")
@@ -172,7 +196,13 @@ def _evil_logic(req: V1RequestBase, driver: WebDriver, method: str) -> Challenge
# wait for the page # wait for the page
html_element = driver.find_element(By.TAG_NAME, "html") html_element = driver.find_element(By.TAG_NAME, "html")
page_title = driver.title
# find access denied titles
for title in ACCESS_DENIED_TITLES:
if title == page_title:
raise Exception('Cloudflare has blocked this request. '
'Probably your IP is banned for this site, check in your web browser.')
# find access denied selectors # find access denied selectors
for selector in ACCESS_DENIED_SELECTORS: for selector in ACCESS_DENIED_SELECTORS:
found_elements = driver.find_elements(By.CSS_SELECTOR, selector) found_elements = driver.find_elements(By.CSS_SELECTOR, selector)
@@ -182,8 +212,7 @@ def _evil_logic(req: V1RequestBase, driver: WebDriver, method: str) -> Challenge
# find challenge by title # find challenge by title
challenge_found = False challenge_found = False
page_title = driver.title for title in CHALLENGE_TITLES:
for title in CHALLENGE_TITLE:
if title == page_title: if title == page_title:
challenge_found = True challenge_found = True
logging.info("Challenge detected. Title found: " + title) logging.info("Challenge detected. Title found: " + title)
@@ -200,8 +229,8 @@ def _evil_logic(req: V1RequestBase, driver: WebDriver, method: str) -> Challenge
if challenge_found: if challenge_found:
while True: while True:
try: try:
# wait until the title change # wait until the title changes
for title in CHALLENGE_TITLE: for title in CHALLENGE_TITLES:
logging.debug("Waiting for title: " + title) logging.debug("Waiting for title: " + title)
WebDriverWait(driver, SHORT_TIMEOUT).until_not(title_is(title)) WebDriverWait(driver, SHORT_TIMEOUT).until_not(title_is(title))

File diff suppressed because it is too large Load Diff

View File

@@ -1,259 +0,0 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
"""
888 888 d8b
888 888 Y8P
888 888
.d8888b 88888b. 888d888 .d88b. 88888b.d88b. .d88b. .d88888 888d888 888 888 888 .d88b. 888d888
d88P" 888 "88b 888P" d88""88b 888 "888 "88b d8P Y8b d88" 888 888P" 888 888 888 d8P Y8b 888P"
888 888 888 888 888 888 888 888 888 88888888 888 888 888 888 Y88 88P 88888888 888
Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y8bd8P Y8b. 888
"Y8888P 888 888 888 "Y88P" 888 888 888 "Y8888 "Y88888 888 888 Y88P "Y8888 888 88888888
by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
"""
import io
import logging
import os
import random
import re
import string
import sys
import zipfile
from distutils.version import LooseVersion
from urllib.request import urlopen, urlretrieve
from selenium.webdriver import Chrome as _Chrome, ChromeOptions as _ChromeOptions
# Module-wide chromedriver major version override. A falsy value (0) means
# "not set"; ChromeDriverManager.__init__ only uses it when it is truthy.
TARGET_VERSION = 0
# Logger used by this module, registered under the name "uc".
logger = logging.getLogger("uc")
class Chrome:
    """Factory for a selenium ``Chrome`` driver with automation markers hidden.

    ``__new__`` does not return an instance of this class: it builds and
    returns a selenium ``Chrome`` object (imported in this module as
    ``_Chrome``) after making sure the chromedriver binary is installed and
    patched.
    """

    def __new__(cls, *args, emulate_touch=False, **kwargs):
        """Create, configure and return the underlying selenium driver.

        :param args: positional arguments forwarded to ``ChromeDriverManager``
            and to the selenium ``Chrome`` constructor.
        :param emulate_touch: when true, inject a script that makes
            ``navigator.maxTouchPoints`` report ``1``.
        :param kwargs: keyword arguments forwarded like ``args``;
            ``executable_path`` and ``options`` are filled in with defaults
            when missing or falsy.
        :return: the configured selenium ``Chrome`` instance.
        """
        if not ChromeDriverManager.installed:
            ChromeDriverManager(*args, **kwargs).install()
        if not ChromeDriverManager.selenium_patched:
            ChromeDriverManager(*args, **kwargs).patch_selenium_webdriver()
        if not kwargs.get("executable_path"):
            kwargs["executable_path"] = "./{}".format(
                ChromeDriverManager(*args, **kwargs).executable_path
            )
        if not kwargs.get("options"):
            kwargs["options"] = ChromeOptions()

        # Build the real selenium driver by hand so this factory can return it
        # instead of an instance of this wrapper class.
        instance = object.__new__(_Chrome)
        instance.__init__(*args, **kwargs)

        # Keep the original ``get`` so the wrapper below can delegate to it.
        instance._orig_get = instance.get

        def _get_wrapped(*get_args, **get_kwargs):
            # Only install the navigator proxy while the page still exposes a
            # truthy ``navigator.webdriver``.
            if instance.execute_script("return navigator.webdriver"):
                instance.execute_cdp_cmd(
                    "Page.addScriptToEvaluateOnNewDocument",
                    {
                        "source": """
                            Object.defineProperty(window, 'navigator', {
                                value: new Proxy(navigator, {
                                    has: (target, key) => (key === 'webdriver' ? false : key in target),
                                    get: (target, key) =>
                                        key === 'webdriver'
                                            ? undefined
                                            : typeof target[key] === 'function'
                                            ? target[key].bind(target)
                                            : target[key]
                                })
                            });
                        """
                    },
                )
            return instance._orig_get(*get_args, **get_kwargs)

        # The original assigned this three times in a row; once is enough.
        instance.get = _get_wrapped

        # Strip the "Headless" marker from the reported user agent.
        original_user_agent_string = instance.execute_script(
            "return navigator.userAgent"
        )
        instance.execute_cdp_cmd(
            "Network.setUserAgentOverride",
            {
                "userAgent": original_user_agent_string.replace("Headless", ""),
            },
        )
        if emulate_touch:
            instance.execute_cdp_cmd(
                "Page.addScriptToEvaluateOnNewDocument",
                {
                    "source": """
                        Object.defineProperty(navigator, 'maxTouchPoints', {
                            get: () => 1
                        })"""
                },
            )
        logger.info(f"starting undetected_chromedriver.Chrome({args}, {kwargs})")
        return instance
class ChromeOptions:
    """Factory for a selenium ``ChromeOptions`` object with preset switches.

    ``__new__`` returns a selenium ``ChromeOptions`` instance (imported in this
    module as ``_ChromeOptions``), not an instance of this class.
    """

    def __new__(cls, *args, **kwargs):
        """Return selenium options with the automation switches adjusted.

        :param args: positional arguments forwarded to ``ChromeDriverManager``.
        :param kwargs: keyword arguments forwarded to ``ChromeDriverManager``.
        :return: the configured selenium ``ChromeOptions`` instance.
        """
        # Install the driver and patch selenium first, each only if not done yet.
        if not ChromeDriverManager.installed:
            ChromeDriverManager(*args, **kwargs).install()
        if not ChromeDriverManager.selenium_patched:
            ChromeDriverManager(*args, **kwargs).patch_selenium_webdriver()

        options = object.__new__(_ChromeOptions)
        options.__init__()

        options.add_argument("start-maximized")
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_argument("--disable-blink-features=AutomationControlled")
        return options
class ChromeDriverManager(object):
    """Download, patch and register the chromedriver binary.

    Class attributes ``installed`` and ``selenium_patched`` are process-wide
    flags, set once the binary has been patched and once the selenium classes
    have been replaced.
    """

    installed = False
    selenium_patched = False

    target_version = None

    DL_BASE = "https://chromedriver.storage.googleapis.com/"

    def __init__(self, executable_path=None, target_version=None, *args, **kwargs):
        """Resolve the target version, platform tag and executable name.

        :param executable_path: path of the chromedriver binary; defaults to
            the platform-specific executable name.
        :param target_version: chromedriver major version; overrides the
            module-level ``TARGET_VERSION``.
        :param args: accepted and ignored.
        :param kwargs: accepted and ignored.
        """
        _platform = sys.platform

        if TARGET_VERSION:
            # use global if set
            self.target_version = TARGET_VERSION

        if target_version:
            # use explicitly passed target
            self.target_version = target_version  # user override

        if not self.target_version:
            # none of the above (default) and just get current version
            self.target_version = self.get_release_version_number().version[
                0
            ]  # only major version int

        self._base = base_ = "chromedriver{}"

        # Default to the bare name so platforms other than win32/linux/darwin
        # never end up with a literal "chromedriver{}" placeholder.
        exe_name = base_.format("")
        if _platform in ("win32",):
            exe_name = base_.format(".exe")
        if _platform in ("linux",):
            _platform += "64"
        if _platform in ("darwin",):
            _platform = "mac64"
        self.platform = _platform
        self.executable_path = executable_path or exe_name
        self._exe_name = exe_name

    def patch_selenium_webdriver(self_):
        """
        Patches selenium package Chrome, ChromeOptions classes for current session

        :return:
        """
        import selenium.webdriver.chrome.service
        import selenium.webdriver

        selenium.webdriver.Chrome = Chrome
        selenium.webdriver.ChromeOptions = ChromeOptions
        logger.info("Selenium patched. Safe to import Chrome / ChromeOptions")
        self_.__class__.selenium_patched = True

    def install(self, patch_selenium=True):
        """
        Initialize the patch

        This will:
         download chromedriver if not present
         patch the downloaded chromedriver
         patch selenium package if <patch_selenium> is True (default)

        :param patch_selenium: patch selenium webdriver classes for Chrome and ChromeDriver (for current python session)
        :return:
        """
        if not os.path.exists(self.executable_path):
            self.fetch_chromedriver()
        if not self.__class__.installed:
            if self.patch_binary():
                self.__class__.installed = True

        if patch_selenium:
            self.patch_selenium_webdriver()

    def get_release_version_number(self):
        """
        Gets the latest major version available, or the latest major version of self.target_version if set explicitly.

        :return: ``LooseVersion`` parsed from the download server's response
        """
        path = (
            "LATEST_RELEASE"
            if not self.target_version
            else f"LATEST_RELEASE_{self.target_version}"
        )
        # Close the HTTP response explicitly instead of leaking the socket.
        with urlopen(self.__class__.DL_BASE + path) as response:
            version_text = response.read().decode().strip()
        return LooseVersion(version_text)

    def fetch_chromedriver(self):
        """
        Downloads ChromeDriver from source and unpacks the executable

        :return: on success, name of the unpacked executable
        """
        # Check for an existing binary before any network traffic.
        if os.path.exists(self.executable_path):
            return self.executable_path

        base_ = self._base
        zip_name = base_.format(".zip")
        ver = self.get_release_version_number().vstring
        urlretrieve(
            f"{self.__class__.DL_BASE}{ver}/{base_.format(f'_{self.platform}')}.zip",
            filename=zip_name,
        )
        try:
            with zipfile.ZipFile(zip_name) as zf:
                zf.extract(self._exe_name)
        finally:
            # Remove the archive even when extraction fails.
            os.remove(zip_name)
        if sys.platform != "win32":
            os.chmod(self._exe_name, 0o755)
        return self._exe_name

    @staticmethod
    def random_cdc():
        """Return 26 random bytes shaped like ``xyx_...`` to replace ``cdc_`` markers.

        The result has the same length as the ``cdc_.{22}`` pattern it
        replaces, so patching never changes the size of the binary.
        """
        cdc = random.choices(string.ascii_lowercase, k=26)
        cdc[-6:-4] = map(str.upper, cdc[-6:-4])
        cdc[2] = cdc[0]
        cdc[3] = "_"
        return "".join(cdc).encode()

    def patch_binary(self):
        """
        Patches the ChromeDriver binary

        :return: number of lines that were rewritten (0, i.e. falsy, when
            nothing was patched)
        """
        linect = 0
        replacement = self.random_cdc()
        with io.open(self.executable_path, "r+b") as fh:
            for line in iter(lambda: fh.readline(), b""):
                if b"cdc_" in line:
                    # Step back over the line just read and overwrite it in place.
                    fh.seek(-len(line), 1)
                    newline = re.sub(b"cdc_.{22}", replacement, line)
                    fh.write(newline)
                    linect += 1
            return linect
def install(executable_path=None, target_version=None, *args, **kwargs):
    """Build a ``ChromeDriverManager`` from the arguments and run its ``install()``.

    :param executable_path: forwarded to ``ChromeDriverManager``.
    :param target_version: forwarded to ``ChromeDriverManager``.
    :param args: extra positional arguments, forwarded unchanged.
    :param kwargs: extra keyword arguments, forwarded unchanged.
    """
    manager = ChromeDriverManager(executable_path, target_version, *args, **kwargs)
    manager.install()

View File

@@ -1,112 +1,112 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# this module is part of undetected_chromedriver # this module is part of undetected_chromedriver
import json import json
import logging import logging
from collections.abc import Mapping, Sequence
import requests
import requests import websockets
import websockets
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class CDPObject(dict): class CDPObject(dict):
def __init__(self, *a, **k): def __init__(self, *a, **k):
super().__init__(*a, **k) super().__init__(*a, **k)
self.__dict__ = self self.__dict__ = self
for k in self.__dict__: for k in self.__dict__:
if isinstance(self.__dict__[k], dict): if isinstance(self.__dict__[k], dict):
self.__dict__[k] = CDPObject(self.__dict__[k]) self.__dict__[k] = CDPObject(self.__dict__[k])
elif isinstance(self.__dict__[k], list): elif isinstance(self.__dict__[k], list):
for i in range(len(self.__dict__[k])): for i in range(len(self.__dict__[k])):
if isinstance(self.__dict__[k][i], dict): if isinstance(self.__dict__[k][i], dict):
self.__dict__[k][i] = CDPObject(self) self.__dict__[k][i] = CDPObject(self)
def __repr__(self): def __repr__(self):
tpl = f"{self.__class__.__name__}(\n\t{{}}\n\t)" tpl = f"{self.__class__.__name__}(\n\t{{}}\n\t)"
return tpl.format("\n ".join(f"{k} = {v}" for k, v in self.items())) return tpl.format("\n ".join(f"{k} = {v}" for k, v in self.items()))
class PageElement(CDPObject): class PageElement(CDPObject):
pass pass
class CDP: class CDP:
log = logging.getLogger("CDP") log = logging.getLogger("CDP")
endpoints = CDPObject( endpoints = CDPObject(
{ {
"json": "/json", "json": "/json",
"protocol": "/json/protocol", "protocol": "/json/protocol",
"list": "/json/list", "list": "/json/list",
"new": "/json/new?{url}", "new": "/json/new?{url}",
"activate": "/json/activate/{id}", "activate": "/json/activate/{id}",
"close": "/json/close/{id}", "close": "/json/close/{id}",
} }
) )
def __init__(self, options: "ChromeOptions"): # noqa def __init__(self, options: "ChromeOptions"): # noqa
self.server_addr = "http://{0}:{1}".format(*options.debugger_address.split(":")) self.server_addr = "http://{0}:{1}".format(*options.debugger_address.split(":"))
self._reqid = 0 self._reqid = 0
self._session = requests.Session() self._session = requests.Session()
self._last_resp = None self._last_resp = None
self._last_json = None self._last_json = None
resp = self.get(self.endpoints.json) # noqa resp = self.get(self.endpoints.json) # noqa
self.sessionId = resp[0]["id"] self.sessionId = resp[0]["id"]
self.wsurl = resp[0]["webSocketDebuggerUrl"] self.wsurl = resp[0]["webSocketDebuggerUrl"]
def tab_activate(self, id=None): def tab_activate(self, id=None):
if not id: if not id:
active_tab = self.tab_list()[0] active_tab = self.tab_list()[0]
id = active_tab.id # noqa id = active_tab.id # noqa
self.wsurl = active_tab.webSocketDebuggerUrl # noqa self.wsurl = active_tab.webSocketDebuggerUrl # noqa
return self.post(self.endpoints["activate"].format(id=id)) return self.post(self.endpoints["activate"].format(id=id))
def tab_list(self): def tab_list(self):
retval = self.get(self.endpoints["list"]) retval = self.get(self.endpoints["list"])
return [PageElement(o) for o in retval] return [PageElement(o) for o in retval]
def tab_new(self, url): def tab_new(self, url):
return self.post(self.endpoints["new"].format(url=url)) return self.post(self.endpoints["new"].format(url=url))
def tab_close_last_opened(self): def tab_close_last_opened(self):
sessions = self.tab_list() sessions = self.tab_list()
opentabs = [s for s in sessions if s["type"] == "page"] opentabs = [s for s in sessions if s["type"] == "page"]
return self.post(self.endpoints["close"].format(id=opentabs[-1]["id"])) return self.post(self.endpoints["close"].format(id=opentabs[-1]["id"]))
async def send(self, method: str, params: dict): async def send(self, method: str, params: dict):
self._reqid += 1 self._reqid += 1
async with websockets.connect(self.wsurl) as ws: async with websockets.connect(self.wsurl) as ws:
await ws.send( await ws.send(
json.dumps({"method": method, "params": params, "id": self._reqid}) json.dumps({"method": method, "params": params, "id": self._reqid})
) )
self._last_resp = await ws.recv() self._last_resp = await ws.recv()
self._last_json = json.loads(self._last_resp) self._last_json = json.loads(self._last_resp)
self.log.info(self._last_json) self.log.info(self._last_json)
def get(self, uri): def get(self, uri):
resp = self._session.get(self.server_addr + uri) resp = self._session.get(self.server_addr + uri)
try: try:
self._last_resp = resp self._last_resp = resp
self._last_json = resp.json() self._last_json = resp.json()
except Exception: except Exception:
return return
else: else:
return self._last_json return self._last_json
def post(self, uri, data: dict = None): def post(self, uri, data: dict = None):
if not data: if not data:
data = {} data = {}
resp = self._session.post(self.server_addr + uri, json=data) resp = self._session.post(self.server_addr + uri, json=data)
try: try:
self._last_resp = resp self._last_resp = resp
self._last_json = resp.json() self._last_json = resp.json()
except Exception: except Exception:
return self._last_resp return self._last_resp
@property @property
def last_json(self): def last_json(self):
return self._last_json return self._last_json

View File

@@ -1,191 +1,190 @@
import asyncio import asyncio
import logging from collections.abc import Mapping
import time from collections.abc import Sequence
import traceback from functools import wraps
from collections.abc import Mapping import logging
from collections.abc import Sequence import threading
from typing import Any import time
from typing import Awaitable import traceback
from typing import Callable from typing import Any
from typing import List from typing import Awaitable
from typing import Optional from typing import Callable
from contextlib import ExitStack from typing import List
import threading from typing import Optional
from functools import wraps, partial
class Structure(dict):
class Structure(dict): """
""" This is a dict-like object structure, which you should subclass
This is a dict-like object structure, which you should subclass Only properties defined in the class context are used on initialization.
Only properties defined in the class context are used on initialization.
See example
See example """
"""
_store = {}
_store = {}
def __init__(self, *a, **kw):
def __init__(self, *a, **kw): """
""" Instantiate a new instance.
Instantiate a new instance.
:param a:
:param a: :param kw:
:param kw: """
"""
super().__init__()
super().__init__()
# auxiliar dict
# auxiliar dict d = dict(*a, **kw)
d = dict(*a, **kw) for k, v in d.items():
for k, v in d.items(): if isinstance(v, Mapping):
if isinstance(v, Mapping): self[k] = self.__class__(v)
self[k] = self.__class__(v) elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)):
elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)): self[k] = [self.__class__(i) for i in v]
self[k] = [self.__class__(i) for i in v] else:
else: self[k] = v
self[k] = v super().__setattr__("__dict__", self)
super().__setattr__("__dict__", self)
def __getattr__(self, item):
def __getattr__(self, item): return getattr(super(), item)
return getattr(super(), item)
def __getitem__(self, item):
def __getitem__(self, item): return super().__getitem__(item)
return super().__getitem__(item)
def __setattr__(self, key, value):
def __setattr__(self, key, value): self.__setitem__(key, value)
self.__setitem__(key, value)
def __setitem__(self, key, value):
def __setitem__(self, key, value): super().__setitem__(key, value)
super().__setitem__(key, value)
def update(self, *a, **kw):
def update(self, *a, **kw): super().update(*a, **kw)
super().update(*a, **kw)
def __eq__(self, other):
def __eq__(self, other): return frozenset(other.items()) == frozenset(self.items())
return frozenset(other.items()) == frozenset(self.items())
def __hash__(self):
def __hash__(self): return hash(frozenset(self.items()))
return hash(frozenset(self.items()))
@classmethod
@classmethod def __init_subclass__(cls, **kwargs):
def __init_subclass__(cls, **kwargs): cls._store = {}
cls._store = {}
def _normalize_strings(self):
def _normalize_strings(self): for k, v in self.copy().items():
for k, v in self.copy().items(): if isinstance(v, (str)):
if isinstance(v, (str)): self[k] = v.strip()
self[k] = v.strip()
def timeout(seconds=3, on_timeout: Optional[Callable[[callable], Any]] = None):
def timeout(seconds=3, on_timeout: Optional[Callable[[callable], Any]] = None): def wrapper(func):
def wrapper(func): @wraps(func)
@wraps(func) def wrapped(*args, **kwargs):
def wrapped(*args, **kwargs): def function_reached_timeout():
def function_reached_timeout(): if on_timeout:
if on_timeout: on_timeout(func)
on_timeout(func) else:
else: raise TimeoutError("function call timed out")
raise TimeoutError("function call timed out")
t = threading.Timer(interval=seconds, function=function_reached_timeout)
t = threading.Timer(interval=seconds, function=function_reached_timeout) t.start()
t.start() try:
try: return func(*args, **kwargs)
return func(*args, **kwargs) except:
except: t.cancel()
t.cancel() raise
raise finally:
finally: t.cancel()
t.cancel()
return wrapped
return wrapped
return wrapper
return wrapper
def test():
def test(): import sys, os
import sys, os
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__))) import undetected_chromedriver as uc
import undetected_chromedriver as uc import threading
import threading
def collector(
def collector( driver: uc.Chrome,
driver: uc.Chrome, stop_event: threading.Event,
stop_event: threading.Event, on_event_coro: Optional[Callable[[List[str]], Awaitable[Any]]] = None,
on_event_coro: Optional[Callable[[List[str]], Awaitable[Any]]] = None, listen_events: Sequence = ("browser", "network", "performance"),
listen_events: Sequence = ("browser", "network", "performance"), ):
): def threaded(driver, stop_event, on_event_coro):
def threaded(driver, stop_event, on_event_coro): async def _ensure_service_started():
async def _ensure_service_started(): while (
while ( getattr(driver, "service", False)
getattr(driver, "service", False) and getattr(driver.service, "process", False)
and getattr(driver.service, "process", False) and driver.service.process.poll()
and driver.service.process.poll() ):
): print("waiting for driver service to come back on")
print("waiting for driver service to come back on") await asyncio.sleep(0.05)
await asyncio.sleep(0.05) # await asyncio.sleep(driver._delay or .25)
# await asyncio.sleep(driver._delay or .25)
async def get_log_lines(typ):
async def get_log_lines(typ): await _ensure_service_started()
await _ensure_service_started() return driver.get_log(typ)
return driver.get_log(typ)
async def looper():
async def looper(): while not stop_event.is_set():
while not stop_event.is_set(): log_lines = []
log_lines = [] try:
try: for _ in listen_events:
for _ in listen_events: try:
try: log_lines += await get_log_lines(_)
log_lines += await get_log_lines(_) except:
except: if logging.getLogger().getEffectiveLevel() <= 10:
if logging.getLogger().getEffectiveLevel() <= 10: traceback.print_exc()
traceback.print_exc() continue
continue if log_lines and on_event_coro:
if log_lines and on_event_coro: await on_event_coro(log_lines)
await on_event_coro(log_lines) except Exception as e:
except Exception as e: if logging.getLogger().getEffectiveLevel() <= 10:
if logging.getLogger().getEffectiveLevel() <= 10: traceback.print_exc()
traceback.print_exc()
loop = asyncio.new_event_loop()
loop = asyncio.new_event_loop() asyncio.set_event_loop(loop)
asyncio.set_event_loop(loop) loop.run_until_complete(looper())
loop.run_until_complete(looper())
t = threading.Thread(target=threaded, args=(driver, stop_event, on_event_coro))
t = threading.Thread(target=threaded, args=(driver, stop_event, on_event_coro)) t.start()
t.start()
async def on_event(data):
async def on_event(data): print("on_event")
print("on_event") print("data:", data)
print("data:", data)
def func_called(fn):
def func_called(fn): def wrapped(*args, **kwargs):
def wrapped(*args, **kwargs): print(
print( "func called! %s (args: %s, kwargs: %s)" % (fn.__name__, args, kwargs)
"func called! %s (args: %s, kwargs: %s)" % (fn.__name__, args, kwargs) )
) while driver.service.process and driver.service.process.poll() is not None:
while driver.service.process and driver.service.process.poll() is not None: time.sleep(0.1)
time.sleep(0.1) res = fn(*args, **kwargs)
res = fn(*args, **kwargs) print("func completed! (result: %s)" % res)
print("func completed! (result: %s)" % res) return res
return res
return wrapped
return wrapped
logging.basicConfig(level=10)
logging.basicConfig(level=10)
options = uc.ChromeOptions()
options = uc.ChromeOptions() options.set_capability(
options.set_capability( "goog:loggingPrefs", {"performance": "ALL", "browser": "ALL", "network": "ALL"}
"goog:loggingPrefs", {"performance": "ALL", "browser": "ALL", "network": "ALL"} )
)
driver = uc.Chrome(version_main=96, options=options)
driver = uc.Chrome(version_main=96, options=options)
# driver.command_executor._request = timeout(seconds=1)(driver.command_executor._request)
# driver.command_executor._request = timeout(seconds=1)(driver.command_executor._request) driver.command_executor._request = func_called(driver.command_executor._request)
driver.command_executor._request = func_called(driver.command_executor._request) collector_stop = threading.Event()
collector_stop = threading.Event() collector(driver, collector_stop, on_event)
collector(driver, collector_stop, on_event)
driver.get("https://nowsecure.nl")
driver.get("https://nowsecure.nl")
time.sleep(10)
time.sleep(10)
driver.quit()
driver.quit()

View File

@@ -1,75 +1,76 @@
import multiprocessing import atexit
import os import logging
import platform import multiprocessing
import sys import os
from subprocess import PIPE import platform
from subprocess import Popen import signal
import atexit from subprocess import PIPE
import traceback from subprocess import Popen
import logging import sys
import signal
CREATE_NEW_PROCESS_GROUP = 0x00000200 CREATE_NEW_PROCESS_GROUP = 0x00000200
DETACHED_PROCESS = 0x00000008 DETACHED_PROCESS = 0x00000008
REGISTERED = [] REGISTERED = []
def start_detached(executable, *args): def start_detached(executable, *args):
""" """
Starts a fully independent subprocess (with no parent) Starts a fully independent subprocess (with no parent)
:param executable: executable :param executable: executable
:param args: arguments to the executable, eg: ['--param1_key=param1_val', '-vvv' ...] :param args: arguments to the executable, eg: ['--param1_key=param1_val', '-vvv' ...]
:return: pid of the grandchild process :return: pid of the grandchild process
""" """
# create pipe # create pipe
reader, writer = multiprocessing.Pipe(False) reader, writer = multiprocessing.Pipe(False)
# do not keep reference # do not keep reference
multiprocessing.Process( process = multiprocessing.Process(
target=_start_detached, target=_start_detached,
args=(executable, *args), args=(executable, *args),
kwargs={"writer": writer}, kwargs={"writer": writer},
daemon=True, daemon=True,
).start() )
# receive pid from pipe process.start()
pid = reader.recv() process.join()
REGISTERED.append(pid) # receive pid from pipe
# close pipes pid = reader.recv()
writer.close() REGISTERED.append(pid)
reader.close() # close pipes
writer.close()
return pid reader.close()
return pid
def _start_detached(executable, *args, writer: multiprocessing.Pipe = None):
# configure launch def _start_detached(executable, *args, writer: multiprocessing.Pipe = None):
kwargs = {} # configure launch
if platform.system() == "Windows": kwargs = {}
kwargs.update(creationflags=DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP) if platform.system() == "Windows":
elif sys.version_info < (3, 2): kwargs.update(creationflags=DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP)
# assume posix elif sys.version_info < (3, 2):
kwargs.update(preexec_fn=os.setsid) # assume posix
else: # Python 3.2+ and Unix kwargs.update(preexec_fn=os.setsid)
kwargs.update(start_new_session=True) else: # Python 3.2+ and Unix
kwargs.update(start_new_session=True)
# run
p = Popen([executable, *args], stdin=PIPE, stdout=PIPE, stderr=PIPE, **kwargs) # run
p = Popen([executable, *args], stdin=PIPE, stdout=PIPE, stderr=PIPE, **kwargs)
# send pid to pipe
writer.send(p.pid) # send pid to pipe
sys.exit() writer.send(p.pid)
sys.exit()
def _cleanup():
for pid in REGISTERED: def _cleanup():
try: for pid in REGISTERED:
logging.getLogger(__name__).debug("cleaning up pid %d " % pid) try:
os.kill(pid, signal.SIGTERM) logging.getLogger(__name__).debug("cleaning up pid %d " % pid)
except: # noqa os.kill(pid, signal.SIGTERM)
pass except: # noqa
pass
atexit.register(_cleanup)
atexit.register(_cleanup)

View File

@@ -1,70 +1,85 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# this module is part of undetected_chromedriver # this module is part of undetected_chromedriver
import json import json
import os import os
from selenium.webdriver.chromium.options import ChromiumOptions as _ChromiumOptions from selenium.webdriver.chromium.options import ChromiumOptions as _ChromiumOptions
class ChromeOptions(_ChromiumOptions): class ChromeOptions(_ChromiumOptions):
_session = None _session = None
_user_data_dir = None _user_data_dir = None
@property @property
def user_data_dir(self): def user_data_dir(self):
return self._user_data_dir return self._user_data_dir
@user_data_dir.setter @user_data_dir.setter
def user_data_dir(self, path: str): def user_data_dir(self, path: str):
""" """
Sets the browser profile folder to use, or creates a new profile Sets the browser profile folder to use, or creates a new profile
at given <path>. at given <path>.
Parameters Parameters
---------- ----------
path: str path: str
the path to a chrome profile folder the path to a chrome profile folder
if it does not exist, a new profile will be created at given location if it does not exist, a new profile will be created at given location
""" """
apath = os.path.abspath(path) apath = os.path.abspath(path)
self._user_data_dir = os.path.normpath(apath) self._user_data_dir = os.path.normpath(apath)
@staticmethod @staticmethod
def _undot_key(key, value): def _undot_key(key, value):
"""turn a (dotted key, value) into a proper nested dict""" """turn a (dotted key, value) into a proper nested dict"""
if "." in key: if "." in key:
key, rest = key.split(".", 1) key, rest = key.split(".", 1)
value = ChromeOptions._undot_key(rest, value) value = ChromeOptions._undot_key(rest, value)
return {key: value} return {key: value}
def handle_prefs(self, user_data_dir): @staticmethod
prefs = self.experimental_options.get("prefs") def _merge_nested(a, b):
if prefs: """
merges b into a
user_data_dir = user_data_dir or self._user_data_dir leaf values in a are overwritten with values from b
default_path = os.path.join(user_data_dir, "Default") """
os.makedirs(default_path, exist_ok=True) for key in b:
if key in a:
# undot prefs dict keys if isinstance(a[key], dict) and isinstance(b[key], dict):
undot_prefs = {} ChromeOptions._merge_nested(a[key], b[key])
for key, value in prefs.items(): continue
undot_prefs.update(self._undot_key(key, value)) a[key] = b[key]
return a
prefs_file = os.path.join(default_path, "Preferences")
if os.path.exists(prefs_file): def handle_prefs(self, user_data_dir):
with open(prefs_file, encoding="latin1", mode="r") as f: prefs = self.experimental_options.get("prefs")
undot_prefs.update(json.load(f)) if prefs:
user_data_dir = user_data_dir or self._user_data_dir
with open(prefs_file, encoding="latin1", mode="w") as f: default_path = os.path.join(user_data_dir, "Default")
json.dump(undot_prefs, f) os.makedirs(default_path, exist_ok=True)
# remove the experimental_options to avoid an error # undot prefs dict keys
del self._experimental_options["prefs"] undot_prefs = {}
for key, value in prefs.items():
@classmethod undot_prefs = self._merge_nested(
def from_options(cls, options): undot_prefs, self._undot_key(key, value)
o = cls() )
o.__dict__.update(options.__dict__)
return o prefs_file = os.path.join(default_path, "Preferences")
if os.path.exists(prefs_file):
with open(prefs_file, encoding="latin1", mode="r") as f:
undot_prefs = self._merge_nested(json.load(f), undot_prefs)
with open(prefs_file, encoding="latin1", mode="w") as f:
json.dump(undot_prefs, f)
# remove the experimental_options to avoid an error
del self._experimental_options["prefs"]
@classmethod
def from_options(cls, options):
o = cls()
o.__dict__.update(options.__dict__)
return o

View File

@@ -1,276 +1,275 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# this module is part of undetected_chromedriver # this module is part of undetected_chromedriver
import io from distutils.version import LooseVersion
import logging import io
import os import logging
import random import os
import re import random
import string import re
import sys import string
import time import sys
import zipfile import time
from distutils.version import LooseVersion from urllib.request import urlopen
from urllib.request import urlopen, urlretrieve from urllib.request import urlretrieve
import secrets import zipfile
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux")) IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux", "linux2"))
class Patcher(object): class Patcher(object):
url_repo = "https://chromedriver.storage.googleapis.com" url_repo = "https://chromedriver.storage.googleapis.com"
zip_name = "chromedriver_%s.zip" zip_name = "chromedriver_%s.zip"
exe_name = "chromedriver%s" exe_name = "chromedriver%s"
platform = sys.platform platform = sys.platform
if platform.endswith("win32"): if platform.endswith("win32"):
zip_name %= "win32" zip_name %= "win32"
exe_name %= ".exe" exe_name %= ".exe"
if platform.endswith("linux"): if platform.endswith(("linux", "linux2")):
zip_name %= "linux64" zip_name %= "linux64"
exe_name %= "" exe_name %= ""
if platform.endswith("darwin"): if platform.endswith("darwin"):
zip_name %= "mac64" zip_name %= "mac64"
exe_name %= "" exe_name %= ""
if platform.endswith("win32"): if platform.endswith("win32"):
d = "~/appdata/roaming/undetected_chromedriver" d = "~/appdata/roaming/undetected_chromedriver"
elif platform.startswith("linux"): elif "LAMBDA_TASK_ROOT" in os.environ:
d = "~/.local/share/undetected_chromedriver" d = "/tmp/undetected_chromedriver"
elif platform.endswith("darwin"): elif platform.startswith(("linux", "linux2")):
d = "~/Library/Application Support/undetected_chromedriver" d = "~/.local/share/undetected_chromedriver"
else: elif platform.endswith("darwin"):
d = "~/.undetected_chromedriver" d = "~/Library/Application Support/undetected_chromedriver"
data_path = os.path.abspath(os.path.expanduser(d)) else:
d = "~/.undetected_chromedriver"
def __init__(self, executable_path=None, force=False, version_main: int = 0): data_path = os.path.abspath(os.path.expanduser(d))
"""
def __init__(self, executable_path=None, force=False, version_main: int = 0):
Args: """
executable_path: None = automatic Args:
a full file path to the chromedriver executable executable_path: None = automatic
force: False a full file path to the chromedriver executable
terminate processes which are holding lock force: False
version_main: 0 = auto terminate processes which are holding lock
specify main chrome version (rounded, ex: 82) version_main: 0 = auto
""" specify main chrome version (rounded, ex: 82)
"""
self.force = force self.force = force
self.executable_path = None self._custom_exe_path = False
prefix = secrets.token_hex(8) prefix = "undetected"
if not os.path.exists(self.data_path): if not os.path.exists(self.data_path):
os.makedirs(self.data_path, exist_ok=True) os.makedirs(self.data_path, exist_ok=True)
if not executable_path: if not executable_path:
self.executable_path = os.path.join( self.executable_path = os.path.join(
self.data_path, "_".join([prefix, self.exe_name]) self.data_path, "_".join([prefix, self.exe_name])
) )
if not IS_POSIX: if not IS_POSIX:
if executable_path: if executable_path:
if not executable_path[-4:] == ".exe": if not executable_path[-4:] == ".exe":
executable_path += ".exe" executable_path += ".exe"
self.zip_path = os.path.join(self.data_path, prefix) self.zip_path = os.path.join(self.data_path, prefix)
if not executable_path: if not executable_path:
self.executable_path = os.path.abspath( self.executable_path = os.path.abspath(
os.path.join(".", self.executable_path) os.path.join(".", self.executable_path)
) )
self._custom_exe_path = False if executable_path:
self._custom_exe_path = True
if executable_path: self.executable_path = executable_path
self._custom_exe_path = True self.version_main = version_main
self.executable_path = executable_path self.version_full = None
self.version_main = version_main
self.version_full = None def auto(self, executable_path=None, force=False, version_main=None):
if executable_path:
def auto(self, executable_path=None, force=False, version_main=None): self.executable_path = executable_path
"""""" self._custom_exe_path = True
if executable_path:
self.executable_path = executable_path if self._custom_exe_path:
self._custom_exe_path = True ispatched = self.is_binary_patched(self.executable_path)
if not ispatched:
if self._custom_exe_path: return self.patch_exe()
ispatched = self.is_binary_patched(self.executable_path) else:
if not ispatched: return
return self.patch_exe()
else: if version_main:
return self.version_main = version_main
if force is True:
if version_main: self.force = force
self.version_main = version_main
if force is True: try:
self.force = force os.unlink(self.executable_path)
except PermissionError:
try: if self.force:
os.unlink(self.executable_path) self.force_kill_instances(self.executable_path)
except PermissionError: return self.auto(force=not self.force)
if self.force: try:
self.force_kill_instances(self.executable_path) if self.is_binary_patched():
return self.auto(force=not self.force) # assumes already running AND patched
try: return True
if self.is_binary_patched(): except PermissionError:
# assumes already running AND patched pass
return True # return False
except PermissionError: except FileNotFoundError:
pass pass
# return False
except FileNotFoundError: release = self.fetch_release_number()
pass self.version_main = release.version[0]
self.version_full = release
release = self.fetch_release_number() self.unzip_package(self.fetch_package())
self.version_main = release.version[0] return self.patch()
self.version_full = release
self.unzip_package(self.fetch_package()) def patch(self):
return self.patch() self.patch_exe()
return self.is_binary_patched()
def patch(self):
self.patch_exe() def fetch_release_number(self):
return self.is_binary_patched() """
Gets the latest major version available, or the latest major version of self.target_version if set explicitly.
def fetch_release_number(self): :return: version string
""" :rtype: LooseVersion
Gets the latest major version available, or the latest major version of self.target_version if set explicitly. """
:return: version string path = "/latest_release"
:rtype: LooseVersion if self.version_main:
""" path += f"_{self.version_main}"
path = "/latest_release" path = path.upper()
if self.version_main: logger.debug("getting release number from %s" % path)
path += f"_{self.version_main}" return LooseVersion(urlopen(self.url_repo + path).read().decode())
path = path.upper()
logger.debug("getting release number from %s" % path) def parse_exe_version(self):
return LooseVersion(urlopen(self.url_repo + path).read().decode()) with io.open(self.executable_path, "rb") as f:
for line in iter(lambda: f.readline(), b""):
def parse_exe_version(self): match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line)
with io.open(self.executable_path, "rb") as f: if match:
for line in iter(lambda: f.readline(), b""): return LooseVersion(match[1].decode())
match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line)
if match: def fetch_package(self):
return LooseVersion(match[1].decode()) """
Downloads ChromeDriver from source
def fetch_package(self):
""" :return: path to downloaded file
Downloads ChromeDriver from source """
u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zip_name)
:return: path to downloaded file logger.debug("downloading from %s" % u)
""" # return urlretrieve(u, filename=self.data_path)[0]
u = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, self.zip_name) return urlretrieve(u)[0]
logger.debug("downloading from %s" % u)
# return urlretrieve(u, filename=self.data_path)[0] def unzip_package(self, fp):
return urlretrieve(u)[0] """
Does what it says
def unzip_package(self, fp):
""" :return: path to unpacked executable
Does what it says """
logger.debug("unzipping %s" % fp)
:return: path to unpacked executable try:
""" os.unlink(self.zip_path)
logger.debug("unzipping %s" % fp) except (FileNotFoundError, OSError):
try: pass
os.unlink(self.zip_path)
except (FileNotFoundError, OSError): os.makedirs(self.zip_path, mode=0o755, exist_ok=True)
pass with zipfile.ZipFile(fp, mode="r") as zf:
zf.extract(self.exe_name, self.zip_path)
os.makedirs(self.zip_path, mode=0o755, exist_ok=True) os.rename(os.path.join(self.zip_path, self.exe_name), self.executable_path)
with zipfile.ZipFile(fp, mode="r") as zf: os.remove(fp)
zf.extract(self.exe_name, self.zip_path) os.rmdir(self.zip_path)
os.rename(os.path.join(self.zip_path, self.exe_name), self.executable_path) os.chmod(self.executable_path, 0o755)
os.remove(fp) return self.executable_path
os.rmdir(self.zip_path)
os.chmod(self.executable_path, 0o755) @staticmethod
return self.executable_path def force_kill_instances(exe_name):
"""
@staticmethod kills running instances.
def force_kill_instances(exe_name): :param: executable name to kill, may be a path as well
"""
kills running instances. :return: True on success else False
:param: executable name to kill, may be a path as well """
exe_name = os.path.basename(exe_name)
:return: True on success else False if IS_POSIX:
""" r = os.system("kill -f -9 $(pidof %s)" % exe_name)
exe_name = os.path.basename(exe_name) else:
if IS_POSIX: r = os.system("taskkill /f /im %s" % exe_name)
r = os.system("kill -f -9 $(pidof %s)" % exe_name) return not r
else:
r = os.system("taskkill /f /im %s" % exe_name) @staticmethod
return not r def gen_random_cdc():
cdc = random.choices(string.ascii_letters, k=27)
@staticmethod return "".join(cdc).encode()
def gen_random_cdc():
cdc = random.choices(string.ascii_lowercase, k=26) def is_binary_patched(self, executable_path=None):
cdc[-6:-4] = map(str.upper, cdc[-6:-4]) executable_path = executable_path or self.executable_path
cdc[2] = cdc[0] try:
cdc[3] = "_" with io.open(executable_path, "rb") as fh:
return "".join(cdc).encode() return fh.read().find(b"undetected chromedriver") != -1
except FileNotFoundError:
def is_binary_patched(self, executable_path=None): return False
"""simple check if executable is patched.
def patch_exe(self):
:return: False if not patched, else True start = time.perf_counter()
""" logger.info("patching driver executable %s" % self.executable_path)
executable_path = executable_path or self.executable_path with io.open(self.executable_path, "r+b") as fh:
with io.open(executable_path, "rb") as fh: content = fh.read()
for line in iter(lambda: fh.readline(), b""): # match_injected_codeblock = re.search(rb"{window.*;}", content)
if b"cdc_" in line: match_injected_codeblock = re.search(rb"\{window\.cdc.*?;\}", content)
return False if match_injected_codeblock:
else: target_bytes = match_injected_codeblock[0]
return True new_target_bytes = (
b'{console.log("undetected chromedriver 1337!")}'.ljust(
def patch_exe(self): len(target_bytes), b" "
""" )
Patches the ChromeDriver binary )
new_content = content.replace(target_bytes, new_target_bytes)
:return: False on failure, binary name on success if new_content == content:
""" logger.warning(
logger.info("patching driver executable %s" % self.executable_path) "something went wrong patching the driver binary. could not find injection code block"
)
linect = 0 else:
replacement = self.gen_random_cdc() logger.debug(
with io.open(self.executable_path, "r+b") as fh: "found block:\n%s\nreplacing with:\n%s"
for line in iter(lambda: fh.readline(), b""): % (target_bytes, new_target_bytes)
if b"cdc_" in line: )
fh.seek(-len(line), 1) fh.seek(0)
newline = re.sub(b"cdc_.{22}", replacement, line) fh.write(new_content)
fh.write(newline) logger.debug(
linect += 1 "patching took us {:.2f} seconds".format(time.perf_counter() - start)
return linect )
def __repr__(self): def __repr__(self):
return "{0:s}({1:s})".format( return "{0:s}({1:s})".format(
self.__class__.__name__, self.__class__.__name__,
self.executable_path, self.executable_path,
) )
def __del__(self): def __del__(self):
if self._custom_exe_path:
if self._custom_exe_path: # if the driver binary is specified by user
# if the driver binary is specified by user # we assume it is important enough to not delete it
# we assume it is important enough to not delete it return
return else:
else: timeout = 3 # stop trying after this many seconds
timeout = 3 # stop trying after this many seconds t = time.monotonic()
t = time.monotonic() while True:
while True: now = time.monotonic()
now = time.monotonic() if now - t > timeout:
if now - t > timeout: # we don't want to wait until the end of time
# we don't want to wait until the end of time logger.debug(
logger.debug( "could not unlink %s in time (%d seconds)"
"could not unlink %s in time (%d seconds)" % (self.executable_path, timeout)
% (self.executable_path, timeout) )
) break
break try:
try: os.unlink(self.executable_path)
os.unlink(self.executable_path) logger.debug("successfully unlinked %s" % self.executable_path)
logger.debug("successfully unlinked %s" % self.executable_path) break
break except (OSError, RuntimeError, PermissionError):
except (OSError, RuntimeError, PermissionError): time.sleep(0.1)
time.sleep(0.1) continue
continue except FileNotFoundError:
except FileNotFoundError: break
break

View File

@@ -1,102 +1,99 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# this module is part of undetected_chromedriver # this module is part of undetected_chromedriver
import asyncio import asyncio
import json import json
import logging import logging
import threading import threading
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
class Reactor(threading.Thread):
def __init__(self, driver: "Chrome"): class Reactor(threading.Thread):
super().__init__() def __init__(self, driver: "Chrome"):
super().__init__()
self.driver = driver
self.loop = asyncio.new_event_loop() self.driver = driver
self.loop = asyncio.new_event_loop()
self.lock = threading.Lock()
self.event = threading.Event() self.lock = threading.Lock()
self.daemon = True self.event = threading.Event()
self.handlers = {} self.daemon = True
self.handlers = {}
def add_event_handler(self, method_name, callback: callable):
""" def add_event_handler(self, method_name, callback: callable):
"""
Parameters
---------- Parameters
event_name: str ----------
example "Network.responseReceived" event_name: str
example "Network.responseReceived"
callback: callable
callable which accepts 1 parameter: the message object dictionary callback: callable
callable which accepts 1 parameter: the message object dictionary
Returns
------- Returns
-------
"""
with self.lock: """
self.handlers[method_name.lower()] = callback with self.lock:
self.handlers[method_name.lower()] = callback
@property
def running(self): @property
return not self.event.is_set() def running(self):
return not self.event.is_set()
def run(self):
try: def run(self):
asyncio.set_event_loop(self.loop) try:
self.loop.run_until_complete(self.listen()) asyncio.set_event_loop(self.loop)
except Exception as e: self.loop.run_until_complete(self.listen())
logger.warning("Reactor.run() => %s", e) except Exception as e:
logger.warning("Reactor.run() => %s", e)
async def _wait_service_started(self):
while True: async def _wait_service_started(self):
with self.lock: while True:
if ( with self.lock:
getattr(self.driver, "service", None) if (
and getattr(self.driver.service, "process", None) getattr(self.driver, "service", None)
and self.driver.service.process.poll() and getattr(self.driver.service, "process", None)
): and self.driver.service.process.poll()
await asyncio.sleep(self.driver._delay or 0.25) ):
else: await asyncio.sleep(self.driver._delay or 0.25)
break else:
break
async def listen(self):
async def listen(self):
while self.running: while self.running:
await self._wait_service_started()
await self._wait_service_started() await asyncio.sleep(1)
await asyncio.sleep(1)
try:
try: with self.lock:
with self.lock: log_entries = self.driver.get_log("performance")
log_entries = self.driver.get_log("performance")
for entry in log_entries:
for entry in log_entries: try:
obj_serialized: str = entry.get("message")
try: obj = json.loads(obj_serialized)
message = obj.get("message")
obj_serialized: str = entry.get("message") method = message.get("method")
obj = json.loads(obj_serialized)
message = obj.get("message") if "*" in self.handlers:
method = message.get("method") await self.loop.run_in_executor(
None, self.handlers["*"], message
if "*" in self.handlers: )
await self.loop.run_in_executor( elif method.lower() in self.handlers:
None, self.handlers["*"], message await self.loop.run_in_executor(
) None, self.handlers[method.lower()], message
elif method.lower() in self.handlers: )
await self.loop.run_in_executor(
None, self.handlers[method.lower()], message # print(type(message), message)
) except Exception as e:
raise e from None
# print(type(message), message)
except Exception as e: except Exception as e:
raise e from None if "invalid session id" in str(e):
pass
except Exception as e: else:
if "invalid session id" in str(e): logging.debug("exception ignored :", e)
pass
else:
logging.debug("exception ignored :", e)

View File

@@ -1,4 +0,0 @@
# for backward compatibility
import sys
sys.modules[__name__] = sys.modules[__package__]

View File

@@ -1,37 +1,86 @@
import selenium.webdriver.remote.webelement from typing import List
from selenium.webdriver.common.by import By
class WebElement(selenium.webdriver.remote.webelement.WebElement): import selenium.webdriver.remote.webelement
"""
Custom WebElement class which makes it easier to view elements when
working in an interactive environment. class WebElement(selenium.webdriver.remote.webelement.WebElement):
def click_safe(self):
standard webelement repr: super().click()
<selenium.webdriver.remote.webelement.WebElement (session="85ff0f671512fa535630e71ee951b1f2", element="6357cb55-92c3-4c0f-9416-b174f9c1b8c4")> self._parent.reconnect(0.1)
using this WebElement class: def children(
<WebElement(<a class="mobile-show-inline-block mc-update-infos init-ok" href="#" id="main-cat-switcher-mobile">)> self, tag=None, recursive=False
) -> List[selenium.webdriver.remote.webelement.WebElement]:
""" """
returns direct child elements of current element
@property :param tag: str, if supplied, returns <tag> nodes only
def attrs(self): """
if not hasattr(self, "_attrs"): script = "return [... arguments[0].children]"
self._attrs = self._parent.execute_script( if tag:
""" script += ".filter( node => node.tagName === '%s')" % tag.upper()
var items = {}; if recursive:
for (index = 0; index < arguments[0].attributes.length; ++index) return list(_recursive_children(self, tag))
{ return list(self._parent.execute_script(script, self))
items[arguments[0].attributes[index].name] = arguments[0].attributes[index].value
};
return items; class UCWebElement(WebElement):
""", """
self, Custom WebElement class which makes it easier to view elements when
) working in an interactive environment.
return self._attrs
standard webelement repr:
def __repr__(self): <selenium.webdriver.remote.webelement.WebElement (session="85ff0f671512fa535630e71ee951b1f2", element="6357cb55-92c3-4c0f-9416-b174f9c1b8c4")>
strattrs = " ".join([f'{k}="{v}"' for k, v in self.attrs.items()])
if strattrs: using this WebElement class:
strattrs = " " + strattrs <WebElement(<a class="mobile-show-inline-block mc-update-infos init-ok" href="#" id="main-cat-switcher-mobile">)>
return f"{self.__class__.__name__} <{self.tag_name}{strattrs}>"
"""
def __init__(self, parent, id_):
super().__init__(parent, id_)
self._attrs = None
@property
def attrs(self):
if not self._attrs:
self._attrs = self._parent.execute_script(
"""
var items = {};
for (index = 0; index < arguments[0].attributes.length; ++index)
{
items[arguments[0].attributes[index].name] = arguments[0].attributes[index].value
};
return items;
""",
self,
)
return self._attrs
def __repr__(self):
strattrs = " ".join([f'{k}="{v}"' for k, v in self.attrs.items()])
if strattrs:
strattrs = " " + strattrs
return f"{self.__class__.__name__} <{self.tag_name}{strattrs}>"
def _recursive_children(element, tag: str = None, _results=None):
"""
returns all children of <element> recursively
:param element: `WebElement` object.
find children below this <element>
:param tag: str = None.
if provided, return only <tag> elements. example: 'a', or 'img'
:param _results: do not use!
"""
results = _results or set()
for element in element.children():
if tag:
if element.tag_name == tag:
results.add(element)
else:
results.add(element)
results |= _recursive_children(element, tag, results)
return results

View File

@@ -44,6 +44,8 @@ def get_webdriver() -> WebDriver:
# todo: this param shows a warning in chrome head-full # todo: this param shows a warning in chrome head-full
options.add_argument('--disable-setuid-sandbox') options.add_argument('--disable-setuid-sandbox')
options.add_argument('--disable-dev-shm-usage') options.add_argument('--disable-dev-shm-usage')
# this option removes the zygote sandbox (it seems that the resolution is a bit faster)
options.add_argument('--no-zygote')
# note: headless mode is detected (options.headless = True) # note: headless mode is detected (options.headless = True)
# we launch the browser in head-full mode with the window hidden # we launch the browser in head-full mode with the window hidden
@@ -86,6 +88,10 @@ def get_webdriver() -> WebDriver:
return driver return driver
def get_chrome_exe_path() -> str:
return uc.find_chrome_executable()
def get_chrome_major_version() -> str: def get_chrome_major_version() -> str:
global CHROME_MAJOR_VERSION global CHROME_MAJOR_VERSION
if CHROME_MAJOR_VERSION is not None: if CHROME_MAJOR_VERSION is not None:
@@ -110,7 +116,6 @@ def get_chrome_major_version() -> str:
process.close() process.close()
CHROME_MAJOR_VERSION = complete_version.split('.')[0].split(' ')[-1] CHROME_MAJOR_VERSION = complete_version.split('.')[0].split(' ')[-1]
logging.info(f"Chrome major version: {CHROME_MAJOR_VERSION}")
return CHROME_MAJOR_VERSION return CHROME_MAJOR_VERSION