I. Data preprocessing
1. Define the preprocessing parameters (file: data_util.py)
```python
# data_util.py
num_samples = 100000  # maximum number of question/answer pairs to use

# Corpus file paths
question_path = 'question.txt'
answer_path = 'answer.txt'

# Filled in by get_vocab_dict() and read by the training/inference scripts
max_encoder_seq_length = None
max_decoder_seq_length = None
num_encoder_tokens = None
num_decoder_tokens = None
```
2. Load the training data X, Y
```python
def get_xy_data():
    """Read aligned question/answer lines from the corpus files."""
    with open(question_path, 'r', encoding='utf-8') as f:
        input_texts = f.read().split('\n')
    input_texts = input_texts[:min(num_samples, len(input_texts) - 1)]
    with open(answer_path, 'r', encoding='utf-8') as f:
        # '\t' is the decoder's start symbol, '\n' its end symbol
        target_texts = ['\t' + line + '\n' for line in f.read().split('\n')]
    target_texts = target_texts[:min(num_samples, len(target_texts) - 1)]
    return input_texts, target_texts
```
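`get_xy_data()` assumes `question.txt` and `answer.txt` hold one utterance per line, with line *i* of the answer file replying to line *i* of the question file. A minimal sketch that creates a toy corpus in this assumed format (the sentences are invented for illustration, not the project's real data):

```python
# Create a tiny aligned corpus in the format get_xy_data() expects.
# The sentences below are placeholders, not the original training data.
questions = ['你好', '今天天气怎么样']
answers = ['你好, 很高兴见到你', '天气不错']

with open('question.txt', 'w', encoding='utf-8') as f:
    f.write('\n'.join(questions))
with open('answer.txt', 'w', encoding='utf-8') as f:
    f.write('\n'.join(answers))
```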
3. The input data must be vectorized, so build character dictionaries from X and Y
```python
def get_vocab_dict(X, Y):
    """Build char-to-index dictionaries and record the global size parameters."""
    global max_encoder_seq_length, max_decoder_seq_length
    global num_encoder_tokens, num_decoder_tokens
    input_texts = X
    target_texts = Y
    input_characters = set()
    target_characters = set()
    for line in input_texts[:min(num_samples, len(input_texts) - 1)]:
        for char in line:
            input_characters.add(char)
    for line in target_texts[:min(num_samples, len(target_texts) - 1)]:
        for char in line:
            target_characters.add(char)

    input_characters = sorted(list(input_characters))
    target_characters = sorted(list(target_characters))
    num_encoder_tokens = len(input_characters)
    num_decoder_tokens = len(target_characters)
    max_encoder_seq_length = max(len(txt) for txt in input_texts)
    max_decoder_seq_length = max(len(txt) for txt in target_texts)

    print('Number of samples:', len(input_texts))
    print('Number of unique input tokens:', num_encoder_tokens)
    print('Number of unique output tokens:', num_decoder_tokens)
    print('Max sequence length for inputs:', max_encoder_seq_length)
    print('Max sequence length for outputs:', max_decoder_seq_length)

    input_token_index = dict(
        (char, i) for i, char in enumerate(input_characters))
    target_token_index = dict(
        (char, i) for i, char in enumerate(target_characters))

    return input_token_index, target_token_index
```

4. Build reverse dictionaries that map indices back to readable characters during prediction
```python
def get_rev_dict(input_token_index, target_token_index):
    reverse_input_char_index = dict(
        (i, char) for char, i in input_token_index.items())
    reverse_target_char_index = dict(
        (i, char) for char, i in target_token_index.items())
    return reverse_input_char_index, reverse_target_char_index
```
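The two dictionary pairs are exact inverses of each other. A quick round-trip check on a hypothetical two-character vocabulary (illustrative only, not built from the real corpus):

```python
# Hypothetical round-trip check on a tiny vocabulary (illustrative only)
input_token_index = {'你': 0, '好': 1}  # char -> index
reverse_input_char_index, _ = get_rev_dict(input_token_index, {'\t': 0})
assert reverse_input_char_index[0] == '你'  # index -> char recovers the character
assert all(reverse_input_char_index[i] == c
           for c, i in input_token_index.items())
```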
II. Training
1. Define parameters
```python
from keras.models import Model
from keras.layers import Input, LSTM, Dense
import numpy as np

import data_util
from data_util import get_vocab_dict, get_xy_data

# Hyperparameters
batch_size = 32
epochs = 100
latent_dim = 256  # dimensionality of the LSTM hidden state

input_texts = []
target_texts = []
input_token_index = {}
target_token_index = {}
encoder_input_data = None
decoder_input_data = None
decoder_target_data = None
```

2. Call the preprocessing in data_util.py to get the training data and dictionaries
```python
def data_deal():
    global encoder_input_data, decoder_input_data, decoder_target_data
    global input_texts, target_texts, input_token_index, target_token_index
    input_texts, target_texts = get_xy_data()
    input_token_index, target_token_index = get_vocab_dict(input_texts, target_texts)

    # Each sentence becomes a 2-D one-hot matrix (time steps x vocabulary),
    # so the whole corpus is a 3-D tensor (samples x time steps x vocabulary).
    encoder_input_data = np.zeros(
        (len(input_texts), data_util.max_encoder_seq_length, len(input_token_index)),
        dtype='float32')
    decoder_input_data = np.zeros(
        (len(input_texts), data_util.max_decoder_seq_length, len(target_token_index)),
        dtype='float32')
    decoder_target_data = np.zeros(
        (len(input_texts), data_util.max_decoder_seq_length, len(target_token_index)),
        dtype='float32')

    for i, (input_text, target_text) in enumerate(zip(input_texts, target_texts)):
        for t, char in enumerate(input_text):
            encoder_input_data[i, t, input_token_index[char]] = 1
        for t, char in enumerate(target_text):
            decoder_input_data[i, t, target_token_index[char]] = 1
            # decoder_target_data is decoder_input_data shifted one step ahead:
            # at time t the model must predict the character at t+1
            if t > 0:
                decoder_target_data[i, t - 1, target_token_index[char]] = 1
```
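The one-step offset between `decoder_input_data` and `decoder_target_data` implements teacher forcing: at every step the decoder is fed the true previous character and trained to emit the next one. A minimal standalone sketch of the alignment for the target `'\tok\n'` (toy four-character vocabulary, for illustration only):

```python
import numpy as np

# Teacher-forcing alignment for a single target sequence '\tok\n'
# (toy vocabulary {'\t': 0, 'o': 1, 'k': 2, '\n': 3}; illustrative only)
target_text = '\tok\n'
vocab = {'\t': 0, 'o': 1, 'k': 2, '\n': 3}

dec_in = np.zeros((len(target_text), len(vocab)), dtype='float32')
dec_out = np.zeros((len(target_text), len(vocab)), dtype='float32')
for t, char in enumerate(target_text):
    dec_in[t, vocab[char]] = 1
    if t > 0:
        dec_out[t - 1, vocab[char]] = 1

# At step t the input is target_text[t] and the label is target_text[t+1]
assert dec_in[0].argmax() == vocab['\t'] and dec_out[0].argmax() == vocab['o']
```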

3. Build the seq2seq model
```python
def build_model():
    global input_token_index, target_token_index
    # Encoder: consume the question and keep only the final LSTM states
    encoder_inputs = Input(shape=(None, len(input_token_index)))
    encoder = LSTM(latent_dim, return_state=True)
    encoder_outputs, state_h, state_c = encoder(encoder_inputs)
    encoder_states = [state_h, state_c]

    # Decoder: generate the answer, initialised with the encoder states
    decoder_inputs = Input(shape=(None, len(target_token_index)))
    decoder_lstm = LSTM(latent_dim, return_sequences=True, return_state=True)
    decoder_outputs, _, _ = decoder_lstm(decoder_inputs,
                                         initial_state=encoder_states)
    decoder_dense = Dense(len(target_token_index), activation='softmax')
    decoder_outputs = decoder_dense(decoder_outputs)
    model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

    # Encoder model used when predicting on new sequences
    encoder_model = Model(encoder_inputs, encoder_states)
    # Decoder model used when predicting on new sequences
    decoder_state_input_h = Input(shape=(latent_dim,))
    decoder_state_input_c = Input(shape=(latent_dim,))
    decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
    decoder_outputs, state_h, state_c = decoder_lstm(
        decoder_inputs, initial_state=decoder_states_inputs)
    decoder_states = [state_h, state_c]
    decoder_outputs = decoder_dense(decoder_outputs)
    decoder_model = Model([decoder_inputs] + decoder_states_inputs,
                          [decoder_outputs] + decoder_states)

    return model, encoder_model, decoder_model
```
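Because the three models share the same LSTM and Dense layer objects, weights learned by `model` during training are automatically available to `encoder_model` and `decoder_model` at inference time. A quick sanity check of the wiring (a sketch, assuming `data_deal()` has populated the globals first):

```python
# Sanity-check the wiring after preprocessing (run data_deal() first)
data_deal()
model, encoder_model, decoder_model = build_model()
model.summary()

# The encoder maps a one-hot question to two state vectors of size latent_dim
h, c = encoder_model.predict(encoder_input_data[:1])
print(h.shape, c.shape)  # expected: (1, 256) (1, 256)
```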

4. Train the model and save it
```python
# Train and save
if __name__ == "__main__":
    data_deal()
    model, encoder_model, decoder_model = build_model()
    model.compile(optimizer='rmsprop', loss='categorical_crossentropy')
    model.fit([encoder_input_data, decoder_input_data], decoder_target_data,
              batch_size=batch_size,
              epochs=epochs,
              validation_split=0.2)
    model.save('model.h5')
    encoder_model.save('encoder_model.h5')
    decoder_model.save('decoder_model.h5')
```
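One practical caveat: the three one-hot tensors are allocated dense, so memory grows as samples × max sequence length × vocabulary size × 4 bytes. A back-of-the-envelope check before training (the numbers below are placeholders; substitute the values printed by `get_vocab_dict()`):

```python
# Rough memory estimate for one float32 one-hot tensor.
# The figures here are placeholders, not measurements from the real corpus.
samples, max_len, vocab_size = 100000, 50, 3000
bytes_needed = samples * max_len * vocab_size * 4  # float32 = 4 bytes
print('%.1f GB per tensor' % (bytes_needed / 1024 ** 3))  # ~55.9 GB here
```

If this does not fit in RAM, reduce `num_samples` or feed batches from a generator instead of materialising the full tensors.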
III. Prediction
1. Define parameters
```python
from keras.models import load_model
import numpy as np

import data_util
from data_util import get_vocab_dict, get_xy_data, get_rev_dict

latent_dim = 256

# Corpus vectorisation state
input_texts = []
target_texts = []
input_token_index = {}
target_token_index = {}
```

2. Run the prediction
```python
# Inference loop
def decoder_sequence(input_seq):
    # Encode the input question as state vectors
    states_value = encoder_model.predict(input_seq)

    # '\t' is the starting character
    target_seq = np.zeros((1, 1, data_util.num_decoder_tokens))
    target_seq[0, 0, target_token_index['\t']] = 1

    # Sampling loop: emit one character at a time
    stop_condition = False
    decoded_sentence = ''
    while not stop_condition:
        output_tokens, h, c = decoder_model.predict(
            [target_seq] + states_value)
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_char = reverse_target_char_index[sampled_token_index]
        decoded_sentence += sampled_char
        # Stop on the '\n' end symbol or when the answer gets too long
        if (sampled_char == '\n' or
                len(decoded_sentence) > data_util.max_decoder_seq_length):
            stop_condition = True

        # Update the target sequence to predict the next token
        target_seq = np.zeros((1, 1, data_util.num_decoder_tokens))
        target_seq[0, 0, sampled_token_index] = 1

        # Update the decoder states
        states_value = [h, c]

    return decoded_sentence


def predict_ans(question):
    # One-hot encode the question, matching the training dtype (float32)
    input_seq = np.zeros(
        (1, data_util.max_encoder_seq_length, data_util.num_encoder_tokens),
        dtype='float32')
    for t, char in enumerate(question):
        input_seq[0, t, input_token_index[char]] = 1
    return decoder_sequence(input_seq)
```
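`decoder_sequence` decodes greedily: `np.argmax` always picks the single most probable character, so the same question always yields the same answer. For more varied chatbot replies, one common alternative (a sketch, not part of the original code) is temperature sampling over the softmax output:

```python
# Hypothetical drop-in replacement for the argmax line: temperature sampling.
# Higher temperature -> more random replies; 1.0 keeps the model's distribution.
def sample_index(probs, temperature=0.8):
    probs = np.asarray(probs, dtype='float64')
    logits = np.log(probs + 1e-10) / temperature
    probs = np.exp(logits) / np.sum(np.exp(logits))
    return np.random.choice(len(probs), p=probs)

# usage inside the loop:
# sampled_token_index = sample_index(output_tokens[0, -1, :])
```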
```python
if __name__ == "__main__":
    input_texts, target_texts = get_xy_data()
    input_token_index, target_token_index = get_vocab_dict(input_texts, target_texts)
    reverse_input_char_index, reverse_target_char_index = get_rev_dict(
        input_token_index, target_token_index)
    encoder_model = load_model('encoder_model.h5')
    decoder_model = load_model('decoder_model.h5')
    print('Decoded sentence:', predict_ans('这是个傻子'))
```
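Note that `predict_ans` raises a `KeyError` if the question contains a character that never appeared in question.txt, since `input_token_index` has no entry for it. A simple guard (a hypothetical helper, not in the original code) skips unknown characters and truncates over-long questions:

```python
# Hypothetical helper: one-hot encode a question, skipping characters
# missing from the training vocabulary and truncating to max_len
def encode_question(question, token_index, max_len, num_tokens):
    seq = np.zeros((1, max_len, num_tokens), dtype='float32')
    for t, char in enumerate(question[:max_len]):
        if char in token_index:
            seq[0, t, token_index[char]] = 1
    return seq

# usage: input_seq = encode_question(question, input_token_index,
#                                    data_util.max_encoder_seq_length,
#                                    data_util.num_encoder_tokens)
```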

GitHub: project link