Recently I've stumbled into an issue with Android's PipedInput/OutputStreams.


I have used them before and they have served me well, however int the following scenarioit did not play out as expected.

I have a class 'DecodeFeed' with methods- read(byte[], int length) :: int- write(byte[], int length) :: int- getProcessedStream() :: InputStream


An instance of this class may be passed to a jni function, which may arbitrarily callthe 'read' and 'write' methods to perform some local computation and then write back.If I output the data at the Java-Layer from within DecodeFeed.write, the results are,as expected. To implement 'getProcessedStream()', I created a PipedInputSteam 'ret' thatis connected to a PipedOutputStream 'pipe'. The 'write' method then calls pipe.write(buffer).Another thread now calls 'getProcessedStream()' and tries to read it, as you would reada normal InputStream.

However the data is broken. The reading thread uses read(buffer)::int, where the buffer size = 1024. However many a times, the method returns '1' as the number of bytes read. This seems unlikely, given that the 'write' method of 'DecodeFeed' usually writes something like 4096 bytes, every time it is called.


No matter how much I dabbled with wrapping the piped stream into buffered streams, the data gotmessed up, when sent through the pipe.


I would be interested to hear, if someone had similar problems and if so, how you fixed them.I will provide my own solution as an answer below.


To overcome the issues indicated above, I've implemented a Buffer class that I'm using inconjunction with a class 'IOStream' that subclasses 'InputStream'. This approach has workedfor me, so I share my solution. However I think that the PipedInput/OutputStreams should behave basically the same way. Maybe there is some gotcha that I didn't get, if not, I would consider this a serious issue.

package com.pack.io;

import java.io.IOException;
import java.io.InputStream;

public class IOStream extends InputStream {

public static final String TAG = "IOStream";

private Buffer local_buffer = null;

int write_pos = 0;

public IOStream() throws IOException {
    local_buffer = new Buffer();

 * @param size_in_kb defaults to 16 with default constructor
public IOStream(final int size_in_kb) {
    local_buffer = new Buffer(size_in_kb);

public void close() throws IOException{

public int read() throws IOException {
    return local_buffer.read();

public int read(byte[] buffer) throws IOException {
    return local_buffer.read(buffer);

public int read(byte[] buffer, int offset, int bytes) throws IOException {
    return local_buffer.read(buffer, offset, bytes);

public int write(byte[] buffer) throws IOException {
    return local_buffer.write(buffer);

public int write(final byte[] buffer, final int offset, final int bytes) throws IOException {
    return local_buffer.write(buffer, offset, bytes);


package com.pack.io;

public class Mutex {
private int mx_nb_resources = 1;
private int nb_resources = 0;
private final Object _mutex = new Object();

 * Construct a mutex, where at max mx_nb_resources
 * are allowed access
 * @param mx_nb_resources defaults to 1 (using the default constructor)
public Mutex(int mx_nb_resources) {
    this.mx_nb_resources = mx_nb_resources;

public Mutex() {}

public void lock() {
    synchronized (_mutex) {
        if (nb_resources > mx_nb_resources) {
            try {
            } catch (InterruptedException e) {}

public void unlock() {
    synchronized (_mutex) {

package com.pack.io;

import java.io.IOException;

public class Buffer {

public static final String TAG = "Buffer";

private Object _mutex = new Object();
private Mutex _read_mutex = new Mutex();
private Mutex _write_mutex = new Mutex();
private byte[][] local_buffer = null;
private int local_buffer_size = 0;
private int mx_local_bufer_size = 0;
private int chunk_size = 1024;
private int nb_chunks = 16;

int write_pos = 0;
private volatile boolean done = false;

 * Threadable Buffer class that may be written to and read from
 * - buffer size defaults to 16kb
 * - read-methods block, if not enough data is available
 * - write methods block, if buffer is full
 * - flush is a noop
 * - close will cause future read/writes to be noops, returning -1
public Buffer() {

 * Default constructor sets buffersize to 16 kb
 * @param size_in_kb set buffersize as desired
public Buffer(final int size_in_kb) {
    this.nb_chunks = size_in_kb;

private void init() {
    local_buffer = new byte[nb_chunks][chunk_size];
    mx_local_bufer_size = nb_chunks * chunk_size;

public void flush() {
    // Can't really do much here. In case a read/write thread is
    // waiting, it will be poked as soon as possible anyways.
    // It could be argued, that flush() should cause the
    // cache to be emptied before accepting new writes,
    // however this seems impractical and unnecessary.
    // For what it's worth, this method is provided for the
    // case, where an subclass of Buffer wants to use it.

public void close() throws IOException{
    synchronized (_mutex) {
        done = true;

public int read() throws IOException {
    byte[] buffer = new byte[1];
    return buffer[0];

public int read(byte[] buffer) throws IOException {
    return read(buffer, 0, buffer.length);

public int read(byte[] buffer, int offset, int bytes) throws IOException {
    synchronized (_mutex) {
        int bytes_read = 0;
        try {
            // block while not enough data is available
            while (local_buffer_size < bytes + offset && !done) {
                try {
                } catch (InterruptedException e) {}
            if (done) return -1;

            int pos = offset;

            for (int i = 0; i < bytes && pos < local_buffer_size; pos++, i++) {
                buffer[i] = local_buffer[pos / chunk_size][pos % chunk_size];

            bytes_read = Math.min(bytes, pos - offset);

            // left shift
            for (int i = 0, j = bytes_read + offset; j < local_buffer_size; i++, j++) {
                local_buffer[i / chunk_size][i % chunk_size] =
                        local_buffer[j / chunk_size][j % chunk_size];

            return bytes_read;
        } finally {
            local_buffer_size -= (bytes_read);
            write_pos -= (bytes_read + offset);
            _mutex.notify(); // write() might be waiting until more space is available, so notify

public int write(final byte[] buffer) throws IOException {
    return write(buffer, 0, buffer.length);

public int write(final byte[] buffer, final int offset, final int bytes) throws IOException {
    synchronized (_mutex) {
        try {
            // block while not enough room in the local_buffer
            while (local_buffer_size + bytes + offset > mx_local_bufer_size) {
                try {
                } catch (InterruptedException e) {}
            if (done) return -1;

            write_pos += offset;
            for (int i = 0; i < bytes; write_pos++, i++) {
                local_buffer[write_pos / chunk_size][write_pos % chunk_size]
                        = buffer[i];

            return bytes;
        } finally {
            local_buffer_size += bytes;
            _mutex.notify(); // read() might be waiting for input, so notify


07-26 10:54