001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.regionserver.wal; 021 022import java.io.IOException; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FSDataInputStream; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseInterfaceAudience; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.io.util.LRUDictionary; 031import org.apache.hadoop.hbase.util.FSUtils; 032import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 033import org.apache.hadoop.hbase.wal.WAL.Entry; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) 039public abstract class ReaderBase implements AbstractFSWALProvider.Reader { 040 private static final Logger LOG = LoggerFactory.getLogger(ReaderBase.class); 041 protected Configuration conf; 042 protected FileSystem fs; 043 protected Path path; 044 protected long edit = 0; 045 protected long fileLength; 046 /** 047 * Compression context to use reading. Can be null if no compression. 048 */ 049 protected CompressionContext compressionContext = null; 050 protected boolean emptyCompressionContext = true; 051 052 /** 053 * Default constructor. 054 */ 055 public ReaderBase() { 056 } 057 058 @Override 059 public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream) 060 throws IOException { 061 this.conf = conf; 062 this.path = path; 063 this.fs = fs; 064 this.fileLength = this.fs.getFileStatus(path).getLen(); 065 String cellCodecClsName = initReader(stream); 066 067 boolean compression = hasCompression(); 068 if (compression) { 069 // If compression is enabled, new dictionaries are created here. 070 try { 071 if (compressionContext == null) { 072 compressionContext = new CompressionContext(LRUDictionary.class, 073 FSUtils.isRecoveredEdits(path), hasTagCompression()); 074 } else { 075 compressionContext.clear(); 076 } 077 } catch (Exception e) { 078 throw new IOException("Failed to initialize CompressionContext", e); 079 } 080 } 081 initAfterCompression(cellCodecClsName); 082 } 083 084 @Override 085 public Entry next() throws IOException { 086 return next(null); 087 } 088 089 @Override 090 public Entry next(Entry reuse) throws IOException { 091 Entry e = reuse; 092 if (e == null) { 093 e = new Entry(); 094 } 095 if (compressionContext != null) { 096 e.setCompressionContext(compressionContext); 097 } 098 099 boolean hasEntry = false; 100 try { 101 hasEntry = readNext(e); 102 } catch (IllegalArgumentException iae) { 103 TableName tableName = e.getKey().getTableName(); 104 if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) { 105 // It is old ROOT table edit, ignore it 106 LOG.info("Got an old ROOT edit, ignoring "); 107 return next(e); 108 } 109 else throw iae; 110 } 111 edit++; 112 if (compressionContext != null && emptyCompressionContext) { 113 emptyCompressionContext = false; 114 } 115 return hasEntry ? e : null; 116 } 117 118 @Override 119 public void seek(long pos) throws IOException { 120 if (compressionContext != null && emptyCompressionContext) { 121 while (next() != null) { 122 if (getPosition() == pos) { 123 emptyCompressionContext = false; 124 break; 125 } 126 } 127 } 128 seekOnFs(pos); 129 } 130 131 /** 132 * Initializes the log reader with a particular stream (may be null). 133 * Reader assumes ownership of the stream if not null and may use it. Called once. 134 * @return the class name of cell Codec, null if such information is not available 135 */ 136 protected abstract String initReader(FSDataInputStream stream) throws IOException; 137 138 /** 139 * Initializes the compression after the shared stuff has been initialized. Called once. 140 */ 141 protected abstract void initAfterCompression() throws IOException; 142 143 /** 144 * Initializes the compression after the shared stuff has been initialized. Called once. 145 * @param cellCodecClsName class name of cell Codec 146 */ 147 protected abstract void initAfterCompression(String cellCodecClsName) throws IOException; 148 /** 149 * @return Whether compression is enabled for this log. 150 */ 151 protected abstract boolean hasCompression(); 152 153 /** 154 * @return Whether tag compression is enabled for this log. 155 */ 156 protected abstract boolean hasTagCompression(); 157 158 /** 159 * Read next entry. 160 * @param e The entry to read into. 161 * @return Whether there was anything to read. 162 */ 163 protected abstract boolean readNext(Entry e) throws IOException; 164 165 /** 166 * Performs a filesystem-level seek to a certain position in an underlying file. 167 */ 168 protected abstract void seekOnFs(long pos) throws IOException; 169}