Share this page 

Pipe the output of a thread to the input of another oneTag(s): Thread


We use the PipedOutputStream/PipedInputStream duo. When these streams are connected together what is written in the PipedOutputStream can be read in PipedInputStream. That connection acts like a queue (FIFO).
Pipeline Thread
  
      +----------+         +----------+
      | thread A |  --- >  | thread B |
      +----------+         +----------+
(PRODUCE)           (CONSUME)

[ProduceData.java]

import java.io.*;

public abstract class ProduceData implements Runnable {
  OutputStream os;

  public ProduceData(OutputStream os) {
    this.os = os;
    Thread t = new Thread(this);
    t.start();
    }
 
  public abstract boolean dataProduction();

  public void run() {
    while(dataProduction()) ;
    }
}

[ConsumeData.java]

import java.io.*;

public abstract class ConsumeData implements Runnable {
  InputStream is;

  public ConsumeData(InputStream is) {
    this.is = is;
    Thread t = new Thread(this);
    t.start();
    }

  public abstract boolean dataConsumption();

  public void run(){
    while(dataConsumption());
    }
}

Next we implement the methods to prepare/send the data.

[SendProduction.java]

import java.io.*;

public class SendProduction extends ProduceData {
   OutputStream output;

   SendProduction(OutputStream os) {
     super(os);
     this.output = os;
     }

   public boolean dataProduction() {
     byte[] j = new byte[1];
     boolean done = false;
     java.util.Random r = new java.util.Random();
      while(!done) {
       try {
         j[0] = (byte)(Math.abs(r.nextInt()) % 255);
         System.out.print(".");
         output.write(j);
         }
       catch (Exception e) { 
         e.printStackTrace();
         return true;
         }  
       }
     return done;   
     }
}

We implement the method to receive and process the data.

[ReceiveProduction.java]

import java.io.*;

public class ReceiveProduction extends ConsumeData {
  InputStream input;

  ReceiveProduction(InputStream is) {
     super(is);
     this.input = is;
     }

   public boolean dataConsumption() {
     int i = 0; 
     try {
       for (;;) {
          i = input.read();
          System.out.println("  " + i);
          }
       }
     catch (Exception e) {
       e.printStackTrace();
       }
     return true;
     } 
}

[TestThread.java]

import java.io.*;

public class TestThread {
  public static void main(String a[]){
    try {
       PipedOutputStream os = new PipedOutputStream();
       PipedInputStream is  = new PipedInputStream();
       os.connect(is);
       new SendProduction(os);
       new ReceiveProduction(is);       
       }
    catch (Exception e) {}
    }
}