侧边栏壁纸
博主头像
术业有道之编程博主等级

亦是三月纷飞雨,亦是人间惊鸿客。亦是秋霜去叶多,亦是风华正当时。

  • 累计撰写 99 篇文章
  • 累计创建 50 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

用ai识别并点击验证谷歌物体验证码

Administrator
2025-03-13 / 0 评论 / 0 点赞 / 16 阅读 / 28112 字

写在前面

本文并不是教授如何去攻破网站的人机验证,而是出于对ai大模型应用方向的技术研究。为防止授人以柄,我不会完整展示所有代码。

一、物体识别模型的选择

在通用领域的宏观物体识别中有很多模型都可以做,但是总结起来有3个比较大的差异:

  • 模型倾向不同
  • 模型数据集不同
  • 模型识别边界和可信率不同

造成这三个方向的差异的原因是模型的研发背景。所以针对不同场景的宏观物体识别需要从模型研发背景入手,中和考虑这三个方向进行选择。

比如我选择的是yolo 11,但是在此之前我已经尝试了不下5个通用模型了,在验证码识别的场景中效果都不尽人意,最终选择到yolo 11 是因为看到了美团、facebook都对其有大量的模型研发和源数据集训练支持。秉承着用就要用最新的,所以选择yolo 11

二、如何用物体识别模型识别验证码

这个问题比较大,可以做如下拆解:

  • 关键目标定位:使用ocr进行原始图像的文字特征识别获取到需要识别的目标验证码刷新按钮位置验证码提交按钮位置
  • 需要识别的图像定位:通过前面三个位置的相对坐标计算需要识别的目标图像区域。
  • 预处理图像:使用opencv预处理,使其更加容易被识别(主要是去掉图像噪声加强边界轮廓,因为验证码图像普遍存在大量噪声。这里不需要进行图像翻转、亮灯、裁剪等操作,yolo 11 默认自带了这些操作)。
  • 多种模型识别:通过多模型结果汇总,补全单一模型的识别盲区。
  • 定位识别目标位置:通过识别结果的可信率过滤结果,最终筛选出识别区域坐标,还原计算成窗口图像坐标。这就是鼠标要点击的识别区域。
  • 使用模拟鼠标点击进行操作,至于模拟鼠标点击可以参考rpa或者爬虫的技术,太简单,这里不赘述。
  • 对点击结果进行验证,通常验证失败会重置验证码和验证目标,如果没有重置,触发验证码刷新按钮位置模拟点击。

三、技术实现

  • 硬件要求:

    • CPU最低主频2.2GHz、多核多线程
    • GPU最低1080 Ti
    • RAM 最低16G
    • ROM最低500G
  • 需要安装的环境,不会安装可以参考 AI大模型环境搭建

    • cuda
    • cudnn
    • conda
    • pytorch
  • pytorch 初始化环境

    • python 3.10
    • cudatoolkit (版本与安装的cuda版本一致)
  • 技术方案

    • python3写一个http服务,比如使用 Flask/Django。用来提供同步的模型输入与输出
    • python3写一个通用ocr服务,与模型一起部署,吃GPU红利加快识别速度
    • web页面可以直接使用python WebDriver

补充:

  • 模型的精准度非常考验数据集,建议使用 lvis 数据集 二次训练

四、相关代码示例

项目结构.png

  • ocr.py完整代码:
import json
import os
from pathlib import Path

import easyocr
import cv2
import numpy as np  # 导入 NumPy
from django.conf import settings


class OcrHandle:
    def __init__(self):
        # self.work_path = "/Users/liucheng/it/verification-code/web"
        self.work_path = os.getcwd()
        self.module_path = self.work_path + "/support"
        self.package_path = self.module_path + "/ocr"
        # 模型路径
        self.model_path = self.package_path + "/tessdata"
        self.class_json_path = self.package_path + "/classs.json"
        # 输出图片路径
        self.reader = None
        # ocr识别结果
        self.results = []
        self.out_path = os.path.join(settings.MEDIA_OCR_OUT)
        self.code_words = self.load_classes_locating_words()  # 验证码提示词(下一行就是验证码类型)  匹配词
        self.verify_button_words = self.load_verify_button_words()  # 验证按钮  匹配词
        self.success_words = self.load_success_words()  # 验证成功  匹配词
        self.close_refresh_words = self.load_close_refresh_words()  # 关闭 & 刷新 匹配词
        self.next_button_words = self.load_next_button_words()  # 下一步按钮 匹配词
        self.dialog_button_words = self.load_dialog_button_words()  # 对话按钮 匹配词
        self.input_text_box_words = self.load_input_text_box_words()  # 消息输入文本框 匹配词
        self.send_button_words = self.load_send_button_words()  # 发送消息按钮 匹配词
        self.dialog_web_button_words = self.load_dialog_web_button_words()  # 对话按钮 匹配词
        self.new_dialog_text_words = self.load_new_dialog_text_words()  # 新对话文字 匹配词

    def gpu_device(self):
        # 创建 EasyOCR 读取器,指定语言为简体中文和英文 指定模型路径
        self.reader = easyocr.Reader(['ch_sim', 'en'], model_storage_directory=self.model_path)

    def cpu_device(self):
        # 创建 EasyOCR 读取器,禁用 GPU 使用,强制使用 CPU
        self.reader = easyocr.Reader(['ch_sim', 'en'], model_storage_directory=self.model_path,
                                     gpu=False)

    def device_read_text(self, image_path, cpu=False):
        if cpu:
            self.cpu_device()
            out = self.read_text(image_path)
        else:
            self.gpu_device()
            out = self.read_text(image_path)
        return out

    def read_text(self, image_path):
        image = cv2.imread(image_path)
        result = self.reader.readtext(image_path)
        results = []
        # 打印结果(包括文本和位置)
        for detection in result:
            text = detection[1]  # 识别的文本
            bbox = detection[0]  # 文本框坐标 表示一个矩形框,四个点分别是左上、右上、右下和左下的坐标
            print(f"文本: {text}")
            print(f"坐标: {bbox}")
            results.append({
                "text": text,
                "bbox": [int(bbox[0][0]), int(bbox[0][1]), int(bbox[2][0]), int(bbox[2][1])]  # 左上 x,y 右下 x,y
                # "bbox": {
                # "left_top_x": int(bbox[0][0]),  # 左上 x
                # "left_top_y": int(bbox[0][1]),  # 左上 y
                # "right_top_x": int(bbox[1][0]),  # 右上 x
                # "right_top_y": int(bbox[1][1]),  # 右上 y
                # "right_down_x": int(bbox[2][0]),  # 右下 x
                # "right_down_y": int(bbox[2][1]),  # 右下 y
                # "left_down_x": int(bbox[3][0]),  # 左下 x
                # "left_down_y": int(bbox[3][1])  # 左下 y
                # }
            })
            # 画出文本框(可视化)
            # 获取四个点的坐标并转换为整型
            pts = np.array(bbox, dtype=np.int32)
            # 绘制文本框(如果坐标点足够)
            if len(pts) == 4:
                pts = pts.reshape((-1, 1, 2))  # OpenCV 绘制需要的格式
                image = cv2.polylines(image, [pts], isClosed=True, color=(0, 255, 0), thickness=2)
                self._save(image_path, image)
        self.results = results
        return self.results

    def _save(self, image_path, result_image):
        file_name = Path(image_path).stem
        out_image_path = os.path.join(self.out_path, file_name + ".png")
        # 保存处理后的图像
        cv2.imwrite(out_image_path, result_image)
        return out_image_path

    @staticmethod
    def load_close_refresh_words():
        words = {"验证", "Verify".replace(" ", "")}
        return words

    @staticmethod
    def load_classes_locating_words():
        words = {"请选择所有含有以下物体的图片", "Please select all images with".replace(" ", "")}
        return words

    @staticmethod
    def load_verify_button_words():
        words = {"验证", "Verify".replace(" ", "")}
        return words

    @staticmethod
    def load_success_words():
        words = {"未找到账户", "末找到账户", "account not found".replace(" ", "")}
        return words

    @staticmethod
    def load_next_button_words():
        words = {"下一步", "下-步"}
        return words

    @staticmethod
    def load_dialog_button_words():
        words = {"对话"}
        return words

    @staticmethod
    def load_dialog_web_button_words():
        words = {"不在你的联系人中", "WHATSAPP 上的联系人", "WHATSAPP 上的联系人".replace(" ", "")}
        return words

    @staticmethod
    def load_input_text_box_words():
        words = {"发送消息"}
        return words

    @staticmethod
    def load_send_button_words():
        words = {""}
        return words

    @staticmethod
    def load_new_dialog_text_words():
        words = {"不在你的联系人中", "没有找到%", "WHATSAPP 上的联系人", "WHATSAPP 上的联系人".replace(" ", "")}
        return words

    def find_new_dialog_text(self):
        return self.find_element(self.new_dialog_text_words, nnext=False, up=True)

    def find_dialog_web_button(self):
        return self.find_element(self.dialog_web_button_words, nnext=True)

    def process_string(self, s: str):
        if len(s) > 11 and s[-1] == '1':  # 判断长度是否超过 11 且最后一位是 '1'
            return s[:-1]  # 去掉最后一位
        return s  # 否则返回原字符串

    def find_refresh(self):
        # 默认在右侧是刷新
        element = self.find_close_refresh()
        print(f"refresh element:{element}")
        bbox = element["bbox"]
        # 计算右侧范围
        top_left_x = bbox[0]
        top_left_y = bbox[1]
        down_right_x = bbox[2]
        down_right_y = bbox[3]
        # 右边-右上角
        top_centre_x = down_right_x - top_left_x
        # 如果区域宽度小于 30,认为是单个按钮,否则进行右侧起点坐标计算
        if top_centre_x < 30:
            top_centre_x = top_left_x
        top_centre_y = top_left_y
        # # 左边-左下角
        # down_centre_x = down_right_x - top_left_x
        # down_centre_y = down_right_y
        return [top_centre_x, top_centre_y, down_right_x, down_right_y]

    def find_next_button(self):
        return self.find_element(self.next_button_words)

    def find_close_refresh(self):
        return self.find_element(self.close_refresh_words, up=True)

    def find_success(self):
        return self.find_element(self.success_words)

    def find_words(self):
        return self.find_element(self.code_words, nnext=True)

    def find_verify_button(self):
        return self.find_element(self.verify_button_words)

    def find_dialog_button(self):
        return self.find_element(self.dialog_button_words)

    def find_input_text_box(self):
        return self.find_element(self.input_text_box_words)

    def find_send_button(self):
        return self.find_element(self.send_button_words)

    """
    在 ocr 识别的结果中根据 key 查找元素
    nnext=true 时,会查找 key 匹配的元素的下一个(index+1)
    up=true 时,会查找 key 匹配的元素的上一个(index-1)
    """

    def find_element(self, match=None, nnext=False, up=False):
        if match is None:
            match = set()
        element = None
        results = self.results
        for i in range(len(results)):  # 遍历整个列表
            key = results[i]['text'].replace(" ", "")  # 去掉空格进行匹配

            for word in match:
                if "%" in word:  # 处理带 '%' 的前缀匹配
                    prefix = word.replace("%", "")
                    if key.startswith(prefix):  # 前缀匹配
                        print(f"当前元素: {results[i]}")
                        if nnext and i < len(results) - 1:  # 避免越界
                            element = results[i + 1]
                            print(f"下一个元素: {element}")
                        elif up and i > 0:  # 避免负索引
                            element = results[i - 1]
                            print(f"上一个元素: {element}")
                        else:
                            element = results[i]
                        return element  # 直接返回,避免不必要的循环
                elif key == word:  # 精确匹配
                    print(f"当前元素: {results[i]}")
                    if nnext and i < len(results) - 1:  # 避免越界
                        element = results[i + 1]
                        print(f"下一个元素: {element}")
                    elif up and i > 0:  # 避免负索引
                        element = results[i - 1]
                        print(f"上一个元素: {element}")
                    else:
                        element = results[i]
                    return element  # 直接返回

        return element  # 没找到返回 None

#
# res = OcrHandle().read_text("/Users/liucheng/rpa/actionImage/code4.png", True)
# print(res)

  • yolo.py完整代码:
import json
import os

from ultralytics import YOLO
import cv2
from pathlib import Path
from django.conf import settings


# 文档
# https://docs.ultralytics.com/zh/datasets/#example-code-to-optimize-and-zip-a-dataset
# https://docs.ultralytics.com/zh/datasets/detect/lvis/#dataset-yaml
# https://docs.ultralytics.com/zh/guides/yolo-common-issues/#gpu-deployment-issues
class Yolo11Handle:
    _instance = None
    _initialized = False  # 初始化标志

    def __init__(self):
        # self.work_path = "/Users/liucheng/it/verification-code/web"
        self.work_path = os.getcwd()
        self.module_path = self.work_path + "/support"
        self.package_path = self.module_path + "/yolo"
        # 模型路径
        self.model_path = self.package_path + "/model/yolo11x.pt"
        self.class_json_path = self.package_path + "/classs.json"
        # 输出图片路径
        self.out_path = os.path.join(settings.MEDIA_YOLO_OUT)
        self.model = YOLO(self.model_path)

    """
     检测方法
    :param image_path: 要识别的图片路径
    :param conf: 可信度,可信度要求过高肯能会导致模型识别不到
    :param show: 是否弹窗显示带检测结果的图片 false 表示不显示
    :param classes: 要识别的物体,参数请参考,输入示例 classes=[1,2,3,4]
         {0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant', 11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat', 16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear', 22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag', 27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard', 32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove', 36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle', 40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl', 46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli', 51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chair', 57: 'couch', 58: 'potted plant', 59: 'bed', 60: 'dining table', 61: 'toilet', 62: 'tv', 63: 'laptop', 64: 'mouse', 65: 'remote', 66: 'keyboard', 67: 'cell phone', 68: 'microwave', 69: 'oven', 70: 'toaster', 71: 'sink', 72: 'refrigerator', 73: 'book', 74: 'clock', 75: 'vase', 76: 'scissors', 77: 'teddy bear', 78: 'hair drier', 79: 'toothbrush'}
    """

    #
    def detect(self, image_path, classes=[], conf=0.05):
        image = cv2.imread(image_path)
        result_img, results, box_coords = self._predict_and_detect(image, classes, conf)
        # 打印检测框坐标
        print("检测结果:", box_coords)
        file_name = Path(image_path).stem
        out_image_path = os.path.join(self.out_path, file_name + ".png")
        cv2.imwrite(out_image_path, result_img)
        return box_coords, out_image_path, result_img

    # 定义预测函数,用于选择模型进行推理预测
    def _predict(self, img, classes=[], conf=0.5):
        """
        使用给定的模型对输入的图像进行预测,返回检测结果。

        :param img: 输入的图像(应该是读取过的图像数据)
        :param classes: 可选的类别过滤列表,指定哪些类别的物体需要被检测
        :param conf: 置信度阈值,只有当检测物体的置信度高于此值时才会返回该物体
        :return: 预测结果(通常是模型返回的检测框、类别标签等信息)
        """
        print("classes:", classes)
        # 如果传入了classes列表,则按照指定类别进行预测
        if classes:
            results = self.model.predict(img, classes=classes, conf=conf)
        # 否则不限制类别,只根据置信度进行预测
        else:
            results = self.model.predict(img, conf=conf)

        return results  # 返回检测结果

    # 定义预测并绘制检测框的函数
    def _predict_and_detect(self, img, classes=[], conf=0.5, rectangle_thickness=2, text_thickness=1):
        """
        使用模型对图像进行预测,并且在检测到物体后绘制边界框和标签。

        :param img: 输入的图像(应该是读取过的图像数据)
        :param classes: 可选的类别过滤列表,指定哪些类别的物体需要被检测
        :param conf: 置信度阈值,只有当检测物体的置信度高于此值时才会返回该物体
        :param rectangle_thickness: 绘制边界框的线条厚度
        :param text_thickness: 绘制文本标签的线条厚度
        :return: 返回处理过的图像和检测结果
        """
        # 调用之前定义的predict函数,获取预测结果
        results = self._predict(img, classes, conf=conf)
        # 用于存储所有检测框的坐标
        box_coordinates = []
        # 遍历所有检测结果
        for result in results:
            # print("物体类别:", result.names)
            # 遍历每一个检测框
            for box in result.boxes:
                # 获取边界框坐标 [x_min, y_min, x_max, y_max]
                x_min, y_min, x_max, y_max = box.xyxy[0]
                # 如果输出的框坐标是 [100, 150, 200, 250],那就意味着:
                #
                # 左上角坐标是 (100, 150)
                # 右下角坐标是 (200, 250)
                # 这表示这个物体框覆盖的区域是图像从 (100, 150) 到 (200, 250) 这个区域
                # 将框坐标保存到box_coordinates列表
                box_coordinates.append({
                    "classes": result.names[int(box.cls[0])],  # 物体类别标签
                    "conf": box.conf[0].item(),  # 置信度 .item() 方法只适用于包含单个元素的 Tensor。如果你的 Tensor 包含多个元素,需要使用其他方法(如 .tolist())来获取它们
                    "box": [int(x_min), int(y_min), int(x_max), int(y_max)]  # 边界框坐标
                })

                # 绘制边界框,xyxy[0]表示[x_min, y_min, x_max, y_max]
                cv2.rectangle(img,
                              (int(x_min), int(y_min)),  # 左上角坐标
                              (int(x_max), int(y_max)),  # 右下角坐标
                              (255, 0, 0),  # 颜色,BGR格式:红色
                              rectangle_thickness)  # 边界框的线条厚度

                # 绘制文本,显示物体类别及置信度
                cv2.putText(img,
                            f"{result.names[int(box.cls[0])]}",  # 类别标签
                            (int(x_min), int(y_min) - 10),  # 文本位置,位于框的上方
                            cv2.FONT_HERSHEY_PLAIN,  # 字体类型
                            1,  # 字体大小
                            (255, 0, 0),  # 文字颜色,BGR格式:红色
                            text_thickness)  # 文字厚度

        return img, results, box_coordinates  # 返回处理过的图像、检测结果、框的坐标列表

    @classmethod
    def initialize(cls):
        # 只会初始化一次
        if cls._instance is None:
            print("Initializing Yolo11Handle...")
            # 这里进行实际的初始化工作,例如加载模型
            cls._instance = cls()  # 创建唯一实例
            # 其他初始化代码,例如加载 YOLO 模型、权重等
        else:
            print("Yolo11Handle already initialized.")

    @classmethod
    def get_instance(cls):
        # 获取 Yolo11Handle 实例
        return cls._instance

    def match_classes(self, words):
        with open(self.class_json_path, "r", encoding="utf-8") as file:
            data = json.load(file)
        # 将列表转换为字典,key 为 'name', value 为 'value'
        result = {item['name']: item['value'] for item in data}
        # 输出转换后的字典(相当于 Map)
        # print(result)  # 输出: {'人': 1, '飞机': 2}
        classes = result[words]
        print(f"words:{words} =>>> {classes}")
        return classes

# Yolo11Handle.initialize()
# yolo11Handle = Yolo11Handle.get_instance()
# classes = yolo11Handle.match_classes("airplane")
# print(classes)
# # yolo11Handle.detect("/Users/liucheng/rpa/actionImage/code4.png", [], 0.01, True)
# yolo11Handle.detect("/Users/liucheng/rpa/actionImage/6_2.png", [], 0.01, True)

  • identify_code/views.py部分代码:
def identify_code(image_path):
    # 获取需要的 设备类型
    ocrHandle = OcrHandle()
    # 从ocr中获取 图片特征
    ocrHandle.device_read_text(image_path=image_path, cpu=False)
    # 先尝试获取 是否验证成功
    code_success_element = ocrHandle.find_success()
    print(f"code_success_element:{code_success_element}")
    if code_success_element:
        return identify_object_code_json_result(identify=True, handle="success")

    # 尝试获取 是否是开始登录下一步
    next_button_element = ocrHandle.find_next_button()
    print(f"next_button:{next_button_element}")
    if next_button_element:
        next_button_bbox = next_button_element["bbox"]
        return identify_object_code_json_result(identify=False, button=next_button_bbox, handle="init")

    # 根据验证码提示词 获取需要识别的验证码物体类型
    code_element = ocrHandle.find_words()
    print(f"code_element:{code_element}")
    if not code_element:
        # 当前验证码识词获取失败,获取刷新按钮的位置
        refresh_button_bbox = ocrHandle.find_refresh()
        return identify_object_code_json_result(identify=False, button=refresh_button_bbox, handle="refresh_code")

    code_words = code_element["text"]
    # 获取验证按钮位置
    verify_button_element = ocrHandle.find_verify_button()
    print(f"verify_button_element:{verify_button_element}")
    code_button_bbox = verify_button_element["bbox"]
    # 初始化识别模型
    Yolo11Handle.initialize()
    yolo = Yolo11Handle.get_instance()
    # 翻译物体类型为 yolo 类型
    classes = yolo.match_classes(code_words)
    # yolo 模型开始识别
    coordinates, _, _, = yolo.detect(image_path, classes)
    # 识别到的位置
    if coordinates:
        return identify_object_code_json_result(identify=True, codes=coordinates, button=code_button_bbox,
                                                handle="action")
    else:
        # 当前验证码识别失败,获取刷新按钮的位置
        refresh_button_bbox = ocrHandle.find_refresh()
        return identify_object_code_json_result(identify=False, button=refresh_button_bbox, handle="refresh_code")
    # return json.dumps(data)


"""
identify_object_code 方法的返回
:param identify 验证是否成功 true 表示成功
:param codes 识别的验证码位置
:param button 按钮位置
:param handle 识别结果对应的处理 
  init:初始化执行脚本
  action: 验证码定位成功,转换执行脚本
  refresh_code: 刷新验证码
  success: 当前识别成功,开始下一组识别
"""


def identify_object_code_json_result(identify=None, codes=None, button=None, handle=None):
    return JsonResponse({"status": "success", "result": {
        "identify": identify,
        "codes": codes,
        "button": button,
        "handle": handle
    }})

  • 我的客户端是kotlin写的rpa,运行在客户端,作用是在客户端打开浏览器,访问网页,配合上面的python项目进行验证码识别和点击实现。
  • python这部分运行在远程GPU服务器上,通过http实时交互

附上一个使用python(使用webdriver)当作客户端打开浏览器并点击的代码。

import os
import platform
import random
import subprocess
import sys
import threading

import requests
import sqlite3
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import WebDriverException

# 获取数据库路径
if platform.system() == "Windows":
    base_data_dir = os.path.join(os.environ["USERPROFILE"], "MyAppData", "whatsapp_bot")
else:
    base_data_dir = os.path.join(os.environ["HOME"], "MyAppData", "whatsapp_bot")

db_path = os.path.join(base_data_dir, "tasks.db")


def get_pending_tasks():
    """ 从数据库获取未处理的任务 """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("SELECT id, task_no, phone, message, notice_url FROM tasks WHERE status='pending'")
    tasks = cursor.fetchall()
    conn.close()
    return tasks


def update_task_status(task_id, status):
    """ 更新任务状态 """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("UPDATE tasks SET status=? WHERE id=?", (status, task_id))
    conn.commit()
    conn.close()


def send_whatsapp_message(driver, phone, message):
    """ 使用 WebDriver 发送 WhatsApp 消息 """
    print(f"📩 正在向 {phone} 发送消息...")
    # 打开聊天窗口
    whatsapp_url = f"https://web.whatsapp.com/send?phone={phone}"
    driver.get(whatsapp_url)
    # 随机等待时间,避免检测
    time.sleep(random.randint(10, 20))
    try:
        # 等待输入框加载
        input_box = WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.XPATH, '//div[@aria-label="输入消息"]'))
        )
        # 输入消息
        input_box.send_keys(message)
        time.sleep(2)
        # 发送消息
        input_box.send_keys(Keys.ENTER)
        print(f"✅ 已成功发送给 {phone}")
        return True
    except Exception as e:
        print(f"❌ {phone}发送消息失败: {e}")
        return False


def notify_result(notice_url, task_no, phone, success):
    """ 发送 POST 请求通知消息状态 """
    payload = {"type": 1, "success": True, "taskNo": task_no, "phone": phone, "exist": success, "send": success,
               "joinGroup": None}
    try:
        response = requests.post(notice_url, json=payload, timeout=5)
        print(f"📢 通知结果: {response.status_code} - {response.text}")
    except Exception as e:
        print(f"❌ 发送通知失败: {e}")


# **获取存储目录**
if platform.system() == "Windows":
    base_data_dir = os.path.join(os.environ["USERPROFILE"], "MyAppData", "whatsapp_bot")
else:
    base_data_dir = os.path.join(os.environ["HOME"], "MyAppData", "whatsapp_bot")

whatsapp_profile_dir = os.path.join(base_data_dir, "whatsapp_profiles")


# **确保程序可以正常退出**
def exit_program():
    print("退出 WebDriver 进程,关闭整个程序...")
    sys.exit(0)  # ✅ 在 Windows 上更稳定
    # os._exit(0)  # 另一种方式,直接杀死进程


def start_webdriver(profile_name):
    """ 启动 WebDriver 并不断检查任务队列 """
    print(f"启动  WebDriver")
    # 获取当前目录,兼容 PyInstaller
    if getattr(sys, 'frozen', False):
        base_dir = sys._MEIPASS  # PyInstaller 解压后的临时目录
    else:
        base_dir = os.path.dirname(os.path.abspath(__file__))  # 普通运行模式
    system_os = platform.system()  # 检测操作系统
    if system_os == "Windows":
        chromedriver_path = os.path.join(base_dir, "chromedriver", "chromedriver.exe")
    elif system_os == "Darwin":  # macOS
        chromedriver_path = os.path.join(base_dir, "chromedriver", "chromedriver")
    else:
        raise Exception("Unsupported OS")
    user_data_dir = os.path.abspath(f"{whatsapp_profile_dir}/{profile_name}")
    # 确保 `whatsapp_profiles` 目录存在
    if not os.path.exists(user_data_dir):
        os.makedirs(user_data_dir)
    options = webdriver.ChromeOptions()
    options.add_argument(f"--user-data-dir={user_data_dir}")  # 每个 WebDriver 用不同的 profile
    options.add_argument("--disable-gpu")  # 关闭 GPU 以防崩溃
    options.add_argument("--no-sandbox")  # 关闭沙盒模式(适用于 Windows)
    options.add_argument("--disable-dev-shm-usage")  # 解决共享内存问题
    options.add_argument("--disable-extensions")  # 关闭扩展,减少干扰
    service = Service(chromedriver_path)
    try:
        driver = webdriver.Chrome(service=service, options=options)
        driver.get("你需要访问的网址")
        # **🚀 启动 WebDriver 监测线程**
        threading.Thread(target=monitor_webdriver, args=(driver,), daemon=True).start()
    except WebDriverException as e:
        print(f"WebDriver 启动失败或崩溃: {e},退出程序...")
        exit_program()

    print(f"请使用 {profile_name} 账号扫码登录 WhatsApp Web")
    # 轮询登录状态
    while True:
        time.sleep(2)
        try:
            # 尝试找到搜索框,判断是否已登录成功
            search_box = driver.find_element(By.XPATH, '//div[@contenteditable="true"]')
            if search_box:
                print("✅ WhatsApp 登录成功!")
                break
        except:
            print("⌛ 请扫码登录 WhatsApp Web...")
    print(f"检查 tasks 队列")
    while True:
        tasks = get_pending_tasks()
        if tasks:
            for task_id, task_no, phone, message, notice_url in tasks:
                success = send_whatsapp_message(driver, phone, message)
                status = "done" if success else "failed"
                update_task_status(task_id, status)

                # 发送通知
                notify_result(notice_url, task_no, phone, success)

        time.sleep(3)  # 每 5 秒检查一次数据库


def monitor_webdriver(driver):
    """ 监测 WebDriver 是否被关闭 """
    while True:
        time.sleep(1)
        try:
            if not driver.window_handles:
                print("WebDriver 窗口已关闭")
                driver.quit()
                exit_program()
        except WebDriverException:
            print("WebDriver 进程异常,关闭程序...")
            driver.quit()
            exit_program()


if __name__ == "__main__":
    profile_name = ""  # 你可以运行多个 WebDriver 进程,指定不同 profile
    start_webdriver(profile_name)

完整效果没有录视频,自己整吧。今天就到这了

个人公众号

0

评论区