001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License 017 */ 018 019package org.apache.hadoop.hbase.regionserver.wal; 020import java.io.DataInput; 021import java.io.DataOutput; 022import java.io.IOException; 023 024import org.apache.yetus.audience.InterfaceAudience; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseConfiguration; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.io.util.Dictionary; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.io.WritableUtils; 033 034import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 035 036import org.apache.hadoop.hbase.wal.WAL; 037import org.apache.hadoop.hbase.wal.WALFactory; 038import org.apache.hadoop.hbase.wal.WALProvider; 039 040/** 041 * A set of static functions for running our custom WAL compression/decompression. 042 * Also contains a command line tool to compress and uncompress WALs. 043 */ 044@InterfaceAudience.Private 045public class Compressor { 046 /** 047 * Command line tool to compress and uncompress WALs. 048 */ 049 public static void main(String[] args) throws IOException { 050 if (args.length != 2 || args[0].equals("--help") || args[0].equals("-h")) { 051 printHelp(); 052 System.exit(-1); 053 } 054 055 Path inputPath = new Path(args[0]); 056 Path outputPath = new Path(args[1]); 057 058 transformFile(inputPath, outputPath); 059 } 060 061 private static void printHelp() { 062 System.err.println("usage: Compressor <input> <output>"); 063 System.err.println("If <input> WAL is compressed, <output> will be decompressed."); 064 System.err.println("If <input> WAL is uncompressed, <output> will be compressed."); 065 return; 066 } 067 068 private static void transformFile(Path input, Path output) 069 throws IOException { 070 Configuration conf = HBaseConfiguration.create(); 071 072 FileSystem inFS = input.getFileSystem(conf); 073 FileSystem outFS = output.getFileSystem(conf); 074 075 WAL.Reader in = WALFactory.createReaderIgnoreCustomClass(inFS, input, conf); 076 WALProvider.Writer out = null; 077 078 try { 079 if (!(in instanceof ReaderBase)) { 080 System.err.println("Cannot proceed, invalid reader type: " + in.getClass().getName()); 081 return; 082 } 083 boolean compress = ((ReaderBase)in).hasCompression(); 084 conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress); 085 out = WALFactory.createWALWriter(outFS, output, conf); 086 087 WAL.Entry e = null; 088 while ((e = in.next()) != null) out.append(e); 089 } finally { 090 in.close(); 091 if (out != null) { 092 out.close(); 093 out = null; 094 } 095 } 096 } 097 098 /** 099 * Reads the next compressed entry and returns it as a byte array 100 * 101 * @param in the DataInput to read from 102 * @param dict the dictionary we use for our read. 103 * @return the uncompressed array. 104 */ 105 @Deprecated 106 static byte[] readCompressed(DataInput in, Dictionary dict) 107 throws IOException { 108 byte status = in.readByte(); 109 110 if (status == Dictionary.NOT_IN_DICTIONARY) { 111 int length = WritableUtils.readVInt(in); 112 // if this isn't in the dictionary, we need to add to the dictionary. 113 byte[] arr = new byte[length]; 114 in.readFully(arr); 115 if (dict != null) dict.addEntry(arr, 0, length); 116 return arr; 117 } else { 118 // Status here is the higher-order byte of index of the dictionary entry 119 // (when its not Dictionary.NOT_IN_DICTIONARY -- dictionary indices are 120 // shorts). 121 short dictIdx = toShort(status, in.readByte()); 122 byte[] entry = dict.getEntry(dictIdx); 123 if (entry == null) { 124 throw new IOException("Missing dictionary entry for index " 125 + dictIdx); 126 } 127 return entry; 128 } 129 } 130 131 /** 132 * Reads a compressed entry into an array. 133 * The output into the array ends up length-prefixed. 134 * 135 * @param to the array to write into 136 * @param offset array offset to start writing to 137 * @param in the DataInput to read from 138 * @param dict the dictionary to use for compression 139 * 140 * @return the length of the uncompressed data 141 */ 142 @Deprecated 143 static int uncompressIntoArray(byte[] to, int offset, DataInput in, 144 Dictionary dict) throws IOException { 145 byte status = in.readByte(); 146 147 if (status == Dictionary.NOT_IN_DICTIONARY) { 148 // status byte indicating that data to be read is not in dictionary. 149 // if this isn't in the dictionary, we need to add to the dictionary. 150 int length = WritableUtils.readVInt(in); 151 in.readFully(to, offset, length); 152 dict.addEntry(to, offset, length); 153 return length; 154 } else { 155 // the status byte also acts as the higher order byte of the dictionary 156 // entry 157 short dictIdx = toShort(status, in.readByte()); 158 byte[] entry; 159 try { 160 entry = dict.getEntry(dictIdx); 161 } catch (Exception ex) { 162 throw new IOException("Unable to uncompress the log entry", ex); 163 } 164 if (entry == null) { 165 throw new IOException("Missing dictionary entry for index " 166 + dictIdx); 167 } 168 // now we write the uncompressed value. 169 Bytes.putBytes(to, offset, entry, 0, entry.length); 170 return entry.length; 171 } 172 } 173 174 /** 175 * Compresses and writes an array to a DataOutput 176 * 177 * @param data the array to write. 178 * @param out the DataOutput to write into 179 * @param dict the dictionary to use for compression 180 */ 181 @Deprecated 182 static void writeCompressed(byte[] data, int offset, int length, 183 DataOutput out, Dictionary dict) 184 throws IOException { 185 short dictIdx = Dictionary.NOT_IN_DICTIONARY; 186 if (dict != null) { 187 dictIdx = dict.findEntry(data, offset, length); 188 } 189 if (dictIdx == Dictionary.NOT_IN_DICTIONARY) { 190 // not in dict 191 out.writeByte(Dictionary.NOT_IN_DICTIONARY); 192 WritableUtils.writeVInt(out, length); 193 out.write(data, offset, length); 194 } else { 195 out.writeShort(dictIdx); 196 } 197 } 198 199 static short toShort(byte hi, byte lo) { 200 short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF)); 201 Preconditions.checkArgument(s >= 0); 202 return s; 203 } 204}