Как сделать TensorFlow + Keras быстрым с помощью набора данных TFRecord?

Каков пример использования TensorFlow TFRecord с моделью Keras и tf.session.run () при сохранении набора данных в тензорах с обработчиками очереди?

Ниже приведен фрагмент, который работает, но требует следующих улучшений:

  • Используйте Model API.
  • укажите Input ()
  • Загрузить набор данных из TFRecord
  • Параллельное выполнение набора данных (например, с помощью очереди)

Вот фрагмент, есть несколько строк TODO, указывающих, что необходимо:

from keras.models import Model
import tensorflow as tf
from keras import backend as K
from keras.layers import Dense, Input
from keras.objectives import categorical_crossentropy
from tensorflow.examples.tutorials.mnist import input_data

sess = tf.Session()
K.set_session(sess)

# Can this be done more efficiently than placeholders w/ TFRecords?
img = tf.placeholder(tf.float32, shape=(None, 784))
labels = tf.placeholder(tf.float32, shape=(None, 10))

# TODO: Use Input() 
x = Dense(128, activation='relu')(img)
x = Dense(128, activation='relu')(x)
preds = Dense(10, activation='softmax')(x)
# TODO: Construct model = Model(input=inputs, output=preds)

loss = tf.reduce_mean(categorical_crossentropy(labels, preds))

# TODO: handle TFRecord data, is it the same?
mnist_data = input_data.read_data_sets('MNIST_data', one_hot=True)

train_step = tf.train.GradientDescentOptimizer(0.5).minimize(loss)

sess.run(tf.global_variables_initializer())

# TODO remove default, add queuerunner
with sess.as_default():
    for i in range(1000):
        batch = mnist_data.train.next_batch(50)
        train_step.run(feed_dict={img: batch[0],
                                  labels: batch[1]})
    print(loss.eval(feed_dict={img:    mnist_data.test.images, 
                               labels: mnist_data.test.labels}))

Почему этот вопрос актуален?

Вот начальная информация для примера проблемы семантической сегментации:


person Andrew Hundt    schedule 12.02.2017    source источник
comment
github.com/tensorflow/tensorflow/issues/8787 будет работать над полной поддержкой этой функции помимо быстрого исправления, указанного в принятом ответе.   -  person Andrew Hundt    schedule 01.06.2017
comment
обновленный запрос на вытягивание github.com/fchollet/keras/pull/6928   -  person Andrew Hundt    schedule 16.06.2017


Ответы (2)


Я не использую формат набора данных tfrecord, поэтому не буду спорить о плюсах и минусах, но я заинтересовался расширением Keras для его поддержки.

github.com/indraforyou/keras_tfrecord - это репозиторий. Кратко поясню основные изменения.

Создание и загрузка набора данных

data_to_tfrecord и read_and_decode загрузка такая же. Особое внимание следует уделять внедрению read_and_decode, иначе вы столкнетесь с загадочными ошибками во время обучения.

Инициализация и модель Keras

Теперь оба слоя tf.train.shuffle_batch и Keras Input возвращают тензор. Но тот, который возвращается tf.train.shuffle_batch, не имеет метаданных, необходимых Керасу для внутреннего пользования. Как оказалось, любой тензор можно легко превратить в тензор с метаданными keras, вызвав слой Input с параметром tensor.

Итак, это позаботится об инициализации:

x_train_, y_train_ = ktfr.read_and_decode('train.mnist.tfrecord', one_hot=True, n_class=nb_classes, is_train=True)

x_train_batch, y_train_batch = K.tf.train.shuffle_batch([x_train_, y_train_],
                                                batch_size=batch_size,
                                                capacity=2000,
                                                min_after_dequeue=1000,
                                                num_threads=32) # set the number of threads here

x_train_inp = Input(tensor=x_train_batch)

Теперь с x_train_inp можно разработать любую модель keras.

Обучение (простое)

Допустим, train_out - это выходной тензор вашей модели keras. Вы можете легко написать собственный цикл обучения в следующих строках:

loss = tf.reduce_mean(categorical_crossentropy(y_train_batch, train_out))
train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)


# sess.run(tf.global_variables_initializer())
sess.run(tf.initialize_all_variables())

with sess.as_default():
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    try:
      step = 0
      while not coord.should_stop():
        start_time = time.time()

        _, loss_value = sess.run([train_op, loss], feed_dict={K.learning_phase(): 0})

        duration = time.time() - start_time

        if step % 100 == 0:
          print('Step %d: loss = %.2f (%.3f sec)' % (step, loss_value,
                                                     duration))
        step += 1
    except tf.errors.OutOfRangeError:
      print('Done training for %d epochs, %d steps.' % (FLAGS.num_epochs, step))
    finally:
      coord.request_stop()

    coord.join(threads)
    sess.close()

Тренировка (стиль керас)

Одна из особенностей keras, которая делает его настолько прибыльным, - это его универсальный обучающий механизм с функциями обратного вызова.

Но для поддержки обучения типа tfrecords необходимо внести несколько изменений в функцию fit.

  • запуск потоков очереди
  • нет загрузки пакетных данных через feed_dict
  • поддержка валидации становится сложной, поскольку данные валидации также будут поступать через другой тензор, другая модель должна быть внутренне создана с общими верхними уровнями и тензором валидации, введенным другим считывателем tfrecord.

Но все это легко подкрепить другим параметром флага. Что мешает, так это особенности keras sample_weight и class_weight, которые используются для взвешивания каждого образца и взвешивания каждого класса. Для этого в compile() keras создаются заполнители () и заполнители также неявно создаются для целевых объектов (здесь), который в нашем случае не нужен, метки уже введены читателями tfrecord. Эти заполнители необходимо вводить во время выполнения сеанса, что не требуется в нашем CAE.

Поэтому, принимая во внимание эти изменения, compile_tfrecord (здесь и _20) являются расширением _20 и говорят, что compile 95% кода.

Их можно использовать следующим образом:

import keras_tfrecord as ktfr

train_model = Model(input=x_train_inp, output=train_out)
ktfr.compile_tfrecord(train_model, optimizer='rmsprop', loss='categorical_crossentropy', out_tensor_lst=[y_train_batch], metrics=['accuracy'])

train_model.summary()

ktfr.fit_tfrecord(train_model, X_train.shape[0], batch_size, nb_epoch=3)
train_model.save_weights('saved_wt.h5')

Приглашаем вас улучшить код и запросы на вытягивание.

person indraforyou    schedule 19.02.2017
comment
вау выглядит фантастически! Возможно, стоит запрос на перенос в keras-contrib, официальный репозиторий keras upstream? Я попробую это сделать, а затем, надеюсь, получу кредит на ответ + награду. Я также отредактировал ссылки, чтобы использовать хэш текущей версии keras, чтобы номера строк оставались правильными. - person Andrew Hundt; 20.02.2017
comment
вот запрос на вытягивание keras-contrib № 27 - person Andrew Hundt; 20.03.2017

Обновление 2018-08-29 теперь напрямую поддерживается в keras, см. Следующий пример:

https://github.com/keras-team/keras/blob/master/examples/mnist_tfrecord.py

Оригинальный ответ:

TFRecords поддерживаются за счет использования внешних потерь. Вот ключевые линии, создающие внешние потери:

# tf yield ops that supply dataset images and labels
x_train_batch, y_train_batch = read_and_decode_recordinput(...)

# create a basic cnn
x_train_input = Input(tensor=x_train_batch)
x_train_out = cnn_layers(x_train_input)

model = Model(inputs=x_train_input, outputs=x_train_out)
loss = keras.losses.categorical_crossentropy(y_train_batch, x_train_out)
model.add_loss(loss)

model.compile(optimizer='rmsprop', loss=None)

Вот пример для Keras 2. Он работает после применения небольшого патча # 7060:

'''MNIST dataset with TensorFlow TFRecords.

Gets to 99.25% test accuracy after 12 epochs
(there is still a lot of margin for parameter tuning).
'''
import os
import copy
import time

import numpy as np

import tensorflow as tf
from tensorflow.python.ops import data_flow_ops
from keras import backend as K
from keras.models import Model
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import Flatten
from keras.layers import Input
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.callbacks import EarlyStopping
from keras.callbacks import TensorBoard
from keras.objectives import categorical_crossentropy
from keras.utils import np_utils
from keras.utils.generic_utils import Progbar
from keras import callbacks as cbks
from keras import optimizers, objectives
from keras import metrics as metrics_module

from keras.datasets import mnist

if K.backend() != 'tensorflow':
    raise RuntimeError('This example can only run with the '
                       'TensorFlow backend for the time being, '
                       'because it requires TFRecords, which '
                       'are not supported on other platforms.')


def images_to_tfrecord(images, labels, filename):
    def _int64_feature(value):
        return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

    def _bytes_feature(value):
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

    """ Save data into TFRecord """
    if not os.path.isfile(filename):
        num_examples = images.shape[0]

        rows = images.shape[1]
        cols = images.shape[2]
        depth = images.shape[3]

        print('Writing', filename)
        writer = tf.python_io.TFRecordWriter(filename)
        for index in range(num_examples):
            image_raw = images[index].tostring()
            example = tf.train.Example(features=tf.train.Features(feature={
                'height': _int64_feature(rows),
                'width': _int64_feature(cols),
                'depth': _int64_feature(depth),
                'label': _int64_feature(int(labels[index])),
                'image_raw': _bytes_feature(image_raw)}))
            writer.write(example.SerializeToString())
        writer.close()
    else:
        print('tfrecord %s already exists' % filename)


def read_and_decode_recordinput(tf_glob, one_hot=True, classes=None, is_train=None,
                                batch_shape=[1000, 28, 28, 1], parallelism=1):
    """ Return tensor to read from TFRecord """
    print 'Creating graph for loading %s TFRecords...' % tf_glob
    with tf.variable_scope("TFRecords"):
        record_input = data_flow_ops.RecordInput(
            tf_glob, batch_size=batch_shape[0], parallelism=parallelism)
        records_op = record_input.get_yield_op()
        records_op = tf.split(records_op, batch_shape[0], 0)
        records_op = [tf.reshape(record, []) for record in records_op]
        progbar = Progbar(len(records_op))

        images = []
        labels = []
        for i, serialized_example in enumerate(records_op):
            progbar.update(i)
            with tf.variable_scope("parse_images", reuse=True):
                features = tf.parse_single_example(
                    serialized_example,
                    features={
                        'label': tf.FixedLenFeature([], tf.int64),
                        'image_raw': tf.FixedLenFeature([], tf.string),
                    })
                img = tf.decode_raw(features['image_raw'], tf.uint8)
                img.set_shape(batch_shape[1] * batch_shape[2])
                img = tf.reshape(img, [1] + batch_shape[1:])

                img = tf.cast(img, tf.float32) * (1. / 255) - 0.5

                label = tf.cast(features['label'], tf.int32)
                if one_hot and classes:
                    label = tf.one_hot(label, classes)

                images.append(img)
                labels.append(label)

        images = tf.parallel_stack(images, 0)
        labels = tf.parallel_stack(labels, 0)
        images = tf.cast(images, tf.float32)

        images = tf.reshape(images, shape=batch_shape)

        # StagingArea will store tensors
        # across multiple steps to
        # speed up execution
        images_shape = images.get_shape()
        labels_shape = labels.get_shape()
        copy_stage = data_flow_ops.StagingArea(
            [tf.float32, tf.float32],
            shapes=[images_shape, labels_shape])
        copy_stage_op = copy_stage.put(
            [images, labels])
        staged_images, staged_labels = copy_stage.get()

        return images, labels


def save_mnist_as_tfrecord():
    (X_train, y_train), (X_test, y_test) = mnist.load_data()
    X_train = X_train[..., np.newaxis]
    X_test = X_test[..., np.newaxis]
    images_to_tfrecord(images=X_train, labels=y_train, filename='train.mnist.tfrecord')
    images_to_tfrecord(images=X_test, labels=y_test, filename='test.mnist.tfrecord')


def cnn_layers(x_train_input):
    x = Conv2D(32, (3, 3), activation='relu', padding='valid')(x_train_input)
    x = Conv2D(64, (3, 3), activation='relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.25)(x)
    x = Flatten()(x)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.5)(x)
    x_train_out = Dense(classes,
                        activation='softmax',
                        name='x_train_out')(x)
    return x_train_out


sess = tf.Session()
K.set_session(sess)

save_mnist_as_tfrecord()

batch_size = 100
batch_shape = [batch_size, 28, 28, 1]
epochs = 3000
classes = 10
parallelism = 10

x_train_batch, y_train_batch = read_and_decode_recordinput(
    'train.mnist.tfrecord',
    one_hot=True,
    classes=classes,
    is_train=True,
    batch_shape=batch_shape,
    parallelism=parallelism)

x_test_batch, y_test_batch = read_and_decode_recordinput(
    'test.mnist.tfrecord',
    one_hot=True,
    classes=classes,
    is_train=True,
    batch_shape=batch_shape,
    parallelism=parallelism)


x_batch_shape = x_train_batch.get_shape().as_list()
y_batch_shape = y_train_batch.get_shape().as_list()

x_train_input = Input(tensor=x_train_batch, batch_shape=x_batch_shape)
x_train_out = cnn_layers(x_train_input)
y_train_in_out = Input(tensor=y_train_batch, batch_shape=y_batch_shape, name='y_labels')
cce = categorical_crossentropy(y_train_batch, x_train_out)
train_model = Model(inputs=[x_train_input], outputs=[x_train_out])
train_model.add_loss(cce)

train_model.compile(optimizer='rmsprop',
                    loss=None,
                    metrics=['accuracy'])
train_model.summary()

tensorboard = TensorBoard()

# tensorboard disabled due to Keras bug
train_model.fit(batch_size=batch_size,
                epochs=epochs)  # callbacks=[tensorboard])

train_model.save_weights('saved_wt.h5')

K.clear_session()

# Second Session, pure Keras
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train[..., np.newaxis]
X_test = X_test[..., np.newaxis]
x_test_inp = Input(batch_shape=(None,) + (X_test.shape[1:]))
test_out = cnn_layers(x_test_inp)
test_model = Model(inputs=x_test_inp, outputs=test_out)

test_model.load_weights('saved_wt.h5')
test_model.compile(optimizer='rmsprop', loss='categorical_crossentropy', metrics=['accuracy'])
test_model.summary()

loss, acc = test_model.evaluate(X_test, np_utils.to_categorical(y_test), classes)
print('\nTest accuracy: {0}'.format(acc))

Я также работал над улучшением поддержки TFRecords в следующей проблеме и запросе на перенос:

  • # 6928 Поддержка Yield Op: высокопроизводительные большие наборы данных через TFRecords и RecordInput
  • # 7102 Предложение по разработке API входного тензора Keras

Наконец, можно использовать tf.contrib.learn.Experiment для обучения моделей Keras в TensorFlow.

person Andrew Hundt    schedule 27.06.2017
comment
Мне удалось заставить этот пример работать с внешними потерями только после включения PR github.com/fchollet/keras / pull / 7060, чтобы исправить модуль generic_utils.py. - person Al Conrad; 20.07.2017
comment
По какой-то причине data_flow_ops.RecordInput возвращает только первый пакет, затем Керас думает, что эта эпоха завершена, и перезапускает другую эпоху. Я не могу понять почему. Я знаю, что вам трудно понять, что происходит, но есть ли у вас какие-либо предложения, как это отладить? Большое спасибо. Я уверен, что файл tfrecord, который я передал, правильный (имеет более 60 тысяч изображений). - person NullSpace; 26.07.2017
comment
@NullSpace Это должен быть отдельный вопрос о переполнении стека. С текущим keras освоите один шаг == одну эпоху, поэтому просто запускайте несколько раз или попробуйте github.com / fchollet / keras / pull / 7113. - person Andrew Hundt; 26.07.2017