Compare commits

..

10 Commits

Author SHA1 Message Date
ilike2burnthing
2eef473934 Update Dockerfile 2024-07-30 03:02:21 +01:00
ilike2burnthing
330e4f3a5e add libxml2 & libxslt dev packages 2024-07-30 02:46:15 +01:00
ilike2burnthing
1c9222d8dd bump requests version 2024-07-30 02:37:33 +01:00
ilike2burnthing
8f6c5a6391 revert 2024-07-30 02:11:52 +01:00
ilike2burnthing
8f5a223ed3 proxy fix 2024-07-29 23:30:25 +01:00
ilike2burnthing
e847083479 Update requirements.txt 2024-07-29 21:01:01 +01:00
ilike2burnthing
ecad429269 Update utils.py 2024-07-29 21:00:49 +01:00
ilike2burnthing
4d97333160 Delete src/undetected_chromedriver directory 2024-07-29 21:00:45 +01:00
ilike2burnthing
37215107e9 Update sessions.py 2024-07-29 21:00:40 +01:00
ilike2burnthing
af274a2485 Update flaresolverr_service.py 2024-07-29 21:00:35 +01:00
30 changed files with 386 additions and 2658 deletions

View File

@@ -29,13 +29,6 @@ body:
options:
- label: I have read the Discussions
required: true
- type: input
attributes:
label: Have you ACTUALLY checked all these?
description: Please do not waste our time and yours; these checks are there for a reason, it is not just so you can tick boxes for fun. If you type <b>YES</b> and it is clear you did not or have put in no effort, your issue will be closed and locked without comment. If you type <b>NO</b> but still open this issue, you will be permanently blocked for timewasting.
placeholder: YES or NO
validations:
required: true
- type: textarea
attributes:
label: Environment

View File

@@ -1,4 +1,4 @@
name: Autotag
name: autotag
on:
push:
@@ -9,10 +9,11 @@ jobs:
tag-release:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v5
- name: Auto Tag
-
name: Checkout
uses: actions/checkout@v3
-
name: Auto Tag
uses: Klemensas/action-autotag@stable
with:
GITHUB_TOKEN: "${{ secrets.GH_PAT }}"

View File

@@ -1,9 +1,9 @@
name: Docker release
name: release-docker
on:
push:
tags:
- "v*.*.*"
- 'v*.*.*'
pull_request:
branches:
- master
@@ -15,10 +15,10 @@ concurrency:
jobs:
build-docker-images:
if: ${{ !github.event.pull_request.head.repo.fork }}
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout repository
uses: actions/checkout@v5
- name: Checkout
uses: actions/checkout@v4
- name: Downcase repo
run: echo REPOSITORY=$(echo ${{ github.repository }} | tr '[:upper:]' '[:lower:]') >> $GITHUB_ENV
@@ -43,8 +43,8 @@ jobs:
uses: docker/setup-buildx-action@v3
- name: Login to DockerHub
if: github.event_name != 'pull_request'
uses: docker/login-action@v3
if: github.event_name != 'pull_request'
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}

View File

@@ -1,63 +1,95 @@
name: Release
name: release
on:
push:
tags:
- "v*.*.*"
- 'v*.*.*'
jobs:
create-release:
name: Create release
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout repository
uses: actions/checkout@v5
- name: Checkout code
uses: actions/checkout@v3
with:
fetch-depth: 0
fetch-depth: 0 # get all commits, branches and tags (required for the changelog)
- name: Build changelog
id: github_changelog
run: |
changelog=$(git log $(git tag | tail -2 | head -1)..HEAD --no-merges --oneline)
echo "changelog<<EOF" >> $GITHUB_OUTPUT
echo "$changelog" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
changelog="${changelog//'%'/'%25'}"
changelog="${changelog//$'\n'/'%0A'}"
changelog="${changelog//$'\r'/'%0D'}"
echo "##[set-output name=changelog;]${changelog}"
- name: Create release
uses: softprops/action-gh-release@v2
with:
body: ${{ steps.github_changelog.outputs.changelog }}
id: create_release
uses: actions/create-release@v1
env:
GITHUB_TOKEN: ${{ secrets.GH_PAT }}
build-package:
name: Build binaries
needs: create-release
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, windows-latest]
steps:
- name: Checkout repository
uses: actions/checkout@v5
with:
fetch-depth: 0
tag_name: ${{ github.ref }}
release_name: ${{ github.ref }}
body: ${{ steps.github_changelog.outputs.changelog }}
draft: false
prerelease: false
build-linux-package:
name: Build Linux binary
needs: create-release
runs-on: ubuntu-22.04
steps:
- name: Checkout code
uses: actions/checkout@v3
with:
fetch-depth: 0 # get all commits, branches and tags (required for the changelog)
- name: Setup Python
uses: actions/setup-python@v6
uses: actions/setup-python@v4
with:
python-version: "3.13"
python-version: '3.11'
- name: Build artifacts
run: |
python -m pip install -r requirements.txt
python -m pip install pyinstaller==6.16.0
python -m pip install pyinstaller==5.13.0
cd src
python build_package.py
- name: Upload release artifacts
uses: softprops/action-gh-release@v2
with:
files: ./dist/flaresolverr_*
uses: alexellis/upload-assets@0.4.0
env:
GITHUB_TOKEN: ${{ secrets.GH_PAT }}
with:
asset_paths: '["./dist/flaresolverr_*"]'
build-windows-package:
name: Build Windows binary
needs: create-release
runs-on: windows-2022
steps:
- name: Checkout code
uses: actions/checkout@v3
with:
fetch-depth: 0 # get all commits, branches and tags (required for the changelog)
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Build artifacts
run: |
python -m pip install -r requirements.txt
python -m pip install pyinstaller==5.13.0
cd src
python build_package.py
- name: Upload release artifacts
uses: alexellis/upload-assets@0.4.0
env:
GITHUB_TOKEN: ${{ secrets.GH_PAT }}
with:
asset_paths: '["./dist/flaresolverr_*"]'

View File

@@ -1,53 +1,5 @@
# Changelog
## v3.4.6 (2025/11/29)
* Add disable image, css, fonts option with CDP. Thanks @Ananto30
## v3.4.5 (2025/11/11)
* Revert to Python v3.13
## v3.4.4 (2025/11/04)
* Bump dependencies, Chromium, and some other general fixes. Thanks @flowerey
## v3.4.3 (2025/10/28)
* Update proxy extension
## v3.4.2 (2025/10/09)
* Bump dependencies & CI actions. Thanks @flowerey
* Add optional wait time after resolving the challenge before returning. Thanks @kennedyoliveira
* Add proxy ENVs. Thanks @Robokishan
* Handle empty string and keys without value in postData. Thanks @eZ4RK0
* Add quote protection for password containing it. Thanks @warrenberberd
* Add returnScreenshot parameter to screenshot the final web page. Thanks @estebanthi
* Add log file support. Thanks @acg5159
## v3.4.1 (2025/09/15)
* Fix regex pattern syntax in utils.py
* Change access denied title check to use startswith
## v3.4.0 (2025/08/25)
* Modernize and upgrade application. Thanks @TheCrazyLex
* Remove disable software rasterizer option for ARM builds. Thanks @smrodman83
## v3.3.25 (2025/06/14)
* Remove `use-gl` argument. Thanks @qwerty12
* u_c: remove apparent c&p typo. Thanks @ok3721
* Bump requirements
## v3.3.24 (2025/06/04)
* Remove hidden character
## v3.3.23 (2025/06/04)
* Update base image to bookworm. Thanks @rwjack
## v3.3.22 (2025/06/03)
* Disable search engine choice screen
* Fix headless=false stalling. Thanks @MAKMED1337
* Change from click to keys. Thanks @sh4dowb
* Don't open devtools
* Bump Chromium to v137 for build
* Bump requirements
## v3.3.21 (2024/06/26)
* Add challenge selector to catch reloading page on non-English systems
* Escape values for generated form used in request.post. Thanks @mynameisbogdan

View File

@@ -1,4 +1,4 @@
FROM python:3.13-slim-bookworm AS builder
FROM python:3.11-slim-bullseye as builder
# Build dummy packages to skip installing them and their dependencies
RUN apt-get update \
@@ -12,7 +12,7 @@ RUN apt-get update \
&& equivs-build adwaita-icon-theme \
&& mv adwaita-icon-theme_*.deb /adwaita-icon-theme.deb
FROM python:3.13-slim-bookworm
FROM python:3.11-slim-bullseye
# Copy dummy packages
COPY --from=builder /*.deb /
@@ -30,7 +30,7 @@ RUN dpkg -i /libgl1-mesa-dri.deb \
# Install dependencies
&& apt-get update \
&& apt-get install -y --no-install-recommends chromium chromium-common chromium-driver xvfb dumb-init \
procps curl vim xauth \
procps curl vim xauth libxml2-dev libxslt-dev gcc \
# Remove temporary files and hardware decoding libraries
&& rm -rf /var/lib/apt/lists/* \
&& rm -f /usr/lib/x86_64-linux-gnu/libmfxhw* \
@@ -38,12 +38,7 @@ RUN dpkg -i /libgl1-mesa-dri.deb \
# Create flaresolverr user
&& useradd --home-dir /app --shell /bin/sh flaresolverr \
&& mv /usr/bin/chromedriver chromedriver \
&& chown -R flaresolverr:flaresolverr . \
# Create config dir
&& mkdir /config \
&& chown flaresolverr:flaresolverr /config
VOLUME /config
&& chown -R flaresolverr:flaresolverr .
# Install Python dependencies
COPY requirements.txt .
@@ -67,17 +62,17 @@ ENTRYPOINT ["/usr/bin/dumb-init", "--"]
CMD ["/usr/local/bin/python", "-u", "/app/flaresolverr.py"]
# Local build
# docker build -t ngosang/flaresolverr:3.4.6 .
# docker run -p 8191:8191 ngosang/flaresolverr:3.4.6
# docker build -t ngosang/flaresolverr:3.3.21 .
# docker run -p 8191:8191 ngosang/flaresolverr:3.3.21
# Multi-arch build
# docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
# docker buildx create --use
# docker buildx build -t ngosang/flaresolverr:3.4.6 --platform linux/386,linux/amd64,linux/arm/v7,linux/arm64/v8 .
# docker buildx build -t ngosang/flaresolverr:3.3.21 --platform linux/386,linux/amd64,linux/arm/v7,linux/arm64/v8 .
# add --push to publish in DockerHub
# Test multi-arch build
# docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
# docker buildx create --use
# docker buildx build -t ngosang/flaresolverr:3.4.6 --platform linux/arm/v7 --load .
# docker run -p 8191:8191 --platform linux/arm/v7 ngosang/flaresolverr:3.4.6
# docker buildx build -t ngosang/flaresolverr:3.3.21 --platform linux/arm/v7 --load .
# docker run -p 8191:8191 --platform linux/arm/v7 ngosang/flaresolverr:3.3.21

View File

@@ -1,6 +1,6 @@
MIT License
Copyright (c) 2025 Diego Heras (ngosang / ngosang@hotmail.es)
Copyright (c) 2023 Diego Heras (ngosang / ngosang@hotmail.es)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

210
README.md
View File

@@ -33,14 +33,13 @@ It is recommended to install using a Docker container because the project depend
already included within the image.
Docker images are available in:
- GitHub Registry => https://github.com/orgs/FlareSolverr/packages/container/package/flaresolverr
- DockerHub => https://hub.docker.com/r/flaresolverr/flaresolverr
* GitHub Registry => https://github.com/orgs/FlareSolverr/packages/container/package/flaresolverr
* DockerHub => https://hub.docker.com/r/flaresolverr/flaresolverr
Supported architectures are:
| Architecture | Tag |
| ------------ | ------------ |
|--------------|--------------|
| x86 | linux/386 |
| x86-64 | linux/amd64 |
| ARM32 | linux/arm/v7 |
@@ -51,7 +50,6 @@ We provide a `docker-compose.yml` configuration file. Clone this repository and
the container.
If you prefer the `docker cli` execute the following command.
```bash
docker run -d \
--name=flaresolverr \
@@ -71,29 +69,28 @@ Remember to restart the Docker daemon and the container after the update.
> Precompiled binaries are only available for x64 architecture. For other architectures see Docker images.
This is the recommended way for Windows users.
- Download the [FlareSolverr executable](https://github.com/FlareSolverr/FlareSolverr/releases) from the release's page. It is available for Windows x64 and Linux x64.
- Execute FlareSolverr binary. In the environment variables section you can find how to change the configuration.
* Download the [FlareSolverr executable](https://github.com/FlareSolverr/FlareSolverr/releases) from the release's page. It is available for Windows x64 and Linux x64.
* Execute FlareSolverr binary. In the environment variables section you can find how to change the configuration.
### From source code
> **Warning**
> Installing from source code only works for x64 architecture. For other architectures see Docker images.
- Install [Python 3.13](https://www.python.org/downloads/).
- Install [Chrome](https://www.google.com/intl/en_us/chrome/) (all OS) or [Chromium](https://www.chromium.org/getting-involved/download-chromium/) (just Linux, it doesn't work in Windows) web browser.
- (Only in Linux) Install [Xvfb](https://en.wikipedia.org/wiki/Xvfb) package.
- (Only in macOS) Install [XQuartz](https://www.xquartz.org/) package.
- Clone this repository and open a shell in that path.
- Run `pip install -r requirements.txt` command to install FlareSolverr dependencies.
- Run `python src/flaresolverr.py` command to start FlareSolverr.
* Install [Python 3.11](https://www.python.org/downloads/).
* Install [Chrome](https://www.google.com/intl/en_us/chrome/) (all OS) or [Chromium](https://www.chromium.org/getting-involved/download-chromium/) (just Linux, it doesn't work in Windows) web browser.
* (Only in Linux) Install [Xvfb](https://en.wikipedia.org/wiki/Xvfb) package.
* (Only in macOS) Install [XQuartz](https://www.xquartz.org/) package.
* Clone this repository and open a shell in that path.
* Run `pip install -r requirements.txt` command to install FlareSolverr dependencies.
* Run `python src/flaresolverr.py` command to start FlareSolverr.
### From source code (FreeBSD/TrueNAS CORE)
- Run `pkg install chromium python313 py313-pip xorg-vfbserver` command to install the required dependencies.
- Clone this repository and open a shell in that path.
- Run `python3.13 -m pip install -r requirements.txt` command to install FlareSolverr dependencies.
- Run `python3.13 src/flaresolverr.py` command to start FlareSolverr.
* Run `pkg install chromium python39 py39-pip xorg-vfbserver` command to install the required dependencies.
* Clone this repository and open a shell in that path.
* Run `python3.9 -m pip install -r requirements.txt` command to install FlareSolverr dependencies.
* Run `python3.9 src/flaresolverr.py` command to start FlareSolverr.
### Systemd service
@@ -102,7 +99,6 @@ We provide an example Systemd unit file `flaresolverr.service` as reference. You
## Usage
Example Bash request:
```bash
curl -L -X POST 'http://localhost:8191/v1' \
-H 'Content-Type: application/json' \
@@ -114,7 +110,6 @@ curl -L -X POST 'http://localhost:8191/v1' \
```
Example Python request:
```py
import requests
@@ -130,7 +125,6 @@ print(response.text)
```
Example PowerShell request:
```ps1
$body = @{
cmd = "request.get"
@@ -151,9 +145,9 @@ cookies for the browser to use.
This also speeds up the requests since it won't have to launch a new browser instance for every request.
| Parameter | Notes |
| --------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| session | Optional. The session ID that you want to be assigned to the instance. If isn't set a random UUID will be assigned. |
| Parameter | Notes |
|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| session | Optional. The session ID that you want to be assigned to the instance. If isn't set a random UUID will be assigned. |
| proxy | Optional, default disabled. Eg: `"proxy": {"url": "http://127.0.0.1:8888"}`. You must include the proxy schema in the URL: `http://`, `socks4://` or `socks5://`. Authorization (username/password) is supported. Eg: `"proxy": {"url": "http://127.0.0.1:8888", "username": "testuser", "password": "testpass"}` |
#### + `sessions.list`
@@ -166,7 +160,11 @@ Example response:
```json
{
"sessions": ["session_id_1", "session_id_2", "session_id_3..."]
"sessions": [
"session_id_1",
"session_id_2",
"session_id_3..."
]
}
```
@@ -176,24 +174,20 @@ This will properly shutdown a browser instance and remove all files associated w
session. When you no longer need to use a session you should make sure to close it.
| Parameter | Notes |
| --------- | --------------------------------------------- |
|-----------|-----------------------------------------------|
| session | The session ID that you want to be destroyed. |
#### + `request.get`
| Parameter | Notes |
| ------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| url | Mandatory |
| session | Optional. Will send the request from and existing browser instance. If one is not sent it will create a temporary instance that will be destroyed immediately after the request is completed. |
| session_ttl_minutes | Optional. FlareSolverr will automatically rotate expired sessions based on the TTL provided in minutes. |
| maxTimeout | Optional, default value 60000. Max timeout to solve the challenge in milliseconds. |
| cookies | Optional. Will be used by the headless browser. Eg: `"cookies": [{"name": "cookie1", "value": "value1"}, {"name": "cookie2", "value": "value2"}]`. |
| returnOnlyCookies | Optional, default false. Only returns the cookies. Response data, headers and other parts of the response are removed. |
| returnScreenshot | Optional, default false. Captures a screenshot of the final rendered page after all challenges and waits are completed. The screenshot is returned as a Base64-encoded PNG string in the `screenshot` field of the response. |
| proxy | Optional, default disabled. Eg: `"proxy": {"url": "http://127.0.0.1:8888"}`. You must include the proxy schema in the URL: `http://`, `socks4://` or `socks5://`. Authorization (username/password) is not supported. (When the `session` parameter is set, the proxy is ignored; a session specific proxy can be set in `sessions.create`.) |
| waitInSeconds | Optional, default none. Length to wait in seconds after solving the challenge, and before returning the results. Useful to allow it to load dynamic content. |
| disableMedia | Optional, default false. When true FlareSolverr will prevent media resources (images, CSS, and fonts) from being loaded to speed up navigation. |
| tabs_till_verify | Optional, default none. Number of times the `Tab` button is needed to be pressed to end up on the turnstile captcha, in order to verify it. After verifying the captcha, the result will be stored in the solution under `turnstile_token`. |
> **Warning**
> If you want to use Cloudflare clearance cookie in your scripts, make sure you use the FlareSolverr User-Agent too. If they don't match you will see the challenge.
@@ -202,103 +196,96 @@ Example response from running the `curl` above:
```json
{
"solution": {
"url": "https://www.google.com/?gws_rd=ssl",
"status": 200,
"headers": {
"status": "200",
"date": "Thu, 16 Jul 2020 04:15:49 GMT",
"expires": "-1",
"cache-control": "private, max-age=0",
"content-type": "text/html; charset=UTF-8",
"strict-transport-security": "max-age=31536000",
"p3p": "CP=\"This is not a P3P policy! See g.co/p3phelp for more info.\"",
"content-encoding": "br",
"server": "gws",
"content-length": "61587",
"x-xss-protection": "0",
"x-frame-options": "SAMEORIGIN",
"set-cookie": "1P_JAR=2020-07-16-04; expires=Sat..."
"solution": {
"url": "https://www.google.com/?gws_rd=ssl",
"status": 200,
"headers": {
"status": "200",
"date": "Thu, 16 Jul 2020 04:15:49 GMT",
"expires": "-1",
"cache-control": "private, max-age=0",
"content-type": "text/html; charset=UTF-8",
"strict-transport-security": "max-age=31536000",
"p3p": "CP=\"This is not a P3P policy! See g.co/p3phelp for more info.\"",
"content-encoding": "br",
"server": "gws",
"content-length": "61587",
"x-xss-protection": "0",
"x-frame-options": "SAMEORIGIN",
"set-cookie": "1P_JAR=2020-07-16-04; expires=Sat..."
},
"response":"<!DOCTYPE html>...",
"cookies": [
{
"name": "NID",
"value": "204=QE3Ocq15XalczqjuDy52HeseG3zAZuJzID3R57...",
"domain": ".google.com",
"path": "/",
"expires": 1610684149.307722,
"size": 178,
"httpOnly": true,
"secure": true,
"session": false,
"sameSite": "None"
},
{
"name": "1P_JAR",
"value": "2020-07-16-04",
"domain": ".google.com",
"path": "/",
"expires": 1597464949.307626,
"size": 19,
"httpOnly": false,
"secure": true,
"session": false,
"sameSite": "None"
}
],
"userAgent": "Windows NT 10.0; Win64; x64) AppleWebKit/5..."
},
"response": "<!DOCTYPE html>...",
"cookies": [
{
"name": "NID",
"value": "204=QE3Ocq15XalczqjuDy52HeseG3zAZuJzID3R57...",
"domain": ".google.com",
"path": "/",
"expires": 1610684149.307722,
"size": 178,
"httpOnly": true,
"secure": true,
"session": false,
"sameSite": "None"
},
{
"name": "1P_JAR",
"value": "2020-07-16-04",
"domain": ".google.com",
"path": "/",
"expires": 1597464949.307626,
"size": 19,
"httpOnly": false,
"secure": true,
"session": false,
"sameSite": "None"
}
],
"userAgent": "Windows NT 10.0; Win64; x64) AppleWebKit/5...",
"turnstile_token": "03AGdBq24k3lK7JH2v8uN1T5F..."
},
"status": "ok",
"message": "",
"startTimestamp": 1594872947467,
"endTimestamp": 1594872949617,
"version": "1.0.0"
"status": "ok",
"message": "",
"startTimestamp": 1594872947467,
"endTimestamp": 1594872949617,
"version": "1.0.0"
}
```
### + `request.post`
This works like `request.get`, with the addition of the postData parameter. Note that `tabs_till_verify` is currently supported only for GET requests and requires one extra argument.
This is the same as `request.get` but it takes one more param:
| Parameter | Notes |
| --------- | ------------------------------------------------------------------------ |
|-----------|--------------------------------------------------------------------------|
| postData | Must be a string with `application/x-www-form-urlencoded`. Eg: `a=b&c=d` |
## Environment variables
| Name | Default | Notes |
| ------------------ | ---------------------- | ---------------------------------------------------------------------------------------------------------------------------------------- |
| LOG_LEVEL | info | Verbosity of the logging. Use `LOG_LEVEL=debug` for more information. |
| LOG_FILE | none | Path to capture log to file. Example: `/config/flaresolverr.log`. |
| LOG_HTML | false | Only for debugging. If `true` all HTML that passes through the proxy will be logged to the console in `debug` level. |
| PROXY_URL | none | URL for proxy. Will be overwritten by `request` or `sessions` proxy, if used. Example: `http://127.0.0.1:8080`. |
| PROXY_USERNAME | none | Username for proxy. Will be overwritten by `request` or `sessions` proxy, if used. Example: `testuser`. |
| PROXY_PASSWORD | none | Password for proxy. Will be overwritten by `request` or `sessions` proxy, if used. Example: `testpass`. |
| CAPTCHA_SOLVER | none | Captcha solving method. It is used when a captcha is encountered. See the Captcha Solvers section. |
| TZ | UTC | Timezone used in the logs and the web browser. Example: `TZ=Europe/London`. |
| LANG | none | Language used in the web browser. Example: `LANG=en_GB`. |
| HEADLESS | true | Only for debugging. To run the web browser in headless mode or visible. |
| DISABLE_MEDIA | false | To disable loading images, CSS, and other media in the web browser to save network bandwidth. |
| TEST_URL | https://www.google.com | FlareSolverr makes a request on start to make sure the web browser is working. You can change that URL if it is blocked in your country. |
| PORT | 8191 | Listening port. You don't need to change this if you are running on Docker. |
| HOST | 0.0.0.0 | Listening interface. You don't need to change this if you are running on Docker. |
| PROMETHEUS_ENABLED | false | Enable Prometheus exporter. See the Prometheus section below. |
| PROMETHEUS_PORT | 8192 | Listening port for Prometheus exporter. See the Prometheus section below. |
| Name | Default | Notes |
|--------------------|------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| LOG_LEVEL | info | Verbosity of the logging. Use `LOG_LEVEL=debug` for more information. |
| LOG_HTML | false | Only for debugging. If `true` all HTML that passes through the proxy will be logged to the console in `debug` level. |
| CAPTCHA_SOLVER | none | Captcha solving method. It is used when a captcha is encountered. See the Captcha Solvers section. |
| TZ | UTC | Timezone used in the logs and the web browser. Example: `TZ=Europe/London`. |
| LANG | none | Language used in the web browser. Example: `LANG=en_GB`. |
| HEADLESS | true | Only for debugging. To run the web browser in headless mode or visible. |
| BROWSER_TIMEOUT | 40000 | If you are experiencing errors/timeouts because your system is slow, you can try to increase this value. Remember to increase the `maxTimeout` parameter too. |
| TEST_URL | https://www.google.com | FlareSolverr makes a request on start to make sure the web browser is working. You can change that URL if it is blocked in your country. |
| PORT | 8191 | Listening port. You don't need to change this if you are running on Docker. |
| HOST | 0.0.0.0 | Listening interface. You don't need to change this if you are running on Docker. |
| PROMETHEUS_ENABLED | false | Enable Prometheus exporter. See the Prometheus section below. |
| PROMETHEUS_PORT | 8192 | Listening port for Prometheus exporter. See the Prometheus section below. |
Environment variables are set differently depending on the operating system. Some examples:
- Docker: Take a look at the Docker section in this document. Environment variables can be set in the `docker-compose.yml` file or in the Docker CLI command.
- Linux: Run `export LOG_LEVEL=debug` and then run `flaresolverr` in the same shell.
- Windows: Open `cmd.exe`, run `set LOG_LEVEL=debug` and then run `flaresolverr.exe` in the same shell.
* Docker: Take a look at the Docker section in this document. Environment variables can be set in the `docker-compose.yml` file or in the Docker CLI command.
* Linux: Run `export LOG_LEVEL=debug` and then run `flaresolverr` in the same shell.
* Windows: Open `cmd.exe`, run `set LOG_LEVEL=debug` and then run `flaresolverr.exe` in the same shell.
## Prometheus exporter
The Prometheus exporter for FlareSolverr is disabled by default. It can be enabled with the environment variable `PROMETHEUS_ENABLED`. If you are using Docker make sure you expose the `PROMETHEUS_PORT`.
Example metrics:
```shell
# HELP flaresolverr_request_total Total requests with result
# TYPE flaresolverr_request_total counter
@@ -330,9 +317,8 @@ solve a captcha.
If this is the case, FlareSolverr will return the error `Captcha detected but no automatic solver is configured.`
FlareSolverr can be customized to solve the CAPTCHA automatically by setting the environment variable `CAPTCHA_SOLVER`
to the file name of one of the adapters inside the `/captcha` directory.
to the file name of one of the adapters inside the [/captcha](src/captcha) directory.
## Related projects
- C# implementation => https://github.com/FlareSolverr/FlareSolverrSharp
* C# implementation => https://github.com/FlareSolverr/FlareSolverrSharp

View File

@@ -7,12 +7,9 @@ services:
container_name: flaresolverr
environment:
- LOG_LEVEL=${LOG_LEVEL:-info}
- LOG_FILE=${LOG_FILE:-none}
- LOG_HTML=${LOG_HTML:-false}
- CAPTCHA_SOLVER=${CAPTCHA_SOLVER:-none}
- TZ=Europe/London
ports:
- "${PORT:-8191}:8191"
volumes:
- /var/lib/flaresolver:/config
restart: unless-stopped

View File

@@ -1,19 +0,0 @@
[Unit]
Description=FlareSolverr
After=network.target
[Service]
SyslogIdentifier=flaresolverr
Restart=always
RestartSec=5
Type=simple
User=flaresolverr
Group=flaresolverr
Environment="LOG_LEVEL=info"
Environment="CAPTCHA_SOLVER=none"
WorkingDirectory=/opt/flaresolverr
ExecStart=/opt/flaresolverr/flaresolverr
TimeoutStopSec=30
[Install]
WantedBy=multi-user.target

View File

@@ -1,6 +1,6 @@
{
"name": "flaresolverr",
"version": "3.4.6",
"version": "3.3.21",
"description": "Proxy server to bypass Cloudflare protection",
"author": "Diego Heras (ngosang / ngosang@hotmail.es)",
"license": "MIT"

View File

@@ -1,14 +1,13 @@
bottle==0.13.4
waitress==3.0.2
selenium==4.38.0
bottle==0.12.25
waitress==2.1.2
DrissionPage==4.1.0.0b14
func-timeout==4.3.5
prometheus-client==0.23.1
# Required by undetected_chromedriver
requests==2.32.5
certifi==2025.10.5
websockets==15.0.1
packaging==25.0
# Only required for Linux and macOS
xvfbwrapper==0.2.15; platform_system != "Windows"
# Only required for Windows
pefile==2024.8.26; platform_system == "Windows"
prometheus-client==0.17.1
# required by undetected_chromedriver
requests==2.32.3
certifi==2024.07.04
websockets==11.0.3
# only required for linux and macos
xvfbwrapper==0.2.9; platform_system != "Windows"
# only required for windows
pefile==2023.2.7; platform_system == "Windows"

View File

@@ -5,7 +5,7 @@ import logging
def logger_plugin(callback):
"""
Bottle plugin to use logging module
https://bottlepy.org/docs/dev/plugindev.html
http://bottlepy.org/docs/dev/plugindev.html
Wrap a Bottle request so that a log line is emitted after it's handled.
(This decorator can be extended to take the desired logger as a param.)

View File

@@ -18,7 +18,7 @@ def setup():
def prometheus_plugin(callback):
"""
Bottle plugin to expose Prometheus metrics
https://bottlepy.org/docs/dev/plugindev.html
http://bottlepy.org/docs/dev/plugindev.html
"""
def wrapper(*args, **kwargs):
actual_response = callback(*args, **kwargs)

View File

@@ -25,7 +25,7 @@ def clean_files():
def download_chromium():
# https://commondatastorage.googleapis.com/chromium-browser-snapshots/index.html?prefix=Linux_x64/
revision = "1522586" if os.name == 'nt' else '1522586'
revision = "1260008" if os.name == 'nt' else '1260015'
arch = 'Win_x64' if os.name == 'nt' else 'Linux_x64'
dl_file = 'chrome-win' if os.name == 'nt' else 'chrome-linux'
dl_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir, 'dist_chrome')

View File

@@ -10,8 +10,6 @@ class ChallengeResolutionResultT:
response: str = None
cookies: list = None
userAgent: str = None
screenshot: str | None = None
turnstile_token: str = None
def __init__(self, _dict):
self.__dict__.update(_dict)
@@ -43,14 +41,8 @@ class V1RequestBase(object):
url: str = None
postData: str = None
returnOnlyCookies: bool = None
returnScreenshot: bool = None
download: bool = None # deprecated v2.0.0, not used
returnRawHtml: bool = None # deprecated v2.0.0, not used
waitInSeconds: int = None
# Optional resource blocking flag (blocks images, CSS, and fonts)
disableMedia: bool = None
# Optional when you've got a turnstile captcha that needs to be clicked after X number of Tab presses
tabs_till_verify : int = None
def __init__(self, _dict):
self.__dict__.update(_dict)

View File

@@ -13,10 +13,6 @@ from dtos import V1RequestBase
import flaresolverr_service
import utils
env_proxy_url = os.environ.get('PROXY_URL', None)
env_proxy_username = os.environ.get('PROXY_USERNAME', None)
env_proxy_password = os.environ.get('PROXY_PASSWORD', None)
class JSONErrorBottle(Bottle):
"""
@@ -54,14 +50,7 @@ def controller_v1():
"""
Controller v1
"""
data = request.json or {}
if (('proxy' not in data or not data.get('proxy')) and env_proxy_url is not None and (env_proxy_username is None and env_proxy_password is None)):
logging.info('Using proxy URL ENV')
data['proxy'] = {"url": env_proxy_url}
if (('proxy' not in data or not data.get('proxy')) and env_proxy_url is not None and (env_proxy_username is not None or env_proxy_password is not None)):
logging.info('Using proxy URL, username & password ENVs')
data['proxy'] = {"url": env_proxy_url, "username": env_proxy_username, "password": env_proxy_password}
req = V1RequestBase(data)
req = V1RequestBase(request.json)
res = flaresolverr_service.controller_v1_endpoint(req)
if res.__error_500__:
response.status = 500
@@ -81,13 +70,12 @@ if __name__ == "__main__":
# fix ssl certificates for compiled binaries
# https://github.com/pyinstaller/pyinstaller/issues/7229
# https://stackoverflow.com/q/55736855
# https://stackoverflow.com/questions/55736855/how-to-change-the-cafile-argument-in-the-ssl-module-in-python3
os.environ["REQUESTS_CA_BUNDLE"] = certifi.where()
os.environ["SSL_CERT_FILE"] = certifi.where()
# validate configuration
log_level = os.environ.get('LOG_LEVEL', 'info').upper()
log_file = os.environ.get('LOG_FILE', None)
log_html = utils.get_config_log_html()
headless = utils.get_config_headless()
server_host = os.environ.get('HOST', '0.0.0.0')
@@ -97,29 +85,14 @@ if __name__ == "__main__":
logger_format = '%(asctime)s %(levelname)-8s %(message)s'
if log_level == 'DEBUG':
logger_format = '%(asctime)s %(levelname)-8s ReqId %(thread)s %(message)s'
if log_file:
log_file = os.path.realpath(log_file)
log_path = os.path.dirname(log_file)
os.makedirs(log_path, exist_ok=True)
logging.basicConfig(
format=logger_format,
level=log_level,
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler(log_file)
]
)
else:
logging.basicConfig(
format=logger_format,
level=log_level,
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logging.basicConfig(
format=logger_format,
level=log_level,
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
# disable warning traces from urllib3
logging.getLogger('urllib3').setLevel(logging.ERROR)
logging.getLogger('selenium.webdriver.remote.remote_connection').setLevel(logging.WARNING)

View File

@@ -7,14 +7,8 @@ from html import escape
from urllib.parse import unquote, quote
from func_timeout import FunctionTimedOut, func_timeout
from selenium.common import TimeoutException
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.expected_conditions import (
presence_of_element_located, staleness_of, title_is)
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.wait import WebDriverWait
from DrissionPage import ChromiumPage
from DrissionPage._units.listener import DataPacket
import utils
from dtos import (STATUS_ERROR, STATUS_OK, ChallengeResolutionResultT,
@@ -48,11 +42,6 @@ CHALLENGE_SELECTORS = [
# Fairlane / pararius.com
'div.vc div.text-box h2'
]
TURNSTILE_SELECTORS = [
"input[name='cf-turnstile-response']"
]
SHORT_TIMEOUT = 1
SESSIONS_STORAGE = SessionsStorage()
@@ -257,241 +246,138 @@ def _resolve_challenge(req: V1RequestBase, method: str) -> ChallengeResolutionT:
driver.quit()
logging.debug('A used instance of webdriver has been destroyed')
def click_verify(driver: WebDriver, num_tabs: int = 1):
def click_verify(driver: ChromiumPage) -> DataPacket:
try:
logging.debug("Try to find the Cloudflare verify checkbox...")
actions = ActionChains(driver)
actions.pause(5)
for _ in range(num_tabs):
actions.send_keys(Keys.TAB).pause(0.1)
actions.pause(1)
actions.send_keys(Keys.SPACE).perform()
logging.debug(f"Cloudflare verify checkbox clicked after {num_tabs} tabs!")
except Exception:
logging.debug("Cloudflare verify checkbox not found on the page.")
finally:
driver.switch_to.default_content()
try:
logging.debug("Try to find the Cloudflare 'Verify you are human' button...")
button = driver.find_element(
by=By.XPATH,
value="//input[@type='button' and @value='Verify you are human']",
bde = (
driver
.ele("@Style=border: 0px; margin: 0px; padding: 0px;", timeout=10)
.shadow_root
.ele("tag:iframe", timeout=10)
.ele('tag:body', timeout=10)
.shadow_root
)
if button:
actions = ActionChains(driver)
actions.move_to_element_with_offset(button, 5, 7)
actions.click(button)
actions.perform()
logging.debug("The Cloudflare 'Verify you are human' button found and clicked!")
except Exception:
logging.debug("The Cloudflare 'Verify you are human' button not found on the page.")
ve = bde.ele("text:Verify you are human", timeout=10)
time.sleep(2)
driver.listen.resume()
ve.click()
data = driver.listen.wait(count=1,timeout=5)
def _get_turnstile_token(driver: WebDriver, tabs: int):
token_input = driver.find_element(By.CSS_SELECTOR, "input[name='cf-turnstile-response']")
current_value = token_input.get_attribute("value")
while True:
click_verify(driver, num_tabs=tabs)
turnstile_token = token_input.get_attribute("value")
if turnstile_token:
if turnstile_token != current_value:
logging.info(f"Turnstile token: {turnstile_token}")
return turnstile_token
logging.debug(f"Failed to extract token possibly click failed")
if isinstance(data, DataPacket):
return data
# reset focus
driver.execute_script("""
let el = document.createElement('button');
el.style.position='fixed';
el.style.top='0';
el.style.left='0';
document.body.prepend(el);
el.focus();
""")
time.sleep(1)
return None
def _resolve_turnstile_captcha(req: V1RequestBase, driver: WebDriver):
turnstile_token = None
if req.tabs_till_verify is not None:
logging.debug(f'Navigating to... {req.url} in order to pass the turnstile challenge')
driver.get(req.url)
except Exception as e:
logging.debug("Cloudflare verify checkbox not found on the page. %s", repr(e))
turnstile_challenge_found = False
for selector in TURNSTILE_SELECTORS:
found_elements = driver.find_elements(By.CSS_SELECTOR, selector)
if len(found_elements) > 0:
turnstile_challenge_found = True
logging.info("Turnstile challenge detected. Selector found: " + selector)
break
if turnstile_challenge_found:
turnstile_token = _get_turnstile_token(driver=driver, tabs=req.tabs_till_verify)
else:
logging.debug(f'Turnstile challenge not found')
return turnstile_token
def _evil_logic(req: V1RequestBase, driver: WebDriver, method: str) -> ChallengeResolutionT:
def search_challenge(driver: ChromiumPage) -> bool:
page_title = driver.title.lower()
# find challenge by title
for title in CHALLENGE_TITLES:
if title.lower() == page_title:
logging.debug("Challenge detected. Title found: %s", page_title)
return True
# find challenge by selectors
if driver.wait.eles_loaded(locators=CHALLENGE_SELECTORS, timeout=SHORT_TIMEOUT, any_one=True):
logging.debug("Challenge detected. One of selectors found")
return True
return False
def _evil_logic(req: V1RequestBase, driver: ChromiumPage, method: str) -> ChallengeResolutionT:
res = ChallengeResolutionT({})
res.status = STATUS_OK
res.message = ""
# optionally block resources like images/css/fonts using CDP
disable_media = utils.get_config_disable_media()
if req.disableMedia is not None:
disable_media = req.disableMedia
if disable_media:
block_urls = [
# Images
"*.png", "*.jpg", "*.jpeg", "*.gif", "*.webp", "*.bmp", "*.svg", "*.ico",
"*.PNG", "*.JPG", "*.JPEG", "*.GIF", "*.WEBP", "*.BMP", "*.SVG", "*.ICO",
"*.tiff", "*.tif", "*.jpe", "*.apng", "*.avif", "*.heic", "*.heif",
"*.TIFF", "*.TIF", "*.JPE", "*.APNG", "*.AVIF", "*.HEIC", "*.HEIF",
# Stylesheets
"*.css",
"*.CSS",
# Fonts
"*.woff", "*.woff2", "*.ttf", "*.otf", "*.eot",
"*.WOFF", "*.WOFF2", "*.TTF", "*.OTF", "*.EOT"
]
try:
logging.debug("Network.setBlockedURLs: %s", block_urls)
driver.execute_cdp_cmd("Network.enable", {})
driver.execute_cdp_cmd("Network.setBlockedURLs", {"urls": block_urls})
except Exception:
# if CDP commands are not available or fail, ignore and continue
logging.debug("Network.setBlockedURLs failed or unsupported on this webdriver")
# navigate to the page
logging.debug(f"Navigating to... {req.url}")
turnstile_token = None
if method == "POST":
logging.debug('Navigating to... %s', req.url)
driver.listen.start(req.url)
if method == 'POST':
_post_request(req, driver)
else:
if req.tabs_till_verify is None:
driver.get(req.url)
else:
turnstile_token = _resolve_turnstile_captcha(req, driver)
driver.get(req.url)
data = driver.listen.wait(count=1,timeout=5)
driver.listen.pause()
# set cookies if required
if req.cookies is not None and len(req.cookies) > 0:
logging.debug(f'Setting cookies...')
logging.debug('Setting cookies...')
for cookie in req.cookies:
driver.delete_cookie(cookie['name'])
driver.add_cookie(cookie)
driver.set.cookies.remove(cookie['name'])
driver.set.cookies(cookie)
# reload the page
driver.listen.resume()
if method == 'POST':
_post_request(req, driver)
else:
driver.get(req.url)
data = driver.listen.wait(count=1, timeout=5)
driver.listen.pause()
# wait for the page
if utils.get_config_log_html():
logging.debug(f"Response HTML:\n{driver.page_source}")
html_element = driver.find_element(By.TAG_NAME, "html")
page_title = driver.title
logging.debug("Response HTML:\n%s", driver.page_source)
page_title = driver.title
# find access denied titles
for title in ACCESS_DENIED_TITLES:
if page_title.startswith(title):
if title == page_title:
raise Exception('Cloudflare has blocked this request. '
'Probably your IP is banned for this site, check in your web browser.')
# find access denied selectors
for selector in ACCESS_DENIED_SELECTORS:
found_elements = driver.find_elements(By.CSS_SELECTOR, selector)
if len(found_elements) > 0:
raise Exception('Cloudflare has blocked this request. '
'Probably your IP is banned for this site, check in your web browser.')
# find challenge by title
challenge_found = False
for title in CHALLENGE_TITLES:
if title.lower() == page_title.lower():
challenge_found = True
logging.info("Challenge detected. Title found: " + page_title)
break
if not challenge_found:
# find challenge by selectors
for selector in CHALLENGE_SELECTORS:
found_elements = driver.find_elements(By.CSS_SELECTOR, selector)
if len(found_elements) > 0:
challenge_found = True
logging.info("Challenge detected. Selector found: " + selector)
break
if driver.wait.eles_loaded(locators=ACCESS_DENIED_SELECTORS, timeout=SHORT_TIMEOUT, any_one=True):
raise Exception('Cloudflare has blocked this request. '
'Probably your IP is banned for this site, check in your web browser.')
attempt = 0
if challenge_found:
while True:
try:
attempt = attempt + 1
# wait until the title changes
for title in CHALLENGE_TITLES:
logging.debug("Waiting for title (attempt " + str(attempt) + "): " + title)
WebDriverWait(driver, SHORT_TIMEOUT).until_not(title_is(title))
challenge_found = True
while challenge_found:
try:
attempt += 1
# then wait until all the selectors disappear
for selector in CHALLENGE_SELECTORS:
logging.debug("Waiting for selector (attempt " + str(attempt) + "): " + selector)
WebDriverWait(driver, SHORT_TIMEOUT).until_not(
presence_of_element_located((By.CSS_SELECTOR, selector)))
if search_challenge(driver):
if attempt == 1:
logging.info("Challenge detected.")
# all elements not found
data = click_verify(driver)
else:
if attempt == 1:
logging.info("Challenge not detected!")
res.message = "Challenge not detected!"
else:
logging.info("Challenge solved!")
res.message = "Challenge solved!"
break
except TimeoutException:
logging.debug("Timeout waiting for selector")
except Exception as e:
logging.debug("Cloudflare check exception")
raise e
click_verify(driver)
# update the html (cloudflare reloads the page every 5 s)
html_element = driver.find_element(By.TAG_NAME, "html")
# waits until cloudflare redirection ends
logging.debug("Waiting for redirect")
# noinspection PyBroadException
try:
WebDriverWait(driver, SHORT_TIMEOUT).until(staleness_of(html_element))
except Exception:
logging.debug("Timeout waiting for redirect")
logging.info("Challenge solved!")
res.message = "Challenge solved!"
else:
logging.info("Challenge not detected!")
res.message = "Challenge not detected!"
challenge_res = ChallengeResolutionResultT({})
challenge_res.url = driver.current_url
challenge_res.status = 200 # todo: fix, selenium not provides this info
challenge_res.cookies = driver.get_cookies()
challenge_res.url = driver.url
if data is not None and data.response is not None:
challenge_res.status = data.response.status
if not req.returnOnlyCookies:
challenge_res.response = data.response.body
challenge_res.headers = data.response.headers.copy()
challenge_res.cookies = driver.cookies()
challenge_res.userAgent = utils.get_user_agent(driver)
challenge_res.turnstile_token = turnstile_token
if not req.returnOnlyCookies:
challenge_res.headers = {} # todo: fix, selenium not provides this info
if req.waitInSeconds and req.waitInSeconds > 0:
logging.info("Waiting " + str(req.waitInSeconds) + " seconds before returning the response...")
time.sleep(req.waitInSeconds)
challenge_res.response = driver.page_source
if req.returnScreenshot:
challenge_res.screenshot = driver.get_screenshot_as_base64()
res.result = challenge_res
return res
def _post_request(req: V1RequestBase, driver: WebDriver):
def _post_request(req: V1RequestBase, driver: ChromiumPage):
post_form = f'<form id="hackForm" action="{req.url}" method="POST">'
query_string = req.postData if req.postData and req.postData[0] != '?' else req.postData[1:] if req.postData else ''
query_string = req.postData if req.postData[0] != '?' else req.postData[1:]
pairs = query_string.split('&')
for pair in pairs:
parts = pair.split('=', 1)
parts = pair.split('=')
# noinspection PyBroadException
try:
name = unquote(parts[0])
@@ -501,11 +387,9 @@ def _post_request(req: V1RequestBase, driver: WebDriver):
continue
# noinspection PyBroadException
try:
value = unquote(parts[1]) if len(parts) > 1 else ''
value = unquote(parts[1])
except Exception:
value = parts[1] if len(parts) > 1 else ''
# Protection of " character, for syntax
value=value.replace('"','&quot;')
value = parts[1]
post_form += f'<input type="text" name="{escape(quote(name))}" value="{escape(quote(value))}"><br>'
post_form += '</form>'
html_content = f"""

View File

@@ -4,7 +4,7 @@ from datetime import datetime, timedelta
from typing import Optional, Tuple
from uuid import uuid1
from selenium.webdriver.chrome.webdriver import WebDriver
from DrissionPage import ChromiumPage
import utils
@@ -12,7 +12,7 @@ import utils
@dataclass
class Session:
session_id: str
driver: WebDriver
driver: ChromiumPage
created_at: datetime
def lifetime(self) -> timedelta:
@@ -27,13 +27,13 @@ class SessionsStorage:
def create(self, session_id: Optional[str] = None, proxy: Optional[dict] = None,
force_new: Optional[bool] = False) -> Tuple[Session, bool]:
"""create creates new instance of WebDriver if necessary,
"""create creates new instance of ChromiumPage if necessary,
assign defined (or newly generated) session_id to the instance
and returns the session object. If a new session has been created
second argument is set to True.
Note: The function is idempotent, so in case if session_id
already exists in the storage a new instance of WebDriver won't be created
already exists in the storage a new instance of ChromiumPage won't be created
and existing session will be returned. Second argument defines if
new session has been created (True) or an existing one was used (False).
"""

View File

@@ -21,11 +21,11 @@ class TestFlareSolverr(unittest.TestCase):
proxy_socks_url = "socks5://127.0.0.1:1080"
google_url = "https://www.google.com"
post_url = "https://httpbin.org/post"
cloudflare_url = "https://nowsecure.nl/"
cloudflare_url = "https://nowsecure.nl"
cloudflare_url_2 = "https://idope.se/torrent-list/harry/"
ddos_guard_url = "https://www.litres.ru/"
ddos_guard_url = "https://anidex.info/"
fairlane_url = "https://www.pararius.com/apartments/amsterdam"
custom_cloudflare_url = "https://www.muziekfabriek.org/"
custom_cloudflare_url = "https://www.muziekfabriek.org"
cloudflare_blocked_url = "https://cpasbiens3.fr/index.php?do=search&subaction=search"
app = TestApp(flaresolverr.app)
@@ -92,29 +92,6 @@ class TestFlareSolverr(unittest.TestCase):
self.assertGreater(len(solution.cookies), 0)
self.assertIn("Chrome/", solution.userAgent)
def test_v1_endpoint_request_get_disable_resources(self):
res = self.app.post_json("/v1", {
"cmd": "request.get",
"url": self.google_url,
"disableMedia": True
})
self.assertEqual(res.status_code, 200)
body = V1ResponseBase(res.json)
self.assertEqual(STATUS_OK, body.status)
self.assertEqual("Challenge not detected!", body.message)
self.assertGreater(body.startTimestamp, 10000)
self.assertGreaterEqual(body.endTimestamp, body.startTimestamp)
self.assertEqual(utils.get_flaresolverr_version(), body.version)
solution = body.solution
self.assertIn(self.google_url, solution.url)
self.assertEqual(solution.status, 200)
self.assertIs(len(solution.headers), 0)
self.assertIn("<title>Google</title>", solution.response)
self.assertGreater(len(solution.cookies), 0)
self.assertIn("Chrome/", solution.userAgent)
def test_v1_endpoint_request_get_cloudflare_js_1(self):
res = self.app.post_json('/v1', {
"cmd": "request.get",
@@ -185,7 +162,7 @@ class TestFlareSolverr(unittest.TestCase):
self.assertIn(self.ddos_guard_url, solution.url)
self.assertEqual(solution.status, 200)
self.assertIs(len(solution.headers), 0)
self.assertIn("<title>Литрес", solution.response)
self.assertIn("<title>AniDex</title>", solution.response)
self.assertGreater(len(solution.cookies), 0)
self.assertIn("Chrome/", solution.userAgent)

View File

@@ -1,910 +0,0 @@
#!/usr/bin/env python3
"""
888 888 d8b
888 888 Y8P
888 888
.d8888b 88888b. 888d888 .d88b. 88888b.d88b. .d88b. .d88888 888d888 888 888 888 .d88b. 888d888
d88P" 888 "88b 888P" d88""88b 888 "888 "88b d8P Y8b d88" 888 888P" 888 888 888 d8P Y8b 888P"
888 888 888 888 888 888 888 888 888 88888888 888 888 888 888 Y88 88P 88888888 888
Y88b. 888 888 888 Y88..88P 888 888 888 Y8b. Y88b 888 888 888 Y8bd8P Y8b. 888
"Y8888P 888 888 888 "Y88P" 888 888 888 "Y8888 "Y88888 888 888 Y88P "Y8888 888 88888888
by UltrafunkAmsterdam (https://github.com/ultrafunkamsterdam)
"""
from __future__ import annotations
__version__ = "3.5.5"
import json
import logging
import os
import pathlib
import re
import shutil
import subprocess
import sys
import tempfile
import time
from weakref import finalize
import selenium.webdriver.chrome.service
import selenium.webdriver.chrome.webdriver
from selenium.webdriver.common.by import By
import selenium.webdriver.chromium.service
import selenium.webdriver.remote.command
import selenium.webdriver.remote.webdriver
from .cdp import CDP
from .dprocess import start_detached
from .options import ChromeOptions
from .patcher import IS_POSIX
from .patcher import Patcher
from .reactor import Reactor
from .webelement import UCWebElement
from .webelement import WebElement
__all__ = (
"Chrome",
"ChromeOptions",
"Patcher",
"Reactor",
"CDP",
"find_chrome_executable",
)
logger = logging.getLogger("uc")
logger.setLevel(logging.getLogger().getEffectiveLevel())
class Chrome(selenium.webdriver.chrome.webdriver.WebDriver):
"""
Controls the ChromeDriver and allows you to drive the browser.
The webdriver file will be downloaded by this module automatically,
you do not need to specify this. however, you may if you wish.
Attributes
----------
Methods
-------
reconnect()
this can be useful in case of heavy detection methods
-stops the chromedriver service which runs in the background
-starts the chromedriver service which runs in the background
-recreate session
start_session(capabilities=None, browser_profile=None)
differentiates from the regular method in that it does not
require a capabilities argument. The capabilities are automatically
recreated from the options at creation time.
--------------------------------------------------------------------------
NOTE:
Chrome has everything included to work out of the box.
it does not `need` customizations.
any customizations MAY lead to trigger bot migitation systems.
--------------------------------------------------------------------------
"""
_instances = set()
session_id = None
debug = False
def __init__(
self,
options=None,
user_data_dir=None,
driver_executable_path=None,
browser_executable_path=None,
port=0,
enable_cdp_events=False,
# service_args=None,
# service_creationflags=None,
desired_capabilities=None,
advanced_elements=False,
# service_log_path=None,
keep_alive=True,
log_level=0,
headless=False,
version_main=None,
patcher_force_close=False,
suppress_welcome=True,
use_subprocess=False,
debug=False,
no_sandbox=True,
windows_headless=False,
user_multi_procs: bool = False,
**kw,
):
"""
Creates a new instance of the chrome driver.
Starts the service and then creates new instance of chrome driver.
Parameters
----------
options: ChromeOptions, optional, default: None - automatic useful defaults
this takes an instance of ChromeOptions, mainly to customize browser behavior.
anything other dan the default, for example extensions or startup options
are not supported in case of failure, and can probably lowers your undetectability.
user_data_dir: str , optional, default: None (creates temp profile)
if user_data_dir is a path to a valid chrome profile directory, use it,
and turn off automatic removal mechanism at exit.
driver_executable_path: str, optional, default: None(=downloads and patches new binary)
browser_executable_path: str, optional, default: None - use find_chrome_executable
Path to the browser executable.
If not specified, make sure the executable's folder is in $PATH
port: int, optional, default: 0
port to be used by the chromedriver executable, this is NOT the debugger port.
leave it at 0 unless you know what you are doing.
the default value of 0 automatically picks an available port.
enable_cdp_events: bool, default: False
:: currently for chrome only
this enables the handling of wire messages
when enabled, you can subscribe to CDP events by using:
driver.add_cdp_listener("Network.dataReceived", yourcallback)
# yourcallback is an callable which accepts exactly 1 dict as parameter
service_args: list of str, optional, default: None
arguments to pass to the driver service
desired_capabilities: dict, optional, default: None - auto from config
Dictionary object with non-browser specific capabilities only, such as "item" or "loggingPref".
advanced_elements: bool, optional, default: False
makes it easier to recognize elements like you know them from html/browser inspection, especially when working
in an interactive environment
default webelement repr:
<selenium.webdriver.remote.webelement.WebElement (session="85ff0f671512fa535630e71ee951b1f2", element="6357cb55-92c3-4c0f-9416-b174f9c1b8c4")>
advanced webelement repr
<WebElement(<a class="mobile-show-inline-block mc-update-infos init-ok" href="#" id="main-cat-switcher-mobile">)>
note: when retrieving large amounts of elements ( example: find_elements_by_tag("*") ) and print them, it does take a little more time.
service_log_path: str, optional, default: None
path to log information from the driver.
keep_alive: bool, optional, default: True
Whether to configure ChromeRemoteConnection to use HTTP keep-alive.
log_level: int, optional, default: adapts to python global log level
headless: bool, optional, default: False
can also be specified in the options instance.
Specify whether you want to use the browser in headless mode.
warning: this lowers undetectability and not fully supported.
version_main: int, optional, default: None (=auto)
if you, for god knows whatever reason, use
an older version of Chrome. You can specify it's full rounded version number
here. Example: 87 for all versions of 87
patcher_force_close: bool, optional, default: False
instructs the patcher to do whatever it can to access the chromedriver binary
if the file is locked, it will force shutdown all instances.
setting it is not recommended, unless you know the implications and think
you might need it.
suppress_welcome: bool, optional , default: True
a "welcome" alert might show up on *nix-like systems asking whether you want to set
chrome as your default browser, and if you want to send even more data to google.
now, in case you are nag-fetishist, or a diagnostics data feeder to google, you can set this to False.
Note: if you don't handle the nag screen in time, the browser loses it's connection and throws an Exception.
use_subprocess: bool, optional , default: True,
False (the default) makes sure Chrome will get it's own process (so no subprocess of chromedriver.exe or python
This fixes a LOT of issues, like multithreaded run, but mst importantly. shutting corectly after
program exits or using .quit()
you should be knowing what you're doing, and know how python works.
unfortunately, there is always an edge case in which one would like to write an single script with the only contents being:
--start script--
import undetected_chromedriver as uc
d = uc.Chrome()
d.get('https://somesite/')
---end script --
and will be greeted with an error, since the program exists before chrome has a change to launch.
in that case you can set this to `True`. The browser will start via subprocess, and will keep running most of times.
! setting it to True comes with NO support when being detected. !
no_sandbox: bool, optional, default=True
uses the --no-sandbox option, and additionally does suppress the "unsecure option" status bar
this option has a default of True since many people seem to run this as root (....) , and chrome does not start
when running as root without using --no-sandbox flag.
user_multi_procs:
set to true when you are using multithreads/multiprocessing
ensures not all processes are trying to modify a binary which is in use by another.
for this to work. YOU MUST HAVE AT LEAST 1 UNDETECTED_CHROMEDRIVER BINARY IN YOUR ROAMING DATA FOLDER.
this requirement can be easily satisfied, by just running this program "normal" and close/kill it.
"""
finalize(self, self._ensure_close, self)
self.debug = debug
self.patcher = Patcher(
executable_path=driver_executable_path,
force=patcher_force_close,
version_main=version_main,
user_multi_procs=user_multi_procs,
)
# self.patcher.auto(user_multiprocess = user_multi_num_procs)
self.patcher.auto()
# self.patcher = patcher
if not options:
options = ChromeOptions()
try:
if hasattr(options, "_session") and options._session is not None:
# prevent reuse of options,
# as it just appends arguments, not replace them
# you'll get conflicts starting chrome
raise RuntimeError("you cannot reuse the ChromeOptions object")
except AttributeError:
pass
options._session = self
if not options.debugger_address:
debug_port = (
port
if port != 0
else selenium.webdriver.common.service.utils.free_port()
)
debug_host = "127.0.0.1"
options.debugger_address = "%s:%d" % (debug_host, debug_port)
else:
debug_host, debug_port = options.debugger_address.split(":")
debug_port = int(debug_port)
if enable_cdp_events:
options.set_capability(
"goog:loggingPrefs", {"performance": "ALL", "browser": "ALL"}
)
options.add_argument("--remote-debugging-host=%s" % debug_host)
options.add_argument("--remote-debugging-port=%s" % debug_port)
if user_data_dir:
options.add_argument("--user-data-dir=%s" % user_data_dir)
language, keep_user_data_dir = None, bool(user_data_dir)
# see if a custom user profile is specified in options
for arg in options.arguments:
if any([_ in arg for _ in ("--headless", "headless")]):
options.arguments.remove(arg)
options.headless = True
if "lang" in arg:
m = re.search("(?:--)?lang(?:[ =])?(.*)", arg)
try:
language = m[1]
except IndexError:
logger.debug("will set the language to en-US,en;q=0.9")
language = "en-US,en;q=0.9"
if "user-data-dir" in arg:
m = re.search("(?:--)?user-data-dir(?:[ =])?(.*)", arg)
try:
user_data_dir = m[1]
logger.debug(
"user-data-dir found in user argument %s => %s" % (arg, m[1])
)
keep_user_data_dir = True
except IndexError:
logger.debug(
"no user data dir could be extracted from supplied argument %s "
% arg
)
if not user_data_dir:
# backward compatiblity
# check if an old uc.ChromeOptions is used, and extract the user data dir
if hasattr(options, "user_data_dir") and getattr(
options, "user_data_dir", None
):
import warnings
warnings.warn(
"using ChromeOptions.user_data_dir might stop working in future versions."
"use uc.Chrome(user_data_dir='/xyz/some/data') in case you need existing profile folder"
)
options.add_argument("--user-data-dir=%s" % options.user_data_dir)
keep_user_data_dir = True
logger.debug(
"user_data_dir property found in options object: %s" % user_data_dir
)
else:
user_data_dir = os.path.normpath(tempfile.mkdtemp())
keep_user_data_dir = False
arg = "--user-data-dir=%s" % user_data_dir
options.add_argument(arg)
logger.debug(
"created a temporary folder in which the user-data (profile) will be stored during this\n"
"session, and added it to chrome startup arguments: %s" % arg
)
if not language:
try:
import locale
language = locale.getdefaultlocale()[0].replace("_", "-")
except Exception:
pass
if not language:
language = "en-US"
options.add_argument("--lang=%s" % language)
if not options.binary_location:
options.binary_location = (
browser_executable_path or find_chrome_executable()
)
if not options.binary_location or not \
pathlib.Path(options.binary_location).exists():
raise FileNotFoundError(
"\n---------------------\n"
"Could not determine browser executable."
"\n---------------------\n"
"Make sure your browser is installed in the default location (path).\n"
"If you are sure about the browser executable, you can specify it using\n"
"the `browser_executable_path='{}` parameter.\n\n"
.format("/path/to/browser/executable" if IS_POSIX else "c:/path/to/your/browser.exe")
)
self._delay = 3
self.user_data_dir = user_data_dir
self.keep_user_data_dir = keep_user_data_dir
if suppress_welcome:
options.arguments.extend(["--no-default-browser-check", "--no-first-run"])
if no_sandbox:
options.arguments.extend(["--no-sandbox", "--test-type"])
if headless or getattr(options, 'headless', None):
#workaround until a better checking is found
try:
v_main = int(self.patcher.version_main) if self.patcher.version_main else 108
if v_main < 108:
options.add_argument("--headless=chrome")
elif v_main >= 108:
options.add_argument("--headless=new")
except:
logger.warning("could not detect version_main."
"therefore, we are assuming it is chrome 108 or higher")
options.add_argument("--headless=new")
options.add_argument("--window-size=1920,1080")
options.add_argument("--start-maximized")
options.add_argument("--no-sandbox")
# fixes "could not connect to chrome" error when running
# on linux using privileged user like root (which i don't recommend)
options.add_argument(
"--log-level=%d" % log_level
or divmod(logging.getLogger().getEffectiveLevel(), 10)[0]
)
if hasattr(options, "handle_prefs"):
options.handle_prefs(user_data_dir)
# fix exit_type flag to prevent tab-restore nag
try:
with open(
os.path.join(user_data_dir, "Default/Preferences"),
encoding="latin1",
mode="r+",
) as fs:
config = json.load(fs)
if config["profile"]["exit_type"] is not None:
# fixing the restore-tabs-nag
config["profile"]["exit_type"] = None
fs.seek(0, 0)
json.dump(config, fs)
fs.truncate() # the file might be shorter
logger.debug("fixed exit_type flag")
except Exception as e:
logger.debug("did not find a bad exit_type flag ")
self.options = options
if not desired_capabilities:
desired_capabilities = options.to_capabilities()
if not use_subprocess and not windows_headless:
self.browser_pid = start_detached(
options.binary_location, *options.arguments
)
else:
startupinfo = None
if os.name == 'nt' and windows_headless:
# STARTUPINFO() is Windows only
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
browser = subprocess.Popen(
[options.binary_location, *options.arguments],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=IS_POSIX,
startupinfo=startupinfo
)
self.browser_pid = browser.pid
service = selenium.webdriver.chromium.service.ChromiumService(
self.patcher.executable_path
)
super().__init__(
service=service,
options=options,
keep_alive=keep_alive,
)
self.reactor = None
if enable_cdp_events:
if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
logging.getLogger(
"selenium.webdriver.remote.remote_connection"
).setLevel(20)
reactor = Reactor(self)
reactor.start()
self.reactor = reactor
if advanced_elements:
self._web_element_cls = UCWebElement
else:
self._web_element_cls = WebElement
if headless or getattr(options, 'headless', None):
self._configure_headless()
def _configure_headless(self):
orig_get = self.get
logger.info("setting properties for headless")
def get_wrapped(*args, **kwargs):
if self.execute_script("return navigator.webdriver"):
logger.info("patch navigator.webdriver")
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(window, "navigator", {
value: new Proxy(navigator, {
has: (target, key) => (key === "webdriver" ? false : key in target),
get: (target, key) =>
key === "webdriver"
? false
: typeof target[key] === "function"
? target[key].bind(target)
: target[key],
}),
});
"""
},
)
logger.info("patch user-agent string")
self.execute_cdp_cmd(
"Network.setUserAgentOverride",
{
"userAgent": self.execute_script(
"return navigator.userAgent"
).replace("Headless", "")
},
)
self.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(navigator, 'maxTouchPoints', {get: () => 1});
Object.defineProperty(navigator.connection, 'rtt', {get: () => 100});
// https://github.com/microlinkhq/browserless/blob/master/packages/goto/src/evasions/chrome-runtime.js
window.chrome = {
app: {
isInstalled: false,
InstallState: {
DISABLED: 'disabled',
INSTALLED: 'installed',
NOT_INSTALLED: 'not_installed'
},
RunningState: {
CANNOT_RUN: 'cannot_run',
READY_TO_RUN: 'ready_to_run',
RUNNING: 'running'
}
},
runtime: {
OnInstalledReason: {
CHROME_UPDATE: 'chrome_update',
INSTALL: 'install',
SHARED_MODULE_UPDATE: 'shared_module_update',
UPDATE: 'update'
},
OnRestartRequiredReason: {
APP_UPDATE: 'app_update',
OS_UPDATE: 'os_update',
PERIODIC: 'periodic'
},
PlatformArch: {
ARM: 'arm',
ARM64: 'arm64',
MIPS: 'mips',
MIPS64: 'mips64',
X86_32: 'x86-32',
X86_64: 'x86-64'
},
PlatformNaclArch: {
ARM: 'arm',
MIPS: 'mips',
MIPS64: 'mips64',
X86_32: 'x86-32',
X86_64: 'x86-64'
},
PlatformOs: {
ANDROID: 'android',
CROS: 'cros',
LINUX: 'linux',
MAC: 'mac',
OPENBSD: 'openbsd',
WIN: 'win'
},
RequestUpdateCheckStatus: {
NO_UPDATE: 'no_update',
THROTTLED: 'throttled',
UPDATE_AVAILABLE: 'update_available'
}
}
}
// https://github.com/microlinkhq/browserless/blob/master/packages/goto/src/evasions/navigator-permissions.js
if (!window.Notification) {
window.Notification = {
permission: 'denied'
}
}
const originalQuery = window.navigator.permissions.query
window.navigator.permissions.__proto__.query = parameters =>
parameters.name === 'notifications'
? Promise.resolve({ state: window.Notification.permission })
: originalQuery(parameters)
const oldCall = Function.prototype.call
function call() {
return oldCall.apply(this, arguments)
}
Function.prototype.call = call
const nativeToStringFunctionString = Error.toString().replace(/Error/g, 'toString')
const oldToString = Function.prototype.toString
function functionToString() {
if (this === window.navigator.permissions.query) {
return 'function query() { [native code] }'
}
if (this === functionToString) {
return nativeToStringFunctionString
}
return oldCall.call(oldToString, this)
}
// eslint-disable-next-line
Function.prototype.toString = functionToString
"""
},
)
return orig_get(*args, **kwargs)
self.get = get_wrapped
# def _get_cdc_props(self):
# return self.execute_script(
# """
# let objectToInspect = window,
# result = [];
# while(objectToInspect !== null)
# { result = result.concat(Object.getOwnPropertyNames(objectToInspect));
# objectToInspect = Object.getPrototypeOf(objectToInspect); }
#
# return result.filter(i => i.match(/^([a-zA-Z]){27}(Array|Promise|Symbol)$/ig))
# """
# )
#
# def _hook_remove_cdc_props(self):
# self.execute_cdp_cmd(
# "Page.addScriptToEvaluateOnNewDocument",
# {
# "source": """
# let objectToInspect = window,
# result = [];
# while(objectToInspect !== null)
# { result = result.concat(Object.getOwnPropertyNames(objectToInspect));
# objectToInspect = Object.getPrototypeOf(objectToInspect); }
# result.forEach(p => p.match(/^([a-zA-Z]){27}(Array|Promise|Symbol)$/ig)
# &&delete window[p]&&console.log('removed',p))
# """
# },
# )
def get(self, url):
# if self._get_cdc_props():
# self._hook_remove_cdc_props()
return super().get(url)
def add_cdp_listener(self, event_name, callback):
if (
self.reactor
and self.reactor is not None
and isinstance(self.reactor, Reactor)
):
self.reactor.add_event_handler(event_name, callback)
return self.reactor.handlers
return False
def clear_cdp_listeners(self):
if self.reactor and isinstance(self.reactor, Reactor):
self.reactor.handlers.clear()
def window_new(self):
self.execute(
selenium.webdriver.remote.command.Command.NEW_WINDOW, {"type": "window"}
)
def tab_new(self, url: str):
"""
this opens a url in a new tab.
apparently, that passes all tests directly!
Parameters
----------
url
Returns
-------
"""
if not hasattr(self, "cdp"):
from .cdp import CDP
cdp = CDP(self.options)
cdp.tab_new(url)
def reconnect(self, timeout=0.1):
try:
self.service.stop()
except Exception as e:
logger.debug(e)
time.sleep(timeout)
try:
self.service.start()
except Exception as e:
logger.debug(e)
try:
self.start_session()
except Exception as e:
logger.debug(e)
def start_session(self, capabilities=None, browser_profile=None):
if not capabilities:
capabilities = self.options.to_capabilities()
super().start_session(capabilities)
# super(Chrome, self).start_session(capabilities, browser_profile) # Original explicit call commented out
def find_elements_recursive(self, by, value):
"""
find elements in all frames
this is a generator function, which is needed
since if it would return a list of elements, they
will be stale on arrival.
using generator, when the element is returned we are in the correct frame
to use it directly
Args:
by: By
value: str
Returns: Generator[webelement.WebElement]
"""
def search_frame(f=None):
if not f:
# ensure we are on main content frame
self.switch_to.default_content()
else:
self.switch_to.frame(f)
for elem in self.find_elements(by, value):
yield elem
# switch back to main content, otherwise we will get StaleElementReferenceException
self.switch_to.default_content()
# search root frame
for elem in search_frame():
yield elem
# get iframes
frames = self.find_elements('css selector', 'iframe')
# search per frame
for f in frames:
for elem in search_frame(f):
yield elem
def quit(self):
try:
self.service.stop()
self.service.process.kill()
self.command_executor.close()
self.service.process.wait(5)
logger.debug("webdriver process ended")
except (AttributeError, RuntimeError, OSError):
pass
try:
self.reactor.event.set()
logger.debug("shutting down reactor")
except AttributeError:
pass
try:
os.kill(self.browser_pid, 15)
logger.debug("gracefully closed browser")
except Exception as e: # noqa
pass
if (
hasattr(self, "keep_user_data_dir")
and hasattr(self, "user_data_dir")
and not self.keep_user_data_dir
):
for _ in range(5):
try:
shutil.rmtree(self.user_data_dir, ignore_errors=False)
except FileNotFoundError:
pass
except (RuntimeError, OSError, PermissionError) as e:
logger.debug(
"When removing the temp profile, a %s occured: %s\nretrying..."
% (e.__class__.__name__, e)
)
else:
logger.debug("successfully removed %s" % self.user_data_dir)
break
try:
time.sleep(0.1)
except OSError:
pass
# dereference patcher, so patcher can start cleaning up as well.
# this must come last, otherwise it will throw 'in use' errors
self.patcher = None
def __getattribute__(self, item):
if not super().__getattribute__("debug"):
return super().__getattribute__(item)
else:
import inspect
original = super().__getattribute__(item)
if inspect.ismethod(original) and not inspect.isclass(original):
def newfunc(*args, **kwargs):
logger.debug(
"calling %s with args %s and kwargs %s\n"
% (original.__qualname__, args, kwargs)
)
return original(*args, **kwargs)
return newfunc
return original
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.service.stop()
time.sleep(self._delay)
self.service.start()
self.start_session()
def __hash__(self):
return hash(self.options.debugger_address)
def __dir__(self):
return object.__dir__(self)
def __del__(self):
try:
self.service.process.kill()
except: # noqa
pass
self.quit()
@classmethod
def _ensure_close(cls, self):
# needs to be a classmethod so finalize can find the reference
logger.info("ensuring close")
if (
hasattr(self, "service")
and hasattr(self.service, "process")
and hasattr(self.service.process, "kill")
):
self.service.process.kill()
def find_chrome_executable():
"""
Finds the chrome, chrome beta, chrome canary, chromium executable
Returns
-------
executable_path : str
the full file path to found executable
"""
candidates = set()
if IS_POSIX:
for item in os.environ.get("PATH").split(os.pathsep):
for subitem in (
"google-chrome",
"chromium",
"chromium-browser",
"chrome",
"google-chrome-stable",
):
candidates.add(os.sep.join((item, subitem)))
if "darwin" in sys.platform:
candidates.update(
[
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"/Applications/Chromium.app/Contents/MacOS/Chromium",
]
)
else:
for item in map(
os.environ.get,
("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA", "PROGRAMW6432"),
):
if item is not None:
for subitem in (
"Google/Chrome/Application",
):
candidates.add(os.sep.join((item, subitem, "chrome.exe")))
for candidate in candidates:
logger.debug('checking if %s exists and is executable' % candidate)
if os.path.exists(candidate) and os.access(candidate, os.X_OK):
logger.debug('found! using %s' % candidate)
return os.path.normpath(candidate)

View File

@@ -1,112 +0,0 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
import json
import logging
import requests
import websockets
log = logging.getLogger(__name__)
class CDPObject(dict):
def __init__(self, *a, **k):
super().__init__(*a, **k)
self.__dict__ = self
for k in self.__dict__:
if isinstance(self.__dict__[k], dict):
self.__dict__[k] = CDPObject(self.__dict__[k])
elif isinstance(self.__dict__[k], list):
for i in range(len(self.__dict__[k])):
if isinstance(self.__dict__[k][i], dict):
self.__dict__[k][i] = CDPObject(self)
def __repr__(self):
tpl = f"{self.__class__.__name__}(\n\t{{}}\n\t)"
return tpl.format("\n ".join(f"{k} = {v}" for k, v in self.items()))
class PageElement(CDPObject):
pass
class CDP:
log = logging.getLogger("CDP")
endpoints = CDPObject(
{
"json": "/json",
"protocol": "/json/protocol",
"list": "/json/list",
"new": "/json/new?{url}",
"activate": "/json/activate/{id}",
"close": "/json/close/{id}",
}
)
def __init__(self, options: "ChromeOptions"): # noqa
self.server_addr = "http://{0}:{1}".format(*options.debugger_address.split(":"))
self._reqid = 0
self._session = requests.Session()
self._last_resp = None
self._last_json = None
resp = self.get(self.endpoints.json) # noqa
self.sessionId = resp[0]["id"]
self.wsurl = resp[0]["webSocketDebuggerUrl"]
def tab_activate(self, id=None):
if not id:
active_tab = self.tab_list()[0]
id = active_tab.id # noqa
self.wsurl = active_tab.webSocketDebuggerUrl # noqa
return self.post(self.endpoints["activate"].format(id=id))
def tab_list(self):
retval = self.get(self.endpoints["list"])
return [PageElement(o) for o in retval]
def tab_new(self, url):
return self.post(self.endpoints["new"].format(url=url))
def tab_close_last_opened(self):
sessions = self.tab_list()
opentabs = [s for s in sessions if s["type"] == "page"]
return self.post(self.endpoints["close"].format(id=opentabs[-1]["id"]))
async def send(self, method: str, params: dict):
self._reqid += 1
async with websockets.connect(self.wsurl) as ws:
await ws.send(
json.dumps({"method": method, "params": params, "id": self._reqid})
)
self._last_resp = await ws.recv()
self._last_json = json.loads(self._last_resp)
self.log.info(self._last_json)
def get(self, uri):
resp = self._session.get(self.server_addr + uri)
try:
self._last_resp = resp
self._last_json = resp.json()
except Exception:
return
else:
return self._last_json
def post(self, uri, data: dict = None):
if not data:
data = {}
resp = self._session.post(self.server_addr + uri, json=data)
try:
self._last_resp = resp
self._last_json = resp.json()
except Exception:
return self._last_resp
@property
def last_json(self):
return self._last_json

View File

@@ -1,193 +0,0 @@
import asyncio
from collections.abc import Mapping
from collections.abc import Sequence
from functools import wraps
import os
import logging
import threading
import time
import traceback
from typing import Any
from typing import Awaitable
from typing import Callable
from typing import List
from typing import Optional
class Structure(dict):
"""
This is a dict-like object structure, which you should subclass
Only properties defined in the class context are used on initialization.
See example
"""
_store = {}
def __init__(self, *a, **kw):
"""
Instantiate a new instance.
:param a:
:param kw:
"""
super().__init__()
# auxiliar dict
d = dict(*a, **kw)
for k, v in d.items():
if isinstance(v, Mapping):
self[k] = self.__class__(v)
elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)):
self[k] = [self.__class__(i) for i in v]
else:
self[k] = v
super().__setattr__("__dict__", self)
def __getattr__(self, item):
return getattr(super(), item)
def __getitem__(self, item):
return super().__getitem__(item)
def __setattr__(self, key, value):
self.__setitem__(key, value)
def __setitem__(self, key, value):
super().__setitem__(key, value)
def update(self, *a, **kw):
super().update(*a, **kw)
def __eq__(self, other):
return frozenset(other.items()) == frozenset(self.items())
def __hash__(self):
return hash(frozenset(self.items()))
@classmethod
def __init_subclass__(cls, **kwargs):
cls._store = {}
def _normalize_strings(self):
for k, v in self.copy().items():
if isinstance(v, (str)):
self[k] = v.strip()
def timeout(seconds=3, on_timeout: Optional[Callable[[callable], Any]] = None):
def wrapper(func):
@wraps(func)
def wrapped(*args, **kwargs):
def function_reached_timeout():
if on_timeout:
on_timeout(func)
else:
raise TimeoutError("function call timed out")
t = threading.Timer(interval=seconds, function=function_reached_timeout)
t.start()
try:
return func(*args, **kwargs)
except:
t.cancel()
raise
finally:
t.cancel()
return wrapped
return wrapper
def test():
import sys, os
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
import undetected_chromedriver as uc
import threading
def collector(
driver: uc.Chrome,
stop_event: threading.Event,
on_event_coro: Optional[Callable[[List[str]], Awaitable[Any]]] = None,
listen_events: Sequence = ("browser", "network", "performance"),
):
def threaded(driver, stop_event, on_event_coro):
async def _ensure_service_started():
while (
getattr(driver, "service", False)
and getattr(driver.service, "process", False)
and driver.service.process.poll()
):
print("waiting for driver service to come back on")
await asyncio.sleep(0.05)
# await asyncio.sleep(driver._delay or .25)
async def get_log_lines(typ):
await _ensure_service_started()
return driver.get_log(typ)
async def looper():
while not stop_event.is_set():
log_lines = []
try:
for _ in listen_events:
try:
log_lines += await get_log_lines(_)
except:
if logging.getLogger().getEffectiveLevel() <= 10:
traceback.print_exc()
continue
if log_lines and on_event_coro:
await on_event_coro(log_lines)
except Exception as e:
if logging.getLogger().getEffectiveLevel() <= 10:
traceback.print_exc()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(looper())
t = threading.Thread(target=threaded, args=(driver, stop_event, on_event_coro))
t.start()
async def on_event(data):
print("on_event")
print("data:", data)
def func_called(fn):
def wrapped(*args, **kwargs):
print(
"func called! %s (args: %s, kwargs: %s)" % (fn.__name__, args, kwargs)
)
while driver.service.process and driver.service.process.poll() is not None:
time.sleep(0.1)
res = fn(*args, **kwargs)
print("func completed! (result: %s)" % res)
return res
return wrapped
logging.basicConfig(level=10)
options = uc.ChromeOptions()
options.set_capability(
"goog:loggingPrefs", {"performance": "ALL", "browser": "ALL", "network": "ALL"}
)
driver = uc.Chrome(version_main=96, options=options)
# driver.command_executor._request = timeout(seconds=1)(driver.command_executor._request)
driver.command_executor._request = func_called(driver.command_executor._request)
collector_stop = threading.Event()
collector(driver, collector_stop, on_event)
driver.get("https://nowsecure.nl")
time.sleep(10)
if os.name == "nt":
driver.close()
driver.quit()

View File

@@ -1,77 +0,0 @@
import atexit
import logging
import multiprocessing
import os
import platform
import signal
from subprocess import PIPE
from subprocess import Popen
import sys
CREATE_NEW_PROCESS_GROUP = 0x00000200
DETACHED_PROCESS = 0x00000008
REGISTERED = []
def start_detached(executable, *args):
"""
Starts a fully independent subprocess (with no parent)
:param executable: executable
:param args: arguments to the executable, eg: ['--param1_key=param1_val', '-vvv' ...]
:return: pid of the grandchild process
"""
# create pipe
reader, writer = multiprocessing.Pipe(False)
# do not keep reference
process = multiprocessing.Process(
target=_start_detached,
args=(executable, *args),
kwargs={"writer": writer},
daemon=True,
)
process.start()
process.join()
# receive pid from pipe
pid = reader.recv()
REGISTERED.append(pid)
# close pipes
writer.close()
reader.close()
process.close()
return pid
def _start_detached(executable, *args, writer: multiprocessing.Pipe = None):
# configure launch
kwargs = {}
if platform.system() == "Windows":
kwargs.update(creationflags=DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP)
elif sys.version_info < (3, 2):
# assume posix
kwargs.update(preexec_fn=os.setsid)
else: # Python 3.2+ and Unix
kwargs.update(start_new_session=True)
# run
p = Popen([executable, *args], stdin=PIPE, stdout=PIPE, stderr=PIPE, **kwargs)
# send pid to pipe
writer.send(p.pid)
sys.exit()
def _cleanup():
for pid in REGISTERED:
try:
logging.getLogger(__name__).debug("cleaning up pid %d " % pid)
os.kill(pid, signal.SIGTERM)
except: # noqa
pass
atexit.register(_cleanup)

View File

@@ -1,85 +0,0 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
import json
import os
from selenium.webdriver.chromium.options import ChromiumOptions as _ChromiumOptions
class ChromeOptions(_ChromiumOptions):
_session = None
_user_data_dir = None
@property
def user_data_dir(self):
return self._user_data_dir
@user_data_dir.setter
def user_data_dir(self, path: str):
"""
Sets the browser profile folder to use, or creates a new profile
at given <path>.
Parameters
----------
path: str
the path to a chrome profile folder
if it does not exist, a new profile will be created at given location
"""
apath = os.path.abspath(path)
self._user_data_dir = os.path.normpath(apath)
@staticmethod
def _undot_key(key, value):
"""turn a (dotted key, value) into a proper nested dict"""
if "." in key:
key, rest = key.split(".", 1)
value = ChromeOptions._undot_key(rest, value)
return {key: value}
@staticmethod
def _merge_nested(a, b):
"""
merges b into a
leaf values in a are overwritten with values from b
"""
for key in b:
if key in a:
if isinstance(a[key], dict) and isinstance(b[key], dict):
ChromeOptions._merge_nested(a[key], b[key])
continue
a[key] = b[key]
return a
def handle_prefs(self, user_data_dir):
prefs = self.experimental_options.get("prefs")
if prefs:
user_data_dir = user_data_dir or self._user_data_dir
default_path = os.path.join(user_data_dir, "Default")
os.makedirs(default_path, exist_ok=True)
# undot prefs dict keys
undot_prefs = {}
for key, value in prefs.items():
undot_prefs = self._merge_nested(
undot_prefs, self._undot_key(key, value)
)
prefs_file = os.path.join(default_path, "Preferences")
if os.path.exists(prefs_file):
with open(prefs_file, encoding="latin1", mode="r") as f:
undot_prefs = self._merge_nested(json.load(f), undot_prefs)
with open(prefs_file, encoding="latin1", mode="w") as f:
json.dump(undot_prefs, f)
# remove the experimental_options to avoid an error
del self._experimental_options["prefs"]
@classmethod
def from_options(cls, options):
o = cls()
o.__dict__.update(options.__dict__)
return o

View File

@@ -1,473 +0,0 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
from packaging.version import Version as LooseVersion
import io
import json
import logging
import os
import pathlib
import platform
import random
import re
import shutil
import string
import subprocess
import sys
import time
from urllib.request import urlopen
from urllib.request import urlretrieve
import zipfile
from multiprocessing import Lock
logger = logging.getLogger(__name__)
IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux", "linux2", "freebsd"))
class Patcher(object):
lock = Lock()
exe_name = "chromedriver%s"
platform = sys.platform
if platform.endswith("win32"):
d = "~/appdata/roaming/undetected_chromedriver"
elif "LAMBDA_TASK_ROOT" in os.environ:
d = "/tmp/undetected_chromedriver"
elif platform.startswith(("linux", "linux2")):
d = "~/.local/share/undetected_chromedriver"
elif platform.endswith("darwin"):
d = "~/Library/Application Support/undetected_chromedriver"
else:
d = "~/.undetected_chromedriver"
data_path = os.path.abspath(os.path.expanduser(d))
def __init__(
self,
executable_path=None,
force=False,
version_main: int = 0,
user_multi_procs=False,
):
"""
Args:
executable_path: None = automatic
a full file path to the chromedriver executable
force: False
terminate processes which are holding lock
version_main: 0 = auto
specify main chrome version (rounded, ex: 82)
"""
self.force = force
self._custom_exe_path = False
prefix = "undetected"
self.user_multi_procs = user_multi_procs
try:
# Try to convert version_main into an integer
version_main_int = int(version_main)
# check if version_main_int is less than or equal to e.g 114
self.is_old_chromedriver = version_main and version_main_int <= 114
except (ValueError,TypeError):
# Check not running inside Docker
if not os.path.exists("/app/chromedriver"):
# If the conversion fails, log an error message
logging.info("version_main cannot be converted to an integer")
# Set self.is_old_chromedriver to False if the conversion fails
self.is_old_chromedriver = False
# Needs to be called before self.exe_name is accessed
self._set_platform_name()
if not os.path.exists(self.data_path):
os.makedirs(self.data_path, exist_ok=True)
if not executable_path:
if sys.platform.startswith("freebsd"):
self.executable_path = os.path.join(
self.data_path, self.exe_name
)
else:
self.executable_path = os.path.join(
self.data_path, "_".join([prefix, self.exe_name])
)
if not IS_POSIX:
if executable_path:
if not executable_path[-4:] == ".exe":
executable_path += ".exe"
self.zip_path = os.path.join(self.data_path, prefix)
if not executable_path:
if not self.user_multi_procs:
self.executable_path = os.path.abspath(
os.path.join(".", self.executable_path)
)
if executable_path:
self._custom_exe_path = True
self.executable_path = executable_path
# Set the correct repository to download the Chromedriver from
if self.is_old_chromedriver:
self.url_repo = "https://chromedriver.storage.googleapis.com"
else:
self.url_repo = "https://googlechromelabs.github.io/chrome-for-testing"
self.version_main = version_main
self.version_full = None
def _set_platform_name(self):
"""
Set the platform and exe name based on the platform undetected_chromedriver is running on
in order to download the correct chromedriver.
"""
if self.platform.endswith("win32"):
self.platform_name = "win32"
self.exe_name %= ".exe"
if self.platform.endswith(("linux", "linux2")):
self.platform_name = "linux64"
self.exe_name %= ""
if self.platform.endswith("darwin"):
if self.is_old_chromedriver:
self.platform_name = "mac64"
else:
self.platform_name = "mac-x64"
self.exe_name %= ""
if self.platform.startswith("freebsd"):
self.platform_name = "freebsd"
self.exe_name %= ""
def auto(self, executable_path=None, force=False, version_main=None, _=None):
"""
Args:
executable_path:
force:
version_main:
Returns:
"""
p = pathlib.Path(self.data_path)
if self.user_multi_procs:
with Lock():
files = list(p.rglob("*chromedriver*"))
most_recent = max(files, key=lambda f: f.stat().st_mtime)
files.remove(most_recent)
list(map(lambda f: f.unlink(), files))
if self.is_binary_patched(most_recent):
self.executable_path = str(most_recent)
return True
if executable_path:
self.executable_path = executable_path
self._custom_exe_path = True
if self._custom_exe_path:
ispatched = self.is_binary_patched(self.executable_path)
if not ispatched:
return self.patch_exe()
else:
return
if version_main:
self.version_main = version_main
if force is True:
self.force = force
if self.platform_name == "freebsd":
chromedriver_path = shutil.which("chromedriver")
if not os.path.isfile(chromedriver_path) or not os.access(chromedriver_path, os.X_OK):
logging.error("Chromedriver not installed!")
return
version_path = os.path.join(os.path.dirname(self.executable_path), "version.txt")
process = os.popen(f'"{chromedriver_path}" --version')
chromedriver_version = process.read().split(' ')[1].split(' ')[0]
process.close()
current_version = None
if os.path.isfile(version_path) or os.access(version_path, os.X_OK):
with open(version_path, 'r') as f:
current_version = f.read()
if current_version != chromedriver_version:
logging.info("Copying chromedriver executable...")
shutil.copy(chromedriver_path, self.executable_path)
os.chmod(self.executable_path, 0o755)
with open(version_path, 'w') as f:
f.write(chromedriver_version)
logging.info("Chromedriver executable copied!")
else:
try:
os.unlink(self.executable_path)
except PermissionError:
if self.force:
self.force_kill_instances(self.executable_path)
return self.auto(force=not self.force)
try:
if self.is_binary_patched():
# assumes already running AND patched
return True
except PermissionError:
pass
# return False
except FileNotFoundError:
pass
release = self.fetch_release_number()
self.version_main = release.major
self.version_full = release
self.unzip_package(self.fetch_package())
return self.patch()
def driver_binary_in_use(self, path: str = None) -> bool:
"""
naive test to check if a found chromedriver binary is
currently in use
Args:
path: a string or PathLike object to the binary to check.
if not specified, we check use this object's executable_path
"""
if not path:
path = self.executable_path
p = pathlib.Path(path)
if not p.exists():
raise OSError("file does not exist: %s" % p)
try:
with open(p, mode="a+b") as fs:
exc = []
try:
fs.seek(0, 0)
except PermissionError as e:
exc.append(e) # since some systems apprently allow seeking
# we conduct another test
try:
fs.readline()
except PermissionError as e:
exc.append(e)
if exc:
return True
return False
# ok safe to assume this is in use
except Exception as e:
# logger.exception("whoops ", e)
pass
def cleanup_unused_files(self):
p = pathlib.Path(self.data_path)
items = list(p.glob("*undetected*"))
for item in items:
try:
item.unlink()
except:
pass
def patch(self):
self.patch_exe()
return self.is_binary_patched()
def fetch_release_number(self):
"""
Gets the latest major version available, or the latest major version of self.target_version if set explicitly.
:return: version string
:rtype: LooseVersion
"""
# Endpoint for old versions of Chromedriver (114 and below)
if self.is_old_chromedriver:
path = f"/latest_release_{self.version_main}"
path = path.upper()
logger.debug("getting release number from %s" % path)
return LooseVersion(urlopen(self.url_repo + path).read().decode())
# Endpoint for new versions of Chromedriver (115+)
if not self.version_main:
# Fetch the latest version
path = "/last-known-good-versions-with-downloads.json"
logger.debug("getting release number from %s" % path)
with urlopen(self.url_repo + path) as conn:
response = conn.read().decode()
last_versions = json.loads(response)
return LooseVersion(last_versions["channels"]["Stable"]["version"])
# Fetch the latest minor version of the major version provided
path = "/latest-versions-per-milestone-with-downloads.json"
logger.debug("getting release number from %s" % path)
with urlopen(self.url_repo + path) as conn:
response = conn.read().decode()
major_versions = json.loads(response)
return LooseVersion(major_versions["milestones"][str(self.version_main)]["version"])
def parse_exe_version(self):
with io.open(self.executable_path, "rb") as f:
for line in iter(lambda: f.readline(), b""):
match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line)
if match:
return LooseVersion(match[1].decode())
def fetch_package(self):
"""
Downloads ChromeDriver from source
:return: path to downloaded file
"""
zip_name = f"chromedriver_{self.platform_name}.zip"
if self.is_old_chromedriver:
download_url = "%s/%s/%s" % (self.url_repo, str(self.version_full), zip_name)
else:
zip_name = zip_name.replace("_", "-", 1)
download_url = "https://storage.googleapis.com/chrome-for-testing-public/%s/%s/%s"
download_url %= (str(self.version_full), self.platform_name, zip_name)
logger.debug("downloading from %s" % download_url)
return urlretrieve(download_url)[0]
def unzip_package(self, fp):
"""
Does what it says
:return: path to unpacked executable
"""
exe_path = self.exe_name
if not self.is_old_chromedriver:
# The new chromedriver unzips into its own folder
zip_name = f"chromedriver-{self.platform_name}"
exe_path = os.path.join(zip_name, self.exe_name)
logger.debug("unzipping %s" % fp)
try:
os.unlink(self.zip_path)
except (FileNotFoundError, OSError):
pass
os.makedirs(self.zip_path, mode=0o755, exist_ok=True)
with zipfile.ZipFile(fp, mode="r") as zf:
zf.extractall(self.zip_path)
os.rename(os.path.join(self.zip_path, exe_path), self.executable_path)
os.remove(fp)
shutil.rmtree
os.chmod(self.executable_path, 0o755)
return self.executable_path
@staticmethod
def force_kill_instances(exe_name):
"""
kills running instances.
:param: executable name to kill, may be a path as well
:return: True on success else False
"""
exe_name = os.path.basename(exe_name)
if IS_POSIX:
# Using shell=True for pidof, consider a more robust pid finding method if issues arise.
# pgrep can be an alternative: ["pgrep", "-f", exe_name]
# Or psutil if adding a dependency is acceptable.
command = f"pidof {exe_name}"
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True, check=True)
pids = result.stdout.strip().split()
if pids:
subprocess.run(["kill", "-9"] + pids, check=False) # Changed from -f -9 to -9 as -f is not standard for kill
return True
return False # No PIDs found
except subprocess.CalledProcessError: # pidof returns 1 if no process found
return False # No process found
except Exception as e:
logger.debug(f"Error killing process on POSIX: {e}")
return False
else:
try:
# TASKKILL /F /IM chromedriver.exe
result = subprocess.run(["taskkill", "/f", "/im", exe_name], check=False, capture_output=True)
# taskkill returns 0 if process was killed, 128 if not found.
return result.returncode == 0
except Exception as e:
logger.debug(f"Error killing process on Windows: {e}")
return False
@staticmethod
def gen_random_cdc():
cdc = random.choices(string.ascii_letters, k=27)
return "".join(cdc).encode()
def is_binary_patched(self, executable_path=None):
executable_path = executable_path or self.executable_path
try:
with io.open(executable_path, "rb") as fh:
return fh.read().find(b"undetected chromedriver") != -1
except FileNotFoundError:
return False
def patch_exe(self):
start = time.perf_counter()
logger.info("patching driver executable %s" % self.executable_path)
with io.open(self.executable_path, "r+b") as fh:
content = fh.read()
# match_injected_codeblock = re.search(rb"{window.*;}", content)
match_injected_codeblock = re.search(rb"\{window\.cdc.*?;\}", content)
if match_injected_codeblock:
target_bytes = match_injected_codeblock[0]
new_target_bytes = (
b'{console.log("undetected chromedriver 1337!")}'.ljust(
len(target_bytes), b" "
)
)
new_content = content.replace(target_bytes, new_target_bytes)
if new_content == content:
logger.warning(
"something went wrong patching the driver binary. could not find injection code block"
)
else:
logger.debug(
"found block:\n%s\nreplacing with:\n%s"
% (target_bytes, new_target_bytes)
)
fh.seek(0)
fh.write(new_content)
logger.debug(
"patching took us {:.2f} seconds".format(time.perf_counter() - start)
)
def __repr__(self):
return "{0:s}({1:s})".format(
self.__class__.__name__,
self.executable_path,
)
def __del__(self):
if self._custom_exe_path:
# if the driver binary is specified by user
# we assume it is important enough to not delete it
return
else:
timeout = 3 # stop trying after this many seconds
t = time.monotonic()
now = lambda: time.monotonic()
while now() - t > timeout:
# we don't want to wait until the end of time
try:
if self.user_multi_procs:
break
os.unlink(self.executable_path)
logger.debug("successfully unlinked %s" % self.executable_path)
break
except (OSError, RuntimeError, PermissionError):
time.sleep(0.01)
continue
except FileNotFoundError:
break

View File

@@ -1,99 +0,0 @@
#!/usr/bin/env python3
# this module is part of undetected_chromedriver
import asyncio
import json
import logging
import threading
logger = logging.getLogger(__name__)
class Reactor(threading.Thread):
def __init__(self, driver: "Chrome"):
super().__init__()
self.driver = driver
self.loop = asyncio.new_event_loop()
self.lock = threading.Lock()
self.event = threading.Event()
self.daemon = True
self.handlers = {}
def add_event_handler(self, method_name, callback: callable):
"""
Parameters
----------
event_name: str
example "Network.responseReceived"
callback: callable
callable which accepts 1 parameter: the message object dictionary
Returns
-------
"""
with self.lock:
self.handlers[method_name.lower()] = callback
@property
def running(self):
return not self.event.is_set()
def run(self):
try:
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.listen())
except Exception as e:
logger.warning("Reactor.run() => %s", e)
async def _wait_service_started(self):
while True:
with self.lock:
if (
getattr(self.driver, "service", None)
and getattr(self.driver.service, "process", None)
and self.driver.service.process.poll()
):
await asyncio.sleep(self.driver._delay or 0.25)
else:
break
async def listen(self):
while self.running:
await self._wait_service_started()
await asyncio.sleep(1)
try:
with self.lock:
log_entries = self.driver.get_log("performance")
for entry in log_entries:
try:
obj_serialized: str = entry.get("message")
obj = json.loads(obj_serialized)
message = obj.get("message")
method = message.get("method")
if "*" in self.handlers:
await self.loop.run_in_executor(
None, self.handlers["*"], message
)
elif method.lower() in self.handlers:
await self.loop.run_in_executor(
None, self.handlers[method.lower()], message
)
# print(type(message), message)
except Exception as e:
raise e from None
except Exception as e:
if "invalid session id" in str(e):
pass
else:
logging.debug("exception ignored :", e)

View File

@@ -1,86 +0,0 @@
from typing import List
from selenium.webdriver.common.by import By
import selenium.webdriver.remote.webelement
class WebElement(selenium.webdriver.remote.webelement.WebElement):
def click_safe(self):
super().click()
self._parent.reconnect(0.1)
def children(
self, tag=None, recursive=False
) -> List[selenium.webdriver.remote.webelement.WebElement]:
"""
returns direct child elements of current element
:param tag: str, if supplied, returns <tag> nodes only
"""
script = "return [... arguments[0].children]"
if tag:
script += ".filter( node => node.tagName === '%s')" % tag.upper()
if recursive:
return list(_recursive_children(self, tag))
return list(self._parent.execute_script(script, self))
class UCWebElement(WebElement):
"""
Custom WebElement class which makes it easier to view elements when
working in an interactive environment.
standard webelement repr:
<selenium.webdriver.remote.webelement.WebElement (session="85ff0f671512fa535630e71ee951b1f2", element="6357cb55-92c3-4c0f-9416-b174f9c1b8c4")>
using this WebElement class:
<WebElement(<a class="mobile-show-inline-block mc-update-infos init-ok" href="#" id="main-cat-switcher-mobile">)>
"""
def __init__(self, parent, id_):
super().__init__(parent, id_)
self._attrs = None
@property
def attrs(self):
if not self._attrs:
self._attrs = self._parent.execute_script(
"""
var items = {};
for (index = 0; index < arguments[0].attributes.length; ++index)
{
items[arguments[0].attributes[index].name] = arguments[0].attributes[index].value
};
return items;
""",
self,
)
return self._attrs
def __repr__(self):
strattrs = " ".join([f'{k}="{v}"' for k, v in self.attrs.items()])
if strattrs:
strattrs = " " + strattrs
return f"{self.__class__.__name__} <{self.tag_name}{strattrs}>"
def _recursive_children(element, tag: str = None, _results=None):
"""
returns all children of <element> recursively
:param element: `WebElement` object.
find children below this <element>
:param tag: str = None.
if provided, return only <tag> elements. example: 'a', or 'img'
:param _results: do not use!
"""
results = _results or set()
for element in element.children():
if tag:
if element.tag_name == tag:
results.add(element)
else:
results.add(element)
results |= _recursive_children(element, tag, results)
return results

View File

@@ -1,23 +1,22 @@
import json
import logging
import os
import platform
import re
import shutil
import sys
import tempfile
import urllib.parse
import tempfile
import sys
from selenium.webdriver.chrome.webdriver import WebDriver
import undetected_chromedriver as uc
from DrissionPage import ChromiumPage, ChromiumOptions
# from DrissionPage.common import Settings
FLARESOLVERR_VERSION = None
PLATFORM_VERSION = None
CHROME_EXE_PATH = None
CHROME_MAJOR_VERSION = None
USER_AGENT = None
XVFB_DISPLAY = None
PATCHED_DRIVER_PATH = None
CHROME_EXE_PATH = os.environ.get('CHROME_EXE_PATH', None)
USER_AGENT = os.environ.get('USER_AGENT', None)
def get_config_log_html() -> bool:
@@ -28,10 +27,6 @@ def get_config_headless() -> bool:
return os.environ.get('HEADLESS', 'true').lower() == 'true'
def get_config_disable_media() -> bool:
return os.environ.get('DISABLE_MEDIA', 'false').lower() == 'true'
def get_flaresolverr_version() -> str:
global FLARESOLVERR_VERSION
if FLARESOLVERR_VERSION is not None:
@@ -62,21 +57,18 @@ def create_proxy_extension(proxy: dict) -> str:
manifest_json = """
{
"version": "1.0.0",
"manifest_version": 3,
"manifest_version": 2,
"name": "Chrome Proxy",
"permissions": [
"proxy",
"tabs",
"unlimitedStorage",
"storage",
"<all_urls>",
"webRequest",
"webRequestAuthProvider"
"webRequestBlocking"
],
"host_permissions": [
"<all_urls>"
],
"background": {
"service_worker": "background.js"
},
"background": {"scripts": ["background.js"]},
"minimum_chrome_version": "76.0.0"
}
"""
@@ -129,98 +121,64 @@ def create_proxy_extension(proxy: dict) -> str:
return proxy_extension_dir
def get_webdriver(proxy: dict = None) -> WebDriver:
global PATCHED_DRIVER_PATH, USER_AGENT
def get_webdriver(proxy: dict = None) -> ChromiumPage:
global CHROME_EXE_PATH, USER_AGENT
logging.debug('Launching web browser...')
# undetected_chromedriver
options = uc.ChromeOptions()
options.add_argument('--no-sandbox')
options.add_argument('--window-size=1920,1080')
options.add_argument('--disable-search-engine-choice-screen')
options = ChromiumOptions()
options.set_argument('--no-sandbox')
options.set_argument('--window-size=1920,1080')
# todo: this param shows a warning in chrome head-full
options.add_argument('--disable-setuid-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.set_argument('--disable-setuid-sandbox')
options.set_argument('--disable-dev-shm-usage')
# this option removes the zygote sandbox (it seems that the resolution is a bit faster)
options.add_argument('--no-zygote')
options.set_argument('--no-zygote')
# attempt to fix Docker ARM32 build
IS_ARMARCH = platform.machine().startswith(('arm', 'aarch'))
if IS_ARMARCH:
options.add_argument('--disable-gpu-sandbox')
options.add_argument('--ignore-certificate-errors')
options.add_argument('--ignore-ssl-errors')
options.set_argument('--disable-gpu-sandbox')
options.set_argument('--disable-software-rasterizer')
options.set_argument('--ignore-certificate-errors')
options.set_argument('--ignore-ssl-errors')
# fix GL errors in ASUSTOR NAS
# https://github.com/FlareSolverr/FlareSolverr/issues/782
# https://github.com/microsoft/vscode/issues/127800#issuecomment-873342069
# https://peter.sh/experiments/chromium-command-line-switches/#use-gl
options.set_argument('--use-gl=swiftshader')
language = os.environ.get('LANG', None)
if language is not None:
options.add_argument('--accept-lang=%s' % language)
options.set_argument('--accept-lang=%s' % language)
# Fix for Chrome 117 | https://github.com/FlareSolverr/FlareSolverr/issues/910
if USER_AGENT is not None:
options.add_argument('--user-agent=%s' % USER_AGENT)
options.set_argument('--user-agent=%s' % USER_AGENT)
proxy_extension_dir = None
if proxy and all(key in proxy for key in ['url', 'username', 'password']):
proxy_extension_dir = create_proxy_extension(proxy)
options.add_argument("--disable-features=DisableLoadExtensionCommandLineSwitch")
options.add_argument("--load-extension=%s" % os.path.abspath(proxy_extension_dir))
options.set_argument("--load-extension=%s" % os.path.abspath(proxy_extension_dir))
elif proxy and 'url' in proxy:
proxy_url = proxy['url']
logging.debug("Using webdriver proxy: %s", proxy_url)
options.add_argument('--proxy-server=%s' % proxy_url)
options.set_argument('--proxy-server=%s' % proxy_url)
# note: headless mode is detected (headless = True)
# we launch the browser in head-full mode with the window hidden
windows_headless = False
if get_config_headless():
if os.name == 'nt':
windows_headless = True
else:
start_xvfb_display()
# For normal headless mode:
# options.add_argument('--headless')
options.headless(windows_headless)
options.set_argument("--auto-open-devtools-for-tabs")
# if we are inside the Docker container, we avoid downloading the driver
driver_exe_path = None
version_main = None
if os.path.exists("/app/chromedriver"):
# running inside Docker
driver_exe_path = "/app/chromedriver"
else:
version_main = get_chrome_major_version()
if PATCHED_DRIVER_PATH is not None:
driver_exe_path = PATCHED_DRIVER_PATH
# detect chrome path
browser_executable_path = get_chrome_exe_path()
# downloads and patches the chromedriver
# if we don't set driver_executable_path it downloads, patches, and deletes the driver each time
try:
driver = uc.Chrome(options=options, browser_executable_path=browser_executable_path,
driver_executable_path=driver_exe_path, version_main=version_main,
windows_headless=windows_headless, headless=get_config_headless())
except Exception as e:
logging.error("Error starting Chrome: %s" % e)
# No point in continuing if we cannot retrieve the driver
raise e
# save the patched driver to avoid re-downloads
if driver_exe_path is None:
PATCHED_DRIVER_PATH = os.path.join(driver.patcher.data_path, driver.patcher.exe_name)
if PATCHED_DRIVER_PATH != driver.patcher.executable_path:
shutil.copy(driver.patcher.executable_path, PATCHED_DRIVER_PATH)
if CHROME_EXE_PATH is not None:
options.set_paths(browser_path=CHROME_EXE_PATH)
# clean up proxy extension directory
if proxy_extension_dir is not None:
shutil.rmtree(proxy_extension_dir)
# selenium vanilla
# options = webdriver.ChromeOptions()
# options.add_argument('--no-sandbox')
# options.add_argument('--window-size=1920,1080')
# options.add_argument('--disable-setuid-sandbox')
# options.add_argument('--disable-dev-shm-usage')
# driver = webdriver.Chrome(options=options)
driver = ChromiumPage(addr_or_opts=options)
return driver
@@ -243,10 +201,53 @@ def get_chrome_exe_path() -> str:
CHROME_EXE_PATH = chrome_path
return CHROME_EXE_PATH
# system
CHROME_EXE_PATH = uc.find_chrome_executable()
CHROME_EXE_PATH = find_chrome_executable()
return CHROME_EXE_PATH
def find_chrome_executable():
"""
Finds the chrome, chrome beta, chrome canary, chromium executable
Returns
-------
executable_path : str
the full file path to found executable
"""
candidates = set()
if sys.platform.startswith(("darwin", "cygwin", "linux", "linux2", "freebsd")):
for item in os.environ.get("PATH").split(os.pathsep):
for subitem in (
"google-chrome",
"chromium",
"chromium-browser",
"chrome",
"google-chrome-stable",
):
candidates.add(os.sep.join((item, subitem)))
if "darwin" in sys.platform:
candidates.update(
[
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"/Applications/Chromium.app/Contents/MacOS/Chromium",
]
)
else:
for item in map(
os.environ.get,
("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA", "PROGRAMW6432"),
):
if item is not None:
for subitem in (
"Google/Chrome/Application",
):
candidates.add(os.sep.join((item, subitem, "chrome.exe")))
for candidate in candidates:
if os.path.exists(candidate) and os.access(candidate, os.X_OK):
return os.path.normpath(candidate)
def get_chrome_major_version() -> str:
global CHROME_MAJOR_VERSION
if CHROME_MAJOR_VERSION is not None:
@@ -304,7 +305,7 @@ def extract_version_nt_folder() -> str:
paths = [f.path for f in os.scandir(path) if f.is_dir()]
for path in paths:
filename = os.path.basename(path)
pattern = r'\d+\.\d+\.\d+\.\d+'
pattern = '\d+\.\d+\.\d+\.\d+'
match = re.search(pattern, filename)
if match and match.group():
# Found a Chrome version.
@@ -320,7 +321,7 @@ def get_user_agent(driver=None) -> str:
try:
if driver is None:
driver = get_webdriver()
USER_AGENT = driver.execute_script("return navigator.userAgent")
USER_AGENT = driver.user_agent
# Fix for Chrome 117 | https://github.com/FlareSolverr/FlareSolverr/issues/910
USER_AGENT = re.sub('HEADLESS', '', USER_AGENT, flags=re.IGNORECASE)
return USER_AGENT

View File

@@ -1 +1 @@
WebTest==3.0.7
WebTest==3.0.0