View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License
17   */
18  
19  package org.apache.hadoop.hbase.regionserver.wal;
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  
24  import org.apache.hadoop.hbase.classification.InterfaceAudience;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.fs.FileSystem;
27  import org.apache.hadoop.fs.Path;
28  import org.apache.hadoop.hbase.HBaseConfiguration;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.io.util.Dictionary;
31  import org.apache.hadoop.hbase.util.Bytes;
32  import org.apache.hadoop.io.WritableUtils;
33  
34  import com.google.common.base.Preconditions;
35  
36  import org.apache.hadoop.hbase.wal.WAL;
37  import org.apache.hadoop.hbase.wal.WALFactory;
38  import org.apache.hadoop.hbase.wal.WALProvider;
39  
40  /**
41   * A set of static functions for running our custom WAL compression/decompression.
42   * Also contains a command line tool to compress and uncompress WALs.
43   */
44  @InterfaceAudience.Private
45  public class Compressor {
46    /**
47     * Command line tool to compress and uncompress WALs.
48     */
49    public static void main(String[] args) throws IOException {
50      if (args.length != 2 || args[0].equals("--help") || args[0].equals("-h")) {
51        printHelp();
52        System.exit(-1);
53      }
54  
55      Path inputPath = new Path(args[0]);
56      Path outputPath = new Path(args[1]);
57  
58      transformFile(inputPath, outputPath);
59    }
60  
61    private static void printHelp() {
62      System.err.println("usage: Compressor <input> <output>");
63      System.err.println("If <input> WAL is compressed, <output> will be decompressed.");
64      System.err.println("If <input> WAL is uncompressed, <output> will be compressed.");
65      return;
66    }
67  
68    private static void transformFile(Path input, Path output)
69        throws IOException {
70      Configuration conf = HBaseConfiguration.create();
71  
72      FileSystem inFS = input.getFileSystem(conf);
73      FileSystem outFS = output.getFileSystem(conf);
74  
75      WAL.Reader in = WALFactory.createReaderIgnoreCustomClass(inFS, input, conf);
76      WALProvider.Writer out = null;
77  
78      try {
79        if (!(in instanceof ReaderBase)) {
80          System.err.println("Cannot proceed, invalid reader type: " + in.getClass().getName());
81          return;
82        }
83        boolean compress = ((ReaderBase)in).hasCompression();
84        conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
85        out = WALFactory.createWALWriter(outFS, output, conf);
86  
87        WAL.Entry e = null;
88        while ((e = in.next()) != null) out.append(e);
89      } finally {
90        in.close();
91        if (out != null) {
92          out.close();
93          out = null;
94        }
95      }
96    }
97  
98    /**
99     * Reads the next compressed entry and returns it as a byte array
100    * 
101    * @param in the DataInput to read from
102    * @param dict the dictionary we use for our read.
103    * @return the uncompressed array.
104    */
105   @Deprecated
106   static byte[] readCompressed(DataInput in, Dictionary dict)
107       throws IOException {
108     byte status = in.readByte();
109 
110     if (status == Dictionary.NOT_IN_DICTIONARY) {
111       int length = WritableUtils.readVInt(in);
112       // if this isn't in the dictionary, we need to add to the dictionary.
113       byte[] arr = new byte[length];
114       in.readFully(arr);
115       if (dict != null) dict.addEntry(arr, 0, length);
116       return arr;
117     } else {
118       // Status here is the higher-order byte of index of the dictionary entry
119       // (when its not Dictionary.NOT_IN_DICTIONARY -- dictionary indices are
120       // shorts).
121       short dictIdx = toShort(status, in.readByte());
122       byte[] entry = dict.getEntry(dictIdx);
123       if (entry == null) {
124         throw new IOException("Missing dictionary entry for index "
125             + dictIdx);
126       }
127       return entry;
128     }
129   }
130 
131   /**
132    * Reads a compressed entry into an array.
133    * The output into the array ends up length-prefixed.
134    * 
135    * @param to the array to write into
136    * @param offset array offset to start writing to
137    * @param in the DataInput to read from
138    * @param dict the dictionary to use for compression
139    * 
140    * @return the length of the uncompressed data
141    */
142   @Deprecated
143   static int uncompressIntoArray(byte[] to, int offset, DataInput in,
144       Dictionary dict) throws IOException {
145     byte status = in.readByte();
146 
147     if (status == Dictionary.NOT_IN_DICTIONARY) {
148       // status byte indicating that data to be read is not in dictionary.
149       // if this isn't in the dictionary, we need to add to the dictionary.
150       int length = WritableUtils.readVInt(in);
151       in.readFully(to, offset, length);
152       dict.addEntry(to, offset, length);
153       return length;
154     } else {
155       // the status byte also acts as the higher order byte of the dictionary
156       // entry
157       short dictIdx = toShort(status, in.readByte());
158       byte[] entry;
159       try {
160         entry = dict.getEntry(dictIdx);
161       } catch (Exception ex) {
162         throw new IOException("Unable to uncompress the log entry", ex);
163       }
164       if (entry == null) {
165         throw new IOException("Missing dictionary entry for index "
166             + dictIdx);
167       }
168       // now we write the uncompressed value.
169       Bytes.putBytes(to, offset, entry, 0, entry.length);
170       return entry.length;
171     }
172   }
173 
174   /**
175    * Compresses and writes an array to a DataOutput
176    * 
177    * @param data the array to write.
178    * @param out the DataOutput to write into
179    * @param dict the dictionary to use for compression
180    */
181   @Deprecated
182   static void writeCompressed(byte[] data, int offset, int length,
183       DataOutput out, Dictionary dict)
184       throws IOException {
185     short dictIdx = Dictionary.NOT_IN_DICTIONARY;
186     if (dict != null) {
187       dictIdx = dict.findEntry(data, offset, length);
188     }
189     if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
190       // not in dict
191       out.writeByte(Dictionary.NOT_IN_DICTIONARY);
192       WritableUtils.writeVInt(out, length);
193       out.write(data, offset, length);
194     } else {
195       out.writeShort(dictIdx);
196     }
197   }
198 
199   static short toShort(byte hi, byte lo) {
200     short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
201     Preconditions.checkArgument(s >= 0);
202     return s;
203   }
204 }