Saturday, February 24, 2018

[TensorFlow] Thread and Queue



Introduction


We will learn how to use the following queues in TensorFlow.


1.  FIFOQueue
As the name suggests, elements leave the queue in the order they entered it (first in, first out). Often used to load training data in order.

2.  RandomShuffleQueue
Dequeues elements in random order.
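
The difference between the two dequeue orders can be sketched without TensorFlow. This is only a plain-Python analogy, not how TensorFlow implements its queues; the helper names fifo_dequeue_all and shuffled_dequeue_all are made up for illustration, and the sketch ignores min_after_dequeue.

```python
import random
from collections import deque


def fifo_dequeue_all(items):
    """Return items in the order they were enqueued (first in, first out)."""
    q = deque(items)
    out = []
    while q:
        out.append(q.popleft())
    return out


def shuffled_dequeue_all(items, seed=None):
    """Return items in random order by picking a random remaining element each time."""
    rng = random.Random(seed)
    pool = list(items)
    out = []
    while pool:
        idx = rng.randrange(len(pool))
        # Swap the chosen element to the end so that pop() removes it cheaply.
        pool[idx], pool[-1] = pool[-1], pool[idx]
        out.append(pool.pop())
    return out


data = [0.1, 0.2, 0.3, 0.4]
print(fifo_dequeue_all(data))      # same order as the input
print(shuffled_dequeue_all(data))  # same elements, random order
```

Both helpers return every element exactly once; only the order differs.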



Environment


Python 3.6.2
TensorFlow 1.5.0



Implementation


FIFOQueue

Reference:
adventuresinmachinelearning.com
Source code: Github

import tensorflow as tf
   
dummy_input = tf.random_normal([3], mean=0, stddev=1)
dummy_input = tf.Print(dummy_input, data=[dummy_input], message='Create new dummy inputs: ', summarize=6)

q = tf.FIFOQueue(capacity=3, dtypes=tf.float32)
enqueue_op = q.enqueue_many(dummy_input)

data = q.dequeue()
data = tf.Print(data, data=[q.size(),data], message='How many items are left in queue: ')

# create a fake graph that we can call upon
fg = data + 1

with tf.Session() as sess:
    sess.run(enqueue_op)
    for i in range(0,3):
        sess.run(fg)
   
   



RandomShuffleQueue

Reference: adventuresinmachinelearning.com
Source code: Github


import tensorflow as tf
   
dummy_input = tf.random_normal([10], mean=0, stddev=1)
dummy_input = tf.Print(dummy_input, data=[dummy_input], message='Create new dummy inputs: ', summarize=6)

q = tf.RandomShuffleQueue(capacity=10, min_after_dequeue=2, dtypes=tf.float32)
enqueue_op = q.enqueue_many(dummy_input)

data = q.dequeue()
data = tf.Print(data, data=[q.size(),data], message='How many items are left in queue: ')
# create a fake graph that we can call upon
fg = data + 1

with tf.Session() as sess:
    sess.run(enqueue_op)
    for i in range(0,8):
        sess.run(fg)
   


Set timeout and notify out-of-range

If the queue is full when an enqueue operation executes, the operation blocks until there is room for the new element.
Likewise, if the queue is empty when a dequeue operation executes, it blocks until an element is available.

For example, the following code blocks because we dequeue 5 times while the queue is already empty after the third dequeue.

import tensorflow as tf
   
dummy_input = tf.random_normal([3], mean=0, stddev=1)
# … skip
with tf.Session() as sess:
    sess.run(enqueue_op)
    for i in range(0,5):
        sess.run(fg)

To avoid waiting forever, we update the code above by passing timeout_in_ms (the time to wait for the operation to complete, in milliseconds) through tf.RunOptions.

with tf.Session() as sess:

    run_option = tf.RunOptions(timeout_in_ms=5000)
    try:
        sess.run(enqueue_op, options=run_option)
        for i in range(0,9):
            sess.run(fg, options=run_option)
    except tf.errors.DeadlineExceededError:
        print('Out of range')
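
The same block-then-time-out pattern can be tried without TensorFlow. The following is a rough analogy built on Python's standard queue module, where queue.Empty plays the role of tf.errors.DeadlineExceededError; the helper drain_with_timeout is a hypothetical name used only for this sketch.

```python
import queue


def drain_with_timeout(q, attempts, timeout_s):
    """Dequeue up to `attempts` items, stopping once a dequeue waits longer than timeout_s."""
    items = []
    for _ in range(attempts):
        try:
            # get() blocks while the queue is empty, for at most timeout_s seconds.
            items.append(q.get(timeout=timeout_s))
        except queue.Empty:
            # Analogous to catching tf.errors.DeadlineExceededError above.
            print('Out of range')
            break
    return items


q = queue.Queue(maxsize=3)
for value in (1.0, 2.0, 3.0):
    q.put(value)

# Five dequeue attempts on three elements: the fourth attempt times out.
result = drain_with_timeout(q, attempts=5, timeout_s=0.1)
print(result)  # [1.0, 2.0, 3.0]
```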



Threading and Queues

Although we can catch the blocking with a timeout, we would rather keep the queue filled until dequeuing stops. In TensorFlow, we can use a QueueRunner to start one or more asynchronous threads that enqueue elements and prevent the queue from running dry.

# … Skip
enqueue_op = q.enqueue_many(dummy_input)

# Set a queue runner to handle enqueue_op outside the main thread asynchronously
qr = tf.train.QueueRunner(q, enqueue_ops= [enqueue_op] * 1)
tf.train.add_queue_runner(qr)

with tf.Session() as sess:
    enqueue_threads = qr.create_threads(sess, start=True)
    # sess.run(enqueue_op, options=run_option) # Remove this line


Notice that:
The 1 in [enqueue_op] * 1 is the number of threads we will create for enqueue_ops.
You can also create the threads first and start them later:
enqueue_threads = qr.create_threads(sess, start=False)
for thread in enqueue_threads:
    thread.start()


However, the code above blocks again because we did not tell the threads to stop after we stopped dequeuing: they keep enqueuing until the queue reaches its capacity, and then the enqueue operation blocks.

tf.train.Coordinator implements a simple mechanism to coordinate the termination of a set of threads.


with tf.Session() as sess:
    coord = tf.train.Coordinator()
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)

    run_option = tf.RunOptions(timeout_in_ms=5000)
    for i in range(0,10):
        sess.run(fg, options=run_option)

    coord.request_stop()
    coord.join(enqueue_threads)        

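tf.train.Coordinator: request_stop()

Requests that the threads stop. After it has been called, should_stop() returns True.

tf.train.Coordinator: join()

Waits for all threads to terminate. Once request_stop() has been called, threads are given 'stop_grace_period_secs' seconds to terminate. If any of them is still alive after that period expires, a RuntimeError is raised. Here is an example from the TensorFlow documentation.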

try:
  coord = Coordinator()
  # Start a number of threads, passing the coordinator to each of them.
  ...start thread 1...(coord, ...)
  ...start thread N...(coord, ...)
  # Wait for all the threads to terminate, give them 10s grace period
  coord.join(threads, stop_grace_period_secs=10)
except RuntimeError:
  ...one of the threads took more than 10s to stop after request_stop()
  ...was called.
except Exception:
  ...exception that was passed to coord.request_stop()




Complete code

Source code: Github

import tensorflow as tf

q = tf.FIFOQueue(capacity=3, dtypes=tf.float32, shapes=[])

dummy_input = tf.random_normal([3], mean=0, stddev=1)
dummy_input = tf.Print(dummy_input, data=[dummy_input], message='Create new dummy inputs: ', summarize=6)
enqueue_op = q.enqueue_many(dummy_input)

# Set a queue runner to handle enqueue_op outside the main thread asynchronously
qr = tf.train.QueueRunner(q, enqueue_ops= [enqueue_op] * 1)
tf.train.add_queue_runner(qr)

data = q.dequeue_many(3)
data = tf.Print(data, data=[q.size(),data], message='How many items are left in queue: ')
# create a fake graph that we can call upon
fg = data + 1

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)

    run_option = tf.RunOptions(timeout_in_ms=5000)
    try:
        for i in range(0,5):
            if coord.should_stop(): # coord.should_stop() returns True as soon as coord.request_stop() has been called.
                break
            sess.run(fg, options=run_option)

        coord.request_stop()
        # Wait for all the threads to terminate, give them 10s grace period
        coord.join(enqueue_threads, stop_grace_period_secs=10)
           
    except tf.errors.DeadlineExceededError:
        print('Out of range')




Thursday, February 22, 2018

[TensorFlow] Save and Restore model





Introduction


Once we have trained a model and want to use it for prediction, there are two ways to save and load it.

Checkpoint file

The tf.train.Saver class generates .ckpt files that contain the variables and their tensor values.
By default, each variable is saved in the checkpoint file under its Variable.name.
Note that the graph structure is stored separately from the variable values.


The tf.train.Saver class adds ops to save and restore variables to and from checkpoints.
Checkpoints are binary files in a proprietary format which map variable names to tensor values.
The best way to examine the contents of a checkpoint is to load it by the Saver.

Savers can automatically number checkpoint filenames with a provided counter. This lets you keep multiple checkpoints at different steps while training a model. For example you can number the checkpoint filenames with the training step number. To avoid filling up disks, savers manage checkpoint files automatically. For example, they can keep only the N most recent files, or one checkpoint for every N hours of training.



Graph proto file

Use tf.train.write_graph to write a GraphDef to a *.pb binary file.
However, a GraphDef does not store the values of Variables, so in the sample code later we use tf.graph_util.convert_variables_to_constants to replace the variables in a graph with constants of the same values.





Environment


Python 3.6.2
TensorFlow 1.5.0
matplotlib  2.1.2



Implementation


Before we start

Here are some of the TensorFlow APIs we will use later.

3.  tf.argmax


Checkpoint file

Full Source code: Github

Save

# Set a global step with trainable = false
global_step = tf.Variable(0, name='global_step', trainable=False)

# Call this after declaring all tf.Variables.
saver = tf.train.Saver()

# This variable won't be stored, since it is declared after tf.train.Saver()
non_storable_variable = tf.Variable(777)

with tf.Session() as sess:
    tf.global_variables_initializer().run()
    for i in range(start, 100):
        global_step.assign(i).eval() # set and update global_step with index
        saver.save(sess, ckpt_dir + "/model.ckpt", global_step=global_step)




Restore

with tf.Session() as sess:
    # you need to initialize all variables
    tf.global_variables_initializer().run()

    # Load last train state
    ckpt = tf.train.get_checkpoint_state(ckpt_dir) # Returns CheckpointState from the "checkpoint" file
    if ckpt and ckpt.model_checkpoint_path:
        saver.restore(sess, ckpt.model_checkpoint_path) # restore all variables


Demo

The full sample code is here.

When I ran 10 steps,
start = global_step.eval()
for i in range(start, 10):
    # …

the output looked like the following.





TensorFlow generated three kinds of files (.data, .index, .meta), and by default only the 5 most recent checkpoints are kept.
You can change the default with max_to_keep, like this:
saver = tf.train.Saver(max_to_keep=6)
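
The effect of max_to_keep can be pictured as a sliding window over the checkpoints written so far. The sketch below is plain Python and only models the bookkeeping; it is not the Saver's actual code, and the class CheckpointRetention is a hypothetical name. It assumes checkpoint prefixes of the form model.ckpt-<step>, which is what passing global_step to saver.save produces.

```python
from collections import deque


class CheckpointRetention:
    """Track checkpoint prefixes and report which ones fall out of the keep window."""

    def __init__(self, max_to_keep=5):
        self.max_to_keep = max_to_keep
        self._kept = deque()

    def add(self, prefix):
        """Register a new checkpoint and return the prefixes that should be deleted."""
        self._kept.append(prefix)
        expired = []
        while len(self._kept) > self.max_to_keep:
            expired.append(self._kept.popleft())
        return expired

    @property
    def kept(self):
        return list(self._kept)


retention = CheckpointRetention(max_to_keep=5)
deleted = []
for step in range(10):
    deleted.extend(retention.add('model.ckpt-%d' % step))

print(retention.kept)  # model.ckpt-5 through model.ckpt-9
print(deleted)         # model.ckpt-0 through model.ckpt-4
```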



A checkpoint consists of three kinds of files:

1.  meta file:
Describes the saved graph structure, including the GraphDef, the SaverDef, and so on. Calling tf.train.import_meta_graph('/tmp/model.ckpt.meta') restores the Saver and the Graph.

2.  index file:
A string-string immutable table (tensorflow::table::Table). Each key is the name of a tensor and its value is a serialized BundleEntryProto. Each BundleEntryProto describes the metadata of a tensor: which of the "data" files contains the content of the tensor, the offset into that file, the checksum, some auxiliary data, etc.

3.  data file:
A TensorBundle collection that stores the values of all variables.

After the first 10 steps, I updated the range as follows and ran the training again.

for i in range(start, 20):
    # …

This time the model resumed training from step 10.



Graph proto file

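To save only the GraphDef from the previous example, call tf.train.write_graph. Note that it writes a text-format proto by default (as_text=True), which is why the file below is named train.pbtxt; pass as_text=False to write a binary file instead.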

# Launch the graph in a session
with tf.Session() as sess:
    sess.run(train_op, feed_dict={})
    tf.train.write_graph(sess.graph_def, '/tmp/tfmodel', 'train.pbtxt')



Here is sample code that converts the variables of a graph to constants, saves the result to a graph proto file, and then restores it.

import tensorflow as tf
from tensorflow.python.framework.graph_util import convert_variables_to_constants
import os

a = tf.Variable([[1],[2]], dtype=tf.float32, name='a')
b = tf.Variable(3, dtype=tf.float32, name='b')

output = tf.add(a, b, name='out') # Tensor must have a name

graph_dir = "./graph_dir"
if not os.path.exists(graph_dir):
    os.makedirs(graph_dir)

# Save graph file
with tf.Session() as sess:
    tf.global_variables_initializer().run()
   
    # Convert Variable to constant, "out" is the name of the tensor
    graph = convert_variables_to_constants(sess, sess.graph_def, ["out"])
    tf.train.write_graph(graph, graph_dir,'graph.pb', as_text=False)

# Restore graph file
with tf.Session() as sess:
    with tf.gfile.FastGFile(os.path.join(graph_dir,'graph.pb'),'rb') as f:
        graph_def=tf.GraphDef()
        graph_def.ParseFromString(f.read())
        sess.graph.as_default()
        output = tf.import_graph_def(graph_def, return_elements=['out:0'])
        print(sess.run(output))


