001// ***************************************************************************************************************************
002// * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file *
003// * distributed with this work for additional information regarding copyright ownership.  The ASF licenses this file        *
004// * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance            *
005// * with the License.  You may obtain a copy of the License at                                                              *
006// *                                                                                                                         *
007// *  http://www.apache.org/licenses/LICENSE-2.0                                                                             *
008// *                                                                                                                         *
009// * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an  *
010// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the License for the        *
011// * specific language governing permissions and limitations under the License.                                              *
012// ***************************************************************************************************************************
013package org.apache.juneau.utils;
014
015import static org.apache.juneau.internal.IOUtils.*;
016import static org.apache.juneau.internal.ThrowableUtils.*;
017
018import java.io.*;
019import java.util.*;
020
021import org.apache.juneau.internal.*;
022
023/**
024 * A utility class for piping input streams and readers to output streams and writers.
025 *
026 * <p>
027 * A typical usage is as follows...
028 * <p class='bcode w800'>
029 *    InputStream in = getInputStream();
030 *    Writer out = getWriter();
031 *    IOPipe.create(in, out).closeOut().run();
032 * </p>
033 *
034 * <p>
035 * By default, the input stream is closed and the output stream is not.
036 * This can be changed by calling {@link #closeOut()} and {@link #close(boolean, boolean)}.
037 */
038public class IOPipe {
039
040   private Object input, output;
041   private boolean byLines;
042   private boolean closeIn = true, closeOut;
043   private int buffSize = 1024;
044   private LineProcessor lineProcessor;
045
046   private IOPipe(Object input, Object output) {
047      assertFieldNotNull(input, "input");
048      assertFieldNotNull(output, "output");
049
050      if (input instanceof InputStream || input instanceof Reader || input instanceof File || input instanceof byte[] || input instanceof CharSequence || input == null)
051         this.input = input;
052      else
053         illegalArg("Invalid input class type.  Must be one of the following:  InputStream, Reader, CharSequence, byte[], File");
054
055      if (output instanceof OutputStream || output instanceof Writer)
056         this.output = output;
057      else
058         illegalArg("Invalid output class type.  Must be one of the following:  OutputStream, Writer");
059   }
060
061   /**
062    * Creates a new pipe with the specified input and output.
063    *
064    * @param input The input.  Must be one of the following types:  Reader, InputStream, CharSequence.
065    * @param output The output.  Must be one of the following types:  Writer, OutputStream.
066    * @return This object (for method chaining).
067    */
068   public static IOPipe create(Object input, Object output) {
069      return new IOPipe(input, output);
070   }
071
072   /**
073    * Close output after piping.
074    *
075    * @return This object (for method chaining).
076    */
077   public IOPipe closeOut() {
078      this.closeOut = true;
079      return this;
080   }
081
082   /**
083    * Specifies whether to close the input and output after piping.
084    *
085    * @param in Close input stream.  Default is <jk>true</jk>.
086    * @param out Close output stream.  Default is <jk>false</jk>.
087    * @return This object (for method chaining).
088    */
089   public IOPipe close(boolean in, boolean out) {
090      this.closeIn = in;
091      this.closeOut = out;
092      return this;
093   }
094
095   /**
096    * Specifies the temporary buffer size.
097    *
098    * @param buffSize The buffer size.  Default is <c>1024</c>.
099    * @return This object (for method chaining).
100    */
101   public IOPipe buffSize(int buffSize) {
102      assertFieldPositive(buffSize, "buffSize");
103      this.buffSize = buffSize;
104      return this;
105   }
106
107   /**
108    * Specifies whether the content should be piped line-by-line.
109    *
110    * <p>
111    * This can be useful if you're trying to pipe console-based input.
112    *
113    * @param byLines Pipe content line-by-line.  Default is <jk>false</jk>.
114    * @return This object (for method chaining).
115    */
116   public IOPipe byLines(boolean byLines) {
117      this.byLines = byLines;
118      return this;
119   }
120
121   /**
122    * Same as calling {@link #byLines()} with <jk>true</jk>.
123    *
124    * @return This object (for method chaining).
125    */
126   public IOPipe byLines() {
127      this.byLines = true;
128      return this;
129   }
130
131   /**
132    * Specifies a line processor that can be used to process lines before they're piped to the output.
133    *
134    * @param lineProcessor The line processor.
135    * @return This object (for method chaining).
136    */
137   public IOPipe lineProcessor(LineProcessor lineProcessor) {
138      this.lineProcessor = lineProcessor;
139      return this;
140   }
141
142   /**
143    * Interface to implement for the {@link #lineProcessor(LineProcessor)} method.
144    */
145   public interface LineProcessor {
146      /**
147       * Process the specified line.
148       *
149       * @param line The line to process.
150       * @return The processed line.
151       */
152      public String process(String line);
153   }
154
155   /**
156    * Performs the piping of the input to the output.
157    *
158    * @return The number of bytes (if streams) or characters (if readers/writers) piped.
159    * @throws IOException Thrown by underlying stream.
160    */
161   public int run() throws IOException {
162
163      int c = 0;
164
165      try {
166         if (input == null)
167            return 0;
168
169         if ((input instanceof InputStream || input instanceof byte[]) && output instanceof OutputStream && lineProcessor == null) {
170            OutputStream out = (OutputStream)output;
171            if (input instanceof InputStream) {
172               InputStream in = (InputStream)input;
173               byte[] b = new byte[buffSize];
174               int i;
175               while ((i = in.read(b)) > 0) {
176                  c += i;
177                  out.write(b, 0, i);
178               }
179            } else {
180               byte[] b = (byte[])input;
181               out.write(b);
182               c = b.length;
183            }
184            out.flush();
185         } else {
186            @SuppressWarnings("resource")
187            Writer out = (output instanceof Writer ? (Writer)output : new OutputStreamWriter((OutputStream)output, UTF8));
188            closeIn |= input instanceof File;
189            if (byLines || lineProcessor != null) {
190               Reader in = null;
191               if (input instanceof Reader)
192                  in = (Reader)input;
193               else if (input instanceof InputStream)
194                  in = new InputStreamReader((InputStream)input, UTF8);
195               else if (input instanceof File)
196                  in = new FileReader((File)input);
197               else if (input instanceof byte[])
198                  in = new StringReader(new String((byte[])input, "UTF8"));
199               else if (input instanceof CharSequence)
200                  in = new StringReader(input.toString());
201               try (Scanner s = new Scanner(in)) {
202                  while (s.hasNextLine()) {
203                     String l = s.nextLine();
204                     if (lineProcessor != null)
205                        l = lineProcessor.process(l);
206                     if (l != null) {
207                        out.write(l);
208                        out.write("\n");
209                        out.flush();
210                        c += l.length() + 1;
211                     }
212                  }
213               }
214            } else {
215               if (input instanceof InputStream)
216                  input = new InputStreamReader((InputStream)input, UTF8);
217               else if (input instanceof File)
218                  input = new FileReader((File)input);
219               else if (input instanceof byte[])
220                  input = new String((byte[])input, UTF8);
221
222               if (input instanceof Reader) {
223                  Reader in = (Reader)input;
224                  int i;
225                  char[] b = new char[buffSize];
226                  while ((i = in.read(b)) > 0) {
227                     c += i;
228                     out.write(b, 0, i);
229                  }
230               } else {
231                  String s = input.toString();
232                  out.write(s);
233                  c = s.length();
234               }
235            }
236            out.flush();
237         }
238      } finally {
239         closeQuietly(input, output);
240      }
241      return c;
242   }
243
244   private void closeQuietly(Object input, Object output) {
245      if (closeIn)
246         IOUtils.closeQuietly(input);
247      if (closeOut)
248         IOUtils.closeQuietly(output);
249   }
250}