/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Entry point for users of the Write Ahead Log. Acts as the shim between internal use and the
 * particular WALProvider we use to handle wal requests. Configure which provider gets used with the
 * configuration setting "hbase.wal.provider". Available implementations:
 * <ul>
 * <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
 * "asyncfs"</li>
 * <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop
 * FileSystem interface via an asynchronous client.</li>
 * <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
 * FileSystem interface via HDFS's synchronous DFSClient.</li>
 * <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
 * server.</li>
 * </ul>
 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
 */
@InterfaceAudience.Private
public class WALFactory {

  /**
   * Used in tests for injecting customized stream reader implementation, for example, inject fault
   * when reading, etc.
   * <p/>
   * After removing the sequence file based WAL, we always use protobuf based WAL reader, and we
   * will also determine whether the WAL file is encrypted and we should use
   * {@link org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec} to decode by checking the
   * header of the WAL file, so we do not need to specify a special reader to read the WAL file
   * either.
   * <p/>
   * So typically you should not use this config in production.
   */
  public static final String WAL_STREAM_READER_CLASS_IMPL =
    "hbase.regionserver.wal.stream.reader.impl";

  private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class);

  /**
   * Maps between configuration names for providers and implementation classes.
   */
  enum Providers {
    defaultProvider(AsyncFSWALProvider.class),
    filesystem(FSHLogProvider.class),
    multiwal(RegionGroupingProvider.class),
    asyncfs(AsyncFSWALProvider.class);

    final Class<? extends WALProvider> clazz;

    Providers(Class<? extends WALProvider> clazz) {
      this.clazz = clazz;
    }
  }

  public static final String WAL_PROVIDER = "hbase.wal.provider";
  static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();

  public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";

  public static final String REPLICATION_WAL_PROVIDER = "hbase.wal.replication_provider";

  public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";

  static final String REPLICATION_WAL_PROVIDER_ID = "rep";

  final String factoryId;
  final Abortable abortable;
  // null for the reader/writer-only singleton created through getInstance.
  private final WALProvider provider;
  // The meta updates are written to a different wal. If this
  // regionserver holds meta regions, then this ref will be non-null.
  // lazily initialized; most RegionServers don't deal with META
  private final LazyInitializedWALProvider metaProvider;
  // This is to avoid hbase:replication itself triggering unnecessary updates to the WAL file and
  // generating a lot of useless data, see HBASE-27775 for more details.
  private final LazyInitializedWALProvider replicationProvider;

  /**
   * Configuration-specified WAL Reader used when a custom reader is requested
   */
  private final Class<? extends WALStreamReader> walStreamReaderClass;

  /**
   * How long to attempt opening in-recovery wals
   */
  private final int timeoutMillis;

  private final Configuration conf;

  private final ExcludeDatanodeManager excludeDatanodeManager;

  /**
   * Resolves the stream reader implementation from {@link #WAL_STREAM_READER_CLASS_IMPL} and checks
   * that it can be initialized through {@link AbstractFSWALProvider.Initializer}. Shared by both
   * constructors so the lookup and validation live in one place while the fields stay final.
   * @param conf the configuration to read the reader class from
   * @return the validated reader class, {@link ProtobufWALStreamReader} when nothing is configured
   * @throws IllegalArgumentException if the configured class is not an
   *                                  {@link AbstractFSWALProvider.Initializer}
   */
  private static Class<? extends WALStreamReader> loadStreamReaderClass(Configuration conf) {
    Class<? extends WALStreamReader> readerClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL,
      ProtobufWALStreamReader.class, WALStreamReader.class);
    Preconditions.checkArgument(
      AbstractFSWALProvider.Initializer.class.isAssignableFrom(readerClass),
      "The wal stream reader class %s is not a sub class of %s", readerClass.getName(),
      AbstractFSWALProvider.Initializer.class.getName());
    return readerClass;
  }

  // Used for the singleton WALFactory, see below.
  private WALFactory(Configuration conf) {
    // until we've moved reader/writer construction down into providers, this initialization must
    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
    /* TODO Both of these are probably specific to the fs wal provider */
    walStreamReaderClass = loadStreamReaderClass(conf);
    this.conf = conf;
    // end required early initialization

    // this instance can't create wals, just reader/writers.
    provider = null;
    factoryId = SINGLETON_ID;
    this.abortable = null;
    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
    this.metaProvider = null;
    this.replicationProvider = null;
  }

  /**
   * Returns the provider entry treated as the default; package-private so it can be overridden.
   */
  Providers getDefaultProvider() {
    return Providers.defaultProvider;
  }

  /**
   * Resolves the {@link WALProvider} implementation configured under {@code key}.
   * @param key          the configuration key holding either a {@link Providers} name or a class
   *                     name
   * @param defaultValue the {@link Providers} name to use when {@code key} is not set
   * @return the provider class to instantiate
   */
  Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
    try {
      Providers provider = Providers.valueOf(conf.get(key, defaultValue));

      // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as
      // the default and we can't use it, we want to fall back to FSHLog which we know works on
      // all versions.
      if (
        provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class
          && !AsyncFSWALProvider.load()
      ) {
        // AsyncFSWAL has better performance in most cases, and also uses less resources, we will
        // try to use it if possible. It deeply hacks into the internal of DFSClient so will be
        // easily broken when upgrading hadoop.
        LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider");
        return FSHLogProvider.class;
      }

      // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't
      // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and
      // not fall back to FSHLogProvider.
      return provider.clazz;
    } catch (IllegalArgumentException exception) {
      // Fall back to them specifying a class name
      // Note that the passed default class shouldn't actually be used, since the above only fails
      // when there is a config value present.
      return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class);
    }
  }

  /**
   * Instantiates the given provider class through its no-argument constructor.
   * @param clazz the provider implementation to instantiate
   * @return a new, not yet initialized provider
   * @throws IOException if the class cannot be instantiated; the original failure is the cause
   */
  static WALProvider createProvider(Class<? extends WALProvider> clazz) throws IOException {
    LOG.info("Instantiating WALProvider of type {}", clazz);
    try {
      return clazz.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.error("couldn't set up WALProvider, the configured class is {}", clazz);
      LOG.debug("Exception details for failure to load WALProvider.", e);
      throw new IOException("couldn't set up WALProvider", e);
    }
  }

  /**
   * Create a WALFactory.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*|.*/HBaseTestingUtility.java|.*/WALPerformanceEvaluation.java")
  public WALFactory(Configuration conf, String factoryId) throws IOException {
    // default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider
    // for HMaster or HRegionServer which take system table only. See HBASE-19999
    this(conf, factoryId, null);
  }

  /**
   * Create a WALFactory.
   * <p/>
   * This is the constructor you should use when creating a WALFactory in normal code, to make sure
   * that the {@code factoryId} is the server name. We need this assumption in some places for
   * parsing the server name out from the wal file name.
   * @param conf       must not be null, will keep a reference to read params in later reader/writer
   *                   instances.
   * @param serverName use to generate the factoryId, which will be append at the first of the final
   *                   file name
   * @param abortable  the server associated with this WAL file
   */
  public WALFactory(Configuration conf, ServerName serverName, Abortable abortable)
    throws IOException {
    this(conf, serverName.toString(), abortable);
  }

  /**
   * @param conf      must not be null, will keep a reference to read params in later reader/writer
   *                  instances.
   * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
   *                  to make a directory
   * @param abortable the server associated with this WAL file
   */
  private WALFactory(Configuration conf, String factoryId, Abortable abortable) throws IOException {
    // until we've moved reader/writer construction down into providers, this initialization must
    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
    /* TODO Both of these are probably specific to the fs wal provider */
    walStreamReaderClass = loadStreamReaderClass(conf);
    this.conf = conf;
    this.factoryId = factoryId;
    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
    this.abortable = abortable;
    this.metaProvider = new LazyInitializedWALProvider(this,
      AbstractFSWALProvider.META_WAL_PROVIDER_ID, META_WAL_PROVIDER, this.abortable);
    this.replicationProvider = new LazyInitializedWALProvider(this, REPLICATION_WAL_PROVIDER_ID,
      REPLICATION_WAL_PROVIDER, this.abortable);
    // end required early initialization
    if (conf.getBoolean(WAL_ENABLED, true)) {
      WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
      provider.init(this, conf, null, this.abortable);
      provider.addWALActionsListener(new MetricsWAL());
      this.provider = provider;
    } else {
      // special handling of existing configuration behavior.
      LOG.warn("Running with WAL disabled.");
      provider = new DisabledWALProvider();
      provider.init(this, conf, factoryId, null);
    }
  }

  /** Returns the configuration this factory was created with. */
  public Configuration getConf() {
    return conf;
  }

  /**
   * Shutdown all WALs and clean up any underlying storage. Use only when you will not need to
   * replay any edits that have gone to any wals from this factory.
   * @throws IOException if any provider fails to close; every individual failure is attached as a
   *                     suppressed exception
   */
  public void close() throws IOException {
    List<IOException> ioes = new ArrayList<>();
    // these fields could be null if the WALFactory is created only for being used in the
    // getInstance method.
    if (metaProvider != null) {
      try {
        metaProvider.close();
      } catch (IOException e) {
        ioes.add(e);
      }
    }
    if (replicationProvider != null) {
      try {
        replicationProvider.close();
      } catch (IOException e) {
        ioes.add(e);
      }
    }
    if (provider != null) {
      try {
        provider.close();
      } catch (IOException e) {
        ioes.add(e);
      }
    }
    if (!ioes.isEmpty()) {
      IOException ioe = new IOException("Failed to close WALFactory");
      for (IOException e : ioes) {
        ioe.addSuppressed(e);
      }
      throw ioe;
    }
  }

  /**
   * Tell the underlying WAL providers to shut down, but do not clean up underlying storage. If you
   * are not ending cleanly and will need to replay edits from this factory's wals, use this method
   * if you can as it will try to leave things as tidy as possible.
   * @throws IOException if any provider fails to shut down; every individual failure is attached
   *                     as a suppressed exception
   */
  public void shutdown() throws IOException {
    List<IOException> ioes = new ArrayList<>();
    // these fields could be null if the WALFactory is created only for being used in the
    // getInstance method.
    if (metaProvider != null) {
      try {
        metaProvider.shutdown();
      } catch (IOException e) {
        ioes.add(e);
      }
    }
    if (replicationProvider != null) {
      try {
        replicationProvider.shutdown();
      } catch (IOException e) {
        ioes.add(e);
      }
    }
    if (provider != null) {
      try {
        provider.shutdown();
      } catch (IOException e) {
        ioes.add(e);
      }
    }
    if (!ioes.isEmpty()) {
      IOException ioe = new IOException("Failed to shutdown WALFactory");
      for (IOException e : ioes) {
        ioe.addSuppressed(e);
      }
      throw ioe;
    }
  }

  /** Returns the WALs of the main provider; the meta and replication providers are not included. */
  public List<WAL> getWALs() {
    return provider.getWALs();
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  WALProvider getMetaProvider() throws IOException {
    return metaProvider.getProvider();
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  WALProvider getReplicationProvider() throws IOException {
    return replicationProvider.getProvider();
  }

  /**
   * @param region the region which we want to get a WAL for. Could be null.
   */
  public WAL getWAL(RegionInfo region) throws IOException {
    // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
    if (region != null && RegionReplicaUtil.isDefaultReplica(region)) {
      if (region.isMetaRegion()) {
        return metaProvider.getProvider().getWAL(region);
      } else if (ReplicationStorageFactory.isReplicationQueueTable(conf, region.getTable())) {
        return replicationProvider.getProvider().getWAL(region);
      }
    }
    return provider.getWAL(region);
  }

  /**
   * Create a one-way stream reader for the WAL, without a progress reporter and with a start
   * position of -1.
   * @return A WAL reader. Close when done with it.
   */
  public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException {
    return createStreamReader(fs, path, (CancelableProgressable) null);
  }

  /**
   * Create a one-way stream reader for the WAL.
   * @return A WAL reader. Close when done with it.
   */
  public WALStreamReader createStreamReader(FileSystem fs, Path path,
    CancelableProgressable reporter) throws IOException {
    return createStreamReader(fs, path, reporter, -1);
  }

  /**
   * Create a one-way stream reader for the WAL, and start reading from the given
   * {@code startPosition}.
   * <p/>
   * Opening is retried while the failure message looks like the file is still under lease
   * recovery, until the "hbase.hlog.open.timeout" budget is used up or {@code reporter} asks to
   * cancel.
   * @param reporter consulted between retries, may be null
   * @return A WAL reader. Close when done with it.
   * @throws InterruptedIOException      if the reporter cancels the operation or the thread is
   *                                     interrupted while waiting to retry
   * @throws LeaseNotRecoveredException  if the file still can not be opened after the timeout
   */
  public WALStreamReader createStreamReader(FileSystem fs, Path path,
    CancelableProgressable reporter, long startPosition) throws IOException {
    try {
      // A wal file could be under recovery, so it may take several
      // tries to get it open. Instead of claiming it is corrupted, retry
      // to open it up to 5 minutes by default.
      long startWaiting = EnvironmentEdgeManager.currentTime();
      long openTimeout = timeoutMillis + startWaiting;
      int nbAttempt = 0;
      while (true) {
        // Scoped to a single attempt: if instantiation fails on a retry we must not close the
        // already closed reader of the previous attempt again.
        WALStreamReader reader = null;
        try {
          reader = walStreamReaderClass.getDeclaredConstructor().newInstance();
          ((AbstractFSWALProvider.Initializer) reader).init(fs, path, conf, startPosition);
          return reader;
        } catch (Exception e) {
          // catch Exception so that we close reader for all exceptions. If we don't
          // close the reader, we leak a socket.
          if (reader != null) {
            reader.close();
          }

          // Only inspect the Exception to consider retry when it's an IOException
          if (e instanceof IOException) {
            String msg = e.getMessage();
            if (
              msg != null && (msg.contains("Cannot obtain block length")
                || msg.contains("Could not obtain the last block")
                || msg.matches("Blocklist for [^ ]* has changed.*"))
            ) {
              if (++nbAttempt == 1) {
                LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
              }
              if (reporter != null && !reporter.progress()) {
                throw new InterruptedIOException("Operation is cancelled");
              }
              if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
                LOG.error("Can't open after {} attempts and {}ms  for {}", nbAttempt,
                  EnvironmentEdgeManager.currentTime() - startWaiting, path);
              } else {
                try {
                  Thread.sleep(nbAttempt < 3 ? 500 : 1000);
                  continue; // retry
                } catch (InterruptedException ie) {
                  InterruptedIOException iioe = new InterruptedIOException();
                  iioe.initCause(ie);
                  throw iioe;
                }
              }
              // Only reached once the retry budget is exhausted.
              throw new LeaseNotRecoveredException(e);
            }
          }

          // Rethrow the original exception if we are not retrying due to HDFS-isms.
          throw e;
        }
      }
    } catch (IOException ie) {
      throw ie;
    } catch (Exception e) {
      throw new IOException("Cannot get log reader", e);
    }
  }

  /**
   * Create a writer for the WAL. Uses defaults.
   * <p>
   * Should be package-private. public only for tests and
   * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
   * @return A WAL writer. Close when done with it.
   */
  public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
    return FSHLogProvider.createWriter(conf, fs, path, false);
  }

  /**
   * Should be package-private, visible for recovery testing. Uses defaults.
   * @return an overwritable writer for recovered edits. caller should close.
   */
  public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
    throws IOException {
    return FSHLogProvider.createWriter(conf, fs, path, true);
  }

  // These static methods are currently used where it's impractical to
  // untangle the reliance on state in the filesystem. They rely on singleton
  // WALFactory that just provides Reader / Writers.
  // For now, first Configuration object wins. Practically this just impacts the reader/writer class
  private static final AtomicReference<WALFactory> singleton = new AtomicReference<>();
  private static final String SINGLETON_ID = WALFactory.class.getName();

  /**
   * Returns the shared reader/writer-only factory, creating it from {@code configuration} on the
   * first call. The configuration of later calls is ignored once the singleton exists.
   */
  // Public only for FSHLog
  public static WALFactory getInstance(Configuration configuration) {
    WALFactory factory = singleton.get();
    if (null == factory) {
      WALFactory temp = new WALFactory(configuration);
      if (singleton.compareAndSet(null, temp)) {
        factory = temp;
      } else {
        // someone else beat us to initializing
        try {
          temp.close();
        } catch (IOException exception) {
          LOG.debug("failed to close temporary singleton. ignoring.", exception);
        }
        factory = singleton.get();
      }
    }
    return factory;
  }

  /**
   * Create a tailing reader for the given path. Mainly used in replication.
   */
  public static WALTailingReader createTailingReader(FileSystem fs, Path path, Configuration conf,
    long startPosition) throws IOException {
    ProtobufWALTailingReader reader = new ProtobufWALTailingReader();
    reader.init(fs, path, conf, startPosition);
    return reader;
  }

  /**
   * Create a one-way stream reader for a given path.
   */
  public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf)
    throws IOException {
    return createStreamReader(fs, path, conf, -1);
  }

  /**
   * Create a one-way stream reader for a given path.
   */
  public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf,
    long startPosition) throws IOException {
    return getInstance(conf).createStreamReader(fs, path, (CancelableProgressable) null,
      startPosition);
  }

  /**
   * If you already have a WALFactory, you should favor the instance method. Uses defaults.
   * @return a Writer that will overwrite files. Caller must close.
   */
  static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
    final Configuration configuration) throws IOException {
    return FSHLogProvider.createWriter(configuration, fs, path, true);
  }

  /**
   * If you already have a WALFactory, you should favor the instance method. Uses defaults.
   * @return a writer that won't overwrite files. Caller must close.
   */
  public static Writer createWALWriter(final FileSystem fs, final Path path,
    final Configuration configuration) throws IOException {
    return FSHLogProvider.createWriter(configuration, fs, path, false);
  }

  /** Returns the main provider, which is null for the factory returned by getInstance. */
  public WALProvider getWALProvider() {
    return this.provider;
  }

  /**
   * Returns all the wal providers, for example, the default one, the one for hbase:meta and the one
   * for hbase:replication. Providers which have not been created yet are left out, so the result
   * is empty for the reader/writer-only factory returned by getInstance.
   */
  public List<WALProvider> getAllWALProviders() {
    List<WALProvider> providers = new ArrayList<>();
    if (provider != null) {
      providers.add(provider);
    }
    // The lazy providers are null for the singleton factory, same as in close() and shutdown().
    if (metaProvider != null) {
      WALProvider meta = metaProvider.getProviderNoCreate();
      if (meta != null) {
        providers.add(meta);
      }
    }
    if (replicationProvider != null) {
      WALProvider replication = replicationProvider.getProviderNoCreate();
      if (replication != null) {
        providers.add(replication);
      }
    }
    return providers;
  }

  /** Returns the exclude-datanode manager created for this factory. */
  public ExcludeDatanodeManager getExcludeDatanodeManager() {
    return excludeDatanodeManager;
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public String getFactoryId() {
    return this.factoryId;
  }
}