Lock-free Immutable ConcurrentQueue
Similar to the code review I posted last week for an agent-based immutable replacement for ConcurrentDictionary, I have also created an agent-based immutable replacement for ConcurrentQueue. This uses a MailboxProcessor and an immutable queue based on Okasaki's implementation in Purely Functional Data Structures with a few extra operations. I am particularly interested in understanding if there's any way I can combine the QueueAgent and the InternalQueueAgent into one type (without the mutual-recursion), and if there's any way to do the asynchronous Peek and Dequeue operations without the internal ImmutableQueues for the PeekListeners and DequeueListeners. The idea behind those operations is to support a "yield until a message is available" behavior similar to an asynchronous Peek or Receive operation on MSMQ or RabbitMQ. I also welcome any general feedback on the implementation.
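To make the "yield until a message is available" idea concrete, here is a minimal, self-contained sketch of the behavior I'm after. It is not the implementation posted here: WaitingQueue, Msg, Put and Take are hypothetical names used only for illustration, and plain F# lists stand in for the immutable queue (so appends are O(n)). A single MailboxProcessor keeps both the buffered items and the reply channels of callers that are still waiting.

```fsharp
// Illustrative sketch only: all names here are hypothetical, not part of QueueAgent.
type Msg<'a> =
    | Put of 'a
    | Take of AsyncReplyChannel<'a>

type WaitingQueue<'a> () =
    let agent =
        MailboxProcessor<Msg<'a>>.Start(fun inbox ->
            // items: buffered messages, oldest first
            // waiters: reply channels of pending Take calls, oldest first
            let rec loop (items: 'a list) (waiters: AsyncReplyChannel<'a> list) =
                async {
                    let! msg = inbox.Receive()
                    match msg, items, waiters with
                    | Put item, _, waiter :: rest ->
                        // Someone is already waiting: hand the item over directly.
                        waiter.Reply item
                        return! loop items rest
                    | Put item, _, [] ->
                        // Nobody is waiting: buffer the item.
                        return! loop (items @ [item]) []
                    | Take channel, item :: rest, _ ->
                        // An item is available: reply immediately.
                        channel.Reply item
                        return! loop rest waiters
                    | Take channel, [], _ ->
                        // Nothing available: park the caller until the next Put.
                        return! loop [] (waiters @ [channel])
                }
            loop [] [])
    member __.Enqueue item = agent.Post (Put item)
    member __.AsyncDequeue () : Async<'a> = agent.PostAndAsyncReply Take

// Usage: the dequeue is started first and completes once an item is enqueued.
let queue = WaitingQueue<string>()
let pending = queue.AsyncDequeue() |> Async.StartAsTask
queue.Enqueue "test"
printfn "%s" pending.Result // prints "test"
```

The caller of AsyncDequeue never polls; its reply channel simply stays in the agent's state until a matching Put arrives.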
My code for the immutable queue is as follows:
    open System.Collections.Generic
    /// An F# Immutable Queue, based on Okasaki's implementation in Purely-Functional Data Structures
    type ImmutableQueue<'message> private (front: 'message list, rear: 'message list) =
        let enqueue message =
            match front, message::rear with
            | [], newRear -> ImmutableQueue(newRear |> List.rev, [])
            | _, newRear -> ImmutableQueue(front, newRear)
        let enqueueAll messages =
            let orderedMessages = messages |> List.rev
            match front, orderedMessages@rear with
            | [], newRear -> ImmutableQueue(newRear |> List.rev, [])
            | _, newRear -> ImmutableQueue(front, newRear)
        let dequeue () =
            match front with
            | message::tail ->
                message, (match tail with
                          | [] -> ImmutableQueue(rear |> List.rev, [])
                          | _ -> ImmutableQueue(tail, rear))
            | _ -> failwith "Cannot dequeue from empty queue!"
        let dequeueAll () =
            (front @ (rear |> List.rev), ImmutableQueue<'message>([], []))
        let tryDequeue () =
            match front with
            | message::tail ->
                (message, (match tail with
                           | [] -> ImmutableQueue(rear |> List.rev, [])
                           | _ -> ImmutableQueue(tail, rear)))
                |> Some
            | _ -> None
        let tryPeek () =
            match front with
            | message::_ -> Some message
            | _ -> None
        let reverse () =
            match front with
            | [] -> ImmutableQueue(rear |> List.rev, [])
            | _ -> ImmutableQueue(front, rear)
        let getEnumerator () =
            (seq {
                yield! front
                yield! rear |> List.rev
            }).GetEnumerator()
        static member Empty = ImmutableQueue<'message>([], [])
        static member From messages = ImmutableQueue<'message>(messages, [])
        member __.IsEmpty = front.IsEmpty && rear.IsEmpty
        member __.Length = front.Length + rear.Length
        member __.HasMessages = front.IsEmpty |> not
        member __.Enqueue message = enqueue message
        member __.EnqueueAll messages = enqueueAll messages
        member __.Dequeue () = dequeue ()
        member __.DequeueAll () = dequeueAll ()
        member __.TryDequeue () = tryDequeue()
        member __.TryPeek () = tryPeek()
        member __.Reverse () = reverse()
        member __.GetEnumerator () = getEnumerator()
        interface IEnumerable<'message> with
            member this.GetEnumerator () = this.GetEnumerator()
        interface System.Collections.IEnumerable with
            member this.GetEnumerator () = this.GetEnumerator() :> System.Collections.IEnumerator
    [<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
    module Queue =
        /// Create an empty queue of the given message type
        let empty<'message> = ImmutableQueue<'message>.Empty
        /// Enqueue a message in the given queue
        let inline enqueue message (queue: ImmutableQueue<'message>) = queue.Enqueue message
        /// Enqueue all of the provided messages in the given queue
        let inline enqueueAll messages (queue: ImmutableQueue<'message>) = queue.EnqueueAll messages
        /// Check if the given queue is empty
        let inline isEmpty (queue: ImmutableQueue<'message>) = queue.IsEmpty
        /// Compute the length (number of messages) of the given queue
        let inline length (queue: ImmutableQueue<'message>) = queue.Length
        /// Check if the given queue contains any messages
        let inline hasMessages (queue: ImmutableQueue<'message>) = queue.HasMessages
        /// Create a queue from an F# list
        let inline ofList messages = messages |> ImmutableQueue.From
        /// Create a queue from an F# sequence
        let inline ofSeq messages = messages |> Seq.toList |> ofList
        /// Dequeue the message at the front of the given queue
        let inline dequeue (queue: ImmutableQueue<'message>) = queue.Dequeue()
        /// Dequeue all the messages from the given queue
        let inline dequeueAll (queue: ImmutableQueue<'message>) = queue.DequeueAll()
        /// Try to dequeue the message at the front of the given queue
        let inline tryDequeue (queue: ImmutableQueue<'message>) = queue.TryDequeue()
        /// Try to peek the message at the front of the given queue
        let inline tryPeek (queue: ImmutableQueue<'message>) = queue.TryPeek()
        /// Reverse the order of all messages in the given queue
        let inline rev (queue: ImmutableQueue<'message>) = queue.Reverse()
And here's my implementation of QueueAgent:
    open System.Collections.Concurrent
    open System.Collections.Generic
    type private QueueMessage<'a> =
        | Enqueue of 'a
        | EnqueueAll of 'a list
        | TryDequeue of AsyncReplyChannel<'a option>
        | TryPeek of AsyncReplyChannel<'a option>
        | Dequeue of AsyncReplyChannel<'a>
        | DequeueAll of AsyncReplyChannel<'a seq>
        | Peek of AsyncReplyChannel<'a>
        | Count of AsyncReplyChannel<int>
        | GetAll of AsyncReplyChannel<'a seq>
    type private InternalQueueMessage<'a> =
        | AddDequeueListener of AsyncReplyChannel<'a>
        | AddPeekListener of AsyncReplyChannel<'a>
        | ItemEnqueued of QueueAgent<'a>
    and private Listeners<'a> =
        {
            PeekListeners: ImmutableQueue<AsyncReplyChannel<'a>>
            DequeueListeners: ImmutableQueue<AsyncReplyChannel<'a>>
        }
        static member Empty() = { PeekListeners = ImmutableQueue<AsyncReplyChannel<'a>>.Empty; DequeueListeners = ImmutableQueue<AsyncReplyChannel<'a>>.Empty }
    and QueueAgent<'a> () as this =
        let internalQueue = InternalQueueAgent<'a>()
        let agent =
            MailboxProcessor<QueueMessage<'a>>.Start
            <| fun inbox ->
                let rec loop state =
                    async {
                        ...
                        | Peek channel ->
                            match state ...
                        ...
                    }
                loop ImmutableQueue<'a>.Empty
        let enqueue item =
            agent.Post <| Enqueue item
        let enqueueAll items =
            agent.Post <| EnqueueAll items
        let tryDequeue () =
            agent.PostAndReply TryDequeue
        let asyncTryDequeue () =
            agent.PostAndAsyncReply TryDequeue
        let tryPeek () =
            agent.PostAndReply TryPeek
        let asyncTryPeek () =
            agent.PostAndAsyncReply TryPeek
        let dequeue () =
            agent.PostAndReply Dequeue
        let asyncDequeue () =
            agent.PostAndAsyncReply Dequeue
        let dequeueAll () =
            agent.PostAndReply DequeueAll
        let asyncDequeueAll () =
            agent.PostAndAsyncReply DequeueAll
        let peek () =
            agent.PostAndReply Peek
        let asyncPeek () =
            agent.PostAndAsyncReply Peek
        let count () =
            agent.PostAndReply Count
        let asyncCount () =
            agent.PostAndAsyncReply Count
        let getAll () =
            agent.PostAndReply GetAll
        let asyncGetAll () =
            agent.PostAndAsyncReply GetAll
        member __.Enqueue item = enqueue item
        member __.EnqueueAll items = enqueueAll items
        member __.TryDequeue () = tryDequeue ()
        member __.AsyncTryDequeue () = asyncTryDequeue ()
        member __.TryPeek () = tryPeek ()
        member __.AsyncTryPeek () = asyncTryPeek ()
        member __.Dequeue () = dequeue ()
        member __.AsyncDequeue () = asyncDequeue ()
        member __.DequeueAll () = dequeueAll ()
        member __.AsyncDequeueAll () = asyncDequeueAll ()
        member __.Peek () = peek ()
        member __.AsyncPeek () = asyncPeek ()
        member __.Count = count()
        member __.AsyncCount () = asyncCount ()
        member __.GetAll () = getAll ()
        member __.AsyncGetAll () = asyncGetAll ()
        interface IEnumerable<'a> with
            member __.GetEnumerator () = (getAll () :> IEnumerable<'a>).GetEnumerator()
        interface System.Collections.IEnumerable with
            member __.GetEnumerator () = (getAll () :> System.Collections.IEnumerable).GetEnumerator()
        interface IProducerConsumerCollection<'a> with
            member __.CopyTo (array: 'a array, index) = getAll () |> Seq.iteri (fun i item -> array.[index + i] <- item)
            member __.CopyTo (array: System.Array, index) = getAll () |> Seq.iteri (fun i item -> array.SetValue(item, index + i))
            member __.TryAdd item = enqueue item; true
            member __.TryTake item =
                match tryDequeue () with
                | Some element -> item <- element; true
                | None -> false
            member __.ToArray () = getAll () |> Seq.toArray
            member __.Count = count ()
            member __.SyncRoot = this |> box
            member __.IsSynchronized = true
    and private InternalQueueAgent<'a> () =
        let agent =
            MailboxProcessor<InternalQueueMessage<'a>>.Start
            <| fun inbox ->
                let rec loop (state: Listeners<'a>) =
                    async {
                        let! message = inbox.Receive()
                        match message with
                        ...
                    }
                loop <| Listeners<'a>.Empty()
        member __.Post message = agent.Post message
And finally, here are my unit tests for QueueAgent:
    open Microsoft.VisualStudio.TestTools.UnitTesting
    [<AutoOpen>]
    module Common =
        let inline equal expected actual = Assert.AreEqual(expected, actual)
        let inline notEqual expected actual = Assert.AreNotEqual(expected, actual)
        let inline isTrue value = Assert.IsTrue(value)
        let inline isFalse value = Assert.IsFalse(value)
    [<TestClass>]
    type QueueAgentTests () =
        [<TestMethod>]
        member __.``Enqueueing should add a message to the queue`` () =
            async {
                ... |> equal 3
            } |> Async.RunSynchronously
        [<TestMethod>]
        member __.``Dequeueing should wait for message to be enqueued, then remove the message from the queue`` () =
            async {
                ... |> equal "test"
                let! count = queue.AsyncCount()
                count |> ...
            } |> Async.RunSynchronously
        [<TestMethod>]
        member __.``Peeking should return the first message in the queue without removing it`` () =
            async {
                ... |> equal "test"
                let! count = queue.AsyncCount()
                count |> ...
            } |> Async.RunSynchronously
        [<TestMethod>]
        member __.``TryDequeue should return None if there are no messages in the queue`` () =
            async {
                let queue = QueueAgent<string>()
                let! dequeueResult = queue.AsyncTryDequeue()
                dequeueResult |> equal None
            } |> Async.RunSynchronously
        [<TestMethod>]
        member __.``TryDequeue should return Some if there are messages in the queue, and remove one message from the queue`` () =
            async {
                ... |> equal 2
            }
            |> Async.RunSynchronously
        [<TestMethod>]
        member __.``TryPeek should return None if there are no messages in the queue`` () =
            async {
                ... |> equal None
            }
            |> Async.RunSynchronously
        [<TestMethod>]
        member __.``TryPeek should return Some if there are messages in the queue, and not modify the queue`` () =
            async {
                ... |> equal <| ...
            } |> Async.RunSynchronously
        [<TestMethod>]
        member __.``EnqueueAll should enqueue all elements of a list in the queue`` () =
            async {
                let queue = QueueAgent<string>()
                let list = [1..10] |> ...
                ...
            } |> Async.RunSynchronously
        [<TestMethod>]
        member __.``DequeueAll should remove all elements from the queue`` () =
            async {
                let queue = QueueAgent<string>()
                let list = [1..10] |> ...
                ...
            } |> Async.RunSynchronously
        [<TestMethod>]
        member __.``QueueAgent should be thread-safe`` () =
            let queue = QueueAgent<string>()
            [1..10]
            |> List.map (fun i ->
                async {
                    queue.Enqueue <| ...
                })
            |> Async.Parallel
            |> Async.Ignore
            |> Async.RunSynchronously
            queue.Count |> equal 10
            [1..5]
            |> List.map (fun _ -> queue.AsyncDequeue())
            |> Async.Parallel
            |> Async.Ignore
            |> Async.RunSynchronously
            queue.Count |> equal 5
functional-programming f# queue lock-free immutability
add a comment |Â
up vote
3
down vote
favorite
Similar to the code review I posted last week for an agent-based immutable replacement for ConcurrentDictionary
, I have also created an agent-based immutable replacement for ConcurrentQueue
. This uses a MailboxProcessor
and an immutable queue based on Okasaki's implementation in Purely Functional Data Structures with a few extra operations. I am particularly interested in understanding if there's any way I can combine the QueueAgent
and the InternalQueueAgent
into one type (without the mutual-recursion), and if there's any way to do the asynchronous Peek
and Dequeue
operations without the internal ImmutableQueue
s for the PeekListeners
and DequeueListeners
. The idea behind those operations is to support a "yield until a message is available" behavior similar to an asynchronous Peek
or Receive
operation on MSMQ or RabbitMQ. I also welcome any general feedback on the implementation.
My code for the immutable queue is as follows:
open System.Collections.Generic
/// An F# Immutable Queue, based on Okasaki's implementation in Purely-Functional Data Structures
type ImmutableQueue<'message> private (front: 'message list, rear: 'message list) =
let enqueue message =
match front, message::rear with
| , newRear -> ImmutableQueue(newRear |> List.rev, )
| _, newRear -> ImmutableQueue(front, newRear)
let enqueueAll messages =
let orderedMessages = messages |> List.rev
match front, orderedMessages@rear with
| , newRear -> ImmutableQueue(newRear |> List.rev, )
| _, newRear -> ImmutableQueue(front, newRear)
let dequeue () =
match front with
| message::tail ->
message, (match tail with
| -> ImmutableQueue(rear |> List.rev, )
| _ -> ImmutableQueue(tail, rear))
| _ -> failwith "Cannot dequeue from empty queue!"
let dequeueAll () =
(front @ (rear |> List.rev), ImmutableQueue<'message>(, ) )
let tryDequeue () =
match front with
| message::tail ->
(message, (match tail with
| -> ImmutableQueue(rear |> List.rev, )
| _ -> ImmutableQueue(tail, rear)))
|> Some
| _ -> None
let tryPeek () =
match front with
| message::tail -> Some message
| _ -> None
let reverse () =
match front with
| -> ImmutableQueue(rear |> List.rev, )
| _ -> ImmutableQueue(front, rear)
let getEnumerator () =
(seq > List.rev
).GetEnumerator()
static member Empty = ImmutableQueue<'message>(, )
static member From messages = ImmutableQueue<'message>(messages, )
member __.IsEmpty = front.IsEmpty && rear.IsEmpty
member __.Length = front.Length + rear.Length
member __.HasMessages = front.IsEmpty |> not
member __.Enqueue message = enqueue message
member __.EnqueueAll messages = enqueueAll messages
member __.Dequeue () = dequeue ()
member __.DequeueAll () = dequeueAll ()
member __.TryDequeue () = tryDequeue()
member __.TryPeek () = tryPeek()
member __.Reverse () = reverse()
member __.GetEnumerator () = getEnumerator()
interface IEnumerable<'message> with
member this.GetEnumerator () = this.GetEnumerator()
interface System.Collections.IEnumerable with
member this.GetEnumerator () = this.GetEnumerator() :> System.Collections.IEnumerator
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Queue =
/// Create an empty queue of the given message type
let empty<'message> = ImmutableQueue<'message>.Empty
/// Enqueue a message in the given queue
let inline enqueue message (queue: ImmutableQueue<'message>) = queue.Enqueue message
/// Enqueue all of the provided messages in the given queue
let inline enqueueAll messages (queue: ImmutableQueue<'message>) = queue.EnqueueAll messages
/// Check if the given queue is empty
let inline isEmpty (queue: ImmutableQueue<'message>) = queue.IsEmpty
/// Compute the length (number of messages) of the given queue
let inline length (queue: ImmutableQueue<'message>) = queue.Length
/// Check if the given queue contains any messages
let inline hasMessages (queue: ImmutableQueue<'message>) = queue.HasMessages
/// Create a queue from an F# list
let inline ofList messages = messages |> ImmutableQueue.From
/// Create a queue fron an F# sequence
let inline ofSeq messages = messages |> Seq.toList |> ofList
/// Dequeue the message at the front of the given queue
let inline dequeue (queue: ImmutableQueue<'message>) = queue.Dequeue()
/// Dequeue all the messages from the given queue
let inline dequeueAll (queue: ImmutableQueue<'message>) = queue.DequeueAll()
/// Try to dequeue the message at the front of the given queue
let inline tryDequeue (queue: ImmutableQueue<'message>) = queue.TryDequeue()
/// Try to peek the message at the front of the given queue
let inline tryPeek (queue: ImmutableQueue<'message>) = queue.TryPeek()
/// Reverse the order of all messages in the given queue
let inline rev (queue: ImmutableQueue<'message>) = queue.Reverse()
And here's my implementation of QueueAgent
:
open System.Collections.Concurrent
open System.Collections.Generic
type private QueueMessage<'a> =
| Enqueue of 'a
| EnqueueAll of 'a list
| TryDequeue of AsyncReplyChannel<'a option>
| TryPeek of AsyncReplyChannel<'a option>
| Dequeue of AsyncReplyChannel<'a>
| DequeueAll of AsyncReplyChannel<'a seq>
| Peek of AsyncReplyChannel<'a>
| Count of AsyncReplyChannel<int>
| GetAll of AsyncReplyChannel<'a seq>
type private InternalQueueMessage<'a> =
| AddDequeueListener of AsyncReplyChannel<'a>
| AddPeekListener of AsyncReplyChannel<'a>
| ItemEnqueued of QueueAgent<'a>
and private Listeners<'a> =
PeekListeners: ImmutableQueue<AsyncReplyChannel<'a>>
DequeueListeners: ImmutableQueue<AsyncReplyChannel<'a>>
static member Empty() = PeekListeners = ImmutableQueue<AsyncReplyChannel<'a>>.Empty; DequeueListeners = ImmutableQueue<AsyncReplyChannel<'a>>.Empty
and QueueAgent<'a> () as this =
let internalQueue = InternalQueueAgent<'a>()
let agent =
MailboxProcessor<QueueMessage<'a>>.Start
<| fun inbox ->
let rec loop state =
async Peek channel ->
match state
loop ImmutableQueue<'a>.Empty
let enqueue item =
agent.Post <| Enqueue item
let enqueueAll items =
agent.Post <| EnqueueAll items
let tryDequeue () =
agent.PostAndReply TryDequeue
let asyncTryDequeue () =
agent.PostAndAsyncReply TryDequeue
let tryPeek () =
agent.PostAndReply TryPeek
let asyncTryPeek () =
agent.PostAndAsyncReply TryPeek
let dequeue () =
agent.PostAndReply Dequeue
let asyncDequeue () =
agent.PostAndAsyncReply Dequeue
let dequeueAll () =
agent.PostAndReply DequeueAll
let asyncDequeueAll () =
agent.PostAndAsyncReply DequeueAll
let peek () =
agent.PostAndReply Peek
let asyncPeek () =
agent.PostAndAsyncReply Peek
let count () =
agent.PostAndReply Count
let asyncCount () =
agent.PostAndAsyncReply Count
let getAll () =
agent.PostAndReply GetAll
let asyncGetAll () =
agent.PostAndAsyncReply GetAll
member __.Enqueue item = enqueue item
member __.EnqueueAll items = enqueueAll items
member __.TryDequeue () = tryDequeue ()
member __.AsyncTryDequeue () = asyncTryDequeue ()
member __.TryPeek () = tryPeek ()
member __.AsyncTryPeek () = asyncTryPeek ()
member __.Dequeue () = dequeue ()
member __.AsyncDequeue () = asyncDequeue ()
member __.DequeueAll () = dequeueAll ()
member __.AsyncDequeueAll () = asyncDequeueAll ()
member __.Peek () = peek ()
member __.AsyncPeek () = asyncPeek ()
member __.Count = count()
member __.AsyncCount () = asyncCount ()
member __.GetAll () = getAll ()
member __.AsyncGetAll () = asyncGetAll ()
interface IEnumerable<'a> with
member __.GetEnumerator () = (getAll () :> IEnumerable<'a>).GetEnumerator()
interface System.Collections.IEnumerable with
member __.GetEnumerator () = (getAll () :> System.Collections.IEnumerable).GetEnumerator()
interface IProducerConsumerCollection<'a> with
member __.CopyTo (array: 'a array, index) = getAll () |> Seq.iteri (fun i item -> array.[index + i] <- item)
member __.CopyTo (array: System.Array, index) = getAll () |> Seq.iteri (fun i item -> array.SetValue(item, index + i))
member __.TryAdd item = enqueue item;true
member __.TryTake item =
match tryDequeue () with
| Some element -> item <- element;true
| None -> false
member __.ToArray () = getAll () |> Seq.toArray
member __.Count = count ()
member __.SyncRoot = this |> box
member __.IsSynchronized = true
and private InternalQueueAgent<'a> () =
let agent =
MailboxProcessor<InternalQueueMessage<'a>>.Start
<| fun inbox ->
let rec loop (state: Listeners<'a>) =
async
let! message = inbox.Receive()
match message with
loop <| Listeners<'a>.Empty()
member __.Post message = agent.Post message
And finally, here are my unit tests for QueueAgent
:
open Microsoft.VisualStudio.TestTools.UnitTesting
[<AutoOpen>]
module Common =
let inline equal expected actual = Assert.AreEqual(expected, actual)
let inline notEqual expected actual = Assert.AreNotEqual(expected, actual)
let inline isTrue value = Assert.IsTrue(value)
let inline isFalse value = Assert.IsFalse(value)
[<TestClass>]
type QueueAgentTests () =
[<TestMethod>]
member __.``Enqueueing should add a message to the queue`` () =
async > equal 3
[<TestMethod>]
member __.``Dequeueing should wait for message to be enqueued, then remove the message from the queue`` () =
async > equal "test"
let! count = queue.AsyncCount()
count |> Async.RunSynchronously
[<TestMethod>]
member __.``Peeking should return the first message in the queue without removing it`` () =
async > equal "test"
let! count = queue.AsyncCount()
count |> Async.RunSynchronously
[<TestMethod>]
member __.``TryDequeue should return None if there are no messages in the queue`` () =
async
let queue = QueueAgent<string>()
let! dequeueResult = queue.AsyncTryDequeue()
dequeueResult |> Async.RunSynchronously
[<TestMethod>]
member __.``TryDequeue should return Some if there are messages in the queue, and remove one message from the queue`` () =
async > equal 2
|> Async.RunSynchronously
[<TestMethod>]
member __.``TryPeek should return None if there are no messages in the queue`` () =
async > equal None
|> Async.RunSynchronously
[<TestMethod>]
member __.``TryPeek should return Some if there are messages in the queue, and not modify the queue`` () =
async > equal < |> Async.RunSynchronously
[<TestMethod>]
member __.``EnqueueAll should enqueue all elements of a list in the queue`` () =
async
let queue = QueueAgent<string>()
let list = [1..10] |> Async.RunSynchronously
[<TestMethod>]
member __.``DequeueAll should remove all elements from the queue`` () =
async
let queue = QueueAgent<string>()
let list = [1..10] |> Async.RunSynchronously
[<TestMethod>]
member __.``QueueAgent should be thread-safe`` () =
let queue = QueueAgent<string>()
[1..10]
|> List.map (fun i ->
async
queue.Enqueue <)
|> Async.Parallel
|> Async.Ignore
|> Async.RunSynchronously
queue.Count |> equal 10
[1..5]
|> List.map (fun _ -> queue.AsyncDequeue())
|> Async.Parallel
|> Async.Ignore
|> Async.RunSynchronously
queue.Count |> equal 5
functional-programming f# queue lock-free immutability
add a comment |Â
up vote
3
down vote
favorite
up vote
3
down vote
favorite
Similar to the code review I posted last week for an agent-based immutable replacement for ConcurrentDictionary
, I have also created an agent-based immutable replacement for ConcurrentQueue
. This uses a MailboxProcessor
and an immutable queue based on Okasaki's implementation in Purely Functional Data Structures with a few extra operations. I am particularly interested in understanding if there's any way I can combine the QueueAgent
and the InternalQueueAgent
into one type (without the mutual-recursion), and if there's any way to do the asynchronous Peek
and Dequeue
operations without the internal ImmutableQueue
s for the PeekListeners
and DequeueListeners
. The idea behind those operations is to support a "yield until a message is available" behavior similar to an asynchronous Peek
or Receive
operation on MSMQ or RabbitMQ. I also welcome any general feedback on the implementation.
My code for the immutable queue is as follows:
open System.Collections.Generic
/// An F# Immutable Queue, based on Okasaki's implementation in Purely-Functional Data Structures
type ImmutableQueue<'message> private (front: 'message list, rear: 'message list) =
let enqueue message =
match front, message::rear with
| , newRear -> ImmutableQueue(newRear |> List.rev, )
| _, newRear -> ImmutableQueue(front, newRear)
let enqueueAll messages =
let orderedMessages = messages |> List.rev
match front, orderedMessages@rear with
| , newRear -> ImmutableQueue(newRear |> List.rev, )
| _, newRear -> ImmutableQueue(front, newRear)
let dequeue () =
match front with
| message::tail ->
message, (match tail with
| -> ImmutableQueue(rear |> List.rev, )
| _ -> ImmutableQueue(tail, rear))
| _ -> failwith "Cannot dequeue from empty queue!"
let dequeueAll () =
(front @ (rear |> List.rev), ImmutableQueue<'message>(, ) )
let tryDequeue () =
match front with
| message::tail ->
(message, (match tail with
| -> ImmutableQueue(rear |> List.rev, )
| _ -> ImmutableQueue(tail, rear)))
|> Some
| _ -> None
let tryPeek () =
match front with
| message::tail -> Some message
| _ -> None
let reverse () =
match front with
| -> ImmutableQueue(rear |> List.rev, )
| _ -> ImmutableQueue(front, rear)
let getEnumerator () =
(seq > List.rev
).GetEnumerator()
static member Empty = ImmutableQueue<'message>(, )
static member From messages = ImmutableQueue<'message>(messages, )
member __.IsEmpty = front.IsEmpty && rear.IsEmpty
member __.Length = front.Length + rear.Length
member __.HasMessages = front.IsEmpty |> not
member __.Enqueue message = enqueue message
member __.EnqueueAll messages = enqueueAll messages
member __.Dequeue () = dequeue ()
member __.DequeueAll () = dequeueAll ()
member __.TryDequeue () = tryDequeue()
member __.TryPeek () = tryPeek()
member __.Reverse () = reverse()
member __.GetEnumerator () = getEnumerator()
interface IEnumerable<'message> with
member this.GetEnumerator () = this.GetEnumerator()
interface System.Collections.IEnumerable with
member this.GetEnumerator () = this.GetEnumerator() :> System.Collections.IEnumerator
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Queue =
/// Create an empty queue of the given message type
let empty<'message> = ImmutableQueue<'message>.Empty
/// Enqueue a message in the given queue
let inline enqueue message (queue: ImmutableQueue<'message>) = queue.Enqueue message
/// Enqueue all of the provided messages in the given queue
let inline enqueueAll messages (queue: ImmutableQueue<'message>) = queue.EnqueueAll messages
/// Check if the given queue is empty
let inline isEmpty (queue: ImmutableQueue<'message>) = queue.IsEmpty
/// Compute the length (number of messages) of the given queue
let inline length (queue: ImmutableQueue<'message>) = queue.Length
/// Check if the given queue contains any messages
let inline hasMessages (queue: ImmutableQueue<'message>) = queue.HasMessages
/// Create a queue from an F# list
let inline ofList messages = messages |> ImmutableQueue.From
/// Create a queue fron an F# sequence
let inline ofSeq messages = messages |> Seq.toList |> ofList
/// Dequeue the message at the front of the given queue
let inline dequeue (queue: ImmutableQueue<'message>) = queue.Dequeue()
/// Dequeue all the messages from the given queue
let inline dequeueAll (queue: ImmutableQueue<'message>) = queue.DequeueAll()
/// Try to dequeue the message at the front of the given queue
let inline tryDequeue (queue: ImmutableQueue<'message>) = queue.TryDequeue()
/// Try to peek the message at the front of the given queue
let inline tryPeek (queue: ImmutableQueue<'message>) = queue.TryPeek()
/// Reverse the order of all messages in the given queue
let inline rev (queue: ImmutableQueue<'message>) = queue.Reverse()
And here's my implementation of QueueAgent
:
open System.Collections.Concurrent
open System.Collections.Generic
type private QueueMessage<'a> =
| Enqueue of 'a
| EnqueueAll of 'a list
| TryDequeue of AsyncReplyChannel<'a option>
| TryPeek of AsyncReplyChannel<'a option>
| Dequeue of AsyncReplyChannel<'a>
| DequeueAll of AsyncReplyChannel<'a seq>
| Peek of AsyncReplyChannel<'a>
| Count of AsyncReplyChannel<int>
| GetAll of AsyncReplyChannel<'a seq>
type private InternalQueueMessage<'a> =
| AddDequeueListener of AsyncReplyChannel<'a>
| AddPeekListener of AsyncReplyChannel<'a>
| ItemEnqueued of QueueAgent<'a>
and private Listeners<'a> =
PeekListeners: ImmutableQueue<AsyncReplyChannel<'a>>
DequeueListeners: ImmutableQueue<AsyncReplyChannel<'a>>
static member Empty() = PeekListeners = ImmutableQueue<AsyncReplyChannel<'a>>.Empty; DequeueListeners = ImmutableQueue<AsyncReplyChannel<'a>>.Empty
and QueueAgent<'a> () as this =
let internalQueue = InternalQueueAgent<'a>()
let agent =
MailboxProcessor<QueueMessage<'a>>.Start
<| fun inbox ->
let rec loop state =
async Peek channel ->
match state
loop ImmutableQueue<'a>.Empty
let enqueue item =
agent.Post <| Enqueue item
let enqueueAll items =
agent.Post <| EnqueueAll items
let tryDequeue () =
agent.PostAndReply TryDequeue
let asyncTryDequeue () =
agent.PostAndAsyncReply TryDequeue
let tryPeek () =
agent.PostAndReply TryPeek
let asyncTryPeek () =
agent.PostAndAsyncReply TryPeek
let dequeue () =
agent.PostAndReply Dequeue
let asyncDequeue () =
agent.PostAndAsyncReply Dequeue
let dequeueAll () =
agent.PostAndReply DequeueAll
let asyncDequeueAll () =
agent.PostAndAsyncReply DequeueAll
let peek () =
agent.PostAndReply Peek
let asyncPeek () =
agent.PostAndAsyncReply Peek
let count () =
agent.PostAndReply Count
let asyncCount () =
agent.PostAndAsyncReply Count
let getAll () =
agent.PostAndReply GetAll
let asyncGetAll () =
agent.PostAndAsyncReply GetAll
member __.Enqueue item = enqueue item
member __.EnqueueAll items = enqueueAll items
member __.TryDequeue () = tryDequeue ()
member __.AsyncTryDequeue () = asyncTryDequeue ()
member __.TryPeek () = tryPeek ()
member __.AsyncTryPeek () = asyncTryPeek ()
member __.Dequeue () = dequeue ()
member __.AsyncDequeue () = asyncDequeue ()
member __.DequeueAll () = dequeueAll ()
member __.AsyncDequeueAll () = asyncDequeueAll ()
member __.Peek () = peek ()
member __.AsyncPeek () = asyncPeek ()
member __.Count = count()
member __.AsyncCount () = asyncCount ()
member __.GetAll () = getAll ()
member __.AsyncGetAll () = asyncGetAll ()
interface IEnumerable<'a> with
member __.GetEnumerator () = (getAll () :> IEnumerable<'a>).GetEnumerator()
interface System.Collections.IEnumerable with
member __.GetEnumerator () = (getAll () :> System.Collections.IEnumerable).GetEnumerator()
interface IProducerConsumerCollection<'a> with
member __.CopyTo (array: 'a array, index) = getAll () |> Seq.iteri (fun i item -> array.[index + i] <- item)
member __.CopyTo (array: System.Array, index) = getAll () |> Seq.iteri (fun i item -> array.SetValue(item, index + i))
member __.TryAdd item = enqueue item;true
member __.TryTake item =
match tryDequeue () with
| Some element -> item <- element;true
| None -> false
member __.ToArray () = getAll () |> Seq.toArray
member __.Count = count ()
member __.SyncRoot = this |> box
member __.IsSynchronized = true
and private InternalQueueAgent<'a> () =
let agent =
MailboxProcessor<InternalQueueMessage<'a>>.Start
<| fun inbox ->
let rec loop (state: Listeners<'a>) =
async
let! message = inbox.Receive()
match message with
loop <| Listeners<'a>.Empty()
member __.Post message = agent.Post message
And finally, here are my unit tests for QueueAgent
:
open Microsoft.VisualStudio.TestTools.UnitTesting
[<AutoOpen>]
module Common =
let inline equal expected actual = Assert.AreEqual(expected, actual)
let inline notEqual expected actual = Assert.AreNotEqual(expected, actual)
let inline isTrue value = Assert.IsTrue(value)
let inline isFalse value = Assert.IsFalse(value)
[<TestClass>]
type QueueAgentTests () =
[<TestMethod>]
member __.``Enqueueing should add a message to the queue`` () =
async > equal 3
[<TestMethod>]
member __.``Dequeueing should wait for message to be enqueued, then remove the message from the queue`` () =
async > equal "test"
let! count = queue.AsyncCount()
count |> Async.RunSynchronously
[<TestMethod>]
member __.``Peeking should return the first message in the queue without removing it`` () =
async > equal "test"
let! count = queue.AsyncCount()
count |> Async.RunSynchronously
[<TestMethod>]
member __.``TryDequeue should return None if there are no messages in the queue`` () =
async
let queue = QueueAgent<string>()
let! dequeueResult = queue.AsyncTryDequeue()
dequeueResult |> Async.RunSynchronously
[<TestMethod>]
member __.``TryDequeue should return Some if there are messages in the queue, and remove one message from the queue`` () =
async > equal 2
|> Async.RunSynchronously
[<TestMethod>]
member __.``TryPeek should return None if there are no messages in the queue`` () =
async > equal None
|> Async.RunSynchronously
[<TestMethod>]
member __.``TryPeek should return Some if there are messages in the queue, and not modify the queue`` () =
async > equal < |> Async.RunSynchronously
[<TestMethod>]
member __.``EnqueueAll should enqueue all elements of a list in the queue`` () =
async
let queue = QueueAgent<string>()
let list = [1..10] |> Async.RunSynchronously
[<TestMethod>]
member __.``DequeueAll should remove all elements from the queue`` () =
async
let queue = QueueAgent<string>()
let list = [1..10] |> Async.RunSynchronously
[<TestMethod>]
member __.``QueueAgent should be thread-safe`` () =
let queue = QueueAgent<string>()
[1..10]
|> List.map (fun i ->
async
queue.Enqueue <)
|> Async.Parallel
|> Async.Ignore
|> Async.RunSynchronously
queue.Count |> equal 10
[1..5]
|> List.map (fun _ -> queue.AsyncDequeue())
|> Async.Parallel
|> Async.Ignore
|> Async.RunSynchronously
queue.Count |> equal 5
functional-programming f# queue lock-free immutability
asked May 8 at 12:40
Aaron M. Eshbach