001// *************************************************************************************************************************** 002// * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file * 003// * distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * 004// * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance * 005// * with the License. You may obtain a copy of the License at * 006// * * 007// * http://www.apache.org/licenses/LICENSE-2.0 * 008// * * 009// * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an * 010// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * 011// * specific language governing permissions and limitations under the License. * 012// *************************************************************************************************************************** 013package org.apache.juneau.utils; 014 015import static org.apache.juneau.internal.IOUtils.*; 016import static org.apache.juneau.internal.ThrowableUtils.*; 017 018import java.io.*; 019import java.util.*; 020 021import org.apache.juneau.internal.*; 022 023/** 024 * A utility class for piping input streams and readers to output streams and writers. 025 * 026 * <p> 027 * A typical usage is as follows... 028 * <p class='bcode w800'> 029 * InputStream in = getInputStream(); 030 * Writer out = getWriter(); 031 * IOPipe.create(in, out).closeOut().run(); 032 * </p> 033 * 034 * <p> 035 * By default, the input stream is closed and the output stream is not. 036 * This can be changed by calling {@link #closeOut()} and {@link #close(boolean, boolean)}. 037 */ 038public class IOPipe { 039 040 private Object input, output; 041 private boolean byLines; 042 private boolean closeIn = true, closeOut; 043 private int buffSize = 1024; 044 private LineProcessor lineProcessor; 045 046 private IOPipe(Object input, Object output) { 047 assertFieldNotNull(input, "input"); 048 assertFieldNotNull(output, "output"); 049 050 if (input instanceof InputStream || input instanceof Reader || input instanceof File || input instanceof byte[] || input instanceof CharSequence || input == null) 051 this.input = input; 052 else 053 illegalArg("Invalid input class type. Must be one of the following: InputStream, Reader, CharSequence, byte[], File"); 054 055 if (output instanceof OutputStream || output instanceof Writer) 056 this.output = output; 057 else 058 illegalArg("Invalid output class type. Must be one of the following: OutputStream, Writer"); 059 } 060 061 /** 062 * Creates a new pipe with the specified input and output. 063 * 064 * @param input The input. Must be one of the following types: Reader, InputStream, CharSequence. 065 * @param output The output. Must be one of the following types: Writer, OutputStream. 066 * @return This object (for method chaining). 067 */ 068 public static IOPipe create(Object input, Object output) { 069 return new IOPipe(input, output); 070 } 071 072 /** 073 * Close output after piping. 074 * 075 * @return This object (for method chaining). 076 */ 077 public IOPipe closeOut() { 078 this.closeOut = true; 079 return this; 080 } 081 082 /** 083 * Specifies whether to close the input and output after piping. 084 * 085 * @param in Close input stream. Default is <jk>true</jk>. 086 * @param out Close output stream. Default is <jk>false</jk>. 087 * @return This object (for method chaining). 088 */ 089 public IOPipe close(boolean in, boolean out) { 090 this.closeIn = in; 091 this.closeOut = out; 092 return this; 093 } 094 095 /** 096 * Specifies the temporary buffer size. 097 * 098 * @param buffSize The buffer size. Default is <c>1024</c>. 099 * @return This object (for method chaining). 100 */ 101 public IOPipe buffSize(int buffSize) { 102 assertFieldPositive(buffSize, "buffSize"); 103 this.buffSize = buffSize; 104 return this; 105 } 106 107 /** 108 * Specifies whether the content should be piped line-by-line. 109 * 110 * <p> 111 * This can be useful if you're trying to pipe console-based input. 112 * 113 * @param byLines Pipe content line-by-line. Default is <jk>false</jk>. 114 * @return This object (for method chaining). 115 */ 116 public IOPipe byLines(boolean byLines) { 117 this.byLines = byLines; 118 return this; 119 } 120 121 /** 122 * Same as calling {@link #byLines()} with <jk>true</jk>. 123 * 124 * @return This object (for method chaining). 125 */ 126 public IOPipe byLines() { 127 this.byLines = true; 128 return this; 129 } 130 131 /** 132 * Specifies a line processor that can be used to process lines before they're piped to the output. 133 * 134 * @param lineProcessor The line processor. 135 * @return This object (for method chaining). 136 */ 137 public IOPipe lineProcessor(LineProcessor lineProcessor) { 138 this.lineProcessor = lineProcessor; 139 return this; 140 } 141 142 /** 143 * Interface to implement for the {@link #lineProcessor(LineProcessor)} method. 144 */ 145 public interface LineProcessor { 146 /** 147 * Process the specified line. 148 * 149 * @param line The line to process. 150 * @return The processed line. 151 */ 152 public String process(String line); 153 } 154 155 /** 156 * Performs the piping of the input to the output. 157 * 158 * @return The number of bytes (if streams) or characters (if readers/writers) piped. 159 * @throws IOException Thrown by underlying stream. 160 */ 161 public int run() throws IOException { 162 163 int c = 0; 164 165 try { 166 if (input == null) 167 return 0; 168 169 if ((input instanceof InputStream || input instanceof byte[]) && output instanceof OutputStream && lineProcessor == null) { 170 OutputStream out = (OutputStream)output; 171 if (input instanceof InputStream) { 172 InputStream in = (InputStream)input; 173 byte[] b = new byte[buffSize]; 174 int i; 175 while ((i = in.read(b)) > 0) { 176 c += i; 177 out.write(b, 0, i); 178 } 179 } else { 180 byte[] b = (byte[])input; 181 out.write(b); 182 c = b.length; 183 } 184 out.flush(); 185 } else { 186 @SuppressWarnings("resource") 187 Writer out = (output instanceof Writer ? (Writer)output : new OutputStreamWriter((OutputStream)output, UTF8)); 188 closeIn |= input instanceof File; 189 if (byLines || lineProcessor != null) { 190 Reader in = null; 191 if (input instanceof Reader) 192 in = (Reader)input; 193 else if (input instanceof InputStream) 194 in = new InputStreamReader((InputStream)input, UTF8); 195 else if (input instanceof File) 196 in = new FileReader((File)input); 197 else if (input instanceof byte[]) 198 in = new StringReader(new String((byte[])input, "UTF8")); 199 else if (input instanceof CharSequence) 200 in = new StringReader(input.toString()); 201 try (Scanner s = new Scanner(in)) { 202 while (s.hasNextLine()) { 203 String l = s.nextLine(); 204 if (lineProcessor != null) 205 l = lineProcessor.process(l); 206 if (l != null) { 207 out.write(l); 208 out.write("\n"); 209 out.flush(); 210 c += l.length() + 1; 211 } 212 } 213 } 214 } else { 215 if (input instanceof InputStream) 216 input = new InputStreamReader((InputStream)input, UTF8); 217 else if (input instanceof File) 218 input = new FileReader((File)input); 219 else if (input instanceof byte[]) 220 input = new String((byte[])input, UTF8); 221 222 if (input instanceof Reader) { 223 Reader in = (Reader)input; 224 int i; 225 char[] b = new char[buffSize]; 226 while ((i = in.read(b)) > 0) { 227 c += i; 228 out.write(b, 0, i); 229 } 230 } else { 231 String s = input.toString(); 232 out.write(s); 233 c = s.length(); 234 } 235 } 236 out.flush(); 237 } 238 } finally { 239 closeQuietly(input, output); 240 } 241 return c; 242 } 243 244 private void closeQuietly(Object input, Object output) { 245 if (closeIn) 246 IOUtils.closeQuietly(input); 247 if (closeOut) 248 IOUtils.closeQuietly(output); 249 } 250}