C# Threading

With .NET 4.0 we got the "Task" object in the Task Parallel Library (TPL), which was improved in .NET 4.5, and C# 5.0 added the "async/await" keywords. We will start with the old-fashioned "Thread" class and move on to "Task" and "async/await".

For more info see msdn.microsoft.com/concurrency.

First, two terms:

  • "Async"

    This is typically used to give users better response time by pushing long-running work into the background so the application stays responsive.

  • "Parallel"

    This is used to speed up the results of a CPU-bound task by performing calculations concurrently.

Types of Parallelism

  1. Data

    Same operation being executed across different data.

  2. Task

    Different operations being executed across the same or different data.

  3. Data flow

    Data flows from one task to the next like in a pipeline. Parallelism requires communication and orchestration.

  4. Delightfully parallel

    Calculations are totally independent of each other - best possible case.

  • The Parallel Class

    This is a higher level of abstraction. The loop iterations are partitioned ("chunked") into small sets, and each set is handed to a background task.

    using System.Threading.Tasks; //to get the Parallel class
    
    Parallel.For(0,max, (i) => { Calculate(i); });
    
    Parallel.ForEach(list,(item) => { Calculate(item); });
    
    Parallel.Invoke(
    () => myTask1(),
    () => myTask2(),
    () => myTask3(),
    () => myTask4()
    );
    
    

    Exceptions thrown by For, ForEach, and Invoke are saved until all tasks complete and then are thrown as an AggregateException.
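
    The AggregateException behavior described above can be seen in a small sketch. The names ParallelExceptions and CountFailures are made up for illustration. Once an iteration has failed the loop stops scheduling new iterations, so the number of collected exceptions can vary from run to run; the sketch only relies on there being at least one.

    ```csharp
    using System;
    using System.Threading.Tasks;

    namespace Practice
    {
        public static class ParallelExceptions
        {
            // Runs a parallel loop in which every odd iteration throws and
            // returns the number of exceptions collected by Parallel.For.
            public static int CountFailures(int max)
            {
                try
                {
                    Parallel.For(0, max, i =>
                    {
                        if (i % 2 == 1)
                        {
                            throw new InvalidOperationException("failed on " + i);
                        }
                    });
                    return 0; // reached only if no iteration threw
                }
                catch (AggregateException ae)
                {
                    // each original exception is kept in InnerExceptions
                    foreach (Exception inner in ae.InnerExceptions)
                    {
                        Console.WriteLine(inner.Message);
                    }
                    return ae.InnerExceptions.Count;
                }
            }

            public static void Main()
            {
                Console.WriteLine("exceptions caught: " + CountFailures(8));
            }
        }
    }
    ```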

  • ParallelLoopResult

    ParallelLoopState allows you to stop a loop.

    using System;
    using System.Threading.Tasks;
    
    namespace Practice
    {
        class TasksArgs
        {
            private static int n = 100;
            static void Main()
            {
                ParallelLoopResult parallelLoopResult = Parallel.For(0, n, 
                    (int i, ParallelLoopState loopControl) =>
                {
                    if (i > n - 2)
                    {
                        loopControl.Stop();
                    }
                    else
                    {
                        Console.WriteLine("working on " + i);
                    }
                });
    
                if (!parallelLoopResult.IsCompleted)
                {
                    Console.Out.WriteLine("Loop stopped before all iterations ran.");
                }
    
                Console.Write("Press 'Enter' to exit.");Console.In.ReadLine();
            }
        }
    }
    

    loopControl.Stop() halts the loop as soon as possible. loopControl.Break() lets every iteration with a lower index than the current one still run, and then halts.
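
    A minimal sketch of Break(), assuming a hypothetical helper named RunUntilBreak: iteration 50 calls Break(), so every iteration below 50 is still executed, and the result reports 50 as its LowestBreakIteration. Iterations above 50 may or may not have run.

    ```csharp
    using System;
    using System.Threading.Tasks;

    namespace Practice
    {
        public static class BreakDemo
        {
            // Calls Break() at iteration breakAt and records which iterations ran.
            // Returns the lowest iteration from which Break() was called.
            public static long? RunUntilBreak(bool[] processed, int breakAt)
            {
                ParallelLoopResult result = Parallel.For(0, processed.Length, (i, state) =>
                {
                    if (i == breakAt)
                    {
                        state.Break(); // iterations below breakAt will still run
                    }
                    else
                    {
                        processed[i] = true; // each iteration writes only its own slot
                    }
                });
                return result.LowestBreakIteration;
            }

            public static void Main()
            {
                bool[] processed = new bool[100];
                long? lowest = RunUntilBreak(processed, 50);
                Console.WriteLine("LowestBreakIteration = " + lowest);
            }
        }
    }
    ```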

    1. Using "Thread" and "Thread.Start()"
      1. Classic Style

        Simple multi-threaded example in C#.

        using System;
        using System.Threading;
        
        namespace Practice
        {
            class Threads1
            {
                static void Main()
                {
                    Thread t = new Thread(Cow);
                    t.Start();
                    for (int i = 0; i < 10; i++)
                    {
                       Console.Out.Write("Baa");Thread.Sleep(200);
                    }
                    Console.Write("Press 'Enter' to exit.");Console.In.ReadLine();
                }
                public static void Cow()
                {
                    for (int i = 0; i < 10; i++)
                    {
                        Console.Out.Write("Moo"); Thread.Sleep(200);
                    }
                }
            }
        }
        
        

        The output varies from run to run; one run produced:

        BaaBaaMooBaaMooBaaMooBaaMooBaaMooBaaMooBaaMooBaaBaaMoo
        
      2. Using Lambda functions

        You can use lambda functions to define the task for a thread.

        using System;
        using System.Threading;
        
        namespace Practice
        {
            public class Thread3
            {
                public static void Main(string[] args)
                {
                    Thread pig = new Thread(() => Console.Write("Oink"));
                    Thread cat = new Thread(() => Console.Write("Meow"));
                    pig.Start();
                    cat.Start();
                }
            }
        }
        
    2. Concurrency Issues
      1. The Problem

        When two threads change the same data, bad things can happen. When the result of an operation depends on the relative timing of different threads, it is called a "Race Condition".

        A race condition can be solved by using locks to protect the shared data. Locks are simple to implement, but they inhibit parallelism. Race conditions can also be solved without locks, but this requires more thought.

        An object is said to be "thread-safe" if it can be used from several threads at once without race conditions. For example, "Console.Out" and "ConcurrentQueue<T>" are thread-safe, but "List<T>" is not. Read the object's documentation to determine if it's thread-safe.

        Concurrent Data Collections: ConcurrentBag<T>, ConcurrentQueue<T>, ConcurrentStack<T>, ConcurrentDictionary<TKey, TValue>, BlockingCollection<T>.
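
        As a rough illustration of the concurrent collections listed above, the sketch below fills a ConcurrentQueue<T> and updates a ConcurrentDictionary<TKey, TValue> from a parallel loop without any explicit locks. The class and method names are invented for this example.

        ```csharp
        using System;
        using System.Collections.Concurrent;
        using System.Threading.Tasks;

        namespace Practice
        {
            public static class ConcurrentCollectionsDemo
            {
                // Enqueues count items from many threads; none are lost
                // because ConcurrentQueue<T> is thread-safe.
                public static int FillQueue(int count)
                {
                    var queue = new ConcurrentQueue<int>();
                    Parallel.For(0, count, i => queue.Enqueue(i));
                    return queue.Count;
                }

                // Counts how many numbers below count fall into each remainder class.
                public static ConcurrentDictionary<int, int> CountByRemainder(int count, int modulus)
                {
                    var counts = new ConcurrentDictionary<int, int>();
                    Parallel.For(0, count, i =>
                    {
                        // inserts 1 for a new key, otherwise applies the update function
                        counts.AddOrUpdate(i % modulus, 1, (key, old) => old + 1);
                    });
                    return counts;
                }

                public static void Main()
                {
                    Console.WriteLine("queued: " + FillQueue(1000));
                    Console.WriteLine("remainder 0: " + CountByRemainder(1000, 4)[0]);
                }
            }
        }
        ```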

        Creating a task takes time and memory. A rule of thumb is that a task needs to consume about 300 CPU cycles to be profitable.

        Tasks do not necessarily start in the order they were created. To ask the scheduler to start tasks in roughly creation order, pass the "TaskCreationOptions.PreferFairness" option to Task.Factory.StartNew(). This is a hint, not a guarantee.

        For long-running tasks use the "TaskCreationOptions.LongRunning" option, which hints to the scheduler that the task may need a dedicated thread rather than a thread-pool worker thread.

        In the following example two threads increment and then decrement a shared static variable. In C#, incrementing and decrementing are not atomic operations, which creates a race condition.

        using System;
        using System.Threading;
        
        public class ThreadTest2 {
            public class MyJob {
                public static int counter = 0;
                public int repetitions=100000000;
                public void runme() {
                    for(int i=0;i<repetitions;i++) {
                        counter++;
                        counter--;
                    }
                    Console.WriteLine(Thread.CurrentThread.Name+": "+counter);
                }
            }

            public static void Main(string[] args) {
                MyJob myJob = new MyJob();
                Thread thread = new Thread(myJob.runme);
                thread.Name = "first";

                MyJob myJob2 = new MyJob();
                Thread thread2 = new Thread(myJob2.runme);
                thread2.Name = "second";

                thread.Start();
                thread2.Start();
            }
        }
        

        The results can be:

                    first: 0
                    second: 0
        

        or sometimes

                    first: 1
                    second: 0
        

        We can never be sure of the outcome. We can overcome this in several ways.

      2. A Solution is to use the "lock()" construct

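        lock(obj) takes a reference-type object as its argument, and only one thread at a time can hold the lock on that object. Both threads must lock on the same object. In this example each thread has its own "MyJob" instance, so "lock(this)" would not protect the shared static counter; instead we lock on a private static object. Locking on "this" or on a type object, e.g., "lock(typeof(Util))", also works when every thread sees the same object, but it is discouraged because outside code can take the same lock.

        public class MyJob {
            public static int counter = 0;
            //one lock object shared by all instances, because counter is static
            private static readonly object counterLock = new object();
            public int repetitions=100000000;
            public void runme() {
                lock(counterLock) {
                    for(int i=0;i<repetitions;i++) {
                        counter++;
                        counter--;
                    }
                    //read counter while still holding the lock
                    Console.WriteLine(Thread.CurrentThread.Name+": "+counter);
                }
            }
        }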
        
      3. Hardware Locking

        Using System.Threading.Interlocked.Increment, the CLR guarantees an atomic operation, implemented in hardware where possible. Another interesting method in the class is "Exchange()", which atomically sets a variable to a new value and returns the old value.

        public class MyJob {
        	public static int counter = 0;
        	public int repetitions=100000000;
        	public void runme() {
        		for(int i=0;i<repetitions;i++) {
        			System.Threading.Interlocked.Increment(ref counter);
        			System.Threading.Interlocked.Decrement(ref counter);
        		}
        		Console.WriteLine(Thread.CurrentThread.Name+": "+counter);
        		
        	}
        }
        

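        The counter always returns to 0 once both threads have finished. A thread can occasionally print 1 if it reads the counter while the other thread is between its increment and decrement, but the usual output is: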

                    first: 0
                    second: 0
        

        Hardware interlocking is generally cheaper than taking a lock around each individual operation. While testing this you may need to have the cached memory on your system flushed. To do this use a utility from Sysinternals called "RAMMap" and empty the standby list.
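
        To make the behavior of "Exchange()" concrete, here is a small sketch in which several workers race to claim a flag. The names ExchangeDemo and CountWinners are hypothetical. Because Exchange writes the new value and returns the old one in a single atomic step, exactly one worker sees the old value 0.

        ```csharp
        using System;
        using System.Threading;
        using System.Threading.Tasks;

        namespace Practice
        {
            public static class ExchangeDemo
            {
                // Lets many workers race to claim a flag; returns how many succeeded.
                public static int CountWinners(int workers)
                {
                    int claimed = 0; // 0 = free, 1 = taken
                    int winners = 0;
                    Parallel.For(0, workers, i =>
                    {
                        // Exchange stores 1 and hands back the previous value atomically,
                        // so only one worker can ever get 0 back.
                        if (Interlocked.Exchange(ref claimed, 1) == 0)
                        {
                            Interlocked.Increment(ref winners);
                        }
                    });
                    return winners;
                }

                public static void Main()
                {
                    Console.WriteLine("winners = " + CountWinners(16)); // prints "winners = 1"
                }
            }
        }
        ```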

      4. The Synchronization attribute can be used.

        This locks the entire object - everything is single access. For performance reasons it is better to lock just the "critical section", the smallest section of code that causes an issue. Note the object must descend from ContextBoundObject, and that System.Runtime.Remoting is only available on the .NET Framework.

        [System.Runtime.Remoting.Contexts.Synchronization]
        public class MyJob: ContextBoundObject {
        	public static int counter = 0;
        	public int repetitions=100000000;
        	public void runme() {
        		for(int i=0;i<repetitions;i++) {
        			counter++;
        			counter--;
        		}
        		Console.WriteLine(Thread.CurrentThread.Name+": "+counter);
        		
        	}
        }
        
    3. Example of using Join()

      Join() makes the calling thread wait for another thread. Execution of the calling thread blocks until the thread that Join() was called on completes. In this example we Join on a thread that takes 2 seconds to complete, then Join on a second that still has 2 seconds to go.

      using System;
      using System.Threading;
      //Example showing use of Join()
      public class ThreadTest4 {
      	public class MyJob {
      		public static int counter = 0;
      		public int waitfor=1;
      		public int finalState = -1;
      		//sleeps then fills in finalState to simulate work done
      		public void runme() {
      		Thread.Sleep(waitfor);
      		finalState = waitfor;
      		Console.WriteLine(Thread.CurrentThread.Name+" finished sleeping finalState: "+finalState);
      		}
      	}
      	
          public static void Main(string[] args) {
      	MyJob myJob = new MyJob();
      	myJob.waitfor = 2000;
      	Thread thread = new Thread(myJob.runme);
      	thread.Name = "first";
              
      	MyJob myJob2 = new MyJob();
      	myJob2.waitfor = 4000;
      	Thread thread2 = new Thread(myJob2.runme);
      	thread2.Name = "second";
              
      	thread.Start();
      	thread2.Start();
      	
      	Console.WriteLine("After start.");
      	Console.WriteLine("Before first join.");
      	thread.Join();
      	Console.WriteLine("After first join.");
      	Console.WriteLine("Before second join.");
      	thread2.Join();
      	Console.WriteLine("After second join.");
      	
      	Console.WriteLine("myJob.finalState="+myJob.finalState);
      	Console.WriteLine("myJob2.finalState="+myJob2.finalState);
          }
      }
      

      This produces

      After start.
      Before first join.
      first finished sleeping finalState: 2000
      After first join.
      Before second join.
      second finished sleeping finalState: 4000
      After second join.
      myJob.finalState=2000
      myJob2.finalState=4000
      
    4. Join() with timeout

      What if a thread may never finish? The Join() method has an optional parameter specifying how many milliseconds to wait. Join gives up after that time and returns 'false'. The following example shows the main thread waiting 2 seconds for a job that will take 8 seconds. After waiting 2 seconds, it gets 'false' back, meaning the thread did not finish in time. Lacking any mercy, we abort the thread and move on to wait for the second, which has already completed. Note that Thread.Abort() is only supported on the .NET Framework; on .NET Core and .NET 5 or later it throws PlatformNotSupportedException.

      using System;
      using System.Threading;
      //Example showing use of Join()
      public class ThreadTest6 {
      	public class MyJob {
      		public static int counter = 0;
      		public int waitfor=1;
      		public int finalState = -1;
      		//sleeps then fills in finalState to simulate work done
      		public void runme() {
      		Thread.Sleep(waitfor);
      		finalState = waitfor;
      		Console.WriteLine(Thread.CurrentThread.Name+" finished sleeping finalState: "+finalState);
      		}
      	}
      	
          public static void Main(string[] args) {
      	MyJob myJob = new MyJob();
      	myJob.waitfor = 8000;
      	Thread thread = new Thread(myJob.runme);
      	thread.Name = "first";
              
      	MyJob myJob2 = new MyJob();
      	myJob2.waitfor = 500;
      	Thread thread2 = new Thread(myJob2.runme);
      	thread2.Name = "second";
              
      	thread.Start();
      	thread2.Start();
      	
      	Console.WriteLine("After start.");
      	Console.WriteLine("Before first join.");
      	bool finished = thread.Join(2000);
      	if(!finished) { thread.Abort(); }
      	Console.WriteLine("finishedP:"+finished);
      	
      	Console.WriteLine("After first join.");
      	Console.WriteLine("Before second join.");
      	thread2.Join(2000);
      	Console.WriteLine("After second join.");
      	
      	Console.WriteLine("myJob.finalState="+myJob.finalState);
      	Console.WriteLine("myJob2.finalState="+myJob2.finalState);
          }
      }
      

      Which produces:

      After start.
      Before first join.
      second finished sleeping finalState: 500
      finishedP:False
      After first join.
      Before second join.
      After second join.
      myJob.finalState=-1
      myJob2.finalState=500
      
    5. Using the "Task" object to create asynchronous threads.

      In .NET 4.0 the "Task" object was introduced in "System.Threading.Tasks" with the Task Parallel Library (TPL). A Task is a higher-level abstraction than a Thread: by default its work is queued to the thread pool instead of getting a thread of its own. In .NET 4.5 "Task.Run()" was added as a shorter way to create and start a task.

      A Task allows us to check on status, wait, grab the results when it's done and look at exceptions thrown.

      1. Waiting
        Task task1 = Task.Factory.StartNew( lambda1 );
        Task task2 = Task.Factory.StartNew( lambda2 );
        Task[] tasks = new Task[] {task1, task2};
        ...
        task1.Wait();//waits for a single task
        Task.WaitAll(tasks); //waits for all tasks
        ...
        int index = Task.WaitAny(tasks); //waits until one finishes
        
        Console.WriteLine(task1.Status);//after completion, one of RanToCompletion, Canceled, or Faulted
        
        Accessing task.Result implicitly waits for the task to complete.
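
        The waiting calls above can be tried out with the following sketch. The class name WaitingDemo and the sleep times are arbitrary choices for illustration; which task WaitAny reports first depends on timing, so the sketch prints the index rather than assuming it.

        ```csharp
        using System;
        using System.Threading;
        using System.Threading.Tasks;

        namespace Practice
        {
            public static class WaitingDemo
            {
                // Starts two tasks, waits for them, and returns the sum of their results.
                public static int RunBoth()
                {
                    Task<int> slow = Task.Factory.StartNew(() => { Thread.Sleep(300); return 1; });
                    Task<int> fast = Task.Factory.StartNew(() => { Thread.Sleep(30); return 2; });
                    Task[] tasks = new Task[] { slow, fast };

                    int index = Task.WaitAny(tasks); // index of a task that has finished
                    Console.WriteLine("first finished: tasks[" + index + "]");

                    Task.WaitAll(tasks); // blocks until both are done
                    Console.WriteLine(slow.Status + ", " + fast.Status);

                    return slow.Result + fast.Result; // Result would also wait if needed
                }

                public static void Main()
                {
                    Console.WriteLine("sum = " + RunBoth());
                }
            }
        }
        ```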
      2. Composing tasks

        task2 can be scheduled to run after task1 completes by using "ContinueWith".

        Task task1 = Task.Factory.StartNew( lambda1 );
        Task task2 = task1.ContinueWith( (antecedent) =>
        lambda2 /* which can access antecedent.Result when task1 is a Task<T> */);
        
        Task.Factory.ContinueWhenAll(tasks, (completedTasks) =>
        { ... /* code to run when all tasks have finished */ });
        
        Task.Factory.ContinueWhenAny(tasks, (firstTask) =>
        { ... /* code to run when one task has finished */ });
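
        A runnable sketch of the composition calls above, with made-up names and trivial computations standing in for real work. Chain() feeds the result of one task into a continuation; SumWhenAll() runs a continuation once every task in an array has finished.

        ```csharp
        using System;
        using System.Threading.Tasks;

        namespace Practice
        {
            public static class ContinuationDemo
            {
                // Chains a second computation onto the result of the first.
                public static int Chain()
                {
                    Task<int> first = Task.Factory.StartNew(() => 6);
                    Task<int> second = first.ContinueWith(antecedent => antecedent.Result * 7);
                    return second.Result;
                }

                // Sums the results of several tasks once all of them have finished.
                public static int SumWhenAll()
                {
                    Task<int>[] parts = new Task<int>[]
                    {
                        Task.Factory.StartNew(() => 1),
                        Task.Factory.StartNew(() => 2),
                        Task.Factory.StartNew(() => 3)
                    };
                    Task<int> total = Task.Factory.ContinueWhenAll<int, int>(parts, finished =>
                    {
                        int sum = 0;
                        foreach (Task<int> t in finished) { sum += t.Result; }
                        return sum;
                    });
                    return total.Result;
                }

                public static void Main()
                {
                    Console.WriteLine(Chain());      // prints 42
                    Console.WriteLine(SumWhenAll()); // prints 6
                }
            }
        }
        ```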
        
        
        

        After a task is started, control returns immediately to the calling thread, which keeps running until it reaches a "task.Wait()" or a reference to "task.Result".

        using System;
        using System.Threading;
        using System.Threading.Tasks;
        
        namespace Practice
        {
            class Tasks
            {
                static void Main()
                {
                    string favoriteAnimal = "Arctic Fox";
                    Task task = Task.Run(() =>
                    {
                        Thread.Sleep(2000);//long enough that the first WriteLine runs before the change
                        favoriteAnimal = "Owl";
                    });
                    Console.Out.WriteLine("favoriteAnimal = {0}", favoriteAnimal);//Arctic Fox
                    task.Wait(5000);//wait for task to complete or 5000 ms timeout
                    Console.Out.WriteLine("favoriteAnimal = {0}", favoriteAnimal);//Owl
                    Console.Write("Press 'Enter' to exit.");Console.In.ReadLine();
                }
            }
        }
        
      3. Exceptions

        When a task throws an exception, the task is terminated and the exception is wrapped in an AggregateException and stored in the task's Exception property. The aggregate exception is thrown later when the task's Wait() method or Result property is used, or when Task.WaitAll() is called; the original exception is available in its "InnerExceptions" collection.

        The exceptions may be deeply nested, but they can be flattened by calling the "Flatten()" method on the AggregateException.

        If you do not handle (or "observe", as Microsoft likes to call it) a task's exceptions, the exception resurfaces when the task is garbage collected - not an ideal time. In .NET 4.0 it is rethrown on the finalizer thread and terminates the process; from .NET 4.5 on, the default is to only raise the "TaskScheduler.UnobservedTaskException" event.

        To observe the exception do one of the following:
        1. invoke ".Wait()" or access ".Result"
        2. invoke "Task.WaitAll()"
        3. examine the ".Exception" property
        4. subscribe to "TaskScheduler.UnobservedTaskException". Useful when using 3rd party TPL libraries. Be sure to call e.SetObserved() inside the handler.

        TaskScheduler.UnobservedTaskException +=
         new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
        
        static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
            e.SetObserved();
        }
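
        The first way of observing an exception, accessing ".Result", might look like the sketch below. The names are invented, and a division by zero stands in for whatever failure a real task might have.

        ```csharp
        using System;
        using System.Threading.Tasks;

        namespace Practice
        {
            public static class TaskExceptionDemo
            {
                // Runs a task that fails and returns the type name of the original exception.
                public static string Observe()
                {
                    int divisor = 0;
                    Task<int> task = Task.Factory.StartNew(() => 10 / divisor); // throws inside the task
                    try
                    {
                        Console.WriteLine(task.Result); // rethrows as AggregateException
                        return "no exception";
                    }
                    catch (AggregateException ae)
                    {
                        // Flatten() unwraps nested aggregates into one flat list
                        Exception original = ae.Flatten().InnerExceptions[0];
                        Console.WriteLine(task.Status + ": " + original.Message);
                        return original.GetType().Name;
                    }
                }

                public static void Main()
                {
                    Console.WriteLine(Observe()); // prints "DivideByZeroException"
                }
            }
        }
        ```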
        
      4. Cancelling a Task

        Cancelling a task requires cooperation between the calling code and the task itself.

        var cancellationTokenSource = new CancellationTokenSource();
        var token = cancellationTokenSource.Token;
        Task t = Task.Factory.StartNew( () =>
        {
            while(...) {
                if(token.IsCancellationRequested) {
                    ... //cleanup
                    token.ThrowIfCancellationRequested();//throws OperationCanceledException
                }
                ...//do work
            }
        }, token);
        
        //in main program, when the work needs to be cancelled
        cancellationTokenSource.Cancel();
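
        A complete version of the pattern above could look like this sketch. The loop bounds and sleep times are arbitrary stand-ins for real work, and the names are hypothetical. Because the same token is passed to StartNew, the task ends in the Canceled state rather than Faulted.

        ```csharp
        using System;
        using System.Threading;
        using System.Threading.Tasks;

        namespace Practice
        {
            public static class CancelDemo
            {
                // Starts a slow task, cancels it, and returns the task's final status.
                public static TaskStatus StartAndCancel()
                {
                    var cancellationTokenSource = new CancellationTokenSource();
                    CancellationToken token = cancellationTokenSource.Token;

                    Task t = Task.Factory.StartNew(() =>
                    {
                        for (int i = 0; i < 1000; i++)
                        {
                            token.ThrowIfCancellationRequested(); // exit point for cancellation
                            Thread.Sleep(10);                     // simulated unit of work
                        }
                    }, token);

                    Thread.Sleep(100); // let the task get going
                    cancellationTokenSource.Cancel();

                    try
                    {
                        t.Wait();
                    }
                    catch (AggregateException ae)
                    {
                        // a cancelled task reports a TaskCanceledException here
                        Console.WriteLine(ae.InnerException.GetType().Name);
                    }
                    return t.Status;
                }

                public static void Main()
                {
                    Console.WriteLine(StartAndCancel()); // prints "Canceled"
                }
            }
        }
        ```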
        
        
      5. Passing Data to Tasks

        Task.Factory.StartNew() allows us to pass in an object parameter after the definition of the lambda.

        Task t = Task.Factory.StartNew( (arg) => { 
           Console.WriteLine((string)arg);
        },"apples");
        
      6. Returning a value

        With generic tasks (Task<T>) you can return a value of type T.

        using System;
        using System.Threading;
        using System.Threading.Tasks;
        
        namespace Practice
        {
            public class Thread4
            {
                public static void Main(string[] args)
                {
                    Task<string> task = Task.Run(() =>
                    {
                        Thread.Sleep(300);
                        return "stew";
                    });
                    string result = task.Result;
                    Console.Out.WriteLine("result = {0}", result);
                    Console.Write("Press 'Enter' to exit.");
                    Console.In.ReadLine();
                }
            }
        }
        
    6. Awaiters and Continuations

      It's common to have something you want to execute after a task is finished. This is called a 'continuation' method. We can run a continuation right after a task finishes by getting the "Awaiter" object from a task and assigning a delegate to that object. The "Awaiter" object will then tell the task that when it is finished to run this delegate.

      using System;
      using System.Threading;
      using System.Threading.Tasks;
      
      namespace Practice
      {
          public class Threads5
          {
              public static void Main(string[] args)
              {
                  Task<int> task = Task.Run(() =>
                  {
                      Thread.Sleep(TimeSpan.FromSeconds(2));
                      return 17;
                  });
                  var awaiter = task.GetAwaiter();
                  awaiter.OnCompleted(() =>
                  {
                      int result = awaiter.GetResult();
                      Console.Out.WriteLine("result = {0}", result);
                  });
      
                  Console.Write("Press 'Enter' to exit.");
                  Console.In.ReadLine();
              }
          }
      }
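
      The "async/await" keywords mentioned in the introduction generate this awaiter plumbing for you. As a rough equivalent of the example above (the method name DescribeResultAsync is made up), everything after "await" becomes the continuation:

      ```csharp
      using System;
      using System.Threading;
      using System.Threading.Tasks;

      namespace Practice
      {
          public static class AwaitDemo
          {
              // The code after "await" runs as the continuation of the awaited task.
              public static async Task<string> DescribeResultAsync()
              {
                  int result = await Task.Run(() =>
                  {
                      Thread.Sleep(TimeSpan.FromMilliseconds(200));
                      return 17;
                  });
                  return "result = " + result;
              }

              public static void Main(string[] args)
              {
                  Task<string> pending = DescribeResultAsync();
                  Console.WriteLine("waiting...");
                  Console.WriteLine(pending.Result); // blocking here is acceptable in a console program
              }
          }
      }
      ```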
      

      A common scenario is to update the user interface after a compute-intensive task finishes. Only the thread that created a UI component can update that component, so the continuation has to run on the UI thread. This is done by passing "TaskScheduler.FromCurrentSynchronizationContext()" as the scheduler argument to "ContinueWith" while on the UI thread.

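      "Task.Run(code);" is a shorthand for "Task.Factory.StartNew( code );" with commonly wanted defaults. Use "Task.Factory.StartNew" when you need to pass creation options, a state object, or a specific scheduler.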

    7. General Notes:

      When launching many long-running tasks, i.e., many more than the number of processors, the default behavior of the TPL is to assume each task only takes a few seconds to complete, so the TPL will slowly flood the system with tasks and thrashing will ensue.

      To implement the Producer-Consumer pattern, use a "BlockingCollection".
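
      A minimal Producer-Consumer sketch with "BlockingCollection", assuming one producer and one consumer per processor. The names, the bounded capacity of 10, and the summing workload are all illustrative choices.

      ```csharp
      using System;
      using System.Collections.Concurrent;
      using System.Threading;
      using System.Threading.Tasks;

      namespace Practice
      {
          public static class ProducerConsumerDemo
          {
              // One producer adds 1..itemCount; several consumers add them up.
              public static long SumThroughQueue(int itemCount)
              {
                  long total = 0;
                  using (var queue = new BlockingCollection<int>(10)) // at most 10 items buffered
                  {
                      Task producer = Task.Factory.StartNew(() =>
                      {
                          for (int i = 1; i <= itemCount; i++)
                          {
                              queue.Add(i); // blocks while the queue is full
                          }
                          queue.CompleteAdding(); // tells consumers no more items are coming
                      });

                      Task[] consumers = new Task[Environment.ProcessorCount];
                      for (int c = 0; c < consumers.Length; c++)
                      {
                          consumers[c] = Task.Factory.StartNew(() =>
                          {
                              // the enumeration ends once the queue is empty and completed
                              foreach (int item in queue.GetConsumingEnumerable())
                              {
                                  Interlocked.Add(ref total, item);
                              }
                          });
                      }

                      producer.Wait();
                      Task.WaitAll(consumers);
                  }
                  return total;
              }

              public static void Main()
              {
                  Console.WriteLine(SumThroughQueue(100)); // prints 5050
              }
          }
      }
      ```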

      To get the number of processors, e.g., to set the number of consumers, use "System.Environment.ProcessorCount".

      Parallel LINQ (PLINQ) can be easy to use: just add ".AsParallel()" after the source, e.g., "(from employee in elist.AsParallel() select employee.pay).Sum();". PLINQ only works on in-memory data.
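
      The query above can be made runnable as follows. The Employee class with its "pay" field and the sample data are assumptions made for this sketch.

      ```csharp
      using System;
      using System.Collections.Generic;
      using System.Linq;

      namespace Practice
      {
          // Hypothetical data type for the example
          public class Employee
          {
              public string name;
              public decimal pay;
          }

          public static class PlinqDemo
          {
              // Sums the pay of all employees, letting PLINQ split the work across cores.
              public static decimal TotalPay(List<Employee> elist)
              {
                  return (from employee in elist.AsParallel() select employee.pay).Sum();
              }

              public static void Main()
              {
                  var elist = new List<Employee>
                  {
                      new Employee { name = "Ann", pay = 1000m },
                      new Employee { name = "Bob", pay = 1200m },
                      new Employee { name = "Cy", pay = 1300m }
                  };
                  Console.WriteLine(TotalPay(elist)); // prints 3500
              }
          }
      }
      ```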

      The Asynchronous Programming Model (APM) pairs a "Begin" method that initiates an operation with an "End" method that completes it and returns the result.
