最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

自定义一个简单的Task调度器、任务循环调度器、TaskScheduler

来源:博客园

前言:

自从接触异步(async await Task)操作后,始终都不明白,这个Task调度的问题。

接触Quartz.net已经很久了,只知道它实现了一套Task调度的方法,自己跟着Quartz.net源代码写了遍,调试后我算是明白了Task调度的一部分事()。


(资料图片)

春风来不远,只在屋东头。

理解Task运行,请参考大佬文章 https://www.cnblogs.com/artech/p/task_scheduling.html ,推荐大佬的书。

直到我看Quartz.net源代码中的任务调度 “QueuedTaskScheduler”,我才搞明白了,如何写一个简单的任务调度器,或者说线程如何执行代码,才不会造成死循环,CPU吃满等问题,下面代码有的直接从quartz.net copy过来的。

BlockingCollection类

微软文档 https://learn.microsoft.com/zh-cn/dotnet/standard/collections/thread-safe/blockingcollection-overview

个人博客,中文解释通俗易懂 https://www.cnblogs.com/gl1573/p/14595985.html

BlockingCollection 提供一个很重要的“阻塞”功能。

TaskScheduler类

TaskScheduler 直译过来:表示一个对象,该对象处理将任务排队到线程上的低级工作。

该类为抽象类,其真正意义在于“对Task任务的编排”

基于TaskScheduler类实现自定义的“Task队列调度器”

源代码,我的仓库 https://github.com/qiqiqiyaya/Learning-Case/tree/main/TaskScheduler/AspNet6TaskScheduler

1.定义一个存储Task的队列容器,使用BlockingCollection容器来添加Task,为什么使用BlockingCollection,后面会解释

/// The collection of tasks to be executed on our custom threads.        private readonly BlockingCollection _blockingTaskQueue;

2.定义CancellationTokenSource变量,用于释放。通常就是调用 CancellationToken.ThrowIfCancellationRequested() ,抛出一个 “OperationCanceledException”的异常,使正在执行的Task任务停止。

3.创建Thread数组,用于存储创建出的Thread

/// The threads used by the scheduler to process work.        private readonly Thread[] _threads;

4.自定义一个类QueuedTaskScheduler,继承 “TaskScheduler”,“IDisposable”

public class QueuedTaskScheduler: System.Threading.Tasks.TaskScheduler, IDisposable

实现构造函数

public QueuedTaskScheduler(int threadCount)        {            _threadCount = threadCount;            _blockingTaskQueue = new BlockingCollection();            // create threads            _threads = new Thread[threadCount];            for (int i = 0; i < threadCount; i++)            {                _threads[i] = new Thread(ThreadBasedDispatchLoop)                {                    Priority = ThreadPriority.Normal,                    IsBackground = true,                    Name = $"threadName ({i})"                };            }            // start            foreach (var thread in _threads)            {                thread.Start();            }        }

在构造函数中创建,并启动“Thread”,构造函数接收一个“线程数量的参数”,控制开启的线程数。

Thread中实现的委托为“ThreadBasedDispatchLoop”,其表达意思是“基于循环的调度”。

5.重点来了,具体看下“ThreadBasedDispatchLoop”方法的实现

ThreadBasedDispatchLoop实现

/// The dispatch loop run by all threads in this scheduler.        private void ThreadBasedDispatchLoop()        {            _taskProcessingThread.Value = true;            try            {                // If a thread abort occurs, we"ll try to reset it and continue running.                while (true)                {                    try                    {                        // For each task queued to the scheduler, try to execute it.                        foreach (var task in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token))                        {                            TryExecuteTask(task);                        }                    }                    catch (ThreadAbortException)                    {                        // If we received a thread abort, and that thread abort was due to shutting down                        // or unloading, let it pass through.  Otherwise, reset the abort so we can                        // continue processing work items.                        if (!Environment.HasShutdownStarted && !AppDomain.CurrentDomain.IsFinalizingForUnload())                        {#pragma warning disable SYSLIB0006                            Thread.ResetAbort();#pragma warning restore SYSLIB0006                        }                    }                }            }            catch (OperationCanceledException)            {                // If the scheduler is disposed, the cancellation token will be set and                // we"ll receive an OperationCanceledException.  That OCE should not crash the process.            }            finally            {                _taskProcessingThread.Value = false;            }        }

在外层套一层try catch捕获 CancellationTokenSource 变量,取消操作(CancellationTokenSource.Cancel())产生的异常,并且忽略该异常。

其中使用while(true),无限循环执行,?????奇了怪了,为什么以前写代码时,while(true)写了,会直接把CPU吃满,程序搞奔溃呢????

关键点就在于

_blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)执行时,如果_blockingTaskQueue容器中没有元素时,执行就会被“阻塞”,这种阻塞不会造成或者造成很小的资源浪费。

_blockingTaskQueue有值时,阻塞就会停止,_blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)执行,返回一个Task对象,然后开始执行 TryExecuteTask(task) ,执行Task。

6.继承 “TaskScheduler”后需要实现的几个方法

GetScheduledTasks

protected override IEnumerable? GetScheduledTasks()        {            return _blockingTaskQueue.ToList();        }

GetScheduledTasks 返回需要被调度的 Tasks

QueueTask

protected override void QueueTask(Task task)        {            // QueuedTaskScheduler 释放时,禁止向队列中添加Task            if (_disposeCancellation.IsCancellationRequested)            {                throw new ObjectDisposedException(GetType().Name);            }            _blockingTaskQueue.Add(task);        }

QueueTask 将排队等候的Task加入到 “_blockingTaskQueue”队列变量中

TryExecuteTaskInline

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)        {            // If we"re already running tasks on this threads, enable inlining            return _taskProcessingThread.Value && TryExecuteTask(task);        }

意思是,参数task是否在此线程上运行,请查看ThreadBasedDispatchLoop方法。

“ThreadLocal” 该类型变量声明生命周期跟随 “构造函数”中启动的线程,且每一个线程单独一个变量,值存储在线程上。

自此自定义“Task调度器”完成。

启动,运行QueuedTaskScheduler

1. 创建 QueuedTaskScheduler ,其中用于执行Task的线程数为 1

2.创建 Task ,并将其加入到指定的 Task调度器中。

3.调试一下

A. 创建QueuedTaskScheduler ,创建 线程 “Thread” ,并启动线程

B. 调试过程 

当_blockingTaskQueue没有Task时,执行到_blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token) 就会阻塞。

自此 自定义TaskScheduler完成。

我的源代码 https://github.com/qiqiqiyaya/Learning-Case/tree/main/TaskScheduler/AspNet6TaskScheduler

关键词: