001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.concurrent.atomic.AtomicReference; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.Abortable; 030import org.apache.hadoop.hbase.ServerName; 031import org.apache.hadoop.hbase.client.RegionInfo; 032import org.apache.hadoop.hbase.client.RegionReplicaUtil; 033import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager; 034import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; 035import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader; 036import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader; 037import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 038import org.apache.hadoop.hbase.util.CancelableProgressable; 039import org.apache.hadoop.hbase.util.CommonFSUtils; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; 042import org.apache.hadoop.hbase.wal.WALProvider.Writer; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 048 049/** 050 * Entry point for users of the Write Ahead Log. Acts as the shim between internal use and the 051 * particular WALProvider we use to handle wal requests. Configure which provider gets used with the 052 * configuration setting "hbase.wal.provider". Available implementations: 053 * <ul> 054 * <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently 055 * "asyncfs"</li> 056 * <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop 057 * FileSystem interface via an asynchronous client.</li> 058 * <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop 059 * FileSystem interface via HDFS's synchronous DFSClient.</li> 060 * <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region 061 * server.</li> 062 * </ul> 063 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name. 064 */ 065@InterfaceAudience.Private 066public class WALFactory { 067 068 /** 069 * Used in tests for injecting customized stream reader implementation, for example, inject fault 070 * when reading, etc. 071 * <p/> 072 * After removing the sequence file based WAL, we always use protobuf based WAL reader, and we 073 * will also determine whether the WAL file is encrypted and we should use 074 * {@link org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec} to decode by check the 075 * header of the WAL file, so we do not need to specify a specical reader to read the WAL file 076 * either. 077 * <p/> 078 * So typically you should not use this config in production. 079 */ 080 public static final String WAL_STREAM_READER_CLASS_IMPL = 081 "hbase.regionserver.wal.stream.reader.impl"; 082 083 private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class); 084 085 /** 086 * Maps between configuration names for providers and implementation classes. 087 */ 088 enum Providers { 089 defaultProvider(AsyncFSWALProvider.class), 090 filesystem(FSHLogProvider.class), 091 multiwal(RegionGroupingProvider.class), 092 asyncfs(AsyncFSWALProvider.class); 093 094 final Class<? extends WALProvider> clazz; 095 096 Providers(Class<? extends WALProvider> clazz) { 097 this.clazz = clazz; 098 } 099 } 100 101 public static final String WAL_PROVIDER = "hbase.wal.provider"; 102 static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name(); 103 104 public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider"; 105 106 public static final String REPLICATION_WAL_PROVIDER = "hbase.wal.replication_provider"; 107 108 public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled"; 109 110 static final String REPLICATION_WAL_PROVIDER_ID = "rep"; 111 112 final String factoryId; 113 final Abortable abortable; 114 private final WALProvider provider; 115 // The meta updates are written to a different wal. If this 116 // regionserver holds meta regions, then this ref will be non-null. 117 // lazily intialized; most RegionServers don't deal with META 118 private final LazyInitializedWALProvider metaProvider; 119 // This is for avoid hbase:replication itself keeps trigger unnecessary updates to WAL file and 120 // generate a lot useless data, see HBASE-27775 for more details. 121 private final LazyInitializedWALProvider replicationProvider; 122 123 /** 124 * Configuration-specified WAL Reader used when a custom reader is requested 125 */ 126 private final Class<? extends WALStreamReader> walStreamReaderClass; 127 128 /** 129 * How long to attempt opening in-recovery wals 130 */ 131 private final int timeoutMillis; 132 133 private final Configuration conf; 134 135 private final ExcludeDatanodeManager excludeDatanodeManager; 136 137 // Used for the singleton WALFactory, see below. 138 private WALFactory(Configuration conf) { 139 // this code is duplicated here so we can keep our members final. 140 // until we've moved reader/writer construction down into providers, this initialization must 141 // happen prior to provider initialization, in case they need to instantiate a reader/writer. 142 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); 143 /* TODO Both of these are probably specific to the fs wal provider */ 144 walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL, 145 ProtobufWALStreamReader.class, WALStreamReader.class); 146 Preconditions.checkArgument( 147 AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass), 148 "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(), 149 AbstractFSWALProvider.Initializer.class.getName()); 150 this.conf = conf; 151 // end required early initialization 152 153 // this instance can't create wals, just reader/writers. 154 provider = null; 155 factoryId = SINGLETON_ID; 156 this.abortable = null; 157 this.excludeDatanodeManager = new ExcludeDatanodeManager(conf); 158 this.metaProvider = null; 159 this.replicationProvider = null; 160 } 161 162 Providers getDefaultProvider() { 163 return Providers.defaultProvider; 164 } 165 166 Class<? extends WALProvider> getProviderClass(String key, String defaultValue) { 167 try { 168 Providers provider = Providers.valueOf(conf.get(key, defaultValue)); 169 170 // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as 171 // the default and we can't use it, we want to fall back to FSHLog which we know works on 172 // all versions. 173 if ( 174 provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class 175 && !AsyncFSWALProvider.load() 176 ) { 177 // AsyncFSWAL has better performance in most cases, and also uses less resources, we will 178 // try to use it if possible. It deeply hacks into the internal of DFSClient so will be 179 // easily broken when upgrading hadoop. 180 LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider"); 181 return FSHLogProvider.class; 182 } 183 184 // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't 185 // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and 186 // not fall back to FSHLogProvider. 187 return provider.clazz; 188 } catch (IllegalArgumentException exception) { 189 // Fall back to them specifying a class name 190 // Note that the passed default class shouldn't actually be used, since the above only fails 191 // when there is a config value present. 192 return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class); 193 } 194 } 195 196 static WALProvider createProvider(Class<? extends WALProvider> clazz) throws IOException { 197 LOG.info("Instantiating WALProvider of type {}", clazz); 198 try { 199 return clazz.getDeclaredConstructor().newInstance(); 200 } catch (Exception e) { 201 LOG.error("couldn't set up WALProvider, the configured class is " + clazz); 202 LOG.debug("Exception details for failure to load WALProvider.", e); 203 throw new IOException("couldn't set up WALProvider", e); 204 } 205 } 206 207 /** 208 * Create a WALFactory. 209 */ 210 @RestrictedApi(explanation = "Should only be called in tests", link = "", 211 allowedOnPath = ".*/src/test/.*|.*/HBaseTestingUtility.java|.*/WALPerformanceEvaluation.java") 212 public WALFactory(Configuration conf, String factoryId) throws IOException { 213 // default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider 214 // for HMaster or HRegionServer which take system table only. See HBASE-19999 215 this(conf, factoryId, null, true); 216 } 217 218 /** 219 * Create a WALFactory. 220 * <p/> 221 * This is the constructor you should use when creating a WALFactory in normal code, to make sure 222 * that the {@code factoryId} is the server name. We need this assumption in some places for 223 * parsing the server name out from the wal file name. 224 * @param conf must not be null, will keep a reference to read params in later reader/writer 225 * instances. 226 * @param serverName use to generate the factoryId, which will be append at the first of the final 227 * file name 228 * @param abortable the server associated with this WAL file 229 */ 230 public WALFactory(Configuration conf, ServerName serverName, Abortable abortable) 231 throws IOException { 232 this(conf, serverName.toString(), abortable, false); 233 } 234 235 private static void createWALDirectory(Configuration conf, String factoryId) throws IOException { 236 FileSystem walFs = CommonFSUtils.getWALFileSystem(conf); 237 Path walRootDir = CommonFSUtils.getWALRootDir(conf); 238 Path walDir = new Path(walRootDir, AbstractFSWALProvider.getWALDirectoryName(factoryId)); 239 if (!walFs.exists(walDir) && !walFs.mkdirs(walDir)) { 240 throw new IOException("Can not create wal directory " + walDir); 241 } 242 } 243 244 /** 245 * @param conf must not be null, will keep a reference to read params in later 246 * reader/writer instances. 247 * @param factoryId a unique identifier for this factory. used i.e. by filesystem 248 * implementations to make a directory 249 * @param abortable the server associated with this WAL file 250 * @param createWalDirectory pass {@code true} for testing purpose, to create the wal directory 251 * automatically. In normal code path, we should create it in 252 * HRegionServer setup. 253 */ 254 private WALFactory(Configuration conf, String factoryId, Abortable abortable, 255 boolean createWalDirectory) throws IOException { 256 // until we've moved reader/writer construction down into providers, this initialization must 257 // happen prior to provider initialization, in case they need to instantiate a reader/writer. 258 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); 259 /* TODO Both of these are probably specific to the fs wal provider */ 260 walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL, 261 ProtobufWALStreamReader.class, WALStreamReader.class); 262 Preconditions.checkArgument( 263 AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass), 264 "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(), 265 AbstractFSWALProvider.Initializer.class.getName()); 266 this.conf = conf; 267 this.factoryId = factoryId; 268 this.excludeDatanodeManager = new ExcludeDatanodeManager(conf); 269 this.abortable = abortable; 270 this.metaProvider = new LazyInitializedWALProvider(this, 271 AbstractFSWALProvider.META_WAL_PROVIDER_ID, META_WAL_PROVIDER, this.abortable); 272 this.replicationProvider = new LazyInitializedWALProvider(this, REPLICATION_WAL_PROVIDER_ID, 273 REPLICATION_WAL_PROVIDER, this.abortable); 274 // end required early initialization 275 if (conf.getBoolean(WAL_ENABLED, true)) { 276 if (createWalDirectory) { 277 // for testing only 278 createWALDirectory(conf, factoryId); 279 } 280 WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); 281 provider.init(this, conf, null, this.abortable); 282 provider.addWALActionsListener(new MetricsWAL()); 283 this.provider = provider; 284 } else { 285 // special handling of existing configuration behavior. 286 LOG.warn("Running with WAL disabled."); 287 provider = new DisabledWALProvider(); 288 provider.init(this, conf, factoryId, null); 289 } 290 } 291 292 public Configuration getConf() { 293 return conf; 294 } 295 296 /** 297 * Shutdown all WALs and clean up any underlying storage. Use only when you will not need to 298 * replay and edits that have gone to any wals from this factory. 299 */ 300 public void close() throws IOException { 301 List<IOException> ioes = new ArrayList<>(); 302 // these fields could be null if the WALFactory is created only for being used in the 303 // getInstance method. 304 if (metaProvider != null) { 305 try { 306 metaProvider.close(); 307 } catch (IOException e) { 308 ioes.add(e); 309 } 310 } 311 if (replicationProvider != null) { 312 try { 313 replicationProvider.close(); 314 } catch (IOException e) { 315 ioes.add(e); 316 } 317 } 318 if (provider != null) { 319 try { 320 provider.close(); 321 } catch (IOException e) { 322 ioes.add(e); 323 } 324 } 325 if (!ioes.isEmpty()) { 326 IOException ioe = new IOException("Failed to close WALFactory"); 327 for (IOException e : ioes) { 328 ioe.addSuppressed(e); 329 } 330 throw ioe; 331 } 332 } 333 334 /** 335 * Tell the underlying WAL providers to shut down, but do not clean up underlying storage. If you 336 * are not ending cleanly and will need to replay edits from this factory's wals, use this method 337 * if you can as it will try to leave things as tidy as possible. 338 */ 339 public void shutdown() throws IOException { 340 List<IOException> ioes = new ArrayList<>(); 341 // these fields could be null if the WALFactory is created only for being used in the 342 // getInstance method. 343 if (metaProvider != null) { 344 try { 345 metaProvider.shutdown(); 346 } catch (IOException e) { 347 ioes.add(e); 348 } 349 } 350 if (replicationProvider != null) { 351 try { 352 replicationProvider.shutdown(); 353 } catch (IOException e) { 354 ioes.add(e); 355 } 356 } 357 if (provider != null) { 358 try { 359 provider.shutdown(); 360 } catch (IOException e) { 361 ioes.add(e); 362 } 363 } 364 if (!ioes.isEmpty()) { 365 IOException ioe = new IOException("Failed to shutdown WALFactory"); 366 for (IOException e : ioes) { 367 ioe.addSuppressed(e); 368 } 369 throw ioe; 370 } 371 } 372 373 public List<WAL> getWALs() { 374 return provider.getWALs(); 375 } 376 377 @RestrictedApi(explanation = "Should only be called in tests", link = "", 378 allowedOnPath = ".*/src/test/.*") 379 WALProvider getMetaProvider() throws IOException { 380 return metaProvider.getProvider(); 381 } 382 383 @RestrictedApi(explanation = "Should only be called in tests", link = "", 384 allowedOnPath = ".*/src/test/.*") 385 WALProvider getReplicationProvider() throws IOException { 386 return replicationProvider.getProvider(); 387 } 388 389 /** 390 * @param region the region which we want to get a WAL for. Could be null. 391 */ 392 public WAL getWAL(RegionInfo region) throws IOException { 393 // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up. 394 if (region != null && RegionReplicaUtil.isDefaultReplica(region)) { 395 if (region.isMetaRegion()) { 396 return metaProvider.getProvider().getWAL(region); 397 } else if (ReplicationStorageFactory.isReplicationQueueTable(conf, region.getTable())) { 398 return replicationProvider.getProvider().getWAL(region); 399 } 400 } 401 return provider.getWAL(region); 402 } 403 404 public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException { 405 return createStreamReader(fs, path, (CancelableProgressable) null); 406 } 407 408 /** 409 * Create a one-way stream reader for the WAL. 410 * @return A WAL reader. Close when done with it. 411 */ 412 public WALStreamReader createStreamReader(FileSystem fs, Path path, 413 CancelableProgressable reporter) throws IOException { 414 return createStreamReader(fs, path, reporter, -1); 415 } 416 417 /** 418 * Create a one-way stream reader for the WAL, and start reading from the given 419 * {@code startPosition}. 420 * @return A WAL reader. Close when done with it. 421 */ 422 public WALStreamReader createStreamReader(FileSystem fs, Path path, 423 CancelableProgressable reporter, long startPosition) throws IOException { 424 try { 425 // A wal file could be under recovery, so it may take several 426 // tries to get it open. Instead of claiming it is corrupted, retry 427 // to open it up to 5 minutes by default. 428 long startWaiting = EnvironmentEdgeManager.currentTime(); 429 long openTimeout = timeoutMillis + startWaiting; 430 int nbAttempt = 0; 431 WALStreamReader reader = null; 432 while (true) { 433 try { 434 reader = walStreamReaderClass.getDeclaredConstructor().newInstance(); 435 ((AbstractFSWALProvider.Initializer) reader).init(fs, path, conf, startPosition); 436 return reader; 437 } catch (Exception e) { 438 // catch Exception so that we close reader for all exceptions. If we don't 439 // close the reader, we leak a socket. 440 if (reader != null) { 441 reader.close(); 442 } 443 444 // Only inspect the Exception to consider retry when it's an IOException 445 if (e instanceof IOException) { 446 String msg = e.getMessage(); 447 if ( 448 msg != null && (msg.contains("Cannot obtain block length") 449 || msg.contains("Could not obtain the last block") 450 || msg.matches("Blocklist for [^ ]* has changed.*")) 451 ) { 452 if (++nbAttempt == 1) { 453 LOG.warn("Lease should have recovered. This is not expected. Will retry", e); 454 } 455 if (reporter != null && !reporter.progress()) { 456 throw new InterruptedIOException("Operation is cancelled"); 457 } 458 if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) { 459 LOG.error("Can't open after " + nbAttempt + " attempts and " 460 + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path); 461 } else { 462 try { 463 Thread.sleep(nbAttempt < 3 ? 500 : 1000); 464 continue; // retry 465 } catch (InterruptedException ie) { 466 InterruptedIOException iioe = new InterruptedIOException(); 467 iioe.initCause(ie); 468 throw iioe; 469 } 470 } 471 throw new LeaseNotRecoveredException(e); 472 } 473 } 474 475 // Rethrow the original exception if we are not retrying due to HDFS-isms. 476 throw e; 477 } 478 } 479 } catch (IOException ie) { 480 throw ie; 481 } catch (Exception e) { 482 throw new IOException("Cannot get log reader", e); 483 } 484 } 485 486 /** 487 * Create a writer for the WAL. Uses defaults. 488 * <p> 489 * Should be package-private. public only for tests and 490 * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} 491 * @return A WAL writer. Close when done with it. 492 */ 493 public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException { 494 return FSHLogProvider.createWriter(conf, fs, path, false); 495 } 496 497 /** 498 * Should be package-private, visible for recovery testing. Uses defaults. 499 * @return an overwritable writer for recovered edits. caller should close. 500 */ 501 public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path) 502 throws IOException { 503 return FSHLogProvider.createWriter(conf, fs, path, true); 504 } 505 506 // These static methods are currently used where it's impractical to 507 // untangle the reliance on state in the filesystem. They rely on singleton 508 // WALFactory that just provides Reader / Writers. 509 // For now, first Configuration object wins. Practically this just impacts the reader/writer class 510 private static final AtomicReference<WALFactory> singleton = new AtomicReference<>(); 511 private static final String SINGLETON_ID = WALFactory.class.getName(); 512 513 // Public only for FSHLog 514 public static WALFactory getInstance(Configuration configuration) { 515 WALFactory factory = singleton.get(); 516 if (null == factory) { 517 WALFactory temp = new WALFactory(configuration); 518 if (singleton.compareAndSet(null, temp)) { 519 factory = temp; 520 } else { 521 // someone else beat us to initializing 522 try { 523 temp.close(); 524 } catch (IOException exception) { 525 LOG.debug("failed to close temporary singleton. ignoring.", exception); 526 } 527 factory = singleton.get(); 528 } 529 } 530 return factory; 531 } 532 533 /** 534 * Create a tailing reader for the given path. Mainly used in replication. 535 */ 536 public static WALTailingReader createTailingReader(FileSystem fs, Path path, Configuration conf, 537 long startPosition) throws IOException { 538 ProtobufWALTailingReader reader = new ProtobufWALTailingReader(); 539 reader.init(fs, path, conf, startPosition); 540 return reader; 541 } 542 543 /** 544 * Create a one-way stream reader for a given path. 545 */ 546 public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf) 547 throws IOException { 548 return createStreamReader(fs, path, conf, -1); 549 } 550 551 /** 552 * Create a one-way stream reader for a given path. 553 */ 554 public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf, 555 long startPosition) throws IOException { 556 return getInstance(conf).createStreamReader(fs, path, (CancelableProgressable) null, 557 startPosition); 558 } 559 560 /** 561 * If you already have a WALFactory, you should favor the instance method. Uses defaults. 562 * @return a Writer that will overwrite files. Caller must close. 563 */ 564 static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path, 565 final Configuration configuration) throws IOException { 566 return FSHLogProvider.createWriter(configuration, fs, path, true); 567 } 568 569 /** 570 * If you already have a WALFactory, you should favor the instance method. Uses defaults. 571 * @return a writer that won't overwrite files. Caller must close. 572 */ 573 public static Writer createWALWriter(final FileSystem fs, final Path path, 574 final Configuration configuration) throws IOException { 575 return FSHLogProvider.createWriter(configuration, fs, path, false); 576 } 577 578 public WALProvider getWALProvider() { 579 return this.provider; 580 } 581 582 /** 583 * Returns all the wal providers, for example, the default one, the one for hbase:meta and the one 584 * for hbase:replication. 585 */ 586 public List<WALProvider> getAllWALProviders() { 587 List<WALProvider> providers = new ArrayList<>(); 588 if (provider != null) { 589 providers.add(provider); 590 } 591 WALProvider meta = metaProvider.getProviderNoCreate(); 592 if (meta != null) { 593 providers.add(meta); 594 } 595 WALProvider replication = replicationProvider.getProviderNoCreate(); 596 if (replication != null) { 597 providers.add(replication); 598 } 599 return providers; 600 } 601 602 public ExcludeDatanodeManager getExcludeDatanodeManager() { 603 return excludeDatanodeManager; 604 } 605 606 @RestrictedApi(explanation = "Should only be called in tests", link = "", 607 allowedOnPath = ".*/src/test/.*") 608 public String getFactoryId() { 609 return this.factoryId; 610 } 611}