【NLP】漏洞類情報資訊抽取--資料處理

語言: CN / TW / HK

image.png


持續創作,加速成長!這是我參與「掘金日新計劃 · 10 月更文挑戰」的第2天,點選檢視活動詳情

前言

昨天的文章中,簡要介紹了漏洞類情報採集方法,使用python的requests和Beautifulsoup4對漏洞釋出網站進行了資料抓取,鑑於抓取資料對伺服器會造成壓力,因此不建議對商業伺服器進行大規模爬蟲,先前文章所抓取的資料已放百度網盤,有興趣的請自取,本節主要記錄的是模型搭建前的資料處理部分,包含BIOES標註,詞典生成、標籤字典生成等

連結不能用的可以自行點選

http://pan.baidu.com/s/1gVNUMYk6ln9dgziG0bobYA?pwd=ggpk

image.png


BIOES標註方法

上一節中,提到了最終資料是通過Json形式進行儲存,格式如下:

{ 'content': "a race condition was found in the way the linux kernel's memory subsystem handled the copy-on-write (cow) breakage of private read-only shared memory mappings. this flaw allows an unprivileged, local user to gain write access to read-only memory mappings, increasing their privileges on the system.", 'company': [ 'ubuntu_18.04' ], 'product': [ 'linux' ], 'version': [ '*' ], 'influence': [ '4.13.0', '16.19' ], 'type': '系統', 'cve_number': 'CVE-2022-2590', 'title': '空標題', 'href': 'http://avd.aliyun.com/detail?id=AVD-2022-2590' }

包含文字 content,產品 product,版本 version,影響版本influence, 漏洞型別 type, CVE編號 cve_number, 標題 title,詳情頁網址 href,我們希望通過模型訓練,獲得一個漏洞資訊抽取模型,可以抽取一段文字中的產品、影響版本、CVE編號以及廠商等資訊,同時需要通過文字分類獲得該漏洞屬於系統類漏洞還是軟體類漏洞,因此需要自然語言處理中的命名實體識別技術與文字分類技術,訓練一個序列標註模型和文字二分類模型來完成需求。這也是抓取漏洞庫資料的原因,由於抓取後的資料滿足生成訓練、測試樣本的模式,因此通過BIO的方式進行訓練資料標註。

image.png

BIO表示Begining、Inside和Outside,e表示end,s表示single,分別表示實體的啟示、居中、結束、和單獨的實體,而本文僅用到了BOE三個標籤,例如B_product、I_product等。程式碼如下:

``` import codecs import jieba

train_writer = codecs.open("NER_data.txt", 'w', 'UTF-8')

lines = codecs.open("aliyunSpider.txt", 'r', 'UTF-8').readlines() for line in lines: try: line_dict = (eval(line.strip())) content = line_dict["content"] type = line_dict["type"] company = list(set(line_dict["company"])) cve_number = line_dict["cve_number"].lower() product = list(set(line_dict["product"])) version = list(set(line_dict["version"])) influence = list(set(line_dict["influence"])) title = line_dict["title"] tempDict = {} for company_token in company: company_token = str(company_token).lower() if "" in company_token: if company_token.replace("_firmware", "").replace("", " ") in content: tempDict[company_token.replace("firmware", "").replace("", " ")] = "company" else: pass else: if company_token in content: tempDict[company_token] = "company"

    for product_token in product:
        product_token = product_token.lower()
        if "_" in product_token:
            if product_token.replace("_firmware", "").replace("_", " ") in content:
                tempDict[product_token.replace("_firmware", "").replace("_", " ")] = "product"
            else:
                pass
        else:
            if product_token in content:
                tempDict[product_token] = "product"

    for version_token in version + influence:
        version_token = version_token.lower()
        if "_" in version_token:
            if version_token.replace("_firmware", "").replace("_", " ") in content:
                tempDict[version_token.replace("_firmware", "").replace("_", " ")] = "version"
            else:
                pass
        else:
            if version_token in content:
                tempDict[version_token] = "version"
    finalContent = title.lower() + " " + content.lower()
    if cve_number in finalContent:
        tempDict[cve_number] = "cve_number"
    content_list = list(jieba.cut(finalContent))
    label_list = ["O" for _ in range(len(content_list))]
    for item, types in tempDict.items():
        counts = (str(finalContent).count(item))
        if counts > 1:
            item_list = (list(jieba.cut(item)))
            if len(item_list) == 1:
                # 單個詞 出現多次情況
                start = 0
                for i in range(counts):
                    if label_list[content_list.index(item, start + 1)] == "O":
                        label_list[content_list.index(item, start + 1)] = "B_{}".format(types)
                        start = content_list.index(item, start + 1)
                    else:
                        break
            else:
                start = 0
                for i in range(counts):
                    flag = True
                    index_num = start
                    while flag:
                        index_num = content_list.index(item_list[0], index_num)
                        if "".join(content_list[index_num:index_num + len(item_list)]) == "".join(item_list):
                            if label_list[index_num] == "O":
                                label_list[index_num] = "B_{}".format(types)
                                for i in range(index_num + 1, index_num + len(item_list) - 1):
                                    label_list[i] = "I_{}".format(types)
                                label_list[index_num + len(item_list) - 1] = "E_{}".format(types)
                                flag = False
                            else:
                                break
                        else:
                            index_num += 1
                    start = index_num + len(item_list) - 1
        else:
            item_list = (list(jieba.cut(item)))
            if len(item_list) == 1:
                if label_list[content_list.index(item)] == "O":
                    label_list[content_list.index(item)] = "B_{}".format(types)
                else:
                    break
            else:
                flag = True
                index_num = 0
                while flag:
                    index_num = content_list.index(item_list[0], index_num)
                    if "".join(content_list[index_num:index_num + len(item_list)]) == "".join(item_list):
                        if label_list[index_num] == "O":
                            label_list[index_num] = "B_{}".format(types)
                            for i in range(index_num + 1, index_num + len(item_list) - 1):
                                label_list[i] = "I_{}".format(types)
                            label_list[index_num + len(item_list) - 1] = "E_{}".format(types)
                            flag = False
                        else:
                            break
                    else:
                        index_num += 1

    for index, item in enumerate(content_list):
        if item == " ":
            del content_list[index]
            del label_list[index]
    if (len(content_list) == len(label_list)):
        for index, token in enumerate(content_list):
            train_writer.write("{}\t{}\n".format(token, label_list[index]))
        train_writer.write("{}\n".format("<sentence split>"))
        train_writer.flush()
except Exception as e:
    pass

```

其中一部分程式碼用於處理資料、對於標籤情況,需要考慮實體詞是否出現多次,同時用到了分詞庫jieba用於分詞:

pip install jieba

image.png

生成後的訓練樣本如下: 安全漏洞 O umbraco B_company 是 O 丹麥 O umbraco B_company 公司 O 的 O 一套 O c O # O 編寫 O 的 O 開源 O 的 O 內容 O 管理系統 O ( O cms O ) O 。 O umbraco B_company cms O 8.5 B_version . I_version 3 E_version 版本 O 中 O 存在 O 安全漏洞 O

至此,訓練資料通過上述指令碼便可完成從原始資料到訓練資料的處理,在下一篇文章中將生成模型所需的詞典等,以及模型的簡單介紹。Thanks♪(・ω・)ノ~