Saturday, February 24, 2018

[TensorFlow] Thread and Queue

 TensorFlow    Python   Queue   Thread 


Introduction


We will learn how to use the following queues in TensorFlow.


1.  FIFOQueue
First in, first out, as the name suggests. Often used to load training data in order.

2.  RandomShuffleQueue
Dequeues elements in random order.
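
To make the two dequeue orders concrete, here is a minimal plain-Python sketch that uses only the standard library, not TensorFlow. The helpers fifo_order and shuffled_order are hypothetical names introduced for illustration; they only mimic the order in which each queue type hands its elements back.

```python
import random
from collections import deque


def fifo_order(items):
    """Return items in the order a FIFO queue would dequeue them."""
    q = deque(items)
    out = []
    while q:
        out.append(q.popleft())  # the oldest element leaves first
    return out


def shuffled_order(items, seed=None):
    """Return items in a random dequeue order (each item exactly once)."""
    rng = random.Random(seed)
    pool = list(items)
    out = []
    while pool:
        # pick a random element that is still in the queue and remove it
        out.append(pool.pop(rng.randrange(len(pool))))
    return out


print(fifo_order([1, 2, 3]))      # [1, 2, 3]
print(shuffled_order([1, 2, 3]))  # some permutation of the input
```

The TensorFlow queues below behave the same way with respect to ordering, but they live inside the graph and are driven by sess.run().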



Environment


Python 3.6.2
TensorFlow 1.5.0



Implementation


FIFOQueue

Reference:
adventuresinmachinelearning.com
Source code: Github

import tensorflow as tf
   
dummy_input = tf.random_normal([3], mean=0, stddev=1)
dummy_input = tf.Print(dummy_input, data=[dummy_input], message='Create new dummy inputs: ', summarize=6)

q = tf.FIFOQueue(capacity=3, dtypes=tf.float32)
enqueue_op = q.enqueue_many(dummy_input)

data = q.dequeue()
data = tf.Print(data, data=[q.size(),data], message='How many items are left in queue: ')

# create a fake graph that we can call upon
fg = data + 1

with tf.Session() as sess:
    # Fill the queue once with the three dummy values
    sess.run(enqueue_op)
    # Dequeue the three values one by one
    for i in range(0,3):
        sess.run(fg)
   
   





RandomShuffleQueue

Reference: adventuresinmachinelearning.com
Source code: Github


import tensorflow as tf
   
dummy_input = tf.random_normal([10], mean=0, stddev=1)
dummy_input = tf.Print(dummy_input, data=[dummy_input], message='Create new dummy inputs: ', summarize=6)

q = tf.RandomShuffleQueue(capacity=10, min_after_dequeue=2, dtypes=tf.float32)
enqueue_op = q.enqueue_many(dummy_input)

data = q.dequeue()
data = tf.Print(data, data=[q.size(),data], message='How many items are left in queue: ')
# create a fake graph that we can call upon
fg = data + 1

with tf.Session() as sess:
    sess.run(enqueue_op)
    for i in range(0,8):
        sess.run(fg)
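
The loop above dequeues only 8 of the 10 elements. With min_after_dequeue=2, a RandomShuffleQueue keeps at least two elements in the queue after every dequeue so that there is always something left to shuffle with, and a ninth dequeue would block. The following is a rough standard-library sketch of that rule, not TensorFlow code: ShuffleBuffer is a hypothetical class, and it raises an error where TensorFlow would block.

```python
import random


class ShuffleBuffer:
    """Hypothetical stand-in for a shuffle queue; not a TensorFlow class."""

    def __init__(self, capacity, min_after_dequeue, seed=None):
        self.capacity = capacity
        self.min_after_dequeue = min_after_dequeue
        self._items = []
        self._rng = random.Random(seed)

    def enqueue_many(self, values):
        values = list(values)
        if len(self._items) + len(values) > self.capacity:
            raise RuntimeError('queue full: TensorFlow would block here')
        self._items.extend(values)

    def dequeue(self):
        # At least min_after_dequeue elements must remain afterwards
        if len(self._items) - 1 < self.min_after_dequeue:
            raise RuntimeError('too few elements: TensorFlow would block here')
        return self._items.pop(self._rng.randrange(len(self._items)))

    def size(self):
        return len(self._items)


buf = ShuffleBuffer(capacity=10, min_after_dequeue=2)
buf.enqueue_many(range(10))
taken = [buf.dequeue() for _ in range(8)]  # same count as the loop above
print(buf.size())  # 2
```

Calling buf.dequeue() a ninth time raises the RuntimeError, which corresponds to the point where the TensorFlow dequeue would wait for more data.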
   




Set timeout and notify out-of-range

An enqueue operation blocks when the queue is full and returns only after the element has been enqueued.
Likewise, a dequeue operation blocks when the queue is empty and returns only once there is an element to dequeue.

For example, the following code blocks because it dequeues 5 times, but no elements are left in the queue after the third dequeue.

import tensorflow as tf
   
dummy_input = tf.random_normal([3], mean=0, stddev=1)
# … skip
with tf.Session() as sess:
    sess.run(enqueue_op)
    # Only 3 elements were enqueued, so the 4th dequeue blocks
    for i in range(0,5):
        sess.run(fg)

To avoid blocking forever, we update the code above by setting timeout_in_ms (the time to wait for the operation to complete, in milliseconds) in tf.RunOptions and catching tf.errors.DeadlineExceededError.

with tf.Session() as sess:

    run_option = tf.RunOptions(timeout_in_ms=5000)
    try:
        sess.run(enqueue_op, options=run_option)
        for i in range(0,9):
            sess.run(fg, options=run_option)
    except tf.errors.DeadlineExceededError:
        print('Out of range')
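
The same block-then-time-out pattern can be tried without TensorFlow. This is a minimal sketch that uses Python's standard queue module as an analogy: queue.Empty plays the role of tf.errors.DeadlineExceededError, and the 0.2 second timeout is an arbitrary value chosen for illustration.

```python
import queue

q = queue.Queue(maxsize=3)
for value in (0.1, 0.2, 0.3):
    q.put(value)  # would block if the queue were already full

results = []
try:
    for _ in range(5):
        # Wait at most 0.2 s for an element instead of blocking forever
        results.append(q.get(timeout=0.2))
except queue.Empty:
    print('Out of range')

print(results)  # [0.1, 0.2, 0.3]
```

The fourth get() finds the queue empty, waits for the timeout, and raises, so the loop ends after three elements, just as the TensorFlow loop ends when the deadline is exceeded.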



Threading and Queues

Although we can catch the timeout, we would rather keep the queue filled until dequeuing stops. In TensorFlow, we can use a QueueRunner to start one or more asynchronous threads that run the enqueue op and prevent the queue from running dry.

# … Skip
enqueue_op = q.enqueue_many(dummy_input)

# Set a queue runner to handle enqueue_op outside the main thread asynchronously
qr = tf.train.QueueRunner(q, enqueue_ops=[enqueue_op] * 1)
tf.train.add_queue_runner(qr)

with tf.Session() as sess:
    enqueue_threads = qr.create_threads(sess, start=True)
    # sess.run(enqueue_op, options=run_option) # Remove this line


Notice that
1 is the number of threads we create for enqueue_ops.
You can also create the threads first and start them later:
enqueue_threads = qr.create_threads(sess, start=False)
for thread in enqueue_threads:
    thread.start()


However, the code above blocks again: we never tell the enqueue threads to stop after we stop dequeuing, so they keep enqueuing until the queue reaches its capacity and then block.

tf.train.Coordinator implements a simple mechanism to coordinate the termination of a set of threads.


with tf.Session() as sess:
    coord = tf.train.Coordinator()
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)

    run_option = tf.RunOptions(timeout_in_ms=5000)
    for i in range(0,10):
        sess.run(fg, options=run_option)

    coord.request_stop()
    coord.join(enqueue_threads)        

tf.train.Coordinator: request_stop() and join()

request_stop() asks the threads to stop. After it has been called, should_stop() returns True.

join() waits for all threads to terminate. Threads are given 'stop_grace_period_secs' seconds to terminate. If any of them is still alive after that period expires, a RuntimeError is raised. Here is an example from the TensorFlow documentation.

try:
  coord = Coordinator()
  # Start a number of threads, passing the coordinator to each of them.
  ...start thread 1...(coord, ...)
  ...start thread N...(coord, ...)
  # Wait for all the threads to terminate, give them 10s grace period
  coord.join(threads, stop_grace_period_secs=10)
except RuntimeError:
  ...one of the threads took more than 10s to stop after request_stop()
  ...was called.
except Exception:
  ...exception that was passed to coord.request_stop()
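
The request-stop-then-join pattern above can also be sketched with the standard library alone. In this analogy, which is not TensorFlow code, a threading.Event stands in for the Coordinator, a plain thread stands in for the thread a QueueRunner would create, and producer is a hypothetical function introduced for illustration.

```python
import queue
import threading

q = queue.Queue(maxsize=3)
stop_event = threading.Event()  # plays the role of the Coordinator


def producer():
    """Keep the queue filled until a stop is requested."""
    value = 0
    while not stop_event.is_set():     # roughly coord.should_stop()
        try:
            # Short timeout so the stop flag is re-checked while the queue is full
            q.put(value, timeout=0.1)
            value += 1
        except queue.Full:
            pass


threads = [threading.Thread(target=producer) for _ in range(1)]
for t in threads:
    t.start()

# The main thread dequeues 10 elements while the producer refills the queue
consumed = [q.get(timeout=5) for _ in range(10)]

stop_event.set()                       # roughly coord.request_stop()
for t in threads:
    t.join(timeout=10)                 # wait up to 10 s for the thread to finish
    if t.is_alive():
        raise RuntimeError('thread did not stop within the grace period')

print(consumed)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

The short put() timeout is the design choice that matters here: a producer that blocks indefinitely on a full queue would never see the stop request, which is the same situation the enqueue threads run into when no Coordinator is used.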




Complete code

Source code: Github

import tensorflow as tf

q = tf.FIFOQueue(capacity=3, dtypes=tf.float32, shapes=[])

dummy_input = tf.random_normal([3], mean=0, stddev=1)
dummy_input = tf.Print(dummy_input, data=[dummy_input], message='Create new dummy inputs: ', summarize=6)
enqueue_op = q.enqueue_many(dummy_input)

# Set a queue runner to handle enqueue_op outside the main thread asynchronously
qr = tf.train.QueueRunner(q, enqueue_ops=[enqueue_op] * 1)
tf.train.add_queue_runner(qr)

data = q.dequeue_many(3)
data = tf.Print(data, data=[q.size(), data], message='How many items are left in queue: ')
# create a fake graph that we can call upon
fg = data + 1

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)

    run_option = tf.RunOptions(timeout_in_ms=5000)
    try:
        for i in range(0, 5):
            # coord.should_stop() returns True as soon as coord.request_stop() has been called.
            if coord.should_stop():
                break
            sess.run(fg, options=run_option)

        # Ask the enqueue threads to stop once we are done dequeuing
        coord.request_stop()
        # Wait for all the threads to terminate, give them 10s grace period
        coord.join(enqueue_threads, stop_grace_period_secs=10)

    except tf.errors.DeadlineExceededError:
        print('Out of range')


