当前位置:   article > 正文

bert 文本分类实战_bert实战

bert实战

前言:

       由于课题需要,学习自然语言处理(NLP),于是在网上找了找文章和代码进行学习,在此记录,课题代码就不展示了,使用网上的代码和大家分享。思想和代码大部分参考苏神,在此感谢。

任务目标:

     希望bert模型解决的问题: 输入:一段话; 输出:这段话属于的类别。

任务实现原理:

      本次模型为监督学习模型,根据已有标签的文本数据集,对bert模型进行训练。使用训练好的模型对句子进行预测,输出得到句子的类别。本质上属于多分类问题。

      大致流程为,数据集预处理;划分数据集();对数据集加工处理(文本数据编码成符合bert输入的向量);构建模型(bert模型导入与使用); 将数据送入模型进行训练和预测。

模型总体结构:

 具体代码:

数据集预处理:

     将数据集读入,并打乱数据的排列顺序。

  1. mainPath = 'bert多文本分类//'
  2. rc = pd.read_csv(mainPath + 'data/tnews/toutiao_news_dataset.txt', delimiter="_!_", names=['labels', 'text'], header=None, encoding='utf-8')
  3. rc = shuffle(rc) #打乱顺序

划分数据集

     将数据集划分为训练集和测试集(验证集)

  1. # 构建全部所需数据集
  2. data_list = []
  3. for d in rc.iloc[:].itertuples(): #itertuples(): 将DataFrame迭代为元祖。
  4. data_list.append((d.text, d.labels))
  5. # 取一部分数据做训练和验证
  6. train_data = data_list[0:20000]
  7. valid_data = data_list[20000:22000]

 数据加工处理:

修改原有的字典:

修改原因:苏神解读,本来 Tokenizer 有自己的 _tokenize 方法,我这里重写了这个方法,是要保证 tokenize 之后的结果,跟原来的字符串长度等长(如果算上两个标记,那么就是等长再加 2)。 Tokenizer 自带的 _tokenize 会自动去掉空格,然后有些字符会粘在一块输出,导致 tokenize 之后的列表不等于原来字符串的长度了,这样如果做序列标注的任务会很麻烦。主要就是用 [unused1] 来表示空格类字符,而其余的不在列表的字符用 [UNK] 表示,其中 [unused*] 这些标记是未经训练的(随即初始化),是 Bert 预留出来用来增量添加词汇的标记,所以我们可以用它们来指代任何新字符。

  1. #vocabPath里存储了大量的词语,每个词语对应的着一个编号 例如10640 posts
  2. # 将词表中的词编号转换为字典
  3. # 字典的形式为 '仑': 796,
  4. #得到最原始的字典
  5. tokenDict = {}
  6. with codecs.open(vocabPath, 'r', encoding='utf-8') as reader:
  7. for line in reader:
  8. token = line.strip() # 去除首尾空格
  9. tokenDict[token] = len(tokenDict)
  10. #原始的字典存在着瑕疵,在原始的字典上需要根据自己的数据集,创造自己的字典
  11. # 重写tokenizer
  12. class OurTokenizer(Tokenizer):
  13. def _tokenize(self, content):
  14. reList = []
  15. for t in content:
  16. if t in self._token_dict:
  17. reList.append(t)
  18. elif self._is_space(t):
  19. # 用[unused1]来表示空格类字符
  20. reList.append('[unused1]')
  21. else:
  22. # 不在列表的字符用[UNK]表示
  23. reList.append('[UNK]')
  24. return reList
  25. #使用新的字典
  26. tokenizer = OurTokenizer(tokenDict)

 

   文本数据根据字典编码成符合bert输入的向量,逐批生成数据([X1,X2],Y),从而可以丢到模型中训练。

  1. def seqPadding(X, padding=0):
  2. L = [len(x) for x in X]
  3. ML = max(L)
  4. return np.array([np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X])
  5. class data_generator:
  6. def __init__(self, data, batch_size=32, shuffle=True): #构造函数,使用时执行
  7. self.data = data
  8. self.batch_size = batch_size
  9. self.shuffle = shuffle
  10. self.steps = len(self.data) // self.batch_size
  11. if len(self.data) % self.batch_size != 0:
  12. self.steps += 1
  13. def __len__(self):
  14. return self.steps
  15. def __iter__(self):
  16. while True:
  17. idxs = list(range(len(self.data))) #数据元组下标
  18. if self.shuffle:
  19. np.random.shuffle(idxs) #是否打乱数据下标顺序
  20. X1, X2, Y = [], [], []
  21. for i in idxs:
  22. d = self.data[i]
  23. text = d[0][:maxlen]
  24. x1, x2 = tokenizer.encode(first=text) # encode方法可以一步到位地生成对应模型的输入。
  25. y = d[1]
  26. X1.append(x1) ## x1 是字对应的索引 # x2 是句子对应的索引
  27. X2.append(x2)
  28. Y.append([y])
  29. if len(X1) == self.batch_size or i == idxs[-1]:
  30. X1 = seqPadding(X1) #如果等于batchsize或者最后一个值后面补充0
  31. X2 = seqPadding(X2)
  32. Y = seqPadding(Y)
  33. yield [X1, X2], Y
  34. [X1, X2, Y] = [], [], []

构建模型和训练:

    加载bert模型,并对bert模型的输出进行调整,使bert模型能够完成我们的任务目标。

  1. # 设置预训练bert模型的路径
  2. configPath = mainPath + 'chinese_roberta_wwm_ext_L-12_H-768_A-12/bert_config.json'
  3. ckpPath = mainPath + 'chinese_roberta_wwm_ext_L-12_H-768_A-12/bert_model.ckpt'
  4. vocabPath = mainPath + 'chinese_roberta_wwm_ext_L-12_H-768_A-12/vocab.txt'
  5. # bert模型设置
  6. bert_model = load_trained_model_from_checkpoint(configPath, ckpPath, seq_len=None) # 加载预训练模型
  7. for l in bert_model.layers:
  8. l.trainable = True
  9. x1_in = Input(shape=(None,))
  10. x2_in = Input(shape=(None,))
  11. x = bert_model([x1_in, x2_in])
  12. # 取出[CLS]对应的向量用来做分类
  13. x = Lambda(lambda x: x[:, 0])(x)
  14. p = Dense(15, activation='softmax')(x)
  15. model = Model([x1_in, x2_in], p)
  16. model.compile(loss='sparse_categorical_crossentropy', optimizer=Adam(1e-5), metrics=['accuracy'])
  17. model.summary()
  18. train_D = data_generator(train_data)
  19. valid_D = data_generator(valid_data)
  20. model.fit_generator(train_D.__iter__(), steps_per_epoch=len(train_D), epochs=5, validation_data=valid_D.__iter__(),
  21. validation_steps=len(valid_D))

模型预测:

  1. #测试的数据集
  2. str1 = "上港主场1-2负于国安,遭遇联赛两连败,上港到底输在哪?"
  3. str2 = "普京总统会见了拜登总统"
  4. str3 = "这3辆10万出头小钢炮,随便改改轻松秒奔驰,第一辆还是限量款"
  5. predict_D = data_generator([(str1, 0), (str2, 3), (str3, 10)], shuffle=False)
  6. #获取总的标签类别
  7. #array(['体育', '军事', '农业', '国际', '娱乐', '房产', '教育', '文化', '旅游', '民生故事', '汽车','电竞游戏', '科技', '证券股票', '财经'], dtype=object)
  8. output_label2id_file = os.path.join(mainPath, "model/keras_class/label2id.pkl")
  9. if os.path.exists(output_label2id_file):
  10. with open(output_label2id_file, 'rb') as w:
  11. labes = pickle.load(w)
  12. #加载保存的模型
  13. from keras_bert import get_custom_objects
  14. custom_objects = get_custom_objects()
  15. model = load_model(mainPath + 'model/keras_class/tnews.h5', custom_objects=custom_objects)
  16. #使用生成器获取测试的数据
  17. tmpData = predict_D.__iter__()
  18. #预测
  19. preds = model.predict_generator(tmpData, steps=len(predict_D), verbose=1)
  20. # 求每行最大值得下标,其中,axis=1表示按行计算
  21. index_maxs = np.argmax(preds, axis=1)
  22. result = [(x, labes[x]) for x in index_maxs]
  23. print(result)

输出preds,index_maxs, result

 完整代码

  1. import pickle
  2. from keras_bert import load_trained_model_from_checkpoint, Tokenizer
  3. from keras.layers import *
  4. from keras.models import Model
  5. from keras.optimizers import Adam
  6. from sklearn.preprocessing import LabelEncoder
  7. from sklearn.utils import shuffle
  8. from keras.utils.vis_utils import plot_model
  9. import codecs, gc
  10. import keras.backend as K
  11. import os
  12. import pandas as pd
  13. import numpy as np
  14. # 文件主路径定义
  15. mainPath = '你的目录/keras_bert文本分类实例/'
  16. # 从文件中读取数据,获取训练集和验证集
  17. rc = pd.read_csv(mainPath + 'data/tnews/toutiao_news_dataset.txt', delimiter="_!_", names=['labels', 'text'],
  18. header=None, encoding='utf-8') #delimiter
  19. rc = shuffle(rc) # shuffle数据,打乱
  20. # 把类别转换为数字
  21. # 一共15个类别:"教育","科技","军事","旅游","国际","证券股票","农业","电竞游戏"
  22. # "民生故事","文化","娱乐","体育","财经","房产","汽车"
  23. class_le = LabelEncoder()
  24. rc.iloc[:, 0] = class_le.fit_transform(rc.iloc[:, 0].values)
  25. # 保存标签文件
  26. output_label2id_file = os.path.join(mainPath, "model/keras_class/label2id.pkl")
  27. if not os.path.exists(output_label2id_file):
  28. with open(output_label2id_file, 'wb') as w:
  29. pickle.dump(class_le.classes_, w)
  30. # 构建全部所需数据集
  31. data_list = []
  32. for d in rc.iloc[:].itertuples():
  33. data_list.append((d.text, d.labels))
  34. # 取一部分数据做训练和验证
  35. train_data = data_list[0:20000]
  36. valid_data = data_list[20000:22000]
  37. maxlen = 100 # 设置序列长度为100,要保证序列长度不超过512
  38. # 设置预训练模型
  39. configPath = mainPath + 'chinese_roberta_wwm_ext_L-12_H-768_A-12/bert_config.json'
  40. ckpPath = mainPath + 'chinese_roberta_wwm_ext_L-12_H-768_A-12/bert_model.ckpt'
  41. vocabPath = mainPath + 'chinese_roberta_wwm_ext_L-12_H-768_A-12/vocab.txt'
  42. # 将词表中的词编号转换为字典
  43. tokenDict = {}
  44. with codecs.open(vocabPath, 'r', encoding='utf-8') as reader:
  45. for line in reader:
  46. token = line.strip()
  47. tokenDict[token] = len(tokenDict)
  48. # 重写tokenizer
  49. class OurTokenizer(Tokenizer):
  50. def _tokenize(self, content):
  51. reList = []
  52. for t in content:
  53. if t in self._token_dict:
  54. reList.append(t)
  55. elif self._is_space(t):
  56. # 用[unused1]来表示空格类字符
  57. reList.append('[unused1]')
  58. else:
  59. # 不在列表的字符用[UNK]表示
  60. reList.append('[UNK]')
  61. return reList
  62. tokenizer = OurTokenizer(tokenDict)
  63. def seqPadding(X, padding=0):
  64. L = [len(x) for x in X]
  65. ML = max(L)
  66. return np.array([np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X])
  67. class data_generator: #先将数据变成元组的形式在喂入生成器
  68. def __init__(self, data, batch_size=32, shuffle=True):
  69. self.data = data
  70. self.batch_size = batch_size
  71. self.shuffle = shuffle
  72. self.steps = len(self.data) // self.batch_size
  73. if len(self.data) % self.batch_size != 0:
  74. self.steps += 1
  75. def __len__(self):
  76. return self.steps
  77. def __iter__(self):
  78. while True:
  79. idxs = list(range(len(self.data)))
  80. if self.shuffle:
  81. np.random.shuffle(idxs)
  82. X1, X2, Y = [], [], []
  83. for i in idxs:
  84. d = self.data[i]
  85. text = d[0][:maxlen]
  86. x1, x2 = tokenizer.encode(first=text)
  87. y = d[1]
  88. X1.append(x1)
  89. X2.append(x2)
  90. Y.append([y])
  91. if len(X1) == self.batch_size or i == idxs[-1]:
  92. X1 = seqPadding(X1)
  93. X2 = seqPadding(X2)
  94. Y = seqPadding(Y)
  95. yield [X1, X2], Y
  96. [X1, X2, Y] = [], [], []
  97. # bert模型设置
  98. bert_model = load_trained_model_from_checkpoint(configPath, ckpPath, seq_len=None) # 加载预训练模型
  99. for l in bert_model.layers:
  100. l.trainable = True
  101. x1_in = Input(shape=(None,))
  102. x2_in = Input(shape=(None,))
  103. x = bert_model([x1_in, x2_in])
  104. # 取出[CLS]对应的向量用来做分类
  105. x = Lambda(lambda x: x[:, 0])(x)
  106. p = Dense(15, activation='softmax')(x)
  107. model = Model([x1_in, x2_in], p)
  108. model.compile(loss='sparse_categorical_crossentropy', optimizer=Adam(1e-5), metrics=['accuracy'])
  109. model.summary()
  110. train_D = data_generator(train_data)
  111. valid_D = data_generator(valid_data)
  112. model.fit_generator(train_D.__iter__(), steps_per_epoch=len(train_D), epochs=5, validation_data=valid_D.__iter__(),
  113. validation_steps=len(valid_D))
  114. model.save(mainPath + 'model/keras_class/tnews.h5', True, True)
  115. # 保存模型结构图
  116. plot_model(model, to_file='model/keras_class/tnews.png', show_shapes=True)

参考链接
https://blog.csdn.net/qq_39290990/article/details/121672141

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/857746?site
推荐阅读
相关标签
  

闽ICP备14008679号