Category Archives: Threading

ExecutorService with Callable and Future

In the last post I talked about ExecutorService basics.

Let us take it forward to understand the usage of the Future interface. So far we have used ExecutorService to run tasks on threads in a controlled way. What if we need to get an object back from the task being executed? The Callable interface comes to help. Instead of implementing Runnable, we can implement Callable to define the task. In this case we override the call() method instead of run().

From Javadocs: The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception.

The result of a Callable is retrieved through a Future instance. Instead of executor.execute, we call the submit method, which makes ExecutorService return a Future. submit can also be used with a Runnable instance, but in that case Future.get() returns null on completion, as the run() method cannot return an object.
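As a small sketch of the difference (the class and method names here are my own, not part of the example further down): submitting a Callable gives a Future that carries the returned value, while submitting a Runnable gives a Future whose get() returns null once the task completes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitSketch {
    // Returns what Future.get() yields for a Callable (index 0) and for a Runnable (index 1).
    static Object[] submitBoth() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> callable = () -> "done";
            Runnable runnable = () -> { };
            Future<String> fromCallable = executor.submit(callable);
            Future<?> fromRunnable = executor.submit(runnable);
            return new Object[] { fromCallable.get(), fromRunnable.get() };
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Object[] results = submitBoth();
        System.out.println("Callable result: " + results[0]); // prints "Callable result: done"
        System.out.println("Runnable result: " + results[1]); // prints "Runnable result: null"
    }
}
```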

Further Reading

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Callable.html

package com.kamalmeet;

import java.util.concurrent.*;

public class CallableTest {
	int num = 1;

	public static void main(String s[]) {
		ExecutorService executor = Executors.newFixedThreadPool(5);
		for (int i = 0; i < 10; i++) {
			Future<CallableTest> ft1 = executor.submit(new RunTest1());
			try {
				// get() blocks until this task finishes, so the tasks in this loop run one after another
				CallableTest ct1 = ft1.get();
				System.out.println("ct1 val:" + ct1.num);
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		}
		executor.shutdown();
	}
}

class RunTest1 implements Callable<CallableTest> {
	@Override
	public CallableTest call() throws Exception {
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		CallableTest ct = new CallableTest();
		ct.num = 10;
		return ct;
	}
}
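Because get() blocks, calling it right after each submit makes the tasks run one at a time. If the goal is to let the pool work on several tasks at once, one possible variation is to submit everything first and collect the results afterwards. This is only a sketch under my own assumptions: the class name, the squared-number task and the 100 ms sleep are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitThenCollect {
    // Submits all tasks first, then collects their results in submission order.
    static List<Integer> runAll(int tasks) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                Callable<Integer> task = () -> {
                    Thread.sleep(100); // stand-in for real work
                    return id * id;
                };
                futures.add(executor.submit(task));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get()); // waits only for this particular task
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll(10));
    }
}
```

With a pool of 5 threads the ten tasks should finish in roughly two rounds of sleeping instead of ten.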

ExecutorService

Here is a refresher on multithreading in Java.

Since version 1.5, Java has offered a more controlled and cleaner way to handle threads: ExecutorService.

ExecutorService lets us create thread pools and execute tasks in a controlled manner, i.e. we can define how many threads are allowed to run at the same time.

ExecutorService ex = Executors.newSingleThreadExecutor();
ExecutorService ex = Executors.newFixedThreadPool(5);
ExecutorService ex = Executors.newScheduledThreadPool(10);

In the first case we are creating a single-threaded pool, whereas in the second case we are defining a pool with a fixed number of threads. In the third case we are creating a pool of threads that can run tasks after a delay or at a fixed interval.
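To actually schedule something, the pool has to be held as a ScheduledExecutorService (which is what Executors.newScheduledThreadPool returns) rather than as a plain ExecutorService. A minimal sketch, with a class name and helper method that are my own invention:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduledSketch {
    // Schedules one task to run after delayMillis and returns how many
    // milliseconds actually passed before the task started.
    static long elapsedBeforeRun(long delayMillis) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            final long start = System.nanoTime();
            Callable<Long> task = () -> System.nanoTime() - start;
            ScheduledFuture<Long> future =
                    scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return TimeUnit.NANOSECONDS.toMillis(future.get());
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("ran after ~" + elapsedBeforeRun(500) + " ms");
    }
}
```

The printed number should be at least the requested delay; how much more depends on the machine.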

Read more
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html
http://tutorials.jenkov.com/java-util-concurrent/executorservice.html

A simple code example

Play around with thread pool size to understand the usage.

package com.kamalmeet;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadTestExecutor {
	public static void main(String s[]) {
		ExecutorService executor = Executors.newFixedThreadPool(5);
		for (int i = 0; i < 10; i++) {
			executor.execute(new RunTest());
			executor.execute(new RunTest2());
		}
		executor.shutdown();
	}
}

class RunTest implements Runnable {
	@Override
	public void run() {
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("run" + Thread.currentThread().getName());
	}
}

class RunTest2 implements Runnable {
	@Override
	public void run() {
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("run2" + Thread.currentThread().getName());
	}
}

At this point let us dig a bit into what happens behind the scenes when we initialize an ExecutorService instance. In the Executors class we have this implementation for the newFixedThreadPool method

public static ExecutorService newFixedThreadPool(int nThreads) {
   return new ThreadPoolExecutor(nThreads, nThreads, 0L,  TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

Similar to newFixedThreadPool, we have implementations for newSingleThreadExecutor and newCachedThreadPool

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

Let’s take a look at ThreadPoolExecutor as well

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters, the
     * {@linkplain Executors#defaultThreadFactory default thread factory}
     * and the {@linkplain ThreadPoolExecutor.AbortPolicy
     * default rejected execution handler}.
     *
     * <p>It may be more convenient to use one of the {@link Executors}
     * factory methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

If you need additional control over the threads being created, you can pass a ThreadFactory as an additional parameter to ThreadPoolExecutor.
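As a rough sketch of what such a factory could look like: NamedThreadFactory and the "worker-" prefix below are hypothetical names of my own, and I am using the Executors.newFixedThreadPool(int, ThreadFactory) overload instead of calling the ThreadPoolExecutor constructor directly.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactorySketch {
    // Hypothetical factory: names threads "worker-1", "worker-2", ... and marks them as daemons.
    static class NamedThreadFactory implements ThreadFactory {
        private final AtomicInteger counter = new AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "worker-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        }
    }

    // Runs one task on a pool built with the factory and reports which thread ran it.
    static String nameOfPoolThread() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2, new NamedThreadFactory());
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return executor.submit(task).get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(nameOfPoolThread()); // prints "worker-1"
    }
}
```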

Before closing the topic, it is worth looking at atomic variables. Atomic variables provide a way to maintain thread-safe state for a single variable. They are built on the CAS (compare-and-swap) approach, which is usually faster than synchronization because it avoids acquiring and releasing a lock. The most commonly used atomic variable classes in Java are AtomicInteger, AtomicLong, AtomicBoolean, and AtomicReference.
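Here is a small sketch of an AtomicInteger used as a shared counter. The class name and the thread and increment counts are arbitrary choices of mine; the point is only that no synchronized block is needed for the final count to be correct.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCounterSketch {
    // Starts `threads` threads that each increment a shared counter
    // `incrementsPerThread` times, and returns the final value.
    static int countWithThreads(int threads, int incrementsPerThread) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet(); // atomic read-modify-write, no lock needed
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithThreads(4, 10000)); // prints 40000
    }
}
```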

Understanding wait, notify and notifyAll using producer-consumer problem

In the last post I mentioned some of the Java Thread utility methods. Here I will try to explain the wait and notify concepts using the classic producer-consumer problem.

Wait: It tells the current thread to wait until another thread notifies it to resume execution. It is called on the object being used as the lock, e.g. lockObject.wait(), and the calling thread must hold that object's monitor (be inside a synchronized block on it). There is another version of wait which takes milliseconds as input: wait(2000) tells the thread to wait until someone notifies it or a maximum of 2 seconds has passed.

Notify: The notify call wakes up one of the threads waiting on the object (chosen arbitrarily if multiple threads are waiting) so that it can resume execution.

NotifyAll: The only difference between notify and notifyAll is that the latter wakes up all the threads waiting on the object.

Producer-consumer problem: This is a classic problem where the producer and the consumer are two independent entities. As the names suggest, the producer produces something which is consumed by the consumer. The challenge is that one of the entities can be faster than the other. For example, if the consumer consumes faster than the producer produces, an error condition occurs when the consumer has nothing to consume (as the producer is slow). Conversely, the producer can produce too fast for the consumer, so that the storage area overflows (as the consumer is not able to consume at the same speed).

To solve the problem, the wait and notify operations come to the rescue. So if one entity reaches the error state, say the consumer has run out of objects to consume, it will wait till a new object is available.
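A minimal sketch of these three calls working together, assuming a simple "ready" flag guarded by a private lock object (the class, field and method names are my own). The wait sits in a loop so that the condition is re-checked after every wake-up, and the timed version gives up once the timeout has passed.

```java
public class WaitNotifySketch {
    private final Object lock = new Object();
    private boolean ready = false;

    // Blocks until markReady() has been called or timeoutMillis has passed.
    // Returns true if the flag was set, false on timeout.
    boolean awaitReady(long timeoutMillis) throws InterruptedException {
        synchronized (lock) {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            long remaining = timeoutMillis;
            while (!ready && remaining > 0) {
                lock.wait(remaining); // releases the lock while waiting
                remaining = deadline - System.currentTimeMillis();
            }
            return ready;
        }
    }

    // Sets the flag and wakes up every thread waiting on the lock.
    void markReady() {
        synchronized (lock) {
            ready = true;
            lock.notifyAll();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WaitNotifySketch sketch = new WaitNotifySketch();
        new Thread(sketch::markReady).start();
        System.out.println("ready: " + sketch.awaitReady(2000));
    }
}
```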

Here is an example of the second case, where the producer is producing faster and so eventually needs to wait, once the queue is full, till the consumer has consumed some objects.

The first case (consumer is faster than producer) can be simulated by reversing the sleep times.

package com.kamalmeet;

public class ProducerConsumer {
	public static void main(String s[]) {
		Queue q = new Queue();
		Producer p = new Producer(q);
		Consumer c = new Consumer(q);
		Thread t1 = new Thread(p);
		Thread t2 = new Thread(c);
		t1.start();
		t2.start();
		System.out.println("Exiting main thread now");
	}
}

class Queue {
	int arr[] = new int[10];
	int index = 0;
	final int MAX = 9;

	public synchronized int remove() {
		while (index == 0) { // re-check after waking up; wait() can return spuriously
			try {
				System.out.println("waiting to remove");
				wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		index--;
		int num = arr[index];
		notify();
		return num;
	}

	public synchronized void add(int num) {
		while (index == MAX) { // re-check after waking up; wait() can return spuriously
			try {
				System.out.println("waiting to add");
				wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		notify();
		arr[index] = num;
		index++;
	}
}

class Producer implements Runnable {
	Queue q;
	int num = 0;

	Producer(Queue q) {
		this.q = q;
	}

	@Override
	public void run() {
		for (int i = 0; i < 100; i++) {
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("adding:" + num);
			q.add(num++);
		}
	}
}

class Consumer implements Runnable {
	Queue q;
	int num = 0;

	Consumer(Queue q) {
		this.q = q;
	}

	@Override
	public void run() {
		for (int i = 0; i < 100; i++) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("removing:" + q.remove());
		}
	}
}

Thread Basics: Sleep, Join and Yield

When dealing with Java threads, some basic concepts come in handy for managing control among threads.

Sleep: A simple command which tells the thread to go to sleep for the given number of milliseconds, e.g. Thread.sleep(1000) tells the current thread to pause for one second.

Join: This command tells the current thread to wait till another thread is done (has terminated). It has two versions. t1.join() tells the current thread to wait till thread t1 has terminated before moving to the next statement. A second version takes milliseconds as input: t1.join(2000) tells the current thread to wait till t1 has terminated or 2 seconds have passed, whichever occurs first.

Yield: Yield is used by a thread to voluntarily give up control in favor of other threads, though this is not guaranteed. Read more: http://www.javamex.com/tutorials/threads/yield.shtml

An example depicting sleep and join operations.

package com.kamalmeet;

public class TestThread {
	public static void main(String s[]) {
		RunnableTest r = new RunnableTest();
		Thread t1 = new Thread(r);
		t1.setName("t1");
		t1.start();

		Thread t2 = new Thread(r);
		t2.setName("t2");
		t2.start();

		try {
			t1.join();
			t2.join(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("Exiting main thread now");
	}
}

class RunnableTest implements Runnable {
	int count = 0;
	public synchronized void test() {
		try {
			count++;
			Thread.sleep(4000); // This will just introduce some delay
			System.out.println(Thread.currentThread().getName() + " says Hello from Runnable:" + count);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	@Override
	public void run() {
		test();
	}
}

Understanding Thread Priority

Here is a little refresher on the concept of threads in Java

http://kamalmeet.com/java/multithreading-runnable-vs-thread/
http://kamalmeet.com/java/handling-multithreading-with-synchronization/
http://kamalmeet.com/java/synchronization-object-vs-class-level/

There can be times when you want to give priority to one thread over another. Java provides a way to do that by setting the priority of threads.

Thread.setPriority

An Example

package com.kamalmeet;
public class TestThread {
	public static void main(String s[]) {
		RunnableTest r = new RunnableTest();
		for (int i = 0; i < 5; i++) {
			Thread t = new Thread(r);
			t.setName("t:" + i);
			t.setPriority(Thread.NORM_PRIORITY + i);
			t.start();
		}
	}
}

class RunnableTest implements Runnable {
	int count = 0;
	public synchronized void test() {
		try {
			count++;
			System.out.println(Thread.currentThread().getPriority());
			Thread.sleep(1000);
			//This will just introduce some delay 
			System.out.println(Thread.currentThread().getName() + " says Hello from Runnable:" + count);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Override
	public void run() {
		test();
	}
}

But there is a catch. Threads are scheduled at the operating system level, so Java depends on the OS to honor thread priority. Here is a good explanation of how thread priority in Java is handled on different operating systems: http://www.javamex.com/tutorials/threads/priority_what.shtml
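One more detail worth keeping in mind: setPriority only accepts values between Thread.MIN_PRIORITY (1) and Thread.MAX_PRIORITY (10) and throws IllegalArgumentException for anything else. The clamp helper below is a hypothetical convenience of mine, not something the Thread API provides.

```java
public class PrioritySketch {
    // Hypothetical helper: squeezes a requested priority into the range setPriority accepts.
    static int clamp(int requested) {
        return Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, requested));
    }

    public static void main(String[] args) {
        Thread t = new Thread(() -> { });
        t.setPriority(clamp(Thread.NORM_PRIORITY + 7)); // 12 is out of range, so 10 is used
        System.out.println("priority: " + t.getPriority());
        try {
            t.setPriority(Thread.MAX_PRIORITY + 1); // 11 is rejected
        } catch (IllegalArgumentException e) {
            System.out.println("priority 11 is out of range");
        }
    }
}
```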

Synchronization: Object vs Class level

In my last post I wrote about how synchronization can help make sure that multiple threads do not execute the same block of code at the same time. I will take the concept of synchronization a little further here, going back to the previous example.

public void test()
{
 synchronized(this){
  try {
   count++;
   Thread.sleep(1000);
   System.out.println("Hello from Runnable:"+count);
   } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

synchronized(this) or using synchronized keyword at method level makes sure that no 2 threads execute the block of code at the same time. But there is a bit more to it. To elaborate, I have modified my test method slightly

public void test()
{
 synchronized(this){
  try {
   count++;
   Thread.sleep(1000);
   System.out.println(Thread.currentThread().getName()+" says Hello from Runnable:"+count);
   } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

And modified calling main method

public static void main(String s[])
{
 Runnable r=new RunnableTest();
 Runnable r2=new RunnableTest();
 for(int i=0;i<20;i++)
 {
  Thread t1=new Thread(r);
  t1.setName("1");
  t1.start();
  Thread t2=new Thread(r2);
  t2.setName("2"); //set the name before starting the thread
  t2.start();
 }
}

Now if I run this we will see something like

2 says Hello from Runnable:1
1 says Hello from Runnable:1
1 says Hello from Runnable:2
2 says Hello from Runnable:2
1 says Hello from Runnable:3
2 says Hello from Runnable:3

If you observe the output, it appears in pairs. So let us understand what is happening here.

The first change is that instead of one Runnable object we are using 2 objects, basically to highlight the behavior of the synchronized(this) block. The second change is that I added a name for each thread so that we can print it out to understand which thread is actually executing.

Coming back to synchronized(this), look at the “this” keyword. “this” in Java refers to the current object. So when we synchronize on this, we are synchronizing on the current object. As we are using two different objects, we see two streams of threads executing simultaneously in the synchronized block. So we can say that with synchronized(this), we make sure only one thread per object is executing in the synchronized block at a time.

Now there can be a situation where you want exactly one thread for the whole class to execute at a time, no matter whether the threads belong to the same object or not. There is a simple solution to it.

public void test()
{
 synchronized(RunnableTest.class){ //Change this with class name
  try {
   count++;
   Thread.sleep(1000);
   System.out.println(Thread.currentThread().getName()+" says Hello from Runnable:"+count);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}
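The class object used here is the same lock that a static synchronized method takes, while a synchronized instance method locks on this. A quick way to see it is Thread.holdsLock, as in this sketch (the class and method names are made up for the illustration):

```java
public class ClassLockSketch {
    // A static synchronized method locks on ClassLockSketch.class.
    static synchronized boolean staticMethodHoldsClassLock() {
        return Thread.holdsLock(ClassLockSketch.class);
    }

    // A synchronized instance method locks on `this`, not on the class object.
    synchronized boolean instanceMethodHoldsClassLock() {
        return Thread.holdsLock(ClassLockSketch.class);
    }

    synchronized boolean instanceMethodHoldsThisLock() {
        return Thread.holdsLock(this);
    }

    public static void main(String[] args) {
        ClassLockSketch sketch = new ClassLockSketch();
        System.out.println(staticMethodHoldsClassLock());          // prints true
        System.out.println(sketch.instanceMethodHoldsClassLock()); // prints false
        System.out.println(sketch.instanceMethodHoldsThisLock());  // prints true
    }
}
```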

There are other options for synchronization as well. They are not of much practical use (at least I have never used them), but are worth noting.

synchronized("1")

Remember, a string is an object. A string literal is a single shared constant object, so synchronizing on it makes sure only one thread executes at a time, irrespective of class or object. So for our example this will behave the same as synchronized(RunnableTest.class).

A more practical use can be when we are trying to synchronize a group of threads.
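This works only because equal string literals refer to one and the same object. A string built at runtime is a different object, and therefore a different lock, unless it is interned. A tiny sketch to illustrate (sameLock is just a helper name I picked):

```java
public class StringLockSketch {
    // Two references act as the same lock only if they point to the same object.
    static boolean sameLock(Object a, Object b) {
        return a == b;
    }

    public static void main(String[] args) {
        String literal1 = "1";
        String literal2 = "1";
        String built = new String("1");
        System.out.println(sameLock(literal1, literal2));       // prints true
        System.out.println(sameLock(literal1, built));          // prints false
        System.out.println(sameLock(literal1, built.intern())); // prints true
    }
}
```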

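public void test()
{
 //Look up the key inside the method so it belongs to the running thread; intern() makes equal names share one lock object
 String key=Thread.currentThread().getName().intern();
 synchronized(key){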
  try {
   count++;
   Thread.sleep(1000);
   System.out.println(Thread.currentThread().getName()+" says Hello from Runnable:"+count);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

Here I am using key to group the threads on which synchronization is done. The key here is simply the name of the thread, but it can be something more meaningful in a real scenario.

Handling Multithreading with Synchronization

In the last post I wrote about how we can speed up processing using multiple threads in Java, basically doing parallel processing using threads. Multithreading definitely helps speed up processing, but it can also corrupt data if not used properly. Let’s look at this example

//A simple Runnable class
public class RunnableTest implements Runnable{
 int count=0;
 public void test()
 {
  try {
   count++;
   Thread.sleep(1000); //This will just introduce some delay
   System.out.println("Hello from Runnable:"+count);
   } catch (Exception e) {
    e.printStackTrace();
  }
 }

@Override
public void run() {
test();
}
}

//and call it from somewhere like
Runnable r=new RunnableTest();
for(int i=0;i<20;i++)
{
Thread t1=new Thread(r);
t1.start();
}

Output:
Hello from Runnable:20
Hello from Runnable:20
Hello from Runnable:20
Hello from Runnable:20
Hello from Runnable:20
..
..

We are calling the run method 20 times, which in turn calls test. We have deliberately introduced some delay in test to give it the feel of a real method which takes time to execute some statements. The output shows that instead of count being printed as 1, 2, 3 and so on, it is 20 for all the calls. This means that all the threads are executing the same block of code simultaneously: every thread increments count before any of them wakes up and prints it. Though parallel processing of code is the desirable behavior from multithreading, it can cause issues in scenarios like the one above.

Let’s say in actual code we are trying to write data into a database. Instead of count we have an ID. So the first thread creates an ID, X. Then it does some processing and creates an insert statement. While it is doing the processing, a second thread starts executing the same code and creates another ID, Y, which overwrites X (just as count is being modified). So when thread 1 reaches the code where it inserts the data, it will try inserting the ID as Y (as X has been overwritten by Y). Then thread 2 comes along and again tries to insert ID Y, causing an error.

So in the real world we can have blocks of code (or complete methods) which we want to make sure are not executed by multiple threads at the same time. Fortunately Java provides a simple way to do this. The keyword is synchronized.

public void test()
{
 synchronized(this){
  try {
   count++;
   Thread.sleep(1000);
   System.out.println("Hello from Runnable:"+count);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

Now if we run the same code, we will get

Hello from Runnable:1
Hello from Runnable:2
Hello from Runnable:3
Hello from Runnable:4
Hello from Runnable:5
Hello from Runnable:6

A synchronized block makes sure that only one thread is executing the guarded code at a time. Another way to synchronize is to declare the method itself as synchronized.

public synchronized void test()

This will make sure only one thread enters this method at a time.
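To tie this back to the ID scenario described earlier, here is a sketch of a synchronized method handing out IDs to several threads. Everything in it (IdGeneratorSketch, nextId, the thread counts) is an illustrative assumption of mine. If synchronization works, no ID is handed out twice, so the number of distinct IDs equals the number of calls.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdGeneratorSketch {
    private int lastId = 0;

    // Only one thread at a time can read and update lastId.
    public synchronized int nextId() {
        lastId++;
        return lastId;
    }

    // Lets several threads request IDs and returns how many distinct IDs were handed out.
    static int countDistinctIds(int threads, int idsPerThread) throws InterruptedException {
        IdGeneratorSketch generator = new IdGeneratorSketch();
        Set<Integer> seen = ConcurrentHashMap.newKeySet(); // thread-safe set
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < idsPerThread; j++) {
                    seen.add(generator.nextId());
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return seen.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countDistinctIds(4, 5000)); // prints 20000
    }
}
```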

Multithreading: Runnable vs Thread

What is multithreading? First of all, let us understand what a thread is. A thread is simply a flow of control in a program/process.

Now let’s understand what multithreading is. Let’s take an example: there is code which reads data from a file and uploads it to a database. Let’s say there are 10 such files and each file takes 1 hr to finish its data upload. In a single-threaded environment, this simply translates to a 10 hr job, as our code will read and upload the files one by one. Whereas when we move to multithreading, that is, multiple threads executing at the same time asynchronously, these files are processed in parallel and the total time taken might be close to the time taken by one file only.

Before getting into details, let’s understand how to implement multithreading in Java. There are 2 ways to achieve this.

//1. Extend Thread class
public class ThreadTest extends Thread{
 public void test()
 {
  System.out.println("Hello from Thread");
 }
 public void run()
 {
  test();
 }
}
 //2. Implement Runnable interface 
public class RunnableTest implements Runnable{
 public void test()
 {
  System.out.println("Hello from Runnable");
 }
@Override
 public void run() {
  test();
 }
}

//And somewhere in the main method 
public static void main(String s[])
{
 Thread t1=new ThreadTest();
 t1.start();
 Thread t2=new Thread(new RunnableTest());
 t2.start();
}

As you can see, the important method while using threads is “run”. This method gets called automatically by the JVM when a thread is started.

Note that we do not call the run method directly, as that would be like calling a normal method: the caller waits for control to come back. Using start, on the other hand, makes sure the control flow continues without waiting for the method call to return. To understand what I am referring to here, let’s change the main method slightly

for(int i=0;i<20;i++)
{
 Thread t1=new ThreadTest();
 t1.start();
 Thread t2=new Thread(new RunnableTest());
 t2.start();
}

We will see something like this (in no particular order)

Hello from Thread
Hello from Runnable
Hello from Thread
Hello from Runnable
Hello from Runnable
Hello from Thread
Hello from Thread
Hello from Thread
Hello from Runnable
Hello from Runnable

Now let’s change it to run

for(int i=0;i<20;i++)
{
 Thread t1=new ThreadTest();
 t1.run();
 Thread t2=new Thread(new RunnableTest());
 t2.run();
}

Output

Hello from Thread
Hello from Runnable
Hello from Thread
Hello from Runnable
Hello from Thread
Hello from Runnable

So we are not actually using multithreading here.

Coming back to the original example: if we put the code for uploading the data from a file into the run method and execute it in multithreaded fashion, we will be able to upload files simultaneously.

We have seen 2 ways to create threads in Java, one by extending the Thread class and the other by implementing Runnable. Which is better?
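A quick way to confirm this is to record which thread actually executes the code. In this sketch (the class and method names are mine) start() runs the body on the new thread named "worker", whereas run() executes it on whichever thread made the call.

```java
public class StartVsRunSketch {
    // Runs the same task either through start() or through a direct run() call
    // and returns the name of the thread that executed it.
    static String threadNameVia(boolean useStart) throws InterruptedException {
        String[] seen = new String[1];
        Thread worker = new Thread(() -> seen[0] = Thread.currentThread().getName(), "worker");
        if (useStart) {
            worker.start();
            worker.join(); // wait so that the recorded name is visible here
        } else {
            worker.run(); // plain method call, no new thread is created
        }
        return seen[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("start(): " + threadNameVia(true));  // prints "start(): worker"
        System.out.println("run():   " + threadNameVia(false)); // prints "run():   main"
    }
}
```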

Usually we go for implementing Runnable. When implementing Runnable we can only override the run method, and that is the only thing we need in most cases. Whereas when we extend Thread, we inherit all of its other methods and could override some of them, such as start or interrupt, which is usually not required. Additionally, extending Thread has the obvious implication that our class cannot extend any other class (as Java allows only one class to be extended).
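To illustrate the last point with a small sketch: BaseJob and UploadJob below are hypothetical classes of mine standing in for the file upload example. Because Runnable is only an interface, the task can still extend another class and be handed to a Thread.

```java
public class RunnableDesignSketch {
    // Hypothetical base class that the task already needs to extend.
    static class BaseJob {
        String describe() {
            return "job:" + getClass().getSimpleName();
        }
    }

    // Extending BaseJob is still possible because Runnable is an interface.
    static class UploadJob extends BaseJob implements Runnable {
        volatile boolean finished = false;

        @Override
        public void run() {
            finished = true; // stand-in for the real upload work
        }
    }

    public static void main(String[] args) throws InterruptedException {
        UploadJob job = new UploadJob();
        Thread t = new Thread(job);
        t.start();
        t.join();
        System.out.println(job.describe() + " finished=" + job.finished); // prints "job:UploadJob finished=true"
    }
}
```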