Table of Contents
- 1. Structure
- 2. Code Walkthrough
1. Structure
My diagram of the encoder-decoder structure:
2. Code Walkthrough
Imports
import nltk
import numpy as np
import re
import shutil
import tensorflow as tf
import os
import unicodedata
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
Dataset preprocessing
def clean_up_logs(data_dir):
    checkpoint_dir = os.path.join(data_dir, "checkpoints")
    if os.path.exists(checkpoint_dir):
        shutil.rmtree(checkpoint_dir, ignore_errors=True)
    os.mkdir(checkpoint_dir)
    return checkpoint_dir
This function gives an input sentence a full preprocessing pass: it strips accent marks, adds spaces around punctuation, removes any character that is neither a letter nor punctuation, collapses extra whitespace, and lowercases the result.
def preprocess_sentence(sent):
    sent = "".join([c for c in unicodedata.normalize("NFD", sent)
                    if unicodedata.category(c) != "Mn"])
    sent = re.sub(r"([!.?])", r" \1", sent)
    sent = re.sub(r"[^a-zA-Z!.?]+", r" ", sent)
    sent = re.sub(r"\s+", " ", sent)
    sent = sent.lower()
    return sent
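To make the cleaning steps concrete, here is a quick check with a made-up French sentence (the example string is my own, not taken from the dataset):

print(preprocess_sentence("Je suis déjà là!"))
# -> "je suis deja la !"  (accents stripped, space added before "!", lowercased)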
Note how the two decoder-side sequences are built:
the decoder input fr_sent_in gets BOS (beginning of sentence) prepended to every sentence,
while the label y, fr_sent_out, used for evaluation and the loss computation, gets EOS (end of sentence) appended to every sentence.
def download_and_read():
    en_sents, fr_sents_in, fr_sents_out = [], [], []
    local_file = os.path.join("datasets", "fra.txt")
    with open(local_file, "r", encoding="utf-8") as fin:
        for i, line in enumerate(fin):
            en_sent, fr_sent, *_ = line.strip().split("\t")
            # tokenize the English side too, so sents_en holds word lists
            # (predict() below joins each entry with spaces)
            en_sent = [w for w in preprocess_sentence(en_sent).split()]
            fr_sent = preprocess_sentence(fr_sent)
            # the decoder side is French; prepend the BOS marker
            fr_sent_in = [w for w in ("BOS " + fr_sent).split()]
            # the decoder label is French; append the EOS marker
            fr_sent_out = [w for w in (fr_sent + " EOS").split()]
            en_sents.append(en_sent)
            fr_sents_in.append(fr_sent_in)
            fr_sents_out.append(fr_sent_out)
            if i >= NUM_SENT_PAIRS - 1:
                break
    return en_sents, fr_sents_in, fr_sents_out
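For a concrete (hypothetical, not from fra.txt) preprocessed sentence, the BOS/EOS framing looks like this:

# fr_sent     = "je suis la ."
# fr_sent_in  = ["BOS", "je", "suis", "la", "."]   # decoder input
# fr_sent_out = ["je", "suis", "la", ".", "EOS"]   # decoder label, shifted one step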
Encoder
The Encoder's call method takes x and an initial state and returns encoder_out and encoder_state.
The RNN itself is the off-the-shelf GRU layer, configured with return_state=True.
class Encoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, num_timesteps, encoder_dim, **kwargs):
        super(Encoder, self).__init__(**kwargs)
        self.encoder_dim = encoder_dim
        self.embedding = tf.keras.layers.Embedding(
            vocab_size, embedding_dim, input_length=num_timesteps)
        self.rnn = tf.keras.layers.GRU(
            encoder_dim, return_sequences=False, return_state=True)

    def call(self, x, state):
        x = self.embedding(x)
        x, state = self.rnn(x, initial_state=state)
        return x, state

    def init_state(self, batch_size):
        return tf.zeros((batch_size, self.encoder_dim))
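A minimal shape check, using small made-up dimensions instead of the real hyperparameters: with return_sequences=False the GRU returns only its last output, so encoder_out and encoder_state both come out as (batch_size, encoder_dim).

enc = Encoder(vocab_size=100, embedding_dim=16, num_timesteps=8, encoder_dim=32)
x = tf.zeros((4, 8), dtype=tf.int32)   # a batch of 4 sequences, 8 timesteps each
out, state = enc(x, enc.init_state(4))
print(out.shape, state.shape)          # (4, 32) (4, 32)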
Decoder
Its call method likewise takes x and a state and returns decoder_out and decoder_state.
The RNN is again the off-the-shelf GRU, configured with return_state=True and return_sequences=True.
The GRU output x is then passed through a fully connected layer, whose logits say which word comes next.
class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, num_timesteps, decoder_dim, **kwargs):
        super(Decoder, self).__init__(**kwargs)
        self.decoder_dim = decoder_dim
        self.embedding = tf.keras.layers.Embedding(
            vocab_size, embedding_dim, input_length=num_timesteps)
        self.rnn = tf.keras.layers.GRU(
            decoder_dim, return_state=True, return_sequences=True)
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, x, state):
        x = self.embedding(x)
        x, state = self.rnn(x, initial_state=state)
        x = self.dense(x)
        return x, state
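The same kind of sketch for the Decoder (again with made-up sizes): because return_sequences=True, the Dense layer is applied at every timestep, so the logits have shape (batch_size, num_timesteps, vocab_size).

dec = Decoder(vocab_size=100, embedding_dim=16, num_timesteps=8, decoder_dim=32)
x = tf.zeros((4, 8), dtype=tf.int32)
out, state = dec(x, tf.zeros((4, 32)))  # in practice the initial state comes from the encoder
print(out.shape, state.shape)           # (4, 8, 100) (4, 32)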
The loss is sparse categorical cross-entropy (SparseCategoricalCrossentropy); only with a loss value can we differentiate, compute gradients, and backpropagate.
def loss_func(ytrue, ypred):
    scce = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    # tf.math.equal(ytrue, 0): check which elements of ytrue equal 0 (0 marks padding).
    # tf.math.logical_not: invert that, giving a boolean tensor where True marks
    # real tokens and False marks padding.
    mask = tf.math.logical_not(tf.math.equal(ytrue, 0))
    # tf.cast: convert the boolean tensor to integers (True -> 1, False -> 0).
    mask = tf.cast(mask, dtype=tf.int64)
    # Compute the loss with mask as the sample weight, so padding positions are ignored.
    loss = scce(ytrue, ypred, sample_weight=mask)
    return loss
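A tiny sanity check with made-up values: the last position of ytrue is padding (index 0), so it is masked out and contributes nothing to the loss.

ytrue = tf.constant([[1, 2, 0]], dtype=tf.int64)  # batch=1, 3 timesteps, last one is padding
ypred = tf.random.uniform((1, 3, 5))              # random logits over a 5-word vocab
print(loss_func(ytrue, ypred).numpy())            # the masked position adds 0 to the loss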
Defining the training step. Note the teacher forcing here: during training the decoder consumes the ground-truth French tokens (decoder_in) rather than its own previous predictions; at inference time (see predict below) the model feeds its predictions back in.
@tf.function
def train_step(encoder_in, decoder_in, decoder_out, encoder_state):
    with tf.GradientTape() as tape:
        encoder_out, encoder_state = encoder(encoder_in, encoder_state)
        decoder_state = encoder_state
        decoder_pred, decoder_state = decoder(decoder_in, decoder_state)
        loss = loss_func(decoder_out, decoder_pred)
    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)  # gradient computation via TF2's autodiff
    optimizer.apply_gradients(zip(gradients, variables))
    return loss
Inference
def predict(encoder, decoder, batch_size, sents_en, data_en,
            sents_fr_out, word2idx_fr, idx2word_fr):
    # pick a random sentence
    random_id = np.random.choice(len(sents_en))
    print("Input  : ", " ".join(sents_en[random_id]))
    print("Output : ", " ".join(sents_fr_out[random_id]))

    encoder_in = tf.expand_dims(data_en[random_id], axis=0)
    decoder_out = tf.expand_dims(sents_fr_out[random_id], axis=0)

    encoder_state = encoder.init_state(1)
    encoder_out, encoder_state = encoder(encoder_in, encoder_state)
    decoder_state = encoder_state

    decoder_in = tf.expand_dims(tf.constant([word2idx_fr["BOS"]]), axis=0)
    pred_sent_fr = []
    while True:
        decoder_pred, decoder_state = decoder(decoder_in, decoder_state)
        decoder_pred = tf.argmax(decoder_pred, axis=-1)
        pred_word = idx2word_fr[decoder_pred.numpy()[0][0]]
        pred_sent_fr.append(pred_word)
        if pred_word == "EOS":
            break
        # feed the predicted token back in as the next decoder input
        decoder_in = decoder_pred

    print("predict: ", " ".join(pred_sent_fr))
Computing the BLEU score
def evaluate_bleu_score(encoder, decoder, test_dataset, word2idx_fr, idx2word_fr):
    bleu_scores = []
    smooth_fn = SmoothingFunction()
    for encoder_in, decoder_in, decoder_out in test_dataset:
        encoder_state = encoder.init_state(batch_size)
        encoder_out, encoder_state = encoder(encoder_in, encoder_state)
        decoder_state = encoder_state
        decoder_pred, decoder_state = decoder(decoder_in, decoder_state)
        # compute argmax over the vocabulary dimension
        decoder_pred = tf.argmax(decoder_pred, axis=-1).numpy()
        # decoder_out holds y_true
        for i in range(decoder_out.shape[0]):  # one reference sentence at a time
            ref_sent = [idx2word_fr[j] for j in decoder_out[i].numpy() if j > 0]
            hyp_sent = [idx2word_fr[j] for j in decoder_pred[i] if j > 0]
            # remove EOS
            ref_sent = ref_sent[0:-1]
            hyp_sent = hyp_sent[0:-1]
            bleu_score = sentence_bleu([ref_sent], hyp_sent,
                                       smoothing_function=smooth_fn.method1)
            bleu_scores.append(bleu_score)
    return np.mean(np.array(bleu_scores))  # average over all sentences
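For reference, nltk's sentence_bleu can be exercised on its own with made-up token lists; note that a smoothing method such as method1 must be passed, not the SmoothingFunction object itself:

ref = ["je", "suis", "la"]
hyp = ["je", "suis", "ici"]
print(sentence_bleu([ref], hyp,
                    smoothing_function=SmoothingFunction().method1))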
Some global variables
NUM_SENT_PAIRS = 30000
EMBEDDING_DIM = 256
ENCODER_DIM, DECODER_DIM = 1024, 1024
BATCH_SIZE = 64
NUM_EPOCHS = 5  # reduced from the original 30 to keep the run short

tf.random.set_seed(30)
data_dir = "datasets"
checkpoint_dir = clean_up_logs(data_dir)

# dataset preparation: datasets/fra.txt comes from the Anki corpus below
download_url = "http://www.manythings.org/anki/fra-eng.zip"
sents_en, sents_fr_in, sents_fr_out = download_and_read()
Tokenizing the samples
tokenizer_en = tf.keras.preprocessing.text.Tokenizer(filters="", lower=False)
tokenizer_en.fit_on_texts(sents_en)
data_en = tokenizer_en.texts_to_sequences(sents_en)
data_en = tf.keras.preprocessing.sequence.pad_sequences(data_en, padding="post")

tokenizer_fr = tf.keras.preprocessing.text.Tokenizer(filters="", lower=False)
tokenizer_fr.fit_on_texts(sents_fr_in)
tokenizer_fr.fit_on_texts(sents_fr_out)
data_fr_in = tokenizer_fr.texts_to_sequences(sents_fr_in)
data_fr_in = tf.keras.preprocessing.sequence.pad_sequences(data_fr_in, padding="post")
data_fr_out = tokenizer_fr.texts_to_sequences(sents_fr_out)
data_fr_out = tf.keras.preprocessing.sequence.pad_sequences(data_fr_out, padding="post")

vocab_size_en = len(tokenizer_en.word_index)
vocab_size_fr = len(tokenizer_fr.word_index)
word2idx_en = tokenizer_en.word_index
idx2word_en = {v: k for k, v in word2idx_en.items()}
word2idx_fr = tokenizer_fr.word_index
idx2word_fr = {v: k for k, v in word2idx_fr.items()}

print(f"Vocab size (en): {vocab_size_en}")
print(f"Vocab size (fr): {vocab_size_fr}")

maxlen_en = data_en.shape[1]
maxlen_fr = data_fr_out.shape[1]
print(f"seq len (en): {maxlen_en}")
print(f"seq len (fr): {maxlen_fr}")
Splitting the dataset
batch_size = BATCH_SIZE
dataset = tf.data.Dataset.from_tensor_slices((data_en, data_fr_in, data_fr_out))
dataset = dataset.shuffle(10000)
test_size = NUM_SENT_PAIRS // 4
test_dataset = dataset.take(test_size).batch(batch_size, drop_remainder=True)
train_dataset = dataset.skip(test_size).batch(batch_size, drop_remainder=True)
Checking encoder/decoder input and output dimensions
# check encoder/decoder dimensions
embedding_dim = EMBEDDING_DIM
encoder_dim, decoder_dim = ENCODER_DIM, DECODER_DIM

encoder = Encoder(vocab_size_en + 1, embedding_dim, maxlen_en, encoder_dim)
decoder = Decoder(vocab_size_fr + 1, embedding_dim, maxlen_fr, decoder_dim)
optimizer = tf.keras.optimizers.Adam()

checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(optimizer=optimizer,
                                 encoder=encoder,
                                 decoder=decoder)

for encoder_in, decoder_in, decoder_out in train_dataset:
    encoder_state = encoder.init_state(batch_size)
    encoder_out, encoder_state = encoder(encoder_in, encoder_state)
    decoder_state = encoder_state
    decoder_pred, decoder_state = decoder(decoder_in, decoder_state)
    break
print("encoder input :", encoder_in.shape)
print("encoder output :", encoder_out.shape, "state: ", encoder_state.shape)
print("decoder output (logits) :", decoder_pred.shape, "state: ", decoder_state.shape)
print("decoder output (labels) :", decoder_out.shape)
Training
# training step
num_epochs = NUM_EPOCHS
for e in range(num_epochs):
    encoder_state = encoder.init_state(batch_size)
    for batch, data in enumerate(train_dataset):
        encoder_in, decoder_in, decoder_out = data
        # decoder_out is the label value
        # decoder_in is fed into the decoder, which returns decoder_pred and state
        # print(encoder_in.shape, decoder_in.shape, decoder_out.shape)
        loss = train_step(encoder_in, decoder_in, decoder_out, encoder_state)

    print("Epoch: {}, Loss: {:.4f}".format(e + 1, loss.numpy()))
    if e % 10 == 0:
        checkpoint.save(file_prefix=checkpoint_prefix)

    predict(encoder, decoder, batch_size, sents_en, data_en,
            sents_fr_out, word2idx_fr, idx2word_fr)
    eval_score = evaluate_bleu_score(encoder, decoder, test_dataset,
                                     word2idx_fr, idx2word_fr)
    print("Eval Score (BLEU): {:.3e}".format(eval_score))

checkpoint.save(file_prefix=checkpoint_prefix)