001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License
017 */
018
019package org.apache.hadoop.hbase.regionserver.wal;
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023
024import org.apache.yetus.audience.InterfaceAudience;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseConfiguration;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.io.util.Dictionary;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.io.WritableUtils;
033
034import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
035
036import org.apache.hadoop.hbase.wal.WAL;
037import org.apache.hadoop.hbase.wal.WALFactory;
038import org.apache.hadoop.hbase.wal.WALProvider;
039
040/**
041 * A set of static functions for running our custom WAL compression/decompression.
042 * Also contains a command line tool to compress and uncompress WALs.
043 */
044@InterfaceAudience.Private
045public class Compressor {
046  /**
047   * Command line tool to compress and uncompress WALs.
048   */
049  public static void main(String[] args) throws IOException {
050    if (args.length != 2 || args[0].equals("--help") || args[0].equals("-h")) {
051      printHelp();
052      System.exit(-1);
053    }
054
055    Path inputPath = new Path(args[0]);
056    Path outputPath = new Path(args[1]);
057
058    transformFile(inputPath, outputPath);
059  }
060
061  private static void printHelp() {
062    System.err.println("usage: Compressor <input> <output>");
063    System.err.println("If <input> WAL is compressed, <output> will be decompressed.");
064    System.err.println("If <input> WAL is uncompressed, <output> will be compressed.");
065    return;
066  }
067
068  private static void transformFile(Path input, Path output)
069      throws IOException {
070    Configuration conf = HBaseConfiguration.create();
071
072    FileSystem inFS = input.getFileSystem(conf);
073    FileSystem outFS = output.getFileSystem(conf);
074
075    WAL.Reader in = WALFactory.createReaderIgnoreCustomClass(inFS, input, conf);
076    WALProvider.Writer out = null;
077
078    try {
079      if (!(in instanceof ReaderBase)) {
080        System.err.println("Cannot proceed, invalid reader type: " + in.getClass().getName());
081        return;
082      }
083      boolean compress = ((ReaderBase)in).hasCompression();
084      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
085      out = WALFactory.createWALWriter(outFS, output, conf);
086
087      WAL.Entry e = null;
088      while ((e = in.next()) != null) out.append(e);
089    } finally {
090      in.close();
091      if (out != null) {
092        out.close();
093        out = null;
094      }
095    }
096  }
097
098  /**
099   * Reads the next compressed entry and returns it as a byte array
100   * 
101   * @param in the DataInput to read from
102   * @param dict the dictionary we use for our read.
103   * @return the uncompressed array.
104   */
105  @Deprecated
106  static byte[] readCompressed(DataInput in, Dictionary dict)
107      throws IOException {
108    byte status = in.readByte();
109
110    if (status == Dictionary.NOT_IN_DICTIONARY) {
111      int length = WritableUtils.readVInt(in);
112      // if this isn't in the dictionary, we need to add to the dictionary.
113      byte[] arr = new byte[length];
114      in.readFully(arr);
115      if (dict != null) dict.addEntry(arr, 0, length);
116      return arr;
117    } else {
118      // Status here is the higher-order byte of index of the dictionary entry
119      // (when its not Dictionary.NOT_IN_DICTIONARY -- dictionary indices are
120      // shorts).
121      short dictIdx = toShort(status, in.readByte());
122      byte[] entry = dict.getEntry(dictIdx);
123      if (entry == null) {
124        throw new IOException("Missing dictionary entry for index "
125            + dictIdx);
126      }
127      return entry;
128    }
129  }
130
131  /**
132   * Reads a compressed entry into an array.
133   * The output into the array ends up length-prefixed.
134   * 
135   * @param to the array to write into
136   * @param offset array offset to start writing to
137   * @param in the DataInput to read from
138   * @param dict the dictionary to use for compression
139   * 
140   * @return the length of the uncompressed data
141   */
142  @Deprecated
143  static int uncompressIntoArray(byte[] to, int offset, DataInput in,
144      Dictionary dict) throws IOException {
145    byte status = in.readByte();
146
147    if (status == Dictionary.NOT_IN_DICTIONARY) {
148      // status byte indicating that data to be read is not in dictionary.
149      // if this isn't in the dictionary, we need to add to the dictionary.
150      int length = WritableUtils.readVInt(in);
151      in.readFully(to, offset, length);
152      dict.addEntry(to, offset, length);
153      return length;
154    } else {
155      // the status byte also acts as the higher order byte of the dictionary
156      // entry
157      short dictIdx = toShort(status, in.readByte());
158      byte[] entry;
159      try {
160        entry = dict.getEntry(dictIdx);
161      } catch (Exception ex) {
162        throw new IOException("Unable to uncompress the log entry", ex);
163      }
164      if (entry == null) {
165        throw new IOException("Missing dictionary entry for index "
166            + dictIdx);
167      }
168      // now we write the uncompressed value.
169      Bytes.putBytes(to, offset, entry, 0, entry.length);
170      return entry.length;
171    }
172  }
173
174  /**
175   * Compresses and writes an array to a DataOutput
176   * 
177   * @param data the array to write.
178   * @param out the DataOutput to write into
179   * @param dict the dictionary to use for compression
180   */
181  @Deprecated
182  static void writeCompressed(byte[] data, int offset, int length,
183      DataOutput out, Dictionary dict)
184      throws IOException {
185    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
186    if (dict != null) {
187      dictIdx = dict.findEntry(data, offset, length);
188    }
189    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
190      // not in dict
191      out.writeByte(Dictionary.NOT_IN_DICTIONARY);
192      WritableUtils.writeVInt(out, length);
193      out.write(data, offset, length);
194    } else {
195      out.writeShort(dictIdx);
196    }
197  }
198
199  static short toShort(byte hi, byte lo) {
200    short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
201    Preconditions.checkArgument(s >= 0);
202    return s;
203  }
204}