FixAntenna/NetCore/Common/Threading/Queue/SynchronizeBlockingQueue.cs (101 lines of code) (raw):
// Copyright (c) 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.Collections.Generic;
using System.Threading;
namespace Epam.FixAntenna.NetCore.Common.Threading.Queue
{
internal class SynchronizeBlockingQueue<TE> : ISimpleBlockingQueue<TE>
{
/// <summary>
/// The queued items
/// </summary>
private readonly TE[] _items;
/// <summary>
/// Number of items in the queue
/// </summary>
private int _count;
/// <summary>
/// items index for next put, offer, or add.
/// </summary>
private int _putIndex;
/// <summary>
/// items index for next take, poll or remove
/// </summary>
private int _takeIndex;
private readonly object _sync = new object();
public SynchronizeBlockingQueue(int limit)
{
_items = new TE[limit];
}
public virtual void Put(TE item)
{
lock (_sync)
{
if (EqualityComparer<TE>.Default.Equals(item, default))
{
throw new NullReferenceException();
}
try
{
while (_count == _items.Length)
{
Monitor.Wait(_sync);
}
}
catch (ThreadInterruptedException)
{
Monitor.PulseAll(_sync);
throw;
}
Insert(item);
Monitor.PulseAll(_sync);
}
}
public virtual TE Take()
{
lock (_sync)
{
try
{
while (_count == 0)
{
Monitor.Wait(_sync);
}
}
catch (ThreadAbortException)
{
Monitor.PulseAll(_sync);
throw;
}
catch (ThreadInterruptedException)
{
Monitor.PulseAll(_sync);
throw;
}
var item = ExtractItem();
Monitor.PulseAll(_sync);
return item;
}
}
// this doc comment is overridden to remove the reference to collections
// greater in size than Integer.MAX_VALUE
/// <summary>
/// Returns the number of elements in this queue.
/// </summary>
/// <value> the number of elements in this queue </value>
public virtual int Size
{
get
{
lock (_sync)
{
return _count;
}
}
}
public virtual bool IsEmpty
{
get { return Size == 0; }
}
/// <summary>
/// Circularly increment i.
/// </summary>
internal int Inc(int i)
{
return ++i == _items.Length ? 0 : i;
}
/// <summary>
/// Inserts element at current put position
/// </summary>
private void Insert(TE item)
{
_items[_putIndex] = item;
_putIndex = Inc(_putIndex);
++_count;
}
/// <summary>
/// Extracts element at current take position.
/// </summary>
private TE ExtractItem()
{
var items = _items;
var item = items[_takeIndex];
items[_takeIndex] = default;
_takeIndex = Inc(_takeIndex);
--_count;
return item;
}
}
}