Pipe the output of a thread to the input of other threadsTag(s): Thread
The idea is to make the READ operation ATOMIC. Once the READING is started for one thread, it cannot be interrupted by another one.
Pipeline Thread +----------+ +----------+ | thread A | ---- > | thread B | +----------+ | +----------+ (produce) | (consume) | | | +----------+ +- > | thread C | +----------+ (CONSUME)
The ProduceData.class and ConsumeData.class are the same as the previous JAVA How-to.
[AtomicInputStream.java]
import java.io.*; public class AtomicInputStream extends PipedInputStream { int atom = 0; AtomicInputStream(int atom) { super(); this.atom = atom; } public synchronized int atomicRead(byte[] x) { try { read(x, 0, atom); } catch (Exception e) { e.printStackTrace(); return -1; } return atom; } }
[SendProduction.java]
import java.io.*; public class SendProduction extends ProduceData { OutputStream os; SendProduction(OutputStream os) { super(os); this.os = os; } public boolean dataProduction() { byte[] j = new byte[3]; boolean done = false; j[0] = 0 ; j[1] = 1; j[2] = 2; while(!done) { try { os.write(j, 0, 3); } catch (Exception e) { e.printStackTrace(); return true; } } return done; } }
[ReceiveProduction.java]
import java.io.*; public class ReceiveProduction extends ConsumeData { AtomicInputStream as; ReceiveProduction(AtomicInputStream as) { super(as); this.as = as; } public boolean dataConsumption() { byte [] i = new byte[3]; try { for (;;) { as.read(i); System.out.println (Thread.currentThread().getName()+": " + i[0] + " " + i[1] + " " + i[2]); } } catch (Exception e) { e.printStackTrace(); } return true; } }
[TestThread.java]
import java.io.*; public class TestThread { public static void main(String a[]){ try { PipedOutputStream os = new PipedOutputStream(); AtomicInputStream as = new AtomicInputStream(3); os.connect(as); new SendProduction(os); new ReceiveProduction(as); new ReceiveProduction(as); new ReceiveProduction(as); } catch (Exception e) { e.printStackTrace(); } } }
To support many producers, simply create a AtomicOutputStream class with a synchronized atomicWrite method in it.
Using a PipedOutputStream is complicated, a simpler solution is to use Queue (especially with Java 5). A producer sends data to the Queue and the the consumers extract the data. See this HowTo.
mail_outline
Send comment, question or suggestion to howto@rgagnon.com
Send comment, question or suggestion to howto@rgagnon.com