001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.wal; 020 021import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.DEFAULT_PROVIDER_ID; 022import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; 023import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.List; 030import java.util.concurrent.atomic.AtomicBoolean; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.client.RegionInfo; 037// imports for things that haven't moved from regionserver.wal yet. 038import org.apache.hadoop.hbase.regionserver.wal.FSHLog; 039import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; 040import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 041import org.apache.hadoop.hbase.util.CommonFSUtils; 042import org.apache.hadoop.hbase.wal.WAL.Entry; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * A WAL Provider that returns a single thread safe WAL that optionally can skip parts of our normal 049 * interactions with HDFS. 050 * <p> 051 * This implementation picks a directory in HDFS based on the same mechanisms as the 052 * {@link FSHLogProvider}. Users can configure how much interaction we have with HDFS with the 053 * configuration property "hbase.wal.iotestprovider.operations". The value should be a comma 054 * separated list of allowed operations: 055 * <ul> 056 * <li><em>append</em> : edits will be written to the underlying filesystem</li> 057 * <li><em>sync</em> : wal syncs will result in hflush calls</li> 058 * <li><em>fileroll</em> : roll requests will result in creating a new file on the underlying 059 * filesystem.</li> 060 * </ul> 061 * Additionally, the special cases "all" and "none" are recognized. If ommited, the value defaults 062 * to "all." Behavior is undefined if "all" or "none" are paired with additional values. Behavior is 063 * also undefined if values not listed above are included. 064 * <p> 065 * Only those operations listed will occur between the returned WAL and HDFS. All others will be 066 * no-ops. 067 * <p> 068 * Note that in the case of allowing "append" operations but not allowing "fileroll", the returned 069 * WAL will just keep writing to the same file. This won't avoid all costs associated with file 070 * management over time, becaue the data set size may result in additional HDFS block allocations. 071 */ 072@InterfaceAudience.Private 073public class IOTestProvider implements WALProvider { 074 private static final Logger LOG = LoggerFactory.getLogger(IOTestProvider.class); 075 076 private static final String ALLOWED_OPERATIONS = "hbase.wal.iotestprovider.operations"; 077 private enum AllowedOperations { 078 all, 079 append, 080 sync, 081 fileroll, 082 none; 083 } 084 085 private WALFactory factory; 086 087 private Configuration conf; 088 089 private volatile FSHLog log; 090 091 private String providerId; 092 protected AtomicBoolean initialized = new AtomicBoolean(false); 093 094 private List<WALActionsListener> listeners = new ArrayList<>(); 095 /** 096 * @param factory factory that made us, identity used for FS layout. may not be null 097 * @param conf may not be null 098 * @param providerId differentiate between providers from one facotry, used for FS layout. may be 099 * null 100 */ 101 @Override 102 public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { 103 if (!initialized.compareAndSet(false, true)) { 104 throw new IllegalStateException("WALProvider.init should only be called once."); 105 } 106 this.factory = factory; 107 this.conf = conf; 108 this.providerId = providerId != null ? providerId : DEFAULT_PROVIDER_ID; 109 110 111 } 112 113 @Override 114 public List<WAL> getWALs() { 115 return Collections.singletonList(log); 116 } 117 118 private FSHLog createWAL() throws IOException { 119 String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId; 120 return new IOTestWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), 121 AbstractFSWALProvider.getWALDirectoryName(factory.factoryId), 122 HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix, 123 META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); 124 } 125 126 @Override 127 public WAL getWAL(RegionInfo region) throws IOException { 128 FSHLog log = this.log; 129 if (log != null) { 130 return log; 131 } 132 synchronized (this) { 133 log = this.log; 134 if (log == null) { 135 log = createWAL(); 136 this.log = log; 137 } 138 } 139 return log; 140 } 141 142 @Override 143 public void close() throws IOException { 144 FSHLog log = this.log; 145 if (log != null) { 146 log.close(); 147 } 148 } 149 150 @Override 151 public void shutdown() throws IOException { 152 FSHLog log = this.log; 153 if (log != null) { 154 log.shutdown(); 155 } 156 } 157 158 private static class IOTestWAL extends FSHLog { 159 160 private final boolean doFileRolls; 161 162 // Used to differntiate between roll calls before and after we finish construction. 163 private final boolean initialized; 164 165 /** 166 * Create an edit log at the given <code>dir</code> location. 167 * 168 * You should never have to load an existing log. If there is a log at 169 * startup, it should have already been processed and deleted by the time the 170 * WAL object is started up. 171 * 172 * @param fs filesystem handle 173 * @param rootDir path to where logs and oldlogs 174 * @param logDir dir where wals are stored 175 * @param archiveDir dir where wals are archived 176 * @param conf configuration to use 177 * @param listeners Listeners on WAL events. Listeners passed here will 178 * be registered before we do anything else; e.g. the 179 * Constructor {@link #rollWriter()}. 180 * @param failIfWALExists If true IOException will be thrown if files related to this wal 181 * already exist. 182 * @param prefix should always be hostname and port in distributed env and 183 * it will be URL encoded before being used. 184 * If prefix is null, "wal" will be used 185 * @param suffix will be url encoded. null is treated as empty. non-empty must start with 186 * {@link AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER} 187 * @throws IOException 188 */ 189 public IOTestWAL(final FileSystem fs, final Path rootDir, final String logDir, 190 final String archiveDir, final Configuration conf, 191 final List<WALActionsListener> listeners, 192 final boolean failIfWALExists, final String prefix, final String suffix) 193 throws IOException { 194 super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); 195 Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS); 196 doFileRolls = operations.isEmpty() || operations.contains(AllowedOperations.all.name()) || 197 operations.contains(AllowedOperations.fileroll.name()); 198 initialized = true; 199 LOG.info("Initialized with file rolling " + (doFileRolls ? "enabled" : "disabled")); 200 } 201 202 private Writer noRollsWriter; 203 204 // creatWriterInstance is where the new pipeline is set up for doing file rolls 205 // if we are skipping it, just keep returning the same writer. 206 @Override 207 protected Writer createWriterInstance(final Path path) throws IOException { 208 // we get called from the FSHLog constructor (!); always roll in this case since 209 // we don't know yet if we're supposed to generally roll and 210 // we need an initial file in the case of doing appends but no rolls. 211 if (!initialized || doFileRolls) { 212 LOG.info("creating new writer instance."); 213 final ProtobufLogWriter writer = new IOTestWriter(); 214 try { 215 writer.init(fs, path, conf, false, this.blocksize); 216 } catch (CommonFSUtils.StreamLacksCapabilityException exception) { 217 throw new IOException("Can't create writer instance because underlying FileSystem " + 218 "doesn't support needed stream capabilities.", exception); 219 } 220 if (!initialized) { 221 LOG.info("storing initial writer instance in case file rolling isn't allowed."); 222 noRollsWriter = writer; 223 } 224 return writer; 225 } else { 226 LOG.info("WAL rolling disabled, returning the first writer."); 227 // Initial assignment happens during the constructor call, so there ought not be 228 // a race for first assignment. 229 return noRollsWriter; 230 } 231 } 232 } 233 234 /** 235 * Presumes init will be called by a single thread prior to any access of other methods. 236 */ 237 private static class IOTestWriter extends ProtobufLogWriter { 238 private boolean doAppends; 239 private boolean doSyncs; 240 241 @Override 242 public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, 243 long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException { 244 Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS); 245 if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) { 246 doAppends = doSyncs = true; 247 } else if (operations.contains(AllowedOperations.none.name())) { 248 doAppends = doSyncs = false; 249 } else { 250 doAppends = operations.contains(AllowedOperations.append.name()); 251 doSyncs = operations.contains(AllowedOperations.sync.name()); 252 } 253 LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled") + 254 " and syncs " + (doSyncs ? "enabled" : "disabled")); 255 super.init(fs, path, conf, overwritable, blocksize); 256 } 257 258 @Override 259 protected String getWriterClassName() { 260 return ProtobufLogWriter.class.getSimpleName(); 261 } 262 263 @Override 264 public void append(Entry entry) throws IOException { 265 if (doAppends) { 266 super.append(entry); 267 } 268 } 269 270 @Override 271 public void sync(boolean forceSync) throws IOException { 272 if (doSyncs) { 273 super.sync(forceSync); 274 } 275 } 276 } 277 278 @Override 279 public long getNumLogFiles() { 280 return this.log.getNumLogFiles(); 281 } 282 283 @Override 284 public long getLogFileSize() { 285 return this.log.getLogFileSize(); 286 } 287 288 @Override 289 public void addWALActionsListener(WALActionsListener listener) { 290 // TODO Implement WALProvider.addWALActionLister 291 292 } 293}