Python实现自动查询maven仓库中依赖库的CVE
背景
代码审计中有一个步骤是查询组件是否存在公开的CVE漏洞(无论高、中、低危)。这一步不难,但非常耗费时间。Java组件一般都在 mvnrepository.com 这个Maven仓库站点中查询,而该站点没有提供相应的API,且前后端不分离,所以只能靠页面爬虫;但该网站使用了Cloudflare进行防护,直接爬取会被拦截。
遇到的问题🙋
直接爬取页面会触发Cloudflare人机识别;拿通过验证后的Cookie(CK)去请求,很快也会被拦截
想通过模拟真实浏览器,点击验证后再自动爬取,但Python的Selenium环境无论怎么补,也无法通过"是否为真人"的点击验证,因为浏览器已经被识别为非正常浏览器
Cloudflare人机识别的JS一直在更新,每次更新都重新逆向不是办法(逆一次太麻烦)
解决方案🥇
使用浏览器的debug模式(远程调试),接管真实正在使用的浏览器,这样就不会被检测为非正常浏览器
废话不多说,上教程!!
代码在文末(里面包括了java自动解析POM的实现一整套)
步骤
打开浏览器debug模式
MAC
/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --remote-debugging-port=9222
Windows
找到chrome的安装路径,打开CMD控制台
chrome.exe --remote-debugging-port=9222
提前访问网站手动过人机验证
https://mvnrepository.com/
手动过一次后就不会要求要人机验证了
初始化driver对象
获取操作对象后就跟selenium操作方式没啥区别了
def get_chrome_driver():
    """Attach Selenium to a Chrome instance that is already running.

    The browser is not launched here: it has to be started beforehand with
    ``--remote-debugging-port=9222`` so that it listens on 127.0.0.1:9222.

    :return: a ``webdriver.Chrome`` bound to the existing browser session.
    """
    chrome_options = webdriver.ChromeOptions()
    chrome_options.debugger_address = "127.0.0.1:9222"
    # webdriver-manager downloads a ChromeDriver build matching the browser.
    driver_service = Service(ChromeDriverManager().install())
    return webdriver.Chrome(service=driver_service, options=chrome_options)
XPATH调试方法
比如我要的是直接漏洞,就可以在F12控制台,使用以下命令进行调试
$x('xpath')
目标的xpath就如下(调试也是需要时间的)
$x('//th[normalize-space(text())="Vulnerabilities"]/following-sibling::td/span[normalize-space(text())="Direct vulnerabilities:"]/following-sibling::span[not(preceding-sibling::span[normalize-space(text())="Vulnerabilities from dependencies:"])]/a[@class="vuln"]')
食用方法
使用debug模式打开浏览器
手动访问https://mvnrepository.com/过人机验证
将需要代审的项目绝对路径放入代码中,运行
脚本会自动解析依赖,对存在CVE的组件页面截图,并将结果存入Excel
使用效果
完整代码
import os
import sys
import xml.etree.ElementTree as ET
import asyncio
import time
import requests
from lxml import etree
from openpyxl import load_workbook
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
# Discover and parse the project's Maven POM files
class POM:
    """Locate ``pom.xml`` files under a project and extract their dependencies."""

    # XML namespace prefix (Clark notation) used by Maven POM documents.
    _NS = "{http://maven.apache.org/POM/4.0.0}"

    def extract_pom_info(self, project_path):
        """Return every dependency declared in any ``pom.xml`` under *project_path*.

        :param project_path: root directory that is walked recursively.
        :return: list of dicts with ``groupId``, ``artifactId``, ``version``
            and ``位置`` (path of the POM the dependency came from).
        """
        pom_info = []
        for pom_file in self.find_pom_files(project_path):
            for dependency in self.extract_dependencies_from_pom(pom_file):
                dependency["位置"] = pom_file
                pom_info.append(dependency)
        return pom_info

    def find_pom_files(self, project_path):
        """Return the paths of all files named ``pom.xml`` below *project_path*."""
        return [
            os.path.join(root, name)
            for root, _dirs, files in os.walk(project_path)
            for name in files
            if name == "pom.xml"
        ]

    def resolve_property(self, property_element, properties):
        """Return the element's text, expanding a ``${name}`` placeholder.

        :param property_element: XML element or ``None``.
        :param properties: mapping of property name to value from ``<properties>``.
        :return: the resolved text, or ``None`` when the element is missing,
            has no text, or refers to an unknown property.
        """
        if property_element is None:
            return None
        # ``text`` is None for an empty element such as ``<version/>``.
        value = (property_element.text or "").strip()
        if not value:
            return None
        if value.startswith("${") and value.endswith("}"):
            return properties.get(value[2:-1])
        return value

    def extract_dependencies_from_pom(self, pom_file):
        """Parse one POM file and return its fully specified dependencies.

        Dependencies from ``<dependencyManagement>`` come first, followed by
        every other ``<dependencies>/<dependency>`` entry in document order.
        Entries lacking a resolvable groupId, artifactId or version are
        skipped, and each coordinate triple is reported only once per file.

        :param pom_file: path of the ``pom.xml`` to read.
        :return: list of ``{"groupId", "artifactId", "version"}`` dicts;
            empty when the file cannot be read or parsed.
        """
        ns = self._NS
        try:
            root = ET.parse(pom_file).getroot()
        except (ET.ParseError, IOError) as e:
            print(f"Error processing {pom_file}: {e}")
            return []

        # Collect <properties> so ${...} placeholders can be expanded.
        properties = {}
        properties_element = root.find(f"{ns}properties")
        if properties_element is not None:
            for prop in properties_element.findall("*"):
                if prop.text is None:
                    continue
                # Drop the "{namespace}" prefix if there is one.
                properties[prop.tag.rpartition("}")[2]] = prop.text.strip()

        dependencies = []
        seen = set()

        def collect(elements):
            for dep in elements:
                group_id = self.resolve_property(dep.find(f"{ns}groupId"), properties)
                artifact_id = self.resolve_property(dep.find(f"{ns}artifactId"), properties)
                version = self.resolve_property(dep.find(f"{ns}version"), properties)
                if group_id is None or artifact_id is None or version is None:
                    continue
                key = (group_id, artifact_id, version)
                if key in seen:
                    continue
                seen.add(key)
                dependencies.append(
                    {"groupId": group_id, "artifactId": artifact_id, "version": version}
                )

        # Managed dependencies first ...
        dependency_management = root.find(f"{ns}dependencyManagement")
        if dependency_management is not None:
            collect(dependency_management.findall(f"{ns}dependencies/{ns}dependency"))
        # ... then every <dependencies>/<dependency> anywhere in the document.
        # This search matches the managed entries again, hence the `seen` set.
        collect(root.findall(f".//{ns}dependencies/{ns}dependency"))
        return dependencies
def get_chrome_driver():
    """Attach Selenium to a Chrome instance that is already running.

    The browser is not launched here: it has to be started beforehand with
    ``--remote-debugging-port=9222`` so that it listens on 127.0.0.1:9222.

    :return: a ``webdriver.Chrome`` bound to the existing browser session.
    """
    chrome_options = webdriver.ChromeOptions()
    chrome_options.debugger_address = "127.0.0.1:9222"
    # webdriver-manager downloads a ChromeDriver build matching the browser.
    driver_service = Service(ChromeDriverManager().install())
    return webdriver.Chrome(service=driver_service, options=chrome_options)
class ExeUtils:
    """Helpers for running from a packaged executable."""

    @staticmethod
    def get_resources(path):
        """Resolve *path* against the directory the resources live in.

        When ``sys.frozen`` is set the base directory is ``sys._MEIPASS``;
        otherwise it is the absolute form of the current working directory.

        :param path: resource path relative to the base directory.
        :return: the joined path.
        """
        is_frozen = getattr(sys, 'frozen', False)
        base_path = sys._MEIPASS if is_frozen else os.path.abspath(".")
        return os.path.join(base_path, path)
def checkc_cve(cve, max_retries=3):
    """Look up *cve* on cnnvd.org.cn and return its severity, description and title.

    Two requests are made: a keyword search, then a detail query for the
    first search hit. A failed round (transport error, non-JSON body, or a
    response whose ``code`` is not 200) is retried from the start, at most
    *max_retries* times.

    :param cve: CVE identifier used as the search keyword.
    :param max_retries: number of additional attempts after the first one.
    :return: ``(level, description, title)``; always a 3-tuple so callers can
        unpack it. All three are ``None`` when nothing was found or every
        attempt failed. ``level`` is ``None`` for an unmapped hazard level.
    """
    headers = {
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/json;charset=UTF-8',
        'Origin': 'https://www.cnnvd.org.cn',
        'Pragma': 'no-cache',
        'Referer': 'https://www.cnnvd.org.cn/home/globalSearch?keyword=CVE-2021-36374',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0',
        'dnt': '1',
        'sec-ch-ua': '"Chromium";v="130", "Microsoft Edge";v="130", "Not?A_Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"macOS"',
        'sec-gpc': '1',
    }
    # 1: critical, 2: high, 3: medium, 4: low
    level_map = {
        1: '超危',
        2: '高危',
        3: '中危',
        4: '低危',
    }
    nothing = (None, None, None)

    for _attempt in range(max_retries + 1):
        try:
            search = requests.post(
                'https://www.cnnvd.org.cn/web/homePage/cnnvdVulList',
                json={'pageIndex': 1, 'pageSize': 10, 'keyword': cve},
                headers=headers,
                timeout=60,
            ).json()
            if search['code'] != 200:
                print(f'{cve},请求失败,响应:{search},重试中。。。')
                continue

            records = search['data']['records']
            if len(records) <= 0:
                print(f'{cve},未查询到CVE信息')
                return nothing

            detail = requests.post(
                'https://www.cnnvd.org.cn/web/cnnvdVul/getCnnnvdDetailOnDatasource',
                json={
                    'id': records[0]['id'],
                    'vulType': '0',
                    'cnnvdCode': records[0]['cnnvdCode'],
                },
                headers=headers,
                timeout=60,
            ).json()
        except (requests.RequestException, ValueError) as exc:
            # ValueError covers a response body that is not valid JSON.
            print(f'{cve},请求失败,响应:{exc},重试中。。。')
            continue

        if detail['code'] != 200:
            print(f'{cve},请求失败,响应:{detail},重试中。。。')
            continue

        info = detail['data']['cnnvdDetail']
        return level_map.get(info['hazardLevel']), info['vulDesc'], info['vulName']

    print(f'{cve},重试次数已用尽,放弃查询')
    return nothing
def _load_page(url, label, attempts=3):
    """Open *url* in the shared browser and return the parsed lxml tree.

    Exits the program when the page is a Cloudflare challenge. Returns
    ``None`` if the page source could not be parsed after *attempts* loads.
    """
    for _ in range(attempts):
        driver.get(url)
        page_source = driver.page_source
        if "Just a moment..." in page_source:
            print(f"{label},触发了Cloudflare的检测")
            sys.exit()
        tree = etree.HTML(page_source)
        if tree is not None:
            return tree
    return None


def _collect_release_versions(html):
    """Return ``{"version", "has_vulnerability"}`` dicts in page order."""
    versions = []
    for link in html.xpath('//td/a[@class="vbtn release"]'):
        rows = link.xpath('./ancestor::tr')
        if link.text is None or not rows:
            continue
        versions.append({
            "version": link.text.strip(),
            # A version counts as vulnerable when its table row has a "vuln" link.
            "has_vulnerability": bool(rows[0].xpath('.//a[@class="vuln"]')),
        })
    return versions


def _first_clean_version(versions, current_version):
    """Return the first non-vulnerable entry at or after *current_version*.

    Returns ``None`` when *current_version* is not listed or every entry from
    it onwards is vulnerable.
    """
    reached_current = False
    for entry in versions:
        if entry["version"] == current_version:
            reached_current = True
        if reached_current and not entry["has_vulnerability"]:
            return entry["version"]
    return None


def _as_text(value):
    """Join a list with newlines, or convert anything else with ``str``."""
    return "\n".join(value) if isinstance(value, list) else str(value)


async def get_cve(item):
    """Scrape mvnrepository.com for the direct CVEs of one dependency.

    *item* is updated in place:

    * ``无漏洞版本`` - first non-vulnerable version at or after the current
      one, or ``"暂无"`` when none is found;
    * ``CVE`` - newline-joined CVE ids, each suffixed with its severity when
      the lookup returned one;
    * ``CVE描述`` / ``CVE名称`` - newline-joined descriptions and titles of
      all CVEs that had one.

    A screenshot of the version page is saved under ``pom_path`` when direct
    vulnerabilities are found.

    :param item: dict with ``groupId``, ``artifactId`` and ``version``.
    :return: *item* when CVEs were recorded, otherwise ``None``.
    """
    artifact_url = f"https://mvnrepository.com/artifact/{item['groupId']}/{item['artifactId']}"
    html = _load_page(artifact_url, item['artifactId'])
    if html is None:
        print(f"{item['artifactId']},页面解析失败,已跳过")
        return

    if not html.xpath('//a[@class="vuln"]'):
        print(f"{item['artifactId']},未找到直接漏洞")
        return

    versions = _collect_release_versions(html)
    # Reversed so the scan below walks forward from the current version;
    # assumes the page lists the newest version first -- TODO confirm.
    versions.reverse()
    clean_version = _first_clean_version(versions, item["version"])
    item['无漏洞版本'] = clean_version if clean_version is not None else "暂无"

    url = f"https://mvnrepository.com/artifact/{item['groupId']}/{item['artifactId']}/{item['version']}"
    html = _load_page(url, item['artifactId'])
    if html is None:
        print(f"{url},未找到CVE编号")
        return
    vuln = html.xpath('//th[normalize-space(text())="Vulnerabilities"]/following-sibling::td/span[normalize-space(text())="Direct vulnerabilities:"]/following-sibling::span[not(preceding-sibling::span[normalize-space(text())="Vulnerabilities from dependencies:"])]/a[@class="vuln"]')
    if not vuln:
        print(f"{url},未找到CVE编号")
        return

    # Keep a screenshot of the version page as evidence.
    driver.save_screenshot(
        os.path.join(pom_path, f"{item['groupId']}_{item['artifactId']}_{item['version']}.png")
    )

    # Skip links whose text mentions "vulnerability"/"vulnerabilities".
    cve_ids = [
        link.text for link in vuln
        if link.text is not None
        and "vulnerabilities" not in link.text
        and "vulnerability" not in link.text
    ]

    labelled, descriptions, titles = [], [], []
    for cve_id in cve_ids:
        # Tolerate a lookup that returns None instead of a 3-tuple.
        cve_level, cve_desc, cve_title = checkc_cve(cve_id) or (None, None, None)
        labelled.append(cve_id + "(" + cve_level + ")" if cve_level is not None else cve_id)
        if cve_desc is not None:
            descriptions.append(_as_text(cve_desc))
        if cve_title is not None:
            titles.append(_as_text(cve_title))

    # Record every CVE's description and title, not only the last one.
    if descriptions:
        item['CVE描述'] = "\n".join(descriptions)
    if titles:
        item['CVE名称'] = "\n".join(titles)
    item['CVE'] = "\n".join(labelled)
    return item
# Parse the project's POM files
def deal_pom(path):
    """Return the dependency records ``POM.extract_pom_info`` yields for *path*."""
    return POM().extract_pom_info(path)
async def main():
    """Scan the project's dependencies for CVEs and write them to Excel.

    Dependencies are read from the POM files under ``pom_path`` and
    de-duplicated in discovery order. Each is looked up with ``get_cve``.
    The results are written into a copy of ``resources/template.xlsx``
    saved as ``cve_result.xlsx`` inside ``pom_path``.
    """
    pom_info = deal_pom(pom_path)

    # De-duplicate before scraping so identical entries are looked up only
    # once; a dict keeps first-seen order, so the row order is reproducible.
    unique_items = list({tuple(entry.items()): entry for entry in pom_info}.values())

    # All lookups share one browser session, so they run one after another.
    for item in unique_items:
        await get_cve(item)

    template_excel_path = ExeUtils.get_resources(r"resources/template.xlsx")
    workbook = load_workbook(template_excel_path)
    sheet = workbook.active  # the template's active worksheet
    # The header row is assumed to be the first row of the template.
    headers = [cell.value for cell in sheet[1]]

    # Write one row per dependency from row 2 on, with values placed under
    # the matching header; fields without a value are left as "".
    for row_index, item in enumerate(unique_items, start=2):
        for col_index, header in enumerate(headers, start=1):
            sheet.cell(row=row_index, column=col_index, value=item.get(header, ""))

    output_excel_path = os.path.join(pom_path, "cve_result.xlsx")
    workbook.save(output_excel_path)
    print(f"数据已成功写入 {output_excel_path}")
if __name__ == "__main__":
    # Measure the total run time.
    start_time = time.perf_counter()
    # Java project path; the first command-line argument overrides the default.
    pom_path = sys.argv[1] if len(sys.argv) > 1 else '/Code/dd/'
    driver = get_chrome_driver()
    try:
        # Run the async entry point.
        asyncio.run(main())
    finally:
        # End the WebDriver session even if the scan aborts part-way.
        driver.quit()
    print("=====================================")
    print(f"耗时: {time.perf_counter() - start_time} 秒")
- 感谢你赐予我前进的力量
赞赏者名单
因为你们的支持让我意识到写文章的价值🙏
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 途深
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果