001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.IOException; 021import org.apache.hadoop.conf.Configuration; 022import org.apache.hadoop.fs.FSDataInputStream; 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.hbase.HBaseInterfaceAudience; 026import org.apache.hadoop.hbase.TableName; 027import org.apache.hadoop.hbase.io.compress.Compression; 028import org.apache.hadoop.hbase.io.util.LRUDictionary; 029import org.apache.hadoop.hbase.util.CommonFSUtils; 030import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 031import org.apache.hadoop.hbase.wal.WAL.Entry; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX }) 037public abstract class ReaderBase implements AbstractFSWALProvider.Reader { 038 private static final Logger LOG = LoggerFactory.getLogger(ReaderBase.class); 039 protected Configuration conf; 040 protected FileSystem fs; 041 protected Path path; 042 protected long edit = 0; 043 protected long fileLength; 044 /** 045 * Compression context to use reading. Can be null if no compression. 046 */ 047 protected CompressionContext compressionContext = null; 048 protected boolean emptyCompressionContext = true; 049 050 /** 051 * Default constructor. 052 */ 053 public ReaderBase() { 054 } 055 056 @Override 057 public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream) 058 throws IOException { 059 this.conf = conf; 060 this.path = path; 061 this.fs = fs; 062 this.fileLength = this.fs.getFileStatus(path).getLen(); 063 String cellCodecClsName = initReader(stream); 064 065 boolean compression = hasCompression(); 066 if (compression) { 067 // If compression is enabled, new dictionaries are created here. 068 try { 069 if (compressionContext == null) { 070 if (LOG.isDebugEnabled()) { 071 LOG.debug( 072 "Initializing compression context for {}: isRecoveredEdits={}" 073 + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}", 074 path, CommonFSUtils.isRecoveredEdits(path), hasTagCompression(), 075 hasValueCompression(), getValueCompressionAlgorithm()); 076 } 077 compressionContext = 078 new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path), 079 hasTagCompression(), hasValueCompression(), getValueCompressionAlgorithm()); 080 } else { 081 compressionContext.clear(); 082 } 083 } catch (Exception e) { 084 throw new IOException("Failed to initialize CompressionContext", e); 085 } 086 } 087 initAfterCompression(cellCodecClsName); 088 } 089 090 @Override 091 public Entry next() throws IOException { 092 return next(null); 093 } 094 095 @Override 096 public Entry next(Entry reuse) throws IOException { 097 Entry e = reuse; 098 if (e == null) { 099 e = new Entry(); 100 } 101 102 boolean hasEntry = false; 103 try { 104 hasEntry = readNext(e); 105 } catch (IllegalArgumentException iae) { 106 TableName tableName = e.getKey().getTableName(); 107 if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) { 108 // It is old ROOT table edit, ignore it 109 LOG.info("Got an old ROOT edit, ignoring "); 110 return next(e); 111 } else throw iae; 112 } 113 edit++; 114 if (compressionContext != null && emptyCompressionContext) { 115 emptyCompressionContext = false; 116 } 117 return hasEntry ? e : null; 118 } 119 120 @Override 121 public void seek(long pos) throws IOException { 122 if (compressionContext != null && emptyCompressionContext) { 123 while (next() != null) { 124 if (getPosition() == pos) { 125 emptyCompressionContext = false; 126 break; 127 } 128 } 129 } 130 seekOnFs(pos); 131 } 132 133 /** 134 * Initializes the log reader with a particular stream (may be null). Reader assumes ownership of 135 * the stream if not null and may use it. Called once. 136 * @return the class name of cell Codec, null if such information is not available 137 */ 138 protected abstract String initReader(FSDataInputStream stream) throws IOException; 139 140 /** 141 * Initializes the compression after the shared stuff has been initialized. Called once. 142 */ 143 protected abstract void initAfterCompression() throws IOException; 144 145 /** 146 * Initializes the compression after the shared stuff has been initialized. Called once. 147 * @param cellCodecClsName class name of cell Codec 148 */ 149 protected abstract void initAfterCompression(String cellCodecClsName) throws IOException; 150 151 /** Returns Whether compression is enabled for this log. */ 152 protected abstract boolean hasCompression(); 153 154 /** Returns Whether tag compression is enabled for this log. */ 155 protected abstract boolean hasTagCompression(); 156 157 /** Returns Whether value compression is enabled for this log. */ 158 protected abstract boolean hasValueCompression(); 159 160 /** Returns Value compression algorithm for this log. */ 161 protected abstract Compression.Algorithm getValueCompressionAlgorithm(); 162 163 /** 164 * Read next entry. 165 * @param e The entry to read into. 166 * @return Whether there was anything to read. 167 */ 168 protected abstract boolean readNext(Entry e) throws IOException; 169 170 /** 171 * Performs a filesystem-level seek to a certain position in an underlying file. 172 */ 173 protected abstract void seekOnFs(long pos) throws IOException; 174}