001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; 021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; 022import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; 023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 024import static org.apache.hadoop.hbase.util.DNS.RS_HOSTNAME_KEY; 025import java.io.IOException; 026import java.lang.management.MemoryType; 027import java.lang.management.MemoryUsage; 028import java.lang.reflect.Constructor; 029import java.net.BindException; 030import java.net.InetAddress; 031import java.net.InetSocketAddress; 032import java.time.Duration; 033import java.util.ArrayList; 034import java.util.Collection; 035import java.util.Collections; 036import java.util.Comparator; 037import java.util.HashSet; 038import java.util.List; 039import java.util.Map; 040import java.util.Map.Entry; 041import java.util.Objects; 042import java.util.Optional; 043import java.util.Set; 044import java.util.SortedMap; 045import java.util.Timer; 
046import java.util.TimerTask; 047import java.util.TreeMap; 048import java.util.TreeSet; 049import java.util.concurrent.ConcurrentHashMap; 050import java.util.concurrent.ConcurrentMap; 051import java.util.concurrent.ConcurrentSkipListMap; 052import java.util.concurrent.TimeUnit; 053import java.util.concurrent.atomic.AtomicBoolean; 054import java.util.concurrent.locks.ReentrantReadWriteLock; 055import java.util.stream.Collectors; 056import javax.management.MalformedObjectNameException; 057import javax.servlet.http.HttpServlet; 058import org.apache.commons.lang3.RandomUtils; 059import org.apache.commons.lang3.StringUtils; 060import org.apache.commons.lang3.SystemUtils; 061import org.apache.hadoop.conf.Configuration; 062import org.apache.hadoop.fs.FileSystem; 063import org.apache.hadoop.fs.Path; 064import org.apache.hadoop.hbase.Abortable; 065import org.apache.hadoop.hbase.CacheEvictionStats; 066import org.apache.hadoop.hbase.CallQueueTooBigException; 067import org.apache.hadoop.hbase.ChoreService; 068import org.apache.hadoop.hbase.ClockOutOfSyncException; 069import org.apache.hadoop.hbase.CoordinatedStateManager; 070import org.apache.hadoop.hbase.DoNotRetryIOException; 071import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException; 072import org.apache.hadoop.hbase.HBaseConfiguration; 073import org.apache.hadoop.hbase.HBaseInterfaceAudience; 074import org.apache.hadoop.hbase.HConstants; 075import org.apache.hadoop.hbase.HealthCheckChore; 076import org.apache.hadoop.hbase.MetaTableAccessor; 077import org.apache.hadoop.hbase.NotServingRegionException; 078import org.apache.hadoop.hbase.PleaseHoldException; 079import org.apache.hadoop.hbase.ScheduledChore; 080import org.apache.hadoop.hbase.ServerName; 081import org.apache.hadoop.hbase.Stoppable; 082import org.apache.hadoop.hbase.TableDescriptors; 083import org.apache.hadoop.hbase.TableName; 084import org.apache.hadoop.hbase.YouAreDeadException; 085import org.apache.hadoop.hbase.ZNodeClearer; 086import 
org.apache.hadoop.hbase.client.ClusterConnection; 087import org.apache.hadoop.hbase.client.Connection; 088import org.apache.hadoop.hbase.client.ConnectionUtils; 089import org.apache.hadoop.hbase.client.RegionInfo; 090import org.apache.hadoop.hbase.client.RegionInfoBuilder; 091import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 092import org.apache.hadoop.hbase.client.locking.EntityLock; 093import org.apache.hadoop.hbase.client.locking.LockServiceClient; 094import org.apache.hadoop.hbase.conf.ConfigurationManager; 095import org.apache.hadoop.hbase.conf.ConfigurationObserver; 096import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; 097import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 098import org.apache.hadoop.hbase.exceptions.RegionMovedException; 099import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 100import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 101import org.apache.hadoop.hbase.executor.ExecutorService; 102import org.apache.hadoop.hbase.executor.ExecutorType; 103import org.apache.hadoop.hbase.fs.HFileSystem; 104import org.apache.hadoop.hbase.http.InfoServer; 105import org.apache.hadoop.hbase.io.hfile.BlockCache; 106import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 107import org.apache.hadoop.hbase.io.hfile.HFile; 108import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 109import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 110import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper; 111import org.apache.hadoop.hbase.ipc.RpcClient; 112import org.apache.hadoop.hbase.ipc.RpcClientFactory; 113import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 114import org.apache.hadoop.hbase.ipc.RpcServer; 115import org.apache.hadoop.hbase.ipc.RpcServerInterface; 116import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 117import org.apache.hadoop.hbase.ipc.ServerRpcController; 118import org.apache.hadoop.hbase.log.HBaseMarkers; 119import 
org.apache.hadoop.hbase.master.HMaster; 120import org.apache.hadoop.hbase.master.LoadBalancer; 121import org.apache.hadoop.hbase.master.RegionState.State; 122import org.apache.hadoop.hbase.master.RegionState; 123import org.apache.hadoop.hbase.mob.MobFileCache; 124import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; 125import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 126import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore; 127import org.apache.hadoop.hbase.quotas.QuotaUtil; 128import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 129import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 130import org.apache.hadoop.hbase.quotas.RegionSize; 131import org.apache.hadoop.hbase.quotas.RegionSizeStore; 132import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 133import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 134import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; 135import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 136import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; 137import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; 138import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; 139import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; 140import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder; 141import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogTableOpsChore; 142import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; 143import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 144import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; 145import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; 146import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; 147import 
org.apache.hadoop.hbase.security.SecurityConstants; 148import org.apache.hadoop.hbase.security.Superusers; 149import org.apache.hadoop.hbase.security.User; 150import org.apache.hadoop.hbase.security.UserProvider; 151import org.apache.hadoop.hbase.security.access.AccessChecker; 152import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; 153import org.apache.hadoop.hbase.trace.SpanReceiverHost; 154import org.apache.hadoop.hbase.trace.TraceUtil; 155import org.apache.hadoop.hbase.util.Addressing; 156import org.apache.hadoop.hbase.util.Bytes; 157import org.apache.hadoop.hbase.util.CommonFSUtils; 158import org.apache.hadoop.hbase.util.CompressionTest; 159import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 160import org.apache.hadoop.hbase.util.FSTableDescriptors; 161import org.apache.hadoop.hbase.util.FSUtils; 162import org.apache.hadoop.hbase.util.FutureUtils; 163import org.apache.hadoop.hbase.util.JvmPauseMonitor; 164import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; 165import org.apache.hadoop.hbase.util.Pair; 166import org.apache.hadoop.hbase.util.RetryCounter; 167import org.apache.hadoop.hbase.util.RetryCounterFactory; 168import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 169import org.apache.hadoop.hbase.util.Sleeper; 170import org.apache.hadoop.hbase.util.Threads; 171import org.apache.hadoop.hbase.util.VersionInfo; 172import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 173import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper; 174import org.apache.hadoop.hbase.wal.WAL; 175import org.apache.hadoop.hbase.wal.WALFactory; 176import org.apache.hadoop.hbase.wal.WALProvider; 177import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; 178import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; 179import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 180import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 181import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker; 182import 
org.apache.hadoop.hbase.zookeeper.ZKUtil; 183import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 184import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 185import org.apache.hadoop.ipc.RemoteException; 186import org.apache.hadoop.util.ReflectionUtils; 187import org.apache.yetus.audience.InterfaceAudience; 188import org.apache.zookeeper.KeeperException; 189import org.slf4j.Logger; 190import org.slf4j.LoggerFactory; 191import sun.misc.Signal; 192import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 193import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 194import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 195import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 196import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 197import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 198import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 199import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 200import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 201import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 202import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 203import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 204import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; 211import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 230import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 234 235/** 236 * HRegionServer makes a set of HRegions available to clients. It checks in with 237 * the HMaster. There are many HRegionServers in a single HBase deployment. 238 */ 239@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 240@SuppressWarnings({ "deprecation"}) 241public class HRegionServer extends Thread implements 242 RegionServerServices, LastSequenceId, ConfigurationObserver { 243 private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class); 244 245 /** 246 * For testing only! Set to true to skip notifying region assignment to master . 247 */ 248 @VisibleForTesting 249 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL") 250 public static boolean TEST_SKIP_REPORTING_TRANSITION = false; 251 252 /** 253 * A map from RegionName to current action in progress. Boolean value indicates: 254 * true - if open region action in progress 255 * false - if close region action in progress 256 */ 257 private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS = 258 new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); 259 260 /** 261 * Used to cache the open/close region procedures which already submitted. 262 * See {@link #submitRegionProcedure(long)}. 263 */ 264 private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>(); 265 /** 266 * Used to cache the open/close region procedures which already executed. 267 * See {@link #submitRegionProcedure(long)}. 
268 */ 269 private final Cache<Long, Long> executedRegionProcedures = 270 CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build(); 271 272 private MemStoreFlusher cacheFlusher; 273 274 private HeapMemoryManager hMemManager; 275 276 /** 277 * Cluster connection to be shared by services. 278 * Initialized at server startup and closed when server shuts down. 279 * Clients must never close it explicitly. 280 * Clients hosted by this Server should make use of this clusterConnection rather than create 281 * their own; if they create their own, there is no way for the hosting server to shutdown 282 * ongoing client RPCs. 283 */ 284 protected ClusterConnection clusterConnection; 285 286 /** 287 * Go here to get table descriptors. 288 */ 289 protected TableDescriptors tableDescriptors; 290 291 // Replication services. If no replication, this handler will be null. 292 private ReplicationSourceService replicationSourceHandler; 293 private ReplicationSinkService replicationSinkHandler; 294 295 // Compactions 296 public CompactSplit compactSplitThread; 297 298 /** 299 * Map of regions currently being served by this region server. Key is the 300 * encoded region name. All access should be synchronized. 301 */ 302 private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>(); 303 /** 304 * Lock for gating access to {@link #onlineRegions}. 305 * TODO: If this map is gated by a lock, does it need to be a ConcurrentHashMap? 306 */ 307 private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock(); 308 309 /** 310 * Map of encoded region names to the DataNode locations they should be hosted on 311 * We store the value as InetSocketAddress since this is used only in HDFS 312 * API (create() that takes favored nodes as hints for placing file blocks). 
313 * We could have used ServerName here as the value class, but we'd need to 314 * convert it to InetSocketAddress at some point before the HDFS API call, and 315 * it seems a bit weird to store ServerName since ServerName refers to RegionServers 316 * and here we really mean DataNode locations. 317 */ 318 private final Map<String, InetSocketAddress[]> regionFavoredNodesMap = new ConcurrentHashMap<>(); 319 320 private LeaseManager leaseManager; 321 322 // Instance of the hbase executor executorService. 323 protected ExecutorService executorService; 324 325 private volatile boolean dataFsOk; 326 private HFileSystem dataFs; 327 private HFileSystem walFs; 328 329 // Set when a report to the master comes back with a message asking us to 330 // shutdown. Also set by call to stop when debugging or running unit tests 331 // of HRegionServer in isolation. 332 private volatile boolean stopped = false; 333 334 // Go down hard. Used if file system becomes unavailable and also in 335 // debugging and unit tests. 336 private volatile boolean abortRequested; 337 static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout"; 338 // Default abort timeout is 1200 seconds for safe 339 private static final long DEFAULT_ABORT_TIMEOUT = 1200000; 340 // Will run this task when abort timeout 341 static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task"; 342 343 // A state before we go into stopped state. At this stage we're closing user 344 // space regions. 
345 private boolean stopping = false; 346 private volatile boolean killed = false; 347 private volatile boolean shutDown = false; 348 349 protected final Configuration conf; 350 351 private Path dataRootDir; 352 private Path walRootDir; 353 354 private final int threadWakeFrequency; 355 final int msgInterval; 356 357 private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period"; 358 private final int compactionCheckFrequency; 359 private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period"; 360 private final int flushCheckFrequency; 361 362 // Stub to do region server status calls against the master. 363 private volatile RegionServerStatusService.BlockingInterface rssStub; 364 private volatile LockService.BlockingInterface lockStub; 365 // RPC client. Used to make the stub above that does region server status checking. 366 private RpcClient rpcClient; 367 368 private RpcRetryingCallerFactory rpcRetryingCallerFactory; 369 private RpcControllerFactory rpcControllerFactory; 370 371 private UncaughtExceptionHandler uncaughtExceptionHandler; 372 373 // Info server. Default access so can be used by unit tests. REGIONSERVER 374 // is name of the webapp and the attribute name used stuffing this instance 375 // into web context. 376 protected InfoServer infoServer; 377 private JvmPauseMonitor pauseMonitor; 378 379 /** region server process name */ 380 public static final String REGIONSERVER = "regionserver"; 381 382 private MetricsRegionServer metricsRegionServer; 383 MetricsRegionServerWrapperImpl metricsRegionServerImpl; 384 private SpanReceiverHost spanReceiverHost; 385 386 /** 387 * ChoreService used to schedule tasks that we want to run periodically 388 */ 389 private ChoreService choreService; 390 391 /** 392 * Check for compactions requests. 
393 */ 394 private ScheduledChore compactionChecker; 395 396 /** 397 * Check for flushes 398 */ 399 private ScheduledChore periodicFlusher; 400 401 private volatile WALFactory walFactory; 402 403 private LogRoller walRoller; 404 405 // A thread which calls reportProcedureDone 406 private RemoteProcedureResultReporter procedureResultReporter; 407 408 // flag set after we're done setting up server threads 409 final AtomicBoolean online = new AtomicBoolean(false); 410 411 // zookeeper connection and watcher 412 protected final ZKWatcher zooKeeper; 413 414 // master address tracker 415 private final MasterAddressTracker masterAddressTracker; 416 417 // Cluster Status Tracker 418 protected final ClusterStatusTracker clusterStatusTracker; 419 420 // Log Splitting Worker 421 private SplitLogWorker splitLogWorker; 422 423 // A sleeper that sleeps for msgInterval. 424 protected final Sleeper sleeper; 425 426 private final int operationTimeout; 427 private final int shortOperationTimeout; 428 429 private final RegionServerAccounting regionServerAccounting; 430 431 private SlowLogTableOpsChore slowLogTableOpsChore = null; 432 433 // Block cache 434 private BlockCache blockCache; 435 // The cache for mob files 436 private MobFileCache mobFileCache; 437 438 /** The health check chore. */ 439 private HealthCheckChore healthCheckChore; 440 441 /** The nonce manager chore. */ 442 private ScheduledChore nonceManagerChore; 443 444 private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap(); 445 446 /** 447 * The server name the Master sees us as. Its made from the hostname the 448 * master passes us, port, and server startcode. Gets set after registration 449 * against Master. 450 */ 451 protected ServerName serverName; 452 453 /** 454 * hostname specified by hostname config 455 */ 456 protected String useThisHostnameInstead; 457 458 /** 459 * HBASE-18226: This config and hbase.regionserver.hostname are mutually exclusive. 
460 * Exception will be thrown if both are used. 461 */ 462 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 463 final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY = 464 "hbase.regionserver.hostname.disable.master.reversedns"; 465 466 /** 467 * This servers startcode. 468 */ 469 protected final long startcode; 470 471 /** 472 * Unique identifier for the cluster we are a part of. 473 */ 474 protected String clusterId; 475 476 /** 477 * Chore to clean periodically the moved region list 478 */ 479 private MovedRegionsCleaner movedRegionsCleaner; 480 481 // chore for refreshing store files for secondary regions 482 private StorefileRefresherChore storefileRefresher; 483 484 private RegionServerCoprocessorHost rsHost; 485 486 private RegionServerProcedureManagerHost rspmHost; 487 488 private RegionServerRpcQuotaManager rsQuotaManager; 489 private RegionServerSpaceQuotaManager rsSpaceQuotaManager; 490 491 /** 492 * Nonce manager. Nonces are used to make operations like increment and append idempotent 493 * in the case where client doesn't receive the response from a successful operation and 494 * retries. We track the successful ops for some time via a nonce sent by client and handle 495 * duplicate operations (currently, by failing them; in future we might use MVCC to return 496 * result). Nonces are also recovered from WAL during, recovery; however, the caveats (from 497 * HBASE-3787) are: 498 * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth 499 * of past records. If we don't read the records, we don't read and recover the nonces. 500 * Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup. 501 * - There's no WAL recovery during normal region move, so nonces will not be transfered. 502 * We can have separate additional "Nonce WAL". 
It will just contain bunch of numbers and 503 * won't be flushed on main path - because WAL itself also contains nonces, if we only flush 504 * it before memstore flush, for a given nonce we will either see it in the WAL (if it was 505 * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce 506 * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the 507 * latest nonce in it expired. It can also be recovered during move. 508 */ 509 final ServerNonceManager nonceManager; 510 511 private UserProvider userProvider; 512 513 protected final RSRpcServices rpcServices; 514 515 private CoordinatedStateManager csm; 516 517 /** 518 * Configuration manager is used to register/deregister and notify the configuration observers 519 * when the regionserver is notified that there was a change in the on disk configs. 520 */ 521 protected final ConfigurationManager configurationManager; 522 523 @VisibleForTesting 524 CompactedHFilesDischarger compactedFileDischarger; 525 526 private volatile ThroughputController flushThroughputController; 527 528 private SecureBulkLoadManager secureBulkLoadManager; 529 530 private FileSystemUtilizationChore fsUtilizationChore; 531 532 private final NettyEventLoopGroupConfig eventLoopGroupConfig; 533 534 /** 535 * Provide online slow log responses from ringbuffer 536 */ 537 private SlowLogRecorder slowLogRecorder; 538 539 /** 540 * True if this RegionServer is coming up in a cluster where there is no Master; 541 * means it needs to just come up and make do without a Master to talk to: e.g. in test or 542 * HRegionServer is doing other than its usual duties: e.g. as an hollowed-out host whose only 543 * purpose is as a Replication-stream sink; see HBASE-18846 for more. 544 * TODO: can this replace {@link #TEST_SKIP_REPORTING_TRANSITION} ? 
*/
  private final boolean masterless;
  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /** regionserver codec list */
  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";

  // A timer to shutdown the process if abort takes too long
  private Timer abortMonitor;

  /**
   * Starts a HRegionServer at the default location.
   * <p>
   * Don't start any services or managers in here in the Constructor.
   * Defer till after we register with the Master as much as possible. See {@link #startServices}.
   * <p>
   * Statement order matters: the RPC services must exist before the hostname is resolved, the
   * hostname before the security logins, and the ZooKeeper watcher before the RPC services are
   * started.
   *
   * @param conf configuration to run with; this constructor also writes to it (meta replica
   *   usage is switched off and the Netty / filesystem helpers called below receive it)
   * @throws IOException if construction fails; every failure is logged before it is rethrown
   */
  public HRegionServer(final Configuration conf) throws IOException {
    super("RegionServer"); // thread name
    TraceUtil.initTracer(conf);
    try {
      this.startcode = System.currentTimeMillis();
      this.conf = conf;
      this.dataFsOk = true;
      this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
      this.eventLoopGroupConfig = setupNetty(this.conf);
      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      this.userProvider = UserProvider.instantiate(conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

      // Config'ed params; both check periods default to the thread wake frequency.
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
      this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

      this.sleeper = new Sleeper(this.msgInterval, this);

      // No nonce manager at all when nonces are switched off.
      this.nonceManager = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true)
        ? new ServerNonceManager(this.conf)
        : null;

      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.abortRequested = false;
      this.stopped = false;

      // The slow log recorder is only created when this instance is not an HMaster.
      if (!(this instanceof HMaster)) {
        this.slowLogRecorder = new SlowLogRecorder(this.conf);
      }
      rpcServices = createRpcServices();
      useThisHostnameInstead = getUseThisHostnameInstead(conf);

      // Prefer the configured hostname; fall back to the one the RPC server is bound to.
      String hostName;
      if (StringUtils.isBlank(useThisHostnameInstead)) {
        hostName = this.rpcServices.isa.getHostName();
      } else {
        hostName = this.useThisHostnameInstead;
      }
      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);

      rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
      rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

      // login the zookeeper client principal (if using security)
      ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
        HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
      // login the server principal (if using secure Hadoop)
      login(userProvider, hostName);
      // init superusers and add the server principal (if using security)
      // or process owner as default super user.
      Superusers.initialize(conf);
      regionServerAccounting = new RegionServerAccounting(conf);

      // no need to instantiate block cache and mob file cache when master not carry table
      boolean hostsTables = !(this instanceof HMaster) || LoadBalancer.isTablesOnMaster(conf);
      if (hostsTables) {
        blockCache = BlockCacheFactory.createBlockCache(conf);
        mobFileCache = new MobFileCache(conf);
      }

      uncaughtExceptionHandler = (thread, cause) ->
        abort("Uncaught exception in executorService thread " + thread.getName(), cause);

      initializeFileSystem();
      spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

      this.configurationManager = new ConfigurationManager();
      setupWindows(getConfiguration(), getConfigurationManager());

      // Some unit tests don't need a cluster, so no zookeeper at all
      if (conf.getBoolean("hbase.testing.nocluster", false)) {
        zooKeeper = null;
        masterAddressTracker = null;
        clusterStatusTracker = null;
      } else {
        // Open connection to zookeeper and set primary watcher
        String zkIdentifier = getProcessName() + ":" + rpcServices.isa.getPort();
        zooKeeper = new ZKWatcher(conf, zkIdentifier, this, canCreateBaseZNode());
        // If no master in cluster, skip trying to track one or look for a cluster status.
        if (this.masterless) {
          masterAddressTracker = null;
          clusterStatusTracker = null;
        } else {
          if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
            DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
            this.csm = new ZkCoordinatedStateManager(this);
          }

          masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
          masterAddressTracker.start();

          clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
          clusterStatusTracker.start();
        }
      }
      this.rpcServices.start(zooKeeper);
      // This violates 'no starting stuff in Constructor' but Master depends on the below chore
      // and executor being created and takes a different startup route. Lots of overlap between
      // HRS and M (An M IS A HRS now). Need to refactor so less duplication between M and its
      // super class HRS. TODO.
      // Master expects Constructor to put up web servers. Ugh.
      this.choreService = new ChoreService(getName(), true);
      this.executorService = new ExecutorService(getName());
      putUpWebUI();
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      LOG.error("Failed construction RegionServer", t);
      throw t;
    }
  }

  /**
   * Resolves the hostname this server should use instead of the one its RPC server is bound to.
   * HMaster should override this method to load the specific config for master.
   *
   * @param conf configuration to read {@code RS_HOSTNAME_KEY} and
   *   {@code RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} from
   * @return the configured hostname (possibly null or blank) when master reverse DNS is not
   *   disabled; otherwise the hostname of the RPC server's bind address
   * @throws IOException if reverse DNS is disabled while a hostname is also configured, since
   *   the two settings are mutually exclusive
   */
  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
    String configuredHostname = conf.get(RS_HOSTNAME_KEY);
    if (!conf.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
      return configuredHostname;
    }
    if (StringUtils.isBlank(configuredHostname)) {
      return rpcServices.isa.getHostName();
    }
    throw new IOException(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + RS_HOSTNAME_KEY
      + " are mutually exclusive. Do not set " + RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY
      + " to true while " + RS_HOSTNAME_KEY + " is used");
  }

  /**
   * Unless running on Windows, installs a handler for the {@code HUP} signal that reloads
   * {@code conf} and then notifies every observer registered with {@code cm}. On Windows this
   * method does nothing.
   *
   * @param conf configuration to reload when the signal arrives
   * @param cm configuration manager whose observers are notified after the reload
   */
  private static void setupWindows(final Configuration conf, ConfigurationManager cm) {
    if (SystemUtils.IS_OS_WINDOWS) {
      return;
    }
    Signal.handle(new Signal("HUP"), signal -> {
      conf.reloadConfiguration();
      cm.notifyAllObservers(conf);
    });
  }

  /**
   * Creates the Netty event loop group configuration and registers its group and client channel
   * class with both the RPC client and the async FS WAL config helpers.
   *
   * @param conf configuration the helpers store the event loop settings in
   * @return the newly created event loop group configuration
   */
  private static NettyEventLoopGroupConfig setupNetty(Configuration conf) {
    // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL.
    NettyEventLoopGroupConfig eventLoopConfig =
      new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
    NettyRpcClientConfigHelper.setEventLoopConfig(conf, eventLoopConfig.group(),
      eventLoopConfig.clientChannelClass());
    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, eventLoopConfig.group(),
      eventLoopConfig.clientChannelClass());
    return eventLoopConfig;
  }

  /**
   * Sets up the WAL and data filesystems, their root directories and the table descriptors.
   * <p>
   * The default filesystem in {@code conf} is pointed at the WAL root dir while the WAL
   * filesystem is created and then re-pointed at the HBase root dir for the data filesystem.
   * Each root dir is looked up again after its filesystem has been created; that order is kept
   * on purpose.
   *
   * @throws IOException if a filesystem or the table descriptors cannot be set up
   */
  private void initializeFileSystem() throws IOException {
    // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
    // checksum verification enabled, then automatically switch off hdfs checksum verification.
    boolean hbaseChecksumEnabled = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);

    CommonFSUtils.setFsDefault(this.conf, CommonFSUtils.getWALRootDir(this.conf));
    this.walFs = new HFileSystem(this.conf, hbaseChecksumEnabled);
    this.walRootDir = CommonFSUtils.getWALRootDir(this.conf);

    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
    // underlying hadoop hdfs accessors will be going against wrong filesystem
    // (unless all is set to defaults).
    CommonFSUtils.setFsDefault(this.conf, CommonFSUtils.getRootDir(this.conf));
    this.dataFs = new HFileSystem(this.conf, hbaseChecksumEnabled);
    this.dataRootDir = CommonFSUtils.getRootDir(this.conf);

    this.tableDescriptors =
      new FSTableDescriptors(this.dataFs, this.dataRootDir, !canUpdateTableDescriptor(), false);
    // Only an HMaster instance attempts to update the meta table descriptor.
    if (this instanceof HMaster) {
      FSTableDescriptors.tryUpdateMetaTableDescriptor(this.conf, this.dataFs, this.dataRootDir,
        builder -> builder.setRegionReplication(
          conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM)));
    }
  }

  /**
   * Logs in through the given provider using the regionserver keytab-file and Kerberos
   * principal constants.
   *
   * @param user provider to log in with
   * @param host hostname handed to the login call
   * @throws IOException if the login fails
   */
  protected void login(UserProvider user, String host) throws IOException {
    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
  }

  /**
   * Wait for an active Master. Does nothing here.
   * See the override in the Master subclass for how it is used.
   */
  protected void waitForMasterActive() {
  }

  /**
   * @return the process name, {@link #REGIONSERVER}; the constructor uses it as the prefix of
   *   the ZooKeeper watcher identifier
   */
  protected String getProcessName() {
    return REGIONSERVER;
  }

  /**
   * @return the value the constructor passes as the last argument of the ZooKeeper watcher;
   *   true only when this server runs masterless
   */
  protected boolean canCreateBaseZNode() {
    return this.masterless;
  }

  /**
   * @return always false here; the negated value is passed to the table descriptors created in
   *   {@code initializeFileSystem} (presumably as their read-only flag, to be confirmed
   *   against FSTableDescriptors)
   */
  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  /**
   * Creates the RPC services for this server.
   *
   * @return a new {@code RSRpcServices} bound to this instance
   * @throws IOException if the RPC services cannot be created
   */
  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  /**
   * Registers the {@code rs-status} servlet with the info server and stores this instance in
   * the web context under the {@link #REGIONSERVER} attribute.
   */
  protected void configureInfoServer() {
    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  protected Class<? 
extends HttpServlet> getDumpServlet() { 775 return RSDumpServlet.class; 776 } 777 778 @Override 779 public boolean registerService(com.google.protobuf.Service instance) { 780 /* 781 * No stacking of instances is allowed for a single executorService name 782 */ 783 com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = 784 instance.getDescriptorForType(); 785 String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc); 786 if (coprocessorServiceHandlers.containsKey(serviceName)) { 787 LOG.error("Coprocessor executorService " + serviceName 788 + " already registered, rejecting request from " + instance); 789 return false; 790 } 791 792 coprocessorServiceHandlers.put(serviceName, instance); 793 if (LOG.isDebugEnabled()) { 794 LOG.debug("Registered regionserver coprocessor executorService: executorService=" + serviceName); 795 } 796 return true; 797 } 798 799 protected ClusterConnection createClusterConnection() throws IOException { 800 Configuration conf = this.conf; 801 // We use ZKConnectionRegistry for all the internal communication, primarily for these reasons: 802 // - Decouples RS and master life cycles. RegionServers can continue be up independent of 803 // masters' availability. 804 // - Configuration management for region servers (cluster internal) is much simpler when adding 805 // new masters or removing existing masters, since only clients' config needs to be updated. 806 // - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for 807 // other internal connections too. 
808 conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 809 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 810 if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) { 811 // Use server ZK cluster for server-issued connections, so we clone 812 // the conf and unset the client ZK related properties 813 conf = new Configuration(this.conf); 814 conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM); 815 } 816 // Create a cluster connection that when appropriate, can short-circuit and go directly to the 817 // local server if the request is to the local server bypassing RPC. Can be used for both local 818 // and remote invocations. 819 return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(), 820 serverName, rpcServices, rpcServices); 821 } 822 823 /** 824 * Run test on configured codecs to make sure supporting libs are in place. 825 * @param c 826 * @throws IOException 827 */ 828 private static void checkCodecs(final Configuration c) throws IOException { 829 // check to see if the codec list is available: 830 String [] codecs = c.getStrings(REGIONSERVER_CODEC, (String[])null); 831 if (codecs == null) return; 832 for (String codec : codecs) { 833 if (!CompressionTest.testCompression(codec)) { 834 throw new IOException("Compression codec " + codec + 835 " not supported, aborting RS construction"); 836 } 837 } 838 } 839 840 public String getClusterId() { 841 return this.clusterId; 842 } 843 844 /** 845 * Setup our cluster connection if not already initialized. 846 * @throws IOException 847 */ 848 protected synchronized void setupClusterConnection() throws IOException { 849 if (clusterConnection == null) { 850 clusterConnection = createClusterConnection(); 851 } 852 } 853 854 /** 855 * All initialization needed before we go register with Master.<br> 856 * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br> 857 * In here we just put up the RpcServer, setup Connection, and ZooKeeper. 
858 */ 859 private void preRegistrationInitialization() { 860 try { 861 initializeZooKeeper(); 862 setupClusterConnection(); 863 // Setup RPC client for master communication 864 this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( 865 this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); 866 } catch (Throwable t) { 867 // Call stop if error or process will stick around for ever since server 868 // puts up non-daemon threads. 869 this.rpcServices.stop(); 870 abort("Initialization of RS failed. Hence aborting RS.", t); 871 } 872 } 873 874 /** 875 * Bring up connection to zk ensemble and then wait until a master for this cluster and then after 876 * that, wait until cluster 'up' flag has been set. This is the order in which master does things. 877 * <p> 878 * Finally open long-living server short-circuit connection. 879 */ 880 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE", 881 justification="cluster Id znode read would give us correct response") 882 private void initializeZooKeeper() throws IOException, InterruptedException { 883 // Nothing to do in here if no Master in the mix. 884 if (this.masterless) { 885 return; 886 } 887 888 // Create the master address tracker, register with zk, and start it. Then 889 // block until a master is available. No point in starting up if no master 890 // running. 891 blockAndCheckIfStopped(this.masterAddressTracker); 892 893 // Wait on cluster being up. Master will set this flag up in zookeeper 894 // when ready. 895 blockAndCheckIfStopped(this.clusterStatusTracker); 896 897 // If we are HMaster then the cluster id should have already been set. 
898 if (clusterId == null) { 899 // Retrieve clusterId 900 // Since cluster status is now up 901 // ID should have already been set by HMaster 902 try { 903 clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper); 904 if (clusterId == null) { 905 this.abort("Cluster ID has not been set"); 906 } 907 LOG.info("ClusterId : " + clusterId); 908 } catch (KeeperException e) { 909 this.abort("Failed to retrieve Cluster ID", e); 910 } 911 } 912 913 waitForMasterActive(); 914 if (isStopped() || isAborted()) { 915 return; // No need for further initialization 916 } 917 918 // watch for snapshots and other procedures 919 try { 920 rspmHost = new RegionServerProcedureManagerHost(); 921 rspmHost.loadProcedures(conf); 922 rspmHost.initialize(this); 923 } catch (KeeperException e) { 924 this.abort("Failed to reach coordination cluster when creating procedure handler.", e); 925 } 926 } 927 928 /** 929 * Utilty method to wait indefinitely on a znode availability while checking 930 * if the region server is shut down 931 * @param tracker znode tracker to use 932 * @throws IOException any IO exception, plus if the RS is stopped 933 * @throws InterruptedException if the waiting thread is interrupted 934 */ 935 private void blockAndCheckIfStopped(ZKNodeTracker tracker) 936 throws IOException, InterruptedException { 937 while (tracker.blockUntilAvailable(this.msgInterval, false) == null) { 938 if (this.stopped) { 939 throw new IOException("Received the shutdown message while waiting."); 940 } 941 } 942 } 943 944 /** 945 * @return True if the cluster is up. 946 */ 947 @Override 948 public boolean isClusterUp() { 949 return this.masterless || 950 (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp()); 951 } 952 953 /** 954 * The HRegionServer sticks in this loop until closed. 
*/
  @Override
  public void run() {
    // Overview of what follows:
    //  1. pre-registration initialization (abort on any failure),
    //  2. register with the Master via reportForDuty(), retrying with backoff,
    //  3. start procedure and quota managers,
    //  4. the main loop: periodic reports to the Master until stopped or unhealthy,
    //  5. the shutdown sequence, which runs unconditionally once the loop is left.
    if (isStopped()) {
      LOG.info("Skipping run; stopped");
      return;
    }
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        ShutdownHook.install(conf, dataFs, this, Thread.currentThread());
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

        // Try and register with the Master; tell it we are here. Break if server is stopped or
        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
        // come up.
        LOG.debug("About to register with Master.");
        // NOTE(review): the last argument is five minutes in milliseconds, presumably the cap on
        // the backoff sleep; confirm against RetryCounterFactory.
        RetryCounterFactory rcf =
          new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
        RetryCounter rc = rcf.create();
        while (keepLooping()) {
          RegionServerStartupResponse w = reportForDuty();
          if (w == null) {
            // A null response is treated as a failed registration attempt; back off and retry.
            long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
            LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
            this.sleeper.sleep(sleepTime);
          } else {
            handleReportForDutyResponse(w);
            break;
          }
        }
      }

      if (!isStopped() && isHealthy()) {
        // start the snapshot handler and other procedure handlers,
        // since the server is ready to run
        if (this.rspmHost != null) {
          this.rspmHost.start();
        }
        // Start the Quota Manager
        if (this.rsQuotaManager != null) {
          rsQuotaManager.start(getRpcServer().getScheduler());
        }
        if (this.rsSpaceQuotaManager != null) {
          this.rsSpaceQuotaManager.start();
        }
      }

      // We registered with the Master. Go into run mode.
      long lastMsg = System.currentTimeMillis();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          // Cluster shutdown path: stop right away if nothing is online, otherwise close user
          // regions first and keep looping until only catalog regions are left and idle.
          if (onlineRegions.isEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(this.abortRequested);
          } else {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
              // since last time we went around the loop. Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested);
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = System.currentTimeMillis();
        // Report to the Master at most once per msgInterval.
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = System.currentTimeMillis();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // end of the main run loop
    } catch (Throwable t) {
      // Anything not recognized by checkOOME is turned into an abort; only a
      // YouAreDeadException is reported without the "Unhandled: " prefix.
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }

    // ---- Shutdown sequence. Every step is guarded because, depending on how far startup got,
    // ---- any of these members may still be null.
    if (this.leaseManager != null) {
      this.leaseManager.closeAfterLeasesExpire();
    }
    if (this.splitLogWorker != null) {
      splitLogWorker.stop();
    }
    if (this.infoServer != null) {
      LOG.info("Stopping infoServer");
      try {
        this.infoServer.stop();
      } catch (Exception e) {
        // Best effort: a failure here is logged and the shutdown continues.
        LOG.error("Failed to stop infoServer", e);
      }
    }
    // Send cache a shutdown.
    if (blockCache != null) {
      blockCache.shutdown();
    }
    if (mobFileCache != null) {
      mobFileCache.shutdown();
    }

    if (movedRegionsCleaner != null) {
      movedRegionsCleaner.stop("Region Server stopping");
    }

    // Send interrupts to wake up threads if sleeping so they notice shutdown.
    // TODO: Should we check they are alive? If OOME could have exited already
    if (this.hMemManager != null) this.hMemManager.stop();
    if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
    if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();

    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
    if (rspmHost != null) {
      rspmHost.stop(this.abortRequested || this.killed);
    }

    if (this.killed) {
      // Just skip out w/o closing regions. Used when testing.
    } else if (abortRequested) {
      // On abort, user regions are only closed while the data filesystem is still flagged ok.
      if (this.dataFsOk) {
        closeUserRegions(abortRequested); // Don't leave any open file handles
      }
      LOG.info("aborting server " + this.serverName);
    } else {
      closeUserRegions(abortRequested);
      LOG.info("stopping server " + this.serverName);
    }

    if (this.clusterConnection != null && !clusterConnection.isClosed()) {
      try {
        this.clusterConnection.close();
      } catch (IOException e) {
        // Although the {@link Closeable} interface throws an {@link
        // IOException}, in reality, the implementation would never do that.
        LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
      }
    }

    // Closing the compactSplit thread before closing meta regions
    if (!this.killed && containsMetaTableRegions()) {
      if (!abortRequested || this.dataFsOk) {
        if (this.compactSplitThread != null) {
          this.compactSplitThread.join();
          this.compactSplitThread = null;
        }
        closeMetaTableRegions(abortRequested);
      }
    }

    if (!this.killed && this.dataFsOk) {
      waitOnAllRegionsToClose(abortRequested);
      LOG.info("stopping server " + this.serverName + "; all regions closed.");
    }

    // Stop the quota manager
    if (rsQuotaManager != null) {
      rsQuotaManager.stop();
    }
    if (rsSpaceQuotaManager != null) {
      rsSpaceQuotaManager.stop();
      rsSpaceQuotaManager = null;
    }

    // flag may be changed when closing regions throws exception.
    if (this.dataFsOk) {
      // Normal stop closes the WAL; an abort only shuts it down (see shutdownWAL).
      shutdownWAL(!abortRequested);
    }

    // Make sure the proxy is down.
    if (this.rssStub != null) {
      this.rssStub = null;
    }
    if (this.lockStub != null) {
      this.lockStub = null;
    }
    if (this.rpcClient != null) {
      this.rpcClient.close();
    }
    if (this.leaseManager != null) {
      this.leaseManager.close();
    }
    if (this.pauseMonitor != null) {
      this.pauseMonitor.stop();
    }

    if (!killed) {
      stopServiceThreads();
    }

    if (this.rpcServices != null) {
      this.rpcServices.stop();
    }

    try {
      deleteMyEphemeralNode();
    } catch (KeeperException.NoNodeException nn) {
      // pass: the znode is already gone, which is the desired end state
    } catch (KeeperException e) {
      LOG.warn("Failed deleting my ephemeral node", e);
    }
    // We may have failed to delete the znode at the previous step, but
    // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
    ZNodeClearer.deleteMyEphemeralNodeOnDisk();

    if (this.zooKeeper != null) {
      this.zooKeeper.close();
    }
    this.shutDown = true;
    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
  }

  /**
   * @return true if the first meta region's encoded name is a key in the online regions map
   */
  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  /**
   * @return false as soon as more than two regions are online; otherwise true only if every
   *         online region is a meta region (also true when no region is online)
   */
  private boolean areAllUserRegionsOffline() {
    // Cheap early exit before scanning the online regions.
    if (getNumberOfOnlineRegions() > 2) return false;
    boolean allUserRegionsOffline = true;
    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
      if (!e.getValue().getRegionInfo().isMetaRegion()) {
        allUserRegionsOffline = false;
        break;
      }
    }
    return allUserRegionsOffline;
  }

  /**
   * @return Current write count for all online regions.
1210 */ 1211 private long getWriteRequestCount() { 1212 long writeCount = 0; 1213 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) { 1214 writeCount += e.getValue().getWriteRequestsCount(); 1215 } 1216 return writeCount; 1217 } 1218 1219 @VisibleForTesting 1220 protected void tryRegionServerReport(long reportStartTime, long reportEndTime) 1221 throws IOException { 1222 RegionServerStatusService.BlockingInterface rss = rssStub; 1223 if (rss == null) { 1224 // the current server could be stopping. 1225 return; 1226 } 1227 ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime); 1228 try { 1229 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder(); 1230 request.setServer(ProtobufUtil.toServerName(this.serverName)); 1231 request.setLoad(sl); 1232 rss.regionServerReport(null, request.build()); 1233 } catch (ServiceException se) { 1234 IOException ioe = ProtobufUtil.getRemoteException(se); 1235 if (ioe instanceof YouAreDeadException) { 1236 // This will be caught and handled as a fatal error in run() 1237 throw ioe; 1238 } 1239 if (rssStub == rss) { 1240 rssStub = null; 1241 } 1242 // Couldn't connect to the master, get location from zk and reconnect 1243 // Method blocks until new master is found or we are stopped 1244 createRegionServerStatusStub(true); 1245 } 1246 } 1247 1248 /** 1249 * Reports the given map of Regions and their size on the filesystem to the active Master. 1250 * 1251 * @param regionSizeStore The store containing region sizes 1252 * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise 1253 */ 1254 public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) { 1255 RegionServerStatusService.BlockingInterface rss = rssStub; 1256 if (rss == null) { 1257 // the current server could be stopping. 
1258 LOG.trace("Skipping Region size report to HMaster as stub is null"); 1259 return true; 1260 } 1261 try { 1262 buildReportAndSend(rss, regionSizeStore); 1263 } catch (ServiceException se) { 1264 IOException ioe = ProtobufUtil.getRemoteException(se); 1265 if (ioe instanceof PleaseHoldException) { 1266 LOG.trace("Failed to report region sizes to Master because it is initializing." 1267 + " This will be retried.", ioe); 1268 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 1269 return true; 1270 } 1271 if (rssStub == rss) { 1272 rssStub = null; 1273 } 1274 createRegionServerStatusStub(true); 1275 if (ioe instanceof DoNotRetryIOException) { 1276 DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe; 1277 if (doNotRetryEx.getCause() != null) { 1278 Throwable t = doNotRetryEx.getCause(); 1279 if (t instanceof UnsupportedOperationException) { 1280 LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying"); 1281 return false; 1282 } 1283 } 1284 } 1285 LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe); 1286 } 1287 return true; 1288 } 1289 1290 /** 1291 * Builds the region size report and sends it to the master. Upon successful sending of the 1292 * report, the region sizes that were sent are marked as sent. 
1293 * 1294 * @param rss The stub to send to the Master 1295 * @param regionSizeStore The store containing region sizes 1296 */ 1297 private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss, 1298 RegionSizeStore regionSizeStore) throws ServiceException { 1299 RegionSpaceUseReportRequest request = 1300 buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore)); 1301 rss.reportRegionSpaceUse(null, request); 1302 // Record the number of size reports sent 1303 if (metricsRegionServer != null) { 1304 metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size()); 1305 } 1306 } 1307 1308 /** 1309 * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map. 1310 * 1311 * @param regionSizes The size in bytes of regions 1312 * @return The corresponding protocol buffer message. 1313 */ 1314 RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) { 1315 RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder(); 1316 for (Entry<RegionInfo, RegionSize> entry : regionSizes) { 1317 request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize())); 1318 } 1319 return request.build(); 1320 } 1321 1322 /** 1323 * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} 1324 * protobuf message. 
1325 * 1326 * @param regionInfo The RegionInfo 1327 * @param sizeInBytes The size in bytes of the Region 1328 * @return The protocol buffer 1329 */ 1330 RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) { 1331 return RegionSpaceUse.newBuilder() 1332 .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo))) 1333 .setRegionSize(Objects.requireNonNull(sizeInBytes)) 1334 .build(); 1335 } 1336 1337 private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) 1338 throws IOException { 1339 // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests 1340 // per second, and other metrics As long as metrics are part of ServerLoad it's best to use 1341 // the wrapper to compute those numbers in one place. 1342 // In the long term most of these should be moved off of ServerLoad and the heart beat. 1343 // Instead they should be stored in an HBase table so that external visibility into HBase is 1344 // improved; Additionally the load balancer will be able to take advantage of a more complete 1345 // history. 
1346 MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper(); 1347 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 1348 long usedMemory = -1L; 1349 long maxMemory = -1L; 1350 final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage(); 1351 if (usage != null) { 1352 usedMemory = usage.getUsed(); 1353 maxMemory = usage.getMax(); 1354 } 1355 1356 ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder(); 1357 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond()); 1358 serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount()); 1359 serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024)); 1360 serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024)); 1361 Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors(); 1362 Builder coprocessorBuilder = Coprocessor.newBuilder(); 1363 for (String coprocessor : coprocessors) { 1364 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1365 } 1366 RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder(); 1367 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); 1368 for (HRegion region : regions) { 1369 if (region.getCoprocessorHost() != null) { 1370 Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors(); 1371 for (String regionCoprocessor : regionCoprocessors) { 1372 serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build()); 1373 } 1374 } 1375 serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier)); 1376 for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost() 1377 .getCoprocessors()) { 1378 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1379 } 1380 } 1381 serverLoad.setReportStartTime(reportStartTime); 1382 serverLoad.setReportEndTime(reportEndTime); 1383 if (this.infoServer != null) { 1384 
serverLoad.setInfoServerPort(this.infoServer.getPort()); 1385 } else { 1386 serverLoad.setInfoServerPort(-1); 1387 } 1388 MetricsUserAggregateSource userSource = 1389 metricsRegionServer.getMetricsUserAggregate().getSource(); 1390 if (userSource != null) { 1391 Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources(); 1392 for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) { 1393 serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue())); 1394 } 1395 } 1396 // for the replicationLoad purpose. Only need to get from one executorService 1397 // either source or sink will get the same info 1398 ReplicationSourceService rsources = getReplicationSourceService(); 1399 1400 if (rsources != null) { 1401 // always refresh first to get the latest value 1402 ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad(); 1403 if (rLoad != null) { 1404 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); 1405 for (ClusterStatusProtos.ReplicationLoadSource rLS : 1406 rLoad.getReplicationLoadSourceEntries()) { 1407 serverLoad.addReplLoadSource(rLS); 1408 } 1409 1410 } 1411 } 1412 1413 return serverLoad.build(); 1414 } 1415 1416 private String getOnlineRegionsAsPrintableString() { 1417 StringBuilder sb = new StringBuilder(); 1418 for (Region r: this.onlineRegions.values()) { 1419 if (sb.length() > 0) sb.append(", "); 1420 sb.append(r.getRegionInfo().getEncodedName()); 1421 } 1422 return sb.toString(); 1423 } 1424 1425 /** 1426 * Wait on regions close. 1427 */ 1428 private void waitOnAllRegionsToClose(final boolean abort) { 1429 // Wait till all regions are closed before going out. 1430 int lastCount = -1; 1431 long previousLogTime = 0; 1432 Set<String> closedRegions = new HashSet<>(); 1433 boolean interrupted = false; 1434 try { 1435 while (!onlineRegions.isEmpty()) { 1436 int count = getNumberOfOnlineRegions(); 1437 // Only print a message if the count of regions has changed. 
1438 if (count != lastCount) { 1439 // Log every second at most 1440 if (System.currentTimeMillis() > (previousLogTime + 1000)) { 1441 previousLogTime = System.currentTimeMillis(); 1442 lastCount = count; 1443 LOG.info("Waiting on " + count + " regions to close"); 1444 // Only print out regions still closing if a small number else will 1445 // swamp the log. 1446 if (count < 10 && LOG.isDebugEnabled()) { 1447 LOG.debug("Online Regions=" + this.onlineRegions); 1448 } 1449 } 1450 } 1451 // Ensure all user regions have been sent a close. Use this to 1452 // protect against the case where an open comes in after we start the 1453 // iterator of onlineRegions to close all user regions. 1454 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1455 RegionInfo hri = e.getValue().getRegionInfo(); 1456 if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) && 1457 !closedRegions.contains(hri.getEncodedName())) { 1458 closedRegions.add(hri.getEncodedName()); 1459 // Don't update zk with this close transition; pass false. 1460 closeRegionIgnoreErrors(hri, abort); 1461 } 1462 } 1463 // No regions in RIT, we could stop waiting now. 1464 if (this.regionsInTransitionInRS.isEmpty()) { 1465 if (!onlineRegions.isEmpty()) { 1466 LOG.info("We were exiting though online regions are not empty," + 1467 " because some regions failed closing"); 1468 } 1469 break; 1470 } else { 1471 LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream(). 
1472 map(e -> Bytes.toString(e)).collect(Collectors.joining(", "))); 1473 } 1474 if (sleepInterrupted(200)) { 1475 interrupted = true; 1476 } 1477 } 1478 } finally { 1479 if (interrupted) { 1480 Thread.currentThread().interrupt(); 1481 } 1482 } 1483 } 1484 1485 private static boolean sleepInterrupted(long millis) { 1486 boolean interrupted = false; 1487 try { 1488 Thread.sleep(millis); 1489 } catch (InterruptedException e) { 1490 LOG.warn("Interrupted while sleeping"); 1491 interrupted = true; 1492 } 1493 return interrupted; 1494 } 1495 1496 private void shutdownWAL(final boolean close) { 1497 if (this.walFactory != null) { 1498 try { 1499 if (close) { 1500 walFactory.close(); 1501 } else { 1502 walFactory.shutdown(); 1503 } 1504 } catch (Throwable e) { 1505 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 1506 LOG.error("Shutdown / close of WAL failed: " + e); 1507 LOG.debug("Shutdown / close exception details:", e); 1508 } 1509 } 1510 } 1511 1512 /** 1513 * get Online SlowLog Provider to add slow logs to ringbuffer 1514 * 1515 * @return Online SlowLog Provider 1516 */ 1517 public SlowLogRecorder getSlowLogRecorder() { 1518 return this.slowLogRecorder; 1519 } 1520 1521 /* 1522 * Run init. Sets up wal and starts up all server threads. 1523 * 1524 * @param c Extra configuration. 1525 */ 1526 protected void handleReportForDutyResponse(final RegionServerStartupResponse c) 1527 throws IOException { 1528 try { 1529 boolean updateRootDir = false; 1530 for (NameStringPair e : c.getMapEntriesList()) { 1531 String key = e.getName(); 1532 // The hostname the master sees us as. 
1533 if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) { 1534 String hostnameFromMasterPOV = e.getValue(); 1535 this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(), 1536 this.startcode); 1537 if (!StringUtils.isBlank(useThisHostnameInstead) && 1538 !hostnameFromMasterPOV.equals(useThisHostnameInstead)) { 1539 String msg = "Master passed us a different hostname to use; was=" + 1540 this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV; 1541 LOG.error(msg); 1542 throw new IOException(msg); 1543 } 1544 if (StringUtils.isBlank(useThisHostnameInstead) && 1545 !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) { 1546 String msg = "Master passed us a different hostname to use; was=" + 1547 rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV; 1548 LOG.error(msg); 1549 } 1550 continue; 1551 } 1552 1553 String value = e.getValue(); 1554 if (key.equals(HConstants.HBASE_DIR)) { 1555 if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) { 1556 updateRootDir = true; 1557 } 1558 } 1559 1560 if (LOG.isDebugEnabled()) { 1561 LOG.debug("Config from master: " + key + "=" + value); 1562 } 1563 this.conf.set(key, value); 1564 } 1565 // Set our ephemeral znode up in zookeeper now we have a name. 1566 createMyEphemeralNode(); 1567 1568 if (updateRootDir) { 1569 // initialize file system by the config fs.defaultFS and hbase.rootdir from master 1570 initializeFileSystem(); 1571 } 1572 1573 // hack! Maps DFSClient => RegionServer for logs. HDFS made this 1574 // config param for task trackers, but we can piggyback off of it. 1575 if (this.conf.get("mapreduce.task.attempt.id") == null) { 1576 this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString()); 1577 } 1578 1579 // Save it in a file, this will allow to see if we crash 1580 ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath()); 1581 1582 // This call sets up an initialized replication and WAL. 
Later we start it up. 1583 setupWALAndReplication(); 1584 // Init in here rather than in constructor after thread name has been set 1585 final MetricsTable metricsTable = 1586 new MetricsTable(new MetricsTableWrapperAggregateImpl(this)); 1587 this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this); 1588 this.metricsRegionServer = new MetricsRegionServer(metricsRegionServerImpl, 1589 conf, metricsTable); 1590 // Now that we have a metrics source, start the pause monitor 1591 this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource()); 1592 pauseMonitor.start(); 1593 1594 // There is a rare case where we do NOT want services to start. Check config. 1595 if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) { 1596 startServices(); 1597 } 1598 // In here we start up the replication Service. Above we initialized it. TODO. Reconcile. 1599 // or make sense of it. 1600 startReplicationService(); 1601 1602 1603 // Set up ZK 1604 LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa + 1605 ", sessionid=0x" + 1606 Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); 1607 1608 // Wake up anyone waiting for this server to online 1609 synchronized (online) { 1610 online.set(true); 1611 online.notifyAll(); 1612 } 1613 } catch (Throwable e) { 1614 stop("Failed initialization"); 1615 throw convertThrowableToIOE(cleanup(e, "Failed init"), 1616 "Region server startup failed"); 1617 } finally { 1618 sleeper.skipSleepCycle(); 1619 } 1620 } 1621 1622 protected void initializeMemStoreChunkCreator() { 1623 if (MemStoreLAB.isEnabled(conf)) { 1624 // MSLAB is enabled. So initialize MemStoreChunkPool 1625 // By this time, the MemstoreFlusher is already initialized. We can get the global limits from 1626 // it. 
1627 Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf); 1628 long globalMemStoreSize = pair.getFirst(); 1629 boolean offheap = this.regionServerAccounting.isOffheap(); 1630 // When off heap memstore in use, take full area for chunk pool. 1631 float poolSizePercentage = offheap? 1.0F: 1632 conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT); 1633 float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, 1634 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); 1635 int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); 1636 // init the chunkCreator 1637 ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, 1638 initialCountPercentage, this.hMemManager); 1639 } 1640 } 1641 1642 private void startHeapMemoryManager() { 1643 if (this.blockCache != null) { 1644 this.hMemManager = 1645 new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting); 1646 this.hMemManager.start(getChoreService()); 1647 } 1648 } 1649 1650 private void createMyEphemeralNode() throws KeeperException { 1651 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder(); 1652 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1); 1653 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo()); 1654 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray()); 1655 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data); 1656 } 1657 1658 private void deleteMyEphemeralNode() throws KeeperException { 1659 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath()); 1660 } 1661 1662 @Override 1663 public RegionServerAccounting getRegionServerAccounting() { 1664 return regionServerAccounting; 1665 } 1666 1667 /** 1668 * @param r Region to get RegionLoad for. 
1669 * @param regionLoadBldr the RegionLoad.Builder, can be null 1670 * @param regionSpecifier the RegionSpecifier.Builder, can be null 1671 * @return RegionLoad instance. 1672 */ 1673 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, 1674 RegionSpecifier.Builder regionSpecifier) throws IOException { 1675 byte[] name = r.getRegionInfo().getRegionName(); 1676 int stores = 0; 1677 int storefiles = 0; 1678 int storeRefCount = 0; 1679 int maxCompactedStoreFileRefCount = 0; 1680 int storeUncompressedSizeMB = 0; 1681 int storefileSizeMB = 0; 1682 int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024); 1683 long storefileIndexSizeKB = 0; 1684 int rootLevelIndexSizeKB = 0; 1685 int totalStaticIndexSizeKB = 0; 1686 int totalStaticBloomSizeKB = 0; 1687 long totalCompactingKVs = 0; 1688 long currentCompactedKVs = 0; 1689 List<HStore> storeList = r.getStores(); 1690 stores += storeList.size(); 1691 for (HStore store : storeList) { 1692 storefiles += store.getStorefilesCount(); 1693 int currentStoreRefCount = store.getStoreRefCount(); 1694 storeRefCount += currentStoreRefCount; 1695 int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount(); 1696 maxCompactedStoreFileRefCount = Math.max(maxCompactedStoreFileRefCount, 1697 currentMaxCompactedStoreFileRefCount); 1698 storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024); 1699 storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024); 1700 //TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB? 
1701 storefileIndexSizeKB += store.getStorefilesRootLevelIndexSize() / 1024; 1702 CompactionProgress progress = store.getCompactionProgress(); 1703 if (progress != null) { 1704 totalCompactingKVs += progress.getTotalCompactingKVs(); 1705 currentCompactedKVs += progress.currentCompactedKVs; 1706 } 1707 rootLevelIndexSizeKB += (int) (store.getStorefilesRootLevelIndexSize() / 1024); 1708 totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024); 1709 totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024); 1710 } 1711 1712 float dataLocality = 1713 r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname()); 1714 if (regionLoadBldr == null) { 1715 regionLoadBldr = RegionLoad.newBuilder(); 1716 } 1717 if (regionSpecifier == null) { 1718 regionSpecifier = RegionSpecifier.newBuilder(); 1719 } 1720 regionSpecifier.setType(RegionSpecifierType.REGION_NAME); 1721 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name)); 1722 regionLoadBldr.setRegionSpecifier(regionSpecifier.build()) 1723 .setStores(stores) 1724 .setStorefiles(storefiles) 1725 .setStoreRefCount(storeRefCount) 1726 .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount) 1727 .setStoreUncompressedSizeMB(storeUncompressedSizeMB) 1728 .setStorefileSizeMB(storefileSizeMB) 1729 .setMemStoreSizeMB(memstoreSizeMB) 1730 .setStorefileIndexSizeKB(storefileIndexSizeKB) 1731 .setRootIndexSizeKB(rootLevelIndexSizeKB) 1732 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB) 1733 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB) 1734 .setReadRequestsCount(r.getReadRequestsCount()) 1735 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount()) 1736 .setWriteRequestsCount(r.getWriteRequestsCount()) 1737 .setTotalCompactingKVs(totalCompactingKVs) 1738 .setCurrentCompactedKVs(currentCompactedKVs) 1739 .setDataLocality(dataLocality) 1740 .setLastMajorCompactionTs(r.getOldestHfileTs(true)); 1741 r.setCompleteSequenceId(regionLoadBldr); 1742 1743 return 
regionLoadBldr.build(); 1744 } 1745 1746 private UserLoad createUserLoad(String user, MetricsUserSource userSource) { 1747 UserLoad.Builder userLoadBldr = UserLoad.newBuilder(); 1748 userLoadBldr.setUserName(user); 1749 userSource.getClientMetrics().values().stream().map( 1750 clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder() 1751 .setHostName(clientMetrics.getHostName()) 1752 .setWriteRequestsCount(clientMetrics.getWriteRequestsCount()) 1753 .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests()) 1754 .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build()) 1755 .forEach(userLoadBldr::addClientMetrics); 1756 return userLoadBldr.build(); 1757 } 1758 1759 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { 1760 HRegion r = onlineRegions.get(encodedRegionName); 1761 return r != null ? createRegionLoad(r, null, null) : null; 1762 } 1763 1764 /** 1765 * Inner class that runs on a long period checking if regions need compaction. 1766 */ 1767 private static class CompactionChecker extends ScheduledChore { 1768 private final HRegionServer instance; 1769 private final int majorCompactPriority; 1770 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; 1771 //Iteration is 1-based rather than 0-based so we don't check for compaction 1772 // immediately upon region server startup 1773 private long iteration = 1; 1774 1775 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { 1776 super("CompactionChecker", stopper, sleepTime); 1777 this.instance = h; 1778 LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime)); 1779 1780 /* MajorCompactPriority is configurable. 1781 * If not set, the compaction will use default priority. 1782 */ 1783 this.majorCompactPriority = this.instance.conf. 
1784 getInt("hbase.regionserver.compactionChecker.majorCompactPriority", 1785 DEFAULT_PRIORITY); 1786 } 1787 1788 @Override 1789 protected void chore() { 1790 for (Region r : this.instance.onlineRegions.values()) { 1791 if (r == null) { 1792 continue; 1793 } 1794 HRegion hr = (HRegion) r; 1795 for (HStore s : hr.stores.values()) { 1796 try { 1797 long multiplier = s.getCompactionCheckMultiplier(); 1798 assert multiplier > 0; 1799 if (iteration % multiplier != 0) { 1800 continue; 1801 } 1802 if (s.needsCompaction()) { 1803 // Queue a compaction. Will recognize if major is needed. 1804 this.instance.compactSplitThread.requestSystemCompaction(hr, s, 1805 getName() + " requests compaction"); 1806 } else if (s.shouldPerformMajorCompaction()) { 1807 s.triggerMajorCompaction(); 1808 if (majorCompactPriority == DEFAULT_PRIORITY || 1809 majorCompactPriority > hr.getCompactPriority()) { 1810 this.instance.compactSplitThread.requestCompaction(hr, s, 1811 getName() + " requests major compaction; use default priority", 1812 Store.NO_PRIORITY, 1813 CompactionLifeCycleTracker.DUMMY, null); 1814 } else { 1815 this.instance.compactSplitThread.requestCompaction(hr, s, 1816 getName() + " requests major compaction; use configured priority", 1817 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); 1818 } 1819 } 1820 } catch (IOException e) { 1821 LOG.warn("Failed major compaction check on " + r, e); 1822 } 1823 } 1824 } 1825 iteration = (iteration == Long.MAX_VALUE) ? 
0 : (iteration + 1); 1826 } 1827 } 1828 1829 private static class PeriodicMemStoreFlusher extends ScheduledChore { 1830 private final HRegionServer server; 1831 private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds 1832 private final static int MIN_DELAY_TIME = 0; // millisec 1833 private final long rangeOfDelayMs; 1834 1835 PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { 1836 super("MemstoreFlusherChore", server, cacheFlushInterval); 1837 this.server = server; 1838 1839 final long configuredRangeOfDelay = server.getConfiguration().getInt( 1840 "hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY); 1841 this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay); 1842 } 1843 1844 @Override 1845 protected void chore() { 1846 final StringBuilder whyFlush = new StringBuilder(); 1847 for (HRegion r : this.server.onlineRegions.values()) { 1848 if (r == null) continue; 1849 if (r.shouldFlush(whyFlush)) { 1850 FlushRequester requester = server.getFlushRequester(); 1851 if (requester != null) { 1852 long randomDelay = RandomUtils.nextLong(0, rangeOfDelayMs) + MIN_DELAY_TIME; 1853 //Throttle the flushes by putting a delay. If we don't throttle, and there 1854 //is a balanced write-load on the regions in a table, we might end up 1855 //overwhelming the filesystem with too many flushes at once. 1856 if (requester.requestDelayedFlush(r, randomDelay, false)) { 1857 LOG.info("{} requesting flush of {} because {} after random delay {} ms", 1858 getName(), r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), 1859 randomDelay); 1860 } 1861 } 1862 } 1863 } 1864 } 1865 } 1866 1867 /** 1868 * Report the status of the server. A server is online once all the startup is 1869 * completed (setting up filesystem, starting executorService threads, etc.). This 1870 * method is designed mostly to be useful in tests. 1871 * 1872 * @return true if online, false if not. 
1873 */ 1874 public boolean isOnline() { 1875 return online.get(); 1876 } 1877 1878 /** 1879 * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to 1880 * be hooked up to WAL. 1881 */ 1882 private void setupWALAndReplication() throws IOException { 1883 WALFactory factory = new WALFactory(conf, serverName.toString()); 1884 1885 // TODO Replication make assumptions here based on the default filesystem impl 1886 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 1887 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); 1888 1889 Path logDir = new Path(walRootDir, logName); 1890 LOG.debug("logDir={}", logDir); 1891 if (this.walFs.exists(logDir)) { 1892 throw new RegionServerRunningException( 1893 "Region server has already created directory at " + this.serverName.toString()); 1894 } 1895 // Always create wal directory as now we need this when master restarts to find out the live 1896 // region servers. 1897 if (!this.walFs.mkdirs(logDir)) { 1898 throw new IOException("Can not create wal directory " + logDir); 1899 } 1900 // Instantiate replication if replication enabled. Pass it the log directories. 1901 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, 1902 factory.getWALProvider()); 1903 this.walFactory = factory; 1904 } 1905 1906 /** 1907 * Start up replication source and sink handlers. 
1908 */ 1909 private void startReplicationService() throws IOException { 1910 if (this.replicationSourceHandler == this.replicationSinkHandler && 1911 this.replicationSourceHandler != null) { 1912 this.replicationSourceHandler.startReplicationService(); 1913 } else { 1914 if (this.replicationSourceHandler != null) { 1915 this.replicationSourceHandler.startReplicationService(); 1916 } 1917 if (this.replicationSinkHandler != null) { 1918 this.replicationSinkHandler.startReplicationService(); 1919 } 1920 } 1921 } 1922 1923 /** 1924 * @return Master address tracker instance. 1925 */ 1926 public MasterAddressTracker getMasterAddressTracker() { 1927 return this.masterAddressTracker; 1928 } 1929 1930 /** 1931 * Start maintenance Threads, Server, Worker and lease checker threads. 1932 * Start all threads we need to run. This is called after we've successfully 1933 * registered with the Master. 1934 * Install an UncaughtExceptionHandler that calls abort of RegionServer if we 1935 * get an unhandled exception. We cannot set the handler on all threads. 1936 * Server's internal Listener thread is off limits. For Server, if an OOME, it 1937 * waits a while then retries. Meantime, a flush or a compaction that tries to 1938 * run should trigger same critical condition and the shutdown will run. On 1939 * its way out, this server will shut down Server. Leases are sort of 1940 * inbetween. It has an internal thread that while it inherits from Chore, it 1941 * keeps its own internal stop mechanism so needs to be stopped by this 1942 * hosting server. Worker logs the exception and exits. 1943 */ 1944 private void startServices() throws IOException { 1945 if (!isStopped() && !isAborted()) { 1946 initializeThreads(); 1947 } 1948 this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection); 1949 this.secureBulkLoadManager.start(); 1950 1951 // Health checker thread. 
1952 if (isHealthCheckerConfigured()) { 1953 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, 1954 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); 1955 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); 1956 } 1957 1958 this.walRoller = new LogRoller(this); 1959 this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); 1960 this.procedureResultReporter = new RemoteProcedureResultReporter(this); 1961 1962 // Create the CompactedFileDischarger chore executorService. This chore helps to 1963 // remove the compacted files that will no longer be used in reads. 1964 // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to 1965 // 2 mins so that compacted files can be archived before the TTLCleaner runs 1966 int cleanerInterval = 1967 conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); 1968 this.compactedFileDischarger = 1969 new CompactedHFilesDischarger(cleanerInterval, this, this); 1970 choreService.scheduleChore(compactedFileDischarger); 1971 1972 // Start executor services 1973 this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION, 1974 conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); 1975 this.executorService.startExecutorService(ExecutorType.RS_OPEN_META, 1976 conf.getInt("hbase.regionserver.executor.openmeta.threads", 1)); 1977 this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION, 1978 conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3)); 1979 this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION, 1980 conf.getInt("hbase.regionserver.executor.closeregion.threads", 3)); 1981 this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META, 1982 conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); 1983 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { 1984 
this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, 1985 conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); 1986 } 1987 this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( 1988 HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER)); 1989 // Start the threads for compacted files discharger 1990 this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER, 1991 conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10)); 1992 if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { 1993 this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS, 1994 conf.getInt("hbase.regionserver.region.replica.flusher.threads", 1995 conf.getInt("hbase.regionserver.executor.openregion.threads", 3))); 1996 } 1997 this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER, 1998 conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2)); 1999 this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE, 2000 conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1)); 2001 2002 Threads 2003 .setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", uncaughtExceptionHandler); 2004 if (this.cacheFlusher != null) { 2005 this.cacheFlusher.start(uncaughtExceptionHandler); 2006 } 2007 Threads.setDaemonThreadRunning(this.procedureResultReporter, 2008 getName() + ".procedureResultReporter", uncaughtExceptionHandler); 2009 2010 if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker); 2011 if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher); 2012 if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore); 2013 if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore); 2014 if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher); 2015 if 
(this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner); 2016 if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore); 2017 if (this.slowLogTableOpsChore != null) { 2018 choreService.scheduleChore(slowLogTableOpsChore); 2019 } 2020 2021 // Leases is not a Thread. Internally it runs a daemon thread. If it gets 2022 // an unhandled exception, it will just exit. 2023 Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker", 2024 uncaughtExceptionHandler); 2025 2026 // Create the log splitting worker and start it 2027 // set a smaller retries to fast fail otherwise splitlogworker could be blocked for 2028 // quite a while inside Connection layer. The worker won't be available for other 2029 // tasks even after current task is preempted after a split task times out. 2030 Configuration sinkConf = HBaseConfiguration.create(conf); 2031 sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2032 conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds 2033 sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 2034 conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds 2035 sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 2036 if (this.csm != null && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, 2037 DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { 2038 // SplitLogWorker needs csm. If none, don't start this. 2039 this.splitLogWorker = new SplitLogWorker(sinkConf, this, 2040 this, walFactory); 2041 splitLogWorker.start(); 2042 LOG.debug("SplitLogWorker started"); 2043 } 2044 2045 // Memstore services. 2046 startHeapMemoryManager(); 2047 // Call it after starting HeapMemoryManager. 2048 initializeMemStoreChunkCreator(); 2049 } 2050 2051 private void initializeThreads() { 2052 // Cache flushing thread. 
2053 this.cacheFlusher = new MemStoreFlusher(conf, this); 2054 2055 // Compaction thread 2056 this.compactSplitThread = new CompactSplit(this); 2057 2058 // Background thread to check for compactions; needed if region has not gotten updates 2059 // in a while. It will take care of not checking too frequently on store-by-store basis. 2060 this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this); 2061 this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this); 2062 this.leaseManager = new LeaseManager(this.threadWakeFrequency); 2063 2064 final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, 2065 HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); 2066 if (isSlowLogTableEnabled) { 2067 // default chore duration: 10 min 2068 final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000); 2069 slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.slowLogRecorder); 2070 } 2071 2072 // Create the thread to clean the moved regions list 2073 movedRegionsCleaner = MovedRegionsCleaner.create(this); 2074 2075 if (this.nonceManager != null) { 2076 // Create the scheduled chore that cleans up nonces. 
2077 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); 2078 } 2079 2080 // Setup the Quota Manager 2081 rsQuotaManager = new RegionServerRpcQuotaManager(this); 2082 rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); 2083 2084 if (QuotaUtil.isQuotaEnabled(conf)) { 2085 this.fsUtilizationChore = new FileSystemUtilizationChore(this); 2086 } 2087 2088 2089 boolean onlyMetaRefresh = false; 2090 int storefileRefreshPeriod = conf.getInt( 2091 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 2092 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2093 if (storefileRefreshPeriod == 0) { 2094 storefileRefreshPeriod = conf.getInt( 2095 StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, 2096 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2097 onlyMetaRefresh = true; 2098 } 2099 if (storefileRefreshPeriod > 0) { 2100 this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, 2101 onlyMetaRefresh, this, this); 2102 } 2103 registerConfigurationObservers(); 2104 } 2105 2106 private void registerConfigurationObservers() { 2107 // Registering the compactSplitThread object with the ConfigurationManager. 2108 configurationManager.registerObserver(this.compactSplitThread); 2109 configurationManager.registerObserver(this.rpcServices); 2110 configurationManager.registerObserver(this); 2111 } 2112 2113 /** 2114 * Puts up the webui. 
2115 */ 2116 private void putUpWebUI() throws IOException { 2117 int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 2118 HConstants.DEFAULT_REGIONSERVER_INFOPORT); 2119 String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0"); 2120 2121 if(this instanceof HMaster) { 2122 port = conf.getInt(HConstants.MASTER_INFO_PORT, 2123 HConstants.DEFAULT_MASTER_INFOPORT); 2124 addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0"); 2125 } 2126 // -1 is for disabling info server 2127 if (port < 0) { 2128 return; 2129 } 2130 2131 if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) { 2132 String msg = 2133 "Failed to start http info server. Address " + addr 2134 + " does not belong to this host. Correct configuration parameter: " 2135 + "hbase.regionserver.info.bindAddress"; 2136 LOG.error(msg); 2137 throw new IOException(msg); 2138 } 2139 // check if auto port bind enabled 2140 boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, false); 2141 while (true) { 2142 try { 2143 this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf); 2144 infoServer.addPrivilegedServlet("dump", "/dump", getDumpServlet()); 2145 configureInfoServer(); 2146 this.infoServer.start(); 2147 break; 2148 } catch (BindException e) { 2149 if (!auto) { 2150 // auto bind disabled throw BindException 2151 LOG.error("Failed binding http info server to port: " + port); 2152 throw e; 2153 } 2154 // auto bind enabled, try to use another port 2155 LOG.info("Failed binding http info server to port: " + port); 2156 port++; 2157 } 2158 } 2159 port = this.infoServer.getPort(); 2160 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port); 2161 int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT, 2162 HConstants.DEFAULT_MASTER_INFOPORT); 2163 conf.setInt("hbase.master.info.port.orig", masterInfoPort); 2164 conf.setInt(HConstants.MASTER_INFO_PORT, port); 2165 } 2166 2167 /* 2168 * Verify that server is healthy 2169 */ 2170 
private boolean isHealthy() { 2171 if (!dataFsOk) { 2172 // File system problem 2173 return false; 2174 } 2175 // Verify that all threads are alive 2176 boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive()) 2177 && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) 2178 && (this.walRoller == null || this.walRoller.isAlive()) 2179 && (this.compactionChecker == null || this.compactionChecker.isScheduled()) 2180 && (this.periodicFlusher == null || this.periodicFlusher.isScheduled()); 2181 if (!healthy) { 2182 stop("One or more threads are no longer alive -- stop"); 2183 } 2184 return healthy; 2185 } 2186 2187 @Override 2188 public List<WAL> getWALs() { 2189 return walFactory.getWALs(); 2190 } 2191 2192 @Override 2193 public WAL getWAL(RegionInfo regionInfo) throws IOException { 2194 try { 2195 WAL wal = walFactory.getWAL(regionInfo); 2196 if (this.walRoller != null) { 2197 this.walRoller.addWAL(wal); 2198 } 2199 return wal; 2200 }catch (FailedCloseWALAfterInitializedErrorException ex) { 2201 // see HBASE-21751 for details 2202 abort("wal can not clean up after init failed", ex); 2203 throw ex; 2204 } 2205 } 2206 2207 public LogRoller getWalRoller() { 2208 return walRoller; 2209 } 2210 2211 WALFactory getWalFactory() { 2212 return walFactory; 2213 } 2214 2215 @Override 2216 public Connection getConnection() { 2217 return getClusterConnection(); 2218 } 2219 2220 @Override 2221 public ClusterConnection getClusterConnection() { 2222 return this.clusterConnection; 2223 } 2224 2225 @Override 2226 public void stop(final String msg) { 2227 stop(msg, false, RpcServer.getRequestUser().orElse(null)); 2228 } 2229 2230 /** 2231 * Stops the regionserver. 
2232 * @param msg Status message 2233 * @param force True if this is a regionserver abort 2234 * @param user The user executing the stop request, or null if no user is associated 2235 */ 2236 public void stop(final String msg, final boolean force, final User user) { 2237 if (!this.stopped) { 2238 LOG.info("***** STOPPING region server '" + this + "' *****"); 2239 if (this.rsHost != null) { 2240 // when forced via abort don't allow CPs to override 2241 try { 2242 this.rsHost.preStop(msg, user); 2243 } catch (IOException ioe) { 2244 if (!force) { 2245 LOG.warn("The region server did not stop", ioe); 2246 return; 2247 } 2248 LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe); 2249 } 2250 } 2251 this.stopped = true; 2252 LOG.info("STOPPED: " + msg); 2253 // Wakes run() if it is sleeping 2254 sleeper.skipSleepCycle(); 2255 } 2256 } 2257 2258 public void waitForServerOnline(){ 2259 while (!isStopped() && !isOnline()) { 2260 synchronized (online) { 2261 try { 2262 online.wait(msgInterval); 2263 } catch (InterruptedException ie) { 2264 Thread.currentThread().interrupt(); 2265 break; 2266 } 2267 } 2268 } 2269 } 2270 2271 @Override 2272 public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException { 2273 HRegion r = context.getRegion(); 2274 long openProcId = context.getOpenProcId(); 2275 long masterSystemTime = context.getMasterSystemTime(); 2276 rpcServices.checkOpen(); 2277 LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}", 2278 r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime); 2279 // Do checks to see if we need to compact (references or too many files) 2280 for (HStore s : r.stores.values()) { 2281 if (s.hasReferences() || s.needsCompaction()) { 2282 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region"); 2283 } 2284 } 2285 long openSeqNum = r.getOpenSeqNum(); 2286 if (openSeqNum == HConstants.NO_SEQNUM) { 2287 // If we opened a region, we should have 
read some sequence number from it. 2288 LOG.error( 2289 "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString()); 2290 openSeqNum = 0; 2291 } 2292 2293 // Notify master 2294 if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED, 2295 openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))) { 2296 throw new IOException( 2297 "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString()); 2298 } 2299 2300 triggerFlushInPrimaryRegion(r); 2301 2302 LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString()); 2303 } 2304 2305 /** 2306 * Helper method for use in tests. Skip the region transition report when there's no master 2307 * around to receive it. 2308 */ 2309 private boolean skipReportingTransition(final RegionStateTransitionContext context) { 2310 final TransitionCode code = context.getCode(); 2311 final long openSeqNum = context.getOpenSeqNum(); 2312 long masterSystemTime = context.getMasterSystemTime(); 2313 final RegionInfo[] hris = context.getHris(); 2314 2315 if (code == TransitionCode.OPENED) { 2316 Preconditions.checkArgument(hris != null && hris.length == 1); 2317 if (hris[0].isMetaRegion()) { 2318 try { 2319 MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, 2320 hris[0].getReplicaId(), RegionState.State.OPEN); 2321 } catch (KeeperException e) { 2322 LOG.info("Failed to update meta location", e); 2323 return false; 2324 } 2325 } else { 2326 try { 2327 MetaTableAccessor.updateRegionLocation(clusterConnection, 2328 hris[0], serverName, openSeqNum, masterSystemTime); 2329 } catch (IOException e) { 2330 LOG.info("Failed to update meta", e); 2331 return false; 2332 } 2333 } 2334 } 2335 return true; 2336 } 2337 2338 private ReportRegionStateTransitionRequest createReportRegionStateTransitionRequest( 2339 final RegionStateTransitionContext context) { 2340 final TransitionCode code = context.getCode(); 2341 final long 
openSeqNum = context.getOpenSeqNum(); 2342 final RegionInfo[] hris = context.getHris(); 2343 final long[] procIds = context.getProcIds(); 2344 2345 ReportRegionStateTransitionRequest.Builder builder = 2346 ReportRegionStateTransitionRequest.newBuilder(); 2347 builder.setServer(ProtobufUtil.toServerName(serverName)); 2348 RegionStateTransition.Builder transition = builder.addTransitionBuilder(); 2349 transition.setTransitionCode(code); 2350 if (code == TransitionCode.OPENED && openSeqNum >= 0) { 2351 transition.setOpenSeqNum(openSeqNum); 2352 } 2353 for (RegionInfo hri: hris) { 2354 transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri)); 2355 } 2356 for (long procId: procIds) { 2357 transition.addProcId(procId); 2358 } 2359 2360 return builder.build(); 2361 } 2362 2363 @Override 2364 public boolean reportRegionStateTransition(final RegionStateTransitionContext context) { 2365 if (TEST_SKIP_REPORTING_TRANSITION) { 2366 return skipReportingTransition(context); 2367 } 2368 final ReportRegionStateTransitionRequest request = 2369 createReportRegionStateTransitionRequest(context); 2370 2371 // Time to pause if master says 'please hold'. Make configurable if needed. 2372 final long initPauseTime = 1000; 2373 int tries = 0; 2374 long pauseTime; 2375 // Keep looping till we get an error. We want to send reports even though server is going down. 2376 // Only go down if clusterConnection is null. It is set to null almost as last thing as the 2377 // HRegionServer does down. 
2378 while (this.clusterConnection != null && !this.clusterConnection.isClosed()) { 2379 RegionServerStatusService.BlockingInterface rss = rssStub; 2380 try { 2381 if (rss == null) { 2382 createRegionServerStatusStub(); 2383 continue; 2384 } 2385 ReportRegionStateTransitionResponse response = 2386 rss.reportRegionStateTransition(null, request); 2387 if (response.hasErrorMessage()) { 2388 LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage()); 2389 break; 2390 } 2391 // Log if we had to retry else don't log unless TRACE. We want to 2392 // know if were successful after an attempt showed in logs as failed. 2393 if (tries > 0 || LOG.isTraceEnabled()) { 2394 LOG.info("TRANSITION REPORTED " + request); 2395 } 2396 // NOTE: Return mid-method!!! 2397 return true; 2398 } catch (ServiceException se) { 2399 IOException ioe = ProtobufUtil.getRemoteException(se); 2400 boolean pause = 2401 ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException 2402 || ioe instanceof CallQueueTooBigException; 2403 if (pause) { 2404 // Do backoff else we flood the Master with requests. 2405 pauseTime = ConnectionUtils.getPauseTime(initPauseTime, tries); 2406 } else { 2407 pauseTime = initPauseTime; // Reset. 2408 } 2409 LOG.info("Failed report transition " + 2410 TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" + 2411 (pause? 2412 " after " + pauseTime + "ms delay (Master is coming online...).": 2413 " immediately."), 2414 ioe); 2415 if (pause) Threads.sleep(pauseTime); 2416 tries++; 2417 if (rssStub == rss) { 2418 rssStub = null; 2419 } 2420 } 2421 } 2422 return false; 2423 } 2424 2425 /** 2426 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not 2427 * block this thread. See RegionReplicaFlushHandler for details. 
2428 */ 2429 private void triggerFlushInPrimaryRegion(final HRegion region) { 2430 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 2431 return; 2432 } 2433 if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) || 2434 !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled( 2435 region.conf)) { 2436 region.setReadsEnabled(true); 2437 return; 2438 } 2439 2440 region.setReadsEnabled(false); // disable reads before marking the region as opened. 2441 // RegionReplicaFlushHandler might reset this. 2442 2443 // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler 2444 if (this.executorService != null) { 2445 this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection, 2446 rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region)); 2447 } 2448 } 2449 2450 @Override 2451 public RpcServerInterface getRpcServer() { 2452 return rpcServices.rpcServer; 2453 } 2454 2455 @VisibleForTesting 2456 public RSRpcServices getRSRpcServices() { 2457 return rpcServices; 2458 } 2459 2460 /** 2461 * Cause the server to exit without closing the regions it is serving, the log 2462 * it is using and without notifying the master. Used unit testing and on 2463 * catastrophic events such as HDFS is yanked out from under hbase or we OOME. 
2464 * 2465 * @param reason 2466 * the reason we are aborting 2467 * @param cause 2468 * the exception that caused the abort, or null 2469 */ 2470 @Override 2471 public void abort(String reason, Throwable cause) { 2472 String msg = "***** ABORTING region server " + this + ": " + reason + " *****"; 2473 if (cause != null) { 2474 LOG.error(HBaseMarkers.FATAL, msg, cause); 2475 } else { 2476 LOG.error(HBaseMarkers.FATAL, msg); 2477 } 2478 setAbortRequested(); 2479 // HBASE-4014: show list of coprocessors that were loaded to help debug 2480 // regionserver crashes.Note that we're implicitly using 2481 // java.util.HashSet's toString() method to print the coprocessor names. 2482 LOG.error(HBaseMarkers.FATAL, "RegionServer abort: loaded coprocessors are: " + 2483 CoprocessorHost.getLoadedCoprocessors()); 2484 // Try and dump metrics if abort -- might give clue as to how fatal came about.... 2485 try { 2486 LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics()); 2487 } catch (MalformedObjectNameException | IOException e) { 2488 LOG.warn("Failed dumping metrics", e); 2489 } 2490 2491 // Do our best to report our abort to the master, but this may not work 2492 try { 2493 if (cause != null) { 2494 msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause); 2495 } 2496 // Report to the master but only if we have already registered with the master. 
2497 RegionServerStatusService.BlockingInterface rss = rssStub; 2498 if (rss != null && this.serverName != null) { 2499 ReportRSFatalErrorRequest.Builder builder = 2500 ReportRSFatalErrorRequest.newBuilder(); 2501 builder.setServer(ProtobufUtil.toServerName(this.serverName)); 2502 builder.setErrorMessage(msg); 2503 rss.reportRSFatalError(null, builder.build()); 2504 } 2505 } catch (Throwable t) { 2506 LOG.warn("Unable to report fatal error to master", t); 2507 } 2508 2509 scheduleAbortTimer(); 2510 // shutdown should be run as the internal user 2511 stop(reason, true, null); 2512 } 2513 2514 protected final void setAbortRequested() { 2515 this.abortRequested = true; 2516 } 2517 2518 /** 2519 * @see HRegionServer#abort(String, Throwable) 2520 */ 2521 public void abort(String reason) { 2522 abort(reason, null); 2523 } 2524 2525 @Override 2526 public boolean isAborted() { 2527 return this.abortRequested; 2528 } 2529 2530 /* 2531 * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup 2532 * logs but it does close socket in case want to bring up server on old 2533 * hostname+port immediately. 2534 */ 2535 @VisibleForTesting 2536 protected void kill() { 2537 this.killed = true; 2538 abort("Simulated kill"); 2539 } 2540 2541 // Limits the time spent in the shutdown process. 2542 private void scheduleAbortTimer() { 2543 if (this.abortMonitor == null) { 2544 this.abortMonitor = new Timer("Abort regionserver monitor", true); 2545 TimerTask abortTimeoutTask = null; 2546 try { 2547 Constructor<? 
extends TimerTask> timerTaskCtor = 2548 Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName())) 2549 .asSubclass(TimerTask.class).getDeclaredConstructor(); 2550 timerTaskCtor.setAccessible(true); 2551 abortTimeoutTask = timerTaskCtor.newInstance(); 2552 } catch (Exception e) { 2553 LOG.warn("Initialize abort timeout task failed", e); 2554 } 2555 if (abortTimeoutTask != null) { 2556 abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT)); 2557 } 2558 } 2559 } 2560 2561 /** 2562 * Wait on all threads to finish. Presumption is that all closes and stops 2563 * have already been called. 2564 */ 2565 protected void stopServiceThreads() { 2566 // clean up the scheduled chores 2567 if (this.choreService != null) { 2568 choreService.cancelChore(nonceManagerChore); 2569 choreService.cancelChore(compactionChecker); 2570 choreService.cancelChore(periodicFlusher); 2571 choreService.cancelChore(healthCheckChore); 2572 choreService.cancelChore(storefileRefresher); 2573 choreService.cancelChore(movedRegionsCleaner); 2574 choreService.cancelChore(fsUtilizationChore); 2575 choreService.cancelChore(slowLogTableOpsChore); 2576 // clean up the remaining scheduled chores (in case we missed out any) 2577 choreService.shutdown(); 2578 } 2579 2580 if (this.cacheFlusher != null) { 2581 this.cacheFlusher.join(); 2582 } 2583 2584 if (this.spanReceiverHost != null) { 2585 this.spanReceiverHost.closeReceivers(); 2586 } 2587 if (this.walRoller != null) { 2588 this.walRoller.close(); 2589 } 2590 if (this.compactSplitThread != null) { 2591 this.compactSplitThread.join(); 2592 } 2593 if (this.executorService != null) this.executorService.shutdown(); 2594 if (this.replicationSourceHandler != null && 2595 this.replicationSourceHandler == this.replicationSinkHandler) { 2596 this.replicationSourceHandler.stopReplicationService(); 2597 } else { 2598 if (this.replicationSourceHandler != null) { 2599 
this.replicationSourceHandler.stopReplicationService(); 2600 } 2601 if (this.replicationSinkHandler != null) { 2602 this.replicationSinkHandler.stopReplicationService(); 2603 } 2604 } 2605 } 2606 2607 /** 2608 * @return Return the object that implements the replication 2609 * source executorService. 2610 */ 2611 @VisibleForTesting 2612 public ReplicationSourceService getReplicationSourceService() { 2613 return replicationSourceHandler; 2614 } 2615 2616 /** 2617 * @return Return the object that implements the replication 2618 * sink executorService. 2619 */ 2620 ReplicationSinkService getReplicationSinkService() { 2621 return replicationSinkHandler; 2622 } 2623 2624 /** 2625 * Get the current master from ZooKeeper and open the RPC connection to it. 2626 * To get a fresh connection, the current rssStub must be null. 2627 * Method will block until a master is available. You can break from this 2628 * block by requesting the server stop. 2629 * 2630 * @return master + port, or null if server has been stopped 2631 */ 2632 private synchronized ServerName createRegionServerStatusStub() { 2633 // Create RS stub without refreshing the master node from ZK, use cached data 2634 return createRegionServerStatusStub(false); 2635 } 2636 2637 /** 2638 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2639 * connection, the current rssStub must be null. Method will block until a master is available. 2640 * You can break from this block by requesting the server stop. 
2641 * @param refresh If true then master address will be read from ZK, otherwise use cached data 2642 * @return master + port, or null if server has been stopped 2643 */ 2644 @VisibleForTesting 2645 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { 2646 if (rssStub != null) { 2647 return masterAddressTracker.getMasterAddress(); 2648 } 2649 ServerName sn = null; 2650 long previousLogTime = 0; 2651 RegionServerStatusService.BlockingInterface intRssStub = null; 2652 LockService.BlockingInterface intLockStub = null; 2653 boolean interrupted = false; 2654 try { 2655 while (keepLooping()) { 2656 sn = this.masterAddressTracker.getMasterAddress(refresh); 2657 if (sn == null) { 2658 if (!keepLooping()) { 2659 // give up with no connection. 2660 LOG.debug("No master found and cluster is stopped; bailing out"); 2661 return null; 2662 } 2663 if (System.currentTimeMillis() > (previousLogTime + 1000)) { 2664 LOG.debug("No master found; retry"); 2665 previousLogTime = System.currentTimeMillis(); 2666 } 2667 refresh = true; // let's try pull it from ZK directly 2668 if (sleepInterrupted(200)) { 2669 interrupted = true; 2670 } 2671 continue; 2672 } 2673 2674 // If we are on the active master, use the shortcut 2675 if (this instanceof HMaster && sn.equals(getServerName())) { 2676 intRssStub = ((HMaster)this).getMasterRpcServices(); 2677 intLockStub = ((HMaster)this).getMasterRpcServices(); 2678 break; 2679 } 2680 try { 2681 BlockingRpcChannel channel = 2682 this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), 2683 shortOperationTimeout); 2684 intRssStub = RegionServerStatusService.newBlockingStub(channel); 2685 intLockStub = LockService.newBlockingStub(channel); 2686 break; 2687 } catch (IOException e) { 2688 if (System.currentTimeMillis() > (previousLogTime + 1000)) { 2689 e = e instanceof RemoteException ? 
2690 ((RemoteException)e).unwrapRemoteException() : e; 2691 if (e instanceof ServerNotRunningYetException) { 2692 LOG.info("Master isn't available yet, retrying"); 2693 } else { 2694 LOG.warn("Unable to connect to master. Retrying. Error was:", e); 2695 } 2696 previousLogTime = System.currentTimeMillis(); 2697 } 2698 if (sleepInterrupted(200)) { 2699 interrupted = true; 2700 } 2701 } 2702 } 2703 } finally { 2704 if (interrupted) { 2705 Thread.currentThread().interrupt(); 2706 } 2707 } 2708 this.rssStub = intRssStub; 2709 this.lockStub = intLockStub; 2710 return sn; 2711 } 2712 2713 /** 2714 * @return True if we should break loop because cluster is going down or 2715 * this server has been stopped or hdfs has gone bad. 2716 */ 2717 private boolean keepLooping() { 2718 return !this.stopped && isClusterUp(); 2719 } 2720 2721 /* 2722 * Let the master know we're here Run initialization using parameters passed 2723 * us by the master. 2724 * @return A Map of key/value configurations we got from the Master else 2725 * null if we failed to register. 
2726 * @throws IOException 2727 */ 2728 private RegionServerStartupResponse reportForDuty() throws IOException { 2729 if (this.masterless) return RegionServerStartupResponse.getDefaultInstance(); 2730 ServerName masterServerName = createRegionServerStatusStub(true); 2731 RegionServerStatusService.BlockingInterface rss = rssStub; 2732 if (masterServerName == null || rss == null) return null; 2733 RegionServerStartupResponse result = null; 2734 try { 2735 rpcServices.requestCount.reset(); 2736 rpcServices.rpcGetRequestCount.reset(); 2737 rpcServices.rpcScanRequestCount.reset(); 2738 rpcServices.rpcMultiRequestCount.reset(); 2739 rpcServices.rpcMutateRequestCount.reset(); 2740 LOG.info("reportForDuty to master=" + masterServerName + " with port=" 2741 + rpcServices.isa.getPort() + ", startcode=" + this.startcode); 2742 long now = EnvironmentEdgeManager.currentTime(); 2743 int port = rpcServices.isa.getPort(); 2744 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder(); 2745 if (!StringUtils.isBlank(useThisHostnameInstead)) { 2746 request.setUseThisHostnameInstead(useThisHostnameInstead); 2747 } 2748 request.setPort(port); 2749 request.setServerStartCode(this.startcode); 2750 request.setServerCurrentTime(now); 2751 result = rss.regionServerStartup(null, request.build()); 2752 } catch (ServiceException se) { 2753 IOException ioe = ProtobufUtil.getRemoteException(se); 2754 if (ioe instanceof ClockOutOfSyncException) { 2755 LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", 2756 ioe); 2757 // Re-throw IOE will cause RS to abort 2758 throw ioe; 2759 } else if (ioe instanceof ServerNotRunningYetException) { 2760 LOG.debug("Master is not running yet"); 2761 } else { 2762 LOG.warn("error telling master we are up", se); 2763 } 2764 rssStub = null; 2765 } 2766 return result; 2767 } 2768 2769 @Override 2770 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { 2771 try { 2772 
GetLastFlushedSequenceIdRequest req = 2773 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName); 2774 RegionServerStatusService.BlockingInterface rss = rssStub; 2775 if (rss == null) { // Try to connect one more time 2776 createRegionServerStatusStub(); 2777 rss = rssStub; 2778 if (rss == null) { 2779 // Still no luck, we tried 2780 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id"); 2781 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2782 .build(); 2783 } 2784 } 2785 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req); 2786 return RegionStoreSequenceIds.newBuilder() 2787 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId()) 2788 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build(); 2789 } catch (ServiceException e) { 2790 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e); 2791 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2792 .build(); 2793 } 2794 } 2795 2796 /** 2797 * Close meta region if we carry it 2798 * @param abort Whether we're running an abort. 2799 */ 2800 private void closeMetaTableRegions(final boolean abort) { 2801 HRegion meta = null; 2802 this.onlineRegionsLock.writeLock().lock(); 2803 try { 2804 for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) { 2805 RegionInfo hri = e.getValue().getRegionInfo(); 2806 if (hri.isMetaRegion()) { 2807 meta = e.getValue(); 2808 } 2809 if (meta != null) break; 2810 } 2811 } finally { 2812 this.onlineRegionsLock.writeLock().unlock(); 2813 } 2814 if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort); 2815 } 2816 2817 /** 2818 * Schedule closes on all user regions. 2819 * Should be safe calling multiple times because it wont' close regions 2820 * that are already closed or that are closing. 2821 * @param abort Whether we're running an abort. 
2822 */ 2823 private void closeUserRegions(final boolean abort) { 2824 this.onlineRegionsLock.writeLock().lock(); 2825 try { 2826 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) { 2827 HRegion r = e.getValue(); 2828 if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) { 2829 // Don't update zk with this close transition; pass false. 2830 closeRegionIgnoreErrors(r.getRegionInfo(), abort); 2831 } 2832 } 2833 } finally { 2834 this.onlineRegionsLock.writeLock().unlock(); 2835 } 2836 } 2837 2838 /** @return the info server */ 2839 public InfoServer getInfoServer() { 2840 return infoServer; 2841 } 2842 2843 /** 2844 * @return true if a stop has been requested. 2845 */ 2846 @Override 2847 public boolean isStopped() { 2848 return this.stopped; 2849 } 2850 2851 @Override 2852 public boolean isStopping() { 2853 return this.stopping; 2854 } 2855 2856 @Override 2857 public Configuration getConfiguration() { 2858 return conf; 2859 } 2860 2861 protected Map<String, HRegion> getOnlineRegions() { 2862 return this.onlineRegions; 2863 } 2864 2865 public int getNumberOfOnlineRegions() { 2866 return this.onlineRegions.size(); 2867 } 2868 2869 /** 2870 * For tests, web ui and metrics. 2871 * This method will only work if HRegionServer is in the same JVM as client; 2872 * HRegion cannot be serialized to cross an rpc. 2873 */ 2874 public Collection<HRegion> getOnlineRegionsLocalContext() { 2875 Collection<HRegion> regions = this.onlineRegions.values(); 2876 return Collections.unmodifiableCollection(regions); 2877 } 2878 2879 @Override 2880 public void addRegion(HRegion region) { 2881 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); 2882 configurationManager.registerObserver(region); 2883 } 2884 2885 /** 2886 * @return A new Map of online regions sorted by region off-heap size with the first entry being 2887 * the biggest. If two regions are the same size, then the last one found wins; i.e. this 2888 * method may NOT return all regions. 
2889 */ 2890 SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOffHeapSize() { 2891 // we'll sort the regions in reverse 2892 SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2893 // Copy over all regions. Regions are sorted by size with biggest first. 2894 for (HRegion region : this.onlineRegions.values()) { 2895 sortedRegions.put(region.getMemStoreOffHeapSize(), region); 2896 } 2897 return sortedRegions; 2898 } 2899 2900 /** 2901 * @return A new Map of online regions sorted by region heap size with the first entry being the 2902 * biggest. If two regions are the same size, then the last one found wins; i.e. this method 2903 * may NOT return all regions. 2904 */ 2905 SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOnHeapSize() { 2906 // we'll sort the regions in reverse 2907 SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2908 // Copy over all regions. Regions are sorted by size with biggest first. 2909 for (HRegion region : this.onlineRegions.values()) { 2910 sortedRegions.put(region.getMemStoreHeapSize(), region); 2911 } 2912 return sortedRegions; 2913 } 2914 2915 /** 2916 * @return time stamp in millis of when this region server was started 2917 */ 2918 public long getStartcode() { 2919 return this.startcode; 2920 } 2921 2922 /** @return reference to FlushRequester */ 2923 @Override 2924 public FlushRequester getFlushRequester() { 2925 return this.cacheFlusher; 2926 } 2927 2928 @Override 2929 public CompactionRequester getCompactionRequestor() { 2930 return this.compactSplitThread; 2931 } 2932 2933 @Override 2934 public LeaseManager getLeaseManager() { 2935 return leaseManager; 2936 } 2937 2938 /** 2939 * @return Return the rootDir. 
2940 */ 2941 protected Path getDataRootDir() { 2942 return dataRootDir; 2943 } 2944 2945 @Override 2946 public FileSystem getFileSystem() { 2947 return dataFs; 2948 } 2949 2950 /** 2951 * @return {@code true} when the data file system is available, {@code false} otherwise. 2952 */ 2953 boolean isDataFileSystemOk() { 2954 return this.dataFsOk; 2955 } 2956 2957 /** 2958 * @return Return the walRootDir. 2959 */ 2960 public Path getWALRootDir() { 2961 return walRootDir; 2962 } 2963 2964 /** 2965 * @return Return the walFs. 2966 */ 2967 public FileSystem getWALFileSystem() { 2968 return walFs; 2969 } 2970 2971 @Override 2972 public String toString() { 2973 return getServerName().toString(); 2974 } 2975 2976 @Override 2977 public ZKWatcher getZooKeeper() { 2978 return zooKeeper; 2979 } 2980 2981 @Override 2982 public CoordinatedStateManager getCoordinatedStateManager() { 2983 return csm; 2984 } 2985 2986 @Override 2987 public ServerName getServerName() { 2988 return serverName; 2989 } 2990 2991 public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){ 2992 return this.rsHost; 2993 } 2994 2995 @Override 2996 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() { 2997 return this.regionsInTransitionInRS; 2998 } 2999 3000 @Override 3001 public ExecutorService getExecutorService() { 3002 return executorService; 3003 } 3004 3005 @Override 3006 public ChoreService getChoreService() { 3007 return choreService; 3008 } 3009 3010 @Override 3011 public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { 3012 return rsQuotaManager; 3013 } 3014 3015 // 3016 // Main program and support routines 3017 // 3018 /** 3019 * Load the replication executorService objects, if any 3020 */ 3021 private static void createNewReplicationInstance(Configuration conf, HRegionServer server, 3022 FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException { 3023 if ((server instanceof HMaster) && 3024 (!LoadBalancer.isTablesOnMaster(conf) 
|| LoadBalancer.isSystemTablesOnlyOnMaster(conf))) { 3025 return; 3026 } 3027 3028 // read in the name of the source replication class from the config file. 3029 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, 3030 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 3031 3032 // read in the name of the sink replication class from the config file. 3033 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, 3034 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 3035 3036 // If both the sink and the source class names are the same, then instantiate 3037 // only one object. 3038 if (sourceClassname.equals(sinkClassname)) { 3039 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 3040 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); 3041 server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; 3042 } else { 3043 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 3044 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); 3045 server.replicationSinkHandler = newReplicationInstance(sinkClassname, 3046 ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider); 3047 } 3048 } 3049 3050 private static <T extends ReplicationService> T newReplicationInstance(String classname, 3051 Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, 3052 Path oldLogDir, WALProvider walProvider) throws IOException { 3053 final Class<? 
extends T> clazz; 3054 try { 3055 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 3056 clazz = Class.forName(classname, true, classLoader).asSubclass(xface); 3057 } catch (java.lang.ClassNotFoundException nfe) { 3058 throw new IOException("Could not find class for " + classname); 3059 } 3060 T service = ReflectionUtils.newInstance(clazz, conf); 3061 service.initialize(server, walFs, logDir, oldLogDir, walProvider); 3062 return service; 3063 } 3064 3065 public Map<String, ReplicationStatus> getWalGroupsReplicationStatus(){ 3066 Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>(); 3067 if(!this.isOnline()){ 3068 return walGroupsReplicationStatus; 3069 } 3070 List<ReplicationSourceInterface> allSources = new ArrayList<>(); 3071 allSources.addAll(replicationSourceHandler.getReplicationManager().getSources()); 3072 allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources()); 3073 for(ReplicationSourceInterface source: allSources){ 3074 walGroupsReplicationStatus.putAll(source.getWalGroupStatus()); 3075 } 3076 return walGroupsReplicationStatus; 3077 } 3078 3079 /** 3080 * Utility for constructing an instance of the passed HRegionServer class. 3081 */ 3082 static HRegionServer constructRegionServer( 3083 final Class<? extends HRegionServer> regionServerClass, 3084 final Configuration conf 3085 ) { 3086 try { 3087 Constructor<? 
extends HRegionServer> c = 3088 regionServerClass.getConstructor(Configuration.class); 3089 return c.newInstance(conf); 3090 } catch (Exception e) { 3091 throw new RuntimeException("Failed construction of " + "Regionserver: " 3092 + regionServerClass.toString(), e); 3093 } 3094 } 3095 3096 /** 3097 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine 3098 */ 3099 public static void main(String[] args) { 3100 LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); 3101 VersionInfo.logVersion(); 3102 Configuration conf = HBaseConfiguration.create(); 3103 @SuppressWarnings("unchecked") 3104 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf 3105 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); 3106 3107 new HRegionServerCommandLine(regionServerClass).doMain(args); 3108 } 3109 3110 /** 3111 * Gets the online regions of the specified table. 3112 * This method looks at the in-memory onlineRegions. It does not go to <code>hbase:meta</code>. 3113 * Only returns <em>online</em> regions. If a region on this table has been 3114 * closed during a disable, etc., it will not be included in the returned list. 3115 * So, the returned list may not necessarily be ALL regions in this table, its 3116 * all the ONLINE regions in the table. 
3117 * @param tableName table to limit the scope of the query 3118 * @return Online regions from <code>tableName</code> 3119 */ 3120 @Override 3121 public List<HRegion> getRegions(TableName tableName) { 3122 List<HRegion> tableRegions = new ArrayList<>(); 3123 synchronized (this.onlineRegions) { 3124 for (HRegion region: this.onlineRegions.values()) { 3125 RegionInfo regionInfo = region.getRegionInfo(); 3126 if(regionInfo.getTable().equals(tableName)) { 3127 tableRegions.add(region); 3128 } 3129 } 3130 } 3131 return tableRegions; 3132 } 3133 3134 @Override 3135 public List<HRegion> getRegions() { 3136 List<HRegion> allRegions; 3137 synchronized (this.onlineRegions) { 3138 // Return a clone copy of the onlineRegions 3139 allRegions = new ArrayList<>(onlineRegions.values()); 3140 } 3141 return allRegions; 3142 } 3143 3144 /** 3145 * Gets the online tables in this RS. 3146 * This method looks at the in-memory onlineRegions. 3147 * @return all the online tables in this RS 3148 */ 3149 public Set<TableName> getOnlineTables() { 3150 Set<TableName> tables = new HashSet<>(); 3151 synchronized (this.onlineRegions) { 3152 for (Region region: this.onlineRegions.values()) { 3153 tables.add(region.getTableDescriptor().getTableName()); 3154 } 3155 } 3156 return tables; 3157 } 3158 3159 public String[] getRegionServerCoprocessors() { 3160 TreeSet<String> coprocessors = new TreeSet<>(); 3161 try { 3162 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors()); 3163 } catch (IOException exception) { 3164 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " + 3165 "skipping."); 3166 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3167 } 3168 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 3169 for (HRegion region: regions) { 3170 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors()); 3171 try { 3172 
coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()); 3173 } catch (IOException exception) { 3174 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region + 3175 "; skipping."); 3176 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3177 } 3178 } 3179 coprocessors.addAll(rsHost.getCoprocessors()); 3180 return coprocessors.toArray(new String[0]); 3181 } 3182 3183 /** 3184 * Try to close the region, logs a warning on failure but continues. 3185 * @param region Region to close 3186 */ 3187 private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) { 3188 try { 3189 if (!closeRegion(region.getEncodedName(), abort, null)) { 3190 LOG.warn("Failed to close " + region.getRegionNameAsString() + 3191 " - ignoring and continuing"); 3192 } 3193 } catch (IOException e) { 3194 LOG.warn("Failed to close " + region.getRegionNameAsString() + 3195 " - ignoring and continuing", e); 3196 } 3197 } 3198 3199 /** 3200 * Close asynchronously a region, can be called from the master or internally by the regionserver 3201 * when stopping. If called from the master, the region will update the status. 3202 * 3203 * <p> 3204 * If an opening was in progress, this method will cancel it, but will not start a new close. The 3205 * coprocessors are not called in this case. A NotServingRegionException exception is thrown. 3206 * </p> 3207 3208 * <p> 3209 * If a close was in progress, this new request will be ignored, and an exception thrown. 3210 * </p> 3211 * 3212 * @param encodedName Region to close 3213 * @param abort True if we are aborting 3214 * @param destination Where the Region is being moved too... maybe null if unknown. 3215 * @return True if closed a region. 
3216 * @throws NotServingRegionException if the region is not online 3217 */ 3218 protected boolean closeRegion(String encodedName, final boolean abort, 3219 final ServerName destination) 3220 throws NotServingRegionException { 3221 //Check for permissions to close. 3222 HRegion actualRegion = this.getRegion(encodedName); 3223 // Can be null if we're calling close on a region that's not online 3224 if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) { 3225 try { 3226 actualRegion.getCoprocessorHost().preClose(false); 3227 } catch (IOException exp) { 3228 LOG.warn("Unable to close region: the coprocessor launched an error ", exp); 3229 return false; 3230 } 3231 } 3232 3233 // previous can come back 'null' if not in map. 3234 final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), 3235 Boolean.FALSE); 3236 3237 if (Boolean.TRUE.equals(previous)) { 3238 LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " + 3239 "trying to OPEN. Cancelling OPENING."); 3240 if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) { 3241 // The replace failed. That should be an exceptional case, but theoretically it can happen. 3242 // We're going to try to do a standard close then. 3243 LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." + 3244 " Doing a standard close now"); 3245 return closeRegion(encodedName, abort, destination); 3246 } 3247 // Let's get the region from the online region list again 3248 actualRegion = this.getRegion(encodedName); 3249 if (actualRegion == null) { // If already online, we still need to close it. 3250 LOG.info("The opening previously in progress has been cancelled by a CLOSE request."); 3251 // The master deletes the znode when it receives this exception. 3252 throw new NotServingRegionException("The region " + encodedName + 3253 " was opening but not yet served. 
Opening is cancelled."); 3254 } 3255 } else if (previous == null) { 3256 LOG.info("Received CLOSE for {}", encodedName); 3257 } else if (Boolean.FALSE.equals(previous)) { 3258 LOG.info("Received CLOSE for the region: " + encodedName + 3259 ", which we are already trying to CLOSE, but not completed yet"); 3260 return true; 3261 } 3262 3263 if (actualRegion == null) { 3264 LOG.debug("Received CLOSE for a region which is not online, and we're not opening."); 3265 this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName)); 3266 // The master deletes the znode when it receives this exception. 3267 throw new NotServingRegionException("The region " + encodedName + 3268 " is not online, and is not opening."); 3269 } 3270 3271 CloseRegionHandler crh; 3272 final RegionInfo hri = actualRegion.getRegionInfo(); 3273 if (hri.isMetaRegion()) { 3274 crh = new CloseMetaHandler(this, this, hri, abort); 3275 } else { 3276 crh = new CloseRegionHandler(this, this, hri, abort, destination); 3277 } 3278 this.executorService.submit(crh); 3279 return true; 3280 } 3281 3282 /** 3283 * @return HRegion for the passed binary <code>regionName</code> or null if 3284 * named region is not member of the online regions. 
3285 */ 3286 public HRegion getOnlineRegion(final byte[] regionName) { 3287 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3288 return this.onlineRegions.get(encodedRegionName); 3289 } 3290 3291 @Override 3292 public HRegion getRegion(final String encodedRegionName) { 3293 return this.onlineRegions.get(encodedRegionName); 3294 } 3295 3296 3297 @Override 3298 public boolean removeRegion(final HRegion r, ServerName destination) { 3299 HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); 3300 metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName()); 3301 if (destination != null) { 3302 long closeSeqNum = r.getMaxFlushedSeqId(); 3303 if (closeSeqNum == HConstants.NO_SEQNUM) { 3304 // No edits in WAL for this region; get the sequence number when the region was opened. 3305 closeSeqNum = r.getOpenSeqNum(); 3306 if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0; 3307 } 3308 boolean selfMove = ServerName.isSameAddress(destination, this.getServerName()); 3309 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove); 3310 if (selfMove) { 3311 this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(r.getRegionInfo().getEncodedName() 3312 , new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount())); 3313 } 3314 } 3315 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName()); 3316 return toReturn != null; 3317 } 3318 3319 /** 3320 * Protected Utility method for safely obtaining an HRegion handle. 
3321 * 3322 * @param regionName Name of online {@link HRegion} to return 3323 * @return {@link HRegion} for <code>regionName</code> 3324 */ 3325 protected HRegion getRegion(final byte[] regionName) 3326 throws NotServingRegionException { 3327 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3328 return getRegionByEncodedName(regionName, encodedRegionName); 3329 } 3330 3331 public HRegion getRegionByEncodedName(String encodedRegionName) 3332 throws NotServingRegionException { 3333 return getRegionByEncodedName(null, encodedRegionName); 3334 } 3335 3336 private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) 3337 throws NotServingRegionException { 3338 HRegion region = this.onlineRegions.get(encodedRegionName); 3339 if (region == null) { 3340 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName); 3341 if (moveInfo != null) { 3342 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum()); 3343 } 3344 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName)); 3345 String regionNameStr = regionName == null? 3346 encodedRegionName: Bytes.toStringBinary(regionName); 3347 if (isOpening != null && isOpening) { 3348 throw new RegionOpeningException("Region " + regionNameStr + 3349 " is opening on " + this.serverName); 3350 } 3351 throw new NotServingRegionException("" + regionNameStr + 3352 " is not online on " + this.serverName); 3353 } 3354 return region; 3355 } 3356 3357 /** 3358 * Cleanup after Throwable caught invoking method. Converts <code>t</code> to 3359 * IOE if it isn't already. 3360 * 3361 * @param t Throwable 3362 * @param msg Message to log in error. Can be null. 3363 * @return Throwable converted to an IOE; methods can only let out IOEs. 3364 */ 3365 private Throwable cleanup(final Throwable t, final String msg) { 3366 // Don't log as error if NSRE; NSRE is 'normal' operation. 
3367 if (t instanceof NotServingRegionException) { 3368 LOG.debug("NotServingRegionException; " + t.getMessage()); 3369 return t; 3370 } 3371 Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t; 3372 if (msg == null) { 3373 LOG.error("", e); 3374 } else { 3375 LOG.error(msg, e); 3376 } 3377 if (!rpcServices.checkOOME(t)) { 3378 checkFileSystem(); 3379 } 3380 return t; 3381 } 3382 3383 /** 3384 * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE 3385 * @return Make <code>t</code> an IOE if it isn't already. 3386 */ 3387 private IOException convertThrowableToIOE(final Throwable t, final String msg) { 3388 return (t instanceof IOException ? (IOException) t : msg == null 3389 || msg.length() == 0 ? new IOException(t) : new IOException(msg, t)); 3390 } 3391 3392 /** 3393 * Checks to see if the file system is still accessible. If not, sets 3394 * abortRequested and stopRequested 3395 * 3396 * @return false if file system is not available 3397 */ 3398 boolean checkFileSystem() { 3399 if (this.dataFsOk && this.dataFs != null) { 3400 try { 3401 FSUtils.checkFileSystemAvailable(this.dataFs); 3402 } catch (IOException e) { 3403 abort("File System not available", e); 3404 this.dataFsOk = false; 3405 } 3406 } 3407 return this.dataFsOk; 3408 } 3409 3410 @Override 3411 public void updateRegionFavoredNodesMapping(String encodedRegionName, 3412 List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) { 3413 InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()]; 3414 // Refer to the comment on the declaration of regionFavoredNodesMap on why 3415 // it is a map of region name to InetSocketAddress[] 3416 for (int i = 0; i < favoredNodes.size(); i++) { 3417 addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(), 3418 favoredNodes.get(i).getPort()); 3419 } 3420 regionFavoredNodesMap.put(encodedRegionName, addr); 3421 } 3422 3423 /** 3424 * 
Return the favored nodes for a region given its encoded name. Look at the 3425 * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[] 3426 * 3427 * @return array of favored locations 3428 */ 3429 @Override 3430 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) { 3431 return regionFavoredNodesMap.get(encodedRegionName); 3432 } 3433 3434 @Override 3435 public ServerNonceManager getNonceManager() { 3436 return this.nonceManager; 3437 } 3438 3439 private static class MovedRegionInfo { 3440 private final ServerName serverName; 3441 private final long seqNum; 3442 private final long moveTime; 3443 3444 MovedRegionInfo(ServerName serverName, long closeSeqNum) { 3445 this.serverName = serverName; 3446 this.seqNum = closeSeqNum; 3447 this.moveTime = EnvironmentEdgeManager.currentTime(); 3448 } 3449 3450 public ServerName getServerName() { 3451 return serverName; 3452 } 3453 3454 public long getSeqNum() { 3455 return seqNum; 3456 } 3457 3458 long getMoveTime() { 3459 return moveTime; 3460 } 3461 } 3462 3463 /** 3464 * This map will contains all the regions that we closed for a move. 3465 * We add the time it was moved as we don't want to keep too old information 3466 */ 3467 private Map<String, MovedRegionInfo> movedRegions = new ConcurrentHashMap<>(3000); 3468 3469 /** 3470 * We need a timeout. If not there is a risk of giving a wrong information: this would double 3471 * the number of network calls instead of reducing them. 
3472 */ 3473 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000); 3474 3475 private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum, boolean selfMove) { 3476 if (selfMove) { 3477 LOG.warn("Not adding moved region record: " + encodedName + " to self."); 3478 return; 3479 } 3480 LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" + 3481 closeSeqNum); 3482 movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum)); 3483 } 3484 3485 void removeFromMovedRegions(String encodedName) { 3486 movedRegions.remove(encodedName); 3487 } 3488 3489 private MovedRegionInfo getMovedRegion(final String encodedRegionName) { 3490 MovedRegionInfo dest = movedRegions.get(encodedRegionName); 3491 3492 long now = EnvironmentEdgeManager.currentTime(); 3493 if (dest != null) { 3494 if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) { 3495 return dest; 3496 } else { 3497 movedRegions.remove(encodedRegionName); 3498 } 3499 } 3500 3501 return null; 3502 } 3503 3504 /** 3505 * Remove the expired entries from the moved regions list. 3506 */ 3507 protected void cleanMovedRegions() { 3508 final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED; 3509 3510 movedRegions.entrySet().removeIf(e -> e.getValue().getMoveTime() < cutOff); 3511 } 3512 3513 /* 3514 * Use this to allow tests to override and schedule more frequently. 3515 */ 3516 3517 protected int movedRegionCleanerPeriod() { 3518 return TIMEOUT_REGION_MOVED; 3519 } 3520 3521 /** 3522 * Creates a Chore thread to clean the moved region cache. 
3523 */ 3524 protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable { 3525 private HRegionServer regionServer; 3526 Stoppable stoppable; 3527 3528 private MovedRegionsCleaner( 3529 HRegionServer regionServer, Stoppable stoppable){ 3530 super("MovedRegionsCleaner for region " + regionServer, stoppable, 3531 regionServer.movedRegionCleanerPeriod()); 3532 this.regionServer = regionServer; 3533 this.stoppable = stoppable; 3534 } 3535 3536 static MovedRegionsCleaner create(HRegionServer rs){ 3537 Stoppable stoppable = new Stoppable() { 3538 private volatile boolean isStopped = false; 3539 @Override public void stop(String why) { isStopped = true;} 3540 @Override public boolean isStopped() {return isStopped;} 3541 }; 3542 3543 return new MovedRegionsCleaner(rs, stoppable); 3544 } 3545 3546 @Override 3547 protected void chore() { 3548 regionServer.cleanMovedRegions(); 3549 } 3550 3551 @Override 3552 public void stop(String why) { 3553 stoppable.stop(why); 3554 } 3555 3556 @Override 3557 public boolean isStopped() { 3558 return stoppable.isStopped(); 3559 } 3560 } 3561 3562 private String getMyEphemeralNodePath() { 3563 return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode, getServerName().toString()); 3564 } 3565 3566 private boolean isHealthCheckerConfigured() { 3567 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC); 3568 return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation); 3569 } 3570 3571 /** 3572 * @return the underlying {@link CompactSplit} for the servers 3573 */ 3574 public CompactSplit getCompactSplitThread() { 3575 return this.compactSplitThread; 3576 } 3577 3578 CoprocessorServiceResponse execRegionServerService( 3579 @SuppressWarnings("UnusedParameters") final RpcController controller, 3580 final CoprocessorServiceRequest serviceRequest) throws ServiceException { 3581 try { 3582 ServerRpcController serviceController = new ServerRpcController(); 3583 
CoprocessorServiceCall call = serviceRequest.getCall(); 3584 String serviceName = call.getServiceName(); 3585 com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName); 3586 if (service == null) { 3587 throw new UnknownProtocolException(null, "No registered coprocessor executorService found for " + 3588 serviceName); 3589 } 3590 com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = 3591 service.getDescriptorForType(); 3592 3593 String methodName = call.getMethodName(); 3594 com.google.protobuf.Descriptors.MethodDescriptor methodDesc = 3595 serviceDesc.findMethodByName(methodName); 3596 if (methodDesc == null) { 3597 throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName + 3598 " called on executorService " + serviceName); 3599 } 3600 3601 com.google.protobuf.Message request = 3602 CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest()); 3603 final com.google.protobuf.Message.Builder responseBuilder = 3604 service.getResponsePrototype(methodDesc).newBuilderForType(); 3605 service.callMethod(methodDesc, serviceController, request, message -> { 3606 if (message != null) { 3607 responseBuilder.mergeFrom(message); 3608 } 3609 }); 3610 IOException exception = CoprocessorRpcUtils.getControllerException(serviceController); 3611 if (exception != null) { 3612 throw exception; 3613 } 3614 return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY); 3615 } catch (IOException ie) { 3616 throw new ServiceException(ie); 3617 } 3618 } 3619 3620 /** 3621 * May be null if this is a master which not carry table. 3622 * 3623 * @return The block cache instance used by the regionserver. 3624 */ 3625 @Override 3626 public Optional<BlockCache> getBlockCache() { 3627 return Optional.ofNullable(this.blockCache); 3628 } 3629 3630 /** 3631 * May be null if this is a master which not carry table. 3632 * 3633 * @return The cache for mob files used by the regionserver. 
3634 */ 3635 @Override 3636 public Optional<MobFileCache> getMobFileCache() { 3637 return Optional.ofNullable(this.mobFileCache); 3638 } 3639 3640 @Override 3641 public AccessChecker getAccessChecker() { 3642 return rpcServices.getAccessChecker(); 3643 } 3644 3645 @Override 3646 public ZKPermissionWatcher getZKPermissionWatcher() { 3647 return rpcServices.getZkPermissionWatcher(); 3648 } 3649 3650 /** 3651 * @return : Returns the ConfigurationManager object for testing purposes. 3652 */ 3653 @VisibleForTesting 3654 ConfigurationManager getConfigurationManager() { 3655 return configurationManager; 3656 } 3657 3658 /** 3659 * @return Return table descriptors implementation. 3660 */ 3661 @Override 3662 public TableDescriptors getTableDescriptors() { 3663 return this.tableDescriptors; 3664 } 3665 3666 /** 3667 * Reload the configuration from disk. 3668 */ 3669 void updateConfiguration() { 3670 LOG.info("Reloading the configuration from disk."); 3671 // Reload the configuration from disk. 3672 conf.reloadConfiguration(); 3673 configurationManager.notifyAllObservers(conf); 3674 } 3675 3676 CacheEvictionStats clearRegionBlockCache(Region region) { 3677 long evictedBlocks = 0; 3678 3679 for(Store store : region.getStores()) { 3680 for(StoreFile hFile : store.getStorefiles()) { 3681 evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName()); 3682 } 3683 } 3684 3685 return CacheEvictionStats.builder() 3686 .withEvictedBlocks(evictedBlocks) 3687 .build(); 3688 } 3689 3690 @Override 3691 public double getCompactionPressure() { 3692 double max = 0; 3693 for (Region region : onlineRegions.values()) { 3694 for (Store store : region.getStores()) { 3695 double normCount = store.getCompactionPressure(); 3696 if (normCount > max) { 3697 max = normCount; 3698 } 3699 } 3700 } 3701 return max; 3702 } 3703 3704 @Override 3705 public HeapMemoryManager getHeapMemoryManager() { 3706 return hMemManager; 3707 } 3708 3709 MemStoreFlusher getMemStoreFlusher() { 3710 return 
cacheFlusher; 3711 } 3712 3713 /** 3714 * For testing 3715 * @return whether all wal roll request finished for this regionserver 3716 */ 3717 @VisibleForTesting 3718 public boolean walRollRequestFinished() { 3719 return this.walRoller.walRollFinished(); 3720 } 3721 3722 @Override 3723 public ThroughputController getFlushThroughputController() { 3724 return flushThroughputController; 3725 } 3726 3727 @Override 3728 public double getFlushPressure() { 3729 if (getRegionServerAccounting() == null || cacheFlusher == null) { 3730 // return 0 during RS initialization 3731 return 0.0; 3732 } 3733 return getRegionServerAccounting().getFlushPressure(); 3734 } 3735 3736 @Override 3737 public void onConfigurationChange(Configuration newConf) { 3738 ThroughputController old = this.flushThroughputController; 3739 if (old != null) { 3740 old.stop("configuration change"); 3741 } 3742 this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf); 3743 try { 3744 Superusers.initialize(newConf); 3745 } catch (IOException e) { 3746 LOG.warn("Failed to initialize SuperUsers on reloading of the configuration"); 3747 } 3748 } 3749 3750 @Override 3751 public MetricsRegionServer getMetrics() { 3752 return metricsRegionServer; 3753 } 3754 3755 @Override 3756 public SecureBulkLoadManager getSecureBulkLoadManager() { 3757 return this.secureBulkLoadManager; 3758 } 3759 3760 @Override 3761 public EntityLock regionLock(final List<RegionInfo> regionInfos, final String description, 3762 final Abortable abort) { 3763 return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator()) 3764 .regionLock(regionInfos, description, abort); 3765 } 3766 3767 @Override 3768 public void unassign(byte[] regionName) throws IOException { 3769 clusterConnection.getAdmin().unassign(regionName, false); 3770 } 3771 3772 @Override 3773 public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() { 3774 return this.rsSpaceQuotaManager; 3775 } 3776 3777 @Override 
3778 public boolean reportFileArchivalForQuotas(TableName tableName, 3779 Collection<Entry<String, Long>> archivedFiles) { 3780 RegionServerStatusService.BlockingInterface rss = rssStub; 3781 if (rss == null || rsSpaceQuotaManager == null) { 3782 // the current server could be stopping. 3783 LOG.trace("Skipping file archival reporting to HMaster as stub is null"); 3784 return false; 3785 } 3786 try { 3787 RegionServerStatusProtos.FileArchiveNotificationRequest request = 3788 rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles); 3789 rss.reportFileArchival(null, request); 3790 } catch (ServiceException se) { 3791 IOException ioe = ProtobufUtil.getRemoteException(se); 3792 if (ioe instanceof PleaseHoldException) { 3793 if (LOG.isTraceEnabled()) { 3794 LOG.trace("Failed to report file archival(s) to Master because it is initializing." 3795 + " This will be retried.", ioe); 3796 } 3797 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 3798 return false; 3799 } 3800 if (rssStub == rss) { 3801 rssStub = null; 3802 } 3803 // re-create the stub if we failed to report the archival 3804 createRegionServerStatusStub(true); 3805 LOG.debug("Failed to report file archival(s) to Master. 
This will be retried.", ioe); 3806 return false; 3807 } 3808 return true; 3809 } 3810 3811 public NettyEventLoopGroupConfig getEventLoopGroupConfig() { 3812 return eventLoopGroupConfig; 3813 } 3814 3815 @Override 3816 public Connection createConnection(Configuration conf) throws IOException { 3817 User user = UserProvider.instantiate(conf).getCurrent(); 3818 return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName, 3819 this.rpcServices, this.rpcServices); 3820 } 3821 3822 void executeProcedure(long procId, RSProcedureCallable callable) { 3823 executorService.submit(new RSProcedureHandler(this, procId, callable)); 3824 } 3825 3826 public void remoteProcedureComplete(long procId, Throwable error) { 3827 procedureResultReporter.complete(procId, error); 3828 } 3829 3830 void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException { 3831 RegionServerStatusService.BlockingInterface rss; 3832 // TODO: juggling class state with an instance variable, outside of a synchronized block :'( 3833 for (;;) { 3834 rss = rssStub; 3835 if (rss != null) { 3836 break; 3837 } 3838 createRegionServerStatusStub(); 3839 } 3840 try { 3841 rss.reportProcedureDone(null, request); 3842 } catch (ServiceException se) { 3843 if (rssStub == rss) { 3844 rssStub = null; 3845 } 3846 throw ProtobufUtil.getRemoteException(se); 3847 } 3848 } 3849 3850 /** 3851 * Will ignore the open/close region procedures which already submitted or executed. 3852 * 3853 * When master had unfinished open/close region procedure and restarted, new active master may 3854 * send duplicate open/close region request to regionserver. The open/close request is submitted 3855 * to a thread pool and execute. So first need a cache for submitted open/close region procedures. 3856 * 3857 * After the open/close region request executed and report region transition succeed, cache it in 3858 * executed region procedures cache. See {@link #finishRegionProcedure(long)}. 
After report region 3859 * transition succeed, master will not send the open/close region request to regionserver again. 3860 * And we thought that the ongoing duplicate open/close region request should not be delayed more 3861 * than 600 seconds. So the executed region procedures cache will expire after 600 seconds. 3862 * 3863 * See HBASE-22404 for more details. 3864 * 3865 * @param procId the id of the open/close region procedure 3866 * @return true if the procedure can be submitted. 3867 */ 3868 boolean submitRegionProcedure(long procId) { 3869 if (procId == -1) { 3870 return true; 3871 } 3872 // Ignore the region procedures which already submitted. 3873 Long previous = submittedRegionProcedures.putIfAbsent(procId, procId); 3874 if (previous != null) { 3875 LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId); 3876 return false; 3877 } 3878 // Ignore the region procedures which already executed. 3879 if (executedRegionProcedures.getIfPresent(procId) != null) { 3880 LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId); 3881 return false; 3882 } 3883 return true; 3884 } 3885 3886 /** 3887 * See {@link #submitRegionProcedure(long)}. 3888 * @param procId the id of the open/close region procedure 3889 */ 3890 public void finishRegionProcedure(long procId) { 3891 executedRegionProcedures.put(procId, procId); 3892 submittedRegionProcedures.remove(procId); 3893 } 3894 3895 public boolean isShutDown() { 3896 return shutDown; 3897 } 3898 3899 /** 3900 * Force to terminate region server when abort timeout. 3901 */ 3902 private static class SystemExitWhenAbortTimeout extends TimerTask { 3903 3904 public SystemExitWhenAbortTimeout() { 3905 } 3906 3907 @Override 3908 public void run() { 3909 LOG.warn("Aborting region server timed out, terminating forcibly" + 3910 " and does not wait for any running shutdown hooks or finalizers to finish their work." 
+ 3911 " Thread dump to stdout."); 3912 Threads.printThreadInfo(System.out, "Zombie HRegionServer"); 3913 Runtime.getRuntime().halt(1); 3914 } 3915 } 3916 3917 @VisibleForTesting 3918 public CompactedHFilesDischarger getCompactedHFilesDischarger() { 3919 return compactedFileDischarger; 3920 } 3921}