勇哥要做到的目标是:
task工厂按添加的顺序依次执行。
下面程序我规定的顺序为:
A...
A...
B...
B...
C...
C...
从源码一来看,每次执行次序都不同。
如果我们把lock那段代码启用,效果如下:
可以看到仅保证了两次输出是连续,而不能保证执行顺序跟task工厂add的顺序一样。
代码还有一个问题是:
myScheduler调度器并没有发挥作用,twork根本执行不到。
以上问题的解决版本见后面。
问题源码1:
using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Windows.Forms; namespace WindowsFormsApplication1 { public partial class Form1 : Form { myScheduler sd = new myScheduler(); work worker = new work(); CancellationTokenSource cts = new CancellationTokenSource(); public Form1() { InitializeComponent(); } private async void button1_Click(object sender, EventArgs e) { var res = await appendTask(); richTextBox1.AppendText("ok "+DateTime.Now.ToString()); } List<int> list1 = new List<int>() { 0, 1, 2 }; private Task<int> appendTask() { return Task.Run<int>(() => { //这里不能用for循环传i,否则会... //for (var i = 0; i < 3; i++) //{ foreach (var m in list1) { Task.Factory.StartNew(() => worker.workfun(new work.workstruct() { workcts = cts.Token, workjs = m }), cts.Token); } //} while (true) { if(cts.IsCancellationRequested) { break; } if (sd.taskCount() == 0) { break; } Thread.Sleep(2); } return 0; }); } private void Form1_Load(object sender, EventArgs e) { worker.worked += Worker_worked; } private void Worker_worked(object sender, work.workEventArgs e) { AppendTextString(richTextBox1, e.outmsg + Environment.NewLine); } public static void AppendTextString(Control ctrl, string text, bool AndCRLF = true) { if (ctrl == null) return; if (ctrl.InvokeRequired) { Action<Control, string, bool> method = AppendTextString; ctrl.Invoke(method, new object[] { ctrl, text, AndCRLF }); } else { if (ctrl.Text == "") { ctrl.Text = text; } else { if (AndCRLF) { ctrl.Text += text; } else { ctrl.Text += text; } if (ctrl is RichTextBox) { var obj = ((RichTextBox)ctrl); obj.Select(obj.TextLength, 0); obj.ScrollToCaret(); } } } } } public class work { public delegate void workEvnetHadle(object sender, workEventArgs e); public event workEvnetHadle worked; public class workEventArgs : EventArgs { public readonly string outmsg; public workEventArgs(string msg1) { this.outmsg = msg1; } } public virtual void OnWork(workEventArgs e) { if(worked!=null) { worked(this, e); } } static readonly object obj1 = new object(); string[] strary = new string[3] { "A...","B...","C..."}; public struct workstruct { public CancellationToken workcts; public int workjs; } public void workfun(workstruct data) { //lock (obj1) //{ for (int i = 0; i < 2; i++) { if(data.workcts.IsCancellationRequested) { break; } OnWork(new workEventArgs(strary[data.workjs])); Thread.Sleep(600); } //} } } public class myScheduler:TaskScheduler,IDisposable { private ManualResetEvent[] mc = new ManualResetEvent[2]; private List<Task> taskList = new List<Task>(); private Thread taskThread = null; public myScheduler() { mc[0] = new ManualResetEvent(false); mc[1] = new ManualResetEvent(false); taskThread = new Thread(new ThreadStart(twork)); taskThread.IsBackground = true; taskThread.Start(); } public int taskCount() { return taskList.Count; } private void twork() { while(true) { if(waitTask()) { Thread.Sleep(2); continue; } Task task=null; if(TryDequeue(task)) { if(TryExecuteTask(task)) { int k1 = 0; } } } } private bool waitTask() { if(taskList.Count<1) { } //var s1=WaitHandle.WaitAny(mc); return false; } protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { throw new NotImplementedException(); } protected override void QueueTask(Task task) { if(null!=task) { taskList.Add(task); } } protected override IEnumerable<Task> GetScheduledTasks() { return taskList.ToArray(); } protected override bool TryDequeue(Task task) { return base.TryDequeue(task); } private void Dispose(bool f1) { if (!f1) return; } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } public override int MaximumConcurrencyLevel { get { return base.MaximumConcurrencyLevel; } } } }
最终源码解决了上面的全部问题。
几个知识点:
1. IfEmptyWait中,如果任务为空,则程序停留在var s1=WaitHandle.WaitAny(mc);
其中WaitHandle.WaitAny(mc)返回值是满足等待的对象的数组索引
在QueueTask中,如果任务不为空,则mc[0].Set(),让上面的信号继续。
2. protected override void QueueTask(Task task),它的task由下面语句传入。
Task.Factory.StartNew(() =>
worker.workfun(new work.workstruct() { workcts = cts.Token, workjs = m }), cts.Token,TaskCreationOptions.None,sd);
3. BlockingCollection<Task> 自带阻塞功能的线程安全集合类
本例子不能使用List<Task>,原因是它不能take一个元素。
Add 方法用于向集合添加元素。
Take 方法用于从集合中获取元素。当集合为空时,Take 方法将阻塞,直到获取到新元素。
CompleteAdding 方法标记集合为完成状态,此后不能再向集合中添加元素,调用 Add 将抛出 System.InvalidOperationException 异常。
调用 CompleteAdding 方法将使阻塞状态的 Take 方法抛出 System.InvalidOperationException 异常。
实例化 BlockingCollection<T> 时,可以传入 boundedCapacity 参数,设置集合的上限,集合中元素到达上限后,Add 方法将阻塞。
TryAdd 方法在集合满时,不会阻塞,而是直接返回 false,并且丢弃要插入的元素。
TryTake 方法在集合为空时不会阻塞,而是会返回 false。
当有多个线程 Take 时,将形成一个 Take 队列,依次获取到元素。
4 if(!IfEmptyWait())
注意这里是条件是!
private bool IfEmptyWait()
{
if(taskList.Count<1)
{
mc[0].Reset();
}
var s1=WaitHandle.WaitAny(mc);
return s1==0;
}
当程序启动后,mc[0], mc[1]都没有信号,会卡在 var s1=WaitHandle.WaitAny(mc);
当点击“启动”按钮后,mc[1].set, 这时 var s1=1,于是程序继续。直到全部任务完成。
当全部任务完成后,这时候执行mc[0].Reset(),而mc[1]还是set(有信号),因此s1=1
当点击“停止”后,mc[1].reset(无信号),程序又会卡在会卡在 var s1=WaitHandle.WaitAny(mc);
当s1=0的时候,会执行后面的运行task。
5. 自定义任务调度器myScheduler,如何跟Task.Factory发生关联。
注意下面调用的最后一个参数sd
Task.Factory.StartNew(() =>
worker.workfun(new work.workstruct() { workcts = cts.Token, workjs = m }), cts.Token,TaskCreationOptions.None,sd);
最终源码:
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Windows.Forms; namespace WindowsFormsApplication1 { public partial class Form1 : Form { myScheduler sd = new myScheduler(); work worker = new work(); CancellationTokenSource cts = new CancellationTokenSource(); public Form1() { InitializeComponent(); TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException; } private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) { throw new NotImplementedException(); } private async void button1_Click(object sender, EventArgs e) { cts = new CancellationTokenSource(); var res = await appendTask(); richTextBox1.AppendText("ok "+DateTime.Now.ToString()+Environment.NewLine); } List<int> list1 = new List<int>() { 0, 1, 2 }; private Task<int> appendTask() { return Task.Run<int>(() => { //这里不能用for循环传i,否则会... //for (var i = 0; i < 3; i++) //{ sd.TaskIsOver = false; sd.Continue(); foreach (var m in list1) { Task.Factory.StartNew(() => worker.workfun(new work.workstruct() { workcts = cts.Token, workjs = m }), cts.Token,TaskCreationOptions.None,sd); } //} while (true) { if(cts.IsCancellationRequested) { break; } if (sd.TaskIsOver) { worker.OnWork(new work.workEventArgs("TaskIsOver=true")); break; } Thread.Sleep(2); } return 0; }); } private void Form1_Load(object sender, EventArgs e) { worker.worked += Worker_worked; } private void Worker_worked(object sender, work.workEventArgs e) { AppendTextString(richTextBox1, e.outmsg + Environment.NewLine); } public static void AppendTextString(Control ctrl, string text, bool AndCRLF = true) { if (ctrl == null) return; if (ctrl.InvokeRequired) { Action<Control, string, bool> method = AppendTextString; ctrl.Invoke(method, new object[] { ctrl, text, AndCRLF }); } else { if (ctrl.Text == "") { ctrl.Text = text; } else { if (AndCRLF) { ctrl.Text += text; } else { ctrl.Text += text; } if (ctrl is RichTextBox) { var obj = ((RichTextBox)ctrl); obj.Select(obj.TextLength, 0); obj.ScrollToCaret(); } } } } private void button2_Click(object sender, EventArgs e) { //停止 sd.Pause(); } private void button3_Click(object sender, EventArgs e) { //急停 cts.Cancel(); } } public class work { public delegate void workEvnetHadle(object sender, workEventArgs e); public event workEvnetHadle worked; public class workEventArgs : EventArgs { public readonly string outmsg; public workEventArgs(string msg1) { this.outmsg = msg1; } } public virtual void OnWork(workEventArgs e) { if(worked!=null) { worked(this, e); } } static readonly object obj1 = new object(); string[] strary = new string[3] { "A...","B...","C..."}; public struct workstruct { public CancellationToken workcts; public int workjs; } public void workfun(workstruct data) { //lock (obj1) //{ for (int i = 0; i < 2; i++) { if(data.workcts.IsCancellationRequested) { return; } OnWork(new workEventArgs(strary[data.workjs])); Thread.Sleep(600); } //} } } public class myScheduler:TaskScheduler,IDisposable { private ManualResetEvent[] mc = new ManualResetEvent[2]; private BlockingCollection<Task> taskList = new BlockingCollection<Task>(); private Thread taskThread = null; public bool TaskIsOver { get; set; } = false; public myScheduler() { mc[0] = new ManualResetEvent(false); mc[1] = new ManualResetEvent(false); taskThread = new Thread(new ThreadStart(twork)); taskThread.IsBackground = true; taskThread.Start(); } public int taskCount() { return taskList.Count; } public void Pause() { mc[1].Reset(); } public void Continue() { mc[1].Set(); } private void twork() { while(true) { if(!IfEmptyWait()) { Thread.Sleep(2); continue; } Task task=null; try { if (taskList.TryTake(out task)) { if (null != task) { mc[1].WaitOne(); if (TryExecuteTask(task)) { task.ContinueWith(t => { var exp = t.Exception; }); task.Wait(); if (taskList.Count < 1) { TaskIsOver = true; } } } } } catch(AggregateException ex) { ex.Handle(predicate: e => { Console.WriteLine(e.Message); if (task.IsCanceled) { } if (task.IsFaulted) { } return true; }); } } } private bool IfEmptyWait() { if(taskList.Count<1) { mc[0].Reset(); } var s1=WaitHandle.WaitAny(mc); return s1==0; } protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; } protected override void QueueTask(Task task) { if(null!=task) { taskList.Add(task); if(taskList.Count>0) { mc[0].Set(); } } } protected override IEnumerable<Task> GetScheduledTasks() { return taskList.ToArray(); } protected override bool TryDequeue(Task task) { //var res= base.TryDequeue(task); //return res; return taskList.TryTake(out task); } private void Dispose(bool f1) { if (!f1) return; } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } public override int MaximumConcurrencyLevel { get { return base.MaximumConcurrencyLevel; } } } }
源码下载:
链接:https://pan.baidu.com/s/1I9FfSiPB1QGlh9oWW0LOuA
提取码:edes
--来自百度网盘超级会员V4的分享
另外再贴段一位朋友讲解的AutoResetEvent和ManualResetEvent区别:
在.Net多线程编程中,AutoResetEvent和ManualResetEvent这两个类经常用到, 他们的用法很类似,但也有区别。
Set方法将信号置为发送状态,Reset方法将信号置为不发送状态,WaitOne等待信号的发送。
可以通过构造函数的参数值来决定其初始状态,若为true则非阻塞状态,为false为阻塞状态。
如果某个线程调用WaitOne方法,则当信号处于发送状态时,该线程会得到信号, 继续向下执行。
其区别就在调用后,AutoResetEvent.WaitOne()每次只允许一个线程进入,当某个线程得到信号后,AutoResetEvent会自动又将信号置为不发送状态,则其他调用WaitOne的线程只有继续等待.也就是说,AutoResetEvent一次只唤醒一个线程;
而ManualResetEvent则可以唤醒多个线程,因为当某个线程调用了ManualResetEvent.Set()方法后,其他调用WaitOne的线程获得信号得以继续执行,而ManualResetEvent不会自动将信号置为不发送。
也就是说,除非手工调用了ManualResetEvent.Reset()方法,则ManualResetEvent将一直保持有信号状态,ManualResetEvent也就可以同时唤醒多个线程继续执行。

