【NLP】漏洞类情报信息抽取--数据处理

语言: CN / TW / HK

image.png


持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第2天,点击查看活动详情

前言

昨天的文章中,简要介绍了漏洞类情报采集方法,使用python的requests和Beautifulsoup4对漏洞发布网站进行了数据抓取,鉴于抓取数据对服务器会造成压力,因此不建议对商业服务器进行大规模爬虫,先前文章所抓取的数据已放百度网盘,有兴趣的请自取,本节主要记录的是模型搭建前的数据处理部分,包含BIOES标注,词典生成、标签字典生成等

链接不能用的可以自行点击

http://pan.baidu.com/s/1gVNUMYk6ln9dgziG0bobYA?pwd=ggpk

image.png


BIOES标注方法

上一节中,提到了最终数据是通过Json形式进行存储,格式如下:

{ 'content': "a race condition was found in the way the linux kernel's memory subsystem handled the copy-on-write (cow) breakage of private read-only shared memory mappings. this flaw allows an unprivileged, local user to gain write access to read-only memory mappings, increasing their privileges on the system.", 'company': [ 'ubuntu_18.04' ], 'product': [ 'linux' ], 'version': [ '*' ], 'influence': [ '4.13.0', '16.19' ], 'type': '系统', 'cve_number': 'CVE-2022-2590', 'title': '空标题', 'href': 'http://avd.aliyun.com/detail?id=AVD-2022-2590' }

包含文本 content,产品 product,版本 version,影响版本influence, 漏洞类型 type, CVE编号 cve_number, 标题 title,详情页网址 href,我们希望通过模型训练,获得一个漏洞信息抽取模型,可以抽取一段文本中的产品、影响版本、CVE编号以及厂商等信息,同时需要通过文本分类获得该漏洞属于系统类漏洞还是软件类漏洞,因此需要自然语言处理中的命名实体识别技术与文本分类技术,训练一个序列标注模型和文本二分类模型来完成需求。这也是抓取漏洞库数据的原因,由于抓取后的数据满足生成训练、测试样本的模式,因此通过BIO的方式进行训练数据标注。

image.png

BIO表示Begining、Inside和Outside,e表示end,s表示single,分别表示实体的启示、居中、结束、和单独的实体,而本文仅用到了BOE三个标签,例如B_product、I_product等。代码如下:

``` import codecs import jieba

train_writer = codecs.open("NER_data.txt", 'w', 'UTF-8')

lines = codecs.open("aliyunSpider.txt", 'r', 'UTF-8').readlines() for line in lines: try: line_dict = (eval(line.strip())) content = line_dict["content"] type = line_dict["type"] company = list(set(line_dict["company"])) cve_number = line_dict["cve_number"].lower() product = list(set(line_dict["product"])) version = list(set(line_dict["version"])) influence = list(set(line_dict["influence"])) title = line_dict["title"] tempDict = {} for company_token in company: company_token = str(company_token).lower() if "" in company_token: if company_token.replace("_firmware", "").replace("", " ") in content: tempDict[company_token.replace("firmware", "").replace("", " ")] = "company" else: pass else: if company_token in content: tempDict[company_token] = "company"

    for product_token in product:
        product_token = product_token.lower()
        if "_" in product_token:
            if product_token.replace("_firmware", "").replace("_", " ") in content:
                tempDict[product_token.replace("_firmware", "").replace("_", " ")] = "product"
            else:
                pass
        else:
            if product_token in content:
                tempDict[product_token] = "product"

    for version_token in version + influence:
        version_token = version_token.lower()
        if "_" in version_token:
            if version_token.replace("_firmware", "").replace("_", " ") in content:
                tempDict[version_token.replace("_firmware", "").replace("_", " ")] = "version"
            else:
                pass
        else:
            if version_token in content:
                tempDict[version_token] = "version"
    finalContent = title.lower() + " " + content.lower()
    if cve_number in finalContent:
        tempDict[cve_number] = "cve_number"
    content_list = list(jieba.cut(finalContent))
    label_list = ["O" for _ in range(len(content_list))]
    for item, types in tempDict.items():
        counts = (str(finalContent).count(item))
        if counts > 1:
            item_list = (list(jieba.cut(item)))
            if len(item_list) == 1:
                # 单个词 出现多次情况
                start = 0
                for i in range(counts):
                    if label_list[content_list.index(item, start + 1)] == "O":
                        label_list[content_list.index(item, start + 1)] = "B_{}".format(types)
                        start = content_list.index(item, start + 1)
                    else:
                        break
            else:
                start = 0
                for i in range(counts):
                    flag = True
                    index_num = start
                    while flag:
                        index_num = content_list.index(item_list[0], index_num)
                        if "".join(content_list[index_num:index_num + len(item_list)]) == "".join(item_list):
                            if label_list[index_num] == "O":
                                label_list[index_num] = "B_{}".format(types)
                                for i in range(index_num + 1, index_num + len(item_list) - 1):
                                    label_list[i] = "I_{}".format(types)
                                label_list[index_num + len(item_list) - 1] = "E_{}".format(types)
                                flag = False
                            else:
                                break
                        else:
                            index_num += 1
                    start = index_num + len(item_list) - 1
        else:
            item_list = (list(jieba.cut(item)))
            if len(item_list) == 1:
                if label_list[content_list.index(item)] == "O":
                    label_list[content_list.index(item)] = "B_{}".format(types)
                else:
                    break
            else:
                flag = True
                index_num = 0
                while flag:
                    index_num = content_list.index(item_list[0], index_num)
                    if "".join(content_list[index_num:index_num + len(item_list)]) == "".join(item_list):
                        if label_list[index_num] == "O":
                            label_list[index_num] = "B_{}".format(types)
                            for i in range(index_num + 1, index_num + len(item_list) - 1):
                                label_list[i] = "I_{}".format(types)
                            label_list[index_num + len(item_list) - 1] = "E_{}".format(types)
                            flag = False
                        else:
                            break
                    else:
                        index_num += 1

    for index, item in enumerate(content_list):
        if item == " ":
            del content_list[index]
            del label_list[index]
    if (len(content_list) == len(label_list)):
        for index, token in enumerate(content_list):
            train_writer.write("{}\t{}\n".format(token, label_list[index]))
        train_writer.write("{}\n".format("<sentence split>"))
        train_writer.flush()
except Exception as e:
    pass

```

其中一部分代码用于处理数据、对于标签情况,需要考虑实体词是否出现多次,同时用到了分词库jieba用于分词:

pip install jieba

image.png

生成后的训练样本如下: 安全漏洞 O umbraco B_company 是 O 丹麦 O umbraco B_company 公司 O 的 O 一套 O c O # O 编写 O 的 O 开源 O 的 O 内容 O 管理系统 O ( O cms O ) O 。 O umbraco B_company cms O 8.5 B_version . I_version 3 E_version 版本 O 中 O 存在 O 安全漏洞 O

至此,训练数据通过上述脚本便可完成从原始数据到训练数据的处理,在下一篇文章中将生成模型所需的词典等,以及模型的简单介绍。Thanks♪(・ω・)ノ~