001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; 021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; 022import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION; 023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; 024import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 025import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT; 026import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY; 027import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT; 028import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY; 029import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT; 030import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY; 031import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT; 032import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; 033import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY; 034 035import io.opentelemetry.api.trace.Span; 036import io.opentelemetry.api.trace.StatusCode; 037import io.opentelemetry.context.Scope; 038import java.io.IOException; 039import java.io.PrintWriter; 040import java.lang.management.MemoryUsage; 041import java.lang.reflect.Constructor; 042import java.net.InetSocketAddress; 043import java.time.Duration; 044import java.util.ArrayList; 045import java.util.Collection; 046import java.util.Collections; 047import java.util.Comparator; 048import java.util.HashSet; 049import java.util.Iterator; 050import java.util.List; 051import java.util.Map; 052import java.util.Map.Entry; 053import java.util.Objects; 054import java.util.Optional; 055import java.util.Set; 056import java.util.SortedMap; 057import java.util.Timer; 058import java.util.TimerTask; 059import java.util.TreeMap; 060import java.util.TreeSet; 061import java.util.concurrent.ConcurrentHashMap; 062import java.util.concurrent.ConcurrentMap; 063import java.util.concurrent.ConcurrentSkipListMap; 064import java.util.concurrent.ThreadLocalRandom; 065import java.util.concurrent.TimeUnit; 066import java.util.concurrent.atomic.AtomicBoolean; 067import 
java.util.concurrent.locks.ReentrantReadWriteLock; 068import java.util.function.Consumer; 069import java.util.stream.Collectors; 070import javax.management.MalformedObjectNameException; 071import javax.servlet.http.HttpServlet; 072import org.apache.commons.lang3.StringUtils; 073import org.apache.commons.lang3.mutable.MutableFloat; 074import org.apache.hadoop.conf.Configuration; 075import org.apache.hadoop.fs.FileSystem; 076import org.apache.hadoop.fs.Path; 077import org.apache.hadoop.hbase.Abortable; 078import org.apache.hadoop.hbase.CacheEvictionStats; 079import org.apache.hadoop.hbase.CallQueueTooBigException; 080import org.apache.hadoop.hbase.ClockOutOfSyncException; 081import org.apache.hadoop.hbase.DoNotRetryIOException; 082import org.apache.hadoop.hbase.ExecutorStatusChore; 083import org.apache.hadoop.hbase.HBaseConfiguration; 084import org.apache.hadoop.hbase.HBaseInterfaceAudience; 085import org.apache.hadoop.hbase.HBaseServerBase; 086import org.apache.hadoop.hbase.HConstants; 087import org.apache.hadoop.hbase.HDFSBlocksDistribution; 088import org.apache.hadoop.hbase.HRegionLocation; 089import org.apache.hadoop.hbase.HealthCheckChore; 090import org.apache.hadoop.hbase.MetaTableAccessor; 091import org.apache.hadoop.hbase.NotServingRegionException; 092import org.apache.hadoop.hbase.PleaseHoldException; 093import org.apache.hadoop.hbase.ScheduledChore; 094import org.apache.hadoop.hbase.ServerName; 095import org.apache.hadoop.hbase.Stoppable; 096import org.apache.hadoop.hbase.TableName; 097import org.apache.hadoop.hbase.YouAreDeadException; 098import org.apache.hadoop.hbase.ZNodeClearer; 099import org.apache.hadoop.hbase.client.ConnectionUtils; 100import org.apache.hadoop.hbase.client.RegionInfo; 101import org.apache.hadoop.hbase.client.RegionInfoBuilder; 102import org.apache.hadoop.hbase.client.locking.EntityLock; 103import org.apache.hadoop.hbase.client.locking.LockServiceClient; 104import org.apache.hadoop.hbase.conf.ConfigurationObserver; 105import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 106import org.apache.hadoop.hbase.exceptions.RegionMovedException; 107import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 108import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 109import org.apache.hadoop.hbase.executor.ExecutorType; 110import org.apache.hadoop.hbase.http.InfoServer; 111import org.apache.hadoop.hbase.io.hfile.BlockCache; 112import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 113import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 114import org.apache.hadoop.hbase.io.hfile.HFile; 115import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 116import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 117import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 118import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException; 119import org.apache.hadoop.hbase.ipc.RpcClient; 120import org.apache.hadoop.hbase.ipc.RpcServer; 121import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 122import org.apache.hadoop.hbase.ipc.ServerRpcController; 123import org.apache.hadoop.hbase.log.HBaseMarkers; 124import org.apache.hadoop.hbase.mob.MobFileCache; 125import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore; 126import org.apache.hadoop.hbase.monitoring.TaskMonitor; 127import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 128import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore; 129import org.apache.hadoop.hbase.net.Address; 130import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; 
131import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 132import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore; 133import org.apache.hadoop.hbase.quotas.QuotaUtil; 134import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 135import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 136import org.apache.hadoop.hbase.quotas.RegionSize; 137import org.apache.hadoop.hbase.quotas.RegionSizeStore; 138import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 139import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 140import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; 141import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 142import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; 143import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; 144import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; 145import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; 146import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet; 147import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet; 148import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager; 149import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; 150import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 151import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 152import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; 153import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; 154import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore; 155import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; 156import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; 157import org.apache.hadoop.hbase.security.SecurityConstants; 158import org.apache.hadoop.hbase.security.Superusers; 159import org.apache.hadoop.hbase.security.User; 160import org.apache.hadoop.hbase.security.UserProvider; 161import org.apache.hadoop.hbase.trace.TraceUtil; 162import org.apache.hadoop.hbase.util.Bytes; 163import org.apache.hadoop.hbase.util.CompressionTest; 164import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil; 165import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 166import org.apache.hadoop.hbase.util.FSUtils; 167import org.apache.hadoop.hbase.util.FutureUtils; 168import org.apache.hadoop.hbase.util.JvmPauseMonitor; 169import org.apache.hadoop.hbase.util.Pair; 170import org.apache.hadoop.hbase.util.RetryCounter; 171import org.apache.hadoop.hbase.util.RetryCounterFactory; 172import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 173import org.apache.hadoop.hbase.util.Threads; 174import org.apache.hadoop.hbase.util.VersionInfo; 175import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 176import org.apache.hadoop.hbase.wal.WAL; 177import org.apache.hadoop.hbase.wal.WALFactory; 178import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; 179import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 180import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker; 181import org.apache.hadoop.hbase.zookeeper.ZKUtil; 182import org.apache.hadoop.ipc.RemoteException; 183import org.apache.hadoop.util.ReflectionUtils; 184import org.apache.yetus.audience.InterfaceAudience; 185import org.apache.zookeeper.KeeperException; 186import org.slf4j.Logger; 
187import org.slf4j.LoggerFactory; 188 189import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 190import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 191import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 192import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 193import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 194import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses; 195import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 196import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 197import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor; 198import org.apache.hbase.thirdparty.com.google.protobuf.Message; 199import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 200import org.apache.hbase.thirdparty.com.google.protobuf.Service; 201import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 202import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 203import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 204 205import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 206import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest; 229import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
 * are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation" })
public class HRegionServer extends HBaseServerBase<RSRpcServices>
  implements RegionServerServices, LastSequenceId {

  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

  int unitMB = 1024 * 1024;
  int unitKB = 1024;

  /**
   * For testing only! Set to true to skip notifying region assignment to master.
   */
  @InterfaceAudience.Private
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  /**
   * A map from RegionName to the action currently in progress. The boolean value indicates:
   * true if an open-region action is in progress, false if a close-region action is in progress.
   */
  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * Used to cache the open/close region procedures which have already been submitted. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
  /**
   * Used to cache the open/close region procedures which have already been executed. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

  /**
   * Used to cache the moved-out regions
   */
  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();

  private MemStoreFlusher cacheFlusher;

  private HeapMemoryManager hMemManager;

  // Replication services. If no replication, this handler will be null.
  private ReplicationSourceService replicationSourceHandler;
  private ReplicationSinkService replicationSinkHandler;
  private boolean sameReplicationSourceAndSink;

  // Compactions
  private CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the encoded region name.
   * All access should be synchronized.
   */
  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
  /**
   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does
   * it need to be a ConcurrentHashMap?
   */
  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS
   * API call, and it seems a bit weird to store ServerName since ServerName refers to
   * RegionServers and here we really mean DataNode locations. We don't store it as
   * InetSocketAddress here because the conversion on demand from Address to InetSocketAddress
   * will guarantee the resolution results will be fresh when we need it.
   */
  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();

  private LeaseManager leaseManager;

  private volatile boolean dataFsOk;

  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds, to be safe
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Will run this task when the abort times out
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;
  private volatile boolean killed = false;

  private final int threadWakeFrequency;

  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  private RpcClient rpcClient;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  private JvmPauseMonitor pauseMonitor;

  private RSSnapshotVerifier rsSnapshotVerifier;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  private MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;

  /**
   * Check for compaction requests.
359 */ 360 private ScheduledChore compactionChecker; 361 362 /** 363 * Check for flushes 364 */ 365 private ScheduledChore periodicFlusher; 366 367 private volatile WALFactory walFactory; 368 369 private LogRoller walRoller; 370 371 // A thread which calls reportProcedureDone 372 private RemoteProcedureResultReporter procedureResultReporter; 373 374 // flag set after we're done setting up server threads 375 final AtomicBoolean online = new AtomicBoolean(false); 376 377 // master address tracker 378 private final MasterAddressTracker masterAddressTracker; 379 380 // Log Splitting Worker 381 private SplitLogWorker splitLogWorker; 382 383 private final int shortOperationTimeout; 384 385 // Time to pause if master says 'please hold' 386 private final long retryPauseTime; 387 388 private final RegionServerAccounting regionServerAccounting; 389 390 private NamedQueueServiceChore namedQueueServiceChore = null; 391 392 // Block cache 393 private BlockCache blockCache; 394 // The cache for mob files 395 private MobFileCache mobFileCache; 396 397 /** The health check chore. */ 398 private HealthCheckChore healthCheckChore; 399 400 /** The Executor status collect chore. */ 401 private ExecutorStatusChore executorStatusChore; 402 403 /** The nonce manager chore. */ 404 private ScheduledChore nonceManagerChore; 405 406 private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap(); 407 408 /** 409 * @deprecated since 2.4.0 and will be removed in 4.0.0. Use 410 * {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead. 411 * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a> 412 */ 413 @Deprecated 414 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 415 final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY = 416 "hbase.regionserver.hostname.disable.master.reversedns"; 417 418 /** 419 * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive. 420 * Exception will be thrown if both are used. 421 */ 422 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 423 final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY = 424 "hbase.unsafe.regionserver.hostname.disable.master.reversedns"; 425 426 /** 427 * Unique identifier for the cluster we are a part of. 428 */ 429 private String clusterId; 430 431 // chore for refreshing store files for secondary regions 432 private StorefileRefresherChore storefileRefresher; 433 434 private volatile RegionServerCoprocessorHost rsHost; 435 436 private RegionServerProcedureManagerHost rspmHost; 437 438 private RegionServerRpcQuotaManager rsQuotaManager; 439 private RegionServerSpaceQuotaManager rsSpaceQuotaManager; 440 441 /** 442 * Nonce manager. Nonces are used to make operations like increment and append idempotent in the 443 * case where client doesn't receive the response from a successful operation and retries. We 444 * track the successful ops for some time via a nonce sent by client and handle duplicate 445 * operations (currently, by failing them; in future we might use MVCC to return result). Nonces 446 * are also recovered from WAL during, recovery; however, the caveats (from HBASE-3787) are: - WAL 447 * recovery is optimized, and under high load we won't read nearly nonce-timeout worth of past 448 * records. If we don't read the records, we don't read and recover the nonces. Some WALs within 449 * nonce-timeout at recovery may not even be present due to rolling/cleanup. 
   * - There's no WAL recovery during a normal region move, so nonces will not be transferred. We
   * can have a separate additional "Nonce WAL". It will just contain a bunch of numbers and won't
   * be flushed on the main path - because the WAL itself also contains nonces, if we only flush it
   * before the memstore flush, for a given nonce we will either see it in the WAL (if it was never
   * flushed to disk, it will be part of recovery), or we'll see it as part of the nonce log (or
   * both occasionally, which doesn't matter). The nonce log file can be deleted after the latest
   * nonce in it has expired. It can also be recovered during a move.
   */
  final ServerNonceManager nonceManager;

  private BrokenStoreFileCleaner brokenStoreFileCleaner;

  private RSMobFileCleanerChore rsMobFileCleanerChore;

  @InterfaceAudience.Private
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  private SecureBulkLoadManager secureBulkLoadManager;

  private FileSystemUtilizationChore fsUtilizationChore;

  private BootstrapNodeManager bootstrapNodeManager;

  /**
   * True if this RegionServer is coming up in a cluster where there is no Master; means it needs
   * to just come up and make do without a Master to talk to: e.g. in test, or when HRegionServer
   * is doing other than its usual duties: e.g. as a hollowed-out host whose only purpose is as a
   * Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
   * {@link #TEST_SKIP_REPORTING_TRANSITION} ?
   */
  private final boolean masterless;
  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /** regionserver codec list **/
  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";

  // A timer to shut down the process if abort takes too long
  private Timer abortMonitor;

  private RegionReplicationBufferManager regionReplicationBufferManager;

  /*
   * Chore that creates replication marker rows.
   */
  private ReplicationMarkerChore replicationMarkerChore;

  // A timer that submits requests to the PrefetchExecutor
  private PrefetchExecutorNotifier prefetchExecutorNotifier;

  /**
   * Starts an HRegionServer at the default location.
   * <p/>
   * Don't start any services or managers in here in the Constructor. Defer till after we register
   * with the Master as much as possible. See {@link #startServices}.
   */
  public HRegionServer(final Configuration conf) throws IOException {
    super(conf, "RegionServer"); // thread name
    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
    try (Scope ignored = span.makeCurrent()) {
      this.dataFsOk = true;
      this.masterless = !clusterMode();
      MemorySizeUtil.validateRegionServerHeapMemoryAllocation(conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);

      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ?
new ServerNonceManager(this.conf) : null; 527 528 this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, 529 HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); 530 531 this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME, 532 HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME); 533 534 regionServerAccounting = new RegionServerAccounting(conf); 535 536 blockCache = BlockCacheFactory.createBlockCache(conf); 537 // The call below, instantiates the DataTieringManager only when 538 // the configuration "hbase.regionserver.datatiering.enable" is set to true. 539 DataTieringManager.instantiate(conf, onlineRegions); 540 541 mobFileCache = new MobFileCache(conf); 542 543 rsSnapshotVerifier = new RSSnapshotVerifier(conf); 544 545 uncaughtExceptionHandler = 546 (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e); 547 548 // If no master in cluster, skip trying to track one or look for a cluster status. 549 if (!this.masterless) { 550 masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); 551 masterAddressTracker.start(); 552 } else { 553 masterAddressTracker = null; 554 } 555 this.rpcServices.start(zooKeeper); 556 span.setStatus(StatusCode.OK); 557 } catch (Throwable t) { 558 // Make sure we log the exception. HRegionServer is often started via reflection and the 559 // cause of failed startup is lost. 560 TraceUtil.setError(span, t); 561 LOG.error("Failed construction RegionServer", t); 562 throw t; 563 } finally { 564 span.end(); 565 } 566 } 567 568 // HMaster should override this method to load the specific config for master 569 @Override 570 protected String getUseThisHostnameInstead(Configuration conf) throws IOException { 571 String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY); 572 if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) { 573 if (!StringUtils.isBlank(hostname)) { 574 String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " 575 + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set " 576 + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while " 577 + UNSAFE_RS_HOSTNAME_KEY + " is used"; 578 throw new IOException(msg); 579 } else { 580 return rpcServices.getSocketAddress().getHostName(); 581 } 582 } else { 583 return hostname; 584 } 585 } 586 587 @Override 588 protected void login(UserProvider user, String host) throws IOException { 589 user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE, 590 SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host); 591 } 592 593 @Override 594 protected String getProcessName() { 595 return REGIONSERVER; 596 } 597 598 @Override 599 protected RegionServerCoprocessorHost getCoprocessorHost() { 600 return getRegionServerCoprocessorHost(); 601 } 602 603 @Override 604 protected boolean canCreateBaseZNode() { 605 return !clusterMode(); 606 } 607 608 @Override 609 protected boolean canUpdateTableDescriptor() { 610 return false; 611 } 612 613 @Override 614 protected boolean cacheTableDescriptor() { 615 return false; 616 } 617 618 protected RSRpcServices createRpcServices() throws IOException { 619 return new RSRpcServices(this); 620 } 621 622 @Override 623 protected void configureInfoServer(InfoServer infoServer) { 624 infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class); 625 infoServer.setAttribute(REGIONSERVER, this); 626 } 627 628 @Override 629 protected Class<? 
extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }

  /**
   * Used by {@link RSDumpServlet} to generate debugging information.
   */
  public void dumpRowLocks(final PrintWriter out) {
    StringBuilder sb = new StringBuilder();
    for (HRegion region : getRegions()) {
      if (region.getLockedRows().size() > 0) {
        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
          sb.setLength(0);
          sb.append(region.getTableDescriptor().getTableName()).append(",")
            .append(region.getRegionInfo().getEncodedName()).append(",");
          sb.append(rowLockContext.toString());
          out.println(sb);
        }
      }
    }
  }

  @Override
  public boolean registerService(Service instance) {
    // No stacking of instances is allowed for a single executorService name
    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor executorService " + serviceName
        + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
    }
    return true;
  }

  /**
   * Run test on configured codecs to make sure supporting libs are in place.
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
    if (codecs == null) {
      return;
    }
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException(
          "Compression codec " + codec + " not supported, aborting RS construction");
      }
    }
  }

  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * All initialization needed before we go register with Master.<br>
   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
   */
  private void preRegistrationInitialization() {
    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
    try (Scope ignored = span.makeCurrent()) {
      initializeZooKeeper();
      setupClusterConnection();
      bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
      // Setup RPC client for master communication
      this.rpcClient = asyncClusterConnection.getRpcClient();
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Call stop if error or process will stick around forever since server
      // puts up non-daemon threads.
      TraceUtil.setError(span, t);
      this.rpcServices.stop();
      abort("Initialization of RS failed. Hence aborting RS.", t);
    } finally {
      span.end();
    }
  }

  /**
   * Bring up the connection to the zk ensemble, then wait until a master is available for this
   * cluster, and after that wait until the cluster 'up' flag has been set. This is the order in
   * which the master does things.
   * <p>
   * Finally open the long-living server short-circuit connection.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
      justification = "cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Nothing to do in here if no Master in the mix.
    if (this.masterless) {
      return;
    }

    // Create the master address tracker, register with zk, and start it. Then
    // block until a master is available. No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up. Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // If we are HMaster then the cluster id should have already been set.
    if (clusterId == null) {
      // Retrieve clusterId
      // Since cluster status is now up
      // ID should have already been set by HMaster
      try {
        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
        if (clusterId == null) {
          this.abort("Cluster ID has not been set");
        }
        LOG.info("ClusterId : " + clusterId);
      } catch (KeeperException e) {
        this.abort("Failed to retrieve Cluster ID", e);
      }
    }

    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }

  /**
   * Utility method to wait indefinitely on a znode's availability while checking whether the
   * region server is shut down.
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException if the waiting thread is interrupted
   */
  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
    throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /** Returns True if the cluster is up. */
  @Override
  public boolean isClusterUp() {
    return this.masterless
      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
  }

  private void initializeReplicationMarkerChore() {
    boolean replicationMarkerEnabled =
      conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
    // Only create the chore if the replication marker feature is enabled.
    if (replicationMarkerEnabled) {
      int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
        REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
      replicationMarkerChore = new ReplicationMarkerChore(this, this, period, conf);
    }
  }

  @Override
  public boolean isStopping() {
    return stopping;
  }

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    if (isStopped()) {
      LOG.info("Skipping run; stopped");
      return;
    }
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
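      // (preRegistrationInitialization() above brings up ZooKeeper, the cluster connection, the
      // bootstrap node manager, the region replication buffer manager and the master RPC client
      // before we report for duty to the Master.)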
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        installShutdownHook();
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

        // Try and register with the Master; tell it we are here. Break if server is stopped or
        // the cluster up flag is down or hdfs went wacky. Once registered successfully, go ahead
        // and start up all Services. Use RetryCounter to get backoff in case Master is struggling
        // to come up.
        LOG.debug("About to register with Master.");
        TraceUtil.trace(() -> {
          RetryCounterFactory rcf =
            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
          RetryCounter rc = rcf.create();
          while (keepLooping()) {
            RegionServerStartupResponse w = reportForDuty();
            if (w == null) {
              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
              this.sleeper.sleep(sleepTime);
            } else {
              handleReportForDutyResponse(w);
              break;
            }
          }
        }, "HRegionServer.registerWithMaster");
      }

      if (!isStopped() && isHealthy()) {
        TraceUtil.trace(() -> {
          // start the snapshot handler and other procedure handlers,
          // since the server is ready to run
          if (this.rspmHost != null) {
            this.rspmHost.start();
          }
          // Start the Quota Manager
          if (this.rsQuotaManager != null) {
            rsQuotaManager.start(getRpcServer().getScheduler());
          }
          if (this.rsSpaceQuotaManager != null) {
            this.rsSpaceQuotaManager.start();
          }
        }, "HRegionServer.startup");
      }

      // We registered with the Master. Go into run mode.
      long lastMsg = EnvironmentEdgeManager.currentTime();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (onlineRegions.isEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(isAborted());
          } else {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if there have been no more write requests to meta tables
              // since last time we went around the loop. Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested.get());
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = EnvironmentEdgeManager.currentTime();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = EnvironmentEdgeManager.currentTime();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // while
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException ?
"" : "Unhandled: "; 916 abort(prefix + t.getMessage(), t); 917 } 918 } 919 920 final Span span = TraceUtil.createSpan("HRegionServer exiting main loop"); 921 try (Scope ignored = span.makeCurrent()) { 922 if (this.leaseManager != null) { 923 this.leaseManager.closeAfterLeasesExpire(); 924 } 925 if (this.splitLogWorker != null) { 926 splitLogWorker.stop(); 927 } 928 stopInfoServer(); 929 // Send cache a shutdown. 930 if (blockCache != null) { 931 blockCache.shutdown(); 932 } 933 if (mobFileCache != null) { 934 mobFileCache.shutdown(); 935 } 936 937 // Send interrupts to wake up threads if sleeping so they notice shutdown. 938 // TODO: Should we check they are alive? If OOME could have exited already 939 if (this.hMemManager != null) { 940 this.hMemManager.stop(); 941 } 942 if (this.cacheFlusher != null) { 943 this.cacheFlusher.interruptIfNecessary(); 944 } 945 if (this.compactSplitThread != null) { 946 this.compactSplitThread.interruptIfNecessary(); 947 } 948 949 // Stop the snapshot and other procedure handlers, forcefully killing all running tasks 950 if (rspmHost != null) { 951 rspmHost.stop(this.abortRequested.get() || this.killed); 952 } 953 954 if (this.killed) { 955 // Just skip out w/o closing regions. Used when testing. 956 } else if (abortRequested.get()) { 957 if (this.dataFsOk) { 958 closeUserRegions(abortRequested.get()); // Don't leave any open file handles 959 } 960 LOG.info("aborting server " + this.serverName); 961 } else { 962 closeUserRegions(abortRequested.get()); 963 LOG.info("stopping server " + this.serverName); 964 } 965 regionReplicationBufferManager.stop(); 966 closeClusterConnection(); 967 // Closing the compactSplit thread before closing meta regions 968 if (!this.killed && containsMetaTableRegions()) { 969 if (!abortRequested.get() || this.dataFsOk) { 970 if (this.compactSplitThread != null) { 971 this.compactSplitThread.join(); 972 this.compactSplitThread = null; 973 } 974 closeMetaTableRegions(abortRequested.get()); 975 } 976 } 977 978 if (!this.killed && this.dataFsOk) { 979 waitOnAllRegionsToClose(abortRequested.get()); 980 LOG.info("stopping server " + this.serverName + "; all regions closed."); 981 } 982 983 // Stop the quota manager 984 if (rsQuotaManager != null) { 985 rsQuotaManager.stop(); 986 } 987 if (rsSpaceQuotaManager != null) { 988 rsSpaceQuotaManager.stop(); 989 rsSpaceQuotaManager = null; 990 } 991 992 // flag may be changed when closing regions throws exception. 993 if (this.dataFsOk) { 994 shutdownWAL(!abortRequested.get()); 995 } 996 997 // Make sure the proxy is down. 998 if (this.rssStub != null) { 999 this.rssStub = null; 1000 } 1001 if (this.lockStub != null) { 1002 this.lockStub = null; 1003 } 1004 if (this.rpcClient != null) { 1005 this.rpcClient.close(); 1006 } 1007 if (this.leaseManager != null) { 1008 this.leaseManager.close(); 1009 } 1010 if (this.pauseMonitor != null) { 1011 this.pauseMonitor.stop(); 1012 } 1013 1014 if (!killed) { 1015 stopServiceThreads(); 1016 } 1017 1018 if (this.rpcServices != null) { 1019 this.rpcServices.stop(); 1020 } 1021 1022 try { 1023 deleteMyEphemeralNode(); 1024 } catch (KeeperException.NoNodeException nn) { 1025 // pass 1026 } catch (KeeperException e) { 1027 LOG.warn("Failed deleting my ephemeral node", e); 1028 } 1029 // We may have failed to delete the znode at the previous step, but 1030 // we delete the file anyway: a second attempt to delete the znode is likely to fail again. 
1031 ZNodeClearer.deleteMyEphemeralNodeOnDisk(); 1032 1033 closeZooKeeper(); 1034 closeTableDescriptors(); 1035 LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed."); 1036 span.setStatus(StatusCode.OK); 1037 } finally { 1038 span.end(); 1039 } 1040 } 1041 1042 private boolean containsMetaTableRegions() { 1043 return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 1044 } 1045 1046 private boolean areAllUserRegionsOffline() { 1047 if (getNumberOfOnlineRegions() > 2) { 1048 return false; 1049 } 1050 boolean allUserRegionsOffline = true; 1051 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1052 if (!e.getValue().getRegionInfo().isMetaRegion()) { 1053 allUserRegionsOffline = false; 1054 break; 1055 } 1056 } 1057 return allUserRegionsOffline; 1058 } 1059 1060 /** Returns Current write count for all online regions. */ 1061 private long getWriteRequestCount() { 1062 long writeCount = 0; 1063 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1064 writeCount += e.getValue().getWriteRequestsCount(); 1065 } 1066 return writeCount; 1067 } 1068 1069 @InterfaceAudience.Private 1070 protected void tryRegionServerReport(long reportStartTime, long reportEndTime) 1071 throws IOException { 1072 RegionServerStatusService.BlockingInterface rss = rssStub; 1073 if (rss == null) { 1074 // the current server could be stopping. 1075 return; 1076 } 1077 ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime); 1078 final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport"); 1079 try (Scope ignored = span.makeCurrent()) { 1080 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder(); 1081 request.setServer(ProtobufUtil.toServerName(this.serverName)); 1082 request.setLoad(sl); 1083 rss.regionServerReport(null, request.build()); 1084 span.setStatus(StatusCode.OK); 1085 } catch (ServiceException se) { 1086 IOException ioe = ProtobufUtil.getRemoteException(se); 1087 if (ioe instanceof YouAreDeadException) { 1088 // This will be caught and handled as a fatal error in run() 1089 TraceUtil.setError(span, ioe); 1090 throw ioe; 1091 } 1092 if (rssStub == rss) { 1093 rssStub = null; 1094 } 1095 TraceUtil.setError(span, se); 1096 // Couldn't connect to the master, get location from zk and reconnect 1097 // Method blocks until new master is found or we are stopped 1098 createRegionServerStatusStub(true); 1099 } finally { 1100 span.end(); 1101 } 1102 } 1103 1104 /** 1105 * Reports the given map of Regions and their size on the filesystem to the active Master. 1106 * @param regionSizeStore The store containing region sizes 1107 * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise 1108 */ 1109 public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) { 1110 RegionServerStatusService.BlockingInterface rss = rssStub; 1111 if (rss == null) { 1112 // the current server could be stopping. 1113 LOG.trace("Skipping Region size report to HMaster as stub is null"); 1114 return true; 1115 } 1116 try { 1117 buildReportAndSend(rss, regionSizeStore); 1118 } catch (ServiceException se) { 1119 IOException ioe = ProtobufUtil.getRemoteException(se); 1120 if (ioe instanceof PleaseHoldException) { 1121 LOG.trace("Failed to report region sizes to Master because it is initializing." 1122 + " This will be retried.", ioe); 1123 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 
1124 return true; 1125 } 1126 if (rssStub == rss) { 1127 rssStub = null; 1128 } 1129 createRegionServerStatusStub(true); 1130 if (ioe instanceof DoNotRetryIOException) { 1131 DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe; 1132 if (doNotRetryEx.getCause() != null) { 1133 Throwable t = doNotRetryEx.getCause(); 1134 if (t instanceof UnsupportedOperationException) { 1135 LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying"); 1136 return false; 1137 } 1138 } 1139 } 1140 LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe); 1141 } 1142 return true; 1143 } 1144 1145 /** 1146 * Builds the region size report and sends it to the master. Upon successful sending of the 1147 * report, the region sizes that were sent are marked as sent. 1148 * @param rss The stub to send to the Master 1149 * @param regionSizeStore The store containing region sizes 1150 */ 1151 private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss, 1152 RegionSizeStore regionSizeStore) throws ServiceException { 1153 RegionSpaceUseReportRequest request = 1154 buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore)); 1155 rss.reportRegionSpaceUse(null, request); 1156 // Record the number of size reports sent 1157 if (metricsRegionServer != null) { 1158 metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size()); 1159 } 1160 } 1161 1162 /** 1163 * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map. 1164 * @param regionSizes The size in bytes of regions 1165 * @return The corresponding protocol buffer message. 1166 */ 1167 RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) { 1168 RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder(); 1169 for (Entry<RegionInfo, RegionSize> entry : regionSizes) { 1170 request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize())); 1171 } 1172 return request.build(); 1173 } 1174 1175 /** 1176 * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf 1177 * message. 1178 * @param regionInfo The RegionInfo 1179 * @param sizeInBytes The size in bytes of the Region 1180 * @return The protocol buffer 1181 */ 1182 RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) { 1183 return RegionSpaceUse.newBuilder() 1184 .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo))) 1185 .setRegionSize(Objects.requireNonNull(sizeInBytes)).build(); 1186 } 1187 1188 private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) 1189 throws IOException { 1190 // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests 1191 // per second, and other metrics As long as metrics are part of ServerLoad it's best to use 1192 // the wrapper to compute those numbers in one place. 1193 // In the long term most of these should be moved off of ServerLoad and the heart beat. 1194 // Instead they should be stored in an HBase table so that external visibility into HBase is 1195 // improved; Additionally the load balancer will be able to take advantage of a more complete 1196 // history. 
1197 MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper(); 1198 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 1199 long usedMemory = -1L; 1200 long maxMemory = -1L; 1201 final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage(); 1202 if (usage != null) { 1203 usedMemory = usage.getUsed(); 1204 maxMemory = usage.getMax(); 1205 } 1206 1207 ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder(); 1208 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond()); 1209 serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount()); 1210 serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024)); 1211 serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024)); 1212 serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount()); 1213 serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount()); 1214 Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors(); 1215 Coprocessor.Builder coprocessorBuilder = Coprocessor.newBuilder(); 1216 for (String coprocessor : coprocessors) { 1217 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1218 } 1219 RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder(); 1220 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); 1221 for (HRegion region : regions) { 1222 if (region.getCoprocessorHost() != null) { 1223 Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors(); 1224 for (String regionCoprocessor : regionCoprocessors) { 1225 serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build()); 1226 } 1227 } 1228 serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier)); 1229 for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost() 1230 .getCoprocessors()) { 1231 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1232 } 1233 } 1234 1235 getBlockCache().ifPresent(cache -> { 1236 cache.getRegionCachedInfo().ifPresent(regionCachedInfo -> { 1237 regionCachedInfo.forEach((regionName, prefetchSize) -> { 1238 serverLoad.putRegionCachedInfo(regionName, roundSize(prefetchSize, unitMB)); 1239 }); 1240 }); 1241 }); 1242 1243 serverLoad.setReportStartTime(reportStartTime); 1244 serverLoad.setReportEndTime(reportEndTime); 1245 if (this.infoServer != null) { 1246 serverLoad.setInfoServerPort(this.infoServer.getPort()); 1247 } else { 1248 serverLoad.setInfoServerPort(-1); 1249 } 1250 MetricsUserAggregateSource userSource = 1251 metricsRegionServer.getMetricsUserAggregate().getSource(); 1252 if (userSource != null) { 1253 Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources(); 1254 for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) { 1255 serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue())); 1256 } 1257 } 1258 1259 if (sameReplicationSourceAndSink && replicationSourceHandler != null) { 1260 // always refresh first to get the latest value 1261 ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad(); 1262 if (rLoad != null) { 1263 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); 1264 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad 1265 .getReplicationLoadSourceEntries()) { 1266 serverLoad.addReplLoadSource(rLS); 1267 } 1268 } 1269 } else { 1270 if (replicationSourceHandler != null) { 1271 ReplicationLoad rLoad = 
replicationSourceHandler.refreshAndGetReplicationLoad(); 1272 if (rLoad != null) { 1273 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad 1274 .getReplicationLoadSourceEntries()) { 1275 serverLoad.addReplLoadSource(rLS); 1276 } 1277 } 1278 } 1279 if (replicationSinkHandler != null) { 1280 ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad(); 1281 if (rLoad != null) { 1282 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); 1283 } 1284 } 1285 } 1286 1287 TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask 1288 .newBuilder().setDescription(task.getDescription()) 1289 .setStatus(task.getStatus() != null ? task.getStatus() : "") 1290 .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name())) 1291 .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build())); 1292 1293 return serverLoad.build(); 1294 } 1295 1296 private String getOnlineRegionsAsPrintableString() { 1297 StringBuilder sb = new StringBuilder(); 1298 for (Region r : this.onlineRegions.values()) { 1299 if (sb.length() > 0) { 1300 sb.append(", "); 1301 } 1302 sb.append(r.getRegionInfo().getEncodedName()); 1303 } 1304 return sb.toString(); 1305 } 1306 1307 /** 1308 * Wait on regions close. 1309 */ 1310 private void waitOnAllRegionsToClose(final boolean abort) { 1311 // Wait till all regions are closed before going out. 1312 int lastCount = -1; 1313 long previousLogTime = 0; 1314 Set<String> closedRegions = new HashSet<>(); 1315 boolean interrupted = false; 1316 try { 1317 while (!onlineRegions.isEmpty()) { 1318 int count = getNumberOfOnlineRegions(); 1319 // Only print a message if the count of regions has changed. 1320 if (count != lastCount) { 1321 // Log every second at most 1322 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 1323 previousLogTime = EnvironmentEdgeManager.currentTime(); 1324 lastCount = count; 1325 LOG.info("Waiting on " + count + " regions to close"); 1326 // Only print out regions still closing if a small number else will 1327 // swamp the log. 1328 if (count < 10 && LOG.isDebugEnabled()) { 1329 LOG.debug("Online Regions=" + this.onlineRegions); 1330 } 1331 } 1332 } 1333 // Ensure all user regions have been sent a close. Use this to 1334 // protect against the case where an open comes in after we start the 1335 // iterator of onlineRegions to close all user regions. 1336 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1337 RegionInfo hri = e.getValue().getRegionInfo(); 1338 if ( 1339 !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) 1340 && !closedRegions.contains(hri.getEncodedName()) 1341 ) { 1342 closedRegions.add(hri.getEncodedName()); 1343 // Don't update zk with this close transition; pass false. 1344 closeRegionIgnoreErrors(hri, abort); 1345 } 1346 } 1347 // No regions in RIT, we could stop waiting now. 
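        // (An entry in regionsInTransitionInRS means an open or close is still in flight for that
        // region; see the field's javadoc near the top of this class.)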
1348 if (this.regionsInTransitionInRS.isEmpty()) { 1349 if (!onlineRegions.isEmpty()) { 1350 LOG.info("We were exiting though online regions are not empty," 1351 + " because some regions failed closing"); 1352 } 1353 break; 1354 } else { 1355 LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream() 1356 .map(e -> Bytes.toString(e)).collect(Collectors.joining(", "))); 1357 } 1358 if (sleepInterrupted(200)) { 1359 interrupted = true; 1360 } 1361 } 1362 } finally { 1363 if (interrupted) { 1364 Thread.currentThread().interrupt(); 1365 } 1366 } 1367 } 1368 1369 private static boolean sleepInterrupted(long millis) { 1370 boolean interrupted = false; 1371 try { 1372 Thread.sleep(millis); 1373 } catch (InterruptedException e) { 1374 LOG.warn("Interrupted while sleeping"); 1375 interrupted = true; 1376 } 1377 return interrupted; 1378 } 1379 1380 private void shutdownWAL(final boolean close) { 1381 if (this.walFactory != null) { 1382 try { 1383 if (close) { 1384 walFactory.close(); 1385 } else { 1386 walFactory.shutdown(); 1387 } 1388 } catch (Throwable e) { 1389 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 1390 LOG.error("Shutdown / close of WAL failed: " + e); 1391 LOG.debug("Shutdown / close exception details:", e); 1392 } 1393 } 1394 } 1395 1396 /** 1397 * Run init. Sets up wal and starts up all server threads. 1398 * @param c Extra configuration. 1399 */ 1400 protected void handleReportForDutyResponse(final RegionServerStartupResponse c) 1401 throws IOException { 1402 try { 1403 boolean updateRootDir = false; 1404 for (NameStringPair e : c.getMapEntriesList()) { 1405 String key = e.getName(); 1406 // The hostname the master sees us as. 1407 if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) { 1408 String hostnameFromMasterPOV = e.getValue(); 1409 this.serverName = ServerName.valueOf(hostnameFromMasterPOV, 1410 rpcServices.getSocketAddress().getPort(), this.startcode); 1411 String expectedHostName = rpcServices.getSocketAddress().getHostName(); 1412 // if Master use-ip is enabled, RegionServer use-ip will be enabled by default even if it 1413 // is set to disable. so we will use the ip of the RegionServer to compare with the 1414 // hostname passed by the Master, see HBASE-27304 for details. 1415 if ( 1416 StringUtils.isBlank(useThisHostnameInstead) && getActiveMaster().isPresent() 1417 && InetAddresses.isInetAddress(getActiveMaster().get().getHostname()) 1418 ) { 1419 expectedHostName = rpcServices.getSocketAddress().getAddress().getHostAddress(); 1420 } 1421 boolean isHostnameConsist = StringUtils.isBlank(useThisHostnameInstead) 1422 ? hostnameFromMasterPOV.equals(expectedHostName) 1423 : hostnameFromMasterPOV.equals(useThisHostnameInstead); 1424 if (!isHostnameConsist) { 1425 String msg = "Master passed us a different hostname to use; was=" 1426 + (StringUtils.isBlank(useThisHostnameInstead) 1427 ? expectedHostName 1428 : this.useThisHostnameInstead) 1429 + ", but now=" + hostnameFromMasterPOV; 1430 LOG.error(msg); 1431 throw new IOException(msg); 1432 } 1433 continue; 1434 } 1435 1436 String value = e.getValue(); 1437 if (key.equals(HConstants.HBASE_DIR)) { 1438 if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) { 1439 updateRootDir = true; 1440 } 1441 } 1442 1443 if (LOG.isDebugEnabled()) { 1444 LOG.debug("Config from master: " + key + "=" + value); 1445 } 1446 this.conf.set(key, value); 1447 } 1448 // Set our ephemeral znode up in zookeeper now we have a name. 
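// The znode payload is the PB-magic-prefixed RegionServerInfo (info port and version info, see createMyEphemeralNode below); being ephemeral, it goes away if this process dies.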
1449 createMyEphemeralNode(); 1450 1451 if (updateRootDir) { 1452 // initialize file system by the config fs.defaultFS and hbase.rootdir from master 1453 initializeFileSystem(); 1454 } 1455 1456 // hack! Maps DFSClient => RegionServer for logs. HDFS made this 1457 // config param for task trackers, but we can piggyback off of it. 1458 if (this.conf.get("mapreduce.task.attempt.id") == null) { 1459 this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString()); 1460 } 1461 1462 // Save it in a file, this will allow to see if we crash 1463 ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath()); 1464 1465 // This call sets up an initialized replication and WAL. Later we start it up. 1466 setupWALAndReplication(); 1467 // Init in here rather than in constructor after thread name has been set 1468 final MetricsTable metricsTable = 1469 new MetricsTable(new MetricsTableWrapperAggregateImpl(this)); 1470 this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this); 1471 this.metricsRegionServer = 1472 new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable); 1473 // Now that we have a metrics source, start the pause monitor 1474 this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource()); 1475 pauseMonitor.start(); 1476 1477 // There is a rare case where we do NOT want services to start. Check config. 1478 if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) { 1479 startServices(); 1480 } 1481 // In here we start up the replication Service. Above we initialized it. TODO. Reconcile. 1482 // or make sense of it. 1483 startReplicationService(); 1484 1485 // Set up ZK 1486 LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress() 1487 + ", sessionid=0x" 1488 + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); 1489 1490 // Wake up anyone waiting for this server to online 1491 synchronized (online) { 1492 online.set(true); 1493 online.notifyAll(); 1494 } 1495 } catch (Throwable e) { 1496 stop("Failed initialization"); 1497 throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed"); 1498 } finally { 1499 sleeper.skipSleepCycle(); 1500 } 1501 } 1502 1503 private void startHeapMemoryManager() { 1504 if (this.blockCache != null) { 1505 this.hMemManager = 1506 new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting); 1507 this.hMemManager.start(getChoreService()); 1508 } 1509 } 1510 1511 private void createMyEphemeralNode() throws KeeperException { 1512 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder(); 1513 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1); 1514 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo()); 1515 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray()); 1516 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data); 1517 } 1518 1519 private void deleteMyEphemeralNode() throws KeeperException { 1520 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath()); 1521 } 1522 1523 @Override 1524 public RegionServerAccounting getRegionServerAccounting() { 1525 return regionServerAccounting; 1526 } 1527 1528 // Round the size with KB or MB. 1529 // A trick here is that if the sizeInBytes is less than sizeUnit, we will round the size to 1 1530 // instead of 0 if it is not 0, to avoid some schedulers think the region has no data. See 1531 // HBASE-26340 for more details on why this is important. 
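// For example, assuming unitMB is 1024 * 1024: roundSize(0, unitMB) == 0, roundSize(100, unitMB) == 1, and roundSize(5L * unitMB, unitMB) == 5.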
1532 private static int roundSize(long sizeInByte, int sizeUnit) { 1533 if (sizeInByte == 0) { 1534 return 0; 1535 } else if (sizeInByte < sizeUnit) { 1536 return 1; 1537 } else { 1538 return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE); 1539 } 1540 } 1541 1542 private void computeIfPersistentBucketCache(Consumer<BucketCache> computation) { 1543 if (blockCache instanceof CombinedBlockCache) { 1544 BlockCache l2 = ((CombinedBlockCache) blockCache).getSecondLevelCache(); 1545 if (l2 instanceof BucketCache && ((BucketCache) l2).isCachePersistent()) { 1546 computation.accept((BucketCache) l2); 1547 } 1548 } 1549 } 1550 1551 /** 1552 * @param r Region to get RegionLoad for. 1553 * @param regionLoadBldr the RegionLoad.Builder, can be null 1554 * @param regionSpecifier the RegionSpecifier.Builder, can be null 1555 * @return RegionLoad instance. 1556 */ 1557 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, 1558 RegionSpecifier.Builder regionSpecifier) throws IOException { 1559 byte[] name = r.getRegionInfo().getRegionName(); 1560 String regionEncodedName = r.getRegionInfo().getEncodedName(); 1561 int stores = 0; 1562 int storefiles = 0; 1563 int storeRefCount = 0; 1564 int maxCompactedStoreFileRefCount = 0; 1565 long storeUncompressedSize = 0L; 1566 long storefileSize = 0L; 1567 long storefileIndexSize = 0L; 1568 long rootLevelIndexSize = 0L; 1569 long totalStaticIndexSize = 0L; 1570 long totalStaticBloomSize = 0L; 1571 long totalCompactingKVs = 0L; 1572 long currentCompactedKVs = 0L; 1573 long totalRegionSize = 0L; 1574 List<HStore> storeList = r.getStores(); 1575 stores += storeList.size(); 1576 for (HStore store : storeList) { 1577 storefiles += store.getStorefilesCount(); 1578 int currentStoreRefCount = store.getStoreRefCount(); 1579 storeRefCount += currentStoreRefCount; 1580 int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount(); 1581 maxCompactedStoreFileRefCount = 1582 Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount); 1583 storeUncompressedSize += store.getStoreSizeUncompressed(); 1584 storefileSize += store.getStorefilesSize(); 1585 totalRegionSize += store.getHFilesSize(); 1586 // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB? 
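// (Both accumulations below currently read store.getStorefilesRootLevelIndexSize(), so the two values end up identical.)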
1587 storefileIndexSize += store.getStorefilesRootLevelIndexSize(); 1588 CompactionProgress progress = store.getCompactionProgress(); 1589 if (progress != null) { 1590 totalCompactingKVs += progress.getTotalCompactingKVs(); 1591 currentCompactedKVs += progress.currentCompactedKVs; 1592 } 1593 rootLevelIndexSize += store.getStorefilesRootLevelIndexSize(); 1594 totalStaticIndexSize += store.getTotalStaticIndexSize(); 1595 totalStaticBloomSize += store.getTotalStaticBloomSize(); 1596 } 1597 1598 int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB); 1599 int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB); 1600 int storefileSizeMB = roundSize(storefileSize, unitMB); 1601 int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB); 1602 int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB); 1603 int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB); 1604 int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB); 1605 int regionSizeMB = roundSize(totalRegionSize, unitMB); 1606 final MutableFloat currentRegionCachedRatio = new MutableFloat(0.0f); 1607 getBlockCache().ifPresent(bc -> { 1608 bc.getRegionCachedInfo().ifPresent(regionCachedInfo -> { 1609 if (regionCachedInfo.containsKey(regionEncodedName)) { 1610 currentRegionCachedRatio.setValue(regionSizeMB == 0 1611 ? 0.0f 1612 : (float) roundSize(regionCachedInfo.get(regionEncodedName), unitMB) / regionSizeMB); 1613 } 1614 }); 1615 }); 1616 1617 HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution(); 1618 float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname()); 1619 float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname()); 1620 long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight(); 1621 long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname()); 1622 long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname()); 1623 if (regionLoadBldr == null) { 1624 regionLoadBldr = RegionLoad.newBuilder(); 1625 } 1626 if (regionSpecifier == null) { 1627 regionSpecifier = RegionSpecifier.newBuilder(); 1628 } 1629 1630 regionSpecifier.setType(RegionSpecifierType.REGION_NAME); 1631 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name)); 1632 regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores) 1633 .setStorefiles(storefiles).setStoreRefCount(storeRefCount) 1634 .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount) 1635 .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB) 1636 .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB) 1637 .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB) 1638 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB) 1639 .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount()) 1640 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount()) 1641 .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs) 1642 .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality) 1643 .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight) 1644 .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight) 1645 .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState())) 1646 
.setLastMajorCompactionTs(r.getOldestHfileTs(true)).setRegionSizeMB(regionSizeMB) 1647 .setCurrentRegionCachedRatio(currentRegionCachedRatio.floatValue()); 1648 r.setCompleteSequenceId(regionLoadBldr); 1649 return regionLoadBldr.build(); 1650 } 1651 1652 private UserLoad createUserLoad(String user, MetricsUserSource userSource) { 1653 UserLoad.Builder userLoadBldr = UserLoad.newBuilder(); 1654 userLoadBldr.setUserName(user); 1655 userSource.getClientMetrics().values().stream() 1656 .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder() 1657 .setHostName(clientMetrics.getHostName()) 1658 .setWriteRequestsCount(clientMetrics.getWriteRequestsCount()) 1659 .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests()) 1660 .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build()) 1661 .forEach(userLoadBldr::addClientMetrics); 1662 return userLoadBldr.build(); 1663 } 1664 1665 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { 1666 HRegion r = onlineRegions.get(encodedRegionName); 1667 return r != null ? createRegionLoad(r, null, null) : null; 1668 } 1669 1670 /** 1671 * Inner class that runs on a long period checking if regions need compaction. 1672 */ 1673 private static class CompactionChecker extends ScheduledChore { 1674 private final HRegionServer instance; 1675 private final int majorCompactPriority; 1676 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; 1677 // Iteration is 1-based rather than 0-based so we don't check for compaction 1678 // immediately upon region server startup 1679 private long iteration = 1; 1680 1681 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { 1682 super("CompactionChecker", stopper, sleepTime); 1683 this.instance = h; 1684 LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime)); 1685 1686 /* 1687 * MajorCompactPriority is configurable. If not set, the compaction will use default priority. 1688 */ 1689 this.majorCompactPriority = this.instance.conf 1690 .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY); 1691 } 1692 1693 @Override 1694 protected void chore() { 1695 for (HRegion hr : this.instance.onlineRegions.values()) { 1696 // If region is read only or compaction is disabled at table level, there's no need to 1697 // iterate through region's stores 1698 if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) { 1699 continue; 1700 } 1701 1702 for (HStore s : hr.stores.values()) { 1703 try { 1704 long multiplier = s.getCompactionCheckMultiplier(); 1705 assert multiplier > 0; 1706 if (iteration % multiplier != 0) { 1707 continue; 1708 } 1709 if (s.needsCompaction()) { 1710 // Queue a compaction. Will recognize if major is needed. 
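// (The request is typically upgraded to a major compaction when the policy ends up selecting all of the store's files.)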
1711 this.instance.compactSplitThread.requestSystemCompaction(hr, s, 1712 getName() + " requests compaction"); 1713 } else if (s.shouldPerformMajorCompaction()) { 1714 s.triggerMajorCompaction(); 1715 if ( 1716 majorCompactPriority == DEFAULT_PRIORITY 1717 || majorCompactPriority > hr.getCompactPriority() 1718 ) { 1719 this.instance.compactSplitThread.requestCompaction(hr, s, 1720 getName() + " requests major compaction; use default priority", Store.NO_PRIORITY, 1721 CompactionLifeCycleTracker.DUMMY, null); 1722 } else { 1723 this.instance.compactSplitThread.requestCompaction(hr, s, 1724 getName() + " requests major compaction; use configured priority", 1725 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); 1726 } 1727 } 1728 } catch (IOException e) { 1729 LOG.warn("Failed major compaction check on " + hr, e); 1730 } 1731 } 1732 } 1733 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); 1734 } 1735 } 1736 1737 private static class PeriodicMemStoreFlusher extends ScheduledChore { 1738 private final HRegionServer server; 1739 private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds 1740 private final static int MIN_DELAY_TIME = 0; // millisec 1741 private final long rangeOfDelayMs; 1742 1743 PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { 1744 super("MemstoreFlusherChore", server, cacheFlushInterval); 1745 this.server = server; 1746 1747 final long configuredRangeOfDelay = server.getConfiguration() 1748 .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY); 1749 this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay); 1750 } 1751 1752 @Override 1753 protected void chore() { 1754 final StringBuilder whyFlush = new StringBuilder(); 1755 for (HRegion r : this.server.onlineRegions.values()) { 1756 if (r == null) { 1757 continue; 1758 } 1759 if (r.shouldFlush(whyFlush)) { 1760 FlushRequester requester = server.getFlushRequester(); 1761 if (requester != null) { 1762 long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME; 1763 // Throttle the flushes by putting a delay. If we don't throttle, and there 1764 // is a balanced write-load on the regions in a table, we might end up 1765 // overwhelming the filesystem with too many flushes at once. 1766 if (requester.requestDelayedFlush(r, delay)) { 1767 LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(), 1768 r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay); 1769 } 1770 } 1771 } 1772 } 1773 } 1774 } 1775 1776 /** 1777 * Report the status of the server. A server is online once all the startup is completed (setting 1778 * up filesystem, starting executorService threads, etc.). This method is designed mostly to be 1779 * useful in tests. 1780 * @return true if online, false if not. 1781 */ 1782 public boolean isOnline() { 1783 return online.get(); 1784 } 1785 1786 /** 1787 * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to 1788 * be hooked up to WAL. 
1789 */ 1790 private void setupWALAndReplication() throws IOException { 1791 WALFactory factory = new WALFactory(conf, serverName, this); 1792 // TODO Replication make assumptions here based on the default filesystem impl 1793 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 1794 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); 1795 1796 Path logDir = new Path(walRootDir, logName); 1797 LOG.debug("logDir={}", logDir); 1798 if (this.walFs.exists(logDir)) { 1799 throw new RegionServerRunningException( 1800 "Region server has already created directory at " + this.serverName.toString()); 1801 } 1802 // Always create wal directory as now we need this when master restarts to find out the live 1803 // region servers. 1804 if (!this.walFs.mkdirs(logDir)) { 1805 throw new IOException("Can not create wal directory " + logDir); 1806 } 1807 // Instantiate replication if replication enabled. Pass it the log directories. 1808 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory); 1809 1810 WALActionsListener walEventListener = getWALEventTrackerListener(conf); 1811 if (walEventListener != null && factory.getWALProvider() != null) { 1812 factory.getWALProvider().addWALActionsListener(walEventListener); 1813 } 1814 this.walFactory = factory; 1815 } 1816 1817 private WALActionsListener getWALEventTrackerListener(Configuration conf) { 1818 if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) { 1819 WALEventTrackerListener listener = 1820 new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName()); 1821 return listener; 1822 } 1823 return null; 1824 } 1825 1826 /** 1827 * Start up replication source and sink handlers. 1828 */ 1829 private void startReplicationService() throws IOException { 1830 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { 1831 this.replicationSourceHandler.startReplicationService(); 1832 } else { 1833 if (this.replicationSourceHandler != null) { 1834 this.replicationSourceHandler.startReplicationService(); 1835 } 1836 if (this.replicationSinkHandler != null) { 1837 this.replicationSinkHandler.startReplicationService(); 1838 } 1839 } 1840 } 1841 1842 /** Returns Master address tracker instance. */ 1843 public MasterAddressTracker getMasterAddressTracker() { 1844 return this.masterAddressTracker; 1845 } 1846 1847 /** 1848 * Start maintenance Threads, Server, Worker and lease checker threads. Start all threads we need 1849 * to run. This is called after we've successfully registered with the Master. Install an 1850 * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception. We 1851 * cannot set the handler on all threads. Server's internal Listener thread is off limits. For 1852 * Server, if an OOME, it waits a while then retries. Meantime, a flush or a compaction that tries 1853 * to run should trigger same critical condition and the shutdown will run. On its way out, this 1854 * server will shut down Server. Leases are sort of inbetween. It has an internal thread that 1855 * while it inherits from Chore, it keeps its own internal stop mechanism so needs to be stopped 1856 * by this hosting server. Worker logs the exception and exits. 
1857 */ 1858 private void startServices() throws IOException { 1859 if (!isStopped() && !isAborted()) { 1860 initializeThreads(); 1861 } 1862 this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection); 1863 this.secureBulkLoadManager.start(); 1864 1865 // Health checker thread. 1866 if (isHealthCheckerConfigured()) { 1867 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, 1868 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); 1869 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); 1870 } 1871 // Executor status collect thread. 1872 if ( 1873 this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED, 1874 HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED) 1875 ) { 1876 int sleepTime = 1877 this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ); 1878 executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(), 1879 this.metricsRegionServer.getMetricsSource()); 1880 } 1881 1882 this.walRoller = new LogRoller(this); 1883 this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); 1884 this.procedureResultReporter = new RemoteProcedureResultReporter(this); 1885 1886 // Create the CompactedFileDischarger chore executorService. This chore helps to 1887 // remove the compacted files that will no longer be used in reads. 1888 // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to 1889 // 2 mins so that compacted files can be archived before the TTLCleaner runs 1890 int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); 1891 this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this); 1892 choreService.scheduleChore(compactedFileDischarger); 1893 1894 // Start executor services 1895 final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3); 1896 executorService.startExecutorService(executorService.new ExecutorConfig() 1897 .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads)); 1898 final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1); 1899 executorService.startExecutorService(executorService.new ExecutorConfig() 1900 .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads)); 1901 final int openPriorityRegionThreads = 1902 conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3); 1903 executorService.startExecutorService( 1904 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION) 1905 .setCorePoolSize(openPriorityRegionThreads)); 1906 final int closeRegionThreads = 1907 conf.getInt("hbase.regionserver.executor.closeregion.threads", 3); 1908 executorService.startExecutorService(executorService.new ExecutorConfig() 1909 .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads)); 1910 final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1); 1911 executorService.startExecutorService(executorService.new ExecutorConfig() 1912 .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads)); 1913 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { 1914 final int storeScannerParallelSeekThreads = 1915 conf.getInt("hbase.storescanner.parallel.seek.threads", 10); 1916 executorService.startExecutorService( 1917 executorService.new 
ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK) 1918 .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true)); 1919 } 1920 final int logReplayOpsThreads = 1921 conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER); 1922 executorService.startExecutorService( 1923 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS) 1924 .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true)); 1925 // Start the threads for compacted files discharger 1926 final int compactionDischargerThreads = 1927 conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10); 1928 executorService.startExecutorService(executorService.new ExecutorConfig() 1929 .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER) 1930 .setCorePoolSize(compactionDischargerThreads)); 1931 if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { 1932 final int regionReplicaFlushThreads = 1933 conf.getInt("hbase.regionserver.region.replica.flusher.threads", 1934 conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); 1935 executorService.startExecutorService(executorService.new ExecutorConfig() 1936 .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS) 1937 .setCorePoolSize(regionReplicaFlushThreads)); 1938 } 1939 final int refreshPeerThreads = 1940 conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2); 1941 executorService.startExecutorService(executorService.new ExecutorConfig() 1942 .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads)); 1943 final int replaySyncReplicationWALThreads = 1944 conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1); 1945 executorService.startExecutorService(executorService.new ExecutorConfig() 1946 .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL) 1947 .setCorePoolSize(replaySyncReplicationWALThreads)); 1948 final int switchRpcThrottleThreads = 1949 conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1); 1950 executorService.startExecutorService( 1951 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE) 1952 .setCorePoolSize(switchRpcThrottleThreads)); 1953 final int claimReplicationQueueThreads = 1954 conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1); 1955 executorService.startExecutorService( 1956 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE) 1957 .setCorePoolSize(claimReplicationQueueThreads)); 1958 final int rsSnapshotOperationThreads = 1959 conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3); 1960 executorService.startExecutorService( 1961 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS) 1962 .setCorePoolSize(rsSnapshotOperationThreads)); 1963 final int rsFlushOperationThreads = 1964 conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3); 1965 executorService.startExecutorService(executorService.new ExecutorConfig() 1966 .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads)); 1967 final int rsRefreshQuotasThreads = 1968 conf.getInt("hbase.regionserver.executor.refresh.quotas.threads", 1); 1969 executorService.startExecutorService( 1970 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS) 1971 .setCorePoolSize(rsRefreshQuotasThreads)); 1972 final int 
logRollThreads = conf.getInt("hbase.regionserver.executor.log.roll.threads", 1); 1973 executorService.startExecutorService(executorService.new ExecutorConfig() 1974 .setExecutorType(ExecutorType.RS_LOG_ROLL).setCorePoolSize(logRollThreads)); 1975 1976 Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", 1977 uncaughtExceptionHandler); 1978 if (this.cacheFlusher != null) { 1979 this.cacheFlusher.start(uncaughtExceptionHandler); 1980 } 1981 Threads.setDaemonThreadRunning(this.procedureResultReporter, 1982 getName() + ".procedureResultReporter", uncaughtExceptionHandler); 1983 1984 if (this.compactionChecker != null) { 1985 choreService.scheduleChore(compactionChecker); 1986 } 1987 if (this.periodicFlusher != null) { 1988 choreService.scheduleChore(periodicFlusher); 1989 } 1990 if (this.healthCheckChore != null) { 1991 choreService.scheduleChore(healthCheckChore); 1992 } 1993 if (this.executorStatusChore != null) { 1994 choreService.scheduleChore(executorStatusChore); 1995 } 1996 if (this.nonceManagerChore != null) { 1997 choreService.scheduleChore(nonceManagerChore); 1998 } 1999 if (this.storefileRefresher != null) { 2000 choreService.scheduleChore(storefileRefresher); 2001 } 2002 if (this.fsUtilizationChore != null) { 2003 choreService.scheduleChore(fsUtilizationChore); 2004 } 2005 if (this.namedQueueServiceChore != null) { 2006 choreService.scheduleChore(namedQueueServiceChore); 2007 } 2008 if (this.brokenStoreFileCleaner != null) { 2009 choreService.scheduleChore(brokenStoreFileCleaner); 2010 } 2011 if (this.rsMobFileCleanerChore != null) { 2012 choreService.scheduleChore(rsMobFileCleanerChore); 2013 } 2014 if (replicationMarkerChore != null) { 2015 LOG.info("Starting replication marker chore"); 2016 choreService.scheduleChore(replicationMarkerChore); 2017 } 2018 2019 // Leases is not a Thread. Internally it runs a daemon thread. If it gets 2020 // an unhandled exception, it will just exit. 2021 Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker", 2022 uncaughtExceptionHandler); 2023 2024 // Create the log splitting worker and start it 2025 // set a smaller retries to fast fail otherwise splitlogworker could be blocked for 2026 // quite a while inside Connection layer. The worker won't be available for other 2027 // tasks even after current task is preempted after a split task times out. 2028 Configuration sinkConf = HBaseConfiguration.create(conf); 2029 sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2030 conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds 2031 sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 2032 conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds 2033 sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 2034 if ( 2035 this.csm != null 2036 && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK) 2037 ) { 2038 // SplitLogWorker needs csm. If none, don't start this. 2039 this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory); 2040 splitLogWorker.start(); 2041 LOG.debug("SplitLogWorker started"); 2042 } 2043 2044 // Memstore services. 2045 startHeapMemoryManager(); 2046 // Call it after starting HeapMemoryManager. 2047 initializeMemStoreChunkCreator(hMemManager); 2048 } 2049 2050 private void initializeThreads() { 2051 // Cache flushing thread. 
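// (The flusher, compaction threads and chores built below are only constructed here; startServices() starts and schedules them once the server has registered with the master.)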
2052 this.cacheFlusher = new MemStoreFlusher(conf, this); 2053 2054 // Compaction thread 2055 this.compactSplitThread = new CompactSplit(this); 2056 2057 // Prefetch Notifier 2058 this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf); 2059 2060 // Background thread to check for compactions; needed if region has not gotten updates 2061 // in a while. It will take care of not checking too frequently on store-by-store basis. 2062 this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this); 2063 this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this); 2064 this.leaseManager = new LeaseManager(this.threadWakeFrequency); 2065 2066 final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, 2067 HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); 2068 final boolean walEventTrackerEnabled = 2069 conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT); 2070 2071 if (isSlowLogTableEnabled || walEventTrackerEnabled) { 2072 // default chore duration: 10 min 2073 // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property 2074 final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY, 2075 DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION); 2076 2077 final int namedQueueChoreDuration = 2078 conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT); 2079 // Considering min of slowLogChoreDuration and namedQueueChoreDuration 2080 int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration); 2081 2082 namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration, 2083 this.namedQueueRecorder, this.getConnection()); 2084 } 2085 2086 if (this.nonceManager != null) { 2087 // Create the scheduled chore that cleans up nonces. 
2088 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); 2089 } 2090 2091 // Setup the Quota Manager 2092 rsQuotaManager = new RegionServerRpcQuotaManager(this); 2093 configurationManager.registerObserver(rsQuotaManager); 2094 rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); 2095 2096 if (QuotaUtil.isQuotaEnabled(conf)) { 2097 this.fsUtilizationChore = new FileSystemUtilizationChore(this); 2098 } 2099 2100 boolean onlyMetaRefresh = false; 2101 int storefileRefreshPeriod = 2102 conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 2103 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2104 if (storefileRefreshPeriod == 0) { 2105 storefileRefreshPeriod = 2106 conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, 2107 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2108 onlyMetaRefresh = true; 2109 } 2110 if (storefileRefreshPeriod > 0) { 2111 this.storefileRefresher = 2112 new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this); 2113 } 2114 2115 int brokenStoreFileCleanerPeriod = 2116 conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, 2117 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD); 2118 int brokenStoreFileCleanerDelay = 2119 conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, 2120 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY); 2121 double brokenStoreFileCleanerDelayJitter = 2122 conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER, 2123 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER); 2124 double jitterRate = 2125 (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter; 2126 long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate); 2127 this.brokenStoreFileCleaner = 2128 new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue), 2129 brokenStoreFileCleanerPeriod, this, conf, this); 2130 2131 this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this); 2132 2133 registerConfigurationObservers(); 2134 initializeReplicationMarkerChore(); 2135 } 2136 2137 private void registerConfigurationObservers() { 2138 // Register Replication if possible, as now we support recreating replication peer storage, for 2139 // migrating across different replication peer storages online 2140 if (replicationSourceHandler instanceof ConfigurationObserver) { 2141 configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler); 2142 } 2143 if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) { 2144 configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler); 2145 } 2146 // Registering the compactSplitThread object with the ConfigurationManager. 
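// Each registered ConfigurationObserver receives an onConfigurationChange(Configuration) callback when the configuration is reloaded online.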
2147 configurationManager.registerObserver(this.compactSplitThread); 2148 configurationManager.registerObserver(this.cacheFlusher); 2149 configurationManager.registerObserver(this.rpcServices); 2150 configurationManager.registerObserver(this.prefetchExecutorNotifier); 2151 configurationManager.registerObserver(this); 2152 } 2153 2154 /* 2155 * Verify that server is healthy 2156 */ 2157 private boolean isHealthy() { 2158 if (!dataFsOk) { 2159 // File system problem 2160 return false; 2161 } 2162 // Verify that all threads are alive 2163 boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive()) 2164 && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) 2165 && (this.walRoller == null || this.walRoller.isAlive()) 2166 && (this.compactionChecker == null || this.compactionChecker.isScheduled()) 2167 && (this.periodicFlusher == null || this.periodicFlusher.isScheduled()); 2168 if (!healthy) { 2169 stop("One or more threads are no longer alive -- stop"); 2170 } 2171 return healthy; 2172 } 2173 2174 @Override 2175 public List<WAL> getWALs() { 2176 return walFactory.getWALs(); 2177 } 2178 2179 @Override 2180 public WAL getWAL(RegionInfo regionInfo) throws IOException { 2181 WAL wal = walFactory.getWAL(regionInfo); 2182 if (this.walRoller != null) { 2183 this.walRoller.addWAL(wal); 2184 } 2185 return wal; 2186 } 2187 2188 public LogRoller getWalRoller() { 2189 return walRoller; 2190 } 2191 2192 public WALFactory getWalFactory() { 2193 return walFactory; 2194 } 2195 2196 @Override 2197 public void stop(final String msg) { 2198 stop(msg, false, RpcServer.getRequestUser().orElse(null)); 2199 } 2200 2201 /** 2202 * Stops the regionserver. 2203 * @param msg Status message 2204 * @param force True if this is a regionserver abort 2205 * @param user The user executing the stop request, or null if no user is associated 2206 */ 2207 public void stop(final String msg, final boolean force, final User user) { 2208 if (!this.stopped) { 2209 LOG.info("***** STOPPING region server '{}' *****", this); 2210 if (this.rsHost != null) { 2211 // when forced via abort don't allow CPs to override 2212 try { 2213 this.rsHost.preStop(msg, user); 2214 } catch (IOException ioe) { 2215 if (!force) { 2216 LOG.warn("The region server did not stop", ioe); 2217 return; 2218 } 2219 LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe); 2220 } 2221 } 2222 this.stopped = true; 2223 LOG.info("STOPPED: " + msg); 2224 // Wakes run() if it is sleeping 2225 sleeper.skipSleepCycle(); 2226 } 2227 } 2228 2229 public void waitForServerOnline() { 2230 while (!isStopped() && !isOnline()) { 2231 synchronized (online) { 2232 try { 2233 online.wait(msgInterval); 2234 } catch (InterruptedException ie) { 2235 Thread.currentThread().interrupt(); 2236 break; 2237 } 2238 } 2239 } 2240 } 2241 2242 @Override 2243 public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException { 2244 HRegion r = context.getRegion(); 2245 long openProcId = context.getOpenProcId(); 2246 long masterSystemTime = context.getMasterSystemTime(); 2247 long initiatingMasterActiveTime = context.getInitiatingMasterActiveTime(); 2248 rpcServices.checkOpen(); 2249 LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}", 2250 r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime); 2251 // Do checks to see if we need to compact (references or too many files) 2252 // Skip compaction check if region is read only 2253 if (!r.isReadOnly()) { 2254 for (HStore s : r.stores.values()) 
{ 2255 if (s.hasReferences() || s.needsCompaction()) { 2256 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region"); 2257 } 2258 } 2259 } 2260 long openSeqNum = r.getOpenSeqNum(); 2261 if (openSeqNum == HConstants.NO_SEQNUM) { 2262 // If we opened a region, we should have read some sequence number from it. 2263 LOG.error( 2264 "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString()); 2265 openSeqNum = 0; 2266 } 2267 2268 // Notify master 2269 if ( 2270 !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED, 2271 openSeqNum, openProcId, masterSystemTime, r.getRegionInfo(), initiatingMasterActiveTime)) 2272 ) { 2273 throw new IOException( 2274 "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString()); 2275 } 2276 2277 triggerFlushInPrimaryRegion(r); 2278 2279 LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString()); 2280 } 2281 2282 /** 2283 * Helper method for use in tests. Skip the region transition report when there's no master around 2284 * to receive it. 2285 */ 2286 private boolean skipReportingTransition(final RegionStateTransitionContext context) { 2287 final TransitionCode code = context.getCode(); 2288 final long openSeqNum = context.getOpenSeqNum(); 2289 long masterSystemTime = context.getMasterSystemTime(); 2290 final RegionInfo[] hris = context.getHris(); 2291 2292 if (code == TransitionCode.OPENED) { 2293 Preconditions.checkArgument(hris != null && hris.length == 1); 2294 if (hris[0].isMetaRegion()) { 2295 LOG.warn( 2296 "meta table location is stored in master local store, so we can not skip reporting"); 2297 return false; 2298 } else { 2299 try { 2300 MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0], 2301 serverName, openSeqNum, masterSystemTime); 2302 } catch (IOException e) { 2303 LOG.info("Failed to update meta", e); 2304 return false; 2305 } 2306 } 2307 } 2308 return true; 2309 } 2310 2311 private ReportRegionStateTransitionRequest 2312 createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) { 2313 final TransitionCode code = context.getCode(); 2314 final long openSeqNum = context.getOpenSeqNum(); 2315 final RegionInfo[] hris = context.getHris(); 2316 final long[] procIds = context.getProcIds(); 2317 2318 ReportRegionStateTransitionRequest.Builder builder = 2319 ReportRegionStateTransitionRequest.newBuilder(); 2320 builder.setServer(ProtobufUtil.toServerName(serverName)); 2321 RegionStateTransition.Builder transition = builder.addTransitionBuilder(); 2322 transition.setTransitionCode(code); 2323 if (code == TransitionCode.OPENED && openSeqNum >= 0) { 2324 transition.setOpenSeqNum(openSeqNum); 2325 } 2326 for (RegionInfo hri : hris) { 2327 transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri)); 2328 } 2329 for (long procId : procIds) { 2330 transition.addProcId(procId); 2331 } 2332 transition.setInitiatingMasterActiveTime(context.getInitiatingMasterActiveTime()); 2333 2334 return builder.build(); 2335 } 2336 2337 @Override 2338 public boolean reportRegionStateTransition(final RegionStateTransitionContext context) { 2339 if (TEST_SKIP_REPORTING_TRANSITION) { 2340 return skipReportingTransition(context); 2341 } 2342 final ReportRegionStateTransitionRequest request = 2343 createReportRegionStateTransitionRequest(context); 2344 2345 int tries = 0; 2346 long pauseTime = this.retryPauseTime; 2347 // Keep looping till we get an error. 
We want to send reports even though server is going down. 2348 // Only go down if clusterConnection is null. It is set to null almost as last thing as the 2349 // HRegionServer does down. 2350 while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) { 2351 RegionServerStatusService.BlockingInterface rss = rssStub; 2352 try { 2353 if (rss == null) { 2354 createRegionServerStatusStub(); 2355 continue; 2356 } 2357 ReportRegionStateTransitionResponse response = 2358 rss.reportRegionStateTransition(null, request); 2359 if (response.hasErrorMessage()) { 2360 LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage()); 2361 break; 2362 } 2363 // Log if we had to retry else don't log unless TRACE. We want to 2364 // know if were successful after an attempt showed in logs as failed. 2365 if (tries > 0 || LOG.isTraceEnabled()) { 2366 LOG.info("TRANSITION REPORTED " + request); 2367 } 2368 // NOTE: Return mid-method!!! 2369 return true; 2370 } catch (ServiceException se) { 2371 IOException ioe = ProtobufUtil.getRemoteException(se); 2372 boolean pause = ioe instanceof ServerNotRunningYetException 2373 || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException; 2374 if (pause) { 2375 // Do backoff else we flood the Master with requests. 2376 pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries); 2377 } else { 2378 pauseTime = this.retryPauseTime; // Reset. 2379 } 2380 LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#" 2381 + tries + ")" 2382 + (pause 2383 ? " after " + pauseTime + "ms delay (Master is coming online...)." 2384 : " immediately."), 2385 ioe); 2386 if (pause) { 2387 Threads.sleep(pauseTime); 2388 } 2389 tries++; 2390 if (rssStub == rss) { 2391 rssStub = null; 2392 } 2393 } 2394 } 2395 return false; 2396 } 2397 2398 /** 2399 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not 2400 * block this thread. See RegionReplicaFlushHandler for details. 2401 */ 2402 private void triggerFlushInPrimaryRegion(final HRegion region) { 2403 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 2404 return; 2405 } 2406 TableName tn = region.getTableDescriptor().getTableName(); 2407 if ( 2408 !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) 2409 || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) || 2410 // If the memstore replication not setup, we do not have to wait for observing a flush event 2411 // from primary before starting to serve reads, because gaps from replication is not 2412 // applicable,this logic is from 2413 // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by 2414 // HBASE-13063 2415 !region.getTableDescriptor().hasRegionMemStoreReplication() 2416 ) { 2417 region.setReadsEnabled(true); 2418 return; 2419 } 2420 2421 region.setReadsEnabled(false); // disable reads before marking the region as opened. 2422 // RegionReplicaFlushHandler might reset this. 
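// Reads stay disabled until the flush handler observes (or triggers) a flush in the primary replica and re-enables them.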
2423 2424 // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler 2425 if (this.executorService != null) { 2426 this.executorService.submit(new RegionReplicaFlushHandler(this, region)); 2427 } else { 2428 LOG.info("Executor is null; not running flush of primary region replica for {}", 2429 region.getRegionInfo()); 2430 } 2431 } 2432 2433 @InterfaceAudience.Private 2434 public RSRpcServices getRSRpcServices() { 2435 return rpcServices; 2436 } 2437 2438 /** 2439 * Cause the server to exit without closing the regions it is serving, the log it is using and 2440 * without notifying the master. Used in unit testing and on catastrophic events such as HDFS being 2441 * yanked out from under hbase or an OOME. 2442 * @param reason the reason we are aborting @param cause the exception that caused the abort, or null 2443 */ 2444 @Override 2445 public void abort(String reason, Throwable cause) { 2446 if (!setAbortRequested()) { 2447 // Abort already in progress, ignore the new request. 2448 LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason); 2449 return; 2450 } 2451 String msg = "***** ABORTING region server " + this + ": " + reason + " *****"; 2452 if (cause != null) { 2453 LOG.error(HBaseMarkers.FATAL, msg, cause); 2454 } else { 2455 LOG.error(HBaseMarkers.FATAL, msg); 2456 } 2457 // HBASE-4014: show list of coprocessors that were loaded to help debug 2458 // regionserver crashes. Note that we're implicitly using 2459 // java.util.HashSet's toString() method to print the coprocessor names. 2460 LOG.error(HBaseMarkers.FATAL, 2461 "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors()); 2462 // Try and dump metrics on abort -- it might give a clue as to how the fatal error came about. 2463 try { 2464 LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics()); 2465 } catch (MalformedObjectNameException | IOException e) { 2466 LOG.warn("Failed dumping metrics", e); 2467 } 2468 2469 // Do our best to report our abort to the master, but this may not work 2470 try { 2471 if (cause != null) { 2472 msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause); 2473 } 2474 // Report to the master but only if we have already registered with the master. 2475 RegionServerStatusService.BlockingInterface rss = rssStub; 2476 if (rss != null && this.serverName != null) { 2477 ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder(); 2478 builder.setServer(ProtobufUtil.toServerName(this.serverName)); 2479 builder.setErrorMessage(msg); 2480 rss.reportRSFatalError(null, builder.build()); 2481 } 2482 } catch (Throwable t) { 2483 LOG.warn("Unable to report fatal error to master", t); 2484 } 2485 2486 scheduleAbortTimer(); 2487 // shutdown should be run as the internal user 2488 stop(reason, true, null); 2489 } 2490 2491 /* 2492 * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up logs, but it does 2493 * close the socket in case we want to bring up a server on the old hostname+port immediately. 2494 */ 2495 @InterfaceAudience.Private 2496 protected void kill() { 2497 this.killed = true; 2498 abort("Simulated kill"); 2499 } 2500 2501 // Limits the time spent in the shutdown process. 2502 private void scheduleAbortTimer() { 2503 if (this.abortMonitor == null) { 2504 this.abortMonitor = new Timer("Abort regionserver monitor", true); 2505 TimerTask abortTimeoutTask = null; 2506 try { 2507 Constructor<?
extends TimerTask> timerTaskCtor = 2508 Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName())) 2509 .asSubclass(TimerTask.class).getDeclaredConstructor(); 2510 timerTaskCtor.setAccessible(true); 2511 abortTimeoutTask = timerTaskCtor.newInstance(); 2512 } catch (Exception e) { 2513 LOG.warn("Initialize abort timeout task failed", e); 2514 } 2515 if (abortTimeoutTask != null) { 2516 abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT)); 2517 } 2518 } 2519 } 2520 2521 /** 2522 * Wait on all threads to finish. Presumption is that all closes and stops have already been 2523 * called. 2524 */ 2525 protected void stopServiceThreads() { 2526 // clean up the scheduled chores 2527 stopChoreService(); 2528 if (bootstrapNodeManager != null) { 2529 bootstrapNodeManager.stop(); 2530 } 2531 if (this.cacheFlusher != null) { 2532 this.cacheFlusher.shutdown(); 2533 } 2534 if (this.walRoller != null) { 2535 this.walRoller.close(); 2536 } 2537 if (this.compactSplitThread != null) { 2538 this.compactSplitThread.join(); 2539 } 2540 stopExecutorService(); 2541 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { 2542 this.replicationSourceHandler.stopReplicationService(); 2543 } else { 2544 if (this.replicationSourceHandler != null) { 2545 this.replicationSourceHandler.stopReplicationService(); 2546 } 2547 if (this.replicationSinkHandler != null) { 2548 this.replicationSinkHandler.stopReplicationService(); 2549 } 2550 } 2551 } 2552 2553 /** Returns Return the object that implements the replication source executorService. */ 2554 @Override 2555 public ReplicationSourceService getReplicationSourceService() { 2556 return replicationSourceHandler; 2557 } 2558 2559 /** Returns Return the object that implements the replication sink executorService. */ 2560 public ReplicationSinkService getReplicationSinkService() { 2561 return replicationSinkHandler; 2562 } 2563 2564 /** 2565 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2566 * connection, the current rssStub must be null. Method will block until a master is available. 2567 * You can break from this block by requesting the server stop. 2568 * @return master + port, or null if server has been stopped 2569 */ 2570 private synchronized ServerName createRegionServerStatusStub() { 2571 // Create RS stub without refreshing the master node from ZK, use cached data 2572 return createRegionServerStatusStub(false); 2573 } 2574 2575 /** 2576 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2577 * connection, the current rssStub must be null. Method will block until a master is available. 2578 * You can break from this block by requesting the server stop. 
2579 * @param refresh If true then master address will be read from ZK, otherwise use cached data 2580 * @return master + port, or null if server has been stopped 2581 */ 2582 @InterfaceAudience.Private 2583 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { 2584 if (rssStub != null) { 2585 return masterAddressTracker.getMasterAddress(); 2586 } 2587 ServerName sn = null; 2588 long previousLogTime = 0; 2589 RegionServerStatusService.BlockingInterface intRssStub = null; 2590 LockService.BlockingInterface intLockStub = null; 2591 boolean interrupted = false; 2592 try { 2593 while (keepLooping()) { 2594 sn = this.masterAddressTracker.getMasterAddress(refresh); 2595 if (sn == null) { 2596 if (!keepLooping()) { 2597 // give up with no connection. 2598 LOG.debug("No master found and cluster is stopped; bailing out"); 2599 return null; 2600 } 2601 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 2602 LOG.debug("No master found; retry"); 2603 previousLogTime = EnvironmentEdgeManager.currentTime(); 2604 } 2605 refresh = true; // let's try pull it from ZK directly 2606 if (sleepInterrupted(200)) { 2607 interrupted = true; 2608 } 2609 continue; 2610 } 2611 try { 2612 BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, 2613 userProvider.getCurrent(), shortOperationTimeout); 2614 intRssStub = RegionServerStatusService.newBlockingStub(channel); 2615 intLockStub = LockService.newBlockingStub(channel); 2616 break; 2617 } catch (IOException e) { 2618 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 2619 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 2620 if (e instanceof ServerNotRunningYetException) { 2621 LOG.info("Master isn't available yet, retrying"); 2622 } else { 2623 LOG.warn("Unable to connect to master. Retrying. Error was:", e); 2624 } 2625 previousLogTime = EnvironmentEdgeManager.currentTime(); 2626 } 2627 if (sleepInterrupted(200)) { 2628 interrupted = true; 2629 } 2630 } 2631 } 2632 } finally { 2633 if (interrupted) { 2634 Thread.currentThread().interrupt(); 2635 } 2636 } 2637 this.rssStub = intRssStub; 2638 this.lockStub = intLockStub; 2639 return sn; 2640 } 2641 2642 /** 2643 * @return True if we should break loop because cluster is going down or this server has been 2644 * stopped or hdfs has gone bad. 2645 */ 2646 private boolean keepLooping() { 2647 return !this.stopped && isClusterUp(); 2648 } 2649 2650 /* 2651 * Let the master know we're here Run initialization using parameters passed us by the master. 2652 * @return A Map of key/value configurations we got from the Master else null if we failed to 2653 * register. 
2654 */ 2655 private RegionServerStartupResponse reportForDuty() throws IOException { 2656 if (this.masterless) { 2657 return RegionServerStartupResponse.getDefaultInstance(); 2658 } 2659 ServerName masterServerName = createRegionServerStatusStub(true); 2660 RegionServerStatusService.BlockingInterface rss = rssStub; 2661 if (masterServerName == null || rss == null) { 2662 return null; 2663 } 2664 RegionServerStartupResponse result = null; 2665 try { 2666 rpcServices.requestCount.reset(); 2667 rpcServices.rpcGetRequestCount.reset(); 2668 rpcServices.rpcScanRequestCount.reset(); 2669 rpcServices.rpcFullScanRequestCount.reset(); 2670 rpcServices.rpcMultiRequestCount.reset(); 2671 rpcServices.rpcMutateRequestCount.reset(); 2672 LOG.info("reportForDuty to master=" + masterServerName + " with port=" 2673 + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode); 2674 long now = EnvironmentEdgeManager.currentTime(); 2675 int port = rpcServices.getSocketAddress().getPort(); 2676 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder(); 2677 if (!StringUtils.isBlank(useThisHostnameInstead)) { 2678 request.setUseThisHostnameInstead(useThisHostnameInstead); 2679 } 2680 request.setPort(port); 2681 request.setServerStartCode(this.startcode); 2682 request.setServerCurrentTime(now); 2683 result = rss.regionServerStartup(null, request.build()); 2684 } catch (ServiceException se) { 2685 IOException ioe = ProtobufUtil.getRemoteException(se); 2686 if (ioe instanceof ClockOutOfSyncException) { 2687 LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe); 2688 // Re-throw IOE will cause RS to abort 2689 throw ioe; 2690 } else if (ioe instanceof DecommissionedHostRejectedException) { 2691 LOG.error(HBaseMarkers.FATAL, 2692 "Master rejected startup because the host is considered decommissioned", ioe); 2693 // Re-throw IOE will cause RS to abort 2694 throw ioe; 2695 } else if (ioe instanceof ServerNotRunningYetException) { 2696 LOG.debug("Master is not running yet"); 2697 } else { 2698 LOG.warn("error telling master we are up", se); 2699 } 2700 rssStub = null; 2701 } 2702 return result; 2703 } 2704 2705 @Override 2706 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { 2707 try { 2708 GetLastFlushedSequenceIdRequest req = 2709 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName); 2710 RegionServerStatusService.BlockingInterface rss = rssStub; 2711 if (rss == null) { // Try to connect one more time 2712 createRegionServerStatusStub(); 2713 rss = rssStub; 2714 if (rss == null) { 2715 // Still no luck, we tried 2716 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id"); 2717 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2718 .build(); 2719 } 2720 } 2721 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req); 2722 return RegionStoreSequenceIds.newBuilder() 2723 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId()) 2724 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build(); 2725 } catch (ServiceException e) { 2726 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e); 2727 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2728 .build(); 2729 } 2730 } 2731 2732 /** 2733 * Close meta region if we carry it 2734 * @param abort Whether we're running an abort. 
2735 */ 2736 private void closeMetaTableRegions(final boolean abort) { 2737 HRegion meta = null; 2738 this.onlineRegionsLock.writeLock().lock(); 2739 try { 2740 for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) { 2741 RegionInfo hri = e.getValue().getRegionInfo(); 2742 if (hri.isMetaRegion()) { 2743 meta = e.getValue(); 2744 } 2745 if (meta != null) { 2746 break; 2747 } 2748 } 2749 } finally { 2750 this.onlineRegionsLock.writeLock().unlock(); 2751 } 2752 if (meta != null) { 2753 closeRegionIgnoreErrors(meta.getRegionInfo(), abort); 2754 } 2755 } 2756 2757 /** 2758 * Schedule closes on all user regions. Should be safe calling multiple times because it wont' 2759 * close regions that are already closed or that are closing. 2760 * @param abort Whether we're running an abort. 2761 */ 2762 private void closeUserRegions(final boolean abort) { 2763 this.onlineRegionsLock.writeLock().lock(); 2764 try { 2765 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 2766 HRegion r = e.getValue(); 2767 if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) { 2768 // Don't update zk with this close transition; pass false. 2769 closeRegionIgnoreErrors(r.getRegionInfo(), abort); 2770 } 2771 } 2772 } finally { 2773 this.onlineRegionsLock.writeLock().unlock(); 2774 } 2775 } 2776 2777 protected Map<String, HRegion> getOnlineRegions() { 2778 return this.onlineRegions; 2779 } 2780 2781 public int getNumberOfOnlineRegions() { 2782 return this.onlineRegions.size(); 2783 } 2784 2785 /** 2786 * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM 2787 * as client; HRegion cannot be serialized to cross an rpc. 2788 */ 2789 public Collection<HRegion> getOnlineRegionsLocalContext() { 2790 Collection<HRegion> regions = this.onlineRegions.values(); 2791 return Collections.unmodifiableCollection(regions); 2792 } 2793 2794 @Override 2795 public void addRegion(HRegion region) { 2796 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); 2797 configurationManager.registerObserver(region); 2798 } 2799 2800 private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region, 2801 long size) { 2802 if (!sortedRegions.containsKey(size)) { 2803 sortedRegions.put(size, new ArrayList<>()); 2804 } 2805 sortedRegions.get(size).add(region); 2806 } 2807 2808 /** 2809 * @return A new Map of online regions sorted by region off-heap size with the first entry being 2810 * the biggest. 2811 */ 2812 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() { 2813 // we'll sort the regions in reverse 2814 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2815 // Copy over all regions. Regions are sorted by size with biggest first. 2816 for (HRegion region : this.onlineRegions.values()) { 2817 addRegion(sortedRegions, region, region.getMemStoreOffHeapSize()); 2818 } 2819 return sortedRegions; 2820 } 2821 2822 /** 2823 * @return A new Map of online regions sorted by region heap size with the first entry being the 2824 * biggest. 2825 */ 2826 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() { 2827 // we'll sort the regions in reverse 2828 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2829 // Copy over all regions. Regions are sorted by size with biggest first. 
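// The map key is the memstore size in bytes: addRegion() groups regions with identical sizes into
// the same list, and the reverse-ordered TreeMap keeps the largest size as the first entry.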
2830 for (HRegion region : this.onlineRegions.values()) { 2831 addRegion(sortedRegions, region, region.getMemStoreHeapSize()); 2832 } 2833 return sortedRegions; 2834 } 2835 2836 /** Returns reference to FlushRequester */ 2837 @Override 2838 public FlushRequester getFlushRequester() { 2839 return this.cacheFlusher; 2840 } 2841 2842 @Override 2843 public CompactionRequester getCompactionRequestor() { 2844 return this.compactSplitThread; 2845 } 2846 2847 @Override 2848 public LeaseManager getLeaseManager() { 2849 return leaseManager; 2850 } 2851 2852 /** Returns {@code true} when the data file system is available, {@code false} otherwise. */ 2853 boolean isDataFileSystemOk() { 2854 return this.dataFsOk; 2855 } 2856 2857 public RegionServerCoprocessorHost getRegionServerCoprocessorHost() { 2858 return this.rsHost; 2859 } 2860 2861 @Override 2862 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() { 2863 return this.regionsInTransitionInRS; 2864 } 2865 2866 @Override 2867 public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { 2868 return rsQuotaManager; 2869 } 2870 2871 // 2872 // Main program and support routines 2873 // 2874 /** 2875 * Load the replication executorService objects, if any 2876 */ 2877 private static void createNewReplicationInstance(Configuration conf, HRegionServer server, 2878 FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException { 2879 // read in the name of the source replication class from the config file. 2880 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, 2881 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 2882 2883 // read in the name of the sink replication class from the config file. 2884 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, 2885 HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT); 2886 2887 // If both the sink and the source class names are the same, then instantiate 2888 // only one object. 2889 if (sourceClassname.equals(sinkClassname)) { 2890 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 2891 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2892 server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; 2893 server.sameReplicationSourceAndSink = true; 2894 } else { 2895 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 2896 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2897 server.replicationSinkHandler = newReplicationInstance(sinkClassname, 2898 ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2899 server.sameReplicationSourceAndSink = false; 2900 } 2901 } 2902 2903 private static <T extends ReplicationService> T newReplicationInstance(String classname, 2904 Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, 2905 Path oldLogDir, WALFactory walFactory) throws IOException { 2906 final Class<? 
extends T> clazz; 2907 try { 2908 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 2909 clazz = Class.forName(classname, true, classLoader).asSubclass(xface); 2910 } catch (java.lang.ClassNotFoundException nfe) { 2911 throw new IOException("Could not find class for " + classname); 2912 } 2913 T service = ReflectionUtils.newInstance(clazz, conf); 2914 service.initialize(server, walFs, logDir, oldLogDir, walFactory); 2915 return service; 2916 } 2917 2918 public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() { 2919 Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>(); 2920 if (!this.isOnline()) { 2921 return walGroupsReplicationStatus; 2922 } 2923 List<ReplicationSourceInterface> allSources = new ArrayList<>(); 2924 allSources.addAll(replicationSourceHandler.getReplicationManager().getSources()); 2925 allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources()); 2926 for (ReplicationSourceInterface source : allSources) { 2927 walGroupsReplicationStatus.putAll(source.getWalGroupStatus()); 2928 } 2929 return walGroupsReplicationStatus; 2930 } 2931 2932 /** 2933 * Utility for constructing an instance of the passed HRegionServer class. 2934 */ 2935 static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass, 2936 final Configuration conf) { 2937 try { 2938 Constructor<? extends HRegionServer> c = 2939 regionServerClass.getConstructor(Configuration.class); 2940 return c.newInstance(conf); 2941 } catch (Exception e) { 2942 throw new RuntimeException( 2943 "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e); 2944 } 2945 } 2946 2947 /** 2948 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine 2949 */ 2950 public static void main(String[] args) { 2951 LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); 2952 VersionInfo.logVersion(); 2953 Configuration conf = HBaseConfiguration.create(); 2954 @SuppressWarnings("unchecked") 2955 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf 2956 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); 2957 2958 new HRegionServerCommandLine(regionServerClass).doMain(args); 2959 } 2960 2961 /** 2962 * Gets the online regions of the specified table. This method looks at the in-memory 2963 * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions. 2964 * If a region on this table has been closed during a disable, etc., it will not be included in 2965 * the returned list. So, the returned list may not necessarily be ALL regions in this table, its 2966 * all the ONLINE regions in the table. 
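 * <p>
 * A minimal usage sketch (the table name and the {@code regionServer} reference are hypothetical);
 * this only works for callers running in the same JVM as the regionserver:
 * </p>
 * <pre>
 * List&lt;HRegion&gt; regions = regionServer.getRegions(TableName.valueOf("example_table"));
 * for (HRegion region : regions) {
 *   LOG.info("online region {}", region.getRegionInfo().getRegionNameAsString());
 * }
 * </pre>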
2967 * @param tableName table to limit the scope of the query 2968 * @return Online regions from <code>tableName</code> 2969 */ 2970 @Override 2971 public List<HRegion> getRegions(TableName tableName) { 2972 List<HRegion> tableRegions = new ArrayList<>(); 2973 synchronized (this.onlineRegions) { 2974 for (HRegion region : this.onlineRegions.values()) { 2975 RegionInfo regionInfo = region.getRegionInfo(); 2976 if (regionInfo.getTable().equals(tableName)) { 2977 tableRegions.add(region); 2978 } 2979 } 2980 } 2981 return tableRegions; 2982 } 2983 2984 @Override 2985 public List<HRegion> getRegions() { 2986 List<HRegion> allRegions; 2987 synchronized (this.onlineRegions) { 2988 // Return a clone copy of the onlineRegions 2989 allRegions = new ArrayList<>(onlineRegions.values()); 2990 } 2991 return allRegions; 2992 } 2993 2994 /** 2995 * Gets the online tables in this RS. This method looks at the in-memory onlineRegions. 2996 * @return all the online tables in this RS 2997 */ 2998 public Set<TableName> getOnlineTables() { 2999 Set<TableName> tables = new HashSet<>(); 3000 synchronized (this.onlineRegions) { 3001 for (Region region : this.onlineRegions.values()) { 3002 tables.add(region.getTableDescriptor().getTableName()); 3003 } 3004 } 3005 return tables; 3006 } 3007 3008 public String[] getRegionServerCoprocessors() { 3009 TreeSet<String> coprocessors = new TreeSet<>(); 3010 try { 3011 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors()); 3012 } catch (IOException exception) { 3013 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " 3014 + "skipping."); 3015 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3016 } 3017 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 3018 for (HRegion region : regions) { 3019 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors()); 3020 try { 3021 coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()); 3022 } catch (IOException exception) { 3023 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region 3024 + "; skipping."); 3025 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3026 } 3027 } 3028 coprocessors.addAll(rsHost.getCoprocessors()); 3029 return coprocessors.toArray(new String[0]); 3030 } 3031 3032 /** 3033 * Try to close the region, logs a warning on failure but continues. 3034 * @param region Region to close 3035 */ 3036 private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) { 3037 try { 3038 if (!closeRegion(region.getEncodedName(), abort, null)) { 3039 LOG 3040 .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing"); 3041 } 3042 } catch (IOException e) { 3043 LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing", 3044 e); 3045 } 3046 } 3047 3048 /** 3049 * Close asynchronously a region, can be called from the master or internally by the regionserver 3050 * when stopping. If called from the master, the region will update the status. 3051 * <p> 3052 * If an opening was in progress, this method will cancel it, but will not start a new close. The 3053 * coprocessors are not called in this case. A NotServingRegionException exception is thrown. 3054 * </p> 3055 * <p> 3056 * If a close was in progress, this new request will be ignored, and an exception thrown. 
3057 * </p> 3058 * <p> 3059 * Provides an additional flag to indicate whether the blocks of this region should be evicted from the cache. 3060 * </p> 3061 * @param encodedName Region to close 3062 * @param abort True if we are aborting 3063 * @param destination Where the Region is being moved to... may be null if unknown. 3064 * @return True if closed a region. 3065 * @throws NotServingRegionException if the region is not online 3066 */ 3067 protected boolean closeRegion(String encodedName, final boolean abort, 3068 final ServerName destination) throws NotServingRegionException { 3069 // Check for permissions to close. 3070 HRegion actualRegion = this.getRegion(encodedName); 3071 // Can be null if we're calling close on a region that's not online 3072 if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) { 3073 try { 3074 actualRegion.getCoprocessorHost().preClose(false); 3075 } catch (IOException exp) { 3076 LOG.warn("Unable to close region: the coprocessor threw an error", exp); 3077 return false; 3078 } 3079 } 3080 3081 // previous can come back 'null' if not in map. 3082 final Boolean previous = 3083 this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE); 3084 3085 if (Boolean.TRUE.equals(previous)) { 3086 LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " 3087 + "trying to OPEN. Cancelling OPENING."); 3088 if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) { 3089 // The replace failed. That should be an exceptional case, but theoretically it can happen. 3090 // We're going to try to do a standard close then. 3091 LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." 3092 + " Doing a standard close now"); 3093 return closeRegion(encodedName, abort, destination); 3094 } 3095 // Let's get the region from the online region list again 3096 actualRegion = this.getRegion(encodedName); 3097 if (actualRegion == null) { // If already online, we still need to close it. 3098 LOG.info("The opening previously in progress has been cancelled by a CLOSE request."); 3099 // The master deletes the znode when it receives this exception. 3100 throw new NotServingRegionException( 3101 "The region " + encodedName + " was opening but not yet served. Opening is cancelled."); 3102 } 3103 } else if (previous == null) { 3104 LOG.info("Received CLOSE for {}", encodedName); 3105 } else if (Boolean.FALSE.equals(previous)) { 3106 LOG.info("Received CLOSE for the region: " + encodedName 3107 + ", which we are already trying to CLOSE, but not completed yet"); 3108 return true; 3109 } 3110 3111 if (actualRegion == null) { 3112 LOG.debug("Received CLOSE for a region which is not online, and we're not opening."); 3113 this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName)); 3114 // The master deletes the znode when it receives this exception. 3115 throw new NotServingRegionException( 3116 "The region " + encodedName + " is not online, and is not opening."); 3117 } 3118 3119 CloseRegionHandler crh; 3120 final RegionInfo hri = actualRegion.getRegionInfo(); 3121 if (hri.isMetaRegion()) { 3122 crh = new CloseMetaHandler(this, this, hri, abort); 3123 } else { 3124 crh = new CloseRegionHandler(this, this, hri, abort, destination); 3125 } 3126 this.executorService.submit(crh); 3127 return true; 3128 } 3129 3130 /** 3131 * @return HRegion for the passed binary <code>regionName</code> or null if the named region is not 3132 * a member of the online regions.
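 * <p>
 * For illustration only ({@code regionServer} and {@code regionInfo} are hypothetical references):
 * callers holding the full binary region name, rather than the encoded name, can do
 * </p>
 * <pre>
 * HRegion region = regionServer.getOnlineRegion(regionInfo.getRegionName());
 * if (region == null) {
 *   // the region is not currently online on this server
 * }
 * </pre>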
3133 */ 3134 public HRegion getOnlineRegion(final byte[] regionName) { 3135 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3136 return this.onlineRegions.get(encodedRegionName); 3137 } 3138 3139 @Override 3140 public HRegion getRegion(final String encodedRegionName) { 3141 return this.onlineRegions.get(encodedRegionName); 3142 } 3143 3144 @Override 3145 public boolean removeRegion(final HRegion r, ServerName destination) { 3146 HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); 3147 metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName()); 3148 if (destination != null) { 3149 long closeSeqNum = r.getMaxFlushedSeqId(); 3150 if (closeSeqNum == HConstants.NO_SEQNUM) { 3151 // No edits in WAL for this region; get the sequence number when the region was opened. 3152 closeSeqNum = r.getOpenSeqNum(); 3153 if (closeSeqNum == HConstants.NO_SEQNUM) { 3154 closeSeqNum = 0; 3155 } 3156 } 3157 boolean selfMove = ServerName.isSameAddress(destination, this.getServerName()); 3158 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove); 3159 if (selfMove) { 3160 this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put( 3161 r.getRegionInfo().getEncodedName(), 3162 new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount())); 3163 } 3164 } 3165 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName()); 3166 configurationManager.deregisterObserver(r); 3167 return toReturn != null; 3168 } 3169 3170 /** 3171 * Protected Utility method for safely obtaining an HRegion handle. 3172 * @param regionName Name of online {@link HRegion} to return 3173 * @return {@link HRegion} for <code>regionName</code> 3174 */ 3175 protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException { 3176 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3177 return getRegionByEncodedName(regionName, encodedRegionName); 3178 } 3179 3180 public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException { 3181 return getRegionByEncodedName(null, encodedRegionName); 3182 } 3183 3184 private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) 3185 throws NotServingRegionException { 3186 HRegion region = this.onlineRegions.get(encodedRegionName); 3187 if (region == null) { 3188 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName); 3189 if (moveInfo != null) { 3190 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum()); 3191 } 3192 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName)); 3193 String regionNameStr = 3194 regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName); 3195 if (isOpening != null && isOpening) { 3196 throw new RegionOpeningException( 3197 "Region " + regionNameStr + " is opening on " + this.serverName); 3198 } 3199 throw new NotServingRegionException( 3200 "" + regionNameStr + " is not online on " + this.serverName); 3201 } 3202 return region; 3203 } 3204 3205 /** 3206 * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't 3207 * already. 3208 * @param t Throwable 3209 * @param msg Message to log in error. Can be null. 3210 * @return Throwable converted to an IOE; methods can only let out IOEs. 3211 */ 3212 private Throwable cleanup(final Throwable t, final String msg) { 3213 // Don't log as error if NSRE; NSRE is 'normal' operation. 
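    // For NSRE we just return the throwable as-is; for anything else, unwrap RemoteExceptions so
    // the real cause is logged, and if it was not an OOME handled by the RPC services, verify that
    // the filesystem is still available.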
3214 if (t instanceof NotServingRegionException) { 3215 LOG.debug("NotServingRegionException; " + t.getMessage()); 3216 return t; 3217 } 3218 Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t; 3219 if (msg == null) { 3220 LOG.error("", e); 3221 } else { 3222 LOG.error(msg, e); 3223 } 3224 if (!rpcServices.checkOOME(t)) { 3225 checkFileSystem(); 3226 } 3227 return t; 3228 } 3229 3230 /** 3231 * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE 3232 * @return Make <code>t</code> an IOE if it isn't already. 3233 */ 3234 private IOException convertThrowableToIOE(final Throwable t, final String msg) { 3235 return (t instanceof IOException ? (IOException) t 3236 : msg == null || msg.length() == 0 ? new IOException(t) 3237 : new IOException(msg, t)); 3238 } 3239 3240 /** 3241 * Checks to see if the file system is still accessible. If not, sets abortRequested and 3242 * stopRequested 3243 * @return false if file system is not available 3244 */ 3245 boolean checkFileSystem() { 3246 if (this.dataFsOk && this.dataFs != null) { 3247 try { 3248 FSUtils.checkFileSystemAvailable(this.dataFs); 3249 } catch (IOException e) { 3250 abort("File System not available", e); 3251 this.dataFsOk = false; 3252 } 3253 } 3254 return this.dataFsOk; 3255 } 3256 3257 @Override 3258 public void updateRegionFavoredNodesMapping(String encodedRegionName, 3259 List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) { 3260 Address[] addr = new Address[favoredNodes.size()]; 3261 // Refer to the comment on the declaration of regionFavoredNodesMap on why 3262 // it is a map of region name to Address[] 3263 for (int i = 0; i < favoredNodes.size(); i++) { 3264 addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort()); 3265 } 3266 regionFavoredNodesMap.put(encodedRegionName, addr); 3267 } 3268 3269 /** 3270 * Return the favored nodes for a region given its encoded name. Look at the comment around 3271 * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here. 3272 * @param encodedRegionName the encoded region name. 3273 * @return array of favored locations 3274 */ 3275 @Override 3276 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) { 3277 return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName)); 3278 } 3279 3280 @Override 3281 public ServerNonceManager getNonceManager() { 3282 return this.nonceManager; 3283 } 3284 3285 private static class MovedRegionInfo { 3286 private final ServerName serverName; 3287 private final long seqNum; 3288 3289 MovedRegionInfo(ServerName serverName, long closeSeqNum) { 3290 this.serverName = serverName; 3291 this.seqNum = closeSeqNum; 3292 } 3293 3294 public ServerName getServerName() { 3295 return serverName; 3296 } 3297 3298 public long getSeqNum() { 3299 return seqNum; 3300 } 3301 } 3302 3303 /** 3304 * We need a timeout. If not there is a risk of giving a wrong information: this would double the 3305 * number of network calls instead of reducing them. 
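   * The constant below is 2 * 60 * 1000 ms, i.e. two minutes (exposed via
   * {@link #movedRegionCacheExpiredTime()}).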
3306 */ 3307 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000); 3308 3309 private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum, 3310 boolean selfMove) { 3311 if (selfMove) { 3312 LOG.warn("Not adding moved region record: " + encodedName + " to self."); 3313 return; 3314 } 3315 LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" 3316 + closeSeqNum); 3317 movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum)); 3318 } 3319 3320 void removeFromMovedRegions(String encodedName) { 3321 movedRegionInfoCache.invalidate(encodedName); 3322 } 3323 3324 @InterfaceAudience.Private 3325 public MovedRegionInfo getMovedRegion(String encodedRegionName) { 3326 return movedRegionInfoCache.getIfPresent(encodedRegionName); 3327 } 3328 3329 @InterfaceAudience.Private 3330 public int movedRegionCacheExpiredTime() { 3331 return TIMEOUT_REGION_MOVED; 3332 } 3333 3334 private String getMyEphemeralNodePath() { 3335 return zooKeeper.getZNodePaths().getRsPath(serverName); 3336 } 3337 3338 private boolean isHealthCheckerConfigured() { 3339 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC); 3340 return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation); 3341 } 3342 3343 /** Returns the underlying {@link CompactSplit} for this server */ 3344 public CompactSplit getCompactSplitThread() { 3345 return this.compactSplitThread; 3346 } 3347 3348 CoprocessorServiceResponse execRegionServerService( 3349 @SuppressWarnings("UnusedParameters") final RpcController controller, 3350 final CoprocessorServiceRequest serviceRequest) throws ServiceException { 3351 try { 3352 ServerRpcController serviceController = new ServerRpcController(); 3353 CoprocessorServiceCall call = serviceRequest.getCall(); 3354 String serviceName = call.getServiceName(); 3355 Service service = coprocessorServiceHandlers.get(serviceName); 3356 if (service == null) { 3357 throw new UnknownProtocolException(null, 3358 "No registered coprocessor executorService found for " + serviceName); 3359 } 3360 ServiceDescriptor serviceDesc = service.getDescriptorForType(); 3361 3362 String methodName = call.getMethodName(); 3363 MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName); 3364 if (methodDesc == null) { 3365 throw new UnknownProtocolException(service.getClass(), 3366 "Unknown method " + methodName + " called on executorService " + serviceName); 3367 } 3368 3369 Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest()); 3370 final Message.Builder responseBuilder = 3371 service.getResponsePrototype(methodDesc).newBuilderForType(); 3372 service.callMethod(methodDesc, serviceController, request, message -> { 3373 if (message != null) { 3374 responseBuilder.mergeFrom(message); 3375 } 3376 }); 3377 IOException exception = CoprocessorRpcUtils.getControllerException(serviceController); 3378 if (exception != null) { 3379 throw exception; 3380 } 3381 return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY); 3382 } catch (IOException ie) { 3383 throw new ServiceException(ie); 3384 } 3385 } 3386 3387 /** 3388 * May be empty if this is a master that does not carry tables. 3389 * @return The block cache instance used by the regionserver.
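 * <p>
 * A small usage sketch ({@code regionServer} is a hypothetical reference); because the cache may be
 * absent, callers should go through the {@link Optional}:
 * </p>
 * <pre>
 * regionServer.getBlockCache().ifPresent(cache -> LOG.info("Block cache in use: {}", cache));
 * </pre>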
3390 */ 3391 @Override 3392 public Optional<BlockCache> getBlockCache() { 3393 return Optional.ofNullable(this.blockCache); 3394 } 3395 3396 /** 3397 * May be empty if this is a master that does not carry tables. 3398 * @return The cache for mob files used by the regionserver. 3399 */ 3400 @Override 3401 public Optional<MobFileCache> getMobFileCache() { 3402 return Optional.ofNullable(this.mobFileCache); 3403 } 3404 3405 CacheEvictionStats clearRegionBlockCache(Region region) { 3406 long evictedBlocks = 0; 3407 3408 for (Store store : region.getStores()) { 3409 for (StoreFile hFile : store.getStorefiles()) { 3410 evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName()); 3411 } 3412 } 3413 3414 return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build(); 3415 } 3416 3417 @Override 3418 public double getCompactionPressure() { 3419 double max = 0; 3420 for (Region region : onlineRegions.values()) { 3421 for (Store store : region.getStores()) { 3422 double normCount = store.getCompactionPressure(); 3423 if (normCount > max) { 3424 max = normCount; 3425 } 3426 } 3427 } 3428 return max; 3429 } 3430 3431 @Override 3432 public HeapMemoryManager getHeapMemoryManager() { 3433 return hMemManager; 3434 } 3435 3436 public MemStoreFlusher getMemStoreFlusher() { 3437 return cacheFlusher; 3438 } 3439 3440 /** 3441 * For testing 3442 * @return whether all wal roll requests have finished for this regionserver 3443 */ 3444 @InterfaceAudience.Private 3445 public boolean walRollRequestFinished() { 3446 return this.walRoller.walRollFinished(); 3447 } 3448 3449 @Override 3450 public ThroughputController getFlushThroughputController() { 3451 return flushThroughputController; 3452 } 3453 3454 @Override 3455 public double getFlushPressure() { 3456 if (getRegionServerAccounting() == null || cacheFlusher == null) { 3457 // return 0 during RS initialization 3458 return 0.0; 3459 } 3460 return getRegionServerAccounting().getFlushPressure(); 3461 } 3462 3463 @Override 3464 public void onConfigurationChange(Configuration newConf) { 3465 ThroughputController old = this.flushThroughputController; 3466 if (old != null) { 3467 old.stop("configuration change"); 3468 } 3469 this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf); 3470 try { 3471 Superusers.initialize(newConf); 3472 } catch (IOException e) { 3473 LOG.warn("Failed to initialize SuperUsers on reloading of the configuration"); 3474 } 3475 3476 // update region server coprocessor if the configuration has changed.
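    // Only rebuild the coprocessor host when CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY
    // actually changed between the old and new configurations.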
3477 if ( 3478 CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf, 3479 CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY) 3480 ) { 3481 LOG.info("Update region server coprocessors because the configuration has changed"); 3482 this.rsHost = new RegionServerCoprocessorHost(this, newConf); 3483 } 3484 } 3485 3486 @Override 3487 public MetricsRegionServer getMetrics() { 3488 return metricsRegionServer; 3489 } 3490 3491 @Override 3492 public SecureBulkLoadManager getSecureBulkLoadManager() { 3493 return this.secureBulkLoadManager; 3494 } 3495 3496 @Override 3497 public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description, 3498 final Abortable abort) { 3499 final LockServiceClient client = 3500 new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator()); 3501 return client.regionLock(regionInfo, description, abort); 3502 } 3503 3504 @Override 3505 public void unassign(byte[] regionName) throws IOException { 3506 FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false)); 3507 } 3508 3509 @Override 3510 public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() { 3511 return this.rsSpaceQuotaManager; 3512 } 3513 3514 @Override 3515 public boolean reportFileArchivalForQuotas(TableName tableName, 3516 Collection<Entry<String, Long>> archivedFiles) { 3517 if (TEST_SKIP_REPORTING_TRANSITION) { 3518 return false; 3519 } 3520 RegionServerStatusService.BlockingInterface rss = rssStub; 3521 if (rss == null || rsSpaceQuotaManager == null) { 3522 // the current server could be stopping. 3523 LOG.trace("Skipping file archival reporting to HMaster as stub is null"); 3524 return false; 3525 } 3526 try { 3527 RegionServerStatusProtos.FileArchiveNotificationRequest request = 3528 rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles); 3529 rss.reportFileArchival(null, request); 3530 } catch (ServiceException se) { 3531 IOException ioe = ProtobufUtil.getRemoteException(se); 3532 if (ioe instanceof PleaseHoldException) { 3533 if (LOG.isTraceEnabled()) { 3534 LOG.trace("Failed to report file archival(s) to Master because it is initializing." 3535 + " This will be retried.", ioe); 3536 } 3537 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 3538 return false; 3539 } 3540 if (rssStub == rss) { 3541 rssStub = null; 3542 } 3543 // re-create the stub if we failed to report the archival 3544 createRegionServerStatusStub(true); 3545 LOG.debug("Failed to report file archival(s) to Master. 
This will be retried.", ioe); 3546 return false; 3547 } 3548 return true; 3549 } 3550 3551 void executeProcedure(long procId, long initiatingMasterActiveTime, 3552 RSProcedureCallable callable) { 3553 executorService 3554 .submit(new RSProcedureHandler(this, procId, initiatingMasterActiveTime, callable)); 3555 } 3556 3557 public void remoteProcedureComplete(long procId, long initiatingMasterActiveTime, Throwable error, 3558 byte[] procResultData) { 3559 procedureResultReporter.complete(procId, initiatingMasterActiveTime, error, procResultData); 3560 } 3561 3562 void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException { 3563 RegionServerStatusService.BlockingInterface rss; 3564 // TODO: juggling class state with an instance variable, outside of a synchronized block :'( 3565 for (;;) { 3566 rss = rssStub; 3567 if (rss != null) { 3568 break; 3569 } 3570 createRegionServerStatusStub(); 3571 } 3572 try { 3573 rss.reportProcedureDone(null, request); 3574 } catch (ServiceException se) { 3575 if (rssStub == rss) { 3576 rssStub = null; 3577 } 3578 throw ProtobufUtil.getRemoteException(se); 3579 } 3580 } 3581 3582 /** 3583 * Ignores open/close region procedures which were already submitted or executed. When the master 3584 * had an unfinished open/close region procedure and restarted, the new active master may send a 3585 * duplicate open/close region request to the regionserver. The open/close request is submitted to 3586 * a thread pool and executed asynchronously, so we first need a cache of submitted open/close 3587 * region procedures. After the open/close region request has been executed and the region 3588 * transition reported successfully, cache it in the executed region procedures cache. See 3589 * {@link #finishRegionProcedure(long)}. After the region transition has been reported, the master 3590 * will not send the open/close region request to the regionserver again. We assume an ongoing 3591 * duplicate open/close region request will not be delayed by more than 600 seconds, so the 3592 * executed region procedures cache expires after 600 seconds. See HBASE-22404 for more details. 3593 * @param procId the id of the open/close region procedure 3594 * @return true if the procedure can be submitted. 3595 */ 3596 boolean submitRegionProcedure(long procId) { 3597 if (procId == -1) { 3598 return true; 3599 } 3600 // Ignore region procedures which were already submitted. 3601 Long previous = submittedRegionProcedures.putIfAbsent(procId, procId); 3602 if (previous != null) { 3603 LOG.warn("Received procedure pid={}, which was already submitted, ignoring it", procId); 3604 return false; 3605 } 3606 // Ignore region procedures which were already executed. 3607 if (executedRegionProcedures.getIfPresent(procId) != null) { 3608 LOG.warn("Received procedure pid={}, which was already executed, ignoring it", procId); 3609 return false; 3610 } 3611 return true; 3612 } 3613 3614 /** 3615 * See {@link #submitRegionProcedure(long)}. 3616 * @param procId the id of the open/close region procedure 3617 */ 3618 public void finishRegionProcedure(long procId) { 3619 executedRegionProcedures.put(procId, procId); 3620 submittedRegionProcedures.remove(procId); 3621 } 3622 3623 /** 3624 * Forcibly terminate the region server when an abort times out.
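   * Intended to be scheduled as a {@link TimerTask} when an abort begins; if shutdown has not
   * completed by the time the task fires, {@code run()} dumps all thread stacks to stdout and halts
   * the JVM via {@code Runtime.getRuntime().halt(1)}.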
3625 */ 3626 private static class SystemExitWhenAbortTimeout extends TimerTask { 3627 3628 public SystemExitWhenAbortTimeout() { 3629 } 3630 3631 @Override 3632 public void run() { 3633 LOG.warn("Aborting region server timed out, terminating forcibly" 3634 + " without waiting for any running shutdown hooks or finalizers to finish their work." 3635 + " Dumping thread info to stdout."); 3636 Threads.printThreadInfo(System.out, "Zombie HRegionServer"); 3637 Runtime.getRuntime().halt(1); 3638 } 3639 } 3640 3641 @InterfaceAudience.Private 3642 public CompactedHFilesDischarger getCompactedHFilesDischarger() { 3643 return compactedFileDischarger; 3644 } 3645 3646 /** 3647 * Returns the pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME} 3648 * @return pause time 3649 */ 3650 @InterfaceAudience.Private 3651 public long getRetryPauseTime() { 3652 return this.retryPauseTime; 3653 } 3654 3655 @Override 3656 public Optional<ServerName> getActiveMaster() { 3657 return Optional.ofNullable(masterAddressTracker.getMasterAddress()); 3658 } 3659 3660 @Override 3661 public List<ServerName> getBackupMasters() { 3662 return masterAddressTracker.getBackupMasters(); 3663 } 3664 3665 @Override 3666 public Iterator<ServerName> getBootstrapNodes() { 3667 return bootstrapNodeManager.getBootstrapNodes().iterator(); 3668 } 3669 3670 @Override 3671 public List<HRegionLocation> getMetaLocations() { 3672 return metaRegionLocationCache.getMetaRegionLocations(); 3673 } 3674 3675 @Override 3676 protected NamedQueueRecorder createNamedQueueRecord() { 3677 return NamedQueueRecorder.getInstance(conf); 3678 } 3679 3680 @Override 3681 protected boolean clusterMode() { 3682 // This method is called in the constructor of the super class, so we cannot return the 3683 // masterless field directly here, as it would always be false at that point. 3684 return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false); 3685 } 3686 3687 @InterfaceAudience.Private 3688 public BrokenStoreFileCleaner getBrokenStoreFileCleaner() { 3689 return brokenStoreFileCleaner; 3690 } 3691 3692 @InterfaceAudience.Private 3693 public RSMobFileCleanerChore getRSMobFileCleanerChore() { 3694 return rsMobFileCleanerChore; 3695 } 3696 3697 RSSnapshotVerifier getRsSnapshotVerifier() { 3698 return rsSnapshotVerifier; 3699 } 3700 3701 @Override 3702 protected void stopChores() { 3703 shutdownChore(nonceManagerChore); 3704 shutdownChore(compactionChecker); 3705 shutdownChore(compactedFileDischarger); 3706 shutdownChore(periodicFlusher); 3707 shutdownChore(healthCheckChore); 3708 shutdownChore(executorStatusChore); 3709 shutdownChore(storefileRefresher); 3710 shutdownChore(fsUtilizationChore); 3711 shutdownChore(namedQueueServiceChore); 3712 shutdownChore(brokenStoreFileCleaner); 3713 shutdownChore(rsMobFileCleanerChore); 3714 shutdownChore(replicationMarkerChore); 3715 } 3716 3717 @Override 3718 public RegionReplicationBufferManager getRegionReplicationBufferManager() { 3719 return regionReplicationBufferManager; 3720 } 3721}