001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseConfiguration;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.io.util.Dictionary;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.hbase.wal.WAL;
031import org.apache.hadoop.hbase.wal.WALFactory;
032import org.apache.hadoop.hbase.wal.WALProvider;
033import org.apache.hadoop.io.WritableUtils;
034import org.apache.yetus.audience.InterfaceAudience;
035
036import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
037
038/**
039 * A set of static functions for running our custom WAL compression/decompression. Also contains a
040 * command line tool to compress and uncompress WALs.
041 */
042@InterfaceAudience.Private
043public class Compressor {
044  /**
045   * Command line tool to compress and uncompress WALs.
046   */
047  public static void main(String[] args) throws IOException {
048    if (args.length != 2 || args[0].equals("--help") || args[0].equals("-h")) {
049      printHelp();
050      System.exit(-1);
051    }
052
053    Path inputPath = new Path(args[0]);
054    Path outputPath = new Path(args[1]);
055
056    transformFile(inputPath, outputPath);
057  }
058
059  private static void printHelp() {
060    System.err.println("usage: Compressor <input> <output>");
061    System.err.println("If <input> WAL is compressed, <output> will be decompressed.");
062    System.err.println("If <input> WAL is uncompressed, <output> will be compressed.");
063    return;
064  }
065
066  private static void transformFile(Path input, Path output) throws IOException {
067    Configuration conf = HBaseConfiguration.create();
068
069    FileSystem inFS = input.getFileSystem(conf);
070    FileSystem outFS = output.getFileSystem(conf);
071
072    WAL.Reader in = WALFactory.createReaderIgnoreCustomClass(inFS, input, conf);
073    WALProvider.Writer out = null;
074
075    try {
076      if (!(in instanceof ReaderBase)) {
077        System.err.println("Cannot proceed, invalid reader type: " + in.getClass().getName());
078        return;
079      }
080      boolean compress = ((ReaderBase) in).hasCompression();
081      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
082      out = WALFactory.createWALWriter(outFS, output, conf);
083
084      WAL.Entry e = null;
085      while ((e = in.next()) != null)
086        out.append(e);
087    } finally {
088      in.close();
089      if (out != null) {
090        out.close();
091        out = null;
092      }
093    }
094  }
095
096  /**
097   * Reads the next compressed entry and returns it as a byte array
098   * @param in   the DataInput to read from
099   * @param dict the dictionary we use for our read.
100   * @return the uncompressed array.
101   */
102  @Deprecated
103  static byte[] readCompressed(DataInput in, Dictionary dict) throws IOException {
104    byte status = in.readByte();
105
106    if (status == Dictionary.NOT_IN_DICTIONARY) {
107      int length = WritableUtils.readVInt(in);
108      // if this isn't in the dictionary, we need to add to the dictionary.
109      byte[] arr = new byte[length];
110      in.readFully(arr);
111      if (dict != null) dict.addEntry(arr, 0, length);
112      return arr;
113    } else {
114      // Status here is the higher-order byte of index of the dictionary entry
115      // (when its not Dictionary.NOT_IN_DICTIONARY -- dictionary indices are
116      // shorts).
117      short dictIdx = toShort(status, in.readByte());
118      byte[] entry = dict.getEntry(dictIdx);
119      if (entry == null) {
120        throw new IOException("Missing dictionary entry for index " + dictIdx);
121      }
122      return entry;
123    }
124  }
125
126  /**
127   * Reads a compressed entry into an array. The output into the array ends up length-prefixed.
128   * @param to     the array to write into
129   * @param offset array offset to start writing to
130   * @param in     the DataInput to read from
131   * @param dict   the dictionary to use for compression
132   * @return the length of the uncompressed data
133   */
134  @Deprecated
135  static int uncompressIntoArray(byte[] to, int offset, DataInput in, Dictionary dict)
136    throws IOException {
137    byte status = in.readByte();
138
139    if (status == Dictionary.NOT_IN_DICTIONARY) {
140      // status byte indicating that data to be read is not in dictionary.
141      // if this isn't in the dictionary, we need to add to the dictionary.
142      int length = WritableUtils.readVInt(in);
143      in.readFully(to, offset, length);
144      dict.addEntry(to, offset, length);
145      return length;
146    } else {
147      // the status byte also acts as the higher order byte of the dictionary
148      // entry
149      short dictIdx = toShort(status, in.readByte());
150      byte[] entry;
151      try {
152        entry = dict.getEntry(dictIdx);
153      } catch (Exception ex) {
154        throw new IOException("Unable to uncompress the log entry", ex);
155      }
156      if (entry == null) {
157        throw new IOException("Missing dictionary entry for index " + dictIdx);
158      }
159      // now we write the uncompressed value.
160      Bytes.putBytes(to, offset, entry, 0, entry.length);
161      return entry.length;
162    }
163  }
164
165  /**
166   * Compresses and writes an array to a DataOutput
167   * @param data the array to write.
168   * @param out  the DataOutput to write into
169   * @param dict the dictionary to use for compression
170   */
171  @Deprecated
172  static void writeCompressed(byte[] data, int offset, int length, DataOutput out, Dictionary dict)
173    throws IOException {
174    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
175    if (dict != null) {
176      dictIdx = dict.findEntry(data, offset, length);
177    }
178    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
179      // not in dict
180      out.writeByte(Dictionary.NOT_IN_DICTIONARY);
181      WritableUtils.writeVInt(out, length);
182      out.write(data, offset, length);
183    } else {
184      out.writeShort(dictIdx);
185    }
186  }
187
188  static short toShort(byte hi, byte lo) {
189    short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
190    Preconditions.checkArgument(s >= 0);
191    return s;
192  }
193}