/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaRegionLocationCache;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionServerRegistry;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.ServerConnectionUtils;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionSize;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.unsafe.HBasePlatformDependent;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKAuthentication;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
 * are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation" })
public class HRegionServer extends Thread
  implements RegionServerServices, LastSequenceId, ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

  /**
   * For testing only! Set to true to skip notifying region assignment to the master.
   */
  @InterfaceAudience.Private
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  /**
   * A map from RegionName to current action in progress. Boolean value indicates: true - if an
   * open region action is in progress, false - if a close region action is in progress.
   */
  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * Used to cache the open/close region procedures which have already been submitted. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
  /**
   * Used to cache the open/close region procedures which have already been executed. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

  /**
   * Used to cache the moved-out regions
   */
  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();

  private MemStoreFlusher cacheFlusher;

  private HeapMemoryManager hMemManager;

  /**
   * Cluster connection to be shared by services. Initialized at server startup and closed when
   * server shuts down. Clients must never close it explicitly. Clients hosted by this Server
   * should make use of this clusterConnection rather than create their own; if they create their
   * own, there is no way for the hosting server to shut down ongoing client RPCs.
   */
  protected ClusterConnection clusterConnection;

  /**
   * Go here to get table descriptors.
   */
  protected TableDescriptors tableDescriptors;

  // Replication services. If no replication, this handler will be null.
  private ReplicationSourceService replicationSourceHandler;
  private ReplicationSinkService replicationSinkHandler;

  // Compactions
  private CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the encoded region name.
   * All access should be synchronized.
   */
  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
  /**
   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does
   * it need to be a ConcurrentHashMap?
   */
  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS
   * API call, and it seems a bit weird to store ServerName since ServerName refers to
   * RegionServers and here we really mean DataNode locations. We don't store it as
   * InetSocketAddress here because the conversion on demand from Address to InetSocketAddress
   * will guarantee the resolution results will be fresh when we need it.
   */
  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();

  private LeaseManager leaseManager;

  // Instance of the hbase executor service.
  protected ExecutorService executorService;

  private volatile boolean dataFsOk;
  private HFileSystem dataFs;
  private HFileSystem walFs;

  // Set when a report to the master comes back with a message asking us to
  // shut down. Also set by call to stop when debugging or running unit tests
  // of HRegionServer in isolation.
  private volatile boolean stopped = false;

  // Go down hard. Used if file system becomes unavailable and also in
  // debugging and unit tests.
  private AtomicBoolean abortRequested;
  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds, to be safe
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Will run this task if the abort times out
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;
  private volatile boolean killed = false;
  private volatile boolean shutDown = false;

  protected final Configuration conf;

  private Path dataRootDir;
  private Path walRootDir;

  private final int threadWakeFrequency;
  final int msgInterval;

  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  private RpcClient rpcClient;

  private RpcRetryingCallerFactory rpcRetryingCallerFactory;
  private RpcControllerFactory rpcControllerFactory;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  // Info server. Default access so can be used by unit tests. REGIONSERVER
  // is the name of the webapp and the attribute name used when stuffing this
  // instance into the web context.
  protected InfoServer infoServer;
  private JvmPauseMonitor pauseMonitor;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  private MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;

  /**
   * ChoreService used to schedule tasks that we want to run periodically
   */
  private ChoreService choreService;

  /**
   * Check for compactions requests.
   */
  private ScheduledChore compactionChecker;

  /**
   * Check for flushes
   */
  private ScheduledChore periodicFlusher;

  private volatile WALFactory walFactory;

  private LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // zookeeper connection and watcher
  protected final ZKWatcher zooKeeper;

  // master address tracker
  private final MasterAddressTracker masterAddressTracker;

  /**
   * Cache for the meta region replica's locations. Also tracks their changes to avoid stale cache
   * entries. Used for serving ClientMetaService.
   */
  private final MetaRegionLocationCache metaRegionLocationCache;
  /**
   * Cache for all the region servers in the cluster. Used for serving ClientMetaService.
   */
  private final RegionServerAddressTracker regionServerAddressTracker;

  // Cluster Status Tracker
  protected final ClusterStatusTracker clusterStatusTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  // A sleeper that sleeps for msgInterval.
  protected final Sleeper sleeper;

  private final int operationTimeout;
  private final int shortOperationTimeout;

  // Time to pause if master says 'please hold'
  private final long retryPauseTime;

  private final RegionServerAccounting regionServerAccounting;

  private SlowLogTableOpsChore slowLogTableOpsChore = null;

  // Block cache
  private BlockCache blockCache;
  // The cache for mob files
  private MobFileCache mobFileCache;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * The server name the Master sees us as. It's made from the hostname the master passes us,
   * port, and server startcode. Gets set after registration against Master.
   */
  protected ServerName serverName;

  /**
   * hostname specified by hostname config
   */
  protected String useThisHostnameInstead;

  /**
   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
   */
  @Deprecated
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.regionserver.hostname.disable.master.reversedns";

  /**
   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
   * Exception will be thrown if both are used.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";

  /**
   * This server's startcode.
   */
  protected final long startcode;

  /**
   * Unique identifier for the cluster we are a part of.
   */
  protected String clusterId;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private volatile RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
   * case where the client doesn't receive the response from a successful operation and retries.
   * We track the successful ops for some time via a nonce sent by the client and handle duplicate
   * operations (currently, by failing them; in future we might use MVCC to return the result).
   * Nonces are also recovered from WAL during recovery; however, the caveats (from HBASE-3787)
   * are: - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout
   * worth of past records. If we don't read the records, we don't read and recover the nonces.
   * Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup. -
   * There's no WAL recovery during normal region move, so nonces will not be transferred. We can
   * have a separate additional "Nonce WAL". It will just contain a bunch of numbers and won't be
   * flushed on the main path - because the WAL itself also contains nonces, if we only flush it
   * before memstore flush, for a given nonce we will either see it in the WAL (if it was never
   * flushed to disk, it will be part of recovery), or we'll see it as part of the nonce log (or
   * both occasionally, which doesn't matter). The nonce log file can be deleted after the latest
   * nonce in it expired. It can also be recovered during move.
   */
  final ServerNonceManager nonceManager;

  private UserProvider userProvider;

  protected final RSRpcServices rpcServices;

  private CoordinatedStateManager csm;

  /**
   * Configuration manager is used to register/deregister and notify the configuration observers
   * when the regionserver is notified that there was a change in the on-disk configs.
   */
  protected final ConfigurationManager configurationManager;

  private BrokenStoreFileCleaner brokenStoreFileCleaner;

  private RSMobFileCleanerChore rsMobFileCleanerChore;

  @InterfaceAudience.Private
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  private SecureBulkLoadManager secureBulkLoadManager;

  private FileSystemUtilizationChore fsUtilizationChore;

  private final NettyEventLoopGroupConfig eventLoopGroupConfig;

  /**
   * Provide online slow log responses from ringbuffer
   */
  private NamedQueueRecorder namedQueueRecorder = null;

  /**
   * True if this RegionServer is coming up in a cluster where there is no Master; means it needs
   * to just come up and make do without a Master to talk to: e.g. in test or HRegionServer is
   * doing other than its usual duties: e.g. as a hollowed-out host whose only purpose is as a
   * Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
   * {@link #TEST_SKIP_REPORTING_TRANSITION} ?
   */
  private final boolean masterless;
  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /** regionserver codec list **/
  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";

  // A timer to shutdown the process if abort takes too long
  private Timer abortMonitor;

  /**
   * Starts a HRegionServer at the default location.
   * <p/>
   * Don't start any services or managers in here in the Constructor. Defer till after we register
   * with the Master as much as possible. See {@link #startServices}.
   */
  public HRegionServer(final Configuration conf) throws IOException {
    super("RegionServer"); // thread name
    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
    try (Scope ignored = span.makeCurrent()) {
      this.startcode = EnvironmentEdgeManager.currentTime();
      this.conf = conf;
      this.dataFsOk = true;
      this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
      this.eventLoopGroupConfig = setupNetty(this.conf);
      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      this.userProvider = UserProvider.instantiate(conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
      this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

      this.sleeper = new Sleeper(this.msgInterval, this);

      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);

      this.abortRequested = new AtomicBoolean(false);
      this.stopped = false;

      initNamedQueueRecorder(conf);
      rpcServices = createRpcServices();
      useThisHostnameInstead = getUseThisHostnameInstead(conf);
      String hostName = StringUtils.isBlank(useThisHostnameInstead)
        ? this.rpcServices.isa.getHostName()
        : this.useThisHostnameInstead;
      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);

      rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
      rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

      // login the zookeeper client principal (if using security)
      ZKAuthentication.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
        HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
      // login the server principal (if using secure Hadoop)
      login(userProvider, hostName);
      // init superusers and add the server principal (if using security)
      // or process owner as default super user.
      Superusers.initialize(conf);
      regionServerAccounting = new RegionServerAccounting(conf);

      boolean isMasterNotCarryTable =
        this instanceof HMaster && !LoadBalancer.isTablesOnMaster(conf);

      // no need to instantiate block cache and mob file cache when master not carry table
      if (!isMasterNotCarryTable) {
        blockCache = BlockCacheFactory.createBlockCache(conf);
        mobFileCache = new MobFileCache(conf);
      }

      uncaughtExceptionHandler =
        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);

      initializeFileSystem();

      this.configurationManager = new ConfigurationManager();
      setupWindows(getConfiguration(), getConfigurationManager());

      // Some unit tests don't need a cluster, so no zookeeper at all
      // Open connection to zookeeper and set primary watcher
      zooKeeper = new ZKWatcher(conf, getProcessName() + ":" + rpcServices.isa.getPort(), this,
        canCreateBaseZNode());
      // If no master in cluster, skip trying to track one or look for a cluster status.
      if (!this.masterless) {
        if (
          conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
        ) {
          this.csm = new ZkCoordinatedStateManager(this);
        }

        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        masterAddressTracker.start();

        clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
        clusterStatusTracker.start();
      } else {
        masterAddressTracker = null;
        clusterStatusTracker = null;
      }
      this.rpcServices.start(zooKeeper);
      this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
      if (!(this instanceof HMaster)) {
        // do not create this field for HMaster, we have another region server tracker for HMaster.
        this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this);
      } else {
        this.regionServerAddressTracker = null;
      }
      // This violates 'no starting stuff in Constructor' but Master depends on the below chore
      // and executor being created and takes a different startup route. Lots of overlap between
      // HRS and M (An M IS A HRS now). Need to refactor so less duplication between M and its
      // super class HRS. Master expects Constructor to put up web servers. Ugh. TODO.
      this.choreService = new ChoreService(getName(), true);
      this.executorService = new ExecutorService(getName());
      putUpWebUI();
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      TraceUtil.setError(span, t);
      LOG.error("Failed construction RegionServer", t);
      throw t;
    } finally {
      span.end();
    }
  }

  private void initNamedQueueRecorder(Configuration conf) {
    if (!(this instanceof HMaster)) {
      final boolean isOnlineLogProviderEnabled = conf.getBoolean(
        HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
      if (isOnlineLogProviderEnabled) {
        this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
      }
    } else {
      final boolean isBalancerDecisionRecording =
        conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
          BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
      final boolean isBalancerRejectionRecording =
        conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
          BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
      if (isBalancerDecisionRecording || isBalancerRejectionRecording) {
        this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
      }
    }
  }

  // HMaster should override this method to load the specific config for master
  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
      if (!StringUtils.isBlank(hostname)) {
        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
          + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
          + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
          + UNSAFE_RS_HOSTNAME_KEY + " is used";
        throw new IOException(msg);
      } else {
        return rpcServices.isa.getHostName();
      }
    } else {
      return hostname;
    }
  }

  /**
   * On non-Windows platforms, register a HUP signal handler so that a SIGHUP reloads the
   * configuration in-process and notifies all registered {@link ConfigurationObserver}s. Windows
   * has no HUP signal, hence the OS check.
   */
  private static void setupWindows(final Configuration conf, ConfigurationManager cm) {
    if (!SystemUtils.IS_OS_WINDOWS) {
      HBasePlatformDependent.handle("HUP", (number, name) -> {
        conf.reloadConfiguration();
        cm.notifyAllObservers(conf);
      });
    }
  }
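
  /*
   * Illustration (not part of the original source): how the HUP-driven reload above reaches
   * interested components. A minimal sketch, assuming the component registers itself with this
   * server's ConfigurationManager; the observer class and its field are hypothetical.
   *
   * public class ExampleObserver implements ConfigurationObserver {
   *   private volatile int flushCheckPeriod;
   *
   *   @Override
   *   public void onConfigurationChange(Configuration newConf) {
   *     // Invoked from ConfigurationManager.notifyAllObservers(conf) after the HUP handler
   *     // calls conf.reloadConfiguration().
   *     this.flushCheckPeriod = newConf.getInt("hbase.regionserver.flush.check.period", 10 * 1000);
   *   }
   * }
   *
   * // Registration, e.g. during component startup:
   * //   regionServer.getConfigurationManager().registerObserver(new ExampleObserver());
   */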

  private static NettyEventLoopGroupConfig setupNetty(Configuration conf) {
    // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL.
    NettyEventLoopGroupConfig nelgc = new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
    NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
    return nelgc;
  }

  private void initializeFileSystem() throws IOException {
    // Get fs instance used by this RS. Is HBase checksum verification enabled? If so,
    // automatically switch off hdfs checksum verification.
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    String walDirUri = CommonFSUtils.getDirUri(this.conf,
      new Path(conf.get(CommonFSUtils.HBASE_WAL_DIR, conf.get(HConstants.HBASE_DIR))));
    // set WAL's uri
    if (walDirUri != null) {
      CommonFSUtils.setFsDefault(this.conf, walDirUri);
    }
    // init the WALFs
    this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.walRootDir = CommonFSUtils.getWALRootDir(this.conf);
    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
    // underlying hadoop hdfs accessors will be going against wrong filesystem
    // (unless all is set to defaults).
    String rootDirUri =
      CommonFSUtils.getDirUri(this.conf, new Path(conf.get(HConstants.HBASE_DIR)));
    if (rootDirUri != null) {
      CommonFSUtils.setFsDefault(this.conf, rootDirUri);
    }
    // init the filesystem
    this.dataFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.dataRootDir = CommonFSUtils.getRootDir(this.conf);
    this.tableDescriptors = new FSTableDescriptors(this.dataFs, this.dataRootDir,
      !canUpdateTableDescriptor(), cacheTableDescriptor());
  }

  protected void login(UserProvider user, String host) throws IOException {
    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
  }

  /**
   * Wait for an active Master. See the override in the HMaster subclass for how it is used.
   */
  protected void waitForMasterActive() {
  }

  protected String getProcessName() {
    return REGIONSERVER;
  }

  protected boolean canCreateBaseZNode() {
    return this.masterless;
  }

  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  protected boolean cacheTableDescriptor() {
    return false;
  }

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  protected void configureInfoServer() {
    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }
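
  /*
   * Illustration (not part of the original source): initializeFileSystem() above lets the WAL
   * live on a different filesystem than the data root. A minimal hbase-site.xml sketch, with
   * hypothetical cluster URIs:
   *
   * <property><name>hbase.rootdir</name><value>hdfs://namenode:8020/hbase</value></property>
   * <property><name>hbase.wal.dir</name><value>hdfs://walcluster:8020/hbase-wal</value></property>
   *
   * When hbase.wal.dir is unset it defaults to hbase.rootdir, and fs.defaultFS ends up pointing
   * at the root directory's filesystem, as the code above arranges.
   */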

  /**
   * Used by {@link RSDumpServlet} to generate debugging information.
   */
  public void dumpRowLocks(final PrintWriter out) {
    StringBuilder sb = new StringBuilder();
    for (HRegion region : getRegions()) {
      if (region.getLockedRows().size() > 0) {
        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
          sb.setLength(0);
          sb.append(region.getTableDescriptor().getTableName()).append(",")
            .append(region.getRegionInfo().getEncodedName()).append(",");
          sb.append(rowLockContext.toString());
          out.println(sb);
        }
      }
    }
  }

  @Override
  public boolean registerService(com.google.protobuf.Service instance) {
    /*
     * No stacking of instances is allowed for a single service name
     */
    com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor service " + serviceName
        + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Registered regionserver coprocessor service: service=" + serviceName);
    }
    return true;
  }

  protected ClusterConnection createClusterConnection() throws IOException {
    // Create a cluster connection that, when appropriate, can short-circuit and go directly to
    // the local server if the request is to the local server, bypassing RPC. Can be used for
    // both local and remote invocations.
    return ServerConnectionUtils.createShortCircuitConnection(conf, userProvider.getCurrent(),
      serverName, rpcServices, rpcServices, new RegionServerRegistry(this));
  }

  /**
   * Run test on configured codecs to make sure supporting libs are in place.
   * @param c configuration object
   * @throws IOException if compression test fails for any regionserver codec
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
    if (codecs == null) {
      return;
    }
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException(
          "Compression codec " + codec + " not supported, aborting RS construction");
      }
    }
  }
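
  /*
   * Illustration (not part of the original source): checkCodecs() only runs when
   * hbase.regionserver.codecs is set. A minimal sketch of fencing a regionserver on codecs whose
   * native libs must be present, using a hypothetical value:
   *
   * <property><name>hbase.regionserver.codecs</name><value>snappy,lz4</value></property>
   *
   * With that setting, a server missing working snappy/lz4 support fails fast in the constructor
   * (via CompressionTest.testCompression) instead of failing later, when a store file written
   * with that compression is first opened.
   */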

  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * Setup our cluster connection if not already initialized.
   */
  protected synchronized void setupClusterConnection() throws IOException {
    if (clusterConnection == null) {
      clusterConnection = createClusterConnection();
    }
  }

  /**
   * All initialization needed before we go register with Master.<br>
   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
   */
  private void preRegistrationInitialization() {
    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
    try (Scope ignored = span.makeCurrent()) {
      initializeZooKeeper();
      setupClusterConnection();
      // Setup RPC client for master communication
      this.rpcClient = RpcClientFactory.createClient(conf, clusterId,
        new InetSocketAddress(this.rpcServices.isa.getAddress(), 0),
        clusterConnection.getConnectionMetrics());
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Call stop on error or the process will stick around forever, since the server
      // puts up non-daemon threads.
      TraceUtil.setError(span, t);
      this.rpcServices.stop();
      abort("Initialization of RS failed. Hence aborting RS.", t);
    } finally {
      span.end();
    }
  }

  /**
   * Bring up the connection to the zk ensemble, then wait until a master is available for this
   * cluster, and after that wait until the cluster 'up' flag has been set. This is the order in
   * which the master does things.
   * <p>
   * Finally open the long-living server short-circuit connection.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
      justification = "cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Nothing to do in here if no Master in the mix.
    if (this.masterless) {
      return;
    }

    // Create the master address tracker, register with zk, and start it. Then
    // block until a master is available. No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up. Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // If we are HMaster then the cluster id should have already been set.
    if (clusterId == null) {
      // Retrieve clusterId
      // Since cluster status is now up
      // ID should have already been set by HMaster
      try {
        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
        if (clusterId == null) {
          this.abort("Cluster ID has not been set");
        }
        LOG.info("ClusterId : " + clusterId);
      } catch (KeeperException e) {
        this.abort("Failed to retrieve Cluster ID", e);
      }
    }

    waitForMasterActive();
    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }

  /**
   * Utility method to wait indefinitely on a znode availability while checking if the region
   * server is shut down
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException if the waiting thread is interrupted
   */
  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
    throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /** Returns True if the cluster is up. */
  @Override
  public boolean isClusterUp() {
    return this.masterless
      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
  }

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    if (isStopped()) {
      LOG.info("Skipping run; stopped");
      return;
    }
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        ShutdownHook.install(conf, dataFs, this, Thread.currentThread());
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

        // Try and register with the Master; tell it we are here. Break if server is stopped or
        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead
        // and start up all Services. Use RetryCounter to get backoff in case Master is struggling
        // to come up.
        LOG.debug("About to register with Master.");
        TraceUtil.trace(() -> {
          RetryCounterFactory rcf =
            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
          RetryCounter rc = rcf.create();
          while (keepLooping()) {
            RegionServerStartupResponse w = reportForDuty();
            if (w == null) {
              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
              this.sleeper.sleep(sleepTime);
            } else {
              handleReportForDutyResponse(w);
              break;
            }
          }
        }, "HRegionServer.registerWithMaster");
      }

      if (!isStopped() && isHealthy()) {
        TraceUtil.trace(() -> {
          // start the snapshot handler and other procedure handlers,
          // since the server is ready to run
          if (this.rspmHost != null) {
            this.rspmHost.start();
          }
          // Start the Quota Manager
          if (this.rsQuotaManager != null) {
            rsQuotaManager.start(getRpcServer().getScheduler());
          }
          if (this.rsSpaceQuotaManager != null) {
            this.rsSpaceQuotaManager.start();
          }
        }, "HRegionServer.startup");
      }

      // We registered with the Master. Go into run mode.
      long lastMsg = EnvironmentEdgeManager.currentTime();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (onlineRegions.isEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(this.abortRequested.get());
          } else {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
              // since last time we went around the loop. Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested.get());
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = EnvironmentEdgeManager.currentTime();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = EnvironmentEdgeManager.currentTime();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // end of the main run loop
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }

    final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
    try (Scope ignored = span.makeCurrent()) {
      if (this.leaseManager != null) {
        this.leaseManager.closeAfterLeasesExpire();
      }
      if (this.splitLogWorker != null) {
        splitLogWorker.stop();
      }
      if (this.infoServer != null) {
        LOG.info("Stopping infoServer");
        try {
          this.infoServer.stop();
        } catch (Exception e) {
          LOG.error("Failed to stop infoServer", e);
        }
      }
      // Send cache a shutdown.
      if (blockCache != null) {
        blockCache.shutdown();
      }
      if (mobFileCache != null) {
        mobFileCache.shutdown();
      }

      // Send interrupts to wake up threads if sleeping so they notice shutdown.
      // TODO: Should we check they are alive? If OOME could have exited already
      if (this.hMemManager != null) {
        this.hMemManager.stop();
      }
      if (this.cacheFlusher != null) {
        this.cacheFlusher.interruptIfNecessary();
      }
      if (this.compactSplitThread != null) {
        this.compactSplitThread.interruptIfNecessary();
      }

      // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
      if (rspmHost != null) {
        rspmHost.stop(this.abortRequested.get() || this.killed);
      }

      if (this.killed) {
        // Just skip out w/o closing regions. Used when testing.
      } else if (abortRequested.get()) {
        if (this.dataFsOk) {
          closeUserRegions(abortRequested.get()); // Don't leave any open file handles
        }
        LOG.info("aborting server " + this.serverName);
      } else {
        closeUserRegions(abortRequested.get());
        LOG.info("stopping server " + this.serverName);
      }

      if (this.clusterConnection != null && !clusterConnection.isClosed()) {
        try {
          this.clusterConnection.close();
        } catch (IOException e) {
          // Although the {@link Closeable} interface throws an {@link
          // IOException}, in reality, the implementation would never do that.
          LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
        }
      }

      // Closing the compactSplit thread before closing meta regions
      if (!this.killed && containsMetaTableRegions()) {
        if (!abortRequested.get() || this.dataFsOk) {
          if (this.compactSplitThread != null) {
            this.compactSplitThread.join();
            this.compactSplitThread = null;
          }
          closeMetaTableRegions(abortRequested.get());
        }
      }

      if (!this.killed && this.dataFsOk) {
        waitOnAllRegionsToClose(abortRequested.get());
        LOG.info("stopping server " + this.serverName + "; all regions closed.");
      }

      // Stop the quota manager
      if (rsQuotaManager != null) {
        rsQuotaManager.stop();
      }
      if (rsSpaceQuotaManager != null) {
        rsSpaceQuotaManager.stop();
        rsSpaceQuotaManager = null;
      }

      // flag may be changed when closing regions throws exception.
      if (this.dataFsOk) {
        shutdownWAL(!abortRequested.get());
      }

      // Make sure the proxy is down.
      if (this.rssStub != null) {
        this.rssStub = null;
      }
      if (this.lockStub != null) {
        this.lockStub = null;
      }
      if (this.rpcClient != null) {
        this.rpcClient.close();
      }
      if (this.leaseManager != null) {
        this.leaseManager.close();
      }
      if (this.pauseMonitor != null) {
        this.pauseMonitor.stop();
      }

      if (!killed) {
        stopServiceThreads();
      }

      if (this.rpcServices != null) {
        this.rpcServices.stop();
      }

      try {
        deleteMyEphemeralNode();
      } catch (KeeperException.NoNodeException nn) {
        // pass
      } catch (KeeperException e) {
        LOG.warn("Failed deleting my ephemeral node", e);
      }
      // We may have failed to delete the znode at the previous step, but
      // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
      ZNodeClearer.deleteMyEphemeralNodeOnDisk();

      if (this.zooKeeper != null) {
        this.zooKeeper.close();
      }
      this.shutDown = true;
      LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
      span.setStatus(StatusCode.OK);
    } finally {
      span.end();
    }
  }

  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  private boolean areAllUserRegionsOffline() {
    if (getNumberOfOnlineRegions() > 2) {
      return false;
    }
    boolean allUserRegionsOffline = true;
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      if (!e.getValue().getRegionInfo().isMetaRegion()) {
        allUserRegionsOffline = false;
        break;
      }
    }
    return allUserRegionsOffline;
  }

  /** Returns Current write count for all online regions. */
  private long getWriteRequestCount() {
    long writeCount = 0;
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      writeCount += e.getValue().getWriteRequestsCount();
    }
    return writeCount;
  }

  @InterfaceAudience.Private
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
    throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport");
    try (Scope ignored = span.makeCurrent()) {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      request.setServer(ProtobufUtil.toServerName(this.serverName));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
      span.setStatus(StatusCode.OK);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        TraceUtil.setError(span, ioe);
        throw ioe;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      TraceUtil.setError(span, se);
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub(true);
    } finally {
      span.end();
    }
  }

  /**
   * Reports the given map of Regions and their size on the filesystem to the active Master.
   * @param regionSizeStore The store containing region sizes
   * @return false if FileSystemUtilizationChore should pause reporting to the master, true
   *         otherwise
   */
  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      LOG.trace("Skipping Region size report to HMaster as stub is null");
      return true;
    }
    try {
      buildReportAndSend(rss, regionSizeStore);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        LOG.trace("Failed to report region sizes to Master because it is initializing."
          + " This will be retried.", ioe);
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return true;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      createRegionServerStatusStub(true);
      if (ioe instanceof DoNotRetryIOException) {
        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
        if (doNotRetryEx.getCause() != null) {
          Throwable t = doNotRetryEx.getCause();
          if (t instanceof UnsupportedOperationException) {
            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
            return false;
          }
        }
      }
      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
    }
    return true;
  }
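
  /*
   * Illustration (not part of the original source): how the boolean above is consumed. A minimal
   * sketch of a caller in the spirit of FileSystemUtilizationChore; the local variable and the
   * accessor chain are assumptions, not code from this file.
   *
   * RegionSizeStore store = regionServer.getRegionServerSpaceQuotaManager().getRegionSizeStore();
   * if (!regionServer.reportRegionSizesForQuotas(store)) {
   *   // Master lacks ReportRegionSpaceUse support: pause reporting for a while instead of
   *   // hammering it with requests that will keep failing.
   * }
   */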
1390 * @param rss The stub to send to the Master 1391 * @param regionSizeStore The store containing region sizes 1392 */ 1393 private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss, 1394 RegionSizeStore regionSizeStore) throws ServiceException { 1395 RegionSpaceUseReportRequest request = 1396 buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore)); 1397 rss.reportRegionSpaceUse(null, request); 1398 // Record the number of size reports sent 1399 if (metricsRegionServer != null) { 1400 metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size()); 1401 } 1402 } 1403 1404 /** 1405 * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map. 1406 * @param regionSizes The size in bytes of regions 1407 * @return The corresponding protocol buffer message. 1408 */ 1409 RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) { 1410 RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder(); 1411 for (Entry<RegionInfo, RegionSize> entry : regionSizes) { 1412 request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize())); 1413 } 1414 return request.build(); 1415 } 1416 1417 /** 1418 * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf 1419 * message. 1420 * @param regionInfo The RegionInfo 1421 * @param sizeInBytes The size in bytes of the Region 1422 * @return The protocol buffer 1423 */ 1424 RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) { 1425 return RegionSpaceUse.newBuilder() 1426 .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo))) 1427 .setRegionSize(Objects.requireNonNull(sizeInBytes)).build(); 1428 } 1429 1430 private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) 1431 throws IOException { 1432 // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests 1433 // per second, and other metrics. As long as metrics are part of ServerLoad it's best to use 1434 // the wrapper to compute those numbers in one place. 1435 // In the long term most of these should be moved off of ServerLoad and the heart beat. 1436 // Instead they should be stored in an HBase table so that external visibility into HBase is 1437 // improved. Additionally, the load balancer will be able to take advantage of a more complete 1438 // history.
1439 MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper(); 1440 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 1441 long usedMemory = -1L; 1442 long maxMemory = -1L; 1443 final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage(); 1444 if (usage != null) { 1445 usedMemory = usage.getUsed(); 1446 maxMemory = usage.getMax(); 1447 } 1448 1449 ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder(); 1450 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond()); 1451 serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount()); 1452 serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024)); 1453 serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024)); 1454 Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors(); 1455 Builder coprocessorBuilder = Coprocessor.newBuilder(); 1456 for (String coprocessor : coprocessors) { 1457 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1458 } 1459 RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder(); 1460 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); 1461 for (HRegion region : regions) { 1462 if (region.getCoprocessorHost() != null) { 1463 Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors(); 1464 for (String regionCoprocessor : regionCoprocessors) { 1465 serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build()); 1466 } 1467 } 1468 serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier)); 1469 for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost() 1470 .getCoprocessors()) { 1471 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1472 } 1473 } 1474 serverLoad.setReportStartTime(reportStartTime); 1475 serverLoad.setReportEndTime(reportEndTime); 1476 if (this.infoServer != null) { 1477 serverLoad.setInfoServerPort(this.infoServer.getPort()); 1478 } else { 1479 serverLoad.setInfoServerPort(-1); 1480 } 1481 MetricsUserAggregateSource userSource = 1482 metricsRegionServer.getMetricsUserAggregate().getSource(); 1483 if (userSource != null) { 1484 Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources(); 1485 for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) { 1486 serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue())); 1487 } 1488 } 1489 1490 // For the replicationLoad purpose we only need to ask one service; 1491 // either source or sink will return the same info 1492 ReplicationSourceService rsources = getReplicationSourceService(); 1493 if (rsources != null) { 1494 // always refresh first to get the latest value 1495 ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad(); 1496 if (rLoad != null) { 1497 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); 1498 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad 1499 .getReplicationLoadSourceEntries()) { 1500 serverLoad.addReplLoadSource(rLS); 1501 } 1502 1503 } 1504 } 1505 1506 TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask 1507 .newBuilder().setDescription(task.getDescription()) 1508 .setStatus(task.getStatus() != null ?
task.getStatus() : "") 1509 .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name())) 1510 .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build())); 1511 1512 return serverLoad.build(); 1513 } 1514 1515 private String getOnlineRegionsAsPrintableString() { 1516 StringBuilder sb = new StringBuilder(); 1517 for (Region r : this.onlineRegions.values()) { 1518 if (sb.length() > 0) { 1519 sb.append(", "); 1520 } 1521 sb.append(r.getRegionInfo().getEncodedName()); 1522 } 1523 return sb.toString(); 1524 } 1525 1526 /** 1527 * Wait on all regions to close. 1528 */ 1529 private void waitOnAllRegionsToClose(final boolean abort) { 1530 // Wait till all regions are closed before going out. 1531 int lastCount = -1; 1532 long previousLogTime = 0; 1533 Set<String> closedRegions = new HashSet<>(); 1534 boolean interrupted = false; 1535 try { 1536 while (!onlineRegions.isEmpty()) { 1537 int count = getNumberOfOnlineRegions(); 1538 // Only print a message if the count of regions has changed. 1539 if (count != lastCount) { 1540 // Log every second at most 1541 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 1542 previousLogTime = EnvironmentEdgeManager.currentTime(); 1543 lastCount = count; 1544 LOG.info("Waiting on " + count + " regions to close"); 1545 // Only print out the regions still closing if there is a small number, else we 1546 // will swamp the log. 1547 if (count < 10 && LOG.isDebugEnabled()) { 1548 LOG.debug("Online Regions=" + this.onlineRegions); 1549 } 1550 } 1551 } 1552 // Ensure all user regions have been sent a close. Use this to 1553 // protect against the case where an open comes in after we start the 1554 // iterator of onlineRegions to close all user regions. 1555 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1556 RegionInfo hri = e.getValue().getRegionInfo(); 1557 if ( 1558 !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) 1559 && !closedRegions.contains(hri.getEncodedName()) 1560 ) { 1561 closedRegions.add(hri.getEncodedName()); 1562 // Close the region, passing along whether we are aborting. 1563 closeRegionIgnoreErrors(hri, abort); 1564 } 1565 } 1566 // No regions in RIT, we could stop waiting now. 1567 if (this.regionsInTransitionInRS.isEmpty()) { 1568 if (!onlineRegions.isEmpty()) { 1569 LOG.info("We were exiting though online regions are not empty," 1570 + " because some regions failed closing"); 1571 } 1572 break; 1573 } else { 1574 LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream() 1575 .map(e -> Bytes.toString(e)).collect(Collectors.joining(", "))); 1576 } 1577 if (sleepInterrupted(200)) { 1578 interrupted = true; 1579 } 1580 } 1581 } finally { 1582 if (interrupted) { 1583 Thread.currentThread().interrupt(); 1584 } 1585 } 1586 } 1587 1588 private static boolean sleepInterrupted(long millis) { 1589 boolean interrupted = false; 1590 try { 1591 Thread.sleep(millis); 1592 } catch (InterruptedException e) { 1593 LOG.warn("Interrupted while sleeping"); 1594 interrupted = true; 1595 } 1596 return interrupted; 1597 } 1598 1599 private void shutdownWAL(final boolean close) { 1600 if (this.walFactory != null) { 1601 try { 1602 if (close) { 1603 walFactory.close(); 1604 } else { 1605 walFactory.shutdown(); 1606 } 1607 } catch (Throwable e) { 1608 e = e instanceof RemoteException ?
((RemoteException) e).unwrapRemoteException() : e; 1609 LOG.error("Shutdown / close of WAL failed: " + e); 1610 LOG.debug("Shutdown / close exception details:", e); 1611 } 1612 } 1613 } 1614 1615 /** 1616 * Returns the NamedQueueRecorder used to add different logs to the ring buffer. 1617 */ 1618 public NamedQueueRecorder getNamedQueueRecorder() { 1619 return this.namedQueueRecorder; 1620 } 1621 1622 /* 1623 * Run init. Sets up wal and starts up all server threads. 1624 * @param c Extra configuration. 1625 */ 1626 protected void handleReportForDutyResponse(final RegionServerStartupResponse c) 1627 throws IOException { 1628 try { 1629 boolean updateRootDir = false; 1630 for (NameStringPair e : c.getMapEntriesList()) { 1631 String key = e.getName(); 1632 // The hostname the master sees us as. 1633 if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) { 1634 String hostnameFromMasterPOV = e.getValue(); 1635 this.serverName = 1636 ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(), this.startcode); 1637 if ( 1638 !StringUtils.isBlank(useThisHostnameInstead) 1639 && !hostnameFromMasterPOV.equals(useThisHostnameInstead) 1640 ) { 1641 String msg = "Master passed us a different hostname to use; was=" 1642 + this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV; 1643 LOG.error(msg); 1644 throw new IOException(msg); 1645 } 1646 if ( 1647 StringUtils.isBlank(useThisHostnameInstead) 1648 && !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName()) 1649 ) { 1650 String msg = "Master passed us a different hostname to use; was=" 1651 + rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV; 1652 LOG.error(msg); 1653 } 1654 continue; 1655 } 1656 1657 String value = e.getValue(); 1658 if (key.equals(HConstants.HBASE_DIR)) { 1659 if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) { 1660 updateRootDir = true; 1661 } 1662 } 1663 1664 if (LOG.isDebugEnabled()) { 1665 LOG.debug("Config from master: " + key + "=" + value); 1666 } 1667 this.conf.set(key, value); 1668 } 1669 // Set our ephemeral znode up in zookeeper now we have a name. 1670 createMyEphemeralNode(); 1671 1672 if (updateRootDir) { 1673 // initialize file system by the config fs.defaultFS and hbase.rootdir from master 1674 initializeFileSystem(); 1675 } 1676 1677 // hack! Maps DFSClient => RegionServer for logs. HDFS made this 1678 // config param for task trackers, but we can piggyback off of it. 1679 if (this.conf.get("mapreduce.task.attempt.id") == null) { 1680 this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString()); 1681 } 1682 1683 // Save it in a file; this will allow us to see, on restart, that we crashed. 1684 ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath()); 1685 1686 // This call sets up an initialized replication and WAL. Later we start it up. 1687 setupWALAndReplication(); 1688 // Init in here rather than in constructor after thread name has been set 1689 final MetricsTable metricsTable = 1690 new MetricsTable(new MetricsTableWrapperAggregateImpl(this)); 1691 this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this); 1692 this.metricsRegionServer = 1693 new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable); 1694 // Now that we have a metrics source, start the pause monitor 1695 this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource()); 1696 pauseMonitor.start(); 1697 1698 // There is a rare case where we do NOT want services to start. Check config.
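// For example, the rare no-services mode can be selected from hbase-site.xml
// (assuming the standard property format; the key itself is confirmed by the
// check just below) like so:
//
//   <property>
//     <name>hbase.regionserver.workers</name>
//     <value>false</value>
//   </property>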
1699 if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) { 1700 startServices(); 1701 } 1702 // In here we start up the replication service. Above we initialized it. TODO: reconcile 1703 // or make sense of it. 1704 startReplicationService(); 1705 1706 // Log where we are serving from and our ZK session. 1707 LOG.info( 1708 "Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa + ", sessionid=0x" 1709 + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); 1710 1711 // Wake up anyone waiting for this server to come online 1712 synchronized (online) { 1713 online.set(true); 1714 online.notifyAll(); 1715 } 1716 } catch (Throwable e) { 1717 stop("Failed initialization"); 1718 throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed"); 1719 } finally { 1720 sleeper.skipSleepCycle(); 1721 } 1722 } 1723 1724 protected void initializeMemStoreChunkCreator() { 1725 if (MemStoreLAB.isEnabled(conf)) { 1726 // MSLAB is enabled. So initialize MemStoreChunkPool 1727 // By this time, the MemstoreFlusher is already initialized. We can get the global limits from 1728 // it. 1729 Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf); 1730 long globalMemStoreSize = pair.getFirst(); 1731 boolean offheap = this.regionServerAccounting.isOffheap(); 1732 // When an off-heap memstore is in use, take the full area for the chunk pool. 1733 float poolSizePercentage = offheap 1734 ? 1.0F 1735 : conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT); 1736 float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, 1737 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); 1738 int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); 1739 float indexChunkSizePercent = conf.getFloat(MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_KEY, 1740 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 1741 // init the chunkCreator 1742 ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, 1743 initialCountPercentage, this.hMemManager, indexChunkSizePercent); 1744 } 1745 } 1746 1747 private void startHeapMemoryManager() { 1748 if (this.blockCache != null) { 1749 this.hMemManager = 1750 new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting); 1751 this.hMemManager.start(getChoreService()); 1752 } 1753 } 1754 1755 private void createMyEphemeralNode() throws KeeperException { 1756 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder(); 1757 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1); 1758 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo()); 1759 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray()); 1760 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data); 1761 } 1762 1763 private void deleteMyEphemeralNode() throws KeeperException { 1764 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath()); 1765 } 1766 1767 @Override 1768 public RegionServerAccounting getRegionServerAccounting() { 1769 return regionServerAccounting; 1770 } 1771 1772 // Round the size to KB or MB. 1773 // A trick here is that if sizeInBytes is less than sizeUnit, we round the size up to 1 1774 // instead of 0 if it is not 0, so that schedulers do not think the region has no data. See 1775 // HBASE-26340 for more details on why this is important.
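// A worked example of the rounding trick described above, with sizeUnit = 1024 * 1024 (MB):
//   roundSize(0, unit)               -> 0  (an empty region stays 0)
//   roundSize(512 * 1024, unit)      -> 1  (a non-empty region is never reported as 0)
//   roundSize(3 * 1024 * 1024, unit) -> 3  (plain integer division)
// Results are capped at Integer.MAX_VALUE.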
1776 private static int roundSize(long sizeInByte, int sizeUnit) { 1777 if (sizeInByte == 0) { 1778 return 0; 1779 } else if (sizeInByte < sizeUnit) { 1780 return 1; 1781 } else { 1782 return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE); 1783 } 1784 } 1785 1786 /** 1787 * @param r Region to get RegionLoad for. 1788 * @param regionLoadBldr the RegionLoad.Builder, can be null 1789 * @param regionSpecifier the RegionSpecifier.Builder, can be null 1790 * @return RegionLoad instance. 1791 */ 1792 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, 1793 RegionSpecifier.Builder regionSpecifier) throws IOException { 1794 byte[] name = r.getRegionInfo().getRegionName(); 1795 int stores = 0; 1796 int storefiles = 0; 1797 int storeRefCount = 0; 1798 int maxCompactedStoreFileRefCount = 0; 1799 long storeUncompressedSize = 0L; 1800 long storefileSize = 0L; 1801 long storefileIndexSize = 0L; 1802 long rootLevelIndexSize = 0L; 1803 long totalStaticIndexSize = 0L; 1804 long totalStaticBloomSize = 0L; 1805 long totalCompactingKVs = 0L; 1806 long currentCompactedKVs = 0L; 1807 List<HStore> storeList = r.getStores(); 1808 stores += storeList.size(); 1809 for (HStore store : storeList) { 1810 storefiles += store.getStorefilesCount(); 1811 int currentStoreRefCount = store.getStoreRefCount(); 1812 storeRefCount += currentStoreRefCount; 1813 int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount(); 1814 maxCompactedStoreFileRefCount = 1815 Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount); 1816 storeUncompressedSize += store.getStoreSizeUncompressed(); 1817 storefileSize += store.getStorefilesSize(); 1818 // TODO: is storefileIndexSizeKB the same as rootLevelIndexSizeKB? 1819 storefileIndexSize += store.getStorefilesRootLevelIndexSize(); 1820 CompactionProgress progress = store.getCompactionProgress(); 1821 if (progress != null) { 1822 totalCompactingKVs += progress.getTotalCompactingKVs(); 1823 currentCompactedKVs += progress.currentCompactedKVs; 1824 } 1825 rootLevelIndexSize += store.getStorefilesRootLevelIndexSize(); 1826 totalStaticIndexSize += store.getTotalStaticIndexSize(); 1827 totalStaticBloomSize += store.getTotalStaticBloomSize(); 1828 } 1829 1830 int unitMB = 1024 * 1024; 1831 int unitKB = 1024; 1832 1833 int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB); 1834 int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB); 1835 int storefileSizeMB = roundSize(storefileSize, unitMB); 1836 int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB); 1837 int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB); 1838 int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB); 1839 int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB); 1840 1841 HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution(); 1842 float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname()); 1843 float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname()); 1844 long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight(); 1845 long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname()); 1846 long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname()); 1847 if (regionLoadBldr == null) { 1848 regionLoadBldr = RegionLoad.newBuilder(); 1849 } 1850 if (regionSpecifier == null) { 1851 regionSpecifier = RegionSpecifier.newBuilder(); 1852 } 1853 1854
regionSpecifier.setType(RegionSpecifierType.REGION_NAME); 1855 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name)); 1856 regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores) 1857 .setStorefiles(storefiles).setStoreRefCount(storeRefCount) 1858 .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount) 1859 .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB) 1860 .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB) 1861 .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB) 1862 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB) 1863 .setReadRequestsCount(r.getReadRequestsCount()) 1864 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount()) 1865 .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs) 1866 .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality) 1867 .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight) 1868 .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight) 1869 .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState())) 1870 .setLastMajorCompactionTs(r.getOldestHfileTs(true)); 1871 r.setCompleteSequenceId(regionLoadBldr); 1872 return regionLoadBldr.build(); 1873 } 1874 1875 private UserLoad createUserLoad(String user, MetricsUserSource userSource) { 1876 UserLoad.Builder userLoadBldr = UserLoad.newBuilder(); 1877 userLoadBldr.setUserName(user); 1878 userSource.getClientMetrics().values().stream() 1879 .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder() 1880 .setHostName(clientMetrics.getHostName()) 1881 .setWriteRequestsCount(clientMetrics.getWriteRequestsCount()) 1882 .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests()) 1883 .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build()) 1884 .forEach(userLoadBldr::addClientMetrics); 1885 return userLoadBldr.build(); 1886 } 1887 1888 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { 1889 HRegion r = onlineRegions.get(encodedRegionName); 1890 return r != null ? createRegionLoad(r, null, null) : null; 1891 } 1892 1893 /** 1894 * Inner class that runs periodically, checking whether regions need compaction. 1895 */ 1896 private static class CompactionChecker extends ScheduledChore { 1897 private final HRegionServer instance; 1898 private final int majorCompactPriority; 1899 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; 1900 // Iteration is 1-based rather than 0-based so we don't check for compaction 1901 // immediately upon region server startup 1902 private long iteration = 1; 1903 1904 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { 1905 super("CompactionChecker", stopper, sleepTime); 1906 this.instance = h; 1907 LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime)); 1908 1909 /* 1910 * MajorCompactPriority is configurable. If not set, the compaction will use the default priority.
1911 */ 1912 this.majorCompactPriority = this.instance.conf 1913 .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY); 1914 } 1915 1916 @Override 1917 protected void chore() { 1918 for (Region r : this.instance.onlineRegions.values()) { 1919 if (r == null) { 1920 continue; 1921 } 1922 HRegion hr = (HRegion) r; 1923 for (HStore s : hr.stores.values()) { 1924 try { 1925 long multiplier = s.getCompactionCheckMultiplier(); 1926 assert multiplier > 0; 1927 if (iteration % multiplier != 0) { 1928 continue; 1929 } 1930 if (s.needsCompaction()) { 1931 // Queue a compaction. Will recognize if major is needed. 1932 this.instance.compactSplitThread.requestSystemCompaction(hr, s, 1933 getName() + " requests compaction"); 1934 } else if (s.shouldPerformMajorCompaction()) { 1935 s.triggerMajorCompaction(); 1936 if ( 1937 majorCompactPriority == DEFAULT_PRIORITY 1938 || majorCompactPriority > hr.getCompactPriority() 1939 ) { 1940 this.instance.compactSplitThread.requestCompaction(hr, s, 1941 getName() + " requests major compaction; use default priority", Store.NO_PRIORITY, 1942 CompactionLifeCycleTracker.DUMMY, null); 1943 } else { 1944 this.instance.compactSplitThread.requestCompaction(hr, s, 1945 getName() + " requests major compaction; use configured priority", 1946 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); 1947 } 1948 } 1949 } catch (IOException e) { 1950 LOG.warn("Failed major compaction check on " + r, e); 1951 } 1952 } 1953 } 1954 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); 1955 } 1956 } 1957 1958 private static class PeriodicMemStoreFlusher extends ScheduledChore { 1959 private final HRegionServer server; 1960 private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds 1961 private final static int MIN_DELAY_TIME = 0; // millisec 1962 private final long rangeOfDelayMs; 1963 1964 PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { 1965 super("MemstoreFlusherChore", server, cacheFlushInterval); 1966 this.server = server; 1967 1968 final long configuredRangeOfDelay = server.getConfiguration() 1969 .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY); 1970 this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay); 1971 } 1972 1973 @Override 1974 protected void chore() { 1975 final StringBuilder whyFlush = new StringBuilder(); 1976 for (HRegion r : this.server.onlineRegions.values()) { 1977 if (r == null) { 1978 continue; 1979 } 1980 if (r.shouldFlush(whyFlush)) { 1981 FlushRequester requester = server.getFlushRequester(); 1982 if (requester != null) { 1983 long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME; 1984 // Throttle the flushes by putting a delay. If we don't throttle, and there 1985 // is a balanced write-load on the regions in a table, we might end up 1986 // overwhelming the filesystem with too many flushes at once. 1987 if (requester.requestDelayedFlush(r, delay)) { 1988 LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(), 1989 r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay); 1990 } 1991 } 1992 } 1993 } 1994 } 1995 } 1996 1997 /** 1998 * Report the status of the server. A server is online once all the startup is completed (setting 1999 * up filesystem, starting executorService threads, etc.). This method is designed mostly to be 2000 * useful in tests. 2001 * @return true if online, false if not. 
2002 */ 2003 public boolean isOnline() { 2004 return online.get(); 2005 } 2006 2007 /** 2008 * Set up the WAL and replication if enabled. Replication setup is done in here because it wants 2009 * to be hooked up to the WAL. 2010 */ 2011 private void setupWALAndReplication() throws IOException { 2012 WALFactory factory = new WALFactory(conf, serverName.toString(), (Server) this); 2013 // TODO: Replication makes assumptions here based on the default filesystem impl 2014 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 2015 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); 2016 2017 Path logDir = new Path(walRootDir, logName); 2018 LOG.debug("logDir={}", logDir); 2019 if (this.walFs.exists(logDir)) { 2020 throw new RegionServerRunningException( 2021 "Region server has already created directory at " + this.serverName.toString()); 2022 } 2023 // Always create the wal directory, as we now need it when the master restarts to find out 2024 // the live region servers. 2025 if (!this.walFs.mkdirs(logDir)) { 2026 throw new IOException("Can not create wal directory " + logDir); 2027 } 2028 // Instantiate replication if replication enabled. Pass it the log directories. 2029 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory); 2030 this.walFactory = factory; 2031 } 2032 2033 /** 2034 * Start up replication source and sink handlers. 2035 */ 2036 private void startReplicationService() throws IOException { 2037 if ( 2038 this.replicationSourceHandler == this.replicationSinkHandler 2039 && this.replicationSourceHandler != null 2040 ) { 2041 this.replicationSourceHandler.startReplicationService(); 2042 } else { 2043 if (this.replicationSourceHandler != null) { 2044 this.replicationSourceHandler.startReplicationService(); 2045 } 2046 if (this.replicationSinkHandler != null) { 2047 this.replicationSinkHandler.startReplicationService(); 2048 } 2049 } 2050 } 2051 2052 /** Returns Master address tracker instance. */ 2053 public MasterAddressTracker getMasterAddressTracker() { 2054 return this.masterAddressTracker; 2055 } 2056 2057 /** 2058 * Start maintenance threads, Server, Worker and lease checker threads; all the threads we need 2059 * to run. This is called after we've successfully registered with the Master. Installs an 2060 * UncaughtExceptionHandler that calls abort on the RegionServer if we get an unhandled 2061 * exception; we cannot set the handler on all threads, and the Server's internal Listener thread 2062 * is off limits. For Server, on an OOME it waits a while then retries; meantime, a flush or a 2063 * compaction that tries to run should trigger the same critical condition and the shutdown will 2064 * run. On its way out, this server will shut down Server. Leases are sort of in between: the 2065 * lease checker runs an internal thread and, while it inherits from Chore, it keeps its own stop 2066 * mechanism, so it needs to be stopped by this hosting server. Worker logs the exception and exits. 2067 */ 2068 private void startServices() throws IOException { 2069 if (!isStopped() && !isAborted()) { 2070 initializeThreads(); 2071 } 2072 this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection); 2073 this.secureBulkLoadManager.start(); 2074 2075 // Health checker thread.
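// isHealthCheckerConfigured() gates the chore below on a node health script being set.
// A minimal sketch of what enabling it might look like (the exact key name,
// "hbase.node.health.script.location", is an assumption here, not confirmed by this file):
//
//   conf.set("hbase.node.health.script.location", "/etc/hbase/healthcheck.sh");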
2076 if (isHealthCheckerConfigured()) { 2077 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, 2078 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); 2079 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); 2080 } 2081 2082 this.walRoller = new LogRoller(this); 2083 this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); 2084 this.procedureResultReporter = new RemoteProcedureResultReporter(this); 2085 2086 // Create the CompactedFileDischarger chore executorService. This chore helps to 2087 // remove the compacted files that will no longer be used in reads. 2088 // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to 2089 // 2 mins so that compacted files can be archived before the TTLCleaner runs 2090 int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); 2091 this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this); 2092 choreService.scheduleChore(compactedFileDischarger); 2093 2094 // Start executor services 2095 final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3); 2096 executorService.startExecutorService(executorService.new ExecutorConfig() 2097 .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads)); 2098 final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1); 2099 executorService.startExecutorService(executorService.new ExecutorConfig() 2100 .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads)); 2101 final int openPriorityRegionThreads = 2102 conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3); 2103 executorService.startExecutorService( 2104 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION) 2105 .setCorePoolSize(openPriorityRegionThreads)); 2106 final int closeRegionThreads = 2107 conf.getInt("hbase.regionserver.executor.closeregion.threads", 3); 2108 executorService.startExecutorService(executorService.new ExecutorConfig() 2109 .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads)); 2110 final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1); 2111 executorService.startExecutorService(executorService.new ExecutorConfig() 2112 .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads)); 2113 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { 2114 final int storeScannerParallelSeekThreads = 2115 conf.getInt("hbase.storescanner.parallel.seek.threads", 10); 2116 executorService.startExecutorService( 2117 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK) 2118 .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true)); 2119 } 2120 final int logReplayOpsThreads = 2121 conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER); 2122 executorService.startExecutorService( 2123 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS) 2124 .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true)); 2125 // Start the threads for compacted files discharger 2126 final int compactionDischargerThreads = 2127 conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10); 2128 executorService.startExecutorService(executorService.new ExecutorConfig() 2129 
.setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER) 2130 .setCorePoolSize(compactionDischargerThreads)); 2131 if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { 2132 final int regionReplicaFlushThreads = 2133 conf.getInt("hbase.regionserver.region.replica.flusher.threads", 2134 conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); 2135 executorService.startExecutorService(executorService.new ExecutorConfig() 2136 .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS) 2137 .setCorePoolSize(regionReplicaFlushThreads)); 2138 } 2139 final int refreshPeerThreads = 2140 conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2); 2141 executorService.startExecutorService(executorService.new ExecutorConfig() 2142 .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads)); 2143 2144 final int switchRpcThrottleThreads = 2145 conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1); 2146 executorService.startExecutorService( 2147 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE) 2148 .setCorePoolSize(switchRpcThrottleThreads)); 2149 final int claimReplicationQueueThreads = 2150 conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1); 2151 executorService.startExecutorService( 2152 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE) 2153 .setCorePoolSize(claimReplicationQueueThreads)); 2154 2155 Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", 2156 uncaughtExceptionHandler); 2157 if (this.cacheFlusher != null) { 2158 this.cacheFlusher.start(uncaughtExceptionHandler); 2159 } 2160 Threads.setDaemonThreadRunning(this.procedureResultReporter, 2161 getName() + ".procedureResultReporter", uncaughtExceptionHandler); 2162 2163 if (this.compactionChecker != null) { 2164 choreService.scheduleChore(compactionChecker); 2165 } 2166 if (this.periodicFlusher != null) { 2167 choreService.scheduleChore(periodicFlusher); 2168 } 2169 if (this.healthCheckChore != null) { 2170 choreService.scheduleChore(healthCheckChore); 2171 } 2172 if (this.nonceManagerChore != null) { 2173 choreService.scheduleChore(nonceManagerChore); 2174 } 2175 if (this.storefileRefresher != null) { 2176 choreService.scheduleChore(storefileRefresher); 2177 } 2178 if (this.fsUtilizationChore != null) { 2179 choreService.scheduleChore(fsUtilizationChore); 2180 } 2181 if (this.slowLogTableOpsChore != null) { 2182 choreService.scheduleChore(slowLogTableOpsChore); 2183 } 2184 if (this.brokenStoreFileCleaner != null) { 2185 choreService.scheduleChore(brokenStoreFileCleaner); 2186 } 2187 2188 if (this.rsMobFileCleanerChore != null) { 2189 choreService.scheduleChore(rsMobFileCleanerChore); 2190 } 2191 2192 // The LeaseManager is not a Thread itself; internally it runs a daemon thread. If it gets 2193 // an unhandled exception, it will just exit. 2194 Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker", 2195 uncaughtExceptionHandler); 2196 2197 // Create the log splitting worker and start it. 2198 // Set a smaller retry count to fail fast; otherwise the SplitLogWorker could be blocked for 2199 // quite a while inside the Connection layer. The worker won't be available for other 2200 // tasks even after the current task is preempted when a split task times out.
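// Making "fail fast" concrete: assuming the usual HConstants.RETRY_BACKOFF ladder
// (pause multipliers 1, 2, 3, 5, 10, 20, 40, 100, ...) on a 100 ms base pause --
// those defaults are not restated in this file, so treat them as an assumption --
// the 8 retries configured below sleep roughly
// 100 * (1 + 2 + 3 + 5 + 10 + 20 + 40 + 100) ms, about 18 s, which plus RPC time
// matches the "about 23 seconds" noted below.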
2201 Configuration sinkConf = HBaseConfiguration.create(conf); 2202 sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2203 conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds 2204 sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 2205 conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds 2206 sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 2207 if ( 2208 this.csm != null 2209 && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK) 2210 ) { 2211 // SplitLogWorker needs csm. If none, don't start this. 2212 this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory); 2213 splitLogWorker.start(); 2214 LOG.debug("SplitLogWorker started"); 2215 } 2216 2217 // Memstore services. 2218 startHeapMemoryManager(); 2219 // Call it after starting HeapMemoryManager. 2220 initializeMemStoreChunkCreator(); 2221 } 2222 2223 private void initializeThreads() { 2224 // Cache flushing thread. 2225 this.cacheFlusher = new MemStoreFlusher(conf, this); 2226 2227 // Compaction thread 2228 this.compactSplitThread = new CompactSplit(this); 2229 2230 // Background thread to check for compactions; needed if region has not gotten updates 2231 // in a while. It will take care of not checking too frequently on store-by-store basis. 2232 this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this); 2233 this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this); 2234 this.leaseManager = new LeaseManager(this.threadWakeFrequency); 2235 2236 final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, 2237 HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); 2238 if (isSlowLogTableEnabled) { 2239 // default chore duration: 10 min 2240 final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000); 2241 slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder); 2242 } 2243 2244 if (this.nonceManager != null) { 2245 // Create the scheduled chore that cleans up nonces. 
2246 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); 2247 } 2248 2249 // Setup the Quota Manager 2250 rsQuotaManager = new RegionServerRpcQuotaManager(this); 2251 rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); 2252 2253 if (QuotaUtil.isQuotaEnabled(conf)) { 2254 this.fsUtilizationChore = new FileSystemUtilizationChore(this); 2255 } 2256 2257 boolean onlyMetaRefresh = false; 2258 int storefileRefreshPeriod = 2259 conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 2260 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2261 if (storefileRefreshPeriod == 0) { 2262 storefileRefreshPeriod = 2263 conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, 2264 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2265 onlyMetaRefresh = true; 2266 } 2267 if (storefileRefreshPeriod > 0) { 2268 this.storefileRefresher = 2269 new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this); 2270 } 2271 2272 int brokenStoreFileCleanerPeriod = 2273 conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, 2274 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD); 2275 int brokenStoreFileCleanerDelay = 2276 conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, 2277 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY); 2278 double brokenStoreFileCleanerDelayJitter = 2279 conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER, 2280 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER); 2281 double jitterRate = 2282 (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter; 2283 long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate); 2284 this.brokenStoreFileCleaner = 2285 new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue), 2286 brokenStoreFileCleanerPeriod, this, conf, this); 2287 2288 this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this); 2289 2290 registerConfigurationObservers(); 2291 } 2292 2293 private void registerConfigurationObservers() { 2294 // Registering the compactSplitThread object with the ConfigurationManager. 2295 configurationManager.registerObserver(this.compactSplitThread); 2296 configurationManager.registerObserver(this.rpcServices); 2297 configurationManager.registerObserver(this); 2298 } 2299 2300 /** 2301 * Puts up the webui. 2302 */ 2303 private void putUpWebUI() throws IOException { 2304 int port = 2305 this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, HConstants.DEFAULT_REGIONSERVER_INFOPORT); 2306 String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0"); 2307 2308 if (this instanceof HMaster) { 2309 port = conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT); 2310 addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0"); 2311 } 2312 // -1 is for disabling info server 2313 if (port < 0) { 2314 return; 2315 } 2316 2317 if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) { 2318 String msg = "Failed to start http info server. Address " + addr 2319 + " does not belong to this host. 
Correct configuration parameter: " 2320 + "hbase.regionserver.info.bindAddress"; 2321 LOG.error(msg); 2322 throw new IOException(msg); 2323 } 2324 // check if auto port bind enabled 2325 boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, false); 2326 while (true) { 2327 try { 2328 this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf); 2329 infoServer.addPrivilegedServlet("dump", "/dump", getDumpServlet()); 2330 configureInfoServer(); 2331 this.infoServer.start(); 2332 break; 2333 } catch (BindException e) { 2334 if (!auto) { 2335 // auto bind disabled throw BindException 2336 LOG.error("Failed binding http info server to port: " + port); 2337 throw e; 2338 } 2339 // auto bind enabled, try to use another port 2340 LOG.info("Failed binding http info server to port: " + port); 2341 port++; 2342 LOG.info("Retry starting http info server with port: " + port); 2343 } 2344 } 2345 port = this.infoServer.getPort(); 2346 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port); 2347 int masterInfoPort = 2348 conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT); 2349 conf.setInt("hbase.master.info.port.orig", masterInfoPort); 2350 conf.setInt(HConstants.MASTER_INFO_PORT, port); 2351 } 2352 2353 /* 2354 * Verify that server is healthy 2355 */ 2356 private boolean isHealthy() { 2357 if (!dataFsOk) { 2358 // File system problem 2359 return false; 2360 } 2361 // Verify that all threads are alive 2362 boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive()) 2363 && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) 2364 && (this.walRoller == null || this.walRoller.isAlive()) 2365 && (this.compactionChecker == null || this.compactionChecker.isScheduled()) 2366 && (this.periodicFlusher == null || this.periodicFlusher.isScheduled()); 2367 if (!healthy) { 2368 stop("One or more threads are no longer alive -- stop"); 2369 } 2370 return healthy; 2371 } 2372 2373 @Override 2374 public List<WAL> getWALs() { 2375 return walFactory.getWALs(); 2376 } 2377 2378 @Override 2379 public WAL getWAL(RegionInfo regionInfo) throws IOException { 2380 try { 2381 WAL wal = walFactory.getWAL(regionInfo); 2382 if (this.walRoller != null) { 2383 this.walRoller.addWAL(wal); 2384 } 2385 return wal; 2386 } catch (FailedCloseWALAfterInitializedErrorException ex) { 2387 // see HBASE-21751 for details 2388 abort("WAL can not clean up after init failed", ex); 2389 throw ex; 2390 } 2391 } 2392 2393 public LogRoller getWalRoller() { 2394 return walRoller; 2395 } 2396 2397 WALFactory getWalFactory() { 2398 return walFactory; 2399 } 2400 2401 @Override 2402 public Connection getConnection() { 2403 return getClusterConnection(); 2404 } 2405 2406 @Override 2407 public ClusterConnection getClusterConnection() { 2408 return this.clusterConnection; 2409 } 2410 2411 @Override 2412 public void stop(final String msg) { 2413 stop(msg, false, RpcServer.getRequestUser().orElse(null)); 2414 } 2415 2416 /** 2417 * Stops the regionserver. 
2418 * @param msg Status message 2419 * @param force True if this is a regionserver abort 2420 * @param user The user executing the stop request, or null if no user is associated 2421 */ 2422 public void stop(final String msg, final boolean force, final User user) { 2423 if (!this.stopped) { 2424 LOG.info("***** STOPPING region server '" + this + "' *****"); 2425 if (this.rsHost != null) { 2426 // when forced via abort don't allow CPs to override 2427 try { 2428 this.rsHost.preStop(msg, user); 2429 } catch (IOException ioe) { 2430 if (!force) { 2431 LOG.warn("The region server did not stop", ioe); 2432 return; 2433 } 2434 LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe); 2435 } 2436 } 2437 this.stopped = true; 2438 LOG.info("STOPPED: " + msg); 2439 // Wakes run() if it is sleeping 2440 sleeper.skipSleepCycle(); 2441 } 2442 } 2443 2444 public void waitForServerOnline() { 2445 while (!isStopped() && !isOnline()) { 2446 synchronized (online) { 2447 try { 2448 online.wait(msgInterval); 2449 } catch (InterruptedException ie) { 2450 Thread.currentThread().interrupt(); 2451 break; 2452 } 2453 } 2454 } 2455 } 2456 2457 @Override 2458 public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException { 2459 HRegion r = context.getRegion(); 2460 long openProcId = context.getOpenProcId(); 2461 long masterSystemTime = context.getMasterSystemTime(); 2462 rpcServices.checkOpen(); 2463 LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}", 2464 r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime); 2465 // Do checks to see if we need to compact (references or too many files) 2466 for (HStore s : r.stores.values()) { 2467 if (s.hasReferences() || s.needsCompaction()) { 2468 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region"); 2469 } 2470 } 2471 long openSeqNum = r.getOpenSeqNum(); 2472 if (openSeqNum == HConstants.NO_SEQNUM) { 2473 // If we opened a region, we should have read some sequence number from it. 2474 LOG.error( 2475 "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString()); 2476 openSeqNum = 0; 2477 } 2478 2479 // Notify master 2480 if ( 2481 !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED, 2482 openSeqNum, openProcId, masterSystemTime, r.getRegionInfo())) 2483 ) { 2484 throw new IOException( 2485 "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString()); 2486 } 2487 2488 triggerFlushInPrimaryRegion(r); 2489 2490 LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString()); 2491 } 2492 2493 /** 2494 * Helper method for use in tests. Skip the region transition report when there's no master around 2495 * to receive it. 
2496 */ 2497 private boolean skipReportingTransition(final RegionStateTransitionContext context) { 2498 final TransitionCode code = context.getCode(); 2499 final long openSeqNum = context.getOpenSeqNum(); 2500 long masterSystemTime = context.getMasterSystemTime(); 2501 final RegionInfo[] hris = context.getHris(); 2502 2503 if (code == TransitionCode.OPENED) { 2504 Preconditions.checkArgument(hris != null && hris.length == 1); 2505 if (hris[0].isMetaRegion()) { 2506 LOG.warn( 2507 "meta table location is stored in master local store, so we can not skip reporting"); 2508 return false; 2509 } else { 2510 try { 2511 MetaTableAccessor.updateRegionLocation(clusterConnection, hris[0], serverName, openSeqNum, 2512 masterSystemTime); 2513 } catch (IOException e) { 2514 LOG.info("Failed to update meta", e); 2515 return false; 2516 } 2517 } 2518 } 2519 return true; 2520 } 2521 2522 private ReportRegionStateTransitionRequest 2523 createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) { 2524 final TransitionCode code = context.getCode(); 2525 final long openSeqNum = context.getOpenSeqNum(); 2526 final RegionInfo[] hris = context.getHris(); 2527 final long[] procIds = context.getProcIds(); 2528 2529 ReportRegionStateTransitionRequest.Builder builder = 2530 ReportRegionStateTransitionRequest.newBuilder(); 2531 builder.setServer(ProtobufUtil.toServerName(serverName)); 2532 RegionStateTransition.Builder transition = builder.addTransitionBuilder(); 2533 transition.setTransitionCode(code); 2534 if (code == TransitionCode.OPENED && openSeqNum >= 0) { 2535 transition.setOpenSeqNum(openSeqNum); 2536 } 2537 for (RegionInfo hri : hris) { 2538 transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri)); 2539 } 2540 for (long procId : procIds) { 2541 transition.addProcId(procId); 2542 } 2543 2544 return builder.build(); 2545 } 2546 2547 @Override 2548 public boolean reportRegionStateTransition(final RegionStateTransitionContext context) { 2549 if (TEST_SKIP_REPORTING_TRANSITION) { 2550 return skipReportingTransition(context); 2551 } 2552 final ReportRegionStateTransitionRequest request = 2553 createReportRegionStateTransitionRequest(context); 2554 2555 int tries = 0; 2556 long pauseTime = this.retryPauseTime; 2557 // Keep looping till we get an error. We want to send reports even though the server is going 2558 // down. Only stop when clusterConnection is null; it is set to null almost as the last thing 2559 // the HRegionServer does on its way down. 2560 while (this.clusterConnection != null && !this.clusterConnection.isClosed()) { 2561 RegionServerStatusService.BlockingInterface rss = rssStub; 2562 try { 2563 if (rss == null) { 2564 createRegionServerStatusStub(); 2565 continue; 2566 } 2567 ReportRegionStateTransitionResponse response = 2568 rss.reportRegionStateTransition(null, request); 2569 if (response.hasErrorMessage()) { 2570 LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage()); 2571 break; 2572 } 2573 // Log if we had to retry, else don't log unless TRACE. We want to 2574 // know if we were successful after an attempt showed in the logs as failed. 2575 if (tries > 0 || LOG.isTraceEnabled()) { 2576 LOG.info("TRANSITION REPORTED " + request); 2577 } 2578 // NOTE: Return mid-method!!!
2579 return true; 2580 } catch (ServiceException se) { 2581 IOException ioe = ProtobufUtil.getRemoteException(se); 2582 boolean pause = ioe instanceof ServerNotRunningYetException 2583 || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException; 2584 if (pause) { 2585 // Do backoff else we flood the Master with requests. 2586 pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries); 2587 } else { 2588 pauseTime = this.retryPauseTime; // Reset. 2589 } 2590 LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#" 2591 + tries + ")" 2592 + (pause 2593 ? " after " + pauseTime + "ms delay (Master is coming online...)." 2594 : " immediately."), 2595 ioe); 2596 if (pause) { 2597 Threads.sleep(pauseTime); 2598 } 2599 tries++; 2600 if (rssStub == rss) { 2601 rssStub = null; 2602 } 2603 } 2604 } 2605 return false; 2606 } 2607 2608 /** 2609 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not 2610 * block this thread. See RegionReplicaFlushHandler for details. 2611 */ 2612 private void triggerFlushInPrimaryRegion(final HRegion region) { 2613 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 2614 return; 2615 } 2616 TableName tn = region.getTableDescriptor().getTableName(); 2617 if ( 2618 !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) 2619 || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) || 2620 // If memstore replication is not set up, we do not have to wait to observe a flush event 2621 // from the primary before starting to serve reads, because gaps from replication are not 2622 // applicable. This logic comes from 2623 // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication, added by 2624 // HBASE-13063 2625 !region.getTableDescriptor().hasRegionMemStoreReplication() 2626 ) { 2627 region.setReadsEnabled(true); 2628 return; 2629 } 2630 2631 region.setReadsEnabled(false); // disable reads before marking the region as opened. 2632 // RegionReplicaFlushHandler might reset this. 2633 2634 // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler 2635 if (this.executorService != null) { 2636 this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection, 2637 rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region)); 2638 } else { 2639 LOG.info("Executor is null; not running flush of primary region replica for {}", 2640 region.getRegionInfo()); 2641 } 2642 } 2643 2644 @Override 2645 public RpcServerInterface getRpcServer() { 2646 return rpcServices.rpcServer; 2647 } 2648 2649 @InterfaceAudience.Private 2650 public RSRpcServices getRSRpcServices() { 2651 return rpcServices; 2652 } 2653 2654 /** 2655 * Cause the server to exit without closing the regions it is serving, the log it is using and 2656 * without notifying the master. Used for unit testing and on catastrophic events such as HDFS 2657 * being yanked out from under hbase or an OOME. 2658 * @param reason the reason we are aborting 2659 * @param cause the exception that caused the abort, or null */ 2660 @Override 2661 public void abort(String reason, Throwable cause) { 2662 if (!setAbortRequested()) { 2663 // Abort already in progress, ignore the new request. 2664 LOG.debug("Abort already in progress.
Ignoring the current request with reason: {}", reason); 2665 return; 2666 } 2667 String msg = "***** ABORTING region server " + this + ": " + reason + " *****"; 2668 if (cause != null) { 2669 LOG.error(HBaseMarkers.FATAL, msg, cause); 2670 } else { 2671 LOG.error(HBaseMarkers.FATAL, msg); 2672 } 2673 // HBASE-4014: show list of coprocessors that were loaded to help debug 2674 // regionserver crashes. Note that we're implicitly using 2675 // java.util.HashSet's toString() method to print the coprocessor names. 2676 LOG.error(HBaseMarkers.FATAL, 2677 "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors()); 2678 // Try and dump metrics on abort -- might give a clue as to how the fatal error came about. 2679 try { 2680 LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics()); 2681 } catch (MalformedObjectNameException | IOException e) { 2682 LOG.warn("Failed dumping metrics", e); 2683 } 2684 2685 // Do our best to report our abort to the master, but this may not work 2686 try { 2687 if (cause != null) { 2688 msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause); 2689 } 2690 // Report to the master but only if we have already registered with the master. 2691 RegionServerStatusService.BlockingInterface rss = rssStub; 2692 if (rss != null && this.serverName != null) { 2693 ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder(); 2694 builder.setServer(ProtobufUtil.toServerName(this.serverName)); 2695 builder.setErrorMessage(msg); 2696 rss.reportRSFatalError(null, builder.build()); 2697 } 2698 } catch (Throwable t) { 2699 LOG.warn("Unable to report fatal error to master", t); 2700 } 2701 2702 scheduleAbortTimer(); 2703 // shutdown should be run as the internal user 2704 stop(reason, true, null); 2705 } 2706 2707 /** 2708 * Sets the abort state if not already set. 2709 * @return True if abortRequested was successfully set to true, false if an abort is already in 2710 * progress. 2711 */ 2712 protected boolean setAbortRequested() { 2713 return abortRequested.compareAndSet(false, true); 2714 } 2715 2716 @Override 2717 public boolean isAborted() { 2718 return abortRequested.get(); 2719 } 2720 2721 /* 2722 * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up logs, but it does 2723 * close the socket in case we want to bring up a server on the old hostname+port immediately. 2724 */ 2725 @InterfaceAudience.Private 2726 protected void kill() { 2727 this.killed = true; 2728 abort("Simulated kill"); 2729 } 2730 2731 // Limits the time spent in the shutdown process. 2732 private void scheduleAbortTimer() { 2733 if (this.abortMonitor == null) { 2734 this.abortMonitor = new Timer("Abort regionserver monitor", true); 2735 TimerTask abortTimeoutTask = null; 2736 try { 2737 Constructor<? extends TimerTask> timerTaskCtor = 2738 Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName())) 2739 .asSubclass(TimerTask.class).getDeclaredConstructor(); 2740 timerTaskCtor.setAccessible(true); 2741 abortTimeoutTask = timerTaskCtor.newInstance(); 2742 } catch (Exception e) { 2743 LOG.warn("Initialize abort timeout task failed", e); 2744 } 2745 if (abortTimeoutTask != null) { 2746 abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT)); 2747 } 2748 } 2749 } 2750 2751 protected final void shutdownChore(ScheduledChore chore) { 2752 if (chore != null) { 2753 chore.shutdown(); 2754 } 2755 } 2756 2757 /** 2758 * Wait on all threads to finish.
  protected final void shutdownChore(ScheduledChore chore) {
    if (chore != null) {
      chore.shutdown();
    }
  }

  /**
   * Wait on all threads to finish. Presumption is that all closes and stops have already been
   * called.
   */
  protected void stopServiceThreads() {
    // clean up the scheduled chores
    if (this.choreService != null) {
      shutdownChore(nonceManagerChore);
      shutdownChore(compactionChecker);
      shutdownChore(compactedFileDischarger);
      shutdownChore(periodicFlusher);
      shutdownChore(healthCheckChore);
      shutdownChore(storefileRefresher);
      shutdownChore(fsUtilizationChore);
      shutdownChore(slowLogTableOpsChore);
      shutdownChore(rsMobFileCleanerChore);
      // cancel the remaining scheduled chores (in case we missed out any)
      // TODO: cancel will not clean up the chores, so we need to make sure we do not miss any
      choreService.shutdown();
    }

    if (this.cacheFlusher != null) {
      this.cacheFlusher.join();
    }
    if (this.walRoller != null) {
      this.walRoller.close();
    }
    if (this.compactSplitThread != null) {
      this.compactSplitThread.join();
    }
    if (this.executorService != null) {
      this.executorService.shutdown();
    }
    if (
      this.replicationSourceHandler != null
        && this.replicationSourceHandler == this.replicationSinkHandler
    ) {
      this.replicationSourceHandler.stopReplicationService();
    } else {
      if (this.replicationSourceHandler != null) {
        this.replicationSourceHandler.stopReplicationService();
      }
      if (this.replicationSinkHandler != null) {
        this.replicationSinkHandler.stopReplicationService();
      }
    }
  }

  /** Returns the object that implements the replication source executorService. */
  @InterfaceAudience.Private
  public ReplicationSourceService getReplicationSourceService() {
    return replicationSourceHandler;
  }

  /** Returns the object that implements the replication sink executorService. */
  ReplicationSinkService getReplicationSinkService() {
    return replicationSinkHandler;
  }

  /**
   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
   * connection, the current rssStub must be null. Method will block until a master is available.
   * You can break from this block by requesting the server stop.
   * @return master + port, or null if server has been stopped
   */
  private synchronized ServerName createRegionServerStatusStub() {
    // Create RS stub without refreshing the master node from ZK, use cached data
    return createRegionServerStatusStub(false);
  }
  /**
   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
   * connection, the current rssStub must be null. Method will block until a master is available.
   * You can break from this block by requesting the server stop.
   * @param refresh If true then master address will be read from ZK, otherwise use cached data
   * @return master + port, or null if server has been stopped
   */
  @InterfaceAudience.Private
  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
    if (rssStub != null) {
      return masterAddressTracker.getMasterAddress();
    }
    ServerName sn = null;
    long previousLogTime = 0;
    RegionServerStatusService.BlockingInterface intRssStub = null;
    LockService.BlockingInterface intLockStub = null;
    boolean interrupted = false;
    try {
      while (keepLooping()) {
        sn = this.masterAddressTracker.getMasterAddress(refresh);
        if (sn == null) {
          if (!keepLooping()) {
            // give up with no connection.
            LOG.debug("No master found and cluster is stopped; bailing out");
            return null;
          }
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            LOG.debug("No master found; retry");
            previousLogTime = EnvironmentEdgeManager.currentTime();
          }
          refresh = true; // let's try to pull it from ZK directly
          if (sleepInterrupted(200)) {
            interrupted = true;
          }
          continue;
        }

        // If we are on the active master, use the shortcut
        if (this instanceof HMaster && sn.equals(getServerName())) {
          intRssStub = ((HMaster) this).getMasterRpcServices();
          intLockStub = ((HMaster) this).getMasterRpcServices();
          break;
        }
        try {
          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
            userProvider.getCurrent(), shortOperationTimeout);
          intRssStub = RegionServerStatusService.newBlockingStub(channel);
          intLockStub = LockService.newBlockingStub(channel);
          break;
        } catch (IOException e) {
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
            if (e instanceof ServerNotRunningYetException) {
              LOG.info("Master isn't available yet, retrying");
            } else {
              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
            }
            previousLogTime = EnvironmentEdgeManager.currentTime();
          }
          if (sleepInterrupted(200)) {
            interrupted = true;
          }
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
    this.rssStub = intRssStub;
    this.lockStub = intLockStub;
    return sn;
  }

  /**
   * @return True if we should break loop because cluster is going down or this server has been
   *         stopped or hdfs has gone bad.
   */
  private boolean keepLooping() {
    return !this.stopped && isClusterUp();
  }
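  // Pattern note (illustrative sketch): the loop in createRegionServerStatusStub above swallows
  // InterruptedException inside sleepInterrupted and restores the thread's interrupt status
  // exactly once, in the finally block. The minimal shape of that idiom is:
  //
  //   boolean interrupted = false;
  //   try {
  //     while (condition()) {
  //       if (sleepInterrupted(200)) {
  //         interrupted = true; // remember it, keep looping
  //       }
  //     }
  //   } finally {
  //     if (interrupted) {
  //       Thread.currentThread().interrupt(); // re-assert for callers
  //     }
  //   }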
  /*
   * Let the master know we're here. Run initialization using parameters passed to us by the
   * master.
   * @return A Map of key/value configurations we got from the Master else null if we failed to
   * register.
   */
  private RegionServerStartupResponse reportForDuty() throws IOException {
    if (this.masterless) {
      return RegionServerStartupResponse.getDefaultInstance();
    }
    ServerName masterServerName = createRegionServerStatusStub(true);
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (masterServerName == null || rss == null) {
      return null;
    }
    RegionServerStartupResponse result = null;
    try {
      rpcServices.requestCount.reset();
      rpcServices.rpcGetRequestCount.reset();
      rpcServices.rpcScanRequestCount.reset();
      rpcServices.rpcFullScanRequestCount.reset();
      rpcServices.rpcMultiRequestCount.reset();
      rpcServices.rpcMutateRequestCount.reset();
      LOG.info("reportForDuty to master=" + masterServerName + " with isa=" + rpcServices.isa
        + ", startcode=" + this.startcode);
      long now = EnvironmentEdgeManager.currentTime();
      int port = rpcServices.isa.getPort();
      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
      if (!StringUtils.isBlank(useThisHostnameInstead)) {
        request.setUseThisHostnameInstead(useThisHostnameInstead);
      }
      request.setPort(port);
      request.setServerStartCode(this.startcode);
      request.setServerCurrentTime(now);
      result = rss.regionServerStartup(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof ClockOutOfSyncException) {
        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
        // Re-throwing the IOE will cause the RS to abort
        throw ioe;
      } else if (ioe instanceof ServerNotRunningYetException) {
        LOG.debug("Master is not running yet");
      } else {
        LOG.warn("error telling master we are up", se);
      }
      rssStub = null;
    }
    return result;
  }
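  // Startup handshake in brief (descriptive sketch): reportForDuty sends this server's port,
  // startcode and current time; the master's RegionServerStartupResponse carries key/value
  // configuration entries that the region server applies while finishing initialization. In
  // masterless mode the handshake is skipped and an empty default response is used instead.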
  @Override
  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
    try {
      GetLastFlushedSequenceIdRequest req =
        RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
      RegionServerStatusService.BlockingInterface rss = rssStub;
      if (rss == null) { // Try to connect one more time
        createRegionServerStatusStub();
        rss = rssStub;
        if (rss == null) {
          // Still no luck, we tried
          LOG.warn("Unable to connect to the master to check the last flushed sequence id");
          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
            .build();
        }
      }
      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
      return RegionStoreSequenceIds.newBuilder()
        .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
        .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
    } catch (ServiceException e) {
      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
        .build();
    }
  }

  /**
   * Close meta region if we carry it
   * @param abort Whether we're running an abort.
   */
  private void closeMetaTableRegions(final boolean abort) {
    HRegion meta = null;
    this.onlineRegionsLock.writeLock().lock();
    try {
      for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
        RegionInfo hri = e.getValue().getRegionInfo();
        if (hri.isMetaRegion()) {
          meta = e.getValue();
        }
        if (meta != null) {
          break;
        }
      }
    } finally {
      this.onlineRegionsLock.writeLock().unlock();
    }
    if (meta != null) {
      closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
    }
  }

  /**
   * Schedule closes on all user regions. Should be safe calling multiple times because it won't
   * close regions that are already closed or that are closing.
   * @param abort Whether we're running an abort.
   */
  private void closeUserRegions(final boolean abort) {
    this.onlineRegionsLock.writeLock().lock();
    try {
      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
        HRegion r = e.getValue();
        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
          // Don't update zk with this close transition; pass false.
          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
        }
      }
    } finally {
      this.onlineRegionsLock.writeLock().unlock();
    }
  }

  /** Returns the info server */
  public InfoServer getInfoServer() {
    return infoServer;
  }

  /** Returns true if a stop has been requested. */
  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  @Override
  public boolean isStopping() {
    return this.stopping;
  }

  @Override
  public Configuration getConfiguration() {
    return conf;
  }

  protected Map<String, HRegion> getOnlineRegions() {
    return this.onlineRegions;
  }

  public int getNumberOfOnlineRegions() {
    return this.onlineRegions.size();
  }

  /**
   * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM
   * as client; HRegion cannot be serialized to cross an rpc.
   */
  public Collection<HRegion> getOnlineRegionsLocalContext() {
    Collection<HRegion> regions = this.onlineRegions.values();
    return Collections.unmodifiableCollection(regions);
  }

  @Override
  public void addRegion(HRegion region) {
    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
    configurationManager.registerObserver(region);
  }

  /**
   * @return A new Map of online regions sorted by region off-heap size with the first entry being
   *         the biggest. If two regions are the same size, then the last one found wins; i.e. this
   *         method may NOT return all regions.
   */
  SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOffHeapSize() {
    // we'll sort the regions in reverse
    SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
    // Copy over all regions. Regions are sorted by size with biggest first.
    for (HRegion region : this.onlineRegions.values()) {
      sortedRegions.put(region.getMemStoreOffHeapSize(), region);
    }
    return sortedRegions;
  }
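  // Caveat illustration for the size-sorted views above and below: the map key is the size, so
  // regions whose memstores happen to be exactly the same size collide and only the last one
  // survives. A minimal demonstration of the collision:
  //
  //   TreeMap<Long, String> bySize = new TreeMap<>(Comparator.reverseOrder());
  //   bySize.put(1024L, "regionA");
  //   bySize.put(1024L, "regionB"); // overwrites regionA; bySize.size() == 1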
  /**
   * @return A new Map of online regions sorted by region heap size with the first entry being the
   *         biggest. If two regions are the same size, then the last one found wins; i.e. this
   *         method may NOT return all regions.
   */
  SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOnHeapSize() {
    // we'll sort the regions in reverse
    SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
    // Copy over all regions. Regions are sorted by size with biggest first.
    for (HRegion region : this.onlineRegions.values()) {
      sortedRegions.put(region.getMemStoreHeapSize(), region);
    }
    return sortedRegions;
  }

  /** Returns time stamp in millis of when this region server was started */
  public long getStartcode() {
    return this.startcode;
  }

  /** Returns reference to FlushRequester */
  @Override
  public FlushRequester getFlushRequester() {
    return this.cacheFlusher;
  }

  @Override
  public CompactionRequester getCompactionRequestor() {
    return this.compactSplitThread;
  }

  @Override
  public LeaseManager getLeaseManager() {
    return leaseManager;
  }

  /** Returns the rootDir. */
  protected Path getDataRootDir() {
    return dataRootDir;
  }

  @Override
  public FileSystem getFileSystem() {
    return dataFs;
  }

  /** Returns {@code true} when the data file system is available, {@code false} otherwise. */
  boolean isDataFileSystemOk() {
    return this.dataFsOk;
  }

  /** Returns the walRootDir. */
  public Path getWALRootDir() {
    return walRootDir;
  }

  /** Returns the walFs. */
  public FileSystem getWALFileSystem() {
    return walFs;
  }

  @Override
  public String toString() {
    return getServerName().toString();
  }

  @Override
  public ZKWatcher getZooKeeper() {
    return zooKeeper;
  }

  @Override
  public CoordinatedStateManager getCoordinatedStateManager() {
    return csm;
  }

  @Override
  public ServerName getServerName() {
    return serverName;
  }

  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
    return this.rsHost;
  }

  @Override
  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
    return this.regionsInTransitionInRS;
  }

  @Override
  public ExecutorService getExecutorService() {
    return executorService;
  }

  @Override
  public ChoreService getChoreService() {
    return choreService;
  }

  @Override
  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
    return rsQuotaManager;
  }

  //
  // Main program and support routines
  //
  /**
   * Load the replication executorService objects, if any
   */
  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
    FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
    if (
      (server instanceof HMaster)
        && (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))
    ) {
      return;
    }
    // read in the name of the source replication class from the config file.
    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);

    // read in the name of the sink replication class from the config file.
    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);

    // If both the sink and the source class names are the same, then instantiate
    // only one object.
    if (sourceClassname.equals(sinkClassname)) {
      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
    } else {
      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
    }
  }
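  // Illustrative sketch: both class-name keys default to the same implementation, so source and
  // sink are usually one shared instance. Assuming a hypothetical custom sink class, a deployment
  // could split them via configuration:
  //
  //   conf.set(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, MyReplicationSink.class.getName());
  //   // the source keeps the HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT implementation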
  private static <T extends ReplicationService> T newReplicationInstance(String classname,
    Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
    Path oldLogDir, WALFactory walFactory) throws IOException {
    final Class<? extends T> clazz;
    try {
      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
    } catch (java.lang.ClassNotFoundException nfe) {
      throw new IOException("Could not find class for " + classname);
    }
    T service = ReflectionUtils.newInstance(clazz, conf);
    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
    return service;
  }

  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
    if (!this.isOnline()) {
      return walGroupsReplicationStatus;
    }
    List<ReplicationSourceInterface> allSources = new ArrayList<>();
    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
    for (ReplicationSourceInterface source : allSources) {
      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
    }
    return walGroupsReplicationStatus;
  }

  /**
   * Utility for constructing an instance of the passed HRegionServer class.
   */
  static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass,
    final Configuration conf) {
    try {
      Constructor<? extends HRegionServer> c =
        regionServerClass.getConstructor(Configuration.class);
      return c.newInstance(conf);
    } catch (Exception e) {
      throw new RuntimeException(
        "Failed construction of Regionserver: " + regionServerClass.toString(), e);
    }
  }

  /**
   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
   */
  public static void main(String[] args) {
    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
    VersionInfo.logVersion();
    Configuration conf = HBaseConfiguration.create();
    @SuppressWarnings("unchecked")
    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
      .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);

    new HRegionServerCommandLine(regionServerClass).doMain(args);
  }
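  // Illustrative sketch (MyRegionServer is hypothetical): main() honors the
  // HConstants.REGION_SERVER_IMPL setting, so a subclass exposing a (Configuration) constructor
  // can be started in place of HRegionServer:
  //
  //   conf.setClass(HConstants.REGION_SERVER_IMPL, MyRegionServer.class, HRegionServer.class);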
  /**
   * Gets the online regions of the specified table. This method looks at the in-memory
   * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions.
   * If a region on this table has been closed during a disable, etc., it will not be included in
   * the returned list. So, the returned list may not necessarily be ALL regions in this table; it
   * is all the ONLINE regions in the table.
   * @param tableName table to limit the scope of the query
   * @return Online regions from <code>tableName</code>
   */
  @Override
  public List<HRegion> getRegions(TableName tableName) {
    List<HRegion> tableRegions = new ArrayList<>();
    synchronized (this.onlineRegions) {
      for (HRegion region : this.onlineRegions.values()) {
        RegionInfo regionInfo = region.getRegionInfo();
        if (regionInfo.getTable().equals(tableName)) {
          tableRegions.add(region);
        }
      }
    }
    return tableRegions;
  }

  @Override
  public List<HRegion> getRegions() {
    List<HRegion> allRegions;
    synchronized (this.onlineRegions) {
      // Return a clone copy of the onlineRegions
      allRegions = new ArrayList<>(onlineRegions.values());
    }
    return allRegions;
  }

  /**
   * Gets the online tables in this RS. This method looks at the in-memory onlineRegions.
   * @return all the online tables in this RS
   */
  public Set<TableName> getOnlineTables() {
    Set<TableName> tables = new HashSet<>();
    synchronized (this.onlineRegions) {
      for (Region region : this.onlineRegions.values()) {
        tables.add(region.getTableDescriptor().getTableName());
      }
    }
    return tables;
  }

  public String[] getRegionServerCoprocessors() {
    TreeSet<String> coprocessors = new TreeSet<>();
    try {
      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
    } catch (IOException exception) {
      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; "
        + "skipping.");
      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
    }
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    for (HRegion region : regions) {
      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
      try {
        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
      } catch (IOException exception) {
        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region
          + "; skipping.");
        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
      }
    }
    coprocessors.addAll(rsHost.getCoprocessors());
    return coprocessors.toArray(new String[0]);
  }

  /**
   * Try to close the region, logs a warning on failure but continues.
   * @param region Region to close
   */
  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
    try {
      if (!closeRegion(region.getEncodedName(), abort, null)) {
        LOG
          .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing");
      }
    } catch (IOException e) {
      LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing",
        e);
    }
  }
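  // Illustrative usage (sketch): in-JVM callers such as tests can enumerate what is served here:
  //
  //   for (HRegion r : regionServer.getRegions(TableName.valueOf("t1"))) {
  //     LOG.debug("online: {}", r.getRegionInfo().getRegionNameAsString());
  //   }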
  /**
   * Asynchronously close a region; can be called from the master or internally by the regionserver
   * when stopping. If called from the master, the region will update the status.
   * <p>
   * If an opening was in progress, this method will cancel it, but will not start a new close. The
   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
   * </p>
   * <p>
   * If a close was in progress, this new request will be ignored, and an exception thrown.
   * </p>
   * @param encodedName Region to close
   * @param abort       True if we are aborting
   * @param destination Where the Region is being moved to; may be null if unknown.
   * @return True if closed a region.
   * @throws NotServingRegionException if the region is not online
   */
  protected boolean closeRegion(String encodedName, final boolean abort,
    final ServerName destination) throws NotServingRegionException {
    // Check for permissions to close.
    HRegion actualRegion = this.getRegion(encodedName);
    // Can be null if we're calling close on a region that's not online
    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
      try {
        actualRegion.getCoprocessorHost().preClose(false);
      } catch (IOException exp) {
        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
        return false;
      }
    }

    // previous can come back 'null' if not in map.
    final Boolean previous =
      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);

    if (Boolean.TRUE.equals(previous)) {
      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already "
        + "trying to OPEN. Cancelling OPENING.");
      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
        // The replace failed. That should be an exceptional case, but theoretically it can happen.
        // We're going to try to do a standard close then.
        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
          + " Doing a standard close now");
        return closeRegion(encodedName, abort, destination);
      }
      // Let's get the region from the online region list again
      actualRegion = this.getRegion(encodedName);
      if (actualRegion == null) { // If already online, we still need to close it.
        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
        // The master deletes the znode when it receives this exception.
        throw new NotServingRegionException(
          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
      }
    } else if (previous == null) {
      LOG.info("Received CLOSE for {}", encodedName);
    } else if (Boolean.FALSE.equals(previous)) {
      LOG.info("Received CLOSE for the region: " + encodedName
        + ", which we are already trying to CLOSE, but not completed yet");
      return true;
    }

    if (actualRegion == null) {
      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
      // The master deletes the znode when it receives this exception.
      throw new NotServingRegionException(
        "The region " + encodedName + " is not online, and is not opening.");
    }

    CloseRegionHandler crh;
    final RegionInfo hri = actualRegion.getRegionInfo();
    if (hri.isMetaRegion()) {
      crh = new CloseMetaHandler(this, this, hri, abort);
    } else {
      crh = new CloseRegionHandler(this, this, hri, abort, destination);
    }
    this.executorService.submit(crh);
    return true;
  }
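  // State summary (descriptive sketch) for the regionsInTransitionInRS entries used above, where
  // Boolean.TRUE means "opening" and Boolean.FALSE means "closing":
  //   putIfAbsent -> null  : nothing in flight; this close proceeds and owns the entry
  //   putIfAbsent -> TRUE  : an open is in flight; try to cancel it via replace(TRUE -> FALSE)
  //   putIfAbsent -> FALSE : a close is already in flight; report success without re-submitting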
  /**
   * @return HRegion for the passed binary <code>regionName</code> or null if named region is not
   *         member of the online regions.
   */
  public HRegion getOnlineRegion(final byte[] regionName) {
    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
    return this.onlineRegions.get(encodedRegionName);
  }

  @Override
  public HRegion getRegion(final String encodedRegionName) {
    return this.onlineRegions.get(encodedRegionName);
  }

  @Override
  public boolean removeRegion(final HRegion r, ServerName destination) {
    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
    if (destination != null) {
      long closeSeqNum = r.getMaxFlushedSeqId();
      if (closeSeqNum == HConstants.NO_SEQNUM) {
        // No edits in WAL for this region; get the sequence number when the region was opened.
        closeSeqNum = r.getOpenSeqNum();
        if (closeSeqNum == HConstants.NO_SEQNUM) {
          closeSeqNum = 0;
        }
      }
      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
      if (selfMove) {
        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
          r.getRegionInfo().getEncodedName(),
          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
      }
    }
    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
    return toReturn != null;
  }

  /**
   * Protected Utility method for safely obtaining an HRegion handle.
   * @param regionName Name of online {@link HRegion} to return
   * @return {@link HRegion} for <code>regionName</code>
   */
  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
    return getRegionByEncodedName(regionName, encodedRegionName);
  }

  public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException {
    return getRegionByEncodedName(null, encodedRegionName);
  }

  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
    throws NotServingRegionException {
    HRegion region = this.onlineRegions.get(encodedRegionName);
    if (region == null) {
      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
      if (moveInfo != null) {
        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
      }
      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
      String regionNameStr =
        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
      if (isOpening != null && isOpening) {
        throw new RegionOpeningException(
          "Region " + regionNameStr + " is opening on " + this.serverName);
      }
      throw new NotServingRegionException(
        "" + regionNameStr + " is not online on " + this.serverName);
    }
    return region;
  }
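  // Exception-mapping sketch for getRegionByEncodedName above (behavior as suggested by the
  // exception types; client-side handling is outside this class):
  //   region recently moved    -> RegionMovedException (carries the destination and seqnum)
  //   region currently opening -> RegionOpeningException (caller may retry this server)
  //   otherwise                -> NotServingRegionException (caller re-locates the region)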
  /**
   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't
   * already.
   * @param t   Throwable
   * @param msg Message to log in error. Can be null.
   * @return Throwable converted to an IOE; methods can only let out IOEs.
   */
  private Throwable cleanup(final Throwable t, final String msg) {
    // Don't log as error if NSRE; NSRE is 'normal' operation.
    if (t instanceof NotServingRegionException) {
      LOG.debug("NotServingRegionException; " + t.getMessage());
      return t;
    }
    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
    if (msg == null) {
      LOG.error("", e);
    } else {
      LOG.error(msg, e);
    }
    if (!rpcServices.checkOOME(t)) {
      checkFileSystem();
    }
    return t;
  }

  /**
   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
   * @return <code>t</code> as an IOE if it isn't one already.
   */
  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
    return (t instanceof IOException ? (IOException) t
      : msg == null || msg.length() == 0 ? new IOException(t)
      : new IOException(msg, t));
  }

  /**
   * Checks to see if the file system is still accessible. If not, sets abortRequested and
   * stopRequested.
   * @return false if file system is not available
   */
  boolean checkFileSystem() {
    if (this.dataFsOk && this.dataFs != null) {
      try {
        FSUtils.checkFileSystemAvailable(this.dataFs);
      } catch (IOException e) {
        abort("File System not available", e);
        this.dataFsOk = false;
      }
    }
    return this.dataFsOk;
  }

  @Override
  public void updateRegionFavoredNodesMapping(String encodedRegionName,
    List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
    Address[] addr = new Address[favoredNodes.size()];
    // Refer to the comment on the declaration of regionFavoredNodesMap on why
    // it is a map of region name to Address[]
    for (int i = 0; i < favoredNodes.size(); i++) {
      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort());
    }
    regionFavoredNodesMap.put(encodedRegionName, addr);
  }
  /**
   * Return the favored nodes for a region given its encoded name. Look at the comment around
   * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here.
   * @param encodedRegionName the encoded region name.
   * @return array of favored locations
   */
  @Override
  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
  }

  @Override
  public ServerNonceManager getNonceManager() {
    return this.nonceManager;
  }

  private static class MovedRegionInfo {
    private final ServerName serverName;
    private final long seqNum;

    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
      this.serverName = serverName;
      this.seqNum = closeSeqNum;
    }

    public ServerName getServerName() {
      return serverName;
    }

    public long getSeqNum() {
      return seqNum;
    }
  }

  /**
   * We need a timeout. If not, there is a risk of giving wrong information: this would double the
   * number of network calls instead of reducing them.
   */
  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);

  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
    boolean selfMove) {
    if (selfMove) {
      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
      return;
    }
    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid="
      + closeSeqNum);
    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
  }

  void removeFromMovedRegions(String encodedName) {
    movedRegionInfoCache.invalidate(encodedName);
  }

  @InterfaceAudience.Private
  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
    return movedRegionInfoCache.getIfPresent(encodedRegionName);
  }

  @InterfaceAudience.Private
  public int movedRegionCacheExpiredTime() {
    return TIMEOUT_REGION_MOVED;
  }

  private String getMyEphemeralNodePath() {
    return zooKeeper.getZNodePaths().getRsPath(serverName);
  }

  private boolean isHealthCheckerConfigured() {
    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
  }

  /** Returns the underlying {@link CompactSplit} for the servers */
  public CompactSplit getCompactSplitThread() {
    return this.compactSplitThread;
  }
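  // Illustrative sketch: the health checker above is enabled simply by pointing
  // HConstants.HEALTH_SCRIPT_LOC at an executable script; the path below is hypothetical and
  // deployment-specific:
  //
  //   conf.set(HConstants.HEALTH_SCRIPT_LOC, "/etc/hbase/healthcheck.sh");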
  CoprocessorServiceResponse execRegionServerService(
    @SuppressWarnings("UnusedParameters") final RpcController controller,
    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
    try {
      ServerRpcController serviceController = new ServerRpcController();
      CoprocessorServiceCall call = serviceRequest.getCall();
      String serviceName = call.getServiceName();
      com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName);
      if (service == null) {
        throw new UnknownProtocolException(null,
          "No registered coprocessor executorService found for " + serviceName);
      }
      com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
        service.getDescriptorForType();

      String methodName = call.getMethodName();
      com.google.protobuf.Descriptors.MethodDescriptor methodDesc =
        serviceDesc.findMethodByName(methodName);
      if (methodDesc == null) {
        throw new UnknownProtocolException(service.getClass(),
          "Unknown method " + methodName + " called on executorService " + serviceName);
      }

      com.google.protobuf.Message request =
        CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
      final com.google.protobuf.Message.Builder responseBuilder =
        service.getResponsePrototype(methodDesc).newBuilderForType();
      service.callMethod(methodDesc, serviceController, request, message -> {
        if (message != null) {
          responseBuilder.mergeFrom(message);
        }
      });
      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
      if (exception != null) {
        throw exception;
      }
      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * May be empty if this is a master that does not carry tables.
   * @return The block cache instance used by the regionserver.
   */
  @Override
  public Optional<BlockCache> getBlockCache() {
    return Optional.ofNullable(this.blockCache);
  }

  /**
   * May be empty if this is a master that does not carry tables.
   * @return The cache for mob files used by the regionserver.
   */
  @Override
  public Optional<MobFileCache> getMobFileCache() {
    return Optional.ofNullable(this.mobFileCache);
  }

  @Override
  public AccessChecker getAccessChecker() {
    return rpcServices.getAccessChecker();
  }

  @Override
  public ZKPermissionWatcher getZKPermissionWatcher() {
    return rpcServices.getZkPermissionWatcher();
  }

  /** Returns the ConfigurationManager object for testing purposes. */
  @InterfaceAudience.Private
  ConfigurationManager getConfigurationManager() {
    return configurationManager;
  }

  /** Returns the table descriptors implementation. */
  @Override
  public TableDescriptors getTableDescriptors() {
    return this.tableDescriptors;
  }

  /**
   * Reload the configuration from disk.
   */
  void updateConfiguration() {
    LOG.info("Reloading the configuration from disk.");
    // Reload the configuration from disk.
    conf.reloadConfiguration();
    configurationManager.notifyAllObservers(conf);
  }
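  // Fan-out sketch: components that care about reloads implement ConfigurationObserver and are
  // registered with the ConfigurationManager (regions register in addRegion above), so a single
  // call is enough to propagate new settings:
  //
  //   regionServer.updateConfiguration(); // reloads the Configuration, then notifies observers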
  CacheEvictionStats clearRegionBlockCache(Region region) {
    long evictedBlocks = 0;

    for (Store store : region.getStores()) {
      for (StoreFile hFile : store.getStorefiles()) {
        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
      }
    }

    return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build();
  }
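  // Illustrative usage (sketch): callers on the admin-driven cache-clearing path aggregate these
  // per-region stats, e.g.:
  //
  //   CacheEvictionStats stats = regionServer.clearRegionBlockCache(region);
  //   LOG.debug("Evicted {} blocks", stats.getEvictedBlocks());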
  @Override
  public double getCompactionPressure() {
    double max = 0;
    for (Region region : onlineRegions.values()) {
      for (Store store : region.getStores()) {
        double normCount = store.getCompactionPressure();
        if (normCount > max) {
          max = normCount;
        }
      }
    }
    return max;
  }

  @Override
  public HeapMemoryManager getHeapMemoryManager() {
    return hMemManager;
  }

  public MemStoreFlusher getMemStoreFlusher() {
    return cacheFlusher;
  }

  /**
   * For testing
   * @return whether all wal roll request finished for this regionserver
   */
  @InterfaceAudience.Private
  public boolean walRollRequestFinished() {
    return this.walRoller.walRollFinished();
  }

  @Override
  public ThroughputController getFlushThroughputController() {
    return flushThroughputController;
  }

  @Override
  public double getFlushPressure() {
    if (getRegionServerAccounting() == null || cacheFlusher == null) {
      // return 0 during RS initialization
      return 0.0;
    }
    return getRegionServerAccounting().getFlushPressure();
  }

  @Override
  public void onConfigurationChange(Configuration newConf) {
    ThroughputController old = this.flushThroughputController;
    if (old != null) {
      old.stop("configuration change");
    }
    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
    try {
      Superusers.initialize(newConf);
    } catch (IOException e) {
      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
    }

    // update region server coprocessors if the configuration has changed.
    if (
      CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
        CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY)
    ) {
      LOG.info("Update region server coprocessors because the configuration has changed");
      this.rsHost = new RegionServerCoprocessorHost(this, newConf);
    }
  }

  @Override
  public MetricsRegionServer getMetrics() {
    return metricsRegionServer;
  }

  @Override
  public SecureBulkLoadManager getSecureBulkLoadManager() {
    return this.secureBulkLoadManager;
  }

  @Override
  public EntityLock regionLock(final List<RegionInfo> regionInfos, final String description,
    final Abortable abort) {
    return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
      .regionLock(regionInfos, description, abort);
  }

  @Override
  public void unassign(byte[] regionName) throws IOException {
    clusterConnection.getAdmin().unassign(regionName, false);
  }

  @Override
  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
    return this.rsSpaceQuotaManager;
  }

  @Override
  public boolean reportFileArchivalForQuotas(TableName tableName,
    Collection<Entry<String, Long>> archivedFiles) {
    if (TEST_SKIP_REPORTING_TRANSITION) {
      return false;
    }
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null || rsSpaceQuotaManager == null) {
      // the current server could be stopping.
      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
      return false;
    }
    try {
      RegionServerStatusProtos.FileArchiveNotificationRequest request =
        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
      rss.reportFileArchival(null, request);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
            + " This will be retried.", ioe);
        }
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return false;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // re-create the stub if we failed to report the archival
      createRegionServerStatusStub(true);
      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
      return false;
    }
    return true;
  }
  public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
    return eventLoopGroupConfig;
  }

  @Override
  public Connection createConnection(Configuration conf) throws IOException {
    User user = UserProvider.instantiate(conf).getCurrent();
    return ServerConnectionUtils.createShortCircuitConnection(conf, user, this.serverName,
      this.rpcServices, this.rpcServices, new RegionServerRegistry(this));
  }

  void executeProcedure(long procId, RSProcedureCallable callable) {
    executorService.submit(new RSProcedureHandler(this, procId, callable));
  }

  public void remoteProcedureComplete(long procId, Throwable error) {
    procedureResultReporter.complete(procId, error);
  }

  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
    RegionServerStatusService.BlockingInterface rss;
    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
    for (;;) {
      rss = rssStub;
      if (rss != null) {
        break;
      }
      createRegionServerStatusStub();
    }
    try {
      rss.reportProcedureDone(null, request);
    } catch (ServiceException se) {
      if (rssStub == rss) {
        rssStub = null;
      }
      throw ProtobufUtil.getRemoteException(se);
    }
  }

  /**
   * Will ignore the open/close region procedures which were already submitted or executed. When
   * the master had an unfinished open/close region procedure and restarted, the new active master
   * may send a duplicate open/close region request to the regionserver. The open/close request is
   * submitted to a thread pool and executed, so first we need a cache for submitted open/close
   * region procedures. After the open/close region request is executed and the region transition
   * is reported successfully, we cache it in the executed region procedures cache. See
   * {@link #finishRegionProcedure(long)}. After the region transition is reported successfully,
   * the master will not send the open/close region request to the regionserver again. We assume
   * an ongoing duplicate open/close region request will not be delayed by more than 600 seconds,
   * so the executed region procedures cache expires after 600 seconds. See HBASE-22404 for more
   * details.
   * @param procId the id of the open/close region procedure
   * @return true if the procedure can be submitted.
   */
  boolean submitRegionProcedure(long procId) {
    if (procId == -1) {
      return true;
    }
    // Ignore the region procedures which already submitted.
    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
    if (previous != null) {
      LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
      return false;
    }
    // Ignore the region procedures which already executed.
    if (executedRegionProcedures.getIfPresent(procId) != null) {
      LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
      return false;
    }
    return true;
  }
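  // Illustrative flow of the two dedup caches (sketch; procId 42 is made up):
  //   submitRegionProcedure(42) -> true   // first submission, recorded as submitted
  //   submitRegionProcedure(42) -> false  // duplicate while the first is still running
  //   finishRegionProcedure(42)           // moves 42 into the executed cache (~600s TTL)
  //   submitRegionProcedure(42) -> false  // late duplicate after completion, still ignored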
  /**
   * See {@link #submitRegionProcedure(long)}.
   * @param procId the id of the open/close region procedure
   */
  public void finishRegionProcedure(long procId) {
    executedRegionProcedures.put(procId, procId);
    submittedRegionProcedures.remove(procId);
  }

  public boolean isShutDown() {
    return shutDown;
  }

  /**
   * Forcibly terminate the region server when the abort times out.
   */
  private static class SystemExitWhenAbortTimeout extends TimerTask {

    public SystemExitWhenAbortTimeout() {
    }

    @Override
    public void run() {
      LOG.warn("Aborting region server timed out, terminating forcibly"
        + " and does not wait for any running shutdown hooks or finalizers to finish their work."
        + " Thread dump to stdout.");
      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
      Runtime.getRuntime().halt(1);
    }
  }

  @InterfaceAudience.Private
  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
    return compactedFileDischarger;
  }

  /**
   * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}
   * @return pause time
   */
  @InterfaceAudience.Private
  public long getRetryPauseTime() {
    return this.retryPauseTime;
  }

  public Optional<ServerName> getActiveMaster() {
    return Optional.ofNullable(masterAddressTracker.getMasterAddress());
  }

  public List<ServerName> getBackupMasters() {
    return masterAddressTracker.getBackupMasters();
  }

  public Iterator<ServerName> getRegionServers() {
    return regionServerAddressTracker.getRegionServers().iterator();
  }

  public MetaRegionLocationCache getMetaRegionLocationCache() {
    return this.metaRegionLocationCache;
  }

  @InterfaceAudience.Private
  public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
    return brokenStoreFileCleaner;
  }

  @InterfaceAudience.Private
  public RSMobFileCleanerChore getRSMobFileCleanerChore() {
    return rsMobFileCleanerChore;
  }
}