View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.regionserver.wal;
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  
24  import org.apache.hadoop.hbase.classification.InterfaceAudience;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.fs.FileSystem;
27  import org.apache.hadoop.fs.Path;
28  import org.apache.hadoop.hbase.HBaseConfiguration;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.io.util.Dictionary;
31  import org.apache.hadoop.hbase.util.Bytes;
32  import org.apache.hadoop.io.WritableUtils;
33  
34  import com.google.common.base.Preconditions;
35  
36  /**
37   * A set of static functions for running our custom WAL compression/decompression.
38   * Also contains a command line tool to compress and uncompress HLogs.
39   */
40  @InterfaceAudience.Private
41  public class Compressor {
42    /**
43     * Command line tool to compress and uncompress WALs.
44     */
45    public static void main(String[] args) throws IOException {
46      if (args.length != 2 || args[0].equals("--help") || args[0].equals("-h")) {
47        printHelp();
48        System.exit(-1);
49      }
50  
51      Path inputPath = new Path(args[0]);
52      Path outputPath = new Path(args[1]);
53  
54      transformFile(inputPath, outputPath);
55    }
56  
57    private static void printHelp() {
58      System.err.println("usage: Compressor <input> <output>");
59      System.err.println("If <input> HLog is compressed, <output> will be decompressed.");
60      System.err.println("If <input> HLog is uncompressed, <output> will be compressed.");
61      return;
62    }
63  
64    private static void transformFile(Path input, Path output)
65        throws IOException {
66      Configuration conf = HBaseConfiguration.create();
67  
68      FileSystem inFS = input.getFileSystem(conf);
69      FileSystem outFS = output.getFileSystem(conf);
70  
71      HLog.Reader in = HLogFactory.createReader(inFS, input, conf, null, false);
72      HLog.Writer out = null;
73  
74      try {
75        if (!(in instanceof ReaderBase)) {
76          System.err.println("Cannot proceed, invalid reader type: " + in.getClass().getName());
77          return;
78        }
79        boolean compress = ((ReaderBase)in).hasCompression();
80        conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
81        out = HLogFactory.createWALWriter(outFS, output, conf);
82  
83        HLog.Entry e = null;
84        while ((e = in.next()) != null) out.append(e);
85      } finally {
86        in.close();
87        if (out != null) {
88          out.close();
89          out = null;
90        }
91      }
92    }
93  
94    /**
95     * Reads the next compressed entry and returns it as a byte array
96     * 
97     * @param in the DataInput to read from
98     * @param dict the dictionary we use for our read.
99     * @return the uncompressed array.
100    */
101   @Deprecated
102   static byte[] readCompressed(DataInput in, Dictionary dict)
103       throws IOException {
104     byte status = in.readByte();
105 
106     if (status == Dictionary.NOT_IN_DICTIONARY) {
107       int length = WritableUtils.readVInt(in);
108       // if this isn't in the dictionary, we need to add to the dictionary.
109       byte[] arr = new byte[length];
110       in.readFully(arr);
111       if (dict != null) dict.addEntry(arr, 0, length);
112       return arr;
113     } else {
114       // Status here is the higher-order byte of index of the dictionary entry
115       // (when its not Dictionary.NOT_IN_DICTIONARY -- dictionary indices are
116       // shorts).
117       short dictIdx = toShort(status, in.readByte());
118       byte[] entry = dict.getEntry(dictIdx);
119       if (entry == null) {
120         throw new IOException("Missing dictionary entry for index "
121             + dictIdx);
122       }
123       return entry;
124     }
125   }
126 
127   /**
128    * Reads a compressed entry into an array.
129    * The output into the array ends up length-prefixed.
130    * 
131    * @param to the array to write into
132    * @param offset array offset to start writing to
133    * @param in the DataInput to read from
134    * @param dict the dictionary to use for compression
135    * 
136    * @return the length of the uncompressed data
137    */
138   @Deprecated
139   static int uncompressIntoArray(byte[] to, int offset, DataInput in,
140       Dictionary dict) throws IOException {
141     byte status = in.readByte();
142 
143     if (status == Dictionary.NOT_IN_DICTIONARY) {
144       // status byte indicating that data to be read is not in dictionary.
145       // if this isn't in the dictionary, we need to add to the dictionary.
146       int length = WritableUtils.readVInt(in);
147       in.readFully(to, offset, length);
148       dict.addEntry(to, offset, length);
149       return length;
150     } else {
151       // the status byte also acts as the higher order byte of the dictionary
152       // entry
153       short dictIdx = toShort(status, in.readByte());
154       byte[] entry;
155       try {
156         entry = dict.getEntry(dictIdx);
157       } catch (Exception ex) {
158         throw new IOException("Unable to uncompress the log entry", ex);
159       }
160       if (entry == null) {
161         throw new IOException("Missing dictionary entry for index "
162             + dictIdx);
163       }
164       // now we write the uncompressed value.
165       Bytes.putBytes(to, offset, entry, 0, entry.length);
166       return entry.length;
167     }
168   }
169 
170   /**
171    * Compresses and writes an array to a DataOutput
172    * 
173    * @param data the array to write.
174    * @param out the DataOutput to write into
175    * @param dict the dictionary to use for compression
176    */
177   @Deprecated
178   static void writeCompressed(byte[] data, int offset, int length,
179       DataOutput out, Dictionary dict)
180       throws IOException {
181     short dictIdx = Dictionary.NOT_IN_DICTIONARY;
182     if (dict != null) {
183       dictIdx = dict.findEntry(data, offset, length);
184     }
185     if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
186       // not in dict
187       out.writeByte(Dictionary.NOT_IN_DICTIONARY);
188       WritableUtils.writeVInt(out, length);
189       out.write(data, offset, length);
190     } else {
191       out.writeShort(dictIdx);
192     }
193   }
194 
195   static short toShort(byte hi, byte lo) {
196     short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
197     Preconditions.checkArgument(s >= 0);
198     return s;
199   }
200 }