赞
踩
随着深度学习技术的持续发展,神经机器翻译已经成为自然语言处理领域的重要研究方向。在众多的神经机器翻译模型中,基于Transformer的模型因其出色的性能和广泛的应用前景而备受关注。本文将重点介绍如何使用Python和PyTorch实现基于Transformer的机器翻译。
在实现基于Transformer的神经机器翻译之前,需要进行数据准备工作。这包括数据集的下载、预处理和分词等步骤。在本文中,我们将使用从JParaCrawl下载的日语-英语并行数据集。该数据集被描述为由NTT创建的最大的公开可用的英语-日语并行语料库,主要通过网络抓取和自动句对齐创建。
- import math
- import torchtext
- import torch
- import torch.nn as nn
- from torch import Tensor
- from torch.nn.utils.rnn import pad_sequence
- from torch.utils.data import DataLoader
- from collections import Counter
- from torchtext.vocab import Vocab
- from torch.nn import TransformerEncoder, TransformerDecoder, TransformerEncoderLayer, TransformerDecoderLayer
- import io
- import time
- import pandas as pd
- import numpy as np
- import pickle
- import tqdm
- import sentencepiece as spm
- torch.manual_seed(0)
- device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
- #print(torch.cuda.get_device_name(0))
-
- df = pd.read_csv('zh-ja.bicleaner05.txt', sep='\\t', engine='python', header=None)
- trainen = df[2].values.tolist()#[:10000]
- trainja = df[3].values.tolist()#[:10000]
- # trainen.pop(5972)
- # trainja.pop(5972)
-
- print(trainen[500])
- print(trainja[500])

代码运行结果:
输出语料库中索引500的元素
与英语或其他按字母顺序排列的语言不同,日语句子不包含空格来分隔单词。我们可以使用JParaCrawl提供的标记器,它是使用sentencepece为日语和英语创建的,能够为日语和英语文本生成合适的分词单元。
- en_tokenizer = spm.SentencePieceProcessor(model_file='enja_spm_models/spm.en.nopretok.model')
- ja_tokenizer = spm.SentencePieceProcessor(model_file='enja_spm_models/spm.ja.nopretok.model')
-
- en_tokenizer.encode("All residents aged 20 to 59 years who live in Japan must enroll in public pension system.", out_type='str')
-
- ja_tokenizer.encode("年金 日本に住んでいる20歳~60歳の全ての人は、公的年金制度に加入しなければなりません。", out_type='str')
代码运行结果:
使用标记器和原始句子,然后构建一个从TorchText导入的Vocab对象。根据数据集的大小和计算能力,这个过程可能需要几秒钟或几分钟的时间。不同的标记器也会影响构建词汇表所需的时间。一旦我们获得了词汇表对象,我们可以利用它和标记器对象来为训练数据创建张量。
- def build_vocab(sentences, tokenizer):#使用给定的tokenizer从句子列表中构建词汇表。
- counter = Counter()
- for sentence in sentences:
- counter.update(tokenizer.encode(sentence, out_type=str))
- return Vocab(counter, specials=['<unk>', '<pad>', '<bos>', '<eos>']) # 构建日语和英语的词汇表
- ja_vocab = build_vocab(trainja, ja_tokenizer)
- en_vocab = build_vocab(trainen, en_tokenizer)
-
- def data_process(ja, en):
- data = []
- for (raw_ja, raw_en) in zip(ja, en):
- ja_tensor_ = torch.tensor([ja_vocab[token] for token in ja_tokenizer.encode(raw_ja.rstrip("\n"), out_type=str)],
- dtype=torch.long) # 将日语句子转换为张量
- en_tensor_ = torch.tensor([en_vocab[token] for token in en_tokenizer.encode(raw_en.rstrip("\n"), out_type=str)],
- dtype=torch.long) # 将英语句子转换为张量
- data.append((ja_tensor_, en_tensor_)) # 将处理后的日语和英语张量对添加到数据列表中
- return data
- train_data = data_process(trainja, trainen) # 使用数据处理函数处理训练集数据

下面,我们定义如何生成训练批次数据,并使用这些数据创建了一个迭代器,以便在模型训练过程中使用。可以根据个人设备的内存容量,数据大小来随意更改批大小。
- BATCH_SIZE = 8 # 批处理大小
- PAD_IDX = ja_vocab['<pad>'] # 获取日语词汇表中的填充索引
- BOS_IDX = ja_vocab['<bos>'] # 获取日语词汇表中的起始标记索引
- EOS_IDX = ja_vocab['<eos>'] # 获取日语词汇表中的结束标记索引
-
- def generate_batch(data_batch):
- ja_batch, en_batch = [], []
- for (ja_item, en_item) in data_batch:
- # 在日语句子张量的开头和结尾添加起始标记和结束标记
- ja_batch.append(torch.cat([torch.tensor([BOS_IDX]), ja_item, torch.tensor([EOS_IDX])], dim=0))
- # 在英语句子张量的开头和结尾添加起始标记和结束标记
- en_batch.append(torch.cat([torch.tensor([BOS_IDX]), en_item, torch.tensor([EOS_IDX])], dim=0))
-
- # 对日语句子张量和英语句子张量进行填充,使它们在批次中具有相同的长度
- ja_batch = pad_sequence(ja_batch, padding_value=PAD_IDX)
- en_batch = pad_sequence(en_batch, padding_value=PAD_IDX)
-
- return ja_batch, en_batch
-
- # 使用 DataLoader 创建训练数据迭代器,每次生成一个批次数据
- train_iter = DataLoader(train_data, batch_size=BATCH_SIZE,
- shuffle=True, collate_fn=generate_batch)

Transformer是一种Seq2Seq模型,最初在“Attention is all you need”论文中提出,用于解决机器翻译任务。Transformer模型由编码器和解码器块组成,每个块包含固定数量的层。
编码器通过一系列的多头注意力和前馈网络层对输入序列进行处理。编码器的输出称为“记忆”,将其与目标张量一起馈送到解码器。编码器和解码器在训练时采用端到端的教师强制技术。
下面,我们定义一个基于Transformer的序列到序列(Seq2Seq)模型,用于处理序列数据的翻译或转换任务。该模型由编码器(encoder)和解码器(decoder)组成,它们共享相同的维度和一些层的参数。
- from torch.nn import (TransformerEncoder, TransformerDecoder,
- TransformerEncoderLayer, TransformerDecoderLayer)
-
-
- class Seq2SeqTransformer(nn.Module):
- def __init__(self, num_encoder_layers: int, num_decoder_layers: int,
- emb_size: int, src_vocab_size: int, tgt_vocab_size: int,
- dim_feedforward:int = 512, dropout:float = 0.1): # 初始化编码器层和解码器层
- super(Seq2SeqTransformer, self).__init__()
- encoder_layer = TransformerEncoderLayer(d_model=emb_size, nhead=NHEAD,
- dim_feedforward=dim_feedforward)
- self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
- decoder_layer = TransformerDecoderLayer(d_model=emb_size, nhead=NHEAD,
- dim_feedforward=dim_feedforward)
- self.transformer_decoder = TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)
-
- self.generator = nn.Linear(emb_size, tgt_vocab_size) # 线性层用于生成输出词汇
- self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size) # 源语言和目标语言的词嵌入
- self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
- self.positional_encoding = PositionalEncoding(emb_size, dropout=dropout)
-
- def forward(self, src: Tensor, trg: Tensor, src_mask: Tensor,
- tgt_mask: Tensor, src_padding_mask: Tensor,
- tgt_padding_mask: Tensor, memory_key_padding_mask: Tensor):
- src_emb = self.positional_encoding(self.src_tok_emb(src)) # 对输入的源语言和目标语言进行位置编码
- tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))
- memory = self.transformer_encoder(src_emb, src_mask, src_padding_mask) # 编码阶段:通过编码器编码源语言序列
- outs = self.transformer_decoder(tgt_emb, memory, tgt_mask, None,
- tgt_padding_mask, memory_key_padding_mask)
- return self.generator(outs)# 通过线性层生成最终的输出词汇分布
-
- def encode(self, src: Tensor, src_mask: Tensor):
- return self.transformer_encoder(self.positional_encoding(
- self.src_tok_emb(src)), src_mask)
-
- def decode(self, tgt: Tensor, memory: Tensor, tgt_mask: Tensor):
- return self.transformer_decoder(self.positional_encoding(
- self.tgt_tok_emb(tgt)), memory,
- tgt_mask)
-
- class PositionalEncoding(nn.Module):
- def __init__(self, emb_size: int, dropout, maxlen: int = 5000):
- super(PositionalEncoding, self).__init__()
- den = torch.exp(- torch.arange(0, emb_size, 2) * math.log(10000) / emb_size) # 计算正弦和余弦函数的参数
- pos = torch.arange(0, maxlen).reshape(maxlen, 1) # 创建位置编码矩阵
- pos_embedding = torch.zeros((maxlen, emb_size))
- pos_embedding[:, 0::2] = torch.sin(pos * den)
- pos_embedding[:, 1::2] = torch.cos(pos * den)
- pos_embedding = pos_embedding.unsqueeze(-2)
-
- self.dropout = nn.Dropout(dropout) # 初始化dropout层
- self.register_buffer('pos_embedding', pos_embedding)
-
- def forward(self, token_embedding: Tensor): # 在输入的token嵌入上应用位置编码并使用dropout
- return self.dropout(token_embedding +
- self.pos_embedding[:token_embedding.size(0),:])
-
- class TokenEmbedding(nn.Module):
- def __init__(self, vocab_size: int, emb_size):
- super(TokenEmbedding, self).__init__()
- self.embedding = nn.Embedding(vocab_size, emb_size)
- self.emb_size = emb_size
- def forward(self, tokens: Tensor): # 返回通过词嵌入层的嵌入向量,并乘以sqrt(嵌入维度)进行缩放
- return self.embedding(tokens.long()) * math.sqrt(self.emb_size)
-
- def generate_square_subsequent_mask(sz):
- mask = (torch.triu(torch.ones((sz, sz), device=device)) == 1).transpose(0, 1) # 创建一个上三角矩阵,其对角线及以下元素为1,其余为0
- mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0)) # 将mask转换为浮点型,并用-inf填充所有0元素,用0.0填充所有1元素
- return mask
-
- def create_mask(src, tgt):
- src_seq_len = src.shape[0]
- tgt_seq_len = tgt.shape[0]
-
- tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
- src_mask = torch.zeros((src_seq_len, src_seq_len), device=device).type(torch.bool) # 创建一个全零的源序列mask,数据类型为bool类型
-
- src_padding_mask = (src == PAD_IDX).transpose(0, 1) # 创建源序列和目标序列的padding mask,PAD_IDX是指定的填充标识符
- tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)
- return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask
-
- SRC_VOCAB_SIZE = len(ja_vocab)
- TGT_VOCAB_SIZE = len(en_vocab)
- EMB_SIZE = 512
- NHEAD = 8
- FFN_HID_DIM = 512
- BATCH_SIZE = 16
- NUM_ENCODER_LAYERS = 3
- NUM_DECODER_LAYERS = 3
- NUM_EPOCHS = 16
- transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS,
- EMB_SIZE, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE,
- FFN_HID_DIM)
-
- for p in transformer.parameters(): # 初始化Transformer模型中维度大于1的参数,采用Xavier均匀分布初始化方法
- if p.dim() > 1:
- nn.init.xavier_uniform_(p)
-
- transformer = transformer.to(device)
-
- loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX) # 定义交叉熵损失函数,忽略填充标识符PAD_IDX
-
- optimizer = torch.optim.Adam(
- transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9
- ) # 定义Adam优化器,用于更新Transformer模型的参数
- def train_epoch(model, train_iter, optimizer): # 将模型设置为训练模式
- model.train()
- losses = 0
- for idx, (src, tgt) in enumerate(train_iter):
- src = src.to(device)
- tgt = tgt.to(device)
-
- tgt_input = tgt[:-1, :]
-
- src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
-
- logits = model(src, tgt_input, src_mask, tgt_mask,
- src_padding_mask, tgt_padding_mask, src_padding_mask) # 前向传播,计算logits
-
- optimizer.zero_grad() # 梯度清零
-
- tgt_out = tgt[1:,:] # 获取目标序列的输出数据
- loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
- loss.backward()
-
- optimizer.step()
- losses += loss.item()
- return losses / len(train_iter)
-
-
- def evaluate(model, val_iter):
- model.eval() # 将模型设置为评估模式
- losses = 0
- for idx, (src, tgt) in (enumerate(valid_iter)):
- src = src.to(device)
- tgt = tgt.to(device) # 去掉目标序列最后一个位置的数据作为输入
-
- tgt_input = tgt[:-1, :]
-
- src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
-
- logits = model(src, tgt_input, src_mask, tgt_mask,
- src_padding_mask, tgt_padding_mask, src_padding_mask) # 前向传播,计算logits
- tgt_out = tgt[1:,:]
- loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
- losses += loss.item() # 累计损失值
- return losses / len(val_iter)

上面的代码基于Transformer的序列到序列模型,包括模型初始化、参数设定、设备分配、损失函数和优化器的定义,以及训练与评估过程的实现。
在构建Seq2SeqTransformer模型时,首先定义了一系列关键的超参数。这些超参数包括源语言和目标语言的词汇表大小SRCVOCABSIZE和TGTVOCABSIZE,嵌入层的维度EMBSIZE,多头注意力机制的头数NHEAD,前馈网络隐藏层的维度FFNHIDDIM,批次大小BATCHSIZE,以及编码器和解码器的层数NUMENCODERLAYERS和NUMDECODERLAYERS,同时设置了训练轮数NUM_EPOCHS。接着,基于这些超参数配置,创建了Transformer模型的实例。为了确保训练过程中的梯度流动顺畅,所有模型参数都采用了Xavier均匀初始化方法。随后,模型被分配到特定的设备上,最好是GPU,以实现高效计算。
在准备训练阶段,我们定义了交叉熵损失函数lossfn。特别地,该损失函数能够忽略填充索引PADIDX,以避免在计算损失时对填充部分进行惩罚。此外,还创建了Adam优化器optimizer,用于在模型训练过程中更新参数。
训练过程由trainepoch函数实现。该函数将模型设置为训练模式,并循环遍历训练迭代器trainiter中的每个epoch。在每个批次处理中,源语言和目标语言的数据被送至设备,生成目标序列的相应输入(不包括序列的最后一个时间步),同时创建了模型所需的掩码。模型根据此生成预测输出logits,然后执行梯度清零、损失计算、反向传播和参数更新,最后返回该epoch的平均损失。
模型评估通过evaluate函数完成。在评估模式下,模型处理验证迭代器val_iter上的数据,计算损失,但不进行反向传播和参数更新。最终,我们得到验证集的平均损失,以评估模型的性能。
最终,一旦我们准备好必要的类和函数,便可以开始模型训练。显然,训练所需的时间会因计算能力、模型参数和数据集大小等因素而有所不同。
- for epoch in tqdm.tqdm(range(1,NUM_EPOCHS+1)):
- start_time=time.time()
- train_loss=train_epoch(transformer,train_iter,optimizer)
- end_time= time.time()
- print((f"Epoch:{epoch},Trainloss:{train_loss:.3f},"f"Epoch time={(end_time-start_time):.3f}s"))
在这部分,我们创建翻译新句子的函数,包括获取日语句子、标记化、转换为张量、推理,然后将结果解码回句子,但这次是英语。然后,我们可以调用翻译函数并传递所需的参数。
- def greedy_decode(model, src, src_mask, max_len, start_symbol):
- src = src.to(device) # 将源序列和源序列mask移动到指定设备上(如GPU)
- src_mask = src_mask.to(device)
- memory = model.encode(src, src_mask) # 使用编码器对源序列进行编码,得到记忆(memory)
- ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(device) # 初始化目标序列,以起始符号填充
- for i in range(max_len-1):
- memory = memory.to(device)
- memory_mask = torch.zeros(ys.shape[0], memory.shape[0]).to(device).type(torch.bool) # 创建目标序列的mask
- tgt_mask = (generate_square_subsequent_mask(ys.size(0))
- .type(torch.bool)).to(device)
- out = model.decode(ys, memory, tgt_mask)
- out = out.transpose(0, 1)
- prob = model.generator(out[:, -1]) # 通过生成器获取概率最大的下一个单词
- _, next_word = torch.max(prob, dim = 1)
- next_word = next_word.item()
- ys = torch.cat([ys,
- torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=0)
- if next_word == EOS_IDX: # 如果预测到终止符号,停止解码
- break
- return ys
- def translate(model, src, src_vocab, tgt_vocab, src_tokenizer):
- model.eval()
- tokens = [BOS_IDX] + [src_vocab.stoi[tok] for tok in src_tokenizer.encode(src, out_type=str)]+ [EOS_IDX]
- num_tokens = len(tokens)
- src = (torch.LongTensor(tokens).reshape(num_tokens, 1) )
- src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
- tgt_tokens = greedy_decode(model, src, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX).flatten()
- return " ".join([tgt_vocab.itos[tok] for tok in tgt_tokens]).replace("<bos>", "").replace("<eos>", "")
-
- translate(transformer, "HSコード 8515 はんだ付け用、ろう付け用又は溶接用の機器(電気式(電気加熱ガス式を含む。)", ja_vocab, en_vocab, ja_tokenizer)

最后,在完成训练后,我们首先会使用Pickle保存Vocab对象(envocab和javocab)。此外,我们还可以利用PyTorch的保存和加载功能将模型保存以备将来使用。通常情况下,有两种保存模型的方法,具体取决于我们未来使用模型的目的。第一种方法仅用于推理阶段,我们可以随后加载模型并将其用于从日语翻译成英语。第二种方法同样适用于推理,但当我们希望稍后加载模型并恢复训练时,也同样有效。
- import pickle
- # 打开想要存储数据的文件
- file = open('en_vocab.pkl', 'wb')
- #将信息转储到该文件
- pickle.dump(en_vocab, file)
- file.close()
- file = open('ja_vocab.pkl', 'wb')
- pickle.dump(ja_vocab, file)
- file.close()#关闭文件
-
- # save model for inference
- torch.save(transformer.state_dict(), 'inference_model')
-
- # save model + checkpoint to resume training later
- torch.save({
- 'epoch': NUM_EPOCHS,
- 'model_state_dict': transformer.state_dict(),
- 'optimizer_state_dict': optimizer.state_dict(),
- 'loss': train_loss,
- }, 'model_checkpoint.tar')# 使用torch.save函数将以下内容保存到'model_checkpoint.tar'文件中

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。