001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; 021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; 022import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION; 023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; 024import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 025import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT; 026import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY; 027import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT; 028import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY; 029import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT; 030import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY; 031import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT; 032import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; 033import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY; 034 035import io.opentelemetry.api.trace.Span; 036import io.opentelemetry.api.trace.StatusCode; 037import io.opentelemetry.context.Scope; 038import java.io.IOException; 039import java.io.PrintWriter; 040import java.lang.management.MemoryUsage; 041import java.lang.reflect.Constructor; 042import java.net.InetSocketAddress; 043import java.time.Duration; 044import java.util.ArrayList; 045import java.util.Collection; 046import java.util.Collections; 047import java.util.Comparator; 048import java.util.HashSet; 049import java.util.Iterator; 050import java.util.List; 051import java.util.Map; 052import java.util.Map.Entry; 053import java.util.Objects; 054import java.util.Optional; 055import java.util.Set; 056import java.util.SortedMap; 057import java.util.Timer; 058import java.util.TimerTask; 059import java.util.TreeMap; 060import java.util.TreeSet; 061import java.util.concurrent.ConcurrentHashMap; 062import java.util.concurrent.ConcurrentMap; 063import java.util.concurrent.ConcurrentSkipListMap; 064import java.util.concurrent.ThreadLocalRandom; 065import java.util.concurrent.TimeUnit; 066import java.util.concurrent.atomic.AtomicBoolean; 067import 
java.util.concurrent.locks.ReentrantReadWriteLock; 068import java.util.stream.Collectors; 069import javax.management.MalformedObjectNameException; 070import javax.servlet.http.HttpServlet; 071import org.apache.commons.lang3.StringUtils; 072import org.apache.hadoop.conf.Configuration; 073import org.apache.hadoop.fs.FileSystem; 074import org.apache.hadoop.fs.Path; 075import org.apache.hadoop.hbase.Abortable; 076import org.apache.hadoop.hbase.CacheEvictionStats; 077import org.apache.hadoop.hbase.CallQueueTooBigException; 078import org.apache.hadoop.hbase.ClockOutOfSyncException; 079import org.apache.hadoop.hbase.DoNotRetryIOException; 080import org.apache.hadoop.hbase.ExecutorStatusChore; 081import org.apache.hadoop.hbase.HBaseConfiguration; 082import org.apache.hadoop.hbase.HBaseInterfaceAudience; 083import org.apache.hadoop.hbase.HBaseServerBase; 084import org.apache.hadoop.hbase.HConstants; 085import org.apache.hadoop.hbase.HDFSBlocksDistribution; 086import org.apache.hadoop.hbase.HRegionLocation; 087import org.apache.hadoop.hbase.HealthCheckChore; 088import org.apache.hadoop.hbase.MetaTableAccessor; 089import org.apache.hadoop.hbase.NotServingRegionException; 090import org.apache.hadoop.hbase.PleaseHoldException; 091import org.apache.hadoop.hbase.ScheduledChore; 092import org.apache.hadoop.hbase.ServerName; 093import org.apache.hadoop.hbase.Stoppable; 094import org.apache.hadoop.hbase.TableName; 095import org.apache.hadoop.hbase.YouAreDeadException; 096import org.apache.hadoop.hbase.ZNodeClearer; 097import org.apache.hadoop.hbase.client.ConnectionUtils; 098import org.apache.hadoop.hbase.client.RegionInfo; 099import org.apache.hadoop.hbase.client.RegionInfoBuilder; 100import org.apache.hadoop.hbase.client.locking.EntityLock; 101import org.apache.hadoop.hbase.client.locking.LockServiceClient; 102import org.apache.hadoop.hbase.conf.ConfigurationObserver; 103import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 104import org.apache.hadoop.hbase.exceptions.RegionMovedException; 105import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 106import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 107import org.apache.hadoop.hbase.executor.ExecutorType; 108import org.apache.hadoop.hbase.http.InfoServer; 109import org.apache.hadoop.hbase.io.hfile.BlockCache; 110import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 111import org.apache.hadoop.hbase.io.hfile.HFile; 112import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 113import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 114import org.apache.hadoop.hbase.ipc.RpcClient; 115import org.apache.hadoop.hbase.ipc.RpcServer; 116import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 117import org.apache.hadoop.hbase.ipc.ServerRpcController; 118import org.apache.hadoop.hbase.log.HBaseMarkers; 119import org.apache.hadoop.hbase.mob.MobFileCache; 120import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore; 121import org.apache.hadoop.hbase.monitoring.TaskMonitor; 122import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 123import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore; 124import org.apache.hadoop.hbase.net.Address; 125import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; 126import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 127import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore; 128import org.apache.hadoop.hbase.quotas.QuotaUtil; 129import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 130import 
org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 131import org.apache.hadoop.hbase.quotas.RegionSize; 132import org.apache.hadoop.hbase.quotas.RegionSizeStore; 133import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 134import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 135import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; 136import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 137import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; 138import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; 139import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; 140import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; 141import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet; 142import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet; 143import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager; 144import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; 145import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 146import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 147import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; 148import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; 149import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore; 150import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; 151import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; 152import org.apache.hadoop.hbase.security.SecurityConstants; 153import org.apache.hadoop.hbase.security.Superusers; 154import org.apache.hadoop.hbase.security.User; 155import org.apache.hadoop.hbase.security.UserProvider; 156import org.apache.hadoop.hbase.trace.TraceUtil; 157import org.apache.hadoop.hbase.util.Bytes; 158import org.apache.hadoop.hbase.util.CompressionTest; 159import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil; 160import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 161import org.apache.hadoop.hbase.util.FSUtils; 162import org.apache.hadoop.hbase.util.FutureUtils; 163import org.apache.hadoop.hbase.util.JvmPauseMonitor; 164import org.apache.hadoop.hbase.util.Pair; 165import org.apache.hadoop.hbase.util.RetryCounter; 166import org.apache.hadoop.hbase.util.RetryCounterFactory; 167import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 168import org.apache.hadoop.hbase.util.Threads; 169import org.apache.hadoop.hbase.util.VersionInfo; 170import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 171import org.apache.hadoop.hbase.wal.WAL; 172import org.apache.hadoop.hbase.wal.WALFactory; 173import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; 174import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 175import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker; 176import org.apache.hadoop.hbase.zookeeper.ZKUtil; 177import org.apache.hadoop.ipc.RemoteException; 178import org.apache.hadoop.util.ReflectionUtils; 179import org.apache.yetus.audience.InterfaceAudience; 180import org.apache.zookeeper.KeeperException; 181import org.slf4j.Logger; 182import org.slf4j.LoggerFactory; 183 184import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 185import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 186import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 187import 
org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 188import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 189import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses; 190import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 191import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 192import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor; 193import org.apache.hbase.thirdparty.com.google.protobuf.Message; 194import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 195import org.apache.hbase.thirdparty.com.google.protobuf.Service; 196import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 197import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 198import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 199 200import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 201import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 226import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
 * are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation" })
public class HRegionServer extends HBaseServerBase<RSRpcServices>
  implements RegionServerServices, LastSequenceId {

  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

  /**
   * For testing only! Set to true to skip notifying region assignment to the master.
   */
  @InterfaceAudience.Private
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  /**
   * A map from RegionName to the action currently in progress. The Boolean value indicates which
   * action it is: true if an open-region action is in progress, false if a close-region action is
   * in progress.
   */
  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * Used to cache the open/close region procedures which have already been submitted. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
  /**
   * Used to cache the open/close region procedures which have already been executed. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

  /**
   * Used to cache the moved-out regions.
   */
  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();

  private MemStoreFlusher cacheFlusher;

  private HeapMemoryManager hMemManager;

  // Replication services. If no replication, this handler will be null.
  private ReplicationSourceService replicationSourceHandler;
  private ReplicationSinkService replicationSinkHandler;
  private boolean sameReplicationSourceAndSink;

  // Compactions
  private CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the encoded region name.
   * All access should be synchronized.
   */
  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
  /**
   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
   * need to be a ConcurrentHashMap?
   */
  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API
   * call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and
   * here we really mean DataNode locations. We don't store it as InetSocketAddress here because the
   * conversion on demand from Address to InetSocketAddress guarantees the resolution results will
   * be fresh when we need them.
   */
  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();

  private LeaseManager leaseManager;

  private volatile boolean dataFsOk;

  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds, to be safe.
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Will run this task when the abort times out.
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;
  private volatile boolean killed = false;

  private final int threadWakeFrequency;

  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  private RpcClient rpcClient;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  private JvmPauseMonitor pauseMonitor;

  private RSSnapshotVerifier rsSnapshotVerifier;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  private MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;

  /**
   * Check for compaction requests.
   */
  private ScheduledChore compactionChecker;

  /**
   * Check for flushes.
   */
  private ScheduledChore periodicFlusher;

  private volatile WALFactory walFactory;

  private LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // master address tracker
  private final MasterAddressTracker masterAddressTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  private final int shortOperationTimeout;

  // Time to pause if master says 'please hold'
  private final long retryPauseTime;

  private final RegionServerAccounting regionServerAccounting;

  private NamedQueueServiceChore namedQueueServiceChore = null;

  // Block cache
  private BlockCache blockCache;
  // The cache for mob files
  private MobFileCache mobFileCache;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The Executor status collect chore. */
  private ExecutorStatusChore executorStatusChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
   */
  @Deprecated
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.regionserver.hostname.disable.master.reversedns";

  /**
   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive. An
   * exception will be thrown if both are used.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";

  /**
   * Unique identifier for the cluster we are a part of.
   */
  private String clusterId;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private volatile RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
   * case where the client doesn't receive the response from a successful operation and retries. We
   * track the successful ops for some time via a nonce sent by the client and handle duplicate
   * operations (currently, by failing them; in the future we might use MVCC to return the result).
   * Nonces are also recovered from the WAL during recovery; however, the caveats (from HBASE-3787)
   * are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth of
   * past records. If we don't read the records, we don't read and recover the nonces. Some WALs
   * within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
   * We could have a separate additional "Nonce WAL". It would just contain a bunch of numbers and
   * wouldn't be flushed on the main path - because the WAL itself also contains nonces, if we only
   * flush it before the memstore flush, for a given nonce we will either see it in the WAL (if it
   * was never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
   * log (or both occasionally, which doesn't matter). The nonce log file can be deleted after the
   * latest nonce in it has expired. It can also be recovered during move.
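   * <p>
   * Illustrative example: a client sends an increment with nonce group/nonce (g, n); the server
   * applies it, but the response is lost. The retry carries the same (g, n), and while that nonce
   * is still tracked the server detects the duplicate and fails it instead of incrementing twice.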
449 */ 450 final ServerNonceManager nonceManager; 451 452 private BrokenStoreFileCleaner brokenStoreFileCleaner; 453 454 private RSMobFileCleanerChore rsMobFileCleanerChore; 455 456 @InterfaceAudience.Private 457 CompactedHFilesDischarger compactedFileDischarger; 458 459 private volatile ThroughputController flushThroughputController; 460 461 private SecureBulkLoadManager secureBulkLoadManager; 462 463 private FileSystemUtilizationChore fsUtilizationChore; 464 465 private BootstrapNodeManager bootstrapNodeManager; 466 467 /** 468 * True if this RegionServer is coming up in a cluster where there is no Master; means it needs to 469 * just come up and make do without a Master to talk to: e.g. in test or HRegionServer is doing 470 * other than its usual duties: e.g. as an hollowed-out host whose only purpose is as a 471 * Replication-stream sink; see HBASE-18846 for more. TODO: can this replace 472 * {@link #TEST_SKIP_REPORTING_TRANSITION} ? 473 */ 474 private final boolean masterless; 475 private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless"; 476 477 /** regionserver codec list **/ 478 private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs"; 479 480 // A timer to shutdown the process if abort takes too long 481 private Timer abortMonitor; 482 483 private RegionReplicationBufferManager regionReplicationBufferManager; 484 485 /* 486 * Chore that creates replication marker rows. 487 */ 488 private ReplicationMarkerChore replicationMarkerChore; 489 490 /** 491 * Starts a HRegionServer at the default location. 492 * <p/> 493 * Don't start any services or managers in here in the Constructor. Defer till after we register 494 * with the Master as much as possible. See {@link #startServices}. 495 */ 496 public HRegionServer(final Configuration conf) throws IOException { 497 super(conf, "RegionServer"); // thread name 498 final Span span = TraceUtil.createSpan("HRegionServer.cxtor"); 499 try (Scope ignored = span.makeCurrent()) { 500 this.dataFsOk = true; 501 this.masterless = !clusterMode(); 502 MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf); 503 HFile.checkHFileVersion(this.conf); 504 checkCodecs(this.conf); 505 FSUtils.setupShortCircuitRead(this.conf); 506 507 // Disable usage of meta replicas in the regionserver 508 this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); 509 // Config'ed params 510 this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); 511 this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency); 512 this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency); 513 514 boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true); 515 this.nonceManager = isNoncesEnabled ? 
new ServerNonceManager(this.conf) : null; 516 517 this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, 518 HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); 519 520 this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME, 521 HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME); 522 523 regionServerAccounting = new RegionServerAccounting(conf); 524 525 blockCache = BlockCacheFactory.createBlockCache(conf); 526 mobFileCache = new MobFileCache(conf); 527 528 rsSnapshotVerifier = new RSSnapshotVerifier(conf); 529 530 uncaughtExceptionHandler = 531 (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e); 532 533 // If no master in cluster, skip trying to track one or look for a cluster status. 534 if (!this.masterless) { 535 masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); 536 masterAddressTracker.start(); 537 } else { 538 masterAddressTracker = null; 539 } 540 this.rpcServices.start(zooKeeper); 541 span.setStatus(StatusCode.OK); 542 } catch (Throwable t) { 543 // Make sure we log the exception. HRegionServer is often started via reflection and the 544 // cause of failed startup is lost. 545 TraceUtil.setError(span, t); 546 LOG.error("Failed construction RegionServer", t); 547 throw t; 548 } finally { 549 span.end(); 550 } 551 } 552 553 // HMaster should override this method to load the specific config for master 554 @Override 555 protected String getUseThisHostnameInstead(Configuration conf) throws IOException { 556 String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY); 557 if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) { 558 if (!StringUtils.isBlank(hostname)) { 559 String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " 560 + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set " 561 + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while " 562 + UNSAFE_RS_HOSTNAME_KEY + " is used"; 563 throw new IOException(msg); 564 } else { 565 return rpcServices.getSocketAddress().getHostName(); 566 } 567 } else { 568 return hostname; 569 } 570 } 571 572 @Override 573 protected void login(UserProvider user, String host) throws IOException { 574 user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE, 575 SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host); 576 } 577 578 @Override 579 protected String getProcessName() { 580 return REGIONSERVER; 581 } 582 583 @Override 584 protected boolean canCreateBaseZNode() { 585 return !clusterMode(); 586 } 587 588 @Override 589 protected boolean canUpdateTableDescriptor() { 590 return false; 591 } 592 593 @Override 594 protected boolean cacheTableDescriptor() { 595 return false; 596 } 597 598 protected RSRpcServices createRpcServices() throws IOException { 599 return new RSRpcServices(this); 600 } 601 602 @Override 603 protected void configureInfoServer(InfoServer infoServer) { 604 infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class); 605 infoServer.setAttribute(REGIONSERVER, this); 606 } 607 608 @Override 609 protected Class<? extends HttpServlet> getDumpServlet() { 610 return RSDumpServlet.class; 611 } 612 613 /** 614 * Used by {@link RSDumpServlet} to generate debugging information. 
615 */ 616 public void dumpRowLocks(final PrintWriter out) { 617 StringBuilder sb = new StringBuilder(); 618 for (HRegion region : getRegions()) { 619 if (region.getLockedRows().size() > 0) { 620 for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) { 621 sb.setLength(0); 622 sb.append(region.getTableDescriptor().getTableName()).append(",") 623 .append(region.getRegionInfo().getEncodedName()).append(","); 624 sb.append(rowLockContext.toString()); 625 out.println(sb); 626 } 627 } 628 } 629 } 630 631 @Override 632 public boolean registerService(Service instance) { 633 // No stacking of instances is allowed for a single executorService name 634 ServiceDescriptor serviceDesc = instance.getDescriptorForType(); 635 String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc); 636 if (coprocessorServiceHandlers.containsKey(serviceName)) { 637 LOG.error("Coprocessor executorService " + serviceName 638 + " already registered, rejecting request from " + instance); 639 return false; 640 } 641 642 coprocessorServiceHandlers.put(serviceName, instance); 643 if (LOG.isDebugEnabled()) { 644 LOG.debug( 645 "Registered regionserver coprocessor executorService: executorService=" + serviceName); 646 } 647 return true; 648 } 649 650 /** 651 * Run test on configured codecs to make sure supporting libs are in place. 652 */ 653 private static void checkCodecs(final Configuration c) throws IOException { 654 // check to see if the codec list is available: 655 String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null); 656 if (codecs == null) { 657 return; 658 } 659 for (String codec : codecs) { 660 if (!CompressionTest.testCompression(codec)) { 661 throw new IOException( 662 "Compression codec " + codec + " not supported, aborting RS construction"); 663 } 664 } 665 } 666 667 public String getClusterId() { 668 return this.clusterId; 669 } 670 671 /** 672 * All initialization needed before we go register with Master.<br> 673 * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br> 674 * In here we just put up the RpcServer, setup Connection, and ZooKeeper. 675 */ 676 private void preRegistrationInitialization() { 677 final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization"); 678 try (Scope ignored = span.makeCurrent()) { 679 initializeZooKeeper(); 680 setupClusterConnection(); 681 bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker); 682 regionReplicationBufferManager = new RegionReplicationBufferManager(this); 683 // Setup RPC client for master communication 684 this.rpcClient = asyncClusterConnection.getRpcClient(); 685 span.setStatus(StatusCode.OK); 686 } catch (Throwable t) { 687 // Call stop if error or process will stick around for ever since server 688 // puts up non-daemon threads. 689 TraceUtil.setError(span, t); 690 this.rpcServices.stop(); 691 abort("Initialization of RS failed. Hence aborting RS.", t); 692 } finally { 693 span.end(); 694 } 695 } 696 697 /** 698 * Bring up connection to zk ensemble and then wait until a master for this cluster and then after 699 * that, wait until cluster 'up' flag has been set. This is the order in which master does things. 700 * <p> 701 * Finally open long-living server short-circuit connection. 
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
      justification = "cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Nothing to do in here if no Master in the mix.
    if (this.masterless) {
      return;
    }

    // Create the master address tracker, register with zk, and start it. Then
    // block until a master is available. No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up. Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // If we are HMaster then the cluster id should have already been set.
    if (clusterId == null) {
      // Retrieve clusterId
      // Since cluster status is now up
      // ID should have already been set by HMaster
      try {
        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
        if (clusterId == null) {
          this.abort("Cluster ID has not been set");
        }
        LOG.info("ClusterId : " + clusterId);
      } catch (KeeperException e) {
        this.abort("Failed to retrieve Cluster ID", e);
      }
    }

    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }

  /**
   * Utility method to wait indefinitely for a znode to become available while checking whether the
   * region server is shut down.
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException if the waiting thread is interrupted
   */
  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
    throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /** Returns true if the cluster is up. */
  @Override
  public boolean isClusterUp() {
    return this.masterless
      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
  }

  private void initializeReplicationMarkerChore() {
    boolean replicationMarkerEnabled =
      conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
    // If replication or the replication marker is not enabled then return immediately.
    if (replicationMarkerEnabled) {
      int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
        REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
      replicationMarkerChore = new ReplicationMarkerChore(this, this, period, conf);
    }
  }

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    if (isStopped()) {
      LOG.info("Skipping run; stopped");
      return;
    }
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
795 preRegistrationInitialization(); 796 } catch (Throwable e) { 797 abort("Fatal exception during initialization", e); 798 } 799 800 try { 801 if (!isStopped() && !isAborted()) { 802 installShutdownHook(); 803 // Initialize the RegionServerCoprocessorHost now that our ephemeral 804 // node was created, in case any coprocessors want to use ZooKeeper 805 this.rsHost = new RegionServerCoprocessorHost(this, this.conf); 806 807 // Try and register with the Master; tell it we are here. Break if server is stopped or 808 // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and 809 // start up all Services. Use RetryCounter to get backoff in case Master is struggling to 810 // come up. 811 LOG.debug("About to register with Master."); 812 TraceUtil.trace(() -> { 813 RetryCounterFactory rcf = 814 new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5); 815 RetryCounter rc = rcf.create(); 816 while (keepLooping()) { 817 RegionServerStartupResponse w = reportForDuty(); 818 if (w == null) { 819 long sleepTime = rc.getBackoffTimeAndIncrementAttempts(); 820 LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime); 821 this.sleeper.sleep(sleepTime); 822 } else { 823 handleReportForDutyResponse(w); 824 break; 825 } 826 } 827 }, "HRegionServer.registerWithMaster"); 828 } 829 830 if (!isStopped() && isHealthy()) { 831 TraceUtil.trace(() -> { 832 // start the snapshot handler and other procedure handlers, 833 // since the server is ready to run 834 if (this.rspmHost != null) { 835 this.rspmHost.start(); 836 } 837 // Start the Quota Manager 838 if (this.rsQuotaManager != null) { 839 rsQuotaManager.start(getRpcServer().getScheduler()); 840 } 841 if (this.rsSpaceQuotaManager != null) { 842 this.rsSpaceQuotaManager.start(); 843 } 844 }, "HRegionServer.startup"); 845 } 846 847 // We registered with the Master. Go into run mode. 848 long lastMsg = EnvironmentEdgeManager.currentTime(); 849 long oldRequestCount = -1; 850 // The main run loop. 851 while (!isStopped() && isHealthy()) { 852 if (!isClusterUp()) { 853 if (onlineRegions.isEmpty()) { 854 stop("Exiting; cluster shutdown set and not carrying any regions"); 855 } else if (!this.stopping) { 856 this.stopping = true; 857 LOG.info("Closing user regions"); 858 closeUserRegions(isAborted()); 859 } else { 860 boolean allUserRegionsOffline = areAllUserRegionsOffline(); 861 if (allUserRegionsOffline) { 862 // Set stopped if no more write requests tp meta tables 863 // since last time we went around the loop. Any open 864 // meta regions will be closed on our way out. 865 if (oldRequestCount == getWriteRequestCount()) { 866 stop("Stopped; only catalog regions remaining online"); 867 break; 868 } 869 oldRequestCount = getWriteRequestCount(); 870 } else { 871 // Make sure all regions have been closed -- some regions may 872 // have not got it because we were splitting at the time of 873 // the call to closeUserRegions. 874 closeUserRegions(this.abortRequested.get()); 875 } 876 LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString()); 877 } 878 } 879 long now = EnvironmentEdgeManager.currentTime(); 880 if ((now - lastMsg) >= msgInterval) { 881 tryRegionServerReport(lastMsg, now); 882 lastMsg = EnvironmentEdgeManager.currentTime(); 883 } 884 if (!isStopped() && !isAborted()) { 885 this.sleeper.sleep(); 886 } 887 } // for 888 } catch (Throwable t) { 889 if (!rpcServices.checkOOME(t)) { 890 String prefix = t instanceof YouAreDeadException ? 
"" : "Unhandled: "; 891 abort(prefix + t.getMessage(), t); 892 } 893 } 894 895 final Span span = TraceUtil.createSpan("HRegionServer exiting main loop"); 896 try (Scope ignored = span.makeCurrent()) { 897 if (this.leaseManager != null) { 898 this.leaseManager.closeAfterLeasesExpire(); 899 } 900 if (this.splitLogWorker != null) { 901 splitLogWorker.stop(); 902 } 903 stopInfoServer(); 904 // Send cache a shutdown. 905 if (blockCache != null) { 906 blockCache.shutdown(); 907 } 908 if (mobFileCache != null) { 909 mobFileCache.shutdown(); 910 } 911 912 // Send interrupts to wake up threads if sleeping so they notice shutdown. 913 // TODO: Should we check they are alive? If OOME could have exited already 914 if (this.hMemManager != null) { 915 this.hMemManager.stop(); 916 } 917 if (this.cacheFlusher != null) { 918 this.cacheFlusher.interruptIfNecessary(); 919 } 920 if (this.compactSplitThread != null) { 921 this.compactSplitThread.interruptIfNecessary(); 922 } 923 924 // Stop the snapshot and other procedure handlers, forcefully killing all running tasks 925 if (rspmHost != null) { 926 rspmHost.stop(this.abortRequested.get() || this.killed); 927 } 928 929 if (this.killed) { 930 // Just skip out w/o closing regions. Used when testing. 931 } else if (abortRequested.get()) { 932 if (this.dataFsOk) { 933 closeUserRegions(abortRequested.get()); // Don't leave any open file handles 934 } 935 LOG.info("aborting server " + this.serverName); 936 } else { 937 closeUserRegions(abortRequested.get()); 938 LOG.info("stopping server " + this.serverName); 939 } 940 regionReplicationBufferManager.stop(); 941 closeClusterConnection(); 942 // Closing the compactSplit thread before closing meta regions 943 if (!this.killed && containsMetaTableRegions()) { 944 if (!abortRequested.get() || this.dataFsOk) { 945 if (this.compactSplitThread != null) { 946 this.compactSplitThread.join(); 947 this.compactSplitThread = null; 948 } 949 closeMetaTableRegions(abortRequested.get()); 950 } 951 } 952 953 if (!this.killed && this.dataFsOk) { 954 waitOnAllRegionsToClose(abortRequested.get()); 955 LOG.info("stopping server " + this.serverName + "; all regions closed."); 956 } 957 958 // Stop the quota manager 959 if (rsQuotaManager != null) { 960 rsQuotaManager.stop(); 961 } 962 if (rsSpaceQuotaManager != null) { 963 rsSpaceQuotaManager.stop(); 964 rsSpaceQuotaManager = null; 965 } 966 967 // flag may be changed when closing regions throws exception. 968 if (this.dataFsOk) { 969 shutdownWAL(!abortRequested.get()); 970 } 971 972 // Make sure the proxy is down. 973 if (this.rssStub != null) { 974 this.rssStub = null; 975 } 976 if (this.lockStub != null) { 977 this.lockStub = null; 978 } 979 if (this.rpcClient != null) { 980 this.rpcClient.close(); 981 } 982 if (this.leaseManager != null) { 983 this.leaseManager.close(); 984 } 985 if (this.pauseMonitor != null) { 986 this.pauseMonitor.stop(); 987 } 988 989 if (!killed) { 990 stopServiceThreads(); 991 } 992 993 if (this.rpcServices != null) { 994 this.rpcServices.stop(); 995 } 996 997 try { 998 deleteMyEphemeralNode(); 999 } catch (KeeperException.NoNodeException nn) { 1000 // pass 1001 } catch (KeeperException e) { 1002 LOG.warn("Failed deleting my ephemeral node", e); 1003 } 1004 // We may have failed to delete the znode at the previous step, but 1005 // we delete the file anyway: a second attempt to delete the znode is likely to fail again. 
1006 ZNodeClearer.deleteMyEphemeralNodeOnDisk(); 1007 1008 closeZooKeeper(); 1009 closeTableDescriptors(); 1010 LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed."); 1011 span.setStatus(StatusCode.OK); 1012 } finally { 1013 span.end(); 1014 } 1015 } 1016 1017 private boolean containsMetaTableRegions() { 1018 return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 1019 } 1020 1021 private boolean areAllUserRegionsOffline() { 1022 if (getNumberOfOnlineRegions() > 2) { 1023 return false; 1024 } 1025 boolean allUserRegionsOffline = true; 1026 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1027 if (!e.getValue().getRegionInfo().isMetaRegion()) { 1028 allUserRegionsOffline = false; 1029 break; 1030 } 1031 } 1032 return allUserRegionsOffline; 1033 } 1034 1035 /** Returns Current write count for all online regions. */ 1036 private long getWriteRequestCount() { 1037 long writeCount = 0; 1038 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1039 writeCount += e.getValue().getWriteRequestsCount(); 1040 } 1041 return writeCount; 1042 } 1043 1044 @InterfaceAudience.Private 1045 protected void tryRegionServerReport(long reportStartTime, long reportEndTime) 1046 throws IOException { 1047 RegionServerStatusService.BlockingInterface rss = rssStub; 1048 if (rss == null) { 1049 // the current server could be stopping. 1050 return; 1051 } 1052 ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime); 1053 final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport"); 1054 try (Scope ignored = span.makeCurrent()) { 1055 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder(); 1056 request.setServer(ProtobufUtil.toServerName(this.serverName)); 1057 request.setLoad(sl); 1058 rss.regionServerReport(null, request.build()); 1059 span.setStatus(StatusCode.OK); 1060 } catch (ServiceException se) { 1061 IOException ioe = ProtobufUtil.getRemoteException(se); 1062 if (ioe instanceof YouAreDeadException) { 1063 // This will be caught and handled as a fatal error in run() 1064 TraceUtil.setError(span, ioe); 1065 throw ioe; 1066 } 1067 if (rssStub == rss) { 1068 rssStub = null; 1069 } 1070 TraceUtil.setError(span, se); 1071 // Couldn't connect to the master, get location from zk and reconnect 1072 // Method blocks until new master is found or we are stopped 1073 createRegionServerStatusStub(true); 1074 } finally { 1075 span.end(); 1076 } 1077 } 1078 1079 /** 1080 * Reports the given map of Regions and their size on the filesystem to the active Master. 1081 * @param regionSizeStore The store containing region sizes 1082 * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise 1083 */ 1084 public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) { 1085 RegionServerStatusService.BlockingInterface rss = rssStub; 1086 if (rss == null) { 1087 // the current server could be stopping. 1088 LOG.trace("Skipping Region size report to HMaster as stub is null"); 1089 return true; 1090 } 1091 try { 1092 buildReportAndSend(rss, regionSizeStore); 1093 } catch (ServiceException se) { 1094 IOException ioe = ProtobufUtil.getRemoteException(se); 1095 if (ioe instanceof PleaseHoldException) { 1096 LOG.trace("Failed to report region sizes to Master because it is initializing." 1097 + " This will be retried.", ioe); 1098 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 
1099 return true; 1100 } 1101 if (rssStub == rss) { 1102 rssStub = null; 1103 } 1104 createRegionServerStatusStub(true); 1105 if (ioe instanceof DoNotRetryIOException) { 1106 DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe; 1107 if (doNotRetryEx.getCause() != null) { 1108 Throwable t = doNotRetryEx.getCause(); 1109 if (t instanceof UnsupportedOperationException) { 1110 LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying"); 1111 return false; 1112 } 1113 } 1114 } 1115 LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe); 1116 } 1117 return true; 1118 } 1119 1120 /** 1121 * Builds the region size report and sends it to the master. Upon successful sending of the 1122 * report, the region sizes that were sent are marked as sent. 1123 * @param rss The stub to send to the Master 1124 * @param regionSizeStore The store containing region sizes 1125 */ 1126 private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss, 1127 RegionSizeStore regionSizeStore) throws ServiceException { 1128 RegionSpaceUseReportRequest request = 1129 buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore)); 1130 rss.reportRegionSpaceUse(null, request); 1131 // Record the number of size reports sent 1132 if (metricsRegionServer != null) { 1133 metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size()); 1134 } 1135 } 1136 1137 /** 1138 * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map. 1139 * @param regionSizes The size in bytes of regions 1140 * @return The corresponding protocol buffer message. 1141 */ 1142 RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) { 1143 RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder(); 1144 for (Entry<RegionInfo, RegionSize> entry : regionSizes) { 1145 request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize())); 1146 } 1147 return request.build(); 1148 } 1149 1150 /** 1151 * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf 1152 * message. 1153 * @param regionInfo The RegionInfo 1154 * @param sizeInBytes The size in bytes of the Region 1155 * @return The protocol buffer 1156 */ 1157 RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) { 1158 return RegionSpaceUse.newBuilder() 1159 .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo))) 1160 .setRegionSize(Objects.requireNonNull(sizeInBytes)).build(); 1161 } 1162 1163 private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) 1164 throws IOException { 1165 // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests 1166 // per second, and other metrics As long as metrics are part of ServerLoad it's best to use 1167 // the wrapper to compute those numbers in one place. 1168 // In the long term most of these should be moved off of ServerLoad and the heart beat. 1169 // Instead they should be stored in an HBase table so that external visibility into HBase is 1170 // improved; Additionally the load balancer will be able to take advantage of a more complete 1171 // history. 
1172 MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper(); 1173 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 1174 long usedMemory = -1L; 1175 long maxMemory = -1L; 1176 final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage(); 1177 if (usage != null) { 1178 usedMemory = usage.getUsed(); 1179 maxMemory = usage.getMax(); 1180 } 1181 1182 ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder(); 1183 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond()); 1184 serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount()); 1185 serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024)); 1186 serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024)); 1187 serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount()); 1188 serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount()); 1189 Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors(); 1190 Coprocessor.Builder coprocessorBuilder = Coprocessor.newBuilder(); 1191 for (String coprocessor : coprocessors) { 1192 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1193 } 1194 RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder(); 1195 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); 1196 for (HRegion region : regions) { 1197 if (region.getCoprocessorHost() != null) { 1198 Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors(); 1199 for (String regionCoprocessor : regionCoprocessors) { 1200 serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build()); 1201 } 1202 } 1203 serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier)); 1204 for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost() 1205 .getCoprocessors()) { 1206 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1207 } 1208 } 1209 serverLoad.setReportStartTime(reportStartTime); 1210 serverLoad.setReportEndTime(reportEndTime); 1211 if (this.infoServer != null) { 1212 serverLoad.setInfoServerPort(this.infoServer.getPort()); 1213 } else { 1214 serverLoad.setInfoServerPort(-1); 1215 } 1216 MetricsUserAggregateSource userSource = 1217 metricsRegionServer.getMetricsUserAggregate().getSource(); 1218 if (userSource != null) { 1219 Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources(); 1220 for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) { 1221 serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue())); 1222 } 1223 } 1224 1225 if (sameReplicationSourceAndSink && replicationSourceHandler != null) { 1226 // always refresh first to get the latest value 1227 ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad(); 1228 if (rLoad != null) { 1229 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); 1230 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad 1231 .getReplicationLoadSourceEntries()) { 1232 serverLoad.addReplLoadSource(rLS); 1233 } 1234 } 1235 } else { 1236 if (replicationSourceHandler != null) { 1237 ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad(); 1238 if (rLoad != null) { 1239 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad 1240 .getReplicationLoadSourceEntries()) { 1241 serverLoad.addReplLoadSource(rLS); 1242 } 1243 } 1244 } 1245 if (replicationSinkHandler != 
null) { 1246 ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad(); 1247 if (rLoad != null) { 1248 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); 1249 } 1250 } 1251 } 1252 1253 TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask 1254 .newBuilder().setDescription(task.getDescription()) 1255 .setStatus(task.getStatus() != null ? task.getStatus() : "") 1256 .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name())) 1257 .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build())); 1258 1259 return serverLoad.build(); 1260 } 1261 1262 private String getOnlineRegionsAsPrintableString() { 1263 StringBuilder sb = new StringBuilder(); 1264 for (Region r : this.onlineRegions.values()) { 1265 if (sb.length() > 0) { 1266 sb.append(", "); 1267 } 1268 sb.append(r.getRegionInfo().getEncodedName()); 1269 } 1270 return sb.toString(); 1271 } 1272 1273 /** 1274 * Wait on regions close. 1275 */ 1276 private void waitOnAllRegionsToClose(final boolean abort) { 1277 // Wait till all regions are closed before going out. 1278 int lastCount = -1; 1279 long previousLogTime = 0; 1280 Set<String> closedRegions = new HashSet<>(); 1281 boolean interrupted = false; 1282 try { 1283 while (!onlineRegions.isEmpty()) { 1284 int count = getNumberOfOnlineRegions(); 1285 // Only print a message if the count of regions has changed. 1286 if (count != lastCount) { 1287 // Log every second at most 1288 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 1289 previousLogTime = EnvironmentEdgeManager.currentTime(); 1290 lastCount = count; 1291 LOG.info("Waiting on " + count + " regions to close"); 1292 // Only print out regions still closing if a small number else will 1293 // swamp the log. 1294 if (count < 10 && LOG.isDebugEnabled()) { 1295 LOG.debug("Online Regions=" + this.onlineRegions); 1296 } 1297 } 1298 } 1299 // Ensure all user regions have been sent a close. Use this to 1300 // protect against the case where an open comes in after we start the 1301 // iterator of onlineRegions to close all user regions. 1302 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1303 RegionInfo hri = e.getValue().getRegionInfo(); 1304 if ( 1305 !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) 1306 && !closedRegions.contains(hri.getEncodedName()) 1307 ) { 1308 closedRegions.add(hri.getEncodedName()); 1309 // Don't update zk with this close transition; pass false. 1310 closeRegionIgnoreErrors(hri, abort); 1311 } 1312 } 1313 // No regions in RIT, we could stop waiting now. 
1314 if (this.regionsInTransitionInRS.isEmpty()) { 1315 if (!onlineRegions.isEmpty()) { 1316 LOG.info("We were exiting though online regions are not empty," 1317 + " because some regions failed closing"); 1318 } 1319 break; 1320 } else { 1321 LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream() 1322 .map(e -> Bytes.toString(e)).collect(Collectors.joining(", "))); 1323 } 1324 if (sleepInterrupted(200)) { 1325 interrupted = true; 1326 } 1327 } 1328 } finally { 1329 if (interrupted) { 1330 Thread.currentThread().interrupt(); 1331 } 1332 } 1333 } 1334 1335 private static boolean sleepInterrupted(long millis) { 1336 boolean interrupted = false; 1337 try { 1338 Thread.sleep(millis); 1339 } catch (InterruptedException e) { 1340 LOG.warn("Interrupted while sleeping"); 1341 interrupted = true; 1342 } 1343 return interrupted; 1344 } 1345 1346 private void shutdownWAL(final boolean close) { 1347 if (this.walFactory != null) { 1348 try { 1349 if (close) { 1350 walFactory.close(); 1351 } else { 1352 walFactory.shutdown(); 1353 } 1354 } catch (Throwable e) { 1355 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 1356 LOG.error("Shutdown / close of WAL failed: " + e); 1357 LOG.debug("Shutdown / close exception details:", e); 1358 } 1359 } 1360 } 1361 1362 /** 1363 * Run init. Sets up wal and starts up all server threads. 1364 * @param c Extra configuration. 1365 */ 1366 protected void handleReportForDutyResponse(final RegionServerStartupResponse c) 1367 throws IOException { 1368 try { 1369 boolean updateRootDir = false; 1370 for (NameStringPair e : c.getMapEntriesList()) { 1371 String key = e.getName(); 1372 // The hostname the master sees us as. 1373 if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) { 1374 String hostnameFromMasterPOV = e.getValue(); 1375 this.serverName = ServerName.valueOf(hostnameFromMasterPOV, 1376 rpcServices.getSocketAddress().getPort(), this.startcode); 1377 String expectedHostName = rpcServices.getSocketAddress().getHostName(); 1378 // if Master use-ip is enabled, RegionServer use-ip will be enabled by default even if it 1379 // is set to disable. so we will use the ip of the RegionServer to compare with the 1380 // hostname passed by the Master, see HBASE-27304 for details. 1381 if ( 1382 StringUtils.isBlank(useThisHostnameInstead) && getActiveMaster().isPresent() 1383 && InetAddresses.isInetAddress(getActiveMaster().get().getHostname()) 1384 ) { 1385 expectedHostName = rpcServices.getSocketAddress().getAddress().getHostAddress(); 1386 } 1387 boolean isHostnameConsist = StringUtils.isBlank(useThisHostnameInstead) 1388 ? hostnameFromMasterPOV.equals(expectedHostName) 1389 : hostnameFromMasterPOV.equals(useThisHostnameInstead); 1390 if (!isHostnameConsist) { 1391 String msg = "Master passed us a different hostname to use; was=" 1392 + (StringUtils.isBlank(useThisHostnameInstead) 1393 ? rpcServices.getSocketAddress().getHostName() 1394 : this.useThisHostnameInstead) 1395 + ", but now=" + hostnameFromMasterPOV; 1396 LOG.error(msg); 1397 throw new IOException(msg); 1398 } 1399 continue; 1400 } 1401 1402 String value = e.getValue(); 1403 if (key.equals(HConstants.HBASE_DIR)) { 1404 if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) { 1405 updateRootDir = true; 1406 } 1407 } 1408 1409 if (LOG.isDebugEnabled()) { 1410 LOG.debug("Config from master: " + key + "=" + value); 1411 } 1412 this.conf.set(key, value); 1413 } 1414 // Set our ephemeral znode up in zookeeper now we have a name. 
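      // Note: the Master watches this ephemeral znode (via its RegionServerTracker); if our
      // ZooKeeper session expires, the znode vanishes and the Master starts expiring this server.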
1415 createMyEphemeralNode(); 1416 1417 if (updateRootDir) { 1418 // initialize file system by the config fs.defaultFS and hbase.rootdir from master 1419 initializeFileSystem(); 1420 } 1421 1422 // hack! Maps DFSClient => RegionServer for logs. HDFS made this 1423 // config param for task trackers, but we can piggyback off of it. 1424 if (this.conf.get("mapreduce.task.attempt.id") == null) { 1425 this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString()); 1426 } 1427 1428 // Save it in a file, this will allow to see if we crash 1429 ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath()); 1430 1431 // This call sets up an initialized replication and WAL. Later we start it up. 1432 setupWALAndReplication(); 1433 // Init in here rather than in constructor after thread name has been set 1434 final MetricsTable metricsTable = 1435 new MetricsTable(new MetricsTableWrapperAggregateImpl(this)); 1436 this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this); 1437 this.metricsRegionServer = 1438 new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable); 1439 // Now that we have a metrics source, start the pause monitor 1440 this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource()); 1441 pauseMonitor.start(); 1442 1443 // There is a rare case where we do NOT want services to start. Check config. 1444 if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) { 1445 startServices(); 1446 } 1447 // In here we start up the replication Service. Above we initialized it. TODO. Reconcile. 1448 // or make sense of it. 1449 startReplicationService(); 1450 1451 // Set up ZK 1452 LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress() 1453 + ", sessionid=0x" 1454 + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); 1455 1456 // Wake up anyone waiting for this server to online 1457 synchronized (online) { 1458 online.set(true); 1459 online.notifyAll(); 1460 } 1461 } catch (Throwable e) { 1462 stop("Failed initialization"); 1463 throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed"); 1464 } finally { 1465 sleeper.skipSleepCycle(); 1466 } 1467 } 1468 1469 private void startHeapMemoryManager() { 1470 if (this.blockCache != null) { 1471 this.hMemManager = 1472 new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting); 1473 this.hMemManager.start(getChoreService()); 1474 } 1475 } 1476 1477 private void createMyEphemeralNode() throws KeeperException { 1478 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder(); 1479 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1); 1480 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo()); 1481 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray()); 1482 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data); 1483 } 1484 1485 private void deleteMyEphemeralNode() throws KeeperException { 1486 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath()); 1487 } 1488 1489 @Override 1490 public RegionServerAccounting getRegionServerAccounting() { 1491 return regionServerAccounting; 1492 } 1493 1494 // Round the size with KB or MB. 1495 // A trick here is that if the sizeInBytes is less than sizeUnit, we will round the size to 1 1496 // instead of 0 if it is not 0, to avoid some schedulers think the region has no data. See 1497 // HBASE-26340 for more details on why this is important. 
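// For example, roundSize(512, 1024) reports 1 (non-empty but smaller than the unit), while
// roundSize(5L * 1024 * 1024, 1024 * 1024) reports 5 and roundSize(0, 1024) reports 0.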
1498 private static int roundSize(long sizeInByte, int sizeUnit) { 1499 if (sizeInByte == 0) { 1500 return 0; 1501 } else if (sizeInByte < sizeUnit) { 1502 return 1; 1503 } else { 1504 return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE); 1505 } 1506 } 1507 1508 /** 1509 * @param r Region to get RegionLoad for. 1510 * @param regionLoadBldr the RegionLoad.Builder, can be null 1511 * @param regionSpecifier the RegionSpecifier.Builder, can be null 1512 * @return RegionLoad instance. 1513 */ 1514 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, 1515 RegionSpecifier.Builder regionSpecifier) throws IOException { 1516 byte[] name = r.getRegionInfo().getRegionName(); 1517 int stores = 0; 1518 int storefiles = 0; 1519 int storeRefCount = 0; 1520 int maxCompactedStoreFileRefCount = 0; 1521 long storeUncompressedSize = 0L; 1522 long storefileSize = 0L; 1523 long storefileIndexSize = 0L; 1524 long rootLevelIndexSize = 0L; 1525 long totalStaticIndexSize = 0L; 1526 long totalStaticBloomSize = 0L; 1527 long totalCompactingKVs = 0L; 1528 long currentCompactedKVs = 0L; 1529 List<HStore> storeList = r.getStores(); 1530 stores += storeList.size(); 1531 for (HStore store : storeList) { 1532 storefiles += store.getStorefilesCount(); 1533 int currentStoreRefCount = store.getStoreRefCount(); 1534 storeRefCount += currentStoreRefCount; 1535 int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount(); 1536 maxCompactedStoreFileRefCount = 1537 Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount); 1538 storeUncompressedSize += store.getStoreSizeUncompressed(); 1539 storefileSize += store.getStorefilesSize(); 1540 // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB? 1541 storefileIndexSize += store.getStorefilesRootLevelIndexSize(); 1542 CompactionProgress progress = store.getCompactionProgress(); 1543 if (progress != null) { 1544 totalCompactingKVs += progress.getTotalCompactingKVs(); 1545 currentCompactedKVs += progress.currentCompactedKVs; 1546 } 1547 rootLevelIndexSize += store.getStorefilesRootLevelIndexSize(); 1548 totalStaticIndexSize += store.getTotalStaticIndexSize(); 1549 totalStaticBloomSize += store.getTotalStaticBloomSize(); 1550 } 1551 1552 int unitMB = 1024 * 1024; 1553 int unitKB = 1024; 1554 1555 int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB); 1556 int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB); 1557 int storefileSizeMB = roundSize(storefileSize, unitMB); 1558 int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB); 1559 int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB); 1560 int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB); 1561 int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB); 1562 1563 HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution(); 1564 float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname()); 1565 float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname()); 1566 long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight(); 1567 long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname()); 1568 long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname()); 1569 if (regionLoadBldr == null) { 1570 regionLoadBldr = RegionLoad.newBuilder(); 1571 } 1572 if (regionSpecifier == null) { 1573 regionSpecifier = RegionSpecifier.newBuilder(); 1574 } 1575 1576 
regionSpecifier.setType(RegionSpecifierType.REGION_NAME); 1577 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name)); 1578 regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores) 1579 .setStorefiles(storefiles).setStoreRefCount(storeRefCount) 1580 .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount) 1581 .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB) 1582 .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB) 1583 .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB) 1584 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB) 1585 .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount()) 1586 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount()) 1587 .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs) 1588 .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality) 1589 .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight) 1590 .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight) 1591 .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState())) 1592 .setLastMajorCompactionTs(r.getOldestHfileTs(true)); 1593 r.setCompleteSequenceId(regionLoadBldr); 1594 return regionLoadBldr.build(); 1595 } 1596 1597 private UserLoad createUserLoad(String user, MetricsUserSource userSource) { 1598 UserLoad.Builder userLoadBldr = UserLoad.newBuilder(); 1599 userLoadBldr.setUserName(user); 1600 userSource.getClientMetrics().values().stream() 1601 .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder() 1602 .setHostName(clientMetrics.getHostName()) 1603 .setWriteRequestsCount(clientMetrics.getWriteRequestsCount()) 1604 .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests()) 1605 .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build()) 1606 .forEach(userLoadBldr::addClientMetrics); 1607 return userLoadBldr.build(); 1608 } 1609 1610 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { 1611 HRegion r = onlineRegions.get(encodedRegionName); 1612 return r != null ? createRegionLoad(r, null, null) : null; 1613 } 1614 1615 /** 1616 * Inner class that runs on a long period checking if regions need compaction. 1617 */ 1618 private static class CompactionChecker extends ScheduledChore { 1619 private final HRegionServer instance; 1620 private final int majorCompactPriority; 1621 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; 1622 // Iteration is 1-based rather than 0-based so we don't check for compaction 1623 // immediately upon region server startup 1624 private long iteration = 1; 1625 1626 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { 1627 super("CompactionChecker", stopper, sleepTime); 1628 this.instance = h; 1629 LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime)); 1630 1631 /* 1632 * MajorCompactPriority is configurable. If not set, the compaction will use default priority. 
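 * With the default (Integer.MAX_VALUE) the check below always takes the "use default priority"
 * branch; a configured value only takes effect when it is less than or equal to the region's
 * current compact priority.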
1633 */ 1634 this.majorCompactPriority = this.instance.conf 1635 .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY); 1636 } 1637 1638 @Override 1639 protected void chore() { 1640 for (Region r : this.instance.onlineRegions.values()) { 1641 // Skip compaction if region is read only 1642 if (r == null || r.isReadOnly()) { 1643 continue; 1644 } 1645 1646 HRegion hr = (HRegion) r; 1647 for (HStore s : hr.stores.values()) { 1648 try { 1649 long multiplier = s.getCompactionCheckMultiplier(); 1650 assert multiplier > 0; 1651 if (iteration % multiplier != 0) { 1652 continue; 1653 } 1654 if (s.needsCompaction()) { 1655 // Queue a compaction. Will recognize if major is needed. 1656 this.instance.compactSplitThread.requestSystemCompaction(hr, s, 1657 getName() + " requests compaction"); 1658 } else if (s.shouldPerformMajorCompaction()) { 1659 s.triggerMajorCompaction(); 1660 if ( 1661 majorCompactPriority == DEFAULT_PRIORITY 1662 || majorCompactPriority > hr.getCompactPriority() 1663 ) { 1664 this.instance.compactSplitThread.requestCompaction(hr, s, 1665 getName() + " requests major compaction; use default priority", Store.NO_PRIORITY, 1666 CompactionLifeCycleTracker.DUMMY, null); 1667 } else { 1668 this.instance.compactSplitThread.requestCompaction(hr, s, 1669 getName() + " requests major compaction; use configured priority", 1670 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); 1671 } 1672 } 1673 } catch (IOException e) { 1674 LOG.warn("Failed major compaction check on " + r, e); 1675 } 1676 } 1677 } 1678 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); 1679 } 1680 } 1681 1682 private static class PeriodicMemStoreFlusher extends ScheduledChore { 1683 private final HRegionServer server; 1684 private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds 1685 private final static int MIN_DELAY_TIME = 0; // millisec 1686 private final long rangeOfDelayMs; 1687 1688 PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { 1689 super("MemstoreFlusherChore", server, cacheFlushInterval); 1690 this.server = server; 1691 1692 final long configuredRangeOfDelay = server.getConfiguration() 1693 .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY); 1694 this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay); 1695 } 1696 1697 @Override 1698 protected void chore() { 1699 final StringBuilder whyFlush = new StringBuilder(); 1700 for (HRegion r : this.server.onlineRegions.values()) { 1701 if (r == null) { 1702 continue; 1703 } 1704 if (r.shouldFlush(whyFlush)) { 1705 FlushRequester requester = server.getFlushRequester(); 1706 if (requester != null) { 1707 long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME; 1708 // Throttle the flushes by putting a delay. If we don't throttle, and there 1709 // is a balanced write-load on the regions in a table, we might end up 1710 // overwhelming the filesystem with too many flushes at once. 1711 if (requester.requestDelayedFlush(r, delay)) { 1712 LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(), 1713 r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay); 1714 } 1715 } 1716 } 1717 } 1718 } 1719 } 1720 1721 /** 1722 * Report the status of the server. A server is online once all the startup is completed (setting 1723 * up filesystem, starting executorService threads, etc.). This method is designed mostly to be 1724 * useful in tests. 
1725 * @return true if online, false if not. 1726 */ 1727 public boolean isOnline() { 1728 return online.get(); 1729 } 1730 1731 /** 1732 * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to 1733 * be hooked up to WAL. 1734 */ 1735 private void setupWALAndReplication() throws IOException { 1736 WALFactory factory = new WALFactory(conf, serverName, this, true); 1737 // TODO Replication make assumptions here based on the default filesystem impl 1738 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 1739 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); 1740 1741 Path logDir = new Path(walRootDir, logName); 1742 LOG.debug("logDir={}", logDir); 1743 if (this.walFs.exists(logDir)) { 1744 throw new RegionServerRunningException( 1745 "Region server has already created directory at " + this.serverName.toString()); 1746 } 1747 // Always create wal directory as now we need this when master restarts to find out the live 1748 // region servers. 1749 if (!this.walFs.mkdirs(logDir)) { 1750 throw new IOException("Can not create wal directory " + logDir); 1751 } 1752 // Instantiate replication if replication enabled. Pass it the log directories. 1753 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory); 1754 1755 WALActionsListener walEventListener = getWALEventTrackerListener(conf); 1756 if (walEventListener != null && factory.getWALProvider() != null) { 1757 factory.getWALProvider().addWALActionsListener(walEventListener); 1758 } 1759 this.walFactory = factory; 1760 } 1761 1762 private WALActionsListener getWALEventTrackerListener(Configuration conf) { 1763 if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) { 1764 WALEventTrackerListener listener = 1765 new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName()); 1766 return listener; 1767 } 1768 return null; 1769 } 1770 1771 /** 1772 * Start up replication source and sink handlers. 1773 */ 1774 private void startReplicationService() throws IOException { 1775 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { 1776 this.replicationSourceHandler.startReplicationService(); 1777 } else { 1778 if (this.replicationSourceHandler != null) { 1779 this.replicationSourceHandler.startReplicationService(); 1780 } 1781 if (this.replicationSinkHandler != null) { 1782 this.replicationSinkHandler.startReplicationService(); 1783 } 1784 } 1785 } 1786 1787 /** Returns Master address tracker instance. */ 1788 public MasterAddressTracker getMasterAddressTracker() { 1789 return this.masterAddressTracker; 1790 } 1791 1792 /** 1793 * Start maintenance Threads, Server, Worker and lease checker threads. Start all threads we need 1794 * to run. This is called after we've successfully registered with the Master. Install an 1795 * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception. We 1796 * cannot set the handler on all threads. Server's internal Listener thread is off limits. For 1797 * Server, if an OOME, it waits a while then retries. Meantime, a flush or a compaction that tries 1798 * to run should trigger same critical condition and the shutdown will run. On its way out, this 1799 * server will shut down Server. Leases are sort of inbetween. It has an internal thread that 1800 * while it inherits from Chore, it keeps its own internal stop mechanism so needs to be stopped 1801 * by this hosting server. 
Worker logs the exception and exits. 1802 */ 1803 private void startServices() throws IOException { 1804 if (!isStopped() && !isAborted()) { 1805 initializeThreads(); 1806 } 1807 this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection); 1808 this.secureBulkLoadManager.start(); 1809 1810 // Health checker thread. 1811 if (isHealthCheckerConfigured()) { 1812 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, 1813 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); 1814 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); 1815 } 1816 // Executor status collect thread. 1817 if ( 1818 this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED, 1819 HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED) 1820 ) { 1821 int sleepTime = 1822 this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ); 1823 executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(), 1824 this.metricsRegionServer.getMetricsSource()); 1825 } 1826 1827 this.walRoller = new LogRoller(this); 1828 this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); 1829 this.procedureResultReporter = new RemoteProcedureResultReporter(this); 1830 1831 // Create the CompactedFileDischarger chore executorService. This chore helps to 1832 // remove the compacted files that will no longer be used in reads. 1833 // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to 1834 // 2 mins so that compacted files can be archived before the TTLCleaner runs 1835 int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); 1836 this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this); 1837 choreService.scheduleChore(compactedFileDischarger); 1838 1839 // Start executor services 1840 final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3); 1841 executorService.startExecutorService(executorService.new ExecutorConfig() 1842 .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads)); 1843 final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1); 1844 executorService.startExecutorService(executorService.new ExecutorConfig() 1845 .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads)); 1846 final int openPriorityRegionThreads = 1847 conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3); 1848 executorService.startExecutorService( 1849 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION) 1850 .setCorePoolSize(openPriorityRegionThreads)); 1851 final int closeRegionThreads = 1852 conf.getInt("hbase.regionserver.executor.closeregion.threads", 3); 1853 executorService.startExecutorService(executorService.new ExecutorConfig() 1854 .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads)); 1855 final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1); 1856 executorService.startExecutorService(executorService.new ExecutorConfig() 1857 .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads)); 1858 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { 1859 final int storeScannerParallelSeekThreads = 1860 conf.getInt("hbase.storescanner.parallel.seek.threads", 10); 1861 executorService.startExecutorService( 1862 executorService.new 
ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK) 1863 .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true)); 1864 } 1865 final int logReplayOpsThreads = 1866 conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER); 1867 executorService.startExecutorService( 1868 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS) 1869 .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true)); 1870 // Start the threads for compacted files discharger 1871 final int compactionDischargerThreads = 1872 conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10); 1873 executorService.startExecutorService(executorService.new ExecutorConfig() 1874 .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER) 1875 .setCorePoolSize(compactionDischargerThreads)); 1876 if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { 1877 final int regionReplicaFlushThreads = 1878 conf.getInt("hbase.regionserver.region.replica.flusher.threads", 1879 conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); 1880 executorService.startExecutorService(executorService.new ExecutorConfig() 1881 .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS) 1882 .setCorePoolSize(regionReplicaFlushThreads)); 1883 } 1884 final int refreshPeerThreads = 1885 conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2); 1886 executorService.startExecutorService(executorService.new ExecutorConfig() 1887 .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads)); 1888 final int replaySyncReplicationWALThreads = 1889 conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1); 1890 executorService.startExecutorService(executorService.new ExecutorConfig() 1891 .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL) 1892 .setCorePoolSize(replaySyncReplicationWALThreads)); 1893 final int switchRpcThrottleThreads = 1894 conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1); 1895 executorService.startExecutorService( 1896 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE) 1897 .setCorePoolSize(switchRpcThrottleThreads)); 1898 final int claimReplicationQueueThreads = 1899 conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1); 1900 executorService.startExecutorService( 1901 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE) 1902 .setCorePoolSize(claimReplicationQueueThreads)); 1903 final int rsSnapshotOperationThreads = 1904 conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3); 1905 executorService.startExecutorService( 1906 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS) 1907 .setCorePoolSize(rsSnapshotOperationThreads)); 1908 1909 Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", 1910 uncaughtExceptionHandler); 1911 if (this.cacheFlusher != null) { 1912 this.cacheFlusher.start(uncaughtExceptionHandler); 1913 } 1914 Threads.setDaemonThreadRunning(this.procedureResultReporter, 1915 getName() + ".procedureResultReporter", uncaughtExceptionHandler); 1916 1917 if (this.compactionChecker != null) { 1918 choreService.scheduleChore(compactionChecker); 1919 } 1920 if (this.periodicFlusher != null) { 1921 choreService.scheduleChore(periodicFlusher); 1922 } 1923 if (this.healthCheckChore != null) { 1924 
choreService.scheduleChore(healthCheckChore); 1925 } 1926 if (this.executorStatusChore != null) { 1927 choreService.scheduleChore(executorStatusChore); 1928 } 1929 if (this.nonceManagerChore != null) { 1930 choreService.scheduleChore(nonceManagerChore); 1931 } 1932 if (this.storefileRefresher != null) { 1933 choreService.scheduleChore(storefileRefresher); 1934 } 1935 if (this.fsUtilizationChore != null) { 1936 choreService.scheduleChore(fsUtilizationChore); 1937 } 1938 if (this.namedQueueServiceChore != null) { 1939 choreService.scheduleChore(namedQueueServiceChore); 1940 } 1941 if (this.brokenStoreFileCleaner != null) { 1942 choreService.scheduleChore(brokenStoreFileCleaner); 1943 } 1944 if (this.rsMobFileCleanerChore != null) { 1945 choreService.scheduleChore(rsMobFileCleanerChore); 1946 } 1947 if (replicationMarkerChore != null) { 1948 LOG.info("Starting replication marker chore"); 1949 choreService.scheduleChore(replicationMarkerChore); 1950 } 1951 1952 // Leases is not a Thread. Internally it runs a daemon thread. If it gets 1953 // an unhandled exception, it will just exit. 1954 Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker", 1955 uncaughtExceptionHandler); 1956 1957 // Create the log splitting worker and start it 1958 // set a smaller retries to fast fail otherwise splitlogworker could be blocked for 1959 // quite a while inside Connection layer. The worker won't be available for other 1960 // tasks even after current task is preempted after a split task times out. 1961 Configuration sinkConf = HBaseConfiguration.create(conf); 1962 sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1963 conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds 1964 sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1965 conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds 1966 sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 1967 if ( 1968 this.csm != null 1969 && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK) 1970 ) { 1971 // SplitLogWorker needs csm. If none, don't start this. 1972 this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory); 1973 splitLogWorker.start(); 1974 LOG.debug("SplitLogWorker started"); 1975 } 1976 1977 // Memstore services. 1978 startHeapMemoryManager(); 1979 // Call it after starting HeapMemoryManager. 1980 initializeMemStoreChunkCreator(hMemManager); 1981 } 1982 1983 private void initializeThreads() { 1984 // Cache flushing thread. 1985 this.cacheFlusher = new MemStoreFlusher(conf, this); 1986 1987 // Compaction thread 1988 this.compactSplitThread = new CompactSplit(this); 1989 1990 // Background thread to check for compactions; needed if region has not gotten updates 1991 // in a while. It will take care of not checking too frequently on store-by-store basis. 
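// Each store can further stretch the effective check interval through its compaction check
// multiplier; CompactionChecker.chore() above only examines a store when the iteration count is a
// multiple of that value.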
1992 this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this); 1993 this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this); 1994 this.leaseManager = new LeaseManager(this.threadWakeFrequency); 1995 1996 final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, 1997 HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); 1998 final boolean walEventTrackerEnabled = 1999 conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT); 2000 2001 if (isSlowLogTableEnabled || walEventTrackerEnabled) { 2002 // default chore duration: 10 min 2003 // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property 2004 final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY, 2005 DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION); 2006 2007 final int namedQueueChoreDuration = 2008 conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT); 2009 // Considering min of slowLogChoreDuration and namedQueueChoreDuration 2010 int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration); 2011 2012 namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration, 2013 this.namedQueueRecorder, this.getConnection()); 2014 } 2015 2016 if (this.nonceManager != null) { 2017 // Create the scheduled chore that cleans up nonces. 2018 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); 2019 } 2020 2021 // Setup the Quota Manager 2022 rsQuotaManager = new RegionServerRpcQuotaManager(this); 2023 rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); 2024 2025 if (QuotaUtil.isQuotaEnabled(conf)) { 2026 this.fsUtilizationChore = new FileSystemUtilizationChore(this); 2027 } 2028 2029 boolean onlyMetaRefresh = false; 2030 int storefileRefreshPeriod = 2031 conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 2032 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2033 if (storefileRefreshPeriod == 0) { 2034 storefileRefreshPeriod = 2035 conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, 2036 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2037 onlyMetaRefresh = true; 2038 } 2039 if (storefileRefreshPeriod > 0) { 2040 this.storefileRefresher = 2041 new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this); 2042 } 2043 2044 int brokenStoreFileCleanerPeriod = 2045 conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, 2046 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD); 2047 int brokenStoreFileCleanerDelay = 2048 conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, 2049 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY); 2050 double brokenStoreFileCleanerDelayJitter = 2051 conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER, 2052 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER); 2053 double jitterRate = 2054 (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter; 2055 long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate); 2056 this.brokenStoreFileCleaner = 2057 new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue), 2058 brokenStoreFileCleanerPeriod, this, conf, this); 2059 2060 this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this); 2061 2062 registerConfigurationObservers(); 2063 
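// Sets up the replication marker chore; it is only scheduled later in startServices() if it was
// actually created here, presumably when replication markers are enabled in the configuration.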
initializeReplicationMarkerChore(); 2064 } 2065 2066 private void registerConfigurationObservers() { 2067 // Register Replication if possible, as now we support recreating replication peer storage, for 2068 // migrating across different replication peer storages online 2069 if (replicationSourceHandler instanceof ConfigurationObserver) { 2070 configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler); 2071 } 2072 if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) { 2073 configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler); 2074 } 2075 // Registering the compactSplitThread object with the ConfigurationManager. 2076 configurationManager.registerObserver(this.compactSplitThread); 2077 configurationManager.registerObserver(this.rpcServices); 2078 configurationManager.registerObserver(this); 2079 } 2080 2081 /* 2082 * Verify that server is healthy 2083 */ 2084 private boolean isHealthy() { 2085 if (!dataFsOk) { 2086 // File system problem 2087 return false; 2088 } 2089 // Verify that all threads are alive 2090 boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive()) 2091 && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) 2092 && (this.walRoller == null || this.walRoller.isAlive()) 2093 && (this.compactionChecker == null || this.compactionChecker.isScheduled()) 2094 && (this.periodicFlusher == null || this.periodicFlusher.isScheduled()); 2095 if (!healthy) { 2096 stop("One or more threads are no longer alive -- stop"); 2097 } 2098 return healthy; 2099 } 2100 2101 @Override 2102 public List<WAL> getWALs() { 2103 return walFactory.getWALs(); 2104 } 2105 2106 @Override 2107 public WAL getWAL(RegionInfo regionInfo) throws IOException { 2108 WAL wal = walFactory.getWAL(regionInfo); 2109 if (this.walRoller != null) { 2110 this.walRoller.addWAL(wal); 2111 } 2112 return wal; 2113 } 2114 2115 public LogRoller getWalRoller() { 2116 return walRoller; 2117 } 2118 2119 WALFactory getWalFactory() { 2120 return walFactory; 2121 } 2122 2123 @Override 2124 public void stop(final String msg) { 2125 stop(msg, false, RpcServer.getRequestUser().orElse(null)); 2126 } 2127 2128 /** 2129 * Stops the regionserver. 
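 * A coprocessor preStop() hook may veto the stop by throwing an IOException unless {@code force}
 * is set, as the method body below shows.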
2130 * @param msg Status message 2131 * @param force True if this is a regionserver abort 2132 * @param user The user executing the stop request, or null if no user is associated 2133 */ 2134 public void stop(final String msg, final boolean force, final User user) { 2135 if (!this.stopped) { 2136 LOG.info("***** STOPPING region server '" + this + "' *****"); 2137 if (this.rsHost != null) { 2138 // when forced via abort don't allow CPs to override 2139 try { 2140 this.rsHost.preStop(msg, user); 2141 } catch (IOException ioe) { 2142 if (!force) { 2143 LOG.warn("The region server did not stop", ioe); 2144 return; 2145 } 2146 LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe); 2147 } 2148 } 2149 this.stopped = true; 2150 LOG.info("STOPPED: " + msg); 2151 // Wakes run() if it is sleeping 2152 sleeper.skipSleepCycle(); 2153 } 2154 } 2155 2156 public void waitForServerOnline() { 2157 while (!isStopped() && !isOnline()) { 2158 synchronized (online) { 2159 try { 2160 online.wait(msgInterval); 2161 } catch (InterruptedException ie) { 2162 Thread.currentThread().interrupt(); 2163 break; 2164 } 2165 } 2166 } 2167 } 2168 2169 @Override 2170 public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException { 2171 HRegion r = context.getRegion(); 2172 long openProcId = context.getOpenProcId(); 2173 long masterSystemTime = context.getMasterSystemTime(); 2174 rpcServices.checkOpen(); 2175 LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}", 2176 r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime); 2177 // Do checks to see if we need to compact (references or too many files) 2178 // Skip compaction check if region is read only 2179 if (!r.isReadOnly()) { 2180 for (HStore s : r.stores.values()) { 2181 if (s.hasReferences() || s.needsCompaction()) { 2182 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region"); 2183 } 2184 } 2185 } 2186 long openSeqNum = r.getOpenSeqNum(); 2187 if (openSeqNum == HConstants.NO_SEQNUM) { 2188 // If we opened a region, we should have read some sequence number from it. 2189 LOG.error( 2190 "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString()); 2191 openSeqNum = 0; 2192 } 2193 2194 // Notify master 2195 if ( 2196 !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED, 2197 openSeqNum, openProcId, masterSystemTime, r.getRegionInfo())) 2198 ) { 2199 throw new IOException( 2200 "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString()); 2201 } 2202 2203 triggerFlushInPrimaryRegion(r); 2204 2205 LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString()); 2206 } 2207 2208 /** 2209 * Helper method for use in tests. Skip the region transition report when there's no master around 2210 * to receive it. 
2211 */ 2212 private boolean skipReportingTransition(final RegionStateTransitionContext context) { 2213 final TransitionCode code = context.getCode(); 2214 final long openSeqNum = context.getOpenSeqNum(); 2215 long masterSystemTime = context.getMasterSystemTime(); 2216 final RegionInfo[] hris = context.getHris(); 2217 2218 if (code == TransitionCode.OPENED) { 2219 Preconditions.checkArgument(hris != null && hris.length == 1); 2220 if (hris[0].isMetaRegion()) { 2221 LOG.warn( 2222 "meta table location is stored in master local store, so we can not skip reporting"); 2223 return false; 2224 } else { 2225 try { 2226 MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0], 2227 serverName, openSeqNum, masterSystemTime); 2228 } catch (IOException e) { 2229 LOG.info("Failed to update meta", e); 2230 return false; 2231 } 2232 } 2233 } 2234 return true; 2235 } 2236 2237 private ReportRegionStateTransitionRequest 2238 createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) { 2239 final TransitionCode code = context.getCode(); 2240 final long openSeqNum = context.getOpenSeqNum(); 2241 final RegionInfo[] hris = context.getHris(); 2242 final long[] procIds = context.getProcIds(); 2243 2244 ReportRegionStateTransitionRequest.Builder builder = 2245 ReportRegionStateTransitionRequest.newBuilder(); 2246 builder.setServer(ProtobufUtil.toServerName(serverName)); 2247 RegionStateTransition.Builder transition = builder.addTransitionBuilder(); 2248 transition.setTransitionCode(code); 2249 if (code == TransitionCode.OPENED && openSeqNum >= 0) { 2250 transition.setOpenSeqNum(openSeqNum); 2251 } 2252 for (RegionInfo hri : hris) { 2253 transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri)); 2254 } 2255 for (long procId : procIds) { 2256 transition.addProcId(procId); 2257 } 2258 2259 return builder.build(); 2260 } 2261 2262 @Override 2263 public boolean reportRegionStateTransition(final RegionStateTransitionContext context) { 2264 if (TEST_SKIP_REPORTING_TRANSITION) { 2265 return skipReportingTransition(context); 2266 } 2267 final ReportRegionStateTransitionRequest request = 2268 createReportRegionStateTransitionRequest(context); 2269 2270 int tries = 0; 2271 long pauseTime = this.retryPauseTime; 2272 // Keep looping till we get an error. We want to send reports even though server is going down. 2273 // Only go down if clusterConnection is null. It is set to null almost as last thing as the 2274 // HRegionServer does down. 2275 while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) { 2276 RegionServerStatusService.BlockingInterface rss = rssStub; 2277 try { 2278 if (rss == null) { 2279 createRegionServerStatusStub(); 2280 continue; 2281 } 2282 ReportRegionStateTransitionResponse response = 2283 rss.reportRegionStateTransition(null, request); 2284 if (response.hasErrorMessage()) { 2285 LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage()); 2286 break; 2287 } 2288 // Log if we had to retry else don't log unless TRACE. We want to 2289 // know if were successful after an attempt showed in logs as failed. 2290 if (tries > 0 || LOG.isTraceEnabled()) { 2291 LOG.info("TRANSITION REPORTED " + request); 2292 } 2293 // NOTE: Return mid-method!!! 
2294 return true; 2295 } catch (ServiceException se) { 2296 IOException ioe = ProtobufUtil.getRemoteException(se); 2297 boolean pause = ioe instanceof ServerNotRunningYetException 2298 || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException; 2299 if (pause) { 2300 // Do backoff else we flood the Master with requests. 2301 pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries); 2302 } else { 2303 pauseTime = this.retryPauseTime; // Reset. 2304 } 2305 LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#" 2306 + tries + ")" 2307 + (pause 2308 ? " after " + pauseTime + "ms delay (Master is coming online...)." 2309 : " immediately."), 2310 ioe); 2311 if (pause) { 2312 Threads.sleep(pauseTime); 2313 } 2314 tries++; 2315 if (rssStub == rss) { 2316 rssStub = null; 2317 } 2318 } 2319 } 2320 return false; 2321 } 2322
2323 /** 2324 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not 2325 block this thread. See RegionReplicaFlushHandler for details. 2326 */ 2327 private void triggerFlushInPrimaryRegion(final HRegion region) { 2328 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 2329 return; 2330 } 2331 TableName tn = region.getTableDescriptor().getTableName(); 2332 if ( 2333 !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) 2334 || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) || 2335 // If memstore replication is not set up, we do not have to wait for observing a flush event 2336 // from the primary before starting to serve reads, because gaps from replication are not 2337 // applicable; this logic is from 2338 // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by 2339 // HBASE-13063 2340 !region.getTableDescriptor().hasRegionMemStoreReplication() 2341 ) { 2342 region.setReadsEnabled(true); 2343 return; 2344 } 2345 2346 region.setReadsEnabled(false); // disable reads before marking the region as opened. 2347 // RegionReplicaFlushHandler might reset this. 2348 2349 // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler 2350 if (this.executorService != null) { 2351 this.executorService.submit(new RegionReplicaFlushHandler(this, region)); 2352 } else { 2353 LOG.info("Executor is null; not running flush of primary region replica for {}", 2354 region.getRegionInfo()); 2355 } 2356 } 2357
2358 @InterfaceAudience.Private 2359 public RSRpcServices getRSRpcServices() { 2360 return rpcServices; 2361 } 2362
2363 /** 2364 * Cause the server to exit without closing the regions it is serving, the log it is using and 2365 without notifying the master. Used in unit testing and on catastrophic events such as HDFS being 2366 yanked out from under hbase or an OOME. Here {@code reason} is why we are aborting and {@code cause} is 2367 the exception that caused the abort, or null. 2368 */ 2369 @Override 2370 public void abort(String reason, Throwable cause) { 2371 if (!setAbortRequested()) { 2372 // Abort already in progress, ignore the new request. 2373 LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason); 2374 return; 2375 } 2376 String msg = "***** ABORTING region server " + this + ": " + reason + " *****"; 2377 if (cause != null) { 2378 LOG.error(HBaseMarkers.FATAL, msg, cause); 2379 } else { 2380 LOG.error(HBaseMarkers.FATAL, msg); 2381 } 2382 // HBASE-4014: show list of coprocessors that were loaded to help debug 2383 // regionserver crashes. Note that we're implicitly using 2384 // java.util.HashSet's toString() method to print the coprocessor names. 2385 LOG.error(HBaseMarkers.FATAL, 2386 "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors()); 2387 // Try and dump metrics on abort -- might give a clue as to how the fatal error came about. 2388 try { 2389 LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics()); 2390 } catch (MalformedObjectNameException | IOException e) { 2391 LOG.warn("Failed dumping metrics", e); 2392 } 2393 2394 // Do our best to report our abort to the master, but this may not work 2395 try { 2396 if (cause != null) { 2397 msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause); 2398 } 2399 // Report to the master but only if we have already registered with the master. 2400 RegionServerStatusService.BlockingInterface rss = rssStub; 2401 if (rss != null && this.serverName != null) { 2402 ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder(); 2403 builder.setServer(ProtobufUtil.toServerName(this.serverName)); 2404 builder.setErrorMessage(msg); 2405 rss.reportRSFatalError(null, builder.build()); 2406 } 2407 } catch (Throwable t) { 2408 LOG.warn("Unable to report fatal error to master", t); 2409 } 2410 2411 scheduleAbortTimer(); 2412 // shutdown should be run as the internal user 2413 stop(reason, true, null); 2414 } 2415
2416 /* 2417 * Simulate a kill -9 of this server. Exits without closing regions or cleaning up logs, but it does 2418 close the socket in case we want to bring up a server on the old hostname+port immediately. 2419 */ 2420 @InterfaceAudience.Private 2421 protected void kill() { 2422 this.killed = true; 2423 abort("Simulated kill"); 2424 } 2425 2426 // Limits the time spent in the shutdown process. 2427 private void scheduleAbortTimer() { 2428 if (this.abortMonitor == null) { 2429 this.abortMonitor = new Timer("Abort regionserver monitor", true); 2430 TimerTask abortTimeoutTask = null; 2431 try { 2432 Constructor<? extends TimerTask> timerTaskCtor = 2433 Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName())) 2434 .asSubclass(TimerTask.class).getDeclaredConstructor(); 2435 timerTaskCtor.setAccessible(true); 2436 abortTimeoutTask = timerTaskCtor.newInstance(); 2437 } catch (Exception e) { 2438 LOG.warn("Initialize abort timeout task failed", e); 2439 } 2440 if (abortTimeoutTask != null) { 2441 abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT)); 2442 } 2443 } 2444 } 2445 2446 /** 2447 * Wait on all threads to finish. Presumption is that all closes and stops have already been 2448 called.
2449 */ 2450 protected void stopServiceThreads() { 2451 // clean up the scheduled chores 2452 stopChoreService(); 2453 if (bootstrapNodeManager != null) { 2454 bootstrapNodeManager.stop(); 2455 } 2456 if (this.cacheFlusher != null) { 2457 this.cacheFlusher.join(); 2458 } 2459 if (this.walRoller != null) { 2460 this.walRoller.close(); 2461 } 2462 if (this.compactSplitThread != null) { 2463 this.compactSplitThread.join(); 2464 } 2465 stopExecutorService(); 2466 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { 2467 this.replicationSourceHandler.stopReplicationService(); 2468 } else { 2469 if (this.replicationSourceHandler != null) { 2470 this.replicationSourceHandler.stopReplicationService(); 2471 } 2472 if (this.replicationSinkHandler != null) { 2473 this.replicationSinkHandler.stopReplicationService(); 2474 } 2475 } 2476 } 2477 2478 /** Returns Return the object that implements the replication source executorService. */ 2479 @Override 2480 public ReplicationSourceService getReplicationSourceService() { 2481 return replicationSourceHandler; 2482 } 2483 2484 /** Returns Return the object that implements the replication sink executorService. */ 2485 public ReplicationSinkService getReplicationSinkService() { 2486 return replicationSinkHandler; 2487 } 2488 2489 /** 2490 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2491 * connection, the current rssStub must be null. Method will block until a master is available. 2492 * You can break from this block by requesting the server stop. 2493 * @return master + port, or null if server has been stopped 2494 */ 2495 private synchronized ServerName createRegionServerStatusStub() { 2496 // Create RS stub without refreshing the master node from ZK, use cached data 2497 return createRegionServerStatusStub(false); 2498 } 2499 2500 /** 2501 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2502 * connection, the current rssStub must be null. Method will block until a master is available. 2503 * You can break from this block by requesting the server stop. 2504 * @param refresh If true then master address will be read from ZK, otherwise use cached data 2505 * @return master + port, or null if server has been stopped 2506 */ 2507 @InterfaceAudience.Private 2508 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { 2509 if (rssStub != null) { 2510 return masterAddressTracker.getMasterAddress(); 2511 } 2512 ServerName sn = null; 2513 long previousLogTime = 0; 2514 RegionServerStatusService.BlockingInterface intRssStub = null; 2515 LockService.BlockingInterface intLockStub = null; 2516 boolean interrupted = false; 2517 try { 2518 while (keepLooping()) { 2519 sn = this.masterAddressTracker.getMasterAddress(refresh); 2520 if (sn == null) { 2521 if (!keepLooping()) { 2522 // give up with no connection. 
2523 LOG.debug("No master found and cluster is stopped; bailing out"); 2524 return null; 2525 } 2526 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 2527 LOG.debug("No master found; retry"); 2528 previousLogTime = EnvironmentEdgeManager.currentTime(); 2529 } 2530 refresh = true; // let's try pull it from ZK directly 2531 if (sleepInterrupted(200)) { 2532 interrupted = true; 2533 } 2534 continue; 2535 } 2536 try { 2537 BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, 2538 userProvider.getCurrent(), shortOperationTimeout); 2539 intRssStub = RegionServerStatusService.newBlockingStub(channel); 2540 intLockStub = LockService.newBlockingStub(channel); 2541 break; 2542 } catch (IOException e) { 2543 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 2544 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 2545 if (e instanceof ServerNotRunningYetException) { 2546 LOG.info("Master isn't available yet, retrying"); 2547 } else { 2548 LOG.warn("Unable to connect to master. Retrying. Error was:", e); 2549 } 2550 previousLogTime = EnvironmentEdgeManager.currentTime(); 2551 } 2552 if (sleepInterrupted(200)) { 2553 interrupted = true; 2554 } 2555 } 2556 } 2557 } finally { 2558 if (interrupted) { 2559 Thread.currentThread().interrupt(); 2560 } 2561 } 2562 this.rssStub = intRssStub; 2563 this.lockStub = intLockStub; 2564 return sn; 2565 } 2566 2567 /** 2568 * @return True if we should break loop because cluster is going down or this server has been 2569 * stopped or hdfs has gone bad. 2570 */ 2571 private boolean keepLooping() { 2572 return !this.stopped && isClusterUp(); 2573 } 2574 2575 /* 2576 * Let the master know we're here Run initialization using parameters passed us by the master. 2577 * @return A Map of key/value configurations we got from the Master else null if we failed to 2578 * register. 
2579 */ 2580 private RegionServerStartupResponse reportForDuty() throws IOException { 2581 if (this.masterless) { 2582 return RegionServerStartupResponse.getDefaultInstance(); 2583 } 2584 ServerName masterServerName = createRegionServerStatusStub(true); 2585 RegionServerStatusService.BlockingInterface rss = rssStub; 2586 if (masterServerName == null || rss == null) { 2587 return null; 2588 } 2589 RegionServerStartupResponse result = null; 2590 try { 2591 rpcServices.requestCount.reset(); 2592 rpcServices.rpcGetRequestCount.reset(); 2593 rpcServices.rpcScanRequestCount.reset(); 2594 rpcServices.rpcFullScanRequestCount.reset(); 2595 rpcServices.rpcMultiRequestCount.reset(); 2596 rpcServices.rpcMutateRequestCount.reset(); 2597 LOG.info("reportForDuty to master=" + masterServerName + " with port=" 2598 + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode); 2599 long now = EnvironmentEdgeManager.currentTime(); 2600 int port = rpcServices.getSocketAddress().getPort(); 2601 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder(); 2602 if (!StringUtils.isBlank(useThisHostnameInstead)) { 2603 request.setUseThisHostnameInstead(useThisHostnameInstead); 2604 } 2605 request.setPort(port); 2606 request.setServerStartCode(this.startcode); 2607 request.setServerCurrentTime(now); 2608 result = rss.regionServerStartup(null, request.build()); 2609 } catch (ServiceException se) { 2610 IOException ioe = ProtobufUtil.getRemoteException(se); 2611 if (ioe instanceof ClockOutOfSyncException) { 2612 LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe); 2613 // Re-throw IOE will cause RS to abort 2614 throw ioe; 2615 } else if (ioe instanceof ServerNotRunningYetException) { 2616 LOG.debug("Master is not running yet"); 2617 } else { 2618 LOG.warn("error telling master we are up", se); 2619 } 2620 rssStub = null; 2621 } 2622 return result; 2623 } 2624 2625 @Override 2626 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { 2627 try { 2628 GetLastFlushedSequenceIdRequest req = 2629 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName); 2630 RegionServerStatusService.BlockingInterface rss = rssStub; 2631 if (rss == null) { // Try to connect one more time 2632 createRegionServerStatusStub(); 2633 rss = rssStub; 2634 if (rss == null) { 2635 // Still no luck, we tried 2636 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id"); 2637 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2638 .build(); 2639 } 2640 } 2641 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req); 2642 return RegionStoreSequenceIds.newBuilder() 2643 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId()) 2644 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build(); 2645 } catch (ServiceException e) { 2646 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e); 2647 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2648 .build(); 2649 } 2650 } 2651 2652 /** 2653 * Close meta region if we carry it 2654 * @param abort Whether we're running an abort. 
2655 */ 2656 private void closeMetaTableRegions(final boolean abort) { 2657 HRegion meta = null; 2658 this.onlineRegionsLock.writeLock().lock(); 2659 try { 2660 for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) { 2661 RegionInfo hri = e.getValue().getRegionInfo(); 2662 if (hri.isMetaRegion()) { 2663 meta = e.getValue(); 2664 } 2665 if (meta != null) { 2666 break; 2667 } 2668 } 2669 } finally { 2670 this.onlineRegionsLock.writeLock().unlock(); 2671 } 2672 if (meta != null) { 2673 closeRegionIgnoreErrors(meta.getRegionInfo(), abort); 2674 } 2675 } 2676 2677 /** 2678 * Schedule closes on all user regions. Should be safe calling multiple times because it wont' 2679 * close regions that are already closed or that are closing. 2680 * @param abort Whether we're running an abort. 2681 */ 2682 private void closeUserRegions(final boolean abort) { 2683 this.onlineRegionsLock.writeLock().lock(); 2684 try { 2685 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 2686 HRegion r = e.getValue(); 2687 if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) { 2688 // Don't update zk with this close transition; pass false. 2689 closeRegionIgnoreErrors(r.getRegionInfo(), abort); 2690 } 2691 } 2692 } finally { 2693 this.onlineRegionsLock.writeLock().unlock(); 2694 } 2695 } 2696 2697 protected Map<String, HRegion> getOnlineRegions() { 2698 return this.onlineRegions; 2699 } 2700 2701 public int getNumberOfOnlineRegions() { 2702 return this.onlineRegions.size(); 2703 } 2704 2705 /** 2706 * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM 2707 * as client; HRegion cannot be serialized to cross an rpc. 2708 */ 2709 public Collection<HRegion> getOnlineRegionsLocalContext() { 2710 Collection<HRegion> regions = this.onlineRegions.values(); 2711 return Collections.unmodifiableCollection(regions); 2712 } 2713 2714 @Override 2715 public void addRegion(HRegion region) { 2716 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); 2717 configurationManager.registerObserver(region); 2718 } 2719 2720 private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region, 2721 long size) { 2722 if (!sortedRegions.containsKey(size)) { 2723 sortedRegions.put(size, new ArrayList<>()); 2724 } 2725 sortedRegions.get(size).add(region); 2726 } 2727 2728 /** 2729 * @return A new Map of online regions sorted by region off-heap size with the first entry being 2730 * the biggest. 2731 */ 2732 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() { 2733 // we'll sort the regions in reverse 2734 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2735 // Copy over all regions. Regions are sorted by size with biggest first. 2736 for (HRegion region : this.onlineRegions.values()) { 2737 addRegion(sortedRegions, region, region.getMemStoreOffHeapSize()); 2738 } 2739 return sortedRegions; 2740 } 2741 2742 /** 2743 * @return A new Map of online regions sorted by region heap size with the first entry being the 2744 * biggest. 2745 */ 2746 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() { 2747 // we'll sort the regions in reverse 2748 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2749 // Copy over all regions. Regions are sorted by size with biggest first. 
2750 for (HRegion region : this.onlineRegions.values()) { 2751 addRegion(sortedRegions, region, region.getMemStoreHeapSize()); 2752 } 2753 return sortedRegions; 2754 } 2755 2756 /** Returns reference to FlushRequester */ 2757 @Override 2758 public FlushRequester getFlushRequester() { 2759 return this.cacheFlusher; 2760 } 2761 2762 @Override 2763 public CompactionRequester getCompactionRequestor() { 2764 return this.compactSplitThread; 2765 } 2766 2767 @Override 2768 public LeaseManager getLeaseManager() { 2769 return leaseManager; 2770 } 2771 2772 /** Returns {@code true} when the data file system is available, {@code false} otherwise. */ 2773 boolean isDataFileSystemOk() { 2774 return this.dataFsOk; 2775 } 2776 2777 public RegionServerCoprocessorHost getRegionServerCoprocessorHost() { 2778 return this.rsHost; 2779 } 2780 2781 @Override 2782 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() { 2783 return this.regionsInTransitionInRS; 2784 } 2785 2786 @Override 2787 public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { 2788 return rsQuotaManager; 2789 } 2790 2791 // 2792 // Main program and support routines 2793 // 2794 /** 2795 * Load the replication executorService objects, if any 2796 */ 2797 private static void createNewReplicationInstance(Configuration conf, HRegionServer server, 2798 FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException { 2799 // read in the name of the source replication class from the config file. 2800 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, 2801 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 2802 2803 // read in the name of the sink replication class from the config file. 2804 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, 2805 HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT); 2806 2807 // If both the sink and the source class names are the same, then instantiate 2808 // only one object. 2809 if (sourceClassname.equals(sinkClassname)) { 2810 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 2811 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2812 server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; 2813 server.sameReplicationSourceAndSink = true; 2814 } else { 2815 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 2816 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2817 server.replicationSinkHandler = newReplicationInstance(sinkClassname, 2818 ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2819 server.sameReplicationSourceAndSink = false; 2820 } 2821 } 2822 2823 private static <T extends ReplicationService> T newReplicationInstance(String classname, 2824 Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, 2825 Path oldLogDir, WALFactory walFactory) throws IOException { 2826 final Class<? 
extends T> clazz; 2827 try { 2828 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 2829 clazz = Class.forName(classname, true, classLoader).asSubclass(xface); 2830 } catch (java.lang.ClassNotFoundException nfe) { 2831 throw new IOException("Could not find class for " + classname); 2832 } 2833 T service = ReflectionUtils.newInstance(clazz, conf); 2834 service.initialize(server, walFs, logDir, oldLogDir, walFactory); 2835 return service; 2836 } 2837 2838 public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() { 2839 Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>(); 2840 if (!this.isOnline()) { 2841 return walGroupsReplicationStatus; 2842 } 2843 List<ReplicationSourceInterface> allSources = new ArrayList<>(); 2844 allSources.addAll(replicationSourceHandler.getReplicationManager().getSources()); 2845 allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources()); 2846 for (ReplicationSourceInterface source : allSources) { 2847 walGroupsReplicationStatus.putAll(source.getWalGroupStatus()); 2848 } 2849 return walGroupsReplicationStatus; 2850 } 2851 2852 /** 2853 * Utility for constructing an instance of the passed HRegionServer class. 2854 */ 2855 static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass, 2856 final Configuration conf) { 2857 try { 2858 Constructor<? extends HRegionServer> c = 2859 regionServerClass.getConstructor(Configuration.class); 2860 return c.newInstance(conf); 2861 } catch (Exception e) { 2862 throw new RuntimeException( 2863 "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e); 2864 } 2865 } 2866 2867 /** 2868 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine 2869 */ 2870 public static void main(String[] args) { 2871 LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); 2872 VersionInfo.logVersion(); 2873 Configuration conf = HBaseConfiguration.create(); 2874 @SuppressWarnings("unchecked") 2875 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf 2876 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); 2877 2878 new HRegionServerCommandLine(regionServerClass).doMain(args); 2879 } 2880 2881 /** 2882 * Gets the online regions of the specified table. This method looks at the in-memory 2883 * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions. 2884 * If a region on this table has been closed during a disable, etc., it will not be included in 2885 * the returned list. So, the returned list is not necessarily ALL the regions in this table; it is 2886 * only the ONLINE regions in the table.
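 * <p>
 * A rough usage sketch (the {@code regionServer} reference and the table name below are
 * illustrative only, not taken from any caller in this class):
 * </p>
 * <pre>{@code
 * TableName tn = TableName.valueOf("t1");
 * List<HRegion> online = regionServer.getRegions(tn);
 * LOG.debug("{} online region(s) currently carried for {}", online.size(), tn);
 * }</pre>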
2887 * @param tableName table to limit the scope of the query 2888 * @return Online regions from <code>tableName</code> 2889 */ 2890 @Override 2891 public List<HRegion> getRegions(TableName tableName) { 2892 List<HRegion> tableRegions = new ArrayList<>(); 2893 synchronized (this.onlineRegions) { 2894 for (HRegion region : this.onlineRegions.values()) { 2895 RegionInfo regionInfo = region.getRegionInfo(); 2896 if (regionInfo.getTable().equals(tableName)) { 2897 tableRegions.add(region); 2898 } 2899 } 2900 } 2901 return tableRegions; 2902 } 2903 2904 @Override 2905 public List<HRegion> getRegions() { 2906 List<HRegion> allRegions; 2907 synchronized (this.onlineRegions) { 2908 // Return a clone copy of the onlineRegions 2909 allRegions = new ArrayList<>(onlineRegions.values()); 2910 } 2911 return allRegions; 2912 } 2913 2914 /** 2915 * Gets the online tables in this RS. This method looks at the in-memory onlineRegions. 2916 * @return all the online tables in this RS 2917 */ 2918 public Set<TableName> getOnlineTables() { 2919 Set<TableName> tables = new HashSet<>(); 2920 synchronized (this.onlineRegions) { 2921 for (Region region : this.onlineRegions.values()) { 2922 tables.add(region.getTableDescriptor().getTableName()); 2923 } 2924 } 2925 return tables; 2926 } 2927 2928 public String[] getRegionServerCoprocessors() { 2929 TreeSet<String> coprocessors = new TreeSet<>(); 2930 try { 2931 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors()); 2932 } catch (IOException exception) { 2933 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " 2934 + "skipping."); 2935 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 2936 } 2937 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 2938 for (HRegion region : regions) { 2939 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors()); 2940 try { 2941 coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()); 2942 } catch (IOException exception) { 2943 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region 2944 + "; skipping."); 2945 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 2946 } 2947 } 2948 coprocessors.addAll(rsHost.getCoprocessors()); 2949 return coprocessors.toArray(new String[0]); 2950 } 2951 2952 /** 2953 * Try to close the region, logs a warning on failure but continues. 2954 * @param region Region to close 2955 */ 2956 private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) { 2957 try { 2958 if (!closeRegion(region.getEncodedName(), abort, null)) { 2959 LOG 2960 .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing"); 2961 } 2962 } catch (IOException e) { 2963 LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing", 2964 e); 2965 } 2966 } 2967 2968 /** 2969 * Close asynchronously a region, can be called from the master or internally by the regionserver 2970 * when stopping. If called from the master, the region will update the status. 2971 * <p> 2972 * If an opening was in progress, this method will cancel it, but will not start a new close. The 2973 * coprocessors are not called in this case. A NotServingRegionException exception is thrown. 2974 * </p> 2975 * <p> 2976 * If a close was in progress, this new request will be ignored, and an exception thrown. 
2977 * </p> 2978 * <p> 2979 * Provides an additional flag to indicate whether this region's blocks should be evicted from the cache. 2980 * </p> 2981 * @param encodedName Region to close 2982 * @param abort True if we are aborting 2983 * @param destination Where the Region is being moved to; may be null if unknown. 2984 * @return True if a region was closed. 2985 * @throws NotServingRegionException if the region is not online 2986 */ 2987 protected boolean closeRegion(String encodedName, final boolean abort, 2988 final ServerName destination) throws NotServingRegionException { 2989 // Check for permissions to close. 2990 HRegion actualRegion = this.getRegion(encodedName); 2991 // Can be null if we're calling close on a region that's not online 2992 if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) { 2993 try { 2994 actualRegion.getCoprocessorHost().preClose(false); 2995 } catch (IOException exp) { 2996 LOG.warn("Unable to close region: the coprocessor launched an error ", exp); 2997 return false; 2998 } 2999 } 3000 3001 // previous can come back 'null' if not in map. 3002 final Boolean previous = 3003 this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE); 3004 3005 if (Boolean.TRUE.equals(previous)) { 3006 LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " 3007 + "trying to OPEN. Cancelling OPENING."); 3008 if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) { 3009 // The replace failed. That should be an exceptional case, but theoretically it can happen. 3010 // We're going to try to do a standard close then. 3011 LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." 3012 + " Doing a standard close now"); 3013 return closeRegion(encodedName, abort, destination); 3014 } 3015 // Let's get the region from the online region list again 3016 actualRegion = this.getRegion(encodedName); 3017 if (actualRegion == null) { // If already online, we still need to close it. 3018 LOG.info("The opening previously in progress has been cancelled by a CLOSE request."); 3019 // The master deletes the znode when it receives this exception. 3020 throw new NotServingRegionException( 3021 "The region " + encodedName + " was opening but not yet served. Opening is cancelled."); 3022 } 3023 } else if (previous == null) { 3024 LOG.info("Received CLOSE for {}", encodedName); 3025 } else if (Boolean.FALSE.equals(previous)) { 3026 LOG.info("Received CLOSE for the region: " + encodedName 3027 + ", which we are already trying to CLOSE, but not completed yet"); 3028 return true; 3029 } 3030 3031 if (actualRegion == null) { 3032 LOG.debug("Received CLOSE for a region which is not online, and we're not opening."); 3033 this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName)); 3034 // The master deletes the znode when it receives this exception. 3035 throw new NotServingRegionException( 3036 "The region " + encodedName + " is not online, and is not opening."); 3037 } 3038 3039 CloseRegionHandler crh; 3040 final RegionInfo hri = actualRegion.getRegionInfo(); 3041 if (hri.isMetaRegion()) { 3042 crh = new CloseMetaHandler(this, this, hri, abort); 3043 } else { 3044 crh = new CloseRegionHandler(this, this, hri, abort, destination); 3045 } 3046 this.executorService.submit(crh); 3047 return true; 3048 } 3049 3050 /** 3051 * @return HRegion for the passed binary <code>regionName</code> or null if the named region is not 3052 * a member of the online regions.
3053 */ 3054 public HRegion getOnlineRegion(final byte[] regionName) { 3055 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3056 return this.onlineRegions.get(encodedRegionName); 3057 } 3058 3059 @Override 3060 public HRegion getRegion(final String encodedRegionName) { 3061 return this.onlineRegions.get(encodedRegionName); 3062 } 3063 3064 @Override 3065 public boolean removeRegion(final HRegion r, ServerName destination) { 3066 HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); 3067 metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName()); 3068 if (destination != null) { 3069 long closeSeqNum = r.getMaxFlushedSeqId(); 3070 if (closeSeqNum == HConstants.NO_SEQNUM) { 3071 // No edits in WAL for this region; get the sequence number when the region was opened. 3072 closeSeqNum = r.getOpenSeqNum(); 3073 if (closeSeqNum == HConstants.NO_SEQNUM) { 3074 closeSeqNum = 0; 3075 } 3076 } 3077 boolean selfMove = ServerName.isSameAddress(destination, this.getServerName()); 3078 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove); 3079 if (selfMove) { 3080 this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put( 3081 r.getRegionInfo().getEncodedName(), 3082 new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount())); 3083 } 3084 } 3085 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName()); 3086 configurationManager.deregisterObserver(r); 3087 return toReturn != null; 3088 } 3089 3090 /** 3091 * Protected Utility method for safely obtaining an HRegion handle. 3092 * @param regionName Name of online {@link HRegion} to return 3093 * @return {@link HRegion} for <code>regionName</code> 3094 */ 3095 protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException { 3096 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3097 return getRegionByEncodedName(regionName, encodedRegionName); 3098 } 3099 3100 public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException { 3101 return getRegionByEncodedName(null, encodedRegionName); 3102 } 3103 3104 private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) 3105 throws NotServingRegionException { 3106 HRegion region = this.onlineRegions.get(encodedRegionName); 3107 if (region == null) { 3108 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName); 3109 if (moveInfo != null) { 3110 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum()); 3111 } 3112 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName)); 3113 String regionNameStr = 3114 regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName); 3115 if (isOpening != null && isOpening) { 3116 throw new RegionOpeningException( 3117 "Region " + regionNameStr + " is opening on " + this.serverName); 3118 } 3119 throw new NotServingRegionException( 3120 "" + regionNameStr + " is not online on " + this.serverName); 3121 } 3122 return region; 3123 } 3124 3125 /** 3126 * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't 3127 * already. 3128 * @param t Throwable 3129 * @param msg Message to log in error. Can be null. 3130 * @return Throwable converted to an IOE; methods can only let out IOEs. 3131 */ 3132 private Throwable cleanup(final Throwable t, final String msg) { 3133 // Don't log as error if NSRE; NSRE is 'normal' operation. 
3134 if (t instanceof NotServingRegionException) { 3135 LOG.debug("NotServingRegionException; " + t.getMessage()); 3136 return t; 3137 } 3138 Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t; 3139 if (msg == null) { 3140 LOG.error("", e); 3141 } else { 3142 LOG.error(msg, e); 3143 } 3144 if (!rpcServices.checkOOME(t)) { 3145 checkFileSystem(); 3146 } 3147 return t; 3148 } 3149 3150 /** 3151 * @param msg Message to put in the new IOE if the passed <code>t</code> is not an IOE 3152 * @return <code>t</code> converted to an IOE if it isn't one already. 3153 */ 3154 private IOException convertThrowableToIOE(final Throwable t, final String msg) { 3155 return (t instanceof IOException ? (IOException) t 3156 : msg == null || msg.length() == 0 ? new IOException(t) 3157 : new IOException(msg, t)); 3158 } 3159 3160 /** 3161 * Checks to see if the file system is still accessible. If not, sets abortRequested and 3162 * stopRequested. 3163 * @return false if the file system is not available 3164 */ 3165 boolean checkFileSystem() { 3166 if (this.dataFsOk && this.dataFs != null) { 3167 try { 3168 FSUtils.checkFileSystemAvailable(this.dataFs); 3169 } catch (IOException e) { 3170 abort("File System not available", e); 3171 this.dataFsOk = false; 3172 } 3173 } 3174 return this.dataFsOk; 3175 } 3176 3177 @Override 3178 public void updateRegionFavoredNodesMapping(String encodedRegionName, 3179 List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) { 3180 Address[] addr = new Address[favoredNodes.size()]; 3181 // Refer to the comment on the declaration of regionFavoredNodesMap on why 3182 // it is a map of region name to Address[] 3183 for (int i = 0; i < favoredNodes.size(); i++) { 3184 addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort()); 3185 } 3186 regionFavoredNodesMap.put(encodedRegionName, addr); 3187 } 3188 3189 /** 3190 * Return the favored nodes for a region given its encoded name. Look at the comment around 3191 * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here. 3192 * @param encodedRegionName the encoded region name. 3193 * @return array of favored locations 3194 */ 3195 @Override 3196 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) { 3197 return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName)); 3198 } 3199 3200 @Override 3201 public ServerNonceManager getNonceManager() { 3202 return this.nonceManager; 3203 } 3204 3205 private static class MovedRegionInfo { 3206 private final ServerName serverName; 3207 private final long seqNum; 3208 3209 MovedRegionInfo(ServerName serverName, long closeSeqNum) { 3210 this.serverName = serverName; 3211 this.seqNum = closeSeqNum; 3212 } 3213 3214 public ServerName getServerName() { 3215 return serverName; 3216 } 3217 3218 public long getSeqNum() { 3219 return seqNum; 3220 } 3221 } 3222 3223 /** 3224 * We need a timeout. If not, there is a risk of giving wrong information: this would double the 3225 * number of network calls instead of reducing them.
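 * <p>
 * Simplified sketch of how the cache is consulted when a request arrives for a region that is no
 * longer online (this mirrors, but does not verbatim copy, {@code getRegionByEncodedName}):
 * </p>
 * <pre>{@code
 * MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
 * if (moveInfo != null) {
 *   // Point the client at the new location so it can refresh its cache and retry there.
 *   throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
 * }
 * }</pre>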
3226 */ 3227 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000); 3228 3229 private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum, 3230 boolean selfMove) { 3231 if (selfMove) { 3232 LOG.warn("Not adding moved region record: " + encodedName + " to self."); 3233 return; 3234 } 3235 LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" 3236 + closeSeqNum); 3237 movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum)); 3238 } 3239 3240 void removeFromMovedRegions(String encodedName) { 3241 movedRegionInfoCache.invalidate(encodedName); 3242 } 3243 3244 @InterfaceAudience.Private 3245 public MovedRegionInfo getMovedRegion(String encodedRegionName) { 3246 return movedRegionInfoCache.getIfPresent(encodedRegionName); 3247 } 3248 3249 @InterfaceAudience.Private 3250 public int movedRegionCacheExpiredTime() { 3251 return TIMEOUT_REGION_MOVED; 3252 } 3253 3254 private String getMyEphemeralNodePath() { 3255 return zooKeeper.getZNodePaths().getRsPath(serverName); 3256 } 3257 3258 private boolean isHealthCheckerConfigured() { 3259 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC); 3260 return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation); 3261 } 3262 3263 /** Returns the underlying {@link CompactSplit} for this server */ 3264 public CompactSplit getCompactSplitThread() { 3265 return this.compactSplitThread; 3266 } 3267 3268 CoprocessorServiceResponse execRegionServerService( 3269 @SuppressWarnings("UnusedParameters") final RpcController controller, 3270 final CoprocessorServiceRequest serviceRequest) throws ServiceException { 3271 try { 3272 ServerRpcController serviceController = new ServerRpcController(); 3273 CoprocessorServiceCall call = serviceRequest.getCall(); 3274 String serviceName = call.getServiceName(); 3275 Service service = coprocessorServiceHandlers.get(serviceName); 3276 if (service == null) { 3277 throw new UnknownProtocolException(null, 3278 "No registered coprocessor executorService found for " + serviceName); 3279 } 3280 ServiceDescriptor serviceDesc = service.getDescriptorForType(); 3281 3282 String methodName = call.getMethodName(); 3283 MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName); 3284 if (methodDesc == null) { 3285 throw new UnknownProtocolException(service.getClass(), 3286 "Unknown method " + methodName + " called on executorService " + serviceName); 3287 } 3288 3289 Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest()); 3290 final Message.Builder responseBuilder = 3291 service.getResponsePrototype(methodDesc).newBuilderForType(); 3292 service.callMethod(methodDesc, serviceController, request, message -> { 3293 if (message != null) { 3294 responseBuilder.mergeFrom(message); 3295 } 3296 }); 3297 IOException exception = CoprocessorRpcUtils.getControllerException(serviceController); 3298 if (exception != null) { 3299 throw exception; 3300 } 3301 return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY); 3302 } catch (IOException ie) { 3303 throw new ServiceException(ie); 3304 } 3305 } 3306 3307 /** 3308 * May be empty if this is a master which does not carry tables. 3309 * @return The block cache instance used by the regionserver.
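 *         Callers must handle the empty case; a minimal hedged sketch (the {@code rs} and
 *         {@code storeFileName} names are illustrative only):
 *         <pre>{@code
 * rs.getBlockCache().ifPresent(cache -> cache.evictBlocksByHfileName(storeFileName));
 * }</pre>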
3310 */ 3311 @Override 3312 public Optional<BlockCache> getBlockCache() { 3313 return Optional.ofNullable(this.blockCache); 3314 } 3315 3316 /** 3317 * May be empty if this is a master which does not carry tables. 3318 * @return The cache for mob files used by the regionserver. 3319 */ 3320 @Override 3321 public Optional<MobFileCache> getMobFileCache() { 3322 return Optional.ofNullable(this.mobFileCache); 3323 } 3324 3325 CacheEvictionStats clearRegionBlockCache(Region region) { 3326 long evictedBlocks = 0; 3327 3328 for (Store store : region.getStores()) { 3329 for (StoreFile hFile : store.getStorefiles()) { 3330 evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName()); 3331 } 3332 } 3333 3334 return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build(); 3335 } 3336 3337 @Override 3338 public double getCompactionPressure() { 3339 double max = 0; 3340 for (Region region : onlineRegions.values()) { 3341 for (Store store : region.getStores()) { 3342 double normCount = store.getCompactionPressure(); 3343 if (normCount > max) { 3344 max = normCount; 3345 } 3346 } 3347 } 3348 return max; 3349 } 3350 3351 @Override 3352 public HeapMemoryManager getHeapMemoryManager() { 3353 return hMemManager; 3354 } 3355 3356 public MemStoreFlusher getMemStoreFlusher() { 3357 return cacheFlusher; 3358 } 3359 3360 /** 3361 * For testing. 3362 * @return whether all WAL roll requests have finished for this regionserver 3363 */ 3364 @InterfaceAudience.Private 3365 public boolean walRollRequestFinished() { 3366 return this.walRoller.walRollFinished(); 3367 } 3368 3369 @Override 3370 public ThroughputController getFlushThroughputController() { 3371 return flushThroughputController; 3372 } 3373 3374 @Override 3375 public double getFlushPressure() { 3376 if (getRegionServerAccounting() == null || cacheFlusher == null) { 3377 // return 0 during RS initialization 3378 return 0.0; 3379 } 3380 return getRegionServerAccounting().getFlushPressure(); 3381 } 3382 3383 @Override 3384 public void onConfigurationChange(Configuration newConf) { 3385 ThroughputController old = this.flushThroughputController; 3386 if (old != null) { 3387 old.stop("configuration change"); 3388 } 3389 this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf); 3390 try { 3391 Superusers.initialize(newConf); 3392 } catch (IOException e) { 3393 LOG.warn("Failed to initialize SuperUsers on reloading of the configuration"); 3394 } 3395 3396 // Update region server coprocessors if the configuration has changed.
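// Only a change to CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY triggers the rebuild below; other changed keys leave the already-loaded coprocessors untouched.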
3397 if ( 3398 CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf, 3399 CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY) 3400 ) { 3401 LOG.info("Update region server coprocessors because the configuration has changed"); 3402 this.rsHost = new RegionServerCoprocessorHost(this, newConf); 3403 } 3404 } 3405 3406 @Override 3407 public MetricsRegionServer getMetrics() { 3408 return metricsRegionServer; 3409 } 3410 3411 @Override 3412 public SecureBulkLoadManager getSecureBulkLoadManager() { 3413 return this.secureBulkLoadManager; 3414 } 3415 3416 @Override 3417 public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description, 3418 final Abortable abort) { 3419 final LockServiceClient client = 3420 new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator()); 3421 return client.regionLock(regionInfo, description, abort); 3422 } 3423 3424 @Override 3425 public void unassign(byte[] regionName) throws IOException { 3426 FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false)); 3427 } 3428 3429 @Override 3430 public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() { 3431 return this.rsSpaceQuotaManager; 3432 } 3433 3434 @Override 3435 public boolean reportFileArchivalForQuotas(TableName tableName, 3436 Collection<Entry<String, Long>> archivedFiles) { 3437 if (TEST_SKIP_REPORTING_TRANSITION) { 3438 return false; 3439 } 3440 RegionServerStatusService.BlockingInterface rss = rssStub; 3441 if (rss == null || rsSpaceQuotaManager == null) { 3442 // the current server could be stopping. 3443 LOG.trace("Skipping file archival reporting to HMaster as stub is null"); 3444 return false; 3445 } 3446 try { 3447 RegionServerStatusProtos.FileArchiveNotificationRequest request = 3448 rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles); 3449 rss.reportFileArchival(null, request); 3450 } catch (ServiceException se) { 3451 IOException ioe = ProtobufUtil.getRemoteException(se); 3452 if (ioe instanceof PleaseHoldException) { 3453 if (LOG.isTraceEnabled()) { 3454 LOG.trace("Failed to report file archival(s) to Master because it is initializing." 3455 + " This will be retried.", ioe); 3456 } 3457 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 3458 return false; 3459 } 3460 if (rssStub == rss) { 3461 rssStub = null; 3462 } 3463 // re-create the stub if we failed to report the archival 3464 createRegionServerStatusStub(true); 3465 LOG.debug("Failed to report file archival(s) to Master. 
This will be retried.", ioe); 3466 return false; 3467 } 3468 return true; 3469 } 3470 3471 void executeProcedure(long procId, RSProcedureCallable callable) { 3472 executorService.submit(new RSProcedureHandler(this, procId, callable)); 3473 } 3474 3475 public void remoteProcedureComplete(long procId, Throwable error) { 3476 procedureResultReporter.complete(procId, error); 3477 } 3478 3479 void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException { 3480 RegionServerStatusService.BlockingInterface rss; 3481 // TODO: juggling class state with an instance variable, outside of a synchronized block :'( 3482 for (;;) { 3483 rss = rssStub; 3484 if (rss != null) { 3485 break; 3486 } 3487 createRegionServerStatusStub(); 3488 } 3489 try { 3490 rss.reportProcedureDone(null, request); 3491 } catch (ServiceException se) { 3492 if (rssStub == rss) { 3493 rssStub = null; 3494 } 3495 throw ProtobufUtil.getRemoteException(se); 3496 } 3497 } 3498 3499 /** 3500 * Will ignore the open/close region procedures which were already submitted or executed. When the master 3501 * had an unfinished open/close region procedure and restarted, the new active master may send a duplicate 3502 * open/close region request to the regionserver. The open/close request is submitted to a thread pool 3503 * and executed, so we first need a cache of submitted open/close region procedures. After the 3504 * open/close region request has executed and the region transition has been reported successfully, it is cached in the executed 3505 * region procedures cache. See {@link #finishRegionProcedure(long)}. After the region 3506 * transition is reported successfully, the master will not send the open/close region request to the regionserver again. 3507 * We assume that an ongoing duplicate open/close region request will not be delayed by more 3508 * than 600 seconds, so the executed region procedures cache will expire after 600 seconds. See 3509 * HBASE-22404 for more details. (An illustrative sketch of the expected call order appears in a comment at the end of this file.) 3510 * @param procId the id of the open/close region procedure 3511 * @return true if the procedure can be submitted. 3512 */ 3513 boolean submitRegionProcedure(long procId) { 3514 if (procId == -1) { 3515 return true; 3516 } 3517 // Ignore the region procedures which were already submitted. 3518 Long previous = submittedRegionProcedures.putIfAbsent(procId, procId); 3519 if (previous != null) { 3520 LOG.warn("Received procedure pid={}, which was already submitted, just ignore it", procId); 3521 return false; 3522 } 3523 // Ignore the region procedures which were already executed. 3524 if (executedRegionProcedures.getIfPresent(procId) != null) { 3525 LOG.warn("Received procedure pid={}, which was already executed, just ignore it", procId); 3526 return false; 3527 } 3528 return true; 3529 } 3530 3531 /** 3532 * See {@link #submitRegionProcedure(long)}. 3533 * @param procId the id of the open/close region procedure 3534 */ 3535 public void finishRegionProcedure(long procId) { 3536 executedRegionProcedures.put(procId, procId); 3537 submittedRegionProcedures.remove(procId); 3538 } 3539 3540 /** 3541 * Forcibly terminate the region server when an abort times out. 3542 */ 3543 private static class SystemExitWhenAbortTimeout extends TimerTask { 3544 3545 public SystemExitWhenAbortTimeout() { 3546 } 3547 3548 @Override 3549 public void run() { 3550 LOG.warn("Aborting region server timed out, terminating forcibly" 3551 + " and will not wait for any running shutdown hooks or finalizers to finish their work."
3552 + " Thread dump to stdout."); 3553 Threads.printThreadInfo(System.out, "Zombie HRegionServer"); 3554 Runtime.getRuntime().halt(1); 3555 } 3556 } 3557 3558 @InterfaceAudience.Private 3559 public CompactedHFilesDischarger getCompactedHFilesDischarger() { 3560 return compactedFileDischarger; 3561 } 3562 3563 /** 3564 * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}} 3565 * @return pause time 3566 */ 3567 @InterfaceAudience.Private 3568 public long getRetryPauseTime() { 3569 return this.retryPauseTime; 3570 } 3571 3572 @Override 3573 public Optional<ServerName> getActiveMaster() { 3574 return Optional.ofNullable(masterAddressTracker.getMasterAddress()); 3575 } 3576 3577 @Override 3578 public List<ServerName> getBackupMasters() { 3579 return masterAddressTracker.getBackupMasters(); 3580 } 3581 3582 @Override 3583 public Iterator<ServerName> getBootstrapNodes() { 3584 return bootstrapNodeManager.getBootstrapNodes().iterator(); 3585 } 3586 3587 @Override 3588 public List<HRegionLocation> getMetaLocations() { 3589 return metaRegionLocationCache.getMetaRegionLocations(); 3590 } 3591 3592 @Override 3593 protected NamedQueueRecorder createNamedQueueRecord() { 3594 return NamedQueueRecorder.getInstance(conf); 3595 } 3596 3597 @Override 3598 protected boolean clusterMode() { 3599 // this method will be called in the constructor of super class, so we can not return masterless 3600 // directly here, as it will always be false. 3601 return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false); 3602 } 3603 3604 @InterfaceAudience.Private 3605 public BrokenStoreFileCleaner getBrokenStoreFileCleaner() { 3606 return brokenStoreFileCleaner; 3607 } 3608 3609 @InterfaceAudience.Private 3610 public RSMobFileCleanerChore getRSMobFileCleanerChore() { 3611 return rsMobFileCleanerChore; 3612 } 3613 3614 RSSnapshotVerifier getRsSnapshotVerifier() { 3615 return rsSnapshotVerifier; 3616 } 3617 3618 @Override 3619 protected void stopChores() { 3620 shutdownChore(nonceManagerChore); 3621 shutdownChore(compactionChecker); 3622 shutdownChore(compactedFileDischarger); 3623 shutdownChore(periodicFlusher); 3624 shutdownChore(healthCheckChore); 3625 shutdownChore(executorStatusChore); 3626 shutdownChore(storefileRefresher); 3627 shutdownChore(fsUtilizationChore); 3628 shutdownChore(namedQueueServiceChore); 3629 shutdownChore(brokenStoreFileCleaner); 3630 shutdownChore(rsMobFileCleanerChore); 3631 shutdownChore(replicationMarkerChore); 3632 } 3633 3634 @Override 3635 public RegionReplicationBufferManager getRegionReplicationBufferManager() { 3636 return regionReplicationBufferManager; 3637 } 3638}