赞
踩
在前面,使用了word embedding去实现了toy级别的文本情感分类,那么现在在这个模型中添加上LSTM层,观察分类效果。
为了达到更好的效果,对之前的模型做如下修改
MAX_LEN = 200
构建dataset的过程,把数据转化为2分类的问题,pos为1,neg为0,否则25000个样本完成10个类别的划分数据量是不够的
在实例化LSTM的时候,使用dropout=0.5,在model.eval()的过程中,dropout自动会为0
- import torch
- import pickle
- import torch.nn as nn
- import torch.nn.functional as F
-
- ws = pickle.load(open('./model/ws.pkl', 'rb'))
- device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
-
-
- class IMDBLstmModel(nn.Module):
- def __init__(self):
- super().__init__()
- self.embedding_dim = 200
- self.hidden_size = 64
- self.num_layer = 2
- self.bidirectional = True
- self.bi_num = 2 if self.bidirectional else 1
- self.dropout = 0.5
- # 以上部分为超参数,可以自行修改
-
- self.embedding = nn.Embedding(len(ws), self.embedding_dim, padding_idx=ws.PAD) # [N, 300]
- self.lstm = nn.LSTM(self.embedding_dim, self.hidden_size, self.num_layer, bidirectional=self.bidirectional,
- dropout=self.dropout)
-
- # 使用两个全连接层,中间使用relu激活函数
- self.fc = nn.Linear(self.hidden_size * self.bi_num, 20)
- self.fc2 = nn.Linear(20, 2)
-
- def forward(self, x):
- x = self.embedding(x)
- x = x.permute(1, 0, 2) # 进行轴交换
- h_0, c_0 = self.init_hidden_state(x.size(1))
- _, (h_n, c_0) = self.lstm(x, (h_0, c_0))
-
- # 只要最后一个lstm单元处理的结果,这里去掉了hidden_state
- out = torch.cat([h_n[-2, :, :], h_n[-1, :, :]], dim=-1)
- out = self.fc(out)
- out = F.relu(out)
- out = self.fc2(out)
- return F.log_softmax(out, dim=-1)
-
- def init_hidden_state(self, batch_size):
- h_0 = torch.rand(self.num_layer * self.bi_num, batch_size, self.hidden_size).to(device)
- c_0 = torch.rand(self.num_layer * self.bi_num, batch_size, self.hidden_size).to(device)
- return h_0, c_0

为了提高程序的运行速度,可以考虑把模型放在gup上运行,那么此时需要处理一下几点:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
除了上述修改外,涉及计算的所有tensor都需要转化为CUDA的tensor
初始化的h_0,c_0
训练集和测试集的input,traget
在最后可以通过tensor.cpu()
转化为torch的普通tensor
- from torch import optim
-
- train_batch_size = 64
- test_batch_size = 5000
- # imdb_model = IMDBLstmModel(MAX_LEN) # 基础model
- imdb_model = IMDBLstmModel().to(device) # 在GPU上运行,提高运行速度
- # imdb_model.load_state_dict(torch.load("model/
- optimizer = optim.Adam(imdb_model.parameters())
- criterion = nn.CrossEntropyLoss()
-
-
- def train(epoch):
- mode = True
- imdb_model.train(mode)
- train_dataloader = get_dataloader(mode, train_batch_size)
- for idx, (target, input, input_length) in enumerate(train_dataloader):
- target = target.to(device)
- input = input.to(device)
- optimizer.zero_grad()
- output = imdb_model(input)
- loss = F.nll_loss(output, target) # target需要是[0,9],不能是[1-10]
- loss.backward()
- optimizer.step()
- if idx % 10 == 0:
- pred = torch.max(output, dim=-1, keepdim=False)[-1]
- acc = pred.eq(target.data).cpu().numpy().mean() * 100. # 使用eq判断是否一致
- print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\t ACC: {:.6f}'.format(epoch, idx * len(input),
- len(train_dataloader.dataset),
- 100. * idx / len(
- train_dataloader),
- loss.item(), acc))
-
- torch.save(imdb_model.state_dict(), "model/mnist_net.pkl")
- torch.save(optimizer.state_dict(), 'model/mnist_optimizer.pkl')
-
-
- def test():
- mode = False
- imdb_model.eval()
- test_dataloader = get_dataloader(mode, test_batch_size)
- with torch.no_grad():
- for idx, (target, input, input_lenght) in enumerate(test_dataloader):
- target = target.to(device)
- input = input.to(device)
- output = imdb_model(input)
- test_loss = F.nll_loss(output, target, reduction="mean")
- pred = torch.max(output, dim=-1, keepdim=False)[-1]
- correct = pred.eq(target.data).sum()
- acc = 100. * pred.eq(target.data).cpu().numpy().mean()
- print('idx: {} Test set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(idx, test_loss, correct,
- target.size(0), acc))
-
-
- if __name__ == "__main__":
- test()
- for i in range(10):
- train(i)
- test()

- ...
- Train Epoch: 9 [20480/25000 (82%)] Loss: 0.017165 ACC: 100.000000
- Train Epoch: 9 [21120/25000 (84%)] Loss: 0.021572 ACC: 98.437500
- Train Epoch: 9 [21760/25000 (87%)] Loss: 0.058546 ACC: 98.437500
- Train Epoch: 9 [22400/25000 (90%)] Loss: 0.045248 ACC: 98.437500
- Train Epoch: 9 [23040/25000 (92%)] Loss: 0.027622 ACC: 98.437500
- Train Epoch: 9 [23680/25000 (95%)] Loss: 0.097722 ACC: 95.312500
- Train Epoch: 9 [24320/25000 (97%)] Loss: 0.026713 ACC: 98.437500
- Train Epoch: 9 [15600/25000 (100%)] Loss: 0.006082 ACC: 100.000000
- idx: 0 Test set: Avg. loss: 0.8794, Accuracy: 4053/5000 (81.06%)
- idx: 1 Test set: Avg. loss: 0.8791, Accuracy: 4018/5000 (80.36%)
- idx: 2 Test set: Avg. loss: 0.8250, Accuracy: 4087/5000 (81.74%)
- idx: 3 Test set: Avg. loss: 0.8380, Accuracy: 4074/5000 (81.48%)
- idx: 4 Test set: Avg. loss: 0.8696, Accuracy: 4027/5000 (80.54%)
可以看到模型的测试准确率稳定在81%左右。
大家可以把上述代码改为GRU,或者多层LSTM继续尝试,观察效果
目录结构:
main.py
- # 由于pickle特殊性,需要在此导入Word2Sequence
- from word_squence import Word2Sequence
- import pickle
- import os
- from dataset import tokenlize
- from tqdm import tqdm # 显示当前迭代进度
-
- TRAIN_PATH = r"../data/aclImdb/train"
-
- if __name__ == '__main__':
- ws = Word2Sequence()
- temp_data_path = [os.path.join(TRAIN_PATH, 'pos'), os.path.join(TRAIN_PATH, 'neg')]
- for data_path in temp_data_path:
- # 获取每一个文件的路径
- file_paths = [os.path.join(data_path, file_name) for file_name in os.listdir(data_path)]
- for file_path in tqdm(file_paths):
- sentence = tokenlize(open(file_path, errors='ignore').read())
- ws.fit(sentence)
- ws.build_vocab(max=10, max_features=10000)
- pickle.dump(ws, open('./model/ws.pkl', 'wb'))
- print(len(ws.dict))

model.py
- """
- 定义模型
- 模型优化方法:
- # 为使得结果更好 添加一个新的全连接层,作为输出,激活函数处理
- # 把双向LSTM的output传给一个单向LSTM再进行处理
- lib.max_len = 200
- lib.embedding_dim = 100 # 用长度为100的向量表示一个词
- lib.hidden_size = 128 # 每个隐藏层中LSTM单元个数
- lib.num_layer = 2 # 隐藏层数量
- lib.bidirectional = True # 是否双向LSTM
- lib.dropout = 0.3 # 在训练时以一定的概率使神经元失活,实际上就是让对应神经元的输出为0
- lib.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
- """
- import torch.nn as nn
- from lib import ws
- import torch.nn.functional as F
- from torch.optim import Adam
- from dataset import get_dataloader
- from tqdm import tqdm
- import torch
- import numpy as np
- import lib
- import os
-
-
- class Mymodel(nn.Module):
- def __init__(self):
- super().__init__()
- # nn.Embedding(num_embeddings - 词嵌入字典大小即一个字典里要有多少个词,embedding_dim - 每个词嵌入向量的大小。)
- self.embedding = nn.Embedding(len(ws), 100)
- # 加入LSTM
- self.lstm = nn.LSTM(input_size=lib.embedding_dim, hidden_size=lib.hidden_size, num_layers=lib.num_layer,
- batch_first=True, bidirectional=lib.bidirectional, dropout=lib.dropout)
- self.fc = nn.Linear(lib.hidden_size * 2, 2)
-
- def forward(self, input):
- """
- :param input: 形状[batch_size, max_len]
- :return:
- """
- x = self.embedding(input) # 进行embedding,形状[batch_size, max_len, 100]
-
- # x [batch_size, max_len, num_direction*hidden_size]
- # h_n[num_direction * num_layer, batch_size, hidden_size]
- x, (h_n, c_n) = self.lstm(x)
- # 获取两个方向最后一次的output(正向最后一个和反向第一个)进行concat
- # output = x[:,-1,:hidden_size] 前向,等同下方
- output_fw = h_n[-2, :, :] # 正向最后一次输出
- # output = x[:,0,hidden_size:] 反向,等同下方
- output_bw = h_n[-1, :, :] # 反向最后一次输出
- # 只要最后一个lstm单元处理的结果,这里去掉了hidden state
- output = torch.cat([output_fw, output_bw], dim=-1) # [batch_size, hidden_size*num_direction]
-
- out = self.fc(output)
-
- return F.log_softmax(out, dim=-1)
-
-
- model = Mymodel()
- optimizer = Adam(model.parameters(), lr=0.01)
- if os.path.exists('./model/model.pkl'):
- model.load_state_dict(torch.load('./model/model.pkl'))
- optimizer.load_state_dict(torch.load('./model/optimizer.pkl'))
-
-
- # 训练
- def train(epoch):
- for idx, (input, target) in enumerate(get_dataloader(train=True)):
- output = model(input)
- optimizer.zero_grad()
- loss = F.nll_loss(output, target)
- loss.backward()
- optimizer.step()
- print(loss.item())
- print('当前第%d轮,idx为%d 损失为:%lf, ' % (epoch, idx, loss.item()))
-
- # 保存模型
- if idx % 100 == 0:
- torch.save(model.state_dict(), './model/model.pkl')
- torch.save(optimizer.state_dict(), './model/optimizer.pkl')
-
-
- # 评估
- def test():
- acc_list = []
- loss_list = []
- # 开启模型评估模式
- model.eval()
- # 获取测试集数据
- test_dataloader = get_dataloader(train=False)
- # tqdm(total = 总数,ascii = #,desc=描述)
- for idx, (input, target) in tqdm(enumerate(test_dataloader), total=len(test_dataloader), ascii=True, desc='评估:'):
- with torch.no_grad():
- output = model(input)
- # 计算当前损失
- cur_loss = F.nll_loss(output, target)
- loss_list.append(cur_loss)
- pred = output.max(dim=-1)[-1]
- # 计算当前准确率
- cur_acc = pred.eq(target).float().mean()
- acc_list.append(cur_acc)
- print('准确率为:%lf, 损失为:%lf' % (np.mean(acc_list), np.mean(loss_list)))
-
-
- if __name__ == '__main__':
- for i in tqdm(range(10)):
- train(i)
- test()

dataset.py:
- import torch
- from torch.utils.data import Dataset, DataLoader
- import os
- import re
-
- """
- 完成数据集准备
- """
- TRAIN_PATH = r"..\data\aclImdb\train"
- TEST_PATH = r"..\data\aclImdb\test"
-
-
- # 分词
- def tokenlize(content):
- content = re.sub(r"<.*?>", " ", content)
- filters = ['!', '"', '#', '$', '%', '&', '\(', '\)', '\*', '\+', ',', '-', '\.', '/', ':', ';', '<', '=', '>', '\?',
- '@', '\[', '\\', '\]', '^', '_', '`', '\{', '\|', '\}', '~', '\t', '\n', '\x97', '\x96', '”', '“', ]
- content = re.sub("|".join(filters), " ", content)
- tokens = [i.strip().lower() for i in content.split()]
- return tokens
-
-
- class ImbdDateset(Dataset):
- def __init__(self, train=True):
- self.train_data_path = TRAIN_PATH
- self.test_data_path = TEST_PATH
- # 通过train和data_path控制读取train或者test数据集
- data_path = self.train_data_path if train else self.test_data_path
- # 把所有文件名放入列表
- # temp_data_path = [data_path + '/pos', data_path + '/neg']
- temp_data_path = [os.path.join(data_path, 'pos'), os.path.join(data_path, 'neg')]
- self.total_file_path = [] # 所有pos,neg评论文件的path
- # 获取每个文件名字,并拼接路径
- for path in temp_data_path:
- file_name_list = os.listdir(path)
- file_path_list = [os.path.join(path, i) for i in file_name_list if i.endswith('.txt')]
- self.total_file_path.extend(file_path_list)
-
- def __getitem__(self, index):
- # 获取index的path
- file_path = self.total_file_path[index]
- # 获取label
- label_str = file_path.split('\\')[-2]
- label = 0 if label_str == 'neg' else 1
- # 获取content
- tokens = tokenlize(open(file_path, errors='ignore').read())
- return tokens, label
-
- def __len__(self):
- return len(self.total_file_path)
-
-
- def get_dataloader(train=True):
- imdb_dataset = ImbdDateset(train)
- data_loader = DataLoader(imdb_dataset, shuffle=True, batch_size=128, collate_fn=collate_fn)
- return data_loader
-
-
- # 重新定义collate_fn
- def collate_fn(batch):
- """
- :param batch: (一个__getitem__[tokens, label], 一个__getitem__[tokens, label],..., batch_size个)
- :return:
- """
- content, label = list(zip(*batch))
- from lib import ws, max_len
- content = [ws.transform(i, max_len=max_len) for i in content]
- content = torch.LongTensor(content)
- label = torch.LongTensor(label)
- return content, label
-
-
- if __name__ == '__main__':
- for idx, (input, target) in enumerate(get_dataloader()):
- print(idx)
- print(input)
- print(target)
- break

word_squence.py
- import numpy as np
-
- """
- 构建词典,实现方法把句子转换为序列,和其翻转
- """
-
-
- class Word2Sequence(object):
- # 2个特殊类属性,标记特殊字符和填充标记
- UNK_TAG = 'UNK'
- PAD_TAG = 'PAD'
-
- UNK = 0
- PAD = 1
-
- def __init__(self):
- self.dict = {
- # 保存词语和对应的数字
- self.UNK_TAG: self.UNK,
- self.PAD_TAG: self.PAD
- }
- self.count = {} # 统计词频
-
- def fit(self, sentence):
- """
- 把单个句子保存到dict中
- :param sentence: [word1, word2 , ... , ]
- :return:
- """
- for word in sentence:
- # 对word出现的频率进行统计,当word不在sentence时,返回值是0,当word在sentence中时,返回+1,以此进行累计计数
- self.count[word] = self.count.get(word, 0) + 1
-
- def build_vocab(self, min=5, max=None, max_features=None):
- """
- 生成词典
- :param min:最小词频数
- :param max:最大词频数
- :param max_feature:一共保留多少词语
- :return:
- """
- # 删除count < min 的词语,即保留count > min 的词语
- if min is not None:
- self.count = {word: value for word, value in self.count.items() if value > min}
- # 删除count > min 的词语,即保留count < max 的词语
- if max is not None:
- self.count = {word: value for word, value in self.count.items() if value < max}
- # 限制保留的词语数
- if max_features is not None:
- # sorted 返回一个列表[(key1, value1), (key2, value2),...],True为升序
- temp = sorted(self.count.items(), key=lambda x: x[-1], reverse=True)[:max_features]
- self.count = dict(temp)
- for word in self.count:
- self.dict[word] = len(self.dict)
-
- # 得到一个翻转的dict字典
- # zip方法要比{value: word for word, value in self.dict.items()}快
- self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))
-
- def transform(self, sentence, max_len=None):
- """
- 把句子转换为序列
- :param sentence: [word1, word2...]
- :param max_len: 对句子进行填充或者裁剪
- :return:
- """
- if max_len is not None:
- # 句子长度小于最大长度,进行填充
- if max_len > len(sentence):
- sentence = sentence + [self.PAD_TAG] * (max_len - len(sentence))
- # 句子长度大于最大长度,进行裁剪
- if max_len < len(sentence):
- sentence = sentence[:max_len]
- # for word in sentence:
- # self.dict.get(word, self.UNK)
- # 字典的get(key, default=None) 如果指定键不存在,则返回默认值None。
- return [self.dict.get(word, self.UNK) for word in sentence]
-
- def inverse_transform(self, indices):
- """
- 把序列转换为句子
- :param indices: [1, 2, 3, ...]
- :return:
- """
- return [self.inverse_dict.get(idx) for idx in indices]
-
- def __len__(self):
- return len(self.dict)
-
-
- if __name__ == '__main__':
- ws = Word2Sequence()
- ws.fit(["我", "是", "我"])
- ws.fit(["我", "是", "谁"])
- ws.build_vocab(min=1, max_features=5)
- print(ws.dict)
- ret = ws.transform(['我', '爱', '北京'], max_len=10)
- print(ret)
- print(ws.inverse_transform(ret))

lib.py
- import pickle
- import torch
-
- ws = pickle.load(open('./model/ws.pkl', 'rb'))
-
- max_len = 200
- embedding_dim = 100 # 用长度为100的向量表示一个词
- hidden_size = 128 # 每个隐藏层中LSTM单元个数
- num_layer = 2 # 隐藏层数量
- bidirectional = True # 是否双向LSTM
- dropout = 0.3 # 在训练时以一定的概率使神经元失活,实际上就是让对应神经元的输出为0
-
- device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。