The java.util.concurrent.BlockingQueue is designed to work in a multi-threaded world.
A Producer opens a file and puts each line of data into a BlockingQueue.
import java.io.*;
import java.util.concurrent.*;
/**
 * Producer: reads a text file line by line and puts every line on a
 * {@link BlockingQueue}, then publishes end-of-data markers so that the
 * consumer threads know when to stop.
 */
public class PrepareProduction implements Runnable {
    /** File the producer reads its lines from. */
    private static final String INPUT_FILE = "d:/input_data.dat";
    /** Special value telling a consumer that no more data will arrive. */
    private static final String EOF_MARKER = "*";
    /**
     * Number of markers published. Each consumer stops after taking one
     * marker, so this must be at least the number of consumer threads
     * (the accompanying test class starts nine).
     */
    private static final int EOF_MARKER_COUNT = 9;

    private final BlockingQueue<String> queue;

    /**
     * @param q queue shared with the consumers; receives the lines and the markers
     */
    PrepareProduction(BlockingQueue<String> q) { queue = q; }

    /**
     * Reads the input file into the queue. The end-of-data markers are
     * published even when reading fails, otherwise the consumers would
     * block forever waiting for data that never arrives.
     */
    public void run() {
        System.out.println("Start PrepareProduction");
        try {
            try {
                readLinesIntoQueue();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
            finally {
                publishEndMarkers();
            }
        }
        catch (InterruptedException e) {
            // keep the interruption visible to whoever owns this thread
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /** Puts every line of the input file on the queue and always closes the reader. */
    private void readLinesIntoQueue() throws IOException, InterruptedException {
        BufferedReader input = new BufferedReader
            (new InputStreamReader(new FileInputStream(INPUT_FILE)));
        try {
            String thisLine;
            while ((thisLine = input.readLine()) != null) {
                queue.put(thisLine);
            }
        }
        finally {
            // closing the reader also closes the underlying file stream
            input.close();
        }
    }

    /** Publishes one end-of-data marker per expected consumer. */
    private void publishEndMarkers() throws InterruptedException {
        for (int i = 0; i < EOF_MARKER_COUNT; i++) {
            queue.put(EOF_MARKER);
        }
    }
}
A Consumer extracts a value from the queue and executes an operation on it. If there is no data in the queue, the consumer blocks in the BlockingQueue until a value becomes available. The end of data is signalled by a special marker (the "*" in this example).
import java.util.concurrent.BlockingQueue;
/**
 * Consumer: takes values from a shared {@link BlockingQueue} until it
 * receives the end-of-data marker.
 */
public class DoProduction implements Runnable {
    /** Special value that tells this consumer to stop. */
    private static final String EOF_MARKER = "*";

    private final BlockingQueue<String> queue;

    /**
     * @param q queue shared with the producer
     */
    DoProduction(BlockingQueue<String> q) { queue = q; }

    /**
     * Takes values one by one, blocking while the queue is empty, and
     * returns once the end-of-data marker has been taken. If the thread is
     * interrupted while waiting, the message is printed and the interrupt
     * status is restored before returning.
     */
    public void run() {
        String threadName = Thread.currentThread().getName();
        try {
            System.out.println("Start " + threadName);
            for (String value = queue.take();
                 !EOF_MARKER.equals(value);
                 value = queue.take()) {
                /*
                do something with value
                */
            }
        }
        catch (InterruptedException e) {
            System.out.println(threadName + " " + e.getMessage());
            // take() cleared the flag when it threw; set it again for the caller
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            System.out.println(threadName + " " + e.getMessage());
        }
    }
}
A test class
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Demo driver: one producer thread and nine consumer threads sharing a
 * single LinkedBlockingQueue.
 */
public class Test {
    /**
     * Creates the producer first and the nine consumers after it, starts
     * them in that order, waits for all of them and prints "Done.".
     */
    public static void main(String[] args) throws Exception {
        BlockingQueue<String> q = new LinkedBlockingQueue<String>();

        // slot 0 holds the producer, slots 1..9 hold the consumers
        Thread[] workers = new Thread[10];
        workers[0] = new Thread(new PrepareProduction(q));
        for (int i = 1; i < workers.length; i++) {
            workers[i] = new Thread(new DoProduction(q));
        }

        for (int i = 0; i < workers.length; i++) {
            workers[i].start();
        }
        for (int i = 0; i < workers.length; i++) {
            workers[i].join();
        }
        System.out.println("Done.");
    }
}
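For comparison, here is the same example without a BlockingQueue, using a synchronized List instead. First the Producer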
import java.io.*;
import java.util.*;
/**
 * Producer: reads a text file line by line and appends every line to a
 * shared {@link List}, then appends end-of-data markers so that the
 * consumer threads know when to stop.
 */
public class PrepareProduction implements Runnable {
    /** File the producer reads its lines from. */
    private static final String INPUT_FILE = "d:/input_data.dat";
    /** Special value telling a consumer that no more data will arrive. */
    private static final String EOF_MARKER = "*";
    /**
     * Number of markers appended. Each consumer stops after removing one
     * marker, so this must be at least the number of consumer threads
     * (the accompanying test class starts nine).
     */
    private static final int EOF_MARKER_COUNT = 9;

    private final List<String> queue;

    /**
     * @param q list shared with the consumers; receives the lines and the markers
     */
    PrepareProduction(List<String> q) { queue = q; }

    /**
     * Reads the input file into the list. The end-of-data markers are
     * appended even when reading fails, so the consumers are always told
     * that no more data is coming.
     */
    public void run() {
        System.out.println("Start PrepareProduction");
        try {
            readLinesIntoQueue();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            // special marker for EOF
            for (int i = 0; i < EOF_MARKER_COUNT; i++) {
                queue.add(EOF_MARKER);
            }
        }
    }

    /** Appends every line of the input file to the list and always closes the reader. */
    private void readLinesIntoQueue() throws IOException {
        BufferedReader input = new BufferedReader
            (new InputStreamReader(new FileInputStream(INPUT_FILE)));
        try {
            String thisLine;
            while ((thisLine = input.readLine()) != null) {
                queue.add(thisLine);
            }
        }
        finally {
            // closing the reader also closes the underlying file stream
            input.close();
        }
    }
}
The Consumer
import java.util.*;
/**
 * Consumer: removes values from the head of a shared {@link List} until it
 * receives the end-of-data marker.
 *
 * Unlike a BlockingQueue, a List does not wait for data, so this class
 * polls: when the list is empty it sleeps briefly and tries again instead
 * of failing with an IndexOutOfBoundsException.
 */
public class DoProduction implements Runnable {
    /** Special value that tells this consumer to stop. */
    private static final String EOF_MARKER = "*";
    /** Pause between two attempts while the list is empty. */
    private static final long POLL_INTERVAL_MILLIS = 10L;

    private final List<String> queue;

    /**
     * @param q list shared with the producer; assumed to be a list returned
     *          by Collections.synchronizedList, whose lock is the list
     *          object itself
     */
    DoProduction(List<String> q) { queue = q; }

    /**
     * Prints every value taken from the list and returns once the
     * end-of-data marker has been removed. If the thread is interrupted
     * while waiting for data, the interrupt status is restored before
     * returning.
     */
    public void run() {
        String threadName = Thread.currentThread().getName();
        try {
            System.out.println("Start " + threadName);
            String value = takeNext();
            while (!value.equals(EOF_MARKER)) {
                System.out.println(threadName + ": " + value);
                /*
                do something with value
                */
                value = takeNext();
            }
        }
        catch (InterruptedException e) {
            System.out.println(threadName + " " + e.getMessage());
            // sleep() cleared the flag when it threw; set it again for the caller
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            System.out.println(threadName + " " + e.getMessage());
        }
    }

    /**
     * Removes and returns the head of the list, waiting until one is available.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private String takeNext() throws InterruptedException {
        while (true) {
            // the emptiness check and the removal must be one atomic step,
            // otherwise another consumer could take the element in between
            synchronized (queue) {
                if (!queue.isEmpty()) {
                    return queue.remove(0);
                }
            }
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }
}
import java.util.*;
/**
 * Demo driver: one producer thread and nine consumer threads sharing a
 * single synchronized LinkedList.
 */
public class Test {
    /** Number of consumer threads started by the demo. */
    private static final int CONSUMER_COUNT = 9;

    /**
     * Creates the producer first and the consumers after it, starts them
     * in that order, waits for all of them and prints "Done.".
     */
    public static void main(String[] args) throws Exception {
        // typed declaration: a raw List here makes every constructor call
        // below an unchecked conversion
        List<String> q = Collections.synchronizedList
            (new LinkedList<String>());

        // slot 0 holds the producer, the remaining slots hold the consumers
        Thread[] workers = new Thread[CONSUMER_COUNT + 1];
        workers[0] = new Thread(new PrepareProduction(q));
        for (int i = 1; i < workers.length; i++) {
            workers[i] = new Thread(new DoProduction(q));
        }

        for (int i = 0; i < workers.length; i++) {
            workers[i].start();
        }
        for (int i = 0; i < workers.length; i++) {
            workers[i].join();
        }
        System.out.println("Done.");
    }
}
Written and compiled by Réal Gagnon ©1998-2007
[ home ]