001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.regionserver.wal;
021
022import java.io.IOException;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.fs.FSDataInputStream;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.HBaseInterfaceAudience;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.io.util.LRUDictionary;
030import org.apache.hadoop.hbase.util.CommonFSUtils;
031import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
032import org.apache.hadoop.hbase.wal.WAL.Entry;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
038public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
039  private static final Logger LOG = LoggerFactory.getLogger(ReaderBase.class);
040  protected Configuration conf;
041  protected FileSystem fs;
042  protected Path path;
043  protected long edit = 0;
044  protected long fileLength;
045  /**
046   * Compression context to use reading.  Can be null if no compression.
047   */
048  protected CompressionContext compressionContext = null;
049  protected boolean emptyCompressionContext = true;
050
051  /**
052   * Default constructor.
053   */
054  public ReaderBase() {
055  }
056
057  @Override
058  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
059      throws IOException {
060    this.conf = conf;
061    this.path = path;
062    this.fs = fs;
063    this.fileLength = this.fs.getFileStatus(path).getLen();
064    String cellCodecClsName = initReader(stream);
065
066    boolean compression = hasCompression();
067    if (compression) {
068      // If compression is enabled, new dictionaries are created here.
069      try {
070        if (compressionContext == null) {
071          compressionContext = new CompressionContext(LRUDictionary.class,
072            CommonFSUtils.isRecoveredEdits(path), hasTagCompression());
073        } else {
074          compressionContext.clear();
075        }
076      } catch (Exception e) {
077        throw new IOException("Failed to initialize CompressionContext", e);
078      }
079    }
080    initAfterCompression(cellCodecClsName);
081  }
082
083  @Override
084  public Entry next() throws IOException {
085    return next(null);
086  }
087
088  @Override
089  public Entry next(Entry reuse) throws IOException {
090    Entry e = reuse;
091    if (e == null) {
092      e = new Entry();
093    }
094
095    boolean hasEntry = false;
096    try {
097      hasEntry = readNext(e);
098    } catch (IllegalArgumentException iae) {
099      TableName tableName = e.getKey().getTableName();
100      if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) {
101        // It is old ROOT table edit, ignore it
102        LOG.info("Got an old ROOT edit, ignoring ");
103        return next(e);
104      }
105      else throw iae;
106    }
107    edit++;
108    if (compressionContext != null && emptyCompressionContext) {
109      emptyCompressionContext = false;
110    }
111    return hasEntry ? e : null;
112  }
113
114  @Override
115  public void seek(long pos) throws IOException {
116    if (compressionContext != null && emptyCompressionContext) {
117      while (next() != null) {
118        if (getPosition() == pos) {
119          emptyCompressionContext = false;
120          break;
121        }
122      }
123    }
124    seekOnFs(pos);
125  }
126
127  /**
128   * Initializes the log reader with a particular stream (may be null).
129   * Reader assumes ownership of the stream if not null and may use it. Called once.
130   * @return the class name of cell Codec, null if such information is not available
131   */
132  protected abstract String initReader(FSDataInputStream stream) throws IOException;
133
134  /**
135   * Initializes the compression after the shared stuff has been initialized. Called once.
136   */
137  protected abstract void initAfterCompression() throws IOException;
138
139  /**
140   * Initializes the compression after the shared stuff has been initialized. Called once.
141   * @param cellCodecClsName class name of cell Codec
142   */
143  protected abstract void initAfterCompression(String cellCodecClsName) throws IOException;
144  /**
145   * @return Whether compression is enabled for this log.
146   */
147  protected abstract boolean hasCompression();
148
149  /**
150   * @return Whether tag compression is enabled for this log.
151   */
152  protected abstract boolean hasTagCompression();
153
154  /**
155   * Read next entry.
156   * @param e The entry to read into.
157   * @return Whether there was anything to read.
158   */
159  protected abstract boolean readNext(Entry e) throws IOException;
160
161  /**
162   * Performs a filesystem-level seek to a certain position in an underlying file.
163   */
164  protected abstract void seekOnFs(long pos) throws IOException;
165}