001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; 021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; 022import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; 023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 024import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY; 025import java.io.IOException; 026import java.lang.management.MemoryType; 027import java.lang.management.MemoryUsage; 028import java.lang.reflect.Constructor; 029import java.net.BindException; 030import java.net.InetAddress; 031import java.net.InetSocketAddress; 032import java.time.Duration; 033import java.util.ArrayList; 034import java.util.Collection; 035import java.util.Collections; 036import java.util.Comparator; 037import java.util.HashSet; 038import java.util.List; 039import java.util.Map; 040import java.util.Map.Entry; 041import java.util.Objects; 042import java.util.Optional; 043import java.util.Set; 044import java.util.SortedMap; 045import 
java.util.Timer; 046import java.util.TimerTask; 047import java.util.TreeMap; 048import java.util.TreeSet; 049import java.util.concurrent.ConcurrentHashMap; 050import java.util.concurrent.ConcurrentMap; 051import java.util.concurrent.ConcurrentSkipListMap; 052import java.util.concurrent.TimeUnit; 053import java.util.concurrent.atomic.AtomicBoolean; 054import java.util.concurrent.locks.ReentrantReadWriteLock; 055import java.util.stream.Collectors; 056import javax.management.MalformedObjectNameException; 057import javax.servlet.http.HttpServlet; 058import org.apache.commons.lang3.RandomUtils; 059import org.apache.commons.lang3.StringUtils; 060import org.apache.commons.lang3.SystemUtils; 061import org.apache.hadoop.conf.Configuration; 062import org.apache.hadoop.fs.FileSystem; 063import org.apache.hadoop.fs.Path; 064import org.apache.hadoop.hbase.Abortable; 065import org.apache.hadoop.hbase.CacheEvictionStats; 066import org.apache.hadoop.hbase.CallQueueTooBigException; 067import org.apache.hadoop.hbase.ChoreService; 068import org.apache.hadoop.hbase.ClockOutOfSyncException; 069import org.apache.hadoop.hbase.CoordinatedStateManager; 070import org.apache.hadoop.hbase.DoNotRetryIOException; 071import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException; 072import org.apache.hadoop.hbase.HBaseConfiguration; 073import org.apache.hadoop.hbase.HBaseInterfaceAudience; 074import org.apache.hadoop.hbase.HConstants; 075import org.apache.hadoop.hbase.HDFSBlocksDistribution; 076import org.apache.hadoop.hbase.HealthCheckChore; 077import org.apache.hadoop.hbase.MetaTableAccessor; 078import org.apache.hadoop.hbase.NotServingRegionException; 079import org.apache.hadoop.hbase.PleaseHoldException; 080import org.apache.hadoop.hbase.ScheduledChore; 081import org.apache.hadoop.hbase.Server; 082import org.apache.hadoop.hbase.ServerName; 083import org.apache.hadoop.hbase.Stoppable; 084import org.apache.hadoop.hbase.TableDescriptors; 085import org.apache.hadoop.hbase.TableName; 
086import org.apache.hadoop.hbase.YouAreDeadException; 087import org.apache.hadoop.hbase.ZNodeClearer; 088import org.apache.hadoop.hbase.client.ClusterConnection; 089import org.apache.hadoop.hbase.client.Connection; 090import org.apache.hadoop.hbase.client.ConnectionUtils; 091import org.apache.hadoop.hbase.client.RegionInfo; 092import org.apache.hadoop.hbase.client.RegionInfoBuilder; 093import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 094import org.apache.hadoop.hbase.client.locking.EntityLock; 095import org.apache.hadoop.hbase.client.locking.LockServiceClient; 096import org.apache.hadoop.hbase.conf.ConfigurationManager; 097import org.apache.hadoop.hbase.conf.ConfigurationObserver; 098import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; 099import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 100import org.apache.hadoop.hbase.exceptions.RegionMovedException; 101import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 102import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 103import org.apache.hadoop.hbase.executor.ExecutorService; 104import org.apache.hadoop.hbase.executor.ExecutorType; 105import org.apache.hadoop.hbase.fs.HFileSystem; 106import org.apache.hadoop.hbase.http.InfoServer; 107import org.apache.hadoop.hbase.io.hfile.BlockCache; 108import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 109import org.apache.hadoop.hbase.io.hfile.HFile; 110import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 111import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 112import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper; 113import org.apache.hadoop.hbase.ipc.RpcClient; 114import org.apache.hadoop.hbase.ipc.RpcClientFactory; 115import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 116import org.apache.hadoop.hbase.ipc.RpcServer; 117import org.apache.hadoop.hbase.ipc.RpcServerInterface; 118import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 119import 
org.apache.hadoop.hbase.ipc.ServerRpcController; 120import org.apache.hadoop.hbase.log.HBaseMarkers; 121import org.apache.hadoop.hbase.master.HMaster; 122import org.apache.hadoop.hbase.master.LoadBalancer; 123import org.apache.hadoop.hbase.master.RegionState; 124import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; 125import org.apache.hadoop.hbase.mob.MobFileCache; 126import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 127import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore; 128import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; 129import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 130import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore; 131import org.apache.hadoop.hbase.quotas.QuotaUtil; 132import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 133import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 134import org.apache.hadoop.hbase.quotas.RegionSize; 135import org.apache.hadoop.hbase.quotas.RegionSizeStore; 136import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 137import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 138import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; 139import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 140import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; 141import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; 142import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; 143import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; 144import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; 145import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 146import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; 147import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; 
148import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; 149import org.apache.hadoop.hbase.security.SecurityConstants; 150import org.apache.hadoop.hbase.security.Superusers; 151import org.apache.hadoop.hbase.security.User; 152import org.apache.hadoop.hbase.security.UserProvider; 153import org.apache.hadoop.hbase.security.access.AccessChecker; 154import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; 155import org.apache.hadoop.hbase.trace.SpanReceiverHost; 156import org.apache.hadoop.hbase.trace.TraceUtil; 157import org.apache.hadoop.hbase.util.Addressing; 158import org.apache.hadoop.hbase.util.Bytes; 159import org.apache.hadoop.hbase.util.CommonFSUtils; 160import org.apache.hadoop.hbase.util.CompressionTest; 161import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 162import org.apache.hadoop.hbase.util.FSTableDescriptors; 163import org.apache.hadoop.hbase.util.FSUtils; 164import org.apache.hadoop.hbase.util.JvmPauseMonitor; 165import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; 166import org.apache.hadoop.hbase.util.Pair; 167import org.apache.hadoop.hbase.util.RetryCounter; 168import org.apache.hadoop.hbase.util.RetryCounterFactory; 169import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 170import org.apache.hadoop.hbase.util.Sleeper; 171import org.apache.hadoop.hbase.util.Threads; 172import org.apache.hadoop.hbase.util.VersionInfo; 173import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 174import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper; 175import org.apache.hadoop.hbase.wal.WAL; 176import org.apache.hadoop.hbase.wal.WALFactory; 177import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; 178import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; 179import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 180import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 181import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker; 182import org.apache.hadoop.hbase.zookeeper.ZKUtil; 
183import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 184import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 185import org.apache.hadoop.ipc.RemoteException; 186import org.apache.hadoop.util.ReflectionUtils; 187import org.apache.yetus.audience.InterfaceAudience; 188import org.apache.zookeeper.KeeperException; 189import org.slf4j.Logger; 190import org.slf4j.LoggerFactory; 191import sun.misc.Signal; 192 193import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 194import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 195import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 196import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 197import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 198import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 199import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 200import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 201import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 202import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 203 204import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 205import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad; 213import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; 232import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 235 236/** 237 * HRegionServer makes a set of HRegions available to clients. It checks in with 238 * the HMaster. There are many HRegionServers in a single HBase deployment. 239 */ 240@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 241@SuppressWarnings({ "deprecation"}) 242public class HRegionServer extends Thread implements 243 RegionServerServices, LastSequenceId, ConfigurationObserver { 244 private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class); 245 246 /** 247 * For testing only! Set to true to skip notifying region assignment to master . 248 */ 249 @InterfaceAudience.Private 250 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL") 251 public static boolean TEST_SKIP_REPORTING_TRANSITION = false; 252 253 /** 254 * A map from RegionName to current action in progress. Boolean value indicates: 255 * true - if open region action in progress 256 * false - if close region action in progress 257 */ 258 private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS = 259 new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); 260 261 /** 262 * Used to cache the open/close region procedures which already submitted. 263 * See {@link #submitRegionProcedure(long)}. 264 */ 265 private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>(); 266 /** 267 * Used to cache the open/close region procedures which already executed. 268 * See {@link #submitRegionProcedure(long)}. 
*/
  // Entries expire 600 seconds after they were last accessed.
  private final Cache<Long, Long> executedRegionProcedures =
    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

  /**
   * Used to cache the moved-out regions. Entries expire {@code movedRegionCacheExpiredTime()}
   * milliseconds after they are written.
   */
  private final Cache<String, MovedRegionInfo> movedRegionInfoCache =
    CacheBuilder.newBuilder().expireAfterWrite(movedRegionCacheExpiredTime(),
      TimeUnit.MILLISECONDS).build();

  private MemStoreFlusher cacheFlusher;

  private HeapMemoryManager hMemManager;

  /**
   * Cluster connection to be shared by services.
   * Initialized at server startup and closed when server shuts down.
   * Clients must never close it explicitly.
   * Clients hosted by this Server should make use of this clusterConnection rather than create
   * their own; if they create their own, there is no way for the hosting server to shut down
   * ongoing client RPCs.
   */
  protected ClusterConnection clusterConnection;

  /**
   * Go here to get table descriptors.
   */
  protected TableDescriptors tableDescriptors;

  // Replication services. If no replication, this handler will be null.
  private ReplicationSourceService replicationSourceHandler;
  private ReplicationSinkService replicationSinkHandler;

  // Compactions
  public CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the
   * encoded region name. All access should be synchronized.
   */
  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
  /**
   * Lock for gating access to {@link #onlineRegions}.
   * TODO: If this map is gated by a lock, does it need to be a ConcurrentHashMap?
   */
  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on.
   * We store the value as InetSocketAddress since this is used only in HDFS
   * API (create() that takes favored nodes as hints for placing file blocks).
   * We could have used ServerName here as the value class, but we'd need to
   * convert it to InetSocketAddress at some point before the HDFS API call, and
   * it seems a bit weird to store ServerName since ServerName refers to RegionServers
   * and here we really mean DataNode locations.
   */
  private final Map<String, InetSocketAddress[]> regionFavoredNodesMap = new ConcurrentHashMap<>();

  private LeaseManager leaseManager;

  // Instance of the hbase executor executorService.
  protected ExecutorService executorService;

  // Set to true in the constructor.
  private volatile boolean dataFsOk;
  private HFileSystem dataFs;
  private HFileSystem walFs;

  // Set when a report to the master comes back with a message asking us to
  // shutdown. Also set by call to stop when debugging or running unit tests
  // of HRegionServer in isolation.
  private volatile boolean stopped = false;

  // Go down hard. Used if file system becomes unavailable and also in
  // debugging and unit tests. Created (as false) in the constructor.
  private AtomicBoolean abortRequested;
  // Config key for how long an abort may take before the abort timeout fires.
  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout, in milliseconds: 1200 seconds (20 minutes), kept long to be safe.
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Config key naming the task to run when the abort timeout expires.
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;
  private volatile boolean killed = false;
  private volatile boolean shutDown = false;

  protected final Configuration conf;

  private Path dataRootDir;
  private Path walRootDir;

  // Read from HConstants.THREAD_WAKE_FREQUENCY (default 10 * 1000) in the constructor.
  private final int threadWakeFrequency;
  // Read from "hbase.regionserver.msginterval" (default 3 * 1000) in the constructor.
  final int msgInterval;

  // Compaction/flush check periods; both default to threadWakeFrequency when unset.
  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  private RpcClient rpcClient;

  private RpcRetryingCallerFactory rpcRetryingCallerFactory;
  private RpcControllerFactory rpcControllerFactory;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  // Info server. Protected (and therefore also package-visible) so it can be used by unit tests.
  // REGIONSERVER is the name of the webapp and the attribute name used when stuffing this
  // instance into the web context.
  protected InfoServer infoServer;
  private JvmPauseMonitor pauseMonitor;

  /** Region server process name; also the webapp name (see the infoServer comment above). */
  public static final String REGIONSERVER = "regionserver";

  private MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;
  private SpanReceiverHost spanReceiverHost;

  /**
   * ChoreService used to schedule tasks that we want to run periodically.
   */
  private ChoreService choreService;

  /**
   * Check for compaction requests.
   */
  private ScheduledChore compactionChecker;

  /**
   * Check for flushes.
   */
  private ScheduledChore periodicFlusher;

  private volatile WALFactory walFactory;

  private LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // Flag set after we're done setting up server threads.
  final AtomicBoolean online = new AtomicBoolean(false);

  // ZooKeeper connection and watcher.
  protected final ZKWatcher zooKeeper;

  // Master address tracker; null when running masterless (see the constructor).
  private final MasterAddressTracker masterAddressTracker;

  // Cluster status tracker; null when running masterless (see the constructor).
  protected final ClusterStatusTracker clusterStatusTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  // A sleeper that sleeps for msgInterval.
  protected final Sleeper sleeper;

  private final int operationTimeout;
  private final int shortOperationTimeout;

  private final RegionServerAccounting regionServerAccounting;

  private SlowLogTableOpsChore slowLogTableOpsChore = null;

  // Block cache. Not created by the constructor when this is an HMaster that does not
  // carry tables.
  private BlockCache blockCache;
  // The cache for mob files. Same creation condition as blockCache.
  private MobFileCache mobFileCache;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * The server name the Master sees us as. It is made from the hostname, port, and server
   * startcode. The constructor builds an initial value from the RPC services' hostname (or the
   * configured override); per the original note it gets set again after registration against
   * the Master, using the hostname the master passes us.
   */
  protected ServerName serverName;

  /**
   * Hostname specified by the hostname config; may be blank/null when not configured.
   */
  protected String useThisHostnameInstead;

  /**
   * @deprecated since 2.4.0 and will be removed in 4.0.0.
   * Use {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
   */
  @Deprecated
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.regionserver.hostname.disable.master.reversedns";

  /**
   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
   * An IOException is thrown if both are used.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";

  /**
   * HBASE-24667: The config hbase.regionserver.hostname.disable.master.reversedns is replaced by
   * hbase.unsafe.regionserver.hostname.disable.master.reversedns. The old config key is
   * registered as a deprecated alias of the new one here for backward compatibility.
   */
  static {
    Configuration.addDeprecation(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY,
      UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY);
  }

  /**
   * This server's startcode (System.currentTimeMillis() captured in the constructor).
   */
  protected final long startcode;

  /**
   * Unique identifier for the cluster we are a part of.
   */
  protected String clusterId;

  // Chore for refreshing store files for secondary regions.
  private StorefileRefresherChore storefileRefresher;

  private RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent
   * in the case where the client doesn't receive the response from a successful operation and
   * retries. We track the successful ops for some time via a nonce sent by the client and handle
   * duplicate operations (currently, by failing them; in future we might use MVCC to return
   * the result). Nonces are also recovered from the WAL during recovery; however, the caveats
   * (from HBASE-3787) are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
   *   of past records. If we don't read the records, we don't read and recover the nonces.
   *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
   * We can have a separate additional "Nonce WAL". It will just contain a bunch of numbers and
   * won't be flushed on the main path - because the WAL itself also contains nonces, if we only
   * flush it before memstore flush, for a given nonce we will either see it in the WAL (if it was
   * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
   * log (or both occasionally, which doesn't matter). The nonce log file can be deleted after the
   * latest nonce in it expired. It can also be recovered during move.
   * <p>
   * Null when HConstants.HBASE_RS_NONCES_ENABLED is set to false.
   */
  final ServerNonceManager nonceManager;

  private UserProvider userProvider;

  protected final RSRpcServices rpcServices;

  // Only created by the constructor when not masterless and WAL splitting is coordinated by
  // ZooKeeper; otherwise left null.
  private CoordinatedStateManager csm;

  /**
   * Configuration manager is used to register/deregister and notify the configuration observers
   * when the regionserver is notified that there was a change in the on disk configs.
   */
  protected final ConfigurationManager configurationManager;

  @InterfaceAudience.Private
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  private SecureBulkLoadManager secureBulkLoadManager;

  private FileSystemUtilizationChore fsUtilizationChore;

  private final NettyEventLoopGroupConfig eventLoopGroupConfig;

  /**
   * Provides online slow log responses from a ring buffer. Remains null unless the
   * corresponding recording is enabled by configuration (slow log buffer on a region server,
   * balancer decision buffer on a master).
   */
  private NamedQueueRecorder namedQueueRecorder = null;

  /**
   * True if this RegionServer is coming up in a cluster where there is no Master;
   * means it needs to just come up and make do without a Master to talk to: e.g. in test or
   * HRegionServer is doing other than its usual duties: e.g. as a hollowed-out host whose only
   * purpose is as a Replication-stream sink; see HBASE-18846 for more.
   * TODO: can this replace {@link #TEST_SKIP_REPORTING_TRANSITION} ?
   */
  private final boolean masterless;
  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /** Config key for the regionserver codec list. */
  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";

  // A timer to shut down the process if abort takes too long.
  private Timer abortMonitor;

  /**
   * Starts a HRegionServer at the default location.
   * <p>
   * Don't start any services or managers in here in the Constructor.
   * Defer till after we register with the Master as much as possible. See {@link #startServices}.
*/
  public HRegionServer(final Configuration conf) throws IOException {
    super("RegionServer");  // thread name
    TraceUtil.initTracer(conf);
    try {
      this.startcode = System.currentTimeMillis();
      this.conf = conf;
      this.dataFsOk = true;
      this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
      this.eventLoopGroupConfig = setupNetty(this.conf);
      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      this.userProvider = UserProvider.instantiate(conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver.
      // Note: this mutates the Configuration instance passed in by the caller.
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
      this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

      this.sleeper = new Sleeper(this.msgInterval, this);

      // The nonce manager is left null when nonces are disabled.
      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.abortRequested = new AtomicBoolean(false);
      this.stopped = false;

      initNamedQueueRecorder(conf);
      rpcServices = createRpcServices();
      // Must run after createRpcServices(): getUseThisHostnameInstead may read rpcServices.isa.
      useThisHostnameInstead = getUseThisHostnameInstead(conf);
      String hostName =
        StringUtils.isBlank(useThisHostnameInstead) ? this.rpcServices.isa.getHostName()
          : this.useThisHostnameInstead;
      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);

      rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
      rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

      // login the zookeeper client principal (if using security)
      ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
        HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
      // login the server principal (if using secure Hadoop)
      login(userProvider, hostName);
      // init superusers and add the server principal (if using security)
      // or process owner as default super user.
      Superusers.initialize(conf);
      regionServerAccounting = new RegionServerAccounting(conf);

      boolean isMasterNotCarryTable =
        this instanceof HMaster && !LoadBalancer.isTablesOnMaster(conf);

      // No need to instantiate the block cache and mob file cache when the master does not
      // carry tables.
      if (!isMasterNotCarryTable) {
        blockCache = BlockCacheFactory.createBlockCache(conf);
        mobFileCache = new MobFileCache(conf);
      }

      // Any uncaught exception in a thread using this handler aborts the server.
      uncaughtExceptionHandler =
        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);

      initializeFileSystem();
      spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

      this.configurationManager = new ConfigurationManager();
      setupWindows(getConfiguration(), getConfigurationManager());

      // Open connection to zookeeper and set primary watcher.
      // NOTE(review): an older comment here said some unit tests run with "no zookeeper at all",
      // but the ZKWatcher below is created unconditionally; only the master/cluster-status
      // trackers are skipped when running masterless.
      zooKeeper = new ZKWatcher(conf, getProcessName() + ":" + rpcServices.isa.getPort(), this,
        canCreateBaseZNode());
      // If no master in cluster, skip trying to track one or look for a cluster status.
      if (!this.masterless) {
        if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
          DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
          this.csm = new ZkCoordinatedStateManager(this);
        }

        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        masterAddressTracker.start();

        clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
        clusterStatusTracker.start();
      } else {
        masterAddressTracker = null;
        clusterStatusTracker = null;
      }
      this.rpcServices.start(zooKeeper);
      // This violates 'no starting stuff in Constructor' but Master depends on the below chore
      // and executor being created and takes a different startup route. Lots of overlap between
      // HRS and M (An M IS A HRS now). Need to refactor so there is less duplication between M
      // and its super class HRS. TODO.
      // Master also expects the Constructor to put up web servers (putUpWebUI below). Ugh.
      this.choreService = new ChoreService(getName(), true);
      this.executorService = new ExecutorService(getName());
      putUpWebUI();
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      LOG.error("Failed construction RegionServer", t);
      throw t;
    }
  }

  /**
   * Creates {@link #namedQueueRecorder} when the relevant recording is enabled.
   * <p>
   * On a plain region server this is controlled by {@code HConstants.SLOW_LOG_BUFFER_ENABLED_KEY};
   * on an {@link HMaster} by {@code BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED}. When the
   * applicable flag is false the field is left untouched (null by default).
   *
   * @param conf configuration to read the enable flags from
   */
  private void initNamedQueueRecorder(Configuration conf) {
    if (!(this instanceof HMaster)) {
      final boolean isOnlineLogProviderEnabled = conf.getBoolean(
        HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
        HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
      if (isOnlineLogProviderEnabled) {
        this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
      }
    } else {
      final boolean isBalancerDecisionRecording = conf
        .getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
          BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
      if (isBalancerDecisionRecording) {
        this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
      }
    }
  }

  /**
   * Resolves the hostname this server should advertise instead of the default one.
   * HMaster should override this method to load the specific config for master.
   *
   * @param conf configuration to read the hostname settings from
   * @return the configured {@code UNSAFE_RS_HOSTNAME_KEY} value (possibly null or blank) when
   *   reverse-DNS disabling is off; the RPC services' own hostname when it is on
   * @throws IOException if reverse-DNS disabling is on and an explicit hostname is also set,
   *   since the two settings are mutually exclusive
   */
  // HMaster should override this method to load the specific config for master
  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
      if (!StringUtils.isBlank(hostname)) {
        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + UNSAFE_RS_HOSTNAME_KEY +
          " are mutually exclusive. Do not set " + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY +
          " to true while " + UNSAFE_RS_HOSTNAME_KEY + " is used";
        throw new IOException(msg);
      } else {
        // Reads rpcServices, so callers must have created it first.
        return rpcServices.isa.getHostName();
      }
    } else {
      return hostname;
    }
  }

  /**
   * Installs a configuration-reload signal handler on non-Windows platforms.
   * <p>
   * Despite the method name, nothing is done when running on Windows. On every other OS a
   * {@code SIGHUP} handler is registered that reloads {@code conf} and then notifies all
   * observers registered with {@code cm}.
   *
   * @param conf configuration to reload when SIGHUP is received
   * @param cm configuration manager whose observers are notified after the reload
733 */ 734 private static void setupWindows(final Configuration conf, ConfigurationManager cm) { 735 if (!SystemUtils.IS_OS_WINDOWS) { 736 Signal.handle(new Signal("HUP"), signal -> { 737 conf.reloadConfiguration(); 738 cm.notifyAllObservers(conf); 739 }); 740 } 741 } 742 743 private static NettyEventLoopGroupConfig setupNetty(Configuration conf) { 744 // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL. 745 NettyEventLoopGroupConfig nelgc = 746 new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup"); 747 NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass()); 748 NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass()); 749 return nelgc; 750 } 751 752 private void initializeFileSystem() throws IOException { 753 // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase 754 // checksum verification enabled, then automatically switch off hdfs checksum verification. 755 boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true); 756 String walDirUri = CommonFSUtils.getDirUri(this.conf, 757 new Path(conf.get(CommonFSUtils.HBASE_WAL_DIR, conf.get(HConstants.HBASE_DIR)))); 758 // set WAL's uri 759 if (walDirUri != null) { 760 CommonFSUtils.setFsDefault(this.conf, walDirUri); 761 } 762 // init the WALFs 763 this.walFs = new HFileSystem(this.conf, useHBaseChecksum); 764 this.walRootDir = CommonFSUtils.getWALRootDir(this.conf); 765 // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else 766 // underlying hadoop hdfs accessors will be going against wrong filesystem 767 // (unless all is set to defaults). 
768 String rootDirUri = 769 CommonFSUtils.getDirUri(this.conf, new Path(conf.get(HConstants.HBASE_DIR))); 770 if (rootDirUri != null) { 771 CommonFSUtils.setFsDefault(this.conf, rootDirUri); 772 } 773 // init the filesystem 774 this.dataFs = new HFileSystem(this.conf, useHBaseChecksum); 775 this.dataRootDir = CommonFSUtils.getRootDir(this.conf); 776 this.tableDescriptors = new FSTableDescriptors(this.dataFs, this.dataRootDir, 777 !canUpdateTableDescriptor(), cacheTableDescriptor()); 778 } 779 780 protected void login(UserProvider user, String host) throws IOException { 781 user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE, 782 SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host); 783 } 784 785 /** 786 * Wait for an active Master. 787 * See override in Master superclass for how it is used. 788 */ 789 protected void waitForMasterActive() {} 790 791 protected String getProcessName() { 792 return REGIONSERVER; 793 } 794 795 protected boolean canCreateBaseZNode() { 796 return this.masterless; 797 } 798 799 protected boolean canUpdateTableDescriptor() { 800 return false; 801 } 802 803 protected boolean cacheTableDescriptor() { 804 return false; 805 } 806 807 protected RSRpcServices createRpcServices() throws IOException { 808 return new RSRpcServices(this); 809 } 810 811 protected void configureInfoServer() { 812 infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class); 813 infoServer.setAttribute(REGIONSERVER, this); 814 } 815 816 protected Class<? 
extends HttpServlet> getDumpServlet() { 817 return RSDumpServlet.class; 818 } 819 820 @Override 821 public boolean registerService(com.google.protobuf.Service instance) { 822 /* 823 * No stacking of instances is allowed for a single executorService name 824 */ 825 com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = 826 instance.getDescriptorForType(); 827 String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc); 828 if (coprocessorServiceHandlers.containsKey(serviceName)) { 829 LOG.error("Coprocessor executorService " + serviceName 830 + " already registered, rejecting request from " + instance); 831 return false; 832 } 833 834 coprocessorServiceHandlers.put(serviceName, instance); 835 if (LOG.isDebugEnabled()) { 836 LOG.debug("Registered regionserver coprocessor executorService: executorService=" + serviceName); 837 } 838 return true; 839 } 840 841 protected ClusterConnection createClusterConnection() throws IOException { 842 Configuration conf = this.conf; 843 // We use ZKConnectionRegistry for all the internal communication, primarily for these reasons: 844 // - Decouples RS and master life cycles. RegionServers can continue be up independent of 845 // masters' availability. 846 // - Configuration management for region servers (cluster internal) is much simpler when adding 847 // new masters or removing existing masters, since only clients' config needs to be updated. 848 // - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for 849 // other internal connections too. 
850 conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 851 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 852 if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) { 853 // Use server ZK cluster for server-issued connections, so we clone 854 // the conf and unset the client ZK related properties 855 conf = new Configuration(this.conf); 856 conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM); 857 } 858 // Create a cluster connection that when appropriate, can short-circuit and go directly to the 859 // local server if the request is to the local server bypassing RPC. Can be used for both local 860 // and remote invocations. 861 return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(), 862 serverName, rpcServices, rpcServices); 863 } 864 865 /** 866 * Run test on configured codecs to make sure supporting libs are in place. 867 * @param c 868 * @throws IOException 869 */ 870 private static void checkCodecs(final Configuration c) throws IOException { 871 // check to see if the codec list is available: 872 String [] codecs = c.getStrings(REGIONSERVER_CODEC, (String[])null); 873 if (codecs == null) return; 874 for (String codec : codecs) { 875 if (!CompressionTest.testCompression(codec)) { 876 throw new IOException("Compression codec " + codec + 877 " not supported, aborting RS construction"); 878 } 879 } 880 } 881 882 public String getClusterId() { 883 return this.clusterId; 884 } 885 886 /** 887 * Setup our cluster connection if not already initialized. 888 * @throws IOException 889 */ 890 protected synchronized void setupClusterConnection() throws IOException { 891 if (clusterConnection == null) { 892 clusterConnection = createClusterConnection(); 893 } 894 } 895 896 /** 897 * All initialization needed before we go register with Master.<br> 898 * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br> 899 * In here we just put up the RpcServer, setup Connection, and ZooKeeper. 
900 */ 901 private void preRegistrationInitialization() { 902 try { 903 initializeZooKeeper(); 904 setupClusterConnection(); 905 // Setup RPC client for master communication 906 this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( 907 this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); 908 } catch (Throwable t) { 909 // Call stop if error or process will stick around for ever since server 910 // puts up non-daemon threads. 911 this.rpcServices.stop(); 912 abort("Initialization of RS failed. Hence aborting RS.", t); 913 } 914 } 915 916 /** 917 * Bring up connection to zk ensemble and then wait until a master for this cluster and then after 918 * that, wait until cluster 'up' flag has been set. This is the order in which master does things. 919 * <p> 920 * Finally open long-living server short-circuit connection. 921 */ 922 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE", 923 justification="cluster Id znode read would give us correct response") 924 private void initializeZooKeeper() throws IOException, InterruptedException { 925 // Nothing to do in here if no Master in the mix. 926 if (this.masterless) { 927 return; 928 } 929 930 // Create the master address tracker, register with zk, and start it. Then 931 // block until a master is available. No point in starting up if no master 932 // running. 933 blockAndCheckIfStopped(this.masterAddressTracker); 934 935 // Wait on cluster being up. Master will set this flag up in zookeeper 936 // when ready. 937 blockAndCheckIfStopped(this.clusterStatusTracker); 938 939 // If we are HMaster then the cluster id should have already been set. 
940 if (clusterId == null) { 941 // Retrieve clusterId 942 // Since cluster status is now up 943 // ID should have already been set by HMaster 944 try { 945 clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper); 946 if (clusterId == null) { 947 this.abort("Cluster ID has not been set"); 948 } 949 LOG.info("ClusterId : " + clusterId); 950 } catch (KeeperException e) { 951 this.abort("Failed to retrieve Cluster ID", e); 952 } 953 } 954 955 waitForMasterActive(); 956 if (isStopped() || isAborted()) { 957 return; // No need for further initialization 958 } 959 960 // watch for snapshots and other procedures 961 try { 962 rspmHost = new RegionServerProcedureManagerHost(); 963 rspmHost.loadProcedures(conf); 964 rspmHost.initialize(this); 965 } catch (KeeperException e) { 966 this.abort("Failed to reach coordination cluster when creating procedure handler.", e); 967 } 968 } 969 970 /** 971 * Utilty method to wait indefinitely on a znode availability while checking 972 * if the region server is shut down 973 * @param tracker znode tracker to use 974 * @throws IOException any IO exception, plus if the RS is stopped 975 * @throws InterruptedException if the waiting thread is interrupted 976 */ 977 private void blockAndCheckIfStopped(ZKNodeTracker tracker) 978 throws IOException, InterruptedException { 979 while (tracker.blockUntilAvailable(this.msgInterval, false) == null) { 980 if (this.stopped) { 981 throw new IOException("Received the shutdown message while waiting."); 982 } 983 } 984 } 985 986 /** 987 * @return True if the cluster is up. 988 */ 989 @Override 990 public boolean isClusterUp() { 991 return this.masterless || 992 (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp()); 993 } 994 995 /** 996 * The HRegionServer sticks in this loop until closed. 
*/
  // Lifecycle: pre-registration init -> report for duty (with backoff) -> main loop while
  // healthy -> orderly shutdown of services, regions, WAL, RPC and ZooKeeper.
  @Override
  public void run() {
    if (isStopped()) {
      LOG.info("Skipping run; stopped");
      return;
    }
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        ShutdownHook.install(conf, dataFs, this, Thread.currentThread());
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

        // Try and register with the Master; tell it we are here.  Break if server is stopped or
        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
        // come up.
        LOG.debug("About to register with Master.");
        // Unbounded attempts; backoff starts at the sleeper period, capped at 5 minutes.
        RetryCounterFactory rcf =
          new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
        RetryCounter rc = rcf.create();
        while (keepLooping()) {
          RegionServerStartupResponse w = reportForDuty();
          if (w == null) {
            long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
            LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
            this.sleeper.sleep(sleepTime);
          } else {
            handleReportForDutyResponse(w);
            break;
          }
        }
      }

      if (!isStopped() && isHealthy()) {
        // start the snapshot handler and other procedure handlers,
        // since the server is ready to run
        if (this.rspmHost != null) {
          this.rspmHost.start();
        }
        // Start the Quota Manager
        if (this.rsQuotaManager != null) {
          rsQuotaManager.start(getRpcServer().getScheduler());
        }
        if (this.rsSpaceQuotaManager != null) {
          this.rsSpaceQuotaManager.start();
        }
      }

      // We registered with the Master.  Go into run mode.
      long lastMsg = System.currentTimeMillis();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        // Cluster shutdown: first close user regions, then stop once only catalog regions
        // remain and they saw no writes since the previous iteration.
        if (!isClusterUp()) {
          if (onlineRegions.isEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(this.abortRequested.get());
          } else {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
              // since last time we went around the loop.  Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested.get());
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        // Heartbeat: send a load report to the master at most once per msgInterval ms.
        long now = System.currentTimeMillis();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = System.currentTimeMillis();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // while
    } catch (Throwable t) {
      // checkOOME returns true when it handled t as an out-of-memory condition; otherwise abort.
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }

    // ---- Shutdown sequence below; the ordering matters. ----
    if (this.leaseManager != null) {
      this.leaseManager.closeAfterLeasesExpire();
    }
    if (this.splitLogWorker != null) {
      splitLogWorker.stop();
    }
    if (this.infoServer != null) {
      LOG.info("Stopping infoServer");
      try {
        this.infoServer.stop();
      } catch (Exception e) {
        LOG.error("Failed to stop infoServer", e);
      }
    }
    // Send cache a shutdown.
    if (blockCache != null) {
      blockCache.shutdown();
    }
    if (mobFileCache != null) {
      mobFileCache.shutdown();
    }

    // Send interrupts to wake up threads if sleeping so they notice shutdown.
    // TODO: Should we check they are alive? If OOME could have exited already
    if (this.hMemManager != null) this.hMemManager.stop();
    if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
    if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();

    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
    if (rspmHost != null) {
      rspmHost.stop(this.abortRequested.get() || this.killed);
    }

    if (this.killed) {
      // Just skip out w/o closing regions.  Used when testing.
    } else if (abortRequested.get()) {
      // On abort, only close regions if the data filesystem is still usable.
      if (this.dataFsOk) {
        closeUserRegions(abortRequested.get()); // Don't leave any open file handles
      }
      LOG.info("aborting server " + this.serverName);
    } else {
      closeUserRegions(abortRequested.get());
      LOG.info("stopping server " + this.serverName);
    }

    if (this.clusterConnection != null && !clusterConnection.isClosed()) {
      try {
        this.clusterConnection.close();
      } catch (IOException e) {
        // Although the {@link Closeable} interface throws an {@link
        // IOException}, in reality, the implementation would never do that.
        LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
      }
    }

    // Closing the compactSplit thread before closing meta regions
    if (!this.killed && containsMetaTableRegions()) {
      if (!abortRequested.get() || this.dataFsOk) {
        if (this.compactSplitThread != null) {
          this.compactSplitThread.join();
          this.compactSplitThread = null;
        }
        closeMetaTableRegions(abortRequested.get());
      }
    }

    if (!this.killed && this.dataFsOk) {
      waitOnAllRegionsToClose(abortRequested.get());
      LOG.info("stopping server " + this.serverName + "; all regions closed.");
    }

    // Stop the quota manager
    if (rsQuotaManager != null) {
      rsQuotaManager.stop();
    }
    if (rsSpaceQuotaManager != null) {
      rsSpaceQuotaManager.stop();
      rsSpaceQuotaManager = null;
    }

    // flag may be changed when closing regions throws exception.
    // A clean stop closes the WAL; an abort only shuts it down (see shutdownWAL).
    if (this.dataFsOk) {
      shutdownWAL(!abortRequested.get());
    }

    // Make sure the proxy is down.
    if (this.rssStub != null) {
      this.rssStub = null;
    }
    if (this.lockStub != null) {
      this.lockStub = null;
    }
    if (this.rpcClient != null) {
      this.rpcClient.close();
    }
    if (this.leaseManager != null) {
      this.leaseManager.close();
    }
    if (this.pauseMonitor != null) {
      this.pauseMonitor.stop();
    }

    if (!killed) {
      stopServiceThreads();
    }

    if (this.rpcServices != null) {
      this.rpcServices.stop();
    }

    try {
      deleteMyEphemeralNode();
    } catch (KeeperException.NoNodeException nn) {
      // pass: the znode is already gone, which is the desired end state.
    } catch (KeeperException e) {
      LOG.warn("Failed deleting my ephemeral node", e);
    }
    // We may have failed to delete the znode at the previous step, but
    //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
    ZNodeClearer.deleteMyEphemeralNodeOnDisk();

    if (this.zooKeeper != null) {
      this.zooKeeper.close();
    }
    this.shutDown = true;
    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
  }

  /** @return whether the first hbase:meta region is among this server's online regions */
  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  /**
   * @return {@code true} if every online region is a meta region (vacuously true when none are
   *   online); {@code false} as soon as a non-meta region is found
   */
  private boolean areAllUserRegionsOffline() {
    // NOTE(review): shortcut assumes more than 2 online regions cannot all be catalog regions;
    // confirm the rationale for the constant 2.
    if (getNumberOfOnlineRegions() > 2) return false;
    boolean allUserRegionsOffline = true;
    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
      if (!e.getValue().getRegionInfo().isMetaRegion()) {
        allUserRegionsOffline = false;
        break;
      }
    }
    return allUserRegionsOffline;
  }

  /**
   * @return Current write count for all online regions.
   */
  private long getWriteRequestCount() {
    long writeCount = 0;
    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
      writeCount += e.getValue().getWriteRequestsCount();
    }
    return writeCount;
  }

  /**
   * Sends one load report covering [reportStartTime, reportEndTime] to the master. Does nothing
   * when there is no status stub. On an RPC failure other than {@link YouAreDeadException}, the
   * stub is dropped (if unchanged) and re-created.
   *
   * @throws IOException {@link YouAreDeadException} is rethrown so run() treats it as fatal
   */
  @InterfaceAudience.Private
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
      throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    try {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      request.setServer(ProtobufUtil.toServerName(this.serverName));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        throw ioe;
      }
      // Only clear the stub if nobody replaced it concurrently.
      if (rssStub == rss) {
        rssStub = null;
      }
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub(true);
    }
  }

  /**
   * Reports the given map of Regions and their size on the filesystem to the active Master.
   *
   * @param regionSizeStore The store containing region sizes
   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
   */
  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      LOG.trace("Skipping Region size report to HMaster as stub is null");
      return true;
    }
    try {
      buildReportAndSend(rss, regionSizeStore);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        LOG.trace("Failed to report region sizes to Master because it is initializing."
            + " This will be retried.", ioe);
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return true;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      createRegionServerStatusStub(true);
      // A master that rejects the RPC with UnsupportedOperationException (wrapped in a
      // DoNotRetryIOException) does not support this report: ask the chore to pause.
      if (ioe instanceof DoNotRetryIOException) {
        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
        if (doNotRetryEx.getCause() != null) {
          Throwable t = doNotRetryEx.getCause();
          if (t instanceof UnsupportedOperationException) {
            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
            return false;
          }
        }
      }
      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
    }
    return true;
  }

  /**
   * Builds the region size report and sends it to the master. After a successful send, the
   * number of entries in the store is added to the region-size-reports-sent metric (when
   * metrics are initialized). The store itself is not modified here.
   *
   * @param rss The stub to send to the Master
   * @param regionSizeStore The store containing region sizes
   */
  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
      RegionSizeStore regionSizeStore) throws ServiceException {
    RegionSpaceUseReportRequest request =
        buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
    rss.reportRegionSpaceUse(null, request);
    // Record the number of size reports sent
    if (metricsRegionServer != null) {
      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
    }
  }

  /**
   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size store,
   * with one {@link RegionSpaceUse} entry per region.
   *
   * @param regionSizes The store of per-region sizes, in bytes
   * @return The corresponding protocol buffer message.
*/
  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
    }
    return request.build();
  }

  /**
   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse}
   * protobuf message.
   *
   * @param regionInfo The RegionInfo
   * @param sizeInBytes The size in bytes of the Region
   * @return The protocol buffer
   * @throws NullPointerException if either argument is null
   */
  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
    return RegionSpaceUse.newBuilder()
        .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
        .setRegionSize(Objects.requireNonNull(sizeInBytes))
        .build();
  }

  /**
   * Assembles the load report sent to the master. It contains request counts, heap usage in MB
   * (-1 when heap usage is unavailable), the coprocessors of the WAL(s) and of each online region,
   * per-region and per-user loads, the info server port (-1 if there is none) and, when
   * available, replication source/sink loads.
   */
  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
      throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second, and other metrics  As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heart beat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved; Additionally the load balancer will be able to take advantage of a more complete
    // history.
    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    long usedMemory = -1L;
    long maxMemory = -1L;
    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
      usedMemory = usage.getUsed();
      maxMemory = usage.getMax();
    }

    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
    // Note: the "number of requests" field is filled with requests per second.
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
    serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    Builder coprocessorBuilder = Coprocessor.newBuilder();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
    }
    // Builders are reused across regions to avoid re-allocating them per region.
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (HRegion region : regions) {
      if (region.getCoprocessorHost() != null) {
        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
        for (String regionCoprocessor : regionCoprocessors) {
          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
        }
      }
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
          .getCoprocessors()) {
        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
      }
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }
    MetricsUserAggregateSource userSource =
        metricsRegionServer.getMetricsUserAggregate().getSource();
    if (userSource != null) {
      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
      }
    }
    // for the replicationLoad purpose. Only need to get from one service
    // either source or sink will get the same info
    ReplicationSourceService rsources = getReplicationSourceService();

    if (rsources != null) {
      // always refresh first to get the latest value
      ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
      if (rLoad != null) {
        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        for (ClusterStatusProtos.ReplicationLoadSource rLS :
            rLoad.getReplicationLoadSourceEntries()) {
          serverLoad.addReplLoadSource(rLS);
        }

      }
    }

    return serverLoad.build();
  }

  /** @return the encoded names of all online regions, comma-separated */
  private String getOnlineRegionsAsPrintableString() {
    StringBuilder sb = new StringBuilder();
    for (Region r: this.onlineRegions.values()) {
      if (sb.length() > 0) sb.append(", ");
      sb.append(r.getRegionInfo().getEncodedName());
    }
    return sb.toString();
  }

  /**
   * Wait on regions close. Sends a close to every online region not already in transition
   * (once per region), then polls every 200 ms. Returns once no regions remain online, or once
   * none are in transition even if some failed to close. An interrupt during the wait does not
   * end it early; the thread's interrupt status is restored on exit.
   *
   * @param abort forwarded to each region close
   */
  private void waitOnAllRegionsToClose(final boolean abort) {
    // Wait till all regions are closed before going out.
    int lastCount = -1;
    long previousLogTime = 0;
    Set<String> closedRegions = new HashSet<>();
    boolean interrupted = false;
    try {
      while (!onlineRegions.isEmpty()) {
        int count = getNumberOfOnlineRegions();
        // Only print a message if the count of regions has changed.
        if (count != lastCount) {
          // Log every second at most
          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
            previousLogTime = System.currentTimeMillis();
            lastCount = count;
            LOG.info("Waiting on " + count + " regions to close");
            // Only print out regions still closing if a small number else will
            // swamp the log.
            if (count < 10 && LOG.isDebugEnabled()) {
              LOG.debug("Online Regions=" + this.onlineRegions);
            }
          }
        }
        // Ensure all user regions have been sent a close. Use this to
        // protect against the case where an open comes in after we start the
        // iterator of onlineRegions to close all user regions.
        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
          RegionInfo hri = e.getValue().getRegionInfo();
          if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) &&
              !closedRegions.contains(hri.getEncodedName())) {
            closedRegions.add(hri.getEncodedName());
            // Send the close once per region, forwarding the abort flag.
            closeRegionIgnoreErrors(hri, abort);
          }
        }
        // No regions in RIT, we could stop waiting now.
        if (this.regionsInTransitionInRS.isEmpty()) {
          if (!onlineRegions.isEmpty()) {
            LOG.info("We were exiting though online regions are not empty," +
                " because some regions failed closing");
          }
          break;
        } else {
          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream().
            map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
        }
        if (sleepInterrupted(200)) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Sleeps for the given time.
   *
   * @return {@code true} if the sleep was interrupted. The interrupt status is NOT restored
   *   here; the caller is responsible for that.
   */
  private static boolean sleepInterrupted(long millis) {
    boolean interrupted = false;
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while sleeping");
      interrupted = true;
    }
    return interrupted;
  }

  /**
   * Closes ({@code close == true}) or shuts down ({@code close == false}) the WAL factory, if
   * any. Failures are logged rather than rethrown; a RemoteException is unwrapped first.
   */
  private void shutdownWAL(final boolean close) {
    if (this.walFactory != null) {
      try {
        if (close) {
          walFactory.close();
        } else {
          walFactory.shutdown();
        }
      } catch (Throwable e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.error("Shutdown / close of WAL failed: " + e);
        LOG.debug("Shutdown / close exception details:", e);
      }
    }
  }

  /**
   * get NamedQueue Provider to add different logs to ringbuffer
   *
   * @return NamedQueueRecorder, or {@code null} if it was never initialized
   */
  public NamedQueueRecorder getNamedQueueRecorder() {
    return this.namedQueueRecorder;
  }

  /**
   * Run init. Sets up wal and starts up all server threads.
   *
   * @param c the master's startup response. Its map entries carry the hostname the master sees
   *   us as, plus configuration entries that are copied into this server's conf.
   * @throws IOException if the master reports a hostname that differs from the one explicitly
   *   configured for this server, or if any later initialization step fails (the server is
   *   stopped first)
   */
  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
      throws IOException {
    try {
      boolean updateRootDir = false;
      for (NameStringPair e : c.getMapEntriesList()) {
        String key = e.getName();
        // The hostname the master sees us as.
        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
          String hostnameFromMasterPOV = e.getValue();
          // NOTE(review): serverName is updated before the mismatch check below can throw.
          this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(),
              this.startcode);
          if (!StringUtils.isBlank(useThisHostnameInstead) &&
              !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
            String msg = "Master passed us a different hostname to use; was=" +
                this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
            throw new IOException(msg);
          }
          // Without an explicit hostname a mismatch is only logged, not fatal.
          if (StringUtils.isBlank(useThisHostnameInstead) &&
              !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
            String msg = "Master passed us a different hostname to use; was=" +
                rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
          }
          continue;
        }

        String value = e.getValue();
        // A changed hbase.rootdir means the filesystems must be re-initialized below.
        if (key.equals(HConstants.HBASE_DIR)) {
          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
            updateRootDir = true;
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("Config from master: " + key + "=" + value);
        }
        this.conf.set(key, value);
      }
      // Set our ephemeral znode up in zookeeper now we have a name.
      createMyEphemeralNode();

      if (updateRootDir) {
        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
        initializeFileSystem();
      }

      // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
      // config param for task trackers, but we can piggyback off of it.
      if (this.conf.get("mapreduce.task.attempt.id") == null) {
        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
      }

      // Save it in a file, this will allow to see if we crash
      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

      // This call sets up an initialized replication and WAL. Later we start it up.
      setupWALAndReplication();
      // Init in here rather than in constructor after thread name has been set
      final MetricsTable metricsTable =
          new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
      this.metricsRegionServer = new MetricsRegionServer(metricsRegionServerImpl,
          conf, metricsTable);
      // Now that we have a metrics source, start the pause monitor
      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
      pauseMonitor.start();

      // There is a rare case where we do NOT want services to start. Check config.
      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
        startServices();
      }
      // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
      // or make sense of it.
      startReplicationService();


      // Log our identity, RPC address and ZooKeeper session id.
      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa +
          ", sessionid=0x" +
          Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));

      // Wake up anyone waiting for this server to online
      synchronized (online) {
        online.set(true);
        online.notifyAll();
      }
    } catch (Throwable e) {
      stop("Failed initialization");
      throw convertThrowableToIOE(cleanup(e, "Failed init"),
          "Region server startup failed");
    } finally {
      // Let the run loop proceed without waiting out the current sleep period.
      sleeper.skipSleepCycle();
    }
  }

  protected void initializeMemStoreChunkCreator() {
    if (MemStoreLAB.isEnabled(conf)) {
      // MSLAB is enabled. So initialize MemStoreChunkPool
      // By this time, the MemstoreFlusher is already initialized. We can get the global limits from
      // it.
1665 Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf); 1666 long globalMemStoreSize = pair.getFirst(); 1667 boolean offheap = this.regionServerAccounting.isOffheap(); 1668 // When off heap memstore in use, take full area for chunk pool. 1669 float poolSizePercentage = offheap ? 1.0F : 1670 conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT); 1671 float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, 1672 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); 1673 int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); 1674 float indexChunkSizePercent = conf.getFloat(MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_KEY, 1675 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 1676 // init the chunkCreator 1677 ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, 1678 initialCountPercentage, this.hMemManager, indexChunkSizePercent); 1679 } 1680 } 1681 1682 private void startHeapMemoryManager() { 1683 if (this.blockCache != null) { 1684 this.hMemManager = 1685 new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting); 1686 this.hMemManager.start(getChoreService()); 1687 } 1688 } 1689 1690 private void createMyEphemeralNode() throws KeeperException { 1691 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder(); 1692 rsInfo.setInfoPort(infoServer != null ? 
infoServer.getPort() : -1); 1693 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo()); 1694 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray()); 1695 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data); 1696 } 1697 1698 private void deleteMyEphemeralNode() throws KeeperException { 1699 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath()); 1700 } 1701 1702 @Override 1703 public RegionServerAccounting getRegionServerAccounting() { 1704 return regionServerAccounting; 1705 } 1706 1707 /** 1708 * @param r Region to get RegionLoad for. 1709 * @param regionLoadBldr the RegionLoad.Builder, can be null 1710 * @param regionSpecifier the RegionSpecifier.Builder, can be null 1711 * @return RegionLoad instance. 1712 */ 1713 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, 1714 RegionSpecifier.Builder regionSpecifier) throws IOException { 1715 byte[] name = r.getRegionInfo().getRegionName(); 1716 int stores = 0; 1717 int storefiles = 0; 1718 int storeRefCount = 0; 1719 int maxCompactedStoreFileRefCount = 0; 1720 int storeUncompressedSizeMB = 0; 1721 int storefileSizeMB = 0; 1722 int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024); 1723 long storefileIndexSizeKB = 0; 1724 int rootLevelIndexSizeKB = 0; 1725 int totalStaticIndexSizeKB = 0; 1726 int totalStaticBloomSizeKB = 0; 1727 long totalCompactingKVs = 0; 1728 long currentCompactedKVs = 0; 1729 List<HStore> storeList = r.getStores(); 1730 stores += storeList.size(); 1731 for (HStore store : storeList) { 1732 storefiles += store.getStorefilesCount(); 1733 int currentStoreRefCount = store.getStoreRefCount(); 1734 storeRefCount += currentStoreRefCount; 1735 int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount(); 1736 maxCompactedStoreFileRefCount = Math.max(maxCompactedStoreFileRefCount, 1737 currentMaxCompactedStoreFileRefCount); 1738 storeUncompressedSizeMB += (int) 
(store.getStoreSizeUncompressed() / 1024 / 1024); 1739 storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024); 1740 //TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB? 1741 storefileIndexSizeKB += store.getStorefilesRootLevelIndexSize() / 1024; 1742 CompactionProgress progress = store.getCompactionProgress(); 1743 if (progress != null) { 1744 totalCompactingKVs += progress.getTotalCompactingKVs(); 1745 currentCompactedKVs += progress.currentCompactedKVs; 1746 } 1747 rootLevelIndexSizeKB += (int) (store.getStorefilesRootLevelIndexSize() / 1024); 1748 totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024); 1749 totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024); 1750 } 1751 1752 HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution(); 1753 float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname()); 1754 float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname()); 1755 long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight(); 1756 long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname()); 1757 long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname()); 1758 if (regionLoadBldr == null) { 1759 regionLoadBldr = RegionLoad.newBuilder(); 1760 } 1761 if (regionSpecifier == null) { 1762 regionSpecifier = RegionSpecifier.newBuilder(); 1763 } 1764 regionSpecifier.setType(RegionSpecifierType.REGION_NAME); 1765 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name)); 1766 regionLoadBldr.setRegionSpecifier(regionSpecifier.build()) 1767 .setStores(stores) 1768 .setStorefiles(storefiles) 1769 .setStoreRefCount(storeRefCount) 1770 .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount) 1771 .setStoreUncompressedSizeMB(storeUncompressedSizeMB) 1772 .setStorefileSizeMB(storefileSizeMB) 1773 .setMemStoreSizeMB(memstoreSizeMB) 1774 .setStorefileIndexSizeKB(storefileIndexSizeKB) 1775 
.setRootIndexSizeKB(rootLevelIndexSizeKB) 1776 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB) 1777 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB) 1778 .setReadRequestsCount(r.getReadRequestsCount()) 1779 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount()) 1780 .setWriteRequestsCount(r.getWriteRequestsCount()) 1781 .setTotalCompactingKVs(totalCompactingKVs) 1782 .setCurrentCompactedKVs(currentCompactedKVs) 1783 .setDataLocality(dataLocality) 1784 .setDataLocalityForSsd(dataLocalityForSsd) 1785 .setBlocksLocalWeight(blocksLocalWeight) 1786 .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight) 1787 .setBlocksTotalWeight(blocksTotalWeight) 1788 .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState())) 1789 .setLastMajorCompactionTs(r.getOldestHfileTs(true)); 1790 r.setCompleteSequenceId(regionLoadBldr); 1791 return regionLoadBldr.build(); 1792 } 1793 1794 private UserLoad createUserLoad(String user, MetricsUserSource userSource) { 1795 UserLoad.Builder userLoadBldr = UserLoad.newBuilder(); 1796 userLoadBldr.setUserName(user); 1797 userSource.getClientMetrics().values().stream().map( 1798 clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder() 1799 .setHostName(clientMetrics.getHostName()) 1800 .setWriteRequestsCount(clientMetrics.getWriteRequestsCount()) 1801 .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests()) 1802 .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build()) 1803 .forEach(userLoadBldr::addClientMetrics); 1804 return userLoadBldr.build(); 1805 } 1806 1807 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { 1808 HRegion r = onlineRegions.get(encodedRegionName); 1809 return r != null ? createRegionLoad(r, null, null) : null; 1810 } 1811 1812 /** 1813 * Inner class that runs on a long period checking if regions need compaction. 
1814 */ 1815 private static class CompactionChecker extends ScheduledChore { 1816 private final HRegionServer instance; 1817 private final int majorCompactPriority; 1818 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; 1819 //Iteration is 1-based rather than 0-based so we don't check for compaction 1820 // immediately upon region server startup 1821 private long iteration = 1; 1822 1823 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { 1824 super("CompactionChecker", stopper, sleepTime); 1825 this.instance = h; 1826 LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime)); 1827 1828 /* MajorCompactPriority is configurable. 1829 * If not set, the compaction will use default priority. 1830 */ 1831 this.majorCompactPriority = this.instance.conf. 1832 getInt("hbase.regionserver.compactionChecker.majorCompactPriority", 1833 DEFAULT_PRIORITY); 1834 } 1835 1836 @Override 1837 protected void chore() { 1838 for (Region r : this.instance.onlineRegions.values()) { 1839 if (r == null) { 1840 continue; 1841 } 1842 HRegion hr = (HRegion) r; 1843 for (HStore s : hr.stores.values()) { 1844 try { 1845 long multiplier = s.getCompactionCheckMultiplier(); 1846 assert multiplier > 0; 1847 if (iteration % multiplier != 0) { 1848 continue; 1849 } 1850 if (s.needsCompaction()) { 1851 // Queue a compaction. Will recognize if major is needed. 
1852 this.instance.compactSplitThread.requestSystemCompaction(hr, s, 1853 getName() + " requests compaction"); 1854 } else if (s.shouldPerformMajorCompaction()) { 1855 s.triggerMajorCompaction(); 1856 if (majorCompactPriority == DEFAULT_PRIORITY || 1857 majorCompactPriority > hr.getCompactPriority()) { 1858 this.instance.compactSplitThread.requestCompaction(hr, s, 1859 getName() + " requests major compaction; use default priority", 1860 Store.NO_PRIORITY, 1861 CompactionLifeCycleTracker.DUMMY, null); 1862 } else { 1863 this.instance.compactSplitThread.requestCompaction(hr, s, 1864 getName() + " requests major compaction; use configured priority", 1865 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); 1866 } 1867 } 1868 } catch (IOException e) { 1869 LOG.warn("Failed major compaction check on " + r, e); 1870 } 1871 } 1872 } 1873 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); 1874 } 1875 } 1876 1877 private static class PeriodicMemStoreFlusher extends ScheduledChore { 1878 private final HRegionServer server; 1879 private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds 1880 private final static int MIN_DELAY_TIME = 0; // millisec 1881 private final long rangeOfDelayMs; 1882 1883 PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { 1884 super("MemstoreFlusherChore", server, cacheFlushInterval); 1885 this.server = server; 1886 1887 final long configuredRangeOfDelay = server.getConfiguration().getInt( 1888 "hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY); 1889 this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay); 1890 } 1891 1892 @Override 1893 protected void chore() { 1894 final StringBuilder whyFlush = new StringBuilder(); 1895 for (HRegion r : this.server.onlineRegions.values()) { 1896 if (r == null) continue; 1897 if (r.shouldFlush(whyFlush)) { 1898 FlushRequester requester = server.getFlushRequester(); 1899 if (requester != null) { 1900 long 
randomDelay = RandomUtils.nextLong(0, rangeOfDelayMs) + MIN_DELAY_TIME; 1901 //Throttle the flushes by putting a delay. If we don't throttle, and there 1902 //is a balanced write-load on the regions in a table, we might end up 1903 //overwhelming the filesystem with too many flushes at once. 1904 if (requester.requestDelayedFlush(r, randomDelay)) { 1905 LOG.info("{} requesting flush of {} because {} after random delay {} ms", 1906 getName(), r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), 1907 randomDelay); 1908 } 1909 } 1910 } 1911 } 1912 } 1913 } 1914 1915 /** 1916 * Report the status of the server. A server is online once all the startup is 1917 * completed (setting up filesystem, starting executorService threads, etc.). This 1918 * method is designed mostly to be useful in tests. 1919 * 1920 * @return true if online, false if not. 1921 */ 1922 public boolean isOnline() { 1923 return online.get(); 1924 } 1925 1926 /** 1927 * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to 1928 * be hooked up to WAL. 1929 */ 1930 private void setupWALAndReplication() throws IOException { 1931 WALFactory factory = new WALFactory(conf, serverName.toString(), (Server)this); 1932 // TODO Replication make assumptions here based on the default filesystem impl 1933 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 1934 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); 1935 1936 Path logDir = new Path(walRootDir, logName); 1937 LOG.debug("logDir={}", logDir); 1938 if (this.walFs.exists(logDir)) { 1939 throw new RegionServerRunningException( 1940 "Region server has already created directory at " + this.serverName.toString()); 1941 } 1942 // Always create wal directory as now we need this when master restarts to find out the live 1943 // region servers. 
1944 if (!this.walFs.mkdirs(logDir)) { 1945 throw new IOException("Can not create wal directory " + logDir); 1946 } 1947 // Instantiate replication if replication enabled. Pass it the log directories. 1948 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory); 1949 this.walFactory = factory; 1950 } 1951 1952 /** 1953 * Start up replication source and sink handlers. 1954 */ 1955 private void startReplicationService() throws IOException { 1956 if (this.replicationSourceHandler == this.replicationSinkHandler && 1957 this.replicationSourceHandler != null) { 1958 this.replicationSourceHandler.startReplicationService(); 1959 } else { 1960 if (this.replicationSourceHandler != null) { 1961 this.replicationSourceHandler.startReplicationService(); 1962 } 1963 if (this.replicationSinkHandler != null) { 1964 this.replicationSinkHandler.startReplicationService(); 1965 } 1966 } 1967 } 1968 1969 /** 1970 * @return Master address tracker instance. 1971 */ 1972 public MasterAddressTracker getMasterAddressTracker() { 1973 return this.masterAddressTracker; 1974 } 1975 1976 /** 1977 * Start maintenance Threads, Server, Worker and lease checker threads. 1978 * Start all threads we need to run. This is called after we've successfully 1979 * registered with the Master. 1980 * Install an UncaughtExceptionHandler that calls abort of RegionServer if we 1981 * get an unhandled exception. We cannot set the handler on all threads. 1982 * Server's internal Listener thread is off limits. For Server, if an OOME, it 1983 * waits a while then retries. Meantime, a flush or a compaction that tries to 1984 * run should trigger same critical condition and the shutdown will run. On 1985 * its way out, this server will shut down Server. Leases are sort of 1986 * inbetween. It has an internal thread that while it inherits from Chore, it 1987 * keeps its own internal stop mechanism so needs to be stopped by this 1988 * hosting server. Worker logs the exception and exits. 
1989 */ 1990 private void startServices() throws IOException { 1991 if (!isStopped() && !isAborted()) { 1992 initializeThreads(); 1993 } 1994 this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection); 1995 this.secureBulkLoadManager.start(); 1996 1997 // Health checker thread. 1998 if (isHealthCheckerConfigured()) { 1999 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, 2000 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); 2001 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); 2002 } 2003 2004 this.walRoller = new LogRoller(this); 2005 this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); 2006 this.procedureResultReporter = new RemoteProcedureResultReporter(this); 2007 2008 // Create the CompactedFileDischarger chore executorService. This chore helps to 2009 // remove the compacted files that will no longer be used in reads. 2010 // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to 2011 // 2 mins so that compacted files can be archived before the TTLCleaner runs 2012 int cleanerInterval = 2013 conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); 2014 this.compactedFileDischarger = 2015 new CompactedHFilesDischarger(cleanerInterval, this, this); 2016 choreService.scheduleChore(compactedFileDischarger); 2017 2018 // Start executor services 2019 this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION, 2020 conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); 2021 this.executorService.startExecutorService(ExecutorType.RS_OPEN_META, 2022 conf.getInt("hbase.regionserver.executor.openmeta.threads", 1)); 2023 this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION, 2024 conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3)); 2025 this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION, 2026 
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3)); 2027 this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META, 2028 conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); 2029 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { 2030 this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, 2031 conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); 2032 } 2033 this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( 2034 HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER)); 2035 // Start the threads for compacted files discharger 2036 this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER, 2037 conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10)); 2038 if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { 2039 this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS, 2040 conf.getInt("hbase.regionserver.region.replica.flusher.threads", 2041 conf.getInt("hbase.regionserver.executor.openregion.threads", 3))); 2042 } 2043 this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER, 2044 conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2)); 2045 this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE, 2046 conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1)); 2047 2048 Threads 2049 .setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", uncaughtExceptionHandler); 2050 if (this.cacheFlusher != null) { 2051 this.cacheFlusher.start(uncaughtExceptionHandler); 2052 } 2053 Threads.setDaemonThreadRunning(this.procedureResultReporter, 2054 getName() + ".procedureResultReporter", uncaughtExceptionHandler); 2055 2056 if (this.compactionChecker != null) { 2057 choreService.scheduleChore(compactionChecker); 2058 } 2059 if (this.periodicFlusher != null) 
{ 2060 choreService.scheduleChore(periodicFlusher); 2061 } 2062 if (this.healthCheckChore != null) { 2063 choreService.scheduleChore(healthCheckChore); 2064 } 2065 if (this.nonceManagerChore != null) { 2066 choreService.scheduleChore(nonceManagerChore); 2067 } 2068 if (this.storefileRefresher != null) { 2069 choreService.scheduleChore(storefileRefresher); 2070 } 2071 if (this.fsUtilizationChore != null) { 2072 choreService.scheduleChore(fsUtilizationChore); 2073 } 2074 if (this.slowLogTableOpsChore != null) { 2075 choreService.scheduleChore(slowLogTableOpsChore); 2076 } 2077 2078 // Leases is not a Thread. Internally it runs a daemon thread. If it gets 2079 // an unhandled exception, it will just exit. 2080 Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker", 2081 uncaughtExceptionHandler); 2082 2083 // Create the log splitting worker and start it 2084 // set a smaller retries to fast fail otherwise splitlogworker could be blocked for 2085 // quite a while inside Connection layer. The worker won't be available for other 2086 // tasks even after current task is preempted after a split task times out. 2087 Configuration sinkConf = HBaseConfiguration.create(conf); 2088 sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2089 conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds 2090 sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 2091 conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds 2092 sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 2093 if (this.csm != null && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, 2094 DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { 2095 // SplitLogWorker needs csm. If none, don't start this. 2096 this.splitLogWorker = new SplitLogWorker(sinkConf, this, 2097 this, walFactory); 2098 splitLogWorker.start(); 2099 LOG.debug("SplitLogWorker started"); 2100 } 2101 2102 // Memstore services. 
2103 startHeapMemoryManager(); 2104 // Call it after starting HeapMemoryManager. 2105 initializeMemStoreChunkCreator(); 2106 } 2107 2108 private void initializeThreads() { 2109 // Cache flushing thread. 2110 this.cacheFlusher = new MemStoreFlusher(conf, this); 2111 2112 // Compaction thread 2113 this.compactSplitThread = new CompactSplit(this); 2114 2115 // Background thread to check for compactions; needed if region has not gotten updates 2116 // in a while. It will take care of not checking too frequently on store-by-store basis. 2117 this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this); 2118 this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this); 2119 this.leaseManager = new LeaseManager(this.threadWakeFrequency); 2120 2121 final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, 2122 HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); 2123 if (isSlowLogTableEnabled) { 2124 // default chore duration: 10 min 2125 final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000); 2126 slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder); 2127 } 2128 2129 if (this.nonceManager != null) { 2130 // Create the scheduled chore that cleans up nonces. 
2131 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); 2132 } 2133 2134 // Setup the Quota Manager 2135 rsQuotaManager = new RegionServerRpcQuotaManager(this); 2136 rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); 2137 2138 if (QuotaUtil.isQuotaEnabled(conf)) { 2139 this.fsUtilizationChore = new FileSystemUtilizationChore(this); 2140 } 2141 2142 2143 boolean onlyMetaRefresh = false; 2144 int storefileRefreshPeriod = conf.getInt( 2145 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 2146 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2147 if (storefileRefreshPeriod == 0) { 2148 storefileRefreshPeriod = conf.getInt( 2149 StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, 2150 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2151 onlyMetaRefresh = true; 2152 } 2153 if (storefileRefreshPeriod > 0) { 2154 this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, 2155 onlyMetaRefresh, this, this); 2156 } 2157 registerConfigurationObservers(); 2158 } 2159 2160 private void registerConfigurationObservers() { 2161 // Registering the compactSplitThread object with the ConfigurationManager. 2162 configurationManager.registerObserver(this.compactSplitThread); 2163 configurationManager.registerObserver(this.rpcServices); 2164 configurationManager.registerObserver(this); 2165 } 2166 2167 /** 2168 * Puts up the webui. 
2169 */ 2170 private void putUpWebUI() throws IOException { 2171 int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 2172 HConstants.DEFAULT_REGIONSERVER_INFOPORT); 2173 String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0"); 2174 2175 if(this instanceof HMaster) { 2176 port = conf.getInt(HConstants.MASTER_INFO_PORT, 2177 HConstants.DEFAULT_MASTER_INFOPORT); 2178 addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0"); 2179 } 2180 // -1 is for disabling info server 2181 if (port < 0) { 2182 return; 2183 } 2184 2185 if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) { 2186 String msg = 2187 "Failed to start http info server. Address " + addr 2188 + " does not belong to this host. Correct configuration parameter: " 2189 + "hbase.regionserver.info.bindAddress"; 2190 LOG.error(msg); 2191 throw new IOException(msg); 2192 } 2193 // check if auto port bind enabled 2194 boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, false); 2195 while (true) { 2196 try { 2197 this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf); 2198 infoServer.addPrivilegedServlet("dump", "/dump", getDumpServlet()); 2199 configureInfoServer(); 2200 this.infoServer.start(); 2201 break; 2202 } catch (BindException e) { 2203 if (!auto) { 2204 // auto bind disabled throw BindException 2205 LOG.error("Failed binding http info server to port: " + port); 2206 throw e; 2207 } 2208 // auto bind enabled, try to use another port 2209 LOG.info("Failed binding http info server to port: " + port); 2210 port++; 2211 } 2212 } 2213 port = this.infoServer.getPort(); 2214 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port); 2215 int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT, 2216 HConstants.DEFAULT_MASTER_INFOPORT); 2217 conf.setInt("hbase.master.info.port.orig", masterInfoPort); 2218 conf.setInt(HConstants.MASTER_INFO_PORT, port); 2219 } 2220 2221 /* 2222 * Verify that server is healthy 2223 */ 2224 
private boolean isHealthy() { 2225 if (!dataFsOk) { 2226 // File system problem 2227 return false; 2228 } 2229 // Verify that all threads are alive 2230 boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive()) 2231 && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) 2232 && (this.walRoller == null || this.walRoller.isAlive()) 2233 && (this.compactionChecker == null || this.compactionChecker.isScheduled()) 2234 && (this.periodicFlusher == null || this.periodicFlusher.isScheduled()); 2235 if (!healthy) { 2236 stop("One or more threads are no longer alive -- stop"); 2237 } 2238 return healthy; 2239 } 2240 2241 @Override 2242 public List<WAL> getWALs() { 2243 return walFactory.getWALs(); 2244 } 2245 2246 @Override 2247 public WAL getWAL(RegionInfo regionInfo) throws IOException { 2248 try { 2249 WAL wal = walFactory.getWAL(regionInfo); 2250 if (this.walRoller != null) { 2251 this.walRoller.addWAL(wal); 2252 } 2253 return wal; 2254 } catch (FailedCloseWALAfterInitializedErrorException ex) { 2255 // see HBASE-21751 for details 2256 abort("WAL can not clean up after init failed", ex); 2257 throw ex; 2258 } 2259 } 2260 2261 public LogRoller getWalRoller() { 2262 return walRoller; 2263 } 2264 2265 WALFactory getWalFactory() { 2266 return walFactory; 2267 } 2268 2269 @Override 2270 public Connection getConnection() { 2271 return getClusterConnection(); 2272 } 2273 2274 @Override 2275 public ClusterConnection getClusterConnection() { 2276 return this.clusterConnection; 2277 } 2278 2279 @Override 2280 public void stop(final String msg) { 2281 stop(msg, false, RpcServer.getRequestUser().orElse(null)); 2282 } 2283 2284 /** 2285 * Stops the regionserver. 
2286 * @param msg Status message 2287 * @param force True if this is a regionserver abort 2288 * @param user The user executing the stop request, or null if no user is associated 2289 */ 2290 public void stop(final String msg, final boolean force, final User user) { 2291 if (!this.stopped) { 2292 LOG.info("***** STOPPING region server '" + this + "' *****"); 2293 if (this.rsHost != null) { 2294 // when forced via abort don't allow CPs to override 2295 try { 2296 this.rsHost.preStop(msg, user); 2297 } catch (IOException ioe) { 2298 if (!force) { 2299 LOG.warn("The region server did not stop", ioe); 2300 return; 2301 } 2302 LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe); 2303 } 2304 } 2305 this.stopped = true; 2306 LOG.info("STOPPED: " + msg); 2307 // Wakes run() if it is sleeping 2308 sleeper.skipSleepCycle(); 2309 } 2310 } 2311 2312 public void waitForServerOnline(){ 2313 while (!isStopped() && !isOnline()) { 2314 synchronized (online) { 2315 try { 2316 online.wait(msgInterval); 2317 } catch (InterruptedException ie) { 2318 Thread.currentThread().interrupt(); 2319 break; 2320 } 2321 } 2322 } 2323 } 2324 2325 @Override 2326 public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException { 2327 HRegion r = context.getRegion(); 2328 long openProcId = context.getOpenProcId(); 2329 long masterSystemTime = context.getMasterSystemTime(); 2330 rpcServices.checkOpen(); 2331 LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}", 2332 r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime); 2333 // Do checks to see if we need to compact (references or too many files) 2334 for (HStore s : r.stores.values()) { 2335 if (s.hasReferences() || s.needsCompaction()) { 2336 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region"); 2337 } 2338 } 2339 long openSeqNum = r.getOpenSeqNum(); 2340 if (openSeqNum == HConstants.NO_SEQNUM) { 2341 // If we opened a region, we should have 
read some sequence number from it. 2342 LOG.error( 2343 "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString()); 2344 openSeqNum = 0; 2345 } 2346 2347 // Notify master 2348 if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED, 2349 openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))) { 2350 throw new IOException( 2351 "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString()); 2352 } 2353 2354 triggerFlushInPrimaryRegion(r); 2355 2356 LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString()); 2357 } 2358 2359 /** 2360 * Helper method for use in tests. Skip the region transition report when there's no master 2361 * around to receive it. 2362 */ 2363 private boolean skipReportingTransition(final RegionStateTransitionContext context) { 2364 final TransitionCode code = context.getCode(); 2365 final long openSeqNum = context.getOpenSeqNum(); 2366 long masterSystemTime = context.getMasterSystemTime(); 2367 final RegionInfo[] hris = context.getHris(); 2368 2369 if (code == TransitionCode.OPENED) { 2370 Preconditions.checkArgument(hris != null && hris.length == 1); 2371 if (hris[0].isMetaRegion()) { 2372 try { 2373 MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, 2374 hris[0].getReplicaId(), RegionState.State.OPEN); 2375 } catch (KeeperException e) { 2376 LOG.info("Failed to update meta location", e); 2377 return false; 2378 } 2379 } else { 2380 try { 2381 MetaTableAccessor.updateRegionLocation(clusterConnection, 2382 hris[0], serverName, openSeqNum, masterSystemTime); 2383 } catch (IOException e) { 2384 LOG.info("Failed to update meta", e); 2385 return false; 2386 } 2387 } 2388 } 2389 return true; 2390 } 2391 2392 private ReportRegionStateTransitionRequest createReportRegionStateTransitionRequest( 2393 final RegionStateTransitionContext context) { 2394 final TransitionCode code = context.getCode(); 2395 final long 
openSeqNum = context.getOpenSeqNum(); 2396 final RegionInfo[] hris = context.getHris(); 2397 final long[] procIds = context.getProcIds(); 2398 2399 ReportRegionStateTransitionRequest.Builder builder = 2400 ReportRegionStateTransitionRequest.newBuilder(); 2401 builder.setServer(ProtobufUtil.toServerName(serverName)); 2402 RegionStateTransition.Builder transition = builder.addTransitionBuilder(); 2403 transition.setTransitionCode(code); 2404 if (code == TransitionCode.OPENED && openSeqNum >= 0) { 2405 transition.setOpenSeqNum(openSeqNum); 2406 } 2407 for (RegionInfo hri: hris) { 2408 transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri)); 2409 } 2410 for (long procId: procIds) { 2411 transition.addProcId(procId); 2412 } 2413 2414 return builder.build(); 2415 } 2416 2417 @Override 2418 public boolean reportRegionStateTransition(final RegionStateTransitionContext context) { 2419 if (TEST_SKIP_REPORTING_TRANSITION) { 2420 return skipReportingTransition(context); 2421 } 2422 final ReportRegionStateTransitionRequest request = 2423 createReportRegionStateTransitionRequest(context); 2424 2425 // Time to pause if master says 'please hold'. Make configurable if needed. 2426 final long initPauseTime = 1000; 2427 int tries = 0; 2428 long pauseTime; 2429 // Keep looping till we get an error. We want to send reports even though server is going down. 2430 // Only go down if clusterConnection is null. It is set to null almost as last thing as the 2431 // HRegionServer does down. 
2432 while (this.clusterConnection != null && !this.clusterConnection.isClosed()) { 2433 RegionServerStatusService.BlockingInterface rss = rssStub; 2434 try { 2435 if (rss == null) { 2436 createRegionServerStatusStub(); 2437 continue; 2438 } 2439 ReportRegionStateTransitionResponse response = 2440 rss.reportRegionStateTransition(null, request); 2441 if (response.hasErrorMessage()) { 2442 LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage()); 2443 break; 2444 } 2445 // Log if we had to retry else don't log unless TRACE. We want to 2446 // know if were successful after an attempt showed in logs as failed. 2447 if (tries > 0 || LOG.isTraceEnabled()) { 2448 LOG.info("TRANSITION REPORTED " + request); 2449 } 2450 // NOTE: Return mid-method!!! 2451 return true; 2452 } catch (ServiceException se) { 2453 IOException ioe = ProtobufUtil.getRemoteException(se); 2454 boolean pause = 2455 ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException 2456 || ioe instanceof CallQueueTooBigException; 2457 if (pause) { 2458 // Do backoff else we flood the Master with requests. 2459 pauseTime = ConnectionUtils.getPauseTime(initPauseTime, tries); 2460 } else { 2461 pauseTime = initPauseTime; // Reset. 2462 } 2463 LOG.info("Failed report transition " + 2464 TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" + 2465 (pause? 2466 " after " + pauseTime + "ms delay (Master is coming online...).": 2467 " immediately."), 2468 ioe); 2469 if (pause) Threads.sleep(pauseTime); 2470 tries++; 2471 if (rssStub == rss) { 2472 rssStub = null; 2473 } 2474 } 2475 } 2476 return false; 2477 } 2478 2479 /** 2480 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not 2481 * block this thread. See RegionReplicaFlushHandler for details. 
2482 */ 2483 private void triggerFlushInPrimaryRegion(final HRegion region) { 2484 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 2485 return; 2486 } 2487 TableName tn = region.getTableDescriptor().getTableName(); 2488 if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) || 2489 !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf)) { 2490 region.setReadsEnabled(true); 2491 return; 2492 } 2493 2494 region.setReadsEnabled(false); // disable reads before marking the region as opened. 2495 // RegionReplicaFlushHandler might reset this. 2496 2497 // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler 2498 if (this.executorService != null) { 2499 this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection, 2500 rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region)); 2501 } else { 2502 LOG.info("Executor is null; not running flush of primary region replica for {}", 2503 region.getRegionInfo()); 2504 } 2505 } 2506 2507 @Override 2508 public RpcServerInterface getRpcServer() { 2509 return rpcServices.rpcServer; 2510 } 2511 2512 @InterfaceAudience.Private 2513 public RSRpcServices getRSRpcServices() { 2514 return rpcServices; 2515 } 2516 2517 /** 2518 * Cause the server to exit without closing the regions it is serving, the log 2519 * it is using and without notifying the master. Used unit testing and on 2520 * catastrophic events such as HDFS is yanked out from under hbase or we OOME. 2521 * 2522 * @param reason 2523 * the reason we are aborting 2524 * @param cause 2525 * the exception that caused the abort, or null 2526 */ 2527 @Override 2528 public void abort(String reason, Throwable cause) { 2529 if (!setAbortRequested()) { 2530 // Abort already in progress, ignore the new request. 2531 LOG.debug( 2532 "Abort already in progress. 
Ignoring the current request with reason: {}", reason); 2533 return; 2534 } 2535 String msg = "***** ABORTING region server " + this + ": " + reason + " *****"; 2536 if (cause != null) { 2537 LOG.error(HBaseMarkers.FATAL, msg, cause); 2538 } else { 2539 LOG.error(HBaseMarkers.FATAL, msg); 2540 } 2541 // HBASE-4014: show list of coprocessors that were loaded to help debug 2542 // regionserver crashes.Note that we're implicitly using 2543 // java.util.HashSet's toString() method to print the coprocessor names. 2544 LOG.error(HBaseMarkers.FATAL, "RegionServer abort: loaded coprocessors are: " + 2545 CoprocessorHost.getLoadedCoprocessors()); 2546 // Try and dump metrics if abort -- might give clue as to how fatal came about.... 2547 try { 2548 LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics()); 2549 } catch (MalformedObjectNameException | IOException e) { 2550 LOG.warn("Failed dumping metrics", e); 2551 } 2552 2553 // Do our best to report our abort to the master, but this may not work 2554 try { 2555 if (cause != null) { 2556 msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause); 2557 } 2558 // Report to the master but only if we have already registered with the master. 2559 RegionServerStatusService.BlockingInterface rss = rssStub; 2560 if (rss != null && this.serverName != null) { 2561 ReportRSFatalErrorRequest.Builder builder = 2562 ReportRSFatalErrorRequest.newBuilder(); 2563 builder.setServer(ProtobufUtil.toServerName(this.serverName)); 2564 builder.setErrorMessage(msg); 2565 rss.reportRSFatalError(null, builder.build()); 2566 } 2567 } catch (Throwable t) { 2568 LOG.warn("Unable to report fatal error to master", t); 2569 } 2570 2571 scheduleAbortTimer(); 2572 // shutdown should be run as the internal user 2573 stop(reason, true, null); 2574 } 2575 2576 /** 2577 * Sets the abort state if not already set. 2578 * @return True if abortRequested set to True successfully, false if an abort is already in 2579 * progress. 
2580 */ 2581 protected boolean setAbortRequested() { 2582 return abortRequested.compareAndSet(false, true); 2583 } 2584 2585 /** 2586 * @see HRegionServer#abort(String, Throwable) 2587 */ 2588 public void abort(String reason) { 2589 abort(reason, null); 2590 } 2591 2592 @Override 2593 public boolean isAborted() { 2594 return abortRequested.get(); 2595 } 2596 2597 /* 2598 * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup 2599 * logs but it does close socket in case want to bring up server on old 2600 * hostname+port immediately. 2601 */ 2602 @InterfaceAudience.Private 2603 protected void kill() { 2604 this.killed = true; 2605 abort("Simulated kill"); 2606 } 2607 2608 // Limits the time spent in the shutdown process. 2609 private void scheduleAbortTimer() { 2610 if (this.abortMonitor == null) { 2611 this.abortMonitor = new Timer("Abort regionserver monitor", true); 2612 TimerTask abortTimeoutTask = null; 2613 try { 2614 Constructor<? extends TimerTask> timerTaskCtor = 2615 Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName())) 2616 .asSubclass(TimerTask.class).getDeclaredConstructor(); 2617 timerTaskCtor.setAccessible(true); 2618 abortTimeoutTask = timerTaskCtor.newInstance(); 2619 } catch (Exception e) { 2620 LOG.warn("Initialize abort timeout task failed", e); 2621 } 2622 if (abortTimeoutTask != null) { 2623 abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT)); 2624 } 2625 } 2626 } 2627 2628 /** 2629 * Wait on all threads to finish. Presumption is that all closes and stops 2630 * have already been called. 
2631 */ 2632 protected void stopServiceThreads() { 2633 // clean up the scheduled chores 2634 if (this.choreService != null) { 2635 choreService.cancelChore(nonceManagerChore); 2636 choreService.cancelChore(compactionChecker); 2637 choreService.cancelChore(periodicFlusher); 2638 choreService.cancelChore(healthCheckChore); 2639 choreService.cancelChore(storefileRefresher); 2640 choreService.cancelChore(fsUtilizationChore); 2641 choreService.cancelChore(slowLogTableOpsChore); 2642 // clean up the remaining scheduled chores (in case we missed out any) 2643 choreService.shutdown(); 2644 } 2645 2646 if (this.cacheFlusher != null) { 2647 this.cacheFlusher.join(); 2648 } 2649 2650 if (this.spanReceiverHost != null) { 2651 this.spanReceiverHost.closeReceivers(); 2652 } 2653 if (this.walRoller != null) { 2654 this.walRoller.close(); 2655 } 2656 if (this.compactSplitThread != null) { 2657 this.compactSplitThread.join(); 2658 } 2659 if (this.executorService != null) this.executorService.shutdown(); 2660 if (this.replicationSourceHandler != null && 2661 this.replicationSourceHandler == this.replicationSinkHandler) { 2662 this.replicationSourceHandler.stopReplicationService(); 2663 } else { 2664 if (this.replicationSourceHandler != null) { 2665 this.replicationSourceHandler.stopReplicationService(); 2666 } 2667 if (this.replicationSinkHandler != null) { 2668 this.replicationSinkHandler.stopReplicationService(); 2669 } 2670 } 2671 } 2672 2673 /** 2674 * @return Return the object that implements the replication 2675 * source executorService. 2676 */ 2677 @InterfaceAudience.Private 2678 public ReplicationSourceService getReplicationSourceService() { 2679 return replicationSourceHandler; 2680 } 2681 2682 /** 2683 * @return Return the object that implements the replication 2684 * sink executorService. 
*/
  ReplicationSinkService getReplicationSinkService() {
    return replicationSinkHandler;
  }

  /**
   * Get the current master from ZooKeeper and open the RPC connection to it.
   * To get a fresh connection, the current rssStub must be null.
   * Method will block until a master is available. You can break from this
   * block by requesting the server stop.
   * <p>
   * Uses the cached master address rather than forcing a ZooKeeper read.
   *
   * @return master + port, or null if no master was found before the server stopped or the
   *   cluster went down (see {@link #createRegionServerStatusStub(boolean)} for the case where a
   *   master is returned but no stub was created)
   */
  private synchronized ServerName createRegionServerStatusStub() {
    // Create RS stub without refreshing the master node from ZK, use cached data
    return createRegionServerStatusStub(false);
  }

  /**
   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
   * connection, the current rssStub must be null. Method will block until a master is available.
   * You can break from this block by requesting the server stop.
   * <p>
   * If a stub already exists, returns the tracked master address without reconnecting.
   * Otherwise the method retries every 200 ms while {@link #keepLooping()} holds. Retries
   * re-read the master address from ZK once no master has been found. Log messages are throttled
   * to about one per second. An interrupt during a retry sleep does not stop the loop; the
   * thread's interrupt flag is restored on exit. The new stubs are published to
   * {@code rssStub}/{@code lockStub} only at the end.
   * <p>
   * NOTE: if the loop ends because keepLooping() turned false after a connection attempt failed,
   * {@code sn} still holds the master that was tried. A non-null master is then returned while
   * {@code rssStub} is set to null, so callers should check {@code rssStub} as well.
   * @param refresh If true then master address will be read from ZK, otherwise use cached data
   * @return master + port, or null if server has been stopped
   */
  @InterfaceAudience.Private
  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
    if (rssStub != null) {
      return masterAddressTracker.getMasterAddress();
    }
    ServerName sn = null;
    long previousLogTime = 0;
    RegionServerStatusService.BlockingInterface intRssStub = null;
    LockService.BlockingInterface intLockStub = null;
    boolean interrupted = false;
    try {
      while (keepLooping()) {
        sn = this.masterAddressTracker.getMasterAddress(refresh);
        if (sn == null) {
          if (!keepLooping()) {
            // give up with no connection.
            LOG.debug("No master found and cluster is stopped; bailing out");
            return null;
          }
          // Throttle this message to roughly once per second.
          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
            LOG.debug("No master found; retry");
            previousLogTime = System.currentTimeMillis();
          }
          refresh = true; // let's try pull it from ZK directly
          if (sleepInterrupted(200)) {
            // Remember the interrupt; it is re-asserted in the finally block.
            interrupted = true;
          }
          continue;
        }

        // If we are on the active master, use the shortcut
        if (this instanceof HMaster && sn.equals(getServerName())) {
          intRssStub = ((HMaster)this).getMasterRpcServices();
          intLockStub = ((HMaster)this).getMasterRpcServices();
          break;
        }
        try {
          // Both stubs share one blocking RPC channel to the master.
          BlockingRpcChannel channel =
            this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
              shortOperationTimeout);
          intRssStub = RegionServerStatusService.newBlockingStub(channel);
          intLockStub = LockService.newBlockingStub(channel);
          break;
        } catch (IOException e) {
          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
            e = e instanceof RemoteException ?
              ((RemoteException)e).unwrapRemoteException() : e;
            if (e instanceof ServerNotRunningYetException) {
              LOG.info("Master isn't available yet, retrying");
            } else {
              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
            }
            previousLogTime = System.currentTimeMillis();
          }
          if (sleepInterrupted(200)) {
            interrupted = true;
          }
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
    // Publish the stubs (possibly null if we stopped looping before connecting).
    this.rssStub = intRssStub;
    this.lockStub = intLockStub;
    return sn;
  }

  /**
   * Whether retry loops, such as the master-stub creation loop, should keep going.
   * @return {@code true} while this server has not been stopped and the cluster is up;
   *   {@code false} once the cluster is going down or this server has been stopped
2782 */ 2783 private boolean keepLooping() { 2784 return !this.stopped && isClusterUp(); 2785 } 2786 2787 /* 2788 * Let the master know we're here Run initialization using parameters passed 2789 * us by the master. 2790 * @return A Map of key/value configurations we got from the Master else 2791 * null if we failed to register. 2792 * @throws IOException 2793 */ 2794 private RegionServerStartupResponse reportForDuty() throws IOException { 2795 if (this.masterless) return RegionServerStartupResponse.getDefaultInstance(); 2796 ServerName masterServerName = createRegionServerStatusStub(true); 2797 RegionServerStatusService.BlockingInterface rss = rssStub; 2798 if (masterServerName == null || rss == null) return null; 2799 RegionServerStartupResponse result = null; 2800 try { 2801 rpcServices.requestCount.reset(); 2802 rpcServices.rpcGetRequestCount.reset(); 2803 rpcServices.rpcScanRequestCount.reset(); 2804 rpcServices.rpcFullScanRequestCount.reset(); 2805 rpcServices.rpcMultiRequestCount.reset(); 2806 rpcServices.rpcMutateRequestCount.reset(); 2807 LOG.info("reportForDuty to master=" + masterServerName + " with port=" 2808 + rpcServices.isa.getPort() + ", startcode=" + this.startcode); 2809 long now = EnvironmentEdgeManager.currentTime(); 2810 int port = rpcServices.isa.getPort(); 2811 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder(); 2812 if (!StringUtils.isBlank(useThisHostnameInstead)) { 2813 request.setUseThisHostnameInstead(useThisHostnameInstead); 2814 } 2815 request.setPort(port); 2816 request.setServerStartCode(this.startcode); 2817 request.setServerCurrentTime(now); 2818 result = rss.regionServerStartup(null, request.build()); 2819 } catch (ServiceException se) { 2820 IOException ioe = ProtobufUtil.getRemoteException(se); 2821 if (ioe instanceof ClockOutOfSyncException) { 2822 LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", 2823 ioe); 2824 // Re-throw IOE will cause RS to abort 2825 
throw ioe; 2826 } else if (ioe instanceof ServerNotRunningYetException) { 2827 LOG.debug("Master is not running yet"); 2828 } else { 2829 LOG.warn("error telling master we are up", se); 2830 } 2831 rssStub = null; 2832 } 2833 return result; 2834 } 2835 2836 @Override 2837 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { 2838 try { 2839 GetLastFlushedSequenceIdRequest req = 2840 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName); 2841 RegionServerStatusService.BlockingInterface rss = rssStub; 2842 if (rss == null) { // Try to connect one more time 2843 createRegionServerStatusStub(); 2844 rss = rssStub; 2845 if (rss == null) { 2846 // Still no luck, we tried 2847 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id"); 2848 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2849 .build(); 2850 } 2851 } 2852 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req); 2853 return RegionStoreSequenceIds.newBuilder() 2854 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId()) 2855 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build(); 2856 } catch (ServiceException e) { 2857 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e); 2858 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2859 .build(); 2860 } 2861 } 2862 2863 /** 2864 * Close meta region if we carry it 2865 * @param abort Whether we're running an abort. 
2866 */ 2867 private void closeMetaTableRegions(final boolean abort) { 2868 HRegion meta = null; 2869 this.onlineRegionsLock.writeLock().lock(); 2870 try { 2871 for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) { 2872 RegionInfo hri = e.getValue().getRegionInfo(); 2873 if (hri.isMetaRegion()) { 2874 meta = e.getValue(); 2875 } 2876 if (meta != null) break; 2877 } 2878 } finally { 2879 this.onlineRegionsLock.writeLock().unlock(); 2880 } 2881 if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort); 2882 } 2883 2884 /** 2885 * Schedule closes on all user regions. 2886 * Should be safe calling multiple times because it wont' close regions 2887 * that are already closed or that are closing. 2888 * @param abort Whether we're running an abort. 2889 */ 2890 private void closeUserRegions(final boolean abort) { 2891 this.onlineRegionsLock.writeLock().lock(); 2892 try { 2893 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) { 2894 HRegion r = e.getValue(); 2895 if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) { 2896 // Don't update zk with this close transition; pass false. 2897 closeRegionIgnoreErrors(r.getRegionInfo(), abort); 2898 } 2899 } 2900 } finally { 2901 this.onlineRegionsLock.writeLock().unlock(); 2902 } 2903 } 2904 2905 /** @return the info server */ 2906 public InfoServer getInfoServer() { 2907 return infoServer; 2908 } 2909 2910 /** 2911 * @return true if a stop has been requested. 2912 */ 2913 @Override 2914 public boolean isStopped() { 2915 return this.stopped; 2916 } 2917 2918 @Override 2919 public boolean isStopping() { 2920 return this.stopping; 2921 } 2922 2923 @Override 2924 public Configuration getConfiguration() { 2925 return conf; 2926 } 2927 2928 protected Map<String, HRegion> getOnlineRegions() { 2929 return this.onlineRegions; 2930 } 2931 2932 public int getNumberOfOnlineRegions() { 2933 return this.onlineRegions.size(); 2934 } 2935 2936 /** 2937 * For tests, web ui and metrics. 
2938 * This method will only work if HRegionServer is in the same JVM as client; 2939 * HRegion cannot be serialized to cross an rpc. 2940 */ 2941 public Collection<HRegion> getOnlineRegionsLocalContext() { 2942 Collection<HRegion> regions = this.onlineRegions.values(); 2943 return Collections.unmodifiableCollection(regions); 2944 } 2945 2946 @Override 2947 public void addRegion(HRegion region) { 2948 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); 2949 configurationManager.registerObserver(region); 2950 } 2951 2952 /** 2953 * @return A new Map of online regions sorted by region off-heap size with the first entry being 2954 * the biggest. If two regions are the same size, then the last one found wins; i.e. this 2955 * method may NOT return all regions. 2956 */ 2957 SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOffHeapSize() { 2958 // we'll sort the regions in reverse 2959 SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2960 // Copy over all regions. Regions are sorted by size with biggest first. 2961 for (HRegion region : this.onlineRegions.values()) { 2962 sortedRegions.put(region.getMemStoreOffHeapSize(), region); 2963 } 2964 return sortedRegions; 2965 } 2966 2967 /** 2968 * @return A new Map of online regions sorted by region heap size with the first entry being the 2969 * biggest. If two regions are the same size, then the last one found wins; i.e. this method 2970 * may NOT return all regions. 2971 */ 2972 SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOnHeapSize() { 2973 // we'll sort the regions in reverse 2974 SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2975 // Copy over all regions. Regions are sorted by size with biggest first. 
2976 for (HRegion region : this.onlineRegions.values()) { 2977 sortedRegions.put(region.getMemStoreHeapSize(), region); 2978 } 2979 return sortedRegions; 2980 } 2981 2982 /** 2983 * @return time stamp in millis of when this region server was started 2984 */ 2985 public long getStartcode() { 2986 return this.startcode; 2987 } 2988 2989 /** @return reference to FlushRequester */ 2990 @Override 2991 public FlushRequester getFlushRequester() { 2992 return this.cacheFlusher; 2993 } 2994 2995 @Override 2996 public CompactionRequester getCompactionRequestor() { 2997 return this.compactSplitThread; 2998 } 2999 3000 @Override 3001 public LeaseManager getLeaseManager() { 3002 return leaseManager; 3003 } 3004 3005 /** 3006 * @return Return the rootDir. 3007 */ 3008 protected Path getDataRootDir() { 3009 return dataRootDir; 3010 } 3011 3012 @Override 3013 public FileSystem getFileSystem() { 3014 return dataFs; 3015 } 3016 3017 /** 3018 * @return {@code true} when the data file system is available, {@code false} otherwise. 3019 */ 3020 boolean isDataFileSystemOk() { 3021 return this.dataFsOk; 3022 } 3023 3024 /** 3025 * @return Return the walRootDir. 3026 */ 3027 public Path getWALRootDir() { 3028 return walRootDir; 3029 } 3030 3031 /** 3032 * @return Return the walFs. 
3033 */ 3034 public FileSystem getWALFileSystem() { 3035 return walFs; 3036 } 3037 3038 @Override 3039 public String toString() { 3040 return getServerName().toString(); 3041 } 3042 3043 @Override 3044 public ZKWatcher getZooKeeper() { 3045 return zooKeeper; 3046 } 3047 3048 @Override 3049 public CoordinatedStateManager getCoordinatedStateManager() { 3050 return csm; 3051 } 3052 3053 @Override 3054 public ServerName getServerName() { 3055 return serverName; 3056 } 3057 3058 public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){ 3059 return this.rsHost; 3060 } 3061 3062 @Override 3063 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() { 3064 return this.regionsInTransitionInRS; 3065 } 3066 3067 @Override 3068 public ExecutorService getExecutorService() { 3069 return executorService; 3070 } 3071 3072 @Override 3073 public ChoreService getChoreService() { 3074 return choreService; 3075 } 3076 3077 @Override 3078 public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { 3079 return rsQuotaManager; 3080 } 3081 3082 // 3083 // Main program and support routines 3084 // 3085 /** 3086 * Load the replication executorService objects, if any 3087 */ 3088 private static void createNewReplicationInstance(Configuration conf, HRegionServer server, 3089 FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException { 3090 if ((server instanceof HMaster) && 3091 (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) { 3092 return; 3093 } 3094 // read in the name of the source replication class from the config file. 3095 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, 3096 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 3097 3098 // read in the name of the sink replication class from the config file. 
3099 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, 3100 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 3101 3102 // If both the sink and the source class names are the same, then instantiate 3103 // only one object. 3104 if (sourceClassname.equals(sinkClassname)) { 3105 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 3106 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 3107 server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; 3108 } else { 3109 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 3110 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 3111 server.replicationSinkHandler = newReplicationInstance(sinkClassname, 3112 ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 3113 } 3114 } 3115 3116 private static <T extends ReplicationService> T newReplicationInstance(String classname, 3117 Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, 3118 Path oldLogDir, WALFactory walFactory) throws IOException { 3119 final Class<? 
extends T> clazz; 3120 try { 3121 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 3122 clazz = Class.forName(classname, true, classLoader).asSubclass(xface); 3123 } catch (java.lang.ClassNotFoundException nfe) { 3124 throw new IOException("Could not find class for " + classname); 3125 } 3126 T service = ReflectionUtils.newInstance(clazz, conf); 3127 service.initialize(server, walFs, logDir, oldLogDir, walFactory); 3128 return service; 3129 } 3130 3131 public Map<String, ReplicationStatus> getWalGroupsReplicationStatus(){ 3132 Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>(); 3133 if(!this.isOnline()){ 3134 return walGroupsReplicationStatus; 3135 } 3136 List<ReplicationSourceInterface> allSources = new ArrayList<>(); 3137 allSources.addAll(replicationSourceHandler.getReplicationManager().getSources()); 3138 allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources()); 3139 for(ReplicationSourceInterface source: allSources){ 3140 walGroupsReplicationStatus.putAll(source.getWalGroupStatus()); 3141 } 3142 return walGroupsReplicationStatus; 3143 } 3144 3145 /** 3146 * Utility for constructing an instance of the passed HRegionServer class. 3147 */ 3148 static HRegionServer constructRegionServer( 3149 final Class<? extends HRegionServer> regionServerClass, 3150 final Configuration conf 3151 ) { 3152 try { 3153 Constructor<? 
extends HRegionServer> c = 3154 regionServerClass.getConstructor(Configuration.class); 3155 return c.newInstance(conf); 3156 } catch (Exception e) { 3157 throw new RuntimeException("Failed construction of " + "Regionserver: " 3158 + regionServerClass.toString(), e); 3159 } 3160 } 3161 3162 /** 3163 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine 3164 */ 3165 public static void main(String[] args) { 3166 LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); 3167 VersionInfo.logVersion(); 3168 Configuration conf = HBaseConfiguration.create(); 3169 @SuppressWarnings("unchecked") 3170 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf 3171 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); 3172 3173 new HRegionServerCommandLine(regionServerClass).doMain(args); 3174 } 3175 3176 /** 3177 * Gets the online regions of the specified table. 3178 * This method looks at the in-memory onlineRegions. It does not go to <code>hbase:meta</code>. 3179 * Only returns <em>online</em> regions. If a region on this table has been 3180 * closed during a disable, etc., it will not be included in the returned list. 3181 * So, the returned list may not necessarily be ALL regions in this table, its 3182 * all the ONLINE regions in the table. 
3183 * @param tableName table to limit the scope of the query 3184 * @return Online regions from <code>tableName</code> 3185 */ 3186 @Override 3187 public List<HRegion> getRegions(TableName tableName) { 3188 List<HRegion> tableRegions = new ArrayList<>(); 3189 synchronized (this.onlineRegions) { 3190 for (HRegion region: this.onlineRegions.values()) { 3191 RegionInfo regionInfo = region.getRegionInfo(); 3192 if(regionInfo.getTable().equals(tableName)) { 3193 tableRegions.add(region); 3194 } 3195 } 3196 } 3197 return tableRegions; 3198 } 3199 3200 @Override 3201 public List<HRegion> getRegions() { 3202 List<HRegion> allRegions; 3203 synchronized (this.onlineRegions) { 3204 // Return a clone copy of the onlineRegions 3205 allRegions = new ArrayList<>(onlineRegions.values()); 3206 } 3207 return allRegions; 3208 } 3209 3210 /** 3211 * Gets the online tables in this RS. 3212 * This method looks at the in-memory onlineRegions. 3213 * @return all the online tables in this RS 3214 */ 3215 public Set<TableName> getOnlineTables() { 3216 Set<TableName> tables = new HashSet<>(); 3217 synchronized (this.onlineRegions) { 3218 for (Region region: this.onlineRegions.values()) { 3219 tables.add(region.getTableDescriptor().getTableName()); 3220 } 3221 } 3222 return tables; 3223 } 3224 3225 public String[] getRegionServerCoprocessors() { 3226 TreeSet<String> coprocessors = new TreeSet<>(); 3227 try { 3228 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors()); 3229 } catch (IOException exception) { 3230 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " + 3231 "skipping."); 3232 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3233 } 3234 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 3235 for (HRegion region: regions) { 3236 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors()); 3237 try { 3238 
coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()); 3239 } catch (IOException exception) { 3240 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region + 3241 "; skipping."); 3242 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3243 } 3244 } 3245 coprocessors.addAll(rsHost.getCoprocessors()); 3246 return coprocessors.toArray(new String[0]); 3247 } 3248 3249 /** 3250 * Try to close the region, logs a warning on failure but continues. 3251 * @param region Region to close 3252 */ 3253 private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) { 3254 try { 3255 if (!closeRegion(region.getEncodedName(), abort, null)) { 3256 LOG.warn("Failed to close " + region.getRegionNameAsString() + 3257 " - ignoring and continuing"); 3258 } 3259 } catch (IOException e) { 3260 LOG.warn("Failed to close " + region.getRegionNameAsString() + 3261 " - ignoring and continuing", e); 3262 } 3263 } 3264 3265 /** 3266 * Close asynchronously a region, can be called from the master or internally by the regionserver 3267 * when stopping. If called from the master, the region will update the status. 3268 * 3269 * <p> 3270 * If an opening was in progress, this method will cancel it, but will not start a new close. The 3271 * coprocessors are not called in this case. A NotServingRegionException exception is thrown. 3272 * </p> 3273 3274 * <p> 3275 * If a close was in progress, this new request will be ignored, and an exception thrown. 3276 * </p> 3277 * 3278 * @param encodedName Region to close 3279 * @param abort True if we are aborting 3280 * @param destination Where the Region is being moved too... maybe null if unknown. 3281 * @return True if closed a region. 
* @throws NotServingRegionException if the region is neither online nor opening, or if an
   *   in-progress open was cancelled before the region came online
   */
  protected boolean closeRegion(String encodedName, final boolean abort,
      final ServerName destination)
      throws NotServingRegionException {
    //Check for permissions to close.
    // Coprocessors may veto the close through preClose. A failure there makes this method
    // return false before regionsInTransitionInRS is touched.
    HRegion actualRegion = this.getRegion(encodedName);
    // Can be null if we're calling close on a region that's not online
    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
      try {
        // NOTE(review): preClose is always given false here, even when abort is true; confirm
        // this is intended.
        actualRegion.getCoprocessorHost().preClose(false);
      } catch (IOException exp) {
        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
        return false;
      }
    }

    // Mark the region as closing. In regionsInTransitionInRS, TRUE means an open is in progress
    // and FALSE means a close is in progress (see the branches and log messages below).
    // previous can come back 'null' if not in map.
    final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName),
        Boolean.FALSE);

    if (Boolean.TRUE.equals(previous)) {
      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
          "trying to OPEN. Cancelling OPENING.");
      // Cancel the open by atomically flipping the marker from opening (TRUE) to closing (FALSE).
      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
        // The replace failed. That should be an exceptional case, but theoretically it can happen.
        // We're going to try to do a standard close then.
        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
            " Doing a standard close now");
        return closeRegion(encodedName, abort, destination);
      }
      // Let's get the region from the online region list again
      actualRegion = this.getRegion(encodedName);
      // Still not online: the open has been cancelled and there is nothing to close. If the
      // region did come online, fall through and close it normally.
      if (actualRegion == null) {
        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
        // The master deletes the znode when it receives this exception.
        throw new NotServingRegionException("The region " + encodedName +
            " was opening but not yet served. Opening is cancelled.");
      }
    } else if (previous == null) {
      LOG.info("Received CLOSE for {}", encodedName);
    } else if (Boolean.FALSE.equals(previous)) {
      // A close is already under way; the duplicate request is reported as successful.
      LOG.info("Received CLOSE for the region: " + encodedName +
          ", which we are already trying to CLOSE, but not completed yet");
      return true;
    }

    if (actualRegion == null) {
      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
      // Only reachable when previous was null, so this undoes the FALSE marker added above.
      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
      // The master deletes the znode when it receives this exception.
      throw new NotServingRegionException("The region " + encodedName +
          " is not online, and is not opening.");
    }

    // The actual close runs asynchronously; meta regions get a dedicated handler type.
    CloseRegionHandler crh;
    final RegionInfo hri = actualRegion.getRegionInfo();
    if (hri.isMetaRegion()) {
      crh = new CloseMetaHandler(this, this, hri, abort);
    } else {
      crh = new CloseRegionHandler(this, this, hri, abort, destination);
    }
    this.executorService.submit(crh);
    return true;
  }

  /**
   * @return HRegion for the passed binary <code>regionName</code> or null if
   * named region is not member of the online regions.
3351 */ 3352 public HRegion getOnlineRegion(final byte[] regionName) { 3353 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3354 return this.onlineRegions.get(encodedRegionName); 3355 } 3356 3357 @Override 3358 public HRegion getRegion(final String encodedRegionName) { 3359 return this.onlineRegions.get(encodedRegionName); 3360 } 3361 3362 3363 @Override 3364 public boolean removeRegion(final HRegion r, ServerName destination) { 3365 HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); 3366 metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName()); 3367 if (destination != null) { 3368 long closeSeqNum = r.getMaxFlushedSeqId(); 3369 if (closeSeqNum == HConstants.NO_SEQNUM) { 3370 // No edits in WAL for this region; get the sequence number when the region was opened. 3371 closeSeqNum = r.getOpenSeqNum(); 3372 if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0; 3373 } 3374 boolean selfMove = ServerName.isSameAddress(destination, this.getServerName()); 3375 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove); 3376 if (selfMove) { 3377 this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(r.getRegionInfo().getEncodedName() 3378 , new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount())); 3379 } 3380 } 3381 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName()); 3382 return toReturn != null; 3383 } 3384 3385 /** 3386 * Protected Utility method for safely obtaining an HRegion handle. 
3387 * 3388 * @param regionName Name of online {@link HRegion} to return 3389 * @return {@link HRegion} for <code>regionName</code> 3390 */ 3391 protected HRegion getRegion(final byte[] regionName) 3392 throws NotServingRegionException { 3393 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3394 return getRegionByEncodedName(regionName, encodedRegionName); 3395 } 3396 3397 public HRegion getRegionByEncodedName(String encodedRegionName) 3398 throws NotServingRegionException { 3399 return getRegionByEncodedName(null, encodedRegionName); 3400 } 3401 3402 private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) 3403 throws NotServingRegionException { 3404 HRegion region = this.onlineRegions.get(encodedRegionName); 3405 if (region == null) { 3406 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName); 3407 if (moveInfo != null) { 3408 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum()); 3409 } 3410 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName)); 3411 String regionNameStr = regionName == null? 3412 encodedRegionName: Bytes.toStringBinary(regionName); 3413 if (isOpening != null && isOpening) { 3414 throw new RegionOpeningException("Region " + regionNameStr + 3415 " is opening on " + this.serverName); 3416 } 3417 throw new NotServingRegionException("" + regionNameStr + 3418 " is not online on " + this.serverName); 3419 } 3420 return region; 3421 } 3422 3423 /** 3424 * Cleanup after Throwable caught invoking method. Converts <code>t</code> to 3425 * IOE if it isn't already. 3426 * 3427 * @param t Throwable 3428 * @param msg Message to log in error. Can be null. 3429 * @return Throwable converted to an IOE; methods can only let out IOEs. 3430 */ 3431 private Throwable cleanup(final Throwable t, final String msg) { 3432 // Don't log as error if NSRE; NSRE is 'normal' operation. 
3433 if (t instanceof NotServingRegionException) { 3434 LOG.debug("NotServingRegionException; " + t.getMessage()); 3435 return t; 3436 } 3437 Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t; 3438 if (msg == null) { 3439 LOG.error("", e); 3440 } else { 3441 LOG.error(msg, e); 3442 } 3443 if (!rpcServices.checkOOME(t)) { 3444 checkFileSystem(); 3445 } 3446 return t; 3447 } 3448 3449 /** 3450 * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE 3451 * @return Make <code>t</code> an IOE if it isn't already. 3452 */ 3453 private IOException convertThrowableToIOE(final Throwable t, final String msg) { 3454 return (t instanceof IOException ? (IOException) t : msg == null 3455 || msg.length() == 0 ? new IOException(t) : new IOException(msg, t)); 3456 } 3457 3458 /** 3459 * Checks to see if the file system is still accessible. If not, sets 3460 * abortRequested and stopRequested 3461 * 3462 * @return false if file system is not available 3463 */ 3464 boolean checkFileSystem() { 3465 if (this.dataFsOk && this.dataFs != null) { 3466 try { 3467 FSUtils.checkFileSystemAvailable(this.dataFs); 3468 } catch (IOException e) { 3469 abort("File System not available", e); 3470 this.dataFsOk = false; 3471 } 3472 } 3473 return this.dataFsOk; 3474 } 3475 3476 @Override 3477 public void updateRegionFavoredNodesMapping(String encodedRegionName, 3478 List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) { 3479 InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()]; 3480 // Refer to the comment on the declaration of regionFavoredNodesMap on why 3481 // it is a map of region name to InetSocketAddress[] 3482 for (int i = 0; i < favoredNodes.size(); i++) { 3483 addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(), 3484 favoredNodes.get(i).getPort()); 3485 } 3486 regionFavoredNodesMap.put(encodedRegionName, addr); 3487 } 3488 3489 /** 3490 * 
Return the favored nodes for a region given its encoded name. Look at the 3491 * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[] 3492 * 3493 * @return array of favored locations 3494 */ 3495 @Override 3496 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) { 3497 return regionFavoredNodesMap.get(encodedRegionName); 3498 } 3499 3500 @Override 3501 public ServerNonceManager getNonceManager() { 3502 return this.nonceManager; 3503 } 3504 3505 private static class MovedRegionInfo { 3506 private final ServerName serverName; 3507 private final long seqNum; 3508 3509 MovedRegionInfo(ServerName serverName, long closeSeqNum) { 3510 this.serverName = serverName; 3511 this.seqNum = closeSeqNum; 3512 } 3513 3514 public ServerName getServerName() { 3515 return serverName; 3516 } 3517 3518 public long getSeqNum() { 3519 return seqNum; 3520 } 3521 } 3522 3523 /** 3524 * We need a timeout. If not there is a risk of giving a wrong information: this would double 3525 * the number of network calls instead of reducing them. 
3526 */ 3527 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000); 3528 3529 private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum, boolean selfMove) { 3530 if (selfMove) { 3531 LOG.warn("Not adding moved region record: " + encodedName + " to self."); 3532 return; 3533 } 3534 LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" + 3535 closeSeqNum); 3536 movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum)); 3537 } 3538 3539 void removeFromMovedRegions(String encodedName) { 3540 movedRegionInfoCache.invalidate(encodedName); 3541 } 3542 3543 @InterfaceAudience.Private 3544 public MovedRegionInfo getMovedRegion(String encodedRegionName) { 3545 return movedRegionInfoCache.getIfPresent(encodedRegionName); 3546 } 3547 3548 @InterfaceAudience.Private 3549 public int movedRegionCacheExpiredTime() { 3550 return TIMEOUT_REGION_MOVED; 3551 } 3552 3553 private String getMyEphemeralNodePath() { 3554 return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode, getServerName().toString()); 3555 } 3556 3557 private boolean isHealthCheckerConfigured() { 3558 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC); 3559 return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation); 3560 } 3561 3562 /** 3563 * @return the underlying {@link CompactSplit} for the servers 3564 */ 3565 public CompactSplit getCompactSplitThread() { 3566 return this.compactSplitThread; 3567 } 3568 3569 CoprocessorServiceResponse execRegionServerService( 3570 @SuppressWarnings("UnusedParameters") final RpcController controller, 3571 final CoprocessorServiceRequest serviceRequest) throws ServiceException { 3572 try { 3573 ServerRpcController serviceController = new ServerRpcController(); 3574 CoprocessorServiceCall call = serviceRequest.getCall(); 3575 String serviceName = call.getServiceName(); 3576 com.google.protobuf.Service service = 
coprocessorServiceHandlers.get(serviceName); 3577 if (service == null) { 3578 throw new UnknownProtocolException(null, "No registered coprocessor executorService found for " + 3579 serviceName); 3580 } 3581 com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = 3582 service.getDescriptorForType(); 3583 3584 String methodName = call.getMethodName(); 3585 com.google.protobuf.Descriptors.MethodDescriptor methodDesc = 3586 serviceDesc.findMethodByName(methodName); 3587 if (methodDesc == null) { 3588 throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName + 3589 " called on executorService " + serviceName); 3590 } 3591 3592 com.google.protobuf.Message request = 3593 CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest()); 3594 final com.google.protobuf.Message.Builder responseBuilder = 3595 service.getResponsePrototype(methodDesc).newBuilderForType(); 3596 service.callMethod(methodDesc, serviceController, request, message -> { 3597 if (message != null) { 3598 responseBuilder.mergeFrom(message); 3599 } 3600 }); 3601 IOException exception = CoprocessorRpcUtils.getControllerException(serviceController); 3602 if (exception != null) { 3603 throw exception; 3604 } 3605 return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY); 3606 } catch (IOException ie) { 3607 throw new ServiceException(ie); 3608 } 3609 } 3610 3611 /** 3612 * May be null if this is a master which not carry table. 3613 * 3614 * @return The block cache instance used by the regionserver. 3615 */ 3616 @Override 3617 public Optional<BlockCache> getBlockCache() { 3618 return Optional.ofNullable(this.blockCache); 3619 } 3620 3621 /** 3622 * May be null if this is a master which not carry table. 3623 * 3624 * @return The cache for mob files used by the regionserver. 
3625 */ 3626 @Override 3627 public Optional<MobFileCache> getMobFileCache() { 3628 return Optional.ofNullable(this.mobFileCache); 3629 } 3630 3631 @Override 3632 public AccessChecker getAccessChecker() { 3633 return rpcServices.getAccessChecker(); 3634 } 3635 3636 @Override 3637 public ZKPermissionWatcher getZKPermissionWatcher() { 3638 return rpcServices.getZkPermissionWatcher(); 3639 } 3640 3641 /** 3642 * @return : Returns the ConfigurationManager object for testing purposes. 3643 */ 3644 @InterfaceAudience.Private 3645 ConfigurationManager getConfigurationManager() { 3646 return configurationManager; 3647 } 3648 3649 /** 3650 * @return Return table descriptors implementation. 3651 */ 3652 @Override 3653 public TableDescriptors getTableDescriptors() { 3654 return this.tableDescriptors; 3655 } 3656 3657 /** 3658 * Reload the configuration from disk. 3659 */ 3660 void updateConfiguration() { 3661 LOG.info("Reloading the configuration from disk."); 3662 // Reload the configuration from disk. 
3663 conf.reloadConfiguration(); 3664 configurationManager.notifyAllObservers(conf); 3665 } 3666 3667 CacheEvictionStats clearRegionBlockCache(Region region) { 3668 long evictedBlocks = 0; 3669 3670 for(Store store : region.getStores()) { 3671 for(StoreFile hFile : store.getStorefiles()) { 3672 evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName()); 3673 } 3674 } 3675 3676 return CacheEvictionStats.builder() 3677 .withEvictedBlocks(evictedBlocks) 3678 .build(); 3679 } 3680 3681 @Override 3682 public double getCompactionPressure() { 3683 double max = 0; 3684 for (Region region : onlineRegions.values()) { 3685 for (Store store : region.getStores()) { 3686 double normCount = store.getCompactionPressure(); 3687 if (normCount > max) { 3688 max = normCount; 3689 } 3690 } 3691 } 3692 return max; 3693 } 3694 3695 @Override 3696 public HeapMemoryManager getHeapMemoryManager() { 3697 return hMemManager; 3698 } 3699 3700 MemStoreFlusher getMemStoreFlusher() { 3701 return cacheFlusher; 3702 } 3703 3704 /** 3705 * For testing 3706 * @return whether all wal roll request finished for this regionserver 3707 */ 3708 @InterfaceAudience.Private 3709 public boolean walRollRequestFinished() { 3710 return this.walRoller.walRollFinished(); 3711 } 3712 3713 @Override 3714 public ThroughputController getFlushThroughputController() { 3715 return flushThroughputController; 3716 } 3717 3718 @Override 3719 public double getFlushPressure() { 3720 if (getRegionServerAccounting() == null || cacheFlusher == null) { 3721 // return 0 during RS initialization 3722 return 0.0; 3723 } 3724 return getRegionServerAccounting().getFlushPressure(); 3725 } 3726 3727 @Override 3728 public void onConfigurationChange(Configuration newConf) { 3729 ThroughputController old = this.flushThroughputController; 3730 if (old != null) { 3731 old.stop("configuration change"); 3732 } 3733 this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf); 3734 try { 3735 
Superusers.initialize(newConf); 3736 } catch (IOException e) { 3737 LOG.warn("Failed to initialize SuperUsers on reloading of the configuration"); 3738 } 3739 } 3740 3741 @Override 3742 public MetricsRegionServer getMetrics() { 3743 return metricsRegionServer; 3744 } 3745 3746 @Override 3747 public SecureBulkLoadManager getSecureBulkLoadManager() { 3748 return this.secureBulkLoadManager; 3749 } 3750 3751 @Override 3752 public EntityLock regionLock(final List<RegionInfo> regionInfos, final String description, 3753 final Abortable abort) { 3754 return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator()) 3755 .regionLock(regionInfos, description, abort); 3756 } 3757 3758 @Override 3759 public void unassign(byte[] regionName) throws IOException { 3760 clusterConnection.getAdmin().unassign(regionName, false); 3761 } 3762 3763 @Override 3764 public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() { 3765 return this.rsSpaceQuotaManager; 3766 } 3767 3768 @Override 3769 public boolean reportFileArchivalForQuotas(TableName tableName, 3770 Collection<Entry<String, Long>> archivedFiles) { 3771 if (TEST_SKIP_REPORTING_TRANSITION) { 3772 return false; 3773 } 3774 RegionServerStatusService.BlockingInterface rss = rssStub; 3775 if (rss == null || rsSpaceQuotaManager == null) { 3776 // the current server could be stopping. 3777 LOG.trace("Skipping file archival reporting to HMaster as stub is null"); 3778 return false; 3779 } 3780 try { 3781 RegionServerStatusProtos.FileArchiveNotificationRequest request = 3782 rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles); 3783 rss.reportFileArchival(null, request); 3784 } catch (ServiceException se) { 3785 IOException ioe = ProtobufUtil.getRemoteException(se); 3786 if (ioe instanceof PleaseHoldException) { 3787 if (LOG.isTraceEnabled()) { 3788 LOG.trace("Failed to report file archival(s) to Master because it is initializing." 
3789 + " This will be retried.", ioe); 3790 } 3791 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 3792 return false; 3793 } 3794 if (rssStub == rss) { 3795 rssStub = null; 3796 } 3797 // re-create the stub if we failed to report the archival 3798 createRegionServerStatusStub(true); 3799 LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe); 3800 return false; 3801 } 3802 return true; 3803 } 3804 3805 public NettyEventLoopGroupConfig getEventLoopGroupConfig() { 3806 return eventLoopGroupConfig; 3807 } 3808 3809 @Override 3810 public Connection createConnection(Configuration conf) throws IOException { 3811 User user = UserProvider.instantiate(conf).getCurrent(); 3812 return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName, 3813 this.rpcServices, this.rpcServices); 3814 } 3815 3816 void executeProcedure(long procId, RSProcedureCallable callable) { 3817 executorService.submit(new RSProcedureHandler(this, procId, callable)); 3818 } 3819 3820 public void remoteProcedureComplete(long procId, Throwable error) { 3821 procedureResultReporter.complete(procId, error); 3822 } 3823 3824 void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException { 3825 RegionServerStatusService.BlockingInterface rss; 3826 // TODO: juggling class state with an instance variable, outside of a synchronized block :'( 3827 for (;;) { 3828 rss = rssStub; 3829 if (rss != null) { 3830 break; 3831 } 3832 createRegionServerStatusStub(); 3833 } 3834 try { 3835 rss.reportProcedureDone(null, request); 3836 } catch (ServiceException se) { 3837 if (rssStub == rss) { 3838 rssStub = null; 3839 } 3840 throw ProtobufUtil.getRemoteException(se); 3841 } 3842 } 3843 3844 /** 3845 * Will ignore the open/close region procedures which already submitted or executed. 
3846 * 3847 * When master had unfinished open/close region procedure and restarted, new active master may 3848 * send duplicate open/close region request to regionserver. The open/close request is submitted 3849 * to a thread pool and execute. So first need a cache for submitted open/close region procedures. 3850 * 3851 * After the open/close region request executed and report region transition succeed, cache it in 3852 * executed region procedures cache. See {@link #finishRegionProcedure(long)}. After report region 3853 * transition succeed, master will not send the open/close region request to regionserver again. 3854 * And we thought that the ongoing duplicate open/close region request should not be delayed more 3855 * than 600 seconds. So the executed region procedures cache will expire after 600 seconds. 3856 * 3857 * See HBASE-22404 for more details. 3858 * 3859 * @param procId the id of the open/close region procedure 3860 * @return true if the procedure can be submitted. 3861 */ 3862 boolean submitRegionProcedure(long procId) { 3863 if (procId == -1) { 3864 return true; 3865 } 3866 // Ignore the region procedures which already submitted. 3867 Long previous = submittedRegionProcedures.putIfAbsent(procId, procId); 3868 if (previous != null) { 3869 LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId); 3870 return false; 3871 } 3872 // Ignore the region procedures which already executed. 3873 if (executedRegionProcedures.getIfPresent(procId) != null) { 3874 LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId); 3875 return false; 3876 } 3877 return true; 3878 } 3879 3880 /** 3881 * See {@link #submitRegionProcedure(long)}. 
* <p>
   * Marks the procedure as executed before clearing its submitted marker. A duplicate request
   * that arrives in between therefore still sees one of the two and is rejected.
   *
   * @param procId the id of the open/close region procedure
   */
  public void finishRegionProcedure(long procId) {
    executedRegionProcedures.put(procId, procId);
    submittedRegionProcedures.remove(procId);
  }

  /**
   * @return the value of the shutDown flag
   */
  public boolean isShutDown() {
    return shutDown;
  }

  /**
   * Timer task that forcibly terminates the region server when aborting takes too long. It
   * first prints a thread dump to stdout, then halts the JVM with status 1, which does not run
   * shutdown hooks or finalizers.
   */
  private static class SystemExitWhenAbortTimeout extends TimerTask {

    public SystemExitWhenAbortTimeout() {
    }

    @Override
    public void run() {
      final String reason = "Aborting region server timed out, terminating forcibly"
          + " and does not wait for any running shutdown hooks or finalizers to finish their work."
          + " Thread dump to stdout.";
      LOG.warn(reason);
      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
      Runtime.getRuntime().halt(1);
    }
  }

  /**
   * @return the CompactedHFilesDischarger held by this server
   */
  @InterfaceAudience.Private
  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
    return compactedFileDischarger;
  }
}