/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

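/**
 * Base class for WAL readers that read a single log file. Concrete subclasses supply the
 * on-disk format through the abstract hooks ({@link #initReader(FSDataInputStream)},
 * {@link #readNext(Entry)}, etc.), while this class sets up the shared
 * {@link CompressionContext} when the log is compressed.
 * <p>
 * A minimal usage sketch; how the reader instance is obtained ({@code createReader} here)
 * is illustrative only:
 * <pre>{@code
 * try (AbstractFSWALProvider.Reader reader = createReader(fs, walPath, conf)) {
 *   for (Entry entry = reader.next(); entry != null; entry = reader.next()) {
 *     // process entry.getKey() and entry.getEdit()
 *   }
 * }
 * }</pre>
 */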
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
  private static final Logger LOG = LoggerFactory.getLogger(ReaderBase.class);
  protected Configuration conf;
  protected FileSystem fs;
  protected Path path;
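  // Number of entries that have been read from this log so far.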
  protected long edit = 0;
  protected long fileLength;
  /**
   * Compression context to use while reading. Can be null if compression is not in use.
   */
  protected CompressionContext compressionContext = null;
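  // True until the first entry has been read while compression is enabled; seek() uses this
  // to decide whether the dictionaries still have to be populated by replaying entries from
  // the beginning of the file.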
  protected boolean emptyCompressionContext = true;

  /**
   * Default constructor.
   */
  public ReaderBase() {
  }

  @Override
  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
      throws IOException {
    this.conf = conf;
    this.path = path;
    this.fs = fs;
    this.fileLength = this.fs.getFileStatus(path).getLen();
    String cellCodecClsName = initReader(stream);

    boolean compression = hasCompression();
    if (compression) {
      // If compression is enabled, new dictionaries are created here.
      try {
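        // Create a new dictionary-backed context on first use; if a context already exists
        // (for example when the reader is re-initialized), clear it so that stale dictionary
        // entries do not carry over.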
        if (compressionContext == null) {
          compressionContext = new CompressionContext(LRUDictionary.class,
              FSUtils.isRecoveredEdits(path), hasTagCompression());
        } else {
          compressionContext.clear();
        }
      } catch (Exception e) {
        throw new IOException("Failed to initialize CompressionContext", e);
      }
    }
    initAfterCompression(cellCodecClsName);
  }

  @Override
  public Entry next() throws IOException {
    return next(null);
  }

  @Override
  public Entry next(Entry reuse) throws IOException {
    Entry e = reuse;
    if (e == null) {
      e = new Entry();
    }
    if (compressionContext != null) {
      e.setCompressionContext(compressionContext);
    }

    boolean hasEntry = false;
    try {
      hasEntry = readNext(e);
    } catch (IllegalArgumentException iae) {
      TableName tableName = e.getKey().getTableName();
      if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) {
        // It is an old ROOT table edit, ignore it
        LOG.info("Got an old ROOT edit, ignoring it");
        return next(e);
      } else {
        throw iae;
      }
    }
    edit++;
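    // After the first read, mark the compression dictionaries as populated so that seek()
    // no longer needs to replay entries from the start of the file.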
    if (compressionContext != null && emptyCompressionContext) {
      emptyCompressionContext = false;
    }
    return hasEntry ? e : null;
  }

  @Override
  public void seek(long pos) throws IOException {
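    // A compressed log builds its dictionaries incrementally as entries are read, so we cannot
    // jump straight to an arbitrary offset while the dictionaries are still empty. Read forward
    // until the requested position is reached to populate them, then do the filesystem-level
    // seek.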
    if (compressionContext != null && emptyCompressionContext) {
      while (next() != null) {
        if (getPosition() == pos) {
          emptyCompressionContext = false;
          break;
        }
      }
    }
    seekOnFs(pos);
  }

  /**
   * Initializes the log reader with a particular stream (may be null).
   * The reader assumes ownership of the stream if it is not null and may use it. Called once.
   * @return the class name of the cell Codec, or null if that information is not available
   */
  protected abstract String initReader(FSDataInputStream stream) throws IOException;

  /**
   * Completes initialization once the shared state, including the compression context, has been
   * set up. Called once.
   */
  protected abstract void initAfterCompression() throws IOException;

  /**
   * Completes initialization once the shared state, including the compression context, has been
   * set up. Called once.
   * @param cellCodecClsName class name of the cell Codec
   */
  protected abstract void initAfterCompression(String cellCodecClsName) throws IOException;

  /**
   * @return Whether compression is enabled for this log.
   */
  protected abstract boolean hasCompression();

  /**
   * @return Whether tag compression is enabled for this log.
   */
  protected abstract boolean hasTagCompression();

  /**
   * Reads the next entry.
   * @param e The entry to read into.
   * @return Whether there was anything to read.
   */
  protected abstract boolean readNext(Entry e) throws IOException;

  /**
   * Performs a filesystem-level seek to a certain position in the underlying file.
   */
  protected abstract void seekOnFs(long pos) throws IOException;
}