001// ***************************************************************************************************************************
002// * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file *
003// * distributed with this work for additional information regarding copyright ownership.  The ASF licenses this file        *
004// * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance            *
005// * with the License.  You may obtain a copy of the License at                                                              *
006// *                                                                                                                         *
007// *  http://www.apache.org/licenses/LICENSE-2.0                                                                             *
008// *                                                                                                                         *
009// * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an  *
010// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the License for the        *
011// * specific language governing permissions and limitations under the License.                                              *
012// ***************************************************************************************************************************
013package org.apache.juneau.utils;
014
015import static org.apache.juneau.internal.IOUtils.*;
016import static org.apache.juneau.internal.ThrowableUtils.*;
017
018import java.io.*;
019import java.util.*;
020
021import org.apache.juneau.internal.*;
022
023/**
024 * A utility class for piping input streams and readers to output streams and writers.
025 *
026 * <p>
027 * A typical usage is as follows...
028 * <p class='bcode w800'>
029 *    InputStream in = getInputStream();
030 *    Writer out = getWriter();
031 *    IOPipe.create(in, out).closeOut().run();
032 * </p>
033 *
034 * <p>
035 * By default, the input stream is closed and the output stream is not.
036 * This can be changed by calling {@link #closeOut()} and {@link #close(boolean, boolean)}.
037 */
038public class IOPipe {
039
040   private Object input, output;
041   private boolean byLines;
042   private boolean closeIn = true, closeOut;
043   private int buffSize = 1024;
044   private LineProcessor lineProcessor;
045
046   private IOPipe(Object input, Object output) {
047      assertFieldNotNull(input, "input");
048      assertFieldNotNull(output, "output");
049
050      if (input instanceof CharSequence)
051         this.input = new StringReader(input.toString());
052      else if (input instanceof InputStream || input instanceof Reader)
053         this.input = input;
054      else
055         illegalArg("Invalid input class type.  Must be one of the following:  InputStream, Reader, CharSequence");
056
057      if (output instanceof OutputStream || output instanceof Writer)
058         this.output = output;
059      else
060         illegalArg("Invalid output class type.  Must be one of the following:  OutputStream, Writer");
061   }
062
063   /**
064    * Creates a new pipe with the specified input and output.
065    *
066    * @param input The input.  Must be one of the following types:  Reader, InputStream, CharSequence.
067    * @param output The output.  Must be one of the following types:  Writer, OutputStream.
068    * @return This object (for method chaining).
069    */
070   public static IOPipe create(Object input, Object output) {
071      return new IOPipe(input, output);
072   }
073
074   /**
075    * Close output after piping.
076    *
077    * @return This object (for method chaining).
078    */
079   public IOPipe closeOut() {
080      this.closeOut = true;
081      return this;
082   }
083
084   /**
085    * Specifies whether to close the input and output after piping.
086    *
087    * @param in Close input stream.  Default is <jk>true</jk>.
088    * @param out Close output stream.  Default is <jk>false</jk>.
089    * @return This object (for method chaining).
090    */
091   public IOPipe close(boolean in, boolean out) {
092      this.closeIn = in;
093      this.closeOut = out;
094      return this;
095   }
096
097   /**
098    * Specifies the temporary buffer size.
099    *
100    * @param buffSize The buffer size.  Default is <code>1024</code>.
101    * @return This object (for method chaining).
102    */
103   public IOPipe buffSize(int buffSize) {
104      assertFieldPositive(buffSize, "buffSize");
105      this.buffSize = buffSize;
106      return this;
107   }
108
109   /**
110    * Specifies whether the content should be piped line-by-line.
111    *
112    * <p>
113    * This can be useful if you're trying to pipe console-based input.
114    *
115    * @param byLines Pipe content line-by-line.  Default is <jk>false</jk>.
116    * @return This object (for method chaining).
117    */
118   public IOPipe byLines(boolean byLines) {
119      this.byLines = byLines;
120      return this;
121   }
122
123   /**
124    * Same as calling {@link #byLines()} with <jk>true</jk>.
125    *
126    * @return This object (for method chaining).
127    */
128   public IOPipe byLines() {
129      this.byLines = true;
130      return this;
131   }
132
133   /**
134    * Specifies a line processor that can be used to process lines before they're piped to the output.
135    *
136    * @param lineProcessor The line processor.
137    * @return This object (for method chaining).
138    */
139   public IOPipe lineProcessor(LineProcessor lineProcessor) {
140      this.lineProcessor = lineProcessor;
141      return this;
142   }
143
144   /**
145    * Interface to implement for the {@link #lineProcessor(LineProcessor)} method.
146    */
147   public interface LineProcessor {
148      /**
149       * Process the specified line.
150       *
151       * @param line The line to process.
152       * @return The processed line.
153       */
154      public String process(String line);
155   }
156
157   /**
158    * Performs the piping of the input to the output.
159    *
160    * @return The number of bytes (if streams) or characters (if readers/writers) piped.
161    * @throws IOException
162    */
163   public int run() throws IOException {
164
165      int c = 0;
166
167      try {
168         if (input instanceof InputStream && output instanceof OutputStream && lineProcessor == null) {
169            InputStream in = (InputStream)input;
170            OutputStream out = (OutputStream)output;
171            byte[] b = new byte[buffSize];
172            int i;
173            while ((i = in.read(b)) > 0) {
174               c += i;
175               out.write(b, 0, i);
176            }
177            out.flush();
178         } else {
179            Reader in = (input instanceof Reader ? (Reader)input : new InputStreamReader((InputStream)input, UTF8));
180            Writer out = (output instanceof Writer ? (Writer)output : new OutputStreamWriter((OutputStream)output, UTF8));
181            output = out;
182            input = in;
183            if (byLines || lineProcessor != null) {
184               try (Scanner s = new Scanner(in)) {
185                  while (s.hasNextLine()) {
186                     String l = s.nextLine();
187                     if (lineProcessor != null)
188                        l = lineProcessor.process(l);
189                     if (l != null) {
190                        out.write(l);
191                        out.write("\n");
192                        out.flush();
193                        c += l.length() + 1;
194                     }
195                  }
196               }
197            } else {
198               int i;
199               char[] b = new char[buffSize];
200               while ((i = in.read(b)) > 0) {
201                  c += i;
202                  out.write(b, 0, i);
203               }
204            }
205            out.flush();
206         }
207      } finally {
208         closeQuietly(input, output);
209      }
210      return c;
211   }
212
213   private void closeQuietly(Object input, Object output) {
214      if (closeIn)
215         IOUtils.closeQuietly(input);
216      if (closeOut)
217         IOUtils.closeQuietly(output);
218   }
219}