001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.juneau.internal;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.Reader;
022import java.nio.ByteBuffer;
023import java.nio.CharBuffer;
024import java.nio.charset.Charset;
025import java.nio.charset.CharsetEncoder;
026import java.nio.charset.CoderResult;
027import java.nio.charset.CodingErrorAction;
028import java.util.Objects;
029
/**
 * {@link InputStream} implementation that reads a character stream from a {@link Reader}
 * and transforms it to a byte stream using a specified charset encoding. The stream
 * is transformed using a {@link CharsetEncoder} object, guaranteeing that all charset
 * encodings supported by the JRE are handled correctly. In particular for charsets such as
 * UTF-16, the implementation ensures that one and only one byte order marker
 * is produced, and for stateful charsets the encoder is flushed at the end of the input so
 * that any trailing bytes (for example a final escape sequence) are emitted.
 * <p>
 * Since in general it is not possible to predict the number of characters to be read from the
 * {@link Reader} to satisfy a read request on the {@link ReaderInputStream}, all reads from
 * the {@link Reader} are buffered. There is therefore no well defined correlation
 * between the current position of the {@link Reader} and that of the {@link ReaderInputStream}.
 * This also implies that in general there is no need to wrap the underlying {@link Reader}
 * in a {@link java.io.BufferedReader}.
 * <p>
 * {@link ReaderInputStream} implements the inverse transformation of {@link java.io.InputStreamReader};
 * in the following example, reading from {@code in2} would return the same byte
 * sequence as reading from {@code in} (provided that the initial byte sequence is legal
 * with respect to the charset encoding):
 * <pre>
 * InputStream in = ...
 * Charset cs = ...
 * InputStreamReader reader = new InputStreamReader(in, cs);
 * ReaderInputStream in2 = new ReaderInputStream(reader, cs);</pre>
 * {@link ReaderInputStream} implements the same transformation as {@link java.io.OutputStreamWriter},
 * except that the control flow is reversed: both classes transform a character stream
 * into a byte stream, but {@link java.io.OutputStreamWriter} pushes data to the underlying stream,
 * while {@link ReaderInputStream} pulls it from the underlying stream.
 * <p>
 * Note that while there are use cases where there is no alternative to using
 * this class, very often the need to use this class is an indication of a flaw
 * in the design of the code. This class is typically used in situations where an existing
 * API only accepts an {@link InputStream}, but where the most natural way to produce the data
 * is as a character stream, i.e. by providing a {@link Reader} instance. An example of a situation
 * where this problem may appear is when implementing the {@code javax.activation.DataSource}
 * interface from the Java Activation Framework.
 * <p>
 * The constructors taking a {@link Charset} or a charset name replace malformed input and
 * unmappable characters with the charset's replacement bytes. When a caller-supplied
 * {@link CharsetEncoder} is configured with {@link CodingErrorAction#REPORT}, such input makes
 * the read methods throw a {@link java.nio.charset.CharacterCodingException}.
 * <p>
 * Given the fact that the {@link Reader} class doesn't provide any way to predict whether the next
 * read operation will block or not, it is not possible to provide a meaningful
 * implementation of the {@link InputStream#available()} method. A call to this method
 * will always return 0. Also, this class doesn't support {@link InputStream#mark(int)}.
 * <p>
 * Instances of {@link ReaderInputStream} are not thread safe.
 *
 * @since 2.0
 */
public class ReaderInputStream extends InputStream {
    /** Capacity, in characters, of the input buffer when the caller does not specify one. */
    private static final int DEFAULT_BUFFER_SIZE = 1024;

    /**
     * Smallest usable input buffer capacity. Two characters leave room for a dangling high
     * surrogate plus at least one freshly read character, so the reader is never asked
     * for zero characters (which would never signal end of input).
     */
    private static final int MIN_BUFFER_SIZE = 2;

    /** Capacity, in bytes, of the intermediate output buffer. */
    private static final int OUTPUT_BUFFER_SIZE = 128;

    private final Reader reader;
    private final CharsetEncoder encoder;

    /**
     * CharBuffer used as input for the encoder. It should be reasonably
     * large as we read data from the underlying Reader into this buffer.
     * Kept in "read mode" (flipped) between calls.
     */
    private final CharBuffer encoderIn;

    /**
     * ByteBuffer used as output for the encoder. This buffer can be small
     * as it is only used to transfer data from the encoder to the
     * buffer provided by the caller. Kept in "read mode" (flipped) between calls.
     */
    private final ByteBuffer encoderOut;

    /** Result of the most recent encode/flush step, or {@code null} before the first one. */
    private CoderResult lastCoderResult;

    /** Set once the underlying reader has reported end of stream. */
    private boolean endOfInput;

    /** Set once the encoder has been flushed completely; it must not be used afterwards. */
    private boolean encoderFlushed;

    /**
     * Construct a new {@link ReaderInputStream} with a default input buffer size of
     * {@code 1024} characters.
     *
     * @param reader the target {@link Reader}
     * @param encoder the charset encoder
     * @since 2.1
     */
    public ReaderInputStream(final Reader reader, final CharsetEncoder encoder) {
        this(reader, encoder, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Construct a new {@link ReaderInputStream}.
     *
     * @param reader the target {@link Reader}
     * @param encoder the charset encoder
     * @param bufferSize the size of the input buffer in number of characters; values
     *        below {@code 2} are raised to {@code 2} so that reading can always make progress
     * @throws IllegalArgumentException if {@code bufferSize} is negative
     * @since 2.1
     */
    public ReaderInputStream(final Reader reader, final CharsetEncoder encoder, final int bufferSize) {
        if (bufferSize < 0) {
            throw new IllegalArgumentException("Buffer size must not be negative: " + bufferSize);
        }
        this.reader = reader;
        this.encoder = encoder;
        // Both buffers start out empty in read mode: allocate, then flip.
        this.encoderIn = CharBuffer.allocate(Math.max(bufferSize, MIN_BUFFER_SIZE));
        this.encoderIn.flip();
        this.encoderOut = ByteBuffer.allocate(OUTPUT_BUFFER_SIZE);
        this.encoderOut.flip();
    }

    /**
     * Construct a new {@link ReaderInputStream}. Malformed input and unmappable characters
     * are replaced with the charset's replacement bytes.
     *
     * @param reader the target {@link Reader}
     * @param charset the charset encoding
     * @param bufferSize the size of the input buffer in number of characters
     */
    public ReaderInputStream(final Reader reader, final Charset charset, final int bufferSize) {
        this(reader,
             charset.newEncoder()
                    .onMalformedInput(CodingErrorAction.REPLACE)
                    .onUnmappableCharacter(CodingErrorAction.REPLACE),
             bufferSize);
    }

    /**
     * Construct a new {@link ReaderInputStream} with a default input buffer size of
     * {@code 1024} characters.
     *
     * @param reader the target {@link Reader}
     * @param charset the charset encoding
     */
    public ReaderInputStream(final Reader reader, final Charset charset) {
        this(reader, charset, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Construct a new {@link ReaderInputStream}.
     *
     * @param reader the target {@link Reader}
     * @param charsetName the name of the charset encoding
     * @param bufferSize the size of the input buffer in number of characters
     */
    public ReaderInputStream(final Reader reader, final String charsetName, final int bufferSize) {
        this(reader, Charset.forName(charsetName), bufferSize);
    }

    /**
     * Construct a new {@link ReaderInputStream} with a default input buffer size of
     * {@code 1024} characters.
     *
     * @param reader the target {@link Reader}
     * @param charsetName the name of the charset encoding
     */
    public ReaderInputStream(final Reader reader, final String charsetName) {
        this(reader, charsetName, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Reads more characters from the reader when the encoder needs them, then runs one
     * encoding step into the output buffer. Once the reader is exhausted and all buffered
     * characters are encoded, the encoder is flushed so that stateful charsets can emit
     * their trailing bytes.
     *
     * @throws java.nio.charset.CharacterCodingException
     *             If the encoder reports malformed or unmappable input
     * @throws IOException
     *             If an I/O error occurs
     */
    private void fillBuffer() throws IOException {
        if (encoderFlushed) {
            // Nothing more can be produced, and a flushed encoder rejects further encode calls.
            return;
        }
        // Only pull from the reader when the encoder ran out of input; after an overflow
        // (or an error) the pending characters must be dealt with first.
        if (!endOfInput && (lastCoderResult == null || lastCoderResult.isUnderflow())) {
            encoderIn.compact();
            final int position = encoderIn.position();
            // We don't use Reader#read(CharBuffer) here because it is more efficient
            // to write directly to the underlying char array (the default implementation
            // copies data to a temporary char array).
            final int c = reader.read(encoderIn.array(), position, encoderIn.remaining());
            if (c == -1) {
                endOfInput = true;
            } else {
                encoderIn.position(position + c);
            }
            encoderIn.flip();
        }
        encoderOut.compact();
        CoderResult result = encoder.encode(encoderIn, encoderOut, endOfInput);
        if (endOfInput && result.isUnderflow()) {
            // All input has been consumed: let the encoder write any final bytes. An
            // overflow here is retried on the next call, once the output buffer is drained.
            result = encoder.flush(encoderOut);
            encoderFlushed = result.isUnderflow();
        }
        // Restore read mode before possibly throwing so the buffer stays consistent.
        encoderOut.flip();
        lastCoderResult = result;
        if (result.isError()) {
            // Without this, an encoder using CodingErrorAction.REPORT would make no
            // progress and the read methods would loop forever or drop data silently.
            result.throwException();
        }
    }

    /**
     * Read the specified number of bytes into an array.
     *
     * @param array the byte array to read into
     * @param off the offset to start reading bytes into
     * @param len the number of bytes to read
     * @return the number of bytes read or <code>-1</code>
     *         if the end of the stream has been reached
     * @throws NullPointerException if {@code array} is {@code null}
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or if
     *         {@code len} exceeds the space available in {@code array} after {@code off}
     * @throws IOException if an I/O error occurs
     */
    @Override
    public int read(final byte[] array, int off, int len) throws IOException {
        Objects.requireNonNull(array, "array");
        // Compare against "array.length - off" rather than summing off + len, which
        // could overflow int and let an invalid range through.
        if (len < 0 || off < 0 || len > array.length - off) {
            throw new IndexOutOfBoundsException("Array Size=" + array.length +
                    ", offset=" + off + ", length=" + len);
        }
        int read = 0;
        if (len == 0) {
            return 0; // Always return 0 if len == 0
        }
        while (len > 0) {
            if (encoderOut.hasRemaining()) {
                final int c = Math.min(encoderOut.remaining(), len);
                encoderOut.get(array, off, c);
                off += c;
                len -= c;
                read += c;
            } else {
                fillBuffer();
                if (endOfInput && !encoderOut.hasRemaining()) {
                    break;
                }
            }
        }
        return read == 0 && endOfInput ? -1 : read;
    }

    /**
     * Read bytes into an array, filling as much of it as the stream allows.
     *
     * @param b the byte array to read into
     * @return the number of bytes read or <code>-1</code>
     *         if the end of the stream has been reached
     * @throws IOException if an I/O error occurs
     */
    @Override
    public int read(final byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Read a single byte.
     *
     * @return either the byte read (as a value from 0 to 255) or <code>-1</code> if the
     *         end of the stream has been reached
     * @throws IOException if an I/O error occurs
     */
    @Override
    public int read() throws IOException {
        for (;;) {
            if (encoderOut.hasRemaining()) {
                return encoderOut.get() & 0xFF;
            }
            fillBuffer();
            if (endOfInput && !encoderOut.hasRemaining()) {
                return -1;
            }
        }
    }

    /**
     * Close the stream. This method will cause the underlying {@link Reader}
     * to be closed.
     * @throws IOException if an I/O error occurs
     */
    @Override
    public void close() throws IOException {
        reader.close();
    }
}