001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.lang.Thread.UncaughtExceptionHandler; 022import java.lang.management.MemoryType; 023import java.lang.management.MemoryUsage; 024import java.lang.reflect.Constructor; 025import java.net.BindException; 026import java.net.InetAddress; 027import java.net.InetSocketAddress; 028import java.time.Duration; 029import java.util.ArrayList; 030import java.util.Collection; 031import java.util.Collections; 032import java.util.Comparator; 033import java.util.HashSet; 034import java.util.Iterator; 035import java.util.List; 036import java.util.Map; 037import java.util.Map.Entry; 038import java.util.Objects; 039import java.util.Set; 040import java.util.SortedMap; 041import java.util.Timer; 042import java.util.TimerTask; 043import java.util.TreeMap; 044import java.util.TreeSet; 045import java.util.concurrent.ConcurrentHashMap; 046import java.util.concurrent.ConcurrentMap; 047import java.util.concurrent.ConcurrentSkipListMap; 048import java.util.concurrent.atomic.AtomicBoolean; 049import java.util.concurrent.locks.ReentrantReadWriteLock; 050import 
java.util.function.Function; 051import javax.management.MalformedObjectNameException; 052import javax.servlet.http.HttpServlet; 053import org.apache.commons.lang3.RandomUtils; 054import org.apache.commons.lang3.StringUtils; 055import org.apache.commons.lang3.SystemUtils; 056import org.apache.hadoop.conf.Configuration; 057import org.apache.hadoop.fs.FileSystem; 058import org.apache.hadoop.fs.Path; 059import org.apache.hadoop.hbase.Abortable; 060import org.apache.hadoop.hbase.CacheEvictionStats; 061import org.apache.hadoop.hbase.CallQueueTooBigException; 062import org.apache.hadoop.hbase.ChoreService; 063import org.apache.hadoop.hbase.ClockOutOfSyncException; 064import org.apache.hadoop.hbase.CoordinatedStateManager; 065import org.apache.hadoop.hbase.DoNotRetryIOException; 066import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException; 067import org.apache.hadoop.hbase.HBaseConfiguration; 068import org.apache.hadoop.hbase.HBaseInterfaceAudience; 069import org.apache.hadoop.hbase.HConstants; 070import org.apache.hadoop.hbase.HealthCheckChore; 071import org.apache.hadoop.hbase.MetaTableAccessor; 072import org.apache.hadoop.hbase.NotServingRegionException; 073import org.apache.hadoop.hbase.PleaseHoldException; 074import org.apache.hadoop.hbase.ScheduledChore; 075import org.apache.hadoop.hbase.ServerName; 076import org.apache.hadoop.hbase.Stoppable; 077import org.apache.hadoop.hbase.TableDescriptors; 078import org.apache.hadoop.hbase.TableName; 079import org.apache.hadoop.hbase.YouAreDeadException; 080import org.apache.hadoop.hbase.ZNodeClearer; 081import org.apache.hadoop.hbase.client.ClusterConnection; 082import org.apache.hadoop.hbase.client.Connection; 083import org.apache.hadoop.hbase.client.ConnectionUtils; 084import org.apache.hadoop.hbase.client.RegionInfo; 085import org.apache.hadoop.hbase.client.RegionInfoBuilder; 086import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 087import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 
088import org.apache.hadoop.hbase.client.locking.EntityLock; 089import org.apache.hadoop.hbase.client.locking.LockServiceClient; 090import org.apache.hadoop.hbase.conf.ConfigurationManager; 091import org.apache.hadoop.hbase.conf.ConfigurationObserver; 092import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; 093import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; 094import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 095import org.apache.hadoop.hbase.exceptions.RegionMovedException; 096import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 097import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 098import org.apache.hadoop.hbase.executor.ExecutorService; 099import org.apache.hadoop.hbase.executor.ExecutorType; 100import org.apache.hadoop.hbase.fs.HFileSystem; 101import org.apache.hadoop.hbase.http.InfoServer; 102import org.apache.hadoop.hbase.io.hfile.BlockCache; 103import org.apache.hadoop.hbase.io.hfile.CacheConfig; 104import org.apache.hadoop.hbase.io.hfile.HFile; 105import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 106import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 107import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper; 108import org.apache.hadoop.hbase.ipc.RpcClient; 109import org.apache.hadoop.hbase.ipc.RpcClientFactory; 110import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 111import org.apache.hadoop.hbase.ipc.RpcServer; 112import org.apache.hadoop.hbase.ipc.RpcServerInterface; 113import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 114import org.apache.hadoop.hbase.ipc.ServerRpcController; 115import org.apache.hadoop.hbase.log.HBaseMarkers; 116import org.apache.hadoop.hbase.master.HMaster; 117import org.apache.hadoop.hbase.master.LoadBalancer; 118import org.apache.hadoop.hbase.master.RegionState.State; 119import org.apache.hadoop.hbase.mob.MobCacheConfig; 120import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; 121import 
org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 122import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore; 123import org.apache.hadoop.hbase.quotas.QuotaUtil; 124import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 125import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 126import org.apache.hadoop.hbase.quotas.RegionSize; 127import org.apache.hadoop.hbase.quotas.RegionSizeStore; 128import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 129import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 130import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; 131import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 132import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; 133import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; 134import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; 135import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; 136import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; 137import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 138import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; 139import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; 140import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; 141import org.apache.hadoop.hbase.security.Superusers; 142import org.apache.hadoop.hbase.security.User; 143import org.apache.hadoop.hbase.security.UserProvider; 144import org.apache.hadoop.hbase.trace.SpanReceiverHost; 145import org.apache.hadoop.hbase.trace.TraceUtil; 146import org.apache.hadoop.hbase.util.Addressing; 147import org.apache.hadoop.hbase.util.Bytes; 148import org.apache.hadoop.hbase.util.CompressionTest; 149import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 150import 
org.apache.hadoop.hbase.util.FSTableDescriptors; 151import org.apache.hadoop.hbase.util.FSUtils; 152import org.apache.hadoop.hbase.util.HasThread; 153import org.apache.hadoop.hbase.util.JvmPauseMonitor; 154import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; 155import org.apache.hadoop.hbase.util.Pair; 156import org.apache.hadoop.hbase.util.RetryCounter; 157import org.apache.hadoop.hbase.util.RetryCounterFactory; 158import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 159import org.apache.hadoop.hbase.util.Sleeper; 160import org.apache.hadoop.hbase.util.Threads; 161import org.apache.hadoop.hbase.util.VersionInfo; 162import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 163import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper; 164import org.apache.hadoop.hbase.wal.WAL; 165import org.apache.hadoop.hbase.wal.WALFactory; 166import org.apache.hadoop.hbase.wal.WALProvider; 167import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; 168import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; 169import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 170import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 171import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker; 172import org.apache.hadoop.hbase.zookeeper.ZKUtil; 173import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 174import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 175import org.apache.hadoop.ipc.RemoteException; 176import org.apache.hadoop.util.ReflectionUtils; 177import org.apache.yetus.audience.InterfaceAudience; 178import org.apache.zookeeper.KeeperException; 179import org.slf4j.Logger; 180import org.slf4j.LoggerFactory; 181import sun.misc.Signal; 182import sun.misc.SignalHandler; 183 184import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 185import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 186import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 187import 
org.apache.hbase.thirdparty.com.google.common.collect.Maps; 188import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 189import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 190import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 191import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 192import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 193 194import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 195import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; 
211import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 224 225/** 226 * HRegionServer makes a set of HRegions available to clients. It checks in with 227 * the HMaster. There are many HRegionServers in a single HBase deployment. 228 */ 229@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 230@SuppressWarnings({ "deprecation"}) 231public class HRegionServer extends HasThread implements 232 RegionServerServices, LastSequenceId, ConfigurationObserver { 233 // Time to pause if master says 'please hold'. Make configurable if needed. 
private static final int INIT_PAUSE_TIME_MS = 1000;

private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

/**
 * For testing only! Set to true to skip notifying the master of region assignment
 * transitions.
 */
@VisibleForTesting
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

// Region name -> action currently in progress for that region:
//   true  - an open region action is in progress
//   false - a close region action is in progress
// A comparator-based map is used because byte[] keys only have identity-based
// equals/hashCode, so a hash map would never find an equal-content key.
protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
  new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

// Cache flushing
protected MemStoreFlusher cacheFlusher;

protected HeapMemoryManager hMemManager;

/**
 * Cluster connection to be shared by services.
 * Initialized at server startup and closed when server shuts down.
 * Clients must never close it explicitly.
 * Clients hosted by this Server should make use of this clusterConnection rather than create
 * their own; if they create their own, there is no way for the hosting server to shutdown
 * ongoing client RPCs.
 */
protected ClusterConnection clusterConnection;

/*
 * Long-lived meta table locator, which is created when the server is started and stopped
 * when the server shuts down. References to this locator shall be used to perform the
 * corresponding operations in EventHandlers. The primary reason for this decision is to make
 * it mockable for tests.
 */
protected MetaTableLocator metaTableLocator;

/**
 * Go here to get table descriptors.
 */
protected TableDescriptors tableDescriptors;

// Replication services. If no replication, this handler will be null.
protected ReplicationSourceService replicationSourceHandler;
protected ReplicationSinkService replicationSinkHandler;

// Compactions
public CompactSplit compactSplitThread;

/**
 * Map of regions currently being served by this region server. Key is the
 * encoded region name. The map is a ConcurrentHashMap, so individual operations are
 * thread-safe; compound check-then-act sequences still need external coordination.
 */
protected final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();

/**
 * Map of encoded region names to the DataNode locations they should be hosted on.
 * We store the value as InetSocketAddress since this is used only in HDFS
 * API (create() that takes favored nodes as hints for placing file blocks).
 * We could have used ServerName here as the value class, but we'd need to
 * convert it to InetSocketAddress at some point before the HDFS API call, and
 * it seems a bit weird to store ServerName since ServerName refers to RegionServers
 * and here we really mean DataNode locations.
 */
protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
  new ConcurrentHashMap<>();

// Leases
protected Leases leases;

// Instance of the hbase executor executorService.
protected ExecutorService executorService;

// If false, the file system has become unavailable
protected volatile boolean fsOk;
protected HFileSystem fs;
protected HFileSystem walFs;

// Set when a report to the master comes back with a message asking us to
// shutdown. Also set by call to stop when debugging or running unit tests
// of HRegionServer in isolation.
private volatile boolean stopped = false;

// Go down hard. Used if file system becomes unavailable and also in
// debugging and unit tests.
private volatile boolean abortRequested;
// Config key for the abort timeout.
public static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
// Default abort timeout: 1,200,000 ms (1200 seconds, i.e. 20 minutes), chosen to be safe.
private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
// Config key naming the task to run when the abort timeout expires.
public static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";

ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<>();

// A state before we go into stopped state. At this stage we're closing user
// space regions.
// NOTE(review): unlike stopped/abortRequested/killed/shutDown, this flag is not volatile;
// confirm that readers on other threads do not rely on seeing updates promptly.
private boolean stopping = false;

volatile boolean killed = false;

private volatile boolean shutDown = false;

protected final Configuration conf;

private Path rootDir;
private Path walRootDir;

protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

final int numRetries;
protected final int threadWakeFrequency;
protected final int msgInterval;

// Config key for the compaction-check period; defaults to threadWakeFrequency.
private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
private final int compactionCheckFrequency;
// Config key for the flush-check period; defaults to threadWakeFrequency.
private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
private final int flushCheckFrequency;

protected final int numRegionsToReport;

// Stub to do region server status calls against the master.
private volatile RegionServerStatusService.BlockingInterface rssStub;
private volatile LockService.BlockingInterface lockStub;
// RPC client. Used to make the stub above that does region server status checking.
RpcClient rpcClient;

private RpcRetryingCallerFactory rpcRetryingCallerFactory;
private RpcControllerFactory rpcControllerFactory;

private UncaughtExceptionHandler uncaughtExceptionHandler;

// Info server (web UI). Protected so it can be used by unit tests and subclasses.
// REGIONSERVER is the name of the webapp and also the attribute name under which
// this instance is stored in the web context.
protected InfoServer infoServer;
private JvmPauseMonitor pauseMonitor;

/** region server process name */
public static final String REGIONSERVER = "regionserver";

MetricsRegionServer metricsRegionServer;
MetricsRegionServerWrapperImpl metricsRegionServerImpl;
MetricsTable metricsTable;
private SpanReceiverHost spanReceiverHost;

/**
 * ChoreService used to schedule tasks that we want to run periodically
 */
private ChoreService choreService;

/*
 * Check for compaction requests.
 */
ScheduledChore compactionChecker;

/*
 * Check for flushes
 */
ScheduledChore periodicFlusher;

protected volatile WALFactory walFactory;

// WAL roller. It is protected rather than private to avoid an
// eclipse warning when accessed by inner classes.
protected LogRoller walRoller;

// A thread which calls reportProcedureDone
private RemoteProcedureResultReporter procedureResultReporter;

// flag set after we're done setting up server threads
final AtomicBoolean online = new AtomicBoolean(false);

// zookeeper connection and watcher
protected final ZKWatcher zooKeeper;

// master address tracker
private final MasterAddressTracker masterAddressTracker;

// Cluster Status Tracker
protected final ClusterStatusTracker clusterStatusTracker;

// Log Splitting Worker
private SplitLogWorker splitLogWorker;

// A sleeper that sleeps for msgInterval.
protected final Sleeper sleeper;

private final int operationTimeout;
private final int shortOperationTimeout;

private final RegionServerAccounting regionServerAccounting;

// Cache configuration and block cache reference
protected CacheConfig cacheConfig;
// Cache configuration for mob
final MobCacheConfig mobCacheConfig;

/** The health check chore. */
private HealthCheckChore healthCheckChore;

/** The nonce manager chore. */
private ScheduledChore nonceManagerChore;

private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();

/**
 * The server name the Master sees us as. It's made from the hostname the
 * master passes us, port, and server startcode. Gets set after registration
 * against Master (an initial value is computed in the constructor from the
 * local or configured hostname).
 */
protected ServerName serverName;

/*
 * hostname specified by hostname config
 */
protected String useThisHostnameInstead;

// Key of the config parameter for the server hostname.
// Specifying the server hostname is optional. The hostname should be resolvable from
// both master and region server.
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
protected final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";

// HBASE-18226: This config and hbase.regionserver.hostname are mutually exclusive.
// Exception will be thrown if both are used.
final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
  "hbase.regionserver.hostname.disable.master.reversedns";

/**
 * This server's startcode.
 */
protected final long startcode;

/**
 * Unique identifier for the cluster we are a part of.
 */
protected String clusterId;

/**
 * Chore that periodically cleans the moved-regions list.
 */
private MovedRegionsCleaner movedRegionsCleaner;

// chore for refreshing store files for secondary regions
private StorefileRefresherChore storefileRefresher;

private RegionServerCoprocessorHost rsHost;

private RegionServerProcedureManagerHost rspmHost;

private RegionServerRpcQuotaManager rsQuotaManager;
private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

/**
 * Nonce manager. Nonces are used to make operations like increment and append idempotent
 * in the case where client doesn't receive the response from a successful operation and
 * retries. We track the successful ops for some time via a nonce sent by client and handle
 * duplicate operations (currently, by failing them; in future we might use MVCC to return
 * result). Nonces are also recovered from WAL during recovery; however, the caveats (from
 * HBASE-3787) are:
 * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
 *   of past records. If we don't read the records, we don't read and recover the nonces.
 *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
 * - There's no WAL recovery during normal region move, so nonces will not be transferred.
 * We can have separate additional "Nonce WAL". It will just contain bunch of numbers and
 * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
 * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
 * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
 * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
 * latest nonce in it expired. It can also be recovered during move.
 * Null when nonces are disabled via HConstants.HBASE_RS_NONCES_ENABLED.
 */
final ServerNonceManager nonceManager;

private UserProvider userProvider;

protected final RSRpcServices rpcServices;

protected CoordinatedStateManager csm;

/**
 * Configuration manager is used to register/deregister and notify the configuration observers
 * when the regionserver is notified that there was a change in the on disk configs.
 */
protected final ConfigurationManager configurationManager;

@VisibleForTesting
CompactedHFilesDischarger compactedFileDischarger;

private volatile ThroughputController flushThroughputController;

protected SecureBulkLoadManager secureBulkLoadManager;

protected FileSystemUtilizationChore fsUtilizationChore;

private final NettyEventLoopGroupConfig eventLoopGroupConfig;

/**
 * True if this RegionServer is coming up in a cluster where there is no Master;
 * means it needs to just come up and make do without a Master to talk to: e.g. in test or
 * HRegionServer is doing other than its usual duties: e.g. as an hollowed-out host whose only
 * purpose is as a Replication-stream sink; see HBASE-18846 for more.
 */
private final boolean masterless;
static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

/**
 * Constructs an HRegionServer from the given configuration. Despite the general rule below,
 * the constructor does start the RPC services, the ZooKeeper trackers (when a cluster and a
 * Master are expected) and the web UI.
 */
// Don't start any services or managers in here in the Constructor.
// Defer till after we register with the Master as much as possible. See #startServices.
549 public HRegionServer(Configuration conf) throws IOException { 550 super("RegionServer"); // thread name 551 TraceUtil.initTracer(conf); 552 try { 553 this.startcode = System.currentTimeMillis(); 554 this.conf = conf; 555 this.fsOk = true; 556 this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false); 557 this.eventLoopGroupConfig = setupNetty(this.conf); 558 MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf); 559 HFile.checkHFileVersion(this.conf); 560 checkCodecs(this.conf); 561 this.userProvider = UserProvider.instantiate(conf); 562 FSUtils.setupShortCircuitRead(this.conf); 563 564 // Disable usage of meta replicas in the regionserver 565 this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); 566 // Config'ed params 567 this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 568 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 569 this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); 570 this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency); 571 this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency); 572 this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000); 573 574 this.sleeper = new Sleeper(this.msgInterval, this); 575 576 boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true); 577 this.nonceManager = isNoncesEnabled ? 
new ServerNonceManager(this.conf) : null; 578 579 this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10); 580 581 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 582 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 583 584 this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, 585 HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); 586 587 this.abortRequested = false; 588 this.stopped = false; 589 590 rpcServices = createRpcServices(); 591 useThisHostnameInstead = getUseThisHostnameInstead(conf); 592 String hostName = 593 StringUtils.isBlank(useThisHostnameInstead) ? this.rpcServices.isa.getHostName() 594 : this.useThisHostnameInstead; 595 serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode); 596 597 rpcControllerFactory = RpcControllerFactory.instantiate(this.conf); 598 rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); 599 600 // login the zookeeper client principal (if using security) 601 ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE, 602 HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName); 603 // login the server principal (if using secure Hadoop) 604 login(userProvider, hostName); 605 // init superusers and add the server principal (if using security) 606 // or process owner as default super user. 
607 Superusers.initialize(conf); 608 regionServerAccounting = new RegionServerAccounting(conf); 609 610 boolean isMasterNotCarryTable = 611 this instanceof HMaster && !LoadBalancer.isTablesOnMaster(conf); 612 // no need to instantiate global block cache when master not carry table 613 if (!isMasterNotCarryTable) { 614 CacheConfig.instantiateBlockCache(conf); 615 } 616 cacheConfig = new CacheConfig(conf); 617 mobCacheConfig = new MobCacheConfig(conf); 618 619 uncaughtExceptionHandler = new UncaughtExceptionHandler() { 620 @Override 621 public void uncaughtException(Thread t, Throwable e) { 622 abort("Uncaught exception in executorService thread " + t.getName(), e); 623 } 624 }; 625 626 initializeFileSystem(); 627 spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration()); 628 629 this.configurationManager = new ConfigurationManager(); 630 setupWindows(getConfiguration(), getConfigurationManager()); 631 632 // Some unit tests don't need a cluster, so no zookeeper at all 633 if (!conf.getBoolean("hbase.testing.nocluster", false)) { 634 // Open connection to zookeeper and set primary watcher 635 zooKeeper = new ZKWatcher(conf, getProcessName() + ":" + 636 rpcServices.isa.getPort(), this, canCreateBaseZNode()); 637 // If no master in cluster, skip trying to track one or look for a cluster status. 
638 if (!this.masterless) { 639 this.csm = new ZkCoordinatedStateManager(this); 640 641 masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); 642 masterAddressTracker.start(); 643 644 clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this); 645 clusterStatusTracker.start(); 646 } else { 647 masterAddressTracker = null; 648 clusterStatusTracker = null; 649 } 650 } else { 651 zooKeeper = null; 652 masterAddressTracker = null; 653 clusterStatusTracker = null; 654 } 655 this.rpcServices.start(zooKeeper); 656 // This violates 'no starting stuff in Constructor' but Master depends on the below chore 657 // and executor being created and takes a different startup route. Lots of overlap between HRS 658 // and M (An M IS A HRS now). Need to refactor so less duplication between M and its super 659 // Master expects Constructor to put up web servers. Ugh. 660 // class HRS. TODO. 661 this.choreService = new ChoreService(getName(), true); 662 this.executorService = new ExecutorService(getName()); 663 putUpWebUI(); 664 } catch (Throwable t) { 665 // Make sure we log the exception. HRegionServer is often started via reflection and the 666 // cause of failed startup is lost. 667 LOG.error("Failed construction RegionServer", t); 668 throw t; 669 } 670 } 671 672 // HMaster should override this method to load the specific config for master 673 protected String getUseThisHostnameInstead(Configuration conf) throws IOException { 674 String hostname = conf.get(RS_HOSTNAME_KEY); 675 if (conf.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) { 676 if (!StringUtils.isBlank(hostname)) { 677 String msg = RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + RS_HOSTNAME_KEY + 678 " are mutually exclusive. 
Do not set " + RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + 679 " to true while " + RS_HOSTNAME_KEY + " is used"; 680 throw new IOException(msg); 681 } else { 682 return rpcServices.isa.getHostName(); 683 } 684 } else { 685 return hostname; 686 } 687 } 688 689 /** 690 * If running on Windows, do windows-specific setup. 691 */ 692 private static void setupWindows(final Configuration conf, ConfigurationManager cm) { 693 if (!SystemUtils.IS_OS_WINDOWS) { 694 Signal.handle(new Signal("HUP"), new SignalHandler() { 695 @Override 696 public void handle(Signal signal) { 697 conf.reloadConfiguration(); 698 cm.notifyAllObservers(conf); 699 } 700 }); 701 } 702 } 703 704 private static NettyEventLoopGroupConfig setupNetty(Configuration conf) { 705 // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL. 706 NettyEventLoopGroupConfig nelgc = 707 new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup"); 708 NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass()); 709 NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass()); 710 return nelgc; 711 } 712 713 private void initializeFileSystem() throws IOException { 714 // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase 715 // checksum verification enabled, then automatically switch off hdfs checksum verification. 716 boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true); 717 FSUtils.setFsDefault(this.conf, FSUtils.getWALRootDir(this.conf)); 718 this.walFs = new HFileSystem(this.conf, useHBaseChecksum); 719 this.walRootDir = FSUtils.getWALRootDir(this.conf); 720 // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else 721 // underlying hadoop hdfs accessors will be going against wrong filesystem 722 // (unless all is set to defaults). 
723 FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf)); 724 this.fs = new HFileSystem(this.conf, useHBaseChecksum); 725 this.rootDir = FSUtils.getRootDir(this.conf); 726 this.tableDescriptors = getFsTableDescriptors(); 727 } 728 729 protected TableDescriptors getFsTableDescriptors() throws IOException { 730 return new FSTableDescriptors(this.conf, 731 this.fs, this.rootDir, !canUpdateTableDescriptor(), false, getMetaTableObserver()); 732 } 733 734 protected Function<TableDescriptorBuilder, TableDescriptorBuilder> getMetaTableObserver() { 735 return null; 736 } 737 738 protected void login(UserProvider user, String host) throws IOException { 739 user.login("hbase.regionserver.keytab.file", 740 "hbase.regionserver.kerberos.principal", host); 741 } 742 743 744 /** 745 * Wait for an active Master. 746 * See override in Master superclass for how it is used. 747 */ 748 protected void waitForMasterActive() {} 749 750 protected String getProcessName() { 751 return REGIONSERVER; 752 } 753 754 protected boolean canCreateBaseZNode() { 755 return this.masterless; 756 } 757 758 protected boolean canUpdateTableDescriptor() { 759 return false; 760 } 761 762 protected RSRpcServices createRpcServices() throws IOException { 763 return new RSRpcServices(this); 764 } 765 766 protected void configureInfoServer() { 767 infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class); 768 infoServer.setAttribute(REGIONSERVER, this); 769 } 770 771 protected Class<? 
extends HttpServlet> getDumpServlet() { 772 return RSDumpServlet.class; 773 } 774 775 @Override 776 public boolean registerService(com.google.protobuf.Service instance) { 777 /* 778 * No stacking of instances is allowed for a single executorService name 779 */ 780 com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = 781 instance.getDescriptorForType(); 782 String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc); 783 if (coprocessorServiceHandlers.containsKey(serviceName)) { 784 LOG.error("Coprocessor executorService " + serviceName 785 + " already registered, rejecting request from " + instance); 786 return false; 787 } 788 789 coprocessorServiceHandlers.put(serviceName, instance); 790 if (LOG.isDebugEnabled()) { 791 LOG.debug("Registered regionserver coprocessor executorService: executorService=" + serviceName); 792 } 793 return true; 794 } 795 796 /** 797 * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to 798 * the local server; i.e. a short-circuit Connection. Safe to use going to local or remote 799 * server. Create this instance in a method can be intercepted and mocked in tests. 800 * @throws IOException 801 */ 802 @VisibleForTesting 803 protected ClusterConnection createClusterConnection() throws IOException { 804 Configuration conf = this.conf; 805 if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) { 806 // Use server ZK cluster for server-issued connections, so we clone 807 // the conf and unset the client ZK related properties 808 conf = new Configuration(this.conf); 809 conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM); 810 } 811 // Create a cluster connection that when appropriate, can short-circuit and go directly to the 812 // local server if the request is to the local server bypassing RPC. Can be used for both local 813 // and remote invocations. 
814 return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(), 815 serverName, rpcServices, rpcServices); 816 } 817 818 /** 819 * Run test on configured codecs to make sure supporting libs are in place. 820 * @param c 821 * @throws IOException 822 */ 823 private static void checkCodecs(final Configuration c) throws IOException { 824 // check to see if the codec list is available: 825 String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null); 826 if (codecs == null) return; 827 for (String codec : codecs) { 828 if (!CompressionTest.testCompression(codec)) { 829 throw new IOException("Compression codec " + codec + 830 " not supported, aborting RS construction"); 831 } 832 } 833 } 834 835 public String getClusterId() { 836 return this.clusterId; 837 } 838 839 /** 840 * Setup our cluster connection if not already initialized. 841 * @throws IOException 842 */ 843 protected synchronized void setupClusterConnection() throws IOException { 844 if (clusterConnection == null) { 845 clusterConnection = createClusterConnection(); 846 metaTableLocator = new MetaTableLocator(); 847 } 848 } 849 850 /** 851 * All initialization needed before we go register with Master.<br> 852 * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br> 853 * In here we just put up the RpcServer, setup Connection, and ZooKeeper. 854 */ 855 private void preRegistrationInitialization() { 856 try { 857 initializeZooKeeper(); 858 setupClusterConnection(); 859 // Setup RPC client for master communication 860 this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( 861 this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); 862 } catch (Throwable t) { 863 // Call stop if error or process will stick around for ever since server 864 // puts up non-daemon threads. 865 this.rpcServices.stop(); 866 abort("Initialization of RS failed. 
Hence aborting RS.", t); 867 } 868 } 869 870 /** 871 * Bring up connection to zk ensemble and then wait until a master for this cluster and then after 872 * that, wait until cluster 'up' flag has been set. This is the order in which master does things. 873 * <p> 874 * Finally open long-living server short-circuit connection. 875 */ 876 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE", 877 justification="cluster Id znode read would give us correct response") 878 private void initializeZooKeeper() throws IOException, InterruptedException { 879 // Nothing to do in here if no Master in the mix. 880 if (this.masterless) { 881 return; 882 } 883 884 // Create the master address tracker, register with zk, and start it. Then 885 // block until a master is available. No point in starting up if no master 886 // running. 887 blockAndCheckIfStopped(this.masterAddressTracker); 888 889 // Wait on cluster being up. Master will set this flag up in zookeeper 890 // when ready. 891 blockAndCheckIfStopped(this.clusterStatusTracker); 892 893 // If we are HMaster then the cluster id should have already been set. 
894 if (clusterId == null) { 895 // Retrieve clusterId 896 // Since cluster status is now up 897 // ID should have already been set by HMaster 898 try { 899 clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper); 900 if (clusterId == null) { 901 this.abort("Cluster ID has not been set"); 902 } 903 LOG.info("ClusterId : " + clusterId); 904 } catch (KeeperException e) { 905 this.abort("Failed to retrieve Cluster ID", e); 906 } 907 } 908 909 waitForMasterActive(); 910 if (isStopped() || isAborted()) { 911 return; // No need for further initialization 912 } 913 914 // watch for snapshots and other procedures 915 try { 916 rspmHost = new RegionServerProcedureManagerHost(); 917 rspmHost.loadProcedures(conf); 918 rspmHost.initialize(this); 919 } catch (KeeperException e) { 920 this.abort("Failed to reach coordination cluster when creating procedure handler.", e); 921 } 922 } 923 924 /** 925 * Utilty method to wait indefinitely on a znode availability while checking 926 * if the region server is shut down 927 * @param tracker znode tracker to use 928 * @throws IOException any IO exception, plus if the RS is stopped 929 * @throws InterruptedException 930 */ 931 private void blockAndCheckIfStopped(ZKNodeTracker tracker) 932 throws IOException, InterruptedException { 933 while (tracker.blockUntilAvailable(this.msgInterval, false) == null) { 934 if (this.stopped) { 935 throw new IOException("Received the shutdown message while waiting."); 936 } 937 } 938 } 939 940 /** 941 * @return True if the cluster is up. 942 */ 943 @Override 944 public boolean isClusterUp() { 945 return this.masterless || 946 (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp()); 947 } 948 949 /** 950 * The HRegionServer sticks in this loop until closed. 951 */ 952 @Override 953 public void run() { 954 try { 955 // Do pre-registration initializations; zookeeper, lease threads, etc. 
956 preRegistrationInitialization(); 957 } catch (Throwable e) { 958 abort("Fatal exception during initialization", e); 959 } 960 961 try { 962 if (!isStopped() && !isAborted()) { 963 ShutdownHook.install(conf, fs, this, Thread.currentThread()); 964 // Initialize the RegionServerCoprocessorHost now that our ephemeral 965 // node was created, in case any coprocessors want to use ZooKeeper 966 this.rsHost = new RegionServerCoprocessorHost(this, this.conf); 967 } 968 969 // Try and register with the Master; tell it we are here. Break if server is stopped or the 970 // clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and start 971 // up all Services. Use RetryCounter to get backoff in case Master is struggling to come up. 972 LOG.debug("About to register with Master."); 973 RetryCounterFactory rcf = new RetryCounterFactory(Integer.MAX_VALUE, 974 this.sleeper.getPeriod(), 1000 * 60 * 5); 975 RetryCounter rc = rcf.create(); 976 while (keepLooping()) { 977 RegionServerStartupResponse w = reportForDuty(); 978 if (w == null) { 979 long sleepTime = rc.getBackoffTimeAndIncrementAttempts(); 980 LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime); 981 this.sleeper.sleep(sleepTime); 982 } else { 983 handleReportForDutyResponse(w); 984 break; 985 } 986 } 987 988 if (!isStopped() && isHealthy()) { 989 // start the snapshot handler and other procedure handlers, 990 // since the server is ready to run 991 if (this.rspmHost != null) { 992 this.rspmHost.start(); 993 } 994 // Start the Quota Manager 995 if (this.rsQuotaManager != null) { 996 rsQuotaManager.start(getRpcServer().getScheduler()); 997 } 998 if (this.rsSpaceQuotaManager != null) { 999 this.rsSpaceQuotaManager.start(); 1000 } 1001 } 1002 1003 // We registered with the Master. Go into run mode. 1004 long lastMsg = System.currentTimeMillis(); 1005 long oldRequestCount = -1; 1006 // The main run loop. 
1007 while (!isStopped() && isHealthy()) { 1008 if (!isClusterUp()) { 1009 if (isOnlineRegionsEmpty()) { 1010 stop("Exiting; cluster shutdown set and not carrying any regions"); 1011 } else if (!this.stopping) { 1012 this.stopping = true; 1013 LOG.info("Closing user regions"); 1014 closeUserRegions(this.abortRequested); 1015 } else if (this.stopping) { 1016 boolean allUserRegionsOffline = areAllUserRegionsOffline(); 1017 if (allUserRegionsOffline) { 1018 // Set stopped if no more write requests tp meta tables 1019 // since last time we went around the loop. Any open 1020 // meta regions will be closed on our way out. 1021 if (oldRequestCount == getWriteRequestCount()) { 1022 stop("Stopped; only catalog regions remaining online"); 1023 break; 1024 } 1025 oldRequestCount = getWriteRequestCount(); 1026 } else { 1027 // Make sure all regions have been closed -- some regions may 1028 // have not got it because we were splitting at the time of 1029 // the call to closeUserRegions. 1030 closeUserRegions(this.abortRequested); 1031 } 1032 LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString()); 1033 } 1034 } 1035 long now = System.currentTimeMillis(); 1036 if ((now - lastMsg) >= msgInterval) { 1037 tryRegionServerReport(lastMsg, now); 1038 lastMsg = System.currentTimeMillis(); 1039 } 1040 if (!isStopped() && !isAborted()) { 1041 this.sleeper.sleep(); 1042 } 1043 } // for 1044 } catch (Throwable t) { 1045 if (!rpcServices.checkOOME(t)) { 1046 String prefix = t instanceof YouAreDeadException? "": "Unhandled: "; 1047 abort(prefix + t.getMessage(), t); 1048 } 1049 } 1050 1051 if (abortRequested) { 1052 Timer abortMonitor = new Timer("Abort regionserver monitor", true); 1053 TimerTask abortTimeoutTask = null; 1054 try { 1055 Constructor<? 
extends TimerTask> timerTaskCtor = 1056 Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName())) 1057 .asSubclass(TimerTask.class).getDeclaredConstructor(); 1058 timerTaskCtor.setAccessible(true); 1059 abortTimeoutTask = timerTaskCtor.newInstance(); 1060 } catch (Exception e) { 1061 LOG.warn("Initialize abort timeout task failed", e); 1062 } 1063 if (abortTimeoutTask != null) { 1064 abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT)); 1065 } 1066 } 1067 1068 if (this.leases != null) { 1069 this.leases.closeAfterLeasesExpire(); 1070 } 1071 if (this.splitLogWorker != null) { 1072 splitLogWorker.stop(); 1073 } 1074 if (this.infoServer != null) { 1075 LOG.info("Stopping infoServer"); 1076 try { 1077 this.infoServer.stop(); 1078 } catch (Exception e) { 1079 LOG.error("Failed to stop infoServer", e); 1080 } 1081 } 1082 // Send cache a shutdown. 1083 if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) { 1084 cacheConfig.getBlockCache().shutdown(); 1085 } 1086 mobCacheConfig.getMobFileCache().shutdown(); 1087 1088 if (movedRegionsCleaner != null) { 1089 movedRegionsCleaner.stop("Region Server stopping"); 1090 } 1091 1092 // Send interrupts to wake up threads if sleeping so they notice shutdown. 1093 // TODO: Should we check they are alive? If OOME could have exited already 1094 if (this.hMemManager != null) this.hMemManager.stop(); 1095 if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary(); 1096 if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary(); 1097 sendShutdownInterrupt(); 1098 1099 // Stop the snapshot and other procedure handlers, forcefully killing all running tasks 1100 if (rspmHost != null) { 1101 rspmHost.stop(this.abortRequested || this.killed); 1102 } 1103 1104 if (this.killed) { 1105 // Just skip out w/o closing regions. Used when testing. 
1106 } else if (abortRequested) { 1107 if (this.fsOk) { 1108 closeUserRegions(abortRequested); // Don't leave any open file handles 1109 } 1110 LOG.info("aborting server " + this.serverName); 1111 } else { 1112 closeUserRegions(abortRequested); 1113 LOG.info("stopping server " + this.serverName); 1114 } 1115 1116 // so callers waiting for meta without timeout can stop 1117 if (this.metaTableLocator != null) this.metaTableLocator.stop(); 1118 if (this.clusterConnection != null && !clusterConnection.isClosed()) { 1119 try { 1120 this.clusterConnection.close(); 1121 } catch (IOException e) { 1122 // Although the {@link Closeable} interface throws an {@link 1123 // IOException}, in reality, the implementation would never do that. 1124 LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e); 1125 } 1126 } 1127 1128 // Closing the compactSplit thread before closing meta regions 1129 if (!this.killed && containsMetaTableRegions()) { 1130 if (!abortRequested || this.fsOk) { 1131 if (this.compactSplitThread != null) { 1132 this.compactSplitThread.join(); 1133 this.compactSplitThread = null; 1134 } 1135 closeMetaTableRegions(abortRequested); 1136 } 1137 } 1138 1139 if (!this.killed && this.fsOk) { 1140 waitOnAllRegionsToClose(abortRequested); 1141 LOG.info("stopping server " + this.serverName + "; all regions closed."); 1142 } 1143 1144 // Stop the quota manager 1145 if (rsQuotaManager != null) { 1146 rsQuotaManager.stop(); 1147 } 1148 if (rsSpaceQuotaManager != null) { 1149 rsSpaceQuotaManager.stop(); 1150 rsSpaceQuotaManager = null; 1151 } 1152 1153 //fsOk flag may be changed when closing regions throws exception. 1154 if (this.fsOk) { 1155 shutdownWAL(!abortRequested); 1156 } 1157 1158 // Make sure the proxy is down. 
1159 if (this.rssStub != null) { 1160 this.rssStub = null; 1161 } 1162 if (this.lockStub != null) { 1163 this.lockStub = null; 1164 } 1165 if (this.rpcClient != null) { 1166 this.rpcClient.close(); 1167 } 1168 if (this.leases != null) { 1169 this.leases.close(); 1170 } 1171 if (this.pauseMonitor != null) { 1172 this.pauseMonitor.stop(); 1173 } 1174 1175 if (!killed) { 1176 stopServiceThreads(); 1177 } 1178 1179 if (this.rpcServices != null) { 1180 this.rpcServices.stop(); 1181 } 1182 1183 try { 1184 deleteMyEphemeralNode(); 1185 } catch (KeeperException.NoNodeException nn) { 1186 } catch (KeeperException e) { 1187 LOG.warn("Failed deleting my ephemeral node", e); 1188 } 1189 // We may have failed to delete the znode at the previous step, but 1190 // we delete the file anyway: a second attempt to delete the znode is likely to fail again. 1191 ZNodeClearer.deleteMyEphemeralNodeOnDisk(); 1192 1193 if (this.zooKeeper != null) { 1194 this.zooKeeper.close(); 1195 } 1196 this.shutDown = true; 1197 LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed."); 1198 } 1199 1200 private boolean containsMetaTableRegions() { 1201 return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 1202 } 1203 1204 private boolean areAllUserRegionsOffline() { 1205 if (getNumberOfOnlineRegions() > 2) return false; 1206 boolean allUserRegionsOffline = true; 1207 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) { 1208 if (!e.getValue().getRegionInfo().isMetaRegion()) { 1209 allUserRegionsOffline = false; 1210 break; 1211 } 1212 } 1213 return allUserRegionsOffline; 1214 } 1215 1216 /** 1217 * @return Current write count for all online regions. 
1218 */ 1219 private long getWriteRequestCount() { 1220 long writeCount = 0; 1221 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) { 1222 writeCount += e.getValue().getWriteRequestsCount(); 1223 } 1224 return writeCount; 1225 } 1226 1227 @VisibleForTesting 1228 protected void tryRegionServerReport(long reportStartTime, long reportEndTime) 1229 throws IOException { 1230 RegionServerStatusService.BlockingInterface rss = rssStub; 1231 if (rss == null) { 1232 // the current server could be stopping. 1233 return; 1234 } 1235 ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime); 1236 try { 1237 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder(); 1238 request.setServer(ProtobufUtil.toServerName(this.serverName)); 1239 request.setLoad(sl); 1240 rss.regionServerReport(null, request.build()); 1241 } catch (ServiceException se) { 1242 IOException ioe = ProtobufUtil.getRemoteException(se); 1243 if (ioe instanceof YouAreDeadException) { 1244 // This will be caught and handled as a fatal error in run() 1245 throw ioe; 1246 } 1247 if (rssStub == rss) { 1248 rssStub = null; 1249 } 1250 // Couldn't connect to the master, get location from zk and reconnect 1251 // Method blocks until new master is found or we are stopped 1252 createRegionServerStatusStub(true); 1253 } 1254 } 1255 1256 /** 1257 * Reports the given map of Regions and their size on the filesystem to the active Master. 1258 * 1259 * @param regionSizeStore The store containing region sizes 1260 * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise 1261 */ 1262 public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) { 1263 RegionServerStatusService.BlockingInterface rss = rssStub; 1264 if (rss == null) { 1265 // the current server could be stopping. 
1266 LOG.trace("Skipping Region size report to HMaster as stub is null"); 1267 return true; 1268 } 1269 try { 1270 buildReportAndSend(rss, regionSizeStore); 1271 } catch (ServiceException se) { 1272 IOException ioe = ProtobufUtil.getRemoteException(se); 1273 if (ioe instanceof PleaseHoldException) { 1274 LOG.trace("Failed to report region sizes to Master because it is initializing." 1275 + " This will be retried.", ioe); 1276 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 1277 return true; 1278 } 1279 if (rssStub == rss) { 1280 rssStub = null; 1281 } 1282 createRegionServerStatusStub(true); 1283 if (ioe instanceof DoNotRetryIOException) { 1284 DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe; 1285 if (doNotRetryEx.getCause() != null) { 1286 Throwable t = doNotRetryEx.getCause(); 1287 if (t instanceof UnsupportedOperationException) { 1288 LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying"); 1289 return false; 1290 } 1291 } 1292 } 1293 LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe); 1294 } 1295 return true; 1296 } 1297 1298 /** 1299 * Builds the region size report and sends it to the master. Upon successful sending of the 1300 * report, the region sizes that were sent are marked as sent. 
1301 * 1302 * @param rss The stub to send to the Master 1303 * @param regionSizeStore The store containing region sizes 1304 */ 1305 void buildReportAndSend(RegionServerStatusService.BlockingInterface rss, 1306 RegionSizeStore regionSizeStore) throws ServiceException { 1307 RegionSpaceUseReportRequest request = 1308 buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore)); 1309 rss.reportRegionSpaceUse(null, request); 1310 // Record the number of size reports sent 1311 if (metricsRegionServer != null) { 1312 metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size()); 1313 } 1314 } 1315 1316 /** 1317 * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map. 1318 * 1319 * @param regionSizeStore The size in bytes of regions 1320 * @return The corresponding protocol buffer message. 1321 */ 1322 RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) { 1323 RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder(); 1324 for (Entry<RegionInfo, RegionSize> entry : regionSizes) { 1325 request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize())); 1326 } 1327 return request.build(); 1328 } 1329 1330 /** 1331 * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} 1332 * protobuf message. 
1333 * 1334 * @param regionInfo The RegionInfo 1335 * @param sizeInBytes The size in bytes of the Region 1336 * @return The protocol buffer 1337 */ 1338 RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) { 1339 return RegionSpaceUse.newBuilder() 1340 .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo))) 1341 .setRegionSize(Objects.requireNonNull(sizeInBytes)) 1342 .build(); 1343 } 1344 1345 ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) 1346 throws IOException { 1347 // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests 1348 // per second, and other metrics As long as metrics are part of ServerLoad it's best to use 1349 // the wrapper to compute those numbers in one place. 1350 // In the long term most of these should be moved off of ServerLoad and the heart beat. 1351 // Instead they should be stored in an HBase table so that external visibility into HBase is 1352 // improved; Additionally the load balancer will be able to take advantage of a more complete 1353 // history. 
1354 MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper(); 1355 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 1356 long usedMemory = -1L; 1357 long maxMemory = -1L; 1358 final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage(); 1359 if (usage != null) { 1360 usedMemory = usage.getUsed(); 1361 maxMemory = usage.getMax(); 1362 } 1363 1364 ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder(); 1365 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond()); 1366 serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount()); 1367 serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024)); 1368 serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024)); 1369 Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors(); 1370 Builder coprocessorBuilder = Coprocessor.newBuilder(); 1371 for (String coprocessor : coprocessors) { 1372 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1373 } 1374 RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder(); 1375 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); 1376 for (HRegion region : regions) { 1377 if (region.getCoprocessorHost() != null) { 1378 Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors(); 1379 Iterator<String> iterator = regionCoprocessors.iterator(); 1380 while (iterator.hasNext()) { 1381 serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build()); 1382 } 1383 } 1384 serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier)); 1385 for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost() 1386 .getCoprocessors()) { 1387 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1388 } 1389 } 1390 serverLoad.setReportStartTime(reportStartTime); 1391 serverLoad.setReportEndTime(reportEndTime); 1392 
if (this.infoServer != null) { 1393 serverLoad.setInfoServerPort(this.infoServer.getPort()); 1394 } else { 1395 serverLoad.setInfoServerPort(-1); 1396 } 1397 1398 // for the replicationLoad purpose. Only need to get from one executorService 1399 // either source or sink will get the same info 1400 ReplicationSourceService rsources = getReplicationSourceService(); 1401 1402 if (rsources != null) { 1403 // always refresh first to get the latest value 1404 ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad(); 1405 if (rLoad != null) { 1406 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); 1407 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) { 1408 serverLoad.addReplLoadSource(rLS); 1409 } 1410 } 1411 } 1412 1413 return serverLoad.build(); 1414 } 1415 1416 String getOnlineRegionsAsPrintableString() { 1417 StringBuilder sb = new StringBuilder(); 1418 for (Region r: this.onlineRegions.values()) { 1419 if (sb.length() > 0) sb.append(", "); 1420 sb.append(r.getRegionInfo().getEncodedName()); 1421 } 1422 return sb.toString(); 1423 } 1424 1425 /** 1426 * Wait on regions close. 1427 */ 1428 private void waitOnAllRegionsToClose(final boolean abort) { 1429 // Wait till all regions are closed before going out. 1430 int lastCount = -1; 1431 long previousLogTime = 0; 1432 Set<String> closedRegions = new HashSet<>(); 1433 boolean interrupted = false; 1434 try { 1435 while (!isOnlineRegionsEmpty()) { 1436 int count = getNumberOfOnlineRegions(); 1437 // Only print a message if the count of regions has changed. 1438 if (count != lastCount) { 1439 // Log every second at most 1440 if (System.currentTimeMillis() > (previousLogTime + 1000)) { 1441 previousLogTime = System.currentTimeMillis(); 1442 lastCount = count; 1443 LOG.info("Waiting on " + count + " regions to close"); 1444 // Only print out regions still closing if a small number else will 1445 // swamp the log. 
1446 if (count < 10 && LOG.isDebugEnabled()) { 1447 LOG.debug("Online Regions=" + this.onlineRegions); 1448 } 1449 } 1450 } 1451 // Ensure all user regions have been sent a close. Use this to 1452 // protect against the case where an open comes in after we start the 1453 // iterator of onlineRegions to close all user regions. 1454 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1455 RegionInfo hri = e.getValue().getRegionInfo(); 1456 if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) && 1457 !closedRegions.contains(hri.getEncodedName())) { 1458 closedRegions.add(hri.getEncodedName()); 1459 // Don't update zk with this close transition; pass false. 1460 closeRegionIgnoreErrors(hri, abort); 1461 } 1462 } 1463 // No regions in RIT, we could stop waiting now. 1464 if (this.regionsInTransitionInRS.isEmpty()) { 1465 if (!isOnlineRegionsEmpty()) { 1466 LOG.info("We were exiting though online regions are not empty," + 1467 " because some regions failed closing"); 1468 } 1469 break; 1470 } 1471 if (sleep(200)) { 1472 interrupted = true; 1473 } 1474 } 1475 } finally { 1476 if (interrupted) { 1477 Thread.currentThread().interrupt(); 1478 } 1479 } 1480 } 1481 1482 private boolean sleep(long millis) { 1483 boolean interrupted = false; 1484 try { 1485 Thread.sleep(millis); 1486 } catch (InterruptedException e) { 1487 LOG.warn("Interrupted while sleeping"); 1488 interrupted = true; 1489 } 1490 return interrupted; 1491 } 1492 1493 private void shutdownWAL(final boolean close) { 1494 if (this.walFactory != null) { 1495 try { 1496 if (close) { 1497 walFactory.close(); 1498 } else { 1499 walFactory.shutdown(); 1500 } 1501 } catch (Throwable e) { 1502 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 1503 LOG.error("Shutdown / close of WAL failed: " + e); 1504 LOG.debug("Shutdown / close exception details:", e); 1505 } 1506 } 1507 } 1508 1509 /* 1510 * Run init. 
Sets up the WAL and replication and starts the server's services, using the startup response
   * the master returned when this server reported for duty.
   *
   * <p>Every name/value pair in the response is copied into {@code conf}, except the entry keyed
   * {@link HConstants#KEY_FOR_HOSTNAME_SEEN_BY_MASTER}, which is used to rebuild
   * {@code serverName}. Then the ephemeral znode is created, the file system is re-initialized if
   * the master sent a different {@link HConstants#HBASE_DIR}, the WAL/replication and metrics are
   * set up, services are started (unless {@code hbase.regionserver.workers} is false),
   * replication is started, and threads waiting on {@code online} are woken.
   *
   * @param c the master's startup response; its map entries are applied as configuration.
   * @throws IOException if any step fails, including the master reporting a hostname that differs
   *     from {@code useThisHostnameInstead}. Before it is thrown, this server has been asked to
   *     stop with the message "Failed initialization".
   */
  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
  throws IOException {
    try {
      boolean updateRootDir = false;
      for (NameStringPair e : c.getMapEntriesList()) {
        String key = e.getName();
        // The hostname the master sees us as.
        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
          String hostnameFromMasterPOV = e.getValue();
          this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(),
              this.startcode);
          // An explicitly configured hostname must match what the master sees: fatal otherwise.
          if (!StringUtils.isBlank(useThisHostnameInstead) &&
              !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
            String msg = "Master passed us a different hostname to use; was=" +
                this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
            throw new IOException(msg);
          }
          // Without an explicit hostname a mismatch is only logged; the master's view is kept.
          if (StringUtils.isBlank(useThisHostnameInstead) &&
              !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
            String msg = "Master passed us a different hostname to use; was=" +
                rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
          }
          // The hostname entry is not copied into conf.
          continue;
        }

        String value = e.getValue();
        if (key.equals(HConstants.HBASE_DIR)) {
          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
            updateRootDir = true;
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("Config from master: " + key + "=" + value);
        }
        this.conf.set(key, value);
      }
      // Set our ephemeral znode up in zookeeper now we have a name.
      createMyEphemeralNode();

      if (updateRootDir) {
        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
        initializeFileSystem();
      }

      // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
      // config param for task trackers, but we can piggyback off of it.
      if (this.conf.get("mapreduce.task.attempt.id") == null) {
        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
      }

      // Save it in a file, this will allow to see if we crash
      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

      // This call sets up an initialized replication and WAL. Later we start it up.
      setupWALAndReplication();
      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
      // Init in here rather than in constructor after thread name has been set
      this.metricsRegionServer = new MetricsRegionServer(metricsRegionServerImpl, conf);
      this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
      // Now that we have a metrics source, start the pause monitor
      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
      pauseMonitor.start();

      // There is a rare case where we do NOT want services to start. Check config.
      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
        startServices();
      }
      // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
      // or make sense of it.
      startReplicationService();


      // Log how we are serving: server name, RPC address and ZooKeeper session id.
      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa +
          ", sessionid=0x" +
          Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));

      // Wake up anyone waiting for this server to online
      synchronized (online) {
        online.set(true);
        online.notifyAll();
      }
    } catch (Throwable e) {
      // Any failure above (including the hostname mismatch) stops the server before rethrowing.
      stop("Failed initialization");
      throw convertThrowableToIOE(cleanup(e, "Failed init"),
          "Region server startup failed");
    } finally {
      sleeper.skipSleepCycle();
    }
  }

  /**
   * Initializes the global {@link ChunkCreator} when MSLAB is enabled; does nothing otherwise.
   *
   * <p>{@code this.hMemManager} is passed to the chunk creator, so this should run after the
   * heap memory manager has been created.
   */
  protected void initializeMemStoreChunkCreator() {
    if (MemStoreLAB.isEnabled(conf)) {
      // MSLAB is enabled. So initialize MemStoreChunkPool.
      // The global memstore limit is derived from configuration via MemorySizeUtil.
      Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
      long globalMemStoreSize = pair.getFirst();
      boolean offheap = this.regionServerAccounting.isOffheap();
      // When off heap memstore in use, take full area for chunk pool.
      float poolSizePercentage = offheap? 1.0F:
          conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
      float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
          MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
      int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
      // init the chunkCreator
      ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
          initialCountPercentage, this.hMemManager);
    }
  }

  /**
   * Creates the heap memory manager and, when {@code HeapMemoryManager.create} returns one,
   * starts it on this server's chore service.
   */
  private void startHeapMemoryManager() {
    this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this,
        this.regionServerAccounting);
    if (this.hMemManager != null) {
      this.hMemManager.start(getChoreService());
    }
  }

  /**
   * Publishes this server's info port (-1 when there is no info server) and version, as a
   * PB-magic-prefixed {@code RegionServerInfo}, in an ephemeral znode at
   * {@code getMyEphemeralNodePath()}.
   */
  private void createMyEphemeralNode() throws KeeperException, IOException {
    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
  }

  /** Deletes the znode created by {@link #createMyEphemeralNode()}. */
  private void deleteMyEphemeralNode() throws KeeperException {
    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
  }

  @Override
  public RegionServerAccounting getRegionServerAccounting() {
    return regionServerAccounting;
  }

  /*
   * @param r Region to get RegionLoad for.
   * @param regionLoadBldr the RegionLoad.Builder, can be null
   * @param regionSpecifier the RegionSpecifier.Builder, can be null
   * @return RegionLoad instance.
1658 * 1659 * @throws IOException 1660 */ 1661 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, 1662 RegionSpecifier.Builder regionSpecifier) throws IOException { 1663 byte[] name = r.getRegionInfo().getRegionName(); 1664 int stores = 0; 1665 int storefiles = 0; 1666 int storeUncompressedSizeMB = 0; 1667 int storefileSizeMB = 0; 1668 int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024); 1669 long storefileIndexSizeKB = 0; 1670 int rootLevelIndexSizeKB = 0; 1671 int totalStaticIndexSizeKB = 0; 1672 int totalStaticBloomSizeKB = 0; 1673 long totalCompactingKVs = 0; 1674 long currentCompactedKVs = 0; 1675 List<HStore> storeList = r.getStores(); 1676 stores += storeList.size(); 1677 for (HStore store : storeList) { 1678 storefiles += store.getStorefilesCount(); 1679 storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024); 1680 storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024); 1681 //TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB? 
1682 storefileIndexSizeKB += store.getStorefilesRootLevelIndexSize() / 1024; 1683 CompactionProgress progress = store.getCompactionProgress(); 1684 if (progress != null) { 1685 totalCompactingKVs += progress.getTotalCompactingKVs(); 1686 currentCompactedKVs += progress.currentCompactedKVs; 1687 } 1688 rootLevelIndexSizeKB += (int) (store.getStorefilesRootLevelIndexSize() / 1024); 1689 totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024); 1690 totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024); 1691 } 1692 1693 float dataLocality = 1694 r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname()); 1695 if (regionLoadBldr == null) { 1696 regionLoadBldr = RegionLoad.newBuilder(); 1697 } 1698 if (regionSpecifier == null) { 1699 regionSpecifier = RegionSpecifier.newBuilder(); 1700 } 1701 regionSpecifier.setType(RegionSpecifierType.REGION_NAME); 1702 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name)); 1703 regionLoadBldr.setRegionSpecifier(regionSpecifier.build()) 1704 .setStores(stores) 1705 .setStorefiles(storefiles) 1706 .setStoreUncompressedSizeMB(storeUncompressedSizeMB) 1707 .setStorefileSizeMB(storefileSizeMB) 1708 .setMemStoreSizeMB(memstoreSizeMB) 1709 .setStorefileIndexSizeKB(storefileIndexSizeKB) 1710 .setRootIndexSizeKB(rootLevelIndexSizeKB) 1711 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB) 1712 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB) 1713 .setReadRequestsCount(r.getReadRequestsCount()) 1714 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount()) 1715 .setWriteRequestsCount(r.getWriteRequestsCount()) 1716 .setTotalCompactingKVs(totalCompactingKVs) 1717 .setCurrentCompactedKVs(currentCompactedKVs) 1718 .setDataLocality(dataLocality) 1719 .setLastMajorCompactionTs(r.getOldestHfileTs(true)); 1720 r.setCompleteSequenceId(regionLoadBldr); 1721 1722 return regionLoadBldr.build(); 1723 } 1724 1725 /** 1726 * @param encodedRegionName 1727 * @return An instance of 
RegionLoad. 1728 */ 1729 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { 1730 HRegion r = onlineRegions.get(encodedRegionName); 1731 return r != null ? createRegionLoad(r, null, null) : null; 1732 } 1733 1734 /* 1735 * Inner class that runs on a long period checking if regions need compaction. 1736 */ 1737 private static class CompactionChecker extends ScheduledChore { 1738 private final HRegionServer instance; 1739 private final int majorCompactPriority; 1740 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; 1741 //Iteration is 1-based rather than 0-based so we don't check for compaction 1742 // immediately upon region server startup 1743 private long iteration = 1; 1744 1745 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { 1746 super("CompactionChecker", stopper, sleepTime); 1747 this.instance = h; 1748 LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime)); 1749 1750 /* MajorCompactPriority is configurable. 1751 * If not set, the compaction will use default priority. 1752 */ 1753 this.majorCompactPriority = this.instance.conf. 1754 getInt("hbase.regionserver.compactionChecker.majorCompactPriority", 1755 DEFAULT_PRIORITY); 1756 } 1757 1758 @Override 1759 protected void chore() { 1760 for (Region r : this.instance.onlineRegions.values()) { 1761 if (r == null) { 1762 continue; 1763 } 1764 HRegion hr = (HRegion) r; 1765 for (HStore s : hr.stores.values()) { 1766 try { 1767 long multiplier = s.getCompactionCheckMultiplier(); 1768 assert multiplier > 0; 1769 if (iteration % multiplier != 0) { 1770 continue; 1771 } 1772 if (s.needsCompaction()) { 1773 // Queue a compaction. Will recognize if major is needed. 
1774 this.instance.compactSplitThread.requestSystemCompaction(hr, s, 1775 getName() + " requests compaction"); 1776 } else if (s.shouldPerformMajorCompaction()) { 1777 s.triggerMajorCompaction(); 1778 if (majorCompactPriority == DEFAULT_PRIORITY || 1779 majorCompactPriority > hr.getCompactPriority()) { 1780 this.instance.compactSplitThread.requestCompaction(hr, s, 1781 getName() + " requests major compaction; use default priority", 1782 Store.NO_PRIORITY, 1783 CompactionLifeCycleTracker.DUMMY, null); 1784 } else { 1785 this.instance.compactSplitThread.requestCompaction(hr, s, 1786 getName() + " requests major compaction; use configured priority", 1787 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); 1788 } 1789 } 1790 } catch (IOException e) { 1791 LOG.warn("Failed major compaction check on " + r, e); 1792 } 1793 } 1794 } 1795 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); 1796 } 1797 } 1798 1799 static class PeriodicMemStoreFlusher extends ScheduledChore { 1800 final HRegionServer server; 1801 final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds 1802 final static int MIN_DELAY_TIME = 0; // millisec 1803 1804 final int rangeOfDelay; 1805 public PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { 1806 super("MemstoreFlusherChore", server, cacheFlushInterval); 1807 this.server = server; 1808 1809 this.rangeOfDelay = this.server.conf.getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", 1810 RANGE_OF_DELAY)*1000; 1811 } 1812 1813 @Override 1814 protected void chore() { 1815 final StringBuilder whyFlush = new StringBuilder(); 1816 for (HRegion r : this.server.onlineRegions.values()) { 1817 if (r == null) continue; 1818 if (r.shouldFlush(whyFlush)) { 1819 FlushRequester requester = server.getFlushRequester(); 1820 if (requester != null) { 1821 long randomDelay = (long) RandomUtils.nextInt(0, rangeOfDelay) + MIN_DELAY_TIME; 1822 //Throttle the flushes by putting a delay. 
If we don't throttle, and there 1823 //is a balanced write-load on the regions in a table, we might end up 1824 //overwhelming the filesystem with too many flushes at once. 1825 if (requester.requestDelayedFlush(r, randomDelay, false)) { 1826 LOG.info("{} requesting flush of {} because {} after random delay {} ms", 1827 getName(), r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), 1828 randomDelay); 1829 } 1830 } 1831 } 1832 } 1833 } 1834 } 1835 1836 /** 1837 * Report the status of the server. A server is online once all the startup is 1838 * completed (setting up filesystem, starting executorService threads, etc.). This 1839 * method is designed mostly to be useful in tests. 1840 * 1841 * @return true if online, false if not. 1842 */ 1843 public boolean isOnline() { 1844 return online.get(); 1845 } 1846 1847 /** 1848 * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to 1849 * be hooked up to WAL. 1850 */ 1851 private void setupWALAndReplication() throws IOException { 1852 WALFactory factory = new WALFactory(conf, serverName.toString()); 1853 1854 // TODO Replication make assumptions here based on the default filesystem impl 1855 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 1856 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); 1857 1858 Path logDir = new Path(walRootDir, logName); 1859 LOG.debug("logDir={}", logDir); 1860 if (this.walFs.exists(logDir)) { 1861 throw new RegionServerRunningException( 1862 "Region server has already created directory at " + this.serverName.toString()); 1863 } 1864 // Always create wal directory as now we need this when master restarts to find out the live 1865 // region servers. 1866 if (!this.walFs.mkdirs(logDir)) { 1867 throw new IOException("Can not create wal directory " + logDir); 1868 } 1869 // Instantiate replication if replication enabled. Pass it the log directories. 
1870 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, 1871 factory.getWALProvider()); 1872 this.walFactory = factory; 1873 } 1874 1875 /** 1876 * Start up replication source and sink handlers. 1877 * @throws IOException 1878 */ 1879 private void startReplicationService() throws IOException { 1880 if (this.replicationSourceHandler == this.replicationSinkHandler && 1881 this.replicationSourceHandler != null) { 1882 this.replicationSourceHandler.startReplicationService(); 1883 } else { 1884 if (this.replicationSourceHandler != null) { 1885 this.replicationSourceHandler.startReplicationService(); 1886 } 1887 if (this.replicationSinkHandler != null) { 1888 this.replicationSinkHandler.startReplicationService(); 1889 } 1890 } 1891 } 1892 1893 1894 public MetricsRegionServer getRegionServerMetrics() { 1895 return this.metricsRegionServer; 1896 } 1897 1898 /** 1899 * @return Master address tracker instance. 1900 */ 1901 public MasterAddressTracker getMasterAddressTracker() { 1902 return this.masterAddressTracker; 1903 } 1904 1905 /* 1906 * Start maintenance Threads, Server, Worker and lease checker threads. 1907 * Start all threads we need to run. This is called after we've successfully 1908 * registered with the Master. 1909 * Install an UncaughtExceptionHandler that calls abort of RegionServer if we 1910 * get an unhandled exception. We cannot set the handler on all threads. 1911 * Server's internal Listener thread is off limits. For Server, if an OOME, it 1912 * waits a while then retries. Meantime, a flush or a compaction that tries to 1913 * run should trigger same critical condition and the shutdown will run. On 1914 * its way out, this server will shut down Server. Leases are sort of 1915 * inbetween. It has an internal thread that while it inherits from Chore, it 1916 * keeps its own internal stop mechanism so needs to be stopped by this 1917 * hosting server. Worker logs the exception and exits. 
1918 */ 1919 private void startServices() throws IOException { 1920 if (!isStopped() && !isAborted()) { 1921 initializeThreads(); 1922 } 1923 this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection); 1924 this.secureBulkLoadManager.start(); 1925 1926 // Health checker thread. 1927 if (isHealthCheckerConfigured()) { 1928 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, 1929 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); 1930 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); 1931 } 1932 1933 this.walRoller = new LogRoller(this); 1934 this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); 1935 this.procedureResultReporter = new RemoteProcedureResultReporter(this); 1936 1937 // Create the CompactedFileDischarger chore executorService. This chore helps to 1938 // remove the compacted files that will no longer be used in reads. 1939 // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to 1940 // 2 mins so that compacted files can be archived before the TTLCleaner runs 1941 int cleanerInterval = 1942 conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); 1943 this.compactedFileDischarger = 1944 new CompactedHFilesDischarger(cleanerInterval, this, this); 1945 choreService.scheduleChore(compactedFileDischarger); 1946 1947 // Start executor services 1948 this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION, 1949 conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); 1950 this.executorService.startExecutorService(ExecutorType.RS_OPEN_META, 1951 conf.getInt("hbase.regionserver.executor.openmeta.threads", 1)); 1952 this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION, 1953 conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3)); 1954 this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION, 1955 
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3)); 1956 this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META, 1957 conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); 1958 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { 1959 this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, 1960 conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); 1961 } 1962 this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( 1963 "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); 1964 // Start the threads for compacted files discharger 1965 this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER, 1966 conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10)); 1967 if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { 1968 this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS, 1969 conf.getInt("hbase.regionserver.region.replica.flusher.threads", 1970 conf.getInt("hbase.regionserver.executor.openregion.threads", 3))); 1971 } 1972 this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER, 1973 conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2)); 1974 1975 Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", 1976 uncaughtExceptionHandler); 1977 this.cacheFlusher.start(uncaughtExceptionHandler); 1978 Threads.setDaemonThreadRunning(this.procedureResultReporter, 1979 getName() + ".procedureResultReporter", uncaughtExceptionHandler); 1980 1981 if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker); 1982 if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher); 1983 if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore); 1984 if (this.nonceManagerChore != null) 
choreService.scheduleChore(nonceManagerChore); 1985 if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher); 1986 if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner); 1987 if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore); 1988 1989 // Leases is not a Thread. Internally it runs a daemon thread. If it gets 1990 // an unhandled exception, it will just exit. 1991 Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker", 1992 uncaughtExceptionHandler); 1993 1994 // Create the log splitting worker and start it 1995 // set a smaller retries to fast fail otherwise splitlogworker could be blocked for 1996 // quite a while inside Connection layer. The worker won't be available for other 1997 // tasks even after current task is preempted after a split task times out. 1998 Configuration sinkConf = HBaseConfiguration.create(conf); 1999 sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2000 conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds 2001 sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 2002 conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds 2003 sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 2004 if (this.csm != null) { 2005 // SplitLogWorker needs csm. If none, don't start this. 2006 this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, 2007 this, walFactory); 2008 splitLogWorker.start(); 2009 } else { 2010 LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null"); 2011 } 2012 2013 // Memstore services. 2014 startHeapMemoryManager(); 2015 // Call it after starting HeapMemoryManager. 2016 initializeMemStoreChunkCreator(); 2017 } 2018 2019 private void initializeThreads() throws IOException { 2020 // Cache flushing thread. 
2021 this.cacheFlusher = new MemStoreFlusher(conf, this); 2022 2023 // Compaction thread 2024 this.compactSplitThread = new CompactSplit(this); 2025 2026 // Background thread to check for compactions; needed if region has not gotten updates 2027 // in a while. It will take care of not checking too frequently on store-by-store basis. 2028 this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this); 2029 this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this); 2030 this.leases = new Leases(this.threadWakeFrequency); 2031 2032 // Create the thread to clean the moved regions list 2033 movedRegionsCleaner = MovedRegionsCleaner.create(this); 2034 2035 if (this.nonceManager != null) { 2036 // Create the scheduled chore that cleans up nonces. 2037 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); 2038 } 2039 2040 // Setup the Quota Manager 2041 rsQuotaManager = new RegionServerRpcQuotaManager(this); 2042 rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); 2043 2044 if (QuotaUtil.isQuotaEnabled(conf)) { 2045 this.fsUtilizationChore = new FileSystemUtilizationChore(this); 2046 } 2047 2048 2049 boolean onlyMetaRefresh = false; 2050 int storefileRefreshPeriod = conf.getInt( 2051 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 2052 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2053 if (storefileRefreshPeriod == 0) { 2054 storefileRefreshPeriod = conf.getInt( 2055 StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, 2056 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2057 onlyMetaRefresh = true; 2058 } 2059 if (storefileRefreshPeriod > 0) { 2060 this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, 2061 onlyMetaRefresh, this, this); 2062 } 2063 registerConfigurationObservers(); 2064 } 2065 2066 private void registerConfigurationObservers() { 2067 // Registering the compactSplitThread 
object with the ConfigurationManager. 2068 configurationManager.registerObserver(this.compactSplitThread); 2069 configurationManager.registerObserver(this.rpcServices); 2070 configurationManager.registerObserver(this); 2071 } 2072 2073 /** 2074 * Puts up the webui. 2075 * @return Returns final port -- maybe different from what we started with. 2076 * @throws IOException 2077 */ 2078 private int putUpWebUI() throws IOException { 2079 int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 2080 HConstants.DEFAULT_REGIONSERVER_INFOPORT); 2081 String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0"); 2082 2083 if(this instanceof HMaster) { 2084 port = conf.getInt(HConstants.MASTER_INFO_PORT, 2085 HConstants.DEFAULT_MASTER_INFOPORT); 2086 addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0"); 2087 } 2088 // -1 is for disabling info server 2089 if (port < 0) return port; 2090 2091 if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) { 2092 String msg = 2093 "Failed to start http info server. Address " + addr 2094 + " does not belong to this host. 
Correct configuration parameter: " 2095 + "hbase.regionserver.info.bindAddress"; 2096 LOG.error(msg); 2097 throw new IOException(msg); 2098 } 2099 // check if auto port bind enabled 2100 boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, 2101 false); 2102 while (true) { 2103 try { 2104 this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf); 2105 infoServer.addPrivilegedServlet("dump", "/dump", getDumpServlet()); 2106 configureInfoServer(); 2107 this.infoServer.start(); 2108 break; 2109 } catch (BindException e) { 2110 if (!auto) { 2111 // auto bind disabled throw BindException 2112 LOG.error("Failed binding http info server to port: " + port); 2113 throw e; 2114 } 2115 // auto bind enabled, try to use another port 2116 LOG.info("Failed binding http info server to port: " + port); 2117 port++; 2118 } 2119 } 2120 port = this.infoServer.getPort(); 2121 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port); 2122 int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT, 2123 HConstants.DEFAULT_MASTER_INFOPORT); 2124 conf.setInt("hbase.master.info.port.orig", masterInfoPort); 2125 conf.setInt(HConstants.MASTER_INFO_PORT, port); 2126 return port; 2127 } 2128 2129 /* 2130 * Verify that server is healthy 2131 */ 2132 private boolean isHealthy() { 2133 if (!fsOk) { 2134 // File system problem 2135 return false; 2136 } 2137 // Verify that all threads are alive 2138 boolean healthy = (this.leases == null || this.leases.isAlive()) 2139 && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) 2140 && (this.walRoller == null || this.walRoller.isAlive()) 2141 && (this.compactionChecker == null || this.compactionChecker.isScheduled()) 2142 && (this.periodicFlusher == null || this.periodicFlusher.isScheduled()); 2143 if (!healthy) { 2144 stop("One or more threads are no longer alive -- stop"); 2145 } 2146 return healthy; 2147 } 2148 2149 @Override 2150 public List<WAL> getWALs() throws IOException { 2151 return 
walFactory.getWALs(); 2152 } 2153 2154 @Override 2155 public WAL getWAL(RegionInfo regionInfo) throws IOException { 2156 try { 2157 WAL wal = walFactory.getWAL(regionInfo); 2158 if (this.walRoller != null) { 2159 this.walRoller.addWAL(wal); 2160 } 2161 return wal; 2162 }catch (FailedCloseWALAfterInitializedErrorException ex) { 2163 // see HBASE-21751 for details 2164 abort("wal can not clean up after init failed", ex); 2165 throw ex; 2166 } 2167 } 2168 2169 public LogRoller getWalRoller() { 2170 return walRoller; 2171 } 2172 2173 @Override 2174 public Connection getConnection() { 2175 return getClusterConnection(); 2176 } 2177 2178 @Override 2179 public ClusterConnection getClusterConnection() { 2180 return this.clusterConnection; 2181 } 2182 2183 @Override 2184 public MetaTableLocator getMetaTableLocator() { 2185 return this.metaTableLocator; 2186 } 2187 2188 @Override 2189 public void stop(final String msg) { 2190 stop(msg, false, RpcServer.getRequestUser().orElse(null)); 2191 } 2192 2193 /** 2194 * Stops the regionserver. 
* @param msg Status message
   * @param force True if this is a regionserver abort; when true, an IOException from the
   *     coprocessor {@code preStop} hook is logged and ignored. When false, such an exception
   *     cancels the stop and this method returns without stopping.
   * @param user The user executing the stop request, or null if no user is associated
   */
  public void stop(final String msg, final boolean force, final User user) {
    if (!this.stopped) {
      LOG.info("***** STOPPING region server '" + this + "' *****");
      if (this.rsHost != null) {
        // when forced via abort don't allow CPs to override
        try {
          this.rsHost.preStop(msg, user);
        } catch (IOException ioe) {
          if (!force) {
            LOG.warn("The region server did not stop", ioe);
            return;
          }
          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
        }
      }
      this.stopped = true;
      LOG.info("STOPPED: " + msg);
      // Wakes run() if it is sleeping
      sleeper.skipSleepCycle();
    }
  }

  /**
   * Blocks until this server is online or stopped, re-checking at least every
   * {@code msgInterval} ms. If interrupted, it returns early and restores the thread's
   * interrupt status.
   */
  public void waitForServerOnline(){
    while (!isStopped() && !isOnline()) {
      synchronized (online) {
        try {
          online.wait(msgInterval);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
  }

  /**
   * Work done after a region has been opened on this server.
   *
   * <p>It requests a system compaction for every store that has references or needs compaction,
   * and reports OPENED to the master with the region's open sequence id (0 if none was found).
   * Finally it calls {@code triggerFlushInPrimaryRegion(r)}.
   *
   * @throws IOException if the master could not be told about the opened region, or as
   *     propagated from {@code rpcServices.checkOpen()}
   */
  @Override
  public void postOpenDeployTasks(final PostOpenDeployContext context)
      throws KeeperException, IOException {
    HRegion r = context.getRegion();
    long masterSystemTime = context.getMasterSystemTime();
    rpcServices.checkOpen();
    LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
    // Do checks to see if we need to compact (references or too many files)
    for (HStore s : r.stores.values()) {
      if (s.hasReferences() || s.needsCompaction()) {
        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
      }
    }
    long openSeqNum = r.getOpenSeqNum();
    if (openSeqNum == HConstants.NO_SEQNUM) {
      // If we opened a region, we should have read some sequence number from it.
      LOG.error("No sequence number found when opening " +
        r.getRegionInfo().getRegionNameAsString());
      openSeqNum = 0;
    }

    // Notify master
    if (!reportRegionStateTransition(new RegionStateTransitionContext(
        TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
      throw new IOException("Failed to report opened region to master: "
        + r.getRegionInfo().getRegionNameAsString());
    }

    triggerFlushInPrimaryRegion(r);

    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
  }

  /**
   * Reports a region state transition to the master.
   *
   * <p>Failed RPCs are retried for as long as {@code clusterConnection} is non-null and open. When
   * the master signals it is not ready ({@code ServerNotRunningYetException},
   * {@code PleaseHoldException}, {@code CallQueueTooBigException}), the retry waits with backoff.
   * Any other failure is retried immediately. After a failed RPC the cached status stub is
   * dropped so it is recreated.
   *
   * @return true once the master accepts the report. When TEST_SKIP_REPORTING_TRANSITION is set,
   *     true unless the direct meta-location update fails. False if the master answers with an
   *     error message, or the cluster connection is gone.
   */
  @Override
  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
    TransitionCode code = context.getCode();
    long openSeqNum = context.getOpenSeqNum();
    long masterSystemTime = context.getMasterSystemTime();
    RegionInfo[] hris = context.getHris();

    if (TEST_SKIP_REPORTING_TRANSITION) {
      // This is for testing only in case there is no master
      // to handle the region transition report at all.
      if (code == TransitionCode.OPENED) {
        Preconditions.checkArgument(hris != null && hris.length == 1);
        if (hris[0].isMetaRegion()) {
          try {
            MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
                hris[0].getReplicaId(),State.OPEN);
          } catch (KeeperException e) {
            LOG.info("Failed to update meta location", e);
            return false;
          }
        } else {
          try {
            MetaTableAccessor.updateRegionLocation(clusterConnection,
              hris[0], serverName, openSeqNum, masterSystemTime);
          } catch (IOException e) {
            LOG.info("Failed to update meta", e);
            return false;
          }
        }
      }
      return true;
    }

    ReportRegionStateTransitionRequest.Builder builder =
      ReportRegionStateTransitionRequest.newBuilder();
    builder.setServer(ProtobufUtil.toServerName(serverName));
    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
    transition.setTransitionCode(code);
    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
      transition.setOpenSeqNum(openSeqNum);
    }
    for (RegionInfo hri: hris) {
      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
    }
    ReportRegionStateTransitionRequest request = builder.build();
    int tries = 0;
    long pauseTime = INIT_PAUSE_TIME_MS;
    // Keep looping until the master accepts the report or answers with an error message. We want
    // to send reports even though the server is going down, so only give up once
    // clusterConnection is null or closed. It is set to null almost as the last thing as the
    // HRegionServer goes down.
    while (this.clusterConnection != null && !this.clusterConnection.isClosed()) {
      RegionServerStatusService.BlockingInterface rss = rssStub;
      try {
        if (rss == null) {
          // NOTE(review): if createRegionServerStatusStub() leaves rssStub null (e.g. while
          // stopping), this loops again with no pause — confirm it cannot spin.
          createRegionServerStatusStub();
          continue;
        }
        ReportRegionStateTransitionResponse response =
          rss.reportRegionStateTransition(null, request);
        if (response.hasErrorMessage()) {
          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
          break;
        }
        // Log if we had to retry else don't log unless TRACE. We want to
        // know if were successful after an attempt showed in logs as failed.
        if (tries > 0 || LOG.isTraceEnabled()) {
          LOG.info("TRANSITION REPORTED " + request);
        }
        // NOTE: Return mid-method!!!
        return true;
      } catch (ServiceException se) {
        IOException ioe = ProtobufUtil.getRemoteException(se);
        boolean pause =
            ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException
                || ioe instanceof CallQueueTooBigException;
        if (pause) {
          // Do backoff else we flood the Master with requests.
          pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
        } else {
          pauseTime = INIT_PAUSE_TIME_MS; // Reset.
        }
        LOG.info("Failed report transition " +
          TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" +
            (pause?
                " after " + pauseTime + "ms delay (Master is coming online...).":
                " immediately."),
            ioe);
        if (pause) Threads.sleep(pauseTime);
        tries++;
        // Drop the stub that failed (unless another thread already replaced it).
        if (rssStub == rss) {
          rssStub = null;
        }
      }
    }
    return false;
  }

  /**
   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
   * block this thread. See RegionReplicaFlushHandler for details.
2367 */ 2368 void triggerFlushInPrimaryRegion(final HRegion region) { 2369 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 2370 return; 2371 } 2372 if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) || 2373 !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled( 2374 region.conf)) { 2375 region.setReadsEnabled(true); 2376 return; 2377 } 2378 2379 region.setReadsEnabled(false); // disable reads before marking the region as opened. 2380 // RegionReplicaFlushHandler might reset this. 2381 2382 // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler 2383 if (this.executorService != null) { 2384 this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection, 2385 rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region)); 2386 } 2387 } 2388 2389 @Override 2390 public RpcServerInterface getRpcServer() { 2391 return rpcServices.rpcServer; 2392 } 2393 2394 @VisibleForTesting 2395 public RSRpcServices getRSRpcServices() { 2396 return rpcServices; 2397 } 2398 2399 /** 2400 * Cause the server to exit without closing the regions it is serving, the log 2401 * it is using and without notifying the master. Used unit testing and on 2402 * catastrophic events such as HDFS is yanked out from under hbase or we OOME. 
2403 * 2404 * @param reason 2405 * the reason we are aborting 2406 * @param cause 2407 * the exception that caused the abort, or null 2408 */ 2409 @Override 2410 public void abort(String reason, Throwable cause) { 2411 String msg = "***** ABORTING region server " + this + ": " + reason + " *****"; 2412 if (cause != null) { 2413 LOG.error(HBaseMarkers.FATAL, msg, cause); 2414 } else { 2415 LOG.error(HBaseMarkers.FATAL, msg); 2416 } 2417 setAbortRequested(); 2418 // HBASE-4014: show list of coprocessors that were loaded to help debug 2419 // regionserver crashes.Note that we're implicitly using 2420 // java.util.HashSet's toString() method to print the coprocessor names. 2421 LOG.error(HBaseMarkers.FATAL, "RegionServer abort: loaded coprocessors are: " + 2422 CoprocessorHost.getLoadedCoprocessors()); 2423 // Try and dump metrics if abort -- might give clue as to how fatal came about.... 2424 try { 2425 LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics()); 2426 } catch (MalformedObjectNameException | IOException e) { 2427 LOG.warn("Failed dumping metrics", e); 2428 } 2429 2430 // Do our best to report our abort to the master, but this may not work 2431 try { 2432 if (cause != null) { 2433 msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause); 2434 } 2435 // Report to the master but only if we have already registered with the master. 
2436 if (rssStub != null && this.serverName != null) { 2437 ReportRSFatalErrorRequest.Builder builder = 2438 ReportRSFatalErrorRequest.newBuilder(); 2439 builder.setServer(ProtobufUtil.toServerName(this.serverName)); 2440 builder.setErrorMessage(msg); 2441 rssStub.reportRSFatalError(null, builder.build()); 2442 } 2443 } catch (Throwable t) { 2444 LOG.warn("Unable to report fatal error to master", t); 2445 } 2446 // shutdown should be run as the internal user 2447 stop(reason, true, null); 2448 } 2449 2450 protected final void setAbortRequested() { 2451 this.abortRequested = true; 2452 } 2453 2454 /** 2455 * @see HRegionServer#abort(String, Throwable) 2456 */ 2457 public void abort(String reason) { 2458 abort(reason, null); 2459 } 2460 2461 @Override 2462 public boolean isAborted() { 2463 return this.abortRequested; 2464 } 2465 2466 /* 2467 * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup 2468 * logs but it does close socket in case want to bring up server on old 2469 * hostname+port immediately. 2470 */ 2471 @VisibleForTesting 2472 protected void kill() { 2473 this.killed = true; 2474 abort("Simulated kill"); 2475 } 2476 2477 /** 2478 * Called on stop/abort before closing the cluster connection and meta locator. 2479 */ 2480 protected void sendShutdownInterrupt() { 2481 } 2482 2483 /** 2484 * Wait on all threads to finish. Presumption is that all closes and stops 2485 * have already been called. 
2486 */ 2487 protected void stopServiceThreads() { 2488 // clean up the scheduled chores 2489 if (this.choreService != null) { 2490 choreService.cancelChore(nonceManagerChore); 2491 choreService.cancelChore(compactionChecker); 2492 choreService.cancelChore(periodicFlusher); 2493 choreService.cancelChore(healthCheckChore); 2494 choreService.cancelChore(storefileRefresher); 2495 choreService.cancelChore(movedRegionsCleaner); 2496 choreService.cancelChore(fsUtilizationChore); 2497 // clean up the remaining scheduled chores (in case we missed out any) 2498 choreService.shutdown(); 2499 } 2500 2501 if (this.cacheFlusher != null) { 2502 this.cacheFlusher.join(); 2503 } 2504 2505 if (this.spanReceiverHost != null) { 2506 this.spanReceiverHost.closeReceivers(); 2507 } 2508 if (this.walRoller != null) { 2509 this.walRoller.close(); 2510 } 2511 if (this.compactSplitThread != null) { 2512 this.compactSplitThread.join(); 2513 } 2514 if (this.executorService != null) this.executorService.shutdown(); 2515 if (this.replicationSourceHandler != null && 2516 this.replicationSourceHandler == this.replicationSinkHandler) { 2517 this.replicationSourceHandler.stopReplicationService(); 2518 } else { 2519 if (this.replicationSourceHandler != null) { 2520 this.replicationSourceHandler.stopReplicationService(); 2521 } 2522 if (this.replicationSinkHandler != null) { 2523 this.replicationSinkHandler.stopReplicationService(); 2524 } 2525 } 2526 } 2527 2528 /** 2529 * @return Return the object that implements the replication 2530 * source executorService. 2531 */ 2532 @VisibleForTesting 2533 public ReplicationSourceService getReplicationSourceService() { 2534 return replicationSourceHandler; 2535 } 2536 2537 /** 2538 * @return Return the object that implements the replication 2539 * sink executorService. 
2540 */ 2541 ReplicationSinkService getReplicationSinkService() { 2542 return replicationSinkHandler; 2543 } 2544 2545 /** 2546 * Get the current master from ZooKeeper and open the RPC connection to it. 2547 * To get a fresh connection, the current rssStub must be null. 2548 * Method will block until a master is available. You can break from this 2549 * block by requesting the server stop. 2550 * 2551 * @return master + port, or null if server has been stopped 2552 */ 2553 @VisibleForTesting 2554 protected synchronized ServerName createRegionServerStatusStub() { 2555 // Create RS stub without refreshing the master node from ZK, use cached data 2556 return createRegionServerStatusStub(false); 2557 } 2558 2559 /** 2560 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2561 * connection, the current rssStub must be null. Method will block until a master is available. 2562 * You can break from this block by requesting the server stop. 2563 * @param refresh If true then master address will be read from ZK, otherwise use cached data 2564 * @return master + port, or null if server has been stopped 2565 */ 2566 @VisibleForTesting 2567 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { 2568 if (rssStub != null) { 2569 return masterAddressTracker.getMasterAddress(); 2570 } 2571 ServerName sn = null; 2572 long previousLogTime = 0; 2573 RegionServerStatusService.BlockingInterface intRssStub = null; 2574 LockService.BlockingInterface intLockStub = null; 2575 boolean interrupted = false; 2576 try { 2577 while (keepLooping()) { 2578 sn = this.masterAddressTracker.getMasterAddress(refresh); 2579 if (sn == null) { 2580 if (!keepLooping()) { 2581 // give up with no connection. 
2582 LOG.debug("No master found and cluster is stopped; bailing out"); 2583 return null; 2584 } 2585 if (System.currentTimeMillis() > (previousLogTime + 1000)) { 2586 LOG.debug("No master found; retry"); 2587 previousLogTime = System.currentTimeMillis(); 2588 } 2589 refresh = true; // let's try pull it from ZK directly 2590 if (sleep(200)) { 2591 interrupted = true; 2592 } 2593 continue; 2594 } 2595 2596 // If we are on the active master, use the shortcut 2597 if (this instanceof HMaster && sn.equals(getServerName())) { 2598 intRssStub = ((HMaster)this).getMasterRpcServices(); 2599 intLockStub = ((HMaster)this).getMasterRpcServices(); 2600 break; 2601 } 2602 try { 2603 BlockingRpcChannel channel = 2604 this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), 2605 shortOperationTimeout); 2606 intRssStub = RegionServerStatusService.newBlockingStub(channel); 2607 intLockStub = LockService.newBlockingStub(channel); 2608 break; 2609 } catch (IOException e) { 2610 if (System.currentTimeMillis() > (previousLogTime + 1000)) { 2611 e = e instanceof RemoteException ? 2612 ((RemoteException)e).unwrapRemoteException() : e; 2613 if (e instanceof ServerNotRunningYetException) { 2614 LOG.info("Master isn't available yet, retrying"); 2615 } else { 2616 LOG.warn("Unable to connect to master. Retrying. Error was:", e); 2617 } 2618 previousLogTime = System.currentTimeMillis(); 2619 } 2620 if (sleep(200)) { 2621 interrupted = true; 2622 } 2623 } 2624 } 2625 } finally { 2626 if (interrupted) { 2627 Thread.currentThread().interrupt(); 2628 } 2629 } 2630 this.rssStub = intRssStub; 2631 this.lockStub = intLockStub; 2632 return sn; 2633 } 2634 2635 /** 2636 * @return True if we should break loop because cluster is going down or 2637 * this server has been stopped or hdfs has gone bad. 
2638 */ 2639 private boolean keepLooping() { 2640 return !this.stopped && isClusterUp(); 2641 } 2642 2643 /* 2644 * Let the master know we're here Run initialization using parameters passed 2645 * us by the master. 2646 * @return A Map of key/value configurations we got from the Master else 2647 * null if we failed to register. 2648 * @throws IOException 2649 */ 2650 private RegionServerStartupResponse reportForDuty() throws IOException { 2651 if (this.masterless) return RegionServerStartupResponse.getDefaultInstance(); 2652 ServerName masterServerName = createRegionServerStatusStub(true); 2653 if (masterServerName == null) return null; 2654 RegionServerStartupResponse result = null; 2655 try { 2656 rpcServices.requestCount.reset(); 2657 rpcServices.rpcGetRequestCount.reset(); 2658 rpcServices.rpcScanRequestCount.reset(); 2659 rpcServices.rpcMultiRequestCount.reset(); 2660 rpcServices.rpcMutateRequestCount.reset(); 2661 LOG.info("reportForDuty to master=" + masterServerName + " with port=" 2662 + rpcServices.isa.getPort() + ", startcode=" + this.startcode); 2663 long now = EnvironmentEdgeManager.currentTime(); 2664 int port = rpcServices.isa.getPort(); 2665 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder(); 2666 if (!StringUtils.isBlank(useThisHostnameInstead)) { 2667 request.setUseThisHostnameInstead(useThisHostnameInstead); 2668 } 2669 request.setPort(port); 2670 request.setServerStartCode(this.startcode); 2671 request.setServerCurrentTime(now); 2672 result = this.rssStub.regionServerStartup(null, request.build()); 2673 } catch (ServiceException se) { 2674 IOException ioe = ProtobufUtil.getRemoteException(se); 2675 if (ioe instanceof ClockOutOfSyncException) { 2676 LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", 2677 ioe); 2678 // Re-throw IOE will cause RS to abort 2679 throw ioe; 2680 } else if (ioe instanceof ServerNotRunningYetException) { 2681 LOG.debug("Master is not running yet"); 
2682 } else { 2683 LOG.warn("error telling master we are up", se); 2684 } 2685 rssStub = null; 2686 } 2687 return result; 2688 } 2689 2690 @Override 2691 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { 2692 try { 2693 GetLastFlushedSequenceIdRequest req = 2694 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName); 2695 RegionServerStatusService.BlockingInterface rss = rssStub; 2696 if (rss == null) { // Try to connect one more time 2697 createRegionServerStatusStub(); 2698 rss = rssStub; 2699 if (rss == null) { 2700 // Still no luck, we tried 2701 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id"); 2702 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2703 .build(); 2704 } 2705 } 2706 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req); 2707 return RegionStoreSequenceIds.newBuilder() 2708 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId()) 2709 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build(); 2710 } catch (ServiceException e) { 2711 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e); 2712 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2713 .build(); 2714 } 2715 } 2716 2717 /** 2718 * Closes all regions. Called on our way out. 2719 * Assumes that its not possible for new regions to be added to onlineRegions 2720 * while this method runs. 2721 */ 2722 protected void closeAllRegions(final boolean abort) { 2723 closeUserRegions(abort); 2724 closeMetaTableRegions(abort); 2725 } 2726 2727 /** 2728 * Close meta region if we carry it 2729 * @param abort Whether we're running an abort. 
2730 */ 2731 void closeMetaTableRegions(final boolean abort) { 2732 HRegion meta = null; 2733 this.lock.writeLock().lock(); 2734 try { 2735 for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) { 2736 RegionInfo hri = e.getValue().getRegionInfo(); 2737 if (hri.isMetaRegion()) { 2738 meta = e.getValue(); 2739 } 2740 if (meta != null) break; 2741 } 2742 } finally { 2743 this.lock.writeLock().unlock(); 2744 } 2745 if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort); 2746 } 2747 2748 /** 2749 * Schedule closes on all user regions. 2750 * Should be safe calling multiple times because it wont' close regions 2751 * that are already closed or that are closing. 2752 * @param abort Whether we're running an abort. 2753 */ 2754 void closeUserRegions(final boolean abort) { 2755 this.lock.writeLock().lock(); 2756 try { 2757 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) { 2758 HRegion r = e.getValue(); 2759 if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) { 2760 // Don't update zk with this close transition; pass false. 2761 closeRegionIgnoreErrors(r.getRegionInfo(), abort); 2762 } 2763 } 2764 } finally { 2765 this.lock.writeLock().unlock(); 2766 } 2767 } 2768 2769 /** @return the info server */ 2770 public InfoServer getInfoServer() { 2771 return infoServer; 2772 } 2773 2774 /** 2775 * @return true if a stop has been requested. 
2776 */ 2777 @Override 2778 public boolean isStopped() { 2779 return this.stopped; 2780 } 2781 2782 @Override 2783 public boolean isStopping() { 2784 return this.stopping; 2785 } 2786 2787 /** 2788 * 2789 * @return the configuration 2790 */ 2791 @Override 2792 public Configuration getConfiguration() { 2793 return conf; 2794 } 2795 2796 /** @return the write lock for the server */ 2797 ReentrantReadWriteLock.WriteLock getWriteLock() { 2798 return lock.writeLock(); 2799 } 2800 2801 public int getNumberOfOnlineRegions() { 2802 return this.onlineRegions.size(); 2803 } 2804 2805 boolean isOnlineRegionsEmpty() { 2806 return this.onlineRegions.isEmpty(); 2807 } 2808 2809 /** 2810 * For tests, web ui and metrics. 2811 * This method will only work if HRegionServer is in the same JVM as client; 2812 * HRegion cannot be serialized to cross an rpc. 2813 */ 2814 public Collection<HRegion> getOnlineRegionsLocalContext() { 2815 Collection<HRegion> regions = this.onlineRegions.values(); 2816 return Collections.unmodifiableCollection(regions); 2817 } 2818 2819 @Override 2820 public void addRegion(HRegion region) { 2821 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); 2822 configurationManager.registerObserver(region); 2823 } 2824 2825 /** 2826 * @return A new Map of online regions sorted by region off-heap size with the first entry being 2827 * the biggest. If two regions are the same size, then the last one found wins; i.e. this 2828 * method may NOT return all regions. 2829 */ 2830 SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOffHeapSize() { 2831 // we'll sort the regions in reverse 2832 SortedMap<Long, HRegion> sortedRegions = new TreeMap<>( 2833 new Comparator<Long>() { 2834 @Override 2835 public int compare(Long a, Long b) { 2836 return -1 * a.compareTo(b); 2837 } 2838 }); 2839 // Copy over all regions. Regions are sorted by size with biggest first. 
2840 for (HRegion region : this.onlineRegions.values()) { 2841 sortedRegions.put(region.getMemStoreOffHeapSize(), region); 2842 } 2843 return sortedRegions; 2844 } 2845 2846 /** 2847 * @return A new Map of online regions sorted by region heap size with the first entry being the 2848 * biggest. If two regions are the same size, then the last one found wins; i.e. this method 2849 * may NOT return all regions. 2850 */ 2851 SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOnHeapSize() { 2852 // we'll sort the regions in reverse 2853 SortedMap<Long, HRegion> sortedRegions = new TreeMap<>( 2854 new Comparator<Long>() { 2855 @Override 2856 public int compare(Long a, Long b) { 2857 return -1 * a.compareTo(b); 2858 } 2859 }); 2860 // Copy over all regions. Regions are sorted by size with biggest first. 2861 for (HRegion region : this.onlineRegions.values()) { 2862 sortedRegions.put(region.getMemStoreHeapSize(), region); 2863 } 2864 return sortedRegions; 2865 } 2866 2867 /** 2868 * @return time stamp in millis of when this region server was started 2869 */ 2870 public long getStartcode() { 2871 return this.startcode; 2872 } 2873 2874 /** @return reference to FlushRequester */ 2875 @Override 2876 public FlushRequester getFlushRequester() { 2877 return this.cacheFlusher; 2878 } 2879 2880 @Override 2881 public CompactionRequester getCompactionRequestor() { 2882 return this.compactSplitThread; 2883 } 2884 2885 @Override 2886 public Leases getLeases() { 2887 return leases; 2888 } 2889 2890 /** 2891 * @return Return the rootDir. 2892 */ 2893 protected Path getRootDir() { 2894 return rootDir; 2895 } 2896 2897 /** 2898 * @return Return the fs. 2899 */ 2900 @Override 2901 public FileSystem getFileSystem() { 2902 return fs; 2903 } 2904 2905 /** 2906 * @return Return the walRootDir. 2907 */ 2908 public Path getWALRootDir() { 2909 return walRootDir; 2910 } 2911 2912 /** 2913 * @return Return the walFs. 
2914 */ 2915 public FileSystem getWALFileSystem() { 2916 return walFs; 2917 } 2918 2919 @Override 2920 public String toString() { 2921 return getServerName().toString(); 2922 } 2923 2924 /** 2925 * Interval at which threads should run 2926 * 2927 * @return the interval 2928 */ 2929 public int getThreadWakeFrequency() { 2930 return threadWakeFrequency; 2931 } 2932 2933 @Override 2934 public ZKWatcher getZooKeeper() { 2935 return zooKeeper; 2936 } 2937 2938 @Override 2939 public CoordinatedStateManager getCoordinatedStateManager() { 2940 return csm; 2941 } 2942 2943 @Override 2944 public ServerName getServerName() { 2945 return serverName; 2946 } 2947 2948 public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){ 2949 return this.rsHost; 2950 } 2951 2952 @Override 2953 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() { 2954 return this.regionsInTransitionInRS; 2955 } 2956 2957 @Override 2958 public ExecutorService getExecutorService() { 2959 return executorService; 2960 } 2961 2962 @Override 2963 public ChoreService getChoreService() { 2964 return choreService; 2965 } 2966 2967 @Override 2968 public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { 2969 return rsQuotaManager; 2970 } 2971 2972 // 2973 // Main program and support routines 2974 // 2975 /** 2976 * Load the replication executorService objects, if any 2977 */ 2978 private static void createNewReplicationInstance(Configuration conf, HRegionServer server, 2979 FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException { 2980 if ((server instanceof HMaster) && 2981 (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) { 2982 return; 2983 } 2984 2985 // read in the name of the source replication class from the config file. 
2986 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, 2987 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 2988 2989 // read in the name of the sink replication class from the config file. 2990 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, 2991 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 2992 2993 // If both the sink and the source class names are the same, then instantiate 2994 // only one object. 2995 if (sourceClassname.equals(sinkClassname)) { 2996 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 2997 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); 2998 server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; 2999 } else { 3000 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 3001 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); 3002 server.replicationSinkHandler = newReplicationInstance(sinkClassname, 3003 ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider); 3004 } 3005 } 3006 3007 private static <T extends ReplicationService> T newReplicationInstance(String classname, 3008 Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, 3009 Path oldLogDir, WALProvider walProvider) throws IOException { 3010 Class<? 
extends T> clazz = null; 3011 try { 3012 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 3013 clazz = Class.forName(classname, true, classLoader).asSubclass(xface); 3014 } catch (java.lang.ClassNotFoundException nfe) { 3015 throw new IOException("Could not find class for " + classname); 3016 } 3017 T service = ReflectionUtils.newInstance(clazz, conf); 3018 service.initialize(server, walFs, logDir, oldLogDir, walProvider); 3019 return service; 3020 } 3021 3022 public Map<String, ReplicationStatus> getWalGroupsReplicationStatus(){ 3023 Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>(); 3024 if(!this.isOnline()){ 3025 return walGroupsReplicationStatus; 3026 } 3027 List<ReplicationSourceInterface> allSources = new ArrayList<>(); 3028 allSources.addAll(replicationSourceHandler.getReplicationManager().getSources()); 3029 allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources()); 3030 for(ReplicationSourceInterface source: allSources){ 3031 walGroupsReplicationStatus.putAll(source.getWalGroupStatus()); 3032 } 3033 return walGroupsReplicationStatus; 3034 } 3035 3036 /** 3037 * Utility for constructing an instance of the passed HRegionServer class. 3038 * 3039 * @param regionServerClass 3040 * @param conf2 3041 * @return HRegionServer instance. 3042 */ 3043 public static HRegionServer constructRegionServer( 3044 Class<? extends HRegionServer> regionServerClass, 3045 final Configuration conf2) { 3046 try { 3047 Constructor<? 
extends HRegionServer> c = regionServerClass 3048 .getConstructor(Configuration.class); 3049 return c.newInstance(conf2); 3050 } catch (Exception e) { 3051 throw new RuntimeException("Failed construction of " + "Regionserver: " 3052 + regionServerClass.toString(), e); 3053 } 3054 } 3055 3056 /** 3057 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine 3058 */ 3059 public static void main(String[] args) throws Exception { 3060 LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); 3061 VersionInfo.logVersion(); 3062 Configuration conf = HBaseConfiguration.create(); 3063 @SuppressWarnings("unchecked") 3064 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf 3065 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); 3066 3067 new HRegionServerCommandLine(regionServerClass).doMain(args); 3068 } 3069 3070 /** 3071 * Gets the online regions of the specified table. 3072 * This method looks at the in-memory onlineRegions. It does not go to <code>hbase:meta</code>. 3073 * Only returns <em>online</em> regions. If a region on this table has been 3074 * closed during a disable, etc., it will not be included in the returned list. 3075 * So, the returned list may not necessarily be ALL regions in this table, its 3076 * all the ONLINE regions in the table. 
3077 * @param tableName 3078 * @return Online regions from <code>tableName</code> 3079 */ 3080 @Override 3081 public List<HRegion> getRegions(TableName tableName) { 3082 List<HRegion> tableRegions = new ArrayList<>(); 3083 synchronized (this.onlineRegions) { 3084 for (HRegion region: this.onlineRegions.values()) { 3085 RegionInfo regionInfo = region.getRegionInfo(); 3086 if(regionInfo.getTable().equals(tableName)) { 3087 tableRegions.add(region); 3088 } 3089 } 3090 } 3091 return tableRegions; 3092 } 3093 3094 @Override 3095 public List<HRegion> getRegions() { 3096 List<HRegion> allRegions = new ArrayList<>(); 3097 synchronized (this.onlineRegions) { 3098 // Return a clone copy of the onlineRegions 3099 allRegions.addAll(onlineRegions.values()); 3100 } 3101 return allRegions; 3102 } 3103 3104 /** 3105 * Gets the online tables in this RS. 3106 * This method looks at the in-memory onlineRegions. 3107 * @return all the online tables in this RS 3108 */ 3109 public Set<TableName> getOnlineTables() { 3110 Set<TableName> tables = new HashSet<>(); 3111 synchronized (this.onlineRegions) { 3112 for (Region region: this.onlineRegions.values()) { 3113 tables.add(region.getTableDescriptor().getTableName()); 3114 } 3115 } 3116 return tables; 3117 } 3118 3119 // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070). 
3120 public String[] getRegionServerCoprocessors() { 3121 TreeSet<String> coprocessors = new TreeSet<>(); 3122 try { 3123 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors()); 3124 } catch (IOException exception) { 3125 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " + 3126 "skipping."); 3127 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3128 } 3129 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 3130 for (HRegion region: regions) { 3131 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors()); 3132 try { 3133 coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()); 3134 } catch (IOException exception) { 3135 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region + 3136 "; skipping."); 3137 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3138 } 3139 } 3140 coprocessors.addAll(rsHost.getCoprocessors()); 3141 return coprocessors.toArray(new String[coprocessors.size()]); 3142 } 3143 3144 /** 3145 * Try to close the region, logs a warning on failure but continues. 3146 * @param region Region to close 3147 */ 3148 private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) { 3149 try { 3150 if (!closeRegion(region.getEncodedName(), abort, null)) { 3151 LOG.warn("Failed to close " + region.getRegionNameAsString() + 3152 " - ignoring and continuing"); 3153 } 3154 } catch (IOException e) { 3155 LOG.warn("Failed to close " + region.getRegionNameAsString() + 3156 " - ignoring and continuing", e); 3157 } 3158 } 3159 3160 /** 3161 * Close asynchronously a region, can be called from the master or internally by the regionserver 3162 * when stopping. If called from the master, the region will update the znode status. 
   *
   * <p>
   * If an opening was in progress, this method will cancel it, but will not start a new close. The
   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
   * </p>
   *
   * <p>
   * If a close was already in progress, this new request is ignored and true is returned.
   * </p>
   *
   * @param encodedName Region to close
   * @param abort True if we are aborting
   * @param sn server name handed to the CloseRegionHandler for non-meta regions; may be null
   *     (closeRegionIgnoreErrors passes null)
   * @return true if a close was scheduled or was already in progress; false if a coprocessor's
   *     preClose hook threw an IOException
   * @throws NotServingRegionException if the region is not online, or if an in-progress opening
   *     was cancelled before the region came online
   */
  protected boolean closeRegion(String encodedName, final boolean abort, final ServerName sn)
      throws NotServingRegionException {
    // Check for permissions to close: a coprocessor's preClose hook may veto by throwing.
    // NOTE(review): the recursive retry further down runs preClose a second time.
    HRegion actualRegion = this.getRegion(encodedName);
    // Can be null if we're calling close on a region that's not online
    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
      try {
        actualRegion.getCoprocessorHost().preClose(false);
      } catch (IOException exp) {
        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
        return false;
      }
    }

    // Claim the region for closing. A previous value of TRUE means an open is in progress,
    // FALSE means a close is already in progress (see the log messages below).
    final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName),
        Boolean.FALSE);

    if (Boolean.TRUE.equals(previous)) {
      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
          "trying to OPEN. Cancelling OPENING.");
      // Atomically flip the in-progress open (TRUE) into a close (FALSE).
      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
        // The replace failed. That should be an exceptional case, but theoretically it can happen.
        // We're going to try to do a standard close then.
        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
            " Doing a standard close now");
        return closeRegion(encodedName, abort, sn);
      }
      // Let's get the region from the online region list again
      actualRegion = this.getRegion(encodedName);
      if (actualRegion == null) { // Not online yet: the open was cancelled, nothing to close.
        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
        // The master deletes the znode when it receives this exception.
        throw new NotServingRegionException("The region " + encodedName +
            " was opening but not yet served. Opening is cancelled.");
      }
      // Otherwise the region is already online and we fall through to close it below.
    } else if (Boolean.FALSE.equals(previous)) {
      LOG.info("Received CLOSE for the region: " + encodedName +
          ", which we are already trying to CLOSE, but not completed yet");
      return true;
    }

    if (actualRegion == null) {
      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
      // Undo the FALSE marker added by putIfAbsent above.
      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
      // The master deletes the znode when it receives this exception.
      throw new NotServingRegionException("The region " + encodedName +
          " is not online, and is not opening.");
    }

    // Meta gets its own handler; every other region gets a CloseRegionHandler carrying sn.
    CloseRegionHandler crh;
    final RegionInfo hri = actualRegion.getRegionInfo();
    if (hri.isMetaRegion()) {
      crh = new CloseMetaHandler(this, this, hri, abort);
    } else {
      crh = new CloseRegionHandler(this, this, hri, abort, sn);
    }
    // The actual close runs asynchronously on the executor service.
    this.executorService.submit(crh);
    return true;
  }

  /**
   * Close and offline the region for split or merge
   *
   * @param regionEncodedName the name of the region(s) to close
   * @return true if closed the region successfully.
3243 * @throws IOException 3244 */ 3245 protected boolean closeAndOfflineRegionForSplitOrMerge(final List<String> regionEncodedName) 3246 throws IOException { 3247 for (int i = 0; i < regionEncodedName.size(); ++i) { 3248 HRegion regionToClose = this.getRegion(regionEncodedName.get(i)); 3249 if (regionToClose != null) { 3250 Map<byte[], List<HStoreFile>> hstoreFiles = null; 3251 Exception exceptionToThrow = null; 3252 try { 3253 hstoreFiles = regionToClose.close(false); 3254 } catch (Exception e) { 3255 exceptionToThrow = e; 3256 } 3257 if (exceptionToThrow == null && hstoreFiles == null) { 3258 // The region was closed by someone else 3259 exceptionToThrow = 3260 new IOException("Failed to close region: already closed by another thread"); 3261 } 3262 if (exceptionToThrow != null) { 3263 if (exceptionToThrow instanceof IOException) { 3264 throw (IOException) exceptionToThrow; 3265 } 3266 throw new IOException(exceptionToThrow); 3267 } 3268 // Offline the region 3269 this.removeRegion(regionToClose, null); 3270 } 3271 } 3272 return true; 3273 } 3274 3275 /** 3276 * @return HRegion for the passed binary <code>regionName</code> or null if 3277 * named region is not member of the online regions. 
3278 */ 3279 public HRegion getOnlineRegion(final byte[] regionName) { 3280 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3281 return this.onlineRegions.get(encodedRegionName); 3282 } 3283 3284 public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) { 3285 return this.regionFavoredNodesMap.get(encodedRegionName); 3286 } 3287 3288 @Override 3289 public HRegion getRegion(final String encodedRegionName) { 3290 return this.onlineRegions.get(encodedRegionName); 3291 } 3292 3293 3294 @Override 3295 public boolean removeRegion(final HRegion r, ServerName destination) { 3296 HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); 3297 metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName()); 3298 if (destination != null) { 3299 long closeSeqNum = r.getMaxFlushedSeqId(); 3300 if (closeSeqNum == HConstants.NO_SEQNUM) { 3301 // No edits in WAL for this region; get the sequence number when the region was opened. 3302 closeSeqNum = r.getOpenSeqNum(); 3303 if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0; 3304 } 3305 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum); 3306 } 3307 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName()); 3308 return toReturn != null; 3309 } 3310 3311 /** 3312 * Protected Utility method for safely obtaining an HRegion handle. 
3313 * 3314 * @param regionName 3315 * Name of online {@link HRegion} to return 3316 * @return {@link HRegion} for <code>regionName</code> 3317 * @throws NotServingRegionException 3318 */ 3319 protected HRegion getRegion(final byte[] regionName) 3320 throws NotServingRegionException { 3321 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3322 return getRegionByEncodedName(regionName, encodedRegionName); 3323 } 3324 3325 public HRegion getRegionByEncodedName(String encodedRegionName) 3326 throws NotServingRegionException { 3327 return getRegionByEncodedName(null, encodedRegionName); 3328 } 3329 3330 protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) 3331 throws NotServingRegionException { 3332 HRegion region = this.onlineRegions.get(encodedRegionName); 3333 if (region == null) { 3334 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName); 3335 if (moveInfo != null) { 3336 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum()); 3337 } 3338 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName)); 3339 String regionNameStr = regionName == null? 3340 encodedRegionName: Bytes.toStringBinary(regionName); 3341 if (isOpening != null && isOpening.booleanValue()) { 3342 throw new RegionOpeningException("Region " + regionNameStr + 3343 " is opening on " + this.serverName); 3344 } 3345 throw new NotServingRegionException("" + regionNameStr + 3346 " is not online on " + this.serverName); 3347 } 3348 return region; 3349 } 3350 3351 /* 3352 * Cleanup after Throwable caught invoking method. Converts <code>t</code> to 3353 * IOE if it isn't already. 3354 * 3355 * @param t Throwable 3356 * 3357 * @param msg Message to log in error. Can be null. 3358 * 3359 * @return Throwable converted to an IOE; methods can only let out IOEs. 3360 */ 3361 private Throwable cleanup(final Throwable t, final String msg) { 3362 // Don't log as error if NSRE; NSRE is 'normal' operation. 
3363 if (t instanceof NotServingRegionException) { 3364 LOG.debug("NotServingRegionException; " + t.getMessage()); 3365 return t; 3366 } 3367 Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t; 3368 if (msg == null) { 3369 LOG.error("", e); 3370 } else { 3371 LOG.error(msg, e); 3372 } 3373 if (!rpcServices.checkOOME(t)) { 3374 checkFileSystem(); 3375 } 3376 return t; 3377 } 3378 3379 /* 3380 * @param t 3381 * 3382 * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE 3383 * 3384 * @return Make <code>t</code> an IOE if it isn't already. 3385 */ 3386 protected IOException convertThrowableToIOE(final Throwable t, final String msg) { 3387 return (t instanceof IOException ? (IOException) t : msg == null 3388 || msg.length() == 0 ? new IOException(t) : new IOException(msg, t)); 3389 } 3390 3391 /** 3392 * Checks to see if the file system is still accessible. If not, sets 3393 * abortRequested and stopRequested 3394 * 3395 * @return false if file system is not available 3396 */ 3397 public boolean checkFileSystem() { 3398 if (this.fsOk && this.fs != null) { 3399 try { 3400 FSUtils.checkFileSystemAvailable(this.fs); 3401 } catch (IOException e) { 3402 abort("File System not available", e); 3403 this.fsOk = false; 3404 } 3405 } 3406 return this.fsOk; 3407 } 3408 3409 @Override 3410 public void updateRegionFavoredNodesMapping(String encodedRegionName, 3411 List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) { 3412 InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()]; 3413 // Refer to the comment on the declaration of regionFavoredNodesMap on why 3414 // it is a map of region name to InetSocketAddress[] 3415 for (int i = 0; i < favoredNodes.size(); i++) { 3416 addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(), 3417 favoredNodes.get(i).getPort()); 3418 } 3419 regionFavoredNodesMap.put(encodedRegionName, addr); 3420 } 3421 
3422 /** 3423 * Return the favored nodes for a region given its encoded name. Look at the 3424 * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[] 3425 * @param encodedRegionName 3426 * @return array of favored locations 3427 */ 3428 @Override 3429 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) { 3430 return regionFavoredNodesMap.get(encodedRegionName); 3431 } 3432 3433 @Override 3434 public ServerNonceManager getNonceManager() { 3435 return this.nonceManager; 3436 } 3437 3438 private static class MovedRegionInfo { 3439 private final ServerName serverName; 3440 private final long seqNum; 3441 private final long ts; 3442 3443 public MovedRegionInfo(ServerName serverName, long closeSeqNum) { 3444 this.serverName = serverName; 3445 this.seqNum = closeSeqNum; 3446 ts = EnvironmentEdgeManager.currentTime(); 3447 } 3448 3449 public ServerName getServerName() { 3450 return serverName; 3451 } 3452 3453 public long getSeqNum() { 3454 return seqNum; 3455 } 3456 3457 public long getMoveTime() { 3458 return ts; 3459 } 3460 } 3461 3462 // This map will contains all the regions that we closed for a move. 3463 // We add the time it was moved as we don't want to keep too old information 3464 protected Map<String, MovedRegionInfo> movedRegions = new ConcurrentHashMap<>(3000); 3465 3466 // We need a timeout. If not there is a risk of giving a wrong information: this would double 3467 // the number of network calls instead of reducing them. 
3468 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000); 3469 3470 protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) { 3471 if (ServerName.isSameAddress(destination, this.getServerName())) { 3472 LOG.warn("Not adding moved region record: " + encodedName + " to self."); 3473 return; 3474 } 3475 LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" + 3476 closeSeqNum); 3477 movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum)); 3478 } 3479 3480 void removeFromMovedRegions(String encodedName) { 3481 movedRegions.remove(encodedName); 3482 } 3483 3484 private MovedRegionInfo getMovedRegion(final String encodedRegionName) { 3485 MovedRegionInfo dest = movedRegions.get(encodedRegionName); 3486 3487 long now = EnvironmentEdgeManager.currentTime(); 3488 if (dest != null) { 3489 if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) { 3490 return dest; 3491 } else { 3492 movedRegions.remove(encodedRegionName); 3493 } 3494 } 3495 3496 return null; 3497 } 3498 3499 /** 3500 * Remove the expired entries from the moved regions list. 3501 */ 3502 protected void cleanMovedRegions() { 3503 final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED; 3504 Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator(); 3505 3506 while (it.hasNext()){ 3507 Map.Entry<String, MovedRegionInfo> e = it.next(); 3508 if (e.getValue().getMoveTime() < cutOff) { 3509 it.remove(); 3510 } 3511 } 3512 } 3513 3514 /* 3515 * Use this to allow tests to override and schedule more frequently. 3516 */ 3517 3518 protected int movedRegionCleanerPeriod() { 3519 return TIMEOUT_REGION_MOVED; 3520 } 3521 3522 /** 3523 * Creates a Chore thread to clean the moved region cache. 
3524 */ 3525 protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable { 3526 private HRegionServer regionServer; 3527 Stoppable stoppable; 3528 3529 private MovedRegionsCleaner( 3530 HRegionServer regionServer, Stoppable stoppable){ 3531 super("MovedRegionsCleaner for region " + regionServer, stoppable, 3532 regionServer.movedRegionCleanerPeriod()); 3533 this.regionServer = regionServer; 3534 this.stoppable = stoppable; 3535 } 3536 3537 static MovedRegionsCleaner create(HRegionServer rs){ 3538 Stoppable stoppable = new Stoppable() { 3539 private volatile boolean isStopped = false; 3540 @Override public void stop(String why) { isStopped = true;} 3541 @Override public boolean isStopped() {return isStopped;} 3542 }; 3543 3544 return new MovedRegionsCleaner(rs, stoppable); 3545 } 3546 3547 @Override 3548 protected void chore() { 3549 regionServer.cleanMovedRegions(); 3550 } 3551 3552 @Override 3553 public void stop(String why) { 3554 stoppable.stop(why); 3555 } 3556 3557 @Override 3558 public boolean isStopped() { 3559 return stoppable.isStopped(); 3560 } 3561 } 3562 3563 private String getMyEphemeralNodePath() { 3564 return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode, getServerName().toString()); 3565 } 3566 3567 private boolean isHealthCheckerConfigured() { 3568 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC); 3569 return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation); 3570 } 3571 3572 /** 3573 * @return the underlying {@link CompactSplit} for the servers 3574 */ 3575 public CompactSplit getCompactSplitThread() { 3576 return this.compactSplitThread; 3577 } 3578 3579 public CoprocessorServiceResponse execRegionServerService( 3580 @SuppressWarnings("UnusedParameters") final RpcController controller, 3581 final CoprocessorServiceRequest serviceRequest) throws ServiceException { 3582 try { 3583 ServerRpcController serviceController = new ServerRpcController(); 3584 
CoprocessorServiceCall call = serviceRequest.getCall(); 3585 String serviceName = call.getServiceName(); 3586 com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName); 3587 if (service == null) { 3588 throw new UnknownProtocolException(null, "No registered coprocessor executorService found for " + 3589 serviceName); 3590 } 3591 com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = 3592 service.getDescriptorForType(); 3593 3594 String methodName = call.getMethodName(); 3595 com.google.protobuf.Descriptors.MethodDescriptor methodDesc = 3596 serviceDesc.findMethodByName(methodName); 3597 if (methodDesc == null) { 3598 throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName + 3599 " called on executorService " + serviceName); 3600 } 3601 3602 com.google.protobuf.Message request = 3603 CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest()); 3604 final com.google.protobuf.Message.Builder responseBuilder = 3605 service.getResponsePrototype(methodDesc).newBuilderForType(); 3606 service.callMethod(methodDesc, serviceController, request, 3607 new com.google.protobuf.RpcCallback<com.google.protobuf.Message>() { 3608 @Override 3609 public void run(com.google.protobuf.Message message) { 3610 if (message != null) { 3611 responseBuilder.mergeFrom(message); 3612 } 3613 } 3614 }); 3615 IOException exception = CoprocessorRpcUtils.getControllerException(serviceController); 3616 if (exception != null) { 3617 throw exception; 3618 } 3619 return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY); 3620 } catch (IOException ie) { 3621 throw new ServiceException(ie); 3622 } 3623 } 3624 3625 /** 3626 * @return The cache config instance used by the regionserver. 3627 */ 3628 public CacheConfig getCacheConfig() { 3629 return this.cacheConfig; 3630 } 3631 3632 /** 3633 * @return : Returns the ConfigurationManager object for testing purposes. 
3634 */ 3635 protected ConfigurationManager getConfigurationManager() { 3636 return configurationManager; 3637 } 3638 3639 /** 3640 * @return Return table descriptors implementation. 3641 */ 3642 public TableDescriptors getTableDescriptors() { 3643 return this.tableDescriptors; 3644 } 3645 3646 /** 3647 * Reload the configuration from disk. 3648 */ 3649 public void updateConfiguration() { 3650 LOG.info("Reloading the configuration from disk."); 3651 // Reload the configuration from disk. 3652 conf.reloadConfiguration(); 3653 configurationManager.notifyAllObservers(conf); 3654 } 3655 3656 public CacheEvictionStats clearRegionBlockCache(Region region) { 3657 BlockCache blockCache = this.getCacheConfig().getBlockCache(); 3658 long evictedBlocks = 0; 3659 3660 for(Store store : region.getStores()) { 3661 for(StoreFile hFile : store.getStorefiles()) { 3662 evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName()); 3663 } 3664 } 3665 3666 return CacheEvictionStats.builder() 3667 .withEvictedBlocks(evictedBlocks) 3668 .build(); 3669 } 3670 3671 @Override 3672 public double getCompactionPressure() { 3673 double max = 0; 3674 for (Region region : onlineRegions.values()) { 3675 for (Store store : region.getStores()) { 3676 double normCount = store.getCompactionPressure(); 3677 if (normCount > max) { 3678 max = normCount; 3679 } 3680 } 3681 } 3682 return max; 3683 } 3684 3685 @Override 3686 public HeapMemoryManager getHeapMemoryManager() { 3687 return hMemManager; 3688 } 3689 3690 /** 3691 * For testing 3692 * @return whether all wal roll request finished for this regionserver 3693 */ 3694 @VisibleForTesting 3695 public boolean walRollRequestFinished() { 3696 return this.walRoller.walRollFinished(); 3697 } 3698 3699 @Override 3700 public ThroughputController getFlushThroughputController() { 3701 return flushThroughputController; 3702 } 3703 3704 @Override 3705 public double getFlushPressure() { 3706 if (getRegionServerAccounting() == null || cacheFlusher 
== null) { 3707 // return 0 during RS initialization 3708 return 0.0; 3709 } 3710 return getRegionServerAccounting().getFlushPressure(); 3711 } 3712 3713 @Override 3714 public void onConfigurationChange(Configuration newConf) { 3715 ThroughputController old = this.flushThroughputController; 3716 if (old != null) { 3717 old.stop("configuration change"); 3718 } 3719 this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf); 3720 try { 3721 Superusers.initialize(newConf); 3722 } catch (IOException e) { 3723 LOG.warn("Failed to initialize SuperUsers on reloading of the configuration"); 3724 } 3725 } 3726 3727 @Override 3728 public MetricsRegionServer getMetrics() { 3729 return metricsRegionServer; 3730 } 3731 3732 @Override 3733 public SecureBulkLoadManager getSecureBulkLoadManager() { 3734 return this.secureBulkLoadManager; 3735 } 3736 3737 @Override 3738 public EntityLock regionLock(List<RegionInfo> regionInfos, String description, 3739 Abortable abort) throws IOException { 3740 return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator()) 3741 .regionLock(regionInfos, description, abort); 3742 } 3743 3744 @Override 3745 public void unassign(byte[] regionName) throws IOException { 3746 clusterConnection.getAdmin().unassign(regionName, false); 3747 } 3748 3749 @Override 3750 public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() { 3751 return this.rsSpaceQuotaManager; 3752 } 3753 3754 @Override 3755 public boolean reportFileArchivalForQuotas(TableName tableName, 3756 Collection<Entry<String, Long>> archivedFiles) { 3757 RegionServerStatusService.BlockingInterface rss = rssStub; 3758 if (rss == null || rsSpaceQuotaManager == null) { 3759 // the current server could be stopping. 
3760 LOG.trace("Skipping file archival reporting to HMaster as stub is null"); 3761 return false; 3762 } 3763 try { 3764 RegionServerStatusProtos.FileArchiveNotificationRequest request = 3765 rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles); 3766 rss.reportFileArchival(null, request); 3767 } catch (ServiceException se) { 3768 IOException ioe = ProtobufUtil.getRemoteException(se); 3769 if (ioe instanceof PleaseHoldException) { 3770 if (LOG.isTraceEnabled()) { 3771 LOG.trace("Failed to report file archival(s) to Master because it is initializing." 3772 + " This will be retried.", ioe); 3773 } 3774 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 3775 return false; 3776 } 3777 if (rssStub == rss) { 3778 rssStub = null; 3779 } 3780 // re-create the stub if we failed to report the archival 3781 createRegionServerStatusStub(true); 3782 LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe); 3783 return false; 3784 } 3785 return true; 3786 } 3787 3788 public NettyEventLoopGroupConfig getEventLoopGroupConfig() { 3789 return eventLoopGroupConfig; 3790 } 3791 3792 @Override 3793 public Connection createConnection(Configuration conf) throws IOException { 3794 User user = UserProvider.instantiate(conf).getCurrent(); 3795 return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName, 3796 this.rpcServices, this.rpcServices); 3797 } 3798 3799 public void executeProcedure(long procId, RSProcedureCallable callable) { 3800 executorService.submit(new RSProcedureHandler(this, procId, callable)); 3801 } 3802 3803 public void remoteProcedureComplete(long procId, Throwable error) { 3804 procedureResultReporter.complete(procId, error); 3805 } 3806 3807 void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException { 3808 RegionServerStatusService.BlockingInterface rss = rssStub; 3809 for (;;) { 3810 rss = rssStub; 3811 if (rss != null) { 3812 break; 3813 } 3814 
createRegionServerStatusStub(); 3815 } 3816 try { 3817 rss.reportProcedureDone(null, request); 3818 } catch (ServiceException se) { 3819 if (rssStub == rss) { 3820 rssStub = null; 3821 } 3822 throw ProtobufUtil.getRemoteException(se); 3823 } 3824 } 3825 3826 public boolean isShutDown() { 3827 return shutDown; 3828 } 3829 3830 /** 3831 * Force to terminate region server when abort timeout. 3832 */ 3833 private static class SystemExitWhenAbortTimeout extends TimerTask { 3834 3835 public SystemExitWhenAbortTimeout() { 3836 3837 } 3838 3839 @Override 3840 public void run() { 3841 LOG.warn("Aborting region server timed out, terminating forcibly" + 3842 " and does not wait for any running shutdown hooks or finalizers to finish their work." + 3843 " Thread dump to stdout."); 3844 Threads.printThreadInfo(System.out, "Zombie HRegionServer"); 3845 Runtime.getRuntime().halt(1); 3846 } 3847 } 3848}