赞
踩
客官里面请,本次用bert实现文本领域的分类任务。本次在google colab进行实验的。那么,走起!!!
!nvidia-smi
#! -*- coding:utf-8 -*- import re, os, json, codecs, gc import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns import os import time import datetime from sklearn.model_selection import train_test_split from sklearn.preprocessing import LabelEncoder from sklearn.utils import class_weight as cw from keras import Sequential from keras.models import Model from keras.layers import LSTM,Activation,Dense,Dropout,Input,Embedding,BatchNormalization,Add,concatenate,Flatten from keras.layers import Conv1D,Conv2D,Convolution1D,MaxPool1D,SeparableConv1D,SpatialDropout1D,GlobalAvgPool1D,GlobalMaxPool1D,GlobalMaxPooling1D from keras.layers.pooling import _GlobalPooling1D from keras.layers import MaxPooling2D,GlobalMaxPooling2D,GlobalAveragePooling2D from keras.optimizers import RMSprop,Adam from keras.preprocessing.text import Tokenizer from keras.preprocessing import sequence from keras.utils import to_categorical from keras.callbacks import EarlyStopping from keras.callbacks import ModelCheckpoint from keras.callbacks import ReduceLROnPlateau %matplotlib inline import warnings warnings.filterwarnings("ignore")
train_dataset_path = "train.txt"
test_dataset_path = "test.txt"
label_dataset_path = "class.txt"
train_df = pd.read_csv(train_dataset_path,encoding="utf-8",sep='\t',names=["text","label"])
test_df = pd.read_csv(test_dataset_path,encoding="utf-8",sep = '\t',names=["text","label"])
label_df = pd.read_csv(label_dataset_path,encoding="utf-8",header=None,sep = '\t')
train_df
test_df
label_df
train_df.dropna(axis=0, how='any', inplace=True)
import re
def filter(text):
text = re.sub("[A-Za-z0-9\!\=\?\%\[\]\,\(\)\>\<:<\/#\. -----\_]", "", text)
text = text.replace('图片', '')
text = text.replace('\xa0', '') # 删除nbsp
# new
r1 = "\\【.*?】+|\\《.*?》+|\\#.*?#+|[.!/_,$&%^*()<>+""'?@|:~{}#]+|[——!\\\,。=?、:“”‘’¥……()《》【】]"
cleanr = re.compile('<.*?>')
text = re.sub(cleanr, ' ', text) #去除html标签
text = re.sub(r1,'',text)
text = text.strip()
return text
def clean_text(data):
data['text'] = data['text'].apply(lambda x: filter(x))
return data
train = clean_text(train_df)
test = clean_text(test_df)
sns.countplot(train_df["label"])
plt.xlabel("Label")
plt.title("News sentiment analysis")
train_df["ocr"] = train_df["text"]
test_df["ocr"] = test_df["text"]
train_df = train_df[:45000]
train_df.shape
!pip install keras_bert
下载bert的预训练好的权重语料。
!wget -c https://storage.googleapis.com/chineseglue/pretrain_models/roeberta_zh_L-24_H-1024_A-16.zip
!unzip roeberta_zh_L-24_H-1024_A-16.zip
#! -*- coding:utf-8 -*- import re, os, json, codecs, gc import numpy as np import pandas as pd from random import choice from sklearn.preprocessing import LabelEncoder from sklearn.model_selection import KFold from keras_bert import load_trained_model_from_checkpoint, Tokenizer from keras.layers import * from keras.callbacks import * from keras.models import Model import keras.backend as K from keras.optimizers import Adam maxlen = 512 config_path = './bert_config_large.json' # checkpoint_path = '/export/home/liuyuzhong/kaggle/bert/chinese_L-12_H-768_A-12/bert_model.ckpt' checkpoint_path = './roberta_zh_large_model.ckpt' dict_path = './vocab.txt' token_dict = {} with codecs.open(dict_path, 'r', 'utf8') as reader: for line in reader: token = line.strip() token_dict[token] = len(token_dict) class OurTokenizer(Tokenizer): def _tokenize(self, text): R = [] for c in text: if c in self._token_dict: R.append(c) elif self._is_space(c): R.append('[unused1]') # space类用未经训练的[unused1]表示 else: R.append('[UNK]') # 剩余的字符是[UNK] return R tokenizer = OurTokenizer(token_dict) def seq_padding(X, padding=0): L = [len(x) for x in X] ML = max(L) return np.array([ np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X ]) class data_generator: def __init__(self, data, batch_size=32, shuffle=True): self.data = data self.batch_size = batch_size self.shuffle = shuffle self.steps = len(self.data) // self.batch_size if len(self.data) % self.batch_size != 0: self.steps += 1 def __len__(self): return self.steps def __iter__(self): while True: idxs = list(range(len(self.data))) if self.shuffle: np.random.shuffle(idxs) X1, X2, Y = [], [], [] for i in idxs: d = self.data[i] text = d[0][:maxlen] x1, x2 = tokenizer.encode(first=text) y = d[1] X1.append(x1) X2.append(x2) Y.append([y]) if len(X1) == self.batch_size or i == idxs[-1]: X1 = seq_padding(X1) X2 = seq_padding(X2) Y = seq_padding(Y) yield [X1, X2], Y[:, 0, :] [X1, X2, Y] = [], [], [] from keras.metrics import top_k_categorical_accuracy def acc_top5(y_true, y_pred): return top_k_categorical_accuracy(y_true, y_pred, k=5) def build_bert(nclass): bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None) for l in bert_model.layers: l.trainable = True x1_in = Input(shape=(None,)) x2_in = Input(shape=(None,)) x = bert_model([x1_in, x2_in]) x = Lambda(lambda x: x[:, 0])(x) p = Dense(nclass, activation='softmax')(x) model = Model([x1_in, x2_in], p) model.compile(loss='categorical_crossentropy', optimizer=Adam(1e-5), metrics=['accuracy', acc_top5]) print(model.summary()) return model
from keras.utils import to_categorical
DATA_LIST = []
# 改
for data_row in train_df.iloc[:].itertuples():
DATA_LIST.append((data_row.ocr, to_categorical(data_row.label, 10)))
DATA_LIST = np.array(DATA_LIST)
DATA_LIST_TEST = []
for data_row in test_df.iloc[:].itertuples():
DATA_LIST_TEST.append((data_row.ocr, to_categorical(0, 10)))
DATA_LIST_TEST = np.array(DATA_LIST_TEST)
def run_cv(nfold, data, data_label, data_test): kf = KFold(n_splits=nfold, shuffle=True, random_state=520).split(data) # 改 train_model_pred = np.zeros((len(data), 10)) test_model_pred = np.zeros((len(data_test), 10)) for i, (train_fold, test_fold) in enumerate(kf): X_train, X_valid, = data[train_fold, :], data[test_fold, :] # 改 model = build_bert(10) early_stopping = EarlyStopping(monitor='val_acc', patience=3) plateau = ReduceLROnPlateau(monitor="val_acc", verbose=1, mode='max', factor=0.5, patience=2) checkpoint = ModelCheckpoint('./bert_dump/' + str(i) + '.hdf5', monitor='val_acc', verbose=2, save_best_only=True, mode='max',save_weights_only=True) train_D = data_generator(X_train, shuffle=True) valid_D = data_generator(X_valid, shuffle=True) test_D = data_generator(data_test, shuffle=False) history = model.fit_generator( train_D.__iter__(), steps_per_epoch=len(train_D), epochs=3, validation_data=valid_D.__iter__(), validation_steps=len(valid_D), callbacks=[early_stopping, plateau, checkpoint], ) model.save_weights("model2.h5") # model.load_weights('./bert_dump/' + str(i) + '.hdf5') model.save('model.h5') # HDF5文件,pip install h5py # return model train_model_pred[test_fold, :] = model.predict_generator(valid_D.__iter__(), steps=len(valid_D),verbose=1) test_model_pred += model.predict_generator(test_D.__iter__(), steps=len(test_D),verbose=1) del model; gc.collect() K.clear_session() # break return train_model_pred, test_model_pred,history
train_model_pred, test_model_pred,history = run_cv(2, DATA_LIST, None, DATA_LIST_TEST)
# 绘制训练过程中的 loss 和 acc 变化曲线 import matplotlib.pyplot as plt %matplotlib inline def history_plot(history_fit): plt.figure(figsize=(12,6)) # summarize history for accuracy plt.subplot(121) plt.plot(history_fit.history["accuracy"]) plt.plot(history_fit.history["val_accuracy"]) plt.title("model accuracy") plt.ylabel("accuracy") plt.xlabel("epoch") plt.legend(["train", "valid"], loc="upper left") # summarize history for loss plt.subplot(122) plt.plot(history_fit.history["loss"]) plt.plot(history_fit.history["val_loss"]) plt.title("model loss") plt.ylabel("loss") plt.xlabel("epoch") plt.legend(["train", "test"], loc="upper left") plt.show()
history_plot(history)
test_pred = [np.argmax(x) for x in test_model_pred]
test_df['labels'] = test_pred
test_df['labels']
test_df
from sklearn.metrics import classification_report
print(classification_report(test_df["label"], test_df["labels"]))
本次那就到此结束了!!!我们下回不见不散!!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。