View Javadoc

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  
25  import org.apache.hadoop.hbase.HBaseConfiguration;
26  import org.apache.hadoop.hbase.HConstants;
27  import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
28  import org.apache.hadoop.hbase.util.Bytes;
29  import org.apache.hadoop.io.WritableUtils;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.fs.FileSystem;
32  import org.apache.hadoop.fs.Path;
33  import com.google.common.base.Preconditions;
34  
35  /**
36   * A set of static functions for running our custom WAL compression/decompression.
37   * Also contains a command line tool to compress and uncompress HLogs.
38   */
39  public class Compressor {
40    /**
41     * Command line tool to compress and uncompress WALs.
42     */
43    public static void main(String[] args) throws IOException {
44      if (args.length != 2 || args[0].equals("--help") || args[0].equals("-h")) {
45        printHelp();
46        System.exit(-1);
47      }
48  
49      Path inputPath = new Path(args[0]);
50      Path outputPath = new Path(args[1]);
51  
52      transformFile(inputPath, outputPath);
53    }
54  
55    private static void printHelp() {
56      System.err.println("usage: Compressor <input> <output>");
57      System.err.println("If <input> HLog is compressed, <output> will be decompressed.");
58      System.err.println("If <input> HLog is uncompressed, <output> will be compressed.");
59      return;
60    }
61  
62    private static void transformFile(Path input, Path output)
63        throws IOException {
64      SequenceFileLogReader in = new SequenceFileLogReader();
65      SequenceFileLogWriter out = new SequenceFileLogWriter();
66  
67      try {
68        Configuration conf = HBaseConfiguration.create();
69  
70        FileSystem inFS = input.getFileSystem(conf);
71        FileSystem outFS = output.getFileSystem(conf);
72  
73        in.init(inFS, input, conf);
74        boolean compress = in.reader.isWALCompressionEnabled();
75  
76        conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
77        out.init(outFS, output, conf);
78  
79        Entry e = null;
80        while ((e = in.next()) != null) out.append(e);
81      } finally {
82        in.close();
83        out.close();
84      }
85    }
86  
87    /**
88     * Reads the next compressed entry and returns it as a byte array
89     * 
90     * @param in the DataInput to read from
91     * @param dict the dictionary we use for our read.
92     * 
93     * @param the uncompressed array.
94     */
95    static byte[] readCompressed(DataInput in, Dictionary dict)
96        throws IOException {
97      byte status = in.readByte();
98  
99      if (status == Dictionary.NOT_IN_DICTIONARY) {
100       int length = WritableUtils.readVInt(in);
101       // if this isn't in the dictionary, we need to add to the dictionary.
102       byte[] arr = new byte[length];
103       in.readFully(arr);
104       if (dict != null) dict.addEntry(arr, 0, length);
105       return arr;
106     } else {
107       // Status here is the higher-order byte of index of the dictionary entry
108       // (when its not Dictionary.NOT_IN_DICTIONARY -- dictionary indices are
109       // shorts).
110       short dictIdx = toShort(status, in.readByte());
111       byte[] entry = dict.getEntry(dictIdx);
112       if (entry == null) {
113         throw new IOException("Missing dictionary entry for index "
114             + dictIdx);
115       }
116       return entry;
117     }
118   }
119 
120   /**
121    * Reads a compressed entry into an array.
122    * The output into the array ends up length-prefixed.
123    * 
124    * @param to the array to write into
125    * @param offset array offset to start writing to
126    * @param in the DataInput to read from
127    * @param dict the dictionary to use for compression
128    * 
129    * @return the length of the uncompressed data
130    */
131   static int uncompressIntoArray(byte[] to, int offset, DataInput in,
132       Dictionary dict) throws IOException {
133     byte status = in.readByte();
134 
135     if (status == Dictionary.NOT_IN_DICTIONARY) {
136       // status byte indicating that data to be read is not in dictionary.
137       // if this isn't in the dictionary, we need to add to the dictionary.
138       int length = WritableUtils.readVInt(in);
139       in.readFully(to, offset, length);
140       dict.addEntry(to, offset, length);
141       return length;
142     } else {
143       // the status byte also acts as the higher order byte of the dictionary
144       // entry
145       short dictIdx = toShort(status, in.readByte());
146       byte[] entry;
147       try {
148         entry = dict.getEntry(dictIdx);
149       } catch (Exception ex) {
150         throw new IOException("Unable to uncompress the log entry", ex);
151       }
152       if (entry == null) {
153         throw new IOException("Missing dictionary entry for index "
154             + dictIdx);
155       }
156       // now we write the uncompressed value.
157       Bytes.putBytes(to, offset, entry, 0, entry.length);
158       return entry.length;
159     }
160   }
161 
162   /**
163    * Compresses and writes an array to a DataOutput
164    * 
165    * @param data the array to write.
166    * @param out the DataOutput to write into
167    * @param dict the dictionary to use for compression
168    */
169   static void writeCompressed(byte[] data, int offset, int length,
170       DataOutput out, Dictionary dict)
171       throws IOException {
172     short dictIdx = Dictionary.NOT_IN_DICTIONARY;
173     if (dict != null) {
174       dictIdx = dict.findEntry(data, offset, length);
175     }
176     if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
177       // not in dict
178       out.writeByte(Dictionary.NOT_IN_DICTIONARY);
179       WritableUtils.writeVInt(out, length);
180       out.write(data, offset, length);
181     } else {
182       out.writeShort(dictIdx);
183     }
184   }
185 
186   static short toShort(byte hi, byte lo) {
187     short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
188     Preconditions.checkArgument(s >= 0);
189     return s;
190   }
191 }