【tensorflow2.0】数据管道dataset
如果需要训练的数据大小不大,例如不到1G,那么可以直接全部读入内存中进行训练,这样一般效率最高。 但如果需要训练的数据很大,例如超过10G,无法一次载入内存,那么通常需要在训练的过程中分批逐渐读入。 使用 tf.data API 可以构建数据输入管道,轻松处理大量的数据,不同的数据格式,以及不同的数据转换。 一,构建数据管道可以从 Numpy array,Pandas DataFrame,Python generator,csv文件,文本文件,文件路径,tfrecords文件等方式构建数据管道。 其中通过Numpy array,文件路径构建数据管道是最常用的方法。 通过tfrecords文件方式构建数据管道较为复杂,需要对样本构建tf.Example后压缩成字符串写到tfrecoreds文件,读取后再解析成tf.Example。 但tfrecoreds文件的优点是压缩后文件较小,便于网络传播,加载速度较快。 首先看下iris部分数据: 'feature_names': [sepal length (cm)',sepal width (cm)petal length (cm)petal width (cm)],1)">filename': /usr/local/lib/python3.6/dist-packages/sklearn/datasets/data/iris.csvtarget: array([0,1,1,12,1)">]),1)">target_names': array([setosa',versicolorvirginica'],dtype=<U10') 1,从Numpy array构建数据管道 # 从Numpy array构建数据管道 import tensorflow as tf numpy as np from sklearn datasets iris = datasets.load_iris() ds1 = tf.data.Dataset.from_tensor_slices((iris["data"],iris["])) for features,label in ds1.take(5): print(features,label)
2,从 Pandas DataFrame构建数据管道 从 Pandas DataFrame构建数据管道 datasets pandas as pd iris = datasets.load_iris() dfiris = pd.DataFrame(iris[ iris.feature_names) ds2 = tf.data.Dataset.from_tensor_slices((dfiris.to_dict(list"),1)">])) in ds2.take(3{'sepal?length?(cm)':?<tf.Tensor:?shape=(),?dtype=float32,?numpy=5.1>,?'sepal?width?(cm)':?<tf.Tensor:?shape=(),?dtype=float32,?numpy=3.5>,?'petal?length?(cm)':?<tf.Tensor:?shape=(),?numpy=1.4>,?'petal?width?(cm)':?<tf.Tensor:?shape=(),?numpy=0.2>}?tf.Tensor(0,?shape=(),?dtype=int64) 3,从Python generator构建数据管道 从Python generator构建数据管道 from matplotlib pyplot as plt from tensorflow.keras.preprocessing.image ImageDataGenerator 定义一个从文件中读取图片的generator image_generator = ImageDataGenerator(rescale=1.0/255).flow_from_directory( ./data/cifar2/test/),batch_size=20binary) classdict = image_generator.class_indices print(classdict) Found 2000 images belonging to 2 classes. {'airplane': 0,'automobile': 1} def generator(): in image_generator: yield (features,label) ds3 = tf.data.Dataset.from_generator(generator,output_types=(tf.float32,tf.int32)) %matplotlib inline %config InlineBackend.figure_format = svg plt.figure(figsize=(6,6)) for i,(img,label) in enumerate(ds3.unbatch().take(9)): ax=plt.subplot(3,3,i+1) ax.imshow(img.numpy()) ax.set_title(label = %d"%label) ax.set_xticks([]) ax.set_yticks([]) plt.show() 4,从csv文件构建数据管道 从csv文件构建数据管道 ds4 = tf.data.experimental.make_csv_dataset( file_pattern = [./data/titanic/train.csv",1)">./data/titanic/test.csv=Survived""=True) for data,1)">in ds4.take(2print(data,label)
5,从文本文件构建数据管道 从文本文件构建数据管道 ds5 = tf.data.TextLineDataset( filenames = [] ).skip(1) 略去第一行header for line in ds5.take(5print(line)
6,从文件路径构建数据管道 s6 = tf.data.Dataset.list_files(./data/cifar2/train/*/*.jpg) for file in ds6.take(5print(file)
def load_image(img_path,size = (32,1)">)): label = 1 if tf.strings.regex_full_match(img_path,1)">.*/automobile/.*") else 0 img = tf.io.read_file(img_path) img = tf.image.decode_jpeg(img) 注意此处为jpeg格式 img = tf.image.resize(img,size) return(img,label) %' in enumerate(ds6.map(load_image).take(2)): plt.figure(i) plt.imshow((img/255.0).numpy()) plt.title(label) plt.xticks([]) plt.yticks([]) 7,从tfrecords文件构建数据管道 os numpy as np inpath:原始数据路径 outpath:TFRecord文件输出路径 create_tfrecords(inpath,outpath): writer = tf.io.TFRecordWriter(outpath) dirs = os.listdir(inpath) for index,name enumerate(dirs): class_path = inpath +/"+ name+" for img_name os.listdir(class_path): img_path = class_path + img_name img = tf.io.read_file(img_path) img = tf.image.decode_image(img) img = tf.image.encode_jpeg(img) #统一成jpeg格式压缩 example = tf.train.Example( features=tf.train.Features(feature={ label': tf.train.Feature(int64_list=tf.train.Int64List(value=[index])),1)">img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img.numpy()])) })) writer.write(example.SerializeToString()) writer.close() create_tfrecords(./data/cifar2_test.tfrecords/) pyplot as plt parse_example(proto): description ={ : tf.io.FixedLenFeature([],tf.string),1)">: tf.io.FixedLenFeature([],tf.int64)} example = tf.io.parse_single_example(proto,description) img = tf.image.decode_jpeg(example["]) 注意此处为jpeg格式 img = tf.image.resize(img,(32,1)">)) label = example[] ./data/cifar2_test.tfrecords").map(parse_example).shuffle(3000) %in enumerate(ds7.take(9) ax.imshow((img/255.0).numpy()) ax.set_title(label) ax.set_xticks([]) ax.set_yticks([]) plt.show() 二,应用数据转换Dataset数据结构应用非常灵活,因为它本质上是一个Sequece序列,其每个元素可以是各种类型,例如可以是张量,列表,字典,也可以是Dataset。 Dataset包含了非常丰富的数据转换功能。
map:将转换函数映射到数据集每一个元素 ds = tf.data.Dataset.from_tensor_slices([hello worldhello Chinahello Beijing]) ds_map = ds.map(lambda x:tf.strings.split(x,1)">" )) for x ds_map: print(x)
flat_map:将转换函数映射到数据集的每一个元素,并将嵌套的Dataset压平。 ]) ds_flatmap = ds.flat_map(lambda x:tf.data.Dataset.from_tensor_slices(tf.strings.split(x,1)">))) ds_flatmap: print(x)
interleave: 效果类似flat_map,但可以将不同来源的数据夹在一起。 ]) ds_interleave = ds.interleave( ds_interleave: filter:过滤掉某些元素。 ]) 找出含有字母a或B的元素 ds_filter = ds.filter(lambda x: tf.strings.regex_full_match(x,1)">.*[a|B].* ds_filter: print(x)
zip:将两个长度相同的Dataset横向铰合。 ds1 = tf.data.Dataset.range(0,3) ds2 = tf.data.Dataset.range(3,1)">) ds3 = tf.data.Dataset.range(6,9) ds_zip = tf.data.Dataset.zip((ds1,ds2,ds3)) for x,y,z ds_zip: print(x.numpy(),y.numpy(),z.numpy())
condatenate:将两个Dataset纵向连接。 ) ds_concat = tf.data.Dataset.concatenate(ds1,ds2) ds_concat: print(x)
reduce:执行归并操作。 ds = tf.data.Dataset.from_tensor_slices([1,5.0]) result = ds.reduce(0.0,lambda x,y:tf.add(x,y)) result
batch:构建批次,每次放一个批次。比原始数据增加一个维度。 其逆操作为unbatch。 ds = tf.data.Dataset.range(12) ds_batch = ds.batch(4 ds_batch: print(x)
padded_batch:构建批次,类似batch,但可以填充到相同的形状。 elements = [[1,2],[3,5],[6,7],[8]] ds = tf.data.Dataset.from_generator(: iter(elements),tf.int32) ds_padded_batch = ds.padded_batch(2,padded_shapes = [4 ds_padded_batch: print(x)
window:构建滑动窗口,返回Dataset of Dataset. window返回的是Dataset of Dataset,可以用flat_map压平 ds_window = ds.window(3,shift=1).flat_map(lambda x: x.batch(3,drop_remainder=True)) ds_window: print(x)
shuffle:数据顺序洗牌。 ) ds_shuffle = ds.shuffle(buffer_size = 5 ds_shuffle: print(x)
repeat:重复数据若干次,不带参数时,重复无数次。 ds = tf.data.Dataset.range(3) ds_repeat = ds.repeat(3 ds_repeat:shard:采样,从某个位置开始隔固定距离采样一个元素。 ) ds_shard = ds.shard(3,index = 1 ds_shard:take:采样,从开始位置取前几个元素。 ) ds_take = ds.take(3) list(ds_take.as_numpy_iterator())
三,提升管道性能训练深度学习模型常常会非常耗时。 模型训练的耗时主要来自于两个部分,一部分来自数据准备,另一部分来自参数迭代。 参数迭代过程的耗时通常依赖于GPU来提升。 而数据准备过程的耗时则可以通过构建高效的数据管道进行提升。 以下是一些构建高效数据管道的建议。
1,使用 prefetch 方法让数据准备和参数迭代两个过程相互并行。 tensorflow as tf 打印时间分割线 @tf.function printbar(): ts = tf.timestamp() today_ts = ts%(24*60*60) hour = tf.cast(today_ts//3600+8,tf.int32)%tf.constant(24) minite = tf.cast((today_ts%3600)//60 timeformat(m): if tf.strings.length(tf.strings.format({}: return(tf.strings.format(0{}= tf.strings.join([timeformat(hour),timeformat(minite),timeformat(second)],separator = :) tf.print(=========="*8,end = print(timestring) time 数据准备和参数迭代两个过程默认情况下是串行的。 模拟数据准备 for i in range(10): 假设每次准备数据需要2s time.sleep(2) i ds = tf.data.Dataset.from_generator(generator,output_types = (tf.int32)) 模拟参数迭代 train_step(): 假设每一步训练需要1s time.sleep(1) 训练过程预计耗时 10*2+10*1+ = 30s printbar() tf.print(tf.constant(start training... ds: train_step() printbar() tf.end training... 使用 prefetch 方法让数据准备和参数迭代两个过程相互并行。 训练过程预计耗时 max(10*2,10*1) = 20s start training with prefetch...)) tf.data.experimental.AUTOTUNE 可以让程序自动选择合适的参数 in ds.prefetch(buffer_size = tf.data.experimental.AUTOTUNE): train_step() printbar() tf.")) ================================================================================16:18:31 start training... ================================================================================16:19:01 end training... ================================================================================16:19:01 start training with prefetch... ================================================================================16:19:23 end training... 2,使用 interleave 方法可以让数据读取过程多进程执行,并将不同来源数据夹在一起。 ds_files = tf.data.Dataset.list_files(./data/titanic/*.csv) ds = ds_files.flat_map(lambda x:tf.data.TextLineDataset(x).skip(1in ds.take(4ds_files = tf.data.Dataset.list_files() ds = ds_files.interleave(in ds.take(8print(line)
3,使用 map 时设置num_parallel_calls 让数据转换过程多进行执行 ds = tf.data.Dataset.list_files( 单进程转换 start transformation...)) ds_map = ds.map(load_image) for _ pass printbar() tf.end transformation... 多进程转换 start parallel transformation...)) ds_map_parallel = ds.map(load_image,num_parallel_calls = tf.data.experimental.AUTOTUNE) ds_map_parallel: end parallel transformation...")) 4,使用 cache 方法让数据在第一个epoch后缓存到内存中,仅限于数据集不大情形。 in range(5假设每一步训练需要0s pass 训练过程预计耗时 (5*2+5*0)*3 = 30s for epoch in tf.range(3 ds: train_step() printbar() tf.epoch = ended) printbar() tf. i 使用 cache 方法让数据在第一个epoch后缓存到内存中,仅限于数据集不大情形。 ds = tf.data.Dataset.from_generator(generator,1)"> (tf.int32)).cache() 假设每一步训练需要0s time.sleep(0) 训练过程预计耗时 (5*2+5*0)+(5*0+5*0)*2 = 10s ")) ================================================================================17:07:57 start training... ================================================================================17:08:07 epoch = 0 ended ================================================================================17:08:17 epoch = 1 ended ================================================================================17:08:27 epoch = 2 end training... ================================================================================17:08:27 start training... ================================================================================17:08:37 0 ended ================================================================================17:08:37 ended ================================================================================17:08:37 end training... 5,使用 map转换时,先batch,然后采用向量化的转换方法对每个batch进行转换。 先map后batch ds = tf.data.Dataset.range(100000) ds_map_batch = ds.map(lambda x:x**2).batch(20) printbar() tf.start scalar transformation... ds_map_batch: printbar() tf.end scalar transformation... 先batch后map ds = tf.data.Dataset.range(100000) ds_batch_map = ds.batch(20).map(lambda x:x**2start vector transformation... ds_batch_map: end vector transformation...")) ================================================================================17:09:10 start scalar transformation... ================================================================================17:09:13 end scalar transformation... ================================================================================17:09:13 start vector transformation... ================================================================================17:09:14 end vector transformation... ? 参考: 开源电子书地址:https://lyhue1991.github.io/eat_tensorflow2_in_30_days/ GitHub 项目地址:https://github.com/lyhue1991/eat_tensorflow2_in_30_days (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- haskell – cabal:如何添加文本文件作为构建依赖项
- 懒惰评估“#{}” – ruby??中的字符串
- 技术前沿:Oracle 18c 最新特性概览
- flex实现pdf,word以及图片格式的预览(包括bmp格式的)
- _bss_start _armboot_start
- ruby-on-rails – 在Ruby on Rails中默认没有启用Rack :: D
- ruby-on-rails-3 – 如何使用Rails的资产管道在JavaScript页
- Qt操作xml引发内部异常(Qt5Xml.lib和Qt5Xmld.lib)
- React自己写的一个地图小组件
- DOJO组件生命周期(the life cycle of dojo widget)