selenium快速上手教程
大约 19 分钟
相关资料
selenium官方中文文档
火狐插件 Selenium IDE
简单示例
chrome
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as ChromeOptions
options = ChromeOptions() # chrome选项
# 使用360的chrome版本浏览器
options.binary_location = r"C:\Users\xiong\AppData\Local\360Chrome\Chrome\Application\360chrome.exe"
# 创建浏览器对象(使用chorme对应版本的驱动)
browser = webdriver.Chrome(executable_path='c:/360chromedriver.exe', options=options)
# 打开百度
browser.get("https://www.baidu.com")
firefox
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.firefox.options import Options as FirefoxOptions
options = FirefoxOptions()
browser = webdriver.Firefox(executable_path="c:/geckodriver.exe", firefox_profile=profile)
# 打开百度
try:
browser.get("https://www.baidu.com")
except TimeoutException:
pass
等待某个元素出现
# 隐式等待(全局有效),隐式等待是针对页面的,在使用find函数查找元素时,最多等待30s
browser.implicitly_wait(30)
# 显式等待,轮询等待某个元素出现(最大等待20秒,每0.5秒轮询一次)
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.by import By
locator = (By.ID, 'kw')
# EC.presence_of_element_located() 是确认元素是否已经出现了
input = WebDriverWait(browser, 20, 0.5).until(EC.presence_of_element_located(locator))
# EC.element_to_be_clickable() 是确认元素是否是可点击的
button = WebDriverWait(browser, 20, 0.5).until(EC.element_to_be_clickable((By.CSS_SELECTOR, '.btn-search')))
time.sleep(0.5)
注意:有时候可以会出现元素出现后,还是报找不到元素的错误。这个时候应该使用显式等待,然后在后面加上
time.sleep(0.5)
,最后再查找元素。
常用的判断条件:
title_is #标题是某内容
title_contains #标题包含某内容
presence_of_element_located #元素加载出,传入定位元组,如(By.ID, 'p')
visibility_of_element_located #元素可见,传入定位元组
visibility_of #可见,传入元素对象
presence_of_all_elements_located #所有元素加载出
text_to_be_present_in_element #某个元素文本包含某文字
text_to_be_present_in_element_value #某个元素值包含某文字
frame_to_be_available_and_switch_to_it #frame加载并切换
invisibility_of_element_located #元素不可见
element_to_be_clickable #元素可点击
staleness_of #判断一个元素是否仍在DOM,可判断页面是否已经刷新
element_to_be_selected #元素可选择,传元素对象
element_located_to_be_selected #元素可选择,传入定位元组
element_selection_state_to_be #传入元素对象以及状态,相等返回True,否则返回False
element_located_selection_state_to_be #传入定位元组以及状态,相等返回True,否则返回False
alert_is_present #是否出现Alert
元素的操作
查找元素:
# 得到一个 element 实例
# 得到实例可继续调用进行层级查找
browser.find_element_by_xx()
# 得到一个 element 实例的列表
# 通过对 list 的遍历, 即可对各个实例进行相应的操作
browser.find_elements_by_xx()
以下是详细的定位元素的方法:
# 返回一个匹配元素, 即一个 WebElement 元素
find_element_by_id() # 通过元素id定位
find_element_by_name() # 通过元素name定位
find_element_by_class_name() # 通过类名进行定位
find_element_by_tag_name() # 通过标签定位
find_element_by_link_text() # 通过a标签的完整文本内容定位
find_element_by_partial_link_text() # 通过a标签的部分文本内容定位
find_element_by_xpath() # 通过xpath表达式定位
find_element_by_css_selector() # 通过css选择器进行定位
# 返回一个列表, 包含所有匹配的元素, 即一个 WebElement 列表
find_elements_by_id()
find_elements_by_name()
find_elements_by_class_name()
find_elements_by_tag_name()
find_elements_by_link_text()
find_elements_by_partial_link_text()
find_elements_by_xpath()
find_elements_by_css_selector()
WebDriver对象(browser)
和WebElement对象(element)
都可以使用这些 API。WebDriver
进行全局定位 ;WebElement
进行层级定位, 即查找当前元素的子元素
。
元素基本操作
# 元素的属性
element.id # 元素的id
element.location # 元素的位置
element.size # 元素的大小
element.tag_name # 当前元素的标签名
element.text # 当前元素的内容
element,is_displayed() # 当前元素是否可见
element.is_selected() # 当前元素是否选中, 文本输入框的内容
element.is_enabled() # 当前元素是否禁止, 比如经常会禁用一些元素的点击
element.get_attribute(name) # 获取当前元素执行属性的值
# 元素的操作
element.clear() # 清楚元素的内容, 假如这个元素是一个文本元素(input元素)
element.click() # 点击当前元素
element.send_keys(*value) # 向当前元素模拟键盘事件
element.submit() # 提交表单
element_select.select_by_value('50') # 下拉框选择操作
鼠标交互动作
鼠标操作的方法封装在 ActionChains 类提供
from selenium.webdriver.common.action_chains import ActionChains
# 对元素执行鼠标悬停操作
ActionChains(browser).move_to_element(element).perform()
# 双击
ActionChains(browser).double_click().perform()
# 右击
ActionChains(browser).context_click().perform()
# 拖拽
ActionChains(browser).drag_and_drop(element1, element2).perform()
将动作附加到动作链中串行执行
# 拖拽
from selenium.webdriver.common.action_chains import ActionChains
source = browser.find_element_by_css_selector('#draggable')
target = browser.find_element_by_css_selector('#droppable')
actions = ActionChains(browser) # 构造ActionChains对象
actions.drag_and_drop(source, target) # 拖拽
actions.perform() # 执行所有 ActionChains 中存储的行为,可以理解成是对整个操作的提交动作
浏览器的操作
# ----------------------获取属性------------------------
browser.title # 当前页面的 title
browser.name # 当前浏览器的名字
browser.page_source # 网页源码
browser.current_url # 获取当前加载页面的 URL
# ---------------------cookie操作----------------------
browser.get_cookies() # 得到所有的 cookie
browser.get_cookie("name") # 返回字典的key为“name”的cookie信息
# 为当前会话添加cookie,“cookie_dict”指字典对象,必须有name 和value 值
browser.add_cookie(cookie_dict)
driver.add_cookie({‘name’ : ‘foo’, ‘value’ : ‘bar’}) driver.add_cookie({‘name’ : ‘foo’, ‘value’ : ‘bar’, ‘path’ : ‘/’}) driver.add_cookie({‘name’ : ‘foo’, ‘value’ : ‘bar’, ‘path’ : ‘/’, ‘secure’:True})
browser.delete_all_cookies() # 删除当前会话的所有cookie
# 删除cookie信息。“name”是要删除的cookie的名称,“optionsString”是该cookie的选项,目前支持的选项包括“路径”,“域”
browser.delete_cookie(name,optionsString)
browser.delete_cookie(name) # 删除指定 cookie
# ---------------------页面跳转---------------------------
browser.get(url) # 在当前窗口加载 url
browser.refresh() # 刷新当前页面
browser.back() # 相当于浏览器的后退历史记录
browser.forward() # 相当于浏览器的前进历史记录
browser.close() # 关闭当前窗口, 如果当前窗口是最后一个窗口, 浏览器将关闭
browser.quit() # 关闭所有窗口并停止 ChromeDriver 的执行
# -----------------------页面切换----------------------------------
browser.current_window_handle # 当前窗口的 handle, 相当于一个指针一样的东西, 用来指向当前窗口
browser.window_handles # 当前浏览器中的已经打开的所有窗口, 是一个 list
browser.window_handles[0]
browser.switch_to.window(browser.window_handles[0]) # 切换 window_handle 指向的窗口
browser.switch_to.frame('iframeResult') # 切入到frame标签中
browser.switch_to.parent_frame() # 切出到frame标签
browser.switch_to.default_content() # 切出到最外层的frame标签
#---------------------------脚本执行---------------------------
browser.execute_script(script, *args) # 同步执行 js 脚本
browser.execute_script('alert("demo")') # 执行脚本举例
browser.execute_script("arguments[0].focus();", element) # 脚本传参
browser.execute_async_script(script, *args) # 异步执行 js 脚本
# 截取当前窗口,并指定截图图片的保存位置
browser.get_screenshot_as_file("D:\\baidu_img.jpg")
# 调整窗口大小
browser.set_window_size(375, 812)
# 全屏
browser.maximize_window()
脚本执行
# 停止页面加载
browser.execute_script('window.stop()')
# 拖动滚动条是元素居中
element = browser.find_element_by_id("demo")
browser.execute_script("arguments[0].scrollIntoView();", element)
警告框处理
警告框处理:
- 进入警告框(alert/confirm/prompt)
- 操作警告框
操作方法:
方法 | 说明 |
---|---|
text | 返回 alert/confirm/prompt 中的文字信息 |
accept() | 接受现有警告框 |
dismiss() | 解散现有警告框 |
send_keys(keysToSend) | 发送文本至警告框。keysToSend:将文本发送至警告框。 |
# 接受警告框
browser.switch_to.alert.accept()
错误处理
ElementClickInterceptedException
点击元素时报错,可能是在点击时被其他元素遮挡了,做休眠即可成功点击
time.sleep(2)
browser.find_element_by_css_selector(".c-blocka").click()
selenium最佳实践
webdriver_base.py
"""
包装webdriver,使更好用
一些检测网站:
https://ip77.net/
https://www.browserscan.net/zh
配置说明:https://peter.sh/experiments/chromium-command-line-switches/#enable-print-preview-register-promos
绕过检测的webdriver: https://github.com/ultrafunkamsterdam/undetected-chromedriver
说明:
禁止配置--disable-infobars选项,会被检测为自动化控制
"""
import random
import os
import time
from itertools import cycle
from typing import Union
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
from selenium.common.exceptions import TimeoutException
import undetected_chromedriver as uc
from utils.media_publish.img_captcha import digest_letter_code
from webdriver_manager.chrome import ChromeDriverManager
from .webdriver_proxy_auth_extension import proxies
import requests
from PIL import Image
from io import BytesIO
import base64
from utils.media_publish.img_captcha import digest_letter_code
from selenium.webdriver.common.proxy import Proxy, ProxyType
class CustomChromeDriver(webdriver.Chrome):
download_driver_path = ''
def undetected__init__(self, headless=False, proxy_ip=None):
chrome_options = ChromeOptions()
if proxy_ip:
proxies_extension = proxies('d4245106311', '8ybkdofj', proxy_ip.split(':')[0], proxy_ip.split(':')[1])
chrome_options.add_extension(proxies_extension)
# 这里主要是测试使用
if not self.download_driver_path:
temp_file = os.path.join(os.path.expanduser('~'), '.wdm/drivers/chromedriver/win64/123.0.6312.122/chromedriver-win32/chromedriver.exe')
if os.path.exists(temp_file):
self.__class__.download_driver_path = temp_file
if not self.download_driver_path:
# 另外提供一个手动下载地址:https://registry.npmmirror.com/binary.html?path=chrome-for-testing/
CustomChromeDriver.download_driver_path = ChromeDriverManager(url='https://cdn.npmmirror.com/binaries/chromedriver', latest_release_url='https://cdn.npmmirror.com/binaries/chromedriver/LATEST_RELEASE').install()
# CustomChromeDriver.download_driver_path = ChromeDriverManager().install()
super().__init__(options=chrome_options, driver_executable_path=self.download_driver_path, headless=headless, use_subprocess=True)
def __init__(self, headless=False, proxy_ip=None):
"""这个原始webdriver的初始化"""
chrome_options = ChromeOptions()
if proxy_ip:
proxies_extension = proxies('d4245106311', '8ybkdofj', proxy_ip.split(':')[0], proxy_ip.split(':')[1])
chrome_options.add_extension(proxies_extension)
# chrome_options.add_argument(f'--proxy-server={proxy_ip}') # 无法设置代理IP账号密码功能
# 小红书会对user_agent进行检测,请使用真实user_agent
user_agent_list = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.37 (KHTML, like Gecko) Chrome/123.0.0.1 Safari/537.37",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.38 (KHTML, like Gecko) Chrome/123.0.0.2 Safari/537.38",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.39 (KHTML, like Gecko) Chrome/123.0.0.3 Safari/537.39",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.40 (KHTML, like Gecko) Chrome/123.0.0.4 Safari/537.40",
]
user_agent = random.choice(user_agent_list)
chrome_options.add_argument('user-agent=' + user_agent)
chrome_options.add_argument("--disable-blink-features=AutomationControlled") # 关闭(用户其浏览器正由自动化测试控制),使浏览器看起来不像是被自动化控制的。
# chrome_options.add_argument("--incognito") # 无痕模式(无痕模式无法使用扩展)
chrome_options.add_experimental_option('prefs', {'credentials_enable_service': False, 'profile.password_manager_enabled': False}) # 关闭保存密码的弹框
# chrome_options.add_argument("blink-settings=imagesEnabled=false") # Blink渲染引擎级别的禁用图片加载,加快访问速度
chrome_options.add_argument('log-level=3') # 设置日志级别,禁用大部分日志输出,使输出更简洁
chrome_options.add_argument("--enable-precise-memory-info") # 网站可以通过 JS API 获取到更详细的内存使用数据
chrome_options.add_argument("--test-type") # 开启测试模式(减少弹框和警告,放宽安全特效,启用某些测试特性)
chrome_options.add_argument('--no-sandbox') # 禁用沙盒模式。沙盒模式是一种安全特性,但在某些环境(如Docker容器)中可能导致问题。解决DevToolsActivePort文件不存在的报错
chrome_options.add_argument("--disable-setuid-sandbox") # 禁用setuid沙盒,这是沙盒模式的一个组成部分。
chrome_options.add_argument('--disable-dev-shm-usage') # 避免将数据写入/dev/shm,而写入到/tmp中,因为在某些环境中它的容量很小,这可能会导致问题。
chrome_options.add_experimental_option('excludeSwitches', ['enable-automation']) # 设置为开发者模式,防止网站检测到Selenium的使用。
chrome_options.add_experimental_option('useAutomationExtension', False) # 禁用自动化扩展,这有助于减少Selenium自动化的可见性。
chrome_options.add_argument("--no-default-browser-check") # 不进行浏览器默认检查,这有助于自动化测试的流畅进行。
chrome_options.add_argument("--disable-popup-blocking") # 关闭弹窗,关闭网站代码直接生成的弹窗行为
# chrome_options.add_argument("--disable-extensions") # 禁用所有Chrome扩展,这有助于保持测试环境的一致性。
chrome_options.add_argument("--ignore-certificate-errors") # 忽略SSL/TLS证书错误,对于测试自签名或无效证书的网站很有用。
chrome_options.add_argument("--no-first-run") # 跳过初次运行的初始化过程,直接进入浏览器主界面。
chrome_options.add_argument("--start-maximized") # 最大化启动,模拟常规用户行为
chrome_options.add_argument("--disable-xss-auditor") # 禁止xss防护
chrome_options.add_argument("--disable-web-security") # 禁用同源策略,允许不受限制地跨域访问,这在某些特定的测试场景中可能是必要的。
chrome_options.add_argument("--allow-running-insecure-content") # 允许运行不安全的内容,即使在使用HTTPS的页面上也可以加载不安全的HTTP资源。
if headless:
chrome_options.add_argument("--headless") # 设置无头模式
chrome_options.add_argument("--window-size=1920,1080") # 设置窗口大小,解决无头模式报错element not interactable
chrome_options.add_argument('--disable-gpu') # 无头模式下,需要禁用GPU硬件加速来避免BUG
chrome_options.add_argument("--disable-webgl") # 禁用WebGL,这可能有助于提高性能或规避特定的WebGL相关问题。
# ------- 无头模式 代理IP登录验证 示例(网上的代码未测试)
# 无头模式不支持自定义扩展,可以使用selenium-wire包解决代理验证问题
# if proxy_ip:
# proxy_options = {
# 'http': f'http://d4245106311:8ybkdofj@{proxy_ip}',
# 'https': f'https://d4245106311:8ybkdofj@{proxy_ip}',
# 'no_proxy': 'localhost,127.0.0.1'
# }
# from seleniumwire import webdriver
# driver = webdriver.Chrome(options=chrome_options, seleniumwire_options={'proxy': proxy_options})
# ChromeDriverManager().install() 会占用1.5秒时间,所以只放在第一次检查。
# 这里主要是测试使用
if not self.download_driver_path:
temp_file = os.path.join(os.path.expanduser('~'), '.wdm/drivers/chromedriver/win64/123.0.6312.122/chromedriver-win32/chromedriver.exe')
if os.path.exists(temp_file):
self.__class__.download_driver_path = temp_file
if not self.download_driver_path:
# 另外提供一个手动下载地址:https://registry.npmmirror.com/binary.html?path=chrome-for-testing/
CustomChromeDriver.download_driver_path = ChromeDriverManager(url='https://cdn.npmmirror.com/binaries/chromedriver', latest_release_url='https://cdn.npmmirror.com/binaries/chromedriver/LATEST_RELEASE').install()
# CustomChromeDriver.download_driver_path = ChromeDriverManager().install()
# selenium 4 在启动浏览器时会自动识别和下载对于的webdriver,导致第一次启动速度非常慢,所以需要手动指定版本
super().__init__(
options=chrome_options,
service=ChromeService(self.download_driver_path),
)
self.set_page_load_timeout(15) # 设置页面加载的超时时间
def show_progress(self, message):
"""在浏览器页面上显示进度信息"""
js_code = f"""
var progressDiv = document.getElementById('customProgressDiv');
if (!progressDiv) {{
progressDiv = document.createElement('div');
progressDiv.id = 'customProgressDiv';
document.body.appendChild(progressDiv);
progressDiv.style.position = 'fixed';
progressDiv.style.bottom = '20px';
progressDiv.style.right = '20px';
progressDiv.style.backgroundColor = 'rgba(76, 175, 80, 0.9)';
progressDiv.style.color = 'white';
progressDiv.style.padding = '10px';
progressDiv.style.borderRadius = '5px';
progressDiv.style.zIndex = '10000';
}}
progressDiv.textContent = '{message}';
"""
self.execute_script(js_code)
def wait_url(self, url_part, timeout=10, raise_exception=False) -> bool:
"""等待页面跳转
:param timeout: 等待超时时间
:param url_part: url片段
"""
try:
WebDriverWait(self, timeout).until(lambda driver: url_part in driver.current_url)
return True
except Exception as e:
if raise_exception:
raise e
return False
def get_element_by_css_selector(self, css_selector: Union[str, list], timeout=15, raise_exception=False):
"""
通过 CSS 选择器查找并返回第一个匹配的元素
:param css_selector: 用于定位元素的 CSS 选择器
"""
if isinstance(css_selector, str):
css_selectors = [css_selector]
else:
css_selectors = css_selector
start_time = time.time()
for css in cycle(css_selectors):
# 计算剩余时间,如果超时了则返回
remaining_time = timeout - (time.time() - start_time)
if remaining_time <= 0:
if raise_exception:
raise TimeoutException(f"等待元素 {css} 超时")
else:
return
try:
# 等待元素在页面上变得可见
element = WebDriverWait(self, 2).until(
EC.visibility_of_element_located((By.CSS_SELECTOR, css))
)
return element
except Exception as e:
continue
def get_elements_by_css_selector(self, css_selector, timeout=10, raise_exception=False):
"""
通过 CSS 选择器查找并返回一个元素列表
:param css_selector: 用于定位元素的 CSS 选择器
"""
try:
# 等待元素在页面上变得可见
elements = WebDriverWait(self, timeout).until(
EC.visibility_of_all_elements_located((By.CSS_SELECTOR, css_selector))
)
return elements
except Exception as e:
if raise_exception:
raise e
print(f"没有找到或等待元素可见时出现问题 {css_selector}: {e}")
def wait_deleted_for_css_selector(self, css_selector, timeout=2, raise_exception=False) -> bool:
"""等待元素消失
:param timeout: 等待超时时间
:param css_selector: 用于定位元素的 CSS 选择器
"""
# WebDriverWait(self.webdriver, 10).until(lambda driver: len(driver.find_elements_by_css_selector(css_selector)) == 0)
try:
# 使用显式等待检查元素是否消失(不可见或不在DOM中)
WebDriverWait(self, timeout).until(
EC.invisibility_of_element_located((By.CSS_SELECTOR, css_selector))
)
return True # 元素已消失
except Exception as e:
if raise_exception:
raise e
return False # 元素未消失
def wait_cookies_key(self, key: str, raise_exception=False) -> bool:
"""等待cookies中key存在
:param key: cookies key
"""
try:
WebDriverWait(self, 10).until(lambda driver: key in [cookie['name'] for cookie in driver.get_cookies()])
return True
except Exception as e:
if raise_exception:
raise e
return False
def click_element_by_css_selector(self, css_selector: Union[str, list], timeout=10, raise_exception=False) -> bool:
"""
通过 CSS 选择器查找并点击第一个匹配的元素
:param css_selector: 用于定位元素的 CSS 选择器
"""
if isinstance(css_selector, str):
css_selectors = [css_selector]
else:
css_selectors = css_selector
start_time = time.time()
for css in cycle(css_selectors):
# 计算剩余时间,如果超时了则返回
remaining_time = timeout - (time.time() - start_time)
if remaining_time <= 0:
if raise_exception:
raise TimeoutException(f"等待元素 {css} 超时")
else:
return False
try:
# 等待元素在页面上变得可点击
element = WebDriverWait(self, 2).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, css))
)
# 点击元素
element.click()
return True
except Exception as e:
continue
def input_by_css_selector(self, css_selector, input_text, is_enter=False, raise_exception=False):
"""通过 CSS 选择器查找并输入文本到第一个匹配的元素
:param is_enter: 是否输入后自动回车
"""
try:
# 等待输入框在页面上变得可见
input_field = WebDriverWait(self, 15).until(
EC.visibility_of_element_located((By.CSS_SELECTOR, css_selector))
)
input_field.clear() # 清除输入框中的任何现有内容
for chat in input_text:
input_field.send_keys(chat) # 填写新的值
time.sleep(0.1)
if is_enter:
input_field.send_keys(Keys.ENTER) # 输入回车键
except Exception as e:
if raise_exception:
raise e
print(f"表单输入到CSS选择器失败 {css_selector}: {e}")
def hover_over_element(self, css_selector):
"""
目前没有用到,可删除
将鼠标悬停在指定 CSS 选择器的元素上
:param css_selector: 用于定位元素的 CSS 选择器
"""
try:
# 等待元素加载到DOM中, 不管元素是否可见
element = WebDriverWait(self, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, css_selector))
)
# 创建 ActionChains 对象并悬停在元素上
hover = ActionChains(self).move_to_element(element)
hover.perform()
except Exception as e:
print(f"悬停到CSS选择器元素失败 {css_selector}: {e}")
def switch_iframe_by_css_selector(self, css_selector, raise_exception=False) -> bool:
"""如何当前页面存在iframe,一定需要切换到iframe后在能选择出里面的元素"""
# 找到iframe并切换
try:
iframe = WebDriverWait(self, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, css_selector)))
self.switch_to.frame(iframe)
return True
except Exception as e:
if raise_exception:
raise e
return False
def slide_captcha(self, img_base64_str, button_css_selector):
"""获取滑动验证码的滑动值,并且拖动滑动按钮到指定位置
:param img_base64_str: base64格式的带缺口的背景图片
:param button_css_selector: 滑动按钮的css选择器
"""
# 获取滑动距离(这里耦合了其他模块)
distance = digest_letter_code(img_base64_str, typeid=33)
distance = int(distance)
distance = distance - 10 # 实际滑动会有指定偏移,需要减去10px
# 定位滑动按钮
slider_button = self.find_element(By.CSS_SELECTOR, button_css_selector)
# 使用 ActionChains 来模拟拖动操作
action_chains = ActionChains(self)
action_chains.click_and_hold(slider_button).perform() # 点击并按住滑块
# 拖动滑块到指定距离
action_chains.move_by_offset(int(distance), 0).perform() # x 方向移动 distance,y 方向不变
action_chains.release().perform() # 释放滑块
# 这里主要是将滑动距离尝试前后调整几次试试
if not self.wait_deleted_for_css_selector(button_css_selector):
action_chains.click_and_hold(slider_button).perform() # 点击并按住滑块
action_chains.move_by_offset(int(distance) - 3, 0).perform() # x 方向移动 distance,y 方向不变
action_chains.release().perform() # 释放滑块
else:
return
if not self.wait_deleted_for_css_selector(button_css_selector):
action_chains.click_and_hold(slider_button).perform() # 点击并按住滑块
action_chains.move_by_offset(int(distance) - 5, 0).perform() # x 方向移动 distance,y 方向不变
action_chains.release().perform() # 释放滑块
else:
return
if not self.wait_deleted_for_css_selector(button_css_selector):
action_chains.click_and_hold(slider_button).perform() # 点击并按住滑块
action_chains.move_by_offset(int(distance) + 3, 0).perform() # x 方向移动 distance,y 方向不变
action_chains.release().perform() # 释放滑块
else:
return
if not self.wait_deleted_for_css_selector(button_css_selector):
action_chains.click_and_hold(slider_button).perform() # 点击并按住滑块
action_chains.move_by_offset(int(distance) + 5, 0).perform() # x 方向移动 distance,y 方向不变
action_chains.release().perform() # 释放滑块
else:
return
# 这里可能需要加一些等待或验证拖动成功的逻辑
def captcha_rotate(self, img_css, img_bg_css, button_css_selector, refresh_css='', slide_max_distance: int = 0):
"""旋转图片验证码
Args:
img_css: 被旋转图img标签的css选择器
img_bg_css: 背景图的img标签的css选择器
button_css_selector: 滑动按钮的css选择器
refresh_css: 刷新验证码的按钮的css选择器
slide_max_distance: 可滑动的最大距离,实际滑动距离要按照可滑动的最大距离的比例计算(distance / 360 * slide_max_distance)
"""
# 这里判断如果有验证码窗口就继续,如果没有弹出验证码框,则返回
if not self.get_element_by_css_selector(img_css, timeout=4):
return
for _ in range(10): # 总共尝试10次
img_base64 = self.get_img_base64(img_css_selector=img_css)
img_bg_base64 = self.get_img_base64(img_css_selector=img_bg_css)
distance = digest_letter_code(img_base64, img_bg_base64, 29)
distance = int(distance)
if distance < 0:
distance = 360 - abs(distance)
if slide_max_distance:
distance = distance / 360 * slide_max_distance
self.slide(button_css_selector, distance) # 滑动
time.sleep(2)
# 这里判断如果有验证码窗口就继续,如果没有代表已经验证成功
if not self.get_element_by_css_selector(img_css, timeout=2):
break
# 如果有刷新验证码按钮
if refresh_css:
self.click_element_by_css_selector(refresh_css, timeout=2)
def slide(self, button_css_selector: str, x, is_step=True):
"""水平向右滑动
Args:
is_step: 是否分步平滑滑动
"""
# 定位滑动按钮
slider_button = self.find_element(By.CSS_SELECTOR, button_css_selector)
# 使用 ActionChains 来模拟拖动操作
action_chains = ActionChains(self)
action_chains.click_and_hold(slider_button).perform() # 点击并按住滑块
# 分步拖动滑块到指定距离
if is_step:
for _ in range(5):
step_distance = int(int(x) / 5) # 计算每一步移动的距离
action_chains.move_by_offset(step_distance, 0).perform() # 逐步移动滑块
else:
# 拖动滑块到指定距离
action_chains.move_by_offset(int(x), 0).perform() # x 方向移动 distance,y 方向不变
action_chains.release().perform() # 释放滑块
def get_img_base64(self, img_url: str = '', img_css_selector: str = ''):
"""
img_url和img_css_selector参数任选其一
"""
if img_css_selector:
img_element = self.get_element_by_css_selector(img_css_selector)
img_url = img_element.get_attribute('src')
response = requests.get(img_url)
response.raise_for_status() # 确保请求成功
image = Image.open(BytesIO(response.content))
img_byte_arr = BytesIO()
image.save(img_byte_arr, format=image.format)
# 获取到byte类型的图片数据
img_byte = img_byte_arr.getvalue()
# 编码为base64字符串
img_base64 = base64.b64encode(img_byte).decode('utf-8')
return img_base64
def save_html(self):
"""保存网页源代码"""
html_source = self.page_source
with open('test.html', 'w', encoding='utf-8') as file:
file.write(html_source)
def set_cookies(self, url, cookies_str) -> dict:
"""设置cookies"""
self.get(url) # 进入主页
cookies = json.loads(cookies_str)
# 为WebDriver会话添加每个cookie
for cookie in cookies:
# 确保cookie中不包含会导致问题的额外字段
clean_cookie = {k: v for k, v in cookie.items() if k in ['name', 'value', 'domain', 'path', 'expiry', 'secure', 'httpOnly']}
# 如果cookie中有'expiry'字段,确保它是整数类型
if 'expiry' in clean_cookie:
clean_cookie['expiry'] = int(clean_cookie['expiry'])
self.add_cookie(clean_cookie)
# 再次访问网站或特定页面,以应用新的cookies
self.get(url)
img_captcha.py
"""图片验证码识别
图鉴官网:http://www.ttshitu.com/
"""
import time
from typing import Tuple
import requests
def report_error(rid):
"""
上报验证码解析错误
"""
data = {"id": rid}
result = requests.post("http://api.ttshitu.com/reporterror.json", json=data).text
return result
# 官方API请求
def __captcha_get(
image,
username="xxx",
password="xxx",
typeid=3,
angle=None,
typename=None,
remark=None,
imageback=None,
content=None,
title_image=None,
):
url = "http://api.ttshitu.com/predict"
data = {
"username": username,
"password": password,
"image": image,
"typeid": typeid,
"angle": angle,
"typename": typename,
"remark": remark,
"imageback": imageback,
"content": content,
"title_image": title_image,
}
response = requests.post(url, json=data).json()
if response.get("success"):
return response["data"]["id"], response["data"]["result"]
return None, response.get("message", None)
def digest_letter_code(img, imgback=None, typeid=1003) -> str:
"""识别数字英文类验证码
参数:
- img : 需要识别的图片
- 支持类型webelement, base64字符串, bytes
- typeid : 验证码类型, 默认数字英文混合
# 一、图片文字类型(默认 3 数英混合):
# 1 : 纯数字
# 1001:纯数字2
# 2 : 纯英文
# 1002:纯英文2
# 3 : 数英混合
# 1003:数英混合2
# 4 : 闪动GIF
# 7 : 无感学习(独家)
# 11 : 计算题
# 1005: 快速计算题
# 16 : 汉字
# 32 : 通用文字识别(证件、单据)
# 66: 问答题
# 49 :recaptcha图片识别 参考 https://shimo.im/docs/RPGcTpxdVgkkdQdY
# 二、图片旋转角度类型:
# 29 : 旋转类型
#
# 三、图片坐标点选类型:
# 19 : 1个坐标
# 20 : 3个坐标
# 21 : 3 ~ 5个坐标
# 22 : 5 ~ 8个坐标
# 27 : 1 ~ 4个坐标
# 48 : 轨迹类型
#
# 四、缺口识别
# 18 : 缺口识别(需要2张图 一张目标图一张缺口图)
# 33 : 单缺口识别(返回X轴坐标 只需要1张图)
# 五、拼图识别
# 53:拼图识别
返回值:
- (请求id, 识别结果)
"""
if not isinstance(img, str):
img = img.screenshot_as_base64
# time.sleep(1)
if imgback:
if not isinstance(imgback, str):
imgback = imgback.screenshot_as_base64
id, result = __captcha_get(image=img, imageback=imgback, typeid=typeid)
return result
def jigsaw_code(move_img, back_img) -> Tuple[str, int]:
"""拼图类验证码
参数:
- move_img : 可滑动的图片
- 支持类型webelement, base64字符串
- back_img : 背景图片
- 支持类型webelement, base64字符串
返回值:
- (请求id, 识别结果)
"""
if not isinstance(move_img, str):
move_img = move_img.screenshot_as_base64
if not isinstance(back_img, str):
back_img = back_img.screenshot_as_base64
flag, res = __captcha_get(image=move_img, imageback=back_img, typeid=18)
if flag:
return flag, int(res.split(",")[0])
return False
def click_code(title_img, panel_img, typeid=21) -> int:
"""点选类验证码
参数:
- title_img : 标题图片
- 支持类型webelement, base64字符串
- plan_img : 需要点选的图片
- 支持类型webelement, base64字符串
- typeid : 点选类型
- 19 点选1个坐标
- 20 点选3个坐标
- 21 点选3 ~ 5个坐标
- 22 点选5 ~ 8个坐标
- 27 点选1 ~ 4个坐标
- 42 通用点选验证码步骤
返回值:
- (请求id, 识别结果)
"""
if title_img is not None and not isinstance(title_img, str):
title_img = title_img.screenshot_as_base64
if panel_img is not None and not isinstance(panel_img, str):
panel_img = panel_img.screenshot_as_base64
flag, res = __captcha_get(
image=panel_img, title_image=title_img, typeid=typeid
)
if flag:
return flag, res.split("|")
return False
if __name__ == "__main__":
import base64
t1 = "./captcha.jpg"
with open(t1, "rb") as f:
base64_data = base64.b64encode(f.read())
t1 = base64_data.decode()
# with open(t3, "rb") as f:
# base64_data = base64.b64encode(f.read())
# t3 = base64_data.decode()
#
# res = digest_letter_code(t3, 29)
res = digest_letter_code(t1, typeid=33)
print("结果", res)
webdriver_proxy_auth_extension.py
"""
Selenium chrome 代理IP认证 自定义扩展
https://github.com/Smartproxy/Selenium-proxy-authentication
"""
import zipfile
import tempfile
import os
def proxies(username, password, endpoint, port):
manifest_json = """
{
"version": "1.0.0",
"manifest_version": 2,
"name": "Proxies",
"permissions": [
"proxy",
"tabs",
"unlimitedStorage",
"storage",
"<all_urls>",
"webRequest",
"webRequestBlocking"
],
"background": {
"scripts": ["background.js"]
},
"minimum_chrome_version":"22.0.0"
}
"""
background_js = """
var config = {
mode: "fixed_servers",
rules: {
singleProxy: {
scheme: "http",
host: "%s",
port: parseInt(%s)
},
bypassList: ["localhost"]
}
};
chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});
function callbackFn(details) {
return {
authCredentials: {
username: "%s",
password: "%s"
}
};
}
chrome.webRequest.onAuthRequired.addListener(
callbackFn,
{urls: ["<all_urls>"]},
['blocking']
);
""" % (endpoint, port, username, password)
# 创建一个临时目录
with tempfile.TemporaryDirectory(delete=False) as temp_dir:
# 定义文件路径,系统会自动分配一个唯一的文件名
extension_path = os.path.join(temp_dir, 'proxies_extension.zip')
# 创建ZIP文件并写入内容
with zipfile.ZipFile(extension_path, 'w') as zp:
zp.writestr("manifest.json", manifest_json)
zp.writestr("background.js", background_js)
# 返回文件路径
return extension_path
# extension = 'proxies_extension.zip'
#
# with zipfile.ZipFile(extension, 'w') as zp:
# zp.writestr("manifest.json", manifest_json)
# zp.writestr("background.js", background_js)
#
# return extension
if __name__ == '__main__':
file_path = proxies('username', 'password', 'endpoint', 'port')
print(file_path)