001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; 021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; 022import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION; 023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; 024import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 025import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT; 026import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY; 027import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT; 028import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY; 029import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT; 030import static 
org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY; 031import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT; 032import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; 033import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY; 034 035import io.opentelemetry.api.trace.Span; 036import io.opentelemetry.api.trace.StatusCode; 037import io.opentelemetry.context.Scope; 038import java.io.IOException; 039import java.io.PrintWriter; 040import java.lang.management.MemoryUsage; 041import java.lang.reflect.Constructor; 042import java.net.InetSocketAddress; 043import java.time.Duration; 044import java.util.ArrayList; 045import java.util.Collection; 046import java.util.Collections; 047import java.util.Comparator; 048import java.util.HashSet; 049import java.util.Iterator; 050import java.util.List; 051import java.util.Map; 052import java.util.Map.Entry; 053import java.util.Objects; 054import java.util.Optional; 055import java.util.Set; 056import java.util.SortedMap; 057import java.util.Timer; 058import java.util.TimerTask; 059import java.util.TreeMap; 060import java.util.TreeSet; 061import java.util.concurrent.ConcurrentHashMap; 062import java.util.concurrent.ConcurrentMap; 063import java.util.concurrent.ConcurrentSkipListMap; 064import java.util.concurrent.ThreadLocalRandom; 065import java.util.concurrent.TimeUnit; 066import java.util.concurrent.atomic.AtomicBoolean; 067import java.util.concurrent.locks.ReentrantReadWriteLock; 068import java.util.stream.Collectors; 069import javax.management.MalformedObjectNameException; 070import javax.servlet.http.HttpServlet; 071import org.apache.commons.lang3.StringUtils; 072import org.apache.commons.lang3.mutable.MutableFloat; 073import org.apache.hadoop.conf.Configuration; 074import org.apache.hadoop.fs.FileSystem; 075import 
org.apache.hadoop.fs.Path; 076import org.apache.hadoop.hbase.Abortable; 077import org.apache.hadoop.hbase.CacheEvictionStats; 078import org.apache.hadoop.hbase.CallQueueTooBigException; 079import org.apache.hadoop.hbase.ClockOutOfSyncException; 080import org.apache.hadoop.hbase.DoNotRetryIOException; 081import org.apache.hadoop.hbase.ExecutorStatusChore; 082import org.apache.hadoop.hbase.HBaseConfiguration; 083import org.apache.hadoop.hbase.HBaseInterfaceAudience; 084import org.apache.hadoop.hbase.HBaseServerBase; 085import org.apache.hadoop.hbase.HConstants; 086import org.apache.hadoop.hbase.HDFSBlocksDistribution; 087import org.apache.hadoop.hbase.HRegionLocation; 088import org.apache.hadoop.hbase.HealthCheckChore; 089import org.apache.hadoop.hbase.MetaTableAccessor; 090import org.apache.hadoop.hbase.NotServingRegionException; 091import org.apache.hadoop.hbase.PleaseHoldException; 092import org.apache.hadoop.hbase.ScheduledChore; 093import org.apache.hadoop.hbase.ServerName; 094import org.apache.hadoop.hbase.Stoppable; 095import org.apache.hadoop.hbase.TableName; 096import org.apache.hadoop.hbase.YouAreDeadException; 097import org.apache.hadoop.hbase.ZNodeClearer; 098import org.apache.hadoop.hbase.client.ConnectionUtils; 099import org.apache.hadoop.hbase.client.RegionInfo; 100import org.apache.hadoop.hbase.client.RegionInfoBuilder; 101import org.apache.hadoop.hbase.client.locking.EntityLock; 102import org.apache.hadoop.hbase.client.locking.LockServiceClient; 103import org.apache.hadoop.hbase.conf.ConfigurationObserver; 104import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 105import org.apache.hadoop.hbase.exceptions.RegionMovedException; 106import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 107import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 108import org.apache.hadoop.hbase.executor.ExecutorType; 109import org.apache.hadoop.hbase.http.InfoServer; 110import org.apache.hadoop.hbase.io.hfile.BlockCache; 111import 
org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 112import org.apache.hadoop.hbase.io.hfile.HFile; 113import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 114import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 115import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException; 116import org.apache.hadoop.hbase.ipc.RpcClient; 117import org.apache.hadoop.hbase.ipc.RpcServer; 118import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 119import org.apache.hadoop.hbase.ipc.ServerRpcController; 120import org.apache.hadoop.hbase.log.HBaseMarkers; 121import org.apache.hadoop.hbase.mob.MobFileCache; 122import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore; 123import org.apache.hadoop.hbase.monitoring.TaskMonitor; 124import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 125import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore; 126import org.apache.hadoop.hbase.net.Address; 127import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; 128import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 129import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore; 130import org.apache.hadoop.hbase.quotas.QuotaUtil; 131import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 132import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 133import org.apache.hadoop.hbase.quotas.RegionSize; 134import org.apache.hadoop.hbase.quotas.RegionSizeStore; 135import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 136import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 137import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; 138import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 139import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; 140import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; 141import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; 
142import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; 143import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet; 144import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet; 145import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager; 146import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; 147import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 148import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 149import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; 150import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; 151import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore; 152import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; 153import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; 154import org.apache.hadoop.hbase.security.SecurityConstants; 155import org.apache.hadoop.hbase.security.Superusers; 156import org.apache.hadoop.hbase.security.User; 157import org.apache.hadoop.hbase.security.UserProvider; 158import org.apache.hadoop.hbase.trace.TraceUtil; 159import org.apache.hadoop.hbase.util.Bytes; 160import org.apache.hadoop.hbase.util.CompressionTest; 161import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil; 162import org.apache.hadoop.hbase.util.DNS; 163import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 164import org.apache.hadoop.hbase.util.FSUtils; 165import org.apache.hadoop.hbase.util.FutureUtils; 166import org.apache.hadoop.hbase.util.JvmPauseMonitor; 167import org.apache.hadoop.hbase.util.Pair; 168import org.apache.hadoop.hbase.util.RetryCounter; 169import org.apache.hadoop.hbase.util.RetryCounterFactory; 170import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 171import org.apache.hadoop.hbase.util.Strings; 172import org.apache.hadoop.hbase.util.Threads; 173import 
org.apache.hadoop.hbase.util.VersionInfo; 174import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 175import org.apache.hadoop.hbase.wal.WAL; 176import org.apache.hadoop.hbase.wal.WALFactory; 177import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; 178import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 179import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker; 180import org.apache.hadoop.hbase.zookeeper.ZKUtil; 181import org.apache.hadoop.ipc.RemoteException; 182import org.apache.hadoop.util.ReflectionUtils; 183import org.apache.yetus.audience.InterfaceAudience; 184import org.apache.zookeeper.KeeperException; 185import org.slf4j.Logger; 186import org.slf4j.LoggerFactory; 187 188import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 189import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 190import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 191import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 192import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 193import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses; 194import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 195import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 196import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor; 197import org.apache.hbase.thirdparty.com.google.protobuf.Message; 198import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 199import org.apache.hbase.thirdparty.com.google.protobuf.Service; 200import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 201import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 202import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 203 204import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 205import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 206import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; 226import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 234 235/** 236 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There 237 * are many HRegionServers in a single HBase deployment. 238 */ 239@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 240@SuppressWarnings({ "deprecation" }) 241public class HRegionServer extends HBaseServerBase<RSRpcServices> 242 implements RegionServerServices, LastSequenceId { 243 244 private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class); 245 246 int unitMB = 1024 * 1024; 247 int unitKB = 1024; 248 249 /** 250 * For testing only! Set to true to skip notifying region assignment to master . 251 */ 252 @InterfaceAudience.Private 253 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL") 254 public static boolean TEST_SKIP_REPORTING_TRANSITION = false; 255 256 /** 257 * A map from RegionName to current action in progress. 
Boolean value indicates: true - if open 258 * region action in progress false - if close region action in progress 259 */ 260 private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS = 261 new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); 262 263 /** 264 * Used to cache the open/close region procedures which already submitted. See 265 * {@link #submitRegionProcedure(long)}. 266 */ 267 private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>(); 268 /** 269 * Used to cache the open/close region procedures which already executed. See 270 * {@link #submitRegionProcedure(long)}. 271 */ 272 private final Cache<Long, Long> executedRegionProcedures = 273 CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build(); 274 275 /** 276 * Used to cache the moved-out regions 277 */ 278 private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder() 279 .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build(); 280 281 private MemStoreFlusher cacheFlusher; 282 283 private HeapMemoryManager hMemManager; 284 285 // Replication services. If no replication, this handler will be null. 286 private ReplicationSourceService replicationSourceHandler; 287 private ReplicationSinkService replicationSinkHandler; 288 private boolean sameReplicationSourceAndSink; 289 290 // Compactions 291 private CompactSplit compactSplitThread; 292 293 /** 294 * Map of regions currently being served by this region server. Key is the encoded region name. 295 * All access should be synchronized. 296 */ 297 private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>(); 298 /** 299 * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it 300 * need to be a ConcurrentHashMap? 
301 */ 302 private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock(); 303 304 /** 305 * Map of encoded region names to the DataNode locations they should be hosted on We store the 306 * value as Address since InetSocketAddress is required by the HDFS API (create() that takes 307 * favored nodes as hints for placing file blocks). We could have used ServerName here as the 308 * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API 309 * call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and 310 * here we really mean DataNode locations. We don't store it as InetSocketAddress here because the 311 * conversion on demand from Address to InetSocketAddress will guarantee the resolution results 312 * will be fresh when we need it. 313 */ 314 private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>(); 315 316 private LeaseManager leaseManager; 317 318 private volatile boolean dataFsOk; 319 private volatile boolean isGlobalReadOnlyEnabled; 320 321 static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout"; 322 // Default abort timeout is 1200 seconds for safe 323 private static final long DEFAULT_ABORT_TIMEOUT = 1200000; 324 // Will run this task when abort timeout 325 static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task"; 326 327 // A state before we go into stopped state. At this stage we're closing user 328 // space regions. 329 private boolean stopping = false; 330 private volatile boolean killed = false; 331 332 private final int threadWakeFrequency; 333 334 private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period"; 335 private final int compactionCheckFrequency; 336 private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period"; 337 private final int flushCheckFrequency; 338 339 // Stub to do region server status calls against the master. 
340 private volatile RegionServerStatusService.BlockingInterface rssStub; 341 private volatile LockService.BlockingInterface lockStub; 342 // RPC client. Used to make the stub above that does region server status checking. 343 private RpcClient rpcClient; 344 345 private UncaughtExceptionHandler uncaughtExceptionHandler; 346 347 private JvmPauseMonitor pauseMonitor; 348 349 private RSSnapshotVerifier rsSnapshotVerifier; 350 351 /** region server process name */ 352 public static final String REGIONSERVER = "regionserver"; 353 354 private MetricsRegionServer metricsRegionServer; 355 MetricsRegionServerWrapperImpl metricsRegionServerImpl; 356 357 /** 358 * Check for compactions requests. 359 */ 360 private ScheduledChore compactionChecker; 361 362 /** 363 * Check for flushes 364 */ 365 private ScheduledChore periodicFlusher; 366 367 private volatile WALFactory walFactory; 368 369 private LogRoller walRoller; 370 371 // A thread which calls reportProcedureDone 372 private RemoteProcedureResultReporter procedureResultReporter; 373 374 // flag set after we're done setting up server threads 375 final AtomicBoolean online = new AtomicBoolean(false); 376 377 // master address tracker 378 private final MasterAddressTracker masterAddressTracker; 379 380 // Log Splitting Worker 381 private SplitLogWorker splitLogWorker; 382 383 private final int shortOperationTimeout; 384 385 // Time to pause if master says 'please hold' 386 private final long retryPauseTime; 387 388 private final RegionServerAccounting regionServerAccounting; 389 390 private NamedQueueServiceChore namedQueueServiceChore = null; 391 392 // Block cache 393 private BlockCache blockCache; 394 // The cache for mob files 395 private MobFileCache mobFileCache; 396 397 /** The health check chore. */ 398 private HealthCheckChore healthCheckChore; 399 400 /** The Executor status collect chore. */ 401 private ExecutorStatusChore executorStatusChore; 402 403 /** The nonce manager chore. 
*/ 404 private ScheduledChore nonceManagerChore; 405 406 private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap(); 407 408 /** 409 * @deprecated since 2.4.0 and will be removed in 4.0.0. Use 410 * {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead. 411 * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a> 412 */ 413 @Deprecated 414 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 415 final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY = 416 "hbase.regionserver.hostname.disable.master.reversedns"; 417 418 /** 419 * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive. 420 * Exception will be thrown if both are used. 421 */ 422 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 423 final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY = 424 "hbase.unsafe.regionserver.hostname.disable.master.reversedns"; 425 426 /** 427 * Unique identifier for the cluster we are a part of. 428 */ 429 private String clusterId; 430 431 // chore for refreshing store files for secondary regions 432 private StorefileRefresherChore storefileRefresher; 433 434 private volatile RegionServerCoprocessorHost rsHost; 435 436 private RegionServerProcedureManagerHost rspmHost; 437 438 private RegionServerRpcQuotaManager rsQuotaManager; 439 private RegionServerSpaceQuotaManager rsSpaceQuotaManager; 440 441 /** 442 * Nonce manager. Nonces are used to make operations like increment and append idempotent in the 443 * case where client doesn't receive the response from a successful operation and retries. We 444 * track the successful ops for some time via a nonce sent by client and handle duplicate 445 * operations (currently, by failing them; in future we might use MVCC to return result). 
Nonces 446 * are also recovered from WAL during, recovery; however, the caveats (from HBASE-3787) are: - WAL 447 * recovery is optimized, and under high load we won't read nearly nonce-timeout worth of past 448 * records. If we don't read the records, we don't read and recover the nonces. Some WALs within 449 * nonce-timeout at recovery may not even be present due to rolling/cleanup. - There's no WAL 450 * recovery during normal region move, so nonces will not be transfered. We can have separate 451 * additional "Nonce WAL". It will just contain bunch of numbers and won't be flushed on main path 452 * - because WAL itself also contains nonces, if we only flush it before memstore flush, for a 453 * given nonce we will either see it in the WAL (if it was never flushed to disk, it will be part 454 * of recovery), or we'll see it as part of the nonce log (or both occasionally, which doesn't 455 * matter). Nonce log file can be deleted after the latest nonce in it expired. It can also be 456 * recovered during move. 457 */ 458 final ServerNonceManager nonceManager; 459 460 private BrokenStoreFileCleaner brokenStoreFileCleaner; 461 462 private RSMobFileCleanerChore rsMobFileCleanerChore; 463 464 @InterfaceAudience.Private 465 CompactedHFilesDischarger compactedFileDischarger; 466 467 private volatile ThroughputController flushThroughputController; 468 469 private SecureBulkLoadManager secureBulkLoadManager; 470 471 private FileSystemUtilizationChore fsUtilizationChore; 472 473 private BootstrapNodeManager bootstrapNodeManager; 474 475 /** 476 * True if this RegionServer is coming up in a cluster where there is no Master; means it needs to 477 * just come up and make do without a Master to talk to: e.g. in test or HRegionServer is doing 478 * other than its usual duties: e.g. as an hollowed-out host whose only purpose is as a 479 * Replication-stream sink; see HBASE-18846 for more. TODO: can this replace 480 * {@link #TEST_SKIP_REPORTING_TRANSITION} ? 
481 */ 482 private final boolean masterless; 483 private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless"; 484 485 /** regionserver codec list **/ 486 private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs"; 487 488 // A timer to shutdown the process if abort takes too long 489 private Timer abortMonitor; 490 491 private RegionReplicationBufferManager regionReplicationBufferManager; 492 493 /* 494 * Chore that creates replication marker rows. 495 */ 496 private ReplicationMarkerChore replicationMarkerChore; 497 498 // A timer submit requests to the PrefetchExecutor 499 private PrefetchExecutorNotifier prefetchExecutorNotifier; 500 501 /** 502 * Starts a HRegionServer at the default location. 503 * <p/> 504 * Don't start any services or managers in here in the Constructor. Defer till after we register 505 * with the Master as much as possible. See {@link #startServices}. 506 */ 507 public HRegionServer(final Configuration conf) throws IOException { 508 super(conf, "RegionServer"); // thread name 509 final Span span = TraceUtil.createSpan("HRegionServer.cxtor"); 510 try (Scope ignored = span.makeCurrent()) { 511 this.dataFsOk = true; 512 this.masterless = !clusterMode(); 513 MemorySizeUtil.validateRegionServerHeapMemoryAllocation(conf); 514 HFile.checkHFileVersion(this.conf); 515 checkCodecs(this.conf); 516 FSUtils.setupShortCircuitRead(this.conf); 517 518 // Disable usage of meta replicas in the regionserver 519 this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); 520 // Config'ed params 521 this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); 522 this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency); 523 this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency); 524 525 boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true); 526 this.nonceManager = isNoncesEnabled ? 
new ServerNonceManager(this.conf) : null; 527 528 this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, 529 HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); 530 531 this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME, 532 HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME); 533 534 regionServerAccounting = new RegionServerAccounting(conf); 535 536 blockCache = BlockCacheFactory.createBlockCache(conf); 537 // The call below, instantiates the DataTieringManager only when 538 // the configuration "hbase.regionserver.datatiering.enable" is set to true. 539 DataTieringManager.instantiate(conf, onlineRegions); 540 541 mobFileCache = new MobFileCache(conf); 542 543 rsSnapshotVerifier = new RSSnapshotVerifier(conf); 544 545 uncaughtExceptionHandler = 546 (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e); 547 548 // If no master in cluster, skip trying to track one or look for a cluster status. 549 if (!this.masterless) { 550 masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); 551 masterAddressTracker.start(); 552 } else { 553 masterAddressTracker = null; 554 } 555 this.rpcServices.start(zooKeeper); 556 span.setStatus(StatusCode.OK); 557 } catch (Throwable t) { 558 // Make sure we log the exception. HRegionServer is often started via reflection and the 559 // cause of failed startup is lost. 
560 TraceUtil.setError(span, t); 561 LOG.error("Failed construction RegionServer", t); 562 throw t; 563 } finally { 564 span.end(); 565 } 566 } 567 568 // HMaster should override this method to load the specific config for master 569 @Override 570 protected String getUseThisHostnameInstead(Configuration conf) throws IOException { 571 String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY); 572 if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) { 573 if (!StringUtils.isBlank(hostname)) { 574 String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " 575 + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set " 576 + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while " 577 + UNSAFE_RS_HOSTNAME_KEY + " is used"; 578 throw new IOException(msg); 579 } else { 580 return DNS.getHostname(conf, DNS.ServerType.REGIONSERVER); 581 } 582 } else { 583 return hostname; 584 } 585 } 586 587 @Override 588 protected DNS.ServerType getDNSServerType() { 589 return DNS.ServerType.REGIONSERVER; 590 } 591 592 @Override 593 protected void login(UserProvider user, String host) throws IOException { 594 user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE, 595 SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host); 596 } 597 598 @Override 599 protected String getProcessName() { 600 return REGIONSERVER; 601 } 602 603 @Override 604 protected RegionServerCoprocessorHost getCoprocessorHost() { 605 return getRegionServerCoprocessorHost(); 606 } 607 608 @Override 609 protected boolean canCreateBaseZNode() { 610 return !clusterMode(); 611 } 612 613 @Override 614 protected boolean canUpdateTableDescriptor() { 615 return false; 616 } 617 618 @Override 619 protected boolean cacheTableDescriptor() { 620 return false; 621 } 622 623 protected RSRpcServices createRpcServices() throws IOException { 624 return new RSRpcServices(this); 625 } 626 627 @Override 628 protected void configureInfoServer(InfoServer infoServer) { 629 
infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class); 630 infoServer.setAttribute(REGIONSERVER, this); 631 } 632 633 @Override 634 protected Class<? extends HttpServlet> getDumpServlet() { 635 return RSDumpServlet.class; 636 } 637 638 /** 639 * Used by {@link RSDumpServlet} to generate debugging information. 640 */ 641 public void dumpRowLocks(final PrintWriter out) { 642 StringBuilder sb = new StringBuilder(); 643 for (HRegion region : getRegions()) { 644 if (region.getLockedRows().size() > 0) { 645 for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) { 646 sb.setLength(0); 647 sb.append(region.getTableDescriptor().getTableName()).append(",") 648 .append(region.getRegionInfo().getEncodedName()).append(","); 649 sb.append(rowLockContext.toString()); 650 out.println(sb); 651 } 652 } 653 } 654 } 655 656 @Override 657 public boolean registerService(Service instance) { 658 // No stacking of instances is allowed for a single executorService name 659 ServiceDescriptor serviceDesc = instance.getDescriptorForType(); 660 String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc); 661 if (coprocessorServiceHandlers.containsKey(serviceName)) { 662 LOG.error("Coprocessor executorService " + serviceName 663 + " already registered, rejecting request from " + instance); 664 return false; 665 } 666 667 coprocessorServiceHandlers.put(serviceName, instance); 668 if (LOG.isDebugEnabled()) { 669 LOG.debug( 670 "Registered regionserver coprocessor executorService: executorService=" + serviceName); 671 } 672 return true; 673 } 674 675 /** 676 * Run test on configured codecs to make sure supporting libs are in place. 
677 */ 678 private static void checkCodecs(final Configuration c) throws IOException { 679 // check to see if the codec list is available: 680 String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null); 681 if (codecs == null) { 682 return; 683 } 684 for (String codec : codecs) { 685 if (!CompressionTest.testCompression(codec)) { 686 throw new IOException( 687 "Compression codec " + codec + " not supported, aborting RS construction"); 688 } 689 } 690 } 691 692 public String getClusterId() { 693 return this.clusterId; 694 } 695 696 /** 697 * All initialization needed before we go register with Master.<br> 698 * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br> 699 * In here we just put up the RpcServer, setup Connection, and ZooKeeper. 700 */ 701 private void preRegistrationInitialization() { 702 final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization"); 703 try (Scope ignored = span.makeCurrent()) { 704 initializeZooKeeper(); 705 setupClusterConnection(); 706 bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker); 707 regionReplicationBufferManager = new RegionReplicationBufferManager(this); 708 // Setup RPC client for master communication 709 this.rpcClient = asyncClusterConnection.getRpcClient(); 710 span.setStatus(StatusCode.OK); 711 } catch (Throwable t) { 712 // Call stop if error or process will stick around for ever since server 713 // puts up non-daemon threads. 714 TraceUtil.setError(span, t); 715 this.rpcServices.stop(); 716 abort("Initialization of RS failed. Hence aborting RS.", t); 717 } finally { 718 span.end(); 719 } 720 } 721 722 /** 723 * Bring up connection to zk ensemble and then wait until a master for this cluster and then after 724 * that, wait until cluster 'up' flag has been set. This is the order in which master does things. 725 * <p> 726 * Finally open long-living server short-circuit connection. 
727 */ 728 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE", 729 justification = "cluster Id znode read would give us correct response") 730 private void initializeZooKeeper() throws IOException, InterruptedException { 731 // Nothing to do in here if no Master in the mix. 732 if (this.masterless) { 733 return; 734 } 735 736 // Create the master address tracker, register with zk, and start it. Then 737 // block until a master is available. No point in starting up if no master 738 // running. 739 blockAndCheckIfStopped(this.masterAddressTracker); 740 741 // Wait on cluster being up. Master will set this flag up in zookeeper 742 // when ready. 743 blockAndCheckIfStopped(this.clusterStatusTracker); 744 745 // If we are HMaster then the cluster id should have already been set. 746 if (clusterId == null) { 747 // Retrieve clusterId 748 // Since cluster status is now up 749 // ID should have already been set by HMaster 750 try { 751 clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper); 752 if (clusterId == null) { 753 this.abort("Cluster ID has not been set"); 754 } 755 LOG.info("ClusterId : " + clusterId); 756 } catch (KeeperException e) { 757 this.abort("Failed to retrieve Cluster ID", e); 758 } 759 } 760 761 if (isStopped() || isAborted()) { 762 return; // No need for further initialization 763 } 764 765 // watch for snapshots and other procedures 766 try { 767 rspmHost = new RegionServerProcedureManagerHost(); 768 rspmHost.loadProcedures(conf); 769 rspmHost.initialize(this); 770 } catch (KeeperException e) { 771 this.abort("Failed to reach coordination cluster when creating procedure handler.", e); 772 } 773 } 774 775 /** 776 * Utilty method to wait indefinitely on a znode availability while checking if the region server 777 * is shut down 778 * @param tracker znode tracker to use 779 * @throws IOException any IO exception, plus if the RS is stopped 780 * @throws InterruptedException if the waiting thread is 
interrupted 781 */ 782 private void blockAndCheckIfStopped(ZKNodeTracker tracker) 783 throws IOException, InterruptedException { 784 while (tracker.blockUntilAvailable(this.msgInterval, false) == null) { 785 if (this.stopped) { 786 throw new IOException("Received the shutdown message while waiting."); 787 } 788 } 789 } 790 791 /** Returns True if the cluster is up. */ 792 @Override 793 public boolean isClusterUp() { 794 return this.masterless 795 || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp()); 796 } 797 798 private void initializeReplicationMarkerChore() { 799 boolean replicationMarkerEnabled = 800 conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT); 801 // If replication or replication marker is not enabled then return immediately. 802 if (replicationMarkerEnabled) { 803 int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY, 804 REPLICATION_MARKER_CHORE_DURATION_DEFAULT); 805 replicationMarkerChore = new ReplicationMarkerChore(this, this, period, conf); 806 } 807 } 808 809 @Override 810 public boolean isStopping() { 811 return stopping; 812 } 813 814 /** 815 * The HRegionServer sticks in this loop until closed. 816 */ 817 @Override 818 public void run() { 819 if (isStopped()) { 820 LOG.info("Skipping run; stopped"); 821 return; 822 } 823 try { 824 // Do pre-registration initializations; zookeeper, lease threads, etc. 
825 preRegistrationInitialization(); 826 } catch (Throwable e) { 827 abort("Fatal exception during initialization", e); 828 } 829 830 try { 831 if (!isStopped() && !isAborted()) { 832 installShutdownHook(); 833 834 CoprocessorConfigurationUtil.syncReadOnlyConfigurations(conf, 835 CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY); 836 837 // Initialize the RegionServerCoprocessorHost now that our ephemeral 838 // node was created, in case any coprocessors want to use ZooKeeper 839 this.rsHost = new RegionServerCoprocessorHost(this, this.conf); 840 841 // Try and register with the Master; tell it we are here. Break if server is stopped or 842 // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and 843 // start up all Services. Use RetryCounter to get backoff in case Master is struggling to 844 // come up. 845 LOG.debug("About to register with Master."); 846 TraceUtil.trace(() -> { 847 RetryCounterFactory rcf = 848 new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5); 849 RetryCounter rc = rcf.create(); 850 while (keepLooping()) { 851 RegionServerStartupResponse w = reportForDuty(); 852 if (w == null) { 853 long sleepTime = rc.getBackoffTimeAndIncrementAttempts(); 854 LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime); 855 this.sleeper.sleep(sleepTime); 856 } else { 857 handleReportForDutyResponse(w); 858 break; 859 } 860 } 861 }, "HRegionServer.registerWithMaster"); 862 } 863 864 if (!isStopped() && isHealthy()) { 865 TraceUtil.trace(() -> { 866 // start the snapshot handler and other procedure handlers, 867 // since the server is ready to run 868 if (this.rspmHost != null) { 869 this.rspmHost.start(); 870 } 871 // Start the Quota Manager 872 if (this.rsQuotaManager != null) { 873 rsQuotaManager.start(getRpcServer().getScheduler()); 874 } 875 if (this.rsSpaceQuotaManager != null) { 876 this.rsSpaceQuotaManager.start(); 877 } 878 }, "HRegionServer.startup"); 879 } 880 881 
// We registered with the Master. Go into run mode. 882 long lastMsg = EnvironmentEdgeManager.currentTime(); 883 long oldRequestCount = -1; 884 // The main run loop. 885 while (!isStopped() && isHealthy()) { 886 if (!isClusterUp()) { 887 if (onlineRegions.isEmpty()) { 888 stop("Exiting; cluster shutdown set and not carrying any regions"); 889 } else if (!this.stopping) { 890 this.stopping = true; 891 LOG.info("Closing user regions"); 892 closeUserRegions(isAborted()); 893 } else { 894 boolean allUserRegionsOffline = areAllUserRegionsOffline(); 895 if (allUserRegionsOffline) { 896 // Set stopped if no more write requests tp meta tables 897 // since last time we went around the loop. Any open 898 // meta regions will be closed on our way out. 899 if (oldRequestCount == getWriteRequestCount()) { 900 stop("Stopped; only catalog regions remaining online"); 901 break; 902 } 903 oldRequestCount = getWriteRequestCount(); 904 } else { 905 // Make sure all regions have been closed -- some regions may 906 // have not got it because we were splitting at the time of 907 // the call to closeUserRegions. 908 closeUserRegions(this.abortRequested.get()); 909 } 910 LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString()); 911 } 912 } 913 long now = EnvironmentEdgeManager.currentTime(); 914 if ((now - lastMsg) >= msgInterval) { 915 tryRegionServerReport(lastMsg, now); 916 lastMsg = EnvironmentEdgeManager.currentTime(); 917 } 918 if (!isStopped() && !isAborted()) { 919 this.sleeper.sleep(); 920 } 921 } // for 922 } catch (Throwable t) { 923 if (!rpcServices.checkOOME(t)) { 924 String prefix = t instanceof YouAreDeadException ? 
"" : "Unhandled: "; 925 abort(prefix + t.getMessage(), t); 926 } 927 } 928 929 final Span span = TraceUtil.createSpan("HRegionServer exiting main loop"); 930 try (Scope ignored = span.makeCurrent()) { 931 if (this.leaseManager != null) { 932 this.leaseManager.closeAfterLeasesExpire(); 933 } 934 if (this.splitLogWorker != null) { 935 splitLogWorker.stop(); 936 } 937 stopInfoServer(); 938 // Send cache a shutdown. 939 if (blockCache != null) { 940 blockCache.shutdown(); 941 } 942 if (mobFileCache != null) { 943 mobFileCache.shutdown(); 944 } 945 946 // Send interrupts to wake up threads if sleeping so they notice shutdown. 947 // TODO: Should we check they are alive? If OOME could have exited already 948 if (this.hMemManager != null) { 949 this.hMemManager.stop(); 950 } 951 if (this.cacheFlusher != null) { 952 this.cacheFlusher.interruptIfNecessary(); 953 } 954 if (this.compactSplitThread != null) { 955 this.compactSplitThread.interruptIfNecessary(); 956 } 957 958 // Stop the snapshot and other procedure handlers, forcefully killing all running tasks 959 if (rspmHost != null) { 960 rspmHost.stop(this.abortRequested.get() || this.killed); 961 } 962 963 if (this.killed) { 964 // Just skip out w/o closing regions. Used when testing. 
965 } else if (abortRequested.get()) { 966 if (this.dataFsOk) { 967 closeUserRegions(abortRequested.get()); // Don't leave any open file handles 968 } 969 LOG.info("aborting server " + this.serverName); 970 } else { 971 closeUserRegions(abortRequested.get()); 972 LOG.info("stopping server " + this.serverName); 973 } 974 regionReplicationBufferManager.stop(); 975 closeClusterConnection(); 976 // Closing the compactSplit thread before closing meta regions 977 if (!this.killed && containsMetaTableRegions()) { 978 if (!abortRequested.get() || this.dataFsOk) { 979 if (this.compactSplitThread != null) { 980 this.compactSplitThread.join(); 981 this.compactSplitThread = null; 982 } 983 closeMetaTableRegions(abortRequested.get()); 984 } 985 } 986 987 if (!this.killed && this.dataFsOk) { 988 waitOnAllRegionsToClose(abortRequested.get()); 989 LOG.info("stopping server " + this.serverName + "; all regions closed."); 990 } 991 992 // Stop the quota manager 993 if (rsQuotaManager != null) { 994 rsQuotaManager.stop(); 995 } 996 if (rsSpaceQuotaManager != null) { 997 rsSpaceQuotaManager.stop(); 998 rsSpaceQuotaManager = null; 999 } 1000 1001 // flag may be changed when closing regions throws exception. 1002 if (this.dataFsOk) { 1003 shutdownWAL(!abortRequested.get()); 1004 } 1005 1006 // Make sure the proxy is down. 
1007 if (this.rssStub != null) { 1008 this.rssStub = null; 1009 } 1010 if (this.lockStub != null) { 1011 this.lockStub = null; 1012 } 1013 if (this.rpcClient != null) { 1014 this.rpcClient.close(); 1015 } 1016 if (this.leaseManager != null) { 1017 this.leaseManager.close(); 1018 } 1019 if (this.pauseMonitor != null) { 1020 this.pauseMonitor.stop(); 1021 } 1022 1023 if (!killed) { 1024 stopServiceThreads(); 1025 } 1026 1027 if (this.rpcServices != null) { 1028 this.rpcServices.stop(); 1029 } 1030 1031 try { 1032 deleteMyEphemeralNode(); 1033 } catch (KeeperException.NoNodeException nn) { 1034 // pass 1035 } catch (KeeperException e) { 1036 LOG.warn("Failed deleting my ephemeral node", e); 1037 } 1038 // We may have failed to delete the znode at the previous step, but 1039 // we delete the file anyway: a second attempt to delete the znode is likely to fail again. 1040 ZNodeClearer.deleteMyEphemeralNodeOnDisk(); 1041 1042 closeZooKeeper(); 1043 closeTableDescriptors(); 1044 LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed."); 1045 span.setStatus(StatusCode.OK); 1046 } finally { 1047 span.end(); 1048 } 1049 } 1050 1051 private boolean containsMetaTableRegions() { 1052 return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 1053 } 1054 1055 private boolean areAllUserRegionsOffline() { 1056 if (getNumberOfOnlineRegions() > 2) { 1057 return false; 1058 } 1059 boolean allUserRegionsOffline = true; 1060 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1061 if (!e.getValue().getRegionInfo().isMetaRegion()) { 1062 allUserRegionsOffline = false; 1063 break; 1064 } 1065 } 1066 return allUserRegionsOffline; 1067 } 1068 1069 /** Returns Current write count for all online regions. 
*/ 1070 private long getWriteRequestCount() { 1071 long writeCount = 0; 1072 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1073 writeCount += e.getValue().getWriteRequestsCount(); 1074 } 1075 return writeCount; 1076 } 1077 1078 @InterfaceAudience.Private 1079 protected void tryRegionServerReport(long reportStartTime, long reportEndTime) 1080 throws IOException { 1081 RegionServerStatusService.BlockingInterface rss = rssStub; 1082 if (rss == null) { 1083 // the current server could be stopping. 1084 return; 1085 } 1086 ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime); 1087 final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport"); 1088 try (Scope ignored = span.makeCurrent()) { 1089 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder(); 1090 request.setServer(ProtobufUtil.toServerName(this.serverName)); 1091 request.setLoad(sl); 1092 rss.regionServerReport(null, request.build()); 1093 span.setStatus(StatusCode.OK); 1094 } catch (ServiceException se) { 1095 IOException ioe = ProtobufUtil.getRemoteException(se); 1096 if (ioe instanceof YouAreDeadException) { 1097 // This will be caught and handled as a fatal error in run() 1098 TraceUtil.setError(span, ioe); 1099 throw ioe; 1100 } 1101 if (rssStub == rss) { 1102 rssStub = null; 1103 } 1104 TraceUtil.setError(span, se); 1105 // Couldn't connect to the master, get location from zk and reconnect 1106 // Method blocks until new master is found or we are stopped 1107 createRegionServerStatusStub(true); 1108 } finally { 1109 span.end(); 1110 } 1111 } 1112 1113 /** 1114 * Reports the given map of Regions and their size on the filesystem to the active Master. 1115 * @param regionSizeStore The store containing region sizes 1116 * @return false if FileSystemUtilizationChore should pause reporting to master. 
true otherwise 1117 */ 1118 public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) { 1119 RegionServerStatusService.BlockingInterface rss = rssStub; 1120 if (rss == null) { 1121 // the current server could be stopping. 1122 LOG.trace("Skipping Region size report to HMaster as stub is null"); 1123 return true; 1124 } 1125 try { 1126 buildReportAndSend(rss, regionSizeStore); 1127 } catch (ServiceException se) { 1128 IOException ioe = ProtobufUtil.getRemoteException(se); 1129 if (ioe instanceof PleaseHoldException) { 1130 LOG.trace("Failed to report region sizes to Master because it is initializing." 1131 + " This will be retried.", ioe); 1132 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 1133 return true; 1134 } 1135 if (rssStub == rss) { 1136 rssStub = null; 1137 } 1138 createRegionServerStatusStub(true); 1139 if (ioe instanceof DoNotRetryIOException) { 1140 DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe; 1141 if (doNotRetryEx.getCause() != null) { 1142 Throwable t = doNotRetryEx.getCause(); 1143 if (t instanceof UnsupportedOperationException) { 1144 LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying"); 1145 return false; 1146 } 1147 } 1148 } 1149 LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe); 1150 } 1151 return true; 1152 } 1153 1154 /** 1155 * Builds the region size report and sends it to the master. Upon successful sending of the 1156 * report, the region sizes that were sent are marked as sent. 
1157 * @param rss The stub to send to the Master 1158 * @param regionSizeStore The store containing region sizes 1159 */ 1160 private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss, 1161 RegionSizeStore regionSizeStore) throws ServiceException { 1162 RegionSpaceUseReportRequest request = 1163 buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore)); 1164 rss.reportRegionSpaceUse(null, request); 1165 // Record the number of size reports sent 1166 if (metricsRegionServer != null) { 1167 metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size()); 1168 } 1169 } 1170 1171 /** 1172 * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map. 1173 * @param regionSizes The size in bytes of regions 1174 * @return The corresponding protocol buffer message. 1175 */ 1176 RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) { 1177 RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder(); 1178 for (Entry<RegionInfo, RegionSize> entry : regionSizes) { 1179 request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize())); 1180 } 1181 return request.build(); 1182 } 1183 1184 /** 1185 * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf 1186 * message. 
* @param regionInfo  The RegionInfo
   * @param sizeInBytes The size in bytes of the Region
   * @return The protocol buffer
   */
  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
    // Both arguments are rejected with a NullPointerException when null.
    return RegionSpaceUse.newBuilder()
      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
      .setRegionSize(Objects.requireNonNull(sizeInBytes)).build();
  }

  /**
   * Assembles the {@code ServerLoad} message sent to the Master: request counters, heap usage,
   * coprocessor names, per-region loads, cache and cold-data sizes, user loads, replication load
   * and monitored tasks.
   * @param reportStartTime start of the reporting window, copied into the message
   * @param reportEndTime   end of the reporting window, copied into the message
   * @return the built message
   */
  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
    throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second, and other metrics As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heart beat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved; Additionally the load balancer will be able to take advantage of a more complete
    // history.
    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    // -1 is kept when heap usage cannot be obtained.
    long usedMemory = -1L;
    long maxMemory = -1L;
    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
      usedMemory = usage.getUsed();
      maxMemory = usage.getMax();
    }

    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
    // Coprocessors of the WAL returned for a null region.
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    // The same builders are reused for every entry added below.
    Coprocessor.Builder coprocessorBuilder = Coprocessor.newBuilder();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
    }
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (HRegion region : regions) {
      if (region.getCoprocessorHost() != null) {
        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
        for (String regionCoprocessor : regionCoprocessors) {
          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
        }
      }
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      // Also report the coprocessors of the WAL serving this region.
      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
        .getCoprocessors()) {
        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
      }
    }

    // Per-region cached sizes, rounded to MB, when a block cache exposes them.
    getBlockCache().ifPresent(cache -> {
      cache.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
        regionCachedInfo.forEach((regionName, prefetchSize) -> {
          serverLoad.putRegionCachedInfo(regionName, roundSize(prefetchSize, unitMB));
        });
      });
    });
    serverLoad.setCacheFreeSize(regionServerWrapper.getBlockCacheFreeSize());
    if (DataTieringManager.getInstance() != null) {
      DataTieringManager.getInstance().getRegionColdDataSize()
        .forEach((regionName, coldDataSize) -> serverLoad.putRegionColdData(regionName,
          roundSize(coldDataSize.getSecond(), unitMB)));
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    // -1 signals that no info server is running.
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }
    MetricsUserAggregateSource userSource =
      metricsRegionServer.getMetricsUserAggregate().getSource();
    if (userSource != null) {
      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
      }
    }

    // When source and sink share one handler, a single refresh yields both loads; otherwise each
    // handler is refreshed separately.
    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
      // always refresh first to get the latest value
      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
      if (rLoad != null) {
        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
          .getReplicationLoadSourceEntries()) {
          serverLoad.addReplLoadSource(rLS);
        }
      }
    } else {
      if (replicationSourceHandler != null) {
        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
            .getReplicationLoadSourceEntries()) {
            serverLoad.addReplLoadSource(rLS);
          }
        }
      }
      if (replicationSinkHandler != null) {
        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        }
      }
    }

    // One ServerTask per monitored task; a null status is sent as the empty string.
    TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask
      .newBuilder().setDescription(task.getDescription())
      .setStatus(task.getStatus() != null ? task.getStatus() : "")
      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build()));

    return serverLoad.build();
  }

  /** Returns the encoded names of all online regions joined with ", ". */
  private String getOnlineRegionsAsPrintableString() {
    StringBuilder sb = new StringBuilder();
    for (Region r : this.onlineRegions.values()) {
      if (sb.length() > 0) {
        sb.append(", ");
      }
      sb.append(r.getRegionInfo().getEncodedName());
    }
    return sb.toString();
  }

  /**
   * Wait on regions close.
   * <p>
   * Loops until no region is online, or until no region is in transition any more (in which case
   * remaining online regions are reported as having failed to close). An interrupt received while
   * sleeping does not end the wait; it is restored on the thread before returning.
   * @param abort passed through to {@code closeRegionIgnoreErrors}
   */
  private void waitOnAllRegionsToClose(final boolean abort) {
    // Wait till all regions are closed before going out.
    int lastCount = -1;
    long previousLogTime = 0;
    // Encoded names of regions a close has already been requested for by this method.
    Set<String> closedRegions = new HashSet<>();
    boolean interrupted = false;
    try {
      while (!onlineRegions.isEmpty()) {
        int count = getNumberOfOnlineRegions();
        // Only print a message if the count of regions has changed.
        if (count != lastCount) {
          // Log every second at most
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            previousLogTime = EnvironmentEdgeManager.currentTime();
            lastCount = count;
            LOG.info("Waiting on " + count + " regions to close");
            // Only print out regions still closing if a small number else will
            // swamp the log.
            if (count < 10 && LOG.isDebugEnabled()) {
              LOG.debug("Online Regions=" + this.onlineRegions);
            }
          }
        }
        // Ensure all user regions have been sent a close. Use this to
        // protect against the case where an open comes in after we start the
        // iterator of onlineRegions to close all user regions.
        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
          RegionInfo hri = e.getValue().getRegionInfo();
          if (
            !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
              && !closedRegions.contains(hri.getEncodedName())
          ) {
            closedRegions.add(hri.getEncodedName());
            // Request the close; errors are ignored by the callee.
            closeRegionIgnoreErrors(hri, abort);
          }
        }
        // No regions in RIT, we could stop waiting now.
        if (this.regionsInTransitionInRS.isEmpty()) {
          if (!onlineRegions.isEmpty()) {
            LOG.info("We were exiting though online regions are not empty,"
              + " because some regions failed closing");
          }
          break;
        } else {
          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
        }
        if (sleepInterrupted(200)) {
          interrupted = true;
        }
      }
    } finally {
      // Restore the interrupt status swallowed while sleeping.
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Sleeps for the given time.
   * @param millis time to sleep in milliseconds
   * @return true if the sleep was interrupted (the interrupt status is NOT restored here)
   */
  private static boolean sleepInterrupted(long millis) {
    boolean interrupted = false;
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while sleeping");
      interrupted = true;
    }
    return interrupted;
  }

  /**
   * Closes or shuts down the WAL factory, if one exists. Failures are logged and not rethrown.
   * @param close true to call {@code close()}, false to call {@code shutdown()}
   */
  private void shutdownWAL(final boolean close) {
    if (this.walFactory != null) {
      try {
        if (close) {
          walFactory.close();
        } else {
          walFactory.shutdown();
        }
      } catch (Throwable e) {
        // Unwrap remote exceptions so the log shows the underlying cause.
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.error("Shutdown / close of WAL failed: " + e);
        LOG.debug("Shutdown / close exception details:", e);
      }
    }
  }

  /**
   * Run init. Sets up wal and starts up all server threads.
   * <p>
   * Applies the configuration entries returned by the Master, verifies the hostname the Master
   * sees for this server, creates the ephemeral znode, sets up WAL/replication and metrics,
   * starts services and finally flags this server as online. On any failure the server is stopped
   * and the error is rethrown as an {@link IOException}.
   * @param c Extra configuration.
   */
  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
    throws IOException {
    try {
      boolean updateRootDir = false;
      for (NameStringPair e : c.getMapEntriesList()) {
        String key = e.getName();
        // The hostname the master sees us as.
        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
          String hostnameFromMasterPOV = e.getValue();
          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
            rpcServices.getSocketAddress().getPort(), this.startcode);
          String expectedHostName = rpcServices.getSocketAddress().getHostName();
          // if Master use-ip is enabled, RegionServer use-ip will be enabled by default even if it
          // is set to disable. so we will use the ip of the RegionServer to compare with the
          // hostname passed by the Master, see HBASE-27304 for details.
          if (
            StringUtils.isBlank(useThisHostnameInstead) && getActiveMaster().isPresent()
              && InetAddresses.isInetAddress(getActiveMaster().get().getHostname())
          ) {
            expectedHostName = rpcServices.getSocketAddress().getAddress().getHostAddress();
          }
          // Compare against the explicitly configured hostname when there is one.
          boolean isHostnameConsist = StringUtils.isBlank(useThisHostnameInstead)
            ? Strings.hostnamesEqual(hostnameFromMasterPOV, expectedHostName)
            : Strings.hostnamesEqual(hostnameFromMasterPOV, useThisHostnameInstead);

          if (!isHostnameConsist) {
            String msg = "Master passed us a different hostname to use; was="
              + (StringUtils.isBlank(useThisHostnameInstead)
                ? expectedHostName
                : this.useThisHostnameInstead)
              + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
            throw new IOException(msg);
          }
          // The hostname entry is not copied into conf.
          continue;
        }

        String value = e.getValue();
        if (key.equals(HConstants.HBASE_DIR)) {
          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
            updateRootDir = true;
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("Config from master: " + key + "=" + value);
        }
        this.conf.set(key, value);
      }
      // Set our ephemeral znode up in zookeeper now we have a name.
      createMyEphemeralNode();

      if (updateRootDir) {
        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
        initializeFileSystem();
      }

      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
      // config param for task trackers, but we can piggyback off of it.
      if (this.conf.get("mapreduce.task.attempt.id") == null) {
        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
      }

      // Save it in a file, this will allow to see if we crash
      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

      // This call sets up an initialized replication and WAL. Later we start it up.
      setupWALAndReplication();
      // Init in here rather than in constructor after thread name has been set
      final MetricsTable metricsTable =
        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
      this.metricsRegionServer =
        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
      // Now that we have a metrics source, start the pause monitor
      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
      pauseMonitor.start();

      // There is a rare case where we do NOT want services to start. Check config.
      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
        startServices();
      }
      // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
      // or make sense of it.
      startReplicationService();

      // Set up ZK
      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress()
        + ", sessionid=0x"
        + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));

      // Wake up anyone waiting for this server to online
      synchronized (online) {
        online.set(true);
        online.notifyAll();
      }
    } catch (Throwable e) {
      stop("Failed initialization");
      throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed");
    } finally {
      // Runs on both success and failure.
      sleeper.skipSleepCycle();
    }
  }

  /** Creates and starts the {@link HeapMemoryManager}; does nothing when there is no block cache. */
  private void startHeapMemoryManager() {
    if (this.blockCache != null) {
      this.hMemManager =
        new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
      this.hMemManager.start(getChoreService());
    }
  }

  /**
   * Creates and watches this server's ephemeral znode. The node data carries the info server port
   * (-1 when there is no info server) and the version info, prefixed with the PB magic.
   */
  private void createMyEphemeralNode() throws KeeperException {
    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
  }

  /** Deletes this server's ephemeral znode. */
  private void deleteMyEphemeralNode() throws KeeperException {
    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
  }

  /** Returns the {@link RegionServerAccounting} instance of this server. */
  @Override
  public RegionServerAccounting getRegionServerAccounting() {
    return regionServerAccounting;
  }

  // Round the size with KB or MB.
  // A trick here is that if the sizeInBytes is less than sizeUnit, we will round the size to 1
  // instead of 0 if it is not 0, to avoid some schedulers think the region has no data. See
  // HBASE-26340 for more details on why this is important.
1547 private static int roundSize(long sizeInByte, int sizeUnit) { 1548 if (sizeInByte == 0) { 1549 return 0; 1550 } else if (sizeInByte < sizeUnit) { 1551 return 1; 1552 } else { 1553 return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE); 1554 } 1555 } 1556 1557 /** 1558 * @param r Region to get RegionLoad for. 1559 * @param regionLoadBldr the RegionLoad.Builder, can be null 1560 * @param regionSpecifier the RegionSpecifier.Builder, can be null 1561 * @return RegionLoad instance. 1562 */ 1563 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, 1564 RegionSpecifier.Builder regionSpecifier) throws IOException { 1565 byte[] name = r.getRegionInfo().getRegionName(); 1566 String regionEncodedName = r.getRegionInfo().getEncodedName(); 1567 int stores = 0; 1568 int storefiles = 0; 1569 int storeRefCount = 0; 1570 int maxCompactedStoreFileRefCount = 0; 1571 long storeUncompressedSize = 0L; 1572 long storefileSize = 0L; 1573 long storefileIndexSize = 0L; 1574 long rootLevelIndexSize = 0L; 1575 long totalStaticIndexSize = 0L; 1576 long totalStaticBloomSize = 0L; 1577 long totalCompactingKVs = 0L; 1578 long currentCompactedKVs = 0L; 1579 long totalRegionSize = 0L; 1580 List<HStore> storeList = r.getStores(); 1581 stores += storeList.size(); 1582 for (HStore store : storeList) { 1583 storefiles += store.getStorefilesCount(); 1584 int currentStoreRefCount = store.getStoreRefCount(); 1585 storeRefCount += currentStoreRefCount; 1586 int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount(); 1587 maxCompactedStoreFileRefCount = 1588 Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount); 1589 storeUncompressedSize += store.getStoreSizeUncompressed(); 1590 storefileSize += store.getStorefilesSize(); 1591 totalRegionSize += store.getHFilesSize(); 1592 // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB? 
1593 storefileIndexSize += store.getStorefilesRootLevelIndexSize(); 1594 CompactionProgress progress = store.getCompactionProgress(); 1595 if (progress != null) { 1596 totalCompactingKVs += progress.getTotalCompactingKVs(); 1597 currentCompactedKVs += progress.currentCompactedKVs; 1598 } 1599 rootLevelIndexSize += store.getStorefilesRootLevelIndexSize(); 1600 totalStaticIndexSize += store.getTotalStaticIndexSize(); 1601 totalStaticBloomSize += store.getTotalStaticBloomSize(); 1602 } 1603 1604 int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB); 1605 int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB); 1606 int storefileSizeMB = roundSize(storefileSize, unitMB); 1607 int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB); 1608 int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB); 1609 int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB); 1610 int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB); 1611 int regionSizeMB = roundSize(totalRegionSize, unitMB); 1612 final MutableFloat currentRegionCachedRatio = new MutableFloat(0.0f); 1613 getBlockCache().ifPresent(bc -> { 1614 bc.getRegionCachedInfo().ifPresent(regionCachedInfo -> { 1615 if (regionCachedInfo.containsKey(regionEncodedName)) { 1616 currentRegionCachedRatio.setValue(regionSizeMB == 0 1617 ? 0.0f 1618 : (float) roundSize(regionCachedInfo.get(regionEncodedName), unitMB) / regionSizeMB); 1619 } 1620 }); 1621 }); 1622 final MutableFloat currentRegionColdDataRatio = new MutableFloat(0.0f); 1623 if (DataTieringManager.getInstance() != null) { 1624 DataTieringManager.getInstance().getRegionColdDataSize().computeIfPresent(regionEncodedName, 1625 (k, v) -> { 1626 int coldSizeMB = roundSize(v.getSecond(), unitMB); 1627 currentRegionColdDataRatio 1628 .setValue(regionSizeMB == 0 ? 
0.0f : (float) coldSizeMB / regionSizeMB); 1629 return v; 1630 }); 1631 } 1632 1633 HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution(); 1634 float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname()); 1635 float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname()); 1636 long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight(); 1637 long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname()); 1638 long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname()); 1639 if (regionLoadBldr == null) { 1640 regionLoadBldr = RegionLoad.newBuilder(); 1641 } 1642 if (regionSpecifier == null) { 1643 regionSpecifier = RegionSpecifier.newBuilder(); 1644 } 1645 1646 regionSpecifier.setType(RegionSpecifierType.REGION_NAME); 1647 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name)); 1648 regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores) 1649 .setStorefiles(storefiles).setStoreRefCount(storeRefCount) 1650 .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount) 1651 .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB) 1652 .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB) 1653 .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB) 1654 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB) 1655 .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount()) 1656 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount()) 1657 .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs) 1658 .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality) 1659 .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight) 1660 .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight) 1661 
.setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState())) 1662 .setLastMajorCompactionTs(r.getOldestHfileTs(true)).setRegionSizeMB(regionSizeMB) 1663 .setCurrentRegionCachedRatio(currentRegionCachedRatio.floatValue()) 1664 .setCurrentRegionColdDataRatio(currentRegionColdDataRatio.floatValue()); 1665 r.setCompleteSequenceId(regionLoadBldr); 1666 return regionLoadBldr.build(); 1667 } 1668 1669 private UserLoad createUserLoad(String user, MetricsUserSource userSource) { 1670 UserLoad.Builder userLoadBldr = UserLoad.newBuilder(); 1671 userLoadBldr.setUserName(user); 1672 userSource.getClientMetrics().values().stream() 1673 .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder() 1674 .setHostName(clientMetrics.getHostName()) 1675 .setWriteRequestsCount(clientMetrics.getWriteRequestsCount()) 1676 .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests()) 1677 .setReadRequestsCount(clientMetrics.getReadRequestsCount()) 1678 .setHostAddress(clientMetrics.getHostAddress()).setUserName(clientMetrics.getUserName()) 1679 .setClientVersion(clientMetrics.getClientVersion()) 1680 .setServiceName(clientMetrics.getServiceName()) 1681 .setClientVersion(clientMetrics.getClientVersion()).build()) 1682 .forEach(userLoadBldr::addClientMetrics); 1683 return userLoadBldr.build(); 1684 } 1685 1686 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { 1687 HRegion r = onlineRegions.get(encodedRegionName); 1688 return r != null ? createRegionLoad(r, null, null) : null; 1689 } 1690 1691 /** 1692 * Inner class that runs on a long period checking if regions need compaction. 
1693 */ 1694 private static class CompactionChecker extends ScheduledChore { 1695 private final HRegionServer instance; 1696 private final int majorCompactPriority; 1697 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; 1698 // Iteration is 1-based rather than 0-based so we don't check for compaction 1699 // immediately upon region server startup 1700 private long iteration = 1; 1701 1702 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { 1703 super("CompactionChecker", stopper, sleepTime); 1704 this.instance = h; 1705 LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime)); 1706 1707 /* 1708 * MajorCompactPriority is configurable. If not set, the compaction will use default priority. 1709 */ 1710 this.majorCompactPriority = this.instance.conf 1711 .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY); 1712 } 1713 1714 @Override 1715 protected void chore() { 1716 for (HRegion hr : this.instance.onlineRegions.values()) { 1717 // If region is read only or compaction is disabled at table level, there's no need to 1718 // iterate through region's stores 1719 if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) { 1720 continue; 1721 } 1722 1723 for (HStore s : hr.stores.values()) { 1724 try { 1725 long multiplier = s.getCompactionCheckMultiplier(); 1726 assert multiplier > 0; 1727 if (iteration % multiplier != 0) { 1728 continue; 1729 } 1730 if (s.needsCompaction()) { 1731 // Queue a compaction. Will recognize if major is needed. 
1732 this.instance.compactSplitThread.requestSystemCompaction(hr, s, 1733 getName() + " requests compaction"); 1734 } else if (s.shouldPerformMajorCompaction()) { 1735 s.triggerMajorCompaction(); 1736 if ( 1737 majorCompactPriority == DEFAULT_PRIORITY 1738 || majorCompactPriority > hr.getCompactPriority() 1739 ) { 1740 this.instance.compactSplitThread.requestCompaction(hr, s, 1741 getName() + " requests major compaction; use default priority", Store.NO_PRIORITY, 1742 CompactionLifeCycleTracker.DUMMY, null); 1743 } else { 1744 this.instance.compactSplitThread.requestCompaction(hr, s, 1745 getName() + " requests major compaction; use configured priority", 1746 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); 1747 } 1748 } 1749 } catch (IOException e) { 1750 LOG.warn("Failed major compaction check on " + hr, e); 1751 } 1752 } 1753 } 1754 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); 1755 } 1756 } 1757 1758 private static class PeriodicMemStoreFlusher extends ScheduledChore { 1759 private final HRegionServer server; 1760 private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds 1761 private final static int MIN_DELAY_TIME = 0; // millisec 1762 private final long rangeOfDelayMs; 1763 1764 PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { 1765 super("MemstoreFlusherChore", server, cacheFlushInterval); 1766 this.server = server; 1767 1768 final long configuredRangeOfDelay = server.getConfiguration() 1769 .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY); 1770 this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay); 1771 } 1772 1773 @Override 1774 protected void chore() { 1775 final StringBuilder whyFlush = new StringBuilder(); 1776 for (HRegion r : this.server.onlineRegions.values()) { 1777 if (r == null) { 1778 continue; 1779 } 1780 if (r.shouldFlush(whyFlush)) { 1781 FlushRequester requester = server.getFlushRequester(); 1782 if 
(requester != null) { 1783 long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME; 1784 // Throttle the flushes by putting a delay. If we don't throttle, and there 1785 // is a balanced write-load on the regions in a table, we might end up 1786 // overwhelming the filesystem with too many flushes at once. 1787 if (requester.requestDelayedFlush(r, delay)) { 1788 LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(), 1789 r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay); 1790 } 1791 } 1792 } 1793 } 1794 } 1795 } 1796 1797 /** 1798 * Report the status of the server. A server is online once all the startup is completed (setting 1799 * up filesystem, starting executorService threads, etc.). This method is designed mostly to be 1800 * useful in tests. 1801 * @return true if online, false if not. 1802 */ 1803 public boolean isOnline() { 1804 return online.get(); 1805 } 1806 1807 /** 1808 * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to 1809 * be hooked up to WAL. 1810 */ 1811 private void setupWALAndReplication() throws IOException { 1812 WALFactory factory = new WALFactory(conf, serverName, this); 1813 // TODO Replication make assumptions here based on the default filesystem impl 1814 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 1815 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); 1816 1817 Path logDir = new Path(walRootDir, logName); 1818 LOG.debug("logDir={}", logDir); 1819 if (this.walFs.exists(logDir)) { 1820 throw new RegionServerRunningException( 1821 "Region server has already created directory at " + this.serverName.toString()); 1822 } 1823 // Create wal directory here and we will never create it again in other places. This is 1824 // important to make sure that our fencing way takes effect. See HBASE-29797 for more details. 
1825 if (!this.walFs.mkdirs(logDir)) { 1826 throw new IOException("Can not create wal directory " + logDir); 1827 } 1828 // Instantiate replication if replication enabled. Pass it the log directories. 1829 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory); 1830 1831 WALActionsListener walEventListener = getWALEventTrackerListener(conf); 1832 if (walEventListener != null && factory.getWALProvider() != null) { 1833 factory.getWALProvider().addWALActionsListener(walEventListener); 1834 } 1835 this.walFactory = factory; 1836 } 1837 1838 private WALActionsListener getWALEventTrackerListener(Configuration conf) { 1839 if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) { 1840 WALEventTrackerListener listener = 1841 new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName()); 1842 return listener; 1843 } 1844 return null; 1845 } 1846 1847 /** 1848 * Start up replication source and sink handlers. 1849 */ 1850 private void startReplicationService() throws IOException { 1851 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { 1852 this.replicationSourceHandler.startReplicationService(); 1853 } else { 1854 if (this.replicationSourceHandler != null) { 1855 this.replicationSourceHandler.startReplicationService(); 1856 } 1857 if (this.replicationSinkHandler != null) { 1858 this.replicationSinkHandler.startReplicationService(); 1859 } 1860 } 1861 } 1862 1863 /** Returns Master address tracker instance. */ 1864 public MasterAddressTracker getMasterAddressTracker() { 1865 return this.masterAddressTracker; 1866 } 1867 1868 /** 1869 * Start maintenance Threads, Server, Worker and lease checker threads. Start all threads we need 1870 * to run. This is called after we've successfully registered with the Master. Install an 1871 * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception. We 1872 * cannot set the handler on all threads. 
Server's internal Listener thread is off limits. For 1873 * Server, if an OOME, it waits a while then retries. Meantime, a flush or a compaction that tries 1874 * to run should trigger same critical condition and the shutdown will run. On its way out, this 1875 * server will shut down Server. Leases are sort of inbetween. It has an internal thread that 1876 * while it inherits from Chore, it keeps its own internal stop mechanism so needs to be stopped 1877 * by this hosting server. Worker logs the exception and exits. 1878 */ 1879 private void startServices() throws IOException { 1880 if (!isStopped() && !isAborted()) { 1881 initializeThreads(); 1882 } 1883 this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection); 1884 this.secureBulkLoadManager.start(); 1885 1886 // Health checker thread. 1887 if (isHealthCheckerConfigured()) { 1888 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, 1889 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); 1890 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); 1891 } 1892 // Executor status collect thread. 1893 if ( 1894 this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED, 1895 HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED) 1896 ) { 1897 int sleepTime = 1898 this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ); 1899 executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(), 1900 this.metricsRegionServer.getMetricsSource()); 1901 } 1902 1903 this.walRoller = new LogRoller(this); 1904 this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); 1905 this.procedureResultReporter = new RemoteProcedureResultReporter(this); 1906 1907 // Create the CompactedFileDischarger chore executorService. This chore helps to 1908 // remove the compacted files that will no longer be used in reads. 1909 // Default is 2 mins. 
The default value for TTLCleaner is 5 mins so we set this to 1910 // 2 mins so that compacted files can be archived before the TTLCleaner runs 1911 int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); 1912 this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this); 1913 choreService.scheduleChore(compactedFileDischarger); 1914 1915 // Start executor services 1916 final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3); 1917 executorService.startExecutorService(executorService.new ExecutorConfig() 1918 .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads)); 1919 final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1); 1920 executorService.startExecutorService(executorService.new ExecutorConfig() 1921 .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads)); 1922 final int openPriorityRegionThreads = 1923 conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3); 1924 executorService.startExecutorService( 1925 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION) 1926 .setCorePoolSize(openPriorityRegionThreads)); 1927 final int closeRegionThreads = 1928 conf.getInt("hbase.regionserver.executor.closeregion.threads", 3); 1929 executorService.startExecutorService(executorService.new ExecutorConfig() 1930 .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads)); 1931 final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1); 1932 executorService.startExecutorService(executorService.new ExecutorConfig() 1933 .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads)); 1934 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { 1935 final int storeScannerParallelSeekThreads = 1936 
conf.getInt("hbase.storescanner.parallel.seek.threads", 10); 1937 executorService.startExecutorService( 1938 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK) 1939 .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true)); 1940 } 1941 final int logReplayOpsThreads = 1942 conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER); 1943 executorService.startExecutorService( 1944 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS) 1945 .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true)); 1946 // Start the threads for compacted files discharger 1947 final int compactionDischargerThreads = 1948 conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10); 1949 executorService.startExecutorService(executorService.new ExecutorConfig() 1950 .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER) 1951 .setCorePoolSize(compactionDischargerThreads)); 1952 if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { 1953 final int regionReplicaFlushThreads = 1954 conf.getInt("hbase.regionserver.region.replica.flusher.threads", 1955 conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); 1956 executorService.startExecutorService(executorService.new ExecutorConfig() 1957 .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS) 1958 .setCorePoolSize(regionReplicaFlushThreads)); 1959 } 1960 final int refreshPeerThreads = 1961 conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2); 1962 executorService.startExecutorService(executorService.new ExecutorConfig() 1963 .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads)); 1964 final int replaySyncReplicationWALThreads = 1965 conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1); 1966 executorService.startExecutorService(executorService.new ExecutorConfig() 1967 
.setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL) 1968 .setCorePoolSize(replaySyncReplicationWALThreads)); 1969 final int switchRpcThrottleThreads = 1970 conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1); 1971 executorService.startExecutorService( 1972 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE) 1973 .setCorePoolSize(switchRpcThrottleThreads)); 1974 final int claimReplicationQueueThreads = 1975 conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1); 1976 executorService.startExecutorService( 1977 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE) 1978 .setCorePoolSize(claimReplicationQueueThreads)); 1979 final int rsSnapshotOperationThreads = 1980 conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3); 1981 executorService.startExecutorService( 1982 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS) 1983 .setCorePoolSize(rsSnapshotOperationThreads)); 1984 final int rsFlushOperationThreads = 1985 conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3); 1986 executorService.startExecutorService(executorService.new ExecutorConfig() 1987 .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads)); 1988 final int rsRefreshQuotasThreads = 1989 conf.getInt("hbase.regionserver.executor.refresh.quotas.threads", 1); 1990 executorService.startExecutorService( 1991 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS) 1992 .setCorePoolSize(rsRefreshQuotasThreads)); 1993 final int logRollThreads = conf.getInt("hbase.regionserver.executor.log.roll.threads", 1); 1994 executorService.startExecutorService(executorService.new ExecutorConfig() 1995 .setExecutorType(ExecutorType.RS_LOG_ROLL).setCorePoolSize(logRollThreads)); 1996 final int rsRefreshHFilesThreads = 1997 
conf.getInt("hbase.regionserver.executor.refresh.hfiles.threads", 3); 1998 executorService.startExecutorService(executorService.new ExecutorConfig() 1999 .setExecutorType(ExecutorType.RS_REFRESH_HFILES).setCorePoolSize(rsRefreshHFilesThreads)); 2000 2001 Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", 2002 uncaughtExceptionHandler); 2003 if (this.cacheFlusher != null) { 2004 this.cacheFlusher.start(uncaughtExceptionHandler); 2005 } 2006 Threads.setDaemonThreadRunning(this.procedureResultReporter, 2007 getName() + ".procedureResultReporter", uncaughtExceptionHandler); 2008 2009 if (this.compactionChecker != null) { 2010 choreService.scheduleChore(compactionChecker); 2011 } 2012 if (this.periodicFlusher != null) { 2013 choreService.scheduleChore(periodicFlusher); 2014 } 2015 if (this.healthCheckChore != null) { 2016 choreService.scheduleChore(healthCheckChore); 2017 } 2018 if (this.executorStatusChore != null) { 2019 choreService.scheduleChore(executorStatusChore); 2020 } 2021 if (this.nonceManagerChore != null) { 2022 choreService.scheduleChore(nonceManagerChore); 2023 } 2024 if (this.storefileRefresher != null) { 2025 choreService.scheduleChore(storefileRefresher); 2026 } 2027 if (this.fsUtilizationChore != null) { 2028 choreService.scheduleChore(fsUtilizationChore); 2029 } 2030 if (this.namedQueueServiceChore != null) { 2031 choreService.scheduleChore(namedQueueServiceChore); 2032 } 2033 if (this.brokenStoreFileCleaner != null) { 2034 choreService.scheduleChore(brokenStoreFileCleaner); 2035 } 2036 if (this.rsMobFileCleanerChore != null) { 2037 choreService.scheduleChore(rsMobFileCleanerChore); 2038 } 2039 if (replicationMarkerChore != null) { 2040 LOG.info("Starting replication marker chore"); 2041 choreService.scheduleChore(replicationMarkerChore); 2042 } 2043 2044 // Leases is not a Thread. Internally it runs a daemon thread. If it gets 2045 // an unhandled exception, it will just exit. 
2046 Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker", 2047 uncaughtExceptionHandler); 2048 2049 // Create the log splitting worker and start it 2050 // set a smaller retries to fast fail otherwise splitlogworker could be blocked for 2051 // quite a while inside Connection layer. The worker won't be available for other 2052 // tasks even after current task is preempted after a split task times out. 2053 Configuration sinkConf = HBaseConfiguration.create(conf); 2054 sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2055 conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds 2056 sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 2057 conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds 2058 sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 2059 if ( 2060 this.csm != null 2061 && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK) 2062 ) { 2063 // SplitLogWorker needs csm. If none, don't start this. 2064 this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory); 2065 splitLogWorker.start(); 2066 LOG.debug("SplitLogWorker started"); 2067 } 2068 2069 // Memstore services. 2070 startHeapMemoryManager(); 2071 // Call it after starting HeapMemoryManager. 2072 initializeMemStoreChunkCreator(hMemManager); 2073 } 2074 2075 private void initializeThreads() { 2076 // Cache flushing thread. 2077 this.cacheFlusher = new MemStoreFlusher(conf, this); 2078 2079 // Compaction thread 2080 this.compactSplitThread = new CompactSplit(this); 2081 2082 // Prefetch Notifier 2083 this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf); 2084 2085 // Background thread to check for compactions; needed if region has not gotten updates 2086 // in a while. It will take care of not checking too frequently on store-by-store basis. 
2087 this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this); 2088 this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this); 2089 this.leaseManager = new LeaseManager(this.threadWakeFrequency); 2090 2091 final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, 2092 HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); 2093 final boolean walEventTrackerEnabled = 2094 conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT); 2095 2096 if (isSlowLogTableEnabled || walEventTrackerEnabled) { 2097 // default chore duration: 10 min 2098 // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property 2099 final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY, 2100 DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION); 2101 2102 final int namedQueueChoreDuration = 2103 conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT); 2104 // Considering min of slowLogChoreDuration and namedQueueChoreDuration 2105 int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration); 2106 2107 namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration, 2108 this.namedQueueRecorder, this.getConnection()); 2109 } 2110 2111 if (this.nonceManager != null) { 2112 // Create the scheduled chore that cleans up nonces. 
2113 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); 2114 } 2115 2116 // Setup the Quota Manager 2117 rsQuotaManager = new RegionServerRpcQuotaManager(this); 2118 configurationManager.registerObserver(rsQuotaManager); 2119 rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); 2120 2121 if (QuotaUtil.isQuotaEnabled(conf)) { 2122 this.fsUtilizationChore = new FileSystemUtilizationChore(this); 2123 } 2124 2125 boolean onlyMetaRefresh = false; 2126 int storefileRefreshPeriod = 2127 conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 2128 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2129 if (storefileRefreshPeriod == 0) { 2130 storefileRefreshPeriod = 2131 conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, 2132 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2133 onlyMetaRefresh = true; 2134 } 2135 if (storefileRefreshPeriod > 0) { 2136 this.storefileRefresher = 2137 new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this); 2138 } 2139 2140 int brokenStoreFileCleanerPeriod = 2141 conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, 2142 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD); 2143 int brokenStoreFileCleanerDelay = 2144 conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, 2145 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY); 2146 double brokenStoreFileCleanerDelayJitter = 2147 conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER, 2148 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER); 2149 double jitterRate = 2150 (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter; 2151 long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate); 2152 this.brokenStoreFileCleaner = 2153 new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue), 2154 
brokenStoreFileCleanerPeriod, this, conf, this); 2155 2156 this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this); 2157 2158 registerConfigurationObservers(); 2159 initializeReplicationMarkerChore(); 2160 } 2161 2162 private void registerConfigurationObservers() { 2163 // Register Replication if possible, as now we support recreating replication peer storage, for 2164 // migrating across different replication peer storages online 2165 if (replicationSourceHandler instanceof ConfigurationObserver) { 2166 configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler); 2167 } 2168 if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) { 2169 configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler); 2170 } 2171 // Registering the compactSplitThread object with the ConfigurationManager. 2172 configurationManager.registerObserver(this.compactSplitThread); 2173 configurationManager.registerObserver(this.cacheFlusher); 2174 configurationManager.registerObserver(this.rpcServices); 2175 configurationManager.registerObserver(this.prefetchExecutorNotifier); 2176 configurationManager.registerObserver(this); 2177 } 2178 2179 /* 2180 * Verify that server is healthy 2181 */ 2182 private boolean isHealthy() { 2183 if (!dataFsOk) { 2184 // File system problem 2185 return false; 2186 } 2187 // Verify that all threads are alive 2188 boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive()) 2189 && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) 2190 && (this.walRoller == null || this.walRoller.isAlive()) 2191 && (this.compactionChecker == null || this.compactionChecker.isScheduled()) 2192 && (this.periodicFlusher == null || this.periodicFlusher.isScheduled()); 2193 if (!healthy) { 2194 stop("One or more threads are no longer alive -- stop"); 2195 } 2196 return healthy; 2197 } 2198 2199 @Override 2200 public List<WAL> getWALs() { 2201 return 
walFactory.getWALs(); 2202 } 2203 2204 @Override 2205 public WAL getWAL(RegionInfo regionInfo) throws IOException { 2206 WAL wal = walFactory.getWAL(regionInfo); 2207 if (this.walRoller != null) { 2208 this.walRoller.addWAL(wal); 2209 } 2210 return wal; 2211 } 2212 2213 public LogRoller getWalRoller() { 2214 return walRoller; 2215 } 2216 2217 public WALFactory getWalFactory() { 2218 return walFactory; 2219 } 2220 2221 @Override 2222 public void stop(final String msg) { 2223 stop(msg, false, RpcServer.getRequestUser().orElse(null)); 2224 } 2225 2226 /** 2227 * Stops the regionserver. 2228 * @param msg Status message 2229 * @param force True if this is a regionserver abort 2230 * @param user The user executing the stop request, or null if no user is associated 2231 */ 2232 public void stop(final String msg, final boolean force, final User user) { 2233 if (!this.stopped) { 2234 LOG.info("***** STOPPING region server '{}' *****", this); 2235 if (this.rsHost != null) { 2236 // when forced via abort don't allow CPs to override 2237 try { 2238 this.rsHost.preStop(msg, user); 2239 } catch (IOException ioe) { 2240 if (!force) { 2241 LOG.warn("The region server did not stop", ioe); 2242 return; 2243 } 2244 LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe); 2245 } 2246 } 2247 this.stopped = true; 2248 LOG.info("STOPPED: " + msg); 2249 // Wakes run() if it is sleeping 2250 sleeper.skipSleepCycle(); 2251 } 2252 } 2253 2254 public void waitForServerOnline() { 2255 while (!isStopped() && !isOnline()) { 2256 synchronized (online) { 2257 try { 2258 online.wait(msgInterval); 2259 } catch (InterruptedException ie) { 2260 Thread.currentThread().interrupt(); 2261 break; 2262 } 2263 } 2264 } 2265 } 2266 2267 @Override 2268 public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException { 2269 HRegion r = context.getRegion(); 2270 long openProcId = context.getOpenProcId(); 2271 long masterSystemTime = 
context.getMasterSystemTime(); 2272 long initiatingMasterActiveTime = context.getInitiatingMasterActiveTime(); 2273 rpcServices.checkOpen(); 2274 LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}", 2275 r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime); 2276 // Do checks to see if we need to compact (references or too many files) 2277 // Skip compaction check if region is read only 2278 if (!r.isReadOnly()) { 2279 for (HStore s : r.stores.values()) { 2280 if (s.hasReferences() || s.needsCompaction()) { 2281 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region"); 2282 } 2283 } 2284 } 2285 long openSeqNum = r.getOpenSeqNum(); 2286 if (openSeqNum == HConstants.NO_SEQNUM) { 2287 // If we opened a region, we should have read some sequence number from it. 2288 LOG.error( 2289 "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString()); 2290 openSeqNum = 0; 2291 } 2292 2293 // Notify master 2294 if ( 2295 !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED, 2296 openSeqNum, openProcId, masterSystemTime, r.getRegionInfo(), initiatingMasterActiveTime)) 2297 ) { 2298 throw new IOException( 2299 "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString()); 2300 } 2301 2302 triggerFlushInPrimaryRegion(r); 2303 2304 LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString()); 2305 } 2306 2307 /** 2308 * Helper method for use in tests. Skip the region transition report when there's no master around 2309 * to receive it. 
2310 */ 2311 private boolean skipReportingTransition(final RegionStateTransitionContext context) { 2312 final TransitionCode code = context.getCode(); 2313 final long openSeqNum = context.getOpenSeqNum(); 2314 long masterSystemTime = context.getMasterSystemTime(); 2315 final RegionInfo[] hris = context.getHris(); 2316 2317 if (code == TransitionCode.OPENED) { 2318 Preconditions.checkArgument(hris != null && hris.length == 1); 2319 if (hris[0].isMetaRegion()) { 2320 LOG.warn( 2321 "meta table location is stored in master local store, so we can not skip reporting"); 2322 return false; 2323 } else { 2324 try { 2325 MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0], 2326 serverName, openSeqNum, masterSystemTime); 2327 } catch (IOException e) { 2328 LOG.info("Failed to update meta", e); 2329 return false; 2330 } 2331 } 2332 } 2333 return true; 2334 } 2335 2336 private ReportRegionStateTransitionRequest 2337 createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) { 2338 final TransitionCode code = context.getCode(); 2339 final long openSeqNum = context.getOpenSeqNum(); 2340 final RegionInfo[] hris = context.getHris(); 2341 final long[] procIds = context.getProcIds(); 2342 2343 ReportRegionStateTransitionRequest.Builder builder = 2344 ReportRegionStateTransitionRequest.newBuilder(); 2345 builder.setServer(ProtobufUtil.toServerName(serverName)); 2346 RegionStateTransition.Builder transition = builder.addTransitionBuilder(); 2347 transition.setTransitionCode(code); 2348 if (code == TransitionCode.OPENED && openSeqNum >= 0) { 2349 transition.setOpenSeqNum(openSeqNum); 2350 } 2351 for (RegionInfo hri : hris) { 2352 transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri)); 2353 } 2354 for (long procId : procIds) { 2355 transition.addProcId(procId); 2356 } 2357 transition.setInitiatingMasterActiveTime(context.getInitiatingMasterActiveTime()); 2358 2359 return builder.build(); 2360 } 2361 2362 @Override 2363 public 
boolean reportRegionStateTransition(final RegionStateTransitionContext context) { 2364 if (TEST_SKIP_REPORTING_TRANSITION) { 2365 return skipReportingTransition(context); 2366 } 2367 final ReportRegionStateTransitionRequest request = 2368 createReportRegionStateTransitionRequest(context); 2369 2370 int tries = 0; 2371 long pauseTime = this.retryPauseTime; 2372 // Keep looping till we get an error. We want to send reports even though server is going down. 2373 // Only go down if clusterConnection is null. It is set to null almost as last thing as the 2374 // HRegionServer does down. 2375 while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) { 2376 RegionServerStatusService.BlockingInterface rss = rssStub; 2377 try { 2378 if (rss == null) { 2379 createRegionServerStatusStub(); 2380 continue; 2381 } 2382 ReportRegionStateTransitionResponse response = 2383 rss.reportRegionStateTransition(null, request); 2384 if (response.hasErrorMessage()) { 2385 LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage()); 2386 break; 2387 } 2388 // Log if we had to retry else don't log unless TRACE. We want to 2389 // know if were successful after an attempt showed in logs as failed. 2390 if (tries > 0 || LOG.isTraceEnabled()) { 2391 LOG.info("TRANSITION REPORTED " + request); 2392 } 2393 // NOTE: Return mid-method!!! 2394 return true; 2395 } catch (ServiceException se) { 2396 IOException ioe = ProtobufUtil.getRemoteException(se); 2397 boolean pause = ioe instanceof ServerNotRunningYetException 2398 || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException; 2399 if (pause) { 2400 // Do backoff else we flood the Master with requests. 2401 pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries); 2402 } else { 2403 pauseTime = this.retryPauseTime; // Reset. 2404 } 2405 LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#" 2406 + tries + ")" 2407 + (pause 2408 ? 
" after " + pauseTime + "ms delay (Master is coming online...)." 2409 : " immediately."), 2410 ioe); 2411 if (pause) { 2412 Threads.sleep(pauseTime); 2413 } 2414 tries++; 2415 if (rssStub == rss) { 2416 rssStub = null; 2417 } 2418 } 2419 } 2420 return false; 2421 } 2422 2423 /** 2424 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not 2425 * block this thread. See RegionReplicaFlushHandler for details. 2426 */ 2427 private void triggerFlushInPrimaryRegion(final HRegion region) { 2428 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 2429 return; 2430 } 2431 TableName tn = region.getTableDescriptor().getTableName(); 2432 if ( 2433 !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) 2434 || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) || 2435 // If the memstore replication not setup, we do not have to wait for observing a flush event 2436 // from primary before starting to serve reads, because gaps from replication is not 2437 // applicable,this logic is from 2438 // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by 2439 // HBASE-13063 2440 !region.getTableDescriptor().hasRegionMemStoreReplication() 2441 ) { 2442 region.setReadsEnabled(true); 2443 return; 2444 } 2445 2446 region.setReadsEnabled(false); // disable reads before marking the region as opened. 2447 // RegionReplicaFlushHandler might reset this. 
2448 2449 // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler 2450 if (this.executorService != null) { 2451 this.executorService.submit(new RegionReplicaFlushHandler(this, region)); 2452 } else { 2453 LOG.info("Executor is null; not running flush of primary region replica for {}", 2454 region.getRegionInfo()); 2455 } 2456 } 2457 2458 @InterfaceAudience.Private 2459 public RSRpcServices getRSRpcServices() { 2460 return rpcServices; 2461 } 2462 2463 /** 2464 * Cause the server to exit without closing the regions it is serving, the log it is using and 2465 * without notifying the master. Used unit testing and on catastrophic events such as HDFS is 2466 * yanked out from under hbase or we OOME. the reason we are aborting the exception that caused 2467 * the abort, or null 2468 */ 2469 @Override 2470 public void abort(String reason, Throwable cause) { 2471 if (!setAbortRequested()) { 2472 // Abort already in progress, ignore the new request. 2473 LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason); 2474 return; 2475 } 2476 String msg = "***** ABORTING region server " + this + ": " + reason + " *****"; 2477 if (cause != null) { 2478 LOG.error(HBaseMarkers.FATAL, msg, cause); 2479 } else { 2480 LOG.error(HBaseMarkers.FATAL, msg); 2481 } 2482 // HBASE-4014: show list of coprocessors that were loaded to help debug 2483 // regionserver crashes.Note that we're implicitly using 2484 // java.util.HashSet's toString() method to print the coprocessor names. 2485 LOG.error(HBaseMarkers.FATAL, 2486 "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors()); 2487 // Try and dump metrics if abort -- might give clue as to how fatal came about.... 
2488 try { 2489 LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics()); 2490 } catch (MalformedObjectNameException | IOException e) { 2491 LOG.warn("Failed dumping metrics", e); 2492 } 2493 2494 // Do our best to report our abort to the master, but this may not work 2495 try { 2496 if (cause != null) { 2497 msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause); 2498 } 2499 // Report to the master but only if we have already registered with the master. 2500 RegionServerStatusService.BlockingInterface rss = rssStub; 2501 if (rss != null && this.serverName != null) { 2502 ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder(); 2503 builder.setServer(ProtobufUtil.toServerName(this.serverName)); 2504 builder.setErrorMessage(msg); 2505 rss.reportRSFatalError(null, builder.build()); 2506 } 2507 } catch (Throwable t) { 2508 LOG.warn("Unable to report fatal error to master", t); 2509 } 2510 2511 scheduleAbortTimer(); 2512 // shutdown should be run as the internal user 2513 stop(reason, true, null); 2514 } 2515 2516 /* 2517 * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup logs but it does 2518 * close socket in case want to bring up server on old hostname+port immediately. 2519 */ 2520 @InterfaceAudience.Private 2521 protected void kill() { 2522 this.killed = true; 2523 abort("Simulated kill"); 2524 } 2525 2526 // Limits the time spent in the shutdown process. 2527 private void scheduleAbortTimer() { 2528 if (this.abortMonitor == null) { 2529 this.abortMonitor = new Timer("Abort regionserver monitor", true); 2530 TimerTask abortTimeoutTask = null; 2531 try { 2532 Constructor<? 
extends TimerTask> timerTaskCtor = 2533 Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName())) 2534 .asSubclass(TimerTask.class).getDeclaredConstructor(); 2535 timerTaskCtor.setAccessible(true); 2536 abortTimeoutTask = timerTaskCtor.newInstance(); 2537 } catch (Exception e) { 2538 LOG.warn("Initialize abort timeout task failed", e); 2539 } 2540 if (abortTimeoutTask != null) { 2541 abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT)); 2542 } 2543 } 2544 } 2545 2546 /** 2547 * Wait on all threads to finish. Presumption is that all closes and stops have already been 2548 * called. 2549 */ 2550 protected void stopServiceThreads() { 2551 // clean up the scheduled chores 2552 stopChoreService(); 2553 if (bootstrapNodeManager != null) { 2554 bootstrapNodeManager.stop(); 2555 } 2556 if (this.cacheFlusher != null) { 2557 this.cacheFlusher.shutdown(); 2558 } 2559 if (this.walRoller != null) { 2560 this.walRoller.close(); 2561 } 2562 if (this.compactSplitThread != null) { 2563 this.compactSplitThread.join(); 2564 } 2565 stopExecutorService(); 2566 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { 2567 this.replicationSourceHandler.stopReplicationService(); 2568 } else { 2569 if (this.replicationSourceHandler != null) { 2570 this.replicationSourceHandler.stopReplicationService(); 2571 } 2572 if (this.replicationSinkHandler != null) { 2573 this.replicationSinkHandler.stopReplicationService(); 2574 } 2575 } 2576 } 2577 2578 /** Returns Return the object that implements the replication source executorService. */ 2579 @Override 2580 public ReplicationSourceService getReplicationSourceService() { 2581 return replicationSourceHandler; 2582 } 2583 2584 /** Returns Return the object that implements the replication sink executorService. 
*/ 2585 public ReplicationSinkService getReplicationSinkService() { 2586 return replicationSinkHandler; 2587 } 2588 2589 /** 2590 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2591 * connection, the current rssStub must be null. Method will block until a master is available. 2592 * You can break from this block by requesting the server stop. 2593 * @return master + port, or null if server has been stopped 2594 */ 2595 private synchronized ServerName createRegionServerStatusStub() { 2596 // Create RS stub without refreshing the master node from ZK, use cached data 2597 return createRegionServerStatusStub(false); 2598 } 2599 2600 /** 2601 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2602 * connection, the current rssStub must be null. Method will block until a master is available. 2603 * You can break from this block by requesting the server stop. 2604 * @param refresh If true then master address will be read from ZK, otherwise use cached data 2605 * @return master + port, or null if server has been stopped 2606 */ 2607 @InterfaceAudience.Private 2608 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { 2609 if (rssStub != null) { 2610 return masterAddressTracker.getMasterAddress(); 2611 } 2612 ServerName sn = null; 2613 long previousLogTime = 0; 2614 RegionServerStatusService.BlockingInterface intRssStub = null; 2615 LockService.BlockingInterface intLockStub = null; 2616 boolean interrupted = false; 2617 try { 2618 while (keepLooping()) { 2619 sn = this.masterAddressTracker.getMasterAddress(refresh); 2620 if (sn == null) { 2621 if (!keepLooping()) { 2622 // give up with no connection. 
2623 LOG.debug("No master found and cluster is stopped; bailing out"); 2624 return null; 2625 } 2626 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 2627 LOG.debug("No master found; retry"); 2628 previousLogTime = EnvironmentEdgeManager.currentTime(); 2629 } 2630 refresh = true; // let's try pull it from ZK directly 2631 if (sleepInterrupted(200)) { 2632 interrupted = true; 2633 } 2634 continue; 2635 } 2636 try { 2637 BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, 2638 userProvider.getCurrent(), shortOperationTimeout); 2639 intRssStub = RegionServerStatusService.newBlockingStub(channel); 2640 intLockStub = LockService.newBlockingStub(channel); 2641 break; 2642 } catch (IOException e) { 2643 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 2644 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 2645 if (e instanceof ServerNotRunningYetException) { 2646 LOG.info("Master isn't available yet, retrying"); 2647 } else { 2648 LOG.warn("Unable to connect to master. Retrying. Error was:", e); 2649 } 2650 previousLogTime = EnvironmentEdgeManager.currentTime(); 2651 } 2652 if (sleepInterrupted(200)) { 2653 interrupted = true; 2654 } 2655 } 2656 } 2657 } finally { 2658 if (interrupted) { 2659 Thread.currentThread().interrupt(); 2660 } 2661 } 2662 this.rssStub = intRssStub; 2663 this.lockStub = intLockStub; 2664 return sn; 2665 } 2666 2667 /** 2668 * @return True if we should break loop because cluster is going down or this server has been 2669 * stopped or hdfs has gone bad. 2670 */ 2671 private boolean keepLooping() { 2672 return !this.stopped && isClusterUp(); 2673 } 2674 2675 /* 2676 * Let the master know we're here Run initialization using parameters passed us by the master. 2677 * @return A Map of key/value configurations we got from the Master else null if we failed to 2678 * register. 
2679 */ 2680 private RegionServerStartupResponse reportForDuty() throws IOException { 2681 if (this.masterless) { 2682 return RegionServerStartupResponse.getDefaultInstance(); 2683 } 2684 ServerName masterServerName = createRegionServerStatusStub(true); 2685 RegionServerStatusService.BlockingInterface rss = rssStub; 2686 if (masterServerName == null || rss == null) { 2687 return null; 2688 } 2689 RegionServerStartupResponse result = null; 2690 try { 2691 rpcServices.requestCount.reset(); 2692 rpcServices.rpcGetRequestCount.reset(); 2693 rpcServices.rpcScanRequestCount.reset(); 2694 rpcServices.rpcFullScanRequestCount.reset(); 2695 rpcServices.rpcMultiRequestCount.reset(); 2696 rpcServices.rpcMutateRequestCount.reset(); 2697 LOG.info("reportForDuty to master=" + masterServerName + " with port=" 2698 + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode); 2699 long now = EnvironmentEdgeManager.currentTime(); 2700 int port = rpcServices.getSocketAddress().getPort(); 2701 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder(); 2702 if (!StringUtils.isBlank(useThisHostnameInstead)) { 2703 request.setUseThisHostnameInstead(useThisHostnameInstead); 2704 } 2705 request.setPort(port); 2706 request.setServerStartCode(this.startcode); 2707 request.setServerCurrentTime(now); 2708 result = rss.regionServerStartup(null, request.build()); 2709 } catch (ServiceException se) { 2710 IOException ioe = ProtobufUtil.getRemoteException(se); 2711 if (ioe instanceof ClockOutOfSyncException) { 2712 LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe); 2713 // Re-throw IOE will cause RS to abort 2714 throw ioe; 2715 } else if (ioe instanceof DecommissionedHostRejectedException) { 2716 LOG.error(HBaseMarkers.FATAL, 2717 "Master rejected startup because the host is considered decommissioned", ioe); 2718 // Re-throw IOE will cause RS to abort 2719 throw ioe; 2720 } else if (ioe instanceof 
ServerNotRunningYetException) { 2721 LOG.debug("Master is not running yet"); 2722 } else { 2723 LOG.warn("error telling master we are up", se); 2724 } 2725 rssStub = null; 2726 } 2727 return result; 2728 } 2729 2730 @Override 2731 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { 2732 try { 2733 GetLastFlushedSequenceIdRequest req = 2734 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName); 2735 RegionServerStatusService.BlockingInterface rss = rssStub; 2736 if (rss == null) { // Try to connect one more time 2737 createRegionServerStatusStub(); 2738 rss = rssStub; 2739 if (rss == null) { 2740 // Still no luck, we tried 2741 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id"); 2742 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2743 .build(); 2744 } 2745 } 2746 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req); 2747 return RegionStoreSequenceIds.newBuilder() 2748 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId()) 2749 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build(); 2750 } catch (ServiceException e) { 2751 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e); 2752 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2753 .build(); 2754 } 2755 } 2756 2757 /** 2758 * Close meta region if we carry it 2759 * @param abort Whether we're running an abort. 
2760 */ 2761 private void closeMetaTableRegions(final boolean abort) { 2762 HRegion meta = null; 2763 this.onlineRegionsLock.writeLock().lock(); 2764 try { 2765 for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) { 2766 RegionInfo hri = e.getValue().getRegionInfo(); 2767 if (hri.isMetaRegion()) { 2768 meta = e.getValue(); 2769 } 2770 if (meta != null) { 2771 break; 2772 } 2773 } 2774 } finally { 2775 this.onlineRegionsLock.writeLock().unlock(); 2776 } 2777 if (meta != null) { 2778 closeRegionIgnoreErrors(meta.getRegionInfo(), abort); 2779 } 2780 } 2781 2782 /** 2783 * Schedule closes on all user regions. Should be safe calling multiple times because it wont' 2784 * close regions that are already closed or that are closing. 2785 * @param abort Whether we're running an abort. 2786 */ 2787 private void closeUserRegions(final boolean abort) { 2788 this.onlineRegionsLock.writeLock().lock(); 2789 try { 2790 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 2791 HRegion r = e.getValue(); 2792 if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) { 2793 // Don't update zk with this close transition; pass false. 2794 closeRegionIgnoreErrors(r.getRegionInfo(), abort); 2795 } 2796 } 2797 } finally { 2798 this.onlineRegionsLock.writeLock().unlock(); 2799 } 2800 } 2801 2802 protected Map<String, HRegion> getOnlineRegions() { 2803 return this.onlineRegions; 2804 } 2805 2806 public int getNumberOfOnlineRegions() { 2807 return this.onlineRegions.size(); 2808 } 2809 2810 /** 2811 * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM 2812 * as client; HRegion cannot be serialized to cross an rpc. 
2813 */ 2814 public Collection<HRegion> getOnlineRegionsLocalContext() { 2815 Collection<HRegion> regions = this.onlineRegions.values(); 2816 return Collections.unmodifiableCollection(regions); 2817 } 2818 2819 @Override 2820 public void addRegion(HRegion region) { 2821 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); 2822 configurationManager.registerObserver(region); 2823 } 2824 2825 private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region, 2826 long size) { 2827 if (!sortedRegions.containsKey(size)) { 2828 sortedRegions.put(size, new ArrayList<>()); 2829 } 2830 sortedRegions.get(size).add(region); 2831 } 2832 2833 /** 2834 * @return A new Map of online regions sorted by region off-heap size with the first entry being 2835 * the biggest. 2836 */ 2837 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() { 2838 // we'll sort the regions in reverse 2839 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2840 // Copy over all regions. Regions are sorted by size with biggest first. 2841 for (HRegion region : this.onlineRegions.values()) { 2842 addRegion(sortedRegions, region, region.getMemStoreOffHeapSize()); 2843 } 2844 return sortedRegions; 2845 } 2846 2847 /** 2848 * @return A new Map of online regions sorted by region heap size with the first entry being the 2849 * biggest. 2850 */ 2851 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() { 2852 // we'll sort the regions in reverse 2853 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2854 // Copy over all regions. Regions are sorted by size with biggest first. 
2855 for (HRegion region : this.onlineRegions.values()) { 2856 addRegion(sortedRegions, region, region.getMemStoreHeapSize()); 2857 } 2858 return sortedRegions; 2859 } 2860 2861 /** Returns reference to FlushRequester */ 2862 @Override 2863 public FlushRequester getFlushRequester() { 2864 return this.cacheFlusher; 2865 } 2866 2867 @Override 2868 public CompactionRequester getCompactionRequestor() { 2869 return this.compactSplitThread; 2870 } 2871 2872 @Override 2873 public LeaseManager getLeaseManager() { 2874 return leaseManager; 2875 } 2876 2877 /** Returns {@code true} when the data file system is available, {@code false} otherwise. */ 2878 boolean isDataFileSystemOk() { 2879 return this.dataFsOk; 2880 } 2881 2882 public RegionServerCoprocessorHost getRegionServerCoprocessorHost() { 2883 return this.rsHost; 2884 } 2885 2886 @Override 2887 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() { 2888 return this.regionsInTransitionInRS; 2889 } 2890 2891 @Override 2892 public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { 2893 return rsQuotaManager; 2894 } 2895 2896 // 2897 // Main program and support routines 2898 // 2899 /** 2900 * Load the replication executorService objects, if any 2901 */ 2902 private static void createNewReplicationInstance(Configuration conf, HRegionServer server, 2903 FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException { 2904 // read in the name of the source replication class from the config file. 2905 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, 2906 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 2907 2908 // read in the name of the sink replication class from the config file. 2909 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, 2910 HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT); 2911 2912 // If both the sink and the source class names are the same, then instantiate 2913 // only one object. 
2914 if (sourceClassname.equals(sinkClassname)) { 2915 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 2916 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2917 server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; 2918 server.sameReplicationSourceAndSink = true; 2919 } else { 2920 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 2921 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2922 server.replicationSinkHandler = newReplicationInstance(sinkClassname, 2923 ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2924 server.sameReplicationSourceAndSink = false; 2925 } 2926 } 2927 2928 private static <T extends ReplicationService> T newReplicationInstance(String classname, 2929 Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, 2930 Path oldLogDir, WALFactory walFactory) throws IOException { 2931 final Class<? 
extends T> clazz; 2932 try { 2933 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 2934 clazz = Class.forName(classname, true, classLoader).asSubclass(xface); 2935 } catch (java.lang.ClassNotFoundException nfe) { 2936 throw new IOException("Could not find class for " + classname); 2937 } 2938 T service = ReflectionUtils.newInstance(clazz, conf); 2939 service.initialize(server, walFs, logDir, oldLogDir, walFactory); 2940 return service; 2941 } 2942 2943 public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() { 2944 Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>(); 2945 if (!this.isOnline()) { 2946 return walGroupsReplicationStatus; 2947 } 2948 List<ReplicationSourceInterface> allSources = new ArrayList<>(); 2949 allSources.addAll(replicationSourceHandler.getReplicationManager().getSources()); 2950 allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources()); 2951 for (ReplicationSourceInterface source : allSources) { 2952 walGroupsReplicationStatus.putAll(source.getWalGroupStatus()); 2953 } 2954 return walGroupsReplicationStatus; 2955 } 2956 2957 /** 2958 * Utility for constructing an instance of the passed HRegionServer class. 2959 */ 2960 static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass, 2961 final Configuration conf) { 2962 try { 2963 Constructor<? 
extends HRegionServer> c = 2964 regionServerClass.getConstructor(Configuration.class); 2965 return c.newInstance(conf); 2966 } catch (Exception e) { 2967 throw new RuntimeException( 2968 "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e); 2969 } 2970 } 2971 2972 /** 2973 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine 2974 */ 2975 public static void main(String[] args) { 2976 LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); 2977 VersionInfo.logVersion(); 2978 Configuration conf = HBaseConfiguration.create(); 2979 @SuppressWarnings("unchecked") 2980 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf 2981 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); 2982 2983 new HRegionServerCommandLine(regionServerClass).doMain(args); 2984 } 2985 2986 /** 2987 * Gets the online regions of the specified table. This method looks at the in-memory 2988 * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions. 2989 * If a region on this table has been closed during a disable, etc., it will not be included in 2990 * the returned list. So, the returned list may not necessarily be ALL regions in this table, its 2991 * all the ONLINE regions in the table. 
2992 * @param tableName table to limit the scope of the query 2993 * @return Online regions from <code>tableName</code> 2994 */ 2995 @Override 2996 public List<HRegion> getRegions(TableName tableName) { 2997 List<HRegion> tableRegions = new ArrayList<>(); 2998 synchronized (this.onlineRegions) { 2999 for (HRegion region : this.onlineRegions.values()) { 3000 RegionInfo regionInfo = region.getRegionInfo(); 3001 if (regionInfo.getTable().equals(tableName)) { 3002 tableRegions.add(region); 3003 } 3004 } 3005 } 3006 return tableRegions; 3007 } 3008 3009 @Override 3010 public List<HRegion> getRegions() { 3011 List<HRegion> allRegions; 3012 synchronized (this.onlineRegions) { 3013 // Return a clone copy of the onlineRegions 3014 allRegions = new ArrayList<>(onlineRegions.values()); 3015 } 3016 return allRegions; 3017 } 3018 3019 /** 3020 * Gets the online tables in this RS. This method looks at the in-memory onlineRegions. 3021 * @return all the online tables in this RS 3022 */ 3023 public Set<TableName> getOnlineTables() { 3024 Set<TableName> tables = new HashSet<>(); 3025 synchronized (this.onlineRegions) { 3026 for (Region region : this.onlineRegions.values()) { 3027 tables.add(region.getTableDescriptor().getTableName()); 3028 } 3029 } 3030 return tables; 3031 } 3032 3033 public String[] getRegionServerCoprocessors() { 3034 TreeSet<String> coprocessors = new TreeSet<>(); 3035 try { 3036 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors()); 3037 } catch (IOException exception) { 3038 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " 3039 + "skipping."); 3040 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3041 } 3042 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 3043 for (HRegion region : regions) { 3044 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors()); 3045 try { 3046 
coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()); 3047 } catch (IOException exception) { 3048 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region 3049 + "; skipping."); 3050 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3051 } 3052 } 3053 coprocessors.addAll(rsHost.getCoprocessors()); 3054 return coprocessors.toArray(new String[0]); 3055 } 3056 3057 /** 3058 * Try to close the region, logs a warning on failure but continues. 3059 * @param region Region to close 3060 */ 3061 private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) { 3062 try { 3063 if (!closeRegion(region.getEncodedName(), abort, null)) { 3064 LOG 3065 .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing"); 3066 } 3067 } catch (IOException e) { 3068 LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing", 3069 e); 3070 } 3071 } 3072 3073 /** 3074 * Close asynchronously a region, can be called from the master or internally by the regionserver 3075 * when stopping. If called from the master, the region will update the status. 3076 * <p> 3077 * If an opening was in progress, this method will cancel it, but will not start a new close. The 3078 * coprocessors are not called in this case. A NotServingRegionException exception is thrown. 3079 * </p> 3080 * <p> 3081 * If a close was in progress, this new request will be ignored, and an exception thrown. 3082 * </p> 3083 * <p> 3084 * Provides additional flag to indicate if this region blocks should be evicted from the cache. 3085 * </p> 3086 * @param encodedName Region to close 3087 * @param abort True if we are aborting 3088 * @param destination Where the Region is being moved too... maybe null if unknown. 3089 * @return True if closed a region. 
* @throws NotServingRegionException if the region is not online
   */
  protected boolean closeRegion(String encodedName, final boolean abort,
    final ServerName destination) throws NotServingRegionException {
    // Check for permissions to close.
    HRegion actualRegion = this.getRegion(encodedName);
    // Can be null if we're calling close on a region that's not online
    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
      try {
        // NOTE(review): preClose is always invoked with false here, not with the abort argument --
        // confirm this is intended.
        actualRegion.getCoprocessorHost().preClose(false);
      } catch (IOException exp) {
        // A coprocessor failure in preClose stops the close; nothing has been recorded in
        // regionsInTransitionInRS at this point.
        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
        return false;
      }
    }

    // Record the close in regionsInTransitionInRS as FALSE unless an entry already exists. The
    // branches below treat an existing TRUE entry as an open in progress and an existing FALSE
    // entry as a close in progress.
    // previous can come back 'null' if not in map.
    final Boolean previous =
      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);

    if (Boolean.TRUE.equals(previous)) {
      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already "
        + "trying to OPEN. Cancelling OPENING.");
      // Flip the entry from TRUE to FALSE; this only succeeds if the entry is still TRUE.
      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
        // The replace failed. That should be an exceptional case, but theoretically it can happen.
        // We're going to try to do a standard close then.
        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
          + " Doing a standard close now");
        return closeRegion(encodedName, abort, destination);
      }
      // Let's get the region from the online region list again
      actualRegion = this.getRegion(encodedName);
      if (actualRegion == null) { // If already online, we still need to close it.
        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
        // The master deletes the znode when it receives this exception.
        throw new NotServingRegionException(
          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
      }
    } else if (previous == null) {
      // No prior entry: this request now owns the FALSE entry inserted above.
      LOG.info("Received CLOSE for {}", encodedName);
    } else if (Boolean.FALSE.equals(previous)) {
      // NOTE(review): the method Javadoc says a duplicate close request is ignored "and an
      // exception thrown", but this branch returns true without throwing -- confirm which is
      // intended.
      LOG.info("Received CLOSE for the region: " + encodedName
        + ", which we are already trying to CLOSE, but not completed yet");
      return true;
    }

    if (actualRegion == null) {
      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
      // Drop the entry for this region again, since no close handler will be submitted.
      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
      // The master deletes the znode when it receives this exception.
      throw new NotServingRegionException(
        "The region " + encodedName + " is not online, and is not opening.");
    }

    // Hand the close to a handler on the executor service; meta regions get their own handler
    // type. Returning true here means the handler was submitted, not that it has finished.
    CloseRegionHandler crh;
    final RegionInfo hri = actualRegion.getRegionInfo();
    if (hri.isMetaRegion()) {
      crh = new CloseMetaHandler(this, this, hri, abort);
    } else {
      crh = new CloseRegionHandler(this, this, hri, abort, destination);
    }
    this.executorService.submit(crh);
    return true;
  }

  /**
   * @return HRegion for the passed binary <code>regionName</code> or null if named region is not
   *         member of the online regions.
3158 */ 3159 public HRegion getOnlineRegion(final byte[] regionName) { 3160 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3161 return this.onlineRegions.get(encodedRegionName); 3162 } 3163 3164 @Override 3165 public HRegion getRegion(final String encodedRegionName) { 3166 return this.onlineRegions.get(encodedRegionName); 3167 } 3168 3169 @Override 3170 public boolean removeRegion(final HRegion r, ServerName destination) { 3171 HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); 3172 if (DataTieringManager.getInstance() != null) { 3173 DataTieringManager.getInstance().getRegionColdDataSize() 3174 .remove(r.getRegionInfo().getEncodedName()); 3175 } 3176 metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName()); 3177 if (destination != null) { 3178 long closeSeqNum = r.getMaxFlushedSeqId(); 3179 if (closeSeqNum == HConstants.NO_SEQNUM) { 3180 // No edits in WAL for this region; get the sequence number when the region was opened. 3181 closeSeqNum = r.getOpenSeqNum(); 3182 if (closeSeqNum == HConstants.NO_SEQNUM) { 3183 closeSeqNum = 0; 3184 } 3185 } 3186 boolean selfMove = ServerName.isSameAddress(destination, this.getServerName()); 3187 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove); 3188 if (selfMove) { 3189 this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put( 3190 r.getRegionInfo().getEncodedName(), 3191 new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount())); 3192 } 3193 } 3194 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName()); 3195 configurationManager.deregisterObserver(r); 3196 return toReturn != null; 3197 } 3198 3199 /** 3200 * Protected Utility method for safely obtaining an HRegion handle. 
3201 * @param regionName Name of online {@link HRegion} to return 3202 * @return {@link HRegion} for <code>regionName</code> 3203 */ 3204 protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException { 3205 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3206 return getRegionByEncodedName(regionName, encodedRegionName); 3207 } 3208 3209 public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException { 3210 return getRegionByEncodedName(null, encodedRegionName); 3211 } 3212 3213 private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) 3214 throws NotServingRegionException { 3215 HRegion region = this.onlineRegions.get(encodedRegionName); 3216 if (region == null) { 3217 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName); 3218 if (moveInfo != null) { 3219 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum()); 3220 } 3221 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName)); 3222 String regionNameStr = 3223 regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName); 3224 if (isOpening != null && isOpening) { 3225 throw new RegionOpeningException( 3226 "Region " + regionNameStr + " is opening on " + this.serverName); 3227 } 3228 throw new NotServingRegionException( 3229 "" + regionNameStr + " is not online on " + this.serverName); 3230 } 3231 return region; 3232 } 3233 3234 /** 3235 * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't 3236 * already. 3237 * @param t Throwable 3238 * @param msg Message to log in error. Can be null. 3239 * @return Throwable converted to an IOE; methods can only let out IOEs. 3240 */ 3241 private Throwable cleanup(final Throwable t, final String msg) { 3242 // Don't log as error if NSRE; NSRE is 'normal' operation. 
3243 if (t instanceof NotServingRegionException) { 3244 LOG.debug("NotServingRegionException; " + t.getMessage()); 3245 return t; 3246 } 3247 Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t; 3248 if (msg == null) { 3249 LOG.error("", e); 3250 } else { 3251 LOG.error(msg, e); 3252 } 3253 if (!rpcServices.checkOOME(t)) { 3254 checkFileSystem(); 3255 } 3256 return t; 3257 } 3258 3259 /** 3260 * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE 3261 * @return Make <code>t</code> an IOE if it isn't already. 3262 */ 3263 private IOException convertThrowableToIOE(final Throwable t, final String msg) { 3264 return (t instanceof IOException ? (IOException) t 3265 : msg == null || msg.length() == 0 ? new IOException(t) 3266 : new IOException(msg, t)); 3267 } 3268 3269 /** 3270 * Checks to see if the file system is still accessible. If not, sets abortRequested and 3271 * stopRequested 3272 * @return false if file system is not available 3273 */ 3274 boolean checkFileSystem() { 3275 if (this.dataFsOk && this.dataFs != null) { 3276 try { 3277 FSUtils.checkFileSystemAvailable(this.dataFs); 3278 } catch (IOException e) { 3279 abort("File System not available", e); 3280 this.dataFsOk = false; 3281 } 3282 } 3283 return this.dataFsOk; 3284 } 3285 3286 @Override 3287 public void updateRegionFavoredNodesMapping(String encodedRegionName, 3288 List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) { 3289 Address[] addr = new Address[favoredNodes.size()]; 3290 // Refer to the comment on the declaration of regionFavoredNodesMap on why 3291 // it is a map of region name to Address[] 3292 for (int i = 0; i < favoredNodes.size(); i++) { 3293 addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort()); 3294 } 3295 regionFavoredNodesMap.put(encodedRegionName, addr); 3296 } 3297 3298 /** 3299 * Return the favored nodes for a region given its encoded 
name. Look at the comment around 3300 * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here. 3301 * @param encodedRegionName the encoded region name. 3302 * @return array of favored locations 3303 */ 3304 @Override 3305 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) { 3306 return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName)); 3307 } 3308 3309 @Override 3310 public ServerNonceManager getNonceManager() { 3311 return this.nonceManager; 3312 } 3313 3314 private static class MovedRegionInfo { 3315 private final ServerName serverName; 3316 private final long seqNum; 3317 3318 MovedRegionInfo(ServerName serverName, long closeSeqNum) { 3319 this.serverName = serverName; 3320 this.seqNum = closeSeqNum; 3321 } 3322 3323 public ServerName getServerName() { 3324 return serverName; 3325 } 3326 3327 public long getSeqNum() { 3328 return seqNum; 3329 } 3330 } 3331 3332 /** 3333 * We need a timeout. If not there is a risk of giving a wrong information: this would double the 3334 * number of network calls instead of reducing them. 
3335 */ 3336 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000); 3337 3338 private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum, 3339 boolean selfMove) { 3340 if (selfMove) { 3341 LOG.warn("Not adding moved region record: " + encodedName + " to self."); 3342 return; 3343 } 3344 LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" 3345 + closeSeqNum); 3346 movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum)); 3347 } 3348 3349 // public for being called in tests 3350 @InterfaceAudience.Private 3351 public void removeFromMovedRegions(String encodedName) { 3352 movedRegionInfoCache.invalidate(encodedName); 3353 } 3354 3355 @InterfaceAudience.Private 3356 public MovedRegionInfo getMovedRegion(String encodedRegionName) { 3357 return movedRegionInfoCache.getIfPresent(encodedRegionName); 3358 } 3359 3360 @InterfaceAudience.Private 3361 public int movedRegionCacheExpiredTime() { 3362 return TIMEOUT_REGION_MOVED; 3363 } 3364 3365 private String getMyEphemeralNodePath() { 3366 return zooKeeper.getZNodePaths().getRsPath(serverName); 3367 } 3368 3369 private boolean isHealthCheckerConfigured() { 3370 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC); 3371 return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation); 3372 } 3373 3374 /** Returns the underlying {@link CompactSplit} for the servers */ 3375 public CompactSplit getCompactSplitThread() { 3376 return this.compactSplitThread; 3377 } 3378 3379 CoprocessorServiceResponse execRegionServerService( 3380 @SuppressWarnings("UnusedParameters") final RpcController controller, 3381 final CoprocessorServiceRequest serviceRequest) throws ServiceException { 3382 try { 3383 ServerRpcController serviceController = new ServerRpcController(); 3384 CoprocessorServiceCall call = serviceRequest.getCall(); 3385 String serviceName = call.getServiceName(); 3386 Service service = 
coprocessorServiceHandlers.get(serviceName); 3387 if (service == null) { 3388 throw new UnknownProtocolException(null, 3389 "No registered coprocessor executorService found for " + serviceName); 3390 } 3391 ServiceDescriptor serviceDesc = service.getDescriptorForType(); 3392 3393 String methodName = call.getMethodName(); 3394 MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName); 3395 if (methodDesc == null) { 3396 throw new UnknownProtocolException(service.getClass(), 3397 "Unknown method " + methodName + " called on executorService " + serviceName); 3398 } 3399 3400 Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest()); 3401 final Message.Builder responseBuilder = 3402 service.getResponsePrototype(methodDesc).newBuilderForType(); 3403 service.callMethod(methodDesc, serviceController, request, message -> { 3404 if (message != null) { 3405 responseBuilder.mergeFrom(message); 3406 } 3407 }); 3408 IOException exception = CoprocessorRpcUtils.getControllerException(serviceController); 3409 if (exception != null) { 3410 throw exception; 3411 } 3412 return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY); 3413 } catch (IOException ie) { 3414 throw new ServiceException(ie); 3415 } 3416 } 3417 3418 /** 3419 * May be null if this is a master which not carry table. 3420 * @return The block cache instance used by the regionserver. 3421 */ 3422 @Override 3423 public Optional<BlockCache> getBlockCache() { 3424 return Optional.ofNullable(this.blockCache); 3425 } 3426 3427 /** 3428 * May be null if this is a master which not carry table. 3429 * @return The cache for mob files used by the regionserver. 
3430 */ 3431 @Override 3432 public Optional<MobFileCache> getMobFileCache() { 3433 return Optional.ofNullable(this.mobFileCache); 3434 } 3435 3436 CacheEvictionStats clearRegionBlockCache(Region region) { 3437 long evictedBlocks = 0; 3438 3439 for (Store store : region.getStores()) { 3440 for (StoreFile hFile : store.getStorefiles()) { 3441 evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName()); 3442 } 3443 } 3444 3445 return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build(); 3446 } 3447 3448 @Override 3449 public double getCompactionPressure() { 3450 double max = 0; 3451 for (Region region : onlineRegions.values()) { 3452 for (Store store : region.getStores()) { 3453 double normCount = store.getCompactionPressure(); 3454 if (normCount > max) { 3455 max = normCount; 3456 } 3457 } 3458 } 3459 return max; 3460 } 3461 3462 @Override 3463 public HeapMemoryManager getHeapMemoryManager() { 3464 return hMemManager; 3465 } 3466 3467 public MemStoreFlusher getMemStoreFlusher() { 3468 return cacheFlusher; 3469 } 3470 3471 /** 3472 * For testing 3473 * @return whether all wal roll request finished for this regionserver 3474 */ 3475 @InterfaceAudience.Private 3476 public boolean walRollRequestFinished() { 3477 return this.walRoller.walRollFinished(); 3478 } 3479 3480 @Override 3481 public ThroughputController getFlushThroughputController() { 3482 return flushThroughputController; 3483 } 3484 3485 @Override 3486 public double getFlushPressure() { 3487 if (getRegionServerAccounting() == null || cacheFlusher == null) { 3488 // return 0 during RS initialization 3489 return 0.0; 3490 } 3491 return getRegionServerAccounting().getFlushPressure(); 3492 } 3493 3494 /** 3495 * Dynamically updates HRegionServer's configuration. 
Since HRegionServer inherits from 3496 * {@link HBaseServerBase}, the {@code updatedConf} parameter references the same 3497 * {@link Configuration} object as HRegionServer's {@code this.conf} instance variable in a real 3498 * HBase deployment. This isn't necessarily the case in unit tests. 3499 * @param updatedConf the dynamically updated configuration 3500 */ 3501 @Override 3502 public void onConfigurationChange(Configuration updatedConf) { 3503 ThroughputController old = this.flushThroughputController; 3504 if (old != null) { 3505 old.stop("configuration change"); 3506 } 3507 this.flushThroughputController = FlushThroughputControllerFactory.create(this, updatedConf); 3508 try { 3509 Superusers.initialize(updatedConf); 3510 } catch (IOException e) { 3511 LOG.warn("Failed to initialize SuperUsers on reloading of the configuration"); 3512 } 3513 3514 boolean originalIsReadOnlyEnabled = CoprocessorConfigurationUtil 3515 .areReadOnlyCoprocessorsLoaded(this.conf, CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY); 3516 3517 // updatedConf and this.conf reference the same Configuration object in an actual HBase 3518 // deployment. However, in unit test cases they reference different Configuration objects, so 3519 // this.conf needs to be updated. 
3520 CoprocessorConfigurationUtil.maybeUpdateCoprocessors(updatedConf, this.conf, 3521 originalIsReadOnlyEnabled, this.rsHost, CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, 3522 false, this.toString(), conf -> this.rsHost = new RegionServerCoprocessorHost(this, conf)); 3523 } 3524 3525 @Override 3526 public MetricsRegionServer getMetrics() { 3527 return metricsRegionServer; 3528 } 3529 3530 @Override 3531 public SecureBulkLoadManager getSecureBulkLoadManager() { 3532 return this.secureBulkLoadManager; 3533 } 3534 3535 @Override 3536 public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description, 3537 final Abortable abort) { 3538 final LockServiceClient client = 3539 new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator()); 3540 return client.regionLock(regionInfo, description, abort); 3541 } 3542 3543 @Override 3544 public void unassign(byte[] regionName) throws IOException { 3545 FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false)); 3546 } 3547 3548 @Override 3549 public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() { 3550 return this.rsSpaceQuotaManager; 3551 } 3552 3553 @Override 3554 public boolean reportFileArchivalForQuotas(TableName tableName, 3555 Collection<Entry<String, Long>> archivedFiles) { 3556 if (TEST_SKIP_REPORTING_TRANSITION) { 3557 return false; 3558 } 3559 RegionServerStatusService.BlockingInterface rss = rssStub; 3560 if (rss == null || rsSpaceQuotaManager == null) { 3561 // the current server could be stopping. 
3562 LOG.trace("Skipping file archival reporting to HMaster as stub is null"); 3563 return false; 3564 } 3565 try { 3566 RegionServerStatusProtos.FileArchiveNotificationRequest request = 3567 rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles); 3568 rss.reportFileArchival(null, request); 3569 } catch (ServiceException se) { 3570 IOException ioe = ProtobufUtil.getRemoteException(se); 3571 if (ioe instanceof PleaseHoldException) { 3572 if (LOG.isTraceEnabled()) { 3573 LOG.trace("Failed to report file archival(s) to Master because it is initializing." 3574 + " This will be retried.", ioe); 3575 } 3576 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 3577 return false; 3578 } 3579 if (rssStub == rss) { 3580 rssStub = null; 3581 } 3582 // re-create the stub if we failed to report the archival 3583 createRegionServerStatusStub(true); 3584 LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe); 3585 return false; 3586 } 3587 return true; 3588 } 3589 3590 void executeProcedure(long procId, long initiatingMasterActiveTime, 3591 RSProcedureCallable callable) { 3592 executorService 3593 .submit(new RSProcedureHandler(this, procId, initiatingMasterActiveTime, callable)); 3594 } 3595 3596 public void remoteProcedureComplete(long procId, long initiatingMasterActiveTime, Throwable error, 3597 byte[] procResultData) { 3598 procedureResultReporter.complete(procId, initiatingMasterActiveTime, error, procResultData); 3599 } 3600 3601 void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException { 3602 RegionServerStatusService.BlockingInterface rss; 3603 // TODO: juggling class state with an instance variable, outside of a synchronized block :'( 3604 for (;;) { 3605 rss = rssStub; 3606 if (rss != null) { 3607 break; 3608 } 3609 createRegionServerStatusStub(); 3610 } 3611 try { 3612 rss.reportProcedureDone(null, request); 3613 } catch (ServiceException se) { 3614 if (rssStub == rss) { 
3615 rssStub = null; 3616 } 3617 throw ProtobufUtil.getRemoteException(se); 3618 } 3619 } 3620 3621 /** 3622 * Will ignore the open/close region procedures which already submitted or executed. When master 3623 * had unfinished open/close region procedure and restarted, new active master may send duplicate 3624 * open/close region request to regionserver. The open/close request is submitted to a thread pool 3625 * and execute. So first need a cache for submitted open/close region procedures. After the 3626 * open/close region request executed and report region transition succeed, cache it in executed 3627 * region procedures cache. See {@link #finishRegionProcedure(long)}. After report region 3628 * transition succeed, master will not send the open/close region request to regionserver again. 3629 * And we thought that the ongoing duplicate open/close region request should not be delayed more 3630 * than 600 seconds. So the executed region procedures cache will expire after 600 seconds. See 3631 * HBASE-22404 for more details. 3632 * @param procId the id of the open/close region procedure 3633 * @return true if the procedure can be submitted. 3634 */ 3635 boolean submitRegionProcedure(long procId) { 3636 if (procId == -1) { 3637 return true; 3638 } 3639 // Ignore the region procedures which already submitted. 3640 Long previous = submittedRegionProcedures.putIfAbsent(procId, procId); 3641 if (previous != null) { 3642 LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId); 3643 return false; 3644 } 3645 // Ignore the region procedures which already executed. 3646 if (executedRegionProcedures.getIfPresent(procId) != null) { 3647 LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId); 3648 return false; 3649 } 3650 return true; 3651 } 3652 3653 /** 3654 * See {@link #submitRegionProcedure(long)}. 
3655 * @param procId the id of the open/close region procedure 3656 */ 3657 public void finishRegionProcedure(long procId) { 3658 executedRegionProcedures.put(procId, procId); 3659 submittedRegionProcedures.remove(procId); 3660 } 3661 3662 /** 3663 * Force to terminate region server when abort timeout. 3664 */ 3665 private static class SystemExitWhenAbortTimeout extends TimerTask { 3666 3667 public SystemExitWhenAbortTimeout() { 3668 } 3669 3670 @Override 3671 public void run() { 3672 LOG.warn("Aborting region server timed out, terminating forcibly" 3673 + " and does not wait for any running shutdown hooks or finalizers to finish their work." 3674 + " Thread dump to stdout."); 3675 Threads.printThreadInfo(System.out, "Zombie HRegionServer"); 3676 Runtime.getRuntime().halt(1); 3677 } 3678 } 3679 3680 @InterfaceAudience.Private 3681 public CompactedHFilesDischarger getCompactedHFilesDischarger() { 3682 return compactedFileDischarger; 3683 } 3684 3685 /** 3686 * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}} 3687 * @return pause time 3688 */ 3689 @InterfaceAudience.Private 3690 public long getRetryPauseTime() { 3691 return this.retryPauseTime; 3692 } 3693 3694 @Override 3695 public Optional<ServerName> getActiveMaster() { 3696 return Optional.ofNullable(masterAddressTracker.getMasterAddress()); 3697 } 3698 3699 @Override 3700 public List<ServerName> getBackupMasters() { 3701 return masterAddressTracker.getBackupMasters(); 3702 } 3703 3704 @Override 3705 public Iterator<ServerName> getBootstrapNodes() { 3706 return bootstrapNodeManager.getBootstrapNodes().iterator(); 3707 } 3708 3709 @Override 3710 public List<HRegionLocation> getMetaLocations() { 3711 return metaRegionLocationCache.getMetaRegionLocations(); 3712 } 3713 3714 @Override 3715 protected NamedQueueRecorder createNamedQueueRecord() { 3716 return NamedQueueRecorder.getInstance(conf); 3717 } 3718 3719 @Override 3720 protected boolean clusterMode() { 3721 // this 
method will be called in the constructor of super class, so we can not return masterless 3722 // directly here, as it will always be false. 3723 return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false); 3724 } 3725 3726 @InterfaceAudience.Private 3727 public BrokenStoreFileCleaner getBrokenStoreFileCleaner() { 3728 return brokenStoreFileCleaner; 3729 } 3730 3731 @InterfaceAudience.Private 3732 public RSMobFileCleanerChore getRSMobFileCleanerChore() { 3733 return rsMobFileCleanerChore; 3734 } 3735 3736 RSSnapshotVerifier getRsSnapshotVerifier() { 3737 return rsSnapshotVerifier; 3738 } 3739 3740 @Override 3741 protected void stopChores() { 3742 shutdownChore(nonceManagerChore); 3743 shutdownChore(compactionChecker); 3744 shutdownChore(compactedFileDischarger); 3745 shutdownChore(periodicFlusher); 3746 shutdownChore(healthCheckChore); 3747 shutdownChore(executorStatusChore); 3748 shutdownChore(storefileRefresher); 3749 shutdownChore(fsUtilizationChore); 3750 shutdownChore(namedQueueServiceChore); 3751 shutdownChore(brokenStoreFileCleaner); 3752 shutdownChore(rsMobFileCleanerChore); 3753 shutdownChore(replicationMarkerChore); 3754 } 3755 3756 @Override 3757 public RegionReplicationBufferManager getRegionReplicationBufferManager() { 3758 return regionReplicationBufferManager; 3759 } 3760}