TensorFlow
Python Queue Thread
▌Introduction
Take a look at the GIF animation of enqueueing and dequeueing in the TensorFlow documentation.
▌Environment
▋Python 3.6.2
▋TensorFlow 1.5.0
▌Implementation
▋FIFOQueue
import tensorflow as tf

dummy_input = tf.random_normal([3], mean=0, stddev=1)
dummy_input = tf.Print(dummy_input, data=[dummy_input],
                       message='Create new dummy inputs: ', summarize=6)
q = tf.FIFOQueue(capacity=3, dtypes=tf.float32)
enqueue_op = q.enqueue_many(dummy_input)
data = q.dequeue()
data = tf.Print(data, data=[q.size(), data],
                message='How many items are left in queue: ')
# create a fake graph that we can call upon
fg = data + 1

with tf.Session() as sess:
    sess.run(enqueue_op)
    for i in range(0, 3):
        sess.run(fg)
Output:
▋RandomShuffleQueue
import tensorflow as tf

dummy_input = tf.random_normal([10], mean=0, stddev=1)
dummy_input = tf.Print(dummy_input, data=[dummy_input],
                       message='Create new dummy inputs: ', summarize=6)
q = tf.RandomShuffleQueue(capacity=10, min_after_dequeue=2, dtypes=tf.float32)
enqueue_op = q.enqueue_many(dummy_input)
data = q.dequeue()
data = tf.Print(data, data=[q.size(), data],
                message='How many items are left in queue: ')
# create a fake graph that we can call upon
fg = data + 1

with tf.Session() as sess:
    sess.run(enqueue_op)
    for i in range(0, 8):
        sess.run(fg)
Output:
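Note that only capacity - min_after_dequeue = 10 - 2 = 8 elements can be dequeued here: a RandomShuffleQueue keeps at least min_after_dequeue elements after each dequeue to guarantee a minimum level of mixing, so a ninth dequeue would block until more elements are enqueued. A minimal sketch, reusing enqueue_op and fg from the example above:
with tf.Session() as sess:
    sess.run(enqueue_op)    # the queue now holds 10 elements
    for i in range(0, 8):   # 10 - 2 = 8 dequeues succeed
        sess.run(fg)
    # sess.run(fg)          # a 9th dequeue would block until new elements arrive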
▋Set timeout and notify out-of-range
While enqueueing, if the queue is full when the operation executes,
it will block until the element has been enqueued.
On the other hand, if the queue is empty when dequeueing,
it will block until there is an element to dequeue.
For example, the following code will block because we dequeue five times
while only three elements were enqueued, so nothing is left in the queue after the
third dequeue.
import tensorflow as tf

dummy_input = tf.random_normal([3], mean=0, stddev=1)
# … skip
with tf.Session() as sess:
    sess.run(enqueue_op)
    for i in range(0, 5):
        sess.run(fg)
So we update the code above by setting timeout_in_ms (the time to wait
for the operation to complete, in milliseconds) in tf.RunOptions.
with tf.Session() as sess:
    run_option = tf.RunOptions(timeout_in_ms=5000)
    try:
        sess.run(enqueue_op, options=run_option)
        for i in range(0, 9):
            sess.run(fg, options=run_option)
    except tf.errors.DeadlineExceededError:
        print('Out of range')
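Another way to get notified that the queue has run dry is to close it: a dequeue from a closed, empty queue raises tf.errors.OutOfRangeError instead of blocking. A minimal sketch (not from the original post), using the FIFOQueue setup from above:
import tensorflow as tf

q = tf.FIFOQueue(capacity=3, dtypes=tf.float32)
enqueue_op = q.enqueue_many(tf.random_normal([3], mean=0, stddev=1))
close_op = q.close()           # no more elements will be enqueued
data = q.dequeue()

with tf.Session() as sess:
    sess.run(enqueue_op)
    sess.run(close_op)
    try:
        for i in range(0, 5):  # only 3 elements exist
            sess.run(data)
    except tf.errors.OutOfRangeError:
        print('Queue is closed and empty')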
▋Threading and Queues
Though we can catch the blocking with a timeout, we would rather keep the queue filled until the
dequeue action is stopped. In TensorFlow, we can use QueueRunners to fire one or many
asynchronous threads for enqueueing and prevent the queue from running dry.
# … Skip
enqueue_op = q.enqueue_many(dummy_input)
# Set a queue runner to handle enqueue_op outside the main thread asynchronously
qr = tf.train.QueueRunner(q, enqueue_ops=[enqueue_op] * 1)
tf.train.add_queue_runner(qr)

with tf.Session() as sess:
    enqueue_threads = qr.create_threads(sess, start=True)
    # sess.run(enqueue_op, options=run_option)  # Remove this line
Notice that
※ 1 is the number of threads we will create for enqueue_ops (see the sketch after this list).
※ You can start the threads later by
enqueue_threads = qr.create_threads(sess, start=False)
for thread in enqueue_threads:
    thread.start()
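For instance, passing several copies of enqueue_op makes the queue runner create one enqueue thread per op. A minimal sketch, assuming q, enqueue_op and fg are defined as above:
# four copies of enqueue_op -> four enqueue threads
qr = tf.train.QueueRunner(q, enqueue_ops=[enqueue_op] * 4)

with tf.Session() as sess:
    # daemon=True so the enqueue threads don't keep the process alive
    enqueue_threads = qr.create_threads(sess, start=True, daemon=True)
    print(len(enqueue_threads))  # 4, one thread per enqueue op
    for i in range(0, 3):
        sess.run(fg)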
But the code above will block again because we never tell the enqueue threads to stop after we are
done dequeueing; they keep filling the queue until it reaches its capacity and then block on the next enqueue.
tf.train.Coordinator implements a simple mechanism to coordinate the termination of a set
of threads.
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
    run_option = tf.RunOptions(timeout_in_ms=5000)
    for i in range(0, 10):
        sess.run(fg, options=run_option)
    coord.request_stop()
    coord.join(enqueue_threads)
tf.train.Coordinator : request_stop()
Request that the threads stop.
tf.train.Coordinator : join()
Wait for all threads to terminate. Threads are given 'stop_grace_period_secs' seconds to terminate. If any of them is still alive after that period expires, a RuntimeError is raised.
Here is an example from the TensorFlow documentation.
try:
    coord = Coordinator()
    # Start a number of threads, passing the coordinator to each of them.
    ...start thread 1...(coord, ...)
    ...start thread N...(coord, ...)
    # Wait for all the threads to terminate, give them 10s grace period
    coord.join(threads, stop_grace_period_secs=10)
except RuntimeError:
    ...one of the threads took more than 10s to stop after request_stop()
    ...was called.
except Exception:
    ...exception that was passed to coord.request_stop()
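The snippet above is pseudocode; a concrete, minimal sketch with plain Python threads (not from the original post) could look like this:
import threading
import time
import tensorflow as tf

def worker(coord, worker_id):
    # loop until some thread calls coord.request_stop()
    while not coord.should_stop():
        print('worker %d is running' % worker_id)
        time.sleep(0.1)

coord = tf.train.Coordinator()
threads = [threading.Thread(target=worker, args=(coord, i)) for i in range(3)]
for t in threads:
    t.start()

time.sleep(0.5)                                 # let the workers run for a while
coord.request_stop()                            # ask all workers to stop
coord.join(threads, stop_grace_period_secs=10)  # wait up to 10s for them to finish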
▋Complete code
Source code: GitHub
import tensorflow as tf

q = tf.FIFOQueue(capacity=3, dtypes=tf.float32, shapes=[])
dummy_input = tf.random_normal([3], mean=0, stddev=1)
dummy_input = tf.Print(dummy_input, data=[dummy_input],
                       message='Create new dummy inputs: ', summarize=6)
enqueue_op = q.enqueue_many(dummy_input)
# Set a queue runner to handle enqueue_op outside the main thread asynchronously
qr = tf.train.QueueRunner(q, enqueue_ops=[enqueue_op] * 1)
tf.train.add_queue_runner(qr)
data = q.dequeue_many(3)
data = tf.Print(data, data=[q.size(), data],
                message='How many items are left in queue: ')
# create a fake graph that we can call upon
fg = data + 1

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
    run_option = tf.RunOptions(timeout_in_ms=5000)
    try:
        for i in range(0, 5):
            # coord.should_stop() returns True as soon as coord.request_stop() has been called.
            if coord.should_stop():
                break
            sess.run(fg, options=run_option)
        coord.request_stop()
        # Wait for all the threads to terminate, give them 10s grace period
        coord.join(enqueue_threads, stop_grace_period_secs=10)
    except tf.errors.DeadlineExceededError:
        print('Out of range')
Output:
▌Reference