001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.DataInput; 021import java.io.DataOutput; 022import java.io.IOException; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.fs.FileSystem; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.HBaseConfiguration; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.io.util.Dictionary; 029import org.apache.hadoop.hbase.util.Bytes; 030import org.apache.hadoop.hbase.wal.WAL; 031import org.apache.hadoop.hbase.wal.WALFactory; 032import org.apache.hadoop.hbase.wal.WALProvider; 033import org.apache.hadoop.io.WritableUtils; 034import org.apache.yetus.audience.InterfaceAudience; 035 036import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 037 038/** 039 * A set of static functions for running our custom WAL compression/decompression. Also contains a 040 * command line tool to compress and uncompress WALs. 041 */ 042@InterfaceAudience.Private 043public class Compressor { 044 /** 045 * Command line tool to compress and uncompress WALs. 046 */ 047 public static void main(String[] args) throws IOException { 048 if (args.length != 2 || args[0].equals("--help") || args[0].equals("-h")) { 049 printHelp(); 050 System.exit(-1); 051 } 052 053 Path inputPath = new Path(args[0]); 054 Path outputPath = new Path(args[1]); 055 056 transformFile(inputPath, outputPath); 057 } 058 059 private static void printHelp() { 060 System.err.println("usage: Compressor <input> <output>"); 061 System.err.println("If <input> WAL is compressed, <output> will be decompressed."); 062 System.err.println("If <input> WAL is uncompressed, <output> will be compressed."); 063 return; 064 } 065 066 private static void transformFile(Path input, Path output) throws IOException { 067 Configuration conf = HBaseConfiguration.create(); 068 069 FileSystem inFS = input.getFileSystem(conf); 070 FileSystem outFS = output.getFileSystem(conf); 071 072 WAL.Reader in = WALFactory.createReaderIgnoreCustomClass(inFS, input, conf); 073 WALProvider.Writer out = null; 074 075 try { 076 if (!(in instanceof ReaderBase)) { 077 System.err.println("Cannot proceed, invalid reader type: " + in.getClass().getName()); 078 return; 079 } 080 boolean compress = ((ReaderBase) in).hasCompression(); 081 conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress); 082 out = WALFactory.createWALWriter(outFS, output, conf); 083 084 WAL.Entry e = null; 085 while ((e = in.next()) != null) 086 out.append(e); 087 } finally { 088 in.close(); 089 if (out != null) { 090 out.close(); 091 out = null; 092 } 093 } 094 } 095 096 /** 097 * Reads the next compressed entry and returns it as a byte array 098 * @param in the DataInput to read from 099 * @param dict the dictionary we use for our read. 100 * @return the uncompressed array. 101 */ 102 @Deprecated 103 static byte[] readCompressed(DataInput in, Dictionary dict) throws IOException { 104 byte status = in.readByte(); 105 106 if (status == Dictionary.NOT_IN_DICTIONARY) { 107 int length = WritableUtils.readVInt(in); 108 // if this isn't in the dictionary, we need to add to the dictionary. 109 byte[] arr = new byte[length]; 110 in.readFully(arr); 111 if (dict != null) dict.addEntry(arr, 0, length); 112 return arr; 113 } else { 114 // Status here is the higher-order byte of index of the dictionary entry 115 // (when its not Dictionary.NOT_IN_DICTIONARY -- dictionary indices are 116 // shorts). 117 short dictIdx = toShort(status, in.readByte()); 118 byte[] entry = dict.getEntry(dictIdx); 119 if (entry == null) { 120 throw new IOException("Missing dictionary entry for index " + dictIdx); 121 } 122 return entry; 123 } 124 } 125 126 /** 127 * Reads a compressed entry into an array. The output into the array ends up length-prefixed. 128 * @param to the array to write into 129 * @param offset array offset to start writing to 130 * @param in the DataInput to read from 131 * @param dict the dictionary to use for compression 132 * @return the length of the uncompressed data 133 */ 134 @Deprecated 135 static int uncompressIntoArray(byte[] to, int offset, DataInput in, Dictionary dict) 136 throws IOException { 137 byte status = in.readByte(); 138 139 if (status == Dictionary.NOT_IN_DICTIONARY) { 140 // status byte indicating that data to be read is not in dictionary. 141 // if this isn't in the dictionary, we need to add to the dictionary. 142 int length = WritableUtils.readVInt(in); 143 in.readFully(to, offset, length); 144 dict.addEntry(to, offset, length); 145 return length; 146 } else { 147 // the status byte also acts as the higher order byte of the dictionary 148 // entry 149 short dictIdx = toShort(status, in.readByte()); 150 byte[] entry; 151 try { 152 entry = dict.getEntry(dictIdx); 153 } catch (Exception ex) { 154 throw new IOException("Unable to uncompress the log entry", ex); 155 } 156 if (entry == null) { 157 throw new IOException("Missing dictionary entry for index " + dictIdx); 158 } 159 // now we write the uncompressed value. 160 Bytes.putBytes(to, offset, entry, 0, entry.length); 161 return entry.length; 162 } 163 } 164 165 /** 166 * Compresses and writes an array to a DataOutput 167 * @param data the array to write. 168 * @param out the DataOutput to write into 169 * @param dict the dictionary to use for compression 170 */ 171 @Deprecated 172 static void writeCompressed(byte[] data, int offset, int length, DataOutput out, Dictionary dict) 173 throws IOException { 174 short dictIdx = Dictionary.NOT_IN_DICTIONARY; 175 if (dict != null) { 176 dictIdx = dict.findEntry(data, offset, length); 177 } 178 if (dictIdx == Dictionary.NOT_IN_DICTIONARY) { 179 // not in dict 180 out.writeByte(Dictionary.NOT_IN_DICTIONARY); 181 WritableUtils.writeVInt(out, length); 182 out.write(data, offset, length); 183 } else { 184 out.writeShort(dictIdx); 185 } 186 } 187 188 static short toShort(byte hi, byte lo) { 189 short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF)); 190 Preconditions.checkArgument(s >= 0); 191 return s; 192 } 193}