TensorFlow实现RNN循环神经网络
RNN(recurrent neural Network)循环神经网络 主要用于自然语言处理(nature language processing,NLP) RNN主要用途是处理和预测序列数据 RNN广泛的用于 语音识别、语言模型、机器翻译 RNN的来源就是为了刻画一个序列当前的输出与之前的信息影响后面节点的输出 RNN 是包含循环的网络,允许信息的持久化。 RNN会记忆之前的信息,并利用之前的信息影响后面节点的输出。 RNN的隐藏层之间的节点是有相连的,隐藏层的输入不仅仅包括输入层的输出,还包括上一时刻隐藏层的输出。 RNN会对于每一个时刻的输入结合当前模型的状态给出一个输出。 RNN理论上被看作同一个神经网络结构被无限复制的结果,目前RNN无法做到真正的无限循环,一般以循环体展开。 RNN图: RNN最擅长的问题是与时间序列相关的。 RNN对于一个序列数据,可以将序列上不同时刻的数据依次输入循环神经网络的输入层,而输出可以是对序列中下一个时刻的预测,也可以是对当前时刻信息的处理结果。 RNN 的关键点之一就是他们可以用来连接先前的信息到当前的任务上 展开后的RNN: 循环体网络中的参数在不同的时刻也是共享的。 RNN的状态是通过一个向量来表示,这个向量的维度也称为RNN隐藏层的大小。 假如该向量为h,输入为x,激活函数为tanh,则有如图: 前向传播的计算过程: 理论上RNN可以支持任意长度的序列,但是如果序列太长会导致优化时实现梯度消失的问题,一般会设置最大长度,超长会对其截断。 代码实现简单的RNN: import numpy as np # 定义RNN的参数。 X = [1,2] state = [0.0,0.0] w_cell_state = np.asarray([[0.1,0.2],[0.3,0.4]]) w_cell_input = np.asarray([0.5,0.6]) b_cell = np.asarray([0.1,-0.1]) w_output = np.asarray([[1.0],[2.0]]) b_output = 0.1 # 执行前向传播过程。 for i in range(len(X)): before_activation = np.dot(state,w_cell_state) + X[i] * w_cell_input + b_cell state = np.tanh(before_activation) final_output = np.dot(state,w_output) + b_output print ("before activation: ",before_activation) print ("state: ",state) print ("output: ",final_output) LSTM(long short-term memory)长短时记忆网络: LSTM解决了RNN不支持长期依赖的问题,使其大幅度提升记忆时长。 RNN被成功应用的关键就是LSTM。 LSTM是一种拥有三个“门”结构的特殊网络结构。 粉色的圈代表 pointwise 的操作,诸如向量的和,而黄色的矩阵就是学习到的神经网络层。合在一起的线表示向量的连接,分开的线表示内容被复制,然后分发到不同的位置。 LSTM核心思想: LSTM 的关键就是细胞状态,水平线在图上方贯穿运行。 细胞状态类似于传送带。直接在整个链上运行,只有一些少量的线性交互,信息在上面流传保持不变会很容易。 LSTM 有通过精心设计的称作为“门”的结构来去除或者增加信息到细胞状态的能力。 “门”是一种让信息选择式通过的方法,包含一个sigmoid 神经网络层和一个 pointwise (按位做乘法)的操作。 之所以称之为“门”,因为使用Sigmoid 作为激活函数的层会输出 0 到 1 之间的数值,描述每个部分有多少信息量可以通过这个结构。 0 代表“不许任何量通过”,1 就指“允许任意量通过”! LSTM的公式: 代码实现: import tensorflow as tf # 定义一个LSTM结构 lstm = rnn_cell.BasicLSTMCell(lstm_hidden_size) # 将LSTM中的状态初始化为全0数组,每次使用一个batch的训练样本 state = lstm.zero_state(batch_size,tf.float32) # 定义损失函数 loss = 0.0 # 规定一个最大序列长度 for i in range(num_steps): # 复用之前定义的变量 if i > 0: tf.get_variable_scope().reuse_variables() # 将当前输入和前一时刻的状态传入定义的LSTM结构,得到输出和更新后的状态 lstm_output,state = lstm(current_input,state) # 将当前时刻的LSTM结构的输出传入一个全连接层得到最后的输出。 final_output = fully_connectd(lstm_output) # 计算当前时刻输出的损失 loss += calc_loss(final_output,expected_output) 双向循环神经网络 经典的循环神经网络中的状态传输是从前往后单向的,然而当前时刻的输出不仅和之前的状态有关系,也和之后的状态相关。 双向循环神经网络能解决状态单向传输的问题。 双向循环神经网络是由两个循环神经网络反向上下叠加在一起组成,两个循环神经网络的状态共同决定输出。 也就是时间t时的输出不仅仅取决于过去的记忆,也同样取决于后面发生的事情。 深层(双向)循环神经网络 深层循环神经网络似于双向循环神经网络,只不过是每个时长内都有多层。 深层循环神经网络有更强的学习能力。 深层循环神经网络在每个时刻上将循环体结构复制多次,和卷积神经网络类似,每一层循环体中的参数是一致的,不同层的参数可以不同。 TensorFlow中使用MultiRNNCell实现深层循环神经网络中每一个时刻的前向传播过程。剩下的步骤和RNN的构建步骤相同。 RNN中的dropout 通过dropout方法可以上卷积神经网络更加健壮,类似的用在RNN上也能取得同样的效果。 类似卷积神经网络,RNN只在最后的全连接层使用dropout。 RNN一般只在不同层循环体结构中使用dropout,不在同层循环体使用。 同一时刻t中,不同循环体之间会使用dropout。 在TensorFlow中,使用DropoutWrapper类实现dropout功能。 通过input_keep_prob参数控制输入的dropout概率 通过output_keep_prob参数控制输出的dropout概率 TensorFlow样例实现RNN语言模型 代码: import numpy as np import tensorflow as tf import reader DATA_PATH = "../datasets/PTB/data" HIDDEN_SIZE = 200 # 隐藏层规模 NUM_LAYERS = 2 # 深层RNN中的LSTM结构的层数 VOCAB_SIZE = 10000 # 单词标识符个数 LEARNING_RATE = 1.0 # 学习速率 TRAIN_BATCH_SIZE = 20 # 训练数据大小 TRAIN_NUM_STEP = 35 # 训练数据截断长度 # 测试时不需要截断 EVAL_BATCH_SIZE = 1 # 测试数据大小 EVAL_NUM_STEP = 1 # 测试数据截断长度 NUM_EPOCH = 2 # 使用训练数据轮数 KEEP_PROB = 0.5 # 节点不被dropout MAX_GRAD_NORM = 5 # 控制梯度膨胀参数 # 定义一个类来描述模型结构。 class PTBModel (object): def __init__(self,is_training,batch_size,num_steps): self.batch_size = batch_size self.num_steps = num_steps # 定义输入层。 self.input_data = tf.placeholder (tf.int32,[batch_size,num_steps]) self.targets = tf.placeholder (tf.int32,num_steps]) # 定义使用LSTM结构及训练时使用dropout。 lstm_cell = tf.contrib.rnn.BasicLSTMCell (HIDDEN_SIZE) if is_training: lstm_cell = tf.contrib.rnn.DropoutWrapper (lstm_cell,output_keep_prob=KEEP_PROB) cell = tf.contrib.rnn.MultiRNNCell ([lstm_cell] * NUM_LAYERS) # 初始化最初的状态。 self.initial_state = cell.zero_state (batch_size,tf.float32) embedding = tf.get_variable ("embedding",[VOCAB_SIZE,HIDDEN_SIZE]) # 将原本单词ID转为单词向量。 inputs = tf.nn.embedding_lookup (embedding,self.input_data) if is_training: inputs = tf.nn.dropout (inputs,KEEP_PROB) # 定义输出列表。 outputs = [] state = self.initial_state with tf.variable_scope ("RNN"): for time_step in range (num_steps): if time_step > 0: tf.get_variable_scope ().reuse_variables () cell_output,state = cell (inputs[:,time_step,:],state) outputs.append (cell_output) output = tf.reshape (tf.concat (outputs,1),[-1,HIDDEN_SIZE]) weight = tf.get_variable ("weight",[HIDDEN_SIZE,VOCAB_SIZE]) bias = tf.get_variable ("bias",[VOCAB_SIZE]) logits = tf.matmul (output,weight) + bias # 定义交叉熵损失函数和平均损失。 loss = tf.contrib.legacy_seq2seq.sequence_loss_by_example ( [logits],[tf.reshape (self.targets,[-1])],[tf.ones ([batch_size * num_steps],dtype=tf.float32)]) self.cost = tf.reduce_sum (loss) / batch_size self.final_state = state # 只在训练模型时定义反向传播操作。 if not is_training: return trainable_variables = tf.trainable_variables () # 控制梯度大小,定义优化方法和训练步骤。 grads,_ = tf.clip_by_global_norm (tf.gradients (self.cost,trainable_variables),MAX_GRAD_NORM) optimizer = tf.train.GradientDescentOptimizer (LEARNING_RATE) self.train_op = optimizer.apply_gradients (zip (grads,trainable_variables)) # 使用给定的模型model在数据data上运行train_op并返回在全部数据上的perplexity值 def run_epoch(session,model,data,train_op,output_log,epoch_size): total_costs = 0.0 iters = 0 state = session.run(model.initial_state) # 训练一个epoch。 for step in range(epoch_size): x,y = session.run(data) cost,state,_ = session.run([model.cost,model.final_state,train_op],{model.input_data: x,model.targets: y,model.initial_state: state}) total_costs += cost iters += model.num_steps if output_log and step % 100 == 0: print("After %d steps,perplexity is %.3f" % (step,np.exp(total_costs / iters))) return np.exp(total_costs / iters) # 定义主函数并执行 def main(): train_data,valid_data,test_data,_ = reader.ptb_raw_data(DATA_PATH) # 计算一个epoch需要训练的次数 train_data_len = len(train_data) train_batch_len = train_data_len // TRAIN_BATCH_SIZE train_epoch_size = (train_batch_len - 1) // TRAIN_NUM_STEP valid_data_len = len(valid_data) valid_batch_len = valid_data_len // EVAL_BATCH_SIZE valid_epoch_size = (valid_batch_len - 1) // EVAL_NUM_STEP test_data_len = len(test_data) test_batch_len = test_data_len // EVAL_BATCH_SIZE test_epoch_size = (test_batch_len - 1) // EVAL_NUM_STEP initializer = tf.random_uniform_initializer(-0.05,0.05) with tf.variable_scope("language_model",reuse=None,initializer=initializer): train_model = PTBModel(True,TRAIN_BATCH_SIZE,TRAIN_NUM_STEP) with tf.variable_scope("language_model",reuse=True,initializer=initializer): eval_model = PTBModel(False,EVAL_BATCH_SIZE,EVAL_NUM_STEP) # 训练模型。 with tf.Session() as session: tf.global_variables_initializer().run() train_queue = reader.ptb_producer(train_data,train_model.batch_size,train_model.num_steps) eval_queue = reader.ptb_producer(valid_data,eval_model.batch_size,eval_model.num_steps) test_queue = reader.ptb_producer(test_data,eval_model.num_steps) coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(sess=session,coord=coord) for i in range(NUM_EPOCH): print("In iteration: %d" % (i + 1)) run_epoch(session,train_model,train_queue,train_model.train_op,True,train_epoch_size) valid_perplexity = run_epoch(session,eval_model,eval_queue,tf.no_op(),False,valid_epoch_size) print("Epoch: %d Validation Perplexity: %.3f" % (i + 1,valid_perplexity)) test_perplexity = run_epoch(session,test_queue,test_epoch_size) print("Test Perplexity: %.3f" % test_perplexity) coord.request_stop() coord.join(threads) if __name__ == "__main__": main() 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程小技巧。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |