写在前面
本文并不是教授如何去攻破网站的人机验证,而是出于对ai大模型
应用方向的技术研究。为防止授人以柄,我不会完整展示所有代码。
一、物体识别模型的选择
在通用领域的宏观物体识别中有很多模型都可以做,但是总结起来有3
个比较大的差异:
- 模型倾向不同
- 模型数据集不同
- 模型识别边界和可信率不同
造成这三个方向的差异的原因是模型的研发背景。所以针对不同场景的宏观物体识别需要从模型研发背景入手,中和考虑这三个方向进行选择。
比如我选择的是yolo 11
,但是在此之前我已经尝试了不下5
个通用模型了,在验证码识别的场景中效果都不尽人意,最终选择到yolo 11
是因为看到了美团、facebook
都对其有大量的模型研发和源数据集训练支持。秉承着用就要用最新的,所以选择yolo 11
二、如何用物体识别模型识别验证码
这个问题比较大,可以做如下拆解:
- 关键目标定位:使用
ocr
进行原始图像的文字特征识别获取到需要识别的目标
、验证码刷新按钮位置
、验证码提交按钮位置
。 - 需要识别的图像定位:通过前面三个位置的相对坐标计算
需要识别的目标图像
区域。 - 预处理图像:使用
opencv
预处理,使其更加容易被识别(主要是去掉图像噪声
加强边界轮廓
,因为验证码图像普遍存在大量噪声。这里不需要进行图像翻转、亮灯、裁剪等操作,yolo 11
默认自带了这些操作)。 - 多种模型识别:通过多模型结果汇总,补全单一模型的识别盲区。
- 定位识别目标位置:通过识别结果的
可信率
过滤结果,最终筛选出识别区域坐标,还原计算成窗口图像坐标。这就是鼠标要点击的识别区域。 - 使用模拟鼠标点击进行操作,至于模拟鼠标点击可以参考
rpa
或者爬虫
的技术,太简单,这里不赘述。 - 对点击结果进行验证,通常验证失败会重置验证码和验证目标,如果没有重置,触发
验证码刷新按钮位置
模拟点击。
三、技术实现
-
硬件要求:
CPU
最低主频2.2GHz
、多核多线程GPU
最低1080 Ti
RAM
最低16G
ROM
最低500G
-
需要安装的环境,不会安装可以参考 AI大模型环境搭建 :
- cuda
- cudnn
- conda
- pytorch
-
pytorch 初始化环境
- python 3.10
- cudatoolkit (版本与安装的
cuda
版本一致)
-
技术方案
- 用
python3
写一个http
服务,比如使用Flask/Django
。用来提供同步的模型输入与输出 - 用
python3
写一个通用ocr
服务,与模型一起部署,吃GPU
红利加快识别速度 web
页面可以直接使用python WebDriver
- 用
补充:
-
模型的精准度非常考验数据集,建议使用 lvis 数据集 二次训练
四、相关代码示例
ocr.py
完整代码:
import json
import os
from pathlib import Path
import easyocr
import cv2
import numpy as np # 导入 NumPy
from django.conf import settings
class OcrHandle:
def __init__(self):
# self.work_path = "/Users/liucheng/it/verification-code/web"
self.work_path = os.getcwd()
self.module_path = self.work_path + "/support"
self.package_path = self.module_path + "/ocr"
# 模型路径
self.model_path = self.package_path + "/tessdata"
self.class_json_path = self.package_path + "/classs.json"
# 输出图片路径
self.reader = None
# ocr识别结果
self.results = []
self.out_path = os.path.join(settings.MEDIA_OCR_OUT)
self.code_words = self.load_classes_locating_words() # 验证码提示词(下一行就是验证码类型) 匹配词
self.verify_button_words = self.load_verify_button_words() # 验证按钮 匹配词
self.success_words = self.load_success_words() # 验证成功 匹配词
self.close_refresh_words = self.load_close_refresh_words() # 关闭 & 刷新 匹配词
self.next_button_words = self.load_next_button_words() # 下一步按钮 匹配词
self.dialog_button_words = self.load_dialog_button_words() # 对话按钮 匹配词
self.input_text_box_words = self.load_input_text_box_words() # 消息输入文本框 匹配词
self.send_button_words = self.load_send_button_words() # 发送消息按钮 匹配词
self.dialog_web_button_words = self.load_dialog_web_button_words() # 对话按钮 匹配词
self.new_dialog_text_words = self.load_new_dialog_text_words() # 新对话文字 匹配词
def gpu_device(self):
# 创建 EasyOCR 读取器,指定语言为简体中文和英文 指定模型路径
self.reader = easyocr.Reader(['ch_sim', 'en'], model_storage_directory=self.model_path)
def cpu_device(self):
# 创建 EasyOCR 读取器,禁用 GPU 使用,强制使用 CPU
self.reader = easyocr.Reader(['ch_sim', 'en'], model_storage_directory=self.model_path,
gpu=False)
def device_read_text(self, image_path, cpu=False):
if cpu:
self.cpu_device()
out = self.read_text(image_path)
else:
self.gpu_device()
out = self.read_text(image_path)
return out
def read_text(self, image_path):
image = cv2.imread(image_path)
result = self.reader.readtext(image_path)
results = []
# 打印结果(包括文本和位置)
for detection in result:
text = detection[1] # 识别的文本
bbox = detection[0] # 文本框坐标 表示一个矩形框,四个点分别是左上、右上、右下和左下的坐标
print(f"文本: {text}")
print(f"坐标: {bbox}")
results.append({
"text": text,
"bbox": [int(bbox[0][0]), int(bbox[0][1]), int(bbox[2][0]), int(bbox[2][1])] # 左上 x,y 右下 x,y
# "bbox": {
# "left_top_x": int(bbox[0][0]), # 左上 x
# "left_top_y": int(bbox[0][1]), # 左上 y
# "right_top_x": int(bbox[1][0]), # 右上 x
# "right_top_y": int(bbox[1][1]), # 右上 y
# "right_down_x": int(bbox[2][0]), # 右下 x
# "right_down_y": int(bbox[2][1]), # 右下 y
# "left_down_x": int(bbox[3][0]), # 左下 x
# "left_down_y": int(bbox[3][1]) # 左下 y
# }
})
# 画出文本框(可视化)
# 获取四个点的坐标并转换为整型
pts = np.array(bbox, dtype=np.int32)
# 绘制文本框(如果坐标点足够)
if len(pts) == 4:
pts = pts.reshape((-1, 1, 2)) # OpenCV 绘制需要的格式
image = cv2.polylines(image, [pts], isClosed=True, color=(0, 255, 0), thickness=2)
self._save(image_path, image)
self.results = results
return self.results
def _save(self, image_path, result_image):
file_name = Path(image_path).stem
out_image_path = os.path.join(self.out_path, file_name + ".png")
# 保存处理后的图像
cv2.imwrite(out_image_path, result_image)
return out_image_path
@staticmethod
def load_close_refresh_words():
words = {"验证", "Verify".replace(" ", "")}
return words
@staticmethod
def load_classes_locating_words():
words = {"请选择所有含有以下物体的图片", "Please select all images with".replace(" ", "")}
return words
@staticmethod
def load_verify_button_words():
words = {"验证", "Verify".replace(" ", "")}
return words
@staticmethod
def load_success_words():
words = {"未找到账户", "末找到账户", "account not found".replace(" ", "")}
return words
@staticmethod
def load_next_button_words():
words = {"下一步", "下-步"}
return words
@staticmethod
def load_dialog_button_words():
words = {"对话"}
return words
@staticmethod
def load_dialog_web_button_words():
words = {"不在你的联系人中", "WHATSAPP 上的联系人", "WHATSAPP 上的联系人".replace(" ", "")}
return words
@staticmethod
def load_input_text_box_words():
words = {"发送消息"}
return words
@staticmethod
def load_send_button_words():
words = {""}
return words
@staticmethod
def load_new_dialog_text_words():
words = {"不在你的联系人中", "没有找到%", "WHATSAPP 上的联系人", "WHATSAPP 上的联系人".replace(" ", "")}
return words
def find_new_dialog_text(self):
return self.find_element(self.new_dialog_text_words, nnext=False, up=True)
def find_dialog_web_button(self):
return self.find_element(self.dialog_web_button_words, nnext=True)
def process_string(self, s: str):
if len(s) > 11 and s[-1] == '1': # 判断长度是否超过 11 且最后一位是 '1'
return s[:-1] # 去掉最后一位
return s # 否则返回原字符串
def find_refresh(self):
# 默认在右侧是刷新
element = self.find_close_refresh()
print(f"refresh element:{element}")
bbox = element["bbox"]
# 计算右侧范围
top_left_x = bbox[0]
top_left_y = bbox[1]
down_right_x = bbox[2]
down_right_y = bbox[3]
# 右边-右上角
top_centre_x = down_right_x - top_left_x
# 如果区域宽度小于 30,认为是单个按钮,否则进行右侧起点坐标计算
if top_centre_x < 30:
top_centre_x = top_left_x
top_centre_y = top_left_y
# # 左边-左下角
# down_centre_x = down_right_x - top_left_x
# down_centre_y = down_right_y
return [top_centre_x, top_centre_y, down_right_x, down_right_y]
def find_next_button(self):
return self.find_element(self.next_button_words)
def find_close_refresh(self):
return self.find_element(self.close_refresh_words, up=True)
def find_success(self):
return self.find_element(self.success_words)
def find_words(self):
return self.find_element(self.code_words, nnext=True)
def find_verify_button(self):
return self.find_element(self.verify_button_words)
def find_dialog_button(self):
return self.find_element(self.dialog_button_words)
def find_input_text_box(self):
return self.find_element(self.input_text_box_words)
def find_send_button(self):
return self.find_element(self.send_button_words)
"""
在 ocr 识别的结果中根据 key 查找元素
nnext=true 时,会查找 key 匹配的元素的下一个(index+1)
up=true 时,会查找 key 匹配的元素的上一个(index-1)
"""
def find_element(self, match=None, nnext=False, up=False):
if match is None:
match = set()
element = None
results = self.results
for i in range(len(results)): # 遍历整个列表
key = results[i]['text'].replace(" ", "") # 去掉空格进行匹配
for word in match:
if "%" in word: # 处理带 '%' 的前缀匹配
prefix = word.replace("%", "")
if key.startswith(prefix): # 前缀匹配
print(f"当前元素: {results[i]}")
if nnext and i < len(results) - 1: # 避免越界
element = results[i + 1]
print(f"下一个元素: {element}")
elif up and i > 0: # 避免负索引
element = results[i - 1]
print(f"上一个元素: {element}")
else:
element = results[i]
return element # 直接返回,避免不必要的循环
elif key == word: # 精确匹配
print(f"当前元素: {results[i]}")
if nnext and i < len(results) - 1: # 避免越界
element = results[i + 1]
print(f"下一个元素: {element}")
elif up and i > 0: # 避免负索引
element = results[i - 1]
print(f"上一个元素: {element}")
else:
element = results[i]
return element # 直接返回
return element # 没找到返回 None
#
# res = OcrHandle().read_text("/Users/liucheng/rpa/actionImage/code4.png", True)
# print(res)
yolo.py
完整代码:
import json
import os
from ultralytics import YOLO
import cv2
from pathlib import Path
from django.conf import settings
# 文档
# https://docs.ultralytics.com/zh/datasets/#example-code-to-optimize-and-zip-a-dataset
# https://docs.ultralytics.com/zh/datasets/detect/lvis/#dataset-yaml
# https://docs.ultralytics.com/zh/guides/yolo-common-issues/#gpu-deployment-issues
class Yolo11Handle:
_instance = None
_initialized = False # 初始化标志
def __init__(self):
# self.work_path = "/Users/liucheng/it/verification-code/web"
self.work_path = os.getcwd()
self.module_path = self.work_path + "/support"
self.package_path = self.module_path + "/yolo"
# 模型路径
self.model_path = self.package_path + "/model/yolo11x.pt"
self.class_json_path = self.package_path + "/classs.json"
# 输出图片路径
self.out_path = os.path.join(settings.MEDIA_YOLO_OUT)
self.model = YOLO(self.model_path)
"""
检测方法
:param image_path: 要识别的图片路径
:param conf: 可信度,可信度要求过高肯能会导致模型识别不到
:param show: 是否弹窗显示带检测结果的图片 false 表示不显示
:param classes: 要识别的物体,参数请参考,输入示例 classes=[1,2,3,4]
{0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant', 11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat', 16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear', 22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag', 27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard', 32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove', 36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle', 40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl', 46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli', 51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chair', 57: 'couch', 58: 'potted plant', 59: 'bed', 60: 'dining table', 61: 'toilet', 62: 'tv', 63: 'laptop', 64: 'mouse', 65: 'remote', 66: 'keyboard', 67: 'cell phone', 68: 'microwave', 69: 'oven', 70: 'toaster', 71: 'sink', 72: 'refrigerator', 73: 'book', 74: 'clock', 75: 'vase', 76: 'scissors', 77: 'teddy bear', 78: 'hair drier', 79: 'toothbrush'}
"""
#
def detect(self, image_path, classes=[], conf=0.05):
image = cv2.imread(image_path)
result_img, results, box_coords = self._predict_and_detect(image, classes, conf)
# 打印检测框坐标
print("检测结果:", box_coords)
file_name = Path(image_path).stem
out_image_path = os.path.join(self.out_path, file_name + ".png")
cv2.imwrite(out_image_path, result_img)
return box_coords, out_image_path, result_img
# 定义预测函数,用于选择模型进行推理预测
def _predict(self, img, classes=[], conf=0.5):
"""
使用给定的模型对输入的图像进行预测,返回检测结果。
:param img: 输入的图像(应该是读取过的图像数据)
:param classes: 可选的类别过滤列表,指定哪些类别的物体需要被检测
:param conf: 置信度阈值,只有当检测物体的置信度高于此值时才会返回该物体
:return: 预测结果(通常是模型返回的检测框、类别标签等信息)
"""
print("classes:", classes)
# 如果传入了classes列表,则按照指定类别进行预测
if classes:
results = self.model.predict(img, classes=classes, conf=conf)
# 否则不限制类别,只根据置信度进行预测
else:
results = self.model.predict(img, conf=conf)
return results # 返回检测结果
# 定义预测并绘制检测框的函数
def _predict_and_detect(self, img, classes=[], conf=0.5, rectangle_thickness=2, text_thickness=1):
"""
使用模型对图像进行预测,并且在检测到物体后绘制边界框和标签。
:param img: 输入的图像(应该是读取过的图像数据)
:param classes: 可选的类别过滤列表,指定哪些类别的物体需要被检测
:param conf: 置信度阈值,只有当检测物体的置信度高于此值时才会返回该物体
:param rectangle_thickness: 绘制边界框的线条厚度
:param text_thickness: 绘制文本标签的线条厚度
:return: 返回处理过的图像和检测结果
"""
# 调用之前定义的predict函数,获取预测结果
results = self._predict(img, classes, conf=conf)
# 用于存储所有检测框的坐标
box_coordinates = []
# 遍历所有检测结果
for result in results:
# print("物体类别:", result.names)
# 遍历每一个检测框
for box in result.boxes:
# 获取边界框坐标 [x_min, y_min, x_max, y_max]
x_min, y_min, x_max, y_max = box.xyxy[0]
# 如果输出的框坐标是 [100, 150, 200, 250],那就意味着:
#
# 左上角坐标是 (100, 150)
# 右下角坐标是 (200, 250)
# 这表示这个物体框覆盖的区域是图像从 (100, 150) 到 (200, 250) 这个区域
# 将框坐标保存到box_coordinates列表
box_coordinates.append({
"classes": result.names[int(box.cls[0])], # 物体类别标签
"conf": box.conf[0].item(), # 置信度 .item() 方法只适用于包含单个元素的 Tensor。如果你的 Tensor 包含多个元素,需要使用其他方法(如 .tolist())来获取它们
"box": [int(x_min), int(y_min), int(x_max), int(y_max)] # 边界框坐标
})
# 绘制边界框,xyxy[0]表示[x_min, y_min, x_max, y_max]
cv2.rectangle(img,
(int(x_min), int(y_min)), # 左上角坐标
(int(x_max), int(y_max)), # 右下角坐标
(255, 0, 0), # 颜色,BGR格式:红色
rectangle_thickness) # 边界框的线条厚度
# 绘制文本,显示物体类别及置信度
cv2.putText(img,
f"{result.names[int(box.cls[0])]}", # 类别标签
(int(x_min), int(y_min) - 10), # 文本位置,位于框的上方
cv2.FONT_HERSHEY_PLAIN, # 字体类型
1, # 字体大小
(255, 0, 0), # 文字颜色,BGR格式:红色
text_thickness) # 文字厚度
return img, results, box_coordinates # 返回处理过的图像、检测结果、框的坐标列表
@classmethod
def initialize(cls):
# 只会初始化一次
if cls._instance is None:
print("Initializing Yolo11Handle...")
# 这里进行实际的初始化工作,例如加载模型
cls._instance = cls() # 创建唯一实例
# 其他初始化代码,例如加载 YOLO 模型、权重等
else:
print("Yolo11Handle already initialized.")
@classmethod
def get_instance(cls):
# 获取 Yolo11Handle 实例
return cls._instance
def match_classes(self, words):
with open(self.class_json_path, "r", encoding="utf-8") as file:
data = json.load(file)
# 将列表转换为字典,key 为 'name', value 为 'value'
result = {item['name']: item['value'] for item in data}
# 输出转换后的字典(相当于 Map)
# print(result) # 输出: {'人': 1, '飞机': 2}
classes = result[words]
print(f"words:{words} =>>> {classes}")
return classes
# Yolo11Handle.initialize()
# yolo11Handle = Yolo11Handle.get_instance()
# classes = yolo11Handle.match_classes("airplane")
# print(classes)
# # yolo11Handle.detect("/Users/liucheng/rpa/actionImage/code4.png", [], 0.01, True)
# yolo11Handle.detect("/Users/liucheng/rpa/actionImage/6_2.png", [], 0.01, True)
identify_code/views.py
部分代码:
def identify_code(image_path):
# 获取需要的 设备类型
ocrHandle = OcrHandle()
# 从ocr中获取 图片特征
ocrHandle.device_read_text(image_path=image_path, cpu=False)
# 先尝试获取 是否验证成功
code_success_element = ocrHandle.find_success()
print(f"code_success_element:{code_success_element}")
if code_success_element:
return identify_object_code_json_result(identify=True, handle="success")
# 尝试获取 是否是开始登录下一步
next_button_element = ocrHandle.find_next_button()
print(f"next_button:{next_button_element}")
if next_button_element:
next_button_bbox = next_button_element["bbox"]
return identify_object_code_json_result(identify=False, button=next_button_bbox, handle="init")
# 根据验证码提示词 获取需要识别的验证码物体类型
code_element = ocrHandle.find_words()
print(f"code_element:{code_element}")
if not code_element:
# 当前验证码识词获取失败,获取刷新按钮的位置
refresh_button_bbox = ocrHandle.find_refresh()
return identify_object_code_json_result(identify=False, button=refresh_button_bbox, handle="refresh_code")
code_words = code_element["text"]
# 获取验证按钮位置
verify_button_element = ocrHandle.find_verify_button()
print(f"verify_button_element:{verify_button_element}")
code_button_bbox = verify_button_element["bbox"]
# 初始化识别模型
Yolo11Handle.initialize()
yolo = Yolo11Handle.get_instance()
# 翻译物体类型为 yolo 类型
classes = yolo.match_classes(code_words)
# yolo 模型开始识别
coordinates, _, _, = yolo.detect(image_path, classes)
# 识别到的位置
if coordinates:
return identify_object_code_json_result(identify=True, codes=coordinates, button=code_button_bbox,
handle="action")
else:
# 当前验证码识别失败,获取刷新按钮的位置
refresh_button_bbox = ocrHandle.find_refresh()
return identify_object_code_json_result(identify=False, button=refresh_button_bbox, handle="refresh_code")
# return json.dumps(data)
"""
identify_object_code 方法的返回
:param identify 验证是否成功 true 表示成功
:param codes 识别的验证码位置
:param button 按钮位置
:param handle 识别结果对应的处理
init:初始化执行脚本
action: 验证码定位成功,转换执行脚本
refresh_code: 刷新验证码
success: 当前识别成功,开始下一组识别
"""
def identify_object_code_json_result(identify=None, codes=None, button=None, handle=None):
return JsonResponse({"status": "success", "result": {
"identify": identify,
"codes": codes,
"button": button,
"handle": handle
}})
- 我的客户端是
kotlin
写的rpa
,运行在客户端,作用是在客户端打开浏览器,访问网页,配合上面的python
项目进行验证码识别和点击实现。 python
这部分运行在远程GPU
服务器上,通过http
实时交互
附上一个使用python
(使用webdriver
)当作客户端打开浏览器并点击的代码。
import os
import platform
import random
import subprocess
import sys
import threading
import requests
import sqlite3
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import WebDriverException
# 获取数据库路径
if platform.system() == "Windows":
base_data_dir = os.path.join(os.environ["USERPROFILE"], "MyAppData", "whatsapp_bot")
else:
base_data_dir = os.path.join(os.environ["HOME"], "MyAppData", "whatsapp_bot")
db_path = os.path.join(base_data_dir, "tasks.db")
def get_pending_tasks():
""" 从数据库获取未处理的任务 """
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT id, task_no, phone, message, notice_url FROM tasks WHERE status='pending'")
tasks = cursor.fetchall()
conn.close()
return tasks
def update_task_status(task_id, status):
""" 更新任务状态 """
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("UPDATE tasks SET status=? WHERE id=?", (status, task_id))
conn.commit()
conn.close()
def send_whatsapp_message(driver, phone, message):
""" 使用 WebDriver 发送 WhatsApp 消息 """
print(f"📩 正在向 {phone} 发送消息...")
# 打开聊天窗口
whatsapp_url = f"https://web.whatsapp.com/send?phone={phone}"
driver.get(whatsapp_url)
# 随机等待时间,避免检测
time.sleep(random.randint(10, 20))
try:
# 等待输入框加载
input_box = WebDriverWait(driver, 15).until(
EC.presence_of_element_located((By.XPATH, '//div[@aria-label="输入消息"]'))
)
# 输入消息
input_box.send_keys(message)
time.sleep(2)
# 发送消息
input_box.send_keys(Keys.ENTER)
print(f"✅ 已成功发送给 {phone}")
return True
except Exception as e:
print(f"❌ {phone}发送消息失败: {e}")
return False
def notify_result(notice_url, task_no, phone, success):
""" 发送 POST 请求通知消息状态 """
payload = {"type": 1, "success": True, "taskNo": task_no, "phone": phone, "exist": success, "send": success,
"joinGroup": None}
try:
response = requests.post(notice_url, json=payload, timeout=5)
print(f"📢 通知结果: {response.status_code} - {response.text}")
except Exception as e:
print(f"❌ 发送通知失败: {e}")
# **获取存储目录**
if platform.system() == "Windows":
base_data_dir = os.path.join(os.environ["USERPROFILE"], "MyAppData", "whatsapp_bot")
else:
base_data_dir = os.path.join(os.environ["HOME"], "MyAppData", "whatsapp_bot")
whatsapp_profile_dir = os.path.join(base_data_dir, "whatsapp_profiles")
# **确保程序可以正常退出**
def exit_program():
print("退出 WebDriver 进程,关闭整个程序...")
sys.exit(0) # ✅ 在 Windows 上更稳定
# os._exit(0) # 另一种方式,直接杀死进程
def start_webdriver(profile_name):
""" 启动 WebDriver 并不断检查任务队列 """
print(f"启动 WebDriver")
# 获取当前目录,兼容 PyInstaller
if getattr(sys, 'frozen', False):
base_dir = sys._MEIPASS # PyInstaller 解压后的临时目录
else:
base_dir = os.path.dirname(os.path.abspath(__file__)) # 普通运行模式
system_os = platform.system() # 检测操作系统
if system_os == "Windows":
chromedriver_path = os.path.join(base_dir, "chromedriver", "chromedriver.exe")
elif system_os == "Darwin": # macOS
chromedriver_path = os.path.join(base_dir, "chromedriver", "chromedriver")
else:
raise Exception("Unsupported OS")
user_data_dir = os.path.abspath(f"{whatsapp_profile_dir}/{profile_name}")
# 确保 `whatsapp_profiles` 目录存在
if not os.path.exists(user_data_dir):
os.makedirs(user_data_dir)
options = webdriver.ChromeOptions()
options.add_argument(f"--user-data-dir={user_data_dir}") # 每个 WebDriver 用不同的 profile
options.add_argument("--disable-gpu") # 关闭 GPU 以防崩溃
options.add_argument("--no-sandbox") # 关闭沙盒模式(适用于 Windows)
options.add_argument("--disable-dev-shm-usage") # 解决共享内存问题
options.add_argument("--disable-extensions") # 关闭扩展,减少干扰
service = Service(chromedriver_path)
try:
driver = webdriver.Chrome(service=service, options=options)
driver.get("你需要访问的网址")
# **🚀 启动 WebDriver 监测线程**
threading.Thread(target=monitor_webdriver, args=(driver,), daemon=True).start()
except WebDriverException as e:
print(f"WebDriver 启动失败或崩溃: {e},退出程序...")
exit_program()
print(f"请使用 {profile_name} 账号扫码登录 WhatsApp Web")
# 轮询登录状态
while True:
time.sleep(2)
try:
# 尝试找到搜索框,判断是否已登录成功
search_box = driver.find_element(By.XPATH, '//div[@contenteditable="true"]')
if search_box:
print("✅ WhatsApp 登录成功!")
break
except:
print("⌛ 请扫码登录 WhatsApp Web...")
print(f"检查 tasks 队列")
while True:
tasks = get_pending_tasks()
if tasks:
for task_id, task_no, phone, message, notice_url in tasks:
success = send_whatsapp_message(driver, phone, message)
status = "done" if success else "failed"
update_task_status(task_id, status)
# 发送通知
notify_result(notice_url, task_no, phone, success)
time.sleep(3) # 每 5 秒检查一次数据库
def monitor_webdriver(driver):
""" 监测 WebDriver 是否被关闭 """
while True:
time.sleep(1)
try:
if not driver.window_handles:
print("WebDriver 窗口已关闭")
driver.quit()
exit_program()
except WebDriverException:
print("WebDriver 进程异常,关闭程序...")
driver.quit()
exit_program()
if __name__ == "__main__":
profile_name = "" # 你可以运行多个 WebDriver 进程,指定不同 profile
start_webdriver(profile_name)
完整效果没有录视频,自己整吧。今天就到这了
评论区