Using the relevant PyTorch APIs, design two network architectures: one containing only fully connected layers, and one using the LSTM that is most commonly used in text processing. Perform 10-class classification on the dataset and compare the classification performance of the two.
The dataset for sentiment classification is the classic IMDB dataset, which can be downloaded from http://ai.stanford.edu/~amaas/data/sentiment/. It contains 50,000 reviews of popular movies, split into 25,000 training samples and 25,000 test samples. The data format is as follows:
The label of each sample is encoded in its file name. In the figure, the left side shows the file names, each made up of two parts, an index and a sentiment score, i.e. index_score. Scores 1-4 are negative (neg) and 5-10 are positive (pos), giving 10 classes in total. The right side shows the review text stored in the file. The length of the text differs from file to file.
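For instance, a minimal sketch of how such a file name can be parsed into a zero-based class label (the path and file name here are made up for illustration):
- import os
-
- def parse_label(path):
-     # file names look like "index_score.txt", e.g. "3_4.txt"
-     filename = os.path.basename(path)
-     score = int(filename.split("_")[-1].split(".")[0])  # the sentiment score, 1-10
-     return score - 1                                     # shift to class IDs 0-9
-
- print(parse_label("aclImdb/train/neg/3_4.txt"))  # -> 3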
The dataset is organized as follows:
After downloading, the aclImdb folder contains the following files:
train and test are the folders holding the training data and the test data respectively; their contents look like this:
Open any of the neg/pos folders and you will find txt files, each file being one sample:
These paths will all be needed later in the code, since every txt file is read according to its path.
The work can be broken down into the following steps:
Prepare the dataset: instantiate a Dataset and prepare a DataLoader, i.e. design a class for fetching samples
Build the model: define the number of layers, the shape changes, the activation functions, and so on
Train the model and watch the loss over the iterations
Evaluate the model and check the classification accuracy
Now let us look more carefully at how the review text should be represented. First, each review has to be tokenized into individual words. Since reviews vary in length, the length has to be unified: assume every review contains max_len words (say 50). Texts longer than 50 words are truncated, and texts shorter than 50 words are padded up to 50. Next comes the representation of each word. We use word embeddings, i.e. each word is represented by a word vector whose dimension is embedding dim. The word vectors are created with PyTorch's nn.Embedding, which assigns every word a random vector of the given dimension. Random vectors like this are certainly not great; nn.Embedding can also load pretrained vectors such as GloVe or word2vec, which is worth trying if you are interested.
Besides the embedding dim argument, nn.Embedding assigns its random vectors based on an existing vocabulary, so we also need to build a vocabulary from the training data; its size is the number of distinct words in the training samples, and its construction is described below. Once the vocabulary is built, every word maps to its own numeric ID, for example hello might map to 367 and world to 897, and nn.Embedding assigns each word's random vector according to these IDs. After the vocabulary has been built, the test samples also have to be processed into vector form at test time so they can be fed to the network for prediction. If a word never appeared in the training samples, it has no numeric ID in the vocabulary, so when building the vocabulary we reserve a special token "UNK" with value 0: every unseen word is mapped to 0. We also said that reviews have to be padded to a common length, so the padding token is predefined as "PAD" with value 1.
For example, a test sample is tokenized into ["ni", "hao", "shi", "jie"], where ni and jie appeared in the training samples with IDs 34 and 90, while hao and shi did not. With max_len set to 5, ["ni", "hao", "shi", "jie"] becomes [34, 0, 0, 90, 1]. Each ID is then converted to its word vector and fed to the network, and the output is compared with the true label to see whether the sample was classified correctly.
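A minimal sketch of this look-up-and-embed pipeline (the vocabulary, the IDs and the embedding dimension are made up for illustration):
- import torch
- import torch.nn as nn
-
- vocab = {"UNK": 0, "PAD": 1, "ni": 34, "jie": 90}    # hypothetical vocabulary
- max_len, embedding_dim = 5, 8
-
- tokens = ["ni", "hao", "shi", "jie"]
- ids = [vocab.get(t, vocab["UNK"]) for t in tokens]   # unseen words map to 0
- ids = (ids + [vocab["PAD"]] * max_len)[:max_len]     # pad/truncate to max_len
- print(ids)  # [34, 0, 0, 90, 1]
-
- embedding = nn.Embedding(num_embeddings=100, embedding_dim=embedding_dim, padding_idx=vocab["PAD"])
- vectors = embedding(torch.tensor([ids]))             # shape: (batch, max_len, embedding_dim)
- print(vectors.shape)                                 # torch.Size([1, 5, 8])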
The above is a rough outline of the workflow; the details are covered below.
Preparing the dataset works the same way as before: instantiate a Dataset and prepare a DataLoader. In the end the data can be processed into the following format:
The figure shows the case where batch_size is 2, i.e. the DataLoader loads only two samples at a time. [4, 6] are the sentiment labels of the two samples, and the text that follows is the tokenized content of the two samples, in the form (['token1', 'token2', ...], ['token1', 'token2', ...]). Each list in the tuple corresponds to one sample, so each [] contains max_len tokens.
The key points are:
How to build the basic Dataset and prepare the DataLoader
How to handle the fact that texts within a batch have different lengths
How to convert the texts in each batch into numeric sequences
- import torch
- from torch.utils.data import DataLoader,Dataset
- import os
- import re
-
- # The path needs to be adapted to your setup; an absolute path can be used when the data lives elsewhere
- data_base_path = r"data\aclImdb"
-
- #1. Define the tokenize method that splits a review text into words
- def tokenize(text):
- # fileters = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n'
- fileters = ['!','"','#','$','%','&','\(','\)','\*','\+',',','-','\.','/',':',';','<','=','>','\?','@'
- ,'\[','\\','\]','^','_','`','\{','\|','\}','~','\t','\n','\x97','\x96','”','“',]
- # re.sub performs the replacement
- text = re.sub("<.*?>"," ",text,flags=re.S) # drop everything between <...>, mainly HTML tags such as <br/> inside the reviews
- text = re.sub("|".join(fileters)," ",text,flags=re.S) # replace the special characters; '|' joins all characters to match into one pattern
- return [i.strip() for i in text.split()] # strip leading/trailing whitespace from each token
-
- #2. Prepare the dataset
- class ImdbDataset(Dataset):
- def __init__(self,mode):
- super(ImdbDataset,self).__init__()
- # collect the names of all training folders
- if mode=="train":
- text_path = [os.path.join(data_base_path,i) for i in ["train/neg","train/pos"]]
- else:
- text_path = [os.path.join(data_base_path,i) for i in ["test/neg","test/pos"]]
-
- self.total_file_path_list = []
- # then collect the paths of all files inside them
- for i in text_path:
- self.total_file_path_list.extend([os.path.join(i,j) for j in os.listdir(i)])
-
-
- def __getitem__(self, idx):
- cur_path = self.total_file_path_list[idx]
- # os.path.basename returns the last component of the path, i.e. the second element of os.path.split(path)
- # (it is empty if the path ends with / or \); cur_filename is a file name such as "0_3.txt"
- cur_filename = os.path.basename(cur_path)
- # file names have the form 3_4.txt: the 3 before the underscore is the index, the 4 after it is the rating
- # the original ratings are 1-10; here they become 0-9
- label = int(cur_filename.split("_")[-1].split(".")[0]) -1 # parse the file name to get the label; -1 shifts it into [0-9]
- text = tokenize(open(cur_path, encoding="utf-8").read().strip()) # read the review and tokenize it
- return label,text
-
- def __len__(self):
- return len(self.total_file_path_list)
-
- # test that the data can be fetched successfully
- dataset = ImdbDataset(mode="train")
- print(dataset[0])
- # out:(2, ['Story', 'of', 'a', 'man', 'who', 'has', 'unnatural', 'feelings'...])
-
- # 2. Instantiate the dataset and prepare the DataLoader
- dataset = ImdbDataset(mode="train")
- dataloader = DataLoader(dataset=dataset,batch_size=2,shuffle=True)
-
- #3. Inspect the output. In newer PyTorch versions (1.6+) this loop no longer runs at all without the `collate_fn` function shown below, and even where it does run, the result is wrong
- for idx,(label,text) in enumerate(dataloader):
- print("idx:",idx)
- print("lable:",label)
- print("text:",text)
- break

The output is as follows:
- idx: 0
- label: tensor([3, 1])
- text: [('I', 'Want'), ('thought', 'a'), ('this', 'great'), ('was', 'recipe'), ('a', 'for'), ('great', 'failure'), ('idea', 'Take'), ('but', 'a'), ('boy', 's'), ('was', 'y'), ('it', 'plot'), ('poorly', 'add'), ('executed', 'in'), ('We', 'some'), ('do', 'weak'), ('get', 'completely'), ('a', 'undeveloped'), ('broad', 'characters'), ('sense', 'and'), ('of', 'than'), ('how', 'throw'), ('complex', 'in'), ('and', 'the'), ('challenging', 'worst'), ('the', 'special'), ('backstage', 'effects'), ('operations', 'a'), ('of', 'horror'), ('a', 'movie'), ('show', 'has'), ('are', 'known'), ('but', 'Let'), ('virtually', 'stew'), ('no', 'for'), ...('show', 'somehow'), ('rather', 'destroy'), ('than', 'every'), ('anything', 'copy'), ('worth', 'of'), ('watching', 'this'), ('for', 'film'), ('its', 'so'), ('own', 'it'), ('merit', 'will')]
Clearly something went wrong. We expected the form (['token1', 'token2', ...], ['token1', 'token2', ...]), but in the result the words have been paired up two by two. The cause is the DataLoader parameter collate_fn, whose default value is torch's built-in default_collate. collate_fn is responsible for assembling each batch, and the default default_collate mishandles this data.
Inside the default collate_fn, the parameter batch holds ((label, tokens), (label, tokens), ...); how many items it contains is determined by your batch_size. The default collate_fn performs a zip over the batch, grouping the inputs together and the targets together. But here each input is itself a list of tokens such as ['Story', 'of', 'a', 'man', 'who', 'has', 'unnatural'], so another zip is applied and the words get combined pairwise one extra time (!!!), which is clearly not what we want. In the earlier handwritten-digit program this did not happen, because the images are pixel arrays that we had already converted to tensors.
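A minimal sketch of this behaviour (the two toy samples are invented; the exact container type in the output may differ between PyTorch versions):
- from torch.utils.data.dataloader import default_collate
-
- batch = [(3, ["I", "thought", "this"]),   # (label, tokens) of sample 1
-          (1, ["Want", "a", "great"])]     # (label, tokens) of sample 2
-
- labels, texts = default_collate(batch)
- print(labels)  # tensor([3, 1])
- print(texts)   # roughly [('I', 'Want'), ('thought', 'a'), ('this', 'great')]: words paired across samples
-
- # what we actually want is to keep each sample's token list intact:
- labels, texts = zip(*batch)
- print(texts)   # (['I', 'thought', 'this'], ['Want', 'a', 'great'])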
So how do we get the correct result?
Option 1: first convert the data to numeric sequences and check whether the result is as expected; the DataLoader did not show this kind of error before
Option 2: define a custom collate_fn and check the result
Here we go with option 2, defining a custom collate_fn and observing the result:
- # Custom collate_fn
- def collate_fn(batch):
- # zip the batch manually and convert it to a list, otherwise the texts and labels cannot be separated
- batch = list(zip(*batch))
- labels = torch.tensor(batch[0], dtype=torch.int32)
- texts = batch[1]
- texts = torch.tensor([ws.transform(i, max_len) for i in texts])
- del batch
- # note: long() is required here, otherwise an error is raised later
- return labels.long(), texts.long()
-
- # the output is now correct
- for idx,(label,text) in enumerate(dataloader):
- print("idx:",idx)
- print("label:",label)
- print("text:",text)
- break
- # label: tensor([2, 9], dtype=torch.int32)  2 and 9 are the two class labels
- # text:([], [])

Finally, we can write a get_dataloader method to fetch the data more conveniently:
- # method for getting a DataLoader
- def get_dataloader(train=True):
- if train:
- mode = 'train'
- else:
- mode = "test"
- dataset = ImdbDataset(mode)
- batch_size = train_batch_size if train else test_batch_size
- return DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
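As a quick sanity check, a hypothetical usage sketch (it assumes ws, max_len and the batch sizes have already been set up as described below):
- train_loader = get_dataloader(train=True)
- labels, texts = next(iter(train_loader))
- print(labels.shape)  # torch.Size([train_batch_size])
- print(texts.shape)   # torch.Size([train_batch_size, max_len])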
The word embedding used here does not turn text into vectors directly: the text is first converted to numbers, and the numbers are then converted to vectors. How is that implemented?
We can store each word and its corresponding number in a dictionary, and implement a method that maps a sentence to a list of numbers through that dictionary.
Before implementing this text serialization, consider the following points:
How to map words to numbers with a dictionary
Different words occur with different frequencies: should very frequent or very rare words be filtered out, and should the total vocabulary size be limited
Once the dictionary exists, how to convert a sentence into a numeric sequence
Sentences have different lengths: how to bring the sentences of a batch to the same length (short sentences can be padded with a special token)
What to do with words that do not appear in the dictionary (a special token can be used instead)
Approach:
Tokenize all sentences
Store the words in a dictionary, filter them by frequency, and keep the counts
Implement a method that converts text to a numeric sequence
Implement a method that converts a numeric sequence back to text (not actually needed in this task)
- # Word2Sequence
- class Word2Sequence:
- # token for words never seen in training
- UNK_TAG = "UNK"
- # token used for padding
- PAD_TAG = "PAD"
- UNK = 0
- PAD = 1
-
- def __init__(self):
- self.dict = {
- self.UNK_TAG: self.UNK,
- self.PAD_TAG: self.PAD
- }
- self.count = {}
-
- def to_index(self, word):
- """word -> index"""
- return self.dict.get(word, self.UNK)
-
- def to_word(self, index):
- """index -> word"""
- if index in self.inversed_dict:
- return self.inversed_dict[index]
- return self.UNK_TAG
-
- def __len__(self):
- return len(self.dict)
-
- def fit(self, sentence):
- """count字典中存储每个单词出现的次数"""
- for word in sentence:
- self.count[word] = self.count.get(word, 0) + 1
-
- def build_vocab(self, min_count=None, max_count=None, max_feature=None):
- """
- 构建词典
- 只筛选出现次数在[min_count,max_count]之间的词
- 词典最大的容纳的词为max_feature,按照出现次数降序排序,要是max_feature有规定,出现频率很低的词就被舍弃了
- """
- if min_count is not None:
- self.count = {word: count for word, count in self.count.items() if count >= min_count}
-
- if max_count is not None:
- self.count = {word: count for word, count in self.count.items() if count <= max_count}
-
- if max_feature is not None:
- self.count = dict(sorted(self.count.items(), lambda x: x[-1], reverse=True)[:max_feature])
- # assign a numeric ID to every word in the vocabulary
- for word in self.count:
- self.dict[word] = len(self.dict)
- # build the reverse mapping from IDs back to words; rarely needed in this program
- self.inversed_dict = dict(zip(self.dict.values(), self.dict.keys()))
-
- def transform(self, sentence, max_len=None):
- """
- 根据词典给每个词分配的数字ID,将给定的sentence(字符串序列)转换为数字序列
- max_len:统一文本的单词个数
- """
- if max_len is not None:
- r = [self.PAD] * max_len
- else:
- r = [self.PAD] * len(sentence)
- # truncate the text
- if max_len is not None and len(sentence) > max_len:
- sentence = sentence[:max_len]
- for index, word in enumerate(sentence):
- r[index] = self.to_index(word)
- return np.array(r, dtype=np.int64)
-
- def inverse_transform(self, indices):
- """数字序列-->单词序列"""
- sentence = []
- for i in indices:
- word = self.to_word(i)
- sentence.append(word)
- return sentence

After defining this class, we can quickly test it:
- # Test Word2Sequence
- w2s = Word2Sequence()
- voc = [["你", "好", "么"],
- ["你", "好", "哦"]]
- for i in voc:
- w2s.fit(i)
- w2s.build_vocab()
- print(w2s.dict)
- print(w2s.transform(["你", "好", "嘛"]))
The result is as follows:
- {'UNK': 0, 'PAD': 1, '你': 2, '好': 3, '么': 4, '哦': 5}
- [2 3 0]
The class behaves correctly, so we can now build the vocabulary from the training texts. Note that the test samples must not be read.
- # Build the vocabulary
- def fit_save_word_sequence():
- word_to_sequence = Word2Sequence()
- train_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
- # total_file_path_list stores all txt files that need to be read
- total_file_path_list = []
- for i in train_path:
- total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
- # tqdm displays a progress bar
- for cur_path in tqdm(total_file_path_list, ascii=True, desc="fitting"):
- word_to_sequence.fit(tokenize(open(cur_path, encoding="utf-8").read().strip()))
- word_to_sequence.build_vocab()
- # save the Word2Sequence instance
- pickle.dump(word_to_sequence, open("model/ws.pkl", "wb"))
Run this method:
fit_save_word_sequence()
This generates the file shown below. Note that the model folder has to be created in advance!
Once the file has been generated, it can be loaded with the following code. ws is an instance of the Word2Sequence class and will be used in the rest of the program.
ws = pickle.load(open("./model/ws.pkl", "rb"))
- class IMDBModel(nn.Module):
- def __init__(self):
- # Two fully connected layers; the last one maps the features to 10 dimensions for 10-class classification, and a log-softmax is applied to the output
- super(IMDBModel, self).__init__()
- # nn.Embedding arguments:
- # len(ws): total number of words in the vocabulary
- # 300: dimension of the word vectors, i.e. the embedding dim
- # padding_idx: the ID of the padding token
- self.embedding = nn.Embedding(len(ws), 300, padding_idx=ws.PAD)
- self.fc1 = nn.Linear(max_len * 300, 128)
- self.fc = nn.Linear(128, 10)
-
- def forward(self, x):
- embed = self.embedding(x)
- embed = embed.view(x.size(0), -1)
- out = self.fc1(embed)
- out = F.relu(out)
- out = self.fc(out)
- return F.log_softmax(out, dim=-1)

For the usage of the LSTM API in PyTorch, as well as an understanding of its inputs and outputs, see my earlier blog post:
A long read: understanding the inputs and outputs of different network types (plain neural networks, CNN, LSTM)
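As a quick refresher, a minimal sketch of the tensor shapes involved (the sizes mirror the hyperparameters used in the model below but are otherwise arbitrary):
- import torch
- import torch.nn as nn
-
- seq_len, batch_size, embedding_dim, hidden_size, num_layer = 50, 4, 200, 64, 2
- lstm = nn.LSTM(embedding_dim, hidden_size, num_layer, bidirectional=True)
-
- x = torch.randn(seq_len, batch_size, embedding_dim)  # input: (seq_len, batch, embedding_dim)
- output, (h_n, c_n) = lstm(x)
- print(output.shape)  # (seq_len, batch, hidden_size * 2)   -> torch.Size([50, 4, 128])
- print(h_n.shape)     # (num_layer * 2, batch, hidden_size) -> torch.Size([4, 4, 64])
- # h_n[-2] and h_n[-1] are the final forward and backward hidden states of the top layer,
- # which is exactly what the model below concatenates.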
- class IMDBModel(nn.Module):
- def __init__(self):
- super(IMDBModel, self).__init__()
- self.hidden_size = 64
- self.embedding_dim = 200
- self.num_layer = 2
- self.bidirectional = True
- self.bi_num = 2 if self.bidirectional else 1
- self.dropout = 0.5
- # the values above are hyperparameters and can be changed freely
- self.embedding = nn.Embedding(len(ws), self.embedding_dim, padding_idx=ws.PAD)
- self.lstm = nn.LSTM(self.embedding_dim, self.hidden_size,
- self.num_layer, bidirectional=self.bidirectional, dropout=self.dropout)
- self.fc = nn.Linear(self.hidden_size * self.bi_num, 20)
- self.fc2 = nn.Linear(20, 10)
-
- def forward(self, x):
- x = self.embedding(x)
- x = x.permute(1, 0, 2) # swap axes to (seq_len, batch, embedding_dim)
- h_0, c_0 = self.init_hidden_state(x.size(1))
- _, (h_n, c_n) = self.lstm(x, (h_0, c_0))
- # keep only the result of the last LSTM step: concatenate the final forward and backward hidden states
- out = torch.cat([h_n[-2, :, :], h_n[-1, :, :]], dim=-1)
- out = self.fc(out)
- out = F.relu(out)
- out = self.fc2(out)
- return F.log_softmax(out, dim=-1)
-
- def init_hidden_state(self, batch_size):
- h_0 = torch.rand(self.num_layer * self.bi_num, batch_size, self.hidden_size).to(device)
- c_0 = torch.rand(self.num_layer * self.bi_num, batch_size, self.hidden_size).to(device)
- return h_0, c_0

First, specify a few hyperparameters at the very top of the program:
- train_batch_size = 512
- test_batch_size = 500
- max_len = 50
Training and testing:
- def train(epoch):
- mode = True
- train_dataloader = get_dataloader(mode)
- for idx, (target, input) in enumerate(train_dataloader):
- optimizer.zero_grad()
- output = imdb_model(input)
- loss = F.nll_loss(output, target)
- loss.backward()
- optimizer.step()
- if idx % 10 == 0:
- print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
- epoch, idx * len(input), len(train_dataloader.dataset),
- 100. * idx / len(train_dataloader), loss.item()))
- torch.save(imdb_model.state_dict(), "model/mnist_net.pkl")
- torch.save(optimizer.state_dict(), 'model/mnist_optimizer.pkl')
-
-
- def test():
- test_loss = 0
- correct = 0
- mode = False
- imdb_model.eval()
- test_dataloader = get_dataloader(mode)
- with torch.no_grad():
- for target, input in test_dataloader:
- output = imdb_model(input)
- test_loss += F.nll_loss(output, target, reduction="sum")
- pred = torch.max(output, dim=-1, keepdim=False)[-1]
- correct += pred.eq(target.data).sum()
- test_loss = test_loss / len(test_dataloader.dataset)
- print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
- test_loss, correct, len(test_dataloader.dataset),
- 100. * correct / len(test_dataloader.dataset)))

- if __name__ == '__main__':
- # # test the dataset
- # dataset = ImdbDataset(mode="train")
- # dataloader = DataLoader(dataset=dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
- # for idx, (label, text) in enumerate(dataloader):
- # print("idx:", idx)
- # print("lable:", label)
- # print("text:", text)
- # break
-
- # test Word2Sequence
- # w2s = Word2Sequence()
- # voc = [["你", "好", "么"],
- # ["你", "好", "哦"]]
- # for i in voc:
- # w2s.fit(i)
- # w2s.build_vocab()
- # print(w2s.dict)
- # print(w2s.transform(["你", "好", "嘛"]))
- fit_save_word_sequence()
-
- # training and testing
- test()
- for i in range(3):
- train(i)
- print(
- "训练第{}轮的测试结果-----------------------------------------------------------------------------------------".format(
- i + 1))
- test()

The training and testing code is the same for both network architectures, but one parameter changes for the LSTM network:
train_batch_size = 64
One line of code in the middle deserves special attention:
ws = pickle.load(open("./model/ws.pkl", "rb"))
This line only runs once the vocabulary has been built, i.e. the file must already exist or an error is raised. So before training and testing, first run:
fit_save_word_sequence()
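A minimal sketch of a guard that builds the vocabulary automatically when the pickle file is missing (it reuses the fit_save_word_sequence function and the model/ws.pkl path from above):
- import os
- import pickle
-
- ws_path = "./model/ws.pkl"
- os.makedirs("model", exist_ok=True)   # make sure the model folder exists
- if not os.path.exists(ws_path):
-     fit_save_word_sequence()          # build and pickle the vocabulary once
- ws = pickle.load(open(ws_path, "rb"))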
- import torch
- from torch.utils.data import DataLoader, Dataset
- import os
- import re
- import numpy as np
- import pickle
- from tqdm import tqdm
- import torch.nn as nn
- import torch.nn.functional as F
- from torch import optim
-
- data_base_path = r'E:\2020.1.16\BaiduNetdiskDownload\python5.0\课件资' \
- r'料V5.0解压密码:www.hoh0.com\课件资料V5.0\阶段9-人工智' \
- r'能NLP项目\第四天\代码\data\aclImdb_v1\aclImdb'
-
- train_batch_size = 512
- test_batch_size = 500
- max_len = 50
-
-
- # tokenization function
- def tokenize(text):
- # fileters = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n'
- fileters = ['!', '"', '#', '$', '%', '&', '\(', '\)', '\*', '\+', ',', '-', '\.', '/', ':', ';', '<', '=', '>',
- '\?', '@', '\[', '\\', '\]', '^', '_', '`', '\{', '\|', '\}', '~', '\t', '\n', '\x97', '\x96', '”',
- '“', ]
- text = re.sub("<.*?>", " ", text, flags=re.S)
- text = re.sub("|".join(fileters), " ", text, flags=re.S)
- return [i.strip() for i in text.split()]
-
-
- # custom dataset
- class ImdbDataset(Dataset):
- def __init__(self, mode):
- super(ImdbDataset, self).__init__()
- if mode == "train":
- text_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
- else:
- text_path = [os.path.join(data_base_path, i) for i in ["test/neg", "test/pos"]]
-
- self.total_file_path_list = []
- for i in text_path:
- self.total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
- # print(self.total_file_path_list)
-
- def __getitem__(self, idx):
- cur_path = self.total_file_path_list[idx]
- cur_filename = os.path.basename(cur_path)
- label = int(cur_filename.split("_")[-1].split(".")[0]) - 1
- text = tokenize(open(cur_path, encoding="utf-8").read().strip())
- return label, text
-
- def __len__(self):
- return len(self.total_file_path_list)
-
-
- # custom collate_fn
- def collate_fn(batch):
- # zip the batch manually and convert it to a list, otherwise the texts and labels cannot be separated
- batch = list(zip(*batch))
- labels = torch.tensor(batch[0], dtype=torch.int32)
- texts = batch[1]
- texts = torch.tensor([ws.transform(i, max_len) for i in texts])
- del batch
- # note: long() is required here, otherwise an error is raised later
- return labels.long(), texts.long()
-
-
- # method for getting a DataLoader
- def get_dataloader(train=True):
- if train:
- mode = 'train'
- else:
- mode = "test"
- dataset = ImdbDataset(mode)
- batch_size = train_batch_size if train else test_batch_size
- return DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
-
-
- # Word2Sequence
- class Word2Sequence:
- # token for words never seen in training
- UNK_TAG = "UNK"
- # token used for padding
- PAD_TAG = "PAD"
- UNK = 0
- PAD = 1
-
- def __init__(self):
- self.dict = {
- self.UNK_TAG: self.UNK,
- self.PAD_TAG: self.PAD
- }
- self.count = {}
-
- def to_index(self, word):
- """word -> index"""
- return self.dict.get(word, self.UNK)
-
- def to_word(self, index):
- """index -> word"""
- if index in self.inversed_dict:
- return self.inversed_dict[index]
- return self.UNK_TAG
-
- def __len__(self):
- return len(self.dict)
-
- def fit(self, sentence):
- """count字典中存储每个单词出现的次数"""
- for word in sentence:
- self.count[word] = self.count.get(word, 0) + 1
-
- def build_vocab(self, min_count=None, max_count=None, max_feature=None):
- """
- 构建词典
- 只筛选出现次数在[min_count,max_count]之间的词
- 词典最大的容纳的词为max_feature,按照出现次数降序排序,要是max_feature有规定,出现频率很低的词就被舍弃了
- """
- if min_count is not None:
- self.count = {word: count for word, count in self.count.items() if count >= min_count}
-
- if max_count is not None:
- self.count = {word: count for word, count in self.count.items() if count <= max_count}
-
- if max_feature is not None:
- self.count = dict(sorted(self.count.items(), lambda x: x[-1], reverse=True)[:max_feature])
- # assign a numeric ID to every word in the vocabulary
- for word in self.count:
- self.dict[word] = len(self.dict)
- # build the reverse mapping from IDs back to words; rarely needed in this program
- self.inversed_dict = dict(zip(self.dict.values(), self.dict.keys()))
-
- def transform(self, sentence, max_len=None):
- """
- 根据词典给每个词分配的数字ID,将给定的sentence(字符串序列)转换为数字序列
- max_len:统一文本的单词个数
- """
- if max_len is not None:
- r = [self.PAD] * max_len
- else:
- r = [self.PAD] * len(sentence)
- # truncate the text
- if max_len is not None and len(sentence) > max_len:
- sentence = sentence[:max_len]
- for index, word in enumerate(sentence):
- r[index] = self.to_index(word)
- return np.array(r, dtype=np.int64)
-
- def inverse_transform(self, indices):
- """数字序列-->单词序列"""
- sentence = []
- for i in indices:
- word = self.to_word(i)
- sentence.append(word)
- return sentence
-
-
- # build the vocabulary
- def fit_save_word_sequence():
- word_to_sequence = Word2Sequence()
- train_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
- # total_file_path_list stores all txt files that need to be read
- total_file_path_list = []
- for i in train_path:
- total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
- # tqdm displays a progress bar
- for cur_path in tqdm(total_file_path_list, ascii=True, desc="fitting"):
- word_to_sequence.fit(tokenize(open(cur_path, encoding="utf-8").read().strip()))
- word_to_sequence.build_vocab()
- # save the Word2Sequence instance
- pickle.dump(word_to_sequence, open("model/ws.pkl", "wb"))
-
-
- ws = pickle.load(open("./model/ws.pkl", "rb"))
-
-
- # print(len(ws))
- # model
- class IMDBModel(nn.Module):
- def __init__(self):
- # Two fully connected layers; the last one maps the features to 10 dimensions for 10-class classification, and a log-softmax is applied to the output
- super(IMDBModel, self).__init__()
- # nn.Embedding arguments:
- # len(ws): total number of words in the vocabulary
- # 300: dimension of the word vectors, i.e. the embedding dim
- # padding_idx: the ID of the padding token
- self.embedding = nn.Embedding(len(ws), 300, padding_idx=ws.PAD)
- self.fc1 = nn.Linear(max_len * 300, 128)
- self.fc = nn.Linear(128, 10)
-
- def forward(self, x):
- embed = self.embedding(x)
- embed = embed.view(x.size(0), -1)
- out = self.fc1(embed)
- out = F.relu(out)
- out = self.fc(out)
- return F.log_softmax(out, dim=-1)
-
-
- # instantiate the model
- imdb_model = IMDBModel()
- # optimizer
- optimizer = optim.Adam(imdb_model.parameters())
- # cross-entropy loss (note: the training loop below actually uses F.nll_loss on the log-softmax output)
- criterion = nn.CrossEntropyLoss()
-
-
- def train(epoch):
- mode = True
- train_dataloader = get_dataloader(mode)
- for idx, (target, input) in enumerate(train_dataloader):
- optimizer.zero_grad()
- output = imdb_model(input)
- loss = F.nll_loss(output, target)
- loss.backward()
- optimizer.step()
- if idx % 10 == 0:
- print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
- epoch, idx * len(input), len(train_dataloader.dataset),
- 100. * idx / len(train_dataloader), loss.item()))
- torch.save(imdb_model.state_dict(), "model/mnist_net.pkl")
- torch.save(optimizer.state_dict(), 'model/mnist_optimizer.pkl')
-
-
- def test():
- test_loss = 0
- correct = 0
- mode = False
- imdb_model.eval()
- test_dataloader = get_dataloader(mode)
- with torch.no_grad():
- for target, input in test_dataloader:
- output = imdb_model(input)
- test_loss += F.nll_loss(output, target, reduction="sum")
- pred = torch.max(output, dim=-1, keepdim=False)[-1]
- correct += pred.eq(target.data).sum()
- test_loss = test_loss / len(test_dataloader.dataset)
- print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
- test_loss, correct, len(test_dataloader.dataset),
- 100. * correct / len(test_dataloader.dataset)))
-
-
- if __name__ == '__main__':
- # # test the dataset
- # dataset = ImdbDataset(mode="train")
- # dataloader = DataLoader(dataset=dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
- # for idx, (label, text) in enumerate(dataloader):
- # print("idx:", idx)
- # print("lable:", label)
- # print("text:", text)
- # break
-
- # test Word2Sequence
- # w2s = Word2Sequence()
- # voc = [["你", "好", "么"],
- # ["你", "好", "哦"]]
- # for i in voc:
- # w2s.fit(i)
- # w2s.build_vocab()
- # print(w2s.dict)
- # print(w2s.transform(["你", "好", "嘛"]))
- # fit_save_word_sequence()
-
- # training and testing
- test()
- for i in range(3):
- train(i)
- print(
- "训练第{}轮的测试结果-----------------------------------------------------------------------------------------".format(
- i + 1))
- test()

For some of the comments, refer to the program above; the two programs differ only in the model definition and one train_batch_size value.
- import torch
- from torch.utils.data import DataLoader, Dataset
- import os
- import re
- import numpy as np
- import pickle
- from tqdm import tqdm
- import torch.nn as nn
- import torch.nn.functional as F
- from torch import optim
-
- data_base_path = r'E:\2020.1.16\BaiduNetdiskDownload\python5.0\课件资' \
- r'料V5.0解压密码:www.hoh0.com\课件资料V5.0\阶段9-人工智' \
- r'能NLP项目\第四天\代码\data\aclImdb_v1\aclImdb'
-
- train_batch_size = 64
- test_batch_size = 500
- max_len = 50
- device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
-
-
- # tokenization function
- def tokenize(text):
- # fileters = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n'
- fileters = ['!', '"', '#', '$', '%', '&', '\(', '\)', '\*', '\+', ',', '-', '\.', '/', ':', ';', '<', '=', '>',
- '\?', '@', '\[', '\\', '\]', '^', '_', '`', '\{', '\|', '\}', '~', '\t', '\n', '\x97', '\x96', '”',
- '“', ]
- text = re.sub("<.*?>", " ", text, flags=re.S)
- text = re.sub("|".join(fileters), " ", text, flags=re.S)
- return [i.strip() for i in text.split()]
-
-
- # custom dataset
- class ImdbDataset(Dataset):
- def __init__(self, mode):
- super(ImdbDataset, self).__init__()
- if mode == "train":
- text_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
- else:
- text_path = [os.path.join(data_base_path, i) for i in ["test/neg", "test/pos"]]
-
- self.total_file_path_list = []
- for i in text_path:
- self.total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
- # print(self.total_file_path_list)
-
- def __getitem__(self, idx):
- cur_path = self.total_file_path_list[idx]
- cur_filename = os.path.basename(cur_path)
- label = int(cur_filename.split("_")[-1].split(".")[0]) - 1
- text = tokenize(open(cur_path, encoding="utf-8").read().strip())
- return label, text
-
- def __len__(self):
- return len(self.total_file_path_list)
-
-
- # custom collate_fn
- def collate_fn(batch):
- batch = list(zip(*batch))
- labels = torch.tensor(batch[0], dtype=torch.int32)
- texts = batch[1]
- texts = torch.tensor([ws.transform(i, max_len) for i in texts])
- del batch
- return labels.long(), texts.long()
-
-
- # method for getting a DataLoader
- def get_dataloader(train=True):
- if train:
- mode = 'train'
- else:
- mode = "test"
- dataset = ImdbDataset(mode)
- batch_size = train_batch_size if train else test_batch_size
- return DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
-
-
- # Word2Sequence
- class Word2Sequence:
- UNK_TAG = "UNK"
- PAD_TAG = "PAD"
- UNK = 0
- PAD = 1
-
- def __init__(self):
- self.dict = {
- self.UNK_TAG: self.UNK,
- self.PAD_TAG: self.PAD
- }
- self.fited = False
- self.count = {}
-
- def to_index(self, word):
- return self.dict.get(word, self.UNK)
-
- def to_word(self, index):
- if index in self.inversed_dict:
- return self.inversed_dict[index]
- return self.UNK_TAG
-
- def __len__(self):
- return len(self.dict)
-
- def fit(self, sentence):
- for word in sentence:
- self.count[word] = self.count.get(word, 0) + 1
-
- def build_vocab(self, min_count=None, max_count=None, max_feature=None):
- if min_count is not None:
- self.count = {word: count for word, count in self.count.items() if count >= min_count}
-
- if max_count is not None:
- self.count = {word: count for word, count in self.count.items() if count <= max_count}
-
- if max_feature is not None:
- self.count = dict(sorted(self.count.items(), key=lambda x: x[-1], reverse=True)[:max_feature])
-
- for word in self.count:
- self.dict[word] = len(self.dict)
-
- self.inversed_dict = dict(zip(self.dict.values(), self.dict.keys()))
-
- def transform(self, sentence, max_len=None):
- if max_len is not None:
- r = [self.PAD] * max_len
- else:
- r = [self.PAD] * len(sentence)
- if max_len is not None and len(sentence) > max_len:
- sentence = sentence[:max_len]
- for index, word in enumerate(sentence):
- r[index] = self.to_index(word)
- return np.array(r, dtype=np.int64)
-
- def inverse_transform(self, indices):
- sentence = []
- for i in indices:
- word = self.to_word(i)
- sentence.append(word)
- return sentence
-
-
- # build the vocabulary
- def fit_save_word_sequence():
- word_to_sequence = Word2Sequence()
- train_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
- total_file_path_list = []
- for i in train_path:
- total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
- for cur_path in tqdm(total_file_path_list, ascii=True, desc="fitting"):
- word_to_sequence.fit(tokenize(open(cur_path, encoding="utf-8").read().strip()))
- word_to_sequence.build_vocab()
- pickle.dump(word_to_sequence, open("model/ws.pkl", "wb"))
-
-
- ws = pickle.load(open("./model/ws.pkl", "rb"))
-
-
- # print(len(ws))
- # model
- class IMDBModel(nn.Module):
- def __init__(self):
- super(IMDBModel, self).__init__()
- self.hidden_size = 64
- self.embedding_dim = 200
- self.num_layer = 2
- self.bidirectional = True
- self.bi_num = 2 if self.bidirectional else 1
- self.dropout = 0.5
- # the values above are hyperparameters and can be changed freely
- self.embedding = nn.Embedding(len(ws), self.embedding_dim, padding_idx=ws.PAD)
- self.lstm = nn.LSTM(self.embedding_dim, self.hidden_size,
- self.num_layer, bidirectional=self.bidirectional, dropout=self.dropout)
- self.fc = nn.Linear(self.hidden_size * self.bi_num, 20)
- self.fc2 = nn.Linear(20, 10)
-
- def forward(self, x):
- x = self.embedding(x)
- x = x.permute(1, 0, 2) # swap axes to (seq_len, batch, embedding_dim)
- h_0, c_0 = self.init_hidden_state(x.size(1))
- _, (h_n, c_n) = self.lstm(x, (h_0, c_0))
- # keep only the result of the last LSTM step: concatenate the final forward and backward hidden states
- out = torch.cat([h_n[-2, :, :], h_n[-1, :, :]], dim=-1)
- out = self.fc(out)
- out = F.relu(out)
- out = self.fc2(out)
- return F.log_softmax(out, dim=-1)
-
- def init_hidden_state(self, batch_size):
- h_0 = torch.rand(self.num_layer * self.bi_num, batch_size, self.hidden_size).to(device)
- c_0 = torch.rand(self.num_layer * self.bi_num, batch_size, self.hidden_size).to(device)
- return h_0, c_0
-
-
- imdb_model = IMDBModel().to(device)  # move the model to the same device as the hidden states
- optimizer = optim.Adam(imdb_model.parameters())
- criterion = nn.CrossEntropyLoss()
-
-
- def train(epoch):
- mode = True
- train_dataloader = get_dataloader(mode)
- for idx, (target, input) in enumerate(train_dataloader):
- input, target = input.to(device), target.to(device)  # move the batch to the chosen device
- optimizer.zero_grad()
- output = imdb_model(input)
- loss = F.nll_loss(output, target)
- loss.backward()
- optimizer.step()
- if idx % 10 == 0:
- print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
- epoch, idx * len(input), len(train_dataloader.dataset),
- 100. * idx / len(train_dataloader), loss.item()))
- torch.save(imdb_model.state_dict(), "model/mnist_net_lstm.pkl")
- torch.save(optimizer.state_dict(), 'model/mnist_optimizer_lstm.pkl')
-
-
- def test():
- test_loss = 0
- correct = 0
- mode = False
- imdb_model.eval()
- test_dataloader = get_dataloader(mode)
- with torch.no_grad():
- for target, input in test_dataloader:
- input, target = input.to(device), target.to(device)  # move the batch to the chosen device
- output = imdb_model(input)
- test_loss += F.nll_loss(output, target, reduction="sum")
- pred = torch.max(output, dim=-1, keepdim=False)[-1]
- correct += pred.eq(target.data).sum()
- test_loss = test_loss / len(test_dataloader.dataset)
- print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
- test_loss, correct, len(test_dataloader.dataset),
- 100. * correct / len(test_dataloader.dataset)))
-
-
- if __name__ == '__main__':
- # # test the dataset
- # dataset = ImdbDataset(mode="train")
- # dataloader = DataLoader(dataset=dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
- # for idx, (label, text) in enumerate(dataloader):
- # print("idx:", idx)
- # print("lable:", label)
- # print("text:", text)
- # break
-
- # test Word2Sequence
- # fit_save_word_sequence()
- # w2s = Word2Sequence()
- # voc = [["你", "好", "么"],
- # ["你", "好", "哦"]]
- # for i in voc:
- # w2s.fit(i)
- # w2s.build_vocab()
- # print(w2s.dict)
- # print(w2s.transform(["你", "好", "嘛"]))
-
- # training and testing
- test()
- for i in range(3):
- train(i)
- print(
- "训练第{}轮的测试结果-----------------------------------------------------------------------------------------".format(
- i + 1))
- test()

Below are the results I obtained on my machine using only the CPU; both training and testing are fairly slow. Even the LSTM's accuracy is not great in absolute terms, possibly because the dataset is not large enough to support a 10-class task.
1. Without training, testing directly: accuracy around 15%
2. Fully connected network: accuracy around 25%
3. LSTM: accuracy around 35%
These results are for reference only.