最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

看点:C#中定义自己的消费队列(下)

来源:博客园


(资料图)

一 背景

在上一篇中我们介绍了一个关于使用C#中的Queue来定义自己的消费队列,这篇文章我将再次使用Queue来定义另外一种消费队列,这个队列中会使用到System.Threading.Timer来定义一个10ms的Interval,和上一篇中产生数据一个个消费不同这篇文章中介绍的消费队列中消费定时器时间间隔内的所有待消费项,前面我们还是一样会通过源码来一步步讲述其内部原理,最后会通过几个单元测试来验证对应的使用。

二 源码

using System;using System.Collections.Generic;using System.Diagnostics;using System.Threading;namespace Pangea.Common.Utility.Buffer{    public class CustomConsumeQueue : IDisposable    {        #region Fields        public const int INTERVAL_CONSUMING = 10;//单位:ms        // used for dispose        private bool _isDisposed = false;        protected const bool FINALIZE_DISPOSING = false;        protected const bool EXPLICIT_DISPOSING = true;        private Timer _timer;        private Queue _queue;        private readonly object _lockObj = new object();        private readonly Action> _consumerQueueItemsAction;#if DEBUG        // the counter for working thread numbers in current;        public int _threadCounter = 0;#endif        #endregion        #region Ctor        public CustomConsumeQueue(Action> consumerQueueItemsAction)        {            _queue = new Queue();            _timer = new Timer(new TimerCallback(BeginTakeQueueItems));            _timer.Change(Timeout.Infinite, Timeout.Infinite);            _consumerQueueItemsAction = consumerQueueItemsAction;        }        ~CustomConsumeQueue()        {            Dispose(FINALIZE_DISPOSING);        }        #endregion        #region Methods        #region Public        public bool Add(T item)        {            if (_isDisposed) return false;            lock (_lockObj)            {                _queue.Enqueue(item);                _timer.Change(INTERVAL_CONSUMING, Timeout.Infinite);            }            return true;        }        public int PendingToConsumeCount()        {            lock (_lockObj)            {                return _queue.Count;            }        }        public void Dispose()        {            Dispose(EXPLICIT_DISPOSING);            GC.SuppressFinalize(this);        }        public void Dispose(bool disposingMode)        {            if (_isDisposed) return;            if (disposingMode == EXPLICIT_DISPOSING)            {                // release managed resource whne dispose by explicit            }            _timer?.Dispose();            _timer = null;            _isDisposed = true;        }        #endregion        #region Private        private void BeginTakeQueueItems(object state)        {            ThreadPool.QueueUserWorkItem(state1 =>            {#if DEBUG                Interlocked.Increment(ref _threadCounter);#endif                try                {                    T[] itemsArray = null;                    lock (_lockObj)                    {                        itemsArray = new T[_queue.Count];                        _queue.CopyTo(itemsArray, 0);                        _queue.Clear();                        if (_isDisposed == false)                        {                            // may throw a disposed exception                            ((Timer)state).Change(Timeout.Infinite, Timeout.Infinite);                        }                    }                    Trace.WriteLine($"[{DateTime.Now:HH-mm-ss fff}] Begin into consume procedure,QueueCount:{itemsArray.Length},Time:{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}");                    //begin consumer queue items                    _consumerQueueItemsAction.Invoke(itemsArray);                }                catch (Exception ex)                {                    // TODO Log exception, terminate current thread                }                finally                {#if DEBUG                    Interlocked.Decrement(ref _threadCounter);#endif                }            });        }        #endregion        #endregion    }}

2.1 原理讲解

  1. Add 的过程在Add的时候除了将待消费项添加到默认的_queue对象中以外就开始设置定时器触发的Interval为INTERVAL_CONSUMING,这个值默认设置为10ms,也就是当前定时器的触发间隔在添加消费项的时候会默认开启。
  2. 定时器消费过程定时器消费的时候会一次性将10ms的间隔内所有消费项一次性取到,然后一次性消费掉,另外需要注意的是,在定时器触发时间回调的过程中需要重新设置定时器的IntervalTimeout.Infinite,这样就能够将原来的定时器停掉,在下一次再次添加的时候重新设置默认的Interval,这样就能够一次往复交替进行,这个是消费过程的主要逻辑。
  3. 实现IDispose模式当前的CustomConsumeQueue显式实现IDispose的接口,代码中也都实现了对_isDisposed字段的判断,这个在使用的过程中需要特别注意。

三 测试

单元测试用例如下:

using NUnit.Framework;using Pangea.Common.Utility.Buffer;using System;using System.Collections.Generic;using System.Diagnostics;using System.Threading;namespace ACM.Framework.Test.Modules.Utils{    public class CustomConsumeQueueTest    {        CustomConsumeQueue _consumingQueue = null;        [SetUp]        public void Setup()        {            CustomConsumeQueue consumeQueue = new CustomConsumeQueue(StartConsuming);            _consumingQueue = consumeQueue;        }        [Test]        public void GeneralWritingTest()        {            int consumedCount = 0;            CustomConsumeQueue consumeQueue = new CustomConsumeQueue(queueItems =>            {                int currentConsumedCount = queueItems.Count;                Console.WriteLine($"[{DateTime.Now:HH-mm-ss fff}] {currentConsumedCount} items has been consumed.");                consumedCount += currentConsumedCount;            });            int producedCount = new Random(Guid.NewGuid().GetHashCode()).Next(1, 100);            AddItem(consumeQueue, producedCount);            int loopTime = 1000;            while (consumedCount != producedCount)            {                Thread.Sleep(100);                loopTime -= 100;            }            Assert.IsTrue(consumeQueue.PendingToConsumeCount() == 0);            Assert.IsTrue(consumeQueue._threadCounter == 0);            Assert.IsTrue(consumedCount == producedCount);        }        [Test]        public void TimerWritingTest()        {            int times = 20;            int paralleCount = 5;            int completedThreadCount = 0;            for (int i = 0; i < paralleCount; i++)            {                int localIndex = i;                ThreadPool.QueueUserWorkItem(obj =>                {                    while (times-- > 0)                    {                        AddItem(_consumingQueue, new Random(Guid.NewGuid().GetHashCode()).Next(1, 100));                        Thread.Yield();                    }                    Interlocked.Increment(ref completedThreadCount);                    Trace.WriteLine($"***************Begin run into parralle thread:{localIndex}************************");                });            }            while (completedThreadCount != 5)            {                Thread.Sleep(100);            }            Thread.Sleep(1000);            Assert.IsTrue(_consumingQueue.PendingToConsumeCount() == 0);            Assert.IsTrue(_consumingQueue._threadCounter == 0);        }        [Test]        public void AddAfterDisposeTest()        {            string item1 = GetRandomString();            bool result1 = _consumingQueue.Add(item1);            Assert.IsTrue(result1);            Thread.Sleep(1000); // Console.WriteLine will cost lots of time            Assert.IsTrue(_consumingQueue.PendingToConsumeCount() == 0);            Assert.IsTrue(_consumingQueue._threadCounter == 0, $"[{DateTime.Now:HH-mm-ss fff}] Thread numbers is not zero.");            _consumingQueue.Dispose();            string item2 = GetRandomString();            bool result2 = _consumingQueue.Add(item2);            Assert.IsFalse(result2);        }        private void StartConsuming(IList queueItems)        {            Console.WriteLine($"[{DateTime.Now:HH-mm-ss fff}] {queueItems.Count} items has been consumed.");        }        private void AddItem(CustomConsumeQueue queue, int numbers)        {            IList items = new List();            for (int i = 0; i < numbers; i++)            {                items.Add(GetRandomString());            }            foreach (var item in items)            {                queue.Add(item);                Thread.Sleep(new Random(Guid.NewGuid().GetHashCode()).Next(1, 50));            }        }        private string GetRandomString()        {            return Guid.NewGuid().ToString();        }    }}

测试的单元用例主要包含下面的几个部分:

  1. GeneralWritingTest()实现单个线程增加PendingItem,然后实现消费的过程。
  2. TimerWritingTest()使用5个独立的线程增加PendingItem,在每个线程中随机产生消费项,然后判断消费过程是否正确。
  3. AddAfterDisposeTest() 主要测试代码实现Dispose的过程,在添加然后消费后中途在显式的Dispose当前的Queue,判断后续是否能够再继续进行添加。

关键词: