• 【奋斗者说】天辰物流穆怀永:幸福道路上永远有我们奋斗的身影 2018-12-06
  • 美国单边主义失道必然寡助 2018-12-06
  • 山东官宣留下功勋大外援 莫泰再战CBA两年! 2018-12-05
  • 广州市越秀区人民法院公告专栏 2018-12-04
  • 杨箕寺右 百余龙舟齐汇江面 2018-12-04
  • 国务院关税税则委员会办公室有关负责人对自美部分进口商品加征关税的解读 2018-12-02
  • 乌鲁木齐市惠民举措催热文化消费 2018-11-27
  • 希望在线教育公益平台获第十二届人民企业社会责任奖年度案例奖 2018-11-27
  • Tensorflow数据读取指南

    黑龙江快乐10分前3组 www.7vv77.com tensorflow的灵活性带来的学习成本是很多人头疼的问题,在tf中,读取数据基本有四种方法:
    1. tf.data (官方推荐):方便地构建复杂的输入管道
    2. Feeding:通过input_fn来yield数据
    3. QueueRunner:基于队列的输入管道
    4. 预加载数据。用constant或variable在内存中存储所有的数据

    其实从tf的历史发展轨??梢钥闯?,tf.data是在feedintg技术和QueueRunner的基础上发展而来。
    Feeding技术

    1
    2
    3
    4
    with tf.Session():
      input = tf.placeholder(tf.float32)
      classifier = ...
      print(classifier.eval(feed_dict={input: my_python_preprocessing_fn()}))

    简单教程中的数据加载,都是一行伪代码:(x, y) = load_data() 就搞定了
    但现实中,令训练数据的使用变得复杂的主要原因有很多:
    一、数据集不一致,不同的数据源的数据格式不一样,为了统一,我们需要一个转接层,我们称之为统一化过程
    二、数据集过大,不适合一次性导入,需要分批次导入,边训练边导入
    三、需要数据增强,在线生成,甚至完全在线生成,这样就需要将训练时的feed和读取的统一化数据之间加入一个动态生成过程
    四、当然批次化(batch),train和val的划分也都是重复性的工作

    tf.data就是用来解决以上问题的。

    两个重要类介绍:tf.data.Dataset和tf.data.Iterator,
    – Dataset用来表示一组数据,是一个序列,序列的每个元素包含一个或多个Tensor对象。
    – Iterator用户从数据集中抽取元素,Iterator.get_next()可以返回数据集的下一个元素,可以看作是输入管道与模型之间的接口。

    Dataset
    面对输入的数据集,我们要建立一个清晰的数据流概念,一个原始的数据集开始,后续的每一次转换都是流水线式的操作。因而我们可以看到这样的代码形式:data.TextLineDataset(filenames).map(decode_func).shuffle(buffer_size=10000).batch(batch_size).make_initializable_iterator()。

    Dataset的每一个元素一定是相同的结构,每个元素包含一个或多个Tensor对象,每个Tensor都有一个tf.DType来表示tensor中的元素数据类型,还有伊特tf.TensorShape来表示tensor中每个元素的形状。
    Dataset的map、flat_map、filter等等方法都是常用的操作链上过滤器方法。过滤器模式是常见的设计模式。

    Iterator
    如果说Dataset是从源中一步一步创建出一个统一化的数据集的话,Iterator就是负责怎么访问这个统一后的数据集然后喂给你的模型。有四种迭代器
    – one-shot
    – initializable
    – reinitializable
    – feedable

    one-shot最简单,就是一个元素一个元素的吐,但不能初始化,即参数化,就是遍历一遍数据集就什么也干不了了。
    initializable迭代器就可以初始化,可以接受一个placeholder,作为初始化参数

    1
    2
    3
    4
    5
    6
    max_value = tf.placeholder(tf.int64, shape=[])
    dataset = tf.data.Dataset.range(max_value)
    iterator = dataset.make_initializable_iterator()
    next_element = iterator.get_next()
    # Initialize an iterator over a dataset with 10 elements.
    sess.run(iterator.initializer, feed_dict={max_value: 10})

    initializable更典型的应用方式:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    # initializable iterator to switch between dataset
    EPOCHS = 10
    x, y = tf.placeholder(tf.float32, shape=[None,2]), tf.placeholder(tf.float32, shape=[None,1])
    dataset = tf.data.Dataset.from_tensor_slices((x, y))
    train_data = (np.random.sample((100,2)), np.random.sample((100,1)))
    test_data = (np.array([[1,2]]), np.array([[0]]))
    iter = dataset.make_initializable_iterator()
    features, labels = iter.get_next()
    with tf.Session() as sess:
    #     initialise iterator with train data
        sess.run(iter.initializer, feed_dict={ x: train_data[0], y: train_data[1]})
        for _ in range(EPOCHS):
            sess.run([features, labels])
    #     switch to test data
        sess.run(iter.initializer, feed_dict={ x: test_data[0], y: test_data[1]})
        print(sess.run([features, labels]))

    Dataset的创建依赖一个placeholder,所以只是表明了数据集的结构信息,具体的数据是什么还得在运行期的时候赋值。

    但如果我们不想在运行期使用feed_dict怎么办呢,用reinitializable迭代器支持多个数据集对象的初始化,创建的迭代器不再是dataset.make_xxx_iterator这种形式,而是使用结构直接使用Iterator的静态方法初始化tf.data.Iterator.from_structure(train_dataset.output_types, train_dataset.output_shapes)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    # Reinitializable iterator to switch between Datasets
    EPOCHS = 10
    # making fake data using numpy
    train_data = (np.random.sample((100,2)), np.random.sample((100,1)))
    test_data = (np.random.sample((10,2)), np.random.sample((10,1)))
    # create two datasets, one for training and one for test
    train_dataset = tf.data.Dataset.from_tensor_slices(train_data)
    test_dataset = tf.data.Dataset.from_tensor_slices(test_data)
    # create a iterator of the correct shape and type
    iter = tf.data.Iterator.from_structure(train_dataset.output_types,
    train_dataset.output_shapes)
    features, labels = iter.get_next()
    # create the initialisation operations
    train_init_op = iter.make_initializer(train_dataset)
    test_init_op = iter.make_initializer(test_dataset)
    with tf.Session() as sess:
        sess.run(train_init_op) # switch to train dataset
        for _ in range(EPOCHS):
            sess.run([features, labels])
        sess.run(test_init_op) # switch to val dataset
        print(sess.run([features, labels]))

    feedable结合了initializable和reinitializable两者,可以使用placeholder和多个数据集一起初始化。如果说reinitializable方便在数据集间切换,那么feedable就是方便在迭代器间切换。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    # Define training and validation datasets with the same structure.
    training_dataset = tf.data.Dataset.range(100).map(
        lambda x: x + tf.random_uniform([], -10, 10, tf.int64)).repeat()
    validation_dataset = tf.data.Dataset.range(50)
    #
    # A feedable iterator is defined by a handle placeholder and its structure. We
    # could use the `output_types` and `output_shapes` properties of either
    # `training_dataset` or `validation_dataset` here, because they have
    # identical structure.
    handle = tf.placeholder(tf.string, shape=[])
    iterator = tf.data.Iterator.from_string_handle(
        handle, training_dataset.output_types, training_dataset.output_shapes)
    #
    next_element = iterator.get_next()
    #
    # You can use feedable iterators with a variety of different kinds of iterator
    # (such as one-shot and initializable iterators).
    training_iterator = training_dataset.make_one_shot_iterator()
    validation_iterator = validation_dataset.make_initializable_iterator()
    #
    # The `Iterator.string_handle()` method returns a tensor that can be evaluated
    # and used to feed the `handle` placeholder.
    training_handle = sess.run(training_iterator.string_handle())
    validation_handle = sess.run(validation_iterator.string_handle())
    #
    # Loop forever, alternating between training and validation.
    while True:
      # Run 200 steps using the training dataset. Note that the training dataset is
      # infinite, and we resume from where we left off in the previous `while` loop
      # iteration.
      for _ in range(200):
        sess.run(next_element, feed_dict={handle: training_handle})
      #
      # Run one pass over the validation dataset.
      sess.run(validation_iterator.initializer)
      for _ in range(50):
        sess.run(next_element, feed_dict={handle: validation_handle})

    但怎么说呢,tensorflow的设计师一如既往地发挥了他们接口设计纷繁复杂的能力,真的是极其混乱难用,一点也不清晰,要不然tensorflow里也不会引入keras包了。

    数据的消费

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    EPOCHS = 10
    BATCH_SIZE = 16
    # using two numpy arrays
    features, labels = (np.array([np.random.sample((100,2))]),
                        np.array([np.random.sample((100,1))]))
    dataset = tf.data.Dataset.from_tensor_slices((features,labels)).repeat().batch(BATCH_SIZE)
    iter = dataset.make_one_shot_iterator()
    x, y = iter.get_next()
    # make a simple model
    net = tf.layers.dense(x, 8, activation=tf.tanh) # pass the first value from iter.get_next() as input
    net = tf.layers.dense(net, 8, activation=tf.tanh)
    prediction = tf.layers.dense(net, 1, activation=tf.tanh)
    loss = tf.losses.mean_squared_error(prediction, y) # pass the second value from iter.get_net() as label
    train_op = tf.train.AdamOptimizer().minimize(loss)
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        for i in range(EPOCHS):
            _, loss_value = sess.run([train_op, loss])
            print("Iter: {}, Loss: {:.4f}".format(i, loss_value))

    其中x, y = iter.get_next()表示x,y都是一个获取数据的操作,每次sess.run就是执行一次x(), y()这样就获得一个样本数据。

    kera使用数据集就特别简单

    1
    2
    3
    4
    5
    6
    7
    8
    9
    dataset = tf.data.Dataset.from_tensor_slices((data, labels))
    dataset = dataset.batch(32).repeat()
    #
    val_dataset = tf.data.Dataset.from_tensor_slices((val_data, val_labels))
    val_dataset = val_dataset.batch(32).repeat()
    #
    model.fit(dataset, epochs=10, steps_per_epoch=30,
              validation_data=val_dataset,
              validation_steps=3)

    过滤器函数
    从原始数据变成统一的数据格式,最重要的就是map

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    def input_parser(img_path, label):
        # convert the label to one-hot encoding
        one_hot = tf.one_hot(label, NUM_CLASSES)
        #
        # read the img from file
        img_file = tf.read_file(img_path)
        img_decoded = tf.image.decode_image(img_file, channels=3)
        #
        return img_decoded, one_hot

    tr_data = tr_data.map(input_parser)

    input_parser接受了数据集每个元素,将转换完的结果返回,数据集就脱胎换骨了。

    图像的转换处理
    对于图像的转换处理,我们常常用到 tf.image下的decode_bmp decode_jpeg resize_images transpose_image等等函数,具体的参考[2]。
    一个image文件,变成一个Tensor一般需要这样几步:
    1. 获取文件名列表
    2. [Optional]shuffle文件名列表
    3. [Optional]根据epoch扔掉一些文件
    4. 建立文件名的queue
    5. 使用对应文件的Reader,不同的文件格式用不同的reader,比如TFRecordReader解析TFRecord格式,普通文件的话,就用FileReader,但读取普通文件也比较复杂,比如读单个文件可以使用tf.gfile.FastGfile(path, “r”).read(),对于filename_queue,则使用WholeFileReader(),如下面的语句

    1
    2
    3
    4
    image_reader = tf.WholeFileReader()
    data_queue = tf.train.string_input_producer([image_dir], shuffle=False)
    image_key, image_value = image_reader.read(data_queue)
    img = tf.image.decode_jpeg(image_value, channels=3)

    实际上还有更方便的read_file方法:

    1
    2
    image_value = tf.read_file(image_dir)
    img = tf.image.decode_jpeg(image_value, channels=3)

    以csv的读取为例,看queue的使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"])
    #
    reader = tf.TextLineReader()
    key, value = reader.read(filename_queue)
    #
    # Default values, in case of empty columns. Also specifies the type of the
    # decoded result.
    record_defaults = [[1], [1], [1], [1], [1]]
    col1, col2, col3, col4, col5 = tf.decode_csv(
    value, record_defaults=record_defaults)
    features = tf.stack([col1, col2, col3, col4])
    #
    with tf.Session() as sess:
      # Start populating the filename queue.
      coord = tf.train.Coordinator()
      threads = tf.train.start_queue_runners(coord=coord)
      #
      for i in range(1200):
        # Retrieve a single instance:
        example, label = sess.run([features, col5])
      #
      coord.request_stop()
      coord.join(threads)

    当然,google建议将任何形式的原始数据都转换成TFRecords文件。关于TFRecords文件如何制作,请参考[9]。

    1
    2
    3
    4
    5
    dataset = tf.data.TFRecordDataset(filename)
    dataset = dataset.repeat(num_epochs)
    #
    # map takes a python function and applies it to every sample
    dataset = dataset.map(decode)

    6. 使用Decoder从Reader中反解成Tensor
    7. [Optional] preprocessing预处理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    def read_my_file_format(filename_queue):
      reader = tf.SomeReader()
      key, record_string = reader.read(filename_queue)
      example, label = tf.some_decoder(record_string)
      processed_example = some_processing(example)
      return processed_example, label
    #
    def input_pipeline(filenames, batch_size, num_epochs=None):
      filename_queue = tf.train.string_input_producer(
          filenames, num_epochs=num_epochs, shuffle=True)
      example, label = read_my_file_format(filename_queue)
      # min_after_dequeue defines how big a buffer we will randomly sample
      #   from -- bigger means better shuffling but slower start up and more
      #   memory used.
      # capacity must be larger than min_after_dequeue and the amount larger
      #   determines the maximum we will prefetch.  Recommendation:
      #   min_after_dequeue + (num_threads + a small safety margin) * batch_size
      min_after_dequeue = 10000
      capacity = min_after_dequeue + 3 * batch_size
      example_batch, label_batch = tf.train.shuffle_batch(
          [example, label], batch_size=batch_size, capacity=capacity,
          min_after_dequeue=min_after_dequeue)
      return example_batch, label_batch

    参考资料
    [1] tf.data指引 https://www.tensorflow.org/guide/datasets
    [2] tensorflow Images处理指引 https://www.tensorflow.org/api_guides/python/image
    [3] 使用coordinator加载图像 https://gist.github.com/eerwitt/518b0c9564e500b4b50f
    [4] 使用Tensorlofw读取大数据踩坑之路 https://zhuanlan.zhihu.com/p/28450111
    [5] Example of TensorFlows new Input Pipeline https://kratzert.github.io/2017/06/15/example-of-tensorflows-new-input-pipeline.html
    [6]load_jpeg_with_tensorflow https://gist.github.com/eerwitt/518b0c9564e500b4b50f
    [7] tensorflow Inputs and Readers指引 https://www.tensorflow.org/api_guides/python/io_ops#Readers
    [8]tensorflow example mnist 数据集的读取 https://github.com/tensorflow/tensorflow/blob/r1.10/tensorflow/examples/tutorials/mnist/fully_connected_feed.py
    [9] //www.machinelearninguru.com/deep_learning/tensorflow/basics/tfrecord/tfrecord.html

  • 【奋斗者说】天辰物流穆怀永:幸福道路上永远有我们奋斗的身影 2018-12-06
  • 美国单边主义失道必然寡助 2018-12-06
  • 山东官宣留下功勋大外援 莫泰再战CBA两年! 2018-12-05
  • 广州市越秀区人民法院公告专栏 2018-12-04
  • 杨箕寺右 百余龙舟齐汇江面 2018-12-04
  • 国务院关税税则委员会办公室有关负责人对自美部分进口商品加征关税的解读 2018-12-02
  • 乌鲁木齐市惠民举措催热文化消费 2018-11-27
  • 希望在线教育公益平台获第十二届人民企业社会责任奖年度案例奖 2018-11-27