/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Context that holds the dictionary for Tag compression and doing the compress/uncompress. This
 * will be used for compressing tags while writing into HFiles and WALs.
 * <p>
 * The serialized tag layout handled here is a sequence of entries, each made of a
 * {@link Tag#TAG_LENGTH_SIZE}-byte length prefix followed by that many tag bytes.
 */
@InterfaceAudience.Private
public class TagCompressionContext {
  /** Dictionary shared by every compress/uncompress call made through this context. */
  private final Dictionary tagDict;

  /**
   * Creates a context backed by a freshly instantiated dictionary.
   * @param dictType     Dictionary implementation to instantiate; its public no-arg constructor
   *                     is invoked reflectively
   * @param dictCapacity capacity passed to {@link Dictionary#init(int)}
   * @throws SecurityException         if reflective access to the constructor is denied
   * @throws NoSuchMethodException     if {@code dictType} has no public no-arg constructor
   * @throws InstantiationException    if {@code dictType} cannot be instantiated
   * @throws IllegalAccessException    if the constructor is not accessible
   * @throws InvocationTargetException if the constructor itself throws
   */
  public TagCompressionContext(Class<? extends Dictionary> dictType, int dictCapacity)
    throws SecurityException, NoSuchMethodException, InstantiationException,
    IllegalAccessException, InvocationTargetException {
    Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
    tagDict = dictConstructor.newInstance();
    tagDict.init(dictCapacity);
  }

  /** Clears the underlying tag dictionary. */
  public void clear() {
    tagDict.clear();
  }

  /**
   * Compress tags one by one and writes to the OutputStream.
   * @param out    Stream to which the compressed tags to be written
   * @param in     Source where tags are available
   * @param offset Offset for the tags bytes
   * @param length Length of all tag bytes
   * @throws IOException if writing to {@code out} fails
   */
  public void compressTags(OutputStream out, byte[] in, int offset, int length)
    throws IOException {
    int pos = offset;
    int endOffset = pos + length;
    assert pos < endOffset;
    while (pos < endOffset) {
      // Each tag is prefixed by its length; only the tag bytes are handed to the dictionary
      // writer.
      int tagLen = Bytes.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
      pos += Tag.TAG_LENGTH_SIZE;
      Dictionary.write(out, in, pos, tagLen, tagDict);
      pos += tagLen;
    }
  }

  /**
   * Compress tags one by one and writes to the OutputStream.
   * @param out    Stream to which the compressed tags to be written
   * @param in     Source buffer where tags are available
   * @param offset Offset for the tags byte buffer, as an absolute index into {@code in}
   * @param length Length of all tag bytes
   * @throws IOException if writing to {@code out} fails
   */
  public void compressTags(OutputStream out, ByteBuffer in, int offset, int length)
    throws IOException {
    if (in.hasArray()) {
      // The given offset is a buffer index; translate it into an index of the backing array,
      // which may start at a non-zero arrayOffset (e.g. for sliced buffers).
      compressTags(out, in.array(), in.arrayOffset() + offset, length);
    } else {
      int pos = offset;
      int endOffset = pos + length;
      assert pos < endOffset;
      while (pos < endOffset) {
        int tagLen = ByteBufferUtils.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
        pos += Tag.TAG_LENGTH_SIZE;
        Dictionary.write(out, in, pos, tagLen, tagDict);
        pos += tagLen;
      }
    }
  }

  /**
   * Uncompress tags from the InputStream and writes to the destination array.
   * @param src    Stream where the compressed tags are available
   * @param dest   Destination array where to write the uncompressed tags
   * @param offset Offset in destination where tags to be written
   * @param length Length of all tag bytes
   * @throws EOFException if {@code src} ends where a status or dictionary index byte is expected
   * @throws IOException  if reading from {@code src} fails or a referenced dictionary entry is
   *                      missing
   */
  public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
    throws IOException {
    int endOffset = offset + length;
    while (offset < endOffset) {
      byte status = readByte(src);
      if (status == Dictionary.NOT_IN_DICTIONARY) {
        // Tag is carried inline: varint length followed by the raw bytes. It is added to the
        // dictionary so later references to it can be resolved.
        int tagLen = StreamUtils.readRawVarint32(src);
        offset = Bytes.putAsShort(dest, offset, tagLen);
        IOUtils.readFully(src, dest, offset, tagLen);
        tagDict.addEntry(dest, offset, tagLen);
        offset += tagLen;
      } else {
        // Status byte plus the next byte together form the dictionary index.
        short dictIdx = StreamUtils.toShort(status, readByte(src));
        byte[] entry = tagDict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
        }
        offset = Bytes.putAsShort(dest, offset, entry.length);
        System.arraycopy(entry, 0, dest, offset, entry.length);
        offset += entry.length;
      }
    }
  }

  /**
   * Uncompress tags from the input ByteBuffer and writes to the destination array.
   * @param src    Buffer where the compressed tags are available
   * @param dest   Destination array where to write the uncompressed tags
   * @param offset Offset in destination where tags to be written
   * @param length Length of all tag bytes
   * @return bytes count read from source to uncompress all tags.
   * @throws IOException if a referenced dictionary entry is missing
   */
  public int uncompressTags(ByteBuff src, byte[] dest, int offset, int length)
    throws IOException {
    int srcBeginPos = src.position();
    int endOffset = offset + length;
    while (offset < endOffset) {
      byte status = src.get();
      int tagLen;
      if (status == Dictionary.NOT_IN_DICTIONARY) {
        tagLen = StreamUtils.readRawVarint32(src);
        offset = Bytes.putAsShort(dest, offset, tagLen);
        src.get(dest, offset, tagLen);
        tagDict.addEntry(dest, offset, tagLen);
        offset += tagLen;
      } else {
        short dictIdx = StreamUtils.toShort(status, src.get());
        byte[] entry = tagDict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
        }
        tagLen = entry.length;
        offset = Bytes.putAsShort(dest, offset, tagLen);
        System.arraycopy(entry, 0, dest, offset, tagLen);
        offset += tagLen;
      }
    }
    // Consumed byte count is derived from how far the source position moved.
    return src.position() - srcBeginPos;
  }

  /**
   * Uncompress tags from the InputStream and writes to the destination buffer.
   * @param src    Stream where the compressed tags are available
   * @param dest   Destination buffer where to write the uncompressed tags
   * @param length Length of all tag bytes
   * @throws IOException when the dictionary does not have the entry, or reading from
   *                     {@code src} fails
   */
  public void uncompressTags(InputStream src, ByteBuffer dest, int length) throws IOException {
    if (dest.hasArray()) {
      // NOTE(review): this path writes straight into the backing array and leaves
      // dest.position() untouched, whereas the put() path below advances it by length.
      // Confirm callers expect this difference before changing either path.
      uncompressTags(src, dest.array(), dest.arrayOffset() + dest.position(), length);
    } else {
      byte[] tagBuf = new byte[length];
      uncompressTags(src, tagBuf, 0, length);
      dest.put(tagBuf);
    }
  }

  /**
   * Reads a single byte from the stream, failing fast on end of stream. Without this check the
   * -1 returned by {@link InputStream#read()} at end of stream would be cast to a byte and be
   * indistinguishable from a real 0xFF byte.
   */
  private static byte readByte(InputStream src) throws IOException {
    int value = src.read();
    if (value < 0) {
      throw new EOFException("Unexpected end of stream while uncompressing tags");
    }
    return (byte) value;
  }
}