Pipe the output of a thread to the input of other threadsTag(s): Thread
The idea is to make the READ operation ATOMIC. Once the READING is started for one thread, it cannot be interrupted by another one.
Pipeline Thread
+----------+ +----------+
| thread A | ---- > | thread B |
+----------+ | +----------+
(produce) | (consume)
|
|
| +----------+
+- > | thread C |
+----------+
(CONSUME)The ProduceData.class and ConsumeData.class are the same as the previous JAVA How-to.
[AtomicInputStream.java]
import java.io.*;
public class AtomicInputStream extends PipedInputStream {
int atom = 0;
AtomicInputStream(int atom) {
super();
this.atom = atom;
}
public synchronized int atomicRead(byte[] x) {
try {
read(x, 0, atom);
}
catch (Exception e) {
e.printStackTrace();
return -1;
}
return atom;
}
}[SendProduction.java]
import java.io.*;
public class SendProduction extends ProduceData {
OutputStream os;
SendProduction(OutputStream os) {
super(os);
this.os = os;
}
public boolean dataProduction() {
byte[] j = new byte[3];
boolean done = false;
j[0] = 0 ; j[1] = 1; j[2] = 2;
while(!done) {
try {
os.write(j, 0, 3);
}
catch (Exception e) {
e.printStackTrace();
return true;
}
}
return done;
}
}[ReceiveProduction.java]
import java.io.*;
public class ReceiveProduction extends ConsumeData {
AtomicInputStream as;
ReceiveProduction(AtomicInputStream as) {
super(as);
this.as = as;
}
public boolean dataConsumption() {
byte [] i = new byte[3];
try {
for (;;) {
as.read(i);
System.out.println
(Thread.currentThread().getName()+": " +
i[0] + " " + i[1] + " " + i[2]);
}
}
catch (Exception e) {
e.printStackTrace();
}
return true;
}
}[TestThread.java]
import java.io.*;
public class TestThread {
public static void main(String a[]){
try {
PipedOutputStream os = new PipedOutputStream();
AtomicInputStream as = new AtomicInputStream(3);
os.connect(as);
new SendProduction(os);
new ReceiveProduction(as);
new ReceiveProduction(as);
new ReceiveProduction(as);
}
catch (Exception e) {
e.printStackTrace();
}
}
}To support many producers, simply create a AtomicOutputStream class with a synchronized atomicWrite method in it.
Using a PipedOutputStream is complicated, a simpler solution is to use Queue (especially with Java 5). A producer sends data to the Queue and the the consumers extract the data. See this HowTo.
mail_outline
Send comment, question or suggestion to howto@rgagnon.com
Send comment, question or suggestion to howto@rgagnon.com