/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Signal;
import sun.misc.SignalHandler;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
 * the HMaster. There are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation"})
public class HRegionServer extends HasThread implements
    RegionServerServices, LastSequenceId, ConfigurationObserver {
  // Time to pause if master says 'please hold'. Make configurable if needed.
  private static final int INIT_PAUSE_TIME_MS = 1000;

  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

  /**
   * For testing only! Set to true to skip notifying region assignment to the master.
   */
  @VisibleForTesting
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  // RegionName vs current action in progress
  // true - if open region action in progress
  // false - if close region action in progress
  protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  // Cache flushing
  protected MemStoreFlusher cacheFlusher;

  protected HeapMemoryManager hMemManager;

  /**
   * Cluster connection to be shared by services.
   * Initialized at server startup and closed when the server shuts down.
   * Clients must never close it explicitly.
   * Clients hosted by this Server should make use of this clusterConnection rather than create
   * their own; if they create their own, there is no way for the hosting server to shut down
   * ongoing client RPCs.
   */
  protected ClusterConnection clusterConnection;
  /*
   * Long-living meta table locator, which is created at server startup and stopped when the
   * server shuts down. References to this locator shall be used to perform according
   * operations in EventHandlers. The primary reason for this decision is to make it mockable
   * for tests.
   */
  protected MetaTableLocator metaTableLocator;

  /**
   * Go here to get table descriptors.
   */
  protected TableDescriptors tableDescriptors;

  // Replication services. If no replication, this handler will be null.
  protected ReplicationSourceService replicationSourceHandler;
  protected ReplicationSinkService replicationSinkHandler;

  // Compactions
  public CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the
   * encoded region name. All access should be synchronized.
   */
  protected final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on.
   * We store the value as InetSocketAddress since this is used only in the HDFS
   * API (create() that takes favored nodes as hints for placing file blocks).
   * We could have used ServerName here as the value class, but we'd need to
   * convert it to InetSocketAddress at some point before the HDFS API call, and
   * it seems a bit weird to store ServerName since ServerName refers to RegionServers
   * and here we really mean DataNode locations.
   */
  protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
      new ConcurrentHashMap<>();

  // Leases
  protected Leases leases;

  // Instance of the hbase executor service.
  protected ExecutorService executorService;

  // If false, the file system has become unavailable
  protected volatile boolean fsOk;
  protected HFileSystem fs;
  protected HFileSystem walFs;

  // Set when a report to the master comes back with a message asking us to
  // shutdown. Also set by call to stop when debugging or running unit tests
  // of HRegionServer in isolation.
  private volatile boolean stopped = false;

  // Go down hard. Used if file system becomes unavailable and also in
  // debugging and unit tests.
  private volatile boolean abortRequested;
  public static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds, to be safe.
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Will run this task when the abort times out.
  public static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
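
  // Illustrative tuning (the values and class name below are examples, not defaults): both the
  // watchdog delay and the task it runs can be overridden in hbase-site.xml, e.g.
  //   hbase.regionserver.abort.timeout = 600000
  //   hbase.regionserver.abort.timeout.task = org.example.MyAbortWatchdog   (a TimerTask subclass)
  // See the abort handling in run() for how these two settings are consumed.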
  ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<>();

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;

  volatile boolean killed = false;

  protected final Configuration conf;

  private Path rootDir;
  private Path walRootDir;

  protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  final int numRetries;
  protected final int threadWakeFrequency;
  protected final int msgInterval;

  protected final int numRegionsToReport;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  RpcClient rpcClient;

  private RpcRetryingCallerFactory rpcRetryingCallerFactory;
  private RpcControllerFactory rpcControllerFactory;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  // Info server. Default access so can be used by unit tests. REGIONSERVER
  // is the name of the webapp and the attribute name used stuffing this instance
  // into the web context.
  protected InfoServer infoServer;
  private JvmPauseMonitor pauseMonitor;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  MetricsRegionServer metricsRegionServer;
  MetricsTable metricsTable;
  private SpanReceiverHost spanReceiverHost;

  /**
   * ChoreService used to schedule tasks that we want to run periodically.
   */
  private ChoreService choreService;

  /*
   * Check for compaction requests.
   */
  ScheduledChore compactionChecker;

  /*
   * Check for flushes.
   */
  ScheduledChore periodicFlusher;

  protected volatile WALFactory walFactory;

  // WAL roller. log is protected rather than private to avoid
  // eclipse warning when accessed by inner classes
  protected LogRoller walRoller;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // zookeeper connection and watcher
  protected final ZKWatcher zooKeeper;

  // master address tracker
  private final MasterAddressTracker masterAddressTracker;

  // Cluster Status Tracker
  protected final ClusterStatusTracker clusterStatusTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  // A sleeper that sleeps for msgInterval.
  protected final Sleeper sleeper;

  private final int operationTimeout;
  private final int shortOperationTimeout;

  private final RegionServerAccounting regionServerAccounting;

  // Cache configuration and block cache reference
  protected CacheConfig cacheConfig;
  // Cache configuration for mob
  final MobCacheConfig mobCacheConfig;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * The server name the Master sees us as. It's made from the hostname the
   * master passes us, the port, and the server startcode. Gets set after registration
   * against the Master.
   */
  protected ServerName serverName;
  /*
   * Hostname specified by the hostname config.
   */
  protected String useThisHostnameInstead;

  // Key to the config parameter of server hostname.
  // Specifying the server hostname is optional. The hostname should be resolvable from
  // both the master and the region server.
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  protected final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";

  // HBASE-18226: This config and hbase.regionserver.hostname are mutually exclusive.
  // An exception will be thrown if both are used.
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
      "hbase.regionserver.hostname.disable.master.reversedns";
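
  // Illustrative usage (the hostname value is a placeholder): to pin the hostname this region
  // server registers with, set in hbase-site.xml:
  //   <property>
  //     <name>hbase.regionserver.hostname</name>
  //     <value>rs1.example.com</value>
  //   </property>
  // Alternatively, set hbase.regionserver.hostname.disable.master.reversedns to true to have the
  // region server report its own resolved hostname instead of the Master's reverse-DNS result;
  // getUseThisHostnameInstead() below rejects setting both at once.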
  /**
   * This server's startcode.
   */
  protected final long startcode;

  /**
   * Unique identifier for the cluster we are a part of.
   */
  protected String clusterId;

  /**
   * Chore to periodically clean the moved region list.
   */
  private MovedRegionsCleaner movedRegionsCleaner;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent
   * in the case where the client doesn't receive the response from a successful operation and
   * retries. We track the successful ops for some time via a nonce sent by the client and handle
   * duplicate operations (currently, by failing them; in the future we might use MVCC to return
   * the result). Nonces are also recovered from the WAL during recovery; however, the caveats
   * (from HBASE-3787) are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
   *   of past records. If we don't read the records, we don't read and recover the nonces.
   *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
   * We could have a separate additional "Nonce WAL". It would just contain a bunch of numbers and
   * won't be flushed on the main path - because the WAL itself also contains nonces, if we only
   * flush it before the memstore flush, for a given nonce we will either see it in the WAL (if
   * it was never flushed to disk, it will be part of recovery), or we'll see it as part of the
   * nonce log (or both occasionally, which doesn't matter). The nonce log file can be deleted
   * after the latest nonce in it has expired. It can also be recovered during move.
   */
  final ServerNonceManager nonceManager;

  private UserProvider userProvider;

  protected final RSRpcServices rpcServices;

  protected CoordinatedStateManager csm;

  /**
   * The configuration manager is used to register/deregister and notify the configuration
   * observers when the regionserver is notified that there was a change in the on-disk configs.
   */
  protected final ConfigurationManager configurationManager;

  @VisibleForTesting
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  protected SecureBulkLoadManager secureBulkLoadManager;

  protected FileSystemUtilizationChore fsUtilizationChore;

  private final NettyEventLoopGroupConfig eventLoopGroupConfig;

  /**
   * True if this RegionServer is coming up in a cluster where there is no Master;
   * it means it needs to just come up and make do without a Master to talk to: e.g. in tests, or
   * when the HRegionServer is doing something other than its usual duties: e.g. as a hollowed-out
   * host whose only purpose is as a Replication-stream sink; see HBASE-18846 for more.
   */
  private final boolean masterless;
  static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /**
   * Starts a HRegionServer at the default location.
   */
  // Don't start any services or managers in here in the Constructor.
  // Defer till after we register with the Master as much as possible. See #startServices.
  public HRegionServer(Configuration conf) throws IOException {
    super("RegionServer"); // thread name
    TraceUtil.initTracer(conf);
    try {
      this.startcode = System.currentTimeMillis();
      this.conf = conf;
      this.fsOk = true;
      this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
      this.eventLoopGroupConfig = setupNetty(this.conf);
      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      this.userProvider = UserProvider.instantiate(conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

      this.sleeper = new Sleeper(this.msgInterval, this);

      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10);

      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
          HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
          HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.abortRequested = false;
      this.stopped = false;

      rpcServices = createRpcServices();
      useThisHostnameInstead = getUseThisHostnameInstead(conf);
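      // Pick the hostname to identify ourselves with: an explicitly configured hostname wins;
      // otherwise fall back to the hostname the RPC services bound to.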
      String hostName =
          StringUtils.isBlank(useThisHostnameInstead) ? this.rpcServices.isa.getHostName()
              : this.useThisHostnameInstead;
      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);

      rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
      rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

      // login the zookeeper client principal (if using security)
      ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
          HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
      // login the server principal (if using secure Hadoop)
      login(userProvider, hostName);
      // init superusers and add the server principal (if using security)
      // or process owner as default super user.
      Superusers.initialize(conf);
      regionServerAccounting = new RegionServerAccounting(conf);

      boolean isMasterNotCarryTable =
          this instanceof HMaster && !LoadBalancer.isTablesOnMaster(conf);
      // No need to instantiate the global block cache when the master is not carrying tables.
      if (!isMasterNotCarryTable) {
        CacheConfig.instantiateBlockCache(conf);
      }
      cacheConfig = new CacheConfig(conf);
      mobCacheConfig = new MobCacheConfig(conf);

      uncaughtExceptionHandler = new UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
          abort("Uncaught exception in executorService thread " + t.getName(), e);
        }
      };

      initializeFileSystem();
      spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

      this.configurationManager = new ConfigurationManager();
      setupWindows(getConfiguration(), getConfigurationManager());

      // Some unit tests don't need a cluster, so no zookeeper at all
      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
        // Open connection to zookeeper and set primary watcher
        zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
            rpcServices.isa.getPort(), this, canCreateBaseZNode());
        // If no master in cluster, skip trying to track one or look for a cluster status.
        if (!this.masterless) {
          this.csm = new ZkCoordinatedStateManager(this);

          masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
          masterAddressTracker.start();

          clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
          clusterStatusTracker.start();
        } else {
          masterAddressTracker = null;
          clusterStatusTracker = null;
        }
      } else {
        zooKeeper = null;
        masterAddressTracker = null;
        clusterStatusTracker = null;
      }
      this.rpcServices.start(zooKeeper);
      // This violates 'no starting stuff in Constructor' but Master depends on the below chore
      // and executor being created and takes a different startup route. There is a lot of overlap
      // between HRS and M (an M IS A HRS now), and the Master expects the Constructor to put up
      // the web servers. Ugh. Need to refactor so there is less duplication between M and its
      // superclass HRS. TODO.
      this.choreService = new ChoreService(getName(), true);
      this.executorService = new ExecutorService(getName());
      putUpWebUI();
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      LOG.error("Failed construction RegionServer", t);
      throw t;
    }
  }
642 LOG.error("Failed construction RegionServer", t); 643 throw t; 644 } 645 } 646 647 // HMaster should override this method to load the specific config for master 648 protected String getUseThisHostnameInstead(Configuration conf) throws IOException { 649 String hostname = conf.get(RS_HOSTNAME_KEY); 650 if (conf.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) { 651 if (!StringUtils.isBlank(hostname)) { 652 String msg = RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + RS_HOSTNAME_KEY + 653 " are mutually exclusive. Do not set " + RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + 654 " to true while " + RS_HOSTNAME_KEY + " is used"; 655 throw new IOException(msg); 656 } else { 657 return rpcServices.isa.getHostName(); 658 } 659 } else { 660 return hostname; 661 } 662 } 663 664 /** 665 * If running on Windows, do windows-specific setup. 666 */ 667 private static void setupWindows(final Configuration conf, ConfigurationManager cm) { 668 if (!SystemUtils.IS_OS_WINDOWS) { 669 Signal.handle(new Signal("HUP"), new SignalHandler() { 670 @Override 671 public void handle(Signal signal) { 672 conf.reloadConfiguration(); 673 cm.notifyAllObservers(conf); 674 } 675 }); 676 } 677 } 678 679 private static NettyEventLoopGroupConfig setupNetty(Configuration conf) { 680 // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL. 681 NettyEventLoopGroupConfig nelgc = 682 new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup"); 683 NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass()); 684 NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass()); 685 return nelgc; 686 } 687 688 private void initializeFileSystem() throws IOException { 689 // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase 690 // checksum verification enabled, then automatically switch off hdfs checksum verification. 691 boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true); 692 FSUtils.setFsDefault(this.conf, FSUtils.getWALRootDir(this.conf)); 693 this.walFs = new HFileSystem(this.conf, useHBaseChecksum); 694 this.walRootDir = FSUtils.getWALRootDir(this.conf); 695 // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else 696 // underlying hadoop hdfs accessors will be going against wrong filesystem 697 // (unless all is set to defaults). 698 FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf)); 699 this.fs = new HFileSystem(this.conf, useHBaseChecksum); 700 this.rootDir = FSUtils.getRootDir(this.conf); 701 this.tableDescriptors = getFsTableDescriptors(); 702 } 703 704 protected TableDescriptors getFsTableDescriptors() throws IOException { 705 return new FSTableDescriptors(this.conf, 706 this.fs, this.rootDir, !canUpdateTableDescriptor(), false, getMetaTableObserver()); 707 } 708 709 protected Function<TableDescriptorBuilder, TableDescriptorBuilder> getMetaTableObserver() { 710 return null; 711 } 712 713 protected void login(UserProvider user, String host) throws IOException { 714 user.login("hbase.regionserver.keytab.file", 715 "hbase.regionserver.kerberos.principal", host); 716 } 717 718 719 /** 720 * Wait for an active Master. 721 * See override in Master superclass for how it is used. 
  /**
   * Wait for an active Master.
   * See the override in the HMaster subclass for how it is used.
   */
  protected void waitForMasterActive() {}

  protected String getProcessName() {
    return REGIONSERVER;
  }

  protected boolean canCreateBaseZNode() {
    return this.masterless;
  }

  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  protected void configureInfoServer() {
    infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }

  @Override
  public boolean registerService(com.google.protobuf.Service instance) {
    /*
     * No stacking of instances is allowed for a single service name.
     */
    com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
        instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor service " + serviceName
          + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Registered regionserver coprocessor service: service=" + serviceName);
    }
    return true;
  }
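
  // Illustrative usage (names are hypothetical): a coprocessor exposing an endpoint would
  // register its generated protobuf service at startup, e.g.
  //   regionServer.registerService(MyProtos.MyService.newReflectiveService(new MyServiceImpl()));
  // A second registration under the same service name is rejected (registerService returns false).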
  /**
   * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to
   * the local server; i.e. a short-circuit Connection. Safe to use going to local or remote
   * server. Creating this instance in a method makes it possible to intercept and mock it in
   * tests.
   * @throws IOException
   */
  @VisibleForTesting
  protected ClusterConnection createClusterConnection() throws IOException {
    // Create a cluster connection that when appropriate, can short-circuit and go directly to the
    // local server if the request is to the local server bypassing RPC. Can be used for both local
    // and remote invocations.
    return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
        serverName, rpcServices, rpcServices);
  }

  /**
   * Run a test on the configured codecs to make sure the supporting libs are in place.
   * @param c
   * @throws IOException
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
    if (codecs == null) return;
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException("Compression codec " + codec +
            " not supported, aborting RS construction");
      }
    }
  }

  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * Setup our cluster connection if not already initialized.
   * @throws IOException
   */
  protected synchronized void setupClusterConnection() throws IOException {
    if (clusterConnection == null) {
      clusterConnection = createClusterConnection();
      metaTableLocator = new MetaTableLocator();
    }
  }

  /**
   * All initialization needed before we go register with the Master.<br>
   * Do the bare minimum. Do the bulk of initializations AFTER we've connected to the Master.<br>
   * In here we just put up the RpcServer, setup the Connection, and ZooKeeper.
   */
  private void preRegistrationInitialization() {
    try {
      initializeZooKeeper();
      setupClusterConnection();
      // Setup RPC client for master communication
      this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
          this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
    } catch (Throwable t) {
      // Call stop if error or process will stick around for ever since server
      // puts up non-daemon threads.
      this.rpcServices.stop();
      abort("Initialization of RS failed. Hence aborting RS.", t);
    }
  }

  /**
   * Bring up the connection to the zk ensemble, then wait until a master is available for this
   * cluster, and after that wait until the cluster 'up' flag has been set. This is the order in
   * which the master does things.
   * <p>
   * Finally, open the long-living server short-circuit connection.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
    justification="cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Nothing to do in here if no Master in the mix.
    if (this.masterless) {
      return;
    }

    // Create the master address tracker, register with zk, and start it. Then
    // block until a master is available. No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up. Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // If we are HMaster then the cluster id should have already been set.
    if (clusterId == null) {
      // Retrieve clusterId
      // Since cluster status is now up
      // ID should have already been set by HMaster
      try {
        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
        if (clusterId == null) {
          this.abort("Cluster ID has not been set");
        }
        LOG.info("ClusterId : " + clusterId);
      } catch (KeeperException e) {
        this.abort("Failed to retrieve Cluster ID", e);
      }
    }

    waitForMasterActive();
    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }

  /**
   * Utility method to wait indefinitely on a znode's availability while checking
   * if the region server is shut down.
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException
   */
  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
      throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /**
   * @return True if the cluster is up.
   */
  @Override
  public boolean isClusterUp() {
    return this.masterless ||
        (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
  }
  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        ShutdownHook.install(conf, fs, this, Thread.currentThread());
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
      }

      // Try and register with the Master; tell it we are here. Break if
      // server is stopped or the cluster-up flag is down or hdfs went wacky.
      // Once registered successfully, go ahead and start up all Services.
      LOG.debug("About to register with Master.");
      while (keepLooping()) {
        RegionServerStartupResponse w = reportForDuty();
        if (w == null) {
          LOG.warn("reportForDuty failed; sleeping and then retrying.");
          this.sleeper.sleep();
        } else {
          handleReportForDutyResponse(w);
          break;
        }
      }

      if (!isStopped() && isHealthy()) {
        // start the snapshot handler and other procedure handlers,
        // since the server is ready to run
        if (this.rspmHost != null) {
          this.rspmHost.start();
        }
        // Start the Quota Manager
        if (this.rsQuotaManager != null) {
          rsQuotaManager.start(getRpcServer().getScheduler());
        }
        if (this.rsSpaceQuotaManager != null) {
          this.rsSpaceQuotaManager.start();
        }
      }

      // We registered with the Master. Go into run mode.
      long lastMsg = System.currentTimeMillis();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (isOnlineRegionsEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(this.abortRequested);
          } else if (this.stopping) {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
              // since last time we went around the loop. Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested);
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = System.currentTimeMillis();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = System.currentTimeMillis();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // while
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }
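
    // If we are aborting, arm a watchdog so that the shutdown below cannot hang indefinitely.
    // The task is configurable via hbase.regionserver.abort.timeout.task; the default,
    // SystemExitWhenAbortTimeout, exits the JVM once the abort timeout elapses.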
"": "Unhandled: "; 1011 abort(prefix + t.getMessage(), t); 1012 } 1013 } 1014 1015 if (abortRequested) { 1016 Timer abortMonitor = new Timer("Abort regionserver monitor", true); 1017 TimerTask abortTimeoutTask = null; 1018 try { 1019 abortTimeoutTask = 1020 Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName())) 1021 .asSubclass(TimerTask.class).getDeclaredConstructor().newInstance(); 1022 } catch (Exception e) { 1023 LOG.warn("Initialize abort timeout task failed", e); 1024 } 1025 if (abortTimeoutTask != null) { 1026 abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT)); 1027 } 1028 } 1029 1030 if (this.leases != null) { 1031 this.leases.closeAfterLeasesExpire(); 1032 } 1033 if (this.splitLogWorker != null) { 1034 splitLogWorker.stop(); 1035 } 1036 if (this.infoServer != null) { 1037 LOG.info("Stopping infoServer"); 1038 try { 1039 this.infoServer.stop(); 1040 } catch (Exception e) { 1041 LOG.error("Failed to stop infoServer", e); 1042 } 1043 } 1044 // Send cache a shutdown. 1045 if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) { 1046 cacheConfig.getBlockCache().shutdown(); 1047 } 1048 mobCacheConfig.getMobFileCache().shutdown(); 1049 1050 if (movedRegionsCleaner != null) { 1051 movedRegionsCleaner.stop("Region Server stopping"); 1052 } 1053 1054 // Send interrupts to wake up threads if sleeping so they notice shutdown. 1055 // TODO: Should we check they are alive? If OOME could have exited already 1056 if (this.hMemManager != null) this.hMemManager.stop(); 1057 if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary(); 1058 if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary(); 1059 if (this.compactionChecker != null) this.compactionChecker.cancel(true); 1060 if (this.healthCheckChore != null) this.healthCheckChore.cancel(true); 1061 if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true); 1062 if (this.storefileRefresher != null) this.storefileRefresher.cancel(true); 1063 sendShutdownInterrupt(); 1064 1065 // Stop the quota manager 1066 if (rsQuotaManager != null) { 1067 rsQuotaManager.stop(); 1068 } 1069 if (rsSpaceQuotaManager != null) { 1070 rsSpaceQuotaManager.stop(); 1071 rsSpaceQuotaManager = null; 1072 } 1073 1074 // Stop the snapshot and other procedure handlers, forcefully killing all running tasks 1075 if (rspmHost != null) { 1076 rspmHost.stop(this.abortRequested || this.killed); 1077 } 1078 1079 if (this.killed) { 1080 // Just skip out w/o closing regions. Used when testing. 1081 } else if (abortRequested) { 1082 if (this.fsOk) { 1083 closeUserRegions(abortRequested); // Don't leave any open file handles 1084 } 1085 LOG.info("aborting server " + this.serverName); 1086 } else { 1087 closeUserRegions(abortRequested); 1088 LOG.info("stopping server " + this.serverName); 1089 } 1090 1091 // so callers waiting for meta without timeout can stop 1092 if (this.metaTableLocator != null) this.metaTableLocator.stop(); 1093 if (this.clusterConnection != null && !clusterConnection.isClosed()) { 1094 try { 1095 this.clusterConnection.close(); 1096 } catch (IOException e) { 1097 // Although the {@link Closeable} interface throws an {@link 1098 // IOException}, in reality, the implementation would never do that. 
1099 LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e); 1100 } 1101 } 1102 1103 // Closing the compactSplit thread before closing meta regions 1104 if (!this.killed && containsMetaTableRegions()) { 1105 if (!abortRequested || this.fsOk) { 1106 if (this.compactSplitThread != null) { 1107 this.compactSplitThread.join(); 1108 this.compactSplitThread = null; 1109 } 1110 closeMetaTableRegions(abortRequested); 1111 } 1112 } 1113 1114 if (!this.killed && this.fsOk) { 1115 waitOnAllRegionsToClose(abortRequested); 1116 LOG.info("stopping server " + this.serverName + "; all regions closed."); 1117 } 1118 1119 //fsOk flag may be changed when closing regions throws exception. 1120 if (this.fsOk) { 1121 shutdownWAL(!abortRequested); 1122 } 1123 1124 // Make sure the proxy is down. 1125 if (this.rssStub != null) { 1126 this.rssStub = null; 1127 } 1128 if (this.lockStub != null) { 1129 this.lockStub = null; 1130 } 1131 if (this.rpcClient != null) { 1132 this.rpcClient.close(); 1133 } 1134 if (this.leases != null) { 1135 this.leases.close(); 1136 } 1137 if (this.pauseMonitor != null) { 1138 this.pauseMonitor.stop(); 1139 } 1140 1141 if (!killed) { 1142 stopServiceThreads(); 1143 } 1144 1145 if (this.rpcServices != null) { 1146 this.rpcServices.stop(); 1147 } 1148 1149 try { 1150 deleteMyEphemeralNode(); 1151 } catch (KeeperException.NoNodeException nn) { 1152 } catch (KeeperException e) { 1153 LOG.warn("Failed deleting my ephemeral node", e); 1154 } 1155 // We may have failed to delete the znode at the previous step, but 1156 // we delete the file anyway: a second attempt to delete the znode is likely to fail again. 1157 ZNodeClearer.deleteMyEphemeralNodeOnDisk(); 1158 1159 if (this.zooKeeper != null) { 1160 this.zooKeeper.close(); 1161 } 1162 LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed."); 1163 } 1164 1165 private boolean containsMetaTableRegions() { 1166 return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 1167 } 1168 1169 private boolean areAllUserRegionsOffline() { 1170 if (getNumberOfOnlineRegions() > 2) return false; 1171 boolean allUserRegionsOffline = true; 1172 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) { 1173 if (!e.getValue().getRegionInfo().isMetaRegion()) { 1174 allUserRegionsOffline = false; 1175 break; 1176 } 1177 } 1178 return allUserRegionsOffline; 1179 } 1180 1181 /** 1182 * @return Current write count for all online regions. 1183 */ 1184 private long getWriteRequestCount() { 1185 long writeCount = 0; 1186 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) { 1187 writeCount += e.getValue().getWriteRequestsCount(); 1188 } 1189 return writeCount; 1190 } 1191 1192 @VisibleForTesting 1193 protected void tryRegionServerReport(long reportStartTime, long reportEndTime) 1194 throws IOException { 1195 RegionServerStatusService.BlockingInterface rss = rssStub; 1196 if (rss == null) { 1197 // the current server could be stopping. 
  @VisibleForTesting
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
      throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    try {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      request.setServer(ProtobufUtil.toServerName(this.serverName));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        throw ioe;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub(true);
    }
  }

  /**
   * Reports the given map of Regions and their size on the filesystem to the active Master.
   *
   * @param onlineRegionSizes A map of region info to size in bytes
   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
   */
  public boolean reportRegionSizesForQuotas(final Map<RegionInfo, Long> onlineRegionSizes) {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      LOG.trace("Skipping Region size report to HMaster as stub is null");
      return true;
    }
    try {
      RegionSpaceUseReportRequest request = buildRegionSpaceUseReportRequest(
          Objects.requireNonNull(onlineRegionSizes));
      rss.reportRegionSpaceUse(null, request);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        LOG.trace("Failed to report region sizes to Master because it is initializing."
            + " This will be retried.", ioe);
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return true;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      createRegionServerStatusStub(true);
      if (ioe instanceof DoNotRetryIOException) {
        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
        if (doNotRetryEx.getCause() != null) {
          Throwable t = doNotRetryEx.getCause();
          if (t instanceof UnsupportedOperationException) {
            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
            return false;
          }
        }
      }
      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
    }
    return true;
  }

  /**
   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
   *
   * @param regionSizes Map of region info to size in bytes.
   * @return The corresponding protocol buffer message.
   */
  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(Map<RegionInfo,Long> regionSizes) {
    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
    for (Entry<RegionInfo, Long> entry : Objects.requireNonNull(regionSizes).entrySet()) {
      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue()));
    }
    return request.build();
  }
  /**
   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse}
   * protobuf message.
   *
   * @param regionInfo The RegionInfo
   * @param sizeInBytes The size in bytes of the Region
   * @return The protocol buffer
   */
  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
    return RegionSpaceUse.newBuilder()
        .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
        .setRegionSize(Objects.requireNonNull(sizeInBytes))
        .build();
  }

  ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
      throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second, and other metrics. As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heart beat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved. Additionally, the load balancer will be able to take advantage of a more complete
    // history.
    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    long usedMemory = -1L;
    long maxMemory = -1L;
    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
      usedMemory = usage.getUsed();
      maxMemory = usage.getMax();
    }

    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
    serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    Builder coprocessorBuilder = Coprocessor.newBuilder();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
    }
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (HRegion region : regions) {
      if (region.getCoprocessorHost() != null) {
        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
        Iterator<String> iterator = regionCoprocessors.iterator();
        while (iterator.hasNext()) {
          serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build());
        }
      }
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
          .getCoprocessors()) {
        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
      }
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }

    // For the replicationLoad purpose, we only need to get it from one service;
    // either source or sink will get the same info.
    ReplicationSourceService rsources = getReplicationSourceService();

    if (rsources != null) {
      // always refresh first to get the latest value
      ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
      if (rLoad != null) {
        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
          serverLoad.addReplLoadSource(rLS);
        }
      }
    }

    return serverLoad.build();
  }
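
  /**
   * @return A comma-separated list of the encoded names of the currently online regions.
   */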
  String getOnlineRegionsAsPrintableString() {
    StringBuilder sb = new StringBuilder();
    for (Region r: this.onlineRegions.values()) {
      if (sb.length() > 0) sb.append(", ");
      sb.append(r.getRegionInfo().getEncodedName());
    }
    return sb.toString();
  }

  /**
   * Wait on regions close.
   */
  private void waitOnAllRegionsToClose(final boolean abort) {
    // Wait till all regions are closed before going out.
    int lastCount = -1;
    long previousLogTime = 0;
    Set<String> closedRegions = new HashSet<>();
    boolean interrupted = false;
    try {
      while (!isOnlineRegionsEmpty()) {
        int count = getNumberOfOnlineRegions();
        // Only print a message if the count of regions has changed.
        if (count != lastCount) {
          // Log every second at most
          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
            previousLogTime = System.currentTimeMillis();
            lastCount = count;
            LOG.info("Waiting on " + count + " regions to close");
            // Only print out regions still closing if a small number else will
            // swamp the log.
            if (count < 10 && LOG.isDebugEnabled()) {
              LOG.debug("Online Regions=" + this.onlineRegions);
            }
          }
        }
        // Ensure all user regions have been sent a close. Use this to
        // protect against the case where an open comes in after we start the
        // iterator of onlineRegions to close all user regions.
        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
          RegionInfo hri = e.getValue().getRegionInfo();
          if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) &&
              !closedRegions.contains(hri.getEncodedName())) {
            closedRegions.add(hri.getEncodedName());
            // Don't update zk with this close transition; pass false.
            closeRegionIgnoreErrors(hri, abort);
          }
        }
        // No regions in RIT, we could stop waiting now.
        if (this.regionsInTransitionInRS.isEmpty()) {
          if (!isOnlineRegionsEmpty()) {
            LOG.info("We were exiting though online regions are not empty," +
                " because some regions failed closing");
          }
          break;
        }
        if (sleep(200)) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
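
  /**
   * Sleep for the given number of milliseconds.
   * @return true if the sleep was interrupted; callers are expected to restore the thread's
   *   interrupt status as needed
   */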
1442 private void shutdownWAL(final boolean close) {
1443 if (this.walFactory != null) {
1444 try {
1445 if (close) {
1446 walFactory.close();
1447 } else {
1448 walFactory.shutdown();
1449 }
1450 } catch (Throwable e) {
1451 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1452 LOG.error("Shutdown / close of WAL failed: " + e);
1453 LOG.debug("Shutdown / close exception details:", e);
1454 }
1455 }
1456 }

1458 /*
1459 * Run init. Sets up wal and starts up all server threads.
1460 *
1461 * @param c Extra configuration.
1462 */
1463 protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1464 throws IOException {
1465 try {
1466 boolean updateRootDir = false;
1467 for (NameStringPair e : c.getMapEntriesList()) {
1468 String key = e.getName();
1469 // The hostname the master sees us as.
1470 if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1471 String hostnameFromMasterPOV = e.getValue();
1472 this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(),
1473 this.startcode);
1474 if (!StringUtils.isBlank(useThisHostnameInstead) &&
1475 !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1476 String msg = "Master passed us a different hostname to use; was=" +
1477 this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1478 LOG.error(msg);
1479 throw new IOException(msg);
1480 }
1481 if (StringUtils.isBlank(useThisHostnameInstead) &&
1482 !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1483 String msg = "Master passed us a different hostname to use; was=" +
1484 rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1485 LOG.error(msg);
1486 }
1487 continue;
1488 }

1490 String value = e.getValue();
1491 if (key.equals(HConstants.HBASE_DIR)) {
1492 if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1493 updateRootDir = true;
1494 }
1495 }

1497 if (LOG.isDebugEnabled()) {
1498 LOG.debug("Config from master: " + key + "=" + value);
1499 }
1500 this.conf.set(key, value);
1501 }
1502 // Set our ephemeral znode up in zookeeper now we have a name.
1503 createMyEphemeralNode();

1505 if (updateRootDir) {
1506 // Initialize the file system using the fs.defaultFS and hbase.rootdir passed by the master.
1507 initializeFileSystem();
1508 }

1510 // Hack! Maps DFSClient => RegionServer for logs. HDFS made this
1511 // config param for task trackers, but we can piggyback off of it.
1512 if (this.conf.get("mapreduce.task.attempt.id") == null) {
1513 this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1514 }

1516 // Save it in a file; this will allow us to see whether we crashed.
1517 ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

1519 // This call sets up an initialized replication and WAL. Later we start it up.
1520 setupWALAndReplication();
1521 // Init in here rather than in the constructor, after the thread name has been set.
1522 this.metricsRegionServer = new MetricsRegionServer(
1523 new MetricsRegionServerWrapperImpl(this), conf);
1524 this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1525 // Now that we have a metrics source, start the pause monitor.
1526 this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1527 pauseMonitor.start();

1529 // There is a rare case where we do NOT want services to start. Check config.
1530 if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1531 startServices();
1532 }
1533 // In here we start up the replication service. Above we initialized it.
1534 // TODO: reconcile or make sense of this split.
1535 startReplicationService();

1538 // Log who we are serving as, now that registration is complete.
1539 LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa +
1540 ", sessionid=0x" +
1541 Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));

1543 // Wake up anyone waiting for this server to come online.
1544 synchronized (online) {
1545 online.set(true);
1546 online.notifyAll();
1547 }
1548 } catch (Throwable e) {
1549 stop("Failed initialization");
1550 throw convertThrowableToIOE(cleanup(e, "Failed init"),
1551 "Region server startup failed");
1552 } finally {
1553 sleeper.skipSleepCycle();
1554 }
1555 }
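// Illustrative sketch, not part of the original source: the effect of the
// NameStringPair loop in handleReportForDutyResponse() above, with assumed
// values. The master, not the region server, has the final say on both the
// root directory and the hostname this process is known by:
//
//   // key = "hbase.rootdir", value = "hdfs://nn.example.com:8020/hbase"
//   //   -> conf.set(key, value) and, since it differs, initializeFileSystem()
//   // key = HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER, value = "rs1.example.com"
//   //   -> serverName = ServerName.valueOf("rs1.example.com", rpcPort, startcode)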
1557 protected void initializeMemStoreChunkCreator() {
1558 if (MemStoreLAB.isEnabled(conf)) {
1559 // MSLAB is enabled, so initialize the MemStoreChunkPool.
1560 // By this time, the MemstoreFlusher is already initialized. We can get the global limits
1561 // from it.
1562 Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
1563 long globalMemStoreSize = pair.getFirst();
1564 boolean offheap = this.regionServerAccounting.isOffheap();
1565 // When an off-heap memstore is in use, give the chunk pool the full area.
1566 float poolSizePercentage = offheap ? 1.0F :
1567 conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
1568 float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
1569 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
1570 int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
1571 // Init the chunkCreator.
1572 ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
1573 initialCountPercentage, this.hMemManager);
1574 }
1575 }

1577 private void startHeapMemoryManager() {
1578 this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this,
1579 this.regionServerAccounting);
1580 if (this.hMemManager != null) {
1581 this.hMemManager.start(getChoreService());
1582 }
1583 }

1585 private void createMyEphemeralNode() throws KeeperException, IOException {
1586 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1587 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1588 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1589 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1590 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1591 }

1593 private void deleteMyEphemeralNode() throws KeeperException {
1594 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1595 }

1597 @Override
1598 public RegionServerAccounting getRegionServerAccounting() {
1599 return regionServerAccounting;
1600 }
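// Illustrative sketch, not part of the original source: how the chunk pool set
// up in initializeMemStoreChunkCreator() above is sized. All values below are
// assumptions for the sake of the arithmetic (2 MB is believed to match the
// MemStoreLAB default chunk size):
//
//   long globalMemStoreSize = 4L * 1024 * 1024 * 1024; // assumed 4 GB limit
//   float poolSizePercentage = 1.0F;                   // off-heap case above
//   int chunkSize = 2 * 1024 * 1024;                   // assumed default chunk
//   long maxPooledChunks = (long) (globalMemStoreSize * poolSizePercentage) / chunkSize;
//   // -> 2048 chunks may be retained and recycled by the pool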
1602 /*
1603 * @param r Region to get RegionLoad for.
1604 * @param regionLoadBldr the RegionLoad.Builder, can be null
1605 * @param regionSpecifier the RegionSpecifier.Builder, can be null
1606 * @return RegionLoad instance.
1607 *
1608 * @throws IOException
1609 */
1610 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1611 RegionSpecifier.Builder regionSpecifier) throws IOException {
1612 byte[] name = r.getRegionInfo().getRegionName();
1613 int stores = 0;
1614 int storefiles = 0;
1615 int storeUncompressedSizeMB = 0;
1616 int storefileSizeMB = 0;
1617 int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024);
1618 long storefileIndexSizeKB = 0;
1619 int rootLevelIndexSizeKB = 0;
1620 int totalStaticIndexSizeKB = 0;
1621 int totalStaticBloomSizeKB = 0;
1622 long totalCompactingKVs = 0;
1623 long currentCompactedKVs = 0;
1624 List<HStore> storeList = r.getStores();
1625 stores += storeList.size();
1626 for (HStore store : storeList) {
1627 storefiles += store.getStorefilesCount();
1628 storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1629 storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1630 // TODO: is storefileIndexSizeKB the same as rootLevelIndexSizeKB?
1631 storefileIndexSizeKB += store.getStorefilesRootLevelIndexSize() / 1024;
1632 CompactionProgress progress = store.getCompactionProgress();
1633 if (progress != null) {
1634 totalCompactingKVs += progress.getTotalCompactingKVs();
1635 currentCompactedKVs += progress.currentCompactedKVs;
1636 }
1637 rootLevelIndexSizeKB += (int) (store.getStorefilesRootLevelIndexSize() / 1024);
1638 totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1639 totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1640 }

1642 float dataLocality =
1643 r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1644 if (regionLoadBldr == null) {
1645 regionLoadBldr = RegionLoad.newBuilder();
1646 }
1647 if (regionSpecifier == null) {
1648 regionSpecifier = RegionSpecifier.newBuilder();
1649 }
1650 regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1651 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1652 regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1653 .setStores(stores)
1654 .setStorefiles(storefiles)
1655 .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1656 .setStorefileSizeMB(storefileSizeMB)
1657 .setMemStoreSizeMB(memstoreSizeMB)
1658 .setStorefileIndexSizeKB(storefileIndexSizeKB)
1659 .setRootIndexSizeKB(rootLevelIndexSizeKB)
1660 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1661 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1662 .setReadRequestsCount(r.getReadRequestsCount())
1663 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1664 .setWriteRequestsCount(r.getWriteRequestsCount())
1665 .setTotalCompactingKVs(totalCompactingKVs)
1666 .setCurrentCompactedKVs(currentCompactedKVs)
1667 .setDataLocality(dataLocality)
1668 .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1669 r.setCompleteSequenceId(regionLoadBldr);

1671 return regionLoadBldr.build();
1672 }

1674 /**
1675 * @param encodedRegionName the encoded name of the region to report on
1676 * @return An instance of RegionLoad, or null if the region is not online.
1677 */
1678 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1679 HRegion r = onlineRegions.get(encodedRegionName);
1680 return r != null ? createRegionLoad(r, null, null) : null;
1681 }

1683 /*
1684 * Inner class that runs on a long period checking if regions need compaction.
1685 */
1686 private static class CompactionChecker extends ScheduledChore {
1687 private final HRegionServer instance;
1688 private final int majorCompactPriority;
1689 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1690 // Iteration is 1-based rather than 0-based so we don't check for compaction
1691 // immediately upon region server startup.
1692 private long iteration = 1;

1694 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1695 super("CompactionChecker", stopper, sleepTime);
1696 this.instance = h;
1697 LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));

1699 /* MajorCompactPriority is configurable.
1700 * If not set, the compaction will use default priority.
1701 */
1702 this.majorCompactPriority = this.instance.conf.
1703 getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1704 DEFAULT_PRIORITY);
1705 }

1707 @Override
1708 protected void chore() {
1709 for (Region r : this.instance.onlineRegions.values()) {
1710 if (r == null) {
1711 continue;
1712 }
1713 HRegion hr = (HRegion) r;
1714 for (HStore s : hr.stores.values()) {
1715 try {
1716 long multiplier = s.getCompactionCheckMultiplier();
1717 assert multiplier > 0;
1718 if (iteration % multiplier != 0) {
1719 continue;
1720 }
1721 if (s.needsCompaction()) {
1722 // Queue a compaction. Will recognize if major is needed.
1723 this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1724 getName() + " requests compaction");
1725 } else if (s.shouldPerformMajorCompaction()) {
1726 s.triggerMajorCompaction();
1727 if (majorCompactPriority == DEFAULT_PRIORITY ||
1728 majorCompactPriority > hr.getCompactPriority()) {
1729 this.instance.compactSplitThread.requestCompaction(hr, s,
1730 getName() + " requests major compaction; use default priority",
1731 Store.NO_PRIORITY,
1732 CompactionLifeCycleTracker.DUMMY, null);
1733 } else {
1734 this.instance.compactSplitThread.requestCompaction(hr, s,
1735 getName() + " requests major compaction; use configured priority",
1736 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1737 }
1738 }
1739 } catch (IOException e) {
1740 LOG.warn("Failed major compaction check on " + r, e);
1741 }
1742 }
1743 }
1744 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1745 }
1746 }
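// Illustrative sketch, not part of the original source: how the iteration /
// multiplier arithmetic in CompactionChecker.chore() above paces per-store
// checks. Assuming the chore fires every 10 seconds and a store reports a
// compaction-check multiplier of 6:
//
//   // iteration: 1, 2, 3, 4, 5, 6, 7, ...  iteration % 6 == 0 at 6, 12, 18, ...
//   // so that store is only examined every ~60 seconds, while stores with
//   // multiplier 1 are examined on every tick.
//
// Starting iteration at 1 rather than 0 means nothing is checked on the very
// first tick after startup (0 % multiplier would match every store at once).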
1748 static class PeriodicMemStoreFlusher extends ScheduledChore {
1749 final HRegionServer server;
1750 final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
1751 final static int MIN_DELAY_TIME = 0; // millisec
1752 public PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1753 super("MemstoreFlusherChore", server, cacheFlushInterval);
1754 this.server = server;
1755 }

1757 @Override
1758 protected void chore() {
1759 final StringBuilder whyFlush = new StringBuilder();
1760 for (HRegion r : this.server.onlineRegions.values()) {
1761 if (r == null) continue;
1762 if (r.shouldFlush(whyFlush)) {
1763 FlushRequester requester = server.getFlushRequester();
1764 if (requester != null) {
1765 long randomDelay = (long) RandomUtils.nextInt(0, RANGE_OF_DELAY) + MIN_DELAY_TIME;
1766 LOG.info(getName() + " requesting flush of " +
1767 r.getRegionInfo().getRegionNameAsString() + " because " +
1768 whyFlush.toString() +
1769 " after random delay " + randomDelay + "ms");
1770 // Throttle the flushes by putting a delay. If we don't throttle, and there
1771 // is a balanced write-load on the regions in a table, we might end up
1772 // overwhelming the filesystem with too many flushes at once.
1773 requester.requestDelayedFlush(r, randomDelay, false);
1774 }
1775 }
1776 }
1777 }
1778 }

1780 /**
1781 * Report the status of the server. A server is online once all the startup is
1782 * completed (setting up filesystem, starting service threads, etc.). This
1783 * method is designed mostly to be useful in tests.
1784 *
1785 * @return true if online, false if not.
1786 */
1787 public boolean isOnline() {
1788 return online.get();
1789 }

1791 /**
1792 * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
1793 * be hooked up to WAL.
1794 */
1795 private void setupWALAndReplication() throws IOException {
1796 WALFactory factory = new WALFactory(conf, serverName.toString());

1798 // TODO Replication makes assumptions here based on the default filesystem impl.
1799 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1800 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());

1802 Path logDir = new Path(walRootDir, logName);
1803 LOG.debug("logDir={}", logDir);
1804 if (this.walFs.exists(logDir)) {
1805 throw new RegionServerRunningException(
1806 "Region server has already created directory at " + this.serverName.toString());
1807 }
1808 // Instantiate replication if replication enabled. Pass it the log directories.
1809 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
1810 factory.getWALProvider());
1811 this.walFactory = factory;
1812 }

1814 /**
1815 * Start up replication source and sink handlers.
1816 * @throws IOException
1817 */
1818 private void startReplicationService() throws IOException {
1819 if (this.replicationSourceHandler == this.replicationSinkHandler &&
1820 this.replicationSourceHandler != null) {
1821 this.replicationSourceHandler.startReplicationService();
1822 } else {
1823 if (this.replicationSourceHandler != null) {
1824 this.replicationSourceHandler.startReplicationService();
1825 }
1826 if (this.replicationSinkHandler != null) {
1827 this.replicationSinkHandler.startReplicationService();
1828 }
1829 }
1830 }

1833 public MetricsRegionServer getRegionServerMetrics() {
1834 return this.metricsRegionServer;
1835 }

1837 /**
1838 * @return Master address tracker instance.
1839 */
1840 public MasterAddressTracker getMasterAddressTracker() {
1841 return this.masterAddressTracker;
1842 }

1844 /*
1845 * Start maintenance Threads, Server, Worker and lease checker threads.
1846 * Start all threads we need to run. This is called after we've successfully
1847 * registered with the Master.
1848 * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
1849 * get an unhandled exception. We cannot set the handler on all threads.
1850 * Server's internal Listener thread is off limits. For Server, if an OOME, it
1851 * waits a while then retries. Meantime, a flush or a compaction that tries to
1852 * run should trigger the same critical condition and the shutdown will run. On
1853 * its way out, this server will shut down Server. Leases are sort of
1854 * in between: it has an internal thread and, while it inherits from Chore, it
1855 * keeps its own internal stop mechanism, so it needs to be stopped by this
1856 * hosting server. Worker logs the exception and exits.
1857 */ 1858 private void startServices() throws IOException { 1859 if (!isStopped() && !isAborted()) { 1860 initializeThreads(); 1861 } 1862 this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection); 1863 this.secureBulkLoadManager.start(); 1864 1865 // Health checker thread. 1866 if (isHealthCheckerConfigured()) { 1867 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, 1868 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); 1869 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); 1870 } 1871 1872 this.walRoller = new LogRoller(this, this); 1873 this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); 1874 1875 // Create the CompactedFileDischarger chore executorService. This chore helps to 1876 // remove the compacted files that will no longer be used in reads. 1877 // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to 1878 // 2 mins so that compacted files can be archived before the TTLCleaner runs 1879 int cleanerInterval = 1880 conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); 1881 this.compactedFileDischarger = 1882 new CompactedHFilesDischarger(cleanerInterval, this, this); 1883 choreService.scheduleChore(compactedFileDischarger); 1884 1885 // Start executor services 1886 this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION, 1887 conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); 1888 this.executorService.startExecutorService(ExecutorType.RS_OPEN_META, 1889 conf.getInt("hbase.regionserver.executor.openmeta.threads", 1)); 1890 this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION, 1891 conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3)); 1892 this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION, 1893 conf.getInt("hbase.regionserver.executor.closeregion.threads", 3)); 1894 this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META, 1895 conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); 1896 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { 1897 this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, 1898 conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); 1899 } 1900 this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( 1901 "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); 1902 // Start the threads for compacted files discharger 1903 this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER, 1904 conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10)); 1905 if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { 1906 this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS, 1907 conf.getInt("hbase.regionserver.region.replica.flusher.threads", 1908 conf.getInt("hbase.regionserver.executor.openregion.threads", 3))); 1909 } 1910 1911 Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", 1912 uncaughtExceptionHandler); 1913 this.cacheFlusher.start(uncaughtExceptionHandler); 1914 1915 if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker); 1916 if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher); 1917 if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore); 1918 if 
(this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
1919 if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
1920 if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
1921 if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore);

1923 // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1924 // an unhandled exception, it will just exit.
1925 Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
1926 uncaughtExceptionHandler);

1928 // Create the log splitting worker and start it.
1929 // Set the retries lower so we fail fast; otherwise the splitlogworker could be blocked for
1930 // quite a while inside the Connection layer. The worker won't be available for other
1931 // tasks even after the current task is preempted when a split task times out.
1932 Configuration sinkConf = HBaseConfiguration.create(conf);
1933 sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1934 conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1935 sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1936 conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1937 sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
1938 if (this.csm != null) {
1939 // SplitLogWorker needs csm. If none, don't start this.
1940 this.splitLogWorker = new SplitLogWorker(this, sinkConf, this,
1941 this, walFactory);
1942 splitLogWorker.start();
1943 } else {
1944 LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null");
1945 }

1947 // Memstore services.
1948 startHeapMemoryManager();
1949 // Call it after starting HeapMemoryManager.
1950 initializeMemStoreChunkCreator();
1951 }

1953 private void initializeThreads() throws IOException {
1954 // Cache flushing thread.
1955 this.cacheFlusher = new MemStoreFlusher(conf, this);

1957 // Compaction thread.
1958 this.compactSplitThread = new CompactSplit(this);

1960 // Background thread to check for compactions; needed if region has not gotten updates
1961 // in a while. It will take care of not checking too frequently on a store-by-store basis.
1962 this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
1963 this.periodicFlusher = new PeriodicMemStoreFlusher(this.threadWakeFrequency, this);
1964 this.leases = new Leases(this.threadWakeFrequency);

1966 // Create the thread to clean the moved regions list.
1967 movedRegionsCleaner = MovedRegionsCleaner.create(this);

1969 if (this.nonceManager != null) {
1970 // Create the scheduled chore that cleans up nonces.
1971 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); 1972 } 1973 1974 // Setup the Quota Manager 1975 rsQuotaManager = new RegionServerRpcQuotaManager(this); 1976 rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); 1977 1978 if (QuotaUtil.isQuotaEnabled(conf)) { 1979 this.fsUtilizationChore = new FileSystemUtilizationChore(this); 1980 } 1981 1982 1983 boolean onlyMetaRefresh = false; 1984 int storefileRefreshPeriod = conf.getInt( 1985 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 1986 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 1987 if (storefileRefreshPeriod == 0) { 1988 storefileRefreshPeriod = conf.getInt( 1989 StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, 1990 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 1991 onlyMetaRefresh = true; 1992 } 1993 if (storefileRefreshPeriod > 0) { 1994 this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, 1995 onlyMetaRefresh, this, this); 1996 } 1997 registerConfigurationObservers(); 1998 } 1999 2000 private void registerConfigurationObservers() { 2001 // Registering the compactSplitThread object with the ConfigurationManager. 2002 configurationManager.registerObserver(this.compactSplitThread); 2003 configurationManager.registerObserver(this.rpcServices); 2004 configurationManager.registerObserver(this); 2005 } 2006 2007 /** 2008 * Puts up the webui. 2009 * @return Returns final port -- maybe different from what we started with. 2010 * @throws IOException 2011 */ 2012 private int putUpWebUI() throws IOException { 2013 int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 2014 HConstants.DEFAULT_REGIONSERVER_INFOPORT); 2015 String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0"); 2016 2017 if(this instanceof HMaster) { 2018 port = conf.getInt(HConstants.MASTER_INFO_PORT, 2019 HConstants.DEFAULT_MASTER_INFOPORT); 2020 addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0"); 2021 } 2022 // -1 is for disabling info server 2023 if (port < 0) return port; 2024 2025 if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) { 2026 String msg = 2027 "Failed to start http info server. Address " + addr 2028 + " does not belong to this host. 
Correct configuration parameter: " 2029 + "hbase.regionserver.info.bindAddress"; 2030 LOG.error(msg); 2031 throw new IOException(msg); 2032 } 2033 // check if auto port bind enabled 2034 boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, 2035 false); 2036 while (true) { 2037 try { 2038 this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf); 2039 infoServer.addServlet("dump", "/dump", getDumpServlet()); 2040 configureInfoServer(); 2041 this.infoServer.start(); 2042 break; 2043 } catch (BindException e) { 2044 if (!auto) { 2045 // auto bind disabled throw BindException 2046 LOG.error("Failed binding http info server to port: " + port); 2047 throw e; 2048 } 2049 // auto bind enabled, try to use another port 2050 LOG.info("Failed binding http info server to port: " + port); 2051 port++; 2052 } 2053 } 2054 port = this.infoServer.getPort(); 2055 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port); 2056 int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT, 2057 HConstants.DEFAULT_MASTER_INFOPORT); 2058 conf.setInt("hbase.master.info.port.orig", masterInfoPort); 2059 conf.setInt(HConstants.MASTER_INFO_PORT, port); 2060 return port; 2061 } 2062 2063 /* 2064 * Verify that server is healthy 2065 */ 2066 private boolean isHealthy() { 2067 if (!fsOk) { 2068 // File system problem 2069 return false; 2070 } 2071 // Verify that all threads are alive 2072 boolean healthy = (this.leases == null || this.leases.isAlive()) 2073 && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) 2074 && (this.walRoller == null || this.walRoller.isAlive()) 2075 && (this.compactionChecker == null || this.compactionChecker.isScheduled()) 2076 && (this.periodicFlusher == null || this.periodicFlusher.isScheduled()); 2077 if (!healthy) { 2078 stop("One or more threads are no longer alive -- stop"); 2079 } 2080 return healthy; 2081 } 2082 2083 @Override 2084 public List<WAL> getWALs() throws IOException { 2085 return walFactory.getWALs(); 2086 } 2087 2088 @Override 2089 public WAL getWAL(RegionInfo regionInfo) throws IOException { 2090 WAL wal = walFactory.getWAL(regionInfo); 2091 if (this.walRoller != null) { 2092 this.walRoller.addWAL(wal); 2093 } 2094 return wal; 2095 } 2096 2097 public LogRoller getWalRoller() { 2098 return walRoller; 2099 } 2100 2101 @Override 2102 public Connection getConnection() { 2103 return getClusterConnection(); 2104 } 2105 2106 @Override 2107 public ClusterConnection getClusterConnection() { 2108 return this.clusterConnection; 2109 } 2110 2111 @Override 2112 public MetaTableLocator getMetaTableLocator() { 2113 return this.metaTableLocator; 2114 } 2115 2116 @Override 2117 public void stop(final String msg) { 2118 stop(msg, false, RpcServer.getRequestUser().orElse(null)); 2119 } 2120 2121 /** 2122 * Stops the regionserver. 
2123 * @param msg Status message 2124 * @param force True if this is a regionserver abort 2125 * @param user The user executing the stop request, or null if no user is associated 2126 */ 2127 public void stop(final String msg, final boolean force, final User user) { 2128 if (!this.stopped) { 2129 LOG.info("***** STOPPING region server '" + this + "' *****"); 2130 if (this.rsHost != null) { 2131 // when forced via abort don't allow CPs to override 2132 try { 2133 this.rsHost.preStop(msg, user); 2134 } catch (IOException ioe) { 2135 if (!force) { 2136 LOG.warn("The region server did not stop", ioe); 2137 return; 2138 } 2139 LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe); 2140 } 2141 } 2142 this.stopped = true; 2143 LOG.info("STOPPED: " + msg); 2144 // Wakes run() if it is sleeping 2145 sleeper.skipSleepCycle(); 2146 } 2147 } 2148 2149 public void waitForServerOnline(){ 2150 while (!isStopped() && !isOnline()) { 2151 synchronized (online) { 2152 try { 2153 online.wait(msgInterval); 2154 } catch (InterruptedException ie) { 2155 Thread.currentThread().interrupt(); 2156 break; 2157 } 2158 } 2159 } 2160 } 2161 2162 @Override 2163 public void postOpenDeployTasks(final PostOpenDeployContext context) 2164 throws KeeperException, IOException { 2165 HRegion r = context.getRegion(); 2166 long masterSystemTime = context.getMasterSystemTime(); 2167 rpcServices.checkOpen(); 2168 LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString()); 2169 // Do checks to see if we need to compact (references or too many files) 2170 for (HStore s : r.stores.values()) { 2171 if (s.hasReferences() || s.needsCompaction()) { 2172 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region"); 2173 } 2174 } 2175 long openSeqNum = r.getOpenSeqNum(); 2176 if (openSeqNum == HConstants.NO_SEQNUM) { 2177 // If we opened a region, we should have read some sequence number from it. 2178 LOG.error("No sequence number found when opening " + 2179 r.getRegionInfo().getRegionNameAsString()); 2180 openSeqNum = 0; 2181 } 2182 2183 // Notify master 2184 if (!reportRegionStateTransition(new RegionStateTransitionContext( 2185 TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) { 2186 throw new IOException("Failed to report opened region to master: " 2187 + r.getRegionInfo().getRegionNameAsString()); 2188 } 2189 2190 triggerFlushInPrimaryRegion(r); 2191 2192 LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString()); 2193 } 2194 2195 @Override 2196 public boolean reportRegionStateTransition(final RegionStateTransitionContext context) { 2197 TransitionCode code = context.getCode(); 2198 long openSeqNum = context.getOpenSeqNum(); 2199 long masterSystemTime = context.getMasterSystemTime(); 2200 RegionInfo[] hris = context.getHris(); 2201 2202 if (TEST_SKIP_REPORTING_TRANSITION) { 2203 // This is for testing only in case there is no master 2204 // to handle the region transition report at all. 
2205 if (code == TransitionCode.OPENED) {
2206 Preconditions.checkArgument(hris != null && hris.length == 1);
2207 if (hris[0].isMetaRegion()) {
2208 try {
2209 MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
2210 hris[0].getReplicaId(), State.OPEN);
2211 } catch (KeeperException e) {
2212 LOG.info("Failed to update meta location", e);
2213 return false;
2214 }
2215 } else {
2216 try {
2217 MetaTableAccessor.updateRegionLocation(clusterConnection,
2218 hris[0], serverName, openSeqNum, masterSystemTime);
2219 } catch (IOException e) {
2220 LOG.info("Failed to update meta", e);
2221 return false;
2222 }
2223 }
2224 }
2225 return true;
2226 }

2228 ReportRegionStateTransitionRequest.Builder builder =
2229 ReportRegionStateTransitionRequest.newBuilder();
2230 builder.setServer(ProtobufUtil.toServerName(serverName));
2231 RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2232 transition.setTransitionCode(code);
2233 if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2234 transition.setOpenSeqNum(openSeqNum);
2235 }
2236 for (RegionInfo hri: hris) {
2237 transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2238 }
2239 ReportRegionStateTransitionRequest request = builder.build();
2240 int tries = 0;
2241 long pauseTime = INIT_PAUSE_TIME_MS;
2242 // Keep looping till we get an error. We want to send reports even though the server is going
2243 // down. Only go down if clusterConnection is null. It is set to null almost as the last thing
2244 // the HRegionServer does on its way down.
2245 while (this.clusterConnection != null && !this.clusterConnection.isClosed()) {
2246 RegionServerStatusService.BlockingInterface rss = rssStub;
2247 try {
2248 if (rss == null) {
2249 createRegionServerStatusStub();
2250 continue;
2251 }
2252 ReportRegionStateTransitionResponse response =
2253 rss.reportRegionStateTransition(null, request);
2254 if (response.hasErrorMessage()) {
2255 LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2256 break;
2257 }
2258 // Log if we had to retry, else don't log unless TRACE. We want to
2259 // know if we were successful after an attempt that showed in the logs as failed.
2260 if (tries > 0 || LOG.isTraceEnabled()) {
2261 LOG.info("TRANSITION REPORTED " + request);
2262 }
2263 // NOTE: Return mid-method!!!
2264 return true;
2265 } catch (ServiceException se) {
2266 IOException ioe = ProtobufUtil.getRemoteException(se);
2267 boolean pause = ioe instanceof ServerNotRunningYetException ||
2268 ioe instanceof PleaseHoldException;
2269 if (pause) {
2270 // Do backoff, else we flood the Master with requests.
2271 pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
2272 } else {
2273 pauseTime = INIT_PAUSE_TIME_MS; // Reset.
2274 }
2275 LOG.info("Failed report transition " +
2276 TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" +
2277 (pause?
2278 " after " + pauseTime + "ms delay (Master is coming online...).":
2279 " immediately."),
2280 ioe);
2281 if (pause) Threads.sleep(pauseTime);
2282 tries++;
2283 if (rssStub == rss) {
2284 rssStub = null;
2285 }
2286 }
2287 }
2288 return false;
2289 }
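// Illustrative sketch, not part of the original source: the pacing of the
// retry loop in reportRegionStateTransition() above. While the master is still
// coming up (ServerNotRunningYetException / PleaseHoldException), successive
// attempts back off via ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries),
// which scales the base pause by HBase's standard retry-backoff table, e.g.
// (assuming a 1000 ms base) roughly 1s, 2s, 3s, 5s, 10s, ... Any other failure
// resets pauseTime to the base value and the report is retried immediately.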
2291 /**
2292 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2293 * block this thread. See RegionReplicaFlushHandler for details.
2294 */
2295 void triggerFlushInPrimaryRegion(final HRegion region) {
2296 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2297 return;
2298 }
2299 if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
2300 !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
2301 region.conf)) {
2302 region.setReadsEnabled(true);
2303 return;
2304 }

2306 region.setReadsEnabled(false); // Disable reads before marking the region as opened.
2307 // RegionReplicaFlushHandler might reset this.

2309 // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler.
2310 if (this.executorService != null) {
2311 this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
2312 rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2313 }
2314 }

2316 @Override
2317 public RpcServerInterface getRpcServer() {
2318 return rpcServices.rpcServer;
2319 }

2321 @VisibleForTesting
2322 public RSRpcServices getRSRpcServices() {
2323 return rpcServices;
2324 }

2326 /**
2327 * Cause the server to exit without closing the regions it is serving, the log
2328 * it is using and without notifying the master. Used in unit testing and on
2329 * catastrophic events such as HDFS being yanked out from under hbase, or an OOME.
2330 *
2331 * @param reason
2332 * the reason we are aborting
2333 * @param cause
2334 * the exception that caused the abort, or null
2335 */
2336 @Override
2337 public void abort(String reason, Throwable cause) {
2338 String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2339 if (cause != null) {
2340 LOG.error(HBaseMarkers.FATAL, msg, cause);
2341 } else {
2342 LOG.error(HBaseMarkers.FATAL, msg);
2343 }
2344 this.abortRequested = true;
2345 // HBASE-4014: show list of coprocessors that were loaded to help debug
2346 // regionserver crashes. Note that we're implicitly using
2347 // java.util.HashSet's toString() method to print the coprocessor names.
2348 LOG.error(HBaseMarkers.FATAL, "RegionServer abort: loaded coprocessors are: " +
2349 CoprocessorHost.getLoadedCoprocessors());
2350 // Try and dump metrics on abort -- they might give a clue as to how the fatal came about.
2351 try {
2352 LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2353 } catch (MalformedObjectNameException | IOException e) {
2354 LOG.warn("Failed dumping metrics", e);
2355 }

2357 // Do our best to report our abort to the master, but this may not work.
2358 try {
2359 if (cause != null) {
2360 msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2361 }
2362 // Report to the master, but only if we have already registered with the master.
2363 if (rssStub != null && this.serverName != null) {
2364 ReportRSFatalErrorRequest.Builder builder =
2365 ReportRSFatalErrorRequest.newBuilder();
2366 builder.setServer(ProtobufUtil.toServerName(this.serverName));
2367 builder.setErrorMessage(msg);
2368 rssStub.reportRSFatalError(null, builder.build());
2369 }
2370 } catch (Throwable t) {
2371 LOG.warn("Unable to report fatal error to master", t);
2372 }
2373 // Shutdown should be run as the internal user.
2374 stop(reason, true, null);
2375 }

2377 /**
2378 * @see HRegionServer#abort(String, Throwable)
2379 */
2380 public void abort(String reason) {
2381 abort(reason, null);
2382 }

2384 @Override
2385 public boolean isAborted() {
2386 return this.abortRequested;
2387 }
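// Illustrative sketch, not part of the original source: how a fatal condition
// typically funnels into abort(). The failing check is hypothetical:
//
//   try {
//     verifyFileSystemAvailable(); // hypothetical fatal check
//   } catch (IOException ioe) {
//     // Logs FATAL with loaded coprocessors, dumps metrics, reports the
//     // master if registered, then runs stop(reason, true, null).
//     abort("File system unavailable", ioe);
//   }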
2389 /*
2390 * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
2391 * logs, but it does close the socket in case we want to bring up a server on the old
2392 * hostname+port immediately.
2393 */
2394 @VisibleForTesting
2395 protected void kill() {
2396 this.killed = true;
2397 abort("Simulated kill");
2398 }

2400 /**
2401 * Called on stop/abort before closing the cluster connection and meta locator.
2402 */
2403 protected void sendShutdownInterrupt() {
2404 }

2406 /**
2407 * Wait on all threads to finish. Presumption is that all closes and stops
2408 * have already been called.
2409 */
2410 protected void stopServiceThreads() {
2411 // Clean up the scheduled chores.
2412 if (this.choreService != null) choreService.shutdown();
2413 if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
2414 if (this.compactionChecker != null) compactionChecker.cancel(true);
2415 if (this.periodicFlusher != null) periodicFlusher.cancel(true);
2416 if (this.healthCheckChore != null) healthCheckChore.cancel(true);
2417 if (this.storefileRefresher != null) storefileRefresher.cancel(true);
2418 if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);
2419 if (this.fsUtilizationChore != null) fsUtilizationChore.cancel(true);

2421 if (this.cacheFlusher != null) {
2422 this.cacheFlusher.join();
2423 }

2425 if (this.spanReceiverHost != null) {
2426 this.spanReceiverHost.closeReceivers();
2427 }
2428 if (this.walRoller != null) {
2429 this.walRoller.close();
2430 }
2431 if (this.compactSplitThread != null) {
2432 this.compactSplitThread.join();
2433 }
2434 if (this.executorService != null) this.executorService.shutdown();
2435 if (this.replicationSourceHandler != null &&
2436 this.replicationSourceHandler == this.replicationSinkHandler) {
2437 this.replicationSourceHandler.stopReplicationService();
2438 } else {
2439 if (this.replicationSourceHandler != null) {
2440 this.replicationSourceHandler.stopReplicationService();
2441 }
2442 if (this.replicationSinkHandler != null) {
2443 this.replicationSinkHandler.stopReplicationService();
2444 }
2445 }
2446 }

2448 /**
2449 * @return Return the object that implements the replication
2450 * source service.
2451 */
2452 @VisibleForTesting
2453 public ReplicationSourceService getReplicationSourceService() {
2454 return replicationSourceHandler;
2455 }

2457 /**
2458 * @return Return the object that implements the replication
2459 * sink service.
2460 */
2461 ReplicationSinkService getReplicationSinkService() {
2462 return replicationSinkHandler;
2463 }

2465 /**
2466 * Get the current master from ZooKeeper and open the RPC connection to it.
2467 * To get a fresh connection, the current rssStub must be null.
2468 * Method will block until a master is available. You can break from this
2469 * block by requesting the server stop.
2470 *
2471 * @return master + port, or null if server has been stopped
2472 */
2473 @VisibleForTesting
2474 protected synchronized ServerName createRegionServerStatusStub() {
2475 // Create the RS stub without refreshing the master node from ZK; use cached data.
2476 return createRegionServerStatusStub(false);
2477 }
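// Illustrative sketch, not part of the original source: callers treat rssStub
// as a cache around the stub-creation methods here. The recovery pattern used
// in reportRegionStateTransition() is roughly:
//
//   if (rssStub == null) {
//     createRegionServerStatusStub(); // cached master address first
//   }
//   // ... on a ServiceException from an RPC:
//   rssStub = null; // force re-resolution (possibly from ZK) next time round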
2479 /**
2480 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2481 * connection, the current rssStub must be null. Method will block until a master is available.
2482 * You can break from this block by requesting the server stop.
2483 * @param refresh If true then master address will be read from ZK, otherwise use cached data
2484 * @return master + port, or null if server has been stopped
2485 */
2486 @VisibleForTesting
2487 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2488 if (rssStub != null) {
2489 return masterAddressTracker.getMasterAddress();
2490 }
2491 ServerName sn = null;
2492 long previousLogTime = 0;
2493 RegionServerStatusService.BlockingInterface intRssStub = null;
2494 LockService.BlockingInterface intLockStub = null;
2495 boolean interrupted = false;
2496 try {
2497 while (keepLooping()) {
2498 sn = this.masterAddressTracker.getMasterAddress(refresh);
2499 if (sn == null) {
2500 if (!keepLooping()) {
2501 // Give up with no connection.
2502 LOG.debug("No master found and cluster is stopped; bailing out");
2503 return null;
2504 }
2505 if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2506 LOG.debug("No master found; retry");
2507 previousLogTime = System.currentTimeMillis();
2508 }
2509 refresh = true; // Let's try to pull it from ZK directly.
2510 if (sleep(200)) {
2511 interrupted = true;
2512 }
2513 continue;
2514 }

2516 // If we are on the active master, use the shortcut.
2517 if (this instanceof HMaster && sn.equals(getServerName())) {
2518 intRssStub = ((HMaster)this).getMasterRpcServices();
2519 intLockStub = ((HMaster)this).getMasterRpcServices();
2520 break;
2521 }
2522 try {
2523 BlockingRpcChannel channel =
2524 this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2525 shortOperationTimeout);
2526 intRssStub = RegionServerStatusService.newBlockingStub(channel);
2527 intLockStub = LockService.newBlockingStub(channel);
2528 break;
2529 } catch (IOException e) {
2530 if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2531 e = e instanceof RemoteException ?
2532 ((RemoteException)e).unwrapRemoteException() : e;
2533 if (e instanceof ServerNotRunningYetException) {
2534 LOG.info("Master isn't available yet, retrying");
2535 } else {
2536 LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2537 }
2538 previousLogTime = System.currentTimeMillis();
2539 }
2540 if (sleep(200)) {
2541 interrupted = true;
2542 }
2543 }
2544 }
2545 } finally {
2546 if (interrupted) {
2547 Thread.currentThread().interrupt();
2548 }
2549 }
2550 this.rssStub = intRssStub;
2551 this.lockStub = intLockStub;
2552 return sn;
2553 }

2555 /**
2556 * @return True if we should keep looping, i.e. this server has not been
2557 * stopped and the cluster is still up.
2558 */
2559 private boolean keepLooping() {
2560 return !this.stopped && isClusterUp();
2561 }

2563 /*
2564 * Let the master know we're here. Run initialization using parameters passed
2565 * to us by the master.
2566 * @return A Map of key/value configurations we got from the Master, else
2567 * null if we failed to register.
2568 * @throws IOException
2569 */
2570 private RegionServerStartupResponse reportForDuty() throws IOException {
2571 if (this.masterless) return RegionServerStartupResponse.getDefaultInstance();
2572 ServerName masterServerName = createRegionServerStatusStub(true);
2573 if (masterServerName == null) return null;
2574 RegionServerStartupResponse result = null;
2575 try {
2576 rpcServices.requestCount.reset();
2577 rpcServices.rpcGetRequestCount.reset();
2578 rpcServices.rpcScanRequestCount.reset();
2579 rpcServices.rpcMultiRequestCount.reset();
2580 rpcServices.rpcMutateRequestCount.reset();
2581 LOG.info("reportForDuty to master=" + masterServerName + " with port="
2582 + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2583 long now = EnvironmentEdgeManager.currentTime();
2584 int port = rpcServices.isa.getPort();
2585 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2586 if (!StringUtils.isBlank(useThisHostnameInstead)) {
2587 request.setUseThisHostnameInstead(useThisHostnameInstead);
2588 }
2589 request.setPort(port);
2590 request.setServerStartCode(this.startcode);
2591 request.setServerCurrentTime(now);
2592 result = this.rssStub.regionServerStartup(null, request.build());
2593 } catch (ServiceException se) {
2594 IOException ioe = ProtobufUtil.getRemoteException(se);
2595 if (ioe instanceof ClockOutOfSyncException) {
2596 LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync",
2597 ioe);
2598 // Re-throwing the IOE will cause the RS to abort.
2599 throw ioe;
2600 } else if (ioe instanceof ServerNotRunningYetException) {
2601 LOG.debug("Master is not running yet");
2602 } else {
2603 LOG.warn("Error telling master we are up", se);
2604 }
2605 rssStub = null;
2606 }
2607 return result;
2608 }

2610 @Override
2611 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2612 try {
2613 GetLastFlushedSequenceIdRequest req =
2614 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2615 RegionServerStatusService.BlockingInterface rss = rssStub;
2616 if (rss == null) { // Try to connect one more time.
2617 createRegionServerStatusStub();
2618 rss = rssStub;
2619 if (rss == null) {
2620 // Still no luck, we tried.
2621 LOG.warn("Unable to connect to the master to check the last flushed sequence id");
2622 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2623 .build();
2624 }
2625 }
2626 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2627 return RegionStoreSequenceIds.newBuilder()
2628 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2629 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2630 } catch (ServiceException e) {
2631 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2632 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2633 .build();
2634 }
2635 }

2637 /**
2638 * Closes all regions. Called on our way out.
2639 * Assumes that it's not possible for new regions to be added to onlineRegions
2640 * while this method runs.
2641 */
2642 protected void closeAllRegions(final boolean abort) {
2643 closeUserRegions(abort);
2644 closeMetaTableRegions(abort);
2645 }
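// Note, not part of the original source: the ordering in closeAllRegions()
// above appears deliberate -- user regions are closed before hbase:meta, so if
// this server carries meta it presumably keeps serving location lookups for as
// long as possible while the user regions drain:
//
//   closeUserRegions(abort);      // may take a while; meta still online
//   closeMetaTableRegions(abort); // only now give up meta, if we carry it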
2647 /**
2648 * Close the meta region if we carry it.
2649 * @param abort Whether we're running an abort.
2650 */
2651 void closeMetaTableRegions(final boolean abort) {
2652 HRegion meta = null;
2653 this.lock.writeLock().lock();
2654 try {
2655 for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
2656 RegionInfo hri = e.getValue().getRegionInfo();
2657 if (hri.isMetaRegion()) {
2658 meta = e.getValue();
2659 }
2660 if (meta != null) break;
2661 }
2662 } finally {
2663 this.lock.writeLock().unlock();
2664 }
2665 if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2666 }

2668 /**
2669 * Schedule closes on all user regions.
2670 * Should be safe calling multiple times because it won't close regions
2671 * that are already closed or that are closing.
2672 * @param abort Whether we're running an abort.
2673 */
2674 void closeUserRegions(final boolean abort) {
2675 this.lock.writeLock().lock();
2676 try {
2677 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
2678 HRegion r = e.getValue();
2679 if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2680 // Don't update zk with this close transition; pass false.
2681 closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2682 }
2683 }
2684 } finally {
2685 this.lock.writeLock().unlock();
2686 }
2687 }

2689 /** @return the info server */
2690 public InfoServer getInfoServer() {
2691 return infoServer;
2692 }

2694 /**
2695 * @return true if a stop has been requested.
2696 */
2697 @Override
2698 public boolean isStopped() {
2699 return this.stopped;
2700 }

2702 @Override
2703 public boolean isStopping() {
2704 return this.stopping;
2705 }

2707 /**
2708 * @return the configuration
2709 */
2710 @Override
2711 public Configuration getConfiguration() {
2712 return conf;
2713 }

2716 /** @return the write lock for the server */
2717 ReentrantReadWriteLock.WriteLock getWriteLock() {
2718 return lock.writeLock();
2719 }

2721 public int getNumberOfOnlineRegions() {
2722 return this.onlineRegions.size();
2723 }

2725 boolean isOnlineRegionsEmpty() {
2726 return this.onlineRegions.isEmpty();
2727 }

2729 /**
2730 * For tests, web ui and metrics.
2731 * This method will only work if HRegionServer is in the same JVM as the client;
2732 * HRegion cannot be serialized to cross an rpc.
2733 */
2734 public Collection<HRegion> getOnlineRegionsLocalContext() {
2735 Collection<HRegion> regions = this.onlineRegions.values();
2736 return Collections.unmodifiableCollection(regions);
2737 }

2739 @Override
2740 public void addRegion(HRegion region) {
2741 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2742 configurationManager.registerObserver(region);
2743 }

2745 /**
2746 * @return A new Map of online regions sorted by region off-heap size with the first entry being
2747 * the biggest. If two regions are the same size, then the last one found wins; i.e. this
2748 * method may NOT return all regions.
2749 */
2750 SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2751 // We'll sort the regions in reverse.
2752 SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(
2753 new Comparator<Long>() {
2754 @Override
2755 public int compare(Long a, Long b) {
2756 return -1 * a.compareTo(b);
2757 }
2758 });
2759 // Copy over all regions. Regions are sorted by size with biggest first.
2760 for (HRegion region : this.onlineRegions.values()) { 2761 sortedRegions.put(region.getMemStoreOffHeapSize(), region); 2762 } 2763 return sortedRegions; 2764 } 2765 2766 /** 2767 * @return A new Map of online regions sorted by region heap size with the first entry being the 2768 * biggest. If two regions are the same size, then the last one found wins; i.e. this method 2769 * may NOT return all regions. 2770 */ 2771 SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOnHeapSize() { 2772 // we'll sort the regions in reverse 2773 SortedMap<Long, HRegion> sortedRegions = new TreeMap<>( 2774 new Comparator<Long>() { 2775 @Override 2776 public int compare(Long a, Long b) { 2777 return -1 * a.compareTo(b); 2778 } 2779 }); 2780 // Copy over all regions. Regions are sorted by size with biggest first. 2781 for (HRegion region : this.onlineRegions.values()) { 2782 sortedRegions.put(region.getMemStoreHeapSize(), region); 2783 } 2784 return sortedRegions; 2785 } 2786 2787 /** 2788 * @return time stamp in millis of when this region server was started 2789 */ 2790 public long getStartcode() { 2791 return this.startcode; 2792 } 2793 2794 /** @return reference to FlushRequester */ 2795 @Override 2796 public FlushRequester getFlushRequester() { 2797 return this.cacheFlusher; 2798 } 2799 2800 @Override 2801 public CompactionRequester getCompactionRequestor() { 2802 return this.compactSplitThread; 2803 } 2804 2805 /** 2806 * Get the top N most loaded regions this server is serving so we can tell the 2807 * master which regions it can reallocate if we're overloaded. TODO: actually 2808 * calculate which regions are most loaded. (Right now, we're just grabbing 2809 * the first N regions being served regardless of load.) 2810 */ 2811 protected RegionInfo[] getMostLoadedRegions() { 2812 ArrayList<RegionInfo> regions = new ArrayList<>(); 2813 for (Region r : onlineRegions.values()) { 2814 if (!r.isAvailable()) { 2815 continue; 2816 } 2817 if (regions.size() < numRegionsToReport) { 2818 regions.add(r.getRegionInfo()); 2819 } else { 2820 break; 2821 } 2822 } 2823 return regions.toArray(new RegionInfo[regions.size()]); 2824 } 2825 2826 @Override 2827 public Leases getLeases() { 2828 return leases; 2829 } 2830 2831 /** 2832 * @return Return the rootDir. 2833 */ 2834 protected Path getRootDir() { 2835 return rootDir; 2836 } 2837 2838 /** 2839 * @return Return the fs. 2840 */ 2841 @Override 2842 public FileSystem getFileSystem() { 2843 return fs; 2844 } 2845 2846 /** 2847 * @return Return the walRootDir. 2848 */ 2849 public Path getWALRootDir() { 2850 return walRootDir; 2851 } 2852 2853 /** 2854 * @return Return the walFs. 
2855 */
2856 public FileSystem getWALFileSystem() {
2857 return walFs;
2858 }

2860 @Override
2861 public String toString() {
2862 return getServerName().toString();
2863 }

2865 /**
2866 * Interval at which threads should run.
2867 *
2868 * @return the interval
2869 */
2870 public int getThreadWakeFrequency() {
2871 return threadWakeFrequency;
2872 }

2874 @Override
2875 public ZKWatcher getZooKeeper() {
2876 return zooKeeper;
2877 }

2879 @Override
2880 public CoordinatedStateManager getCoordinatedStateManager() {
2881 return csm;
2882 }

2884 @Override
2885 public ServerName getServerName() {
2886 return serverName;
2887 }

2889 public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2890 return this.rsHost;
2891 }

2893 @Override
2894 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2895 return this.regionsInTransitionInRS;
2896 }

2898 @Override
2899 public ExecutorService getExecutorService() {
2900 return executorService;
2901 }

2903 @Override
2904 public ChoreService getChoreService() {
2905 return choreService;
2906 }

2908 @Override
2909 public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
2910 return rsQuotaManager;
2911 }

2913 //
2914 // Main program and support routines
2915 //
2916 /**
2917 * Load the replication service objects, if any.
2918 */
2919 private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
2920 FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException {
2921 if ((server instanceof HMaster) &&
2922 (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) {
2923 return;
2924 }

2926 // Read in the name of the source replication class from the config file.
2927 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2928 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);

2930 // Read in the name of the sink replication class from the config file.
2931 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2932 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);

2934 // If both the sink and the source class names are the same, then instantiate
2935 // only one object.
2936 if (sourceClassname.equals(sinkClassname)) {
2937 server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2938 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
2939 server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
2940 } else {
2941 server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2942 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
2943 server.replicationSinkHandler = newReplicationInstance(sinkClassname,
2944 ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
2945 }
2946 }
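// Illustrative sketch, not part of the original source: with the default
// configuration both class-name keys resolve to
// HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT, so the sourceClassname ==
// sinkClassname branch above builds a single ReplicationService instance that
// serves as both source and sink handler. The identity checks in
// startReplicationService() and stopServiceThreads() rely on exactly that:
//
//   if (replicationSourceHandler == replicationSinkHandler && ...) {
//     replicationSourceHandler.startReplicationService(); // started only once
//   }
//
// Pointing the two keys at different classes yields two independent instances,
// each initialized with the same WAL directories.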
2948 private static <T extends ReplicationService> T newReplicationInstance(String classname,
2949 Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
2950 Path oldLogDir, WALProvider walProvider) throws IOException {
2951 Class<? extends T> clazz = null;
2952 try {
2953 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2954 clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
2955 } catch (java.lang.ClassNotFoundException nfe) {
2956 throw new IOException("Could not find class for " + classname);
2957 }
2958 T service = ReflectionUtils.newInstance(clazz, conf);
2959 service.initialize(server, walFs, logDir, oldLogDir, walProvider);
2960 return service;
2961 }

2963 /**
2964 * Utility for constructing an instance of the passed HRegionServer class.
2965 *
2966 * @param regionServerClass the HRegionServer implementation to instantiate
2967 * @param conf2 the configuration to pass to the constructor
2968 * @return HRegionServer instance.
2969 */
2970 public static HRegionServer constructRegionServer(
2971 Class<? extends HRegionServer> regionServerClass,
2972 final Configuration conf2) {
2973 try {
2974 Constructor<? extends HRegionServer> c = regionServerClass
2975 .getConstructor(Configuration.class);
2976 return c.newInstance(conf2);
2977 } catch (Exception e) {
2978 throw new RuntimeException("Failed construction of Regionserver: "
2979 + regionServerClass.toString(), e);
2980 }
2981 }

2983 /**
2984 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2985 */
2986 public static void main(String[] args) throws Exception {
2987 LOG.info("STARTING service " + HRegionServer.class.getSimpleName());
2988 VersionInfo.logVersion();
2989 Configuration conf = HBaseConfiguration.create();
2990 @SuppressWarnings("unchecked")
2991 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2992 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);

2994 new HRegionServerCommandLine(regionServerClass).doMain(args);
2995 }

2997 /**
2998 * Gets the online regions of the specified table.
2999 * This method looks at the in-memory onlineRegions. It does not go to <code>hbase:meta</code>.
3000 * Only returns <em>online</em> regions. If a region on this table has been
3001 * closed during a disable, etc., it will not be included in the returned list.
3002 * So, the returned list may not necessarily be ALL regions in this table; it's
3003 * all the ONLINE regions in the table.
3004 * @param tableName table to get the online regions of
3005 * @return Online regions from <code>tableName</code>
3006 */
3007 @Override
3008 public List<HRegion> getRegions(TableName tableName) {
3009 List<HRegion> tableRegions = new ArrayList<>();
3010 synchronized (this.onlineRegions) {
3011 for (HRegion region: this.onlineRegions.values()) {
3012 RegionInfo regionInfo = region.getRegionInfo();
3013 if (regionInfo.getTable().equals(tableName)) {
3014 tableRegions.add(region);
3015 }
3016 }
3017 }
3018 return tableRegions;
3019 }

3021 @Override
3022 public List<HRegion> getRegions() {
3023 List<HRegion> allRegions = new ArrayList<>();
3024 synchronized (this.onlineRegions) {
3025 // Return a clone copy of the onlineRegions.
3026 allRegions.addAll(onlineRegions.values());
3027 }
3028 return allRegions;
3029 }

3031 /**
3032 * Gets the online tables in this RS.
3033 * This method looks at the in-memory onlineRegions.
3034 * @return all the online tables in this RS
3035 */
3036 public Set<TableName> getOnlineTables() {
3037 Set<TableName> tables = new HashSet<>();
3038 synchronized (this.onlineRegions) {
3039 for (Region region: this.onlineRegions.values()) {
3040 tables.add(region.getTableDescriptor().getTableName());
3041 }
3042 }
3043 return tables;
3044 }
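// Illustrative sketch, not part of the original source: main() above resolves
// the concrete server class from HConstants.REGION_SERVER_IMPL, and
// constructRegionServer() builds it reflectively, so a subclass can be swapped
// in via configuration. MyRegionServer is hypothetical; the only contract is a
// public constructor taking a Configuration:
//
//   Configuration conf = HBaseConfiguration.create();
//   conf.setClass(HConstants.REGION_SERVER_IMPL, MyRegionServer.class,
//       HRegionServer.class);
//   HRegionServer rs = constructRegionServer(MyRegionServer.class, conf);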

  // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
  public String[] getRegionServerCoprocessors() {
    TreeSet<String> coprocessors = new TreeSet<>();
    try {
      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
    } catch (IOException exception) {
      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
          "skipping.");
      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
    }
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    for (HRegion region : regions) {
      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
      try {
        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
      } catch (IOException exception) {
        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
            "; skipping.");
        LOG.debug("Exception details for failure to fetch wal coprocessor information.",
            exception);
      }
    }
    coprocessors.addAll(rsHost.getCoprocessors());
    return coprocessors.toArray(new String[coprocessors.size()]);
  }

  /**
   * Try to close the region; logs a warning on failure but continues.
   * @param region Region to close
   * @param abort True if we are aborting
   */
  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
    try {
      if (!closeRegion(region.getEncodedName(), abort, null)) {
        LOG.warn("Failed to close " + region.getRegionNameAsString() +
            " - ignoring and continuing");
      }
    } catch (IOException e) {
      LOG.warn("Failed to close " + region.getRegionNameAsString() +
          " - ignoring and continuing", e);
    }
  }
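
  // Illustrative note (not part of the server): closeRegion below coordinates
  // with in-flight opens through the regionsInTransitionInRS map, which encodes
  // intent as a Boolean keyed by the encoded region name:
  //
  //   Boolean state = getRegionsInTransitionInRS().get(Bytes.toBytes(encodedName));
  //   // state == Boolean.TRUE  -> an OPEN is in progress for the region
  //   // state == Boolean.FALSE -> a CLOSE is in progress for the region
  //   // state == null          -> the region is not in transition on this server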

  /**
   * Asynchronously close a region; can be called from the master or internally by the
   * regionserver when stopping. If called from the master, the region will update the znode
   * status.
   *
   * <p>
   * If an opening was in progress, this method will cancel it, but will not start a new close.
   * The coprocessors are not called in this case. A NotServingRegionException exception is
   * thrown.
   * </p>
   *
   * <p>
   * If a close was in progress, this new request will be ignored, and an exception thrown.
   * </p>
   *
   * @param encodedName Region to close
   * @param abort True if we are aborting
   * @return True if closed a region.
   * @throws NotServingRegionException if the region is not online
   */
  protected boolean closeRegion(String encodedName, final boolean abort, final ServerName sn)
      throws NotServingRegionException {
    // Check for permissions to close.
    HRegion actualRegion = this.getRegion(encodedName);
    // Can be null if we're calling close on a region that's not online
    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
      try {
        actualRegion.getCoprocessorHost().preClose(false);
      } catch (IOException exp) {
        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
        return false;
      }
    }

    final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName),
        Boolean.FALSE);

    if (Boolean.TRUE.equals(previous)) {
      LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " +
          "trying to OPEN. Cancelling OPENING.");
      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
        // The replace failed. That should be an exceptional case, but theoretically it can happen.
        // We're going to try to do a standard close then.
        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
            " Doing a standard close now");
        return closeRegion(encodedName, abort, sn);
      }
      // Let's get the region from the online region list again
      actualRegion = this.getRegion(encodedName);
      if (actualRegion == null) { // The open never completed; the region is not online.
        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
        // The master deletes the znode when it receives this exception.
        throw new NotServingRegionException("The region " + encodedName +
            " was opening but not yet served. Opening is cancelled.");
      }
    } else if (Boolean.FALSE.equals(previous)) {
      LOG.info("Received CLOSE for the region: " + encodedName +
          ", which we are already trying to CLOSE, but not completed yet");
      return true;
    }

    if (actualRegion == null) {
      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
      // The master deletes the znode when it receives this exception.
      throw new NotServingRegionException("The region " + encodedName +
          " is not online, and is not opening.");
    }

    CloseRegionHandler crh;
    final RegionInfo hri = actualRegion.getRegionInfo();
    if (hri.isMetaRegion()) {
      crh = new CloseMetaHandler(this, this, hri, abort);
    } else {
      crh = new CloseRegionHandler(this, this, hri, abort, sn);
    }
    this.executorService.submit(crh);
    return true;
  }

  /**
   * Close and offline the regions for a split or merge.
   *
   * @param regionEncodedName the encoded names of the regions to close
   * @return true if the regions were closed successfully.
   * @throws IOException if a region fails to close
   */
  protected boolean closeAndOfflineRegionForSplitOrMerge(final List<String> regionEncodedName)
      throws IOException {
    for (int i = 0; i < regionEncodedName.size(); ++i) {
      HRegion regionToClose = this.getRegion(regionEncodedName.get(i));
      if (regionToClose != null) {
        Map<byte[], List<HStoreFile>> hstoreFiles = null;
        Exception exceptionToThrow = null;
        try {
          hstoreFiles = regionToClose.close(false);
        } catch (Exception e) {
          exceptionToThrow = e;
        }
        if (exceptionToThrow == null && hstoreFiles == null) {
          // The region was closed by someone else
          exceptionToThrow =
              new IOException("Failed to close region: already closed by another thread");
        }
        if (exceptionToThrow != null) {
          if (exceptionToThrow instanceof IOException) {
            throw (IOException) exceptionToThrow;
          }
          throw new IOException(exceptionToThrow);
        }
        // Offline the region
        this.removeRegion(regionToClose, null);
      }
    }
    return true;
  }
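
  // Illustrative note (not part of the server): the lookups below distinguish a
  // region's full binary name from its encoded name. Values are hypothetical.
  //
  //   byte[] regionName = ...;  // full region name, e.g. from a client request
  //   String encodedName = RegionInfo.encodeRegionName(regionName);
  //   // onlineRegions is keyed by the encoded name, so both lookups converge:
  //   HRegion viaFullName = regionServer.getOnlineRegion(regionName);
  //   HRegion viaEncoded = regionServer.getRegion(encodedName);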

  /**
   * @return HRegion for the passed binary <code>regionName</code> or null if
   *         the named region is not a member of the online regions.
   */
  public HRegion getOnlineRegion(final byte[] regionName) {
    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
    return this.onlineRegions.get(encodedRegionName);
  }

  public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
    return this.regionFavoredNodesMap.get(encodedRegionName);
  }

  @Override
  public HRegion getRegion(final String encodedRegionName) {
    return this.onlineRegions.get(encodedRegionName);
  }

  @Override
  public boolean removeRegion(final HRegion r, ServerName destination) {
    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
    if (destination != null) {
      long closeSeqNum = r.getMaxFlushedSeqId();
      if (closeSeqNum == HConstants.NO_SEQNUM) {
        // No edits in WAL for this region; get the sequence number when the region was opened.
        closeSeqNum = r.getOpenSeqNum();
        if (closeSeqNum == HConstants.NO_SEQNUM) {
          closeSeqNum = 0;
        }
      }
      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
    }
    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
    return toReturn != null;
  }

  /**
   * Protected utility method for safely obtaining an HRegion handle.
   *
   * @param regionName Name of online {@link HRegion} to return
   * @return {@link HRegion} for <code>regionName</code>
   * @throws NotServingRegionException if the region is not online
   */
  protected HRegion getRegion(final byte[] regionName)
      throws NotServingRegionException {
    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
    return getRegionByEncodedName(regionName, encodedRegionName);
  }

  public HRegion getRegionByEncodedName(String encodedRegionName)
      throws NotServingRegionException {
    return getRegionByEncodedName(null, encodedRegionName);
  }

  protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
      throws NotServingRegionException {
    HRegion region = this.onlineRegions.get(encodedRegionName);
    if (region == null) {
      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
      if (moveInfo != null) {
        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
      }
      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
      String regionNameStr = regionName == null ?
          encodedRegionName : Bytes.toStringBinary(regionName);
      if (isOpening != null && isOpening.booleanValue()) {
        throw new RegionOpeningException("Region " + regionNameStr +
            " is opening on " + this.serverName);
      }
      throw new NotServingRegionException("" + regionNameStr +
          " is not online on " + this.serverName);
    }
    return region;
  }
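
  // Illustrative sketch only (not part of the server): how a caller might react
  // to the three failure modes getRegionByEncodedName can signal. The call site
  // is hypothetical.
  //
  //   try {
  //     HRegion region = getRegionByEncodedName(encodedName);
  //   } catch (RegionMovedException e) {
  //     // Region was recently closed for a move; retry against the new server.
  //   } catch (RegionOpeningException e) {
  //     // Region is opening here; retry on this same server after a pause.
  //   } catch (NotServingRegionException e) {
  //     // Region is neither online nor opening; relocate it via hbase:meta.
  //   }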

  /*
   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
   * IOE if it isn't already.
   *
   * @param t Throwable
   * @param msg Message to log in error. Can be null.
   * @return Throwable converted to an IOE; methods can only let out IOEs.
   */
  private Throwable cleanup(final Throwable t, final String msg) {
    // Don't log as error if NSRE; NSRE is 'normal' operation.
    if (t instanceof NotServingRegionException) {
      LOG.debug("NotServingRegionException; " + t.getMessage());
      return t;
    }
    Throwable e = t instanceof RemoteException ?
        ((RemoteException) t).unwrapRemoteException() : t;
    if (msg == null) {
      LOG.error("", e);
    } else {
      LOG.error(msg, e);
    }
    if (!rpcServices.checkOOME(t)) {
      checkFileSystem();
    }
    return t;
  }

  /*
   * @param t Throwable to examine
   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
   * @return Make <code>t</code> an IOE if it isn't already.
   */
  protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
    return (t instanceof IOException ? (IOException) t : msg == null
        || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
  }

  /**
   * Checks to see if the file system is still accessible. If not, sets
   * abortRequested and stopRequested.
   *
   * @return false if file system is not available
   */
  public boolean checkFileSystem() {
    if (this.fsOk && this.fs != null) {
      try {
        FSUtils.checkFileSystemAvailable(this.fs);
      } catch (IOException e) {
        abort("File System not available", e);
        this.fsOk = false;
      }
    }
    return this.fsOk;
  }

  @Override
  public void updateRegionFavoredNodesMapping(String encodedRegionName,
      List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
    InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
    // Refer to the comment on the declaration of regionFavoredNodesMap on why
    // it is a map of region name to InetSocketAddress[]
    for (int i = 0; i < favoredNodes.size(); i++) {
      addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
          favoredNodes.get(i).getPort());
    }
    regionFavoredNodesMap.put(encodedRegionName, addr);
  }

  /**
   * Return the favored nodes for a region given its encoded name. Look at the
   * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[].
   * @param encodedRegionName the encoded region name to look up
   * @return array of favored locations
   */
  @Override
  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
    return regionFavoredNodesMap.get(encodedRegionName);
  }

  @Override
  public ServerNonceManager getNonceManager() {
    return this.nonceManager;
  }

  private static class MovedRegionInfo {
    private final ServerName serverName;
    private final long seqNum;
    private final long ts;

    public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
      this.serverName = serverName;
      this.seqNum = closeSeqNum;
      ts = EnvironmentEdgeManager.currentTime();
    }

    public ServerName getServerName() {
      return serverName;
    }

    public long getSeqNum() {
      return seqNum;
    }

    public long getMoveTime() {
      return ts;
    }
  }

  // This map contains all the regions that we closed for a move.
  // We record the time the region was moved as we don't want to keep too old information.
  protected Map<String, MovedRegionInfo> movedRegions = new ConcurrentHashMap<>(3000);
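
  // Illustrative note (not part of the server): the moved-regions cache lets a
  // RegionMovedException carry the destination straight back to the client,
  // saving a round trip to hbase:meta. A sketch of the flow:
  //
  //   removeRegion(region, destinationServer);  // close for a move
  //   // ... a client later asks this server for the region:
  //   //   -> getRegionByEncodedName(...) finds the MovedRegionInfo and throws
  //   //      RegionMovedException(destinationServer, closeSeqNum)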

  // We need a timeout. If not, there is a risk of giving out stale information: that would
  // double the number of network calls instead of reducing them.
  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);

  protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
    if (ServerName.isSameAddress(destination, this.getServerName())) {
      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
      return;
    }
    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" +
        closeSeqNum);
    movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
  }

  void removeFromMovedRegions(String encodedName) {
    movedRegions.remove(encodedName);
  }

  private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
    MovedRegionInfo dest = movedRegions.get(encodedRegionName);

    long now = EnvironmentEdgeManager.currentTime();
    if (dest != null) {
      if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
        return dest;
      } else {
        movedRegions.remove(encodedRegionName);
      }
    }

    return null;
  }

  /**
   * Remove the expired entries from the moved regions list.
   */
  protected void cleanMovedRegions() {
    // Use the same clock as MovedRegionInfo#getMoveTime so the comparison stays
    // consistent when tests inject an EnvironmentEdge.
    final long cutOff = EnvironmentEdgeManager.currentTime() - TIMEOUT_REGION_MOVED;
    Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();

    while (it.hasNext()) {
      Map.Entry<String, MovedRegionInfo> e = it.next();
      if (e.getValue().getMoveTime() < cutOff) {
        it.remove();
      }
    }
  }

  /*
   * Use this to allow tests to override and schedule more frequently.
   */
  protected int movedRegionCleanerPeriod() {
    return TIMEOUT_REGION_MOVED;
  }
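
  // Illustrative note (not part of the server): the cleaner below follows the
  // standard ScheduledChore pattern. A minimal sketch, assuming a ChoreService
  // is available to schedule against:
  //
  //   MovedRegionsCleaner cleaner = MovedRegionsCleaner.create(regionServer);
  //   choreService.scheduleChore(cleaner);  // runs every movedRegionCleanerPeriod() ms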

  /**
   * Chore that cleans the moved region cache.
   */
  protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
    private HRegionServer regionServer;
    Stoppable stoppable;

    private MovedRegionsCleaner(
        HRegionServer regionServer, Stoppable stoppable) {
      super("MovedRegionsCleaner for region " + regionServer, stoppable,
          regionServer.movedRegionCleanerPeriod());
      this.regionServer = regionServer;
      this.stoppable = stoppable;
    }

    static MovedRegionsCleaner create(HRegionServer rs) {
      Stoppable stoppable = new Stoppable() {
        private volatile boolean isStopped = false;

        @Override public void stop(String why) { isStopped = true; }

        @Override public boolean isStopped() { return isStopped; }
      };

      return new MovedRegionsCleaner(rs, stoppable);
    }

    @Override
    protected void chore() {
      regionServer.cleanMovedRegions();
    }

    @Override
    public void stop(String why) {
      stoppable.stop(why);
    }

    @Override
    public boolean isStopped() {
      return stoppable.isStopped();
    }
  }

  private String getMyEphemeralNodePath() {
    return ZNodePaths.joinZNode(this.zooKeeper.znodePaths.rsZNode, getServerName().toString());
  }

  private boolean isHealthCheckerConfigured() {
    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
  }

  /**
   * @return the underlying {@link CompactSplit} for this server
   */
  public CompactSplit getCompactSplitThread() {
    return this.compactSplitThread;
  }
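
  // Illustrative sketch only (not part of the server): a client reaches the
  // dispatch method below through a coprocessor RPC channel. The service and
  // stub names are hypothetical placeholders for a generated protobuf service.
  //
  //   try (Connection conn = ConnectionFactory.createConnection(conf)) {
  //     CoprocessorRpcChannel channel = conn.getAdmin().coprocessorService(serverName);
  //     MyService.BlockingInterface stub = MyService.newBlockingStub(channel);
  //     MyResponse response = stub.myMethod(null, MyRequest.getDefaultInstance());
  //   }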

  public CoprocessorServiceResponse execRegionServerService(
      @SuppressWarnings("UnusedParameters") final RpcController controller,
      final CoprocessorServiceRequest serviceRequest) throws ServiceException {
    try {
      ServerRpcController serviceController = new ServerRpcController();
      CoprocessorServiceCall call = serviceRequest.getCall();
      String serviceName = call.getServiceName();
      com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName);
      if (service == null) {
        throw new UnknownProtocolException(null, "No registered coprocessor service found for " +
            serviceName);
      }
      com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
          service.getDescriptorForType();

      String methodName = call.getMethodName();
      com.google.protobuf.Descriptors.MethodDescriptor methodDesc =
          serviceDesc.findMethodByName(methodName);
      if (methodDesc == null) {
        throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName +
            " called on service " + serviceName);
      }

      com.google.protobuf.Message request =
          CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
      final com.google.protobuf.Message.Builder responseBuilder =
          service.getResponsePrototype(methodDesc).newBuilderForType();
      service.callMethod(methodDesc, serviceController, request,
          new com.google.protobuf.RpcCallback<com.google.protobuf.Message>() {
            @Override
            public void run(com.google.protobuf.Message message) {
              if (message != null) {
                responseBuilder.mergeFrom(message);
              }
            }
          });
      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
      if (exception != null) {
        throw exception;
      }
      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * @return The cache config instance used by the regionserver.
   */
  public CacheConfig getCacheConfig() {
    return this.cacheConfig;
  }

  /**
   * @return the ConfigurationManager object, exposed for testing purposes.
   */
  protected ConfigurationManager getConfigurationManager() {
    return configurationManager;
  }

  /**
   * @return the table descriptors implementation.
   */
  public TableDescriptors getTableDescriptors() {
    return this.tableDescriptors;
  }

  /**
   * Reload the configuration from disk.
   */
  public void updateConfiguration() {
    LOG.info("Reloading the configuration from disk.");
    // Reload the configuration from disk.
    conf.reloadConfiguration();
    configurationManager.notifyAllObservers(conf);
  }

  public CacheEvictionStats clearRegionBlockCache(Region region) {
    BlockCache blockCache = this.getCacheConfig().getBlockCache();
    long evictedBlocks = 0;

    for (Store store : region.getStores()) {
      for (StoreFile hFile : store.getStorefiles()) {
        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
      }
    }

    return CacheEvictionStats.builder()
        .withEvictedBlocks(evictedBlocks)
        .build();
  }

  @Override
  public double getCompactionPressure() {
    double max = 0;
    for (Region region : onlineRegions.values()) {
      for (Store store : region.getStores()) {
        double normCount = store.getCompactionPressure();
        if (normCount > max) {
          max = normCount;
        }
      }
    }
    return max;
  }

  @Override
  public HeapMemoryManager getHeapMemoryManager() {
    return hMemManager;
  }

  /**
   * For testing.
   * @return whether all wal roll requests finished for this regionserver
   */
  @VisibleForTesting
  public boolean walRollRequestFinished() {
    return this.walRoller.walRollFinished();
  }

  @Override
  public ThroughputController getFlushThroughputController() {
    return flushThroughputController;
  }

  @Override
  public double getFlushPressure() {
    if (getRegionServerAccounting() == null || cacheFlusher == null) {
      // return 0 during RS initialization
      return 0.0;
    }
    return getRegionServerAccounting().getFlushPressure();
  }

  @Override
  public void onConfigurationChange(Configuration newConf) {
    ThroughputController old = this.flushThroughputController;
    if (old != null) {
      old.stop("configuration change");
    }
    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
  }
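
  // Illustrative sketch only (not part of the server): updateConfiguration above
  // fans out to every registered ConfigurationObserver, and onConfigurationChange
  // is this class's own callback. A minimal observer, with a hypothetical key:
  //
  //   class MyObserver implements ConfigurationObserver {
  //     @Override
  //     public void onConfigurationChange(Configuration newConf) {
  //       int limit = newConf.getInt("example.my.limit", 10); // hypothetical key
  //       // re-apply the new limit to internal state here
  //     }
  //   }
  //   // configurationManager.registerObserver(new MyObserver());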

  @Override
  public MetricsRegionServer getMetrics() {
    return metricsRegionServer;
  }

  @Override
  public SecureBulkLoadManager getSecureBulkLoadManager() {
    return this.secureBulkLoadManager;
  }

  @Override
  public EntityLock regionLock(List<RegionInfo> regionInfos, String description,
      Abortable abort) throws IOException {
    return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
        .regionLock(regionInfos, description, abort);
  }

  @Override
  public void unassign(byte[] regionName) throws IOException {
    clusterConnection.getAdmin().unassign(regionName, false);
  }

  @Override
  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
    return this.rsSpaceQuotaManager;
  }

  public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
    return eventLoopGroupConfig;
  }

  @Override
  public Connection createConnection(Configuration conf) throws IOException {
    User user = UserProvider.instantiate(conf).getCurrent();
    return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName,
        this.rpcServices, this.rpcServices);
  }

  /**
   * Forcibly terminate the region server when an abort times out.
   */
  private static class SystemExitWhenAbortTimeout extends TimerTask {

    public SystemExitWhenAbortTimeout() {
    }

    @Override
    public void run() {
      LOG.warn("Aborting region server timed out; terminating forcibly without waiting" +
          " for any running shutdown hooks or finalizers to finish their work." +
          " Thread dump to stdout.");
      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
      Runtime.getRuntime().halt(1);
    }
  }
}