ExecutorService with Callable and Future

In the last post I talked about ExecutorService basics.

Let us take it forward and look at how Future is used. So far we have seen ExecutorService used to run tasks on threads in a controlled way. What if we need to get an object back from the task being executed? The Callable interface comes to help. Instead of implementing Runnable, we can implement Callable to define the task. In this case we override the call() method instead of run().

From Javadocs: The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception.

The result of a Callable is retrieved through a Future instance. Instead of executor.execute, we call the submit method, which returns a Future. submit can also be used with a Runnable instance, but in that case get() on the Future returns null on completion, as the run() method cannot return an object.

Here is an example

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableTest {
    int num = 1;

    public static void main(String s[]) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        // submit() returns a Future for Callable as well as Runnable tasks
        Future<CallableTest> ft = executor.submit(new RunTest());
        Future<?> ft1 = executor.submit(new RunTest2());
        System.out.println("step 1");
        try {
            CallableTest ct = ft.get(); // blocks until call() has returned
            System.out.println("ct val:" + ct.num);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace(); // call() threw an exception
        }
        System.out.println("step 2");
        try {
            System.out.println(ft1.get()); // prints null: run() returns nothing
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("step 3");
        executor.shutdown();
    }
}

class RunTest implements Callable<CallableTest> {
    @Override
    public CallableTest call() throws Exception {
        Thread.sleep(1000); // call() may throw checked exceptions, unlike run()
        CallableTest ct = new CallableTest();
        ct.num = 10;
        return ct;
    }
}

class RunTest2 implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
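
As a compact variation on the example above, here is a minimal sketch of waiting for a Future with a time limit, using get(timeout, unit). The class name FutureTimeoutSketch and the helper runWithTimeout are made up for illustration and are not part of the original example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutSketch {

    // Hypothetical helper: runs the task on a one-thread pool and waits at most timeoutMs.
    static int runWithTimeout(Callable<Integer> task, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(task);
            // get(timeout, unit) throws TimeoutException if no result arrives in time
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            executor.shutdownNow(); // interrupts the worker if it is still running
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runWithTimeout(() -> 6 * 7, 1000));
        try {
            runWithTimeout(() -> { Thread.sleep(5000); return 0; }, 100);
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

The first call should print 42; the second gives up after roughly 100 ms instead of blocking for the full five seconds.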

Further Reading

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Callable.html

ExecutorService

Here is a refresher on multithreading in Java.

Java (since 1.5) offers a more controlled and cleaner way to handle threads: ExecutorService.

With ExecutorService we can create thread pools and execute tasks in a controlled manner, i.e. we can define the number of threads allowed to run at the same time.

ExecutorService ex = Executors.newSingleThreadExecutor();
ExecutorService ex = Executors.newFixedThreadPool(5);
ExecutorService ex = Executors.newScheduledThreadPool(10);

In the first case we create a single-threaded pool, whereas in the second case we define a pool with a fixed number of threads. In the third case we create a pool of threads that can run tasks after a given delay or at regular intervals.
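
To make the scheduled pool a little more concrete, here is a minimal sketch that schedules a single task with a delay and waits for its result. The class ScheduledSketch and the helper elapsedMillisForDelay are hypothetical names used only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduledSketch {

    // Hypothetical helper: schedules one task and returns how long we waited for its result.
    static long elapsedMillisForDelay(long delayMs) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            Callable<String> task = () -> "done";
            long start = System.nanoTime();
            // the task becomes eligible to run only after delayMs have passed
            ScheduledFuture<String> future =
                    scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS);
            future.get(); // blocks until the delayed task has run
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("waited about " + elapsedMillisForDelay(200) + " ms");
    }
}
```

For repeating work, the same interface also offers scheduleAtFixedRate and scheduleWithFixedDelay.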

Read more
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html
http://tutorials.jenkov.com/java-util-concurrent/executorservice.html

A simple code example

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class test {
public static void main(String s[])
{
ExecutorService executor = Executors.newFixedThreadPool(5);
for(int i=0;i<10;i++)
{
executor.execute(new RunTest());
executor.execute(new RunTest2());
}
executor.shutdown();
}
}

class RunTest implements Runnable
{
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// TODO Auto-generated method stub
System.out.println("run"+Thread.currentThread().getName());
}

}

class RunTest2 implements Runnable
{
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// TODO Auto-generated method stub
System.out.println("run2"+Thread.currentThread().getName());
}

}

Play around with thread pool size to understand the usage.
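
One way to see the effect of the pool size is to time the same batch of tasks with different sizes. The following is only a sketch under simple assumptions (every task just sleeps); the class PoolSizeSketch and the method runTasks are made-up names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolSizeSketch {

    // Hypothetical helper: runs taskCount sleeping tasks and returns the total time in ms.
    static long runTasks(int poolSize, int taskCount, long sleepMs) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        long start = System.nanoTime();
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(sleepMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown(); // stop accepting new tasks; queued ones still run
        executor.awaitTermination(1, TimeUnit.MINUTES); // wait for them to finish
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("pool of 1: " + runTasks(1, 4, 100) + " ms");
        System.out.println("pool of 4: " + runTasks(4, 4, 100) + " ms");
    }
}
```

With one thread the four tasks run one after another, so the total is around four times the sleep time; with four threads they overlap and the total stays close to a single sleep.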

Kill a process in linux

There can be times when a process becomes unresponsive and you want to kill it forcefully.

In Linux, we first need to find the process id.

ps command: lists running processes (by default only those attached to the current terminal).

ps aux: gives details of all processes, where a = show processes for all users, u = display the process's user/owner, x = also show processes not attached to a terminal.

To fetch only the required process, use grep: ps aux | grep 'name'

Then kill it forcefully: kill -9 PID (this sends SIGKILL, which the process cannot catch or ignore, so it gets no chance to clean up).

Putting it all together.

Let's say I want to kill eclipse.

kamalmeet@System:~$ ps aux | grep eclipse

1101 6070 1.3 1.8 46584 5500 ? Sl 12:34 1:38 /usr/bin/java -Dosgi.requiredJavaVersion=1.6 -XX:MaxPermSize=256m -Xms40m -Xmx512m -jar /home/kamalmeet/eclipse//plugins/org.eclipse.equinox.launcher_1.3.0.v20140415-2008.jar

kamalmeet@System:~$ kill -9 6070

Understanding wait, notify and notifyAll using producer-consumer problem

In the last post I mentioned some of the Java Thread utility methods. Here I will try to explain the wait and notify concept using the classic producer-consumer problem.

Wait: It tells the current thread to wait until some other thread notifies it to resume execution. It is called on the shared object whose lock the thread holds, e.g. obj.wait(), and it releases that lock while waiting. There is another version of wait which takes milliseconds as input: wait(2000) tells the thread to wait until someone notifies it or for a maximum of 2 seconds.

Notify: Wakes up one of the threads waiting on the object (chosen arbitrarily if multiple threads are waiting) so that it can resume execution.

NotifyAll: The only difference between notify and notifyAll is that the latter notifies all the threads waiting on the object.

Producer-consumer problem: This is a classic problem where producer and consumer are two independent entities. As the names suggest, the producer produces something which is consumed by the consumer. The challenge is that one of the entities can be faster than the other. For example, if the consumer consumes faster than the producer produces, an error condition occurs when the consumer has nothing to consume. Conversely, if the producer produces too fast for the consumer, the storage area overflows.

To solve the problem, the wait and notify operations come to the rescue. If one entity reaches an error state, say the consumer has run out of objects to consume, it waits until a new object is available.

Here is an example of the second case, where the producer is faster, so it will eventually have to wait when the queue is full until the consumer has consumed some objects.

public class ProducerConsumer {

public static void main(String s[])
{
Queue q=new Queue();
Producer p=new Producer(q);
Consumer c=new Consumer(q);
Thread t1=new Thread(p);
Thread t2=new Thread(c);
t1.start();
t2.start();
System.out.println("Exiting main thread now");
}

}
class Queue {

    int arr[] = new int[10];
    int index = 0;
    final int MAX = arr.length; // full once every slot is used

    // Note: this removes the most recently added element (LIFO order),
    // which is enough to demonstrate wait and notify.
    public synchronized int remove() {
        // loop instead of if: re-check the condition after every wake-up
        while (index == 0) {
            try {
                System.out.println("waiting to remove");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        index--;
        int num = arr[index];
        notify(); // a slot is free now, wake a waiting producer
        return num;
    }

    public synchronized void add(int num) {
        while (index == MAX) {
            try {
                System.out.println("waiting to add");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        arr[index] = num;
        index++;
        notify(); // an element is available now, wake a waiting consumer
    }

}

class Producer implements Runnable{
Queue q;
int num=0;
Producer(Queue q)
{
this.q=q;
}

@Override
public void run() {
// TODO Auto-generated method stub
for(int i=0;i<100;i++)
{
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("adding:"+num);
q.add(num++);
}

}

}

class Consumer implements Runnable{
Queue q;
int num=0;
Consumer(Queue q)
{
this.q=q;
}

@Override
public void run() {
// TODO Auto-generated method stub
for(int i=0;i<100;i++){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("removing:"+ q.remove());
}

}

}

The other case (consumer faster than producer) can be simulated by swapping the two sleep times.
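
For comparison, the JDK ships a ready-made bounded queue, java.util.concurrent.ArrayBlockingQueue, whose put() and take() do the waiting and notifying internally. This is a different technique from the hand-written wait/notify above, shown only as a sketch; the class BlockingQueueSketch and the method transfer are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueSketch {

    // Hypothetical helper: moves count numbers from a producer thread to a consumer thread.
    static List<Integer> transfer(int count, int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> consumed = new ArrayList<>(); // touched only by the consumer until join

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    consumed.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join the consumer's writes are visible here
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(10, 3));
    }
}
```

Unlike the array-based class above, this queue hands elements out in the order they were added, so the consumer sees 0, 1, 2 and so on.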

Thread Basics: Sleep, Join and Yield

When dealing with Java threads, some basic concepts come in handy for managing control among threads.

Sleep: A simple command which tells the thread to sleep for the given number of milliseconds, e.g. Thread.sleep(1000) tells the current thread to pause for one second.

Join: This command tells the calling thread to wait until another thread has finished. It has two versions. t1.join() tells the current thread to wait until thread t1 has finished before moving to the next statement. A second version takes milliseconds as input: t1.join(2000) tells the current thread to wait until t1 has finished or 2 seconds have passed, whichever occurs first.

Yield: Yield is used by a thread to voluntarily give up control in favor of other threads, though this is not guaranteed. Read more: http://www.javamex.com/tutorials/threads/yield.shtml

An example depicting sleep and join operations.

public class thread {

public static void main(String s[])
{
RunnableTest r=new RunnableTest();

Thread t1=new Thread(r);
t1.setName("t1");
t1.start();

Thread t2=new Thread(r);
t2.setName("t2");
t2.start();

try {
t1.join();
t2.join(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

System.out.println("Exiting main thread now");
}

}

class RunnableTest implements Runnable{
int count=0;
public synchronized void test()
{
try {
count++;
Thread.sleep(4000); //This will just introduce some delay
System.out.println(Thread.currentThread().getName()+" says Hello from Runnable:"+count);
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void run() {
test();
}
}
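
A timed join returns without saying whether the other thread actually finished. The following sketch (the class JoinTimeoutSketch and the method stillRunningAfterJoin are made-up names) checks that with isAlive().

```java
class JoinTimeoutSketch {

    // Hypothetical helper: starts a worker that sleeps workMs, joins for at most joinMs
    // (must be > 0, since join(0) waits forever) and reports whether it was still running.
    static boolean stillRunningAfterJoin(long workMs, long joinMs) throws InterruptedException {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(workMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        worker.join(joinMs); // returns when the worker ends or joinMs have passed
        boolean alive = worker.isAlive();
        worker.join(); // now wait without a limit so no thread is left behind
        return alive;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(stillRunningAfterJoin(500, 50));  // join timed out first
        System.out.println(stillRunningAfterJoin(50, 2000)); // worker finished first
    }
}
```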

Understanding Thread Priority

Here is a little refresher on concept of threads in Java

http://kamalmeet.com/java/multithreading-runnable-vs-thread/
http://kamalmeet.com/java/handling-multithreading-with-synchronization/
http://kamalmeet.com/java/synchronization-object-vs-class-level/

There can be times when you want to give priority to one thread over another. Java provides a way to do that by setting the priority of a thread with Thread.setPriority, which accepts values from Thread.MIN_PRIORITY (1) to Thread.MAX_PRIORITY (10).

An Example

package test;

public class thread {

public static void main(String s[])
{
RunnableTest r=new RunnableTest();
for(int i=0;i<5;i++)
{
Thread t=new Thread(r);
t.setName("t:"+i);
t.setPriority(Thread.NORM_PRIORITY+i);
t.start();
}
}

}

class RunnableTest implements Runnable{
int count=0;
public synchronized void test()
{
try {
count++;
System.out.println(Thread.currentThread().getPriority());
Thread.sleep(1000); //This will just introduce some delay
System.out.println(Thread.currentThread().getName()+" says Hello from Runnable:"+count);
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void run() {
test();
}
}

But there is a catch. Threads are scheduled at the operating system level, so Java depends on the OS to honor thread priority, and the effect differs between platforms. Here is a good explanation of how thread priority in Java is handled on different operating systems - http://www.javamex.com/tutorials/threads/priority_what.shtml

Find and delete blocking sessions- Oracle

The following query lists all the user sessions running in your Oracle database, along with details of the query each one is executing.

select sesion.sid,
sesion.serial#,
sesion.username,
optimizer_mode,
hash_value,
address,
cpu_time,
elapsed_time,
sql_text
from v$sqlarea sqlarea, v$session sesion
where sesion.sql_hash_value = sqlarea.hash_value
and sesion.sql_address = sqlarea.address
and sesion.username is not null;

Now you might want to kill an unwanted session, substituting the sid and serial# values returned by the query above:

ALTER SYSTEM KILL SESSION 'sid,serial#' IMMEDIATE;

Double Check Locking- pattern

The double-checked locking pattern checks a condition once without a lock and, only if that check says a lock is needed, checks it again after acquiring the lock. This helps reduce unnecessary locking.

To understand it, let's take a simple example of creating a singleton object.

public static MySingleton getInstance()
{
if(null==mysingleton)
mysingleton=new MySingleton();
return mysingleton;
}

The challenge with the above code is that it is not thread-safe, so we might end up creating multiple MySingleton objects in a multithreaded environment. So we add synchronization.

public static synchronized MySingleton getInstance()
{
if(null==mysingleton)
mysingleton=new MySingleton();
return mysingleton;
}

Though we have made our code thread-safe, we have added the overhead of synchronization, i.e. each thread has to wait for the previous thread to finish this method before entering it. The actual error case can occur only the first time, i.e. when the mysingleton object is null; only then do we want to stop other threads from executing the block. So to optimize the code, we use a technique called double-checked locking.

// the method itself is no longer synchronized
// mysingleton must be declared volatile for this to be safe (Java 5 or later)
public static MySingleton getInstance()
{
if(null==mysingleton)
{
synchronized(MySingleton.class) {
if(null==mysingleton)
mysingleton=new MySingleton();
}
}
return mysingleton;
}

You can see that we enter the synchronized block only when the instance may still need to be created, rather than blocking threads on every call.
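
For completeness, here is a self-contained sketch of the pattern including the field declaration, which the snippets above leave out. It assumes Java 5 or later, where declaring the field volatile prevents other threads from seeing a partially constructed object. The DoubleCheckedSketch class and its distinctInstances method are hypothetical additions used only to exercise the singleton from several threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class MySingleton {
    // volatile is essential here; without it the pattern is not safe
    private static volatile MySingleton mysingleton;

    private MySingleton() { }

    static MySingleton getInstance() {
        MySingleton local = mysingleton; // first check, no lock taken
        if (local == null) {
            synchronized (MySingleton.class) {
                local = mysingleton; // second check, under the lock
                if (local == null) {
                    local = new MySingleton();
                    mysingleton = local;
                }
            }
        }
        return local;
    }
}

class DoubleCheckedSketch {

    // Hypothetical check: many threads call getInstance() and we count distinct objects.
    static int distinctInstances(int threadCount) throws InterruptedException {
        Set<MySingleton> seen = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> seen.add(MySingleton.getInstance()));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return seen.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(distinctInstances(8)); // prints 1
    }
}
```

The local variable is an optional refinement that keeps the number of volatile reads on the common path to one.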

http://en.wikipedia.org/wiki/Double-checked_locking