001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.FilterInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024
025/**
026 * Reads bytes up to a maximum length, if its count goes above that, it stops.
027 * <p>
028 * This is useful to wrap ServletInputStreams. The ServletInputStream will block if you try to read content from it that isn't there, because it doesn't know
029 * whether the content hasn't arrived yet or whether the content has finished. So, one of these, initialized with the Content-length sent in the
030 * ServletInputStream's header, will stop it blocking, providing it's been sent with a correct content length.
031 * </p>
032 *
033 * @since 2.0
034 */
035public class BoundedInputStream extends FilterInputStream {
036
037    /** The max count of bytes to read. */
038    private final long maxCount;
039
040    /** The count of bytes read. */
041    private long count;
042
043    /** The marked position. */
044    private long mark = EOF;
045
046    /** Flag if close should be propagated. */
047    private boolean propagateClose = true;
048
049    /**
050     * Constructs a new {@link BoundedInputStream} that wraps the given input
051     * stream and is unlimited.
052     *
053     * @param in The wrapped input stream.
054     */
055    public BoundedInputStream(final InputStream in) {
056        this(in, EOF);
057    }
058
059    /**
060     * Constructs a new {@link BoundedInputStream} that wraps the given input
061     * stream and limits it to a certain size.
062     *
063     * @param inputStream The wrapped input stream.
064     * @param maxLength The maximum number of bytes to return.
065     */
066    public BoundedInputStream(final InputStream inputStream, final long maxLength) {
067        // Some badly designed methods - e.g. the servlet API - overload length
068        // such that "-1" means stream finished
069        super(inputStream);
070        this.maxCount = maxLength;
071    }
072
073    /**
074     * {@inheritDoc}
075     */
076    @Override
077    public int available() throws IOException {
078        if (isMaxLength()) {
079            onMaxLength(maxCount, count);
080            return 0;
081        }
082        return in.available();
083    }
084
085    /**
086     * Invokes the delegate's {@code close()} method
087     * if {@link #isPropagateClose()} is {@code true}.
088     *
089     * @throws IOException if an I/O error occurs.
090     */
091    @Override
092    public void close() throws IOException {
093        if (propagateClose) {
094            in.close();
095        }
096    }
097
098    /**
099     * Gets the count of bytes read.
100     *
101     * @return The count of bytes read.
102     * @since 2.12.0
103     */
104    public long getCount() {
105        return count;
106    }
107
108    /**
109     * Gets the max count of bytes to read.
110     *
111     * @return The max count of bytes to read.
112     * @since 2.12.0
113     */
114    public long getMaxLength() {
115        return maxCount;
116    }
117
118    private boolean isMaxLength() {
119        return maxCount >= 0 && count >= maxCount;
120    }
121
122    /**
123     * Tests whether the {@link #close()} method
124     * should propagate to the underling {@link InputStream}.
125     *
126     * @return {@code true} if calling {@link #close()}
127     * propagates to the {@code close()} method of the
128     * underlying stream or {@code false} if it does not.
129     */
130    public boolean isPropagateClose() {
131        return propagateClose;
132    }
133
134    /**
135     * Invokes the delegate's {@code mark(int)} method.
136     *
137     * @param readLimit read ahead limit
138     */
139    @Override
140    public synchronized void mark(final int readLimit) {
141        in.mark(readLimit);
142        mark = count;
143    }
144
145    /**
146     * Invokes the delegate's {@code markSupported()} method.
147     *
148     * @return true if mark is supported, otherwise false
149     */
150    @Override
151    public boolean markSupported() {
152        return in.markSupported();
153    }
154
155    /**
156     * A caller has caused a request that would cross the {@code maxLength} boundary.
157     *
158     * @param maxLength The max count of bytes to read.
159     * @param count The count of bytes read.
160     * @throws IOException Subclasses may throw.
161     * @since 2.12.0
162     */
163    protected void onMaxLength(final long maxLength, final long count) throws IOException {
164        // for subclasses
165    }
166
167    /**
168     * Invokes the delegate's {@code read()} method if
169     * the current position is less than the limit.
170     *
171     * @return the byte read or -1 if the end of stream or
172     * the limit has been reached.
173     * @throws IOException if an I/O error occurs.
174     */
175    @Override
176    public int read() throws IOException {
177        if (isMaxLength()) {
178            onMaxLength(maxCount, count);
179            return EOF;
180        }
181        final int result = in.read();
182        count++;
183        return result;
184    }
185
186    /**
187     * Invokes the delegate's {@code read(byte[])} method.
188     *
189     * @param b the buffer to read the bytes into
190     * @return the number of bytes read or -1 if the end of stream or
191     * the limit has been reached.
192     * @throws IOException if an I/O error occurs.
193     */
194    @Override
195    public int read(final byte[] b) throws IOException {
196        return this.read(b, 0, b.length);
197    }
198
199    /**
200     * Invokes the delegate's {@code read(byte[], int, int)} method.
201     *
202     * @param b the buffer to read the bytes into
203     * @param off The start offset
204     * @param len The number of bytes to read
205     * @return the number of bytes read or -1 if the end of stream or
206     * the limit has been reached.
207     * @throws IOException if an I/O error occurs.
208     */
209    @Override
210    public int read(final byte[] b, final int off, final int len) throws IOException {
211        if (isMaxLength()) {
212            onMaxLength(maxCount, count);
213            return EOF;
214        }
215        final long maxRead = maxCount >= 0 ? Math.min(len, maxCount - count) : len;
216        final int bytesRead = in.read(b, off, (int) maxRead);
217
218        if (bytesRead == EOF) {
219            return EOF;
220        }
221
222        count += bytesRead;
223        return bytesRead;
224    }
225
226    /**
227     * Invokes the delegate's {@code reset()} method.
228     *
229     * @throws IOException if an I/O error occurs.
230     */
231    @Override
232    public synchronized void reset() throws IOException {
233        in.reset();
234        count = mark;
235    }
236
237    /**
238     * Sets whether the {@link #close()} method
239     * should propagate to the underling {@link InputStream}.
240     *
241     * @param propagateClose {@code true} if calling
242     * {@link #close()} propagates to the {@code close()}
243     * method of the underlying stream or
244     * {@code false} if it does not.
245     */
246    public void setPropagateClose(final boolean propagateClose) {
247        this.propagateClose = propagateClose;
248    }
249
250    /**
251     * Invokes the delegate's {@code skip(long)} method.
252     *
253     * @param n the number of bytes to skip
254     * @return the actual number of bytes skipped
255     * @throws IOException if an I/O error occurs.
256     */
257    @Override
258    public long skip(final long n) throws IOException {
259        final long toSkip = maxCount >= 0 ? Math.min(n, maxCount - count) : n;
260        final long skippedBytes = in.skip(toSkip);
261        count += skippedBytes;
262        return skippedBytes;
263    }
264
265    /**
266     * Invokes the delegate's {@code toString()} method.
267     *
268     * @return the delegate's {@code toString()}
269     */
270    @Override
271    public String toString() {
272        return in.toString();
273    }
274}