001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; 021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; 022import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION; 023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; 024import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 025import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT; 026import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY; 027import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT; 028import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY; 029import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT; 030import static 
org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY; 031import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT; 032import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; 033import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY; 034 035import io.opentelemetry.api.trace.Span; 036import io.opentelemetry.api.trace.StatusCode; 037import io.opentelemetry.context.Scope; 038import java.io.IOException; 039import java.io.PrintWriter; 040import java.lang.management.MemoryUsage; 041import java.lang.reflect.Constructor; 042import java.net.InetSocketAddress; 043import java.time.Duration; 044import java.util.ArrayList; 045import java.util.Collection; 046import java.util.Collections; 047import java.util.Comparator; 048import java.util.HashSet; 049import java.util.Iterator; 050import java.util.List; 051import java.util.Map; 052import java.util.Map.Entry; 053import java.util.Objects; 054import java.util.Optional; 055import java.util.Set; 056import java.util.SortedMap; 057import java.util.Timer; 058import java.util.TimerTask; 059import java.util.TreeMap; 060import java.util.TreeSet; 061import java.util.concurrent.ConcurrentHashMap; 062import java.util.concurrent.ConcurrentMap; 063import java.util.concurrent.ConcurrentSkipListMap; 064import java.util.concurrent.ThreadLocalRandom; 065import java.util.concurrent.TimeUnit; 066import java.util.concurrent.atomic.AtomicBoolean; 067import java.util.concurrent.locks.ReentrantReadWriteLock; 068import java.util.stream.Collectors; 069import javax.management.MalformedObjectNameException; 070import javax.servlet.http.HttpServlet; 071import org.apache.commons.lang3.StringUtils; 072import org.apache.commons.lang3.mutable.MutableFloat; 073import org.apache.hadoop.conf.Configuration; 074import org.apache.hadoop.fs.FileSystem; 075import 
org.apache.hadoop.fs.Path; 076import org.apache.hadoop.hbase.Abortable; 077import org.apache.hadoop.hbase.CacheEvictionStats; 078import org.apache.hadoop.hbase.CallQueueTooBigException; 079import org.apache.hadoop.hbase.ClockOutOfSyncException; 080import org.apache.hadoop.hbase.DoNotRetryIOException; 081import org.apache.hadoop.hbase.ExecutorStatusChore; 082import org.apache.hadoop.hbase.HBaseConfiguration; 083import org.apache.hadoop.hbase.HBaseInterfaceAudience; 084import org.apache.hadoop.hbase.HBaseServerBase; 085import org.apache.hadoop.hbase.HConstants; 086import org.apache.hadoop.hbase.HDFSBlocksDistribution; 087import org.apache.hadoop.hbase.HRegionLocation; 088import org.apache.hadoop.hbase.HealthCheckChore; 089import org.apache.hadoop.hbase.MetaTableAccessor; 090import org.apache.hadoop.hbase.NotServingRegionException; 091import org.apache.hadoop.hbase.PleaseHoldException; 092import org.apache.hadoop.hbase.ScheduledChore; 093import org.apache.hadoop.hbase.ServerName; 094import org.apache.hadoop.hbase.Stoppable; 095import org.apache.hadoop.hbase.TableName; 096import org.apache.hadoop.hbase.YouAreDeadException; 097import org.apache.hadoop.hbase.ZNodeClearer; 098import org.apache.hadoop.hbase.client.ConnectionUtils; 099import org.apache.hadoop.hbase.client.RegionInfo; 100import org.apache.hadoop.hbase.client.RegionInfoBuilder; 101import org.apache.hadoop.hbase.client.locking.EntityLock; 102import org.apache.hadoop.hbase.client.locking.LockServiceClient; 103import org.apache.hadoop.hbase.conf.ConfigurationObserver; 104import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 105import org.apache.hadoop.hbase.exceptions.RegionMovedException; 106import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 107import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 108import org.apache.hadoop.hbase.executor.ExecutorType; 109import org.apache.hadoop.hbase.http.InfoServer; 110import org.apache.hadoop.hbase.io.hfile.BlockCache; 111import 
org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 112import org.apache.hadoop.hbase.io.hfile.HFile; 113import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 114import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 115import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException; 116import org.apache.hadoop.hbase.ipc.RpcClient; 117import org.apache.hadoop.hbase.ipc.RpcServer; 118import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 119import org.apache.hadoop.hbase.ipc.ServerRpcController; 120import org.apache.hadoop.hbase.log.HBaseMarkers; 121import org.apache.hadoop.hbase.mob.MobFileCache; 122import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore; 123import org.apache.hadoop.hbase.monitoring.TaskMonitor; 124import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 125import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore; 126import org.apache.hadoop.hbase.net.Address; 127import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; 128import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 129import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore; 130import org.apache.hadoop.hbase.quotas.QuotaUtil; 131import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 132import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 133import org.apache.hadoop.hbase.quotas.RegionSize; 134import org.apache.hadoop.hbase.quotas.RegionSizeStore; 135import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 136import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 137import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; 138import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 139import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; 140import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; 141import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; 
142import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; 143import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet; 144import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet; 145import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager; 146import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; 147import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 148import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 149import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; 150import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; 151import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore; 152import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; 153import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; 154import org.apache.hadoop.hbase.security.SecurityConstants; 155import org.apache.hadoop.hbase.security.Superusers; 156import org.apache.hadoop.hbase.security.User; 157import org.apache.hadoop.hbase.security.UserProvider; 158import org.apache.hadoop.hbase.trace.TraceUtil; 159import org.apache.hadoop.hbase.util.Bytes; 160import org.apache.hadoop.hbase.util.CompressionTest; 161import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil; 162import org.apache.hadoop.hbase.util.DNS; 163import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 164import org.apache.hadoop.hbase.util.FSUtils; 165import org.apache.hadoop.hbase.util.FutureUtils; 166import org.apache.hadoop.hbase.util.JvmPauseMonitor; 167import org.apache.hadoop.hbase.util.Pair; 168import org.apache.hadoop.hbase.util.RetryCounter; 169import org.apache.hadoop.hbase.util.RetryCounterFactory; 170import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 171import org.apache.hadoop.hbase.util.Threads; 172import org.apache.hadoop.hbase.util.VersionInfo; 173import 
org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 174import org.apache.hadoop.hbase.wal.WAL; 175import org.apache.hadoop.hbase.wal.WALFactory; 176import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; 177import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 178import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker; 179import org.apache.hadoop.hbase.zookeeper.ZKUtil; 180import org.apache.hadoop.ipc.RemoteException; 181import org.apache.hadoop.util.ReflectionUtils; 182import org.apache.yetus.audience.InterfaceAudience; 183import org.apache.zookeeper.KeeperException; 184import org.slf4j.Logger; 185import org.slf4j.LoggerFactory; 186 187import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 188import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 189import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 190import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 191import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 192import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses; 193import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 194import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 195import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor; 196import org.apache.hbase.thirdparty.com.google.protobuf.Message; 197import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 198import org.apache.hbase.thirdparty.com.google.protobuf.Service; 199import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 200import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 201import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 202 203import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 204import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 205import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; 225import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 233 234/** 235 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There 236 * are many HRegionServers in a single HBase deployment. 237 */ 238@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 239@SuppressWarnings({ "deprecation" }) 240public class HRegionServer extends HBaseServerBase<RSRpcServices> 241 implements RegionServerServices, LastSequenceId { 242 243 private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class); 244 245 int unitMB = 1024 * 1024; 246 int unitKB = 1024; 247 248 /** 249 * For testing only! Set to true to skip notifying region assignment to master . 250 */ 251 @InterfaceAudience.Private 252 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL") 253 public static boolean TEST_SKIP_REPORTING_TRANSITION = false; 254 255 /** 256 * A map from RegionName to current action in progress. 
Boolean value indicates: true - if an open region action is in progress, false - if a close
   * region action is in progress.
   */
  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * Used to cache the open/close region procedures which have already been submitted. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
  /**
   * Used to cache the open/close region procedures which have already been executed. Entries expire
   * 600 seconds after their last access. See {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

  /**
   * Used to cache the moved-out regions. Entries expire {@code movedRegionCacheExpiredTime()}
   * milliseconds after they are written.
   */
  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();

  private MemStoreFlusher cacheFlusher;

  private HeapMemoryManager hMemManager;

  // Replication services. If no replication, this handler will be null.
  private ReplicationSourceService replicationSourceHandler;
  private ReplicationSinkService replicationSinkHandler;
  private boolean sameReplicationSourceAndSink;

  // Compactions
  private CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the encoded region name.
   * All access should be synchronized.
   */
  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
  /**
   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
   * need to be a ConcurrentHashMap?
   */
  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API
   * call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and
   * here we really mean DataNode locations. We don't store it as InetSocketAddress here because the
   * conversion on demand from Address to InetSocketAddress will guarantee the resolution results
   * will be fresh when we need it.
   */
  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();

  private LeaseManager leaseManager;

  private volatile boolean dataFsOk;

  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds (1200000 ms, i.e. 20 minutes), chosen to be safe.
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Config key naming the task to run when the abort timeout is reached.
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;
  private volatile boolean killed = false;

  private final int threadWakeFrequency;

  // The compaction/flush check periods below default to threadWakeFrequency (see constructor).
  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  private RpcClient rpcClient;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  private JvmPauseMonitor pauseMonitor;

  private RSSnapshotVerifier rsSnapshotVerifier;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  private MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;

  /**
   * Check for compaction requests.
   */
  private ScheduledChore compactionChecker;

  /**
   * Check for flushes.
   */
  private ScheduledChore periodicFlusher;

  private volatile WALFactory walFactory;

  private LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // master address tracker; null when running masterless (see constructor)
  private final MasterAddressTracker masterAddressTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  private final int shortOperationTimeout;

  // Time to pause if master says 'please hold'
  private final long retryPauseTime;

  private final RegionServerAccounting regionServerAccounting;

  private NamedQueueServiceChore namedQueueServiceChore = null;

  // Block cache
  private BlockCache blockCache;
  // The cache for mob files
  private MobFileCache mobFileCache;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The Executor status collect chore. */
  private ExecutorStatusChore executorStatusChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  // Coprocessor services registered through registerService(Service), keyed by service name.
  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
   */
  @Deprecated
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.regionserver.hostname.disable.master.reversedns";

  /**
   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
   * Exception will be thrown if both are used.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";

  /**
   * Unique identifier for the cluster we are a part of.
   */
  private String clusterId;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private volatile RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
   * case where client doesn't receive the response from a successful operation and retries. We
   * track the successful ops for some time via a nonce sent by client and handle duplicate
   * operations (currently, by failing them; in future we might use MVCC to return result). Nonces
   * are also recovered from WAL during recovery; however, the caveats (from HBASE-3787) are:
   * <ul>
   * <li>WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth of
   * past records. If we don't read the records, we don't read and recover the nonces. Some WALs
   * within nonce-timeout at recovery may not even be present due to rolling/cleanup.</li>
   * <li>There's no WAL recovery during normal region move, so nonces will not be
   * transferred.</li>
   * </ul>
   * We can have a separate additional "Nonce WAL". It will just contain a bunch of numbers and won't
   * be flushed on the main path - because WAL itself also contains nonces, if we only flush it
   * before memstore flush, for a given nonce we will either see it in the WAL (if it was never
   * flushed to disk, it will be part of recovery), or we'll see it as part of the nonce log (or both
   * occasionally, which doesn't matter). Nonce log file can be deleted after the latest nonce in it
   * expired. It can also be recovered during move.
   * <p>
   * This field is {@code null} when {@code HConstants.HBASE_RS_NONCES_ENABLED} is false (see the
   * constructor).
   */
  final ServerNonceManager nonceManager;

  private BrokenStoreFileCleaner brokenStoreFileCleaner;

  private RSMobFileCleanerChore rsMobFileCleanerChore;

  @InterfaceAudience.Private
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  private SecureBulkLoadManager secureBulkLoadManager;

  private FileSystemUtilizationChore fsUtilizationChore;

  private BootstrapNodeManager bootstrapNodeManager;

  /**
   * True if this RegionServer is coming up in a cluster where there is no Master; means it needs to
   * just come up and make do without a Master to talk to: e.g. in test or HRegionServer is doing
   * other than its usual duties: e.g. as a hollowed-out host whose only purpose is as a
   * Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
   * {@link #TEST_SKIP_REPORTING_TRANSITION} ?
479 */ 480 private final boolean masterless; 481 private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless"; 482 483 /** regionserver codec list **/ 484 private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs"; 485 486 // A timer to shutdown the process if abort takes too long 487 private Timer abortMonitor; 488 489 private RegionReplicationBufferManager regionReplicationBufferManager; 490 491 /* 492 * Chore that creates replication marker rows. 493 */ 494 private ReplicationMarkerChore replicationMarkerChore; 495 496 // A timer submit requests to the PrefetchExecutor 497 private PrefetchExecutorNotifier prefetchExecutorNotifier; 498 499 /** 500 * Starts a HRegionServer at the default location. 501 * <p/> 502 * Don't start any services or managers in here in the Constructor. Defer till after we register 503 * with the Master as much as possible. See {@link #startServices}. 504 */ 505 public HRegionServer(final Configuration conf) throws IOException { 506 super(conf, "RegionServer"); // thread name 507 final Span span = TraceUtil.createSpan("HRegionServer.cxtor"); 508 try (Scope ignored = span.makeCurrent()) { 509 this.dataFsOk = true; 510 this.masterless = !clusterMode(); 511 MemorySizeUtil.validateRegionServerHeapMemoryAllocation(conf); 512 HFile.checkHFileVersion(this.conf); 513 checkCodecs(this.conf); 514 FSUtils.setupShortCircuitRead(this.conf); 515 516 // Disable usage of meta replicas in the regionserver 517 this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); 518 // Config'ed params 519 this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); 520 this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency); 521 this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency); 522 523 boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true); 524 this.nonceManager = isNoncesEnabled ? 
new ServerNonceManager(this.conf) : null; 525 526 this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, 527 HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); 528 529 this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME, 530 HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME); 531 532 regionServerAccounting = new RegionServerAccounting(conf); 533 534 blockCache = BlockCacheFactory.createBlockCache(conf); 535 // The call below, instantiates the DataTieringManager only when 536 // the configuration "hbase.regionserver.datatiering.enable" is set to true. 537 DataTieringManager.instantiate(conf, onlineRegions); 538 539 mobFileCache = new MobFileCache(conf); 540 541 rsSnapshotVerifier = new RSSnapshotVerifier(conf); 542 543 uncaughtExceptionHandler = 544 (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e); 545 546 // If no master in cluster, skip trying to track one or look for a cluster status. 547 if (!this.masterless) { 548 masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); 549 masterAddressTracker.start(); 550 } else { 551 masterAddressTracker = null; 552 } 553 this.rpcServices.start(zooKeeper); 554 span.setStatus(StatusCode.OK); 555 } catch (Throwable t) { 556 // Make sure we log the exception. HRegionServer is often started via reflection and the 557 // cause of failed startup is lost. 
558 TraceUtil.setError(span, t); 559 LOG.error("Failed construction RegionServer", t); 560 throw t; 561 } finally { 562 span.end(); 563 } 564 } 565 566 // HMaster should override this method to load the specific config for master 567 @Override 568 protected String getUseThisHostnameInstead(Configuration conf) throws IOException { 569 String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY); 570 if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) { 571 if (!StringUtils.isBlank(hostname)) { 572 String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " 573 + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set " 574 + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while " 575 + UNSAFE_RS_HOSTNAME_KEY + " is used"; 576 throw new IOException(msg); 577 } else { 578 return DNS.getHostname(conf, DNS.ServerType.REGIONSERVER); 579 } 580 } else { 581 return hostname; 582 } 583 } 584 585 @Override 586 protected DNS.ServerType getDNSServerType() { 587 return DNS.ServerType.REGIONSERVER; 588 } 589 590 @Override 591 protected void login(UserProvider user, String host) throws IOException { 592 user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE, 593 SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host); 594 } 595 596 @Override 597 protected String getProcessName() { 598 return REGIONSERVER; 599 } 600 601 @Override 602 protected RegionServerCoprocessorHost getCoprocessorHost() { 603 return getRegionServerCoprocessorHost(); 604 } 605 606 @Override 607 protected boolean canCreateBaseZNode() { 608 return !clusterMode(); 609 } 610 611 @Override 612 protected boolean canUpdateTableDescriptor() { 613 return false; 614 } 615 616 @Override 617 protected boolean cacheTableDescriptor() { 618 return false; 619 } 620 621 protected RSRpcServices createRpcServices() throws IOException { 622 return new RSRpcServices(this); 623 } 624 625 @Override 626 protected void configureInfoServer(InfoServer infoServer) { 627 
infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class); 628 infoServer.setAttribute(REGIONSERVER, this); 629 } 630 631 @Override 632 protected Class<? extends HttpServlet> getDumpServlet() { 633 return RSDumpServlet.class; 634 } 635 636 /** 637 * Used by {@link RSDumpServlet} to generate debugging information. 638 */ 639 public void dumpRowLocks(final PrintWriter out) { 640 StringBuilder sb = new StringBuilder(); 641 for (HRegion region : getRegions()) { 642 if (region.getLockedRows().size() > 0) { 643 for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) { 644 sb.setLength(0); 645 sb.append(region.getTableDescriptor().getTableName()).append(",") 646 .append(region.getRegionInfo().getEncodedName()).append(","); 647 sb.append(rowLockContext.toString()); 648 out.println(sb); 649 } 650 } 651 } 652 } 653 654 @Override 655 public boolean registerService(Service instance) { 656 // No stacking of instances is allowed for a single executorService name 657 ServiceDescriptor serviceDesc = instance.getDescriptorForType(); 658 String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc); 659 if (coprocessorServiceHandlers.containsKey(serviceName)) { 660 LOG.error("Coprocessor executorService " + serviceName 661 + " already registered, rejecting request from " + instance); 662 return false; 663 } 664 665 coprocessorServiceHandlers.put(serviceName, instance); 666 if (LOG.isDebugEnabled()) { 667 LOG.debug( 668 "Registered regionserver coprocessor executorService: executorService=" + serviceName); 669 } 670 return true; 671 } 672 673 /** 674 * Run test on configured codecs to make sure supporting libs are in place. 
675 */ 676 private static void checkCodecs(final Configuration c) throws IOException { 677 // check to see if the codec list is available: 678 String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null); 679 if (codecs == null) { 680 return; 681 } 682 for (String codec : codecs) { 683 if (!CompressionTest.testCompression(codec)) { 684 throw new IOException( 685 "Compression codec " + codec + " not supported, aborting RS construction"); 686 } 687 } 688 } 689 690 public String getClusterId() { 691 return this.clusterId; 692 } 693 694 /** 695 * All initialization needed before we go register with Master.<br> 696 * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br> 697 * In here we just put up the RpcServer, setup Connection, and ZooKeeper. 698 */ 699 private void preRegistrationInitialization() { 700 final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization"); 701 try (Scope ignored = span.makeCurrent()) { 702 initializeZooKeeper(); 703 setupClusterConnection(); 704 bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker); 705 regionReplicationBufferManager = new RegionReplicationBufferManager(this); 706 // Setup RPC client for master communication 707 this.rpcClient = asyncClusterConnection.getRpcClient(); 708 span.setStatus(StatusCode.OK); 709 } catch (Throwable t) { 710 // Call stop if error or process will stick around for ever since server 711 // puts up non-daemon threads. 712 TraceUtil.setError(span, t); 713 this.rpcServices.stop(); 714 abort("Initialization of RS failed. Hence aborting RS.", t); 715 } finally { 716 span.end(); 717 } 718 } 719 720 /** 721 * Bring up connection to zk ensemble and then wait until a master for this cluster and then after 722 * that, wait until cluster 'up' flag has been set. This is the order in which master does things. 723 * <p> 724 * Finally open long-living server short-circuit connection. 
725 */ 726 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE", 727 justification = "cluster Id znode read would give us correct response") 728 private void initializeZooKeeper() throws IOException, InterruptedException { 729 // Nothing to do in here if no Master in the mix. 730 if (this.masterless) { 731 return; 732 } 733 734 // Create the master address tracker, register with zk, and start it. Then 735 // block until a master is available. No point in starting up if no master 736 // running. 737 blockAndCheckIfStopped(this.masterAddressTracker); 738 739 // Wait on cluster being up. Master will set this flag up in zookeeper 740 // when ready. 741 blockAndCheckIfStopped(this.clusterStatusTracker); 742 743 // If we are HMaster then the cluster id should have already been set. 744 if (clusterId == null) { 745 // Retrieve clusterId 746 // Since cluster status is now up 747 // ID should have already been set by HMaster 748 try { 749 clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper); 750 if (clusterId == null) { 751 this.abort("Cluster ID has not been set"); 752 } 753 LOG.info("ClusterId : " + clusterId); 754 } catch (KeeperException e) { 755 this.abort("Failed to retrieve Cluster ID", e); 756 } 757 } 758 759 if (isStopped() || isAborted()) { 760 return; // No need for further initialization 761 } 762 763 // watch for snapshots and other procedures 764 try { 765 rspmHost = new RegionServerProcedureManagerHost(); 766 rspmHost.loadProcedures(conf); 767 rspmHost.initialize(this); 768 } catch (KeeperException e) { 769 this.abort("Failed to reach coordination cluster when creating procedure handler.", e); 770 } 771 } 772 773 /** 774 * Utilty method to wait indefinitely on a znode availability while checking if the region server 775 * is shut down 776 * @param tracker znode tracker to use 777 * @throws IOException any IO exception, plus if the RS is stopped 778 * @throws InterruptedException if the waiting thread is 
interrupted 779 */ 780 private void blockAndCheckIfStopped(ZKNodeTracker tracker) 781 throws IOException, InterruptedException { 782 while (tracker.blockUntilAvailable(this.msgInterval, false) == null) { 783 if (this.stopped) { 784 throw new IOException("Received the shutdown message while waiting."); 785 } 786 } 787 } 788 789 /** Returns True if the cluster is up. */ 790 @Override 791 public boolean isClusterUp() { 792 return this.masterless 793 || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp()); 794 } 795 796 private void initializeReplicationMarkerChore() { 797 boolean replicationMarkerEnabled = 798 conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT); 799 // If replication or replication marker is not enabled then return immediately. 800 if (replicationMarkerEnabled) { 801 int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY, 802 REPLICATION_MARKER_CHORE_DURATION_DEFAULT); 803 replicationMarkerChore = new ReplicationMarkerChore(this, this, period, conf); 804 } 805 } 806 807 @Override 808 public boolean isStopping() { 809 return stopping; 810 } 811 812 /** 813 * The HRegionServer sticks in this loop until closed. 814 */ 815 @Override 816 public void run() { 817 if (isStopped()) { 818 LOG.info("Skipping run; stopped"); 819 return; 820 } 821 try { 822 // Do pre-registration initializations; zookeeper, lease threads, etc. 823 preRegistrationInitialization(); 824 } catch (Throwable e) { 825 abort("Fatal exception during initialization", e); 826 } 827 828 try { 829 if (!isStopped() && !isAborted()) { 830 installShutdownHook(); 831 // Initialize the RegionServerCoprocessorHost now that our ephemeral 832 // node was created, in case any coprocessors want to use ZooKeeper 833 this.rsHost = new RegionServerCoprocessorHost(this, this.conf); 834 835 // Try and register with the Master; tell it we are here. Break if server is stopped or 836 // the clusterup flag is down or hdfs went wacky. 
Once registered successfully, go ahead and 837 // start up all Services. Use RetryCounter to get backoff in case Master is struggling to 838 // come up. 839 LOG.debug("About to register with Master."); 840 TraceUtil.trace(() -> { 841 RetryCounterFactory rcf = 842 new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5); 843 RetryCounter rc = rcf.create(); 844 while (keepLooping()) { 845 RegionServerStartupResponse w = reportForDuty(); 846 if (w == null) { 847 long sleepTime = rc.getBackoffTimeAndIncrementAttempts(); 848 LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime); 849 this.sleeper.sleep(sleepTime); 850 } else { 851 handleReportForDutyResponse(w); 852 break; 853 } 854 } 855 }, "HRegionServer.registerWithMaster"); 856 } 857 858 if (!isStopped() && isHealthy()) { 859 TraceUtil.trace(() -> { 860 // start the snapshot handler and other procedure handlers, 861 // since the server is ready to run 862 if (this.rspmHost != null) { 863 this.rspmHost.start(); 864 } 865 // Start the Quota Manager 866 if (this.rsQuotaManager != null) { 867 rsQuotaManager.start(getRpcServer().getScheduler()); 868 } 869 if (this.rsSpaceQuotaManager != null) { 870 this.rsSpaceQuotaManager.start(); 871 } 872 }, "HRegionServer.startup"); 873 } 874 875 // We registered with the Master. Go into run mode. 876 long lastMsg = EnvironmentEdgeManager.currentTime(); 877 long oldRequestCount = -1; 878 // The main run loop. 879 while (!isStopped() && isHealthy()) { 880 if (!isClusterUp()) { 881 if (onlineRegions.isEmpty()) { 882 stop("Exiting; cluster shutdown set and not carrying any regions"); 883 } else if (!this.stopping) { 884 this.stopping = true; 885 LOG.info("Closing user regions"); 886 closeUserRegions(isAborted()); 887 } else { 888 boolean allUserRegionsOffline = areAllUserRegionsOffline(); 889 if (allUserRegionsOffline) { 890 // Set stopped if no more write requests tp meta tables 891 // since last time we went around the loop. 
Any open 892 // meta regions will be closed on our way out. 893 if (oldRequestCount == getWriteRequestCount()) { 894 stop("Stopped; only catalog regions remaining online"); 895 break; 896 } 897 oldRequestCount = getWriteRequestCount(); 898 } else { 899 // Make sure all regions have been closed -- some regions may 900 // have not got it because we were splitting at the time of 901 // the call to closeUserRegions. 902 closeUserRegions(this.abortRequested.get()); 903 } 904 LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString()); 905 } 906 } 907 long now = EnvironmentEdgeManager.currentTime(); 908 if ((now - lastMsg) >= msgInterval) { 909 tryRegionServerReport(lastMsg, now); 910 lastMsg = EnvironmentEdgeManager.currentTime(); 911 } 912 if (!isStopped() && !isAborted()) { 913 this.sleeper.sleep(); 914 } 915 } // for 916 } catch (Throwable t) { 917 if (!rpcServices.checkOOME(t)) { 918 String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: "; 919 abort(prefix + t.getMessage(), t); 920 } 921 } 922 923 final Span span = TraceUtil.createSpan("HRegionServer exiting main loop"); 924 try (Scope ignored = span.makeCurrent()) { 925 if (this.leaseManager != null) { 926 this.leaseManager.closeAfterLeasesExpire(); 927 } 928 if (this.splitLogWorker != null) { 929 splitLogWorker.stop(); 930 } 931 stopInfoServer(); 932 // Send cache a shutdown. 933 if (blockCache != null) { 934 blockCache.shutdown(); 935 } 936 if (mobFileCache != null) { 937 mobFileCache.shutdown(); 938 } 939 940 // Send interrupts to wake up threads if sleeping so they notice shutdown. 941 // TODO: Should we check they are alive? 
If OOME could have exited already 942 if (this.hMemManager != null) { 943 this.hMemManager.stop(); 944 } 945 if (this.cacheFlusher != null) { 946 this.cacheFlusher.interruptIfNecessary(); 947 } 948 if (this.compactSplitThread != null) { 949 this.compactSplitThread.interruptIfNecessary(); 950 } 951 952 // Stop the snapshot and other procedure handlers, forcefully killing all running tasks 953 if (rspmHost != null) { 954 rspmHost.stop(this.abortRequested.get() || this.killed); 955 } 956 957 if (this.killed) { 958 // Just skip out w/o closing regions. Used when testing. 959 } else if (abortRequested.get()) { 960 if (this.dataFsOk) { 961 closeUserRegions(abortRequested.get()); // Don't leave any open file handles 962 } 963 LOG.info("aborting server " + this.serverName); 964 } else { 965 closeUserRegions(abortRequested.get()); 966 LOG.info("stopping server " + this.serverName); 967 } 968 regionReplicationBufferManager.stop(); 969 closeClusterConnection(); 970 // Closing the compactSplit thread before closing meta regions 971 if (!this.killed && containsMetaTableRegions()) { 972 if (!abortRequested.get() || this.dataFsOk) { 973 if (this.compactSplitThread != null) { 974 this.compactSplitThread.join(); 975 this.compactSplitThread = null; 976 } 977 closeMetaTableRegions(abortRequested.get()); 978 } 979 } 980 981 if (!this.killed && this.dataFsOk) { 982 waitOnAllRegionsToClose(abortRequested.get()); 983 LOG.info("stopping server " + this.serverName + "; all regions closed."); 984 } 985 986 // Stop the quota manager 987 if (rsQuotaManager != null) { 988 rsQuotaManager.stop(); 989 } 990 if (rsSpaceQuotaManager != null) { 991 rsSpaceQuotaManager.stop(); 992 rsSpaceQuotaManager = null; 993 } 994 995 // flag may be changed when closing regions throws exception. 996 if (this.dataFsOk) { 997 shutdownWAL(!abortRequested.get()); 998 } 999 1000 // Make sure the proxy is down. 
1001 if (this.rssStub != null) { 1002 this.rssStub = null; 1003 } 1004 if (this.lockStub != null) { 1005 this.lockStub = null; 1006 } 1007 if (this.rpcClient != null) { 1008 this.rpcClient.close(); 1009 } 1010 if (this.leaseManager != null) { 1011 this.leaseManager.close(); 1012 } 1013 if (this.pauseMonitor != null) { 1014 this.pauseMonitor.stop(); 1015 } 1016 1017 if (!killed) { 1018 stopServiceThreads(); 1019 } 1020 1021 if (this.rpcServices != null) { 1022 this.rpcServices.stop(); 1023 } 1024 1025 try { 1026 deleteMyEphemeralNode(); 1027 } catch (KeeperException.NoNodeException nn) { 1028 // pass 1029 } catch (KeeperException e) { 1030 LOG.warn("Failed deleting my ephemeral node", e); 1031 } 1032 // We may have failed to delete the znode at the previous step, but 1033 // we delete the file anyway: a second attempt to delete the znode is likely to fail again. 1034 ZNodeClearer.deleteMyEphemeralNodeOnDisk(); 1035 1036 closeZooKeeper(); 1037 closeTableDescriptors(); 1038 LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed."); 1039 span.setStatus(StatusCode.OK); 1040 } finally { 1041 span.end(); 1042 } 1043 } 1044 1045 private boolean containsMetaTableRegions() { 1046 return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 1047 } 1048 1049 private boolean areAllUserRegionsOffline() { 1050 if (getNumberOfOnlineRegions() > 2) { 1051 return false; 1052 } 1053 boolean allUserRegionsOffline = true; 1054 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1055 if (!e.getValue().getRegionInfo().isMetaRegion()) { 1056 allUserRegionsOffline = false; 1057 break; 1058 } 1059 } 1060 return allUserRegionsOffline; 1061 } 1062 1063 /** Returns Current write count for all online regions. 
*/ 1064 private long getWriteRequestCount() { 1065 long writeCount = 0; 1066 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1067 writeCount += e.getValue().getWriteRequestsCount(); 1068 } 1069 return writeCount; 1070 } 1071 1072 @InterfaceAudience.Private 1073 protected void tryRegionServerReport(long reportStartTime, long reportEndTime) 1074 throws IOException { 1075 RegionServerStatusService.BlockingInterface rss = rssStub; 1076 if (rss == null) { 1077 // the current server could be stopping. 1078 return; 1079 } 1080 ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime); 1081 final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport"); 1082 try (Scope ignored = span.makeCurrent()) { 1083 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder(); 1084 request.setServer(ProtobufUtil.toServerName(this.serverName)); 1085 request.setLoad(sl); 1086 rss.regionServerReport(null, request.build()); 1087 span.setStatus(StatusCode.OK); 1088 } catch (ServiceException se) { 1089 IOException ioe = ProtobufUtil.getRemoteException(se); 1090 if (ioe instanceof YouAreDeadException) { 1091 // This will be caught and handled as a fatal error in run() 1092 TraceUtil.setError(span, ioe); 1093 throw ioe; 1094 } 1095 if (rssStub == rss) { 1096 rssStub = null; 1097 } 1098 TraceUtil.setError(span, se); 1099 // Couldn't connect to the master, get location from zk and reconnect 1100 // Method blocks until new master is found or we are stopped 1101 createRegionServerStatusStub(true); 1102 } finally { 1103 span.end(); 1104 } 1105 } 1106 1107 /** 1108 * Reports the given map of Regions and their size on the filesystem to the active Master. 1109 * @param regionSizeStore The store containing region sizes 1110 * @return false if FileSystemUtilizationChore should pause reporting to master. 
true otherwise 1111 */ 1112 public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) { 1113 RegionServerStatusService.BlockingInterface rss = rssStub; 1114 if (rss == null) { 1115 // the current server could be stopping. 1116 LOG.trace("Skipping Region size report to HMaster as stub is null"); 1117 return true; 1118 } 1119 try { 1120 buildReportAndSend(rss, regionSizeStore); 1121 } catch (ServiceException se) { 1122 IOException ioe = ProtobufUtil.getRemoteException(se); 1123 if (ioe instanceof PleaseHoldException) { 1124 LOG.trace("Failed to report region sizes to Master because it is initializing." 1125 + " This will be retried.", ioe); 1126 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 1127 return true; 1128 } 1129 if (rssStub == rss) { 1130 rssStub = null; 1131 } 1132 createRegionServerStatusStub(true); 1133 if (ioe instanceof DoNotRetryIOException) { 1134 DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe; 1135 if (doNotRetryEx.getCause() != null) { 1136 Throwable t = doNotRetryEx.getCause(); 1137 if (t instanceof UnsupportedOperationException) { 1138 LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying"); 1139 return false; 1140 } 1141 } 1142 } 1143 LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe); 1144 } 1145 return true; 1146 } 1147 1148 /** 1149 * Builds the region size report and sends it to the master. Upon successful sending of the 1150 * report, the region sizes that were sent are marked as sent. 
1151 * @param rss The stub to send to the Master 1152 * @param regionSizeStore The store containing region sizes 1153 */ 1154 private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss, 1155 RegionSizeStore regionSizeStore) throws ServiceException { 1156 RegionSpaceUseReportRequest request = 1157 buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore)); 1158 rss.reportRegionSpaceUse(null, request); 1159 // Record the number of size reports sent 1160 if (metricsRegionServer != null) { 1161 metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size()); 1162 } 1163 } 1164 1165 /** 1166 * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map. 1167 * @param regionSizes The size in bytes of regions 1168 * @return The corresponding protocol buffer message. 1169 */ 1170 RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) { 1171 RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder(); 1172 for (Entry<RegionInfo, RegionSize> entry : regionSizes) { 1173 request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize())); 1174 } 1175 return request.build(); 1176 } 1177 1178 /** 1179 * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf 1180 * message. 
1181 * @param regionInfo The RegionInfo 1182 * @param sizeInBytes The size in bytes of the Region 1183 * @return The protocol buffer 1184 */ 1185 RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) { 1186 return RegionSpaceUse.newBuilder() 1187 .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo))) 1188 .setRegionSize(Objects.requireNonNull(sizeInBytes)).build(); 1189 } 1190 1191 private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) 1192 throws IOException { 1193 // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests 1194 // per second, and other metrics As long as metrics are part of ServerLoad it's best to use 1195 // the wrapper to compute those numbers in one place. 1196 // In the long term most of these should be moved off of ServerLoad and the heart beat. 1197 // Instead they should be stored in an HBase table so that external visibility into HBase is 1198 // improved; Additionally the load balancer will be able to take advantage of a more complete 1199 // history. 
1200 MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper(); 1201 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 1202 long usedMemory = -1L; 1203 long maxMemory = -1L; 1204 final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage(); 1205 if (usage != null) { 1206 usedMemory = usage.getUsed(); 1207 maxMemory = usage.getMax(); 1208 } 1209 1210 ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder(); 1211 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond()); 1212 serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount()); 1213 serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024)); 1214 serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024)); 1215 serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount()); 1216 serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount()); 1217 Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors(); 1218 Coprocessor.Builder coprocessorBuilder = Coprocessor.newBuilder(); 1219 for (String coprocessor : coprocessors) { 1220 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1221 } 1222 RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder(); 1223 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); 1224 for (HRegion region : regions) { 1225 if (region.getCoprocessorHost() != null) { 1226 Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors(); 1227 for (String regionCoprocessor : regionCoprocessors) { 1228 serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build()); 1229 } 1230 } 1231 serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier)); 1232 for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost() 1233 .getCoprocessors()) { 1234 
serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1235 } 1236 } 1237 1238 getBlockCache().ifPresent(cache -> { 1239 cache.getRegionCachedInfo().ifPresent(regionCachedInfo -> { 1240 regionCachedInfo.forEach((regionName, prefetchSize) -> { 1241 serverLoad.putRegionCachedInfo(regionName, roundSize(prefetchSize, unitMB)); 1242 }); 1243 }); 1244 }); 1245 serverLoad.setCacheFreeSize(regionServerWrapper.getBlockCacheFreeSize()); 1246 if (DataTieringManager.getInstance() != null) { 1247 DataTieringManager.getInstance().getRegionColdDataSize() 1248 .forEach((regionName, coldDataSize) -> serverLoad.putRegionColdData(regionName, 1249 roundSize(coldDataSize.getSecond(), unitMB))); 1250 } 1251 serverLoad.setReportStartTime(reportStartTime); 1252 serverLoad.setReportEndTime(reportEndTime); 1253 if (this.infoServer != null) { 1254 serverLoad.setInfoServerPort(this.infoServer.getPort()); 1255 } else { 1256 serverLoad.setInfoServerPort(-1); 1257 } 1258 MetricsUserAggregateSource userSource = 1259 metricsRegionServer.getMetricsUserAggregate().getSource(); 1260 if (userSource != null) { 1261 Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources(); 1262 for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) { 1263 serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue())); 1264 } 1265 } 1266 1267 if (sameReplicationSourceAndSink && replicationSourceHandler != null) { 1268 // always refresh first to get the latest value 1269 ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad(); 1270 if (rLoad != null) { 1271 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); 1272 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad 1273 .getReplicationLoadSourceEntries()) { 1274 serverLoad.addReplLoadSource(rLS); 1275 } 1276 } 1277 } else { 1278 if (replicationSourceHandler != null) { 1279 ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad(); 1280 if (rLoad 
!= null) { 1281 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad 1282 .getReplicationLoadSourceEntries()) { 1283 serverLoad.addReplLoadSource(rLS); 1284 } 1285 } 1286 } 1287 if (replicationSinkHandler != null) { 1288 ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad(); 1289 if (rLoad != null) { 1290 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); 1291 } 1292 } 1293 } 1294 1295 TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask 1296 .newBuilder().setDescription(task.getDescription()) 1297 .setStatus(task.getStatus() != null ? task.getStatus() : "") 1298 .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name())) 1299 .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build())); 1300 1301 return serverLoad.build(); 1302 } 1303 1304 private String getOnlineRegionsAsPrintableString() { 1305 StringBuilder sb = new StringBuilder(); 1306 for (Region r : this.onlineRegions.values()) { 1307 if (sb.length() > 0) { 1308 sb.append(", "); 1309 } 1310 sb.append(r.getRegionInfo().getEncodedName()); 1311 } 1312 return sb.toString(); 1313 } 1314 1315 /** 1316 * Wait on regions close. 1317 */ 1318 private void waitOnAllRegionsToClose(final boolean abort) { 1319 // Wait till all regions are closed before going out. 1320 int lastCount = -1; 1321 long previousLogTime = 0; 1322 Set<String> closedRegions = new HashSet<>(); 1323 boolean interrupted = false; 1324 try { 1325 while (!onlineRegions.isEmpty()) { 1326 int count = getNumberOfOnlineRegions(); 1327 // Only print a message if the count of regions has changed. 
1328 if (count != lastCount) { 1329 // Log every second at most 1330 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 1331 previousLogTime = EnvironmentEdgeManager.currentTime(); 1332 lastCount = count; 1333 LOG.info("Waiting on " + count + " regions to close"); 1334 // Only print out regions still closing if a small number else will 1335 // swamp the log. 1336 if (count < 10 && LOG.isDebugEnabled()) { 1337 LOG.debug("Online Regions=" + this.onlineRegions); 1338 } 1339 } 1340 } 1341 // Ensure all user regions have been sent a close. Use this to 1342 // protect against the case where an open comes in after we start the 1343 // iterator of onlineRegions to close all user regions. 1344 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1345 RegionInfo hri = e.getValue().getRegionInfo(); 1346 if ( 1347 !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) 1348 && !closedRegions.contains(hri.getEncodedName()) 1349 ) { 1350 closedRegions.add(hri.getEncodedName()); 1351 // Don't update zk with this close transition; pass false. 1352 closeRegionIgnoreErrors(hri, abort); 1353 } 1354 } 1355 // No regions in RIT, we could stop waiting now. 
1356 if (this.regionsInTransitionInRS.isEmpty()) { 1357 if (!onlineRegions.isEmpty()) { 1358 LOG.info("We were exiting though online regions are not empty," 1359 + " because some regions failed closing"); 1360 } 1361 break; 1362 } else { 1363 LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream() 1364 .map(e -> Bytes.toString(e)).collect(Collectors.joining(", "))); 1365 } 1366 if (sleepInterrupted(200)) { 1367 interrupted = true; 1368 } 1369 } 1370 } finally { 1371 if (interrupted) { 1372 Thread.currentThread().interrupt(); 1373 } 1374 } 1375 } 1376 1377 private static boolean sleepInterrupted(long millis) { 1378 boolean interrupted = false; 1379 try { 1380 Thread.sleep(millis); 1381 } catch (InterruptedException e) { 1382 LOG.warn("Interrupted while sleeping"); 1383 interrupted = true; 1384 } 1385 return interrupted; 1386 } 1387 1388 private void shutdownWAL(final boolean close) { 1389 if (this.walFactory != null) { 1390 try { 1391 if (close) { 1392 walFactory.close(); 1393 } else { 1394 walFactory.shutdown(); 1395 } 1396 } catch (Throwable e) { 1397 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 1398 LOG.error("Shutdown / close of WAL failed: " + e); 1399 LOG.debug("Shutdown / close exception details:", e); 1400 } 1401 } 1402 } 1403 1404 /** 1405 * Run init. Sets up wal and starts up all server threads. 1406 * @param c Extra configuration. 1407 */ 1408 protected void handleReportForDutyResponse(final RegionServerStartupResponse c) 1409 throws IOException { 1410 try { 1411 boolean updateRootDir = false; 1412 for (NameStringPair e : c.getMapEntriesList()) { 1413 String key = e.getName(); 1414 // The hostname the master sees us as. 
1415 if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) { 1416 String hostnameFromMasterPOV = e.getValue(); 1417 this.serverName = ServerName.valueOf(hostnameFromMasterPOV, 1418 rpcServices.getSocketAddress().getPort(), this.startcode); 1419 String expectedHostName = rpcServices.getSocketAddress().getHostName(); 1420 // if Master use-ip is enabled, RegionServer use-ip will be enabled by default even if it 1421 // is set to disable. so we will use the ip of the RegionServer to compare with the 1422 // hostname passed by the Master, see HBASE-27304 for details. 1423 if ( 1424 StringUtils.isBlank(useThisHostnameInstead) && getActiveMaster().isPresent() 1425 && InetAddresses.isInetAddress(getActiveMaster().get().getHostname()) 1426 ) { 1427 expectedHostName = rpcServices.getSocketAddress().getAddress().getHostAddress(); 1428 } 1429 boolean isHostnameConsist = StringUtils.isBlank(useThisHostnameInstead) 1430 ? hostnameFromMasterPOV.equals(expectedHostName) 1431 : hostnameFromMasterPOV.equals(useThisHostnameInstead); 1432 if (!isHostnameConsist) { 1433 String msg = "Master passed us a different hostname to use; was=" 1434 + (StringUtils.isBlank(useThisHostnameInstead) 1435 ? expectedHostName 1436 : this.useThisHostnameInstead) 1437 + ", but now=" + hostnameFromMasterPOV; 1438 LOG.error(msg); 1439 throw new IOException(msg); 1440 } 1441 continue; 1442 } 1443 1444 String value = e.getValue(); 1445 if (key.equals(HConstants.HBASE_DIR)) { 1446 if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) { 1447 updateRootDir = true; 1448 } 1449 } 1450 1451 if (LOG.isDebugEnabled()) { 1452 LOG.debug("Config from master: " + key + "=" + value); 1453 } 1454 this.conf.set(key, value); 1455 } 1456 // Set our ephemeral znode up in zookeeper now we have a name. 1457 createMyEphemeralNode(); 1458 1459 if (updateRootDir) { 1460 // initialize file system by the config fs.defaultFS and hbase.rootdir from master 1461 initializeFileSystem(); 1462 } 1463 1464 // hack! 
Maps DFSClient => RegionServer for logs. HDFS made this 1465 // config param for task trackers, but we can piggyback off of it. 1466 if (this.conf.get("mapreduce.task.attempt.id") == null) { 1467 this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString()); 1468 } 1469 1470 // Save it in a file, this will allow to see if we crash 1471 ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath()); 1472 1473 // This call sets up an initialized replication and WAL. Later we start it up. 1474 setupWALAndReplication(); 1475 // Init in here rather than in constructor after thread name has been set 1476 final MetricsTable metricsTable = 1477 new MetricsTable(new MetricsTableWrapperAggregateImpl(this)); 1478 this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this); 1479 this.metricsRegionServer = 1480 new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable); 1481 // Now that we have a metrics source, start the pause monitor 1482 this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource()); 1483 pauseMonitor.start(); 1484 1485 // There is a rare case where we do NOT want services to start. Check config. 1486 if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) { 1487 startServices(); 1488 } 1489 // In here we start up the replication Service. Above we initialized it. TODO. Reconcile. 1490 // or make sense of it. 
1491 startReplicationService(); 1492 1493 // Set up ZK 1494 LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress() 1495 + ", sessionid=0x" 1496 + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); 1497 1498 // Wake up anyone waiting for this server to online 1499 synchronized (online) { 1500 online.set(true); 1501 online.notifyAll(); 1502 } 1503 } catch (Throwable e) { 1504 stop("Failed initialization"); 1505 throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed"); 1506 } finally { 1507 sleeper.skipSleepCycle(); 1508 } 1509 } 1510 1511 private void startHeapMemoryManager() { 1512 if (this.blockCache != null) { 1513 this.hMemManager = 1514 new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting); 1515 this.hMemManager.start(getChoreService()); 1516 } 1517 } 1518 1519 private void createMyEphemeralNode() throws KeeperException { 1520 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder(); 1521 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1); 1522 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo()); 1523 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray()); 1524 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data); 1525 } 1526 1527 private void deleteMyEphemeralNode() throws KeeperException { 1528 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath()); 1529 } 1530 1531 @Override 1532 public RegionServerAccounting getRegionServerAccounting() { 1533 return regionServerAccounting; 1534 } 1535 1536 // Round the size with KB or MB. 1537 // A trick here is that if the sizeInBytes is less than sizeUnit, we will round the size to 1 1538 // instead of 0 if it is not 0, to avoid some schedulers think the region has no data. See 1539 // HBASE-26340 for more details on why this is important. 
1540 private static int roundSize(long sizeInByte, int sizeUnit) { 1541 if (sizeInByte == 0) { 1542 return 0; 1543 } else if (sizeInByte < sizeUnit) { 1544 return 1; 1545 } else { 1546 return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE); 1547 } 1548 } 1549 1550 /** 1551 * @param r Region to get RegionLoad for. 1552 * @param regionLoadBldr the RegionLoad.Builder, can be null 1553 * @param regionSpecifier the RegionSpecifier.Builder, can be null 1554 * @return RegionLoad instance. 1555 */ 1556 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, 1557 RegionSpecifier.Builder regionSpecifier) throws IOException { 1558 byte[] name = r.getRegionInfo().getRegionName(); 1559 String regionEncodedName = r.getRegionInfo().getEncodedName(); 1560 int stores = 0; 1561 int storefiles = 0; 1562 int storeRefCount = 0; 1563 int maxCompactedStoreFileRefCount = 0; 1564 long storeUncompressedSize = 0L; 1565 long storefileSize = 0L; 1566 long storefileIndexSize = 0L; 1567 long rootLevelIndexSize = 0L; 1568 long totalStaticIndexSize = 0L; 1569 long totalStaticBloomSize = 0L; 1570 long totalCompactingKVs = 0L; 1571 long currentCompactedKVs = 0L; 1572 long totalRegionSize = 0L; 1573 List<HStore> storeList = r.getStores(); 1574 stores += storeList.size(); 1575 for (HStore store : storeList) { 1576 storefiles += store.getStorefilesCount(); 1577 int currentStoreRefCount = store.getStoreRefCount(); 1578 storeRefCount += currentStoreRefCount; 1579 int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount(); 1580 maxCompactedStoreFileRefCount = 1581 Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount); 1582 storeUncompressedSize += store.getStoreSizeUncompressed(); 1583 storefileSize += store.getStorefilesSize(); 1584 totalRegionSize += store.getHFilesSize(); 1585 // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB? 
1586 storefileIndexSize += store.getStorefilesRootLevelIndexSize(); 1587 CompactionProgress progress = store.getCompactionProgress(); 1588 if (progress != null) { 1589 totalCompactingKVs += progress.getTotalCompactingKVs(); 1590 currentCompactedKVs += progress.currentCompactedKVs; 1591 } 1592 rootLevelIndexSize += store.getStorefilesRootLevelIndexSize(); 1593 totalStaticIndexSize += store.getTotalStaticIndexSize(); 1594 totalStaticBloomSize += store.getTotalStaticBloomSize(); 1595 } 1596 1597 int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB); 1598 int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB); 1599 int storefileSizeMB = roundSize(storefileSize, unitMB); 1600 int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB); 1601 int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB); 1602 int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB); 1603 int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB); 1604 int regionSizeMB = roundSize(totalRegionSize, unitMB); 1605 final MutableFloat currentRegionCachedRatio = new MutableFloat(0.0f); 1606 getBlockCache().ifPresent(bc -> { 1607 bc.getRegionCachedInfo().ifPresent(regionCachedInfo -> { 1608 if (regionCachedInfo.containsKey(regionEncodedName)) { 1609 currentRegionCachedRatio.setValue(regionSizeMB == 0 1610 ? 0.0f 1611 : (float) roundSize(regionCachedInfo.get(regionEncodedName), unitMB) / regionSizeMB); 1612 } 1613 }); 1614 }); 1615 final MutableFloat currentRegionColdDataRatio = new MutableFloat(0.0f); 1616 if (DataTieringManager.getInstance() != null) { 1617 DataTieringManager.getInstance().getRegionColdDataSize().computeIfPresent(regionEncodedName, 1618 (k, v) -> { 1619 int coldSizeMB = roundSize(v.getSecond(), unitMB); 1620 currentRegionColdDataRatio 1621 .setValue(regionSizeMB == 0 ? 
0.0f : (float) coldSizeMB / regionSizeMB); 1622 return v; 1623 }); 1624 } 1625 1626 HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution(); 1627 float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname()); 1628 float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname()); 1629 long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight(); 1630 long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname()); 1631 long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname()); 1632 if (regionLoadBldr == null) { 1633 regionLoadBldr = RegionLoad.newBuilder(); 1634 } 1635 if (regionSpecifier == null) { 1636 regionSpecifier = RegionSpecifier.newBuilder(); 1637 } 1638 1639 regionSpecifier.setType(RegionSpecifierType.REGION_NAME); 1640 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name)); 1641 regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores) 1642 .setStorefiles(storefiles).setStoreRefCount(storeRefCount) 1643 .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount) 1644 .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB) 1645 .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB) 1646 .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB) 1647 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB) 1648 .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount()) 1649 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount()) 1650 .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs) 1651 .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality) 1652 .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight) 1653 .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight) 1654 
.setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState())) 1655 .setLastMajorCompactionTs(r.getOldestHfileTs(true)).setRegionSizeMB(regionSizeMB) 1656 .setCurrentRegionCachedRatio(currentRegionCachedRatio.floatValue()) 1657 .setCurrentRegionColdDataRatio(currentRegionColdDataRatio.floatValue()); 1658 r.setCompleteSequenceId(regionLoadBldr); 1659 return regionLoadBldr.build(); 1660 } 1661 1662 private UserLoad createUserLoad(String user, MetricsUserSource userSource) { 1663 UserLoad.Builder userLoadBldr = UserLoad.newBuilder(); 1664 userLoadBldr.setUserName(user); 1665 userSource.getClientMetrics().values().stream() 1666 .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder() 1667 .setHostName(clientMetrics.getHostName()) 1668 .setWriteRequestsCount(clientMetrics.getWriteRequestsCount()) 1669 .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests()) 1670 .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build()) 1671 .forEach(userLoadBldr::addClientMetrics); 1672 return userLoadBldr.build(); 1673 } 1674 1675 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { 1676 HRegion r = onlineRegions.get(encodedRegionName); 1677 return r != null ? createRegionLoad(r, null, null) : null; 1678 } 1679 1680 /** 1681 * Inner class that runs on a long period checking if regions need compaction. 
1682 */ 1683 private static class CompactionChecker extends ScheduledChore { 1684 private final HRegionServer instance; 1685 private final int majorCompactPriority; 1686 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; 1687 // Iteration is 1-based rather than 0-based so we don't check for compaction 1688 // immediately upon region server startup 1689 private long iteration = 1; 1690 1691 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { 1692 super("CompactionChecker", stopper, sleepTime); 1693 this.instance = h; 1694 LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime)); 1695 1696 /* 1697 * MajorCompactPriority is configurable. If not set, the compaction will use default priority. 1698 */ 1699 this.majorCompactPriority = this.instance.conf 1700 .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY); 1701 } 1702 1703 @Override 1704 protected void chore() { 1705 for (HRegion hr : this.instance.onlineRegions.values()) { 1706 // If region is read only or compaction is disabled at table level, there's no need to 1707 // iterate through region's stores 1708 if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) { 1709 continue; 1710 } 1711 1712 for (HStore s : hr.stores.values()) { 1713 try { 1714 long multiplier = s.getCompactionCheckMultiplier(); 1715 assert multiplier > 0; 1716 if (iteration % multiplier != 0) { 1717 continue; 1718 } 1719 if (s.needsCompaction()) { 1720 // Queue a compaction. Will recognize if major is needed. 
1721 this.instance.compactSplitThread.requestSystemCompaction(hr, s, 1722 getName() + " requests compaction"); 1723 } else if (s.shouldPerformMajorCompaction()) { 1724 s.triggerMajorCompaction(); 1725 if ( 1726 majorCompactPriority == DEFAULT_PRIORITY 1727 || majorCompactPriority > hr.getCompactPriority() 1728 ) { 1729 this.instance.compactSplitThread.requestCompaction(hr, s, 1730 getName() + " requests major compaction; use default priority", Store.NO_PRIORITY, 1731 CompactionLifeCycleTracker.DUMMY, null); 1732 } else { 1733 this.instance.compactSplitThread.requestCompaction(hr, s, 1734 getName() + " requests major compaction; use configured priority", 1735 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); 1736 } 1737 } 1738 } catch (IOException e) { 1739 LOG.warn("Failed major compaction check on " + hr, e); 1740 } 1741 } 1742 } 1743 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); 1744 } 1745 } 1746 1747 private static class PeriodicMemStoreFlusher extends ScheduledChore { 1748 private final HRegionServer server; 1749 private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds 1750 private final static int MIN_DELAY_TIME = 0; // millisec 1751 private final long rangeOfDelayMs; 1752 1753 PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { 1754 super("MemstoreFlusherChore", server, cacheFlushInterval); 1755 this.server = server; 1756 1757 final long configuredRangeOfDelay = server.getConfiguration() 1758 .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY); 1759 this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay); 1760 } 1761 1762 @Override 1763 protected void chore() { 1764 final StringBuilder whyFlush = new StringBuilder(); 1765 for (HRegion r : this.server.onlineRegions.values()) { 1766 if (r == null) { 1767 continue; 1768 } 1769 if (r.shouldFlush(whyFlush)) { 1770 FlushRequester requester = server.getFlushRequester(); 1771 if 
(requester != null) { 1772 long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME; 1773 // Throttle the flushes by putting a delay. If we don't throttle, and there 1774 // is a balanced write-load on the regions in a table, we might end up 1775 // overwhelming the filesystem with too many flushes at once. 1776 if (requester.requestDelayedFlush(r, delay)) { 1777 LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(), 1778 r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay); 1779 } 1780 } 1781 } 1782 } 1783 } 1784 } 1785 1786 /** 1787 * Report the status of the server. A server is online once all the startup is completed (setting 1788 * up filesystem, starting executorService threads, etc.). This method is designed mostly to be 1789 * useful in tests. 1790 * @return true if online, false if not. 1791 */ 1792 public boolean isOnline() { 1793 return online.get(); 1794 } 1795 1796 /** 1797 * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to 1798 * be hooked up to WAL. 1799 */ 1800 private void setupWALAndReplication() throws IOException { 1801 WALFactory factory = new WALFactory(conf, serverName, this); 1802 // TODO Replication make assumptions here based on the default filesystem impl 1803 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 1804 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); 1805 1806 Path logDir = new Path(walRootDir, logName); 1807 LOG.debug("logDir={}", logDir); 1808 if (this.walFs.exists(logDir)) { 1809 throw new RegionServerRunningException( 1810 "Region server has already created directory at " + this.serverName.toString()); 1811 } 1812 // Create wal directory here and we will never create it again in other places. This is 1813 // important to make sure that our fencing way takes effect. See HBASE-29797 for more details. 
1814 if (!this.walFs.mkdirs(logDir)) { 1815 throw new IOException("Can not create wal directory " + logDir); 1816 } 1817 // Instantiate replication if replication enabled. Pass it the log directories. 1818 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory); 1819 1820 WALActionsListener walEventListener = getWALEventTrackerListener(conf); 1821 if (walEventListener != null && factory.getWALProvider() != null) { 1822 factory.getWALProvider().addWALActionsListener(walEventListener); 1823 } 1824 this.walFactory = factory; 1825 } 1826 1827 private WALActionsListener getWALEventTrackerListener(Configuration conf) { 1828 if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) { 1829 WALEventTrackerListener listener = 1830 new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName()); 1831 return listener; 1832 } 1833 return null; 1834 } 1835 1836 /** 1837 * Start up replication source and sink handlers. 1838 */ 1839 private void startReplicationService() throws IOException { 1840 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { 1841 this.replicationSourceHandler.startReplicationService(); 1842 } else { 1843 if (this.replicationSourceHandler != null) { 1844 this.replicationSourceHandler.startReplicationService(); 1845 } 1846 if (this.replicationSinkHandler != null) { 1847 this.replicationSinkHandler.startReplicationService(); 1848 } 1849 } 1850 } 1851 1852 /** Returns Master address tracker instance. */ 1853 public MasterAddressTracker getMasterAddressTracker() { 1854 return this.masterAddressTracker; 1855 } 1856 1857 /** 1858 * Start maintenance Threads, Server, Worker and lease checker threads. Start all threads we need 1859 * to run. This is called after we've successfully registered with the Master. Install an 1860 * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception. We 1861 * cannot set the handler on all threads. 
Server's internal Listener thread is off limits. For Server, if an OOME, it waits a while then
 * retries. Meantime, a flush or a compaction that tries to run should trigger same critical
 * condition and the shutdown will run. On its way out, this server will shut down Server. Leases
 * are sort of in between. It has an internal thread that while it inherits from Chore, it keeps
 * its own internal stop mechanism so needs to be stopped by this hosting server. Worker logs the
 * exception and exits.
 * <p>
 * In order, this method:
 * <ol>
 * <li>creates the chores and workers (via initializeThreads(), only if not stopped/aborted);</li>
 * <li>starts the SecureBulkLoadManager;</li>
 * <li>creates the optional health-check and executor-status chores;</li>
 * <li>starts one executor service per ExecutorType below;</li>
 * <li>starts the WAL roller, cache flusher and procedure result reporter threads;</li>
 * <li>schedules every chore that is non-null;</li>
 * <li>starts the lease checker and, for ZK-coordinated WAL splitting, the SplitLogWorker;</li>
 * <li>finally starts the heap memory manager.</li>
 * </ol>
 * @throws IOException propagated from the startup calls made here
 */
private void startServices() throws IOException {
  // NOTE(review): when this branch is skipped, the fields assigned in initializeThreads() (for
  // example leaseManager and compactSplitThread) stay null, yet leaseManager is still handed to
  // Threads.setDaemonThreadRunning below. Confirm that this path is acceptable.
  if (!isStopped() && !isAborted()) {
    initializeThreads();
  }
  this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
  this.secureBulkLoadManager.start();

  // Health checker thread.
  if (isHealthCheckerConfigured()) {
    int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
      HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
    healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
  }
  // Executor status collect thread.
  if (
    this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
      HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)
  ) {
    int sleepTime =
      this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ);
    executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
      this.metricsRegionServer.getMetricsSource());
  }

  this.walRoller = new LogRoller(this);
  this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
  this.procedureResultReporter = new RemoteProcedureResultReporter(this);

  // Create the CompactedFileDischarger chore executorService. This chore helps to
  // remove the compacted files that will no longer be used in reads.
  // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
  // 2 mins so that compacted files can be archived before the TTLCleaner runs
  int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
  this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this);
  choreService.scheduleChore(compactedFileDischarger);

  // Start executor services. Each pool's core size is read from configuration; the second
  // argument of each getInt call is the default used when the key is unset.
  final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
  final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
  final int openPriorityRegionThreads =
    conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
  executorService.startExecutorService(
    executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION)
      .setCorePoolSize(openPriorityRegionThreads));
  final int closeRegionThreads =
    conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
  final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
  // The parallel-seek pool only exists when parallel seek is enabled.
  if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
    final int storeScannerParallelSeekThreads =
      conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
    executorService.startExecutorService(
      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK)
        .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true));
  }
  final int logReplayOpsThreads =
    conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
  executorService.startExecutorService(
    executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS)
      .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
  // Start the threads for compacted files discharger
  final int compactionDischargerThreads =
    conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)
    .setCorePoolSize(compactionDischargerThreads));
  if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
    // Defaults to the open-region pool size when the flusher thread count is not configured.
    final int regionReplicaFlushThreads =
      conf.getInt("hbase.regionserver.region.replica.flusher.threads",
        conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
    executorService.startExecutorService(executorService.new ExecutorConfig()
      .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS)
      .setCorePoolSize(regionReplicaFlushThreads));
  }
  final int refreshPeerThreads =
    conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
  final int replaySyncReplicationWALThreads =
    conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL)
    .setCorePoolSize(replaySyncReplicationWALThreads));
  final int switchRpcThrottleThreads =
    conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
  executorService.startExecutorService(
    executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE)
      .setCorePoolSize(switchRpcThrottleThreads));
  final int claimReplicationQueueThreads =
    conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
  executorService.startExecutorService(
    executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE)
      .setCorePoolSize(claimReplicationQueueThreads));
  final int rsSnapshotOperationThreads =
    conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
  executorService.startExecutorService(
    executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
      .setCorePoolSize(rsSnapshotOperationThreads));
  final int rsFlushOperationThreads =
    conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads));
  final int rsRefreshQuotasThreads =
    conf.getInt("hbase.regionserver.executor.refresh.quotas.threads", 1);
  executorService.startExecutorService(
    executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS)
      .setCorePoolSize(rsRefreshQuotasThreads));
  final int logRollThreads = conf.getInt("hbase.regionserver.executor.log.roll.threads", 1);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_LOG_ROLL).setCorePoolSize(logRollThreads));

  // Start the long-running daemon threads with the server's uncaught exception handler.
  Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
    uncaughtExceptionHandler);
  if (this.cacheFlusher != null) {
    this.cacheFlusher.start(uncaughtExceptionHandler);
  }
  Threads.setDaemonThreadRunning(this.procedureResultReporter,
    getName() + ".procedureResultReporter", uncaughtExceptionHandler);

  // Schedule each optional chore that was created (initializeThreads() or above).
  if (this.compactionChecker != null) {
    choreService.scheduleChore(compactionChecker);
  }
  if (this.periodicFlusher != null) {
    choreService.scheduleChore(periodicFlusher);
  }
  if (this.healthCheckChore != null) {
    choreService.scheduleChore(healthCheckChore);
  }
  if (this.executorStatusChore != null) {
    choreService.scheduleChore(executorStatusChore);
  }
  if (this.nonceManagerChore != null) {
    choreService.scheduleChore(nonceManagerChore);
  }
  if (this.storefileRefresher != null) {
    choreService.scheduleChore(storefileRefresher);
  }
  if (this.fsUtilizationChore != null) {
    choreService.scheduleChore(fsUtilizationChore);
  }
  if (this.namedQueueServiceChore != null) {
    choreService.scheduleChore(namedQueueServiceChore);
  }
  if (this.brokenStoreFileCleaner != null) {
    choreService.scheduleChore(brokenStoreFileCleaner);
  }
  if (this.rsMobFileCleanerChore != null) {
    choreService.scheduleChore(rsMobFileCleanerChore);
  }
  if (replicationMarkerChore != null) {
    LOG.info("Starting replication marker chore");
    choreService.scheduleChore(replicationMarkerChore);
  }

  // Leases is not a Thread. Internally it runs a daemon thread. If it gets
  // an unhandled exception, it will just exit.
  Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
    uncaughtExceptionHandler);

  // Create the log splitting worker and start it
  // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
  // quite a while inside Connection layer. The worker won't be available for other
  // tasks even after current task is preempted after a split task times out.
  Configuration sinkConf = HBaseConfiguration.create(conf);
  sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
    conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
  sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
    conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
  sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
  if (
    this.csm != null
      && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
  ) {
    // SplitLogWorker needs csm. If none, don't start this.
    this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
    splitLogWorker.start();
    LOG.debug("SplitLogWorker started");
  }

  // Memstore services.
  startHeapMemoryManager();
  // Call it after starting HeapMemoryManager.
  initializeMemStoreChunkCreator(hMemManager);
}

/**
 * Constructs the flusher, compaction, lease, quota and chore objects that startServices() later
 * starts or schedules. Also registers the configuration observers and initializes the
 * replication marker chore. Optional chores are only created when their feature is enabled by
 * configuration, and are left null otherwise.
 */
private void initializeThreads() {
  // Cache flushing thread.
  this.cacheFlusher = new MemStoreFlusher(conf, this);

  // Compaction thread
  this.compactSplitThread = new CompactSplit(this);

  // Prefetch Notifier
  this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);

  // Background thread to check for compactions; needed if region has not gotten updates
  // in a while. It will take care of not checking too frequently on store-by-store basis.
  this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
  this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
  this.leaseManager = new LeaseManager(this.threadWakeFrequency);

  final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
    HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
  final boolean walEventTrackerEnabled =
    conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);

  // The named-queue chore serves both the slow-log system table and the WAL event tracker, so
  // it is created when either feature is enabled.
  if (isSlowLogTableEnabled || walEventTrackerEnabled) {
    // default chore duration: 10 min
    // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property
    final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY,
      DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION);

    final int namedQueueChoreDuration =
      conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT);
    // Considering min of slowLogChoreDuration and namedQueueChoreDuration
    int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration);

    namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration,
      this.namedQueueRecorder, this.getConnection());
  }

  if (this.nonceManager != null) {
    // Create the scheduled chore that cleans up nonces.
    nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
  }

  // Setup the Quota Manager
  rsQuotaManager = new RegionServerRpcQuotaManager(this);
  configurationManager.registerObserver(rsQuotaManager);
  rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);

  if (QuotaUtil.isQuotaEnabled(conf)) {
    this.fsUtilizationChore = new FileSystemUtilizationChore(this);
  }

  // A general refresh period of 0 falls back to the meta-only refresh period. The refresher
  // chore is only created when the resulting period is positive.
  boolean onlyMetaRefresh = false;
  int storefileRefreshPeriod =
    conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
  if (storefileRefreshPeriod == 0) {
    storefileRefreshPeriod =
      conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
    onlyMetaRefresh = true;
  }
  if (storefileRefreshPeriod > 0) {
    this.storefileRefresher =
      new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this);
  }

  int brokenStoreFileCleanerPeriod =
    conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
  int brokenStoreFileCleanerDelay =
    conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
  double brokenStoreFileCleanerDelayJitter =
    conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
  // jitterRate lies in [-0.5, 0.5) * jitter. The initial delay therefore varies by up to half of
  // (delay * jitter) in either direction, presumably so servers do not all run the cleaner at
  // the same moment.
  double jitterRate =
    (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
  long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
  this.brokenStoreFileCleaner =
    new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
      brokenStoreFileCleanerPeriod, this, conf, this);

  this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);

  registerConfigurationObservers();
  initializeReplicationMarkerChore();
}

/**
 * Registers the reconfigurable components of this server with {@code configurationManager}. The
 * replication handlers are registered only if they implement ConfigurationObserver, and a shared
 * source/sink object is registered only once.
 */
private void registerConfigurationObservers() {
  // Register Replication if possible, as now we support recreating replication peer storage, for
  // migrating across different replication peer storages online
  if (replicationSourceHandler instanceof ConfigurationObserver) {
    configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
  }
  if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) {
    configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
  }
  // Registering the compactSplitThread object with the ConfigurationManager.
  configurationManager.registerObserver(this.compactSplitThread);
  configurationManager.registerObserver(this.cacheFlusher);
  configurationManager.registerObserver(this.rpcServices);
  configurationManager.registerObserver(this.prefetchExecutorNotifier);
  configurationManager.registerObserver(this);
}

/**
 * Verify that server is healthy.
 * <p>
 * Returns false without stopping the server when the data filesystem has been flagged as not OK
 * ({@code dataFsOk}). Otherwise it checks that each existing lease manager, cache flusher and WAL
 * roller thread is alive, and that the compaction checker and periodic flusher chores are still
 * scheduled. If any of them is not, it calls stop() and returns false.
 * @return true if the server is healthy
 */
private boolean isHealthy() {
  if (!dataFsOk) {
    // File system problem
    return false;
  }
  // Verify that all threads are alive
  boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
    && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
    && (this.walRoller == null || this.walRoller.isAlive())
    && (this.compactionChecker == null || this.compactionChecker.isScheduled())
    && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
  if (!healthy) {
    stop("One or more threads are no longer alive -- stop");
  }
  return healthy;
}

/** Returns the WALs currently provided by {@code walFactory}. */
@Override
public List<WAL> getWALs() {
  return walFactory.getWALs();
}

/**
 * Returns the WAL that {@code walFactory} assigns to the given region. If a WAL roller exists,
 * the WAL is also registered with it.
 */
@Override
public WAL getWAL(RegionInfo regionInfo) throws IOException {
  WAL wal = walFactory.getWAL(regionInfo);
  if (this.walRoller != null) {
    this.walRoller.addWAL(wal);
  }
  return wal;
}

/** Returns the WAL roller, or null if services have not been started. */
public LogRoller getWalRoller() {
  return walRoller;
}

/** Returns the WAL factory created during WAL setup. */
public WALFactory getWalFactory() {
  return walFactory;
}

/**
 * Requests a non-forced stop. The current RPC request user, if any, is passed as the user
 * executing the stop.
 */
@Override
public void stop(final String msg) {
  stop(msg, false, RpcServer.getRequestUser().orElse(null));
}

/**
 * Stops the regionserver.
 * <p>
 * Does nothing if already stopped. The coprocessor {@code preStop} hook runs first. If it throws
 * and {@code force} is false, the stop is abandoned. If it throws and {@code force} is true, the
 * exception is logged and the stop continues.
 * @param msg   Status message
 * @param force True if this is a regionserver abort
 * @param user  The user executing the stop request, or null if no user is associated
 */
public void stop(final String msg, final boolean force, final User user) {
  if (!this.stopped) {
    LOG.info("***** STOPPING region server '{}' *****", this);
    if (this.rsHost != null) {
      // when forced via abort don't allow CPs to override
      try {
        this.rsHost.preStop(msg, user);
      } catch (IOException ioe) {
        if (!force) {
          LOG.warn("The region server did not stop", ioe);
          return;
        }
        LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
      }
    }
    this.stopped = true;
    LOG.info("STOPPED: " + msg);
    // Wakes run() if it is sleeping
    sleeper.skipSleepCycle();
  }
}

/**
 * Blocks until the server is online or stopped. The state is re-checked at least every
 * {@code msgInterval} milliseconds, in addition to when {@code online} is notified. If the
 * thread is interrupted, the interrupt status is restored and the method returns early.
 */
public void waitForServerOnline() {
  while (!isStopped() && !isOnline()) {
    synchronized (online) {
      try {
        online.wait(msgInterval);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        break;
      }
    }
  }
}

/**
 * Tasks run after a region has been opened on this server, in this order:
 * <ol>
 * <li>For a writable region, request a system compaction for every store that has references or
 * needs compaction.</li>
 * <li>Report the OPENED transition to the master. A missing open sequence number is logged and
 * reported as 0.</li>
 * <li>Trigger a flush in the primary region via triggerFlushInPrimaryRegion().</li>
 * </ol>
 * @throws IOException if the RPC services are not open, or if the master could not be notified
 */
@Override
public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
  HRegion r = context.getRegion();
  long openProcId = context.getOpenProcId();
  long masterSystemTime = context.getMasterSystemTime();
  long initiatingMasterActiveTime = context.getInitiatingMasterActiveTime();
  rpcServices.checkOpen();
  LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
    r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
  // Do checks to see if we need to compact (references or too many files)
  // Skip compaction check if region is read only
  if (!r.isReadOnly()) {
    for (HStore s : r.stores.values()) {
      if (s.hasReferences() || s.needsCompaction()) {
        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
      }
    }
  }
  long openSeqNum = r.getOpenSeqNum();
  if (openSeqNum == HConstants.NO_SEQNUM) {
    // If we opened a region, we should have read some sequence number from it.
    LOG.error(
      "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
    openSeqNum = 0;
  }

  // Notify master
  if (
    !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
      openSeqNum, openProcId, masterSystemTime, r.getRegionInfo(), initiatingMasterActiveTime))
  ) {
    throw new IOException(
      "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
  }

  triggerFlushInPrimaryRegion(r);

  LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
}

/**
 * Helper method for use in tests. Skip the region transition report when there's no master around
 * to receive it.
2295 */ 2296 private boolean skipReportingTransition(final RegionStateTransitionContext context) { 2297 final TransitionCode code = context.getCode(); 2298 final long openSeqNum = context.getOpenSeqNum(); 2299 long masterSystemTime = context.getMasterSystemTime(); 2300 final RegionInfo[] hris = context.getHris(); 2301 2302 if (code == TransitionCode.OPENED) { 2303 Preconditions.checkArgument(hris != null && hris.length == 1); 2304 if (hris[0].isMetaRegion()) { 2305 LOG.warn( 2306 "meta table location is stored in master local store, so we can not skip reporting"); 2307 return false; 2308 } else { 2309 try { 2310 MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0], 2311 serverName, openSeqNum, masterSystemTime); 2312 } catch (IOException e) { 2313 LOG.info("Failed to update meta", e); 2314 return false; 2315 } 2316 } 2317 } 2318 return true; 2319 } 2320 2321 private ReportRegionStateTransitionRequest 2322 createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) { 2323 final TransitionCode code = context.getCode(); 2324 final long openSeqNum = context.getOpenSeqNum(); 2325 final RegionInfo[] hris = context.getHris(); 2326 final long[] procIds = context.getProcIds(); 2327 2328 ReportRegionStateTransitionRequest.Builder builder = 2329 ReportRegionStateTransitionRequest.newBuilder(); 2330 builder.setServer(ProtobufUtil.toServerName(serverName)); 2331 RegionStateTransition.Builder transition = builder.addTransitionBuilder(); 2332 transition.setTransitionCode(code); 2333 if (code == TransitionCode.OPENED && openSeqNum >= 0) { 2334 transition.setOpenSeqNum(openSeqNum); 2335 } 2336 for (RegionInfo hri : hris) { 2337 transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri)); 2338 } 2339 for (long procId : procIds) { 2340 transition.addProcId(procId); 2341 } 2342 transition.setInitiatingMasterActiveTime(context.getInitiatingMasterActiveTime()); 2343 2344 return builder.build(); 2345 } 2346 2347 @Override 2348 public 
boolean reportRegionStateTransition(final RegionStateTransitionContext context) { 2349 if (TEST_SKIP_REPORTING_TRANSITION) { 2350 return skipReportingTransition(context); 2351 } 2352 final ReportRegionStateTransitionRequest request = 2353 createReportRegionStateTransitionRequest(context); 2354 2355 int tries = 0; 2356 long pauseTime = this.retryPauseTime; 2357 // Keep looping till we get an error. We want to send reports even though server is going down. 2358 // Only go down if clusterConnection is null. It is set to null almost as last thing as the 2359 // HRegionServer does down. 2360 while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) { 2361 RegionServerStatusService.BlockingInterface rss = rssStub; 2362 try { 2363 if (rss == null) { 2364 createRegionServerStatusStub(); 2365 continue; 2366 } 2367 ReportRegionStateTransitionResponse response = 2368 rss.reportRegionStateTransition(null, request); 2369 if (response.hasErrorMessage()) { 2370 LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage()); 2371 break; 2372 } 2373 // Log if we had to retry else don't log unless TRACE. We want to 2374 // know if were successful after an attempt showed in logs as failed. 2375 if (tries > 0 || LOG.isTraceEnabled()) { 2376 LOG.info("TRANSITION REPORTED " + request); 2377 } 2378 // NOTE: Return mid-method!!! 2379 return true; 2380 } catch (ServiceException se) { 2381 IOException ioe = ProtobufUtil.getRemoteException(se); 2382 boolean pause = ioe instanceof ServerNotRunningYetException 2383 || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException; 2384 if (pause) { 2385 // Do backoff else we flood the Master with requests. 2386 pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries); 2387 } else { 2388 pauseTime = this.retryPauseTime; // Reset. 2389 } 2390 LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#" 2391 + tries + ")" 2392 + (pause 2393 ? 
" after " + pauseTime + "ms delay (Master is coming online...)." 2394 : " immediately."), 2395 ioe); 2396 if (pause) { 2397 Threads.sleep(pauseTime); 2398 } 2399 tries++; 2400 if (rssStub == rss) { 2401 rssStub = null; 2402 } 2403 } 2404 } 2405 return false; 2406 } 2407 2408 /** 2409 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not 2410 * block this thread. See RegionReplicaFlushHandler for details. 2411 */ 2412 private void triggerFlushInPrimaryRegion(final HRegion region) { 2413 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 2414 return; 2415 } 2416 TableName tn = region.getTableDescriptor().getTableName(); 2417 if ( 2418 !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) 2419 || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) || 2420 // If the memstore replication not setup, we do not have to wait for observing a flush event 2421 // from primary before starting to serve reads, because gaps from replication is not 2422 // applicable,this logic is from 2423 // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by 2424 // HBASE-13063 2425 !region.getTableDescriptor().hasRegionMemStoreReplication() 2426 ) { 2427 region.setReadsEnabled(true); 2428 return; 2429 } 2430 2431 region.setReadsEnabled(false); // disable reads before marking the region as opened. 2432 // RegionReplicaFlushHandler might reset this. 
2433 2434 // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler 2435 if (this.executorService != null) { 2436 this.executorService.submit(new RegionReplicaFlushHandler(this, region)); 2437 } else { 2438 LOG.info("Executor is null; not running flush of primary region replica for {}", 2439 region.getRegionInfo()); 2440 } 2441 } 2442 2443 @InterfaceAudience.Private 2444 public RSRpcServices getRSRpcServices() { 2445 return rpcServices; 2446 } 2447 2448 /** 2449 * Cause the server to exit without closing the regions it is serving, the log it is using and 2450 * without notifying the master. Used unit testing and on catastrophic events such as HDFS is 2451 * yanked out from under hbase or we OOME. the reason we are aborting the exception that caused 2452 * the abort, or null 2453 */ 2454 @Override 2455 public void abort(String reason, Throwable cause) { 2456 if (!setAbortRequested()) { 2457 // Abort already in progress, ignore the new request. 2458 LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason); 2459 return; 2460 } 2461 String msg = "***** ABORTING region server " + this + ": " + reason + " *****"; 2462 if (cause != null) { 2463 LOG.error(HBaseMarkers.FATAL, msg, cause); 2464 } else { 2465 LOG.error(HBaseMarkers.FATAL, msg); 2466 } 2467 // HBASE-4014: show list of coprocessors that were loaded to help debug 2468 // regionserver crashes.Note that we're implicitly using 2469 // java.util.HashSet's toString() method to print the coprocessor names. 2470 LOG.error(HBaseMarkers.FATAL, 2471 "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors()); 2472 // Try and dump metrics if abort -- might give clue as to how fatal came about.... 
2473 try { 2474 LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics()); 2475 } catch (MalformedObjectNameException | IOException e) { 2476 LOG.warn("Failed dumping metrics", e); 2477 } 2478 2479 // Do our best to report our abort to the master, but this may not work 2480 try { 2481 if (cause != null) { 2482 msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause); 2483 } 2484 // Report to the master but only if we have already registered with the master. 2485 RegionServerStatusService.BlockingInterface rss = rssStub; 2486 if (rss != null && this.serverName != null) { 2487 ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder(); 2488 builder.setServer(ProtobufUtil.toServerName(this.serverName)); 2489 builder.setErrorMessage(msg); 2490 rss.reportRSFatalError(null, builder.build()); 2491 } 2492 } catch (Throwable t) { 2493 LOG.warn("Unable to report fatal error to master", t); 2494 } 2495 2496 scheduleAbortTimer(); 2497 // shutdown should be run as the internal user 2498 stop(reason, true, null); 2499 } 2500 2501 /* 2502 * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup logs but it does 2503 * close socket in case want to bring up server on old hostname+port immediately. 2504 */ 2505 @InterfaceAudience.Private 2506 protected void kill() { 2507 this.killed = true; 2508 abort("Simulated kill"); 2509 } 2510 2511 // Limits the time spent in the shutdown process. 2512 private void scheduleAbortTimer() { 2513 if (this.abortMonitor == null) { 2514 this.abortMonitor = new Timer("Abort regionserver monitor", true); 2515 TimerTask abortTimeoutTask = null; 2516 try { 2517 Constructor<? 
extends TimerTask> timerTaskCtor = 2518 Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName())) 2519 .asSubclass(TimerTask.class).getDeclaredConstructor(); 2520 timerTaskCtor.setAccessible(true); 2521 abortTimeoutTask = timerTaskCtor.newInstance(); 2522 } catch (Exception e) { 2523 LOG.warn("Initialize abort timeout task failed", e); 2524 } 2525 if (abortTimeoutTask != null) { 2526 abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT)); 2527 } 2528 } 2529 } 2530 2531 /** 2532 * Wait on all threads to finish. Presumption is that all closes and stops have already been 2533 * called. 2534 */ 2535 protected void stopServiceThreads() { 2536 // clean up the scheduled chores 2537 stopChoreService(); 2538 if (bootstrapNodeManager != null) { 2539 bootstrapNodeManager.stop(); 2540 } 2541 if (this.cacheFlusher != null) { 2542 this.cacheFlusher.shutdown(); 2543 } 2544 if (this.walRoller != null) { 2545 this.walRoller.close(); 2546 } 2547 if (this.compactSplitThread != null) { 2548 this.compactSplitThread.join(); 2549 } 2550 stopExecutorService(); 2551 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { 2552 this.replicationSourceHandler.stopReplicationService(); 2553 } else { 2554 if (this.replicationSourceHandler != null) { 2555 this.replicationSourceHandler.stopReplicationService(); 2556 } 2557 if (this.replicationSinkHandler != null) { 2558 this.replicationSinkHandler.stopReplicationService(); 2559 } 2560 } 2561 } 2562 2563 /** Returns Return the object that implements the replication source executorService. */ 2564 @Override 2565 public ReplicationSourceService getReplicationSourceService() { 2566 return replicationSourceHandler; 2567 } 2568 2569 /** Returns Return the object that implements the replication sink executorService. 
*/ 2570 public ReplicationSinkService getReplicationSinkService() { 2571 return replicationSinkHandler; 2572 } 2573 2574 /** 2575 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2576 * connection, the current rssStub must be null. Method will block until a master is available. 2577 * You can break from this block by requesting the server stop. 2578 * @return master + port, or null if server has been stopped 2579 */ 2580 private synchronized ServerName createRegionServerStatusStub() { 2581 // Create RS stub without refreshing the master node from ZK, use cached data 2582 return createRegionServerStatusStub(false); 2583 } 2584 2585 /** 2586 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2587 * connection, the current rssStub must be null. Method will block until a master is available. 2588 * You can break from this block by requesting the server stop. 2589 * @param refresh If true then master address will be read from ZK, otherwise use cached data 2590 * @return master + port, or null if server has been stopped 2591 */ 2592 @InterfaceAudience.Private 2593 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { 2594 if (rssStub != null) { 2595 return masterAddressTracker.getMasterAddress(); 2596 } 2597 ServerName sn = null; 2598 long previousLogTime = 0; 2599 RegionServerStatusService.BlockingInterface intRssStub = null; 2600 LockService.BlockingInterface intLockStub = null; 2601 boolean interrupted = false; 2602 try { 2603 while (keepLooping()) { 2604 sn = this.masterAddressTracker.getMasterAddress(refresh); 2605 if (sn == null) { 2606 if (!keepLooping()) { 2607 // give up with no connection. 
2608 LOG.debug("No master found and cluster is stopped; bailing out"); 2609 return null; 2610 } 2611 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 2612 LOG.debug("No master found; retry"); 2613 previousLogTime = EnvironmentEdgeManager.currentTime(); 2614 } 2615 refresh = true; // let's try pull it from ZK directly 2616 if (sleepInterrupted(200)) { 2617 interrupted = true; 2618 } 2619 continue; 2620 } 2621 try { 2622 BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, 2623 userProvider.getCurrent(), shortOperationTimeout); 2624 intRssStub = RegionServerStatusService.newBlockingStub(channel); 2625 intLockStub = LockService.newBlockingStub(channel); 2626 break; 2627 } catch (IOException e) { 2628 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 2629 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 2630 if (e instanceof ServerNotRunningYetException) { 2631 LOG.info("Master isn't available yet, retrying"); 2632 } else { 2633 LOG.warn("Unable to connect to master. Retrying. Error was:", e); 2634 } 2635 previousLogTime = EnvironmentEdgeManager.currentTime(); 2636 } 2637 if (sleepInterrupted(200)) { 2638 interrupted = true; 2639 } 2640 } 2641 } 2642 } finally { 2643 if (interrupted) { 2644 Thread.currentThread().interrupt(); 2645 } 2646 } 2647 this.rssStub = intRssStub; 2648 this.lockStub = intLockStub; 2649 return sn; 2650 } 2651 2652 /** 2653 * @return True if we should break loop because cluster is going down or this server has been 2654 * stopped or hdfs has gone bad. 2655 */ 2656 private boolean keepLooping() { 2657 return !this.stopped && isClusterUp(); 2658 } 2659 2660 /* 2661 * Let the master know we're here Run initialization using parameters passed us by the master. 2662 * @return A Map of key/value configurations we got from the Master else null if we failed to 2663 * register. 
2664 */ 2665 private RegionServerStartupResponse reportForDuty() throws IOException { 2666 if (this.masterless) { 2667 return RegionServerStartupResponse.getDefaultInstance(); 2668 } 2669 ServerName masterServerName = createRegionServerStatusStub(true); 2670 RegionServerStatusService.BlockingInterface rss = rssStub; 2671 if (masterServerName == null || rss == null) { 2672 return null; 2673 } 2674 RegionServerStartupResponse result = null; 2675 try { 2676 rpcServices.requestCount.reset(); 2677 rpcServices.rpcGetRequestCount.reset(); 2678 rpcServices.rpcScanRequestCount.reset(); 2679 rpcServices.rpcFullScanRequestCount.reset(); 2680 rpcServices.rpcMultiRequestCount.reset(); 2681 rpcServices.rpcMutateRequestCount.reset(); 2682 LOG.info("reportForDuty to master=" + masterServerName + " with port=" 2683 + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode); 2684 long now = EnvironmentEdgeManager.currentTime(); 2685 int port = rpcServices.getSocketAddress().getPort(); 2686 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder(); 2687 if (!StringUtils.isBlank(useThisHostnameInstead)) { 2688 request.setUseThisHostnameInstead(useThisHostnameInstead); 2689 } 2690 request.setPort(port); 2691 request.setServerStartCode(this.startcode); 2692 request.setServerCurrentTime(now); 2693 result = rss.regionServerStartup(null, request.build()); 2694 } catch (ServiceException se) { 2695 IOException ioe = ProtobufUtil.getRemoteException(se); 2696 if (ioe instanceof ClockOutOfSyncException) { 2697 LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe); 2698 // Re-throw IOE will cause RS to abort 2699 throw ioe; 2700 } else if (ioe instanceof DecommissionedHostRejectedException) { 2701 LOG.error(HBaseMarkers.FATAL, 2702 "Master rejected startup because the host is considered decommissioned", ioe); 2703 // Re-throw IOE will cause RS to abort 2704 throw ioe; 2705 } else if (ioe instanceof 
ServerNotRunningYetException) { 2706 LOG.debug("Master is not running yet"); 2707 } else { 2708 LOG.warn("error telling master we are up", se); 2709 } 2710 rssStub = null; 2711 } 2712 return result; 2713 } 2714 2715 @Override 2716 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { 2717 try { 2718 GetLastFlushedSequenceIdRequest req = 2719 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName); 2720 RegionServerStatusService.BlockingInterface rss = rssStub; 2721 if (rss == null) { // Try to connect one more time 2722 createRegionServerStatusStub(); 2723 rss = rssStub; 2724 if (rss == null) { 2725 // Still no luck, we tried 2726 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id"); 2727 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2728 .build(); 2729 } 2730 } 2731 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req); 2732 return RegionStoreSequenceIds.newBuilder() 2733 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId()) 2734 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build(); 2735 } catch (ServiceException e) { 2736 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e); 2737 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2738 .build(); 2739 } 2740 } 2741 2742 /** 2743 * Close meta region if we carry it 2744 * @param abort Whether we're running an abort. 
2745 */ 2746 private void closeMetaTableRegions(final boolean abort) { 2747 HRegion meta = null; 2748 this.onlineRegionsLock.writeLock().lock(); 2749 try { 2750 for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) { 2751 RegionInfo hri = e.getValue().getRegionInfo(); 2752 if (hri.isMetaRegion()) { 2753 meta = e.getValue(); 2754 } 2755 if (meta != null) { 2756 break; 2757 } 2758 } 2759 } finally { 2760 this.onlineRegionsLock.writeLock().unlock(); 2761 } 2762 if (meta != null) { 2763 closeRegionIgnoreErrors(meta.getRegionInfo(), abort); 2764 } 2765 } 2766 2767 /** 2768 * Schedule closes on all user regions. Should be safe calling multiple times because it wont' 2769 * close regions that are already closed or that are closing. 2770 * @param abort Whether we're running an abort. 2771 */ 2772 private void closeUserRegions(final boolean abort) { 2773 this.onlineRegionsLock.writeLock().lock(); 2774 try { 2775 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 2776 HRegion r = e.getValue(); 2777 if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) { 2778 // Don't update zk with this close transition; pass false. 2779 closeRegionIgnoreErrors(r.getRegionInfo(), abort); 2780 } 2781 } 2782 } finally { 2783 this.onlineRegionsLock.writeLock().unlock(); 2784 } 2785 } 2786 2787 protected Map<String, HRegion> getOnlineRegions() { 2788 return this.onlineRegions; 2789 } 2790 2791 public int getNumberOfOnlineRegions() { 2792 return this.onlineRegions.size(); 2793 } 2794 2795 /** 2796 * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM 2797 * as client; HRegion cannot be serialized to cross an rpc. 
2798 */ 2799 public Collection<HRegion> getOnlineRegionsLocalContext() { 2800 Collection<HRegion> regions = this.onlineRegions.values(); 2801 return Collections.unmodifiableCollection(regions); 2802 } 2803 2804 @Override 2805 public void addRegion(HRegion region) { 2806 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); 2807 configurationManager.registerObserver(region); 2808 } 2809 2810 private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region, 2811 long size) { 2812 if (!sortedRegions.containsKey(size)) { 2813 sortedRegions.put(size, new ArrayList<>()); 2814 } 2815 sortedRegions.get(size).add(region); 2816 } 2817 2818 /** 2819 * @return A new Map of online regions sorted by region off-heap size with the first entry being 2820 * the biggest. 2821 */ 2822 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() { 2823 // we'll sort the regions in reverse 2824 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2825 // Copy over all regions. Regions are sorted by size with biggest first. 2826 for (HRegion region : this.onlineRegions.values()) { 2827 addRegion(sortedRegions, region, region.getMemStoreOffHeapSize()); 2828 } 2829 return sortedRegions; 2830 } 2831 2832 /** 2833 * @return A new Map of online regions sorted by region heap size with the first entry being the 2834 * biggest. 2835 */ 2836 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() { 2837 // we'll sort the regions in reverse 2838 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2839 // Copy over all regions. Regions are sorted by size with biggest first. 
2840 for (HRegion region : this.onlineRegions.values()) { 2841 addRegion(sortedRegions, region, region.getMemStoreHeapSize()); 2842 } 2843 return sortedRegions; 2844 } 2845 2846 /** Returns reference to FlushRequester */ 2847 @Override 2848 public FlushRequester getFlushRequester() { 2849 return this.cacheFlusher; 2850 } 2851 2852 @Override 2853 public CompactionRequester getCompactionRequestor() { 2854 return this.compactSplitThread; 2855 } 2856 2857 @Override 2858 public LeaseManager getLeaseManager() { 2859 return leaseManager; 2860 } 2861 2862 /** Returns {@code true} when the data file system is available, {@code false} otherwise. */ 2863 boolean isDataFileSystemOk() { 2864 return this.dataFsOk; 2865 } 2866 2867 public RegionServerCoprocessorHost getRegionServerCoprocessorHost() { 2868 return this.rsHost; 2869 } 2870 2871 @Override 2872 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() { 2873 return this.regionsInTransitionInRS; 2874 } 2875 2876 @Override 2877 public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { 2878 return rsQuotaManager; 2879 } 2880 2881 // 2882 // Main program and support routines 2883 // 2884 /** 2885 * Load the replication executorService objects, if any 2886 */ 2887 private static void createNewReplicationInstance(Configuration conf, HRegionServer server, 2888 FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException { 2889 // read in the name of the source replication class from the config file. 2890 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, 2891 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 2892 2893 // read in the name of the sink replication class from the config file. 2894 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, 2895 HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT); 2896 2897 // If both the sink and the source class names are the same, then instantiate 2898 // only one object. 
2899 if (sourceClassname.equals(sinkClassname)) { 2900 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 2901 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2902 server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; 2903 server.sameReplicationSourceAndSink = true; 2904 } else { 2905 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 2906 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2907 server.replicationSinkHandler = newReplicationInstance(sinkClassname, 2908 ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2909 server.sameReplicationSourceAndSink = false; 2910 } 2911 } 2912 2913 private static <T extends ReplicationService> T newReplicationInstance(String classname, 2914 Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, 2915 Path oldLogDir, WALFactory walFactory) throws IOException { 2916 final Class<? 
extends T> clazz; 2917 try { 2918 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 2919 clazz = Class.forName(classname, true, classLoader).asSubclass(xface); 2920 } catch (java.lang.ClassNotFoundException nfe) { 2921 throw new IOException("Could not find class for " + classname); 2922 } 2923 T service = ReflectionUtils.newInstance(clazz, conf); 2924 service.initialize(server, walFs, logDir, oldLogDir, walFactory); 2925 return service; 2926 } 2927 2928 public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() { 2929 Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>(); 2930 if (!this.isOnline()) { 2931 return walGroupsReplicationStatus; 2932 } 2933 List<ReplicationSourceInterface> allSources = new ArrayList<>(); 2934 allSources.addAll(replicationSourceHandler.getReplicationManager().getSources()); 2935 allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources()); 2936 for (ReplicationSourceInterface source : allSources) { 2937 walGroupsReplicationStatus.putAll(source.getWalGroupStatus()); 2938 } 2939 return walGroupsReplicationStatus; 2940 } 2941 2942 /** 2943 * Utility for constructing an instance of the passed HRegionServer class. 2944 */ 2945 static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass, 2946 final Configuration conf) { 2947 try { 2948 Constructor<? 
extends HRegionServer> c = 2949 regionServerClass.getConstructor(Configuration.class); 2950 return c.newInstance(conf); 2951 } catch (Exception e) { 2952 throw new RuntimeException( 2953 "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e); 2954 } 2955 } 2956 2957 /** 2958 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine 2959 */ 2960 public static void main(String[] args) { 2961 LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); 2962 VersionInfo.logVersion(); 2963 Configuration conf = HBaseConfiguration.create(); 2964 @SuppressWarnings("unchecked") 2965 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf 2966 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); 2967 2968 new HRegionServerCommandLine(regionServerClass).doMain(args); 2969 } 2970 2971 /** 2972 * Gets the online regions of the specified table. This method looks at the in-memory 2973 * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions. 2974 * If a region on this table has been closed during a disable, etc., it will not be included in 2975 * the returned list. So, the returned list may not necessarily be ALL regions in this table, its 2976 * all the ONLINE regions in the table. 
2977 * @param tableName table to limit the scope of the query 2978 * @return Online regions from <code>tableName</code> 2979 */ 2980 @Override 2981 public List<HRegion> getRegions(TableName tableName) { 2982 List<HRegion> tableRegions = new ArrayList<>(); 2983 synchronized (this.onlineRegions) { 2984 for (HRegion region : this.onlineRegions.values()) { 2985 RegionInfo regionInfo = region.getRegionInfo(); 2986 if (regionInfo.getTable().equals(tableName)) { 2987 tableRegions.add(region); 2988 } 2989 } 2990 } 2991 return tableRegions; 2992 } 2993 2994 @Override 2995 public List<HRegion> getRegions() { 2996 List<HRegion> allRegions; 2997 synchronized (this.onlineRegions) { 2998 // Return a clone copy of the onlineRegions 2999 allRegions = new ArrayList<>(onlineRegions.values()); 3000 } 3001 return allRegions; 3002 } 3003 3004 /** 3005 * Gets the online tables in this RS. This method looks at the in-memory onlineRegions. 3006 * @return all the online tables in this RS 3007 */ 3008 public Set<TableName> getOnlineTables() { 3009 Set<TableName> tables = new HashSet<>(); 3010 synchronized (this.onlineRegions) { 3011 for (Region region : this.onlineRegions.values()) { 3012 tables.add(region.getTableDescriptor().getTableName()); 3013 } 3014 } 3015 return tables; 3016 } 3017 3018 public String[] getRegionServerCoprocessors() { 3019 TreeSet<String> coprocessors = new TreeSet<>(); 3020 try { 3021 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors()); 3022 } catch (IOException exception) { 3023 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " 3024 + "skipping."); 3025 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3026 } 3027 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 3028 for (HRegion region : regions) { 3029 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors()); 3030 try { 3031 
coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()); 3032 } catch (IOException exception) { 3033 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region 3034 + "; skipping."); 3035 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3036 } 3037 } 3038 coprocessors.addAll(rsHost.getCoprocessors()); 3039 return coprocessors.toArray(new String[0]); 3040 } 3041 3042 /** 3043 * Try to close the region, logs a warning on failure but continues. 3044 * @param region Region to close 3045 */ 3046 private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) { 3047 try { 3048 if (!closeRegion(region.getEncodedName(), abort, null)) { 3049 LOG 3050 .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing"); 3051 } 3052 } catch (IOException e) { 3053 LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing", 3054 e); 3055 } 3056 } 3057 3058 /** 3059 * Close asynchronously a region, can be called from the master or internally by the regionserver 3060 * when stopping. If called from the master, the region will update the status. 3061 * <p> 3062 * If an opening was in progress, this method will cancel it, but will not start a new close. The 3063 * coprocessors are not called in this case. A NotServingRegionException exception is thrown. 3064 * </p> 3065 * <p> 3066 * If a close was in progress, this new request will be ignored, and an exception thrown. 3067 * </p> 3068 * <p> 3069 * Provides additional flag to indicate if this region blocks should be evicted from the cache. 3070 * </p> 3071 * @param encodedName Region to close 3072 * @param abort True if we are aborting 3073 * @param destination Where the Region is being moved too... maybe null if unknown. 3074 * @return True if closed a region. 
3075 * @throws NotServingRegionException if the region is not online 3076 */ 3077 protected boolean closeRegion(String encodedName, final boolean abort, 3078 final ServerName destination) throws NotServingRegionException { 3079 // Check for permissions to close. 3080 HRegion actualRegion = this.getRegion(encodedName); 3081 // Can be null if we're calling close on a region that's not online 3082 if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) { 3083 try { 3084 actualRegion.getCoprocessorHost().preClose(false); 3085 } catch (IOException exp) { 3086 LOG.warn("Unable to close region: the coprocessor launched an error ", exp); 3087 return false; 3088 } 3089 } 3090 3091 // previous can come back 'null' if not in map. 3092 final Boolean previous = 3093 this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE); 3094 3095 if (Boolean.TRUE.equals(previous)) { 3096 LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " 3097 + "trying to OPEN. Cancelling OPENING."); 3098 if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) { 3099 // The replace failed. That should be an exceptional case, but theoretically it can happen. 3100 // We're going to try to do a standard close then. 3101 LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." 3102 + " Doing a standard close now"); 3103 return closeRegion(encodedName, abort, destination); 3104 } 3105 // Let's get the region from the online region list again 3106 actualRegion = this.getRegion(encodedName); 3107 if (actualRegion == null) { // If already online, we still need to close it. 3108 LOG.info("The opening previously in progress has been cancelled by a CLOSE request."); 3109 // The master deletes the znode when it receives this exception. 3110 throw new NotServingRegionException( 3111 "The region " + encodedName + " was opening but not yet served. 
Opening is cancelled."); 3112 } 3113 } else if (previous == null) { 3114 LOG.info("Received CLOSE for {}", encodedName); 3115 } else if (Boolean.FALSE.equals(previous)) { 3116 LOG.info("Received CLOSE for the region: " + encodedName 3117 + ", which we are already trying to CLOSE, but not completed yet"); 3118 return true; 3119 } 3120 3121 if (actualRegion == null) { 3122 LOG.debug("Received CLOSE for a region which is not online, and we're not opening."); 3123 this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName)); 3124 // The master deletes the znode when it receives this exception. 3125 throw new NotServingRegionException( 3126 "The region " + encodedName + " is not online, and is not opening."); 3127 } 3128 3129 CloseRegionHandler crh; 3130 final RegionInfo hri = actualRegion.getRegionInfo(); 3131 if (hri.isMetaRegion()) { 3132 crh = new CloseMetaHandler(this, this, hri, abort); 3133 } else { 3134 crh = new CloseRegionHandler(this, this, hri, abort, destination); 3135 } 3136 this.executorService.submit(crh); 3137 return true; 3138 } 3139 3140 /** 3141 * @return HRegion for the passed binary <code>regionName</code> or null if named region is not 3142 * member of the online regions. 
3143 */ 3144 public HRegion getOnlineRegion(final byte[] regionName) { 3145 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3146 return this.onlineRegions.get(encodedRegionName); 3147 } 3148 3149 @Override 3150 public HRegion getRegion(final String encodedRegionName) { 3151 return this.onlineRegions.get(encodedRegionName); 3152 } 3153 3154 @Override 3155 public boolean removeRegion(final HRegion r, ServerName destination) { 3156 HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); 3157 if (DataTieringManager.getInstance() != null) { 3158 DataTieringManager.getInstance().getRegionColdDataSize() 3159 .remove(r.getRegionInfo().getEncodedName()); 3160 } 3161 metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName()); 3162 if (destination != null) { 3163 long closeSeqNum = r.getMaxFlushedSeqId(); 3164 if (closeSeqNum == HConstants.NO_SEQNUM) { 3165 // No edits in WAL for this region; get the sequence number when the region was opened. 3166 closeSeqNum = r.getOpenSeqNum(); 3167 if (closeSeqNum == HConstants.NO_SEQNUM) { 3168 closeSeqNum = 0; 3169 } 3170 } 3171 boolean selfMove = ServerName.isSameAddress(destination, this.getServerName()); 3172 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove); 3173 if (selfMove) { 3174 this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put( 3175 r.getRegionInfo().getEncodedName(), 3176 new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount())); 3177 } 3178 } 3179 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName()); 3180 configurationManager.deregisterObserver(r); 3181 return toReturn != null; 3182 } 3183 3184 /** 3185 * Protected Utility method for safely obtaining an HRegion handle. 
3186 * @param regionName Name of online {@link HRegion} to return 3187 * @return {@link HRegion} for <code>regionName</code> 3188 */ 3189 protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException { 3190 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3191 return getRegionByEncodedName(regionName, encodedRegionName); 3192 } 3193 3194 public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException { 3195 return getRegionByEncodedName(null, encodedRegionName); 3196 } 3197 3198 private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) 3199 throws NotServingRegionException { 3200 HRegion region = this.onlineRegions.get(encodedRegionName); 3201 if (region == null) { 3202 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName); 3203 if (moveInfo != null) { 3204 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum()); 3205 } 3206 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName)); 3207 String regionNameStr = 3208 regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName); 3209 if (isOpening != null && isOpening) { 3210 throw new RegionOpeningException( 3211 "Region " + regionNameStr + " is opening on " + this.serverName); 3212 } 3213 throw new NotServingRegionException( 3214 "" + regionNameStr + " is not online on " + this.serverName); 3215 } 3216 return region; 3217 } 3218 3219 /** 3220 * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't 3221 * already. 3222 * @param t Throwable 3223 * @param msg Message to log in error. Can be null. 3224 * @return Throwable converted to an IOE; methods can only let out IOEs. 3225 */ 3226 private Throwable cleanup(final Throwable t, final String msg) { 3227 // Don't log as error if NSRE; NSRE is 'normal' operation. 
3228 if (t instanceof NotServingRegionException) { 3229 LOG.debug("NotServingRegionException; " + t.getMessage()); 3230 return t; 3231 } 3232 Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t; 3233 if (msg == null) { 3234 LOG.error("", e); 3235 } else { 3236 LOG.error(msg, e); 3237 } 3238 if (!rpcServices.checkOOME(t)) { 3239 checkFileSystem(); 3240 } 3241 return t; 3242 } 3243 3244 /** 3245 * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE 3246 * @return Make <code>t</code> an IOE if it isn't already. 3247 */ 3248 private IOException convertThrowableToIOE(final Throwable t, final String msg) { 3249 return (t instanceof IOException ? (IOException) t 3250 : msg == null || msg.length() == 0 ? new IOException(t) 3251 : new IOException(msg, t)); 3252 } 3253 3254 /** 3255 * Checks to see if the file system is still accessible. If not, sets abortRequested and 3256 * stopRequested 3257 * @return false if file system is not available 3258 */ 3259 boolean checkFileSystem() { 3260 if (this.dataFsOk && this.dataFs != null) { 3261 try { 3262 FSUtils.checkFileSystemAvailable(this.dataFs); 3263 } catch (IOException e) { 3264 abort("File System not available", e); 3265 this.dataFsOk = false; 3266 } 3267 } 3268 return this.dataFsOk; 3269 } 3270 3271 @Override 3272 public void updateRegionFavoredNodesMapping(String encodedRegionName, 3273 List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) { 3274 Address[] addr = new Address[favoredNodes.size()]; 3275 // Refer to the comment on the declaration of regionFavoredNodesMap on why 3276 // it is a map of region name to Address[] 3277 for (int i = 0; i < favoredNodes.size(); i++) { 3278 addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort()); 3279 } 3280 regionFavoredNodesMap.put(encodedRegionName, addr); 3281 } 3282 3283 /** 3284 * Return the favored nodes for a region given its encoded 
name. Look at the comment around 3285 * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here. 3286 * @param encodedRegionName the encoded region name. 3287 * @return array of favored locations 3288 */ 3289 @Override 3290 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) { 3291 return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName)); 3292 } 3293 3294 @Override 3295 public ServerNonceManager getNonceManager() { 3296 return this.nonceManager; 3297 } 3298 3299 private static class MovedRegionInfo { 3300 private final ServerName serverName; 3301 private final long seqNum; 3302 3303 MovedRegionInfo(ServerName serverName, long closeSeqNum) { 3304 this.serverName = serverName; 3305 this.seqNum = closeSeqNum; 3306 } 3307 3308 public ServerName getServerName() { 3309 return serverName; 3310 } 3311 3312 public long getSeqNum() { 3313 return seqNum; 3314 } 3315 } 3316 3317 /** 3318 * We need a timeout. If not there is a risk of giving a wrong information: this would double the 3319 * number of network calls instead of reducing them. 
3320 */ 3321 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000); 3322 3323 private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum, 3324 boolean selfMove) { 3325 if (selfMove) { 3326 LOG.warn("Not adding moved region record: " + encodedName + " to self."); 3327 return; 3328 } 3329 LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" 3330 + closeSeqNum); 3331 movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum)); 3332 } 3333 3334 void removeFromMovedRegions(String encodedName) { 3335 movedRegionInfoCache.invalidate(encodedName); 3336 } 3337 3338 @InterfaceAudience.Private 3339 public MovedRegionInfo getMovedRegion(String encodedRegionName) { 3340 return movedRegionInfoCache.getIfPresent(encodedRegionName); 3341 } 3342 3343 @InterfaceAudience.Private 3344 public int movedRegionCacheExpiredTime() { 3345 return TIMEOUT_REGION_MOVED; 3346 } 3347 3348 private String getMyEphemeralNodePath() { 3349 return zooKeeper.getZNodePaths().getRsPath(serverName); 3350 } 3351 3352 private boolean isHealthCheckerConfigured() { 3353 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC); 3354 return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation); 3355 } 3356 3357 /** Returns the underlying {@link CompactSplit} for the servers */ 3358 public CompactSplit getCompactSplitThread() { 3359 return this.compactSplitThread; 3360 } 3361 3362 CoprocessorServiceResponse execRegionServerService( 3363 @SuppressWarnings("UnusedParameters") final RpcController controller, 3364 final CoprocessorServiceRequest serviceRequest) throws ServiceException { 3365 try { 3366 ServerRpcController serviceController = new ServerRpcController(); 3367 CoprocessorServiceCall call = serviceRequest.getCall(); 3368 String serviceName = call.getServiceName(); 3369 Service service = coprocessorServiceHandlers.get(serviceName); 3370 if (service == null) { 3371 
throw new UnknownProtocolException(null, 3372 "No registered coprocessor executorService found for " + serviceName); 3373 } 3374 ServiceDescriptor serviceDesc = service.getDescriptorForType(); 3375 3376 String methodName = call.getMethodName(); 3377 MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName); 3378 if (methodDesc == null) { 3379 throw new UnknownProtocolException(service.getClass(), 3380 "Unknown method " + methodName + " called on executorService " + serviceName); 3381 } 3382 3383 Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest()); 3384 final Message.Builder responseBuilder = 3385 service.getResponsePrototype(methodDesc).newBuilderForType(); 3386 service.callMethod(methodDesc, serviceController, request, message -> { 3387 if (message != null) { 3388 responseBuilder.mergeFrom(message); 3389 } 3390 }); 3391 IOException exception = CoprocessorRpcUtils.getControllerException(serviceController); 3392 if (exception != null) { 3393 throw exception; 3394 } 3395 return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY); 3396 } catch (IOException ie) { 3397 throw new ServiceException(ie); 3398 } 3399 } 3400 3401 /** 3402 * May be null if this is a master which not carry table. 3403 * @return The block cache instance used by the regionserver. 3404 */ 3405 @Override 3406 public Optional<BlockCache> getBlockCache() { 3407 return Optional.ofNullable(this.blockCache); 3408 } 3409 3410 /** 3411 * May be null if this is a master which not carry table. 3412 * @return The cache for mob files used by the regionserver. 
3413 */ 3414 @Override 3415 public Optional<MobFileCache> getMobFileCache() { 3416 return Optional.ofNullable(this.mobFileCache); 3417 } 3418 3419 CacheEvictionStats clearRegionBlockCache(Region region) { 3420 long evictedBlocks = 0; 3421 3422 for (Store store : region.getStores()) { 3423 for (StoreFile hFile : store.getStorefiles()) { 3424 evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName()); 3425 } 3426 } 3427 3428 return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build(); 3429 } 3430 3431 @Override 3432 public double getCompactionPressure() { 3433 double max = 0; 3434 for (Region region : onlineRegions.values()) { 3435 for (Store store : region.getStores()) { 3436 double normCount = store.getCompactionPressure(); 3437 if (normCount > max) { 3438 max = normCount; 3439 } 3440 } 3441 } 3442 return max; 3443 } 3444 3445 @Override 3446 public HeapMemoryManager getHeapMemoryManager() { 3447 return hMemManager; 3448 } 3449 3450 public MemStoreFlusher getMemStoreFlusher() { 3451 return cacheFlusher; 3452 } 3453 3454 /** 3455 * For testing 3456 * @return whether all wal roll request finished for this regionserver 3457 */ 3458 @InterfaceAudience.Private 3459 public boolean walRollRequestFinished() { 3460 return this.walRoller.walRollFinished(); 3461 } 3462 3463 @Override 3464 public ThroughputController getFlushThroughputController() { 3465 return flushThroughputController; 3466 } 3467 3468 @Override 3469 public double getFlushPressure() { 3470 if (getRegionServerAccounting() == null || cacheFlusher == null) { 3471 // return 0 during RS initialization 3472 return 0.0; 3473 } 3474 return getRegionServerAccounting().getFlushPressure(); 3475 } 3476 3477 @Override 3478 public void onConfigurationChange(Configuration newConf) { 3479 ThroughputController old = this.flushThroughputController; 3480 if (old != null) { 3481 old.stop("configuration change"); 3482 } 3483 this.flushThroughputController = 
FlushThroughputControllerFactory.create(this, newConf); 3484 try { 3485 Superusers.initialize(newConf); 3486 } catch (IOException e) { 3487 LOG.warn("Failed to initialize SuperUsers on reloading of the configuration"); 3488 } 3489 3490 // update region server coprocessor if the configuration has changed. 3491 if ( 3492 CoprocessorConfigurationUtil.checkConfigurationChange(this.rsHost, newConf, 3493 CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY) 3494 ) { 3495 LOG.info("Update region server coprocessors because the configuration has changed"); 3496 this.rsHost = new RegionServerCoprocessorHost(this, newConf); 3497 } 3498 } 3499 3500 @Override 3501 public MetricsRegionServer getMetrics() { 3502 return metricsRegionServer; 3503 } 3504 3505 @Override 3506 public SecureBulkLoadManager getSecureBulkLoadManager() { 3507 return this.secureBulkLoadManager; 3508 } 3509 3510 @Override 3511 public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description, 3512 final Abortable abort) { 3513 final LockServiceClient client = 3514 new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator()); 3515 return client.regionLock(regionInfo, description, abort); 3516 } 3517 3518 @Override 3519 public void unassign(byte[] regionName) throws IOException { 3520 FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false)); 3521 } 3522 3523 @Override 3524 public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() { 3525 return this.rsSpaceQuotaManager; 3526 } 3527 3528 @Override 3529 public boolean reportFileArchivalForQuotas(TableName tableName, 3530 Collection<Entry<String, Long>> archivedFiles) { 3531 if (TEST_SKIP_REPORTING_TRANSITION) { 3532 return false; 3533 } 3534 RegionServerStatusService.BlockingInterface rss = rssStub; 3535 if (rss == null || rsSpaceQuotaManager == null) { 3536 // the current server could be stopping. 
3537 LOG.trace("Skipping file archival reporting to HMaster as stub is null"); 3538 return false; 3539 } 3540 try { 3541 RegionServerStatusProtos.FileArchiveNotificationRequest request = 3542 rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles); 3543 rss.reportFileArchival(null, request); 3544 } catch (ServiceException se) { 3545 IOException ioe = ProtobufUtil.getRemoteException(se); 3546 if (ioe instanceof PleaseHoldException) { 3547 if (LOG.isTraceEnabled()) { 3548 LOG.trace("Failed to report file archival(s) to Master because it is initializing." 3549 + " This will be retried.", ioe); 3550 } 3551 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 3552 return false; 3553 } 3554 if (rssStub == rss) { 3555 rssStub = null; 3556 } 3557 // re-create the stub if we failed to report the archival 3558 createRegionServerStatusStub(true); 3559 LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe); 3560 return false; 3561 } 3562 return true; 3563 } 3564 3565 void executeProcedure(long procId, long initiatingMasterActiveTime, 3566 RSProcedureCallable callable) { 3567 executorService 3568 .submit(new RSProcedureHandler(this, procId, initiatingMasterActiveTime, callable)); 3569 } 3570 3571 public void remoteProcedureComplete(long procId, long initiatingMasterActiveTime, Throwable error, 3572 byte[] procResultData) { 3573 procedureResultReporter.complete(procId, initiatingMasterActiveTime, error, procResultData); 3574 } 3575 3576 void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException { 3577 RegionServerStatusService.BlockingInterface rss; 3578 // TODO: juggling class state with an instance variable, outside of a synchronized block :'( 3579 for (;;) { 3580 rss = rssStub; 3581 if (rss != null) { 3582 break; 3583 } 3584 createRegionServerStatusStub(); 3585 } 3586 try { 3587 rss.reportProcedureDone(null, request); 3588 } catch (ServiceException se) { 3589 if (rssStub == rss) { 
3590 rssStub = null; 3591 } 3592 throw ProtobufUtil.getRemoteException(se); 3593 } 3594 } 3595 3596 /** 3597 * Will ignore the open/close region procedures which already submitted or executed. When master 3598 * had unfinished open/close region procedure and restarted, new active master may send duplicate 3599 * open/close region request to regionserver. The open/close request is submitted to a thread pool 3600 * and execute. So first need a cache for submitted open/close region procedures. After the 3601 * open/close region request executed and report region transition succeed, cache it in executed 3602 * region procedures cache. See {@link #finishRegionProcedure(long)}. After report region 3603 * transition succeed, master will not send the open/close region request to regionserver again. 3604 * And we thought that the ongoing duplicate open/close region request should not be delayed more 3605 * than 600 seconds. So the executed region procedures cache will expire after 600 seconds. See 3606 * HBASE-22404 for more details. 3607 * @param procId the id of the open/close region procedure 3608 * @return true if the procedure can be submitted. 3609 */ 3610 boolean submitRegionProcedure(long procId) { 3611 if (procId == -1) { 3612 return true; 3613 } 3614 // Ignore the region procedures which already submitted. 3615 Long previous = submittedRegionProcedures.putIfAbsent(procId, procId); 3616 if (previous != null) { 3617 LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId); 3618 return false; 3619 } 3620 // Ignore the region procedures which already executed. 3621 if (executedRegionProcedures.getIfPresent(procId) != null) { 3622 LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId); 3623 return false; 3624 } 3625 return true; 3626 } 3627 3628 /** 3629 * See {@link #submitRegionProcedure(long)}. 
3630 * @param procId the id of the open/close region procedure 3631 */ 3632 public void finishRegionProcedure(long procId) { 3633 executedRegionProcedures.put(procId, procId); 3634 submittedRegionProcedures.remove(procId); 3635 } 3636 3637 /** 3638 * Force to terminate region server when abort timeout. 3639 */ 3640 private static class SystemExitWhenAbortTimeout extends TimerTask { 3641 3642 public SystemExitWhenAbortTimeout() { 3643 } 3644 3645 @Override 3646 public void run() { 3647 LOG.warn("Aborting region server timed out, terminating forcibly" 3648 + " and does not wait for any running shutdown hooks or finalizers to finish their work." 3649 + " Thread dump to stdout."); 3650 Threads.printThreadInfo(System.out, "Zombie HRegionServer"); 3651 Runtime.getRuntime().halt(1); 3652 } 3653 } 3654 3655 @InterfaceAudience.Private 3656 public CompactedHFilesDischarger getCompactedHFilesDischarger() { 3657 return compactedFileDischarger; 3658 } 3659 3660 /** 3661 * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}} 3662 * @return pause time 3663 */ 3664 @InterfaceAudience.Private 3665 public long getRetryPauseTime() { 3666 return this.retryPauseTime; 3667 } 3668 3669 @Override 3670 public Optional<ServerName> getActiveMaster() { 3671 return Optional.ofNullable(masterAddressTracker.getMasterAddress()); 3672 } 3673 3674 @Override 3675 public List<ServerName> getBackupMasters() { 3676 return masterAddressTracker.getBackupMasters(); 3677 } 3678 3679 @Override 3680 public Iterator<ServerName> getBootstrapNodes() { 3681 return bootstrapNodeManager.getBootstrapNodes().iterator(); 3682 } 3683 3684 @Override 3685 public List<HRegionLocation> getMetaLocations() { 3686 return metaRegionLocationCache.getMetaRegionLocations(); 3687 } 3688 3689 @Override 3690 protected NamedQueueRecorder createNamedQueueRecord() { 3691 return NamedQueueRecorder.getInstance(conf); 3692 } 3693 3694 @Override 3695 protected boolean clusterMode() { 3696 // this 
method will be called in the constructor of super class, so we can not return masterless 3697 // directly here, as it will always be false. 3698 return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false); 3699 } 3700 3701 @InterfaceAudience.Private 3702 public BrokenStoreFileCleaner getBrokenStoreFileCleaner() { 3703 return brokenStoreFileCleaner; 3704 } 3705 3706 @InterfaceAudience.Private 3707 public RSMobFileCleanerChore getRSMobFileCleanerChore() { 3708 return rsMobFileCleanerChore; 3709 } 3710 3711 RSSnapshotVerifier getRsSnapshotVerifier() { 3712 return rsSnapshotVerifier; 3713 } 3714 3715 @Override 3716 protected void stopChores() { 3717 shutdownChore(nonceManagerChore); 3718 shutdownChore(compactionChecker); 3719 shutdownChore(compactedFileDischarger); 3720 shutdownChore(periodicFlusher); 3721 shutdownChore(healthCheckChore); 3722 shutdownChore(executorStatusChore); 3723 shutdownChore(storefileRefresher); 3724 shutdownChore(fsUtilizationChore); 3725 shutdownChore(namedQueueServiceChore); 3726 shutdownChore(brokenStoreFileCleaner); 3727 shutdownChore(rsMobFileCleanerChore); 3728 shutdownChore(replicationMarkerChore); 3729 } 3730 3731 @Override 3732 public RegionReplicationBufferManager getRegionReplicationBufferManager() { 3733 return regionReplicationBufferManager; 3734 } 3735}