/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionSize;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Signal;
import sun.misc.SignalHandler;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
 * the HMaster. There are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation"})
public class HRegionServer extends HasThread implements
    RegionServerServices, LastSequenceId, ConfigurationObserver {
  // Time to pause if master says 'please hold'. Make configurable if needed.
  private static final int INIT_PAUSE_TIME_MS = 1000;

  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

  /**
   * For testing only! Set to true to skip notifying region assignment to master.
   */
  @VisibleForTesting
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  // RegionName vs current action in progress
  // true - if open region action in progress
  // false - if close region action in progress
  protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * Used to cache the open/close region procedures which have already been submitted.
   * See {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
  /**
   * Used to cache the open/close region procedures which have already been executed.
   * See {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
      CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

  // Cache flushing
  protected MemStoreFlusher cacheFlusher;

  protected HeapMemoryManager hMemManager;

  /**
   * Cluster connection to be shared by services.
   * Initialized at server startup and closed when server shuts down.
   * Clients must never close it explicitly.
   * Clients hosted by this Server should make use of this clusterConnection rather than create
   * their own; if they create their own, there is no way for the hosting server to shutdown
   * ongoing client RPCs.
   */
  protected ClusterConnection clusterConnection;

  /**
   * Go here to get table descriptors.
   */
  protected TableDescriptors tableDescriptors;

  // Replication services. If no replication, this handler will be null.
  protected ReplicationSourceService replicationSourceHandler;
  protected ReplicationSinkService replicationSinkHandler;

  // Compactions
  public CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the
   * encoded region name. All access should be synchronized.
   */
  protected final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on.
   * We store the value as InetSocketAddress since this is used only in the HDFS
   * API (create() that takes favored nodes as hints for placing file blocks).
   * We could have used ServerName here as the value class, but we'd need to
   * convert it to InetSocketAddress at some point before the HDFS API call, and
   * it seems a bit weird to store ServerName since ServerName refers to RegionServers
   * and here we really mean DataNode locations.
   */
  protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
      new ConcurrentHashMap<>();

  // Leases
  protected Leases leases;

  // Instance of the hbase executor service.
  protected ExecutorService executorService;

  // If false, the file system has become unavailable
  protected volatile boolean fsOk;
  protected HFileSystem fs;
  protected HFileSystem walFs;

  // Set when a report to the master comes back with a message asking us to
  // shutdown. Also set by call to stop when debugging or running unit tests
  // of HRegionServer in isolation.
  private volatile boolean stopped = false;

  // Go down hard. Used if file system becomes unavailable and also in
  // debugging and unit tests.
  private volatile boolean abortRequested;
  public static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds, to be safe.
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Will run this task when the abort timeout expires.
  public static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
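
  // Illustrative sketch (not part of the shipped code) of plugging in a custom
  // abort-timeout task via ABORT_TIMEOUT_TASK. The class name and body below are
  // assumptions; the only contract the reflection in run() imposes is "TimerTask
  // subclass with a no-arg constructor". The default task, SystemExitWhenAbortTimeout,
  // force-exits the JVM.
  //
  //   public class DumpThreadsOnAbortTimeout extends java.util.TimerTask {
  //     @Override
  //     public void run() {
  //       // Dump all thread stacks instead of exiting, for post-mortem debugging.
  //       Threads.printThreadInfo(System.out, "Abort timed out");
  //     }
  //   }
  //
  //   conf.set(ABORT_TIMEOUT_TASK, DumpThreadsOnAbortTimeout.class.getName());
  //   conf.setLong(ABORT_TIMEOUT, 600000); // 10 minutes instead of the 20-minute default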

  ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<>();

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;

  volatile boolean killed = false;

  private volatile boolean shutDown = false;

  protected final Configuration conf;

  private Path rootDir;
  private Path walRootDir;

  protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  final int numRetries;
  protected final int threadWakeFrequency;
  protected final int msgInterval;

  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  protected final int numRegionsToReport;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  RpcClient rpcClient;

  private RpcRetryingCallerFactory rpcRetryingCallerFactory;
  private RpcControllerFactory rpcControllerFactory;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  // Info server. Default access so can be used by unit tests. REGIONSERVER
  // is the name of the webapp and the attribute name used when stuffing this
  // instance into the web context.
  protected InfoServer infoServer;
  private JvmPauseMonitor pauseMonitor;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;
  MetricsTable metricsTable;
  private SpanReceiverHost spanReceiverHost;

  /**
   * ChoreService used to schedule tasks that we want to run periodically
   */
  private ChoreService choreService;

  /*
   * Check for compaction requests.
   */
  ScheduledChore compactionChecker;

  /*
   * Check for flushes
   */
  ScheduledChore periodicFlusher;

  protected volatile WALFactory walFactory;

  // WAL roller. log is protected rather than private to avoid
  // eclipse warning when accessed by inner classes.
  protected LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // zookeeper connection and watcher
  protected final ZKWatcher zooKeeper;

  // master address tracker
  private final MasterAddressTracker masterAddressTracker;

  // Cluster Status Tracker
  protected final ClusterStatusTracker clusterStatusTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  // A sleeper that sleeps for msgInterval.
  protected final Sleeper sleeper;

  private final int operationTimeout;
  private final int shortOperationTimeout;

  private final RegionServerAccounting regionServerAccounting;

  // Block cache
  private BlockCache blockCache;
  // The cache for mob files
  private MobFileCache mobFileCache;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * The server name the Master sees us as. It's made from the hostname the
   * master passes us, port, and server startcode. Gets set after registration
   * against Master.
   */
  protected ServerName serverName;

  /*
   * hostname specified by hostname config
   */
  protected String useThisHostnameInstead;

  // key to the config parameter of server hostname
  // the specification of server hostname is optional. The hostname should be resolvable from
  // both master and region server
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  protected final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";

  // HBASE-18226: This config and hbase.regionserver.hostname are mutually exclusive.
  // An exception will be thrown if both are used.
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
      "hbase.regionserver.hostname.disable.master.reversedns";
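
  // Illustrative only (the hostname is made up): either pin the hostname this
  // region server registers under, or tell the master to skip reverse DNS, but
  // never both -- getUseThisHostnameInstead() rejects the combination.
  //
  //   conf.set(RS_HOSTNAME_KEY, "rs1.example.com");
  //   // OR
  //   conf.setBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, true);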

  /**
   * This server's startcode.
   */
  protected final long startcode;

  /**
   * Unique identifier for the cluster we are a part of.
   */
  protected String clusterId;

  /**
   * Chore to periodically clean the moved region list.
   */
  private MovedRegionsCleaner movedRegionsCleaner;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent
   * in the case where the client doesn't receive the response from a successful operation and
   * retries. We track the successful ops for some time via a nonce sent by the client and handle
   * duplicate operations (currently, by failing them; in the future we might use MVCC to return
   * the result). Nonces are also recovered from WAL during recovery; however, the caveats (from
   * HBASE-3787) are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
   *   of past records. If we don't read the records, we don't read and recover the nonces.
   *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
   * We could have a separate additional "Nonce WAL". It would just contain a bunch of numbers
   * and won't be flushed on the main path - because the WAL itself also contains nonces, if we
   * only flush it before memstore flush, for a given nonce we will either see it in the WAL (if
   * it was never flushed to disk, it will be part of recovery), or we'll see it as part of the
   * nonce log (or both occasionally, which doesn't matter). The nonce log file can be deleted
   * after the latest nonce in it has expired. It can also be recovered during move.
   */
  final ServerNonceManager nonceManager;
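
  // Minimal sketch of how a mutation handler consults the nonce manager. Method
  // names follow ServerNonceManager, but the variables (nonceGroup, nonce,
  // stoppable) are hypothetical; treat this as illustrative, not a spec:
  //
  //   if (nonceManager.startOperation(nonceGroup, nonce, stoppable)) {
  //     boolean success = false;
  //     try {
  //       // ... apply the increment/append and remember the result ...
  //       success = true;
  //     } finally {
  //       nonceManager.endOperation(nonceGroup, nonce, success);
  //     }
  //   } // else: a duplicate retry; the first attempt already ran or is running.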

  private UserProvider userProvider;

  protected final RSRpcServices rpcServices;

  protected CoordinatedStateManager csm;

  /**
   * Configuration manager is used to register/deregister and notify the configuration observers
   * when the regionserver is notified that there was a change in the on disk configs.
   */
  protected final ConfigurationManager configurationManager;

  @VisibleForTesting
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  protected SecureBulkLoadManager secureBulkLoadManager;

  protected FileSystemUtilizationChore fsUtilizationChore;

  private final NettyEventLoopGroupConfig eventLoopGroupConfig;

  /**
   * True if this RegionServer is coming up in a cluster where there is no Master;
   * means it needs to just come up and make do without a Master to talk to: e.g. in test or
   * when HRegionServer is doing other than its usual duties: e.g. as a hollowed-out host whose
   * only purpose is as a Replication-stream sink; see HBASE-18846 for more.
   */
  private final boolean masterless;
  static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /** regionserver codec list **/
  public static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";

  /**
   * Starts a HRegionServer at the default location.
   */
  // Don't start any services or managers in here in the Constructor.
  // Defer till after we register with the Master as much as possible. See #startServices.
  public HRegionServer(Configuration conf) throws IOException {
    super("RegionServer"); // thread name
    TraceUtil.initTracer(conf);
    try {
      this.startcode = System.currentTimeMillis();
      this.conf = conf;
      this.fsOk = true;
      this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
      this.eventLoopGroupConfig = setupNetty(this.conf);
      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      this.userProvider = UserProvider.instantiate(conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
      this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

      this.sleeper = new Sleeper(this.msgInterval, this);

      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10);

      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
          HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
          HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.abortRequested = false;
      this.stopped = false;

      rpcServices = createRpcServices();
      useThisHostnameInstead = getUseThisHostnameInstead(conf);
      String hostName =
          StringUtils.isBlank(useThisHostnameInstead) ? this.rpcServices.isa.getHostName()
              : this.useThisHostnameInstead;
      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);

      rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
      rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

      // login the zookeeper client principal (if using security)
      ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
          HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
      // login the server principal (if using secure Hadoop)
      login(userProvider, hostName);
      // init superusers and add the server principal (if using security)
      // or process owner as default super user.
      Superusers.initialize(conf);
      regionServerAccounting = new RegionServerAccounting(conf);

      boolean isMasterNotCarryTable =
          this instanceof HMaster && !LoadBalancer.isTablesOnMaster(conf);

      // no need to instantiate block cache and mob file cache when master doesn't carry tables
      if (!isMasterNotCarryTable) {
        blockCache = BlockCacheFactory.createBlockCache(conf);
        mobFileCache = new MobFileCache(conf);
      }

      uncaughtExceptionHandler = new UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
          abort("Uncaught exception in executorService thread " + t.getName(), e);
        }
      };

      initializeFileSystem();
      spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

      this.configurationManager = new ConfigurationManager();
      setupWindows(getConfiguration(), getConfigurationManager());

      // Some unit tests don't need a cluster, so no zookeeper at all
      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
        // Open connection to zookeeper and set primary watcher
        zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
            rpcServices.isa.getPort(), this, canCreateBaseZNode());
        // If no master in cluster, skip trying to track one or look for a cluster status.
        if (!this.masterless) {
          if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
              DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
            this.csm = new ZkCoordinatedStateManager(this);
          }

          masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
          masterAddressTracker.start();

          clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
          clusterStatusTracker.start();
        } else {
          masterAddressTracker = null;
          clusterStatusTracker = null;
        }
      } else {
        zooKeeper = null;
        masterAddressTracker = null;
        clusterStatusTracker = null;
      }
      this.rpcServices.start(zooKeeper);
      // This violates 'no starting stuff in Constructor' but Master depends on the below chore
      // and executor being created and takes a different startup route. Lots of overlap between
      // HRS and M (an M IS A HRS now). Master also expects the Constructor to put up the web
      // servers. Ugh. Need to refactor so there is less duplication between M and its super
      // class HRS. TODO.
      this.choreService = new ChoreService(getName(), true);
      this.executorService = new ExecutorService(getName());
      putUpWebUI();
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      LOG.error("Failed construction RegionServer", t);
      throw t;
    }
  }
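
  // Illustrative only: in production this constructor is reached via the command
  // line / reflection, but tests bring a region server up roughly like this
  // (lifecycle heavily simplified; assumes a usable cluster environment):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   HRegionServer rs = new HRegionServer(conf);
  //   rs.start();               // HasThread: enters run() on its own thread
  //   // ... interact with the cluster ...
  //   rs.stop("test shutdown"); // triggers the orderly close path in run()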

  // HMaster should override this method to load the specific config for master
  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
    String hostname = conf.get(RS_HOSTNAME_KEY);
    if (conf.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
      if (!StringUtils.isBlank(hostname)) {
        String msg = RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + RS_HOSTNAME_KEY +
            " are mutually exclusive. Do not set " + RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY +
            " to true while " + RS_HOSTNAME_KEY + " is used";
        throw new IOException(msg);
      } else {
        return rpcServices.isa.getHostName();
      }
    } else {
      return hostname;
    }
  }

  /**
   * Registers a HUP signal handler that reloads the configuration. Despite the
   * name, this is a no-op on Windows, where the HUP signal does not exist.
   */
  private static void setupWindows(final Configuration conf, ConfigurationManager cm) {
    if (!SystemUtils.IS_OS_WINDOWS) {
      Signal.handle(new Signal("HUP"), new SignalHandler() {
        @Override
        public void handle(Signal signal) {
          conf.reloadConfiguration();
          cm.notifyAllObservers(conf);
        }
      });
    }
  }
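
  // Illustrative only: with the handler above installed, an operator can hot-reload
  // hbase-site.xml on a non-Windows host (the pid is hypothetical):
  //
  //   kill -HUP 12345
  //
  // Every ConfigurationObserver registered with the ConfigurationManager is then
  // notified with the freshly reloaded Configuration.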

  private static NettyEventLoopGroupConfig setupNetty(Configuration conf) {
    // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL.
    NettyEventLoopGroupConfig nelgc =
        new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
    NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
    return nelgc;
  }

  private void initializeFileSystem() throws IOException {
    // Get fs instance used by this RS. Do we use checksum verification in hbase? If hbase
    // checksum verification is enabled, then automatically switch off hdfs checksum verification.
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    FSUtils.setFsDefault(this.conf, FSUtils.getWALRootDir(this.conf));
    this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.walRootDir = FSUtils.getWALRootDir(this.conf);
    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
    // underlying hadoop hdfs accessors will be going against wrong filesystem
    // (unless all is set to defaults).
    FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
    this.fs = new HFileSystem(this.conf, useHBaseChecksum);
    this.rootDir = FSUtils.getRootDir(this.conf);
    this.tableDescriptors = getFsTableDescriptors();
  }

  protected TableDescriptors getFsTableDescriptors() throws IOException {
    return new FSTableDescriptors(this.conf,
        this.fs, this.rootDir, !canUpdateTableDescriptor(), false, getMetaTableObserver());
  }

  protected Function<TableDescriptorBuilder, TableDescriptorBuilder> getMetaTableObserver() {
    return null;
  }

  protected void login(UserProvider user, String host) throws IOException {
    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
        SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
  }

  /**
   * Wait for an active Master.
   * See override in Master superclass for how it is used.
   */
  protected void waitForMasterActive() {}

  protected String getProcessName() {
    return REGIONSERVER;
  }

  protected boolean canCreateBaseZNode() {
    return this.masterless;
  }

  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  protected void configureInfoServer() {
    infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }

  @Override
  public boolean registerService(com.google.protobuf.Service instance) {
    /*
     * No stacking of instances is allowed for a single service name.
     */
    com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
        instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor service " + serviceName
          + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Registered regionserver coprocessor service: service=" + serviceName);
    }
    return true;
  }
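
  // Minimal sketch of registering an endpoint with the method above. The
  // MyAdminProtos.MyAdminService class is hypothetical; any generated
  // com.google.protobuf.Service implementation works the same way:
  //
  //   com.google.protobuf.Service svc = new MyAdminProtos.MyAdminService() {
  //     // ... implement the generated RPC methods ...
  //   };
  //   if (!regionServer.registerService(svc)) {
  //     // a service with the same fully-qualified name is already registered
  //   }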

  /**
   * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to
   * the local server; i.e. a short-circuit Connection. Safe to use going to local or remote
   * server. Creating this instance in a method allows it to be intercepted and mocked in tests.
   * @throws IOException
   */
  @VisibleForTesting
  protected ClusterConnection createClusterConnection() throws IOException {
    Configuration conf = this.conf;
    if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      // Use server ZK cluster for server-issued connections, so we clone
      // the conf and unset the client ZK related properties
      conf = new Configuration(this.conf);
      conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
    // Create a cluster connection that when appropriate, can short-circuit and go directly to
    // the local server if the request is to the local server, bypassing RPC. Can be used for
    // both local and remote invocations.
    return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
        serverName, rpcServices, rpcServices);
  }

  /**
   * Run test on configured codecs to make sure supporting libs are in place.
   * @param c
   * @throws IOException
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String [] codecs = c.getStrings(REGIONSERVER_CODEC, (String[])null);
    if (codecs == null) return;
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException("Compression codec " + codec +
            " not supported, aborting RS construction");
      }
    }
  }
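
  // Illustrative only (the codec names are examples): requiring codecs at startup
  // so a region server fails fast when supporting native libraries are missing:
  //
  //   conf.set(REGIONSERVER_CODEC, "snappy,gz");
  //
  // With that set, construction aborts unless CompressionTest.testCompression(...)
  // succeeds for every listed codec.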

  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * Setup our cluster connection if not already initialized.
   * @throws IOException
   */
  protected synchronized void setupClusterConnection() throws IOException {
    if (clusterConnection == null) {
      clusterConnection = createClusterConnection();
    }
  }

  /**
   * All initialization needed before we go register with Master.<br>
   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
   */
  private void preRegistrationInitialization() {
    try {
      initializeZooKeeper();
      setupClusterConnection();
      // Setup RPC client for master communication
      this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
          this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
    } catch (Throwable t) {
      // Call stop if error or process will stick around for ever since server
      // puts up non-daemon threads.
      this.rpcServices.stop();
      abort("Initialization of RS failed. Hence aborting RS.", t);
    }
  }

  /**
   * Bring up connection to zk ensemble, then wait until a master is available for this cluster,
   * and after that, wait until the cluster 'up' flag has been set. This is the order in which
   * the master does things.
   * <p>
   * Finally open long-living server short-circuit connection.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
    justification="cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Nothing to do in here if no Master in the mix.
    if (this.masterless) {
      return;
    }

    // Create the master address tracker, register with zk, and start it. Then
    // block until a master is available. No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up. Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // If we are HMaster then the cluster id should have already been set.
    if (clusterId == null) {
      // Retrieve clusterId
      // Since cluster status is now up
      // ID should have already been set by HMaster
      try {
        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
        if (clusterId == null) {
          this.abort("Cluster ID has not been set");
        }
        LOG.info("ClusterId : " + clusterId);
      } catch (KeeperException e) {
        this.abort("Failed to retrieve Cluster ID", e);
      }
    }

    waitForMasterActive();
    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }

  /**
   * Utility method to wait indefinitely on a znode availability while checking
   * if the region server is shut down
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException
   */
  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
      throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /**
   * @return True if the cluster is up.
   */
  @Override
  public boolean isClusterUp() {
    return this.masterless ||
        (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
  }

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        ShutdownHook.install(conf, fs, this, Thread.currentThread());
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
      }

      // Try and register with the Master; tell it we are here. Break if server is stopped or
      // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead
      // and start up all Services. Use RetryCounter to get backoff in case Master is struggling
      // to come up.
      LOG.debug("About to register with Master.");
      RetryCounterFactory rcf = new RetryCounterFactory(Integer.MAX_VALUE,
          this.sleeper.getPeriod(), 1000 * 60 * 5);
      RetryCounter rc = rcf.create();
      while (keepLooping()) {
        RegionServerStartupResponse w = reportForDuty();
        if (w == null) {
          long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
          LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
          this.sleeper.sleep(sleepTime);
        } else {
          handleReportForDutyResponse(w);
          break;
        }
      }

      if (!isStopped() && isHealthy()) {
        // start the snapshot handler and other procedure handlers,
        // since the server is ready to run
        if (this.rspmHost != null) {
          this.rspmHost.start();
        }
        // Start the Quota Manager
        if (this.rsQuotaManager != null) {
          rsQuotaManager.start(getRpcServer().getScheduler());
        }
        if (this.rsSpaceQuotaManager != null) {
          this.rsSpaceQuotaManager.start();
        }
      }

      // We registered with the Master. Go into run mode.
      long lastMsg = System.currentTimeMillis();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (isOnlineRegionsEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(this.abortRequested);
          } else if (this.stopping) {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
              // since last time we went around the loop. Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested);
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = System.currentTimeMillis();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = System.currentTimeMillis();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // while
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }

    if (abortRequested) {
      Timer abortMonitor = new Timer("Abort regionserver monitor", true);
      TimerTask abortTimeoutTask = null;
      try {
        Constructor<? extends TimerTask> timerTaskCtor =
            Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
                .asSubclass(TimerTask.class).getDeclaredConstructor();
        timerTaskCtor.setAccessible(true);
        abortTimeoutTask = timerTaskCtor.newInstance();
      } catch (Exception e) {
        LOG.warn("Initialize abort timeout task failed", e);
      }
      if (abortTimeoutTask != null) {
        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
      }
    }

    if (this.leases != null) {
      this.leases.closeAfterLeasesExpire();
    }
    if (this.splitLogWorker != null) {
      splitLogWorker.stop();
    }
    if (this.infoServer != null) {
      LOG.info("Stopping infoServer");
      try {
        this.infoServer.stop();
      } catch (Exception e) {
        LOG.error("Failed to stop infoServer", e);
      }
    }
    // Send cache a shutdown.
    if (blockCache != null) {
      blockCache.shutdown();
    }
    if (mobFileCache != null) {
      mobFileCache.shutdown();
    }

    if (movedRegionsCleaner != null) {
      movedRegionsCleaner.stop("Region Server stopping");
    }

    // Send interrupts to wake up threads if sleeping so they notice shutdown.
    // TODO: Should we check they are alive? If OOME could have exited already
    if (this.hMemManager != null) this.hMemManager.stop();
    if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
    if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
    sendShutdownInterrupt();

    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
    if (rspmHost != null) {
      rspmHost.stop(this.abortRequested || this.killed);
    }

    if (this.killed) {
      // Just skip out w/o closing regions. Used when testing.
    } else if (abortRequested) {
      if (this.fsOk) {
        closeUserRegions(abortRequested); // Don't leave any open file handles
      }
      LOG.info("aborting server " + this.serverName);
    } else {
      closeUserRegions(abortRequested);
      LOG.info("stopping server " + this.serverName);
    }

    if (this.clusterConnection != null && !clusterConnection.isClosed()) {
      try {
        this.clusterConnection.close();
      } catch (IOException e) {
        // Although the {@link Closeable} interface throws an {@link
        // IOException}, in reality, the implementation would never do that.
        LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
      }
    }

    // Closing the compactSplit thread before closing meta regions
    if (!this.killed && containsMetaTableRegions()) {
      if (!abortRequested || this.fsOk) {
        if (this.compactSplitThread != null) {
          this.compactSplitThread.join();
          this.compactSplitThread = null;
        }
        closeMetaTableRegions(abortRequested);
      }
    }

    if (!this.killed && this.fsOk) {
      waitOnAllRegionsToClose(abortRequested);
      LOG.info("stopping server " + this.serverName + "; all regions closed.");
    }

    // Stop the quota manager
    if (rsQuotaManager != null) {
      rsQuotaManager.stop();
    }
    if (rsSpaceQuotaManager != null) {
      rsSpaceQuotaManager.stop();
      rsSpaceQuotaManager = null;
    }

    // fsOk flag may be changed when closing regions throws exception.
    if (this.fsOk) {
      shutdownWAL(!abortRequested);
    }

    // Make sure the proxy is down.
    if (this.rssStub != null) {
      this.rssStub = null;
    }
    if (this.lockStub != null) {
      this.lockStub = null;
    }
    if (this.rpcClient != null) {
      this.rpcClient.close();
    }
    if (this.leases != null) {
      this.leases.close();
    }
    if (this.pauseMonitor != null) {
      this.pauseMonitor.stop();
    }

    if (!killed) {
      stopServiceThreads();
    }

    if (this.rpcServices != null) {
      this.rpcServices.stop();
    }

    try {
      deleteMyEphemeralNode();
    } catch (KeeperException.NoNodeException nn) {
    } catch (KeeperException e) {
      LOG.warn("Failed deleting my ephemeral node", e);
    }
    // We may have failed to delete the znode at the previous step, but
    // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
    ZNodeClearer.deleteMyEphemeralNodeOnDisk();

    if (this.zooKeeper != null) {
      this.zooKeeper.close();
    }
    this.shutDown = true;
    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
  }

  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  private boolean areAllUserRegionsOffline() {
    if (getNumberOfOnlineRegions() > 2) return false;
    boolean allUserRegionsOffline = true;
    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
      if (!e.getValue().getRegionInfo().isMetaRegion()) {
        allUserRegionsOffline = false;
        break;
      }
    }
    return allUserRegionsOffline;
  }

  /**
   * @return Current write count for all online regions.
   */
  private long getWriteRequestCount() {
    long writeCount = 0;
    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
      writeCount += e.getValue().getWriteRequestsCount();
    }
    return writeCount;
  }

  @VisibleForTesting
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
      throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    try {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      request.setServer(ProtobufUtil.toServerName(this.serverName));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        throw ioe;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub(true);
    }
  }

  /**
   * Reports the given map of Regions and their size on the filesystem to the active Master.
   *
   * @param regionSizeStore The store containing region sizes
   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
   */
  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      LOG.trace("Skipping Region size report to HMaster as stub is null");
      return true;
    }
    try {
      buildReportAndSend(rss, regionSizeStore);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        LOG.trace("Failed to report region sizes to Master because it is initializing."
            + " This will be retried.", ioe);
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return true;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      createRegionServerStatusStub(true);
      if (ioe instanceof DoNotRetryIOException) {
        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
        if (doNotRetryEx.getCause() != null) {
          Throwable t = doNotRetryEx.getCause();
          if (t instanceof UnsupportedOperationException) {
            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
            return false;
          }
        }
      }
      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
    }
    return true;
  }

  /**
   * Builds the region size report and sends it to the master. Upon successful sending of the
   * report, the region sizes that were sent are marked as sent.
   *
   * @param rss The stub to send to the Master
   * @param regionSizeStore The store containing region sizes
   */
  void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
      RegionSizeStore regionSizeStore) throws ServiceException {
    RegionSpaceUseReportRequest request =
        buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
    rss.reportRegionSpaceUse(null, request);
    // Record the number of size reports sent
    if (metricsRegionServer != null) {
      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
    }
  }

  /**
   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
   *
   * @param regionSizes The store of region sizes in bytes
   * @return The corresponding protocol buffer message.
   */
  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
    }
    return request.build();
  }

  /**
   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse}
   * protobuf message.
   *
   * @param regionInfo The RegionInfo
   * @param sizeInBytes The size in bytes of the Region
   * @return The protocol buffer
   */
  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
    return RegionSpaceUse.newBuilder()
        .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
        .setRegionSize(Objects.requireNonNull(sizeInBytes))
        .build();
  }
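
  // Example with illustrative values: convertRegionSize(info, 42L) produces a
  // RegionSpaceUse message carrying the serialized RegionInfo plus the raw byte
  // count, roughly (text-format sketch, not an exact wire dump):
  //
  //   region_info { <info as converted by ProtobufUtil.toRegionInfo> }
  //   region_size: 42
  //
  // buildRegionSpaceUseReportRequest simply aggregates one such entry per region.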
1339 */ 1340 RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) { 1341 RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder(); 1342 for (Entry<RegionInfo, RegionSize> entry : regionSizes) { 1343 request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize())); 1344 } 1345 return request.build(); 1346 } 1347 1348 /** 1349 * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} 1350 * protobuf message. 1351 * 1352 * @param regionInfo The RegionInfo 1353 * @param sizeInBytes The size in bytes of the Region 1354 * @return The protocol buffer 1355 */ 1356 RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) { 1357 return RegionSpaceUse.newBuilder() 1358 .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo))) 1359 .setRegionSize(Objects.requireNonNull(sizeInBytes)) 1360 .build(); 1361 } 1362 1363 ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) 1364 throws IOException { 1365 // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests 1366 // per second, and other metrics As long as metrics are part of ServerLoad it's best to use 1367 // the wrapper to compute those numbers in one place. 1368 // In the long term most of these should be moved off of ServerLoad and the heart beat. 1369 // Instead they should be stored in an HBase table so that external visibility into HBase is 1370 // improved; Additionally the load balancer will be able to take advantage of a more complete 1371 // history. 1372 MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper(); 1373 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 1374 long usedMemory = -1L; 1375 long maxMemory = -1L; 1376 final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage(); 1377 if (usage != null) { 1378 usedMemory = usage.getUsed(); 1379 maxMemory = usage.getMax(); 1380 } 1381 1382 ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder(); 1383 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond()); 1384 serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount()); 1385 serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024)); 1386 serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024)); 1387 Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors(); 1388 Builder coprocessorBuilder = Coprocessor.newBuilder(); 1389 for (String coprocessor : coprocessors) { 1390 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1391 } 1392 RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder(); 1393 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); 1394 for (HRegion region : regions) { 1395 if (region.getCoprocessorHost() != null) { 1396 Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors(); 1397 Iterator<String> iterator = regionCoprocessors.iterator(); 1398 while (iterator.hasNext()) { 1399 serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build()); 1400 } 1401 } 1402 serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier)); 1403 for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost() 1404 .getCoprocessors()) { 1405 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1406 } 1407 } 1408 
  ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
      throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second, and other metrics. As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heart beat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved; additionally the load balancer will be able to take advantage of a more
    // complete history.
    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    long usedMemory = -1L;
    long maxMemory = -1L;
    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
      usedMemory = usage.getUsed();
      maxMemory = usage.getMax();
    }

    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    Builder coprocessorBuilder = Coprocessor.newBuilder();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
    }
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (HRegion region : regions) {
      if (region.getCoprocessorHost() != null) {
        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
        Iterator<String> iterator = regionCoprocessors.iterator();
        while (iterator.hasNext()) {
          serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build());
        }
      }
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
          .getCoprocessors()) {
        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
      }
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }

    // For the replicationLoad purpose, we only need to get from one executorService;
    // either source or sink will get the same info.
    ReplicationSourceService rsources = getReplicationSourceService();
    if (rsources != null) {
      // always refresh first to get the latest value
      ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
      if (rLoad != null) {
        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
          serverLoad.addReplLoadSource(rLS);
        }
      }
    }

    return serverLoad.build();
  }
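  // Note: heap figures above are truncated to whole MB by integer division, and both default
  // to -1 (meaning "unknown") when safeGetHeapMemoryUsage() cannot read heap usage, so
  // consumers of ServerLoad should treat negative heap values as absent rather than zero.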
  String getOnlineRegionsAsPrintableString() {
    StringBuilder sb = new StringBuilder();
    for (Region r: this.onlineRegions.values()) {
      if (sb.length() > 0) sb.append(", ");
      sb.append(r.getRegionInfo().getEncodedName());
    }
    return sb.toString();
  }

  /**
   * Wait on regions close.
   */
  private void waitOnAllRegionsToClose(final boolean abort) {
    // Wait till all regions are closed before going out.
    int lastCount = -1;
    long previousLogTime = 0;
    Set<String> closedRegions = new HashSet<>();
    boolean interrupted = false;
    try {
      while (!isOnlineRegionsEmpty()) {
        int count = getNumberOfOnlineRegions();
        // Only print a message if the count of regions has changed.
        if (count != lastCount) {
          // Log every second at most
          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
            previousLogTime = System.currentTimeMillis();
            lastCount = count;
            LOG.info("Waiting on " + count + " regions to close");
            // Only print out regions still closing if a small number else will
            // swamp the log.
            if (count < 10 && LOG.isDebugEnabled()) {
              LOG.debug("Online Regions=" + this.onlineRegions);
            }
          }
        }
        // Ensure all user regions have been sent a close. Use this to
        // protect against the case where an open comes in after we start the
        // iterator of onlineRegions to close all user regions.
        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
          RegionInfo hri = e.getValue().getRegionInfo();
          if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) &&
              !closedRegions.contains(hri.getEncodedName())) {
            closedRegions.add(hri.getEncodedName());
            // Don't update zk with this close transition; pass false.
            closeRegionIgnoreErrors(hri, abort);
          }
        }
        // No regions in RIT, we could stop waiting now.
        if (this.regionsInTransitionInRS.isEmpty()) {
          if (!isOnlineRegionsEmpty()) {
            LOG.info("Exiting though online regions are not empty, because some regions" +
                " failed to close");
          }
          break;
        }
        if (sleep(200)) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  private boolean sleep(long millis) {
    boolean interrupted = false;
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while sleeping");
      interrupted = true;
    }
    return interrupted;
  }

  private void shutdownWAL(final boolean close) {
    if (this.walFactory != null) {
      try {
        if (close) {
          walFactory.close();
        } else {
          walFactory.shutdown();
        }
      } catch (Throwable e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.error("Shutdown / close of WAL failed: " + e);
        LOG.debug("Shutdown / close exception details:", e);
      }
    }
  }
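  // A note on shutdownWAL(close): close() is the orderly WALFactory teardown (used on a clean
  // stop, once regions have closed and their edits are durable), while shutdown() is the quick
  // path used on abort, stopping the WAL subsystem without the full cleanup. The boolean
  // simply selects between those two teardown modes above.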
  /*
   * Run init. Sets up wal and starts up all server threads.
   *
   * @param c Extra configuration.
   */
  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
      throws IOException {
    try {
      boolean updateRootDir = false;
      for (NameStringPair e : c.getMapEntriesList()) {
        String key = e.getName();
        // The hostname the master sees us as.
        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
          String hostnameFromMasterPOV = e.getValue();
          this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(),
              this.startcode);
          if (!StringUtils.isBlank(useThisHostnameInstead) &&
              !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
            String msg = "Master passed us a different hostname to use; was=" +
                this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
            throw new IOException(msg);
          }
          if (StringUtils.isBlank(useThisHostnameInstead) &&
              !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
            String msg = "Master passed us a different hostname to use; was=" +
                rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
          }
          continue;
        }

        String value = e.getValue();
        if (key.equals(HConstants.HBASE_DIR)) {
          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
            updateRootDir = true;
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("Config from master: " + key + "=" + value);
        }
        this.conf.set(key, value);
      }
      // Set our ephemeral znode up in zookeeper now we have a name.
      createMyEphemeralNode();

      if (updateRootDir) {
        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
        initializeFileSystem();
      }

      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
      // config param for task trackers, but we can piggyback off of it.
      if (this.conf.get("mapreduce.task.attempt.id") == null) {
        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
      }

      // Save it in a file, this will allow to see if we crash
      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

      // This call sets up an initialized replication and WAL. Later we start it up.
      setupWALAndReplication();

      // Init in here rather than in constructor after thread name has been set
      this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
      this.metricsRegionServer = new MetricsRegionServer(metricsRegionServerImpl,
          conf, metricsTable);
      // Now that we have a metrics source, start the pause monitor
      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
      pauseMonitor.start();

      // There is a rare case where we do NOT want services to start. Check config.
      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
        startServices();
      }
      // In here we start up the replication service. Above we initialized it. TODO: reconcile
      // or make sense of it.
      startReplicationService();

      // Set up ZK
      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa +
          ", sessionid=0x" +
          Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));

      // Wake up anyone waiting for this server to come online
      synchronized (online) {
        online.set(true);
        online.notifyAll();
      }
    } catch (Throwable e) {
      stop("Failed initialization");
      throw convertThrowableToIOE(cleanup(e, "Failed init"),
          "Region server startup failed");
    } finally {
      sleeper.skipSleepCycle();
    }
  }

  protected void initializeMemStoreChunkCreator() {
    if (MemStoreLAB.isEnabled(conf)) {
      // MSLAB is enabled. So initialize MemStoreChunkPool
      // By this time, the MemstoreFlusher is already initialized. We can get the global limits
      // from it.
      Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
      long globalMemStoreSize = pair.getFirst();
      boolean offheap = this.regionServerAccounting.isOffheap();
      // When off heap memstore in use, take full area for chunk pool.
      float poolSizePercentage = offheap ? 1.0F :
          conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
      float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
          MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
      int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
      // init the chunkCreator
      ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
          initialCountPercentage, this.hMemManager);
    }
  }
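  // Rough sizing sketch for the ChunkCreator init above (illustrative numbers, not defaults
  // asserted here): with a 4 GiB global memstore, a 2 MiB chunk size and a poolSizePercentage
  // of 1.0 (the forced value for off-heap), the chunk pool may retain up to
  // 4096 MiB / 2 MiB = 2048 chunks, of which initialCountPercentage are pre-allocated up
  // front; the rest are created lazily and recycled through the pool.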
  private void startHeapMemoryManager() {
    if (this.blockCache != null) {
      this.hMemManager =
          new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
      this.hMemManager.start(getChoreService());
    }
  }

  private void createMyEphemeralNode() throws KeeperException, IOException {
    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
  }

  private void deleteMyEphemeralNode() throws KeeperException {
    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
  }

  @Override
  public RegionServerAccounting getRegionServerAccounting() {
    return regionServerAccounting;
  }

  /*
   * @param r Region to get RegionLoad for.
   * @param regionLoadBldr the RegionLoad.Builder, can be null
   * @param regionSpecifier the RegionSpecifier.Builder, can be null
   * @return RegionLoad instance.
   * @throws IOException
   */
  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
      RegionSpecifier.Builder regionSpecifier) throws IOException {
    byte[] name = r.getRegionInfo().getRegionName();
    int stores = 0;
    int storefiles = 0;
    int storeUncompressedSizeMB = 0;
    int storefileSizeMB = 0;
    int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024);
    long storefileIndexSizeKB = 0;
    int rootLevelIndexSizeKB = 0;
    int totalStaticIndexSizeKB = 0;
    int totalStaticBloomSizeKB = 0;
    long totalCompactingKVs = 0;
    long currentCompactedKVs = 0;
    List<HStore> storeList = r.getStores();
    stores += storeList.size();
    for (HStore store : storeList) {
      storefiles += store.getStorefilesCount();
      storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
      storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
      // TODO: is storefileIndexSizeKB the same as rootLevelIndexSizeKB?
      storefileIndexSizeKB += store.getStorefilesRootLevelIndexSize() / 1024;
      CompactionProgress progress = store.getCompactionProgress();
      if (progress != null) {
        totalCompactingKVs += progress.getTotalCompactingKVs();
        currentCompactedKVs += progress.currentCompactedKVs;
      }
      rootLevelIndexSizeKB += (int) (store.getStorefilesRootLevelIndexSize() / 1024);
      totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
      totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
    }

    float dataLocality =
        r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
    if (regionLoadBldr == null) {
      regionLoadBldr = RegionLoad.newBuilder();
    }
    if (regionSpecifier == null) {
      regionSpecifier = RegionSpecifier.newBuilder();
    }
    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
    regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
        .setStores(stores)
        .setStorefiles(storefiles)
        .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
        .setStorefileSizeMB(storefileSizeMB)
        .setMemStoreSizeMB(memstoreSizeMB)
        .setStorefileIndexSizeKB(storefileIndexSizeKB)
        .setRootIndexSizeKB(rootLevelIndexSizeKB)
        .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
        .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
        .setReadRequestsCount(r.getReadRequestsCount())
        .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
        .setWriteRequestsCount(r.getWriteRequestsCount())
        .setTotalCompactingKVs(totalCompactingKVs)
        .setCurrentCompactedKVs(currentCompactedKVs)
        .setDataLocality(dataLocality)
        .setLastMajorCompactionTs(r.getOldestHfileTs(true));
    r.setCompleteSequenceId(regionLoadBldr);

    return regionLoadBldr.build();
  }

  /**
   * @param encodedRegionName The encoded name of the region to report on.
   * @return An instance of RegionLoad, or null if the region is not online.
   */
  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
    HRegion r = onlineRegions.get(encodedRegionName);
    return r != null ? createRegionLoad(r, null, null) : null;
  }
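  // Unit caveat for the RegionLoad built above: MB and KB fields are produced by integer
  // division, so e.g. a region holding 900 KB in its memstore reports MemStoreSizeMB == 0.
  // Dashboards summing these truncated per-region values can therefore undercount small
  // regions; the byte-precise numbers are only available server-side.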
1773 getInt("hbase.regionserver.compactionChecker.majorCompactPriority", 1774 DEFAULT_PRIORITY); 1775 } 1776 1777 @Override 1778 protected void chore() { 1779 for (Region r : this.instance.onlineRegions.values()) { 1780 if (r == null) { 1781 continue; 1782 } 1783 HRegion hr = (HRegion) r; 1784 for (HStore s : hr.stores.values()) { 1785 try { 1786 long multiplier = s.getCompactionCheckMultiplier(); 1787 assert multiplier > 0; 1788 if (iteration % multiplier != 0) { 1789 continue; 1790 } 1791 if (s.needsCompaction()) { 1792 // Queue a compaction. Will recognize if major is needed. 1793 this.instance.compactSplitThread.requestSystemCompaction(hr, s, 1794 getName() + " requests compaction"); 1795 } else if (s.shouldPerformMajorCompaction()) { 1796 s.triggerMajorCompaction(); 1797 if (majorCompactPriority == DEFAULT_PRIORITY || 1798 majorCompactPriority > hr.getCompactPriority()) { 1799 this.instance.compactSplitThread.requestCompaction(hr, s, 1800 getName() + " requests major compaction; use default priority", 1801 Store.NO_PRIORITY, 1802 CompactionLifeCycleTracker.DUMMY, null); 1803 } else { 1804 this.instance.compactSplitThread.requestCompaction(hr, s, 1805 getName() + " requests major compaction; use configured priority", 1806 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); 1807 } 1808 } 1809 } catch (IOException e) { 1810 LOG.warn("Failed major compaction check on " + r, e); 1811 } 1812 } 1813 } 1814 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); 1815 } 1816 } 1817 1818 static class PeriodicMemStoreFlusher extends ScheduledChore { 1819 final HRegionServer server; 1820 final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds 1821 final static int MIN_DELAY_TIME = 0; // millisec 1822 1823 final int rangeOfDelay; 1824 public PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { 1825 super("MemstoreFlusherChore", server, cacheFlushInterval); 1826 this.server = server; 1827 1828 this.rangeOfDelay = this.server.conf.getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", 1829 RANGE_OF_DELAY)*1000; 1830 } 1831 1832 @Override 1833 protected void chore() { 1834 final StringBuilder whyFlush = new StringBuilder(); 1835 for (HRegion r : this.server.onlineRegions.values()) { 1836 if (r == null) continue; 1837 if (r.shouldFlush(whyFlush)) { 1838 FlushRequester requester = server.getFlushRequester(); 1839 if (requester != null) { 1840 long randomDelay = (long) RandomUtils.nextInt(0, rangeOfDelay) + MIN_DELAY_TIME; 1841 //Throttle the flushes by putting a delay. If we don't throttle, and there 1842 //is a balanced write-load on the regions in a table, we might end up 1843 //overwhelming the filesystem with too many flushes at once. 1844 if (requester.requestDelayedFlush(r, randomDelay, false)) { 1845 LOG.info("{} requesting flush of {} because {} after random delay {} ms", 1846 getName(), r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), 1847 randomDelay); 1848 } 1849 } 1850 } 1851 } 1852 } 1853 } 1854 1855 /** 1856 * Report the status of the server. A server is online once all the startup is 1857 * completed (setting up filesystem, starting executorService threads, etc.). This 1858 * method is designed mostly to be useful in tests. 1859 * 1860 * @return true if online, false if not. 1861 */ 1862 public boolean isOnline() { 1863 return online.get(); 1864 } 1865 1866 /** 1867 * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to 1868 * be hooked up to WAL. 
  /**
   * Report the status of the server. A server is online once all the startup is
   * completed (setting up filesystem, starting executorService threads, etc.). This
   * method is designed mostly to be useful in tests.
   *
   * @return true if online, false if not.
   */
  public boolean isOnline() {
    return online.get();
  }

  /**
   * Set up the WAL and replication if enabled. Replication setup is done in here because it
   * wants to be hooked up to the WAL.
   */
  private void setupWALAndReplication() throws IOException {
    WALFactory factory = new WALFactory(conf, serverName.toString());

    // TODO Replication make assumptions here based on the default filesystem impl
    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());

    Path logDir = new Path(walRootDir, logName);
    LOG.debug("logDir={}", logDir);
    if (this.walFs.exists(logDir)) {
      throw new RegionServerRunningException(
          "Region server has already created directory at " + this.serverName.toString());
    }
    // Always create wal directory as now we need this when master restarts to find out the
    // live region servers.
    if (!this.walFs.mkdirs(logDir)) {
      throw new IOException("Can not create wal directory " + logDir);
    }
    // Instantiate replication if replication enabled. Pass it the log directories.
    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
        factory.getWALProvider());
    this.walFactory = factory;
  }

  /**
   * Start up replication source and sink handlers.
   * @throws IOException
   */
  private void startReplicationService() throws IOException {
    if (this.replicationSourceHandler == this.replicationSinkHandler &&
        this.replicationSourceHandler != null) {
      this.replicationSourceHandler.startReplicationService();
    } else {
      if (this.replicationSourceHandler != null) {
        this.replicationSourceHandler.startReplicationService();
      }
      if (this.replicationSinkHandler != null) {
        this.replicationSinkHandler.startReplicationService();
      }
    }
  }

  public MetricsRegionServer getRegionServerMetrics() {
    return this.metricsRegionServer;
  }

  /**
   * @return Master address tracker instance.
   */
  public MasterAddressTracker getMasterAddressTracker() {
    return this.masterAddressTracker;
  }

  /*
   * Start maintenance Threads, Server, Worker and lease checker threads.
   * Start all threads we need to run. This is called after we've successfully
   * registered with the Master.
   * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
   * get an unhandled exception. We cannot set the handler on all threads.
   * Server's internal Listener thread is off limits. For Server, if an OOME, it
   * waits a while then retries. Meantime, a flush or a compaction that tries to
   * run should trigger the same critical condition and the shutdown will run. On
   * its way out, this server will shut down Server. Leases are sort of
   * in-between: the lease checker inherits from Chore but keeps its own internal
   * stop mechanism, so it needs to be stopped by this hosting server. Worker logs
   * the exception and exits.
   */
  private void startServices() throws IOException {
    if (!isStopped() && !isAborted()) {
      initializeThreads();
    }
    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection);
    this.secureBulkLoadManager.start();

    // Health checker thread.
    if (isHealthCheckerConfigured()) {
      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
          HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
    }

    this.walRoller = new LogRoller(this);
    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
    this.procedureResultReporter = new RemoteProcedureResultReporter(this);

    // Create the CompactedFileDischarger chore executorService. This chore helps to
    // remove the compacted files that will no longer be used in reads.
    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
    // 2 mins so that compacted files can be archived before the TTLCleaner runs.
    int cleanerInterval =
        conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
    this.compactedFileDischarger =
        new CompactedHFilesDischarger(cleanerInterval, this, this);
    choreService.scheduleChore(compactedFileDischarger);

    // Start executor services
    this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION,
        conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
    this.executorService.startExecutorService(ExecutorType.RS_OPEN_META,
        conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
    this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
        conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
    this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION,
        conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
    this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META,
        conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
      this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
          conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
    }
    this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
        HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
    // Start the threads for compacted files discharger
    this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
        conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
      this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
          conf.getInt("hbase.regionserver.region.replica.flusher.threads",
              conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
    }
    this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
        conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
    this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE,
        conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1));

    Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
        uncaughtExceptionHandler);
    if (this.cacheFlusher != null) {
      this.cacheFlusher.start(uncaughtExceptionHandler);
    }
    Threads.setDaemonThreadRunning(this.procedureResultReporter,
        getName() + ".procedureResultReporter", uncaughtExceptionHandler);

    if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
    if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
    if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
    if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
    if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
    if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
    if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore);

    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
    // an unhandled exception, it will just exit.
    Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
        uncaughtExceptionHandler);

    // Create the log splitting worker and start it.
    // Set a smaller number of retries to fast fail; otherwise the splitlogworker could be
    // blocked for quite a while inside the Connection layer. The worker won't be available
    // for other tasks even after the current task is preempted after a split task times out.
    Configuration sinkConf = HBaseConfiguration.create(conf);
    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
    if (this.csm != null && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
        DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
      // SplitLogWorker needs csm. If none, don't start this.
      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
      splitLogWorker.start();
    } else {
      LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null");
    }

    // Memstore services.
    startHeapMemoryManager();
    // Call it after starting HeapMemoryManager.
    initializeMemStoreChunkCreator();
  }
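  // Operational note (illustrative, keys taken verbatim from the code above): every executor
  // pool size is a plain Configuration int, so it can be tuned in hbase-site.xml, e.g.
  //   <property>
  //     <name>hbase.regionserver.executor.openregion.threads</name>
  //     <value>6</value>
  //   </property>
  // raises the open-region pool from its default of 3 to 6 threads.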
  private void initializeThreads() throws IOException {
    // Cache flushing thread.
    this.cacheFlusher = new MemStoreFlusher(conf, this);

    // Compaction thread
    this.compactSplitThread = new CompactSplit(this);

    // Background thread to check for compactions; needed if region has not gotten updates
    // in a while. It will take care of not checking too frequently on store-by-store basis.
    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
    this.leases = new Leases(this.threadWakeFrequency);

    // Create the thread to clean the moved regions list
    movedRegionsCleaner = MovedRegionsCleaner.create(this);

    if (this.nonceManager != null) {
      // Create the scheduled chore that cleans up nonces.
      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
    }

    // Setup the Quota Manager
    rsQuotaManager = new RegionServerRpcQuotaManager(this);
    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);

    if (QuotaUtil.isQuotaEnabled(conf)) {
      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
    }

    boolean onlyMetaRefresh = false;
    int storefileRefreshPeriod = conf.getInt(
        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
    if (storefileRefreshPeriod == 0) {
      storefileRefreshPeriod = conf.getInt(
          StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
      onlyMetaRefresh = true;
    }
    if (storefileRefreshPeriod > 0) {
      this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
          onlyMetaRefresh, this, this);
    }
    registerConfigurationObservers();
  }

  private void registerConfigurationObservers() {
    // Registering the compactSplitThread object with the ConfigurationManager.
    configurationManager.registerObserver(this.compactSplitThread);
    configurationManager.registerObserver(this.rpcServices);
    configurationManager.registerObserver(this);
  }

  /**
   * Puts up the webui.
   * @return Returns final port -- maybe different from what we started with.
   * @throws IOException
   */
  private int putUpWebUI() throws IOException {
    int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
        HConstants.DEFAULT_REGIONSERVER_INFOPORT);
    String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");

    if (this instanceof HMaster) {
      port = conf.getInt(HConstants.MASTER_INFO_PORT,
          HConstants.DEFAULT_MASTER_INFOPORT);
      addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
    }
    // -1 is for disabling info server
    if (port < 0) return port;

    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
      String msg =
          "Failed to start http info server. Address " + addr
              + " does not belong to this host. Correct configuration parameter: "
              + "hbase.regionserver.info.bindAddress";
      LOG.error(msg);
      throw new IOException(msg);
    }
    // check if auto port bind enabled
    boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
        false);
    while (true) {
      try {
        this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
        infoServer.addServlet("dump", "/dump", getDumpServlet());
        configureInfoServer();
        this.infoServer.start();
        break;
      } catch (BindException e) {
        if (!auto) {
          // auto bind disabled throw BindException
          LOG.error("Failed binding http info server to port: " + port);
          throw e;
        }
        // auto bind enabled, try to use another port
        LOG.info("Failed binding http info server to port: " + port);
        port++;
      }
    }
    port = this.infoServer.getPort();
    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
    int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
        HConstants.DEFAULT_MASTER_INFOPORT);
    conf.setInt("hbase.master.info.port.orig", masterInfoPort);
    conf.setInt(HConstants.MASTER_INFO_PORT, port);
    return port;
  }
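  // Behavior sketch for putUpWebUI: with REGIONSERVER_INFO_PORT_AUTO enabled, a BindException
  // makes the loop probe port, port+1, port+2, ... until a bind succeeds; the port finally
  // bound is then written back into the Configuration (including MASTER_INFO_PORT, with the
  // original value parked under "hbase.master.info.port.orig") so later readers see the real
  // port rather than the requested one.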
  /*
   * Verify that server is healthy
   */
  private boolean isHealthy() {
    if (!fsOk) {
      // File system problem
      return false;
    }
    // Verify that all threads are alive
    boolean healthy = (this.leases == null || this.leases.isAlive())
        && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
        && (this.walRoller == null || this.walRoller.isAlive())
        && (this.compactionChecker == null || this.compactionChecker.isScheduled())
        && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
    if (!healthy) {
      stop("One or more threads are no longer alive -- stop");
    }
    return healthy;
  }

  @Override
  public List<WAL> getWALs() throws IOException {
    return walFactory.getWALs();
  }

  @Override
  public WAL getWAL(RegionInfo regionInfo) throws IOException {
    try {
      WAL wal = walFactory.getWAL(regionInfo);
      if (this.walRoller != null) {
        this.walRoller.addWAL(wal);
      }
      return wal;
    } catch (FailedCloseWALAfterInitializedErrorException ex) {
      // see HBASE-21751 for details
      abort("wal can not clean up after init failed", ex);
      throw ex;
    }
  }

  public LogRoller getWalRoller() {
    return walRoller;
  }

  @Override
  public Connection getConnection() {
    return getClusterConnection();
  }

  @Override
  public ClusterConnection getClusterConnection() {
    return this.clusterConnection;
  }

  @Override
  public void stop(final String msg) {
    stop(msg, false, RpcServer.getRequestUser().orElse(null));
  }

  /**
   * Stops the regionserver.
   * @param msg Status message
   * @param force True if this is a regionserver abort
   * @param user The user executing the stop request, or null if no user is associated
   */
  public void stop(final String msg, final boolean force, final User user) {
    if (!this.stopped) {
      LOG.info("***** STOPPING region server '" + this + "' *****");
      if (this.rsHost != null) {
        // when forced via abort don't allow CPs to override
        try {
          this.rsHost.preStop(msg, user);
        } catch (IOException ioe) {
          if (!force) {
            LOG.warn("The region server did not stop", ioe);
            return;
          }
          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
        }
      }
      this.stopped = true;
      LOG.info("STOPPED: " + msg);
      // Wakes run() if it is sleeping
      sleeper.skipSleepCycle();
    }
  }

  public void waitForServerOnline() {
    while (!isStopped() && !isOnline()) {
      synchronized (online) {
        try {
          online.wait(msgInterval);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
  }

  @Override
  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
    HRegion r = context.getRegion();
    long openProcId = context.getOpenProcId();
    long masterSystemTime = context.getMasterSystemTime();
    rpcServices.checkOpen();
    LOG.info("Post open deploy tasks for {}, openProcId={}, masterSystemTime={}",
        r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
    // Do checks to see if we need to compact (references or too many files)
    for (HStore s : r.stores.values()) {
      if (s.hasReferences() || s.needsCompaction()) {
        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
      }
    }
    long openSeqNum = r.getOpenSeqNum();
    if (openSeqNum == HConstants.NO_SEQNUM) {
      // If we opened a region, we should have read some sequence number from it.
      LOG.error(
          "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
      openSeqNum = 0;
    }

    // Notify master
    if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
        openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))) {
      throw new IOException(
          "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
    }

    triggerFlushInPrimaryRegion(r);

    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
  }
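  // Contract note on postOpenDeployTasks: the OPENED report to the master must succeed; the
  // IOException thrown above on a failed report propagates to the open handler, so an open
  // never completes silently with the master's view out of sync with this server.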
  @Override
  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
    TransitionCode code = context.getCode();
    long openSeqNum = context.getOpenSeqNum();
    long masterSystemTime = context.getMasterSystemTime();
    RegionInfo[] hris = context.getHris();
    long[] procIds = context.getProcIds();

    if (TEST_SKIP_REPORTING_TRANSITION) {
      // This is for testing only in case there is no master
      // to handle the region transition report at all.
      if (code == TransitionCode.OPENED) {
        Preconditions.checkArgument(hris != null && hris.length == 1);
        if (hris[0].isMetaRegion()) {
          try {
            MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
                hris[0].getReplicaId(), State.OPEN);
          } catch (KeeperException e) {
            LOG.info("Failed to update meta location", e);
            return false;
          }
        } else {
          try {
            MetaTableAccessor.updateRegionLocation(clusterConnection,
                hris[0], serverName, openSeqNum, masterSystemTime);
          } catch (IOException e) {
            LOG.info("Failed to update meta", e);
            return false;
          }
        }
      }
      return true;
    }

    ReportRegionStateTransitionRequest.Builder builder =
        ReportRegionStateTransitionRequest.newBuilder();
    builder.setServer(ProtobufUtil.toServerName(serverName));
    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
    transition.setTransitionCode(code);
    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
      transition.setOpenSeqNum(openSeqNum);
    }
    for (RegionInfo hri: hris) {
      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
    }
    for (long procId: procIds) {
      transition.addProcId(procId);
    }
    ReportRegionStateTransitionRequest request = builder.build();
    int tries = 0;
    long pauseTime = INIT_PAUSE_TIME_MS;
    // Keep looping till we get an error. We want to send reports even though the server is
    // going down. Only go down if clusterConnection is null. It is set to null almost as the
    // last thing as the HRegionServer goes down.
    while (this.clusterConnection != null && !this.clusterConnection.isClosed()) {
      RegionServerStatusService.BlockingInterface rss = rssStub;
      try {
        if (rss == null) {
          createRegionServerStatusStub();
          continue;
        }
        ReportRegionStateTransitionResponse response =
            rss.reportRegionStateTransition(null, request);
        if (response.hasErrorMessage()) {
          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
          break;
        }
        // Log if we had to retry, else don't log unless TRACE. We want to
        // know if we were successful after an attempt showed in logs as failed.
        if (tries > 0 || LOG.isTraceEnabled()) {
          LOG.info("TRANSITION REPORTED " + request);
        }
        // NOTE: Return mid-method!!!
        return true;
      } catch (ServiceException se) {
        IOException ioe = ProtobufUtil.getRemoteException(se);
        boolean pause =
            ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException
                || ioe instanceof CallQueueTooBigException;
        if (pause) {
          // Do backoff else we flood the Master with requests.
          pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
        } else {
          pauseTime = INIT_PAUSE_TIME_MS; // Reset.
        }
        LOG.info("Failed report transition " +
            TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" +
            (pause ?
                " after " + pauseTime + "ms delay (Master is coming online...)." :
                " immediately."),
            ioe);
        if (pause) Threads.sleep(pauseTime);
        tries++;
        if (rssStub == rss) {
          rssStub = null;
        }
      }
    }
    return false;
  }
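  // Backoff sketch (illustrative): ConnectionUtils.getPauseTime scales the base pause by a
  // retry-indexed multiplier, so with INIT_PAUSE_TIME_MS = 1000 the waits grow roughly like
  // 1s, 2s, 3s, 5s, 10s, ... as tries increases. Only "master is busy or still starting"
  // style exceptions trigger this backoff; any other error retries immediately.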
  /**
   * Trigger a flush in the primary region replica if this region is a secondary replica. Does
   * not block this thread. See RegionReplicaFlushHandler for details.
   */
  void triggerFlushInPrimaryRegion(final HRegion region) {
    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
      return;
    }
    if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
        !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
            region.conf)) {
      region.setReadsEnabled(true);
      return;
    }

    region.setReadsEnabled(false); // disable reads before marking the region as opened.
    // RegionReplicaFlushHandler might reset this.

    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
    if (this.executorService != null) {
      this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
          rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
    }
  }

  @Override
  public RpcServerInterface getRpcServer() {
    return rpcServices.rpcServer;
  }

  @VisibleForTesting
  public RSRpcServices getRSRpcServices() {
    return rpcServices;
  }

  /**
   * Cause the server to exit without closing the regions it is serving, the log
   * it is using and without notifying the master. Used for unit testing and on
   * catastrophic events such as HDFS being yanked out from under hbase or when we OOME.
   *
   * @param reason the reason we are aborting
   * @param cause the exception that caused the abort, or null
   */
  @Override
  public void abort(String reason, Throwable cause) {
    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
    if (cause != null) {
      LOG.error(HBaseMarkers.FATAL, msg, cause);
    } else {
      LOG.error(HBaseMarkers.FATAL, msg);
    }
    setAbortRequested();
    // HBASE-4014: show list of coprocessors that were loaded to help debug
    // regionserver crashes. Note that we're implicitly using
    // java.util.HashSet's toString() method to print the coprocessor names.
    LOG.error(HBaseMarkers.FATAL, "RegionServer abort: loaded coprocessors are: " +
        CoprocessorHost.getLoadedCoprocessors());
    // Try and dump metrics if abort -- might give clue as to how fatal came about....
    try {
      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
    } catch (MalformedObjectNameException | IOException e) {
      LOG.warn("Failed dumping metrics", e);
    }

    // Do our best to report our abort to the master, but this may not work
    try {
      if (cause != null) {
        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
      }
      // Report to the master but only if we have already registered with the master.
      if (rssStub != null && this.serverName != null) {
        ReportRSFatalErrorRequest.Builder builder =
            ReportRSFatalErrorRequest.newBuilder();
        builder.setServer(ProtobufUtil.toServerName(this.serverName));
        builder.setErrorMessage(msg);
        rssStub.reportRSFatalError(null, builder.build());
      }
    } catch (Throwable t) {
      LOG.warn("Unable to report fatal error to master", t);
    }
    // shutdown should be run as the internal user
    stop(reason, true, null);
  }
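  // Note: the reportRSFatalError call above is strictly best-effort; any Throwable is caught
  // and logged so that a master that is unreachable (often the very reason for the abort)
  // can never prevent the local shutdown from proceeding.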
  protected final void setAbortRequested() {
    this.abortRequested = true;
  }

  /**
   * @see HRegionServer#abort(String, Throwable)
   */
  public void abort(String reason) {
    abort(reason, null);
  }

  @Override
  public boolean isAborted() {
    return this.abortRequested;
  }

  /*
   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
   * logs, but it does close the socket in case we want to bring up a server on the
   * old hostname+port immediately.
   */
  @VisibleForTesting
  protected void kill() {
    this.killed = true;
    abort("Simulated kill");
  }

  /**
   * Called on stop/abort before closing the cluster connection and meta locator.
   */
  protected void sendShutdownInterrupt() {
  }

  /**
   * Wait on all threads to finish. Presumption is that all closes and stops
   * have already been called.
   */
  protected void stopServiceThreads() {
    // clean up the scheduled chores
    if (this.choreService != null) {
      choreService.cancelChore(nonceManagerChore);
      choreService.cancelChore(compactionChecker);
      choreService.cancelChore(periodicFlusher);
      choreService.cancelChore(healthCheckChore);
      choreService.cancelChore(storefileRefresher);
      choreService.cancelChore(movedRegionsCleaner);
      choreService.cancelChore(fsUtilizationChore);
      // clean up the remaining scheduled chores (in case we missed out any)
      choreService.shutdown();
    }

    if (this.cacheFlusher != null) {
      this.cacheFlusher.join();
    }

    if (this.spanReceiverHost != null) {
      this.spanReceiverHost.closeReceivers();
    }
    if (this.walRoller != null) {
      this.walRoller.close();
    }
    if (this.compactSplitThread != null) {
      this.compactSplitThread.join();
    }
    if (this.executorService != null) this.executorService.shutdown();
    if (this.replicationSourceHandler != null &&
        this.replicationSourceHandler == this.replicationSinkHandler) {
      this.replicationSourceHandler.stopReplicationService();
    } else {
      if (this.replicationSourceHandler != null) {
        this.replicationSourceHandler.stopReplicationService();
      }
      if (this.replicationSinkHandler != null) {
        this.replicationSinkHandler.stopReplicationService();
      }
    }
  }
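  // Shutdown ordering above is deliberate: chores are cancelled first (no new background
  // work), then the flusher and compactor threads are joined (in-flight work drains), the
  // shared executor is shut down, and replication services go last. Note the same-instance
  // check: when one object serves as both source and sink handler it must only be stopped
  // once.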
  /**
   * @return Return the object that implements the replication
   * source executorService.
   */
  @VisibleForTesting
  public ReplicationSourceService getReplicationSourceService() {
    return replicationSourceHandler;
  }

  /**
   * @return Return the object that implements the replication
   * sink executorService.
   */
  ReplicationSinkService getReplicationSinkService() {
    return replicationSinkHandler;
  }

  /**
   * Get the current master from ZooKeeper and open the RPC connection to it.
   * To get a fresh connection, the current rssStub must be null.
   * Method will block until a master is available. You can break from this
   * block by requesting the server stop.
   *
   * @return master + port, or null if server has been stopped
   */
  @VisibleForTesting
  protected synchronized ServerName createRegionServerStatusStub() {
    // Create RS stub without refreshing the master node from ZK, use cached data
    return createRegionServerStatusStub(false);
  }

  /**
   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
   * connection, the current rssStub must be null. Method will block until a master is available.
   * You can break from this block by requesting the server stop.
   * @param refresh If true then master address will be read from ZK, otherwise use cached data
   * @return master + port, or null if server has been stopped
   */
  @VisibleForTesting
  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
    if (rssStub != null) {
      return masterAddressTracker.getMasterAddress();
    }
    ServerName sn = null;
    long previousLogTime = 0;
    RegionServerStatusService.BlockingInterface intRssStub = null;
    LockService.BlockingInterface intLockStub = null;
    boolean interrupted = false;
    try {
      while (keepLooping()) {
        sn = this.masterAddressTracker.getMasterAddress(refresh);
        if (sn == null) {
          if (!keepLooping()) {
            // give up with no connection.
            LOG.debug("No master found and cluster is stopped; bailing out");
            return null;
          }
          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
            LOG.debug("No master found; retry");
            previousLogTime = System.currentTimeMillis();
          }
          refresh = true; // let's try pull it from ZK directly
          if (sleep(200)) {
            interrupted = true;
          }
          continue;
        }

        // If we are on the active master, use the shortcut
        if (this instanceof HMaster && sn.equals(getServerName())) {
          intRssStub = ((HMaster) this).getMasterRpcServices();
          intLockStub = ((HMaster) this).getMasterRpcServices();
          break;
        }
        try {
          BlockingRpcChannel channel =
              this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
                  shortOperationTimeout);
          intRssStub = RegionServerStatusService.newBlockingStub(channel);
          intLockStub = LockService.newBlockingStub(channel);
          break;
        } catch (IOException e) {
          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
            e = e instanceof RemoteException ?
                ((RemoteException) e).unwrapRemoteException() : e;
            if (e instanceof ServerNotRunningYetException) {
              LOG.info("Master isn't available yet, retrying");
            } else {
              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
            }
            previousLogTime = System.currentTimeMillis();
          }
          if (sleep(200)) {
            interrupted = true;
          }
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
    this.rssStub = intRssStub;
    this.lockStub = intLockStub;
    return sn;
  }

  /**
   * @return True if we should break loop because cluster is going down or
   * this server has been stopped or hdfs has gone bad.
   */
  private boolean keepLooping() {
    return !this.stopped && isClusterUp();
  }
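  // Note the in-process shortcut above: when this server IS the active HMaster, the status
  // and lock stubs are wired directly to its own MasterRpcServices, skipping the RPC channel
  // entirely. Interrupt status is also restored after the retry loop so a stop request
  // delivered via interrupt is not swallowed.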
  /*
   * Let the master know we're here. Run initialization using parameters passed to us by the
   * master.
   * @return A Map of key/value configurations we got from the Master else
   * null if we failed to register.
   * @throws IOException
   */
  private RegionServerStartupResponse reportForDuty() throws IOException {
    if (this.masterless) return RegionServerStartupResponse.getDefaultInstance();
    ServerName masterServerName = createRegionServerStatusStub(true);
    if (masterServerName == null) return null;
    RegionServerStartupResponse result = null;
    try {
      rpcServices.requestCount.reset();
      rpcServices.rpcGetRequestCount.reset();
      rpcServices.rpcScanRequestCount.reset();
      rpcServices.rpcMultiRequestCount.reset();
      rpcServices.rpcMutateRequestCount.reset();
      LOG.info("reportForDuty to master=" + masterServerName + " with port="
          + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
      long now = EnvironmentEdgeManager.currentTime();
      int port = rpcServices.isa.getPort();
      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
      if (!StringUtils.isBlank(useThisHostnameInstead)) {
        request.setUseThisHostnameInstead(useThisHostnameInstead);
      }
      request.setPort(port);
      request.setServerStartCode(this.startcode);
      request.setServerCurrentTime(now);
      result = this.rssStub.regionServerStartup(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof ClockOutOfSyncException) {
        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync",
            ioe);
        // Re-throwing the IOE will cause the RS to abort
        throw ioe;
      } else if (ioe instanceof ServerNotRunningYetException) {
        LOG.debug("Master is not running yet");
      } else {
        LOG.warn("error telling master we are up", se);
      }
      rssStub = null;
    }
    return result;
  }

  @Override
  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
    try {
      GetLastFlushedSequenceIdRequest req =
          RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
      RegionServerStatusService.BlockingInterface rss = rssStub;
      if (rss == null) { // Try to connect one more time
        createRegionServerStatusStub();
        rss = rssStub;
        if (rss == null) {
          // Still no luck, we tried
          LOG.warn("Unable to connect to the master to check the last flushed sequence id");
          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
              .build();
        }
      }
      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
      return RegionStoreSequenceIds.newBuilder()
          .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
          .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
    } catch (ServiceException e) {
      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
          .build();
    }
  }
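  // Fallback semantics: when the master cannot be reached, getLastSequenceId answers with the
  // HConstants.NO_SEQNUM sentinel rather than failing, and callers are expected to treat that
  // value as "last flushed sequence id unknown".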
  /**
   * Closes all regions. Called on our way out.
   * Assumes that it's not possible for new regions to be added to onlineRegions
   * while this method runs.
   */
  protected void closeAllRegions(final boolean abort) {
    closeUserRegions(abort);
    closeMetaTableRegions(abort);
  }

  /**
   * Close meta region if we carry it
   * @param abort Whether we're running an abort.
   */
  void closeMetaTableRegions(final boolean abort) {
    HRegion meta = null;
    this.lock.writeLock().lock();
    try {
      for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
        RegionInfo hri = e.getValue().getRegionInfo();
        if (hri.isMetaRegion()) {
          meta = e.getValue();
        }
        if (meta != null) break;
      }
    } finally {
      this.lock.writeLock().unlock();
    }
    if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
  }

  /**
   * Schedule closes on all user regions.
   * Should be safe calling multiple times because it won't close regions
   * that are already closed or that are closing.
   * @param abort Whether we're running an abort.
   */
  void closeUserRegions(final boolean abort) {
    this.lock.writeLock().lock();
    try {
      for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
        HRegion r = e.getValue();
        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
          // Don't update zk with this close transition; pass false.
          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
        }
      }
    } finally {
      this.lock.writeLock().unlock();
    }
  }

  /** @return the info server */
  public InfoServer getInfoServer() {
    return infoServer;
  }

  /**
   * @return true if a stop has been requested.
   */
  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  @Override
  public boolean isStopping() {
    return this.stopping;
  }

  /**
   * @return the configuration
   */
  @Override
  public Configuration getConfiguration() {
    return conf;
  }

  /** @return the write lock for the server */
  ReentrantReadWriteLock.WriteLock getWriteLock() {
    return lock.writeLock();
  }

  public int getNumberOfOnlineRegions() {
    return this.onlineRegions.size();
  }

  boolean isOnlineRegionsEmpty() {
    return this.onlineRegions.isEmpty();
  }

  /**
   * For tests, web ui and metrics.
   * This method will only work if HRegionServer is in the same JVM as client;
   * HRegion cannot be serialized to cross an rpc.
   */
  public Collection<HRegion> getOnlineRegionsLocalContext() {
    Collection<HRegion> regions = this.onlineRegions.values();
    return Collections.unmodifiableCollection(regions);
  }

  @Override
  public void addRegion(HRegion region) {
    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
    configurationManager.registerObserver(region);
  }
  /**
   * @return A new Map of online regions sorted by region off-heap size with the first entry
   * being the biggest. If two regions are the same size, then the last one found wins; i.e.
   * this method may NOT return all regions.
   */
  SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOffHeapSize() {
    // we'll sort the regions in reverse
    SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(
        new Comparator<Long>() {
          @Override
          public int compare(Long a, Long b) {
            return -1 * a.compareTo(b);
          }
        });
    // Copy over all regions. Regions are sorted by size with biggest first.
    for (HRegion region : this.onlineRegions.values()) {
      sortedRegions.put(region.getMemStoreOffHeapSize(), region);
    }
    return sortedRegions;
  }

  /**
   * @return A new Map of online regions sorted by region heap size with the first entry being
   * the biggest. If two regions are the same size, then the last one found wins; i.e. this
   * method may NOT return all regions.
   */
  SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOnHeapSize() {
    // we'll sort the regions in reverse
    SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(
        new Comparator<Long>() {
          @Override
          public int compare(Long a, Long b) {
            return -1 * a.compareTo(b);
          }
        });
    // Copy over all regions. Regions are sorted by size with biggest first.
    for (HRegion region : this.onlineRegions.values()) {
      sortedRegions.put(region.getMemStoreHeapSize(), region);
    }
    return sortedRegions;
  }
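  // Why the javadoc warns "may NOT return all regions": the TreeMap is keyed by the size
  // itself, so two regions with identical memstore sizes collide on the same key and only
  // the last one put survives. For example, several idle regions all sized 0 bytes will be
  // represented by a single entry. That is acceptable here because callers only need the
  // biggest regions, not a complete listing.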
2938 */ 2939 public FileSystem getWALFileSystem() { 2940 return walFs; 2941 } 2942 2943 @Override 2944 public String toString() { 2945 return getServerName().toString(); 2946 } 2947 2948 /** 2949 * Interval at which threads should run 2950 * 2951 * @return the interval 2952 */ 2953 public int getThreadWakeFrequency() { 2954 return threadWakeFrequency; 2955 } 2956 2957 @Override 2958 public ZKWatcher getZooKeeper() { 2959 return zooKeeper; 2960 } 2961 2962 @Override 2963 public CoordinatedStateManager getCoordinatedStateManager() { 2964 return csm; 2965 } 2966 2967 @Override 2968 public ServerName getServerName() { 2969 return serverName; 2970 } 2971 2972 public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){ 2973 return this.rsHost; 2974 } 2975 2976 @Override 2977 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() { 2978 return this.regionsInTransitionInRS; 2979 } 2980 2981 @Override 2982 public ExecutorService getExecutorService() { 2983 return executorService; 2984 } 2985 2986 @Override 2987 public ChoreService getChoreService() { 2988 return choreService; 2989 } 2990 2991 @Override 2992 public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { 2993 return rsQuotaManager; 2994 } 2995 2996 // 2997 // Main program and support routines 2998 // 2999 /** 3000 * Load the replication executorService objects, if any 3001 */ 3002 private static void createNewReplicationInstance(Configuration conf, HRegionServer server, 3003 FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException { 3004 if ((server instanceof HMaster) && 3005 (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) { 3006 return; 3007 } 3008 3009 // read in the name of the source replication class from the config file. 3010 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, 3011 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 3012 3013 // read in the name of the sink replication class from the config file. 3014 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, 3015 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 3016 3017 // If both the sink and the source class names are the same, then instantiate 3018 // only one object. 3019 if (sourceClassname.equals(sinkClassname)) { 3020 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 3021 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); 3022 server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; 3023 } else { 3024 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 3025 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); 3026 server.replicationSinkHandler = newReplicationInstance(sinkClassname, 3027 ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider); 3028 } 3029 } 3030 3031 private static <T extends ReplicationService> T newReplicationInstance(String classname, 3032 Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, 3033 Path oldLogDir, WALProvider walProvider) throws IOException { 3034 Class<? 
extends T> clazz = null;
3035 try {
3036 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
3037 clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
3038 } catch (java.lang.ClassNotFoundException nfe) {
3039 throw new IOException("Could not find class for " + classname);
3040 }
3041 T service = ReflectionUtils.newInstance(clazz, conf);
3042 service.initialize(server, walFs, logDir, oldLogDir, walProvider);
3043 return service;
3044 }
3045 
3046 public Map<String, ReplicationStatus> getWalGroupsReplicationStatus(){
3047 Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
3048 if(!this.isOnline()){
3049 return walGroupsReplicationStatus;
3050 }
3051 List<ReplicationSourceInterface> allSources = new ArrayList<>();
3052 allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
3053 allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
3054 for(ReplicationSourceInterface source: allSources){
3055 walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
3056 }
3057 return walGroupsReplicationStatus;
3058 }
3059 
3060 /**
3061 * Utility for constructing an instance of the passed HRegionServer class.
3062 *
3063 * @param regionServerClass the HRegionServer implementation class to instantiate
3064 * @param conf2 the configuration to pass to the constructor
3065 * @return HRegionServer instance.
3066 */
3067 public static HRegionServer constructRegionServer(
3068 Class<? extends HRegionServer> regionServerClass,
3069 final Configuration conf2) {
3070 try {
3071 Constructor<? extends HRegionServer> c = regionServerClass
3072 .getConstructor(Configuration.class);
3073 return c.newInstance(conf2);
3074 } catch (Exception e) {
3075 throw new RuntimeException("Failed construction of RegionServer: "
3076 + regionServerClass.toString(), e);
3077 }
3078 }
3079 
3080 /**
3081 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
3082 */
3083 public static void main(String[] args) throws Exception {
3084 LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
3085 VersionInfo.logVersion();
3086 Configuration conf = HBaseConfiguration.create();
3087 @SuppressWarnings("unchecked")
3088 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
3089 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
3090 
3091 new HRegionServerCommandLine(regionServerClass).doMain(args);
3092 }
3093 
3094 /**
3095 * Gets the online regions of the specified table.
3096 * This method looks at the in-memory onlineRegions. It does not go to <code>hbase:meta</code>.
3097 * Only returns <em>online</em> regions. If a region on this table has been
3098 * closed during a disable, etc., it will not be included in the returned list.
3099 * So, the returned list may not necessarily be ALL regions in this table; it's
3100 * all the ONLINE regions in the table.
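 * <p>Illustrative sketch, not from the original source: an in-JVM caller such as a test
 * or coprocessor (holding a hypothetical server reference `rs`) might count a table's
 * online regions like this:</p>
 * <pre>
 *   List&lt;HRegion&gt; online = rs.getRegions(TableName.valueOf("my_table"));
 *   int onlineCount = online.size(); // ONLINE regions only; closed/disabled ones are absent
 * </pre>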
3101 * @param tableName the table to limit the returned regions to
3102 * @return Online regions from <code>tableName</code>
3103 */
3104 @Override
3105 public List<HRegion> getRegions(TableName tableName) {
3106 List<HRegion> tableRegions = new ArrayList<>();
3107 synchronized (this.onlineRegions) {
3108 for (HRegion region: this.onlineRegions.values()) {
3109 RegionInfo regionInfo = region.getRegionInfo();
3110 if (regionInfo.getTable().equals(tableName)) {
3111 tableRegions.add(region);
3112 }
3113 }
3114 }
3115 return tableRegions;
3116 }
3117 
3118 @Override
3119 public List<HRegion> getRegions() {
3120 List<HRegion> allRegions = new ArrayList<>();
3121 synchronized (this.onlineRegions) {
3122 // Return a copy of the onlineRegions
3123 allRegions.addAll(onlineRegions.values());
3124 }
3125 return allRegions;
3126 }
3127 
3128 /**
3129 * Gets the online tables in this RS.
3130 * This method looks at the in-memory onlineRegions.
3131 * @return all the online tables in this RS
3132 */
3133 public Set<TableName> getOnlineTables() {
3134 Set<TableName> tables = new HashSet<>();
3135 synchronized (this.onlineRegions) {
3136 for (Region region: this.onlineRegions.values()) {
3137 tables.add(region.getTableDescriptor().getTableName());
3138 }
3139 }
3140 return tables;
3141 }
3142 
3143 // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
3144 public String[] getRegionServerCoprocessors() {
3145 TreeSet<String> coprocessors = new TreeSet<>();
3146 try {
3147 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
3148 } catch (IOException exception) {
3149 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
3150 "skipping.");
3151 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3152 }
3153 Collection<HRegion> regions = getOnlineRegionsLocalContext();
3154 for (HRegion region: regions) {
3155 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
3156 try {
3157 coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
3158 } catch (IOException exception) {
3159 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
3160 "; skipping.");
3161 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3162 }
3163 }
3164 coprocessors.addAll(rsHost.getCoprocessors());
3165 return coprocessors.toArray(new String[coprocessors.size()]);
3166 }
3167 
3168 /**
3169 * Try to close the region; logs a warning on failure but continues.
3170 * @param region Region to close
3171 */
3172 private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
3173 try {
3174 if (!closeRegion(region.getEncodedName(), abort, null)) {
3175 LOG.warn("Failed to close " + region.getRegionNameAsString() +
3176 " - ignoring and continuing");
3177 }
3178 } catch (IOException e) {
3179 LOG.warn("Failed to close " + region.getRegionNameAsString() +
3180 " - ignoring and continuing", e);
3181 }
3182 }
3183 
3184 /**
3185 * Closes a region asynchronously. Can be called from the master or internally by the
3186 * regionserver when stopping. If called from the master, the region will update the znode status.
3187 *
3188 * <p>
3189 * If an opening was in progress, this method will cancel it, but will not start a new close. The
3190 * coprocessors are not called in this case. A NotServingRegionException is thrown.
3191 * </p>
3192 *
3193 * <p>
3194 * If a close was in progress, this new request will be ignored, and an exception thrown.
3195 * </p>
3196 *
3197 * @param encodedName Region to close
3198 * @param abort True if we are aborting
3199 * @return True if closed a region.
3200 * @throws NotServingRegionException if the region is not online
3201 */
3202 protected boolean closeRegion(String encodedName, final boolean abort, final ServerName sn)
3203 throws NotServingRegionException {
3204 // Check for permissions to close.
3205 HRegion actualRegion = this.getRegion(encodedName);
3206 // Can be null if we're calling close on a region that's not online
3207 if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
3208 try {
3209 actualRegion.getCoprocessorHost().preClose(false);
3210 } catch (IOException exp) {
3211 LOG.warn("Unable to close the region: the coprocessor threw an exception", exp);
3212 return false;
3213 }
3214 }
3215 
3216 final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName),
3217 Boolean.FALSE);
3218 
3219 if (Boolean.TRUE.equals(previous)) {
3220 LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " +
3221 "trying to OPEN. Cancelling OPENING.");
3222 if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
3223 // The replace failed. That should be an exceptional case, but theoretically it can happen.
3224 // We're going to try to do a standard close then.
3225 LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
3226 " Doing a standard close now");
3227 return closeRegion(encodedName, abort, sn);
3228 }
3229 // Let's get the region from the online region list again
3230 actualRegion = this.getRegion(encodedName);
3231 if (actualRegion == null) { // If already online, we still need to close it.
3232 LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3233 // The master deletes the znode when it receives this exception.
3234 throw new NotServingRegionException("The region " + encodedName +
3235 " was opening but not yet served. Opening is cancelled.");
3236 }
3237 } else if (Boolean.FALSE.equals(previous)) {
3238 LOG.info("Received CLOSE for the region: " + encodedName +
3239 ", which we are already trying to CLOSE, but the close is not completed yet");
3240 return true;
3241 }
3242 
3243 if (actualRegion == null) {
3244 LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3245 this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
3246 // The master deletes the znode when it receives this exception.
3247 throw new NotServingRegionException("The region " + encodedName +
3248 " is not online, and is not opening.");
3249 }
3250 
3251 CloseRegionHandler crh;
3252 final RegionInfo hri = actualRegion.getRegionInfo();
3253 if (hri.isMetaRegion()) {
3254 crh = new CloseMetaHandler(this, this, hri, abort);
3255 } else {
3256 crh = new CloseRegionHandler(this, this, hri, abort, sn);
3257 }
3258 this.executorService.submit(crh);
3259 return true;
3260 }
3261 
3262 /**
3263 * Close and offline the regions for a split or merge.
3264 *
3265 * @param regionEncodedName the encoded name(s) of the regions to close
3266 * @return true if closed the region successfully.
3267 * @throws IOException if closing a region fails
3268 */
3269 protected boolean closeAndOfflineRegionForSplitOrMerge(final List<String> regionEncodedName)
3270 throws IOException {
3271 for (int i = 0; i < regionEncodedName.size(); ++i) {
3272 HRegion regionToClose = this.getRegion(regionEncodedName.get(i));
3273 if (regionToClose != null) {
3274 Map<byte[], List<HStoreFile>> hstoreFiles = null;
3275 Exception exceptionToThrow = null;
3276 try {
3277 hstoreFiles = regionToClose.close(false);
3278 } catch (Exception e) {
3279 exceptionToThrow = e;
3280 }
3281 if (exceptionToThrow == null && hstoreFiles == null) {
3282 // The region was closed by someone else
3283 exceptionToThrow =
3284 new IOException("Failed to close region: already closed by another thread");
3285 }
3286 if (exceptionToThrow != null) {
3287 if (exceptionToThrow instanceof IOException) {
3288 throw (IOException) exceptionToThrow;
3289 }
3290 throw new IOException(exceptionToThrow);
3291 }
3292 // Offline the region
3293 this.removeRegion(regionToClose, null);
3294 }
3295 }
3296 return true;
3297 }
3298 
3299 /**
3300 * @return HRegion for the passed binary <code>regionName</code> or null if
3301 * the named region is not a member of the online regions.
3302 */
3303 public HRegion getOnlineRegion(final byte[] regionName) {
3304 String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3305 return this.onlineRegions.get(encodedRegionName);
3306 }
3307 
3308 public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
3309 return this.regionFavoredNodesMap.get(encodedRegionName);
3310 }
3311 
3312 @Override
3313 public HRegion getRegion(final String encodedRegionName) {
3314 return this.onlineRegions.get(encodedRegionName);
3315 }
3316 
3317 
3318 @Override
3319 public boolean removeRegion(final HRegion r, ServerName destination) {
3320 HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3321 metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
3322 if (destination != null) {
3323 long closeSeqNum = r.getMaxFlushedSeqId();
3324 if (closeSeqNum == HConstants.NO_SEQNUM) {
3325 // No edits in WAL for this region; get the sequence number when the region was opened.
3326 closeSeqNum = r.getOpenSeqNum();
3327 if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
3328 }
3329 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
3330 }
3331 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3332 return toReturn != null;
3333 }
3334 
3335 /**
3336 * Protected utility method for safely obtaining an HRegion handle.
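 * <p>Illustrative sketch, not from the original source: callers usually let the thrown
 * exceptions propagate back to the client, which uses them to retry elsewhere, e.g.</p>
 * <pre>
 *   try {
 *     HRegion r = getRegion(regionName); // regionName is the full binary region name
 *   } catch (NotServingRegionException nsre) {
 *     // region has moved, is opening, or is simply not here; the client will retry
 *   }
 * </pre>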
3337 * 3338 * @param regionName 3339 * Name of online {@link HRegion} to return 3340 * @return {@link HRegion} for <code>regionName</code> 3341 * @throws NotServingRegionException 3342 */ 3343 protected HRegion getRegion(final byte[] regionName) 3344 throws NotServingRegionException { 3345 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3346 return getRegionByEncodedName(regionName, encodedRegionName); 3347 } 3348 3349 public HRegion getRegionByEncodedName(String encodedRegionName) 3350 throws NotServingRegionException { 3351 return getRegionByEncodedName(null, encodedRegionName); 3352 } 3353 3354 protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) 3355 throws NotServingRegionException { 3356 HRegion region = this.onlineRegions.get(encodedRegionName); 3357 if (region == null) { 3358 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName); 3359 if (moveInfo != null) { 3360 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum()); 3361 } 3362 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName)); 3363 String regionNameStr = regionName == null? 3364 encodedRegionName: Bytes.toStringBinary(regionName); 3365 if (isOpening != null && isOpening.booleanValue()) { 3366 throw new RegionOpeningException("Region " + regionNameStr + 3367 " is opening on " + this.serverName); 3368 } 3369 throw new NotServingRegionException("" + regionNameStr + 3370 " is not online on " + this.serverName); 3371 } 3372 return region; 3373 } 3374 3375 /* 3376 * Cleanup after Throwable caught invoking method. Converts <code>t</code> to 3377 * IOE if it isn't already. 3378 * 3379 * @param t Throwable 3380 * 3381 * @param msg Message to log in error. Can be null. 3382 * 3383 * @return Throwable converted to an IOE; methods can only let out IOEs. 3384 */ 3385 private Throwable cleanup(final Throwable t, final String msg) { 3386 // Don't log as error if NSRE; NSRE is 'normal' operation. 3387 if (t instanceof NotServingRegionException) { 3388 LOG.debug("NotServingRegionException; " + t.getMessage()); 3389 return t; 3390 } 3391 Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t; 3392 if (msg == null) { 3393 LOG.error("", e); 3394 } else { 3395 LOG.error(msg, e); 3396 } 3397 if (!rpcServices.checkOOME(t)) { 3398 checkFileSystem(); 3399 } 3400 return t; 3401 } 3402 3403 /* 3404 * @param t 3405 * 3406 * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE 3407 * 3408 * @return Make <code>t</code> an IOE if it isn't already. 3409 */ 3410 protected IOException convertThrowableToIOE(final Throwable t, final String msg) { 3411 return (t instanceof IOException ? (IOException) t : msg == null 3412 || msg.length() == 0 ? new IOException(t) : new IOException(msg, t)); 3413 } 3414 3415 /** 3416 * Checks to see if the file system is still accessible. 
If not, sets
3417 * abortRequested and stopRequested.
3418 *
3419 * @return false if file system is not available
3420 */
3421 public boolean checkFileSystem() {
3422 if (this.fsOk && this.fs != null) {
3423 try {
3424 FSUtils.checkFileSystemAvailable(this.fs);
3425 } catch (IOException e) {
3426 abort("File System not available", e);
3427 this.fsOk = false;
3428 }
3429 }
3430 return this.fsOk;
3431 }
3432 
3433 @Override
3434 public void updateRegionFavoredNodesMapping(String encodedRegionName,
3435 List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3436 InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
3437 // Refer to the comment on the declaration of regionFavoredNodesMap on why
3438 // it is a map of region name to InetSocketAddress[]
3439 for (int i = 0; i < favoredNodes.size(); i++) {
3440 addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
3441 favoredNodes.get(i).getPort());
3442 }
3443 regionFavoredNodesMap.put(encodedRegionName, addr);
3444 }
3445 
3446 /**
3447 * Return the favored nodes for a region given its encoded name. Look at the
3448 * comment around {@link #regionFavoredNodesMap} on why it is an InetSocketAddress[].
3449 * @param encodedRegionName the encoded name of the region
3450 * @return array of favored locations
3451 */
3452 @Override
3453 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3454 return regionFavoredNodesMap.get(encodedRegionName);
3455 }
3456 
3457 @Override
3458 public ServerNonceManager getNonceManager() {
3459 return this.nonceManager;
3460 }
3461 
3462 private static class MovedRegionInfo {
3463 private final ServerName serverName;
3464 private final long seqNum;
3465 private final long ts;
3466 
3467 public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3468 this.serverName = serverName;
3469 this.seqNum = closeSeqNum;
3470 ts = EnvironmentEdgeManager.currentTime();
3471 }
3472 
3473 public ServerName getServerName() {
3474 return serverName;
3475 }
3476 
3477 public long getSeqNum() {
3478 return seqNum;
3479 }
3480 
3481 public long getMoveTime() {
3482 return ts;
3483 }
3484 }
3485 
3486 // This map will contain all the regions that we closed for a move.
3487 // We record the time of the move so we don't keep information that is too old.
3488 protected Map<String, MovedRegionInfo> movedRegions = new ConcurrentHashMap<>(3000);
3489 
3490 // We need a timeout. If not, there is a risk of giving out wrong information: this would double
3491 // the number of network calls instead of reducing them.
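// Illustrative sketch, not part of the original source: the intended lifecycle is
//   addToMovedRegions(encodedName, destination, closeSeqNum); // on close-for-move
//   MovedRegionInfo m = getMovedRegion(encodedName);          // null once the timeout below passes
// so a stale entry can at worst redirect clients for about two minutes before expiring.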
3492 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000); 3493 3494 protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) { 3495 if (ServerName.isSameAddress(destination, this.getServerName())) { 3496 LOG.warn("Not adding moved region record: " + encodedName + " to self."); 3497 return; 3498 } 3499 LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" + 3500 closeSeqNum); 3501 movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum)); 3502 } 3503 3504 void removeFromMovedRegions(String encodedName) { 3505 movedRegions.remove(encodedName); 3506 } 3507 3508 private MovedRegionInfo getMovedRegion(final String encodedRegionName) { 3509 MovedRegionInfo dest = movedRegions.get(encodedRegionName); 3510 3511 long now = EnvironmentEdgeManager.currentTime(); 3512 if (dest != null) { 3513 if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) { 3514 return dest; 3515 } else { 3516 movedRegions.remove(encodedRegionName); 3517 } 3518 } 3519 3520 return null; 3521 } 3522 3523 /** 3524 * Remove the expired entries from the moved regions list. 3525 */ 3526 protected void cleanMovedRegions() { 3527 final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED; 3528 Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator(); 3529 3530 while (it.hasNext()){ 3531 Map.Entry<String, MovedRegionInfo> e = it.next(); 3532 if (e.getValue().getMoveTime() < cutOff) { 3533 it.remove(); 3534 } 3535 } 3536 } 3537 3538 /* 3539 * Use this to allow tests to override and schedule more frequently. 3540 */ 3541 3542 protected int movedRegionCleanerPeriod() { 3543 return TIMEOUT_REGION_MOVED; 3544 } 3545 3546 /** 3547 * Creates a Chore thread to clean the moved region cache. 
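 * <p>Illustrative sketch, not from the original source: as a ScheduledChore, the cleaner
 * would typically be registered with the server's ChoreService, e.g.</p>
 * <pre>
 *   MovedRegionsCleaner cleaner = MovedRegionsCleaner.create(rs); // `rs`: hypothetical server reference
 *   rs.getChoreService().scheduleChore(cleaner);
 * </pre>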
3548 */ 3549 protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable { 3550 private HRegionServer regionServer; 3551 Stoppable stoppable; 3552 3553 private MovedRegionsCleaner( 3554 HRegionServer regionServer, Stoppable stoppable){ 3555 super("MovedRegionsCleaner for region " + regionServer, stoppable, 3556 regionServer.movedRegionCleanerPeriod()); 3557 this.regionServer = regionServer; 3558 this.stoppable = stoppable; 3559 } 3560 3561 static MovedRegionsCleaner create(HRegionServer rs){ 3562 Stoppable stoppable = new Stoppable() { 3563 private volatile boolean isStopped = false; 3564 @Override public void stop(String why) { isStopped = true;} 3565 @Override public boolean isStopped() {return isStopped;} 3566 }; 3567 3568 return new MovedRegionsCleaner(rs, stoppable); 3569 } 3570 3571 @Override 3572 protected void chore() { 3573 regionServer.cleanMovedRegions(); 3574 } 3575 3576 @Override 3577 public void stop(String why) { 3578 stoppable.stop(why); 3579 } 3580 3581 @Override 3582 public boolean isStopped() { 3583 return stoppable.isStopped(); 3584 } 3585 } 3586 3587 private String getMyEphemeralNodePath() { 3588 return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode, getServerName().toString()); 3589 } 3590 3591 private boolean isHealthCheckerConfigured() { 3592 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC); 3593 return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation); 3594 } 3595 3596 /** 3597 * @return the underlying {@link CompactSplit} for the servers 3598 */ 3599 public CompactSplit getCompactSplitThread() { 3600 return this.compactSplitThread; 3601 } 3602 3603 public CoprocessorServiceResponse execRegionServerService( 3604 @SuppressWarnings("UnusedParameters") final RpcController controller, 3605 final CoprocessorServiceRequest serviceRequest) throws ServiceException { 3606 try { 3607 ServerRpcController serviceController = new ServerRpcController(); 3608 CoprocessorServiceCall call = serviceRequest.getCall(); 3609 String serviceName = call.getServiceName(); 3610 com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName); 3611 if (service == null) { 3612 throw new UnknownProtocolException(null, "No registered coprocessor executorService found for " + 3613 serviceName); 3614 } 3615 com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = 3616 service.getDescriptorForType(); 3617 3618 String methodName = call.getMethodName(); 3619 com.google.protobuf.Descriptors.MethodDescriptor methodDesc = 3620 serviceDesc.findMethodByName(methodName); 3621 if (methodDesc == null) { 3622 throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName + 3623 " called on executorService " + serviceName); 3624 } 3625 3626 com.google.protobuf.Message request = 3627 CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest()); 3628 final com.google.protobuf.Message.Builder responseBuilder = 3629 service.getResponsePrototype(methodDesc).newBuilderForType(); 3630 service.callMethod(methodDesc, serviceController, request, 3631 new com.google.protobuf.RpcCallback<com.google.protobuf.Message>() { 3632 @Override 3633 public void run(com.google.protobuf.Message message) { 3634 if (message != null) { 3635 responseBuilder.mergeFrom(message); 3636 } 3637 } 3638 }); 3639 IOException exception = CoprocessorRpcUtils.getControllerException(serviceController); 3640 if (exception != null) { 3641 throw exception; 3642 } 3643 return 
CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3644 } catch (IOException ie) {
3645 throw new ServiceException(ie);
3646 }
3647 }
3648 
3649 /**
3650 * May not be present if this is a master that does not carry tables.
3651 *
3652 * @return The block cache instance used by the regionserver.
3653 */
3654 @Override
3655 public Optional<BlockCache> getBlockCache() {
3656 return Optional.ofNullable(this.blockCache);
3657 }
3658 
3659 /**
3660 * May not be present if this is a master that does not carry tables.
3661 *
3662 * @return The cache for mob files used by the regionserver.
3663 */
3664 @Override
3665 public Optional<MobFileCache> getMobFileCache() {
3666 return Optional.ofNullable(this.mobFileCache);
3667 }
3668 
3669 /**
3670 * @return the ConfigurationManager object, for testing purposes.
3671 */
3672 protected ConfigurationManager getConfigurationManager() {
3673 return configurationManager;
3674 }
3675 
3676 /**
3677 * @return the table descriptors implementation.
3678 */
3679 @Override
3680 public TableDescriptors getTableDescriptors() {
3681 return this.tableDescriptors;
3682 }
3683 
3684 /**
3685 * Reload the configuration from disk.
3686 */
3687 public void updateConfiguration() {
3688 LOG.info("Reloading the configuration from disk.");
3689 // Reload the configuration from disk.
3690 conf.reloadConfiguration();
3691 configurationManager.notifyAllObservers(conf);
3692 }
3693 
3694 public CacheEvictionStats clearRegionBlockCache(Region region) {
3695 long evictedBlocks = 0;
3696 
3697 for (Store store : region.getStores()) {
3698 for (StoreFile hFile : store.getStorefiles()) {
3699 evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3700 }
3701 }
3702 
3703 return CacheEvictionStats.builder()
3704 .withEvictedBlocks(evictedBlocks)
3705 .build();
3706 }
3707 
3708 @Override
3709 public double getCompactionPressure() {
3710 double max = 0;
3711 for (Region region : onlineRegions.values()) {
3712 for (Store store : region.getStores()) {
3713 double normCount = store.getCompactionPressure();
3714 if (normCount > max) {
3715 max = normCount;
3716 }
3717 }
3718 }
3719 return max;
3720 }
3721 
3722 @Override
3723 public HeapMemoryManager getHeapMemoryManager() {
3724 return hMemManager;
3725 }
3726 
3727 /**
3728 * For testing.
3729 * @return whether all wal roll requests have finished for this regionserver
3730 */
3731 @VisibleForTesting
3732 public boolean walRollRequestFinished() {
3733 return this.walRoller.walRollFinished();
3734 }
3735 
3736 @Override
3737 public ThroughputController getFlushThroughputController() {
3738 return flushThroughputController;
3739 }
3740 
3741 @Override
3742 public double getFlushPressure() {
3743 if (getRegionServerAccounting() == null || cacheFlusher == null) {
3744 // return 0 during RS initialization
3745 return 0.0;
3746 }
3747 return getRegionServerAccounting().getFlushPressure();
3748 }
3749 
3750 @Override
3751 public void onConfigurationChange(Configuration newConf) {
3752 ThroughputController old = this.flushThroughputController;
3753 if (old != null) {
3754 old.stop("configuration change");
3755 }
3756 this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3757 try {
3758 Superusers.initialize(newConf);
3759 } catch (IOException e) {
3760 LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3761 }
3762 }
3763 
3764 @Override
3765 public MetricsRegionServer getMetrics() {
3766 return metricsRegionServer;
3767 }
3768 
3769 @Override
3770 public
SecureBulkLoadManager getSecureBulkLoadManager() {
3771 return this.secureBulkLoadManager;
3772 }
3773 
3774 @Override
3775 public EntityLock regionLock(List<RegionInfo> regionInfos, String description,
3776 Abortable abort) throws IOException {
3777 return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
3778 .regionLock(regionInfos, description, abort);
3779 }
3780 
3781 @Override
3782 public void unassign(byte[] regionName) throws IOException {
3783 clusterConnection.getAdmin().unassign(regionName, false);
3784 }
3785 
3786 @Override
3787 public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3788 return this.rsSpaceQuotaManager;
3789 }
3790 
3791 @Override
3792 public boolean reportFileArchivalForQuotas(TableName tableName,
3793 Collection<Entry<String, Long>> archivedFiles) {
3794 RegionServerStatusService.BlockingInterface rss = rssStub;
3795 if (rss == null || rsSpaceQuotaManager == null) {
3796 // the current server could be stopping.
3797 LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3798 return false;
3799 }
3800 try {
3801 RegionServerStatusProtos.FileArchiveNotificationRequest request =
3802 rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3803 rss.reportFileArchival(null, request);
3804 } catch (ServiceException se) {
3805 IOException ioe = ProtobufUtil.getRemoteException(se);
3806 if (ioe instanceof PleaseHoldException) {
3807 if (LOG.isTraceEnabled()) {
3808 LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3809 + " This will be retried.", ioe);
3810 }
3811 // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3812 return false;
3813 }
3814 if (rssStub == rss) {
3815 rssStub = null;
3816 }
3817 // re-create the stub if we failed to report the archival
3818 createRegionServerStatusStub(true);
3819 LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3820 return false;
3821 }
3822 return true;
3823 }
3824 
3825 public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
3826 return eventLoopGroupConfig;
3827 }
3828 
3829 @Override
3830 public Connection createConnection(Configuration conf) throws IOException {
3831 User user = UserProvider.instantiate(conf).getCurrent();
3832 return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName,
3833 this.rpcServices, this.rpcServices);
3834 }
3835 
3836 public void executeProcedure(long procId, RSProcedureCallable callable) {
3837 executorService.submit(new RSProcedureHandler(this, procId, callable));
3838 }
3839 
3840 public void remoteProcedureComplete(long procId, Throwable error) {
3841 procedureResultReporter.complete(procId, error);
3842 }
3843 
3844 void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3845 RegionServerStatusService.BlockingInterface rss = rssStub;
3846 for (;;) {
3847 rss = rssStub;
3848 if (rss != null) {
3849 break;
3850 }
3851 createRegionServerStatusStub();
3852 }
3853 try {
3854 rss.reportProcedureDone(null, request);
3855 } catch (ServiceException se) {
3856 if (rssStub == rss) {
3857 rssStub = null;
3858 }
3859 throw ProtobufUtil.getRemoteException(se);
3860 }
3861 }
3862 
3863 /**
3864 * Ignores open/close region procedures which were already submitted or executed.
3865 *
3866 * When the master has an unfinished open/close region procedure and restarts, the new active
3867 * master may send a duplicate open/close region request to the regionserver.
The open/close request is submitted
3868 * to a thread pool and executed, so we first need a cache of the submitted open/close region procedures.
3869 *
3870 * After the open/close region request is executed and the region transition is reported
3871 * successfully, cache it in the executed region procedures cache; see {@link #finishRegionProcedure(long)}.
3872 * After a successful report, the master will not send the open/close region request again.
3873 * We assume an in-flight duplicate open/close region request will not be delayed by more
3874 * than 600 seconds, so the executed region procedures cache expires after 600 seconds.
3875 *
3876 * See HBASE-22404 for more details.
3877 *
3878 * @param procId the id of the open/close region procedure
3879 * @return true if the procedure can be submitted.
3880 */
3881 boolean submitRegionProcedure(long procId) {
3882 if (procId == -1) {
3883 return true;
3884 }
3885 // Ignore region procedures which were already submitted.
3886 Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3887 if (previous != null) {
3888 LOG.warn("Received procedure pid={}, which was already submitted; ignoring it", procId);
3889 return false;
3890 }
3891 // Ignore region procedures which were already executed.
3892 if (executedRegionProcedures.getIfPresent(procId) != null) {
3893 LOG.warn("Received procedure pid={}, which was already executed; ignoring it", procId);
3894 return false;
3895 }
3896 return true;
3897 }
3898 
3899 /**
3900 * See {@link #submitRegionProcedure(long)}.
3901 * @param procId the id of the open/close region procedure
3902 */
3903 public void finishRegionProcedure(long procId) {
3904 executedRegionProcedures.put(procId, procId);
3905 submittedRegionProcedures.remove(procId);
3906 }
3907 
3908 public boolean isShutDown() {
3909 return shutDown;
3910 }
3911 
3912 /**
3913 * Forcibly terminate the region server when an abort times out.
3914 */
3915 private static class SystemExitWhenAbortTimeout extends TimerTask {
3916 
3917 public SystemExitWhenAbortTimeout() {
3918 }
3919 
3920 @Override
3921 public void run() {
3922 LOG.warn("Aborting region server timed out; terminating forcibly" +
3923 " without waiting for any running shutdown hooks or finalizers to finish their work." +
3924 " Thread dump to stdout.");
3925 Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3926 Runtime.getRuntime().halt(1);
3927 }
3928 }
3929 
3930 @VisibleForTesting
3931 public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3932 return compactedFileDischarger;
3933 }
3934}