  • queues are first in, first out (FIFO) abstract structures (i.e., items are removed in the same order they are added) that can be implemented with two arrays or a dynamic array (linked list), as long as items are added and removed from opposite sides.

  • queues support enqueue (add to one end) and dequeue (remove from the other end, or tail).

  • if implemented with a dynamic array, a more efficient solution is to use a circular queue (ring buffer), i.e., a fixed-size array and two pointers to indicate the starting and ending positions.

    • an advantage of circular queues is that we can use the spaces in front of the queue.
    • in a normal queue, once the queue becomes full, we cannot insert the next element even if there is a space in front of the queue. but using the circular queue, we can use the space to store new values.
  • queues are often used in breath-first search (where you store a list of nodes to be processed) or when implementing a cache.

  • in python, queues can be called with collections.deque() (and methods popleft() and insert()).

designing a circular queue

  • a circular queue can be built with either arrays or linked lists (nodes).

with an array

  • to build a ring with a fixed size array, any of the elements could be considered as the head.

class CircularQueue:

    def __init__(self, size):
        self.head = 0
        self.tail = 0
        self.size = size
        self.queue = [None] * self.size
    def enqueue(self, value: int) -> bool:
        if self.is_full():
            return False

        if self.is_empty():
            self.head = 0
        while self.queue[self.tail] is not None:
            self.tail += 1 
            if self.tail == self.size:
                self.tail = 0
        self.queue[self.tail] = value

        return True

    def dequeue(self) -> bool:
        if self.is_empty():
            return False

        value = self.queue[self.head]
        self.queue[self.head] = None
        self.head += 1

        if self.head == self.size:
            self.head = 0

        return True

    def front(self) -> int:
        return self.queue[self.head] or False
    def rear(self) -> int:
        return self.queue[self.tail] or False
    def is_empty(self) -> bool:
        for n in self.queue:
            if n is not None:
                return False
        return True

    def is_full(self) -> bool:
        for n in self.queue:
            if n is None:
                return False
        return True

  • to enqueue, you loop the queue with the tail index until you find a None (even if it has to loop back):

while queue[self.tail]:

      self.tail += 1 
      if self.tail == self.size:
            self.tail = 0
self.queue[self.tail] = value

  • to dequeue, you simply pop the head value:

value = self.queue[self.head]
self.queue[self.head] = None
self.head += 1

  • as long as we know the length of the queue, we can instantly locate its tails based on this formula:

tail_index = (head_index + current_length - 1) % size

  • there is one occasion when tail index is set to zero:

    • when the enqueue operation adds to the last position in the array and tail has to loop back (self.tail == self.size).
  • there are two occasions when head index is set to zero:

    • when the queue is checked as empty
    • when the dequeue operation popped the last element in the array and head has to loop back (self.head == self.size).

  • another version used only one index (for head) and calculate the tail with the equation:

(self.head + self.count - 1) % self.size

  • and the next tail with:

(self.head + self.count) % self.size

  • and the next head is always given by:

(self.head + 1) % self.size

    • in the example below we also implement the methods is_empty and is_full using an extra counter variable self.count that can be compared to self.size or 0 and used to validate rear and front.

class CircularQueue:

    def __init__(self, size):
        self.head = 0
        self.count = 0
        self.queue = [0] * size
        self.size = size

    def _get_tail(self):
        return (self.head + self.count - 1) % self.size

    def _get_next_tail(self):
        return (self.head + self.count) % self.size

    def _get_next_head(self):
        return (self.head + 1) % self.size

    def enqueue(self, value: int) -> bool:

        if self.is_empty():
          return False
        self.queue[self._get_next_tail()] = value
        self.count += 1
        return True

    def dequeue(self) -> bool:

        if self.is_empty():
          return False
        self.head = self._get_next_head()
        self.count -= 1
        return True

    def Front(self) -> int:
        if self.is_empty():
          return False
      return self.queue[self.head]

    def Rear(self) -> int:
        if self.is_empty():
          return False
        return self.queue[self._get_tail()]

    def isEmpty(self) -> bool:
        return self.count == 0

    def isFull(self) -> bool:
        return self.count == self.size

with linked lists

  • a linked list could be more memory efficient, since it does not pre-allocate memory for unused capacity (and size of the queue is not "artificial" like before).

  • note that this queue is not thread-safe: the data structure could be corrupted in a multi-threaded environment (as race-condition could occur). to mitigate this problem, one could add the protection of a lock.

class Node:
    def __init__(self, value, next=None):
        self.value = value = next

class Queue:

    def __init__(self, size):
        self.size = size
        self.count = 0
        self.head = None
        self.tail = None
    def enqueue(self, value: int) -> bool:
        if self.is_full():
            return False
        if self.is_empty():
            self.head = self.tail = Node(value)
   = Node(value)
            self.tail =
        self.count += 1
        return True

    def dequeue(self) -> bool:
        if self.is_empty():
            return False
        self.head =
        self.count -= 1
        return True

    def front(self) -> int:
        if self.is_empty():
            return False
        return self.head.value

    def rear(self) -> int:
        if self.is_empty():
            return False
        return self.tail.value
    def is_empty(self) -> bool:
        return self.count == 0

    def is_full(self) -> bool:
        return self.count == self.size

a stream with rate limiter

class Logger:

    def __init__(self):
        self.msg_set = set()
        self.msg_queue = deque()
    def print_message(self, timestamp, message) -> bool:
        while self.msg_queue:
            msg, msg_timestamp = self.msg_queue[0]
            if timestamp - msg_timestamp >= 10:
        if message not in self.msg_set:
            self.msg_queue.append((message, timestamp))
            return True
        return False

moving average

  • given a stream of integers and a window size, the moving average in the sliding window can be calculated with a queue:

class MovingAverage:

    def __init__(self, size: int):
        self.queue = []
        self.size = size

    def next(self, val: int) -> float:
        if len(self.queue) > self.size:
        return sum(self.queue) / len(self.queue)