/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.commons.compress.compressors.snappy;

import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;

import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.utils.BoundedInputStream;
import org.apache.commons.compress.utils.ByteUtils;
import org.apache.commons.compress.utils.CountingInputStream;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.compress.utils.InputStreamStatistics;

/**
 * CompressorInputStream for the framing Snappy format.
 *
 * <p>Based on the "spec" in the version "Last revised: 2013-10-25"</p>
 *
 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
 * @since 1.7
 */
public class FramedSnappyCompressorInputStream extends CompressorInputStream
    implements InputStreamStatistics {

    /**
     * package private for tests only.
     */
    static final long MASK_OFFSET = 0xa282ead8L;

    private static final int STREAM_IDENTIFIER_TYPE = 0xff;
    static final int COMPRESSED_CHUNK_TYPE = 0;
    private static final int UNCOMPRESSED_CHUNK_TYPE = 1;
    private static final int PADDING_CHUNK_TYPE = 0xfe;
    private static final int MIN_UNSKIPPABLE_TYPE = 2;
    private static final int MAX_UNSKIPPABLE_TYPE = 0x7f;
    private static final int MAX_SKIPPABLE_TYPE = 0xfd;

    // used by FramedSnappyCompressorOutputStream as well
    static final byte[] SZ_SIGNATURE = { //NOSONAR
        (byte) STREAM_IDENTIFIER_TYPE, // tag
        6, 0, 0, // length
        's', 'N', 'a', 'P', 'p', 'Y'
    };

    /**
     * Checks if the signature matches what is expected for a .sz file.
     *
     * <p>.sz files start with a chunk with tag 0xff and content sNaPpY.</p>
     *
     * @param signature the bytes to check
     * @param length    the number of bytes to check
     * @return true if this is a .sz stream, false otherwise
     */
    public static boolean matches(final byte[] signature, final int length) {
        if (length < SZ_SIGNATURE.length) {
            return false;
        }
        if (signature.length < SZ_SIGNATURE.length) {
            // too short to hold the complete signature
            return false;
        }
        // compare the prefix in place instead of copying the array first
        for (int i = 0; i < SZ_SIGNATURE.length; i++) {
            if (signature[i] != SZ_SIGNATURE[i]) {
                return false;
            }
        }
        return true;
    }

    /**
     * Reverses the masking applied to a stored checksum: subtracts
     * {@link #MASK_OFFSET} and rotates the lower 32 bits.
     *
     * @param x the masked value as read from the stream
     * @return the unmasked value, confined to the lower 32 bits
     */
    static long unmask(long x) {
        // ugly, maybe we should just have used ints and deal with the
        // overflow
        x -= MASK_OFFSET;
        x &= 0xffffFFFFL;
        return (x >> 17 | x << 15) & 0xffffFFFFL;
    }

    /** Number of bytes pushed back into {@link #inputStream}; subtracted in {@link #getCompressedCount()}. */
    private long unreadBytes;

    private final CountingInputStream countingStream;

    /** The underlying stream to read compressed data from */
    private final PushbackInputStream inputStream;

    /** The dialect to expect */
    private final FramedSnappyDialect dialect;

    private SnappyCompressorInputStream currentCompressedChunk;

    // used in no-arg read method
    private final byte[] oneByte = new byte[1];

    private boolean endReached;
    private boolean inUncompressedChunk;

    private int uncompressedBytesRemaining;
    /** Checksum expected for the current chunk, -1 if there is nothing to verify. */
    private long expectedChecksum = -1;

    private final int blockSize;

    private final PureJavaCrc32C checksum = new PureJavaCrc32C();

    private final ByteUtils.ByteSupplier supplier = this::readOneByte;

    /**
     * Constructs a new input stream that decompresses
     * snappy-framed-compressed data from the specified input stream
     * using the {@link FramedSnappyDialect#STANDARD} dialect.
     * @param in  the InputStream from which to read the compressed data
     * @throws IOException if reading fails
     */
    public FramedSnappyCompressorInputStream(final InputStream in) throws IOException {
        this(in, FramedSnappyDialect.STANDARD);
    }

    /**
     * Constructs a new input stream that decompresses snappy-framed-compressed data
     * from the specified input stream.
     * @param in  the InputStream from which to read the compressed data
     * @param dialect the dialect used by the compressed stream
     * @throws IOException if reading fails
     */
    public FramedSnappyCompressorInputStream(final InputStream in,
                                             final FramedSnappyDialect dialect)
        throws IOException {
        this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect);
    }

    /**
     * Constructs a new input stream that decompresses snappy-framed-compressed data
     * from the specified input stream.
     * @param in  the InputStream from which to read the compressed data
     * @param blockSize the block size to use for the compressed stream
     * @param dialect the dialect used by the compressed stream
     * @throws IOException if reading fails
     * @throws IllegalArgumentException if blockSize is not bigger than 0
     * @since 1.14
     */
    public FramedSnappyCompressorInputStream(final InputStream in,
                                             final int blockSize,
                                             final FramedSnappyDialect dialect)
        throws IOException {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be bigger than 0");
        }
        countingStream = new CountingInputStream(in);
        // a one byte push-back buffer is enough: only a chunk type byte is ever unread
        this.inputStream = new PushbackInputStream(countingStream, 1);
        this.blockSize = blockSize;
        this.dialect = dialect;
        if (dialect.hasStreamIdentifier()) {
            readStreamIdentifier();
        }
    }

    /** {@inheritDoc} */
    @Override
    public int available() throws IOException {
        if (inUncompressedChunk) {
            return Math.min(uncompressedBytesRemaining,
                            inputStream.available());
        }
        if (currentCompressedChunk != null) {
            return currentCompressedChunk.available();
        }
        return 0;
    }

    /** {@inheritDoc} */
    @Override
    public void close() throws IOException {
        try {
            if (currentCompressedChunk != null) {
                currentCompressedChunk.close();
                currentCompressedChunk = null;
            }
        } finally {
            inputStream.close();
        }
    }

    /**
     * @since 1.17
     */
    @Override
    public long getCompressedCount() {
        return countingStream.getBytesRead() - unreadBytes;
    }

    /** {@inheritDoc} */
    @Override
    public int read() throws IOException {
        return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
    }

    /** {@inheritDoc} */
    @Override
    public int read(final byte[] b, final int off, final int len) throws IOException {
        if (len == 0) {
            return 0;
        }
        int read = readOnce(b, off, len);
        // A chunk may carry no payload at all (for example an uncompressed
        // chunk that only consists of its CRC). Keep advancing to the next
        // chunk until data has been read or the real end of the stream is
        // reached rather than signalling EOF after the first empty chunk.
        while (read == -1) {
            readNextBlock();
            if (endReached) {
                return -1;
            }
            read = readOnce(b, off, len);
        }
        return read;
    }

    /**
     * Reads the four byte little-endian CRC field of a chunk.
     *
     * @return the CRC value exactly as stored in the stream (still masked)
     * @throws IOException if fewer than four bytes are available
     */
    private long readCrc() throws IOException {
        final byte[] b = new byte[4];
        final int read = IOUtils.readFully(inputStream, b);
        count(read);
        if (read != 4) {
            throw new IOException("Premature end of stream");
        }
        return ByteUtils.fromLittleEndian(b);
    }

    /**
     * Verifies the checksum of the chunk that has just been consumed and
     * positions the stream at the payload of the next data chunk.
     *
     * <p>Stream identifier, padding and skippable chunks are consumed in a
     * loop (not recursively) so that input containing a long run of such
     * chunks cannot exhaust the call stack.</p>
     *
     * @throws IOException if the checksum doesn't match, an unskippable
     * chunk is encountered or the stream is malformed or truncated
     */
    private void readNextBlock() throws IOException {
        verifyLastChecksumAndReset();
        inUncompressedChunk = false;
        while (true) {
            final int type = readOneByte();
            if (type == -1) {
                endReached = true;
                return;
            }
            if (type == STREAM_IDENTIFIER_TYPE) {
                // push the tag back so the whole identifier chunk can be validated at once
                inputStream.unread(type);
                unreadBytes++;
                pushedBackBytes(1);
                readStreamIdentifier();
                continue;
            }
            if (type == PADDING_CHUNK_TYPE
                || type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE) {
                skipBlock();
                continue;
            }
            if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) {
                throw new IOException("Unskippable chunk with type " + type
                                      + " (hex " + Integer.toHexString(type) + ")"
                                      + " detected.");
            }
            if (type == UNCOMPRESSED_CHUNK_TYPE) {
                inUncompressedChunk = true;
                uncompressedBytesRemaining = readSize() - 4 /* CRC */;
                if (uncompressedBytesRemaining < 0) {
                    throw new IOException("Found illegal chunk with negative size");
                }
                expectedChecksum = unmask(readCrc());
                return;
            }
            if (type == COMPRESSED_CHUNK_TYPE) {
                final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks();
                final long size = readSize() - (expectChecksum ? 4L : 0L);
                if (size < 0) {
                    throw new IOException("Found illegal chunk with negative size");
                }
                if (expectChecksum) {
                    expectedChecksum = unmask(readCrc());
                } else {
                    expectedChecksum = -1;
                }
                currentCompressedChunk =
                    new SnappyCompressorInputStream(new BoundedInputStream(inputStream, size), blockSize);
                // constructor reads uncompressed size
                count(currentCompressedChunk.getBytesRead());
                return;
            }
            // impossible as all potential byte values have been covered
            throw new IOException("Unknown chunk type " + type
                                  + " detected.");
        }
    }

    /**
     * Read from the current chunk into the given array.
     *
     * @return -1 if there is no current chunk or the number of bytes
     * read from the current chunk (which may be -1 if the end of the
     * chunk is reached).
     */
    private int readOnce(final byte[] b, final int off, final int len) throws IOException {
        int read = -1;
        if (inUncompressedChunk) {
            final int amount = Math.min(uncompressedBytesRemaining, len);
            if (amount == 0) {
                return -1;
            }
            read = inputStream.read(b, off, amount);
            if (read != -1) {
                uncompressedBytesRemaining -= read;
                count(read);
            }
        } else if (currentCompressedChunk != null) {
            final long before = currentCompressedChunk.getBytesRead();
            read = currentCompressedChunk.read(b, off, len);
            if (read == -1) {
                currentCompressedChunk.close();
                currentCompressedChunk = null;
            } else {
                count(currentCompressedChunk.getBytesRead() - before);
            }
        }
        if (read > 0) {
            // the checksum covers the data handed to the caller
            checksum.update(b, off, read);
        }
        return read;
    }

    /**
     * Reads a single byte from the underlying stream and counts it.
     *
     * @return the byte as a value between 0 and 255 or -1 at the end of the stream
     * @throws IOException if reading fails
     */
    private int readOneByte() throws IOException {
        final int b = inputStream.read();
        if (b != -1) {
            count(1);
            return b & 0xFF;
        }
        return -1;
    }

    /**
     * Reads the three byte little-endian length field of a chunk.
     *
     * @return the chunk length
     * @throws IOException if reading fails
     */
    private int readSize() throws IOException {
        return (int) ByteUtils.fromLittleEndian(supplier, 3);
    }

    /**
     * Consumes the ten byte stream identifier chunk and validates it
     * via {@link #matches}.
     *
     * @throws IOException if the bytes are missing or are not the expected signature
     */
    private void readStreamIdentifier() throws IOException {
        final byte[] b = new byte[10];
        final int read = IOUtils.readFully(inputStream, b);
        count(read);
        if (10 != read || !matches(b, 10)) {
            throw new IOException("Not a framed Snappy stream");
        }
    }

    /**
     * Skips over the content of the current chunk, its type byte has
     * already been consumed.
     *
     * @throws IOException if the stream ends before the chunk is complete
     */
    private void skipBlock() throws IOException {
        final int size = readSize();
        if (size < 0) {
            throw new IOException("Found illegal chunk with negative size");
        }
        final long read = IOUtils.skip(inputStream, size);
        count(read);
        if (read != size) {
            throw new IOException("Premature end of stream");
        }
    }

    /**
     * Compares the checksum collected for the last chunk to the expected
     * one - if any - and resets the checksum state for the next chunk.
     *
     * @throws IOException if the checksums differ
     */
    private void verifyLastChecksumAndReset() throws IOException {
        if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) {
            throw new IOException("Checksum verification failed");
        }
        expectedChecksum = -1;
        checksum.reset();
    }

}