Tag Archives: Multithreading

ExecutorService with Callable and Future

In the last post I talked about ExecutorService basics.

Let us take it forward and look at the Future interface. So far we have used ExecutorService to run tasks in a controlled way. What if we need to get a result back from the task being executed? The Callable interface comes to help. Instead of implementing Runnable, we can implement Callable to define the task. In this case we override the call() method instead of run().

From Javadocs: The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception.
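To make the point about checked exceptions concrete, here is a minimal sketch. The class names CheckedCallableDemo and FailingTask and the error message are made up for illustration. It shows a call() method that throws a checked IOException, and how that exception reaches the caller wrapped in an ExecutionException when Future.get() is invoked.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CheckedCallableDemo {

    // Hypothetical task: call() may declare a checked exception, which run() cannot.
    static class FailingTask implements Callable<String> {
        @Override
        public String call() throws IOException {
            throw new IOException("disk not ready");
        }
    }

    // Submits the task and reports what Future.get() surfaces to the caller.
    static String describeFailure() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(new FailingTask());
            return future.get();
        } catch (ExecutionException e) {
            // The exception thrown inside call() is available as the cause.
            Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure()); // IOException: disk not ready
    }
}
```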

The result of a Callable is retrieved through a Future instance. Instead of executor.execute, we call the submit method, which makes ExecutorService return a Future. submit can also be used with a Runnable instance, but in that case the Future's get() returns null on completion, as the run() method cannot return an object.

Here is an example

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableTest {
    int num = 1;

    public static void main(String s[]) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        // submit() returns a Future for both Callable and Runnable tasks
        Future<CallableTest> ft = executor.submit(new RunTest());
        Future<?> ft1 = executor.submit(new RunTest2());
        System.out.println("step 1");
        try {
            // get() blocks until call() has returned
            CallableTest ct = ft.get();
            System.out.println("ct val:" + ct.num);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("step 2");
        try {
            // a Runnable has no result, so this prints null
            System.out.println(ft1.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("step 3");
        executor.shutdown();
    }
}

class RunTest implements Callable<CallableTest>
{
@Override
public CallableTest call() throws Exception {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
CallableTest ct=new CallableTest();
ct.num=10;
// TODO Auto-generated method stub
return ct;
}

}

class RunTest2 implements Runnable
{
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

Further Reading

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Callable.html

ExecutorService

Here is a refresher on multithreading in Java.

Since version 1.5, Java has offered a more controlled and cleaner way to handle threads: ExecutorService.

ExecutorService lets us create thread pools and execute tasks in a controlled manner, i.e. we can define how many threads are allowed to run at the same time.

ExecutorService ex = Executors.newSingleThreadExecutor();
ExecutorService ex = Executors.newFixedThreadPool(5);
ExecutorService ex = Executors.newScheduledThreadPool(10);

In the first case we are creating a single-threaded pool, whereas in the second case we are defining a pool with a fixed number of threads. In the third case we are creating a pool of threads which can run tasks after a given delay or periodically.
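As a rough illustration of the third case, the sketch below schedules one task to run after a delay and measures how long it actually waited. The class name ScheduledDemo, the method runAfterDelay and the 200 ms delay are assumptions chosen for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduledDemo {

    // Schedules a task to run once after delayMillis and returns the measured wait in milliseconds.
    static long runAfterDelay(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            long start = System.nanoTime();
            // The task just reports how much time has passed since it was scheduled.
            Callable<Long> task = () -> System.nanoTime() - start;
            ScheduledFuture<Long> future = scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return TimeUnit.NANOSECONDS.toMillis(future.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran after " + runAfterDelay(200) + " ms");
    }
}
```

The measured value should be at least the requested delay; how much longer depends on the machine.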

Read more
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html
http://tutorials.jenkov.com/java-util-concurrent/executorservice.html

A simple code example

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class test {
    public static void main(String s[]) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            executor.execute(new RunTest());
            executor.execute(new RunTest2());
        }
        executor.shutdown();
    }
}

class RunTest implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("run" + Thread.currentThread().getName());
    }
}

class RunTest2 implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("run2" + Thread.currentThread().getName());
    }
}

Play around with thread pool size to understand the usage.
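One way to see the effect of the pool size is to count how many distinct worker threads end up running the tasks. The sketch below does that. PoolSizeDemo and distinctWorkers are hypothetical names, and the 50 ms sleep only stands in for real work.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolSizeDemo {

    // Runs taskCount short tasks on a fixed pool and returns how many distinct worker threads ran them.
    static int distinctWorkers(int poolSize, int taskCount) {
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                workerNames.add(Thread.currentThread().getName());
                try {
                    Thread.sleep(50); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return workerNames.size();
    }

    public static void main(String[] args) {
        System.out.println("pool of 1: " + distinctWorkers(1, 10) + " worker(s)");
        System.out.println("pool of 5: " + distinctWorkers(5, 10) + " worker(s)");
    }
}
```

A fixed pool starts a new worker for each submitted task until it reaches its size, so with 10 tasks the count matches the pool size in both calls.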

Understanding wait, notify and notifyAll using producer-consumer problem

In the last post I mentioned some of the utility methods of Java threads. Here I will try to explain the wait and notify concept using the classic producer-consumer problem.

Wait: It tells the current thread to wait until another thread notifies it to resume execution. It is called on the shared object whose lock the thread holds, e.g. sharedObject.wait(), and it releases that lock while waiting. There is another version of wait which takes milliseconds as input: wait(2000) tells the thread to wait until someone notifies it or for a maximum of 2 seconds.

Notify: The notify call wakes up one of the threads waiting on the object (arbitrarily chosen if multiple threads are waiting) so that it can resume execution.

NotifyAll: The only difference between notify and notifyAll is that the latter wakes up all the threads waiting on the object.
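Before moving to the full problem, here is a minimal sketch of the wait/notifyAll handshake on its own. WaitNotifyDemo and its methods are hypothetical names; one thread waits for a message and another publishes it after a short delay.

```java
class WaitNotifyDemo {
    private final Object lock = new Object();
    private boolean ready = false;
    private String message;

    // Blocks the caller until publish() has been called, then returns the message.
    String awaitMessage() throws InterruptedException {
        synchronized (lock) {
            while (!ready) {      // re-check the condition after every wake-up
                lock.wait();      // releases the lock while waiting
            }
            return message;
        }
    }

    // Stores the message and wakes up every thread waiting on the lock.
    void publish(String text) {
        synchronized (lock) {
            message = text;
            ready = true;
            lock.notifyAll();
        }
    }

    // Starts a publisher thread that delivers "hello" after a short delay.
    static String demo() {
        WaitNotifyDemo box = new WaitNotifyDemo();
        Thread publisher = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            box.publish("hello");
        });
        publisher.start();
        try {
            return box.awaitMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // hello
    }
}
```

Both wait and notifyAll are called while holding the lock of the same object; calling them without holding it throws IllegalMonitorStateException.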

Producer-consumer problem: This is a classic problem where the producer and the consumer are 2 independent entities. As the name suggests, the producer produces something which is consumed by the consumer. The challenge is that one of the entities can be faster than the other. For example, if the consumer consumes faster than the producer produces, an error condition occurs when the consumer has nothing to consume. Conversely, if the producer produces too fast for the consumer, the storage area overflows.

To solve the problem, wait and notify operations come to the rescue. If one entity reaches the error state, say the consumer has run out of objects to consume, it will wait till a new object is available.

Here is an example of case two, where the producer is producing faster and so will eventually need to wait, once the queue is full, until the consumer has consumed some objects.

public class ProducerConsumer {

public static void main(String s[])
{
Queue q=new Queue();
Producer p=new Producer(q);
Consumer c=new Consumer(q);
Thread t1=new Thread(p);
Thread t2=new Thread(c);
t1.start();
t2.start();
System.out.println("Exiting main thread now");
}

}
class Queue {

    int arr[] = new int[10];
    int index = 0;
    final int MAX = 9;

    // Note: this removes the most recently added element (last in, first out)
    public synchronized int remove() {
        // loop instead of if: re-check the condition after every wake-up
        while (index == 0) {
            try {
                System.out.println("waiting to remove");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        index--;
        int num = arr[index];

        // wake up a producer that may be waiting for free space
        notify();
        return num;
    }

    public synchronized void add(int num) {
        while (index == MAX) {
            try {
                System.out.println("waiting to add");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // wake up a consumer that may be waiting for an element
        notify();

        arr[index] = num;
        index++;
    }

}

class Producer implements Runnable{
Queue q;
int num=0;
Producer(Queue q)
{
this.q=q;
}

@Override
public void run() {
// TODO Auto-generated method stub
for(int i=0;i<100;i++)
{
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("adding:"+num);
q.add(num++);
}

}

}

class Consumer implements Runnable{
Queue q;
int num=0;
Consumer(Queue q)
{
this.q=q;
}

@Override
public void run() {
// TODO Auto-generated method stub
for(int i=0;i<100;i++){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("removing:"+ q.remove());
}

}

}

The first case (consumer is faster than producer) can be simulated by swapping the sleep times.
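For comparison, the JDK ships a ready-made bounded buffer, java.util.concurrent.ArrayBlockingQueue, whose put and take block the caller while the queue is full or empty. The sketch below uses it in place of the hand-written Queue class; BlockingQueueDemo and transfer are hypothetical names, and the capacity of 3 is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {

    // Moves the numbers 0..items-1 from a producer thread to a consumer thread
    // through a bounded queue and returns them in the order they were consumed.
    static List<Integer> transfer(int capacity, int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put(i);               // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    consumed.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(transfer(3, 10)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Unlike the array-based class above, this queue hands elements out in first in, first out order.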

Thread Basics: Sleep, Join and Yield

When dealing with Java threads, a few basic methods come in handy for managing control among threads.

Sleep: A simple command which tells the thread to go to sleep for the given number of milliseconds, e.g. Thread.sleep(1000) tells the current thread to pause for one second.

Join: This command tells the current thread to wait till another thread is done (terminated). It has two versions. t1.join() tells the current thread to wait till thread t1 has terminated before moving to the next statement. A second version takes milliseconds as input: t1.join(2000) tells the current thread to wait till t1 has terminated or 2 seconds have passed, whichever occurs first.

Yield: Yield is used by a thread to voluntarily give up control in favor of other threads, though this is not guaranteed. Read more: http://www.javamex.com/tutorials/threads/yield.shtml
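The difference between the two join versions can be checked with isAlive(). In this small sketch (JoinDemo and timedThenFullJoin are hypothetical names, and the durations are arbitrary) the worker sleeps for one second, so a 50 ms timed join is expected to return while the worker is still running.

```java
class JoinDemo {

    // Returns { worker alive after the timed join, worker alive after the full join }.
    static boolean[] timedThenFullJoin() {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(1000); // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            worker.join(50);                          // stop waiting after 50 ms
            boolean aliveAfterTimedJoin = worker.isAlive();
            worker.join();                            // wait until the worker has finished
            return new boolean[] { aliveAfterTimedJoin, worker.isAlive() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new boolean[] { worker.isAlive(), worker.isAlive() };
        }
    }

    public static void main(String[] args) {
        boolean[] alive = timedThenFullJoin();
        System.out.println("alive after join(50): " + alive[0]);
        System.out.println("alive after join():   " + alive[1]);
    }
}
```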

An example depicting sleep and join operations.

public class thread {

public static void main(String s[])
{
RunnableTest r=new RunnableTest();

Thread t1=new Thread(r);
t1.setName("t1");
t1.start();

Thread t2=new Thread(r);
t2.setName("t2");
t2.start();

try {
t1.join();
t2.join(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

System.out.println("Exiting main thread now");
}

}

class RunnableTest implements Runnable{
int count=0;
public synchronized void test()
{
try {
count++;
Thread.sleep(4000); //This will just introduce some delay
System.out.println(Thread.currentThread().getName()+" says Hello from Runnable:"+count);
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void run() {
test();
}
}

Understanding Thread Priority

Here is a little refresher on concept of threads in Java

http://kamalmeet.com/java/multithreading-runnable-vs-thread/
http://kamalmeet.com/java/handling-multithreading-with-synchronization/
http://kamalmeet.com/java/synchronization-object-vs-class-level/

There can be times when you want to give priority to one thread over another. Java provides a way to do that by setting the priority of threads.

Thread.setPriority

An Example

package test;

public class thread {

    public static void main(String s[]) {
        RunnableTest r = new RunnableTest();
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(r);
            t.setName("t:" + i);
            t.setPriority(Thread.NORM_PRIORITY + i);
            t.start();
        }
    }
}

class RunnableTest implements Runnable {
    int count = 0;

    public synchronized void test() {
        try {
            count++;
            System.out.println(Thread.currentThread().getPriority());
            Thread.sleep(1000); //This will just introduce some delay
            System.out.println(Thread.currentThread().getName() + " says Hello from Runnable:" + count);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        test();
    }
}

But there is a catch. Threads are scheduled at the operating system level, so Java depends on the OS to honor thread priority. Here is a good explanation of how thread priority in Java is handled on different operating systems: http://www.javamex.com/tutorials/threads/priority_what.shtml
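Whatever the OS does with it, the value passed to setPriority has to lie between Thread.MIN_PRIORITY and Thread.MAX_PRIORITY. The short sketch below (PriorityDemo and describe are hypothetical names) prints the three constants and shows that an out-of-range value is rejected.

```java
class PriorityDemo {

    // Reports the priority constants and whether an out-of-range priority is rejected.
    static String describe() {
        Thread t = new Thread(() -> { });
        boolean rejected = false;
        try {
            t.setPriority(Thread.MAX_PRIORITY + 1); // outside the allowed range
        } catch (IllegalArgumentException e) {
            rejected = true;
        }
        return "min=" + Thread.MIN_PRIORITY
                + " norm=" + Thread.NORM_PRIORITY
                + " max=" + Thread.MAX_PRIORITY
                + " outOfRangeRejected=" + rejected;
    }

    public static void main(String[] args) {
        System.out.println(describe()); // min=1 norm=5 max=10 outOfRangeRejected=true
    }
}
```

This is why the example above can safely add up to 4 to NORM_PRIORITY: the largest value it sets is 9.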

Synchronization: Object vs Class level

In my last post I wrote about how synchronization helps make sure that multiple threads do not execute the same block of code at the same time. I will take the concept of synchronization a little further here, going back to the previous example.

public void test()
{
synchronized(this){
try {

count++;
Thread.sleep(1000);
System.out.println("Hello from Runnable:"+count);
} catch (Exception e) {
e.printStackTrace();
}
}
}

synchronized(this) or using synchronized keyword at method level makes sure that no 2 threads execute the block of code at the same time. But there is a bit more to it. To elaborate, I have modified my test method slightly

public void test()
{
synchronized(this){
try {
count++;
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+" says Hello from Runnable:"+count);
} catch (Exception e) {
e.printStackTrace();
}
}
}

And modified calling main method

public static void main(String s[])
{

Runnable r=new RunnableTest();
Runnable r2=new RunnableTest();

for(int i=0;i<20;i++)
{
Thread t1=new Thread(r);
t1.setName("1");
t1.start();

Thread t2=new Thread(r2);
t2.setName("2");
t2.start();
}
}

Now if I run this we will see something like

2 says Hello from Runnable:1
1 says Hello from Runnable:1
1 says Hello from Runnable:2
2 says Hello from Runnable:2
1 says Hello from Runnable:3
2 says Hello from Runnable:3

If you observe the output, it appears in pairs. Let us understand what is happening here.

First change: instead of one Runnable object we are using 2 objects, to highlight the behavior of the synchronized(this) block. Second change: I added a name for each thread so that we can print it out and see which thread is actually executing.

Coming back to synchronized(this), look at the “this” keyword. “this” in Java refers to the current object. So when we synchronize on this, we are synchronizing on the current object. As we are using two different objects, each has its own lock, and we see two streams of threads executing simultaneously in the synchronized block. So with synchronized(this) we make sure that, for each object, only one thread is executing in the synchronized block at a time.

Now there can be a situation where you want exactly one thread for the whole class to execute the block at a time, no matter whether the threads belong to the same object or not. There is a simple solution to it.

public void test()
{
synchronized(RunnableTest.class){ //Change this with class name
try {
count++;
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+" says Hello from Runnable:"+count);
} catch (Exception e) {
e.printStackTrace();
}
}
}
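A rough way to check the class-level lock is to count how many threads are inside the block at the same moment. In the sketch below (ClassLockDemo, inside, peak and peakConcurrency are hypothetical names) two separate Runnable objects are shared by several threads, and the peak is expected to stay at 1 because all of them compete for the single lock on the class object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ClassLockDemo implements Runnable {
    // Shared by all instances: threads currently inside the block, and the highest value seen.
    static final AtomicInteger inside = new AtomicInteger();
    static final AtomicInteger peak = new AtomicInteger();

    @Override
    public void run() {
        synchronized (ClassLockDemo.class) {
            int now = inside.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(50); // stay inside long enough for other threads to pile up
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            inside.decrementAndGet();
        }
    }

    // Runs threadsPerInstance threads on each of two instances and returns the peak concurrency.
    static int peakConcurrency(int threadsPerInstance) {
        inside.set(0);
        peak.set(0);
        Runnable r1 = new ClassLockDemo();
        Runnable r2 = new ClassLockDemo();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < threadsPerInstance; i++) {
            threads.add(new Thread(r1));
            threads.add(new Thread(r2));
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak threads inside the block: " + peakConcurrency(3));
    }
}
```

Replacing ClassLockDemo.class with this in the synchronized statement would let one thread per instance enter, so the peak could then reach 2.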

There are other options for synchronization as well, not of much practical use (at least I have never used them), but worth noting.

synchronized("1")

Remember, a string is an object. String literals are interned, so every "1" in the program refers to the same constant object. Synchronizing on it makes sure only one thread executes at a time, irrespective of class or objects. So for our example this behaves the same as synchronized(RunnableTest.class), but note that any other code that synchronizes on the literal "1" shares the same lock.

A more practical use can be when we are trying to synchronize a group of threads.

String key=Thread.currentThread().getName();
public void test()
{
synchronized(key){
try {
count++;
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+" says Hello from Runnable:"+count);
} catch (Exception e) {
e.printStackTrace();
}
}
}

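Here I am using key to group threads, and synchronization is done per group. Note that a field initializer like this runs once, in the thread that creates the object, so the key is the name of the creating thread rather than of the thread that later calls test(). Also, locks work on object identity, so two equal strings only share a lock if they are the same String object. The key here is simply a thread name, but it would be something more meaningful in a real scenario.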
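If the goal is one lock per group key, a common alternative to synchronizing on the String itself is to keep a map from key to a dedicated lock object. This is a sketch of that idea, not what the snippet above does; KeyedLocks, lockFor, runExclusive and sameLock are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class KeyedLocks {
    // One lock object per key, created on first use.
    private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();

    // Returns the lock for a key; equal keys always map to the same lock object.
    Object lockFor(String key) {
        return locks.computeIfAbsent(key, k -> new Object());
    }

    // Runs the action while holding the lock of its group.
    void runExclusive(String key, Runnable action) {
        synchronized (lockFor(key)) {
            action.run();
        }
    }

    // Helper for the demo: do two keys resolve to the same lock?
    static boolean sameLock(String first, String second) {
        KeyedLocks registry = new KeyedLocks();
        return registry.lockFor(first) == registry.lockFor(second);
    }

    public static void main(String[] args) {
        // Equal content, different String objects: still the same lock.
        System.out.println(sameLock("orders", new String("orders"))); // true
        System.out.println(sameLock("orders", "users"));              // false
    }
}
```

Threads that call runExclusive with equal keys take turns, while threads using different keys can run at the same time.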

Handling Multithreading with Synchronization

In the last post I wrote about how we can speed up processing using multiple threads in Java, basically doing parallel processing. Multithreading definitely helps speed up processing, but it can also corrupt data if not used properly. Let’s look at this example

//A simple Runnable class
public class RunnableTest implements Runnable{
int count=0;
public void test()
{
try {
count++;
Thread.sleep(1000); //This will just introduce some delay
System.out.println("Hello from Runnable:"+count);
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void run() {
test();
}
}
and call it from somewhere like

Runnable r=new RunnableTest();

for(int i=0;i<20;i++)
{
Thread t1=new Thread(r);
t1.start();
}

Output:
Hello from Runnable:20
Hello from Runnable:20
Hello from Runnable:20
Hello from Runnable:20
Hello from Runnable:20
..
..

We are calling the run method 20 times, which in turn calls test. We have deliberately introduced some delay in test to give it the feel of a real method which takes time to execute. The output shows that instead of count being printed as 1, 2, 3 and so on, it is 20 for all the calls. This means that all the threads are executing the same block of code simultaneously: every thread increments the shared count before any of them wakes up and prints it. Though parallel processing is the desired behavior of multithreading, it can cause issues in scenarios like this one.

Let’s say in actual code we are trying to write data into a database. Instead of count we have an ID. The first thread creates an ID X. Then it does some processing and creates an insert statement. While it is doing the processing, a second thread starts executing the same code and creates another ID Y, which overwrites the ID X (just as count is being modified). So when thread 1 reaches the code where it inserts the data, it will try inserting the ID as Y (as X has been overwritten by Y). Then thread 2 comes along and again tries to insert ID Y, causing an error.

So in the real world we can have blocks of code (or complete methods) which we want to make sure do not get executed by multiple threads at the same time. Fortunately Java provides a simple way to do this. The keyword is synchronized.

public void test()
{
synchronized(this){
try {

count++;
Thread.sleep(1000);
System.out.println("Hello from Runnable:"+count);
} catch (Exception e) {
e.printStackTrace();
}
}
}

Now if we run the same code, we will get

Hello from Runnable:1
Hello from Runnable:2
Hello from Runnable:3
Hello from Runnable:4
Hello from Runnable:5
Hello from Runnable:6

A synchronized block makes sure that only one thread is executing the guarded code at a time. Another way to synchronize is to declare the method itself as synchronized.

public synchronized void test()

This will make sure only one thread enters this method on a given object at a time.
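The effect of a synchronized method is easy to check with a shared counter. In this sketch (SafeCounter and countWith are hypothetical names) several threads each increment the counter many times, and because increment() is synchronized no update is lost.

```java
class SafeCounter {
    private int count = 0;

    // Only one thread at a time can run a synchronized method on this object.
    synchronized void increment() {
        count++;
    }

    synchronized int get() {
        return count;
    }

    // Starts the given number of threads, each incrementing the same counter, and returns the total.
    static int countWith(int threads, int incrementsPerThread) {
        SafeCounter counter = new SafeCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWith(20, 1000)); // 20000
    }
}
```

Without the synchronized keyword on increment(), the total could come out lower, since count++ is a read followed by a write and two threads can interleave between them.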

Multithreading: Runnable vs Thread

What is multithreading? First of all, let us understand what a thread is. A thread is simply a flow of control in a program/process.

Now let’s understand what multithreading is. Let’s take an example: there is code which reads data from a file and uploads it to a database. Let’s say there are 10 such files and each file takes 1 hr to finish the data upload. In a single-threaded environment, this translates to a 10 hr job, as our code will read and upload the files one by one. When we move to multithreading, where multiple threads execute at the same time, the files are processed in parallel and the total time taken might be close to the time taken by one file only.

Before getting into details, let’s understand how to implement multithreading in Java. There are 2 ways to achieve this.

1. Extend Thread class
public class ThreadTest extends Thread{

public void test()
{
System.out.println("Hello from Thread");
}
public void run()
{
test();
}
}

2. Implement Runnable interface

public class RunnableTest implements Runnable{
public void test()
{
System.out.println("Hello from Runnable");
}

@Override
public void run() {
test();
}
}

And somewhere in the main method

public static void main(String s[])
{
Thread t1=new ThreadTest();
t1.start();
Thread t2=new Thread(new RunnableTest());
t2.start();
}

As you can see, the important method while using threads is “run”. This method gets called automatically by the JVM when a thread is started.

Note that we do not call the run method directly, as that would be like calling a normal method: it runs on the calling thread, which waits for it to return. Using start, on the other hand, makes sure the control flow continues without waiting for the method call to return. To understand what I am referring to here, let’s change the main method slightly

for(int i=0;i<20;i++)
{
Thread t1=new ThreadTest();
t1.start();
Thread t2=new Thread(new RunnableTest());
t2.start();
}

We will see something like (in no particular order)

Hello from Thread
Hello from Runnable
Hello from Thread
Hello from Runnable
Hello from Runnable
Hello from Thread
Hello from Thread
Hello from Thread
Hello from Runnable
Hello from Runnable

Now let’s change it to run

for(int i=0;i<20;i++)
{
Thread t1=new ThreadTest();
t1.run();
Thread t2=new Thread(new RunnableTest());
t2.run();
}

Output

Hello from Thread
Hello from Runnable
Hello from Thread
Hello from Runnable
Hello from Thread
Hello from Runnable

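The output now strictly alternates, because each run() call finishes before the next one begins. So we are not actually using multithreading here.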
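One way to confirm this is to record the name of the thread that actually executes the body of run(). The following sketch does that for both calls. StartVsRun and whoRanIt are hypothetical names, as are the thread names passed to the constructor.

```java
class StartVsRun {

    // Returns { thread name seen when run() is called directly, thread name seen after start() }.
    static String[] whoRanIt() {
        String[] ranOn = new String[2];

        Thread direct = new Thread(() -> ranOn[0] = Thread.currentThread().getName(), "worker-direct");
        direct.run();   // plain method call: the body executes on the calling thread

        Thread started = new Thread(() -> ranOn[1] = Thread.currentThread().getName(), "worker-started");
        started.start(); // the body executes on the new thread
        try {
            started.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        String[] ranOn = whoRanIt();
        System.out.println("run()   executed on: " + ranOn[0]);
        System.out.println("start() executed on: " + ranOn[1]);
    }
}
```

The first name is that of the caller's thread, not "worker-direct", which shows that calling run() directly never creates a new flow of control.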

Coming back to the original example: if we put the code for uploading the data from a file into the run method and execute it in multithreaded fashion, we will be able to upload files simultaneously.

We have seen 2 ways to create threads in Java, one by extending the Thread class and the other by implementing Runnable. Which is better?

Usually we go for implementing Runnable. When implementing Runnable we only have to override the run method, and that is the only thing we need in most cases. When we extend Thread, we also inherit and can override other methods such as start or interrupt, which is usually not required. Additionally, extending Thread has the obvious implication that our class cannot extend any other class (as Java allows only one class to be extended).