[NLP] Vulnerability Intelligence Information Extraction -- Model Analysis and Training Data Generation
Preface
In the previous two articles I documented how vulnerability intelligence is collected and processed; after that processing, the raw crawled data was converted into sequence-labeling data with BIOES tags. Today's article mainly covers building the token dictionary and label dictionary the model needs, introduces the model structure, and builds the model with TensorFlow 1.4.
Named Entity Recognition
Named entity recognition (NER), one of the sequence-labeling tasks in natural language processing, has received wide attention in recent years. It refers to identifying entities with specific meaning in text, mainly person names, place names, organization names, times, and so on, and it is an important step or component of tasks such as information extraction, question answering, syntactic parsing, part-of-speech tagging, and knowledge graph construction. Its application domains also keep broadening, for example the agricultural entity extraction task in the iFLYTEK AI algorithm competition and the drug instruction recognition track on Alibaba Tianchi.
NER methods are usually divided into supervised, unsupervised, and rule-based. Supervised approaches are typically framed as sequence labeling: a label is predicted for every character in a sentence so that the start and end positions of each entity can be recovered, which is the method used in this article. Some researchers instead use unsupervised clustering, grouping entities with similar features into clusters based on the differences between entity features, and assigning the label of the nearest cluster at prediction time. Rule-based methods define rule templates through syntactic and grammatical analysis and match entity categories against them; they work well when there are few entity types and the corpus is fairly homogeneous.
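To make the sequence-labeling formulation concrete, the sketch below shows one way a predicted BIOES tag sequence can be decoded back into entity spans. The function name `decode_bioes` and the sample call are illustrative only, not part of the project code.

```
def decode_bioes(tokens, tags):
    """Turn a BIOES tag sequence back into (entity_text, entity_type) pairs."""
    entities, buf, buf_type = [], [], None
    for token, tag in zip(tokens, tags):
        if tag.startswith("B_"):            # beginning of a multi-token entity
            buf, buf_type = [token], tag[2:]
        elif tag.startswith("I_") and buf:  # inside the current entity
            buf.append(token)
        elif tag.startswith("E_") and buf:  # end of the current entity
            buf.append(token)
            entities.append(("".join(buf), buf_type))
            buf, buf_type = [], None
        elif tag.startswith("S_"):          # single-token entity
            entities.append((token, tag[2:]))
        else:                               # "O" or an inconsistent tag: reset
            buf, buf_type = [], None
    return entities


# e.g. decode_bioes(["8.5", ".", "3"], ["B_version", "I_version", "E_version"])
# -> [("8.5.3", "version")]
```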
This article trains the entity extraction model with the most basic BiLSTM+CRF setup. The model's input is a sentence of vulnerability intelligence, and its output is the label of each token; after mapping the labels back through the label dictionary, we can extract which vendors, versions, CVE IDs, product names, and so on the text contains. The model structure is shown in the figure below:
Model Preparation
Before building the model, we need to generate the label dictionary and the token dictionary, which map text to indices into the embedding matrix; a [unk] token and a [pad] token also have to be added. The labels likewise need to be converted to label indices, which are used to measure the deviation between predicted and ground-truth labels when computing the model loss. The code is as follows:
```
import codecs
import pickle
import random
from tqdm import tqdm


def write_pickle(fileName, obj):
    # serialize a Python object into a binary file
    f = open(fileName, 'wb')
    pickle.dump(obj, f)
    f.close()


def load_pickle(fileName):
    # load a Python object back from a pickle file
    f = open(fileName, 'rb')
    d = pickle.load(f)
    f.close()
    return d


def make_dict():
    # build the token dictionary: token -> index
    print("Building the token dictionary")
    # reserve indices for the padding and unknown tokens described above
    vocabulary = {"[pad]": 0, "[unk]": 1}
    lines = codecs.open("NER_data.txt", 'r', 'UTF-8').readlines()
    for line in tqdm(lines):
        line = line.strip()
        if line != "":
            # each non-empty line is "token<TAB>tag"
            word = line.split("\t")[0]
            if word not in vocabulary:
                vocabulary[word] = len(vocabulary)
    write_pickle("word_dic.pkl", vocabulary)


def make_label_dic():
    # build the label dictionary: BIOES tag -> index
    vocabulary = {}
    lines = codecs.open("NER_data.txt", 'r', 'UTF-8').readlines()
    for line in tqdm(lines):
        line = line.strip()
        if line != "":
            label = line.split("\t")[-1]
            if label not in vocabulary:
                vocabulary[label] = len(vocabulary)
    write_pickle("label_dic.pkl", vocabulary)


def make_dataset():
    # split the annotated sentences into train / test sets (9:1)
    lines = codecs.open("NER_data.txt", 'r', 'UTF-8').readlines()
    total = []
    temp = []
    for line in lines:
        if len(line.strip().split("\t")) > 1:
            temp.append(line)
        if line.strip() == "":
            # a blank line marks the end of a sentence
            total.append(temp)
            temp = []
    print(len(total))
    random.shuffle(total)
    train = total[:int(len(total) * 0.9)]
    test = total[int(len(total) * 0.9):]
    print(len(train))
    print(len(test))
    writer = codecs.open("train.txt", 'w', "UTF-8")
    for item in train:
        for word in item:
            writer.write(word)
        writer.write("\n")
    writer.close()
    writer = codecs.open("test.txt", 'w', "UTF-8")
    for item in test:
        for word in item:
            writer.write(word)
        writer.write("\n")
    writer.close()


if __name__ == '__main__':
    make_dict()
    make_label_dic()
    make_dataset()
    load_pickle("label_dic.pkl")
```
Here NER_data.txt is the processed output of the crawled data, with one token and its tag per line and a blank line between sentences, in the following format:
```
安全漏洞	O
umbraco	B_company
是	O
丹麦	O
umbraco	B_company
公司	O
的	O
一套	O
c	O
#	O
编写	O
的	O
开源	O
的	O
内容	O
管理系统	O
(	O
cms	O
)	O
。	O
umbraco	B_company
cms	O
8.5	B_version
.	I_version
3	E_version
版本	O
中	O
存在	O
安全漏洞	O
。	O
攻击者	O
可	O
借助	O
install	O
package	O
功能	O
利用	O
该	O
漏洞	O
上传	O
文件	O
,	O
执行	O
代码	O
。	O
```
In the processing script above, pickle is Python's own serialization/deserialization module. Its basic idea is to store a Python object directly in a binary file, without first converting it into a string or another intermediate format; when the object is needed again, loading the serialized file restores the original Python object. The module is an efficient way to persist complex Python data structures; its drawback is that the serialized files cannot be read or written by other languages.
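As a quick illustration of that round trip (the file name vocab_demo.pkl is arbitrary), dumping and restoring a dictionary only takes a few lines:

```
import pickle

vocab = {"[pad]": 0, "[unk]": 1, "umbraco": 2}

# write the Python object directly into a binary file
with open("vocab_demo.pkl", "wb") as f:
    pickle.dump(vocab, f)

# load it back and get the same object
with open("vocab_demo.pkl", "rb") as f:
    restored = pickle.load(f)

assert restored == vocab
```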
The script above produces the following files:
- training set: train.txt
- validation set: test.txt
- token dictionary (pickle): word_dic.pkl
- label dictionary (pickle): label_dic.pkl
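The model code in the next section imports `batch_yield` and `pad_sequences` from a `data_helper` module that is not shown in this article. Below is a minimal sketch of what these two helpers could look like, assuming batches are built from (sentence list, tag list) pairs and that unknown tokens map to [unk]; treat it as an illustration rather than the project's actual implementation.

```
import random


def pad_sequences(sequences, pad_mark=0):
    """Pad every sequence in a batch to the length of the longest one."""
    max_len = max(len(seq) for seq in sequences)
    seq_list, seq_len_list = [], []
    for seq in sequences:
        seq = list(seq)
        seq_list.append(seq + [pad_mark] * (max_len - len(seq)))
        seq_len_list.append(len(seq))
    return seq_list, seq_len_list


def batch_yield(data, batch_size, vocab, tag2label, shuffle=False):
    """Yield (token_id_sequences, label_id_sequences) batches."""
    sentences, tag_sequences = data  # assumed layout: ([token lists], [tag lists])
    pairs = list(zip(sentences, tag_sequences))
    if shuffle:
        random.shuffle(pairs)
    seqs, labels = [], []
    for sent, tags in pairs:
        sent_ids = [vocab.get(token, vocab["[unk]"]) for token in sent]
        label_ids = [tag2label[tag] for tag in tags]
        if len(seqs) == batch_size:
            yield seqs, labels
            seqs, labels = [], []
        seqs.append(sent_ids)
        labels.append(label_ids)
    if seqs:
        yield seqs, labels
```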
Model Construction
The model uses the most basic BiLSTM+CRF structure and is implemented with TensorFlow 1.4. The code is as follows:
```
import numpy as np
import os, time, sys
import tensorflow as tf
from tensorflow.contrib.rnn import LSTMCell
from tensorflow.contrib.crf import crf_log_likelihood
from tensorflow.contrib.crf import viterbi_decode

from data_helper import batch_yield, pad_sequences
from utils import get_logger
from eval import conlleval


class BiLSTM_CRF(object):
    def __init__(self, args, embeddings, tag2label, vocab, paths, config):
        # model initialization: hyper-parameters, dictionaries and paths
        self.batch_size = args.batch_size
        self.epoch_num = args.epoch
        self.hidden_dim = args.hidden_dim
        self.embeddings = embeddings
        self.CRF = args.CRF
        self.update_embedding = args.update_embedding
        self.dropout_keep_prob = args.dropout
        self.optimizer = args.optimizer
        self.lr = args.lr
        self.clip_grad = args.clip
        self.tag2label = tag2label
        self.num_tags = len(tag2label)
        self.vocab = vocab
        self.shuffle = args.shuffle
        self.model_path = paths['model_path']
        self.summary_path = paths['summary_path']
        self.logger = get_logger(paths['log_path'])
        self.result_path = paths['result_path']
        self.config = config
    def build_graph(self):
        # assemble the computation graph
        self.add_placeholders()   # input placeholders
        self.lookup_layer_op()    # embedding lookup: word_id -> embedding
        self.biLSTM_layer_op()    # BiLSTM sentence encoder + projection to tag space
        self.softmax_pred_op()    # softmax prediction, used only when the CRF is disabled
        self.loss_op()            # loss definition
        self.trainstep_op()       # optimizer and training op
        self.init_op()            # variable initializer
    def add_placeholders(self):
        self.word_ids = tf.placeholder(tf.int32, shape=[None, None], name="word_ids")
        self.labels = tf.placeholder(tf.int32, shape=[None, None], name="labels")
        self.sequence_lengths = tf.placeholder(tf.int32, shape=[None], name="sequence_lengths")
        self.dropout_pl = tf.placeholder(dtype=tf.float32, shape=[], name="dropout")
        self.lr_pl = tf.placeholder(dtype=tf.float32, shape=[], name="lr")

    def lookup_layer_op(self):
        with tf.variable_scope("words"):
            _word_embeddings = tf.Variable(self.embeddings,
                                           dtype=tf.float32,
                                           trainable=self.update_embedding,
                                           name="_word_embeddings")
            word_embeddings = tf.nn.embedding_lookup(params=_word_embeddings,
                                                     ids=self.word_ids,
                                                     name="word_embeddings")
        self.word_embeddings = tf.nn.dropout(word_embeddings, self.dropout_pl)
    def biLSTM_layer_op(self):
        with tf.variable_scope("bi-lstm"):
            # forward and backward LSTM cells
            cell_fw = LSTMCell(self.hidden_dim)
            cell_bw = LSTMCell(self.hidden_dim)
            # forward outputs, backward outputs, and the final states
            (output_fw_seq, output_bw_seq), _ = tf.nn.bidirectional_dynamic_rnn(
                cell_fw=cell_fw,
                cell_bw=cell_bw,
                inputs=self.word_embeddings,
                sequence_length=self.sequence_lengths,
                dtype=tf.float32)
            # concatenate the forward and backward outputs
            output = tf.concat([output_fw_seq, output_bw_seq], axis=-1)
            # dropout against overfitting
            output = tf.nn.dropout(output, self.dropout_pl)

        # fully-connected projection from 2 * hidden_dim to the tag space
        with tf.variable_scope("proj"):
            W = tf.get_variable(name="W",
                                shape=[2 * self.hidden_dim, self.num_tags],
                                initializer=tf.contrib.layers.xavier_initializer(),
                                dtype=tf.float32)
            b = tf.get_variable(name="b",
                                shape=[self.num_tags],
                                initializer=tf.zeros_initializer(),
                                dtype=tf.float32)
            s = tf.shape(output)
            output = tf.reshape(output, [-1, 2 * self.hidden_dim])
            pred = tf.matmul(output, W) + b
            self.logits = tf.reshape(pred, [-1, s[1], self.num_tags])
    def loss_op(self):
        # with the CRF: negative log-likelihood over whole tag sequences
        if self.CRF:
            log_likelihood, self.transition_params = crf_log_likelihood(inputs=self.logits,
                                                                        tag_indices=self.labels,
                                                                        sequence_lengths=self.sequence_lengths)
            self.loss = -tf.reduce_mean(log_likelihood)
        else:
            # without the CRF: per-token cross-entropy, masked to the real sequence lengths
            losses = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=self.logits,
                                                                    labels=self.labels)
            mask = tf.sequence_mask(self.sequence_lengths)
            losses = tf.boolean_mask(losses, mask)
            self.loss = tf.reduce_mean(losses)
        tf.summary.scalar("loss", self.loss)

    def softmax_pred_op(self):
        # only needed when the CRF layer is disabled
        if not self.CRF:
            self.labels_softmax_ = tf.argmax(self.logits, axis=-1)
            self.labels_softmax_ = tf.cast(self.labels_softmax_, tf.int32)
    def trainstep_op(self):
        # training configuration: optimizer selection and gradient clipping
        with tf.variable_scope("train_step"):
            self.global_step = tf.Variable(0, name="global_step", trainable=False)
            if self.optimizer == 'Adam':
                optim = tf.train.AdamOptimizer(learning_rate=self.lr_pl)
            elif self.optimizer == 'Adadelta':
                optim = tf.train.AdadeltaOptimizer(learning_rate=self.lr_pl)
            elif self.optimizer == 'Adagrad':
                optim = tf.train.AdagradOptimizer(learning_rate=self.lr_pl)
            elif self.optimizer == 'RMSProp':
                optim = tf.train.RMSPropOptimizer(learning_rate=self.lr_pl)
            elif self.optimizer == 'Momentum':
                optim = tf.train.MomentumOptimizer(learning_rate=self.lr_pl, momentum=0.9)
            elif self.optimizer == 'SGD':
                optim = tf.train.GradientDescentOptimizer(learning_rate=self.lr_pl)
            else:
                optim = tf.train.GradientDescentOptimizer(learning_rate=self.lr_pl)

            grads_and_vars = optim.compute_gradients(self.loss)
            grads_and_vars_clip = [[tf.clip_by_value(g, -self.clip_grad, self.clip_grad), v] for g, v in grads_and_vars]
            self.train_op = optim.apply_gradients(grads_and_vars_clip, global_step=self.global_step)

    def init_op(self):
        self.init_op = tf.global_variables_initializer()

    def add_summary(self, sess):
        """
        Attach a TensorBoard summary writer to the session's graph.
        :param sess:
        :return:
        """
        self.merged = tf.summary.merge_all()
        self.file_writer = tf.summary.FileWriter(self.summary_path, sess.graph)
    def train(self, train_data, dev_data, train_label, dev_label):
        saver = tf.train.Saver(tf.global_variables())
        with tf.Session(config=self.config) as sess:
            sess.run(self.init_op)
            self.add_summary(sess)
            for epoch in range(self.epoch_num):
                self.run_one_epoch(sess, [train_data, train_label], [dev_data, dev_label], epoch, saver)

    def test(self, test):
        saver = tf.train.Saver()
        with tf.Session(config=self.config) as sess:
            self.logger.info('=========== testing ===========')
            saver.restore(sess, self.model_path)
            label_list, seq_len_list = self.dev_one_epoch(sess, test)
            self.evaluate(label_list, seq_len_list, test)

    def demo_one(self, sess, sent):
        """
        Predict the tag sequence for a single sentence.
        :param sess:
        :param sent:
        :return:
        """
        label_list = []
        for seqs, labels in batch_yield(sent, 1, self.vocab, self.tag2label, shuffle=False):
            label_list_, _ = self.predict_one_batch(sess, seqs)
            label_list.extend(label_list_)
        label2tag = {}
        for tag, label in self.tag2label.items():
            label2tag[label] = tag if label != 0 else label
        tag = [label2tag[label] for label in label_list[0]]
        return tag

    def run_one_epoch(self, sess, train, dev, epoch, saver):
        train_length = np.array(train).shape[1]
        num_batches = (train_length + self.batch_size - 1) // self.batch_size
        print('num_batches :{}'.format(num_batches))
        start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        batches = batch_yield(train, self.batch_size, self.vocab, self.tag2label, shuffle=self.shuffle)
        for step, (seqs, labels) in enumerate(batches):
            # print(' processing: {} batch / {} batches.'.format(step + 1, num_batches) + '\r')
            step_num = epoch * num_batches + step + 1
            feed_dict, _ = self.get_feed_dict(seqs, labels, self.lr, self.dropout_keep_prob)
            _, loss_train, summary, step_num_ = sess.run([self.train_op, self.loss, self.merged, self.global_step],
                                                         feed_dict=feed_dict)
            if step + 1 == 1 or (step + 1) % 300 == 1 or step + 1 == num_batches:
                print('{} epoch {}, step {}, loss: {:.4}, global_step: {}'.format(start_time, epoch + 1, step + 1,
                                                                                  loss_train, step_num))
            self.file_writer.add_summary(summary, step_num)

        # save a checkpoint at the end of every epoch
        saver.save(sess, self.model_path, global_step=step_num)
        print('===========validation / test===========')
        label_list_dev, seq_len_list_dev = self.dev_one_epoch(sess, dev)
        self.evaluate(label_list_dev, seq_len_list_dev, dev, epoch)
    def get_feed_dict(self, seqs, labels=None, lr=None, dropout=None):
        """
        :param seqs:
        :param labels:
        :param lr:
        :param dropout:
        :return: feed_dict
        """
        word_ids, seq_len_list = pad_sequences(seqs, pad_mark=0)
        feed_dict = {self.word_ids: word_ids,
                     self.sequence_lengths: seq_len_list}
        if labels is not None:
            labels_, _ = pad_sequences(labels, pad_mark=0)
            feed_dict[self.labels] = labels_
        if lr is not None:
            feed_dict[self.lr_pl] = lr
        if dropout is not None:
            feed_dict[self.dropout_pl] = dropout
        return feed_dict, seq_len_list

    def dev_one_epoch(self, sess, dev):
        """
        :param sess:
        :param dev:
        :return:
        """
        label_list, seq_len_list = [], []
        for seqs, labels in batch_yield(dev, self.batch_size, self.vocab, self.tag2label, shuffle=False):
            label_list_, seq_len_list_ = self.predict_one_batch(sess, seqs)
            label_list.extend(label_list_)
            seq_len_list.extend(seq_len_list_)
        return label_list, seq_len_list
    def predict_one_batch(self, sess, seqs):
        """
        :param sess:
        :param seqs:
        :return: label_list
                 seq_len_list
        """
        feed_dict, seq_len_list = self.get_feed_dict(seqs, dropout=1.0)
        if self.CRF:
            # run the BiLSTM to get emission scores, then Viterbi-decode with the learned transitions
            logits, transition_params = sess.run([self.logits, self.transition_params],
                                                 feed_dict=feed_dict)
            label_list = []
            for logit, seq_len in zip(logits, seq_len_list):
                viterbi_seq, _ = viterbi_decode(logit[:seq_len], transition_params)
                label_list.append(viterbi_seq)
            return label_list, seq_len_list
        else:
            label_list = sess.run(self.labels_softmax_, feed_dict=feed_dict)
            return label_list, seq_len_list

    def evaluate(self, label_list, seq_len_list, data, epoch=None):
        # token-level accuracy between predicted tags and ground truth
        label2tag = {}
        for tag, label in self.tag2label.items():
            label2tag[label] = tag
        label = data[1]
        total = 0
        true = 0
        for index, item in enumerate(label_list):
            predict_result = [label2tag[label_] for label_ in item]
            ground_truth = label[index]
            assert len(predict_result) == len(ground_truth)
            total += len(predict_result)
            for i, _ in enumerate(ground_truth):
                if ground_truth[i] == predict_result[i]:
                    true += 1
        print('Evaluation accuracy: {}'.format(true / total))
```
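To close, here is a rough sketch of how the class above might be wired together. The hyper-parameters, paths, and random embedding initialization are placeholders rather than the settings used in this project; the actual training entry script is covered in the next post.

```
import pickle

import numpy as np
import tensorflow as tf

# assuming the BiLSTM_CRF class above is saved as model.py
from model import BiLSTM_CRF

# load the dictionaries produced by the data-preparation script
with open("word_dic.pkl", "rb") as f:
    word2id = pickle.load(f)
with open("label_dic.pkl", "rb") as f:
    tag2label = pickle.load(f)


# placeholder hyper-parameters; any object exposing these attributes works
class Args:
    batch_size = 64
    epoch = 20
    hidden_dim = 300
    CRF = True
    update_embedding = True
    dropout = 0.5
    optimizer = 'Adam'
    lr = 0.001
    clip = 5.0
    shuffle = True


paths = {'model_path': './checkpoints/model',
         'summary_path': './summaries',
         'log_path': './train.log',
         'result_path': './results'}

# randomly initialized embeddings; pre-trained vectors could be substituted
embeddings = np.float32(np.random.uniform(-0.25, 0.25, (len(word2id), 300)))

config = tf.ConfigProto()
config.gpu_options.allow_growth = True

model = BiLSTM_CRF(Args(), embeddings, tag2label, word2id, paths, config=config)
model.build_graph()
# model.train(train_data, dev_data, train_label, dev_label)  # data loading: next post
```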
Basic comments have been added; if you are interested, take a look at the source code. Tomorrow's post will cover the training and testing code and walk through what each part of the model code actually does. Thanks!