C#4.0的并行库TPL,即Task(一)http://47.98.154.65/?id=1793
C#4.0的并行库TPL,即Task(二) http://47.98.154.65/?id=1798
C#4.0的并行库TPL,即Task(三) http://47.98.154.65/?id=1808
C#4.0的并行库TPL,即Task(四) http://47.98.154.65/?id=1815
C#4.0的并行库TPL,即Task(五) http://47.98.154.65/?id=1816
.net 4.0引入的TPL,即任务并行库。
它是在线程池之上的又一个抽象层。
TPL的核心概念是任务。一个任务代表了一个异步操作,该操作可以通过多种方式运行,可以使用或者不使用独立线程运行。
一个任务可以通过多种方式和其它任务组合起来。例如,可以同时启动多个任务,等待所有任务完成,然后运行一个任务对之前的所有任务的结果进行一些计算。
TPL与之前的模式相比,其中一个关键的优势是其具有用于组合任务的使得的API。
TPL处理异常有多种方式。由于一个任务可能会由多个其它任务组成,这些任务也可能依次拥有各自的子任务,所以有一个AggregateException的概念。这种异常可以捕获底层任务内部的所有的异常,并允许单独处理这些异常。
另外,在.net 5.0里面,新的异步编程交键字await和async里面已经包含有TPL的支持了。
例子一:创建任务
new Task()后必须要Start。
如果使用.Run()或者是.StartNew就不用显示的Start。
标记了TaskCreationOptions.LongRunning,则表示任务不使用线程池,而是使用Thread来运行。
using System; using System.Threading; using System.Threading.Tasks; namespace Chapter4.Recipe1 { class Program { static void Main(string[] args) { var t1 = new Task(() => TaskMethod("Task 1")); var t2 = new Task(() => TaskMethod("Task 2")); t2.Start(); t1.Start(); Task.Run(() => TaskMethod("Task 3")); Task.Factory.StartNew(() => TaskMethod("Task 4")); Task.Factory.StartNew(() => TaskMethod("Task 5"), TaskCreationOptions.LongRunning); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.ReadKey(); } static void TaskMethod(string name) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } } }
示例二:任务的基本操作
首先直接运行了TaskMethod,它于是被同步运行,并且不属于线程池。
Task 1,使用Start并等待结果,该任务被放置于线程池,并且由于那句 int result = task.Result
所以主线程会等待,直到任务返回前都一直是阻塞状态。
Task 2,跟直接运行TaskMethod是一样,这个用法可以让该任务运行在主线程中,避免使用线程池来运行非常短暂的操作。
Task 3跟Task 1的区别是,它没有阻塞主线程,它循环打印出任务的状态。状态分别为:
Created、Running和RanToCompletion。
using System; using System.Threading; using System.Threading.Tasks; namespace Chapter4.Recipe2 { class Program { static void Main(string[] args) { TaskMethod("Main Thread Task"); Task<int> task = CreateTask("Task 1"); task.Start(); int result = task.Result; Console.WriteLine("Result is: {0}", result); task = CreateTask("Task 2"); task.RunSynchronously(); result = task.Result; Console.WriteLine("Result is: {0}", result); task = CreateTask("Task 3"); Console.WriteLine(task.Status); task.Start(); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); result = task.Result; Console.WriteLine("Result is: {0}", result); Console.ReadKey(); } static Task<int> CreateTask(string name) { return new Task<int>(() => TaskMethod(name)); } static int TaskMethod(string name) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); return 42; } } }
示例三:组合任务(相互依赖的任务)
程序启动后,创建两个任务 firstTask,secondTask
在(1)处,为任务firstTask创建了一个后续任务,这个后续任务会在当前任务完成后运行。
然后启动两个任务后,等4秒,这个时间足够两个任务完成。
然后在(2)处,secondTask任务执行了一个后续操作,并通过指定两个选项:TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously来尝试同步执行该后续操作。即如果后续操作比较短暂,这种方式就很有用,
因为放置在主线程中运行比放置在线程池中运行要快。
可以实现这一点是因为第二个任务恰好在那刻完成。如果你把4秒的sleep方法注释掉,将会看到
该代码被放置线程池中去,这是因为还未从之前的任务中得到结果。
在(3)处我们为之前的后续任务又定义了一个后续任务,但这里使用了一个GetAwaiter和OnCompleted方法,这些方法是C#5.0的异步机制。
在(4)处演示了父子线程,我们创建了一个新任务,当运行该任务时,通过提供一个TaskCreationOptions.AttachedToParent
选项来运行一个子任务。
注意:子任务必须在父任务运动的时候创建,并正确的附加给父任务!
只得分所得分子任务结束工作,父任务才会完成。通过一个TaskContinuationOptions选项还可以给子任务加上后续操作,这时候,只得分后续操作结束后,父任务才能完成。
using System; using System.Threading; using System.Threading.Tasks; namespace Chapter4.Recipe3 { class Program { static void Main(string[] args) { var firstTask = new Task<int>(() => TaskMethod("First Task", 3)); var secondTask = new Task<int>(() => TaskMethod("Second Task", 2)); //(1) firstTask.ContinueWith( t => Console.WriteLine("The first answer is {0}. Thread id {1}, is thread pool thread: {2}", t.Result, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread), TaskContinuationOptions.OnlyOnRanToCompletion); firstTask.Start(); secondTask.Start(); Thread.Sleep(TimeSpan.FromSeconds(4)); //(2) Task continuation = secondTask.ContinueWith( t => Console.WriteLine("The second answer is {0}. Thread id {1}, is thread pool thread: {2}", t.Result, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread), TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously); //(3) continuation.GetAwaiter().OnCompleted( () => Console.WriteLine("Continuation Task Completed! Thread id {0}, is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread)); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(); //(4) firstTask = new Task<int>(() => { var innerTask = Task.Factory.StartNew(() => TaskMethod("Second Task", 5), TaskCreationOptions.AttachedToParent); innerTask.ContinueWith(t => TaskMethod("Third Task", 2), TaskContinuationOptions.AttachedToParent); return TaskMethod("First Task", 2); }); firstTask.Start(); while (!firstTask.IsCompleted) { Console.WriteLine(firstTask.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(firstTask.Status); Thread.Sleep(TimeSpan.FromSeconds(10)); Console.ReadKey(); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); return 42 * seconds; } } }
示例四:取消选项
仔细看longTask的创建代码,我们将给底层传递一次取消标志,然后给任务的构造函数再传递一次。
为什么需要提供取消标志两次呢?
答案是如果在任务实际启动前取消它,该任务的TPL基础设施有责任处理该取消操作,
因为这些代码根本不会执行。通过得到的第一个任务状态可以知道它被取消。如果尝试对该
任务调用Start方法,将会得到InvalidOperationException异常。
然后你需要自己来写代码处理取消过程。注意在TaskMethod的代码中你可以看到我们的处理代码。
在你取消任务后,任务的状态仍然是RanToCompletion,因为从TPL的角度来看,该任务正常完成了它的
工作。辨别这两种情况是非常重要的,并且需要理解每种情况下职责的不同。
using System; using System.Threading; using System.Threading.Tasks; namespace Chapter4.Recipe6 { class Program { private static void Main(string[] args) { var cts = new CancellationTokenSource(); var longTask = new Task<int>(() => TaskMethod("Task 1", 10, cts.Token), cts.Token); Console.WriteLine(longTask.Status); cts.Cancel(); Console.WriteLine(longTask.Status); Console.WriteLine("First task has been cancelled before execution"); cts = new CancellationTokenSource(); longTask = new Task<int>(() => TaskMethod("Task 2", 10, cts.Token), cts.Token); longTask.Start(); for (int i = 0; i < 5; i++ ) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine(longTask.Status); } cts.Cancel(); for (int i = 0; i < 5; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine(longTask.Status); } Console.WriteLine("A task has been completed with result {0}.", longTask.Result); Console.ReadKey(); } private static int TaskMethod(string name, int seconds, CancellationToken token) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); for (int i = 0; i < seconds; i ++) { Thread.Sleep(TimeSpan.FromSeconds(1)); if (token.IsCancellationRequested) return -1; } return 42*seconds; } } }
示例五:处理任务中的异常
第一个例子只是使用catch捕获异常,它是被称为AggregateException的异常。可以通过访问InnerException属性来得到底层异常。
第二个例子使用GetAwaiter和GetResult来访问任务结果。这种情况下无需封装异常,因为TPL的基础设施会提取异常。
最后一个例子展示了两个任务抛出异常的情形。
只有之前的任务完成前得分异常时,该后续操作才会操作。通过给后续操作传递TaskContinuationOptions.OnlyOnFaulted选项时可以实现该行为。
由于任务可以各种形式进行连接在一起,因为AggregateException异常可能包含其它普通异常的集合。这么多异常怎么提取呢?
这时候可以使用根异常的Flatten方法。它将返回一个集合。该集合包含以层级结构中每个子聚合异常中的内部异常。
using System; using System.Threading; using System.Threading.Tasks; namespace Chapter4.Recipe7 { class Program { static void Main(string[] args) { Task<int> task; try { task = Task.Run(() => TaskMethod("Task 1", 2)); int result = task.Result; Console.WriteLine("Result: {0}", result); } catch (Exception ex) { Console.WriteLine("Exception caught: {0}", ex); } Console.WriteLine("----------------------------------------------"); Console.WriteLine(); try { task = Task.Run(() => TaskMethod("Task 2", 2)); int result = task.GetAwaiter().GetResult(); Console.WriteLine("Result: {0}", result); } catch (Exception ex) { Console.WriteLine("Exception caught: {0}", ex); } Console.WriteLine("----------------------------------------------"); Console.WriteLine(); var t1 = new Task<int>(() => TaskMethod("Task 3", 3)); var t2 = new Task<int>(() => TaskMethod("Task 4", 2)); var complexTask = Task.WhenAll(t1, t2); var exceptionHandler = complexTask.ContinueWith(t => Console.WriteLine("Exception caught: {0}", t.Exception), TaskContinuationOptions.OnlyOnFaulted ); t1.Start(); t2.Start(); Thread.Sleep(TimeSpan.FromSeconds(5)); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); throw new Exception("Boom!"); return 42 * seconds; } } }

