TensorFlow でのデータ入力方法まとめ ①

あゆたディープラーニング担当の佐々木です。

今日は TensorFlow でモデルにデータを入れる手法についてまとめてみます。

パフォーマンスに関する本家の記事曰く、 チュートリアル等々で用いられている定番の feed_dict を使う手法だと、モデルの計算量やデータのサイズ次第では GPU を 活かしきれない事態になるそうです。(GPU の使用率がやたら低くなる) 原因としては、 CPU -> GPU 間のデータ転送が遅いことにあるようで、 次のデータが来る前に計算が終わってしまい、GPU が暇を持て余した状態になる場合があるようです。 そこで今回は Queue を用いてデータの転送を行う手法を実装してみます。

といっても先に結論を言ってしまうと、MNIST Expert ではどの手法でも GPU の 使用率は同じくらいだったので、場合によるのかなと。 CNN のようにもともと計算量の多いモデルだと、GPU の計算速度にデータの転送が追いつかないということも起こらないのかもしれません。
ただ、実装の仕方など複数のやりかたを知っておいて損はないかなと思うので、備忘録としてここに解説を残しておこうと思います。

という訳で、本記事では 2つの手法で MNIST Expert のモデルへデータを入れる実装をみていきます。 以下、基本的にはこの記事 に書いてある内容の要約 + 解説なので、英語が読める方は本家の方を読んでいただいたほうが理解が速いかもしれません。


概要

TensorFlow のデータ入力手法はざっくり 3種類あります。

  • feed_dict
  • ファイルから読み込み
  • Queue をつかう

共通なところ

まずはデータ入力以外の、共通で使用する部分を書いていきます。 といっても本家チュートリアルまんまなので、説明は割愛します。

def get_mnist_data():  
    from tensorflow.examples.tutorials.mnist import input_data
    mnist = input_data.read_data_sets('MNIST_data', one_hot=True)
    return mnist


class Model(object):  
    def __init__(self):
        pass

    # --- Define utility functions ----
    # Weight and biases
    def weight_variable(self, shape):
        initial = tf.truncated_normal(shape, stddev= 0.1)
        return tf.Variable(initial)

    def bias_variable(self, shape):
        initial = tf.constant(0.1, shape= shape)
        return tf.Variable(initial)

    # Convolution and pooling
    def conv2d(self, input_, filter_):
        return tf.nn.conv2d(input_, filter_, strides=[1,1,1,1], padding= 'SAME')

    def max_pool_2x2(self, input_):
        return tf.nn.max_pool(input_, ksize= [1,2,2,1], strides= [1,2,2,1], padding= 'SAME')


    # ---- Get model ----
    def model(self):

        # ---- Define feeding data formulas ----
        ----!! ここが手法によって変わるところ !!----


        # ---- Get model ----
        model_y = self.cnn(x)

        # ---- Train and evaluate the model ----
        cross_entropy = tf.reduce_mean( tf.nn.softmax_cross_entropy_with_logits(logits= model_y, labels= y) )
        train_op      = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)

        # ---- Run tensorflow ----
        sess = tf.Session()
        init = tf.global_variables_initializer()
        sess.run( init )

        return sess, train_op


    # ---- Convolutional layer ----
    def cnn(self, x):
        ''' Define CNN layer
        Args:
            x:  input_features
        '''

        # First convolutional layer
        w_conv1 = self.weight_variable([5, 5, 1, 32])
        b_conv1 = self.bias_variable([32])

        x_image = tf.reshape(x, [-1, 28, 28, 1])
        h_conv1 = tf.nn.relu( self.conv2d(x_image, w_conv1) + b_conv1 )
        h_pool1 = self.max_pool_2x2(h_conv1)

        # Second convolutional layer
        w_conv2 = self.weight_variable([5, 5, 32, 64])
        b_conv2 = self.bias_variable([64])

        h_conv2 = tf.nn.relu( self.conv2d(h_pool1, w_conv2) + b_conv2 )
        h_pool2 = self.max_pool_2x2(h_conv2)

        # Density connected layer
        w_fc1 = self.weight_variable([7 * 7 * 64, 1024])
        b_fc1 = self.bias_variable([1024])

        h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * 64])
        h_fc1        = tf.nn.relu(tf.matmul(h_pool2_flat, w_fc1) + b_fc1)

        # Readout layer
        w_fc2 = self.weight_variable([1024, 10])
        b_fc2 = self.bias_variable([10])

        y_conv = tf.matmul(h_fc1, w_fc2) + b_fc2

        return y_conv

モデルに関するコードは上記のようになります。 基本的にデータ入力手法を変える場合には、上記のコードでいうところの model() の中身と、 train_op を呼び出す時の方法が変わるだけです。


1.feed_dict

まずは定番の feed_dict です。これはおなじみなので説明は割愛。

def model(self):  
    # ---- Define feeding data operations ----
    x = tf.placeholder(tf.float32, shape= [None, 784])
    y = tf.placeholder(tf.float32, shape= [None,  10])

    # ---- Get model ----
    model_y = self.cnn(x)

    # ---- Train and evaluate the model ----
    cross_entropy = tf.reduce_mean( tf.nn.softmax_cross_entropy_with_logits(logits= model_y, labels= y) )
    train_step    = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)

    # ---- Run tensorflow ----
    sess = tf.Session()
    init = tf.global_variables_initializer()
    sess.run( init )

    return sess, train_op

Tensorflow の式は上記のようになります。入力データを placeholder で定義しておくのがポイント。

この場合、式を実行するときは普通に feed_dict すればおkです。

# Get tensorflow formulas
model = Model()  
sess, train_op = model.model()

# Run train_op
mnist = get_mnist_data()  
batch = mnist.train.next_batch( <バッチサイズ> )  
sess.run( train_op, feed_dict= { x: batch[0], y: batch[1] } )  

2.ファイルに保存 + Queue

feed_dict に変わる手法として、入力データを一旦ファイルに保存し、そこから随時読み込んでいく手法があります。
この際、ファイルからの読み込みにはキューが使えます。また、ここで保存するファイルは TFRecorde という、 データをシリアライズして保存するもので、Tensorflow 独自のフォーマットになります。 平たく言うと pickel の Tensorflow 版みたいなものでしょうか。


クラスを修正

まずは Tensorflow の式を定義していたクラスを修正します。

class Model(object):  
    def __init__(self, mnist):
        self.train_images = mnist.train.images
        self.train_labels = mnist.train.labels
        self.test_images  = mnist.test.images
        self.test_labels  = mnist.test.labels

        self.save_protobuf()


TFRecorde の構造

TFRecorde の中身は Example というフォーマットに則り記録されています。
Example の構造はexample.proto のコードをみると分かりやすく解説してくれています。

features {  
    feature {
       key: "age"
       value { float_list {
         value: 29.0
       }}
    }
    feature {
       key: "movie"
       value { bytes_list {
         value: "The Shawshank Redemption"
         value: "Fight Club"
       }}
    }

上記のような入れ子の辞書構造でデータが格納されています。


TFRecorde 形式でファイルに保存する

入力用のデータを TFRecorde 形式に変換して、ファイルに保存するコードは以下のようになります。

def save_protobuf(self):  
    writer = tf.python_io.TFRecordWriter("ファイルパス")

    for example_idx in len( self.train_images ):
        features = self.train_images[i]
        label    = self.train_labels[i]

        # construct the Example proto object
        example = tf.train.Example(
            # Example contains a Features proto object
            features=tf.train.Features(
              # Features contains a map of string to Feature proto objects
              feature={
                # A Feature contains one of either a int64_list, float_list, or bytes_list
                'label': tf.train.Feature(
                    float_list=tf.train.FloatList(value=[label])),
                'image': tf.train.Feature(
                    float_list=tf.train.FloatList(value=features.astype("float"))),
        }))

        # use the proto object to serialize the example to a string
        serialized = example.SerializeToString()

        # write the serialized object to disk
        writer.write(serialized)

tf.train.Example() で指定の構造をもった辞書を作成し、 SerializeToString() で文字列の変換してファイルに書いています。 save_protobuf() を実行すると TFRecordWriter で指定したパスに .tfrecord 形式のファイルが保存されます。


TFRecorde ファイルからデータを読み込む
def read_and_decode_single_example(self):  
    # Define file name queue
    filename_queue = tf.train.string_input_producer( ["ファイルパス"], num_epochs= None )

    # Read single serialized example from file
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)

    # Convert serialized example into actual values
    features = tf.parse_single_example(
        serialized_example,
        features= {
            'label' : tf.FixedLenFeature([10],  'float'),
            'image' : tf.FixedLenFeature([784], 'float')
        }
    )

    # Return the converted data
    image = features[ 'label' ]
    label = features[ 'image' ]

    return image, label

string_input_producer() でファイルパスのリストを指定します。リストなので複数のファイルを指定可能。 ここで指定したファイルから、 tf.TFRecordReader().read() で Example を 1つずつ取り出していきます。 tf.TFRecordReader().read() で取り出したデータはシリアライズされているので、 tf.parse_single_example() で型を指定して復元します。


モデル

モデルは以下のようになります。
feed_dict の場合との違いは、 # ---- Input data ---- の部分だけ。

def model(self):  
    # ---- Input data ----
    # Get sinple examples
    x, y = self.read_and_decode_single_example()

    # Groups examples into batches randomly
    x_batch, y_batch = tf.train.shuffle_batch(
        [x, y],
        batch_size        = 50,
        capacity          = 2000,
        min_after_dequeue = 0
    )

    # ---- Calc CNN layer ----
    y_conv = self.cnn(x_batch)

    # ---- Train and evaluate the model ----
    cross_entropy = tf.reduce_mean( tf.nn.softmax_cross_entropy_with_logits(logits= y_conv, labels= y_batch) )
    train_op      = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)

    # ---- Run tensorflow ----
    sess = tf.Session()
    init = tf.global_variables_initializer()
    sess.run( init )

    return sess, train_op

tf.train.shuffle_batch()[x, y]batch_size 個ランダムに取り出します。 capacity, min_after_dequeue はそれぞれキューの最大値と、取り出した後に最低限残しておく数を意味しています。


学習を実施

それでは、実際に学習を行う部分のコードをみていきましょう!
まずは学習用の式 train_op と、それを動かすためのセッションを手に入れます。

# Get input data
mnist = get_mnist_data()

# Get tensorflow formulas
model          = Model( mnist )  
sess, train_op = model.model()  

続いて train_op を実行する部分。
キュー経由でモデルへデータを食わせる場合、以下のようにします。

# Training model
tf.train.start_queue_runners(sess= sess)  
for i in range(20000):  
    sess.run( train_op )

ポイントは train_op する前に start_queue_runners() することです。 start_queue_runners() を実行すると tf.train.shuffle_batch() でキューが作成され、 バックグラウンドでエンキュー/デキューが行われます。 キューの操作に関する部分は Tensorflow が裏方でやってくれるので、あとは train_op を実行するだけでおkです。


まとめ

本記事では Tensorflow にデータを食わせる手法として、feed_dictQueue を用いた手法を実装してみました。 次回は Queuefeed_dict 両方を用いた、ハイブリッドな手法を実装してみようと思います。


参考文献

[1] https://www.tensorflow.org/performance/performance_guide
[2] https://indico.io/blog/tensorflow-data-inputs-part1-placeholders-protobufs-queues/
[3] https://indico.io/blog/tensorflow-data-input-part2-extensions/
[4] https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/example/example.proto