背景

代码审计中有一个步骤就是要查询组件是否存在公开的CVE漏洞,不难但非常耗费时间,无论高中低危,一般java的都是在这个maven仓库中查询,而该站点未提供相对应API出来,且是前后端不分离,所以只能靠页面爬虫,但该网站使用了clouldflare进行防护,直接爬会拦截。

遇到的问题🙋

  • 直接爬取页面会造成clouldflare人机识别,拿了通过验证的CK进行请求,很快就拦截了

  • 想通过模拟真实浏览器,点击后再自动爬去,python的Selenuim环境无论怎么补,也无法点击是否为真人,因为已经被识别浏览器为非正常浏览器

  • clouldflare人机识别JS一直在更新,一直逆向更新不是办法(逆一次太麻烦)

解决方案🥇

  • 使用浏览器debug模式,也就是真实正在使用的浏览器,这样就不会检测是否为非正常浏览器

  • 废话不多说,上教程!!

  • 代码在文末(里面包括了java自动解析POM的实现一整套)

步骤

打开浏览器debug模式

MAC

/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --remote-debugging-port=9222

Windows

找到chrome的安装路径,打开CMD控制台

chrome.exe --remote-debugging-port=9222

提前访问网站手动过人机验证

  • https://mvnrepository.com/

手动过一次后就不会要求要人机验证了

初始化driver对象

获取操作对象后就跟selenium操作方式没啥区别了

def get_chrome_driver():
    # 判断运行环境
    # if sys.platform == 'darwin':
    #     # 使用 subprocess 代替 os.system
    #     command = [
    #         "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
    #         "--remote-debugging-port=9222"
    #     ]
    #     os.spawnvp(os.P_NOWAIT, command[0], command)  # 异步启动 Chrome
    # else:
    #     print("暂不支持非 Mac 环境")
    #     return None

    options = webdriver.ChromeOptions()
    options.debugger_address = "127.0.0.1:9222"

    # 使用 webdriver-manager 自动下载匹配版本的 ChromeDriver
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service,options=options)
    return driver

XPATH调试方法

比如我要的是直接漏洞,就可以在F12控制台,使用以下命令进行调试

$x('xpath')

目标的xpath就如下(调试也是需要时间的)

$x('//th[normalize-space(text())="Vulnerabilities"]/following-sibling::td/span[normalize-space(text())="Direct vulnerabilities:"]/following-sibling::span[not(preceding-sibling::span[normalize-space(text())="Vulnerabilities from dependencies:"])]/a[@class="vuln"]')

食用方法

  • 使用debug模式打开浏览器

  • 手动访问https://mvnrepository.com/过人机验证

  • 将需要代审的项目绝对路径放入代码中,运行

  • 自动解析截图有CVE的中间件,并存入excel

使用效果

完整代码

import os
import sys
import xml.etree.ElementTree as ET
import asyncio
import time

import requests
from lxml import etree
from openpyxl import load_workbook
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager


# 动态获取POM
class POM:
    def extract_pom_info(self,project_path):
        pom_files = self.find_pom_files(project_path)
        pom_info = []
        for pom_file in pom_files:
            dependencies = self.extract_dependencies_from_pom(pom_file)
            for dependency in dependencies:
                dependency["位置"] = pom_file
                pom_info.append(dependency)
        return pom_info

    def find_pom_files(self,project_path):
        pom_files = []
        for root, dirs, files in os.walk(project_path):
            for file in files:
                if file == "pom.xml":
                    pom_files.append(os.path.join(root, file))
        return pom_files

    def resolve_property(self,property_element, properties):
        if property_element is None:
            return None
        property_name = property_element.text.strip()
        if property_name.startswith("${") and property_name.endswith("}"):
            return properties.get(property_name[2:-1])
        return property_name

    def extract_dependencies_from_pom(self,pom_file):
        dependencies = []
        try:
            tree = ET.parse(pom_file)
            root = tree.getroot()

            # Extract properties
            properties_element = root.find("{http://maven.apache.org/POM/4.0.0}properties")
            properties = {}
            if properties_element is not None:
                for prop in properties_element.findall("*"):
                    properties[prop.tag.split("}")[1]] = prop.text.strip()

            # Extract dependencies from dependencyManagement
            dependency_management = root.find("{http://maven.apache.org/POM/4.0.0}dependencyManagement")
            if dependency_management is not None:
                managed_dependencies = dependency_management.findall(
                    "{http://maven.apache.org/POM/4.0.0}dependencies/{http://maven.apache.org/POM/4.0.0}dependency")
                if managed_dependencies:
                    for dep in managed_dependencies:
                        groupId = self.resolve_property(dep.find("{http://maven.apache.org/POM/4.0.0}groupId"), properties)
                        artifactId = self.resolve_property(dep.find("{http://maven.apache.org/POM/4.0.0}artifactId"),
                                                      properties)
                        version = self.resolve_property(dep.find("{http://maven.apache.org/POM/4.0.0}version"), properties)
                        if groupId is not None and artifactId is not None and version is not None:
                            dependencies.append({"groupId": groupId, "artifactId": artifactId, "version": version})

            # Extract dependencies from child modules
            child_dependencies = root.findall(
                ".//{http://maven.apache.org/POM/4.0.0}dependencies/{http://maven.apache.org/POM/4.0.0}dependency")
            if child_dependencies:
                for dependency in child_dependencies:
                    groupId = self.resolve_property(dependency.find("{http://maven.apache.org/POM/4.0.0}groupId"),
                                               properties)
                    artifactId = self.resolve_property(dependency.find("{http://maven.apache.org/POM/4.0.0}artifactId"),
                                                  properties)
                    version = self.resolve_property(dependency.find("{http://maven.apache.org/POM/4.0.0}version"),
                                               properties)
                    if groupId is not None and artifactId is not None and version is not None:
                        dependencies.append({"groupId": groupId, "artifactId": artifactId, "version": version})

        except (ET.ParseError, IOError) as e:
            print(f"Error processing {pom_file}: {e}")

        return dependencies

def get_chrome_driver():
    # 判断运行环境
    # if sys.platform == 'darwin':
    #     # 使用 subprocess 代替 os.system
    #     command = [
    #         "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
    #         "--remote-debugging-port=9222"
    #     ]
    #     os.spawnvp(os.P_NOWAIT, command[0], command)  # 异步启动 Chrome
    # else:
    #     print("暂不支持非 Mac 环境")
    #     return None

    options = webdriver.ChromeOptions()
    options.debugger_address = "127.0.0.1:9222"

    # 使用 webdriver-manager 自动下载匹配版本的 ChromeDriver
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service,options=options)
    # driver.get("https://mvnrepository.com/")
    return driver


class ExeUtils:
    """exe生成相关工具类"""

    @staticmethod
    def get_resources(path):
        """
        获取实际的资源访问路径(本地||临时)
        根据打包生成的临时目录访问资源
        或者直接运行脚本获取本地访问资源
        :param path:
        :return:
        """
        if getattr(sys, 'frozen', False):
            base_path = sys._MEIPASS
        else:
            base_path = os.path.abspath(".")
        return os.path.join(base_path, path)

def checkc_cve(cve):
    cve_info, cve_desc, _ = None, None, None
    headers = {
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/json;charset=UTF-8',
        'Origin': 'https://www.cnnvd.org.cn',
        'Pragma': 'no-cache',
        'Referer': 'https://www.cnnvd.org.cn/home/globalSearch?keyword=CVE-2021-36374',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0',
        'dnt': '1',
        'sec-ch-ua': '"Chromium";v="130", "Microsoft Edge";v="130", "Not?A_Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"macOS"',
        'sec-gpc': '1',
    }

    json_data = {
        'pageIndex': 1,
        'pageSize': 10,
        'keyword': cve,
    }

    response = requests.post('https://www.cnnvd.org.cn/web/homePage/cnnvdVulList', json=json_data,headers=headers,timeout=60).json()
    if response['code'] != 200:
        print(f'{cve},请求失败,响应:{response},重试中。。。')
        cve_level, cve_desc, cve_title = checkc_cve(cve)
        return cve_level, cve_desc, cve_title
    if  len(response['data']['records'])<=0:
        print(f'{cve},未查询到CVE信息')
        return None
    json_data = {
        'id': response['data']['records'][0]['id'],
        'vulType': '0',
        'cnnvdCode': response['data']['records'][0]['cnnvdCode'],
    }

    response = requests.post('https://www.cnnvd.org.cn/web/cnnvdVul/getCnnnvdDetailOnDatasource', json=json_data,headers=headers,timeout=60).json()
    if response['code'] != 200:
        print(f'{cve},请求失败,响应:{response},重试中。。。')
        cve_level, cve_desc, cve_title=checkc_cve(cve)
        return cve_level, cve_desc, cve_title
    # 1:超危,2:高危,3:中危,4:低危
    level_map = {
        1: '超危',
        2: '高危',
        3: '中危',
        4: '低危',
    }

    cve_level = level_map.get(response['data']['cnnvdDetail']['hazardLevel'])

    cve_desc = response['data']['cnnvdDetail']['vulDesc']
    cve_title = response['data']['cnnvdDetail']['vulName']
    return cve_level, cve_desc, cve_title

async def get_cve(item):
    url = f"https://mvnrepository.com/artifact/{item['groupId']}/{item['artifactId']}"
    driver.get(url)

    html = etree.HTML(driver.page_source)

    if "Just a moment..." in driver.page_source:
        print(f"{item['artifactId']},触发了Cloudflare的检测")
        exit()
    if html is None:
        await get_cve(item)
        return
    try:
        vuln = html.xpath('//a[@class="vuln"]')
    except:
        print(html)
        exit()
    if len(vuln) == 0:
        print(f"{item['artifactId']},未找到直接漏洞")
        return

    versions = []

    # XPath 定位版本号的元素
    version_elements = html.xpath('//td/a[@class="vbtn release"]')

    # 遍历版本号元素
    for version_elem in version_elements:
        # 提取版本号
        version_text = version_elem.text.strip()

        # 获取当前版本对应的父行(<tr>)
        parent_row = version_elem.xpath('./ancestor::tr')[0]

        # 检查当前行是否有漏洞信息的 <a> 元素
        vuln_elem = parent_row.xpath('.//a[@class="vuln"]')

        # 如果漏洞信息存在,标记为 True,否则为 False
        has_vulnerability = bool(vuln_elem)

        # 保存版本信息及漏洞状态
        versions.append({
            "version": version_text,
            "has_vulnerability": has_vulnerability
        })
    # 反转版本
    versions.reverse()
    # 当前版本
    found_flag = False
    no_vulnerability_version = "暂无"
    for version in versions:
        if version["version"] == item["version"]:
            found_flag = True
        if found_flag:
            if not version['has_vulnerability']:
                no_vulnerability_version = version['version']
                item['无漏洞版本'] = no_vulnerability_version
                break


    url = f"https://mvnrepository.com/artifact/{item['groupId']}/{item['artifactId']}/{item['version']}"
    driver.get(url)
    html = etree.HTML(driver.page_source)
    try:
        vuln = html.xpath('//th[normalize-space(text())="Vulnerabilities"]/following-sibling::td/span[normalize-space(text())="Direct vulnerabilities:"]/following-sibling::span[not(preceding-sibling::span[normalize-space(text())="Vulnerabilities from dependencies:"])]/a[@class="vuln"]')
    except:
        print(f"{url},未找到CVE编号")
        return
    if len(vuln) == 0:
        print(f"{url},未找到CVE编号")
        return
    else:
        # 截图当前页面()
        driver.save_screenshot(f"{pom_path}/{item['groupId']}_{item['artifactId']}_{item['version']}.png")

    cve = [
        i.text for i in vuln if i.text is not None and "vulnerabilities" not in i.text and "vulnerability" not in i.text
    ]

    for i in range(len(cve)):
        cve_level, cve_desc, cve_title = checkc_cve(cve[i])
        if cve_level is not None:
            cve[i] = cve[i] + "(" + cve_level + ")"
            # 将描述和标题写入Excel
            if cve_desc is not None: item['CVE描述'] = "\n".join(cve_desc) if isinstance(cve_desc, list) else str(
                cve_desc)
            if cve_title is not None: item['CVE名称'] = "\n".join(cve_title) if isinstance(cve_title, list) else str(
                cve_title)

    # item[''] = "\n".join(cve)

    item['CVE'] = "\n".join(cve)
    return item

# 处理pom
def deal_pom(path):
    pom = POM()
    return pom.extract_pom_info(path)

    # print(pom_info)


async def main():
    # 实例化POM
    pom_info = deal_pom(pom_path)
    # for item in pom_info:
    #     # print(item)
    #     get_cve(item)
    tasks = [get_cve(item) for item in pom_info]
    # 等待所有任务完成
    await asyncio.gather(*tasks)


    template_excel_path = ExeUtils.get_resources(r"resources/template.xlsx")

    # 加载 Excel 模板
    workbook = load_workbook(template_excel_path)
    sheet = workbook.active  # 获取活动工作表(模板中的默认表)

    # 获取表头
    headers = [cell.value for cell in sheet[1]]  # 假设表头在第一行

    # 去重处理
    pom_info = [dict(t) for t in {tuple(d.items()) for d in pom_info}]
    # 对齐数据到表头
    aligned_data = []
    for item in pom_info:
        row = [item.get(header, "") for header in headers]  # 按表头顺序排列数据
        aligned_data.append(row)

    # 写入数据从第二行开始
    for row_index, row_data in enumerate(aligned_data, start=2):
        for col_index, value in enumerate(row_data, start=1):
            sheet.cell(row=row_index, column=col_index, value=value)

    # 保存文件
    output_excel_path = f"{pom_path}/cve_result.xlsx"
    workbook.save(output_excel_path)
    print(f"数据已成功写入 {output_excel_path}")




if __name__ == "__main__":
    # 统计耗时
    start_time = time.time()
    driver = get_chrome_driver()
    # java项目路径
    pom_path = '/Code/dd/'  
    # 启动异步主函数
    asyncio.run(main())
    driver.quit()
    print("=====================================")
    print(f"耗时: {time.time() - start_time} 秒")