001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseConfiguration;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.io.util.Dictionary;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.hbase.wal.WAL;
031import org.apache.hadoop.hbase.wal.WALFactory;
032import org.apache.hadoop.hbase.wal.WALProvider;
033import org.apache.hadoop.hbase.wal.WALStreamReader;
034import org.apache.hadoop.io.WritableUtils;
035import org.apache.yetus.audience.InterfaceAudience;
036
037import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
038
039/**
040 * A set of static functions for running our custom WAL compression/decompression. Also contains a
041 * command line tool to compress and uncompress WALs.
042 */
043@InterfaceAudience.Private
044public class Compressor {
045  /**
046   * Command line tool to compress and uncompress WALs.
047   */
048  public static void main(String[] args) throws IOException {
049    if (args.length != 2 || args[0].equals("--help") || args[0].equals("-h")) {
050      printHelp();
051      System.exit(-1);
052    }
053
054    Path inputPath = new Path(args[0]);
055    Path outputPath = new Path(args[1]);
056
057    transformFile(inputPath, outputPath);
058  }
059
060  private static void printHelp() {
061    System.err.println("usage: Compressor <input> <output>");
062    System.err.println("If <input> WAL is compressed, <output> will be decompressed.");
063    System.err.println("If <input> WAL is uncompressed, <output> will be compressed.");
064    return;
065  }
066
067  private static void transformFile(Path input, Path output) throws IOException {
068    Configuration conf = HBaseConfiguration.create();
069
070    FileSystem inFS = input.getFileSystem(conf);
071    FileSystem outFS = output.getFileSystem(conf);
072
073    WALStreamReader in = WALFactory.createStreamReader(inFS, input, conf);
074    WALProvider.Writer out = null;
075
076    try {
077      if (!(in instanceof AbstractProtobufWALReader)) {
078        System.err.println("Cannot proceed, invalid reader type: " + in.getClass().getName());
079        return;
080      }
081      boolean compress = ((AbstractProtobufWALReader) in).hasCompression;
082      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
083      out = WALFactory.createWALWriter(outFS, output, conf);
084
085      WAL.Entry e = null;
086      while ((e = in.next()) != null) {
087        out.append(e);
088      }
089    } finally {
090      in.close();
091      if (out != null) {
092        out.close();
093        out = null;
094      }
095    }
096  }
097
098  /**
099   * Reads the next compressed entry and returns it as a byte array
100   * @param in   the DataInput to read from
101   * @param dict the dictionary we use for our read.
102   * @return the uncompressed array.
103   */
104  @Deprecated
105  static byte[] readCompressed(DataInput in, Dictionary dict) throws IOException {
106    byte status = in.readByte();
107
108    if (status == Dictionary.NOT_IN_DICTIONARY) {
109      int length = WritableUtils.readVInt(in);
110      // if this isn't in the dictionary, we need to add to the dictionary.
111      byte[] arr = new byte[length];
112      in.readFully(arr);
113      if (dict != null) {
114        dict.addEntry(arr, 0, length);
115      }
116      return arr;
117    } else {
118      // Status here is the higher-order byte of index of the dictionary entry
119      // (when its not Dictionary.NOT_IN_DICTIONARY -- dictionary indices are
120      // shorts).
121      short dictIdx = toShort(status, in.readByte());
122      byte[] entry = dict.getEntry(dictIdx);
123      if (entry == null) {
124        throw new IOException("Missing dictionary entry for index " + dictIdx);
125      }
126      return entry;
127    }
128  }
129
130  /**
131   * Reads a compressed entry into an array. The output into the array ends up length-prefixed.
132   * @param to     the array to write into
133   * @param offset array offset to start writing to
134   * @param in     the DataInput to read from
135   * @param dict   the dictionary to use for compression
136   * @return the length of the uncompressed data
137   */
138  @Deprecated
139  static int uncompressIntoArray(byte[] to, int offset, DataInput in, Dictionary dict)
140    throws IOException {
141    byte status = in.readByte();
142
143    if (status == Dictionary.NOT_IN_DICTIONARY) {
144      // status byte indicating that data to be read is not in dictionary.
145      // if this isn't in the dictionary, we need to add to the dictionary.
146      int length = WritableUtils.readVInt(in);
147      in.readFully(to, offset, length);
148      dict.addEntry(to, offset, length);
149      return length;
150    } else {
151      // the status byte also acts as the higher order byte of the dictionary
152      // entry
153      short dictIdx = toShort(status, in.readByte());
154      byte[] entry;
155      try {
156        entry = dict.getEntry(dictIdx);
157      } catch (Exception ex) {
158        throw new IOException("Unable to uncompress the log entry", ex);
159      }
160      if (entry == null) {
161        throw new IOException("Missing dictionary entry for index " + dictIdx);
162      }
163      // now we write the uncompressed value.
164      Bytes.putBytes(to, offset, entry, 0, entry.length);
165      return entry.length;
166    }
167  }
168
169  /**
170   * Compresses and writes an array to a DataOutput
171   * @param data the array to write.
172   * @param out  the DataOutput to write into
173   * @param dict the dictionary to use for compression
174   */
175  @Deprecated
176  static void writeCompressed(byte[] data, int offset, int length, DataOutput out, Dictionary dict)
177    throws IOException {
178    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
179    if (dict != null) {
180      dictIdx = dict.findEntry(data, offset, length);
181    }
182    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
183      // not in dict
184      out.writeByte(Dictionary.NOT_IN_DICTIONARY);
185      WritableUtils.writeVInt(out, length);
186      out.write(data, offset, length);
187    } else {
188      out.writeShort(dictIdx);
189    }
190  }
191
192  static short toShort(byte hi, byte lo) {
193    short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
194    Preconditions.checkArgument(s >= 0);
195    return s;
196  }
197}