Communicate between threads using a Queue
Tag(s): Thread


Using BlockingQueue class
BlockingQueue, an interface added in Java 5, can be used to pass data from one Producer to many Consumers.

The java.util.concurrent.BlockingQueue is designed to work in a multi-threaded world.

A Producer opens a file and puts the data into BlockingQueue.

import java.io.*;
import java.util.concurrent.*;
 
/**
 * Producer: reads a text file line by line, puts every line on a shared
 * {@link BlockingQueue}, then puts one end-of-data marker per consumer.
 *
 * The markers are queued even when the file cannot be opened or read, so a
 * consumer waiting on the queue for a marker is not left blocked forever
 * after a read failure.
 */
public class PrepareProduction implements Runnable{
  /** Value queued after the data to signal the end of the data. */
  static final String EOF_MARKER = "*";
  /** File read when no path is given to the constructor. */
  static final String DEFAULT_INPUT_PATH = "d:/input_data.dat";
  /** Number of markers queued when no consumer count is given. */
  static final int DEFAULT_CONSUMER_COUNT = 9;

  private final BlockingQueue<String> queue;
  private final String inputPath;
  private final int consumerCount;

  /**
   * Creates a producer that reads {@link #DEFAULT_INPUT_PATH} and queues
   * {@link #DEFAULT_CONSUMER_COUNT} end-of-data markers.
   *
   * @param q queue shared with the consumers
   */
  PrepareProduction(BlockingQueue<String> q) {
    this(q, DEFAULT_INPUT_PATH, DEFAULT_CONSUMER_COUNT);
  }

  /**
   * @param q             queue shared with the consumers
   * @param inputPath     path of the text file to read
   * @param consumerCount number of end-of-data markers to queue
   *                      (one per consumer thread)
   */
  PrepareProduction(BlockingQueue<String> q, String inputPath,
      int consumerCount) {
    queue = q;
    this.inputPath = inputPath;
    this.consumerCount = consumerCount;
  }

  /**
   * Queues every line of the input file followed by the end-of-data
   * markers. I/O errors are printed to stderr; an interrupt is printed and
   * the thread's interrupt status is restored before returning.
   */
  public void run() {
    System.out.println("Start PrepareProduction");
    boolean interrupted = false;
    try {
       enqueueLines();
    }
    catch (IOException e) {
       e.printStackTrace();
    }
    catch (InterruptedException e) {
       interrupted = true;
       e.printStackTrace();
    }
    finally {
       // always signal EOF, whatever happened while reading
       if (enqueueMarkers()) {
          interrupted = true;
       }
       if (interrupted) {
          Thread.currentThread().interrupt();
       }
    }
  }

  /** Puts each line of the input file on the queue; always closes the file. */
  private void enqueueLines() throws IOException, InterruptedException {
    BufferedReader input = new BufferedReader
        (new InputStreamReader(new FileInputStream(inputPath)));
    try {
       String thisLine;
       while ((thisLine = input.readLine()) != null) {
           queue.put(thisLine);
       }
    }
    finally {
       // closing the reader also closes the underlying stream
       input.close();
    }
  }

  /**
   * Puts one marker per consumer on the queue, retrying a put that was
   * interrupted so that no marker is lost.
   *
   * @return true if an interrupt was received while queuing the markers
   */
  private boolean enqueueMarkers() {
    boolean interrupted = false;
    int remaining = consumerCount;
    while (remaining > 0) {
       try {
          queue.put(EOF_MARKER);
          remaining--;
       }
       catch (InterruptedException e) {
          interrupted = true;
       }
    }
    return interrupted;
  }
}

A Consumer takes a value from the queue and executes an operation on it. If there is no data in the queue, the call to take() blocks until a value becomes available. The end of the data is signalled by a special marker (the "*" in this example).

   
import java.util.concurrent.BlockingQueue;
 
/**
 * Consumer: takes values from a shared {@link BlockingQueue} until it takes
 * the end-of-data marker "*". take() blocks while the queue is empty.
 */
public class DoProduction implements Runnable {
  /** Value that signals the end of the data. */
  private static final String EOF_MARKER = "*";

  private final BlockingQueue<String> queue;

  /** @param q queue to take values from */
  DoProduction(BlockingQueue<String> q) { queue = q; }

  /**
   * Takes and processes values until the marker is taken. If the thread is
   * interrupted while waiting, the loop ends, the event is printed, and the
   * interrupt status is restored so the caller can still observe it.
   */
  public void run() {
    try {
       System.out.println
          ("Start " + Thread.currentThread().getName());

       String value = queue.take();
       while (!value.equals(EOF_MARKER)) {
         /*
              do something with value
         */
         value = queue.take();
       }
    }
    catch (InterruptedException e) {
       // take() clears the interrupt flag when it throws; set it again
       Thread.currentThread().interrupt();
       System.out.println
           (Thread.currentThread().getName() + " " + e.getMessage());
    }
  }
}

A test class

 
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
 
/**
 * Demo driver: one producer thread and nine consumer threads sharing a
 * single LinkedBlockingQueue.
 */
public class Test {
  public static void main(String[] args) throws Exception {
    final int consumerCount = 9;

    BlockingQueue<String> sharedQueue = new LinkedBlockingQueue<String>();

    // slot 0 is the producer, slots 1..consumerCount are the consumers
    Thread[] workers = new Thread[consumerCount + 1];
    workers[0] = new Thread(new PrepareProduction(sharedQueue));
    for (int i = 1; i < workers.length; i++) {
      workers[i] = new Thread(new DoProduction(sharedQueue));
    }

    // start the producer first, then every consumer
    for (int i = 0; i < workers.length; i++) {
      workers[i].start();
    }

    // wait for all of them, in the same order
    for (int i = 0; i < workers.length; i++) {
      workers[i].join();
    }

    System.out.println("Done.");
  }
}
Using a LinkedList
If you don't need all the benefits of BlockingQueue, then a simple LinkedList can be more appropriate, especially if you need to process a large amount of data in a batch process. Since you are in a multi-threaded world, it's safer to use the synchronized version of the LinkedList implementation. Keep in mind that these examples are minimal and need more error checking!

First the Producer

 
import java.io.*;
import java.util.*;
 
/**
 * Producer: reads a text file line by line, appends every line to a shared
 * List, then appends one end-of-data marker per consumer.
 *
 * The markers are appended even when the file cannot be opened or read, so
 * the end of the data is always signalled.
 */
public class PrepareProduction implements Runnable{
   /** Value appended after the data to signal the end of the data. */
   static final String EOF_MARKER = "*";
   /** File read when no path is given to the constructor. */
   static final String DEFAULT_INPUT_PATH = "d:/input_data.dat";
   /** Number of markers appended when no consumer count is given. */
   static final int DEFAULT_CONSUMER_COUNT = 9;

   private final List<String> queue;
   private final String inputPath;
   private final int consumerCount;

   /**
    * Creates a producer that reads {@link #DEFAULT_INPUT_PATH} and appends
    * {@link #DEFAULT_CONSUMER_COUNT} end-of-data markers.
    *
    * @param q list shared with the consumers
    */
   PrepareProduction(List<String> q) {
      this(q, DEFAULT_INPUT_PATH, DEFAULT_CONSUMER_COUNT);
   }

   /**
    * @param q             list shared with the consumers
    * @param inputPath     path of the text file to read
    * @param consumerCount number of end-of-data markers to append
    *                      (one per consumer thread)
    */
   PrepareProduction(List<String> q, String inputPath, int consumerCount) {
      queue = q;
      this.inputPath = inputPath;
      this.consumerCount = consumerCount;
   }

   /**
    * Appends every line of the input file followed by the end-of-data
    * markers. I/O errors are printed to stderr.
    */
   public void run() {
      System.out.println("Start PrepareProduction");
      try {
         enqueueLines();
      }
      catch (IOException e) {
         e.printStackTrace();
      }
      finally {
         // always signal EOF, whatever happened while reading
         for (int i = 0; i < consumerCount; i++) {
            queue.add(EOF_MARKER);
         }
      }
   }

   /** Appends each line of the input file to the list; always closes the file. */
   private void enqueueLines() throws IOException {
      BufferedReader input = new BufferedReader
          (new InputStreamReader(new FileInputStream(inputPath)));
      try {
         String thisLine;
         while ((thisLine = input.readLine()) != null) {
            queue.add(thisLine);
         }
      }
      finally {
         // closing the reader also closes the underlying stream
         input.close();
      }
   }
}

The Consumer

 
import java.util.*;
/**
 * Consumer: removes values from the head of a shared List and prints them
 * until it removes the end-of-data marker "*".
 *
 * Unlike a BlockingQueue, a List does not wait for data: remove(0) on an
 * empty list throws IndexOutOfBoundsException. This class therefore waits
 * itself until an element is available, so a consumer that starts before
 * the producer has added anything does not terminate prematurely.
 */
public class DoProduction implements Runnable {
   /** Value that signals the end of the data. */
   private static final String EOF_MARKER = "*";
   /** Pause between two looks at an empty list, in milliseconds. */
   private static final long POLL_INTERVAL_MS = 1L;

   private final List<String> queue;

   /** @param q list to remove values from */
   DoProduction(List<String> q) { queue = q; }

   /**
    * Removes and prints values until the marker is removed. An interrupt
    * received while waiting for data ends the loop; it is printed and the
    * thread's interrupt status is restored.
    */
   public void run() {
      try {
         System.out.println("Start "
             + Thread.currentThread().getName());

         String value = takeNext();
         while (!value.equals(EOF_MARKER)) {
            System.out.println(Thread.currentThread().getName()
               +": " + value );
            /*
                do something with value
            */
            value = takeNext();
         }
      }
      catch (InterruptedException e) {
         // sleep() clears the interrupt flag when it throws; set it again
         Thread.currentThread().interrupt();
         System.out.println(Thread.currentThread().getName()
             + " " + e.getMessage());
      }
      catch (RuntimeException e) {
         System.out.println(Thread.currentThread().getName()
             + " " + e.getMessage());
      }
   }

   /**
    * Removes and returns the first element, waiting until one is available.
    *
    * The emptiness check and the removal happen while holding the list's
    * monitor, so two consumers cannot both pass the check and then compete
    * for a single remaining element. (A list created by
    * Collections.synchronizedList uses itself as its lock, so this also
    * excludes that list's own add/remove calls.)
    */
   private String takeNext() throws InterruptedException {
      while (true) {
         synchronized (queue) {
            if (!queue.isEmpty()) {
               return queue.remove(0);
            }
         }
         // nothing yet: give the producer time, outside the lock
         Thread.sleep(POLL_INTERVAL_MS);
      }
   }
}
The test
 
import java.util.*;
 
/**
 * Demo driver: one producer thread and nine consumer threads sharing a
 * single synchronized LinkedList.
 */
public class Test {
   public static void main(String[] args) throws Exception {
      final int consumerCount = 9;

      // typed (not raw) so passing it to the List<String> constructors
      // is checked by the compiler
      List<String> q = Collections.synchronizedList
         (new LinkedList<String>());

      // slot 0 is the producer, slots 1..consumerCount are the consumers
      Thread[] workers = new Thread[consumerCount + 1];
      workers[0] = new Thread(new PrepareProduction(q));
      for (int i = 1; i < workers.length; i++) {
         workers[i] = new Thread(new DoProduction(q));
      }

      // start the producer first, then every consumer
      for (int i = 0; i < workers.length; i++) {
         workers[i].start();
      }

      // wait for all of them, in the same order
      for (int i = 0; i < workers.length; i++) {
         workers[i].join();
      }
      System.out.println("Done.");
   }
}

blog comments powered by Disqus