Hi everyone, today I'm finishing up my learning notes for task3, which will probably also be the last set; task4 has fallen a bit behind the study schedule I set for myself, so I'll stop here for now and may add more later if I find the time. Official task3 link
Sequence-to-sequence models built on recurrent or convolutional neural networks are the classic approach to machine translation. However, both have limitations when it comes to modeling long-range dependencies in text.
The Transformer was first proposed for machine translation, and its arrival pushed both the quality and the efficiency of machine translation to a new level. It abandons recurrence entirely and models the global dependencies of the source and target sequences purely through attention. When extracting the contextual features of each word, the Transformer uses self-attention to measure how important every word in the context is to the current word.
No recurrent units take part in this computation. This highly parallelizable encoding makes the model very efficient to run. Almost all of today's large language models are based on the Transformer architecture; this section introduces the model through the Transformer encoder and decoder as applied to machine translation.
The main components of the Transformer are the encoder, the decoder, and the attention layers. Its core is Multi-Head Self-Attention, which lets the representation at each position depend not only on that position itself but also directly draw on the representations at all other positions.
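To make this concrete, here is a minimal sketch of single-head scaled dot-product attention, the building block of multi-head attention (my own illustration rather than code from this task; the tensor shapes are made up):

import math
import torch
import torch.nn.functional as F

def scaled_dot_product_attention(q, k, v):
    # q, k, v: (batch, seq_len, d_k); the weights measure how much each
    # position attends to every other position in the same sequence
    d_k = q.size(-1)
    scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
    weights = F.softmax(scores, dim=-1)           # (batch, seq_len, seq_len)
    return torch.matmul(weights, v), weights      # context vectors + attention map

x = torch.randn(2, 5, 64)                         # toy batch: 2 sentences, 5 tokens, d=64
context, attn = scaled_dot_product_attention(x, x, x)   # self-attention: q = k = v = x
print(context.shape, attn.shape)                  # torch.Size([2, 5, 64]) torch.Size([2, 5, 5])

Multi-head attention simply runs several such attention heads in parallel on learned projections of the input and concatenates their outputs.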
For more details, see the article 【超详细】【原理篇&实战篇】一文读懂Transformer.
The Transformer's encoder component is a stack of 6 encoder blocks; each encoder block consists of a Multi-Head Attention layer and a fully connected Feed Forward Network.
A sentence entering the encoder first passes through a self-attention layer, which lets the encoder look at the other words in the sentence while it encodes each word.
The decoder component is likewise a stack of 6 decoder blocks; each decoder block consists of a Masked Multi-Head Attention layer, a Multi-Head Attention layer over the encoder output, and a Feed Forward Network.
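As a rough sketch of this stacking (my own example using PyTorch's built-in layers, not the implementation used later in this post), a 6-layer encoder and decoder can be assembled directly:

import torch
import torch.nn as nn

d_model, nhead = 512, 8
enc_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward=2048)  # self-attention + FFN
dec_layer = nn.TransformerDecoderLayer(d_model, nhead, dim_feedforward=2048)  # masked self-attn + cross-attn + FFN
encoder = nn.TransformerEncoder(enc_layer, num_layers=6)   # 6 stacked encoder blocks
decoder = nn.TransformerDecoder(dec_layer, num_layers=6)   # 6 stacked decoder blocks

src = torch.randn(10, 2, d_model)   # (src_len, batch, d_model)
tgt = torch.randn(7, 2, d_model)    # (tgt_len, batch, d_model)
memory = encoder(src)               # encoder output consumed by the decoder's cross-attention
out = decoder(tgt, memory)
print(out.shape)                    # torch.Size([7, 2, 512])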
To give each position a distinct encoding, the Transformer uses sine and cosine functions of different frequencies:

$$\mathrm{PE}(pos, 2i) = \sin\!\left(\frac{pos}{10000^{2i/d}}\right), \qquad \mathrm{PE}(pos, 2i+1) = \cos\!\left(\frac{pos}{10000^{2i/d}}\right)$$

where pos is the position of the word in the sequence, 2i and 2i+1 index the corresponding dimensions of the positional encoding vector, and d is the total dimensionality of the positional encoding.
First, since sine and cosine are bounded in [-1, 1], adding the positional encoding to the original word embedding does not push the result far enough away to corrupt the semantic information.
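As a quick sanity check (my own snippet, independent of the PositionalEncoding class defined later), the formula can be evaluated directly and its values verified to stay inside [-1, 1]:

import torch

def sinusoidal_pe(max_len, d):
    # positional encodings computed directly from the formula above (d assumed even)
    pos = torch.arange(max_len, dtype=torch.float).unsqueeze(1)      # (max_len, 1)
    two_i = torch.arange(0, d, 2, dtype=torch.float)                 # even dimension indices 0, 2, 4, ...
    angle = pos / torch.pow(torch.tensor(10000.0), two_i / d)        # pos / 10000^(2i/d)
    pe = torch.zeros(max_len, d)
    pe[:, 0::2] = torch.sin(angle)
    pe[:, 1::2] = torch.cos(angle)
    return pe

pe = sinusoidal_pe(max_len=50, d=16)
print(pe.min().item(), pe.max().item())   # both values stay inside [-1, 1]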
# --- imports assumed by this snippet (not shown in the original post) ---
from typing import List, Tuple
import jieba
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, Subset
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

# Define the tokenizers
en_tokenizer = get_tokenizer('spacy', language='en_core_web_trf')
zh_tokenizer = lambda x: list(jieba.cut(x))  # use jieba for Chinese word segmentation

# Read a text file into a list of stripped lines
def read_data(file_path: str) -> List[str]:
    with open(file_path, 'r', encoding='utf-8') as f:
        return [line.strip() for line in f]

# Preprocess parallel data: tokenize, lowercase English, truncate to MAX_LENGTH
def preprocess_data(en_data: List[str], zh_data: List[str]) -> List[Tuple[List[str], List[str]]]:
    processed_data = []
    for en, zh in zip(en_data, zh_data):
        en_tokens = en_tokenizer(en.lower())[:MAX_LENGTH]
        zh_tokens = zh_tokenizer(zh)[:MAX_LENGTH]
        if en_tokens and zh_tokens:  # keep only pairs where both sides are non-empty
            processed_data.append((en_tokens, zh_tokens))
    return processed_data

# Build the vocabularies from the training pairs
def build_vocab(data: List[Tuple[List[str], List[str]]]):
    en_vocab = build_vocab_from_iterator(
        (en for en, _ in data),
        specials=['<unk>', '<pad>', '<bos>', '<eos>']
    )
    zh_vocab = build_vocab_from_iterator(
        (zh for _, zh in data),
        specials=['<unk>', '<pad>', '<bos>', '<eos>']
    )
    en_vocab.set_default_index(en_vocab['<unk>'])
    zh_vocab.set_default_index(zh_vocab['<unk>'])
    return en_vocab, zh_vocab

class TranslationDataset(Dataset):
    def __init__(self, data: List[Tuple[List[str], List[str]]], en_vocab, zh_vocab):
        self.data = data
        self.en_vocab = en_vocab
        self.zh_vocab = zh_vocab

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        en, zh = self.data[idx]
        en_indices = [self.en_vocab['<bos>']] + [self.en_vocab[token] for token in en] + [self.en_vocab['<eos>']]
        zh_indices = [self.zh_vocab['<bos>']] + [self.zh_vocab[token] for token in zh] + [self.zh_vocab['<eos>']]
        return en_indices, zh_indices

def collate_fn(batch):
    en_batch, zh_batch = [], []
    for en_item, zh_item in batch:
        if en_item and zh_item:  # make sure both sequences are non-empty
            en_batch.append(torch.tensor(en_item))
            zh_batch.append(torch.tensor(zh_item))
        else:
            print("Found an empty sequence")
    if not en_batch or not zh_batch:
        # if the whole batch is empty, return empty tensors
        return torch.tensor([]), torch.tensor([])
    en_batch = nn.utils.rnn.pad_sequence(en_batch, batch_first=True, padding_value=en_vocab['<pad>'])
    zh_batch = nn.utils.rnn.pad_sequence(zh_batch, batch_first=True, padding_value=zh_vocab['<pad>'])
    return en_batch, zh_batch

# Load and assemble all datasets and data loaders
def load_data(train_path: str, dev_en_path: str, dev_zh_path: str, test_en_path: str):
    # Read the training data (tab-separated English/Chinese pairs)
    train_data = read_data(train_path)
    train_en, train_zh = zip(*(line.split('\t') for line in train_data))

    # Read the development and test sets
    dev_en = read_data(dev_en_path)
    dev_zh = read_data(dev_zh_path)
    test_en = read_data(test_en_path)

    # Preprocess the data
    train_processed = preprocess_data(train_en, train_zh)
    dev_processed = preprocess_data(dev_en, dev_zh)
    test_processed = [(en_tokenizer(en.lower())[:MAX_LENGTH], []) for en in test_en if en.strip()]

    # Build the vocabularies
    global en_vocab, zh_vocab
    en_vocab, zh_vocab = build_vocab(train_processed)

    # Create the datasets
    train_dataset = TranslationDataset(train_processed, en_vocab, zh_vocab)
    dev_dataset = TranslationDataset(dev_processed, en_vocab, zh_vocab)
    test_dataset = TranslationDataset(test_processed, en_vocab, zh_vocab)

    # Keep only the first N training samples (N is set with the other constants)
    indices = list(range(N))
    train_dataset = Subset(train_dataset, indices)

    # Create the data loaders
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True,
                              collate_fn=collate_fn, drop_last=True)
    dev_loader = DataLoader(dev_dataset, batch_size=BATCH_SIZE,
                            collate_fn=collate_fn, drop_last=True)
    test_loader = DataLoader(test_dataset, batch_size=1,
                             collate_fn=collate_fn, drop_last=True)

    return train_loader, dev_loader, test_loader, en_vocab, zh_vocab
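As a quick usage illustration (my own toy example; MAX_LENGTH is redefined here only so the snippet runs on its own, with the same value as in the constants section further below), the preprocessing and vocabulary steps can be exercised on a single sentence pair:

MAX_LENGTH = 100  # assumed here; matches the constant defined later
toy_en = ["I like machine translation ."]
toy_zh = ["我喜欢机器翻译。"]
pairs = preprocess_data(toy_en, toy_zh)
toy_en_vocab, toy_zh_vocab = build_vocab(pairs)
print(pairs[0][0])                          # tokenized English side, e.g. ['i', 'like', 'machine', 'translation', '.']
print(len(toy_en_vocab), len(toy_zh_vocab)) # tiny vocabularies: special tokens + the toy words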
Model construction
import math
import torch
import torch.nn as nn

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)   # (max_len, 1, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (seq_len, batch_size, d_model); add the encodings for the first seq_len positions
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

class TransformerModel(nn.Module):
    def __init__(self, src_vocab, tgt_vocab, d_model, nhead, num_encoder_layers,
                 num_decoder_layers, dim_feedforward, dropout):
        super(TransformerModel, self).__init__()
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers,
                                          num_decoder_layers, dim_feedforward, dropout)
        self.src_embedding = nn.Embedding(len(src_vocab), d_model)
        self.tgt_embedding = nn.Embedding(len(tgt_vocab), d_model)
        self.positional_encoding = PositionalEncoding(d_model, dropout)
        self.fc_out = nn.Linear(d_model, len(tgt_vocab))

        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        self.d_model = d_model

    def forward(self, src, tgt):
        # Reshape src and tgt from (batch_size, seq_len) to (seq_len, batch_size)
        src = src.transpose(0, 1)
        tgt = tgt.transpose(0, 1)

        src_mask = self.transformer.generate_square_subsequent_mask(src.size(0)).to(src.device)
        tgt_mask = self.transformer.generate_square_subsequent_mask(tgt.size(0)).to(tgt.device)

        src_padding_mask = (src == self.src_vocab['<pad>']).transpose(0, 1)
        tgt_padding_mask = (tgt == self.tgt_vocab['<pad>']).transpose(0, 1)

        src_embedded = self.positional_encoding(self.src_embedding(src) * math.sqrt(self.d_model))
        tgt_embedded = self.positional_encoding(self.tgt_embedding(tgt) * math.sqrt(self.d_model))

        output = self.transformer(src_embedded, tgt_embedded,
                                  src_mask, tgt_mask, None,
                                  src_padding_mask, tgt_padding_mask, src_padding_mask)
        return self.fc_out(output).transpose(0, 1)

def initialize_model(src_vocab, tgt_vocab, d_model=512, nhead=8, num_encoder_layers=6,
                     num_decoder_layers=6, dim_feedforward=2048, dropout=0.1):
    model = TransformerModel(src_vocab, tgt_vocab, d_model, nhead, num_encoder_layers,
                             num_decoder_layers, dim_feedforward, dropout)
    return model
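As a quick smoke test (my own sketch; the tiny vocabularies and dimensions are made up for the example), the model can be instantiated and run on dummy index tensors to confirm that the output shape is (batch_size, tgt_len, tgt_vocab_size):

import torch

# hypothetical tiny vocabularies just for the shape check; the real ones come from build_vocab
toy_src_vocab = {'<unk>': 0, '<pad>': 1, '<bos>': 2, '<eos>': 3, 'hello': 4}
toy_tgt_vocab = {'<unk>': 0, '<pad>': 1, '<bos>': 2, '<eos>': 3, '你好': 4}

model = initialize_model(toy_src_vocab, toy_tgt_vocab, d_model=32, nhead=4,
                         num_encoder_layers=2, num_decoder_layers=2,
                         dim_feedforward=64, dropout=0.1)
# sample indices >= 2 so the <pad> index never appears and the padding masks stay all False
src = torch.randint(2, len(toy_src_vocab), (2, 6))   # (batch_size, src_len)
tgt = torch.randint(2, len(toy_tgt_vocab), (2, 5))   # (batch_size, tgt_len)
out = model(src, tgt)
print(out.shape)   # torch.Size([2, 5, 5]) -> (batch_size, tgt_len, len(tgt_vocab))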
Training
# --- imports assumed by this snippet (not shown in the original post) ---
import math
import time
import torch
import torch.optim as optim
import sacrebleu
from torch.nn.utils import clip_grad_norm_

# Define the optimizer
def initialize_optimizer(model, learning_rate=0.001):
    return optim.Adam(model.parameters(), lr=learning_rate)

# Elapsed time per epoch
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    for i, batch in enumerate(iterator):
        src, tgt = batch
        if src.numel() == 0 or tgt.numel() == 0:
            continue  # skip empty batches
        src, tgt = src.to(DEVICE), tgt.to(DEVICE)

        optimizer.zero_grad()
        output = model(src, tgt[:, :-1])   # teacher forcing: feed the target shifted right

        output_dim = output.shape[-1]
        output = output.contiguous().view(-1, output_dim)
        tgt = tgt[:, 1:].contiguous().view(-1)   # predict the target shifted left

        loss = criterion(output, tgt)
        loss.backward()
        clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        epoch_loss += loss.item()
    return epoch_loss / len(iterator)

def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for i, batch in enumerate(iterator):
            src, tgt = batch
            if src.numel() == 0 or tgt.numel() == 0:
                continue
            src, tgt = src.to(DEVICE), tgt.to(DEVICE)

            output = model(src, tgt[:, :-1])
            output_dim = output.shape[-1]
            output = output.contiguous().view(-1, output_dim)
            tgt = tgt[:, 1:].contiguous().view(-1)

            loss = criterion(output, tgt)
            epoch_loss += loss.item()
    return epoch_loss / len(iterator)

def translate_sentence(src_indexes, src_vocab, tgt_vocab, model, device, max_length=50):
    model.eval()
    src_tensor = src_indexes.unsqueeze(0).to(device)  # add a batch dimension
    with torch.no_grad():
        # (computed but not reused below; the full model re-encodes src at every step)
        encoder_outputs = model.transformer.encoder(
            model.positional_encoding(model.src_embedding(src_tensor) * math.sqrt(model.d_model)))

    # Greedy decoding: start from <bos> and repeatedly append the most likely next token
    trg_indexes = [tgt_vocab['<bos>']]
    for i in range(max_length):
        trg_tensor = torch.LongTensor(trg_indexes).unsqueeze(0).to(device)
        with torch.no_grad():
            output = model(src_tensor, trg_tensor)
        pred_token = output.argmax(2)[:, -1].item()
        trg_indexes.append(pred_token)
        if pred_token == tgt_vocab['<eos>']:
            break

    trg_tokens = [tgt_vocab.get_itos()[i] for i in trg_indexes]
    return trg_tokens[1:-1]  # strip the <bos> and <eos> tokens

def calculate_bleu(dev_loader, src_vocab, tgt_vocab, model, device):
    model.eval()
    translations = []
    references = []
    with torch.no_grad():
        for src, tgt in dev_loader:
            src = src.to(device)
            for sentence in src:
                translated = translate_sentence(sentence, src_vocab, tgt_vocab, model, device)
                translations.append(' '.join(translated))
            for reference in tgt:
                ref_tokens = [tgt_vocab.get_itos()[idx] for idx in reference
                              if idx not in [tgt_vocab['<bos>'], tgt_vocab['<eos>'], tgt_vocab['<pad>']]]
                references.append(' '.join(ref_tokens))
    # sacrebleu expects a list of reference streams, each aligned with the hypotheses
    bleu = sacrebleu.corpus_bleu(translations, [references])
    return bleu.score

# Main training loop
def train_model(model, train_iterator, valid_iterator, optimizer, criterion,
                N_EPOCHS=10, CLIP=1, save_path='../model/best-model_transformer.pt'):
    best_valid_loss = float('inf')
    for epoch in range(N_EPOCHS):
        start_time = time.time()
        train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
        valid_loss = evaluate(model, valid_iterator, criterion)
        end_time = time.time()
        epoch_mins, epoch_secs = epoch_time(start_time, end_time)

        # Keep the checkpoint with the lowest validation loss
        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), save_path)

        print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
        print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
        print(f'\t Val. Loss: {valid_loss:.3f} | Val. PPL: {math.exp(valid_loss):7.3f}')
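The tgt[:, :-1] / tgt[:, 1:] split above is teacher forcing: the decoder is fed the target shifted right and trained to predict it shifted left. A toy illustration (my own, with made-up token ids):

import torch

# suppose <bos>=2, <eos>=3 and a target sentence with token ids 10, 11, 12
tgt = torch.tensor([[2, 10, 11, 12, 3]])
decoder_input = tgt[:, :-1]   # [[ 2, 10, 11, 12]] -> what the decoder is fed
labels = tgt[:, 1:]           # [[10, 11, 12,  3]] -> what it must predict at each step
print(decoder_input, labels)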
We set the training set size to 148,363 and trained for 25 epochs; beyond that, the loss stopped decreasing.
# Constants
MAX_LENGTH = 100    # maximum sentence length
BATCH_SIZE = 32
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
N = 148363          # number of training samples to use (148363 at most)

train_path = '../dataset/train.txt'
dev_en_path = '../dataset/dev_en.txt'
dev_zh_path = '../dataset/dev_zh.txt'
test_en_path = '../dataset/test_en.txt'

train_loader, dev_loader, test_loader, en_vocab, zh_vocab = load_data(
    train_path, dev_en_path, dev_zh_path, test_en_path
)

print(f"English vocabulary size: {len(en_vocab)}")
print(f"Chinese vocabulary size: {len(zh_vocab)}")
print(f"Training set size: {len(train_loader.dataset)}")
print(f"Dev set size: {len(dev_loader.dataset)}")
print(f"Test set size: {len(test_loader.dataset)}")

# Main entry point
if __name__ == '__main__':
    # Model hyperparameters
    D_MODEL = 256
    NHEAD = 8
    NUM_ENCODER_LAYERS = 3
    NUM_DECODER_LAYERS = 3
    DIM_FEEDFORWARD = 512
    DROPOUT = 0.1
    N_EPOCHS = 5
    CLIP = 1

    # Initialize the model
    model = initialize_model(en_vocab, zh_vocab, D_MODEL, NHEAD, NUM_ENCODER_LAYERS,
                             NUM_DECODER_LAYERS, DIM_FEEDFORWARD, DROPOUT).to(DEVICE)
    print(f'The model has {sum(p.numel() for p in model.parameters() if p.requires_grad):,} trainable parameters')

    # Loss function (ignore padding positions)
    criterion = nn.CrossEntropyLoss(ignore_index=zh_vocab['<pad>'])

    # Optimizer
    optimizer = optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

    # Train the model
    save_path = '../model/best-model_transformer.pt'
    train_model(model, train_loader, dev_loader, optimizer, criterion, N_EPOCHS, CLIP, save_path=save_path)

    print(f"Training finished! Model saved to: {save_path}")
Let's check the BLEU-4 score.
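The post does not include the evaluation snippet itself; a minimal sketch using the calculate_bleu function defined above (assuming the best checkpoint has already been saved) might look like this:

# Load the best checkpoint and score the dev set with sacrebleu (4-gram BLEU by default)
model.load_state_dict(torch.load('../model/best-model_transformer.pt'))
bleu_score = calculate_bleu(dev_loader, en_vocab, zh_vocab, model, DEVICE)
print(f"BLEU-4 on the dev set: {bleu_score:.2f}")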
Translate the test set
# Load the best model
model.load_state_dict(torch.load('../model/best-model_transformer.pt'))

save_dir = '../results/submit_task3.txt'
with open(save_dir, 'w', encoding='utf-8') as f:
    translated_sentences = []
    for batch in test_loader:           # iterate over the whole test set
        src, _ = batch
        src = src.to(DEVICE)
        translated = translate_sentence(src[0], en_vocab, zh_vocab, model, DEVICE)  # translation result
        results = "".join(translated)   # join the Chinese tokens without spaces
        f.write(results + '\n')         # write the result to the file

print(f"Translation finished; results saved to {save_dir}")
Let's look at the results. With roughly 140,000 training samples, the run takes about one tenth of the time task2 needed.
The BLEU-4 score is almost 100 times higher.
The final translations are also much better (no data cleaning was done here).
Finally, let's look at the iFLYTEK score.
It is also much higher than task2's.