An extension of the standard Python queue that keeps each element for only a given number of seconds and then removes it. Useful if you don’t know the volume of data being added to the queue but need to limit it in some way.
Removing expired items causes no measurable decrease in performance, as the two runs below show.
Without item removal
Queue size: 10000 real 0m10.771s |
With item removal
Queue size: 4659 real 0m10.765s |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
#!/usr/bin/python
"""A time-bounded queue.

Can use any Queue class as a base as long as it follows the standard Python
Queue format (http://docs.python.org/2/library/queue.html).
"""
from collections import deque
from time import time

try:
    from queue import Queue  # Python 3
except ImportError:
    from Queue import Queue  # Python 2


class TimeBoundQueue(Queue):
    """FIFO queue whose elements expire a fixed number of seconds after put().

    Each element is stored internally as a ``(timestamp, element)`` tuple.
    Expired elements are discarded from the left end of the underlying deque,
    both periodically while adding (to bound memory) and whenever the queue
    size is checked (so ``get()``, ``qsize()`` and ``empty()`` never see an
    element that has already expired).

    NOTE: timestamps come from the wall clock (``time.time()``), so a system
    clock adjustment shifts expiry accordingly.
    NOTE: expired elements are dropped without a matching ``task_done()``;
    do not rely on ``join()`` when elements may time out.
    """

    def __init__(self, max_time_seconds, purge_interval=100):
        """Create an empty, unbounded queue.

        max_time_seconds -- elements older than this are removed.
        purge_interval   -- number of put() calls between purges of expired
                            elements; values of 1 or less purge on every put.
        """
        # Explicit base-class call: Python 2's Queue is an old-style class,
        # so super() cannot be used there.
        Queue.__init__(self)
        self.timeout = max_time_seconds
        self.purge_interval = purge_interval
        # Countdown of put() calls remaining until the next purge.
        self.timeout_count = purge_interval

    def _put(self, item):
        """Append ``(timestamp, item)``, purging expired elements when due."""
        self.timeout_count -= 1
        if self.timeout_count <= 0:
            self.__timeout()
            self.timeout_count = self.purge_interval
        self.queue.append((time(), item))

    def _qsize(self):
        """Return the number of unexpired elements.

        The base class consults this hook before every ``_get``, so purging
        here keeps ``get()`` from returning elements that have timed out.
        """
        self.__timeout()
        return len(self.queue)

    def _get(self):
        """Remove the oldest element and return it without its timestamp."""
        return self.queue.popleft()[1]

    def __timeout(self):
        """Remove oldest elements up to time() - max_time_seconds."""
        cutoff = time() - self.timeout
        pending = self.queue
        # Elements are appended in time order, so the expired ones are all at
        # the left end; popping there is quicker than removing a value at a
        # random position.
        while pending and pending[0][0] < cutoff:
            pending.popleft()


if __name__ == "__main__":
    from time import sleep

    # Demo: add 10000 elements roughly 1 ms apart with a 5 second limit.
    t_queue = TimeBoundQueue(5)
    for i in range(0, 10000):
        t_queue.put(i)
        sleep(0.001)
    print("Queue size: {0}".format(t_queue.qsize()))
Questions? Comments?