如何构建一个自己的 NVD 漏洞数据库?

因为毕设需要,笔者必须构建一个漏洞数据库,其中包含 CVE ID、CVSS 评分、攻击方式等信息。经过前期的一些尝试后,发现现在网上找到的主流方式均不再适用,因此只好自己写了个脚本。整个过程用到的工具/组件有:Python3,Pycharm,PostgreSQL,Docker。

前期尝试

因为项目中用到了攻击图生成工具 MulVAL,而 MulVAL 中也提供了漏洞数据库的构建脚本 nvd_sync.sh,简单分析代码后可以知道它是利用 NVD 提供的 XML 文件来构建数据库的。可是我配置好环境后运行该脚本,无法使用。简单看一下报错,是下载失败了。脚本中下载的语句如下:

while [ $i -le $year ]; do
  wget http://nvd.nist.gov/download/nvdcve-$i.xml.gz
  gunzip -f nvdcve-$i.xml.gz
  i=`expr $i + 1`
done

我试着手动从该链接下载,但是失败了,前往官网查看,发现已经 404 了。但是天无绝人之路,虽然该文件无法下载,但是我看到了另外一个下载页面 NVD - Data Feeds (nist.gov)。在该页面手动下载 XML 文件,看到下载链接形如 https://nvd.nist.gov/feeds/xml/cve/trans/es/nvdcve-2021trans.xml.gz​。抱着侥幸的心理,我修改了代码里的下载链接,祈祷只是下载链接变了而文件内容没有变。结果当然是……失败了!并不是下载失败,而是数据解析失败,数据库中有了相关字段,但值都是空的。

还在网上看到了一些使用爬虫的方式,但我觉得爬虫太慢了,而且也不知道 NVD 有没有反爬虫机制,不如直接下载文件进行解析、填充。由于对 XML 文件格式不太熟悉,所以就选择了 NVD 提供的另外一种文件类型:JSON。

数据库的选择

PostgreSQL 在数据量很大时性能比较好,因此就选择了 PostgreSQL。为了保证环境快速部署,直接使用了 Docker。关于 Docker 的说明与安装在这里不再介绍。 PostgreSQL 官方提供了 Docker,只需要 pull 下来启动即可,我使用的启动命令是:

docker run --name nvd-db -e POSTGRES_PASSWORD=自定义密码 \ 
-e POSTGRES_USER=nvd \ 
-e POSTGRES_DB=nvd \ 
-e TZ='Asia/Shanghai' \ 
-e ALLOW_IP_RANGE=0.0.0.0/0 \ 
-v /root/Code/Auto-PT-Auxiliary/db:/var/lib/postgresql/data \ 
-p 54321:5432 \ 
--restart always \ 
-d postgres

分析 JSON 文件结构

image

在刚才提到的网站下载一个 json 文件,打开后分析一下里面的结构,然后使用 python 的 json 库从中提取到需要的信息,详细含义如下:

import json
with open("../config/record.json",'r') as load_f:
    load_dict = json.load(load_f)

# 获取第1个CVE对象
load_dict['CVE_Items'][1] 
# 获取CVE ID
load_dict['CVE_Items'][1024]['cve']['CVE_data_meta']['ID']
# 获取该cve的CVSS3 base score
load_dict['CVE_Items'][1024]['impact']['baseMetricV3']['cvssV3']['baseScore']
# 获取该cve的CVSS3 exploitability score
load_dict['CVE_Items'][1024]['impact']['baseMetricV3']['exploitabilityScore']
# 获取该cve的CVSS3 impact score
load_dict['CVE_Items'][1024]['impact']['baseMetricV3']['impactScore']
# 获取CVSS3的攻击向量:network, local, adjacent network, physical
load_dict['CVE_Items'][1024]['impact']['baseMetricV3']['cvssV3']['attackVector']
# 获取漏洞等级:High, Medium, Low
load_dict['CVE_Items'][1024]['impact']['baseMetricV3']['cvssV3']['baseSeverity']
# 获取该cve的CVSS2 base score
load_dict['CVE_Items'][1024]['impact']['baseMetricV2']['cvssV3']['baseScore']
# 获取该cve的CVSS2 exploitability score
load_dict['CVE_Items'][1024]['impact']['baseMetricV2']['exploitabilityScore']
# 获取该cve的CVSS2 impact score
load_dict['CVE_Items'][1024]['impact']['baseMetricV2']['impactScore']

有的 CVE ID 没有被使用,但是在 json 文件中仍然存在,这些在入库时需要做筛选。目前发现筛选出来的标准之一是 impact ​字段如果为空,那么说明该 ID 没有被使用。可以用类似于 if load_dict['CVE_Items'][1024]['impact'] != {}​ 这样的 if 语句进行一个简单的过滤。不过需要注意,nvd 的漏洞数据在 2015 年及之前都只有 CVSS2 评分,没有 CVSS3,直到 2016 年才出现了 CVSS3。而最新(2021 年 12 月去看的)的一些漏洞,又只有 CVSS3,没有 CVSS2。

将数据导入数据库

直接上代码了

import json
import os
import psycopg2


class DataBaseConnector:
    def __init__(self, _db_config: dict):
        self.db_host = _db_config['host']  # 数据库服务器
        self.db_port = _db_config['port']  # 数据库端口,MySQL默认3306,PostgreSQL默认5432
        self.db_user = _db_config['user']  # 数据库用户
        self.db_password = _db_config['password']  # 数据库密码
        self.db_name = _db_config['db_name']  # 数据库名
        self.db_table = _db_config['db_table']  # 数据表名

        self.conn = self.connect_db()  # 数据库连接对象

        self._current_path = os.path.dirname(__file__)  # 当前文件路径
        self.start_year = int(_db_config['start_year'])  # NVD开始年份
        self.end_year = int(_db_config['end_year'])  # NVD结束年份

    def connect_db(self):
        """
        连接数据库,返回conn

        :return: 数据库连接对象
        """
        conn = psycopg2.connect(
            host=self.db_host,
            port=self.db_port,
            user=self.db_user,
            password=self.db_password,
            database=self.db_name
        )
        return conn

    def fill_db(self):
        cursor = self.conn.cursor()
        print("正在清理过时的数据库......")
        cursor.execute("drop table if exists nvd CASCADE")
        cursor.execute("drop type if exists vector")
        cursor.execute("drop type if exists level")
        # 对于攻击向量的枚举类型: ('NETWORK', 'ADJACENT_NETWORK', 'LOCAL', 'PHYSICAL')
        enum_vector_sql = "create type vector as enum ('NETWORK', 'ADJACENT_NETWORK', 'LOCAL', 'PHYSICAL')"
        # 对于漏洞等级的枚举类型: ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW')
        enum_level_sql = "create type level as enum ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW')"
        # 创建nvd数据表的SQL语句
        create_table_sql = f"create table {self.db_table} (cve_id varchar(20) not null, \
            attack_vector vector not null, vuln_level level not null, base_score decimal not null, \
            exploitability_socre decimal not null, impact_score decimal not null, primary key(cve_id))"
        cursor.execute(enum_vector_sql)  # 创建vector
        cursor.execute(enum_level_sql)  # 创建level
        cursor.execute(create_table_sql)  # 创建nvd数据表

        # 向nvd数据表中插入数据
        for year in range(self.start_year, self.end_year + 1):
            with open(f'{self._current_path}/nvd_json/nvdcve-1.1-{year}.json', 'r') as f:
                print(f"正在导入{year}年的CVE数据......", end="")
                load_dict = json.load(f)
                for cve in load_dict['CVE_Items']:
                    """
                    因为json文件中有的项可能只是占位项,并没有真的漏洞,
                    因此首先要判断是不是占位项,如果是,跳过当前项。
                    判断的标准很简单,看"impact"字段是否为空。
                    """
                    if cve['impact'] == {}:
                        continue
                    cve_id = cve['cve']['CVE_data_meta']['ID']  # CVE ID
                    """
                    2015年之前的漏洞都只有CVSS2评分,因此只能使用CVSS2;
                    而2021年最新的几个漏洞只有CVSS3,没有CVSS2,因此只能使用CVSS3;
                    中间的年份既有CVSS2,又有CVSS3。因此,无法统一标准,只能妥协:
                    如果存在CVSS3,优先使用CVSS3;否则再使用CVSS2。
                    """
                    if 'baseMetricV3' in cve['impact']:  # CVSS3
                        attack_vector = cve['impact']['baseMetricV3']['cvssV3']['attackVector']  # 攻击向量
                        vuln_level = cve['impact']['baseMetricV3']['cvssV3']['baseSeverity']  # 漏洞等级
                        base_score = cve['impact']['baseMetricV3']['cvssV3']['baseScore']  # base score
                        exploitability_score = cve['impact']['baseMetricV3'][
                            'exploitabilityScore']  # exploitability score
                        impact_score = cve['impact']['baseMetricV3']['impactScore']  # impact score
                    else:  # CVSS2
                        attack_vector = cve['impact']['baseMetricV2']['cvssV2']['accessVector']
                        vuln_level = cve['impact']['baseMetricV2']['severity']
                        base_score = cve['impact']['baseMetricV2']['cvssV2']['baseScore']
                        exploitability_score = cve['impact']['baseMetricV2']['exploitabilityScore']
                        impact_score = cve['impact']['baseMetricV2']['impactScore']
                    # 向数据库中插入数据的SQL语句
                    insert_sql = f"insert into {self.db_table} values ('{cve_id}','{attack_vector}'," \
                                 f"'{vuln_level}',{base_score:.2f},{exploitability_score:.2f},{impact_score:.2f})"
                    cursor.execute(insert_sql)  # 插入本条CVE数据
            print("导入成功!")
        self.conn.commit()
        print(f"\n[SUCCESS]成功向数据库中导入{self.start_year}年到{self.end_year}年的所有漏洞数据!")

    def download_nvd_json(self):
        """
        从NVD下载指定年份区间内的json文件并解压
        """
        os.system(f"rm -rf {self._current_path}/nvd_json/*")
        for year in range(self.start_year, self.end_year + 1):
            # 下载文件到 /db/nvd_json/目录下
            os.system(
                f"wget -P {self._current_path}/nvd_json/ \
                https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-{year}.json.gz"
            )
            os.system(f"gunzip {self._current_path}/nvd_json/nvdcve-1.1-{year}.json.gz")

    def close(self):
        """
        关闭数据库连接
        """
        self.conn.colse()


if __name__ == "__main__":
    db_config = {'host': '172.17.0.3', 'port': '5432', 'user': 'nvd', 'password': 'password', 'db_name': 'nvd',
                 'db_table': 'nvd', 'start_year': 2002, 'end_year': 2021}
    db = DataBaseConnector(db_config)
    # db.download_nvd_json()
    db.fill_db()

最终完成了所有数据的插入,共有 165947 条 CVE 数据,完成这么多数据的解析和插入总共用时 46.79541 秒,应该是个还可以接受的时间吧,毕竟没有做什么优化。

image

标签: nvd, 脚本

添加新评论