/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExecutorStatusChore;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseServerBase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionSize;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
 * are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation" })
public class HRegionServer extends HBaseServerBase<RSRpcServices>
  implements RegionServerServices, LastSequenceId {

  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

  /**
   * For testing only! Set to true to skip notifying region assignment to the master.
   */
  @InterfaceAudience.Private
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  /**
   * A map from RegionName to the current action in progress. The Boolean value indicates:
   * true - an open region action is in progress, false - a close region action is in progress.
   */
  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * Used to cache the open/close region procedures which have already been submitted. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
  /**
   * Used to cache the open/close region procedures which have already been executed. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

  /**
   * Used to cache the moved-out regions
   */
  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();

  private MemStoreFlusher cacheFlusher;

  private HeapMemoryManager hMemManager;

  // Replication services. If no replication, this handler will be null.
  private ReplicationSourceService replicationSourceHandler;
  private ReplicationSinkService replicationSinkHandler;
  private boolean sameReplicationSourceAndSink;

  // Compactions
  private CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the encoded region name.
   * All access should be synchronized.
   */
  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
  /**
   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does
   * it need to be a ConcurrentHashMap?
   */
  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS
   * API call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers
   * and here we really mean DataNode locations. We don't store it as InetSocketAddress here because
   * the conversion on demand from Address to InetSocketAddress will guarantee the resolution
   * results will be fresh when we need it.
   */
  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();

  private LeaseManager leaseManager;

  private volatile boolean dataFsOk;

  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds, to be safe
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Will run this task when the abort timeout is reached
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;
  private volatile boolean killed = false;

  private final int threadWakeFrequency;

  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  private RpcClient rpcClient;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  private JvmPauseMonitor pauseMonitor;

  private RSSnapshotVerifier rsSnapshotVerifier;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  private MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;

  /**
   * Check for compaction requests.
   */
  private ScheduledChore compactionChecker;

  /**
   * Check for flushes.
   */
  private ScheduledChore periodicFlusher;

  private volatile WALFactory walFactory;

  private LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // master address tracker
  private final MasterAddressTracker masterAddressTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  private final int shortOperationTimeout;

  // Time to pause if master says 'please hold'
  private final long retryPauseTime;

  private final RegionServerAccounting regionServerAccounting;

  private SlowLogTableOpsChore slowLogTableOpsChore = null;

  // Block cache
  private BlockCache blockCache;
  // The cache for mob files
  private MobFileCache mobFileCache;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The Executor status collect chore. */
  private ExecutorStatusChore executorStatusChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
   */
  @Deprecated
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.regionserver.hostname.disable.master.reversedns";

  /**
   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
   * An exception will be thrown if both are used.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";

  /**
   * Unique identifier for the cluster we are a part of.
   */
  private String clusterId;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private volatile RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
   * case where the client doesn't receive the response from a successful operation and retries. We
   * track the successful ops for some time via a nonce sent by the client and handle duplicate
   * operations (currently, by failing them; in the future we might use MVCC to return the result).
   * Nonces are also recovered from WAL during recovery; however, the caveats (from HBASE-3787) are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth of
   * past records. If we don't read the records, we don't read and recover the nonces. Some WALs
   * within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
   * We can have a separate additional "Nonce WAL". It will just contain a bunch of numbers and
   * won't be flushed on the main path - because the WAL itself also contains nonces, if we only
   * flush it before the memstore flush, for a given nonce we will either see it in the WAL (if it
   * was never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
   * log (or both occasionally, which doesn't matter). The nonce log file can be deleted after the
   * latest nonce in it expired. It can also be recovered during move.
   */
  final ServerNonceManager nonceManager;

  private BrokenStoreFileCleaner brokenStoreFileCleaner;

  private RSMobFileCleanerChore rsMobFileCleanerChore;

  @InterfaceAudience.Private
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  private SecureBulkLoadManager secureBulkLoadManager;

  private FileSystemUtilizationChore fsUtilizationChore;

  private BootstrapNodeManager bootstrapNodeManager;

  /**
   * True if this RegionServer is coming up in a cluster where there is no Master; means it needs
   * to just come up and make do without a Master to talk to: e.g. in test or HRegionServer is
   * doing other than its usual duties: e.g. as a hollowed-out host whose only purpose is as a
   * Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
   * {@link #TEST_SKIP_REPORTING_TRANSITION} ?
   */
  private final boolean masterless;
  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /** regionserver codec list **/
  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";

  // A timer to shutdown the process if abort takes too long
  private Timer abortMonitor;

  private RegionReplicationBufferManager regionReplicationBufferManager;

  /**
   * Starts an HRegionServer at the default location.
   * <p/>
   * Don't start any services or managers in here in the constructor. Defer till after we register
   * with the Master as much as possible. See {@link #startServices}.
   */
  public HRegionServer(final Configuration conf) throws IOException {
    super(conf, "RegionServer"); // thread name
    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
    try (Scope ignored = span.makeCurrent()) {
      this.dataFsOk = true;
      this.masterless = !clusterMode();
      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);

      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);

      regionServerAccounting = new RegionServerAccounting(conf);

      blockCache = BlockCacheFactory.createBlockCache(conf);
      mobFileCache = new MobFileCache(conf);

      rsSnapshotVerifier = new RSSnapshotVerifier(conf);

      uncaughtExceptionHandler =
        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);

      // If no master in cluster, skip trying to track one or look for a cluster status.
      if (!this.masterless) {
        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        masterAddressTracker.start();
      } else {
        masterAddressTracker = null;
      }
      this.rpcServices.start(zooKeeper);
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      TraceUtil.setError(span, t);
      LOG.error("Failed construction RegionServer", t);
      throw t;
    } finally {
      span.end();
    }
  }

  // HMaster should override this method to load the specific config for master
  @Override
  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
      if (!StringUtils.isBlank(hostname)) {
        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
          + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
          + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
          + UNSAFE_RS_HOSTNAME_KEY + " is used";
        throw new IOException(msg);
      } else {
        return rpcServices.getSocketAddress().getHostName();
      }
    } else {
      return hostname;
    }
  }

  @Override
  protected void login(UserProvider user, String host) throws IOException {
    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
  }

  @Override
  protected String getProcessName() {
    return REGIONSERVER;
  }

  @Override
  protected boolean canCreateBaseZNode() {
    return !clusterMode();
  }

  @Override
  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  @Override
  protected boolean cacheTableDescriptor() {
    return false;
  }

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  @Override
  protected void configureInfoServer(InfoServer infoServer) {
    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  @Override
  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }

  /**
   * Used by {@link RSDumpServlet} to generate debugging information.
   */
  public void dumpRowLocks(final PrintWriter out) {
    StringBuilder sb = new StringBuilder();
    for (HRegion region : getRegions()) {
      if (region.getLockedRows().size() > 0) {
        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
          sb.setLength(0);
          sb.append(region.getTableDescriptor().getTableName()).append(",")
            .append(region.getRegionInfo().getEncodedName()).append(",");
          sb.append(rowLockContext.toString());
          out.println(sb);
        }
      }
    }
  }

  @Override
  public boolean registerService(Service instance) {
    // No stacking of instances is allowed for a single executorService name
    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor executorService " + serviceName
        + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
    }
    return true;
  }

  /**
   * Run test on configured codecs to make sure supporting libs are in place.
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
    if (codecs == null) {
      return;
    }
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException(
          "Compression codec " + codec + " not supported, aborting RS construction");
      }
    }
  }

  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * All initialization needed before we go register with Master.<br>
   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
   */
  private void preRegistrationInitialization() {
    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
    try (Scope ignored = span.makeCurrent()) {
      initializeZooKeeper();
      setupClusterConnection();
      bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
      // Setup RPC client for master communication
      this.rpcClient = asyncClusterConnection.getRpcClient();
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Call stop if error or process will stick around forever since server
      // puts up non-daemon threads.
      TraceUtil.setError(span, t);
      this.rpcServices.stop();
      abort("Initialization of RS failed. Hence aborting RS.", t);
    } finally {
      span.end();
    }
  }

  /**
   * Bring up connection to zk ensemble, then wait until a master is available for this cluster,
   * and after that, wait until the cluster 'up' flag has been set. This is the order in which the
   * master does things.
   * <p>
   * Finally open long-living server short-circuit connection.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
      justification = "cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Nothing to do in here if no Master in the mix.
    if (this.masterless) {
      return;
    }

    // Create the master address tracker, register with zk, and start it. Then
    // block until a master is available. No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up. Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // If we are HMaster then the cluster id should have already been set.
    if (clusterId == null) {
      // Retrieve clusterId
      // Since cluster status is now up
      // ID should have already been set by HMaster
      try {
        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
        if (clusterId == null) {
          this.abort("Cluster ID has not been set");
        }
        LOG.info("ClusterId : " + clusterId);
      } catch (KeeperException e) {
        this.abort("Failed to retrieve Cluster ID", e);
      }
    }

    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }

  /**
   * Utility method to wait indefinitely on a znode's availability while checking if the region
   * server is shut down
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException if the waiting thread is interrupted
   */
  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
    throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /**
   * @return True if the cluster is up.
   */
  @Override
  public boolean isClusterUp() {
    return this.masterless
      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
  }

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    if (isStopped()) {
      LOG.info("Skipping run; stopped");
      return;
    }
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        installShutdownHook();
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

        // Try and register with the Master; tell it we are here. Break if server is stopped or
        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead
        // and start up all Services. Use RetryCounter to get backoff in case Master is struggling
        // to come up.
        LOG.debug("About to register with Master.");
        TraceUtil.trace(() -> {
          RetryCounterFactory rcf =
            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
          RetryCounter rc = rcf.create();
          while (keepLooping()) {
            RegionServerStartupResponse w = reportForDuty();
            if (w == null) {
              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
              this.sleeper.sleep(sleepTime);
            } else {
              handleReportForDutyResponse(w);
              break;
            }
          }
        }, "HRegionServer.registerWithMaster");
      }

      if (!isStopped() && isHealthy()) {
        TraceUtil.trace(() -> {
          // start the snapshot handler and other procedure handlers,
          // since the server is ready to run
          if (this.rspmHost != null) {
            this.rspmHost.start();
          }
          // Start the Quota Manager
          if (this.rsQuotaManager != null) {
            rsQuotaManager.start(getRpcServer().getScheduler());
          }
          if (this.rsSpaceQuotaManager != null) {
            this.rsSpaceQuotaManager.start();
          }
        }, "HRegionServer.startup");
      }

      // We registered with the Master. Go into run mode.
      long lastMsg = EnvironmentEdgeManager.currentTime();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (onlineRegions.isEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(isAborted());
          } else {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
              // since last time we went around the loop. Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested.get());
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = EnvironmentEdgeManager.currentTime();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = EnvironmentEdgeManager.currentTime();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // for
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }

    final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
    try (Scope ignored = span.makeCurrent()) {
      if (this.leaseManager != null) {
        this.leaseManager.closeAfterLeasesExpire();
      }
      if (this.splitLogWorker != null) {
        splitLogWorker.stop();
      }
      stopInfoServer();
      // Send cache a shutdown.
      if (blockCache != null) {
        blockCache.shutdown();
      }
      if (mobFileCache != null) {
        mobFileCache.shutdown();
      }

      // Send interrupts to wake up threads if sleeping so they notice shutdown.
      // TODO: Should we check they are alive? If OOME could have exited already
      if (this.hMemManager != null) {
        this.hMemManager.stop();
      }
      if (this.cacheFlusher != null) {
        this.cacheFlusher.interruptIfNecessary();
      }
      if (this.compactSplitThread != null) {
        this.compactSplitThread.interruptIfNecessary();
      }

      // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
      if (rspmHost != null) {
        rspmHost.stop(this.abortRequested.get() || this.killed);
      }

      if (this.killed) {
        // Just skip out w/o closing regions. Used when testing.
      } else if (abortRequested.get()) {
        if (this.dataFsOk) {
          closeUserRegions(abortRequested.get()); // Don't leave any open file handles
        }
        LOG.info("aborting server " + this.serverName);
      } else {
        closeUserRegions(abortRequested.get());
        LOG.info("stopping server " + this.serverName);
      }
      regionReplicationBufferManager.stop();
      closeClusterConnection();
      // Closing the compactSplit thread before closing meta regions
      if (!this.killed && containsMetaTableRegions()) {
        if (!abortRequested.get() || this.dataFsOk) {
          if (this.compactSplitThread != null) {
            this.compactSplitThread.join();
            this.compactSplitThread = null;
          }
          closeMetaTableRegions(abortRequested.get());
        }
      }

      if (!this.killed && this.dataFsOk) {
        waitOnAllRegionsToClose(abortRequested.get());
        LOG.info("stopping server " + this.serverName + "; all regions closed.");
      }

      // Stop the quota manager
      if (rsQuotaManager != null) {
        rsQuotaManager.stop();
      }
      if (rsSpaceQuotaManager != null) {
        rsSpaceQuotaManager.stop();
        rsSpaceQuotaManager = null;
      }

      // flag may be changed when closing regions throws exception.
      if (this.dataFsOk) {
        shutdownWAL(!abortRequested.get());
      }

      // Make sure the proxy is down.
      if (this.rssStub != null) {
        this.rssStub = null;
      }
      if (this.lockStub != null) {
        this.lockStub = null;
      }
      if (this.rpcClient != null) {
        this.rpcClient.close();
      }
      if (this.leaseManager != null) {
        this.leaseManager.close();
      }
      if (this.pauseMonitor != null) {
        this.pauseMonitor.stop();
      }

      if (!killed) {
        stopServiceThreads();
      }

      if (this.rpcServices != null) {
        this.rpcServices.stop();
      }

      try {
        deleteMyEphemeralNode();
      } catch (KeeperException.NoNodeException nn) {
        // pass
      } catch (KeeperException e) {
        LOG.warn("Failed deleting my ephemeral node", e);
      }
      // We may have failed to delete the znode at the previous step, but
      // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
      ZNodeClearer.deleteMyEphemeralNodeOnDisk();

      closeZooKeeper();
      closeTableDescriptors();
      LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
      span.setStatus(StatusCode.OK);
    } finally {
      span.end();
    }
  }

  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  private boolean areAllUserRegionsOffline() {
    if (getNumberOfOnlineRegions() > 2) {
      return false;
    }
    boolean allUserRegionsOffline = true;
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      if (!e.getValue().getRegionInfo().isMetaRegion()) {
        allUserRegionsOffline = false;
        break;
      }
    }
    return allUserRegionsOffline;
  }

  /**
   * @return Current write count for all online regions.
   */
  private long getWriteRequestCount() {
    long writeCount = 0;
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      writeCount += e.getValue().getWriteRequestsCount();
    }
    return writeCount;
  }

  @InterfaceAudience.Private
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
    throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport");
    try (Scope ignored = span.makeCurrent()) {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      request.setServer(ProtobufUtil.toServerName(this.serverName));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
      span.setStatus(StatusCode.OK);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        TraceUtil.setError(span, ioe);
        throw ioe;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      TraceUtil.setError(span, se);
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub(true);
    } finally {
      span.end();
    }
  }

  /**
   * Reports the given map of Regions and their size on the filesystem to the active Master.
   * @param regionSizeStore The store containing region sizes
   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
   */
  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      LOG.trace("Skipping Region size report to HMaster as stub is null");
      return true;
    }
    try {
      buildReportAndSend(rss, regionSizeStore);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        LOG.trace("Failed to report region sizes to Master because it is initializing."
          + " This will be retried.", ioe);
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return true;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      createRegionServerStatusStub(true);
      if (ioe instanceof DoNotRetryIOException) {
        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
        if (doNotRetryEx.getCause() != null) {
          Throwable t = doNotRetryEx.getCause();
          if (t instanceof UnsupportedOperationException) {
            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
            return false;
          }
        }
      }
      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
    }
    return true;
  }

  /**
   * Builds the region size report and sends it to the master. Upon successful sending of the
   * report, the region sizes that were sent are marked as sent.
   * @param rss The stub to send to the Master
   * @param regionSizeStore The store containing region sizes
   */
  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
    RegionSizeStore regionSizeStore) throws ServiceException {
    RegionSpaceUseReportRequest request =
      buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
    rss.reportRegionSpaceUse(null, request);
    // Record the number of size reports sent
    if (metricsRegionServer != null) {
      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
    }
  }

  /**
   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
   * @param regionSizes The size in bytes of regions
   * @return The corresponding protocol buffer message.
   */
  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
    }
    return request.build();
  }

  /**
   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf
   * message.
   * @param regionInfo The RegionInfo
   * @param sizeInBytes The size in bytes of the Region
   * @return The protocol buffer
   */
  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
    return RegionSpaceUse.newBuilder()
      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
      .setRegionSize(Objects.requireNonNull(sizeInBytes)).build();
  }

  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
    throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second, and other metrics. As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heart beat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved; additionally the load balancer will be able to take advantage of a more complete
    // history.
    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    long usedMemory = -1L;
    long maxMemory = -1L;
    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
      usedMemory = usage.getUsed();
      maxMemory = usage.getMax();
    }

    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    Builder coprocessorBuilder = Coprocessor.newBuilder();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
    }
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (HRegion region : regions) {
      if (region.getCoprocessorHost() != null) {
        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
        for (String regionCoprocessor : regionCoprocessors) {
          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
        }
      }
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
        .getCoprocessors()) {
        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
      }
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }
    MetricsUserAggregateSource userSource =
      metricsRegionServer.getMetricsUserAggregate().getSource();
    if (userSource != null) {
      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
      }
    }

    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
      // always refresh first to get the latest value
      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
      if (rLoad != null) {
        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
          .getReplicationLoadSourceEntries()) {
          serverLoad.addReplLoadSource(rLS);
        }
      }
    } else {
      if (replicationSourceHandler != null) {
        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
            .getReplicationLoadSourceEntries()) {
            serverLoad.addReplLoadSource(rLS);
          }
        }
      }
      if (replicationSinkHandler != null) {
        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        }
      }
    }

    TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask
      .newBuilder().setDescription(task.getDescription())
      .setStatus(task.getStatus() != null ? task.getStatus() : "")
      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build()));

    return serverLoad.build();
  }

  private String getOnlineRegionsAsPrintableString() {
    StringBuilder sb = new StringBuilder();
    for (Region r : this.onlineRegions.values()) {
      if (sb.length() > 0) {
        sb.append(", ");
      }
      sb.append(r.getRegionInfo().getEncodedName());
    }
    return sb.toString();
  }

  /**
   * Wait on regions close.
   */
  private void waitOnAllRegionsToClose(final boolean abort) {
    // Wait till all regions are closed before going out.
    int lastCount = -1;
    long previousLogTime = 0;
    Set<String> closedRegions = new HashSet<>();
    boolean interrupted = false;
    try {
      while (!onlineRegions.isEmpty()) {
        int count = getNumberOfOnlineRegions();
        // Only print a message if the count of regions has changed.
        if (count != lastCount) {
          // Log every second at most
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            previousLogTime = EnvironmentEdgeManager.currentTime();
            lastCount = count;
            LOG.info("Waiting on " + count + " regions to close");
            // Only print out regions still closing if a small number else will
            // swamp the log.
            if (count < 10 && LOG.isDebugEnabled()) {
              LOG.debug("Online Regions=" + this.onlineRegions);
            }
          }
        }
        // Ensure all user regions have been sent a close. Use this to
        // protect against the case where an open comes in after we start the
        // iterator of onlineRegions to close all user regions.
        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
          RegionInfo hri = e.getValue().getRegionInfo();
          if (
            !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
              && !closedRegions.contains(hri.getEncodedName())
          ) {
            closedRegions.add(hri.getEncodedName());
            // Don't update zk with this close transition; pass false.
            closeRegionIgnoreErrors(hri, abort);
          }
        }
        // No regions in RIT, we could stop waiting now.
        if (this.regionsInTransitionInRS.isEmpty()) {
          if (!onlineRegions.isEmpty()) {
            LOG.info("We were exiting though online regions are not empty,"
              + " because some regions failed closing");
          }
          break;
        } else {
          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
        }
        if (sleepInterrupted(200)) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  private static boolean sleepInterrupted(long millis) {
    boolean interrupted = false;
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while sleeping");
      interrupted = true;
    }
    return interrupted;
  }

  private void shutdownWAL(final boolean close) {
    if (this.walFactory != null) {
      try {
        if (close) {
          walFactory.close();
        } else {
          walFactory.shutdown();
        }
      } catch (Throwable e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.error("Shutdown / close of WAL failed: " + e);
        LOG.debug("Shutdown / close exception details:", e);
      }
    }
  }

  /**
   * Run init. Sets up wal and starts up all server threads.
   * @param c Extra configuration.
   */
  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
    throws IOException {
    try {
      boolean updateRootDir = false;
      for (NameStringPair e : c.getMapEntriesList()) {
        String key = e.getName();
        // The hostname the master sees us as.
        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
          String hostnameFromMasterPOV = e.getValue();
          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
            rpcServices.getSocketAddress().getPort(), this.startcode);
          if (
            !StringUtils.isBlank(useThisHostnameInstead)
              && !hostnameFromMasterPOV.equals(useThisHostnameInstead)
          ) {
            String msg = "Master passed us a different hostname to use; was="
              + this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
            throw new IOException(msg);
          }
          if (
            StringUtils.isBlank(useThisHostnameInstead)
              && !hostnameFromMasterPOV.equals(rpcServices.getSocketAddress().getHostName())
          ) {
            String msg = "Master passed us a different hostname to use; was="
              + rpcServices.getSocketAddress().getHostName() + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
          }
          continue;
        }

        String value = e.getValue();
        if (key.equals(HConstants.HBASE_DIR)) {
          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
            updateRootDir = true;
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("Config from master: " + key + "=" + value);
        }
        this.conf.set(key, value);
      }
      // Set our ephemeral znode up in zookeeper now we have a name.
      createMyEphemeralNode();

      if (updateRootDir) {
        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
        initializeFileSystem();
      }

      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
      // config param for task trackers, but we can piggyback off of it.
      if (this.conf.get("mapreduce.task.attempt.id") == null) {
        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
      }

      // Save it in a file, this will allow to see if we crash
      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

      // This call sets up an initialized replication and WAL. Later we start it up.
      setupWALAndReplication();
      // Init in here rather than in constructor after thread name has been set
      final MetricsTable metricsTable =
        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
      this.metricsRegionServer =
        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
      // Now that we have a metrics source, start the pause monitor
      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
      pauseMonitor.start();

      // There is a rare case where we do NOT want services to start. Check config.
      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
        startServices();
      }
      // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
      // or make sense of it.
1420 startReplicationService(); 1421 1422 // Set up ZK 1423 LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress() 1424 + ", sessionid=0x" 1425 + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); 1426 1427 // Wake up anyone waiting for this server to online 1428 synchronized (online) { 1429 online.set(true); 1430 online.notifyAll(); 1431 } 1432 } catch (Throwable e) { 1433 stop("Failed initialization"); 1434 throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed"); 1435 } finally { 1436 sleeper.skipSleepCycle(); 1437 } 1438 } 1439 1440 private void startHeapMemoryManager() { 1441 if (this.blockCache != null) { 1442 this.hMemManager = 1443 new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting); 1444 this.hMemManager.start(getChoreService()); 1445 } 1446 } 1447 1448 private void createMyEphemeralNode() throws KeeperException { 1449 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder(); 1450 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1); 1451 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo()); 1452 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray()); 1453 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data); 1454 } 1455 1456 private void deleteMyEphemeralNode() throws KeeperException { 1457 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath()); 1458 } 1459 1460 @Override 1461 public RegionServerAccounting getRegionServerAccounting() { 1462 return regionServerAccounting; 1463 } 1464 1465 // Round the size with KB or MB. 1466 // A trick here is that if the sizeInBytes is less than sizeUnit, we will round the size to 1 1467 // instead of 0 if it is not 0, to avoid some schedulers think the region has no data. See 1468 // HBASE-26340 for more details on why this is important. 1469 private static int roundSize(long sizeInByte, int sizeUnit) { 1470 if (sizeInByte == 0) { 1471 return 0; 1472 } else if (sizeInByte < sizeUnit) { 1473 return 1; 1474 } else { 1475 return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE); 1476 } 1477 } 1478 1479 /** 1480 * @param r Region to get RegionLoad for. 1481 * @param regionLoadBldr the RegionLoad.Builder, can be null 1482 * @param regionSpecifier the RegionSpecifier.Builder, can be null 1483 * @return RegionLoad instance. 
1484 */ 1485 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, 1486 RegionSpecifier.Builder regionSpecifier) throws IOException { 1487 byte[] name = r.getRegionInfo().getRegionName(); 1488 int stores = 0; 1489 int storefiles = 0; 1490 int storeRefCount = 0; 1491 int maxCompactedStoreFileRefCount = 0; 1492 long storeUncompressedSize = 0L; 1493 long storefileSize = 0L; 1494 long storefileIndexSize = 0L; 1495 long rootLevelIndexSize = 0L; 1496 long totalStaticIndexSize = 0L; 1497 long totalStaticBloomSize = 0L; 1498 long totalCompactingKVs = 0L; 1499 long currentCompactedKVs = 0L; 1500 List<HStore> storeList = r.getStores(); 1501 stores += storeList.size(); 1502 for (HStore store : storeList) { 1503 storefiles += store.getStorefilesCount(); 1504 int currentStoreRefCount = store.getStoreRefCount(); 1505 storeRefCount += currentStoreRefCount; 1506 int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount(); 1507 maxCompactedStoreFileRefCount = 1508 Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount); 1509 storeUncompressedSize += store.getStoreSizeUncompressed(); 1510 storefileSize += store.getStorefilesSize(); 1511 // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB? 1512 storefileIndexSize += store.getStorefilesRootLevelIndexSize(); 1513 CompactionProgress progress = store.getCompactionProgress(); 1514 if (progress != null) { 1515 totalCompactingKVs += progress.getTotalCompactingKVs(); 1516 currentCompactedKVs += progress.currentCompactedKVs; 1517 } 1518 rootLevelIndexSize += store.getStorefilesRootLevelIndexSize(); 1519 totalStaticIndexSize += store.getTotalStaticIndexSize(); 1520 totalStaticBloomSize += store.getTotalStaticBloomSize(); 1521 } 1522 1523 int unitMB = 1024 * 1024; 1524 int unitKB = 1024; 1525 1526 int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB); 1527 int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB); 1528 int storefileSizeMB = roundSize(storefileSize, unitMB); 1529 int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB); 1530 int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB); 1531 int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB); 1532 int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB); 1533 1534 HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution(); 1535 float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname()); 1536 float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname()); 1537 long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight(); 1538 long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname()); 1539 long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname()); 1540 if (regionLoadBldr == null) { 1541 regionLoadBldr = RegionLoad.newBuilder(); 1542 } 1543 if (regionSpecifier == null) { 1544 regionSpecifier = RegionSpecifier.newBuilder(); 1545 } 1546 1547 regionSpecifier.setType(RegionSpecifierType.REGION_NAME); 1548 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name)); 1549 regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores) 1550 .setStorefiles(storefiles).setStoreRefCount(storeRefCount) 1551 .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount) 1552 .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB) 1553 .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB) 1554 
.setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB) 1555 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB) 1556 .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount()) 1557 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount()) 1558 .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs) 1559 .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality) 1560 .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight) 1561 .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight) 1562 .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState())) 1563 .setLastMajorCompactionTs(r.getOldestHfileTs(true)); 1564 r.setCompleteSequenceId(regionLoadBldr); 1565 return regionLoadBldr.build(); 1566 } 1567 1568 private UserLoad createUserLoad(String user, MetricsUserSource userSource) { 1569 UserLoad.Builder userLoadBldr = UserLoad.newBuilder(); 1570 userLoadBldr.setUserName(user); 1571 userSource.getClientMetrics().values().stream() 1572 .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder() 1573 .setHostName(clientMetrics.getHostName()) 1574 .setWriteRequestsCount(clientMetrics.getWriteRequestsCount()) 1575 .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests()) 1576 .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build()) 1577 .forEach(userLoadBldr::addClientMetrics); 1578 return userLoadBldr.build(); 1579 } 1580 1581 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { 1582 HRegion r = onlineRegions.get(encodedRegionName); 1583 return r != null ? createRegionLoad(r, null, null) : null; 1584 } 1585 1586 /** 1587 * Inner class that runs on a long period checking if regions need compaction. 1588 */ 1589 private static class CompactionChecker extends ScheduledChore { 1590 private final HRegionServer instance; 1591 private final int majorCompactPriority; 1592 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; 1593 // Iteration is 1-based rather than 0-based so we don't check for compaction 1594 // immediately upon region server startup 1595 private long iteration = 1; 1596 1597 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { 1598 super("CompactionChecker", stopper, sleepTime); 1599 this.instance = h; 1600 LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime)); 1601 1602 /* 1603 * MajorCompactPriority is configurable. If not set, the compaction will use default priority. 1604 */ 1605 this.majorCompactPriority = this.instance.conf 1606 .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY); 1607 } 1608 1609 @Override 1610 protected void chore() { 1611 for (Region r : this.instance.onlineRegions.values()) { 1612 // Skip compaction if region is read only 1613 if (r == null || r.isReadOnly()) { 1614 continue; 1615 } 1616 1617 HRegion hr = (HRegion) r; 1618 for (HStore s : hr.stores.values()) { 1619 try { 1620 long multiplier = s.getCompactionCheckMultiplier(); 1621 assert multiplier > 0; 1622 if (iteration % multiplier != 0) { 1623 continue; 1624 } 1625 if (s.needsCompaction()) { 1626 // Queue a compaction. Will recognize if major is needed. 
1627 this.instance.compactSplitThread.requestSystemCompaction(hr, s, 1628 getName() + " requests compaction"); 1629 } else if (s.shouldPerformMajorCompaction()) { 1630 s.triggerMajorCompaction(); 1631 if ( 1632 majorCompactPriority == DEFAULT_PRIORITY 1633 || majorCompactPriority > hr.getCompactPriority() 1634 ) { 1635 this.instance.compactSplitThread.requestCompaction(hr, s, 1636 getName() + " requests major compaction; use default priority", Store.NO_PRIORITY, 1637 CompactionLifeCycleTracker.DUMMY, null); 1638 } else { 1639 this.instance.compactSplitThread.requestCompaction(hr, s, 1640 getName() + " requests major compaction; use configured priority", 1641 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); 1642 } 1643 } 1644 } catch (IOException e) { 1645 LOG.warn("Failed major compaction check on " + r, e); 1646 } 1647 } 1648 } 1649 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); 1650 } 1651 } 1652 1653 private static class PeriodicMemStoreFlusher extends ScheduledChore { 1654 private final HRegionServer server; 1655 private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds 1656 private final static int MIN_DELAY_TIME = 0; // millisec 1657 private final long rangeOfDelayMs; 1658 1659 PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { 1660 super("MemstoreFlusherChore", server, cacheFlushInterval); 1661 this.server = server; 1662 1663 final long configuredRangeOfDelay = server.getConfiguration() 1664 .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY); 1665 this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay); 1666 } 1667 1668 @Override 1669 protected void chore() { 1670 final StringBuilder whyFlush = new StringBuilder(); 1671 for (HRegion r : this.server.onlineRegions.values()) { 1672 if (r == null) { 1673 continue; 1674 } 1675 if (r.shouldFlush(whyFlush)) { 1676 FlushRequester requester = server.getFlushRequester(); 1677 if (requester != null) { 1678 long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME; 1679 // Throttle the flushes by putting a delay. If we don't throttle, and there 1680 // is a balanced write-load on the regions in a table, we might end up 1681 // overwhelming the filesystem with too many flushes at once. 1682 if (requester.requestDelayedFlush(r, delay)) { 1683 LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(), 1684 r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay); 1685 } 1686 } 1687 } 1688 } 1689 } 1690 } 1691 1692 /** 1693 * Report the status of the server. A server is online once all the startup is completed (setting 1694 * up filesystem, starting executorService threads, etc.). This method is designed mostly to be 1695 * useful in tests. 1696 * @return true if online, false if not. 1697 */ 1698 public boolean isOnline() { 1699 return online.get(); 1700 } 1701 1702 /** 1703 * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to 1704 * be hooked up to WAL. 
1705 */ 1706 private void setupWALAndReplication() throws IOException { 1707 WALFactory factory = new WALFactory(conf, serverName.toString(), this, true); 1708 // TODO Replication make assumptions here based on the default filesystem impl 1709 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 1710 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); 1711 1712 Path logDir = new Path(walRootDir, logName); 1713 LOG.debug("logDir={}", logDir); 1714 if (this.walFs.exists(logDir)) { 1715 throw new RegionServerRunningException( 1716 "Region server has already created directory at " + this.serverName.toString()); 1717 } 1718 // Always create wal directory as now we need this when master restarts to find out the live 1719 // region servers. 1720 if (!this.walFs.mkdirs(logDir)) { 1721 throw new IOException("Can not create wal directory " + logDir); 1722 } 1723 // Instantiate replication if replication enabled. Pass it the log directories. 1724 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory); 1725 this.walFactory = factory; 1726 } 1727 1728 /** 1729 * Start up replication source and sink handlers. 1730 */ 1731 private void startReplicationService() throws IOException { 1732 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { 1733 this.replicationSourceHandler.startReplicationService(); 1734 } else { 1735 if (this.replicationSourceHandler != null) { 1736 this.replicationSourceHandler.startReplicationService(); 1737 } 1738 if (this.replicationSinkHandler != null) { 1739 this.replicationSinkHandler.startReplicationService(); 1740 } 1741 } 1742 } 1743 1744 /** 1745 * @return Master address tracker instance. 1746 */ 1747 public MasterAddressTracker getMasterAddressTracker() { 1748 return this.masterAddressTracker; 1749 } 1750 1751 /** 1752 * Start maintenance Threads, Server, Worker and lease checker threads. Start all threads we need 1753 * to run. This is called after we've successfully registered with the Master. Install an 1754 * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception. We 1755 * cannot set the handler on all threads. Server's internal Listener thread is off limits. For 1756 * Server, if an OOME, it waits a while then retries. Meantime, a flush or a compaction that tries 1757 * to run should trigger same critical condition and the shutdown will run. On its way out, this 1758 * server will shut down Server. Leases are sort of inbetween. It has an internal thread that 1759 * while it inherits from Chore, it keeps its own internal stop mechanism so needs to be stopped 1760 * by this hosting server. Worker logs the exception and exits. 1761 */ 1762 private void startServices() throws IOException { 1763 if (!isStopped() && !isAborted()) { 1764 initializeThreads(); 1765 } 1766 this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection); 1767 this.secureBulkLoadManager.start(); 1768 1769 // Health checker thread. 1770 if (isHealthCheckerConfigured()) { 1771 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, 1772 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); 1773 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); 1774 } 1775 // Executor status collect thread. 
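// This chore is optional: it is only created when the HConstants.EXECUTOR_STATUS_COLLECT_ENABLED
// flag below evaluates to true, and it reports executor pool status through the region server
// metrics source handed to it.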
1776 if ( 1777 this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED, 1778 HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED) 1779 ) { 1780 int sleepTime = 1781 this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ); 1782 executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(), 1783 this.metricsRegionServer.getMetricsSource()); 1784 } 1785 1786 this.walRoller = new LogRoller(this); 1787 this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); 1788 this.procedureResultReporter = new RemoteProcedureResultReporter(this); 1789 1790 // Create the CompactedFileDischarger chore executorService. This chore helps to 1791 // remove the compacted files that will no longer be used in reads. 1792 // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to 1793 // 2 mins so that compacted files can be archived before the TTLCleaner runs 1794 int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); 1795 this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this); 1796 choreService.scheduleChore(compactedFileDischarger); 1797 1798 // Start executor services 1799 final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3); 1800 executorService.startExecutorService(executorService.new ExecutorConfig() 1801 .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads)); 1802 final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1); 1803 executorService.startExecutorService(executorService.new ExecutorConfig() 1804 .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads)); 1805 final int openPriorityRegionThreads = 1806 conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3); 1807 executorService.startExecutorService( 1808 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION) 1809 .setCorePoolSize(openPriorityRegionThreads)); 1810 final int closeRegionThreads = 1811 conf.getInt("hbase.regionserver.executor.closeregion.threads", 3); 1812 executorService.startExecutorService(executorService.new ExecutorConfig() 1813 .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads)); 1814 final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1); 1815 executorService.startExecutorService(executorService.new ExecutorConfig() 1816 .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads)); 1817 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { 1818 final int storeScannerParallelSeekThreads = 1819 conf.getInt("hbase.storescanner.parallel.seek.threads", 10); 1820 executorService.startExecutorService( 1821 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK) 1822 .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true)); 1823 } 1824 final int logReplayOpsThreads = 1825 conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER); 1826 executorService.startExecutorService( 1827 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS) 1828 .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true)); 1829 // Start the threads for compacted files discharger 1830 final int compactionDischargerThreads = 1831 
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10); 1832 executorService.startExecutorService(executorService.new ExecutorConfig() 1833 .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER) 1834 .setCorePoolSize(compactionDischargerThreads)); 1835 if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { 1836 final int regionReplicaFlushThreads = 1837 conf.getInt("hbase.regionserver.region.replica.flusher.threads", 1838 conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); 1839 executorService.startExecutorService(executorService.new ExecutorConfig() 1840 .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS) 1841 .setCorePoolSize(regionReplicaFlushThreads)); 1842 } 1843 final int refreshPeerThreads = 1844 conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2); 1845 executorService.startExecutorService(executorService.new ExecutorConfig() 1846 .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads)); 1847 final int replaySyncReplicationWALThreads = 1848 conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1); 1849 executorService.startExecutorService(executorService.new ExecutorConfig() 1850 .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL) 1851 .setCorePoolSize(replaySyncReplicationWALThreads)); 1852 final int switchRpcThrottleThreads = 1853 conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1); 1854 executorService.startExecutorService( 1855 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE) 1856 .setCorePoolSize(switchRpcThrottleThreads)); 1857 final int claimReplicationQueueThreads = 1858 conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1); 1859 executorService.startExecutorService( 1860 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE) 1861 .setCorePoolSize(claimReplicationQueueThreads)); 1862 final int rsSnapshotOperationThreads = 1863 conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3); 1864 executorService.startExecutorService( 1865 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS) 1866 .setCorePoolSize(rsSnapshotOperationThreads)); 1867 1868 Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", 1869 uncaughtExceptionHandler); 1870 if (this.cacheFlusher != null) { 1871 this.cacheFlusher.start(uncaughtExceptionHandler); 1872 } 1873 Threads.setDaemonThreadRunning(this.procedureResultReporter, 1874 getName() + ".procedureResultReporter", uncaughtExceptionHandler); 1875 1876 if (this.compactionChecker != null) { 1877 choreService.scheduleChore(compactionChecker); 1878 } 1879 if (this.periodicFlusher != null) { 1880 choreService.scheduleChore(periodicFlusher); 1881 } 1882 if (this.healthCheckChore != null) { 1883 choreService.scheduleChore(healthCheckChore); 1884 } 1885 if (this.executorStatusChore != null) { 1886 choreService.scheduleChore(executorStatusChore); 1887 } 1888 if (this.nonceManagerChore != null) { 1889 choreService.scheduleChore(nonceManagerChore); 1890 } 1891 if (this.storefileRefresher != null) { 1892 choreService.scheduleChore(storefileRefresher); 1893 } 1894 if (this.fsUtilizationChore != null) { 1895 choreService.scheduleChore(fsUtilizationChore); 1896 } 1897 if (this.slowLogTableOpsChore != null) { 1898 choreService.scheduleChore(slowLogTableOpsChore); 1899 } 1900 if (this.brokenStoreFileCleaner != null) 
{ 1901 choreService.scheduleChore(brokenStoreFileCleaner); 1902 } 1903 1904 if (this.rsMobFileCleanerChore != null) { 1905 choreService.scheduleChore(rsMobFileCleanerChore); 1906 } 1907 1908 // Leases is not a Thread. Internally it runs a daemon thread. If it gets 1909 // an unhandled exception, it will just exit. 1910 Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker", 1911 uncaughtExceptionHandler); 1912 1913 // Create the log splitting worker and start it 1914 // set a smaller retries to fast fail otherwise splitlogworker could be blocked for 1915 // quite a while inside Connection layer. The worker won't be available for other 1916 // tasks even after current task is preempted after a split task times out. 1917 Configuration sinkConf = HBaseConfiguration.create(conf); 1918 sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1919 conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds 1920 sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1921 conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds 1922 sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 1923 if ( 1924 this.csm != null 1925 && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK) 1926 ) { 1927 // SplitLogWorker needs csm. If none, don't start this. 1928 this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory); 1929 splitLogWorker.start(); 1930 LOG.debug("SplitLogWorker started"); 1931 } 1932 1933 // Memstore services. 1934 startHeapMemoryManager(); 1935 // Call it after starting HeapMemoryManager. 1936 initializeMemStoreChunkCreator(hMemManager); 1937 } 1938 1939 private void initializeThreads() { 1940 // Cache flushing thread. 1941 this.cacheFlusher = new MemStoreFlusher(conf, this); 1942 1943 // Compaction thread 1944 this.compactSplitThread = new CompactSplit(this); 1945 1946 // Background thread to check for compactions; needed if region has not gotten updates 1947 // in a while. It will take care of not checking too frequently on store-by-store basis. 1948 this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this); 1949 this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this); 1950 this.leaseManager = new LeaseManager(this.threadWakeFrequency); 1951 1952 final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, 1953 HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); 1954 if (isSlowLogTableEnabled) { 1955 // default chore duration: 10 min 1956 final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000); 1957 slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder); 1958 } 1959 1960 if (this.nonceManager != null) { 1961 // Create the scheduled chore that cleans up nonces. 
1962 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); 1963 } 1964 1965 // Setup the Quota Manager 1966 rsQuotaManager = new RegionServerRpcQuotaManager(this); 1967 rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); 1968 1969 if (QuotaUtil.isQuotaEnabled(conf)) { 1970 this.fsUtilizationChore = new FileSystemUtilizationChore(this); 1971 } 1972 1973 boolean onlyMetaRefresh = false; 1974 int storefileRefreshPeriod = 1975 conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 1976 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 1977 if (storefileRefreshPeriod == 0) { 1978 storefileRefreshPeriod = 1979 conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, 1980 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 1981 onlyMetaRefresh = true; 1982 } 1983 if (storefileRefreshPeriod > 0) { 1984 this.storefileRefresher = 1985 new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this); 1986 } 1987 1988 int brokenStoreFileCleanerPeriod = 1989 conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, 1990 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD); 1991 int brokenStoreFileCleanerDelay = 1992 conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, 1993 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY); 1994 double brokenStoreFileCleanerDelayJitter = 1995 conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER, 1996 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER); 1997 double jitterRate = 1998 (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter; 1999 long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate); 2000 this.brokenStoreFileCleaner = 2001 new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue), 2002 brokenStoreFileCleanerPeriod, this, conf, this); 2003 2004 this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this); 2005 2006 registerConfigurationObservers(); 2007 } 2008 2009 private void registerConfigurationObservers() { 2010 // Registering the compactSplitThread object with the ConfigurationManager. 
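// Registered observers get onConfigurationChange() callbacks when the configuration is reloaded
// online, letting these components pick up updated settings without a restart.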
2011 configurationManager.registerObserver(this.compactSplitThread); 2012 configurationManager.registerObserver(this.rpcServices); 2013 configurationManager.registerObserver(this); 2014 } 2015 2016 /* 2017 * Verify that server is healthy 2018 */ 2019 private boolean isHealthy() { 2020 if (!dataFsOk) { 2021 // File system problem 2022 return false; 2023 } 2024 // Verify that all threads are alive 2025 boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive()) 2026 && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) 2027 && (this.walRoller == null || this.walRoller.isAlive()) 2028 && (this.compactionChecker == null || this.compactionChecker.isScheduled()) 2029 && (this.periodicFlusher == null || this.periodicFlusher.isScheduled()); 2030 if (!healthy) { 2031 stop("One or more threads are no longer alive -- stop"); 2032 } 2033 return healthy; 2034 } 2035 2036 @Override 2037 public List<WAL> getWALs() { 2038 return walFactory.getWALs(); 2039 } 2040 2041 @Override 2042 public WAL getWAL(RegionInfo regionInfo) throws IOException { 2043 WAL wal = walFactory.getWAL(regionInfo); 2044 if (this.walRoller != null) { 2045 this.walRoller.addWAL(wal); 2046 } 2047 return wal; 2048 } 2049 2050 public LogRoller getWalRoller() { 2051 return walRoller; 2052 } 2053 2054 WALFactory getWalFactory() { 2055 return walFactory; 2056 } 2057 2058 @Override 2059 public void stop(final String msg) { 2060 stop(msg, false, RpcServer.getRequestUser().orElse(null)); 2061 } 2062 2063 /** 2064 * Stops the regionserver. 2065 * @param msg Status message 2066 * @param force True if this is a regionserver abort 2067 * @param user The user executing the stop request, or null if no user is associated 2068 */ 2069 public void stop(final String msg, final boolean force, final User user) { 2070 if (!this.stopped) { 2071 LOG.info("***** STOPPING region server '" + this + "' *****"); 2072 if (this.rsHost != null) { 2073 // when forced via abort don't allow CPs to override 2074 try { 2075 this.rsHost.preStop(msg, user); 2076 } catch (IOException ioe) { 2077 if (!force) { 2078 LOG.warn("The region server did not stop", ioe); 2079 return; 2080 } 2081 LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe); 2082 } 2083 } 2084 this.stopped = true; 2085 LOG.info("STOPPED: " + msg); 2086 // Wakes run() if it is sleeping 2087 sleeper.skipSleepCycle(); 2088 } 2089 } 2090 2091 public void waitForServerOnline() { 2092 while (!isStopped() && !isOnline()) { 2093 synchronized (online) { 2094 try { 2095 online.wait(msgInterval); 2096 } catch (InterruptedException ie) { 2097 Thread.currentThread().interrupt(); 2098 break; 2099 } 2100 } 2101 } 2102 } 2103 2104 @Override 2105 public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException { 2106 HRegion r = context.getRegion(); 2107 long openProcId = context.getOpenProcId(); 2108 long masterSystemTime = context.getMasterSystemTime(); 2109 rpcServices.checkOpen(); 2110 LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}", 2111 r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime); 2112 // Do checks to see if we need to compact (references or too many files) 2113 // Skip compaction check if region is read only 2114 if (!r.isReadOnly()) { 2115 for (HStore s : r.stores.values()) { 2116 if (s.hasReferences() || s.needsCompaction()) { 2117 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region"); 2118 } 2119 } 2120 } 2121 long openSeqNum = r.getOpenSeqNum(); 2122 if (openSeqNum == 
HConstants.NO_SEQNUM) { 2123 // If we opened a region, we should have read some sequence number from it. 2124 LOG.error( 2125 "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString()); 2126 openSeqNum = 0; 2127 } 2128 2129 // Notify master 2130 if ( 2131 !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED, 2132 openSeqNum, openProcId, masterSystemTime, r.getRegionInfo())) 2133 ) { 2134 throw new IOException( 2135 "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString()); 2136 } 2137 2138 triggerFlushInPrimaryRegion(r); 2139 2140 LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString()); 2141 } 2142 2143 /** 2144 * Helper method for use in tests. Skip the region transition report when there's no master around 2145 * to receive it. 2146 */ 2147 private boolean skipReportingTransition(final RegionStateTransitionContext context) { 2148 final TransitionCode code = context.getCode(); 2149 final long openSeqNum = context.getOpenSeqNum(); 2150 long masterSystemTime = context.getMasterSystemTime(); 2151 final RegionInfo[] hris = context.getHris(); 2152 2153 if (code == TransitionCode.OPENED) { 2154 Preconditions.checkArgument(hris != null && hris.length == 1); 2155 if (hris[0].isMetaRegion()) { 2156 LOG.warn( 2157 "meta table location is stored in master local store, so we can not skip reporting"); 2158 return false; 2159 } else { 2160 try { 2161 MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0], 2162 serverName, openSeqNum, masterSystemTime); 2163 } catch (IOException e) { 2164 LOG.info("Failed to update meta", e); 2165 return false; 2166 } 2167 } 2168 } 2169 return true; 2170 } 2171 2172 private ReportRegionStateTransitionRequest 2173 createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) { 2174 final TransitionCode code = context.getCode(); 2175 final long openSeqNum = context.getOpenSeqNum(); 2176 final RegionInfo[] hris = context.getHris(); 2177 final long[] procIds = context.getProcIds(); 2178 2179 ReportRegionStateTransitionRequest.Builder builder = 2180 ReportRegionStateTransitionRequest.newBuilder(); 2181 builder.setServer(ProtobufUtil.toServerName(serverName)); 2182 RegionStateTransition.Builder transition = builder.addTransitionBuilder(); 2183 transition.setTransitionCode(code); 2184 if (code == TransitionCode.OPENED && openSeqNum >= 0) { 2185 transition.setOpenSeqNum(openSeqNum); 2186 } 2187 for (RegionInfo hri : hris) { 2188 transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri)); 2189 } 2190 for (long procId : procIds) { 2191 transition.addProcId(procId); 2192 } 2193 2194 return builder.build(); 2195 } 2196 2197 @Override 2198 public boolean reportRegionStateTransition(final RegionStateTransitionContext context) { 2199 if (TEST_SKIP_REPORTING_TRANSITION) { 2200 return skipReportingTransition(context); 2201 } 2202 final ReportRegionStateTransitionRequest request = 2203 createReportRegionStateTransitionRequest(context); 2204 2205 int tries = 0; 2206 long pauseTime = this.retryPauseTime; 2207 // Keep looping till we get an error. We want to send reports even though server is going down. 2208 // Only go down if clusterConnection is null. It is set to null almost as last thing as the 2209 // HRegionServer does down. 
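// Each failed attempt clears rssStub (if it has not already been replaced) so the next pass
// re-creates the master stub; "master not ready" style exceptions trigger an increasing backoff,
// while other errors are retried immediately.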
2210 while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) { 2211 RegionServerStatusService.BlockingInterface rss = rssStub; 2212 try { 2213 if (rss == null) { 2214 createRegionServerStatusStub(); 2215 continue; 2216 } 2217 ReportRegionStateTransitionResponse response = 2218 rss.reportRegionStateTransition(null, request); 2219 if (response.hasErrorMessage()) { 2220 LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage()); 2221 break; 2222 } 2223 // Log if we had to retry else don't log unless TRACE. We want to 2224 // know if were successful after an attempt showed in logs as failed. 2225 if (tries > 0 || LOG.isTraceEnabled()) { 2226 LOG.info("TRANSITION REPORTED " + request); 2227 } 2228 // NOTE: Return mid-method!!! 2229 return true; 2230 } catch (ServiceException se) { 2231 IOException ioe = ProtobufUtil.getRemoteException(se); 2232 boolean pause = ioe instanceof ServerNotRunningYetException 2233 || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException; 2234 if (pause) { 2235 // Do backoff else we flood the Master with requests. 2236 pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries); 2237 } else { 2238 pauseTime = this.retryPauseTime; // Reset. 2239 } 2240 LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#" 2241 + tries + ")" 2242 + (pause 2243 ? " after " + pauseTime + "ms delay (Master is coming online...)." 2244 : " immediately."), 2245 ioe); 2246 if (pause) { 2247 Threads.sleep(pauseTime); 2248 } 2249 tries++; 2250 if (rssStub == rss) { 2251 rssStub = null; 2252 } 2253 } 2254 } 2255 return false; 2256 } 2257 2258 /** 2259 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not 2260 * block this thread. See RegionReplicaFlushHandler for details. 2261 */ 2262 private void triggerFlushInPrimaryRegion(final HRegion region) { 2263 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 2264 return; 2265 } 2266 TableName tn = region.getTableDescriptor().getTableName(); 2267 if ( 2268 !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) 2269 || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) || 2270 // If the memstore replication not setup, we do not have to wait for observing a flush event 2271 // from primary before starting to serve reads, because gaps from replication is not 2272 // applicable,this logic is from 2273 // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by 2274 // HBASE-13063 2275 !region.getTableDescriptor().hasRegionMemStoreReplication() 2276 ) { 2277 region.setReadsEnabled(true); 2278 return; 2279 } 2280 2281 region.setReadsEnabled(false); // disable reads before marking the region as opened. 2282 // RegionReplicaFlushHandler might reset this. 2283 2284 // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler 2285 if (this.executorService != null) { 2286 this.executorService.submit(new RegionReplicaFlushHandler(this, region)); 2287 } else { 2288 LOG.info("Executor is null; not running flush of primary region replica for {}", 2289 region.getRegionInfo()); 2290 } 2291 } 2292 2293 @InterfaceAudience.Private 2294 public RSRpcServices getRSRpcServices() { 2295 return rpcServices; 2296 } 2297 2298 /** 2299 * Cause the server to exit without closing the regions it is serving, the log it is using and 2300 * without notifying the master. 
Used for unit testing and on catastrophic events such as HDFS being 2301 * yanked out from under hbase or an OOME. @param reason the reason we are aborting @param cause the exception that 2302 * caused the abort, or null 2303 */ 2304 @Override 2305 public void abort(String reason, Throwable cause) { 2306 if (!setAbortRequested()) { 2307 // Abort already in progress, ignore the new request. 2308 LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason); 2309 return; 2310 } 2311 String msg = "***** ABORTING region server " + this + ": " + reason + " *****"; 2312 if (cause != null) { 2313 LOG.error(HBaseMarkers.FATAL, msg, cause); 2314 } else { 2315 LOG.error(HBaseMarkers.FATAL, msg); 2316 } 2317 // HBASE-4014: show list of coprocessors that were loaded to help debug 2318 // regionserver crashes. Note that we're implicitly using 2319 // java.util.HashSet's toString() method to print the coprocessor names. 2320 LOG.error(HBaseMarkers.FATAL, 2321 "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors()); 2322 // Try and dump metrics on abort -- might give a clue as to how the fatal error came about. 2323 try { 2324 LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics()); 2325 } catch (MalformedObjectNameException | IOException e) { 2326 LOG.warn("Failed dumping metrics", e); 2327 } 2328 2329 // Do our best to report our abort to the master, but this may not work 2330 try { 2331 if (cause != null) { 2332 msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause); 2333 } 2334 // Report to the master but only if we have already registered with the master. 2335 RegionServerStatusService.BlockingInterface rss = rssStub; 2336 if (rss != null && this.serverName != null) { 2337 ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder(); 2338 builder.setServer(ProtobufUtil.toServerName(this.serverName)); 2339 builder.setErrorMessage(msg); 2340 rss.reportRSFatalError(null, builder.build()); 2341 } 2342 } catch (Throwable t) { 2343 LOG.warn("Unable to report fatal error to master", t); 2344 } 2345 2346 scheduleAbortTimer(); 2347 // shutdown should be run as the internal user 2348 stop(reason, true, null); 2349 } 2350 2351 /* 2352 * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up logs, but it does 2353 * close the socket in case we want to bring up a server on the old hostname+port immediately. 2354 */ 2355 @InterfaceAudience.Private 2356 protected void kill() { 2357 this.killed = true; 2358 abort("Simulated kill"); 2359 } 2360 2361 // Limits the time spent in the shutdown process. 2362 private void scheduleAbortTimer() { 2363 if (this.abortMonitor == null) { 2364 this.abortMonitor = new Timer("Abort regionserver monitor", true); 2365 TimerTask abortTimeoutTask = null; 2366 try { 2367 Constructor<? extends TimerTask> timerTaskCtor = 2368 Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName())) 2369 .asSubclass(TimerTask.class).getDeclaredConstructor(); 2370 timerTaskCtor.setAccessible(true); 2371 abortTimeoutTask = timerTaskCtor.newInstance(); 2372 } catch (Exception e) { 2373 LOG.warn("Initialize abort timeout task failed", e); 2374 } 2375 if (abortTimeoutTask != null) { 2376 abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT)); 2377 } 2378 } 2379 } 2380 2381 /** 2382 * Wait on all threads to finish. Presumption is that all closes and stops have already been 2383 * called.
2384 */ 2385 protected void stopServiceThreads() { 2386 // clean up the scheduled chores 2387 stopChoreService(); 2388 if (bootstrapNodeManager != null) { 2389 bootstrapNodeManager.stop(); 2390 } 2391 if (this.cacheFlusher != null) { 2392 this.cacheFlusher.join(); 2393 } 2394 if (this.walRoller != null) { 2395 this.walRoller.close(); 2396 } 2397 if (this.compactSplitThread != null) { 2398 this.compactSplitThread.join(); 2399 } 2400 stopExecutorService(); 2401 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { 2402 this.replicationSourceHandler.stopReplicationService(); 2403 } else { 2404 if (this.replicationSourceHandler != null) { 2405 this.replicationSourceHandler.stopReplicationService(); 2406 } 2407 if (this.replicationSinkHandler != null) { 2408 this.replicationSinkHandler.stopReplicationService(); 2409 } 2410 } 2411 } 2412 2413 /** 2414 * @return Return the object that implements the replication source executorService. 2415 */ 2416 @Override 2417 public ReplicationSourceService getReplicationSourceService() { 2418 return replicationSourceHandler; 2419 } 2420 2421 /** 2422 * @return Return the object that implements the replication sink executorService. 2423 */ 2424 public ReplicationSinkService getReplicationSinkService() { 2425 return replicationSinkHandler; 2426 } 2427 2428 /** 2429 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2430 * connection, the current rssStub must be null. Method will block until a master is available. 2431 * You can break from this block by requesting the server stop. 2432 * @return master + port, or null if server has been stopped 2433 */ 2434 private synchronized ServerName createRegionServerStatusStub() { 2435 // Create RS stub without refreshing the master node from ZK, use cached data 2436 return createRegionServerStatusStub(false); 2437 } 2438 2439 /** 2440 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2441 * connection, the current rssStub must be null. Method will block until a master is available. 2442 * You can break from this block by requesting the server stop. 2443 * @param refresh If true then master address will be read from ZK, otherwise use cached data 2444 * @return master + port, or null if server has been stopped 2445 */ 2446 @InterfaceAudience.Private 2447 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { 2448 if (rssStub != null) { 2449 return masterAddressTracker.getMasterAddress(); 2450 } 2451 ServerName sn = null; 2452 long previousLogTime = 0; 2453 RegionServerStatusService.BlockingInterface intRssStub = null; 2454 LockService.BlockingInterface intLockStub = null; 2455 boolean interrupted = false; 2456 try { 2457 while (keepLooping()) { 2458 sn = this.masterAddressTracker.getMasterAddress(refresh); 2459 if (sn == null) { 2460 if (!keepLooping()) { 2461 // give up with no connection. 
2462 LOG.debug("No master found and cluster is stopped; bailing out"); 2463 return null; 2464 } 2465 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 2466 LOG.debug("No master found; retry"); 2467 previousLogTime = EnvironmentEdgeManager.currentTime(); 2468 } 2469 refresh = true; // let's try pull it from ZK directly 2470 if (sleepInterrupted(200)) { 2471 interrupted = true; 2472 } 2473 continue; 2474 } 2475 try { 2476 BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, 2477 userProvider.getCurrent(), shortOperationTimeout); 2478 intRssStub = RegionServerStatusService.newBlockingStub(channel); 2479 intLockStub = LockService.newBlockingStub(channel); 2480 break; 2481 } catch (IOException e) { 2482 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 2483 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 2484 if (e instanceof ServerNotRunningYetException) { 2485 LOG.info("Master isn't available yet, retrying"); 2486 } else { 2487 LOG.warn("Unable to connect to master. Retrying. Error was:", e); 2488 } 2489 previousLogTime = EnvironmentEdgeManager.currentTime(); 2490 } 2491 if (sleepInterrupted(200)) { 2492 interrupted = true; 2493 } 2494 } 2495 } 2496 } finally { 2497 if (interrupted) { 2498 Thread.currentThread().interrupt(); 2499 } 2500 } 2501 this.rssStub = intRssStub; 2502 this.lockStub = intLockStub; 2503 return sn; 2504 } 2505 2506 /** 2507 * @return True if we should break loop because cluster is going down or this server has been 2508 * stopped or hdfs has gone bad. 2509 */ 2510 private boolean keepLooping() { 2511 return !this.stopped && isClusterUp(); 2512 } 2513 2514 /* 2515 * Let the master know we're here Run initialization using parameters passed us by the master. 2516 * @return A Map of key/value configurations we got from the Master else null if we failed to 2517 * register. 
2518 */ 2519 private RegionServerStartupResponse reportForDuty() throws IOException { 2520 if (this.masterless) { 2521 return RegionServerStartupResponse.getDefaultInstance(); 2522 } 2523 ServerName masterServerName = createRegionServerStatusStub(true); 2524 RegionServerStatusService.BlockingInterface rss = rssStub; 2525 if (masterServerName == null || rss == null) { 2526 return null; 2527 } 2528 RegionServerStartupResponse result = null; 2529 try { 2530 rpcServices.requestCount.reset(); 2531 rpcServices.rpcGetRequestCount.reset(); 2532 rpcServices.rpcScanRequestCount.reset(); 2533 rpcServices.rpcFullScanRequestCount.reset(); 2534 rpcServices.rpcMultiRequestCount.reset(); 2535 rpcServices.rpcMutateRequestCount.reset(); 2536 LOG.info("reportForDuty to master=" + masterServerName + " with port=" 2537 + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode); 2538 long now = EnvironmentEdgeManager.currentTime(); 2539 int port = rpcServices.getSocketAddress().getPort(); 2540 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder(); 2541 if (!StringUtils.isBlank(useThisHostnameInstead)) { 2542 request.setUseThisHostnameInstead(useThisHostnameInstead); 2543 } 2544 request.setPort(port); 2545 request.setServerStartCode(this.startcode); 2546 request.setServerCurrentTime(now); 2547 result = rss.regionServerStartup(null, request.build()); 2548 } catch (ServiceException se) { 2549 IOException ioe = ProtobufUtil.getRemoteException(se); 2550 if (ioe instanceof ClockOutOfSyncException) { 2551 LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe); 2552 // Re-throwing the IOE will cause the RS to abort 2553 throw ioe; 2554 } else if (ioe instanceof ServerNotRunningYetException) { 2555 LOG.debug("Master is not running yet"); 2556 } else { 2557 LOG.warn("error telling master we are up", se); 2558 } 2559 rssStub = null; 2560 } 2561 return result; 2562 } 2563 2564 @Override 2565 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { 2566 try { 2567 GetLastFlushedSequenceIdRequest req = 2568 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName); 2569 RegionServerStatusService.BlockingInterface rss = rssStub; 2570 if (rss == null) { // Try to connect one more time 2571 createRegionServerStatusStub(); 2572 rss = rssStub; 2573 if (rss == null) { 2574 // Still no luck, we tried 2575 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id"); 2576 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2577 .build(); 2578 } 2579 } 2580 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req); 2581 return RegionStoreSequenceIds.newBuilder() 2582 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId()) 2583 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build(); 2584 } catch (ServiceException e) { 2585 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e); 2586 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2587 .build(); 2588 } 2589 } 2590 2591 /** 2592 * Close the meta region if we are carrying it. 2593 * @param abort Whether we're running an abort.
2594 */ 2595 private void closeMetaTableRegions(final boolean abort) { 2596 HRegion meta = null; 2597 this.onlineRegionsLock.writeLock().lock(); 2598 try { 2599 for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) { 2600 RegionInfo hri = e.getValue().getRegionInfo(); 2601 if (hri.isMetaRegion()) { 2602 meta = e.getValue(); 2603 } 2604 if (meta != null) { 2605 break; 2606 } 2607 } 2608 } finally { 2609 this.onlineRegionsLock.writeLock().unlock(); 2610 } 2611 if (meta != null) { 2612 closeRegionIgnoreErrors(meta.getRegionInfo(), abort); 2613 } 2614 } 2615 2616 /** 2617 * Schedule closes on all user regions. Should be safe calling multiple times because it wont' 2618 * close regions that are already closed or that are closing. 2619 * @param abort Whether we're running an abort. 2620 */ 2621 private void closeUserRegions(final boolean abort) { 2622 this.onlineRegionsLock.writeLock().lock(); 2623 try { 2624 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 2625 HRegion r = e.getValue(); 2626 if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) { 2627 // Don't update zk with this close transition; pass false. 2628 closeRegionIgnoreErrors(r.getRegionInfo(), abort); 2629 } 2630 } 2631 } finally { 2632 this.onlineRegionsLock.writeLock().unlock(); 2633 } 2634 } 2635 2636 protected Map<String, HRegion> getOnlineRegions() { 2637 return this.onlineRegions; 2638 } 2639 2640 public int getNumberOfOnlineRegions() { 2641 return this.onlineRegions.size(); 2642 } 2643 2644 /** 2645 * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM 2646 * as client; HRegion cannot be serialized to cross an rpc. 2647 */ 2648 public Collection<HRegion> getOnlineRegionsLocalContext() { 2649 Collection<HRegion> regions = this.onlineRegions.values(); 2650 return Collections.unmodifiableCollection(regions); 2651 } 2652 2653 @Override 2654 public void addRegion(HRegion region) { 2655 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); 2656 configurationManager.registerObserver(region); 2657 } 2658 2659 private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region, 2660 long size) { 2661 if (!sortedRegions.containsKey(size)) { 2662 sortedRegions.put(size, new ArrayList<>()); 2663 } 2664 sortedRegions.get(size).add(region); 2665 } 2666 2667 /** 2668 * @return A new Map of online regions sorted by region off-heap size with the first entry being 2669 * the biggest. 2670 */ 2671 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() { 2672 // we'll sort the regions in reverse 2673 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2674 // Copy over all regions. Regions are sorted by size with biggest first. 2675 for (HRegion region : this.onlineRegions.values()) { 2676 addRegion(sortedRegions, region, region.getMemStoreOffHeapSize()); 2677 } 2678 return sortedRegions; 2679 } 2680 2681 /** 2682 * @return A new Map of online regions sorted by region heap size with the first entry being the 2683 * biggest. 2684 */ 2685 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() { 2686 // we'll sort the regions in reverse 2687 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2688 // Copy over all regions. Regions are sorted by size with biggest first. 
2689 for (HRegion region : this.onlineRegions.values()) { 2690 addRegion(sortedRegions, region, region.getMemStoreHeapSize()); 2691 } 2692 return sortedRegions; 2693 } 2694 2695 /** @return reference to FlushRequester */ 2696 @Override 2697 public FlushRequester getFlushRequester() { 2698 return this.cacheFlusher; 2699 } 2700 2701 @Override 2702 public CompactionRequester getCompactionRequestor() { 2703 return this.compactSplitThread; 2704 } 2705 2706 @Override 2707 public LeaseManager getLeaseManager() { 2708 return leaseManager; 2709 } 2710 2711 /** 2712 * @return {@code true} when the data file system is available, {@code false} otherwise. 2713 */ 2714 boolean isDataFileSystemOk() { 2715 return this.dataFsOk; 2716 } 2717 2718 public RegionServerCoprocessorHost getRegionServerCoprocessorHost() { 2719 return this.rsHost; 2720 } 2721 2722 @Override 2723 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() { 2724 return this.regionsInTransitionInRS; 2725 } 2726 2727 @Override 2728 public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { 2729 return rsQuotaManager; 2730 } 2731 2732 // 2733 // Main program and support routines 2734 // 2735 /** 2736 * Load the replication executorService objects, if any 2737 */ 2738 private static void createNewReplicationInstance(Configuration conf, HRegionServer server, 2739 FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException { 2740 // read in the name of the source replication class from the config file. 2741 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, 2742 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 2743 2744 // read in the name of the sink replication class from the config file. 2745 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, 2746 HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT); 2747 2748 // If both the sink and the source class names are the same, then instantiate 2749 // only one object. 2750 if (sourceClassname.equals(sinkClassname)) { 2751 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 2752 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2753 server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; 2754 server.sameReplicationSourceAndSink = true; 2755 } else { 2756 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 2757 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2758 server.replicationSinkHandler = newReplicationInstance(sinkClassname, 2759 ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2760 server.sameReplicationSourceAndSink = false; 2761 } 2762 } 2763 2764 private static <T extends ReplicationService> T newReplicationInstance(String classname, 2765 Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, 2766 Path oldLogDir, WALFactory walFactory) throws IOException { 2767 final Class<? 
extends T> clazz; 2768 try { 2769 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 2770 clazz = Class.forName(classname, true, classLoader).asSubclass(xface); 2771 } catch (java.lang.ClassNotFoundException nfe) { 2772 throw new IOException("Could not find class for " + classname); 2773 } 2774 T service = ReflectionUtils.newInstance(clazz, conf); 2775 service.initialize(server, walFs, logDir, oldLogDir, walFactory); 2776 return service; 2777 } 2778 2779 public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() { 2780 Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>(); 2781 if (!this.isOnline()) { 2782 return walGroupsReplicationStatus; 2783 } 2784 List<ReplicationSourceInterface> allSources = new ArrayList<>(); 2785 allSources.addAll(replicationSourceHandler.getReplicationManager().getSources()); 2786 allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources()); 2787 for (ReplicationSourceInterface source : allSources) { 2788 walGroupsReplicationStatus.putAll(source.getWalGroupStatus()); 2789 } 2790 return walGroupsReplicationStatus; 2791 } 2792 2793 /** 2794 * Utility for constructing an instance of the passed HRegionServer class. 2795 */ 2796 static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass, 2797 final Configuration conf) { 2798 try { 2799 Constructor<? extends HRegionServer> c = 2800 regionServerClass.getConstructor(Configuration.class); 2801 return c.newInstance(conf); 2802 } catch (Exception e) { 2803 throw new RuntimeException( 2804 "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e); 2805 } 2806 } 2807 2808 /** 2809 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine 2810 */ 2811 public static void main(String[] args) { 2812 LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); 2813 VersionInfo.logVersion(); 2814 Configuration conf = HBaseConfiguration.create(); 2815 @SuppressWarnings("unchecked") 2816 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf 2817 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); 2818 2819 new HRegionServerCommandLine(regionServerClass).doMain(args); 2820 } 2821 2822 /** 2823 * Gets the online regions of the specified table. This method looks at the in-memory 2824 * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions. 2825 * If a region on this table has been closed during a disable, etc., it will not be included in 2826 * the returned list. So, the returned list may not necessarily be ALL regions in this table, its 2827 * all the ONLINE regions in the table. 
   * @param tableName table to limit the scope of the query
   * @return Online regions from <code>tableName</code>
   */
  @Override
  public List<HRegion> getRegions(TableName tableName) {
    List<HRegion> tableRegions = new ArrayList<>();
    synchronized (this.onlineRegions) {
      for (HRegion region : this.onlineRegions.values()) {
        RegionInfo regionInfo = region.getRegionInfo();
        if (regionInfo.getTable().equals(tableName)) {
          tableRegions.add(region);
        }
      }
    }
    return tableRegions;
  }

  @Override
  public List<HRegion> getRegions() {
    List<HRegion> allRegions;
    synchronized (this.onlineRegions) {
      // Return a copy of the onlineRegions
      allRegions = new ArrayList<>(onlineRegions.values());
    }
    return allRegions;
  }

  /**
   * Gets the online tables in this RS. This method looks at the in-memory onlineRegions.
   * @return all the online tables in this RS
   */
  public Set<TableName> getOnlineTables() {
    Set<TableName> tables = new HashSet<>();
    synchronized (this.onlineRegions) {
      for (Region region : this.onlineRegions.values()) {
        tables.add(region.getTableDescriptor().getTableName());
      }
    }
    return tables;
  }

  public String[] getRegionServerCoprocessors() {
    TreeSet<String> coprocessors = new TreeSet<>();
    try {
      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
    } catch (IOException exception) {
      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; "
        + "skipping.");
      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
    }
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    for (HRegion region : regions) {
      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
      try {
        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
      } catch (IOException exception) {
        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region
          + "; skipping.");
        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
      }
    }
    coprocessors.addAll(rsHost.getCoprocessors());
    return coprocessors.toArray(new String[0]);
  }

  /**
   * Try to close the region; log a warning on failure, but continue.
   * @param region Region to close
   */
  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
    try {
      if (!closeRegion(region.getEncodedName(), abort, null)) {
        LOG.warn(
          "Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing");
      }
    } catch (IOException e) {
      LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing",
        e);
    }
  }

  /**
   * Close a region asynchronously; can be called from the master or internally by the regionserver
   * when stopping. If called from the master, the region status will be updated.
   * <p>
   * If an opening was in progress, this method will cancel it, but will not start a new close. The
   * coprocessors are not called in this case. A NotServingRegionException is thrown.
   * </p>
   * <p>
   * If a close was already in progress, this new request will be ignored and the method returns
   * {@code true}.
   * </p>
   * @param encodedName Region to close
   * @param abort True if we are aborting
   * @param destination Where the Region is being moved to; may be null if unknown.
   * @return True if we closed a region.
   * @throws NotServingRegionException if the region is not online
   */
  protected boolean closeRegion(String encodedName, final boolean abort,
    final ServerName destination) throws NotServingRegionException {
    // Check for permissions to close.
    HRegion actualRegion = this.getRegion(encodedName);
    // Can be null if we're calling close on a region that's not online
    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
      try {
        actualRegion.getCoprocessorHost().preClose(false);
      } catch (IOException exp) {
        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
        return false;
      }
    }

    // previous can come back 'null' if not in map.
    final Boolean previous =
      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);

    if (Boolean.TRUE.equals(previous)) {
      LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already "
        + "trying to OPEN. Cancelling OPENING.");
      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
        // The replace failed. That should be an exceptional case, but theoretically it can happen.
        // We're going to try to do a standard close then.
        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
          + " Doing a standard close now");
        return closeRegion(encodedName, abort, destination);
      }
      // Let's get the region from the online region list again
      actualRegion = this.getRegion(encodedName);
      if (actualRegion == null) { // If the region did make it online, we still close it below.
        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
        // The master deletes the znode when it receives this exception.
        throw new NotServingRegionException(
          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
      }
    } else if (previous == null) {
      LOG.info("Received CLOSE for {}", encodedName);
    } else if (Boolean.FALSE.equals(previous)) {
      LOG.info("Received CLOSE for the region: " + encodedName
        + ", which we are already trying to CLOSE, but the close has not completed yet");
      return true;
    }

    if (actualRegion == null) {
      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
      // The master deletes the znode when it receives this exception.
      throw new NotServingRegionException(
        "The region " + encodedName + " is not online, and is not opening.");
    }

    CloseRegionHandler crh;
    final RegionInfo hri = actualRegion.getRegionInfo();
    if (hri.isMetaRegion()) {
      crh = new CloseMetaHandler(this, this, hri, abort);
    } else {
      crh = new CloseRegionHandler(this, this, hri, abort, destination);
    }
    this.executorService.submit(crh);
    return true;
  }

  /**
   * @return HRegion for the passed binary <code>regionName</code>, or null if the named region is
   *         not a member of the online regions.
   */
  public HRegion getOnlineRegion(final byte[] regionName) {
    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
    return this.onlineRegions.get(encodedRegionName);
  }

  @Override
  public HRegion getRegion(final String encodedRegionName) {
    return this.onlineRegions.get(encodedRegionName);
  }

  @Override
  public boolean removeRegion(final HRegion r, ServerName destination) {
    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
    if (destination != null) {
      long closeSeqNum = r.getMaxFlushedSeqId();
      if (closeSeqNum == HConstants.NO_SEQNUM) {
        // No edits in WAL for this region; get the sequence number when the region was opened.
        closeSeqNum = r.getOpenSeqNum();
        if (closeSeqNum == HConstants.NO_SEQNUM) {
          closeSeqNum = 0;
        }
      }
      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
      if (selfMove) {
        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
          r.getRegionInfo().getEncodedName(),
          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
      }
    }
    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
    configurationManager.deregisterObserver(r);
    return toReturn != null;
  }

  /**
   * Protected utility method for safely obtaining an HRegion handle.
   * @param regionName Name of online {@link HRegion} to return
   * @return {@link HRegion} for <code>regionName</code>
   */
  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
    return getRegionByEncodedName(regionName, encodedRegionName);
  }

  public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException {
    return getRegionByEncodedName(null, encodedRegionName);
  }

  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
    throws NotServingRegionException {
    HRegion region = this.onlineRegions.get(encodedRegionName);
    if (region == null) {
      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
      if (moveInfo != null) {
        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
      }
      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
      String regionNameStr =
        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
      if (isOpening != null && isOpening) {
        throw new RegionOpeningException(
          "Region " + regionNameStr + " is opening on " + this.serverName);
      }
      throw new NotServingRegionException(
        "" + regionNameStr + " is not online on " + this.serverName);
    }
    return region;
  }

  /**
   * Cleanup after a Throwable is caught while invoking a method. Converts <code>t</code> to an IOE
   * if it isn't one already.
   * @param t Throwable
   * @param msg Message to log as an error. Can be null.
   * @return Throwable converted to an IOE; methods can only let out IOEs.
   */
  private Throwable cleanup(final Throwable t, final String msg) {
    // Don't log as error if NSRE; NSRE is 'normal' operation.
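    // (Clients holding stale region locations routinely hit NSRE right after a region moves or
    // splits, so logging it at ERROR level would just be noise.)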
    if (t instanceof NotServingRegionException) {
      LOG.debug("NotServingRegionException; " + t.getMessage());
      return t;
    }
    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
    if (msg == null) {
      LOG.error("", e);
    } else {
      LOG.error(msg, e);
    }
    if (!rpcServices.checkOOME(t)) {
      checkFileSystem();
    }
    return t;
  }

  /**
   * @param msg Message to put in the new IOE if the passed <code>t</code> is not an IOE
   * @return <code>t</code> as an IOE, wrapped in a new IOException if it isn't one already.
   */
  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
    return (t instanceof IOException ? (IOException) t
      : msg == null || msg.length() == 0 ? new IOException(t)
      : new IOException(msg, t));
  }

  /**
   * Checks to see if the file system is still accessible. If not, sets abortRequested and
   * stopRequested.
   * @return false if the file system is not available
   */
  boolean checkFileSystem() {
    if (this.dataFsOk && this.dataFs != null) {
      try {
        FSUtils.checkFileSystemAvailable(this.dataFs);
      } catch (IOException e) {
        abort("File System not available", e);
        this.dataFsOk = false;
      }
    }
    return this.dataFsOk;
  }

  @Override
  public void updateRegionFavoredNodesMapping(String encodedRegionName,
    List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
    Address[] addr = new Address[favoredNodes.size()];
    // Refer to the comment on the declaration of regionFavoredNodesMap on why
    // it is a map of region name to Address[]
    for (int i = 0; i < favoredNodes.size(); i++) {
      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort());
    }
    regionFavoredNodesMap.put(encodedRegionName, addr);
  }

  /**
   * Return the favored nodes for a region given its encoded name. Look at the comment around
   * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here.
   * @param encodedRegionName the encoded region name.
   * @return array of favored locations
   */
  @Override
  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
  }

  @Override
  public ServerNonceManager getNonceManager() {
    return this.nonceManager;
  }

  private static class MovedRegionInfo {
    private final ServerName serverName;
    private final long seqNum;

    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
      this.serverName = serverName;
      this.seqNum = closeSeqNum;
    }

    public ServerName getServerName() {
      return serverName;
    }

    public long getSeqNum() {
      return seqNum;
    }
  }

  /**
   * We need a timeout on the moved-region records. Without one there is a risk of handing out
   * stale information, which would double the number of network calls instead of reducing them.
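   * Entries can also be dropped explicitly via {@link #removeFromMovedRegions(String)} before the
   * timeout fires.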
   */
  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);

  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
    boolean selfMove) {
    if (selfMove) {
      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
      return;
    }
    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid="
      + closeSeqNum);
    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
  }

  void removeFromMovedRegions(String encodedName) {
    movedRegionInfoCache.invalidate(encodedName);
  }

  @InterfaceAudience.Private
  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
    return movedRegionInfoCache.getIfPresent(encodedRegionName);
  }

  @InterfaceAudience.Private
  public int movedRegionCacheExpiredTime() {
    return TIMEOUT_REGION_MOVED;
  }

  private String getMyEphemeralNodePath() {
    return zooKeeper.getZNodePaths().getRsPath(serverName);
  }

  private boolean isHealthCheckerConfigured() {
    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
  }

  /**
   * @return the underlying {@link CompactSplit} for this server
   */
  public CompactSplit getCompactSplitThread() {
    return this.compactSplitThread;
  }

  CoprocessorServiceResponse execRegionServerService(
    @SuppressWarnings("UnusedParameters") final RpcController controller,
    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
    try {
      ServerRpcController serviceController = new ServerRpcController();
      CoprocessorServiceCall call = serviceRequest.getCall();
      String serviceName = call.getServiceName();
      Service service = coprocessorServiceHandlers.get(serviceName);
      if (service == null) {
        throw new UnknownProtocolException(null,
          "No registered coprocessor service found for " + serviceName);
      }
      ServiceDescriptor serviceDesc = service.getDescriptorForType();

      String methodName = call.getMethodName();
      MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
      if (methodDesc == null) {
        throw new UnknownProtocolException(service.getClass(),
          "Unknown method " + methodName + " called on service " + serviceName);
      }

      Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
      final Message.Builder responseBuilder =
        service.getResponsePrototype(methodDesc).newBuilderForType();
      service.callMethod(methodDesc, serviceController, request, message -> {
        if (message != null) {
          responseBuilder.mergeFrom(message);
        }
      });
      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
      if (exception != null) {
        throw exception;
      }
      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * May be empty if this is a master which does not carry any table.
   * @return The block cache instance used by the regionserver.
   */
  @Override
  public Optional<BlockCache> getBlockCache() {
    return Optional.ofNullable(this.blockCache);
  }

  /**
   * May be empty if this is a master which does not carry any table.
   * @return The cache for mob files used by the regionserver.
   */
  @Override
  public Optional<MobFileCache> getMobFileCache() {
    return Optional.ofNullable(this.mobFileCache);
  }

  /**
   * @return the ConfigurationManager object, for testing purposes.
   */
  ConfigurationManager getConfigurationManager() {
    return configurationManager;
  }

  CacheEvictionStats clearRegionBlockCache(Region region) {
    long evictedBlocks = 0;

    for (Store store : region.getStores()) {
      for (StoreFile hFile : store.getStorefiles()) {
        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
      }
    }

    return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build();
  }

  @Override
  public double getCompactionPressure() {
    double max = 0;
    for (Region region : onlineRegions.values()) {
      for (Store store : region.getStores()) {
        double normCount = store.getCompactionPressure();
        if (normCount > max) {
          max = normCount;
        }
      }
    }
    return max;
  }

  @Override
  public HeapMemoryManager getHeapMemoryManager() {
    return hMemManager;
  }

  public MemStoreFlusher getMemStoreFlusher() {
    return cacheFlusher;
  }

  /**
   * For testing.
   * @return whether all WAL roll requests have finished for this regionserver
   */
  @InterfaceAudience.Private
  public boolean walRollRequestFinished() {
    return this.walRoller.walRollFinished();
  }

  @Override
  public ThroughputController getFlushThroughputController() {
    return flushThroughputController;
  }

  @Override
  public double getFlushPressure() {
    if (getRegionServerAccounting() == null || cacheFlusher == null) {
      // return 0 during RS initialization
      return 0.0;
    }
    return getRegionServerAccounting().getFlushPressure();
  }

  @Override
  public void onConfigurationChange(Configuration newConf) {
    ThroughputController old = this.flushThroughputController;
    if (old != null) {
      old.stop("configuration change");
    }
    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
    try {
      Superusers.initialize(newConf);
    } catch (IOException e) {
      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
    }

    // update region server coprocessors if the configuration has changed.
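    // A new RegionServerCoprocessorHost is created so the coprocessors listed under the changed
    // key are reloaded from scratch; the existing host is replaced rather than patched in place.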
    if (
      CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
        CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY)
    ) {
      LOG.info("Updating region server coprocessors because the configuration has changed");
      this.rsHost = new RegionServerCoprocessorHost(this, newConf);
    }
  }

  @Override
  public MetricsRegionServer getMetrics() {
    return metricsRegionServer;
  }

  @Override
  public SecureBulkLoadManager getSecureBulkLoadManager() {
    return this.secureBulkLoadManager;
  }

  @Override
  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
    final Abortable abort) {
    final LockServiceClient client =
      new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
    return client.regionLock(regionInfo, description, abort);
  }

  @Override
  public void unassign(byte[] regionName) throws IOException {
    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
  }

  @Override
  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
    return this.rsSpaceQuotaManager;
  }

  @Override
  public boolean reportFileArchivalForQuotas(TableName tableName,
    Collection<Entry<String, Long>> archivedFiles) {
    if (TEST_SKIP_REPORTING_TRANSITION) {
      return false;
    }
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null || rsSpaceQuotaManager == null) {
      // the current server could be stopping.
      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
      return false;
    }
    try {
      RegionServerStatusProtos.FileArchiveNotificationRequest request =
        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
      rss.reportFileArchival(null, request);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
            + " This will be retried.", ioe);
        }
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return false;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // re-create the stub if we failed to report the archival
      createRegionServerStatusStub(true);
      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
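      // Returning false defers the report; as the log message above notes, it will be retried.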
      return false;
    }
    return true;
  }

  void executeProcedure(long procId, RSProcedureCallable callable) {
    executorService.submit(new RSProcedureHandler(this, procId, callable));
  }

  public void remoteProcedureComplete(long procId, Throwable error) {
    procedureResultReporter.complete(procId, error);
  }

  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
    RegionServerStatusService.BlockingInterface rss;
    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
    for (;;) {
      rss = rssStub;
      if (rss != null) {
        break;
      }
      createRegionServerStatusStub();
    }
    try {
      rss.reportProcedureDone(null, request);
    } catch (ServiceException se) {
      if (rssStub == rss) {
        rssStub = null;
      }
      throw ProtobufUtil.getRemoteException(se);
    }
  }

  /**
   * Ignores open/close region procedures which were already submitted or executed. When the master
   * has an unfinished open/close region procedure and restarts, the new active master may send a
   * duplicate open/close region request to the regionserver. Because the open/close request is
   * submitted to a thread pool and executed asynchronously, we first need a cache of submitted
   * open/close region procedures. Once an open/close region request has executed and the region
   * transition has been reported successfully, we record it in the executed region procedures
   * cache; see {@link #finishRegionProcedure(long)}. After the region transition is reported
   * successfully, the master will not send the open/close region request to the regionserver
   * again. We assume an in-flight duplicate open/close region request will not be delayed by more
   * than 600 seconds, so the executed region procedures cache expires entries after 600 seconds.
   * See HBASE-22404 for more details.
   * @param procId the id of the open/close region procedure
   * @return true if the procedure can be submitted.
   */
  boolean submitRegionProcedure(long procId) {
    if (procId == -1) {
      return true;
    }
    // Ignore region procedures which were already submitted.
    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
    if (previous != null) {
      LOG.warn("Received procedure pid={}, which was already submitted; ignoring it", procId);
      return false;
    }
    // Ignore region procedures which were already executed.
    if (executedRegionProcedures.getIfPresent(procId) != null) {
      LOG.warn("Received procedure pid={}, which was already executed; ignoring it", procId);
      return false;
    }
    return true;
  }

  /**
   * See {@link #submitRegionProcedure(long)}.
   * @param procId the id of the open/close region procedure
   */
  public void finishRegionProcedure(long procId) {
    executedRegionProcedures.put(procId, procId);
    submittedRegionProcedures.remove(procId);
  }

  /**
   * Forcibly terminate the region server when the abort times out.
   */
  private static class SystemExitWhenAbortTimeout extends TimerTask {

    public SystemExitWhenAbortTimeout() {
    }

    @Override
    public void run() {
      LOG.warn("Aborting region server timed out; terminating forcibly"
        + " without waiting for any running shutdown hooks or finalizers to finish their work."
        + " Thread dump to stdout.");
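      // Dump all live threads first so the logs show what the stuck shutdown was waiting on.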
3499 + " Thread dump to stdout."); 3500 Threads.printThreadInfo(System.out, "Zombie HRegionServer"); 3501 Runtime.getRuntime().halt(1); 3502 } 3503 } 3504 3505 @InterfaceAudience.Private 3506 public CompactedHFilesDischarger getCompactedHFilesDischarger() { 3507 return compactedFileDischarger; 3508 } 3509 3510 /** 3511 * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}} 3512 * @return pause time 3513 */ 3514 @InterfaceAudience.Private 3515 public long getRetryPauseTime() { 3516 return this.retryPauseTime; 3517 } 3518 3519 @Override 3520 public Optional<ServerName> getActiveMaster() { 3521 return Optional.ofNullable(masterAddressTracker.getMasterAddress()); 3522 } 3523 3524 @Override 3525 public List<ServerName> getBackupMasters() { 3526 return masterAddressTracker.getBackupMasters(); 3527 } 3528 3529 @Override 3530 public Iterator<ServerName> getBootstrapNodes() { 3531 return bootstrapNodeManager.getBootstrapNodes().iterator(); 3532 } 3533 3534 @Override 3535 public List<HRegionLocation> getMetaLocations() { 3536 return metaRegionLocationCache.getMetaRegionLocations(); 3537 } 3538 3539 @Override 3540 protected NamedQueueRecorder createNamedQueueRecord() { 3541 final boolean isOnlineLogProviderEnabled = conf.getBoolean( 3542 HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); 3543 if (isOnlineLogProviderEnabled) { 3544 return NamedQueueRecorder.getInstance(conf); 3545 } else { 3546 return null; 3547 } 3548 } 3549 3550 @Override 3551 protected boolean clusterMode() { 3552 // this method will be called in the constructor of super class, so we can not return masterless 3553 // directly here, as it will always be false. 3554 return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false); 3555 } 3556 3557 @InterfaceAudience.Private 3558 public BrokenStoreFileCleaner getBrokenStoreFileCleaner() { 3559 return brokenStoreFileCleaner; 3560 } 3561 3562 @InterfaceAudience.Private 3563 public RSMobFileCleanerChore getRSMobFileCleanerChore() { 3564 return rsMobFileCleanerChore; 3565 } 3566 3567 RSSnapshotVerifier getRsSnapshotVerifier() { 3568 return rsSnapshotVerifier; 3569 } 3570 3571 @Override 3572 protected void stopChores() { 3573 shutdownChore(nonceManagerChore); 3574 shutdownChore(compactionChecker); 3575 shutdownChore(compactedFileDischarger); 3576 shutdownChore(periodicFlusher); 3577 shutdownChore(healthCheckChore); 3578 shutdownChore(executorStatusChore); 3579 shutdownChore(storefileRefresher); 3580 shutdownChore(fsUtilizationChore); 3581 shutdownChore(slowLogTableOpsChore); 3582 shutdownChore(brokenStoreFileCleaner); 3583 shutdownChore(rsMobFileCleanerChore); 3584 } 3585 3586 @Override 3587 public RegionReplicationBufferManager getRegionReplicationBufferManager() { 3588 return regionReplicationBufferManager; 3589 } 3590}