/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Context that holds the dictionary for Tag compression and doing the compress/uncompress. This
 * will be used for compressing tags while writing into HFiles and WALs.
 * <p>
 * Uncompressed tags are a sequence of entries, each a {@link Tag#TAG_LENGTH_SIZE}-byte length
 * followed by that many tag bytes. On the compressed side, the {@code uncompressTags} methods
 * expect for each tag one status byte. If it equals {@link Dictionary#NOT_IN_DICTIONARY} it is
 * followed by a varint length and the raw tag bytes, which are then added to the dictionary.
 * Otherwise the status byte and the next byte form a two-byte dictionary index.
 * <p>
 * The dictionary is mutable state shared by all calls on this instance, and no synchronization is
 * performed here.
 */
@InterfaceAudience.Private
public class TagCompressionContext {
  private final Dictionary tagDict;

  /**
   * Creates a context backed by a newly constructed, initialized dictionary.
   * @param dictType     dictionary implementation; must expose a public no-arg constructor
   * @param dictCapacity capacity passed to {@link Dictionary#init(int)}
   * @throws NoSuchMethodException     if {@code dictType} has no public no-arg constructor
   * @throws SecurityException         if reflective access to the constructor is denied
   * @throws InstantiationException    if {@code dictType} cannot be instantiated
   * @throws IllegalAccessException    if the constructor is not accessible
   * @throws InvocationTargetException if the constructor itself throws
   */
  public TagCompressionContext(Class<? extends Dictionary> dictType, int dictCapacity)
    throws SecurityException, NoSuchMethodException, InstantiationException, IllegalAccessException,
    InvocationTargetException {
    Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
    tagDict = dictConstructor.newInstance();
    tagDict.init(dictCapacity);
  }

  /** Clears the tag dictionary by delegating to {@link Dictionary#clear()}. */
  public void clear() {
    tagDict.clear();
  }

  /**
   * Compress tags one by one and writes to the OutputStream.
   * @param out    Stream to which the compressed tags to be written
   * @param in     Source where tags are available
   * @param offset Offset for the tags bytes
   * @param length Length of all tag bytes; expected to be positive (asserted)
   */
  public void compressTags(OutputStream out, byte[] in, int offset, int length) throws IOException {
    int pos = offset;
    int endOffset = pos + length;
    assert pos < endOffset;
    while (pos < endOffset) {
      // Each uncompressed tag is prefixed by its length in Tag.TAG_LENGTH_SIZE bytes.
      int tagLen = Bytes.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
      pos += Tag.TAG_LENGTH_SIZE;
      Dictionary.write(out, in, pos, tagLen, tagDict);
      pos += tagLen;
    }
  }

  /**
   * Compress tags one by one and writes to the OutputStream.
   * @param out    Stream to which the compressed tags to be written
   * @param in     Source buffer where tags are available
   * @param offset Offset for the tags byte buffer. For array-backed buffers it is relative to
   *               {@link ByteBuffer#arrayOffset()}.
   * @param length Length of all tag bytes; expected to be positive (asserted)
   */
  public void compressTags(OutputStream out, ByteBuffer in, int offset, int length)
    throws IOException {
    if (in.hasArray()) {
      // Offset we are given is relative to ByteBuffer#arrayOffset
      compressTags(out, in.array(), in.arrayOffset() + offset, length);
    } else {
      int pos = offset;
      int endOffset = pos + length;
      assert pos < endOffset;
      while (pos < endOffset) {
        int tagLen = ByteBufferUtils.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
        pos += Tag.TAG_LENGTH_SIZE;
        Dictionary.write(out, in, pos, tagLen, tagDict);
        pos += tagLen;
      }
    }
  }

  /**
   * Uncompress tags from the InputStream and writes to the destination array.
   * <p>
   * For each tag, writes a {@link Tag#TAG_LENGTH_SIZE}-byte length followed by the tag bytes. Tags
   * read in-line from the stream are added to the dictionary.
   * @param src    Stream where the compressed tags are available
   * @param dest   Destination array where to write the uncompressed tags
   * @param offset Offset in destination where tags to be written
   * @param length Length of all tag bytes
   * @throws IOException on a read failure, on a reference to a missing dictionary entry, or when a
   *                     decoded tag would not fit within {@code length} bytes (corrupt input)
   */
  public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
    throws IOException {
    int pos = offset;
    int endOffset = offset + length;
    while (pos < endOffset) {
      byte status = StreamUtils.readByte(src);
      if (status == Dictionary.NOT_IN_DICTIONARY) {
        int tagLen = StreamUtils.readRawVarint32(src);
        // Validate before writing so corrupt input cannot spill past the declared region.
        checkTagFits(tagLen, pos, endOffset);
        pos = Bytes.putAsShort(dest, pos, tagLen);
        IOUtils.readFully(src, dest, pos, tagLen);
        tagDict.addEntry(dest, pos, tagLen);
        pos += tagLen;
      } else {
        short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(src));
        byte[] entry = tagDict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
        }
        checkTagFits(entry.length, pos, endOffset);
        pos = Bytes.putAsShort(dest, pos, entry.length);
        System.arraycopy(entry, 0, dest, pos, entry.length);
        pos += entry.length;
      }
    }
  }

  /**
   * Uncompress tags from the input ByteBuffer and writes to the destination array.
   * <p>
   * Reads {@code src} relative to, and advancing, its current position.
   * @param src    Buffer where the compressed tags are available
   * @param dest   Destination array where to write the uncompressed tags
   * @param offset Offset in destination where tags to be written
   * @param length Length of all tag bytes
   * @return bytes count read from source to uncompress all tags.
   * @throws IOException on a reference to a missing dictionary entry, or when a decoded tag would
   *                     not fit within {@code length} bytes (corrupt input)
   */
  public int uncompressTags(ByteBuff src, byte[] dest, int offset, int length) throws IOException {
    int srcBeginPos = src.position();
    int pos = offset;
    int endOffset = offset + length;
    while (pos < endOffset) {
      byte status = src.get();
      int tagLen;
      if (status == Dictionary.NOT_IN_DICTIONARY) {
        tagLen = StreamUtils.readRawVarint32(src);
        checkTagFits(tagLen, pos, endOffset);
        pos = Bytes.putAsShort(dest, pos, tagLen);
        src.get(dest, pos, tagLen);
        tagDict.addEntry(dest, pos, tagLen);
        pos += tagLen;
      } else {
        short dictIdx = StreamUtils.toShort(status, src.get());
        byte[] entry = tagDict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
        }
        tagLen = entry.length;
        checkTagFits(tagLen, pos, endOffset);
        pos = Bytes.putAsShort(dest, pos, tagLen);
        System.arraycopy(entry, 0, dest, pos, tagLen);
        pos += tagLen;
      }
    }
    return src.position() - srcBeginPos;
  }

  /**
   * Uncompress tags from the InputStream and writes to the destination buffer.
   * <p>
   * NOTE(review): the two branches leave {@code dest} in different states. For array-backed
   * buffers the bytes are written starting at {@code dest.position()} and the position is NOT
   * advanced. For other buffers {@link ByteBuffer#put(byte[])} advances the position by
   * {@code length}. Callers should not rely on the position after this call. Confirm the intended
   * contract before unifying.
   * @param src    Stream where the compressed tags are available
   * @param dest   Destination buffer where to write the uncompressed tags
   * @param length Length of all tag bytes
   * @throws IOException when the dictionary does not have the entry, on a read failure, or when the
   *                     compressed data does not decode to exactly {@code length} bytes of tags
   */
  public void uncompressTags(InputStream src, ByteBuffer dest, int length) throws IOException {
    if (dest.hasArray()) {
      uncompressTags(src, dest.array(), dest.arrayOffset() + dest.position(), length);
    } else {
      // Decode into a temporary heap array, then copy into the (e.g. direct) buffer.
      byte[] tagBuf = new byte[length];
      uncompressTags(src, tagBuf, 0, length);
      dest.put(tagBuf);
    }
  }

  /**
   * Verifies that a decoded tag of {@code tagLen} bytes, together with its
   * {@link Tag#TAG_LENGTH_SIZE}-byte length prefix, fits in the remaining region
   * {@code [pos, endOffset)} of the destination.
   * @throws IOException if the tag length is negative or the tag would extend past
   *                     {@code endOffset}, i.e. the compressed input is corrupt
   */
  private static void checkTagFits(int tagLen, int pos, int endOffset) throws IOException {
    // Compare against the remaining space rather than computing pos + tagLen, which could overflow.
    int available = endOffset - pos - Tag.TAG_LENGTH_SIZE;
    if (tagLen < 0 || tagLen > available) {
      throw new IOException("Corrupt compressed tags: tag of length " + tagLen + " at offset " + pos
        + " does not fit before end offset " + endOffset);
    }
  }
}