/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

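/**
 * Base class for WAL readers. Handles the parts of reading that are shared across formats:
 * recording the file length, setting up the optional {@link CompressionContext}, and the
 * generic {@link #next()} / {@link #seek(long)} handling. Format-specific work is delegated
 * to the abstract methods below.
 *
 * <p>A minimal usage sketch, assuming a hypothetical concrete subclass {@code SomeWALReader}
 * and an existing WAL file at {@code walPath}:
 * <pre>
 *   ReaderBase reader = new SomeWALReader();
 *   reader.init(fs, walPath, conf, null);
 *   for (Entry entry; (entry = reader.next()) != null;) {
 *     // process entry
 *   }
 *   reader.close();
 * </pre>
 */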
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
  private static final Logger LOG = LoggerFactory.getLogger(ReaderBase.class);
  protected Configuration conf;
  protected FileSystem fs;
  protected Path path;
  // Number of entries read from this WAL so far.
  protected long edit = 0;
  protected long fileLength;
  /**
   * Compression context to use while reading. Null if the WAL is not compressed.
   */
  protected CompressionContext compressionContext = null;
  // True until the first entry has been read and the compression dictionaries are populated.
  protected boolean emptyCompressionContext = true;

  /**
   * Default constructor.
   */
  public ReaderBase() {
  }

  @Override
  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
      throws IOException {
    this.conf = conf;
    this.path = path;
    this.fs = fs;
    this.fileLength = this.fs.getFileStatus(path).getLen();
    String cellCodecClsName = initReader(stream);

    boolean compression = hasCompression();
    if (compression) {
      // If compression is enabled, new dictionaries are created here.
      try {
        if (compressionContext == null) {
          compressionContext = new CompressionContext(LRUDictionary.class,
              FSUtils.isRecoveredEdits(path), hasTagCompression());
        } else {
          compressionContext.clear();
        }
      } catch (Exception e) {
        throw new IOException("Failed to initialize CompressionContext", e);
      }
    }
    initAfterCompression(cellCodecClsName);
  }

  @Override
  public Entry next() throws IOException {
    return next(null);
  }

  @Override
  public Entry next(Entry reuse) throws IOException {
    Entry e = reuse;
    if (e == null) {
      e = new Entry();
    }

    boolean hasEntry = false;
    try {
      hasEntry = readNext(e);
    } catch (IllegalArgumentException iae) {
      TableName tableName = e.getKey().getTableName();
      if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) {
        // It is an edit for the old ROOT table; skip it and read the next entry.
        LOG.info("Got an old ROOT edit, ignoring it");
        return next(e);
      } else {
        throw iae;
      }
    }
    edit++;
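    // Once an entry has been read, the compression context is no longer considered empty,
    // so seek() will not need to replay the file to rebuild the dictionaries.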
    if (compressionContext != null && emptyCompressionContext) {
      emptyCompressionContext = false;
    }
    return hasEntry ? e : null;
  }

  @Override
  public void seek(long pos) throws IOException {
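    // For a compressed WAL whose dictionaries have not been primed yet, replay entries from
    // the start until we reach pos so the dictionaries are rebuilt; jumping straight to pos
    // would leave them out of sync with the data that follows.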
    if (compressionContext != null && emptyCompressionContext) {
      while (next() != null) {
        if (getPosition() == pos) {
          emptyCompressionContext = false;
          break;
        }
      }
    }
    seekOnFs(pos);
  }

  /**
   * Initializes the log reader with a particular stream (may be null). The reader assumes
   * ownership of the stream if it is not null and may use it. Called once.
   * @return the class name of the cell codec, or null if that information is not available
   */
  protected abstract String initReader(FSDataInputStream stream) throws IOException;

  /**
   * Initializes the compression after the shared stuff has been initialized. Called once.
   */
  protected abstract void initAfterCompression() throws IOException;

  /**
   * Initializes the compression after the shared stuff has been initialized, using the named
   * cell codec. Called once.
   * @param cellCodecClsName class name of the cell codec
   */
  protected abstract void initAfterCompression(String cellCodecClsName) throws IOException;

  /**
   * @return Whether compression is enabled for this log.
   */
  protected abstract boolean hasCompression();

  /**
   * @return Whether tag compression is enabled for this log.
   */
  protected abstract boolean hasTagCompression();

  /**
   * Read next entry.
   * @param e The entry to read into.
   * @return Whether there was anything to read.
   */
  protected abstract boolean readNext(Entry e) throws IOException;

  /**
   * Performs a filesystem-level seek to the given position in the underlying file.
   */
  protected abstract void seekOnFs(long pos) throws IOException;
}