
Pipe the output of a thread to the input of other threads

The idea is to make the read operation atomic: once one thread has started reading a record, no other thread can interrupt it until the whole record has been read.
   Pipeline Thread

      +----------+         +----------+
      | thread A |  ---- > | thread B |
      +----------+    |    +----------+
       (produce)      |     (consume)
                      |
                      |
                      |    +----------+
                      +- > | thread C |
                           +----------+
                            (consume)

The ProduceData and ConsumeData classes are the same as in the previous Java How-to.

[AtomicInputStream.java]

import java.io.*;

public class AtomicInputStream extends PipedInputStream {
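  int atom = 0;
  // Dedicated lock: read() waits on "this" internally, which would
  // release a lock held through a synchronized method.
  private final Object readLock = new Object();

  AtomicInputStream(int atom) {
    super();
    this.atom = atom;
    }
  // Reads exactly atom bytes into x.
  // Returns atom, or -1 on error or end of stream.
  public int atomicRead(byte[] x)  {
    synchronized (readLock) {
      try {
        int got = 0;
        // read() may return fewer bytes than requested, so loop.
        while (got < atom) {
          int n = read(x, got, atom - got);
          if (n < 0) return -1;
          got += n;
          }
        }
      catch (Exception e) {
        e.printStackTrace();
        return -1;
        }
      return atom;
      }
    }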
}

[SendProduction.java]

import java.io.*;

public class SendProduction extends ProduceData {
  OutputStream os;

  SendProduction(OutputStream os) {
    super(os);
    this.os = os;
    }

  public boolean dataProduction() {
    byte[] j = new byte[3];
    boolean done = false;
    j[0] = 0 ; j[1] = 1; j[2] = 2;  
  
    while(!done) {
      try {
        os.write(j, 0, 3);
        }
      catch (Exception e) { 
        e.printStackTrace();
        return true;
        }   
      }
    return done;    
    }
}

[ReceiveProduction.java]

import java.io.*;

public class ReceiveProduction extends ConsumeData {
  AtomicInputStream as;

  ReceiveProduction(AtomicInputStream as) {
    super(as);
    this.as = as;
    }

  public boolean dataConsumption() {
    byte [] i = new byte[3]; 
    try {
      for (;;) {
        // Use the atomic read so a record is never split between consumers.
        if (as.atomicRead(i) < 0) break;
        System.out.println
          (Thread.currentThread().getName()+": " +
           i[0] + " " + i[1] + " " + i[2]);
        }
      }
    catch (Exception e) {
      e.printStackTrace();
      }
    return true;
    } 
}

[TestThread.java]

import java.io.*;

public class TestThread {
   public static void main(String a[]){
     try {
       PipedOutputStream os = new PipedOutputStream();
       AtomicInputStream as = new AtomicInputStream(3);
       os.connect(as);
       new SendProduction(os);
       new ReceiveProduction(as);       
       new ReceiveProduction(as);
       new ReceiveProduction(as);           
       }
     catch (Exception e) {
       e.printStackTrace();
       }
     }
}
This works for the case of one producer and many consumers.
To support many producers, create an AtomicOutputStream class with a synchronized atomicWrite method in it.
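
The many-producers variant could look something like the sketch below. It is only an illustration: AtomicOutputStream and atomicWrite are the names suggested above, while AtomicWriteDemo and countMixedRecords are hypothetical helpers added to exercise the class. It also assumes Java 8 or later for the lambda syntax. Two producers write 3-byte records through a deliberately small pipe, and the reader counts records whose bytes come from different producers.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Counterpart of AtomicInputStream: each record of `atom` bytes is
// written while holding the stream's lock, so records coming from
// different producer threads cannot interleave.
class AtomicOutputStream extends PipedOutputStream {
  private final int atom;

  AtomicOutputStream(int atom) {
    super();
    this.atom = atom;
  }

  // Writes exactly `atom` bytes from x as one uninterrupted record.
  public synchronized void atomicWrite(byte[] x) throws IOException {
    write(x, 0, atom);
    flush(); // notifies readers that bytes are waiting in the pipe
  }
}

// Hypothetical helper, not part of the original How-to.
class AtomicWriteDemo {
  // Starts two producers writing {1,1,1} and {2,2,2} records, reads
  // everything back, and returns the number of mixed records seen.
  static int countMixedRecords(int perProducer) {
    try {
      final AtomicOutputStream os = new AtomicOutputStream(3);
      // A 4-byte pipe forces writers to block in the middle of a record.
      PipedInputStream is = new PipedInputStream(os, 4);
      Thread[] producers = new Thread[2];
      for (int p = 0; p < producers.length; p++) {
        final byte id = (byte) (p + 1);
        producers[p] = new Thread(() -> {
          byte[] rec = { id, id, id };
          try {
            for (int n = 0; n < perProducer; n++) os.atomicWrite(rec);
          } catch (IOException e) {
            e.printStackTrace();
          }
        });
        producers[p].start();
      }
      int mixed = 0;
      byte[] rec = new byte[3];
      for (int r = 0; r < 2 * perProducer; r++) {
        // read() may return fewer bytes than asked for, so loop.
        for (int got = 0; got < 3; ) {
          int n = is.read(rec, got, 3 - got);
          if (n < 0) throw new IOException("unexpected end of stream");
          got += n;
        }
        if (rec[0] != rec[1] || rec[1] != rec[2]) mixed++;
      }
      for (Thread t : producers) t.join();
      is.close();
      return mixed;
    } catch (IOException | InterruptedException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Without the synchronized keyword on atomicWrite, a producer that blocks on a full pipe in mid-record can be overtaken by another producer, and the reader may then see records that mix bytes from both.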

Using a PipedOutputStream is complicated; a simpler solution is to use a Queue (especially with Java 5 and later). A producer sends data to the Queue and the consumers extract the data. See this HowTo.
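
As a rough sketch of the Queue approach, assuming a java.util.concurrent.BlockingQueue (available since Java 5) and Java 8 lambdas: each record travels as one queue element, so a consumer always receives a whole record and no atomic read is needed. QueuePipelineDemo, run and the STOP sentinel are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the original How-to.
class QueuePipelineDemo {
  // Sentinel record telling a consumer to stop (an assumption of this sketch).
  private static final byte[] STOP = new byte[0];

  // Runs one producer (the calling thread) and `consumers` consumer
  // threads; returns every record received, formatted as "b0 b1 b2".
  static List<String> run(int records, int consumers) {
    final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    final List<String> received =
        Collections.synchronizedList(new ArrayList<>());
    List<Thread> workers = new ArrayList<>();
    for (int c = 0; c < consumers; c++) {
      Thread t = new Thread(() -> {
        try {
          // take() blocks until a whole record is available.
          for (byte[] r = queue.take(); r != STOP; r = queue.take()) {
            received.add(r[0] + " " + r[1] + " " + r[2]);
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      t.start();
      workers.add(t);
    }
    try {
      // Producer side: one queue element per record.
      for (int n = 0; n < records; n++) queue.put(new byte[] { 0, 1, 2 });
      // One sentinel per consumer so that every thread terminates.
      for (int c = 0; c < consumers; c++) queue.put(STOP);
      for (Thread t : workers) t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return received;
  }
}
```

Calling QueuePipelineDemo.run(10, 3) returns ten entries, each equal to "0 1 2"; which consumer handled which record depends on thread scheduling.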



Written and compiled by Réal Gagnon ©1998-2005