001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.wal; 020 021import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.DEFAULT_PROVIDER_ID; 022import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; 023import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.List; 030import java.util.concurrent.atomic.AtomicBoolean; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Abortable; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.client.RegionInfo; 038// imports for things that haven't moved from regionserver.wal yet. 039import org.apache.hadoop.hbase.regionserver.wal.FSHLog; 040import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; 041import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 042import org.apache.hadoop.hbase.util.CommonFSUtils; 043import org.apache.hadoop.hbase.wal.WAL.Entry; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * A WAL Provider that returns a single thread safe WAL that optionally can skip parts of our normal 050 * interactions with HDFS. 051 * <p> 052 * This implementation picks a directory in HDFS based on the same mechanisms as the 053 * {@link FSHLogProvider}. Users can configure how much interaction we have with HDFS with the 054 * configuration property "hbase.wal.iotestprovider.operations". The value should be a comma 055 * separated list of allowed operations: 056 * <ul> 057 * <li><em>append</em> : edits will be written to the underlying filesystem</li> 058 * <li><em>sync</em> : wal syncs will result in hflush calls</li> 059 * <li><em>fileroll</em> : roll requests will result in creating a new file on the underlying 060 * filesystem.</li> 061 * </ul> 062 * Additionally, the special cases "all" and "none" are recognized. If ommited, the value defaults 063 * to "all." Behavior is undefined if "all" or "none" are paired with additional values. Behavior is 064 * also undefined if values not listed above are included. 065 * <p> 066 * Only those operations listed will occur between the returned WAL and HDFS. All others will be 067 * no-ops. 068 * <p> 069 * Note that in the case of allowing "append" operations but not allowing "fileroll", the returned 070 * WAL will just keep writing to the same file. This won't avoid all costs associated with file 071 * management over time, becaue the data set size may result in additional HDFS block allocations. 072 */ 073@InterfaceAudience.Private 074public class IOTestProvider implements WALProvider { 075 private static final Logger LOG = LoggerFactory.getLogger(IOTestProvider.class); 076 077 private static final String ALLOWED_OPERATIONS = "hbase.wal.iotestprovider.operations"; 078 private enum AllowedOperations { 079 all, 080 append, 081 sync, 082 fileroll, 083 none; 084 } 085 086 private WALFactory factory; 087 088 private Configuration conf; 089 090 private volatile FSHLog log; 091 092 private String providerId; 093 protected AtomicBoolean initialized = new AtomicBoolean(false); 094 095 private List<WALActionsListener> listeners = new ArrayList<>(); 096 /** 097 * @param factory factory that made us, identity used for FS layout. may not be null 098 * @param conf may not be null 099 * @param providerId differentiate between providers from one facotry, used for FS layout. may be 100 * null 101 */ 102 @Override 103 public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable) 104 throws IOException { 105 if (!initialized.compareAndSet(false, true)) { 106 throw new IllegalStateException("WALProvider.init should only be called once."); 107 } 108 this.factory = factory; 109 this.conf = conf; 110 this.providerId = providerId != null ? providerId : DEFAULT_PROVIDER_ID; 111 112 113 } 114 115 @Override 116 public List<WAL> getWALs() { 117 return Collections.singletonList(log); 118 } 119 120 private FSHLog createWAL() throws IOException { 121 String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId; 122 return new IOTestWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), 123 AbstractFSWALProvider.getWALDirectoryName(factory.factoryId), 124 HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix, 125 META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); 126 } 127 128 @Override 129 public WAL getWAL(RegionInfo region) throws IOException { 130 FSHLog log = this.log; 131 if (log != null) { 132 return log; 133 } 134 synchronized (this) { 135 log = this.log; 136 if (log == null) { 137 log = createWAL(); 138 this.log = log; 139 } 140 } 141 return log; 142 } 143 144 @Override 145 public void close() throws IOException { 146 FSHLog log = this.log; 147 if (log != null) { 148 log.close(); 149 } 150 } 151 152 @Override 153 public void shutdown() throws IOException { 154 FSHLog log = this.log; 155 if (log != null) { 156 log.shutdown(); 157 } 158 } 159 160 private static class IOTestWAL extends FSHLog { 161 162 private final boolean doFileRolls; 163 164 // Used to differntiate between roll calls before and after we finish construction. 165 private final boolean initialized; 166 167 /** 168 * Create an edit log at the given <code>dir</code> location. 169 * 170 * You should never have to load an existing log. If there is a log at 171 * startup, it should have already been processed and deleted by the time the 172 * WAL object is started up. 173 * 174 * @param fs filesystem handle 175 * @param rootDir path to where logs and oldlogs 176 * @param logDir dir where wals are stored 177 * @param archiveDir dir where wals are archived 178 * @param conf configuration to use 179 * @param listeners Listeners on WAL events. Listeners passed here will 180 * be registered before we do anything else; e.g. the 181 * Constructor {@link #rollWriter()}. 182 * @param failIfWALExists If true IOException will be thrown if files related to this wal 183 * already exist. 184 * @param prefix should always be hostname and port in distributed env and 185 * it will be URL encoded before being used. 186 * If prefix is null, "wal" will be used 187 * @param suffix will be url encoded. null is treated as empty. non-empty must start with 188 * {@link AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER} 189 * @throws IOException 190 */ 191 public IOTestWAL(final FileSystem fs, final Path rootDir, final String logDir, 192 final String archiveDir, final Configuration conf, 193 final List<WALActionsListener> listeners, 194 final boolean failIfWALExists, final String prefix, final String suffix) 195 throws IOException { 196 super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); 197 Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS); 198 doFileRolls = operations.isEmpty() || operations.contains(AllowedOperations.all.name()) || 199 operations.contains(AllowedOperations.fileroll.name()); 200 initialized = true; 201 LOG.info("Initialized with file rolling " + (doFileRolls ? "enabled" : "disabled")); 202 } 203 204 private Writer noRollsWriter; 205 206 // creatWriterInstance is where the new pipeline is set up for doing file rolls 207 // if we are skipping it, just keep returning the same writer. 208 @Override 209 protected Writer createWriterInstance(final Path path) throws IOException { 210 // we get called from the FSHLog constructor (!); always roll in this case since 211 // we don't know yet if we're supposed to generally roll and 212 // we need an initial file in the case of doing appends but no rolls. 213 if (!initialized || doFileRolls) { 214 LOG.info("creating new writer instance."); 215 final ProtobufLogWriter writer = new IOTestWriter(); 216 try { 217 writer.init(fs, path, conf, false, this.blocksize); 218 } catch (CommonFSUtils.StreamLacksCapabilityException exception) { 219 throw new IOException("Can't create writer instance because underlying FileSystem " + 220 "doesn't support needed stream capabilities.", exception); 221 } 222 if (!initialized) { 223 LOG.info("storing initial writer instance in case file rolling isn't allowed."); 224 noRollsWriter = writer; 225 } 226 return writer; 227 } else { 228 LOG.info("WAL rolling disabled, returning the first writer."); 229 // Initial assignment happens during the constructor call, so there ought not be 230 // a race for first assignment. 231 return noRollsWriter; 232 } 233 } 234 } 235 236 /** 237 * Presumes init will be called by a single thread prior to any access of other methods. 238 */ 239 private static class IOTestWriter extends ProtobufLogWriter { 240 private boolean doAppends; 241 private boolean doSyncs; 242 243 @Override 244 public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, 245 long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException { 246 Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS); 247 if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) { 248 doAppends = doSyncs = true; 249 } else if (operations.contains(AllowedOperations.none.name())) { 250 doAppends = doSyncs = false; 251 } else { 252 doAppends = operations.contains(AllowedOperations.append.name()); 253 doSyncs = operations.contains(AllowedOperations.sync.name()); 254 } 255 LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled") + 256 " and syncs " + (doSyncs ? "enabled" : "disabled")); 257 super.init(fs, path, conf, overwritable, blocksize); 258 } 259 260 @Override 261 protected String getWriterClassName() { 262 return ProtobufLogWriter.class.getSimpleName(); 263 } 264 265 @Override 266 public void append(Entry entry) throws IOException { 267 if (doAppends) { 268 super.append(entry); 269 } 270 } 271 272 @Override 273 public void sync(boolean forceSync) throws IOException { 274 if (doSyncs) { 275 super.sync(forceSync); 276 } 277 } 278 } 279 280 @Override 281 public long getNumLogFiles() { 282 return this.log.getNumLogFiles(); 283 } 284 285 @Override 286 public long getLogFileSize() { 287 return this.log.getLogFileSize(); 288 } 289 290 @Override 291 public void addWALActionsListener(WALActionsListener listener) { 292 // TODO Implement WALProvider.addWALActionLister 293 294 } 295}