001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.DataInput; 021import java.io.DataOutput; 022import java.io.IOException; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.fs.FileSystem; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.HBaseConfiguration; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.io.util.Dictionary; 029import org.apache.hadoop.hbase.util.Bytes; 030import org.apache.hadoop.hbase.wal.WAL; 031import org.apache.hadoop.hbase.wal.WALFactory; 032import org.apache.hadoop.hbase.wal.WALProvider; 033import org.apache.hadoop.hbase.wal.WALStreamReader; 034import org.apache.hadoop.io.WritableUtils; 035import org.apache.yetus.audience.InterfaceAudience; 036 037import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 038 039/** 040 * A set of static functions for running our custom WAL compression/decompression. Also contains a 041 * command line tool to compress and uncompress WALs. 042 */ 043@InterfaceAudience.Private 044public class Compressor { 045 /** 046 * Command line tool to compress and uncompress WALs. 047 */ 048 public static void main(String[] args) throws IOException { 049 if (args.length != 2 || args[0].equals("--help") || args[0].equals("-h")) { 050 printHelp(); 051 System.exit(-1); 052 } 053 054 Path inputPath = new Path(args[0]); 055 Path outputPath = new Path(args[1]); 056 057 transformFile(inputPath, outputPath); 058 } 059 060 private static void printHelp() { 061 System.err.println("usage: Compressor <input> <output>"); 062 System.err.println("If <input> WAL is compressed, <output> will be decompressed."); 063 System.err.println("If <input> WAL is uncompressed, <output> will be compressed."); 064 return; 065 } 066 067 private static void transformFile(Path input, Path output) throws IOException { 068 Configuration conf = HBaseConfiguration.create(); 069 070 FileSystem inFS = input.getFileSystem(conf); 071 FileSystem outFS = output.getFileSystem(conf); 072 073 WALStreamReader in = WALFactory.createStreamReader(inFS, input, conf); 074 WALProvider.Writer out = null; 075 076 try { 077 if (!(in instanceof AbstractProtobufWALReader)) { 078 System.err.println("Cannot proceed, invalid reader type: " + in.getClass().getName()); 079 return; 080 } 081 boolean compress = ((AbstractProtobufWALReader) in).hasCompression; 082 conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress); 083 out = WALFactory.createWALWriter(outFS, output, conf); 084 085 WAL.Entry e = null; 086 while ((e = in.next()) != null) { 087 out.append(e); 088 } 089 } finally { 090 in.close(); 091 if (out != null) { 092 out.close(); 093 out = null; 094 } 095 } 096 } 097 098 /** 099 * Reads the next compressed entry and returns it as a byte array 100 * @param in the DataInput to read from 101 * @param dict the dictionary we use for our read. 102 * @return the uncompressed array. 103 */ 104 @Deprecated 105 static byte[] readCompressed(DataInput in, Dictionary dict) throws IOException { 106 byte status = in.readByte(); 107 108 if (status == Dictionary.NOT_IN_DICTIONARY) { 109 int length = WritableUtils.readVInt(in); 110 // if this isn't in the dictionary, we need to add to the dictionary. 111 byte[] arr = new byte[length]; 112 in.readFully(arr); 113 if (dict != null) { 114 dict.addEntry(arr, 0, length); 115 } 116 return arr; 117 } else { 118 // Status here is the higher-order byte of index of the dictionary entry 119 // (when its not Dictionary.NOT_IN_DICTIONARY -- dictionary indices are 120 // shorts). 121 short dictIdx = toShort(status, in.readByte()); 122 byte[] entry = dict.getEntry(dictIdx); 123 if (entry == null) { 124 throw new IOException("Missing dictionary entry for index " + dictIdx); 125 } 126 return entry; 127 } 128 } 129 130 /** 131 * Reads a compressed entry into an array. The output into the array ends up length-prefixed. 132 * @param to the array to write into 133 * @param offset array offset to start writing to 134 * @param in the DataInput to read from 135 * @param dict the dictionary to use for compression 136 * @return the length of the uncompressed data 137 */ 138 @Deprecated 139 static int uncompressIntoArray(byte[] to, int offset, DataInput in, Dictionary dict) 140 throws IOException { 141 byte status = in.readByte(); 142 143 if (status == Dictionary.NOT_IN_DICTIONARY) { 144 // status byte indicating that data to be read is not in dictionary. 145 // if this isn't in the dictionary, we need to add to the dictionary. 146 int length = WritableUtils.readVInt(in); 147 in.readFully(to, offset, length); 148 dict.addEntry(to, offset, length); 149 return length; 150 } else { 151 // the status byte also acts as the higher order byte of the dictionary 152 // entry 153 short dictIdx = toShort(status, in.readByte()); 154 byte[] entry; 155 try { 156 entry = dict.getEntry(dictIdx); 157 } catch (Exception ex) { 158 throw new IOException("Unable to uncompress the log entry", ex); 159 } 160 if (entry == null) { 161 throw new IOException("Missing dictionary entry for index " + dictIdx); 162 } 163 // now we write the uncompressed value. 164 Bytes.putBytes(to, offset, entry, 0, entry.length); 165 return entry.length; 166 } 167 } 168 169 /** 170 * Compresses and writes an array to a DataOutput 171 * @param data the array to write. 172 * @param out the DataOutput to write into 173 * @param dict the dictionary to use for compression 174 */ 175 @Deprecated 176 static void writeCompressed(byte[] data, int offset, int length, DataOutput out, Dictionary dict) 177 throws IOException { 178 short dictIdx = Dictionary.NOT_IN_DICTIONARY; 179 if (dict != null) { 180 dictIdx = dict.findEntry(data, offset, length); 181 } 182 if (dictIdx == Dictionary.NOT_IN_DICTIONARY) { 183 // not in dict 184 out.writeByte(Dictionary.NOT_IN_DICTIONARY); 185 WritableUtils.writeVInt(out, length); 186 out.write(data, offset, length); 187 } else { 188 out.writeShort(dictIdx); 189 } 190 } 191 192 static short toShort(byte hi, byte lo) { 193 short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF)); 194 Preconditions.checkArgument(s >= 0); 195 return s; 196 } 197}