Source code for AFL.automation.shared.MutableQueue

import threading
import warnings
from queue import Empty, Full
import time

[docs] class MutableQueue: '''Thread-safe, mutable queue Unlike the standard library CPython queue, this class supportes positional inserts, deletions, and reordering. The tradeoff is performance for both reads and writes to the queue. '''
[docs] def __init__(self): # Queue storage object self.queue = list() # Lock must be held whenever the queue list is mutated self.lock = threading.Lock() # Notify not_empty whenever an item is added to the queue; a # thread waiting to get is notified then. self.not_empty = threading.Condition(self.lock) self.iteration_id = time.time()
[docs] def qsize(self): return len(self.queue)
[docs] def iterationid(self): return self.iteration_id
[docs] def empty(self): with self.lock: return not self.qsize()
def _put(self,item,loc): self.queue.insert(loc,item) self.iteration_id = time.time() def _get(self,loc=0): self.iteration_id = time.time() return self.queue.pop(loc)
[docs] def put(self,item,loc): '''Insert an item at the top of the queue''' with self.lock: self._put(item,loc) self.not_empty.notify()# notify any waiting threads
[docs] def remove(self,loc): '''Remove an item from the queue''' with self.lock: self.iteration_id = time.time() if loc>=self.qsize(): raise IndexError self._get(loc)
[docs] def get(self,loc=0,block=True,timeout=None): '''Get next item from queue''' if timeout is not None: warnings.warn('Timeout is included for compatibility but is not implemented') with self.not_empty:#implies self.lock if not block and not self.qsize(): raise Empty else: while not self.qsize(): self.not_empty.wait() #releases self.lock until notify if loc>=self.qsize(): raise IndexError return self._get(loc)
[docs] def move(self,old_index,new_index=None): '''Move item in queue''' with self.lock: self.iteration_id = time.time() if new_index is None: new_index = self.qsize() if old_index<new_index: self.queue.insert(new_index+1,self.queue[old_index]) del self.queue[old_index] elif old_index>new_index: self.queue.insert(new_index,self.queue[old_index]) del self.queue[old_index+1]
[docs] def clear(self): '''Remove all items from the queue''' with self.lock: self.queue.clear() self.iteration_id = time.time()