赞
踩
def build_vocab(file_path, tokenizer, max_size, min_freq): # 定义词汇表字典:使用 vocab_dic = {} 初始化一个空字典,用于存储每个词及其出现频率 vocab_dic = {} with open(file_path, 'r', encoding='UTF-8') as f: for line in tqdm(f): lin = line.strip() if not lin: continue content = lin.split('\t')[0] """ 分割与计数:对每行内容进行处理,先用 strip() 去除首尾空白符,然后分割出需要处理的文本内容(默认以制表符\t分割)。 使用 tokenizer(content) 对内容进行分词,然后统计每个词的出现次数 """ for word in tokenizer(content): vocab_dic[word] = vocab_dic.get(word, 0) + 1 """ 词汇按出现频率筛选(频率大于等于 min_freq)并排序(按频率降序),最多保留 max_size 个词汇 """ vocab_list = sorted([_ for _ in vocab_dic.items() if _[1] >= min_freq], key=lambda x: x[1], reverse=True)[:max_size] ''' 重构词汇表:将筛选并排序后的词汇列表转换为字典,每个词汇映射到一个唯一的索引。 词汇表中还额外添加了特殊标记 UNK(未知词)和 PAD(填充符),它们分别在词汇表的末尾添加。 ''' vocab_dic = {word_count[0]: idx for idx, word_count in enumerate(vocab_list)} vocab_dic.update({UNK: len(vocab_dic), PAD: len(vocab_dic) + 1}) # k:v ===> 词:索引 return vocab_dic
def build_dataset(config, ues_word):
if ues_word:
tokenizer = lambda x: x.split(' ') # 以空格隔开,word-level
else:
tokenizer = lambda x: [y for y in x] # char-level
if os.path.exists(config.vocab_path):
vocab = pkl.load(open(config.vocab_path, 'rb'))
else:
vocab = build_vocab(config.train_path, tokenizer=tokenizer, max_size=MAX_VOCAB_SIZE, min_freq=1)
pkl.dump(vocab, open(config.vocab_path, 'wb'))
print(f"Vocab size: {len(vocab)}")
train = load_dataset(config.train_path,vocab, config.pad_size)
dev = load_dataset(config.dev_path,vocab, config.pad_size)
test = load_dataset(config.test_path,vocab, config.pad_size)
return vocab, train, dev, test
首先数据格式:
文本内容以及对应过的标签
data label
数据预处理:
去除空行: 忽略空行。
分割行: 将每一行通过制表符\t分割为content(内容)和label(标签)。
文本转换: 使用tokenizer函数将content分词。
序列填充或截断: 根据pad_size参数(默认为32),
如果分词后的序列长度小于pad_size,则用vocab字典中的PAD标记进行填充;如果长度大于pad_size,则进行截断。
def load_dataset(path,vocab,tokenizer, pad_size=32): contents = [] with open(path, 'r', encoding='UTF-8') as f: for line in tqdm(f): lin = line.strip() if not lin: continue content, label = lin.split('\t') words_line = [] token = tokenizer(content) seq_len = len(token) if pad_size: if len(token) < pad_size: token.extend([vocab.get(PAD)] * (pad_size - len(token))) else: token = token[:pad_size] seq_len = pad_size # word to id for word in token: words_line.append(vocab.get(word, vocab.get(UNK))) contents.append((words_line, int(label), seq_len)) return contents # [([...], 0), ([...], 1), ...]
class DatasetIterater(object): def __init__(self, batches, batch_size, device): self.batch_size = batch_size self.batches = batches self.n_batches = len(batches) // batch_size self.residue = False # 记录batch数量是否为整数 if len(batches) % self.n_batches != 0: self.residue = True self.index = 0 self.device = device def _to_tensor(self, datas): # xx = [xxx[2] for xxx in datas] # indexx = np.argsort(xx)[::-1] # datas = np.array(datas)[indexx] x = torch.LongTensor([_[0] for _ in datas]).to(self.device) y = torch.LongTensor([_[1] for _ in datas]).to(self.device) bigram = torch.LongTensor([_[3] for _ in datas]).to(self.device) trigram = torch.LongTensor([_[4] for _ in datas]).to(self.device) # pad前的长度(超过pad_size的设为pad_size) seq_len = torch.LongTensor([_[2] for _ in datas]).to(self.device) return (x, seq_len, bigram, trigram), y def __next__(self): if self.residue and self.index == self.n_batches: batches = self.batches[self.index * self.batch_size: len(self.batches)] self.index += 1 batches = self._to_tensor(batches) return batches elif self.index > self.n_batches: self.index = 0 raise StopIteration else: batches = self.batches[self.index * self.batch_size: (self.index + 1) * self.batch_size] self.index += 1 batches = self._to_tensor(batches) return batches def __iter__(self): return self def __len__(self): if self.residue: return self.n_batches + 1 else: return self.n_batches
设置文件路径和参数:
vocab_dir
:词汇表的文件路径,这个文件包含从词汇到索引的映射。pretrain_dir
:预训练词向量文件的路径。emb_dim
:词向量的维度,这里设为300。filename_trimmed_dir
:压缩后保存新词向量的文件路径。加载词汇表:
pickle
(pkl
)加载词汇表,得到word_to_id
字典,它将词汇映射到一个唯一的索引。初始化词向量矩阵:
embeddings
,其形状为词汇表长度×词向量维度(len(word_to_id), emb_dim
)。读取预训练的词向量:
lin
,其中lin[0]
是词汇,lin[1:301]
是对应的300维词向量。更新词向量矩阵:
lin[0]
存在于word_to_id
中,找到对应的索引idx
。lin[1:301]
中的字符串转换为浮点数,形成新的词向量emb
。embeddings
矩阵中的idx
行,即用新的词向量替换原来的随机向量。保存词向量矩阵:
numpy
的savez_compressed
方法,将更新后的embeddings
矩阵压缩保存到指定路径。'''提取预训练词向量''' vocab_dir = "./THUCNews/data/vocab.pkl" pretrain_dir = "./THUCNews/data/sgns.sogou.char" emb_dim = 300 filename_trimmed_dir = "./THUCNews/data/vocab.embedding.sougou" word_to_id = pkl.load(open(vocab_dir, 'rb')) embeddings = np.random.rand(len(word_to_id), emb_dim) f = open(pretrain_dir, "r", encoding='UTF-8') for i, line in enumerate(f.readlines()): # if i == 0: # 若第一行是标题,则跳过 # continue lin = line.strip().split(" ") if lin[0] in word_to_id: idx = word_to_id[lin[0]] emb = [float(x) for x in lin[1:(emb_dim+1)]] embeddings[idx] = np.asarray(emb, dtype='float32') f.close() np.savez_compressed(filename_trimmed_dir, embeddings=embeddings)
输入文本先通过embedding层转换为词向量表示。
添加一个维度以适配卷积操作(unsqueeze(1))。
应用多个卷积层和池化层(conv_and_pool),然后将结果拼接。
应用Dropout。
通过全连接层得到最终分类结果。
# coding: UTF-8 import torch import torch.nn as nn import torch.nn.functional as F import numpy as np class Config(object): """配置参数""" def __init__(self, dataset, embedding): self.model_name = 'TextCNN' self.train_path = dataset + '/data/train.txt' # 训练集 self.dev_path = dataset + '/data/dev.txt' # 验证集 self.test_path = dataset + '/data/test.txt' # 测试集 self.class_list = [x.strip() for x in open( dataset + '/data/class.txt').readlines()] # 类别名单 self.vocab_path = dataset + '/data/vocab.pkl' # 词表 self.save_path = dataset + '/saved_dict/' + self.model_name + '.ckpt' # 模型训练结果 self.log_path = dataset + '/log/' + self.model_name self.embedding_pretrained = torch.tensor( np.load(dataset + '/data/' + embedding)["embeddings"].astype('float32'))\ if embedding != 'random' else None # 预训练词向量 self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 设备 self.dropout = 0.5 # 随机失活 self.require_improvement = 1000 # 若超过1000batch效果还没提升,则提前结束训练 self.num_classes = len(self.class_list) # 类别数 self.n_vocab = 0 # 词表大小,在运行时赋值 self.num_epochs = 20 # epoch数 self.batch_size = 128 # mini-batch大小 self.pad_size = 32 # 每句话处理成的长度(短填长切) self.learning_rate = 1e-3 # 学习率 self.embed = self.embedding_pretrained.size(1)\ if self.embedding_pretrained is not None else 300 # 字向量维度 self.filter_sizes = (2, 3, 4) # 卷积核尺寸 self.num_filters = 256 # 卷积核数量(channels数) '''Convolutional Neural Networks for Sentence Classification''' class Model(nn.Module): def __init__(self, config): super(Model, self).__init__() if config.embedding_pretrained is not None: self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False) else: self.embedding = nn.Embedding(config.n_vocab, config.embed, padding_idx=config.n_vocab - 1) self.convs = nn.ModuleList( [nn.Conv2d(1, config.num_filters, (k, config.embed)) for k in config.filter_sizes]) self.dropout = nn.Dropout(config.dropout) self.fc = nn.Linear(config.num_filters * len(config.filter_sizes), config.num_classes) def conv_and_pool(self, x, conv): x = F.relu(conv(x)).squeeze(3) x = F.max_pool1d(x, x.size(2)).squeeze(2) return x def forward(self, x): #print (x[0].shape) out = self.embedding(x[0]) out = out.unsqueeze(1) out = torch.cat([self.conv_and_pool(out, conv) for conv in self.convs], 1) out = self.dropout(out) out = self.fc(out) return out
# coding: UTF-8 import torch import torch.nn as nn import torch.nn.functional as F import numpy as np class Config(object): """配置参数""" def __init__(self, dataset, embedding): self.model_name = 'TextRNN' self.train_path = dataset + '/data/train.txt' # 训练集 self.dev_path = dataset + '/data/dev.txt' # 验证集 self.test_path = dataset + '/data/test.txt' # 测试集 self.class_list = [x.strip() for x in open( dataset + '/data/class.txt').readlines()] # 类别名单 self.vocab_path = dataset + '/data/vocab.pkl' # 词表 self.save_path = dataset + '/saved_dict/' + self.model_name + '.ckpt' # 模型训练结果 self.log_path = dataset + '/log/' + self.model_name self.embedding_pretrained = torch.tensor( np.load(dataset + '/data/' + embedding)["embeddings"].astype('float32'))\ if embedding != 'random' else None # 预训练词向量 self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 设备 self.dropout = 0.5 # 随机失活 self.require_improvement = 1000 # 若超过1000batch效果还没提升,则提前结束训练 self.num_classes = len(self.class_list) # 类别数 self.n_vocab = 0 # 词表大小,在运行时赋值 self.num_epochs = 10 # epoch数 self.batch_size = 128 # mini-batch大小 self.pad_size = 32 # 每句话处理成的长度(短填长切) self.learning_rate = 1e-3 # 学习率 self.embed = self.embedding_pretrained.size(1)\ if self.embedding_pretrained is not None else 300 # 字向量维度, 若使用了预训练词向量,则维度统一 self.hidden_size = 128 # lstm隐藏层 self.num_layers = 2 # lstm层数 '''Recurrent Neural Network for Text Classification with Multi-Task Learning''' class Model(nn.Module): def __init__(self, config): super(Model, self).__init__() if config.embedding_pretrained is not None: self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False) else: self.embedding = nn.Embedding(config.n_vocab, config.embed, padding_idx=config.n_vocab - 1) self.lstm = nn.LSTM(config.embed, config.hidden_size, config.num_layers, bidirectional=True, batch_first=True, dropout=config.dropout) self.fc = nn.Linear(config.hidden_size * 2, config.num_classes) def forward(self, x): x, _ = x out = self.embedding(x) # [batch_size, seq_len, embeding]=[128, 32, 300] out, _ = self.lstm(out) out = self.fc(out[:, -1, :]) # 句子最后时刻的 hidden state return out
# coding: UTF-8 import numpy as np import torch import torch.nn as nn import torch.nn.functional as F from sklearn import metrics import time from utils import get_time_dif from tensorboardX import SummaryWriter # 权重初始化,默认xavier def init_network(model, method='xavier', exclude='embedding', seed=123): for name, w in model.named_parameters(): if exclude not in name: if 'weight' in name: if method == 'xavier': nn.init.xavier_normal_(w) elif method == 'kaiming': nn.init.kaiming_normal_(w) else: nn.init.normal_(w) elif 'bias' in name: nn.init.constant_(w, 0) else: pass def train(config, model, train_iter, dev_iter, test_iter,writer): start_time = time.time() model.train() optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate) # 学习率指数衰减,每次epoch:学习率 = gamma * 学习率 # scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9) total_batch = 0 # 记录进行到多少batch dev_best_loss = float('inf') last_improve = 0 # 记录上次验证集loss下降的batch数 flag = False # 记录是否很久没有效果提升 #writer = SummaryWriter(log_dir=config.log_path + '/' + time.strftime('%m-%d_%H.%M', time.localtime())) for epoch in range(config.num_epochs): print('Epoch [{}/{}]'.format(epoch + 1, config.num_epochs)) # scheduler.step() # 学习率衰减 for i, (trains, labels) in enumerate(train_iter): #print (trains[0].shape) outputs = model(trains) model.zero_grad() loss = F.cross_entropy(outputs, labels) loss.backward() optimizer.step() if total_batch % 100 == 0: # 每多少轮输出在训练集和验证集上的效果 true = labels.data.cpu() predic = torch.max(outputs.data, 1)[1].cpu() train_acc = metrics.accuracy_score(true, predic) dev_acc, dev_loss = evaluate(config, model, dev_iter) if dev_loss < dev_best_loss: dev_best_loss = dev_loss torch.save(model.state_dict(), config.save_path) improve = '*' last_improve = total_batch else: improve = '' time_dif = get_time_dif(start_time) msg = 'Iter: {0:>6}, Train Loss: {1:>5.2}, Train Acc: {2:>6.2%}, Val Loss: {3:>5.2}, Val Acc: {4:>6.2%}, Time: {5} {6}' print(msg.format(total_batch, loss.item(), train_acc, dev_loss, dev_acc, time_dif, improve)) writer.add_scalar("loss/train", loss.item(), total_batch) writer.add_scalar("loss/dev", dev_loss, total_batch) writer.add_scalar("acc/train", train_acc, total_batch) writer.add_scalar("acc/dev", dev_acc, total_batch) model.train() total_batch += 1 if total_batch - last_improve > config.require_improvement: # 验证集loss超过1000batch没下降,结束训练 print("No optimization for a long time, auto-stopping...") flag = True break if flag: break writer.close() test(config, model, test_iter) def test(config, model, test_iter): # test model.load_state_dict(torch.load(config.save_path)) model.eval() start_time = time.time() test_acc, test_loss, test_report, test_confusion = evaluate(config, model, test_iter, test=True) msg = 'Test Loss: {0:>5.2}, Test Acc: {1:>6.2%}' print(msg.format(test_loss, test_acc)) print("Precision, Recall and F1-Score...") print(test_report) print("Confusion Matrix...") print(test_confusion) time_dif = get_time_dif(start_time) print("Time usage:", time_dif) def evaluate(config, model, data_iter, test=False): model.eval() loss_total = 0 predict_all = np.array([], dtype=int) labels_all = np.array([], dtype=int) with torch.no_grad(): for texts, labels in data_iter: outputs = model(texts) loss = F.cross_entropy(outputs, labels) loss_total += loss labels = labels.data.cpu().numpy() predic = torch.max(outputs.data, 1)[1].cpu().numpy() labels_all = np.append(labels_all, labels) predict_all = np.append(predict_all, predic) acc = metrics.accuracy_score(labels_all, predict_all) if test: report = metrics.classification_report(labels_all, predict_all, target_names=config.class_list, digits=4) confusion = metrics.confusion_matrix(labels_all, predict_all) return acc, loss_total / len(data_iter), report, confusion return acc, loss_total / len(data_iter)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。