MultiThreading with C# Threads

C# Threading

With .NET 4.0 we have the "Task" object in the Task Parallel Library (TPL), which is improved in .NET 4.5, and in C# 5.0 we get the "async/await" keywords. We will start with the old-fashioned "Thread" class and move to "Task" and "async/await".

For more info see msdn.microsoft.com/concurrency.

First, two terms:

  1. "Async"

    This is typically used to give better response time for users by pushing long-running tasks into the background so the machine can be more responsive.

  2. "Parallel"

    This is used to speed up the results of a CPU-bound task by performing calculations concurrently.

Types of Parallelism

  1. Data

    Same operation being executed across different data

  2. Task

    Different operations being executed across the same or different data.

  3. Data flow

    Data flows from one task to the next like in a pipeline. Parallelism requires communication and orchestration.

  4. Delightfully parallel

    Calculations are totally independent of each other - best possible case.

  1. The Parallel Class

    This is a higher level of abstraction. The iterations are "chunked" into small sets, and each set is handed to a background task.

    using System.Threading.Tasks; //to get the Parallel class
    
    Parallel.For(0,max, (i) => { Calculate(i); });
    
    Parallel.ForEach(list,(item) => { Calculate(item); });
    
    Parallel.Invoke(
    () => myTask1(),
    () => myTask2(),
    () => myTask3(),
    () => myTask4()
    );
    
    

    Exceptions thrown by For, ForEach, and Invoke are saved until all tasks complete and then are thrown as an AggregateException.
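
    As a minimal sketch of the behavior described above, the following catches the AggregateException from Parallel.For and inspects its inner exceptions. The class name and the failing "Calculate" method are hypothetical, made up for this illustration. Because the loop stops scheduling new iterations once one fails, the number of collected exceptions can vary between runs.

```csharp
using System;
using System.Threading.Tasks;

public static class ParallelExceptionDemo
{
    // Hypothetical work item: fails for every multiple of 10.
    static void Calculate(int i)
    {
        if (i % 10 == 0)
        {
            throw new InvalidOperationException("bad input " + i);
        }
    }

    // Runs the loop and returns how many exceptions were collected.
    public static int RunAndCountFailures(int max)
    {
        try
        {
            Parallel.For(0, max, i => Calculate(i));
            return 0;
        }
        catch (AggregateException ae)
        {
            // Each failed iteration contributes one inner exception.
            foreach (Exception inner in ae.InnerExceptions)
            {
                Console.WriteLine(inner.Message);
            }
            return ae.InnerExceptions.Count;
        }
    }

    public static void Main()
    {
        int failures = RunAndCountFailures(100);
        Console.WriteLine("collected " + failures + " exception(s)");
    }
}
```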

  2. ParallelLoopResult

    ParallelLoopState allows you to stop a loop.

    using System;
    using System.Threading.Tasks;
    
    namespace Practice
    {
        class TasksArgs
        {
            private static int n = 100;
            static void Main()
            {
                ParallelLoopResult parallelLoopResult = Parallel.For(0, n, 
                    (int i, ParallelLoopState loopControl) =>
                {
                    if (i > n - 2)
                    {
                        loopControl.Stop();
                    }
                    else
                    {
                        Console.WriteLine("working on " + i);
                    }
                });
    
                if (!parallelLoopResult.IsCompleted)
                {
                    Console.Out.WriteLine("Problem with parallel loop.");
                }
    
                Console.Write("Press 'Enter' to exit.");Console.In.ReadLine();
            }
        }
    }
    

    loopControl.Stop() ends the loop as fast as possible: no new iterations are started. loopControl.Break() ends the loop only after all iterations with a lower index than the current one have completed.
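
    To make the difference concrete, here is a small sketch (class and method names are made up for this illustration). After Break() the result's LowestBreakIteration holds the index where Break() was called; after Stop() it stays null. In both cases IsCompleted is false.

```csharp
using System;
using System.Threading.Tasks;

public static class LoopExitDemo
{
    // Calls Break() at breakIndex and returns LowestBreakIteration.
    public static long? BreakAt(int breakIndex, int n)
    {
        ParallelLoopResult result = Parallel.For(0, n, (i, state) =>
        {
            if (i == breakIndex)
            {
                state.Break(); // lower-indexed iterations still get to run
            }
        });
        return result.LowestBreakIteration;
    }

    // Calls Stop() at stopIndex and returns IsCompleted.
    public static bool StopAt(int stopIndex, int n)
    {
        ParallelLoopResult result = Parallel.For(0, n, (i, state) =>
        {
            if (i == stopIndex)
            {
                state.Stop(); // no guarantee about any other iteration
            }
        });
        return result.IsCompleted;
    }

    public static void Main()
    {
        Console.WriteLine("Break: LowestBreakIteration = " + BreakAt(50, 100));
        Console.WriteLine("Stop:  IsCompleted = " + StopAt(50, 100));
    }
}
```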

  1. Using "Thread" and "Thread.Start()"
    1. Classic Style

      Simple multi-threaded example in C#.

      using System;
      using System.Threading;
      
      namespace Practice
      {
          class Threads1
          {
              static void Main()
              {
                  Thread t = new Thread(Cow);
                  t.Start();
                  for (int i = 0; i < 10; i++)
                  {
                     Console.Out.Write("Baa");Thread.Sleep(200);
                  }
                  Console.Write("Press 'Enter' to exit.");Console.In.ReadLine();
              }
              public static void Cow()
              {
                  for (int i = 0; i < 10; i++)
                  {
                      Console.Out.Write("Moo"); Thread.Sleep(200);
                  }
              }
          }
      }
      
      

      The output can differ from run to run; this time it produces:

      BaaBaaMooBaaMooBaaMooBaaMooBaaMooBaaMooBaaMooBaaBaaMoo
      
    2. Using Lambda functions

      You can use lambda functions to define the task for a thread.

      using System;
      using System.Threading;
      
      namespace Practice
      {
          public class Thread3
          {
              public static void Main(string[] args)
              {
                  Thread pig = new Thread(() => Console.Write("Oink"));
                  Thread cat = new Thread(() => Console.Write("Meow"));
                  pig.Start();
                  cat.Start();
              }
          }
      }
      
  2. Concurrency Issues
    1. The Problem

      When two threads change the same data bad things can happen. When the result of an operation depends on the timing of different threads it is called a "Race Condition".

      A race condition can be solved by using locks to protect shared data. This is simple to implement, but it inhibits parallelism. Race conditions can also be solved without locks, but this requires more thought.

      An object is said to be "thread-safe" if it can be used in parallel and not have race conditions. For example, "Console.Out" and "ConcurrentQueue<T>" are thread-safe, but "List<T>" is not. Read the object's documentation to determine if it's thread-safe.

      Concurrent Data Collections: ConcurrentBag<T>, ConcurrentQueue<T>, ConcurrentStack<T>, ConcurrentDictionary<TKey,TValue>, BlockingCollection<T>.
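
      A short sketch of two of these collections being filled from many threads at once. The class and method names are hypothetical; the collection calls ("Enqueue", "AddOrUpdate", "TryGetValue") are the standard ones from System.Collections.Concurrent.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class CollectionDemo
{
    // Enqueues 0..n-1 from many threads; no lock is needed.
    public static int FillConcurrentQueue(int n)
    {
        var queue = new ConcurrentQueue<int>();
        Parallel.For(0, n, i => queue.Enqueue(i));
        return queue.Count;
    }

    // Counts how often a word occurs, updating the dictionary in parallel.
    public static int CountWord(string[] words, string word)
    {
        var counts = new ConcurrentDictionary<string, int>();
        Parallel.ForEach(words, w =>
        {
            // Adds 1 for a new key, otherwise increments the stored value atomically.
            counts.AddOrUpdate(w, 1, (key, old) => old + 1);
        });
        int count;
        return counts.TryGetValue(word, out count) ? count : 0;
    }

    public static void Main()
    {
        Console.WriteLine(FillConcurrentQueue(1000));
        Console.WriteLine(CountWord(new[] { "moo", "baa", "moo" }, "moo"));
    }
}
```

      Doing the same with a plain "List<T>" or "Dictionary<TKey,TValue>" can lose items or throw, because those types are not thread-safe.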

      Creating a task takes time and memory. A rule of thumb is that a task needs to consume about 300 CPU cycles to be profitable.

      By default there is no guarantee about the order in which tasks run. To hint that tasks should start in roughly creation order, use the "TaskCreationOptions.PreferFairness" option in Task.Factory.StartNew(). It is a hint to the scheduler, not a guarantee.

      For long-running tasks use the "TaskCreationOptions.LongRunning" option, which hints that the task should get its own dedicated thread instead of tying up a thread-pool worker thread.

      In the following example two threads increment and then decrement a shared static variable. In C#, "counter++" and "counter--" are not atomic (each is a read, a modify, and a write), which creates a race condition.

      using System;
      using System.Threading;
      
      public class ThreadTest2 {
      public class MyJob {
      	public static int counter = 0;
      	public int repetitions=100000000;
      	public void runme() {
      		for(int i=0;i<repetitions;i++) {
      			counter++;
      			counter--;
      		}
      	Console.WriteLine(Thread.CurrentThread.Name+": "+counter);
      	}
      }
      	
          public static void Main(string[] args) {
      	MyJob myJob = new MyJob();
      	Thread thread = new Thread(myJob.runme);
      	thread.Name = "first";
              
      	MyJob myJob2 = new MyJob();
      	Thread thread2 = new Thread(myJob2.runme);
      	thread2.Name = "second";
              
      	thread.Start();
      	thread2.Start();
          }
      }
      

      The results can be:

                  first: 0
                  second: 0
      

      or sometimes

                  first: 1
                  second: 0
      

      We can never be sure of the outcome. We can overcome this in several ways.

    2. A Solution is to use the "lock()" construct

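      lock(obj) takes a reference-type object as the argument, and only one thread at a time can hold the lock on that object. Every thread must lock on the same object. The example above creates two "MyJob" instances, so "lock(this)" would give each thread its own lock and protect nothing; since "counter" is static, we lock on a private static object instead. Locking on "this" or on the "type" object, e.g., "lock(typeof(Util))", works when all threads share that object, but it is discouraged because outside code can lock on the same object and cause a deadlock.

      public class MyJob {
      private static readonly object counterLock = new object();
      public static int counter = 0;
      public int repetitions=100000000;
      public void runme() {
      	lock(counterLock) {
      	   for(int i=0;i<repetitions;i++) {
      		counter++;
      		counter--;
      	   }
      	   //read counter while still holding the lock so the other thread cannot be mid-update
      	   Console.WriteLine(Thread.CurrentThread.Name+": "+counter);
      	}
        }
      }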
      
    3. Hardware Locking

      Using "System.Threading.Interlocked.Increment()" the CLR guarantees an atomic operation, done in hardware where possible. Another interesting method of the "Interlocked" class is "Exchange()", which atomically sets a variable to a new value and returns the old value.

      public class MyJob {
      	public static int counter = 0;
      	public int repetitions=100000000;
      	public void runme() {
      		for(int i=0;i<repetitions;i++) {
      			System.Threading.Interlocked.Increment(ref counter);
      			System.Threading.Interlocked.Decrement(ref counter);
      		}
      		Console.WriteLine(Thread.CurrentThread.Name+": "+counter);
      		
      	}
      }
      

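      The final value of "counter" is always 0, so the thread that finishes last always prints 0. The thread that finishes first can occasionally print 1 if it reads "counter" while the other thread is between its Increment and Decrement. Typical output: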

                  first: 0
                  second: 0
      

      Hardware interlocking should be faster than taking a lock. While testing this you may need to have the cached memory on your system flushed. To do this use a utility from Sysinternals called "RAMMap" and empty the standby list.
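
      As a small illustration of "Exchange()", the sketch below lets many parallel iterations race to flip a flag from 0 to 1. Because Exchange returns the previous value atomically, exactly one iteration sees 0. The class, field, and method names are made up for this example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class InterlockedDemo
{
    static int flag = 0;

    // Returns how many iterations saw the flag while it was still 0.
    public static int CountWinners(int contenders)
    {
        flag = 0;
        int winners = 0;
        Parallel.For(0, contenders, i =>
        {
            // Exchange stores 1 and returns the previous value in one atomic step.
            if (Interlocked.Exchange(ref flag, 1) == 0)
            {
                Interlocked.Increment(ref winners);
            }
        });
        return winners;
    }

    public static void Main()
    {
        Console.WriteLine("winners = " + CountWinners(1000));
    }
}
```

      A plain "if (flag == 0) { flag = 1; ... }" would not give this guarantee, since another thread could change the flag between the test and the assignment.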

    4. The Synchronization attribute can be used.

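      This locks the entire object: only one thread at a time can access its instance members. For performance reasons it is better to lock just the "critical section", the smallest section of code that causes an issue. Note that the object must descend from ContextBoundObject, that the attribute is available only on the .NET Framework, and that it does not protect static members such as "counter" below.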

      [System.Runtime.Remoting.Contexts.Synchronization]
      public class MyJob: ContextBoundObject {
      	public static int counter = 0;
      	public int repetitions=100000000;
      	public void runme() {
      		for(int i=0;i<repetitions;i++) {
      			counter++;
      			counter--;
      		}
      		Console.WriteLine(Thread.CurrentThread.Name+": "+counter);
      		
      	}
      }
      
  3. Example of using Join()

    Join() hitches the fate of the current thread to another thread: the calling thread waits until the joined thread completes. In this example we Join on a thread that takes 2 seconds to complete, then Join on a second thread that still has about 2 seconds to go.

    using System;
    using System.Threading;
    //Example showing use of Join()
    public class ThreadTest4 {
    	public class MyJob {
    		public static int counter = 0;
    		public int waitfor=1;
    		public int finalState = -1;
    		//sleeps then fills in finalState to simulate work done
    		public void runme() {
    		Thread.Sleep(waitfor);
    		finalState = waitfor;
    		Console.WriteLine(Thread.CurrentThread.Name+" finished sleeping finalState: "+finalState);
    		}
    	}
    	
        public static void Main(string[] args) {
    	MyJob myJob = new MyJob();
    	myJob.waitfor = 2000;
    	Thread thread = new Thread(myJob.runme);
    	thread.Name = "first";
            
    	MyJob myJob2 = new MyJob();
    	myJob2.waitfor = 4000;
    	Thread thread2 = new Thread(myJob2.runme);
    	thread2.Name = "second";
            
    	thread.Start();
    	thread2.Start();
    	
    	Console.WriteLine("After start.");
    	Console.WriteLine("Before first join.");
    	thread.Join();
    	Console.WriteLine("After first join.");
    	Console.WriteLine("Before second join.");
    	thread2.Join();
    	Console.WriteLine("After second join.");
    	
    	Console.WriteLine("myJob.finalState="+myJob.finalState);
    	Console.WriteLine("myJob2.finalState="+myJob2.finalState);
        }
    }
    

    This produces

    After start.
    Before first join.
    first finished sleeping finalState: 2000
    After first join.
    Before second join.
    second finished sleeping finalState: 4000
    After second join.
    myJob.finalState=2000
    myJob2.finalState=4000
    
  4. Join() with timeout

    What if a thread may never finish? The Join() method has an optional parameter specifying how many milliseconds to wait. Join gives up after that time and returns 'false'. The following example shows the main thread waiting 2 seconds for a job that will take 8 seconds. After waiting 2 seconds, it gets 'false' back, meaning the thread did not finish in time. Lacking any mercy we abort the thread and move on to wait for the second, which has already completed. Note that "Thread.Abort()" is only supported on the .NET Framework; on .NET Core and .NET 5 or later it throws PlatformNotSupportedException.

    using System;
    using System.Threading;
    //Example showing use of Join() with a timeout
    public class ThreadTest6 {
    	public class MyJob {
    		public static int counter = 0;
    		public int waitfor=1;
    		public int finalState = -1;
    		//sleeps then fills in finalState to simulate work done
    		public void runme() {
    		Thread.Sleep(waitfor);
    		finalState = waitfor;
    		Console.WriteLine(Thread.CurrentThread.Name+" finished sleeping finalState: "+finalState);
    		}
    	}
    	
        public static void Main(string[] args) {
    	MyJob myJob = new MyJob();
    	myJob.waitfor = 8000;
    	Thread thread = new Thread(myJob.runme);
    	thread.Name = "first";
            
    	MyJob myJob2 = new MyJob();
    	myJob2.waitfor = 500;
    	Thread thread2 = new Thread(myJob2.runme);
    	thread2.Name = "second";
            
    	thread.Start();
    	thread2.Start();
    	
    	Console.WriteLine("After start.");
    	Console.WriteLine("Before first join.");
    	bool finished = thread.Join(2000);
    	if(!finished) { thread.Abort(); }
    	Console.WriteLine("finishedP:"+finished);
    	
    	Console.WriteLine("After first join.");
    	Console.WriteLine("Before second join.");
    	thread2.Join(2000);
    	Console.WriteLine("After second join.");
    	
    	Console.WriteLine("myJob.finalState="+myJob.finalState);
    	Console.WriteLine("myJob2.finalState="+myJob2.finalState);
        }
    }
    

    Which produces:

    After start.
    Before first join.
    second finished sleeping finalState: 500
    finishedP:False
    After first join.
    Before second join.
    After second join.
    myJob.finalState=-1
    myJob2.finalState=500
    
  5. Using the "Task" object to create asynchronous threads.

    In .NET 4.0 the "Task" object was introduced in "System.Threading.Tasks" with the Task Parallel Library (TPL). It is a higher-level abstraction over threads: by default tasks are queued to the thread pool instead of each getting a new thread. In .NET 4.5 "Task.Run()" was added as a shorter way to create and start a task in one call.

    A Task allows us to check on status, wait, grab the results when it's done and look at exceptions thrown.

    1. Waiting
      Task task1 = Task.Factory.StartNew( lambda1 );
      Task task2 = Task.Factory.StartNew( lambda2 );
      Task[] tasks = new Task[] {task1, task2};
      ...
      task1.Wait();//waits for a single task
      Task.WaitAll(tasks); //waits for all tasks
      ...
      int index = Task.WaitAny(tasks); //waits until one finishes
      
      Console.WriteLine(task1.Status);//once finished: one of RanToCompletion, Canceled, or Faulted

      Reading "task.Result" on a "Task<T>" implicitly calls Wait().
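
      The fragment above can be turned into a small runnable sketch like the following. The class name, the sleep times, and the return values are arbitrary choices for the illustration. Which task WaitAny reports first depends on timing, so no particular index is promised.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitDemo
{
    // Starts two tasks, waits for them, and returns the sum of their results.
    public static int SumOfResults()
    {
        Task<int> slow = Task.Run(() => { Thread.Sleep(500); return 1; });
        Task<int> fast = Task.Run(() => { Thread.Sleep(50); return 2; });
        Task[] tasks = { slow, fast };

        int index = Task.WaitAny(tasks);  // index of a task that has finished
        Console.WriteLine("first finished: tasks[" + index + "]");

        Task.WaitAll(tasks);              // now both are done
        Console.WriteLine(slow.Status);   // RanToCompletion
        Console.WriteLine(fast.Status);   // RanToCompletion

        return slow.Result + fast.Result; // Result would also wait if needed
    }

    public static void Main()
    {
        Console.WriteLine("sum = " + SumOfResults());
    }
}
```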
    2. Composing tasks

      task2 can be scheduled to run after task1 completes by using "ContinueWith".

      Task task1 = Task.Factory.StartNew( lambda1 );
      Task task2 = task1.ContinueWith( (antecedent) =>
      lambda2 /* which can access antecedent.Result if task1 is a Task<T> */);
      
      Task.Factory.ContinueWhenAll(tasks, (completedTasks) =>
      { ... /* code to run when all tasks have finished */ });
      
      Task.Factory.ContinueWhenAny(tasks, (firstTask) =>
      { ... /* code to run when one task has finished */ });
      

      After a task is started, control returns immediately to the calling thread, which keeps running until it reaches a "task.Wait()" or a reference to "task.Result".

      using System;
      using System.Threading;
      using System.Threading.Tasks;
      
      namespace Practice
      {
          class Tasks
          {
              static void Main()
              {
                  string favoriteAnimal = "Arctic Fox";
                  Task task = Task.Run(() =>
                  {
                      Thread.Sleep(2);
                      favoriteAnimal = "Owl";
                  });
                  Console.Out.WriteLine("favoriteAnimal = {0}", favoriteAnimal);//almost certainly still Arctic Fox
                  task.Wait(5000);//wait for task to complete or 5000 ms timeout
                  Console.Out.WriteLine("favoriteAnimal = {0}", favoriteAnimal);//Owl
                  Console.Write("Press 'Enter' to exit.");Console.In.ReadLine();
              }
          }
      }
      
    3. Exceptions

      When a task throws an exception the task is terminated, and the exception is wrapped in an AggregateException and stored in the task's "Exception" property. The aggregate exception is thrown later when the task's "Wait()" or "Result" is used or "Task.WaitAll()" is called, and the original exception is in the "InnerExceptions" collection of the aggregate exception.

      The exceptions may be deeply nested, but can be flattened by using the "Flatten()" method on an AggregateException.

      If you do not handle (or "observe", as Microsoft likes to call it) a task's exceptions, then in .NET 4.0 the exception is rethrown when the task is garbage collected, which is not an ideal time. Since .NET 4.5 the default is that the unobserved exception is no longer rethrown, but the "TaskScheduler.UnobservedTaskException" event is still raised.

      To observe the exception do one of the following:
      1. invoke ".Wait()" or access ".Result"
      2. invoke "Task.WaitAll()"
      3. examine the ".Exception" property
      4. subscribe to "TaskScheduler.UnobservedTaskException". Useful when using 3rd party TPL libraries. Be sure to call "SetObserved()" on the event arguments inside the handler.
      TaskScheduler.UnobservedTaskException +=
       new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
      
      static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
      e.SetObserved();
      }
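
      A minimal sketch of observing an exception with ".Wait()" and "Flatten()". The class and the deliberately failing "Divide" helper are hypothetical, written only for this illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskExceptionDemo
{
    // Hypothetical helper that fails when b is 0.
    static int Divide(int a, int b)
    {
        return a / b;
    }

    // Returns the type name of the original exception thrown inside the task.
    public static string ObserveFailure()
    {
        Task<int> task = Task.Run(() => Divide(1, 0));
        try
        {
            task.Wait(); // the stored exception is rethrown here
            return "no exception";
        }
        catch (AggregateException ae)
        {
            // Flatten() collapses nested AggregateExceptions into one level.
            Exception original = ae.Flatten().InnerExceptions[0];
            return original.GetType().Name;
        }
    }

    public static void Main()
    {
        Console.WriteLine(ObserveFailure()); // DivideByZeroException
    }
}
```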
      
    4. Cancelling a Task

      Cancelling a task requires cooperation between the calling code and the task itself.

      var cancellationTokenSource = new CancellationTokenSource();
      var token = cancellationTokenSource.Token;
      Task t = Task.Factory.StartNew( () =>
      {
          while(...) {
              if(token.IsCancellationRequested) {
                  ... //cleanup
                  token.ThrowIfCancellationRequested();//throws OperationCanceledException
              }
              ...//do work
          }
      }, token);
      
      //in main program
      if(need to cancel) cancellationTokenSource.Cancel();
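
      The outline above can be filled in as a runnable sketch like this one. The class name, the sleep times standing in for "work", and the decision to cancel after 100 ms are all assumptions made for the illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancelDemo
{
    // Starts a worker, cancels it, and returns the task's final status.
    public static TaskStatus RunAndCancel()
    {
        var cancellationTokenSource = new CancellationTokenSource();
        CancellationToken token = cancellationTokenSource.Token;

        Task worker = Task.Factory.StartNew(() =>
        {
            while (true)
            {
                // Throws OperationCanceledException once Cancel() has been called.
                token.ThrowIfCancellationRequested();
                Thread.Sleep(10); // simulated unit of work
            }
        }, token);

        Thread.Sleep(100);
        cancellationTokenSource.Cancel();

        try
        {
            worker.Wait();
        }
        catch (AggregateException ae)
        {
            // Treat cancellation as handled; anything else is rethrown.
            ae.Handle(e => e is OperationCanceledException);
        }
        return worker.Status;
    }

    public static void Main()
    {
        Console.WriteLine(RunAndCancel()); // Canceled
    }
}
```

      Passing the same token to StartNew is what lets the task end up as "Canceled" instead of "Faulted" when the OperationCanceledException is thrown.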
      
      
    5. Passing Data to Tasks

      Task.Factory.StartNew() allows us to pass in an object parameter after the definition of the lambda.

      Task t = Task.Factory.StartNew( (arg) => { 
         Console.WriteLine((string)arg);
      },"apples");
      
    6. Returning a value

      With a generic "Task<T>" you can return a value of type T.

      using System;
      using System.Threading;
      using System.Threading.Tasks;
      
      namespace Practice
      {
          public class Thread4
          {
              public static void Main(string[] args)
              {
                  Task<string> task = Task.Run(() =>
                  {
                      Thread.Sleep(300);
                      return "stew";
                  });
                  string result = task.Result;
                  Console.Out.WriteLine("result = {0}", result);
                  Console.Write("Press 'Enter' to exit.");
                  Console.In.ReadLine();
              }
          }
      }
      
  6. Awaiters and Continuations

    It's common to have something you want to execute after a task is finished. This is called a 'continuation'. We can run a continuation right after a task finishes by getting the "Awaiter" object from the task and passing a delegate to its "OnCompleted()" method. The awaiter then arranges for the delegate to run when the task finishes.

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace Practice
    {
        public class Threads5
        {
            public static void Main(string[] args)
            {
                Task<int> task = Task.Run(() =>
                {
                    Thread.Sleep(TimeSpan.FromSeconds(2));
                    return 17;
                });
                var awaiter = task.GetAwaiter();
                awaiter.OnCompleted(() =>
                {
                    int result = awaiter.GetResult();
                    Console.Out.WriteLine("result = {0}", result);
                });
    
                Console.Write("Press 'Enter' to exit.");
                Console.In.ReadLine();
            }
        }
    }
    

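    A common scenario is to have the user interface update after a compute-intensive task finishes, but only the thread that created a UI component can update that component, so the continuation has to run on the UI thread. This is done by passing the scheduler returned by "TaskScheduler.FromCurrentSynchronizationContext()" (called on the UI thread) to the continuation, e.g., as the second argument of "ContinueWith".

    "Task.Run(code);" is shorthand for "Task.Factory.StartNew( code );" with default options and the default scheduler. Use "Task.Factory.StartNew()" directly only when you need more control, such as creation options, a state argument, or a specific scheduler.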

  7. General Notes:

    When launching many long-running tasks, i.e., many more than the number of processors, the TPL by default assumes each task is short-lived. As the tasks fail to finish, it keeps adding threads to run more of them, slowly flooding the system until thrashing ensues.

    To implement the Producer-Consumer pattern, use a "BlockingCollection".
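
    A minimal producer-consumer sketch with "BlockingCollection" follows. The class name, the bounded capacity of 100, and the idea of summing the items are assumptions chosen for this example; the collection calls themselves are the standard ones.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class ProducerConsumerDemo
{
    // One producer adds 1..itemCount; the consumers add up what they take.
    public static long SumThroughQueue(int itemCount, int consumerCount)
    {
        long total = 0;
        using (var queue = new BlockingCollection<int>(boundedCapacity: 100))
        {
            Task producer = Task.Run(() =>
            {
                for (int i = 1; i <= itemCount; i++)
                {
                    queue.Add(i);       // blocks while the queue is full
                }
                queue.CompleteAdding(); // lets the consumer loops end
            });

            var consumers = new Task[consumerCount];
            for (int c = 0; c < consumerCount; c++)
            {
                consumers[c] = Task.Run(() =>
                {
                    // Blocks while the queue is empty; ends after CompleteAdding().
                    foreach (int item in queue.GetConsumingEnumerable())
                    {
                        Interlocked.Add(ref total, item);
                    }
                });
            }

            producer.Wait();
            Task.WaitAll(consumers);
        }
        return total;
    }

    public static void Main()
    {
        Console.WriteLine(SumThroughQueue(1000, 4)); // 500500
    }
}
```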

    To get the number of processors to set the number of consumers use "System.Environment.ProcessorCount".

    Parallel LINQ (PLINQ) can be easy to use: just add ".AsParallel()" after the source, e.g., "(from employee in elist.AsParallel() select employee.pay).Sum();". PLINQ only works on in-memory data.
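
    Here is the query from the paragraph above as a complete sketch. The "Employee" class and its fields are hypothetical stand-ins, since the original does not show the type behind "elist".

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical employee type for the illustration.
public class Employee
{
    public string Name;
    public decimal Pay;
}

public static class PlinqDemo
{
    // Sums the pay of all employees using a parallel query.
    public static decimal TotalPay(IEnumerable<Employee> elist)
    {
        return (from employee in elist.AsParallel() select employee.Pay).Sum();
    }

    public static void Main()
    {
        var elist = new List<Employee>
        {
            new Employee { Name = "Ann", Pay = 100m },
            new Employee { Name = "Bob", Pay = 200m }
        };
        Console.WriteLine(TotalPay(elist)); // 300
    }
}
```

    For a list this small the parallel version is slower than plain LINQ; the overhead only pays off with larger sources or more expensive per-item work.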

    Threads have one of the following priorities:

     enum ThreadPriority { Lowest, BelowNormal, Normal, AboveNormal, Highest }
    

    Threads can have a status of "foreground" (the default) or "background". A "foreground" thread keeps an application running even after all other threads have completed, while a "background" thread does not. The status does not affect priority.

    The Asynchronous Programming Model (APM) is the older pattern in which each operation has a "Begin" method to initiate it and an "End" method to finish it and collect the result.

  8. Async/Await

    Async/await example from my notes on C# 5.0.

    Before C# 5.0, we had to write asynchronous methods with callbacks and be careful about error conditions. With "async" and "await" it's much more straightforward.

    using System;
    using System.Collections.Generic;
    using System.Net.Http;
    using System.Threading.Tasks;
    using Newtonsoft.Json;
    
    namespace AsyncHttpClient
    {
        /// <summary>
        /// Tiny example of using HttpClient.GetAsync() with Generics.
        /// Uses the REST API calls from the most excellent mysafeinfo.com for presidents and Beatles albums
        /// This prints:
        /// Requesting presidents
        /// Requesting Beatles albums
        /// ... waiting ...
        /// first president = number: '1','', party: '', term: ''
        /// first Beatles album = album name: 'Please Please Me (Mono)', date: '1963-03-22T00:00:00'
        /// </summary>
        public class President
        {
            public int president;
            public string name, party, term;
            public override string ToString() { return $"number: '{president}','{name}', party: '{party}', term: '{term}'"; }
        }
        public class BeatlesAlbum
        {
            public string album, date;
            public override string ToString() { return $"album name: '{album}', date: '{date}'"; }
        }
    
        class AsyncHttpClientExample
        {
            private static void Main()
            {
                string presidentsUrl = "https://mysafeinfo.com/api/data?list=presidents&format=json";
                string beatlesUrl = "https://mysafeinfo.com/api/data?list=beatlesalbums&format=json&select=ent,typ,rd&alias=ent=artist,typ=album,rd=date";
                var asyncHttpClientExample = new AsyncHttpClientExample();
                Console.Out.WriteLine("Requesting presidents");
                var presidents = asyncHttpClientExample.GetAsync<List<President>>(presidentsUrl);
                Console.Out.WriteLine("Requesting Beatles albums");
                var albums = asyncHttpClientExample.GetAsync<List<BeatlesAlbum>>(beatlesUrl);
                Console.Out.WriteLine("... waiting ...");
                Console.Out.WriteLine("first president = {0}", presidents.Result[0]);
                Console.Out.WriteLine("first Beatles album = {0}", albums.Result[0]);
            }
            private async Task<T> GetAsync<T>(string url)
            {
                HttpClient client = new HttpClient(new HttpClientHandler());
                HttpResponseMessage response = await client.GetAsync(url).ConfigureAwait(false);
                // await the body instead of blocking a thread with .Result
                var jsonString = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
                T result = JsonConvert.DeserializeObject<T>(jsonString);
    
                return result;
            }
        }
    }