/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExecutorStatusChore;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseServerBase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionSize;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
 * are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation" })
public class HRegionServer extends HBaseServerBase<RSRpcServices>
  implements RegionServerServices, LastSequenceId {

  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

  /**
   * For testing only! Set to true to skip notifying region assignment to master.
   */
  @InterfaceAudience.Private
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  /**
   * A map from RegionName to current action in progress.
   * Boolean value indicates: true - if an open region action is in progress; false - if a close
   * region action is in progress
   */
  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * Used to cache the open/close region procedures which have already been submitted. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
  /**
   * Used to cache the open/close region procedures which have already been executed. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

  /**
   * Used to cache the moved-out regions
   */
  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();

  private MemStoreFlusher cacheFlusher;

  private HeapMemoryManager hMemManager;

  // Replication services. If no replication, this handler will be null.
  private ReplicationSourceService replicationSourceHandler;
  private ReplicationSinkService replicationSinkHandler;
  private boolean sameReplicationSourceAndSink;

  // Compactions
  private CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the encoded region name.
   * All access should be synchronized.
   */
  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
  /**
   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
   * need to be a ConcurrentHashMap?
   */
  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API
   * call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and
   * here we really mean DataNode locations. We don't store it as InetSocketAddress here because the
   * conversion on demand from Address to InetSocketAddress will guarantee the resolution results
   * will be fresh when we need it.
   */
  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();

  private LeaseManager leaseManager;

  private volatile boolean dataFsOk;

  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds, to be safe
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Will run this task when the abort times out
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;
  private volatile boolean killed = false;

  private final int threadWakeFrequency;

  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  private RpcClient rpcClient;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  private JvmPauseMonitor pauseMonitor;

  private RSSnapshotVerifier rsSnapshotVerifier;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  private MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;

  /**
   * Check for compaction requests.
   */
  private ScheduledChore compactionChecker;

  /**
   * Check for flushes
   */
  private ScheduledChore periodicFlusher;

  private volatile WALFactory walFactory;

  private LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // master address tracker
  private final MasterAddressTracker masterAddressTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  private final int shortOperationTimeout;

  // Time to pause if master says 'please hold'
  private final long retryPauseTime;

  private final RegionServerAccounting regionServerAccounting;

  private SlowLogTableOpsChore slowLogTableOpsChore = null;

  // Block cache
  private BlockCache blockCache;
  // The cache for mob files
  private MobFileCache mobFileCache;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The Executor status collect chore. */
  private ExecutorStatusChore executorStatusChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
   */
  @Deprecated
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.regionserver.hostname.disable.master.reversedns";

  /**
   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
   * Exception will be thrown if both are used.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";

  /**
   * Unique identifier for the cluster we are a part of.
   */
  private String clusterId;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private volatile RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
   * case where the client doesn't receive the response from a successful operation and retries. We
   * track the successful ops for some time via a nonce sent by the client and handle duplicate
   * operations (currently, by failing them; in future we might use MVCC to return the result).
   * Nonces are also recovered from the WAL during recovery; however, the caveats (from HBASE-3787)
   * are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth of
   * past records. If we don't read the records, we don't read and recover the nonces. Some WALs
   * within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
   * We can have a separate additional "Nonce WAL". It will just contain a bunch of numbers and
   * won't be flushed on the main path - because the WAL itself also contains nonces, if we only
   * flush it before the memstore flush, for a given nonce we will either see it in the WAL (if it
   * was never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
   * log (or both occasionally, which doesn't matter). The nonce log file can be deleted after the
   * latest nonce in it expired. It can also be recovered during move.
   */
  final ServerNonceManager nonceManager;

  private BrokenStoreFileCleaner brokenStoreFileCleaner;

  private RSMobFileCleanerChore rsMobFileCleanerChore;

  @InterfaceAudience.Private
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  private SecureBulkLoadManager secureBulkLoadManager;

  private FileSystemUtilizationChore fsUtilizationChore;

  private BootstrapNodeManager bootstrapNodeManager;

  /**
   * True if this RegionServer is coming up in a cluster where there is no Master; means it needs to
   * just come up and make do without a Master to talk to: e.g. in test, or when HRegionServer is
   * doing other than its usual duties: e.g. as a hollowed-out host whose only purpose is as a
   * Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
   * {@link #TEST_SKIP_REPORTING_TRANSITION} ?
   */
  private final boolean masterless;
  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /** regionserver codec list **/
  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";

  // A timer to shut down the process if abort takes too long
  private Timer abortMonitor;

  private RegionReplicationBufferManager regionReplicationBufferManager;

  /**
   * Starts a HRegionServer at the default location.
   * <p/>
   * Don't start any services or managers in here in the Constructor. Defer till after we register
   * with the Master as much as possible. See {@link #startServices}.
   */
  public HRegionServer(final Configuration conf) throws IOException {
    super(conf, "RegionServer"); // thread name
    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
    try (Scope ignored = span.makeCurrent()) {
      this.dataFsOk = true;
      this.masterless = !clusterMode();
      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);

      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);

      regionServerAccounting = new RegionServerAccounting(conf);

      blockCache = BlockCacheFactory.createBlockCache(conf);
      mobFileCache = new MobFileCache(conf);

      rsSnapshotVerifier = new RSSnapshotVerifier(conf);

      uncaughtExceptionHandler =
        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);

      // If no master in cluster, skip trying to track one or look for a cluster status.
      if (!this.masterless) {
        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        masterAddressTracker.start();
      } else {
        masterAddressTracker = null;
      }
      this.rpcServices.start(zooKeeper);
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      TraceUtil.setError(span, t);
      LOG.error("Failed construction RegionServer", t);
      throw t;
    } finally {
      span.end();
    }
  }

  // HMaster should override this method to load the specific config for master
  @Override
  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
      if (!StringUtils.isBlank(hostname)) {
        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
          + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
          + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
          + UNSAFE_RS_HOSTNAME_KEY + " is used";
        throw new IOException(msg);
      } else {
        return rpcServices.getSocketAddress().getHostName();
      }
    } else {
      return hostname;
    }
  }

  @Override
  protected void login(UserProvider user, String host) throws IOException {
    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
  }

  @Override
  protected String getProcessName() {
    return REGIONSERVER;
  }

  @Override
  protected boolean canCreateBaseZNode() {
    return !clusterMode();
  }

  @Override
  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  @Override
  protected boolean cacheTableDescriptor() {
    return false;
  }

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  @Override
  protected void configureInfoServer(InfoServer infoServer) {
    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  @Override
  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }

  /**
   * Used by {@link RSDumpServlet} to generate debugging information.
   */
  public void dumpRowLocks(final PrintWriter out) {
    StringBuilder sb = new StringBuilder();
    for (HRegion region : getRegions()) {
      if (region.getLockedRows().size() > 0) {
        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
          sb.setLength(0);
          sb.append(region.getTableDescriptor().getTableName()).append(",")
            .append(region.getRegionInfo().getEncodedName()).append(",");
          sb.append(rowLockContext.toString());
          out.println(sb);
        }
      }
    }
  }

  @Override
  public boolean registerService(Service instance) {
    // No stacking of instances is allowed for a single executorService name
    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor executorService " + serviceName
        + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
    }
    return true;
  }

  /**
   * Run test on configured codecs to make sure supporting libs are in place.
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
    if (codecs == null) {
      return;
    }
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException(
          "Compression codec " + codec + " not supported, aborting RS construction");
      }
    }
  }

  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * All initialization needed before we go register with Master.<br>
   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
   */
  private void preRegistrationInitialization() {
    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
    try (Scope ignored = span.makeCurrent()) {
      initializeZooKeeper();
      setupClusterConnection();
      bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
      // Setup RPC client for master communication
      this.rpcClient = asyncClusterConnection.getRpcClient();
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Call stop if error or process will stick around forever since server
      // puts up non-daemon threads.
      TraceUtil.setError(span, t);
      this.rpcServices.stop();
      abort("Initialization of RS failed. Hence aborting RS.", t);
    } finally {
      span.end();
    }
  }

  /**
   * Bring up connection to zk ensemble and then wait until a master for this cluster is available,
   * and after that wait until the cluster 'up' flag has been set. This is the order in which the
   * master does things.
   * <p>
   * Finally open long-living server short-circuit connection.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
      justification = "cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Nothing to do in here if no Master in the mix.
    if (this.masterless) {
      return;
    }

    // Create the master address tracker, register with zk, and start it. Then
    // block until a master is available. No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up. Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // If we are HMaster then the cluster id should have already been set.
    if (clusterId == null) {
      // Retrieve clusterId
      // Since cluster status is now up
      // ID should have already been set by HMaster
      try {
        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
        if (clusterId == null) {
          this.abort("Cluster ID has not been set");
        }
        LOG.info("ClusterId : " + clusterId);
      } catch (KeeperException e) {
        this.abort("Failed to retrieve Cluster ID", e);
      }
    }

    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }

  /**
   * Utility method to wait indefinitely on znode availability while checking if the region server
   * is shut down
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException if the waiting thread is interrupted
   */
  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
    throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /** Returns True if the cluster is up. */
  @Override
  public boolean isClusterUp() {
    return this.masterless
      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
  }

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    if (isStopped()) {
      LOG.info("Skipping run; stopped");
      return;
    }
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        installShutdownHook();
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

        // Try and register with the Master; tell it we are here. Break if server is stopped or
        // the cluster-up flag is down or hdfs went wacky. Once registered successfully, go ahead
        // and start up all Services. Use RetryCounter to get backoff in case Master is struggling
        // to come up.
        LOG.debug("About to register with Master.");
        TraceUtil.trace(() -> {
          RetryCounterFactory rcf =
            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
          RetryCounter rc = rcf.create();
          while (keepLooping()) {
            RegionServerStartupResponse w = reportForDuty();
            if (w == null) {
              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
              this.sleeper.sleep(sleepTime);
            } else {
              handleReportForDutyResponse(w);
              break;
            }
          }
        }, "HRegionServer.registerWithMaster");
      }

      if (!isStopped() && isHealthy()) {
        TraceUtil.trace(() -> {
          // start the snapshot handler and other procedure handlers,
          // since the server is ready to run
          if (this.rspmHost != null) {
            this.rspmHost.start();
          }
          // Start the Quota Manager
          if (this.rsQuotaManager != null) {
            rsQuotaManager.start(getRpcServer().getScheduler());
          }
          if (this.rsSpaceQuotaManager != null) {
            this.rsSpaceQuotaManager.start();
          }
        }, "HRegionServer.startup");
      }

      // We registered with the Master. Go into run mode.
      long lastMsg = EnvironmentEdgeManager.currentTime();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (onlineRegions.isEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(isAborted());
          } else {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
              // since last time we went around the loop. Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested.get());
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = EnvironmentEdgeManager.currentTime();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = EnvironmentEdgeManager.currentTime();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // while
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }

    final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
    try (Scope ignored = span.makeCurrent()) {
      if (this.leaseManager != null) {
        this.leaseManager.closeAfterLeasesExpire();
      }
      if (this.splitLogWorker != null) {
        splitLogWorker.stop();
      }
      stopInfoServer();
      // Send cache a shutdown.
      if (blockCache != null) {
        blockCache.shutdown();
      }
      if (mobFileCache != null) {
        mobFileCache.shutdown();
      }

      // Send interrupts to wake up threads if sleeping so they notice shutdown.
      // TODO: Should we check they are alive? If OOME could have exited already
      if (this.hMemManager != null) {
        this.hMemManager.stop();
      }
      if (this.cacheFlusher != null) {
        this.cacheFlusher.interruptIfNecessary();
      }
      if (this.compactSplitThread != null) {
        this.compactSplitThread.interruptIfNecessary();
      }

      // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
      if (rspmHost != null) {
        rspmHost.stop(this.abortRequested.get() || this.killed);
      }

      if (this.killed) {
        // Just skip out w/o closing regions. Used when testing.
      } else if (abortRequested.get()) {
        if (this.dataFsOk) {
          closeUserRegions(abortRequested.get()); // Don't leave any open file handles
        }
        LOG.info("aborting server " + this.serverName);
      } else {
        closeUserRegions(abortRequested.get());
        LOG.info("stopping server " + this.serverName);
      }
      regionReplicationBufferManager.stop();
      closeClusterConnection();
      // Closing the compactSplit thread before closing meta regions
      if (!this.killed && containsMetaTableRegions()) {
        if (!abortRequested.get() || this.dataFsOk) {
          if (this.compactSplitThread != null) {
            this.compactSplitThread.join();
            this.compactSplitThread = null;
          }
          closeMetaTableRegions(abortRequested.get());
        }
      }

      if (!this.killed && this.dataFsOk) {
        waitOnAllRegionsToClose(abortRequested.get());
        LOG.info("stopping server " + this.serverName + "; all regions closed.");
      }

      // Stop the quota manager
      if (rsQuotaManager != null) {
        rsQuotaManager.stop();
      }
      if (rsSpaceQuotaManager != null) {
        rsSpaceQuotaManager.stop();
        rsSpaceQuotaManager = null;
      }

      // flag may be changed when closing regions throws exception.
      if (this.dataFsOk) {
        shutdownWAL(!abortRequested.get());
      }

      // Make sure the proxy is down.
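      // Dropping the master status and lock service stubs is enough here; the underlying
      // connections belong to the shared RpcClient, which is closed just below.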
      if (this.rssStub != null) {
        this.rssStub = null;
      }
      if (this.lockStub != null) {
        this.lockStub = null;
      }
      if (this.rpcClient != null) {
        this.rpcClient.close();
      }
      if (this.leaseManager != null) {
        this.leaseManager.close();
      }
      if (this.pauseMonitor != null) {
        this.pauseMonitor.stop();
      }

      if (!killed) {
        stopServiceThreads();
      }

      if (this.rpcServices != null) {
        this.rpcServices.stop();
      }

      try {
        deleteMyEphemeralNode();
      } catch (KeeperException.NoNodeException nn) {
        // pass
      } catch (KeeperException e) {
        LOG.warn("Failed deleting my ephemeral node", e);
      }
      // We may have failed to delete the znode at the previous step, but
      // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
      ZNodeClearer.deleteMyEphemeralNodeOnDisk();

      closeZooKeeper();
      closeTableDescriptors();
      LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
      span.setStatus(StatusCode.OK);
    } finally {
      span.end();
    }
  }

  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  private boolean areAllUserRegionsOffline() {
    if (getNumberOfOnlineRegions() > 2) {
      return false;
    }
    boolean allUserRegionsOffline = true;
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      if (!e.getValue().getRegionInfo().isMetaRegion()) {
        allUserRegionsOffline = false;
        break;
      }
    }
    return allUserRegionsOffline;
  }

  /** Returns Current write count for all online regions. */
  private long getWriteRequestCount() {
    long writeCount = 0;
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      writeCount += e.getValue().getWriteRequestsCount();
    }
    return writeCount;
  }

  @InterfaceAudience.Private
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
    throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport");
    try (Scope ignored = span.makeCurrent()) {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      request.setServer(ProtobufUtil.toServerName(this.serverName));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
      span.setStatus(StatusCode.OK);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        TraceUtil.setError(span, ioe);
        throw ioe;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      TraceUtil.setError(span, se);
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub(true);
    } finally {
      span.end();
    }
  }

  /**
   * Reports the given map of Regions and their size on the filesystem to the active Master.
   * @param regionSizeStore The store containing region sizes
   * @return false if the FileSystemUtilizationChore should pause reporting to the master, true
   *         otherwise
   */
  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      LOG.trace("Skipping Region size report to HMaster as stub is null");
      return true;
    }
    try {
      buildReportAndSend(rss, regionSizeStore);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        LOG.trace("Failed to report region sizes to Master because it is initializing."
          + " This will be retried.", ioe);
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return true;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      createRegionServerStatusStub(true);
      if (ioe instanceof DoNotRetryIOException) {
        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
        if (doNotRetryEx.getCause() != null) {
          Throwable t = doNotRetryEx.getCause();
          if (t instanceof UnsupportedOperationException) {
            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
            return false;
          }
        }
      }
      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
    }
    return true;
  }

  /**
   * Builds the region size report and sends it to the master. Upon successful sending of the
   * report, the region sizes that were sent are marked as sent.
   * @param rss The stub to send to the Master
   * @param regionSizeStore The store containing region sizes
   */
  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
    RegionSizeStore regionSizeStore) throws ServiceException {
    RegionSpaceUseReportRequest request =
      buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
    rss.reportRegionSpaceUse(null, request);
    // Record the number of size reports sent
    if (metricsRegionServer != null) {
      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
    }
  }

  /**
   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
   * @param regionSizes The size in bytes of regions
   * @return The corresponding protocol buffer message.
   */
  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
    }
    return request.build();
  }

  /**
   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf
   * message.
   * @param regionInfo The RegionInfo
   * @param sizeInBytes The size in bytes of the Region
   * @return The protocol buffer
   */
  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
    return RegionSpaceUse.newBuilder()
      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
      .setRegionSize(Objects.requireNonNull(sizeInBytes)).build();
  }

  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
    throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second, and other metrics. As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heart beat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved. Additionally the load balancer will be able to take advantage of a more complete
    // history.
    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    long usedMemory = -1L;
    long maxMemory = -1L;
    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
      usedMemory = usage.getUsed();
      maxMemory = usage.getMax();
    }

    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    Builder coprocessorBuilder = Coprocessor.newBuilder();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
    }
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (HRegion region : regions) {
      if (region.getCoprocessorHost() != null) {
        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
        for (String regionCoprocessor : regionCoprocessors) {
          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
        }
      }
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
        .getCoprocessors()) {
        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
      }
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }
    MetricsUserAggregateSource userSource =
      metricsRegionServer.getMetricsUserAggregate().getSource();
    if (userSource != null) {
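      // Per-user request metrics were aggregated by the user aggregate source; each entry is
      // converted into a UserLoad message and attached to the ServerLoad report.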
      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
      }
    }

    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
      // always refresh first to get the latest value
      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
      if (rLoad != null) {
        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
          .getReplicationLoadSourceEntries()) {
          serverLoad.addReplLoadSource(rLS);
        }
      }
    } else {
      if (replicationSourceHandler != null) {
        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
            .getReplicationLoadSourceEntries()) {
            serverLoad.addReplLoadSource(rLS);
          }
        }
      }
      if (replicationSinkHandler != null) {
        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        }
      }
    }

    TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask
      .newBuilder().setDescription(task.getDescription())
      .setStatus(task.getStatus() != null ? task.getStatus() : "")
      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build()));

    return serverLoad.build();
  }

  private String getOnlineRegionsAsPrintableString() {
    StringBuilder sb = new StringBuilder();
    for (Region r : this.onlineRegions.values()) {
      if (sb.length() > 0) {
        sb.append(", ");
      }
      sb.append(r.getRegionInfo().getEncodedName());
    }
    return sb.toString();
  }

  /**
   * Wait on regions close.
   */
  private void waitOnAllRegionsToClose(final boolean abort) {
    // Wait till all regions are closed before going out.
    int lastCount = -1;
    long previousLogTime = 0;
    Set<String> closedRegions = new HashSet<>();
    boolean interrupted = false;
    try {
      while (!onlineRegions.isEmpty()) {
        int count = getNumberOfOnlineRegions();
        // Only print a message if the count of regions has changed.
        if (count != lastCount) {
          // Log every second at most
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            previousLogTime = EnvironmentEdgeManager.currentTime();
            lastCount = count;
            LOG.info("Waiting on " + count + " regions to close");
            // Only print out regions still closing if a small number, else we will
            // swamp the log.
            if (count < 10 && LOG.isDebugEnabled()) {
              LOG.debug("Online Regions=" + this.onlineRegions);
            }
          }
        }
        // Ensure all user regions have been sent a close. Use this to
        // protect against the case where an open comes in after we start the
        // iterator of onlineRegions to close all user regions.
        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
          RegionInfo hri = e.getValue().getRegionInfo();
          if (
            !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
              && !closedRegions.contains(hri.getEncodedName())
          ) {
            closedRegions.add(hri.getEncodedName());
            // Don't update zk with this close transition; pass false.
            closeRegionIgnoreErrors(hri, abort);
          }
        }
        // No regions in RIT, we could stop waiting now.
        if (this.regionsInTransitionInRS.isEmpty()) {
          if (!onlineRegions.isEmpty()) {
            LOG.info("We were exiting though online regions are not empty,"
              + " because some regions failed closing");
          }
          break;
        } else {
          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
        }
        if (sleepInterrupted(200)) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  private static boolean sleepInterrupted(long millis) {
    boolean interrupted = false;
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while sleeping");
      interrupted = true;
    }
    return interrupted;
  }

  private void shutdownWAL(final boolean close) {
    if (this.walFactory != null) {
      try {
        if (close) {
          walFactory.close();
        } else {
          walFactory.shutdown();
        }
      } catch (Throwable e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.error("Shutdown / close of WAL failed: " + e);
        LOG.debug("Shutdown / close exception details:", e);
      }
    }
  }

  /**
   * Run init. Sets up wal and starts up all server threads.
   * @param c Extra configuration.
   */
  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
    throws IOException {
    try {
      boolean updateRootDir = false;
      for (NameStringPair e : c.getMapEntriesList()) {
        String key = e.getName();
        // The hostname the master sees us as.
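        // It must agree with any hostname we were explicitly configured to use; a mismatch is
        // treated as fatal below.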
        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
          String hostnameFromMasterPOV = e.getValue();
          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
            rpcServices.getSocketAddress().getPort(), this.startcode);
          if (
            !StringUtils.isBlank(useThisHostnameInstead)
              && !hostnameFromMasterPOV.equals(useThisHostnameInstead)
          ) {
            String msg = "Master passed us a different hostname to use; was="
              + this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
            throw new IOException(msg);
          }
          if (
            StringUtils.isBlank(useThisHostnameInstead)
              && !hostnameFromMasterPOV.equals(rpcServices.getSocketAddress().getHostName())
          ) {
            String msg = "Master passed us a different hostname to use; was="
              + rpcServices.getSocketAddress().getHostName() + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
          }
          continue;
        }

        String value = e.getValue();
        if (key.equals(HConstants.HBASE_DIR)) {
          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
            updateRootDir = true;
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("Config from master: " + key + "=" + value);
        }
        this.conf.set(key, value);
      }
      // Set our ephemeral znode up in zookeeper now we have a name.
      createMyEphemeralNode();

      if (updateRootDir) {
        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
        initializeFileSystem();
      }

      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
      // config param for task trackers, but we can piggyback off of it.
      if (this.conf.get("mapreduce.task.attempt.id") == null) {
        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
      }

      // Save it in a file; this will allow us to see if we crashed
      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

      // This call sets up an initialized replication and WAL. Later we start it up.
      setupWALAndReplication();
      // Init in here rather than in constructor after thread name has been set
      final MetricsTable metricsTable =
        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
      this.metricsRegionServer =
        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
      // Now that we have a metrics source, start the pause monitor
      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
      pauseMonitor.start();

      // There is a rare case where we do NOT want services to start. Check config.
      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
        startServices();
      }
      // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
      // or make sense of it.
1416 startReplicationService(); 1417 1418 // Set up ZK 1419 LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress() 1420 + ", sessionid=0x" 1421 + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); 1422 1423 // Wake up anyone waiting for this server to online 1424 synchronized (online) { 1425 online.set(true); 1426 online.notifyAll(); 1427 } 1428 } catch (Throwable e) { 1429 stop("Failed initialization"); 1430 throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed"); 1431 } finally { 1432 sleeper.skipSleepCycle(); 1433 } 1434 } 1435 1436 private void startHeapMemoryManager() { 1437 if (this.blockCache != null) { 1438 this.hMemManager = 1439 new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting); 1440 this.hMemManager.start(getChoreService()); 1441 } 1442 } 1443 1444 private void createMyEphemeralNode() throws KeeperException { 1445 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder(); 1446 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1); 1447 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo()); 1448 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray()); 1449 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data); 1450 } 1451 1452 private void deleteMyEphemeralNode() throws KeeperException { 1453 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath()); 1454 } 1455 1456 @Override 1457 public RegionServerAccounting getRegionServerAccounting() { 1458 return regionServerAccounting; 1459 } 1460 1461 // Round the size with KB or MB. 1462 // A trick here is that if the sizeInBytes is less than sizeUnit, we will round the size to 1 1463 // instead of 0 if it is not 0, to avoid some schedulers think the region has no data. See 1464 // HBASE-26340 for more details on why this is important. 1465 private static int roundSize(long sizeInByte, int sizeUnit) { 1466 if (sizeInByte == 0) { 1467 return 0; 1468 } else if (sizeInByte < sizeUnit) { 1469 return 1; 1470 } else { 1471 return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE); 1472 } 1473 } 1474 1475 /** 1476 * @param r Region to get RegionLoad for. 1477 * @param regionLoadBldr the RegionLoad.Builder, can be null 1478 * @param regionSpecifier the RegionSpecifier.Builder, can be null 1479 * @return RegionLoad instance. 
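 * <p>
 * Note on units (illustrative figures only): the MB/KB values below come from the roundSize()
 * helper above, which never rounds a non-empty size down to zero (HBASE-26340). For example:
 * <pre>
 * roundSize(0, 1024 * 1024)                 // 0 -- an empty region reports 0 MB
 * roundSize(512, 1024 * 1024)               // 1 -- any non-empty size reports at least 1 MB
 * roundSize(5L * 1024 * 1024, 1024 * 1024)  // 5 -- otherwise plain integer division
 * </pre>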
1480 */ 1481 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, 1482 RegionSpecifier.Builder regionSpecifier) throws IOException { 1483 byte[] name = r.getRegionInfo().getRegionName(); 1484 int stores = 0; 1485 int storefiles = 0; 1486 int storeRefCount = 0; 1487 int maxCompactedStoreFileRefCount = 0; 1488 long storeUncompressedSize = 0L; 1489 long storefileSize = 0L; 1490 long storefileIndexSize = 0L; 1491 long rootLevelIndexSize = 0L; 1492 long totalStaticIndexSize = 0L; 1493 long totalStaticBloomSize = 0L; 1494 long totalCompactingKVs = 0L; 1495 long currentCompactedKVs = 0L; 1496 List<HStore> storeList = r.getStores(); 1497 stores += storeList.size(); 1498 for (HStore store : storeList) { 1499 storefiles += store.getStorefilesCount(); 1500 int currentStoreRefCount = store.getStoreRefCount(); 1501 storeRefCount += currentStoreRefCount; 1502 int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount(); 1503 maxCompactedStoreFileRefCount = 1504 Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount); 1505 storeUncompressedSize += store.getStoreSizeUncompressed(); 1506 storefileSize += store.getStorefilesSize(); 1507 // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB? 1508 storefileIndexSize += store.getStorefilesRootLevelIndexSize(); 1509 CompactionProgress progress = store.getCompactionProgress(); 1510 if (progress != null) { 1511 totalCompactingKVs += progress.getTotalCompactingKVs(); 1512 currentCompactedKVs += progress.currentCompactedKVs; 1513 } 1514 rootLevelIndexSize += store.getStorefilesRootLevelIndexSize(); 1515 totalStaticIndexSize += store.getTotalStaticIndexSize(); 1516 totalStaticBloomSize += store.getTotalStaticBloomSize(); 1517 } 1518 1519 int unitMB = 1024 * 1024; 1520 int unitKB = 1024; 1521 1522 int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB); 1523 int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB); 1524 int storefileSizeMB = roundSize(storefileSize, unitMB); 1525 int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB); 1526 int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB); 1527 int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB); 1528 int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB); 1529 1530 HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution(); 1531 float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname()); 1532 float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname()); 1533 long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight(); 1534 long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname()); 1535 long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname()); 1536 if (regionLoadBldr == null) { 1537 regionLoadBldr = RegionLoad.newBuilder(); 1538 } 1539 if (regionSpecifier == null) { 1540 regionSpecifier = RegionSpecifier.newBuilder(); 1541 } 1542 1543 regionSpecifier.setType(RegionSpecifierType.REGION_NAME); 1544 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name)); 1545 regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores) 1546 .setStorefiles(storefiles).setStoreRefCount(storeRefCount) 1547 .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount) 1548 .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB) 1549 .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB) 1550 
.setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB) 1551 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB) 1552 .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount()) 1553 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount()) 1554 .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs) 1555 .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality) 1556 .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight) 1557 .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight) 1558 .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState())) 1559 .setLastMajorCompactionTs(r.getOldestHfileTs(true)); 1560 r.setCompleteSequenceId(regionLoadBldr); 1561 return regionLoadBldr.build(); 1562 } 1563 1564 private UserLoad createUserLoad(String user, MetricsUserSource userSource) { 1565 UserLoad.Builder userLoadBldr = UserLoad.newBuilder(); 1566 userLoadBldr.setUserName(user); 1567 userSource.getClientMetrics().values().stream() 1568 .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder() 1569 .setHostName(clientMetrics.getHostName()) 1570 .setWriteRequestsCount(clientMetrics.getWriteRequestsCount()) 1571 .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests()) 1572 .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build()) 1573 .forEach(userLoadBldr::addClientMetrics); 1574 return userLoadBldr.build(); 1575 } 1576 1577 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { 1578 HRegion r = onlineRegions.get(encodedRegionName); 1579 return r != null ? createRegionLoad(r, null, null) : null; 1580 } 1581 1582 /** 1583 * Inner class that runs on a long period checking if regions need compaction. 1584 */ 1585 private static class CompactionChecker extends ScheduledChore { 1586 private final HRegionServer instance; 1587 private final int majorCompactPriority; 1588 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; 1589 // Iteration is 1-based rather than 0-based so we don't check for compaction 1590 // immediately upon region server startup 1591 private long iteration = 1; 1592 1593 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { 1594 super("CompactionChecker", stopper, sleepTime); 1595 this.instance = h; 1596 LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime)); 1597 1598 /* 1599 * MajorCompactPriority is configurable. If not set, the compaction will use default priority. 1600 */ 1601 this.majorCompactPriority = this.instance.conf 1602 .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY); 1603 } 1604 1605 @Override 1606 protected void chore() { 1607 for (Region r : this.instance.onlineRegions.values()) { 1608 // Skip compaction if region is read only 1609 if (r == null || r.isReadOnly()) { 1610 continue; 1611 } 1612 1613 HRegion hr = (HRegion) r; 1614 for (HStore s : hr.stores.values()) { 1615 try { 1616 long multiplier = s.getCompactionCheckMultiplier(); 1617 assert multiplier > 0; 1618 if (iteration % multiplier != 0) { 1619 continue; 1620 } 1621 if (s.needsCompaction()) { 1622 // Queue a compaction. Will recognize if major is needed. 
1623 this.instance.compactSplitThread.requestSystemCompaction(hr, s, 1624 getName() + " requests compaction"); 1625 } else if (s.shouldPerformMajorCompaction()) { 1626 s.triggerMajorCompaction(); 1627 if ( 1628 majorCompactPriority == DEFAULT_PRIORITY 1629 || majorCompactPriority > hr.getCompactPriority() 1630 ) { 1631 this.instance.compactSplitThread.requestCompaction(hr, s, 1632 getName() + " requests major compaction; use default priority", Store.NO_PRIORITY, 1633 CompactionLifeCycleTracker.DUMMY, null); 1634 } else { 1635 this.instance.compactSplitThread.requestCompaction(hr, s, 1636 getName() + " requests major compaction; use configured priority", 1637 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); 1638 } 1639 } 1640 } catch (IOException e) { 1641 LOG.warn("Failed major compaction check on " + r, e); 1642 } 1643 } 1644 } 1645 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); 1646 } 1647 } 1648 1649 private static class PeriodicMemStoreFlusher extends ScheduledChore { 1650 private final HRegionServer server; 1651 private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds 1652 private final static int MIN_DELAY_TIME = 0; // millisec 1653 private final long rangeOfDelayMs; 1654 1655 PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { 1656 super("MemstoreFlusherChore", server, cacheFlushInterval); 1657 this.server = server; 1658 1659 final long configuredRangeOfDelay = server.getConfiguration() 1660 .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY); 1661 this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay); 1662 } 1663 1664 @Override 1665 protected void chore() { 1666 final StringBuilder whyFlush = new StringBuilder(); 1667 for (HRegion r : this.server.onlineRegions.values()) { 1668 if (r == null) { 1669 continue; 1670 } 1671 if (r.shouldFlush(whyFlush)) { 1672 FlushRequester requester = server.getFlushRequester(); 1673 if (requester != null) { 1674 long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME; 1675 // Throttle the flushes by putting a delay. If we don't throttle, and there 1676 // is a balanced write-load on the regions in a table, we might end up 1677 // overwhelming the filesystem with too many flushes at once. 1678 if (requester.requestDelayedFlush(r, delay)) { 1679 LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(), 1680 r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay); 1681 } 1682 } 1683 } 1684 } 1685 } 1686 } 1687 1688 /** 1689 * Report the status of the server. A server is online once all the startup is completed (setting 1690 * up filesystem, starting executorService threads, etc.). This method is designed mostly to be 1691 * useful in tests. 1692 * @return true if online, false if not. 1693 */ 1694 public boolean isOnline() { 1695 return online.get(); 1696 } 1697 1698 /** 1699 * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to 1700 * be hooked up to WAL. 
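 * <p>
 * Rough layout sketch, assuming the default directory names (for illustration only):
 * <pre>
 *   walRootDir/WALs/serverName   - this server's live WAL directory; it must not already exist
 *   walRootDir/oldWALs           - archive directory that the replication instance is pointed at
 * </pre>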
1701 */ 1702 private void setupWALAndReplication() throws IOException { 1703 WALFactory factory = new WALFactory(conf, serverName.toString(), this, true); 1704 // TODO Replication make assumptions here based on the default filesystem impl 1705 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 1706 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); 1707 1708 Path logDir = new Path(walRootDir, logName); 1709 LOG.debug("logDir={}", logDir); 1710 if (this.walFs.exists(logDir)) { 1711 throw new RegionServerRunningException( 1712 "Region server has already created directory at " + this.serverName.toString()); 1713 } 1714 // Always create wal directory as now we need this when master restarts to find out the live 1715 // region servers. 1716 if (!this.walFs.mkdirs(logDir)) { 1717 throw new IOException("Can not create wal directory " + logDir); 1718 } 1719 // Instantiate replication if replication enabled. Pass it the log directories. 1720 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory); 1721 this.walFactory = factory; 1722 } 1723 1724 /** 1725 * Start up replication source and sink handlers. 1726 */ 1727 private void startReplicationService() throws IOException { 1728 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { 1729 this.replicationSourceHandler.startReplicationService(); 1730 } else { 1731 if (this.replicationSourceHandler != null) { 1732 this.replicationSourceHandler.startReplicationService(); 1733 } 1734 if (this.replicationSinkHandler != null) { 1735 this.replicationSinkHandler.startReplicationService(); 1736 } 1737 } 1738 } 1739 1740 /** Returns Master address tracker instance. */ 1741 public MasterAddressTracker getMasterAddressTracker() { 1742 return this.masterAddressTracker; 1743 } 1744 1745 /** 1746 * Start maintenance Threads, Server, Worker and lease checker threads. Start all threads we need 1747 * to run. This is called after we've successfully registered with the Master. Install an 1748 * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception. We 1749 * cannot set the handler on all threads. Server's internal Listener thread is off limits. For 1750 * Server, if an OOME, it waits a while then retries. Meantime, a flush or a compaction that tries 1751 * to run should trigger same critical condition and the shutdown will run. On its way out, this 1752 * server will shut down Server. Leases are sort of inbetween. It has an internal thread that 1753 * while it inherits from Chore, it keeps its own internal stop mechanism so needs to be stopped 1754 * by this hosting server. Worker logs the exception and exits. 1755 */ 1756 private void startServices() throws IOException { 1757 if (!isStopped() && !isAborted()) { 1758 initializeThreads(); 1759 } 1760 this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection); 1761 this.secureBulkLoadManager.start(); 1762 1763 // Health checker thread. 1764 if (isHealthCheckerConfigured()) { 1765 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, 1766 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); 1767 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); 1768 } 1769 // Executor status collect thread. 
1770 if ( 1771 this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED, 1772 HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED) 1773 ) { 1774 int sleepTime = 1775 this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ); 1776 executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(), 1777 this.metricsRegionServer.getMetricsSource()); 1778 } 1779 1780 this.walRoller = new LogRoller(this); 1781 this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); 1782 this.procedureResultReporter = new RemoteProcedureResultReporter(this); 1783 1784 // Create the CompactedFileDischarger chore executorService. This chore helps to 1785 // remove the compacted files that will no longer be used in reads. 1786 // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to 1787 // 2 mins so that compacted files can be archived before the TTLCleaner runs 1788 int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); 1789 this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this); 1790 choreService.scheduleChore(compactedFileDischarger); 1791 1792 // Start executor services 1793 final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3); 1794 executorService.startExecutorService(executorService.new ExecutorConfig() 1795 .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads)); 1796 final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1); 1797 executorService.startExecutorService(executorService.new ExecutorConfig() 1798 .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads)); 1799 final int openPriorityRegionThreads = 1800 conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3); 1801 executorService.startExecutorService( 1802 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION) 1803 .setCorePoolSize(openPriorityRegionThreads)); 1804 final int closeRegionThreads = 1805 conf.getInt("hbase.regionserver.executor.closeregion.threads", 3); 1806 executorService.startExecutorService(executorService.new ExecutorConfig() 1807 .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads)); 1808 final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1); 1809 executorService.startExecutorService(executorService.new ExecutorConfig() 1810 .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads)); 1811 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { 1812 final int storeScannerParallelSeekThreads = 1813 conf.getInt("hbase.storescanner.parallel.seek.threads", 10); 1814 executorService.startExecutorService( 1815 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK) 1816 .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true)); 1817 } 1818 final int logReplayOpsThreads = 1819 conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER); 1820 executorService.startExecutorService( 1821 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS) 1822 .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true)); 1823 // Start the threads for compacted files discharger 1824 final int compactionDischargerThreads = 1825 
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10); 1826 executorService.startExecutorService(executorService.new ExecutorConfig() 1827 .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER) 1828 .setCorePoolSize(compactionDischargerThreads)); 1829 if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { 1830 final int regionReplicaFlushThreads = 1831 conf.getInt("hbase.regionserver.region.replica.flusher.threads", 1832 conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); 1833 executorService.startExecutorService(executorService.new ExecutorConfig() 1834 .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS) 1835 .setCorePoolSize(regionReplicaFlushThreads)); 1836 } 1837 final int refreshPeerThreads = 1838 conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2); 1839 executorService.startExecutorService(executorService.new ExecutorConfig() 1840 .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads)); 1841 final int replaySyncReplicationWALThreads = 1842 conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1); 1843 executorService.startExecutorService(executorService.new ExecutorConfig() 1844 .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL) 1845 .setCorePoolSize(replaySyncReplicationWALThreads)); 1846 final int switchRpcThrottleThreads = 1847 conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1); 1848 executorService.startExecutorService( 1849 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE) 1850 .setCorePoolSize(switchRpcThrottleThreads)); 1851 final int claimReplicationQueueThreads = 1852 conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1); 1853 executorService.startExecutorService( 1854 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE) 1855 .setCorePoolSize(claimReplicationQueueThreads)); 1856 final int rsSnapshotOperationThreads = 1857 conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3); 1858 executorService.startExecutorService( 1859 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS) 1860 .setCorePoolSize(rsSnapshotOperationThreads)); 1861 1862 Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", 1863 uncaughtExceptionHandler); 1864 if (this.cacheFlusher != null) { 1865 this.cacheFlusher.start(uncaughtExceptionHandler); 1866 } 1867 Threads.setDaemonThreadRunning(this.procedureResultReporter, 1868 getName() + ".procedureResultReporter", uncaughtExceptionHandler); 1869 1870 if (this.compactionChecker != null) { 1871 choreService.scheduleChore(compactionChecker); 1872 } 1873 if (this.periodicFlusher != null) { 1874 choreService.scheduleChore(periodicFlusher); 1875 } 1876 if (this.healthCheckChore != null) { 1877 choreService.scheduleChore(healthCheckChore); 1878 } 1879 if (this.executorStatusChore != null) { 1880 choreService.scheduleChore(executorStatusChore); 1881 } 1882 if (this.nonceManagerChore != null) { 1883 choreService.scheduleChore(nonceManagerChore); 1884 } 1885 if (this.storefileRefresher != null) { 1886 choreService.scheduleChore(storefileRefresher); 1887 } 1888 if (this.fsUtilizationChore != null) { 1889 choreService.scheduleChore(fsUtilizationChore); 1890 } 1891 if (this.slowLogTableOpsChore != null) { 1892 choreService.scheduleChore(slowLogTableOpsChore); 1893 } 1894 if (this.brokenStoreFileCleaner != null) 
{ 1895 choreService.scheduleChore(brokenStoreFileCleaner); 1896 } 1897 1898 if (this.rsMobFileCleanerChore != null) { 1899 choreService.scheduleChore(rsMobFileCleanerChore); 1900 } 1901 1902 // Leases is not a Thread. Internally it runs a daemon thread. If it gets 1903 // an unhandled exception, it will just exit. 1904 Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker", 1905 uncaughtExceptionHandler); 1906 1907 // Create the log splitting worker and start it 1908 // set a smaller retries to fast fail otherwise splitlogworker could be blocked for 1909 // quite a while inside Connection layer. The worker won't be available for other 1910 // tasks even after current task is preempted after a split task times out. 1911 Configuration sinkConf = HBaseConfiguration.create(conf); 1912 sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1913 conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds 1914 sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1915 conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds 1916 sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 1917 if ( 1918 this.csm != null 1919 && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK) 1920 ) { 1921 // SplitLogWorker needs csm. If none, don't start this. 1922 this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory); 1923 splitLogWorker.start(); 1924 LOG.debug("SplitLogWorker started"); 1925 } 1926 1927 // Memstore services. 1928 startHeapMemoryManager(); 1929 // Call it after starting HeapMemoryManager. 1930 initializeMemStoreChunkCreator(hMemManager); 1931 } 1932 1933 private void initializeThreads() { 1934 // Cache flushing thread. 1935 this.cacheFlusher = new MemStoreFlusher(conf, this); 1936 1937 // Compaction thread 1938 this.compactSplitThread = new CompactSplit(this); 1939 1940 // Background thread to check for compactions; needed if region has not gotten updates 1941 // in a while. It will take care of not checking too frequently on store-by-store basis. 1942 this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this); 1943 this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this); 1944 this.leaseManager = new LeaseManager(this.threadWakeFrequency); 1945 1946 final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, 1947 HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); 1948 if (isSlowLogTableEnabled) { 1949 // default chore duration: 10 min 1950 final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000); 1951 slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder); 1952 } 1953 1954 if (this.nonceManager != null) { 1955 // Create the scheduled chore that cleans up nonces. 
1956 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); 1957 } 1958 1959 // Setup the Quota Manager 1960 rsQuotaManager = new RegionServerRpcQuotaManager(this); 1961 rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); 1962 1963 if (QuotaUtil.isQuotaEnabled(conf)) { 1964 this.fsUtilizationChore = new FileSystemUtilizationChore(this); 1965 } 1966 1967 boolean onlyMetaRefresh = false; 1968 int storefileRefreshPeriod = 1969 conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 1970 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 1971 if (storefileRefreshPeriod == 0) { 1972 storefileRefreshPeriod = 1973 conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, 1974 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 1975 onlyMetaRefresh = true; 1976 } 1977 if (storefileRefreshPeriod > 0) { 1978 this.storefileRefresher = 1979 new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this); 1980 } 1981 1982 int brokenStoreFileCleanerPeriod = 1983 conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, 1984 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD); 1985 int brokenStoreFileCleanerDelay = 1986 conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, 1987 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY); 1988 double brokenStoreFileCleanerDelayJitter = 1989 conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER, 1990 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER); 1991 double jitterRate = 1992 (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter; 1993 long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate); 1994 this.brokenStoreFileCleaner = 1995 new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue), 1996 brokenStoreFileCleanerPeriod, this, conf, this); 1997 1998 this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this); 1999 2000 registerConfigurationObservers(); 2001 } 2002 2003 private void registerConfigurationObservers() { 2004 // Registering the compactSplitThread object with the ConfigurationManager. 
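// Sketch of what an observer looks like (illustrative; MyTunable is not a real class in this
// code base):
//   class MyTunable implements ConfigurationObserver {
//     @Override
//     public void onConfigurationChange(Configuration conf) {
//       // re-read whatever dynamic settings this component cares about
//     }
//   }
// Each object registered below has onConfigurationChange() invoked when the configuration is
// reloaded, which is how compactSplitThread and rpcServices pick up tuning changes at runtime.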
2005 configurationManager.registerObserver(this.compactSplitThread); 2006 configurationManager.registerObserver(this.rpcServices); 2007 configurationManager.registerObserver(this); 2008 } 2009 2010 /* 2011 * Verify that server is healthy 2012 */ 2013 private boolean isHealthy() { 2014 if (!dataFsOk) { 2015 // File system problem 2016 return false; 2017 } 2018 // Verify that all threads are alive 2019 boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive()) 2020 && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) 2021 && (this.walRoller == null || this.walRoller.isAlive()) 2022 && (this.compactionChecker == null || this.compactionChecker.isScheduled()) 2023 && (this.periodicFlusher == null || this.periodicFlusher.isScheduled()); 2024 if (!healthy) { 2025 stop("One or more threads are no longer alive -- stop"); 2026 } 2027 return healthy; 2028 } 2029 2030 @Override 2031 public List<WAL> getWALs() { 2032 return walFactory.getWALs(); 2033 } 2034 2035 @Override 2036 public WAL getWAL(RegionInfo regionInfo) throws IOException { 2037 WAL wal = walFactory.getWAL(regionInfo); 2038 if (this.walRoller != null) { 2039 this.walRoller.addWAL(wal); 2040 } 2041 return wal; 2042 } 2043 2044 public LogRoller getWalRoller() { 2045 return walRoller; 2046 } 2047 2048 WALFactory getWalFactory() { 2049 return walFactory; 2050 } 2051 2052 @Override 2053 public void stop(final String msg) { 2054 stop(msg, false, RpcServer.getRequestUser().orElse(null)); 2055 } 2056 2057 /** 2058 * Stops the regionserver. 2059 * @param msg Status message 2060 * @param force True if this is a regionserver abort 2061 * @param user The user executing the stop request, or null if no user is associated 2062 */ 2063 public void stop(final String msg, final boolean force, final User user) { 2064 if (!this.stopped) { 2065 LOG.info("***** STOPPING region server '" + this + "' *****"); 2066 if (this.rsHost != null) { 2067 // when forced via abort don't allow CPs to override 2068 try { 2069 this.rsHost.preStop(msg, user); 2070 } catch (IOException ioe) { 2071 if (!force) { 2072 LOG.warn("The region server did not stop", ioe); 2073 return; 2074 } 2075 LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe); 2076 } 2077 } 2078 this.stopped = true; 2079 LOG.info("STOPPED: " + msg); 2080 // Wakes run() if it is sleeping 2081 sleeper.skipSleepCycle(); 2082 } 2083 } 2084 2085 public void waitForServerOnline() { 2086 while (!isStopped() && !isOnline()) { 2087 synchronized (online) { 2088 try { 2089 online.wait(msgInterval); 2090 } catch (InterruptedException ie) { 2091 Thread.currentThread().interrupt(); 2092 break; 2093 } 2094 } 2095 } 2096 } 2097 2098 @Override 2099 public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException { 2100 HRegion r = context.getRegion(); 2101 long openProcId = context.getOpenProcId(); 2102 long masterSystemTime = context.getMasterSystemTime(); 2103 rpcServices.checkOpen(); 2104 LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}", 2105 r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime); 2106 // Do checks to see if we need to compact (references or too many files) 2107 // Skip compaction check if region is read only 2108 if (!r.isReadOnly()) { 2109 for (HStore s : r.stores.values()) { 2110 if (s.hasReferences() || s.needsCompaction()) { 2111 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region"); 2112 } 2113 } 2114 } 2115 long openSeqNum = r.getOpenSeqNum(); 2116 if (openSeqNum == 
HConstants.NO_SEQNUM) { 2117 // If we opened a region, we should have read some sequence number from it. 2118 LOG.error( 2119 "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString()); 2120 openSeqNum = 0; 2121 } 2122 2123 // Notify master 2124 if ( 2125 !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED, 2126 openSeqNum, openProcId, masterSystemTime, r.getRegionInfo())) 2127 ) { 2128 throw new IOException( 2129 "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString()); 2130 } 2131 2132 triggerFlushInPrimaryRegion(r); 2133 2134 LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString()); 2135 } 2136 2137 /** 2138 * Helper method for use in tests. Skip the region transition report when there's no master around 2139 * to receive it. 2140 */ 2141 private boolean skipReportingTransition(final RegionStateTransitionContext context) { 2142 final TransitionCode code = context.getCode(); 2143 final long openSeqNum = context.getOpenSeqNum(); 2144 long masterSystemTime = context.getMasterSystemTime(); 2145 final RegionInfo[] hris = context.getHris(); 2146 2147 if (code == TransitionCode.OPENED) { 2148 Preconditions.checkArgument(hris != null && hris.length == 1); 2149 if (hris[0].isMetaRegion()) { 2150 LOG.warn( 2151 "meta table location is stored in master local store, so we can not skip reporting"); 2152 return false; 2153 } else { 2154 try { 2155 MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0], 2156 serverName, openSeqNum, masterSystemTime); 2157 } catch (IOException e) { 2158 LOG.info("Failed to update meta", e); 2159 return false; 2160 } 2161 } 2162 } 2163 return true; 2164 } 2165 2166 private ReportRegionStateTransitionRequest 2167 createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) { 2168 final TransitionCode code = context.getCode(); 2169 final long openSeqNum = context.getOpenSeqNum(); 2170 final RegionInfo[] hris = context.getHris(); 2171 final long[] procIds = context.getProcIds(); 2172 2173 ReportRegionStateTransitionRequest.Builder builder = 2174 ReportRegionStateTransitionRequest.newBuilder(); 2175 builder.setServer(ProtobufUtil.toServerName(serverName)); 2176 RegionStateTransition.Builder transition = builder.addTransitionBuilder(); 2177 transition.setTransitionCode(code); 2178 if (code == TransitionCode.OPENED && openSeqNum >= 0) { 2179 transition.setOpenSeqNum(openSeqNum); 2180 } 2181 for (RegionInfo hri : hris) { 2182 transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri)); 2183 } 2184 for (long procId : procIds) { 2185 transition.addProcId(procId); 2186 } 2187 2188 return builder.build(); 2189 } 2190 2191 @Override 2192 public boolean reportRegionStateTransition(final RegionStateTransitionContext context) { 2193 if (TEST_SKIP_REPORTING_TRANSITION) { 2194 return skipReportingTransition(context); 2195 } 2196 final ReportRegionStateTransitionRequest request = 2197 createReportRegionStateTransitionRequest(context); 2198 2199 int tries = 0; 2200 long pauseTime = this.retryPauseTime; 2201 // Keep looping till we get an error. We want to send reports even though server is going down. 2202 // Only go down if clusterConnection is null. It is set to null almost as last thing as the 2203 // HRegionServer does down. 
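// Illustrative backoff, assuming the default HConstants.RETRY_BACKOFF table used by
// ConnectionUtils.getPauseTime(): with retryPauseTime = 100 ms the pauses grow roughly as
//   getPauseTime(100, 0) ~ 100 ms, getPauseTime(100, 1) ~ 200 ms, getPauseTime(100, 3) ~ 500 ms
// (plus a little jitter). The backoff path is only taken for "master not ready yet" style
// exceptions; everything else is retried immediately.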
2204 while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) { 2205 RegionServerStatusService.BlockingInterface rss = rssStub; 2206 try { 2207 if (rss == null) { 2208 createRegionServerStatusStub(); 2209 continue; 2210 } 2211 ReportRegionStateTransitionResponse response = 2212 rss.reportRegionStateTransition(null, request); 2213 if (response.hasErrorMessage()) { 2214 LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage()); 2215 break; 2216 } 2217 // Log if we had to retry else don't log unless TRACE. We want to 2218 // know if were successful after an attempt showed in logs as failed. 2219 if (tries > 0 || LOG.isTraceEnabled()) { 2220 LOG.info("TRANSITION REPORTED " + request); 2221 } 2222 // NOTE: Return mid-method!!! 2223 return true; 2224 } catch (ServiceException se) { 2225 IOException ioe = ProtobufUtil.getRemoteException(se); 2226 boolean pause = ioe instanceof ServerNotRunningYetException 2227 || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException; 2228 if (pause) { 2229 // Do backoff else we flood the Master with requests. 2230 pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries); 2231 } else { 2232 pauseTime = this.retryPauseTime; // Reset. 2233 } 2234 LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#" 2235 + tries + ")" 2236 + (pause 2237 ? " after " + pauseTime + "ms delay (Master is coming online...)." 2238 : " immediately."), 2239 ioe); 2240 if (pause) { 2241 Threads.sleep(pauseTime); 2242 } 2243 tries++; 2244 if (rssStub == rss) { 2245 rssStub = null; 2246 } 2247 } 2248 } 2249 return false; 2250 } 2251 2252 /** 2253 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not 2254 * block this thread. See RegionReplicaFlushHandler for details. 2255 */ 2256 private void triggerFlushInPrimaryRegion(final HRegion region) { 2257 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 2258 return; 2259 } 2260 TableName tn = region.getTableDescriptor().getTableName(); 2261 if ( 2262 !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) 2263 || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) || 2264 // If the memstore replication not setup, we do not have to wait for observing a flush event 2265 // from primary before starting to serve reads, because gaps from replication is not 2266 // applicable,this logic is from 2267 // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by 2268 // HBASE-13063 2269 !region.getTableDescriptor().hasRegionMemStoreReplication() 2270 ) { 2271 region.setReadsEnabled(true); 2272 return; 2273 } 2274 2275 region.setReadsEnabled(false); // disable reads before marking the region as opened. 2276 // RegionReplicaFlushHandler might reset this. 2277 2278 // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler 2279 if (this.executorService != null) { 2280 this.executorService.submit(new RegionReplicaFlushHandler(this, region)); 2281 } else { 2282 LOG.info("Executor is null; not running flush of primary region replica for {}", 2283 region.getRegionInfo()); 2284 } 2285 } 2286 2287 @InterfaceAudience.Private 2288 public RSRpcServices getRSRpcServices() { 2289 return rpcServices; 2290 } 2291 2292 /** 2293 * Cause the server to exit without closing the regions it is serving, the log it is using and 2294 * without notifying the master. 
Used for unit testing and on catastrophic events such as HDFS being 2295 * yanked out from under hbase or an OOME. 2296 * @param reason the reason we are aborting 2297 * @param cause the exception that caused the abort, or null */ 2298 @Override 2299 public void abort(String reason, Throwable cause) { 2300 if (!setAbortRequested()) { 2301 // Abort already in progress, ignore the new request. 2302 LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason); 2303 return; 2304 } 2305 String msg = "***** ABORTING region server " + this + ": " + reason + " *****"; 2306 if (cause != null) { 2307 LOG.error(HBaseMarkers.FATAL, msg, cause); 2308 } else { 2309 LOG.error(HBaseMarkers.FATAL, msg); 2310 } 2311 // HBASE-4014: show list of coprocessors that were loaded to help debug 2312 // regionserver crashes. Note that we're implicitly using 2313 // java.util.HashSet's toString() method to print the coprocessor names. 2314 LOG.error(HBaseMarkers.FATAL, 2315 "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors()); 2316 // Try and dump metrics on abort -- they might give a clue as to how the fatal error came about. 2317 try { 2318 LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics()); 2319 } catch (MalformedObjectNameException | IOException e) { 2320 LOG.warn("Failed dumping metrics", e); 2321 } 2322 2323 // Do our best to report our abort to the master, but this may not work 2324 try { 2325 if (cause != null) { 2326 msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause); 2327 } 2328 // Report to the master but only if we have already registered with the master. 2329 RegionServerStatusService.BlockingInterface rss = rssStub; 2330 if (rss != null && this.serverName != null) { 2331 ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder(); 2332 builder.setServer(ProtobufUtil.toServerName(this.serverName)); 2333 builder.setErrorMessage(msg); 2334 rss.reportRSFatalError(null, builder.build()); 2335 } 2336 } catch (Throwable t) { 2337 LOG.warn("Unable to report fatal error to master", t); 2338 } 2339 2340 scheduleAbortTimer(); 2341 // shutdown should be run as the internal user 2342 stop(reason, true, null); 2343 } 2344 2345 /* 2346 * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up logs, but it does 2347 * close the socket in case we want to bring up a server on the old hostname+port immediately. 2348 */ 2349 @InterfaceAudience.Private 2350 protected void kill() { 2351 this.killed = true; 2352 abort("Simulated kill"); 2353 } 2354 2355 // Limits the time spent in the shutdown process. 2356 private void scheduleAbortTimer() { 2357 if (this.abortMonitor == null) { 2358 this.abortMonitor = new Timer("Abort regionserver monitor", true); 2359 TimerTask abortTimeoutTask = null; 2360 try { 2361 Constructor<? extends TimerTask> timerTaskCtor = 2362 Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName())) 2363 .asSubclass(TimerTask.class).getDeclaredConstructor(); 2364 timerTaskCtor.setAccessible(true); 2365 abortTimeoutTask = timerTaskCtor.newInstance(); 2366 } catch (Exception e) { 2367 LOG.warn("Initialize abort timeout task failed", e); 2368 } 2369 if (abortTimeoutTask != null) { 2370 abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT)); 2371 } 2372 } 2373 } 2374 2375 /** 2376 * Wait on all threads to finish. Presumption is that all closes and stops have already been 2377 * called.
2378 */ 2379 protected void stopServiceThreads() { 2380 // clean up the scheduled chores 2381 stopChoreService(); 2382 if (bootstrapNodeManager != null) { 2383 bootstrapNodeManager.stop(); 2384 } 2385 if (this.cacheFlusher != null) { 2386 this.cacheFlusher.join(); 2387 } 2388 if (this.walRoller != null) { 2389 this.walRoller.close(); 2390 } 2391 if (this.compactSplitThread != null) { 2392 this.compactSplitThread.join(); 2393 } 2394 stopExecutorService(); 2395 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { 2396 this.replicationSourceHandler.stopReplicationService(); 2397 } else { 2398 if (this.replicationSourceHandler != null) { 2399 this.replicationSourceHandler.stopReplicationService(); 2400 } 2401 if (this.replicationSinkHandler != null) { 2402 this.replicationSinkHandler.stopReplicationService(); 2403 } 2404 } 2405 } 2406 2407 /** Returns Return the object that implements the replication source executorService. */ 2408 @Override 2409 public ReplicationSourceService getReplicationSourceService() { 2410 return replicationSourceHandler; 2411 } 2412 2413 /** Returns Return the object that implements the replication sink executorService. */ 2414 public ReplicationSinkService getReplicationSinkService() { 2415 return replicationSinkHandler; 2416 } 2417 2418 /** 2419 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2420 * connection, the current rssStub must be null. Method will block until a master is available. 2421 * You can break from this block by requesting the server stop. 2422 * @return master + port, or null if server has been stopped 2423 */ 2424 private synchronized ServerName createRegionServerStatusStub() { 2425 // Create RS stub without refreshing the master node from ZK, use cached data 2426 return createRegionServerStatusStub(false); 2427 } 2428 2429 /** 2430 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2431 * connection, the current rssStub must be null. Method will block until a master is available. 2432 * You can break from this block by requesting the server stop. 2433 * @param refresh If true then master address will be read from ZK, otherwise use cached data 2434 * @return master + port, or null if server has been stopped 2435 */ 2436 @InterfaceAudience.Private 2437 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { 2438 if (rssStub != null) { 2439 return masterAddressTracker.getMasterAddress(); 2440 } 2441 ServerName sn = null; 2442 long previousLogTime = 0; 2443 RegionServerStatusService.BlockingInterface intRssStub = null; 2444 LockService.BlockingInterface intLockStub = null; 2445 boolean interrupted = false; 2446 try { 2447 while (keepLooping()) { 2448 sn = this.masterAddressTracker.getMasterAddress(refresh); 2449 if (sn == null) { 2450 if (!keepLooping()) { 2451 // give up with no connection. 
2452 LOG.debug("No master found and cluster is stopped; bailing out"); 2453 return null; 2454 } 2455 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 2456 LOG.debug("No master found; retry"); 2457 previousLogTime = EnvironmentEdgeManager.currentTime(); 2458 } 2459 refresh = true; // let's try pull it from ZK directly 2460 if (sleepInterrupted(200)) { 2461 interrupted = true; 2462 } 2463 continue; 2464 } 2465 try { 2466 BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, 2467 userProvider.getCurrent(), shortOperationTimeout); 2468 intRssStub = RegionServerStatusService.newBlockingStub(channel); 2469 intLockStub = LockService.newBlockingStub(channel); 2470 break; 2471 } catch (IOException e) { 2472 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 2473 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 2474 if (e instanceof ServerNotRunningYetException) { 2475 LOG.info("Master isn't available yet, retrying"); 2476 } else { 2477 LOG.warn("Unable to connect to master. Retrying. Error was:", e); 2478 } 2479 previousLogTime = EnvironmentEdgeManager.currentTime(); 2480 } 2481 if (sleepInterrupted(200)) { 2482 interrupted = true; 2483 } 2484 } 2485 } 2486 } finally { 2487 if (interrupted) { 2488 Thread.currentThread().interrupt(); 2489 } 2490 } 2491 this.rssStub = intRssStub; 2492 this.lockStub = intLockStub; 2493 return sn; 2494 } 2495 2496 /** 2497 * @return True if we should break loop because cluster is going down or this server has been 2498 * stopped or hdfs has gone bad. 2499 */ 2500 private boolean keepLooping() { 2501 return !this.stopped && isClusterUp(); 2502 } 2503 2504 /* 2505 * Let the master know we're here Run initialization using parameters passed us by the master. 2506 * @return A Map of key/value configurations we got from the Master else null if we failed to 2507 * register. 
n 2508 */ 2509 private RegionServerStartupResponse reportForDuty() throws IOException { 2510 if (this.masterless) { 2511 return RegionServerStartupResponse.getDefaultInstance(); 2512 } 2513 ServerName masterServerName = createRegionServerStatusStub(true); 2514 RegionServerStatusService.BlockingInterface rss = rssStub; 2515 if (masterServerName == null || rss == null) { 2516 return null; 2517 } 2518 RegionServerStartupResponse result = null; 2519 try { 2520 rpcServices.requestCount.reset(); 2521 rpcServices.rpcGetRequestCount.reset(); 2522 rpcServices.rpcScanRequestCount.reset(); 2523 rpcServices.rpcFullScanRequestCount.reset(); 2524 rpcServices.rpcMultiRequestCount.reset(); 2525 rpcServices.rpcMutateRequestCount.reset(); 2526 LOG.info("reportForDuty to master=" + masterServerName + " with port=" 2527 + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode); 2528 long now = EnvironmentEdgeManager.currentTime(); 2529 int port = rpcServices.getSocketAddress().getPort(); 2530 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder(); 2531 if (!StringUtils.isBlank(useThisHostnameInstead)) { 2532 request.setUseThisHostnameInstead(useThisHostnameInstead); 2533 } 2534 request.setPort(port); 2535 request.setServerStartCode(this.startcode); 2536 request.setServerCurrentTime(now); 2537 result = rss.regionServerStartup(null, request.build()); 2538 } catch (ServiceException se) { 2539 IOException ioe = ProtobufUtil.getRemoteException(se); 2540 if (ioe instanceof ClockOutOfSyncException) { 2541 LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe); 2542 // Re-throw IOE will cause RS to abort 2543 throw ioe; 2544 } else if (ioe instanceof ServerNotRunningYetException) { 2545 LOG.debug("Master is not running yet"); 2546 } else { 2547 LOG.warn("error telling master we are up", se); 2548 } 2549 rssStub = null; 2550 } 2551 return result; 2552 } 2553 2554 @Override 2555 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { 2556 try { 2557 GetLastFlushedSequenceIdRequest req = 2558 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName); 2559 RegionServerStatusService.BlockingInterface rss = rssStub; 2560 if (rss == null) { // Try to connect one more time 2561 createRegionServerStatusStub(); 2562 rss = rssStub; 2563 if (rss == null) { 2564 // Still no luck, we tried 2565 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id"); 2566 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2567 .build(); 2568 } 2569 } 2570 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req); 2571 return RegionStoreSequenceIds.newBuilder() 2572 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId()) 2573 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build(); 2574 } catch (ServiceException e) { 2575 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e); 2576 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2577 .build(); 2578 } 2579 } 2580 2581 /** 2582 * Close meta region if we carry it 2583 * @param abort Whether we're running an abort. 
2584 */ 2585 private void closeMetaTableRegions(final boolean abort) { 2586 HRegion meta = null; 2587 this.onlineRegionsLock.writeLock().lock(); 2588 try { 2589 for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) { 2590 RegionInfo hri = e.getValue().getRegionInfo(); 2591 if (hri.isMetaRegion()) { 2592 meta = e.getValue(); 2593 } 2594 if (meta != null) { 2595 break; 2596 } 2597 } 2598 } finally { 2599 this.onlineRegionsLock.writeLock().unlock(); 2600 } 2601 if (meta != null) { 2602 closeRegionIgnoreErrors(meta.getRegionInfo(), abort); 2603 } 2604 } 2605 2606 /** 2607 * Schedule closes on all user regions. Should be safe calling multiple times because it wont' 2608 * close regions that are already closed or that are closing. 2609 * @param abort Whether we're running an abort. 2610 */ 2611 private void closeUserRegions(final boolean abort) { 2612 this.onlineRegionsLock.writeLock().lock(); 2613 try { 2614 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 2615 HRegion r = e.getValue(); 2616 if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) { 2617 // Don't update zk with this close transition; pass false. 2618 closeRegionIgnoreErrors(r.getRegionInfo(), abort); 2619 } 2620 } 2621 } finally { 2622 this.onlineRegionsLock.writeLock().unlock(); 2623 } 2624 } 2625 2626 protected Map<String, HRegion> getOnlineRegions() { 2627 return this.onlineRegions; 2628 } 2629 2630 public int getNumberOfOnlineRegions() { 2631 return this.onlineRegions.size(); 2632 } 2633 2634 /** 2635 * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM 2636 * as client; HRegion cannot be serialized to cross an rpc. 2637 */ 2638 public Collection<HRegion> getOnlineRegionsLocalContext() { 2639 Collection<HRegion> regions = this.onlineRegions.values(); 2640 return Collections.unmodifiableCollection(regions); 2641 } 2642 2643 @Override 2644 public void addRegion(HRegion region) { 2645 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); 2646 configurationManager.registerObserver(region); 2647 } 2648 2649 private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region, 2650 long size) { 2651 if (!sortedRegions.containsKey(size)) { 2652 sortedRegions.put(size, new ArrayList<>()); 2653 } 2654 sortedRegions.get(size).add(region); 2655 } 2656 2657 /** 2658 * @return A new Map of online regions sorted by region off-heap size with the first entry being 2659 * the biggest. 2660 */ 2661 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() { 2662 // we'll sort the regions in reverse 2663 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2664 // Copy over all regions. Regions are sorted by size with biggest first. 2665 for (HRegion region : this.onlineRegions.values()) { 2666 addRegion(sortedRegions, region, region.getMemStoreOffHeapSize()); 2667 } 2668 return sortedRegions; 2669 } 2670 2671 /** 2672 * @return A new Map of online regions sorted by region heap size with the first entry being the 2673 * biggest. 2674 */ 2675 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() { 2676 // we'll sort the regions in reverse 2677 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2678 // Copy over all regions. Regions are sorted by size with biggest first. 
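// Illustrative shape of the result (made-up sizes, in bytes):
//   { 67108864 -> [regionA], 4194304 -> [regionB, regionC], 0 -> [regionD] }
// i.e. a reverse-ordered TreeMap keyed by memstore heap size, so callers can walk from the
// biggest consumer downwards; regions of equal size share one bucket.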
2679 for (HRegion region : this.onlineRegions.values()) { 2680 addRegion(sortedRegions, region, region.getMemStoreHeapSize()); 2681 } 2682 return sortedRegions; 2683 } 2684 2685 /** Returns reference to FlushRequester */ 2686 @Override 2687 public FlushRequester getFlushRequester() { 2688 return this.cacheFlusher; 2689 } 2690 2691 @Override 2692 public CompactionRequester getCompactionRequestor() { 2693 return this.compactSplitThread; 2694 } 2695 2696 @Override 2697 public LeaseManager getLeaseManager() { 2698 return leaseManager; 2699 } 2700 2701 /** Returns {@code true} when the data file system is available, {@code false} otherwise. */ 2702 boolean isDataFileSystemOk() { 2703 return this.dataFsOk; 2704 } 2705 2706 public RegionServerCoprocessorHost getRegionServerCoprocessorHost() { 2707 return this.rsHost; 2708 } 2709 2710 @Override 2711 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() { 2712 return this.regionsInTransitionInRS; 2713 } 2714 2715 @Override 2716 public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { 2717 return rsQuotaManager; 2718 } 2719 2720 // 2721 // Main program and support routines 2722 // 2723 /** 2724 * Load the replication executorService objects, if any 2725 */ 2726 private static void createNewReplicationInstance(Configuration conf, HRegionServer server, 2727 FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException { 2728 // read in the name of the source replication class from the config file. 2729 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, 2730 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 2731 2732 // read in the name of the sink replication class from the config file. 2733 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, 2734 HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT); 2735 2736 // If both the sink and the source class names are the same, then instantiate 2737 // only one object. 2738 if (sourceClassname.equals(sinkClassname)) { 2739 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 2740 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2741 server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; 2742 server.sameReplicationSourceAndSink = true; 2743 } else { 2744 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 2745 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2746 server.replicationSinkHandler = newReplicationInstance(sinkClassname, 2747 ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2748 server.sameReplicationSourceAndSink = false; 2749 } 2750 } 2751 2752 private static <T extends ReplicationService> T newReplicationInstance(String classname, 2753 Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, 2754 Path oldLogDir, WALFactory walFactory) throws IOException { 2755 final Class<? 
extends T> clazz; 2756 try { 2757 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 2758 clazz = Class.forName(classname, true, classLoader).asSubclass(xface); 2759 } catch (java.lang.ClassNotFoundException nfe) { 2760 throw new IOException("Could not find class for " + classname); 2761 } 2762 T service = ReflectionUtils.newInstance(clazz, conf); 2763 service.initialize(server, walFs, logDir, oldLogDir, walFactory); 2764 return service; 2765 } 2766 2767 public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() { 2768 Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>(); 2769 if (!this.isOnline()) { 2770 return walGroupsReplicationStatus; 2771 } 2772 List<ReplicationSourceInterface> allSources = new ArrayList<>(); 2773 allSources.addAll(replicationSourceHandler.getReplicationManager().getSources()); 2774 allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources()); 2775 for (ReplicationSourceInterface source : allSources) { 2776 walGroupsReplicationStatus.putAll(source.getWalGroupStatus()); 2777 } 2778 return walGroupsReplicationStatus; 2779 } 2780 2781 /** 2782 * Utility for constructing an instance of the passed HRegionServer class. 2783 */ 2784 static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass, 2785 final Configuration conf) { 2786 try { 2787 Constructor<? extends HRegionServer> c = 2788 regionServerClass.getConstructor(Configuration.class); 2789 return c.newInstance(conf); 2790 } catch (Exception e) { 2791 throw new RuntimeException( 2792 "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e); 2793 } 2794 } 2795 2796 /** 2797 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine 2798 */ 2799 public static void main(String[] args) { 2800 LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); 2801 VersionInfo.logVersion(); 2802 Configuration conf = HBaseConfiguration.create(); 2803 @SuppressWarnings("unchecked") 2804 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf 2805 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); 2806 2807 new HRegionServerCommandLine(regionServerClass).doMain(args); 2808 } 2809 2810 /** 2811 * Gets the online regions of the specified table. This method looks at the in-memory 2812 * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions. 2813 * If a region on this table has been closed during a disable, etc., it will not be included in 2814 * the returned list. So, the returned list may not necessarily be ALL regions in this table, its 2815 * all the ONLINE regions in the table. 
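 * <p>
 * Minimal usage sketch (test code running in the same JVM as this region server; the table name
 * is made up): {@code List<HRegion> online = rs.getRegions(TableName.valueOf("exampleTable"));}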
  /**
   * Gets the online regions of the specified table. This method looks at the in-memory
   * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em>
   * regions. If a region on this table has been closed during a disable, etc., it will not be
   * included in the returned list. So, the returned list is not necessarily ALL the regions in
   * this table, but only the ONLINE regions in the table.
   * @param tableName table to limit the scope of the query
   * @return Online regions from <code>tableName</code>
   */
  @Override
  public List<HRegion> getRegions(TableName tableName) {
    List<HRegion> tableRegions = new ArrayList<>();
    synchronized (this.onlineRegions) {
      for (HRegion region : this.onlineRegions.values()) {
        RegionInfo regionInfo = region.getRegionInfo();
        if (regionInfo.getTable().equals(tableName)) {
          tableRegions.add(region);
        }
      }
    }
    return tableRegions;
  }

  @Override
  public List<HRegion> getRegions() {
    List<HRegion> allRegions;
    synchronized (this.onlineRegions) {
      // Return a copy of the onlineRegions values.
      allRegions = new ArrayList<>(onlineRegions.values());
    }
    return allRegions;
  }

  /**
   * Gets the online tables in this RS. This method looks at the in-memory onlineRegions.
   * @return all the online tables in this RS
   */
  public Set<TableName> getOnlineTables() {
    Set<TableName> tables = new HashSet<>();
    synchronized (this.onlineRegions) {
      for (Region region : this.onlineRegions.values()) {
        tables.add(region.getTableDescriptor().getTableName());
      }
    }
    return tables;
  }

  public String[] getRegionServerCoprocessors() {
    TreeSet<String> coprocessors = new TreeSet<>();
    try {
      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
    } catch (IOException exception) {
      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; "
        + "skipping.");
      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
    }
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    for (HRegion region : regions) {
      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
      try {
        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
      } catch (IOException exception) {
        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region
          + "; skipping.");
        LOG.debug("Exception details for failure to fetch wal coprocessor information.",
          exception);
      }
    }
    coprocessors.addAll(rsHost.getCoprocessors());
    return coprocessors.toArray(new String[0]);
  }

  /**
   * Try to close the region; logs a warning on failure but continues.
   * @param region Region to close
   */
  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
    try {
      if (!closeRegion(region.getEncodedName(), abort, null)) {
        LOG.warn(
          "Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing");
      }
    } catch (IOException e) {
      LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing",
        e);
    }
  }
  /**
   * Close a region asynchronously; can be called from the master or internally by the
   * regionserver when stopping. If called from the master, the region will update its status.
   * <p>
   * If an opening was in progress, this method will cancel it, but will not start a new close.
   * The coprocessors are not called in this case. A NotServingRegionException is thrown.
   * </p>
   * <p>
   * If a close was in progress, this new request will be ignored, and an exception thrown.
   * </p>
   * @param encodedName Region to close
   * @param abort       True if we are aborting
   * @param destination Where the Region is being moved to; may be null if unknown.
   * @return True if we started closing a region.
   * @throws NotServingRegionException if the region is not online
   */
  protected boolean closeRegion(String encodedName, final boolean abort,
    final ServerName destination) throws NotServingRegionException {
    // Check for permissions to close.
    HRegion actualRegion = this.getRegion(encodedName);
    // Can be null if we're calling close on a region that's not online
    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
      try {
        actualRegion.getCoprocessorHost().preClose(false);
      } catch (IOException exp) {
        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
        return false;
      }
    }

    // previous can come back 'null' if not in map.
    final Boolean previous =
      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);

    if (Boolean.TRUE.equals(previous)) {
      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already "
        + "trying to OPEN. Cancelling OPENING.");
      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
        // The replace failed. That should be an exceptional case, but theoretically it can happen.
        // We're going to try to do a standard close then.
        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
          + " Doing a standard close now");
        return closeRegion(encodedName, abort, destination);
      }
      // Let's get the region from the online region list again
      actualRegion = this.getRegion(encodedName);
      if (actualRegion == null) { // If already online, we still need to close it.
        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
        // The master deletes the znode when it receives this exception.
        throw new NotServingRegionException(
          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
      }
    } else if (previous == null) {
      LOG.info("Received CLOSE for {}", encodedName);
    } else if (Boolean.FALSE.equals(previous)) {
      LOG.info("Received CLOSE for the region: " + encodedName
        + ", which we are already trying to CLOSE, but not completed yet");
      return true;
    }

    if (actualRegion == null) {
      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
      // The master deletes the znode when it receives this exception.
      throw new NotServingRegionException(
        "The region " + encodedName + " is not online, and is not opening.");
    }

    CloseRegionHandler crh;
    final RegionInfo hri = actualRegion.getRegionInfo();
    if (hri.isMetaRegion()) {
      crh = new CloseMetaHandler(this, this, hri, abort);
    } else {
      crh = new CloseRegionHandler(this, this, hri, abort, destination);
    }
    this.executorService.submit(crh);
    return true;
  }
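  // Reading note on the transition map used above (derived from how closeRegion() and the open
  // handlers use it): regionsInTransitionInRS maps an encoded region name to Boolean.TRUE while
  // an OPEN is in flight and Boolean.FALSE while a CLOSE is in flight. A CLOSE arriving during an
  // OPEN therefore tries to flip TRUE -> FALSE to cancel the open, and a CLOSE arriving during
  // another CLOSE is simply acknowledged and ignored.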
  /**
   * @return HRegion for the passed binary <code>regionName</code>, or null if the named region is
   *         not a member of the online regions.
   */
  public HRegion getOnlineRegion(final byte[] regionName) {
    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
    return this.onlineRegions.get(encodedRegionName);
  }

  @Override
  public HRegion getRegion(final String encodedRegionName) {
    return this.onlineRegions.get(encodedRegionName);
  }

  @Override
  public boolean removeRegion(final HRegion r, ServerName destination) {
    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
    if (destination != null) {
      long closeSeqNum = r.getMaxFlushedSeqId();
      if (closeSeqNum == HConstants.NO_SEQNUM) {
        // No edits in WAL for this region; get the sequence number when the region was opened.
        closeSeqNum = r.getOpenSeqNum();
        if (closeSeqNum == HConstants.NO_SEQNUM) {
          closeSeqNum = 0;
        }
      }
      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
      if (selfMove) {
        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
          r.getRegionInfo().getEncodedName(),
          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
      }
    }
    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
    configurationManager.deregisterObserver(r);
    return toReturn != null;
  }

  /**
   * Protected utility method for safely obtaining an HRegion handle.
   * @param regionName Name of online {@link HRegion} to return
   * @return {@link HRegion} for <code>regionName</code>
   */
  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
    return getRegionByEncodedName(regionName, encodedRegionName);
  }

  public HRegion getRegionByEncodedName(String encodedRegionName)
    throws NotServingRegionException {
    return getRegionByEncodedName(null, encodedRegionName);
  }

  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
    throws NotServingRegionException {
    HRegion region = this.onlineRegions.get(encodedRegionName);
    if (region == null) {
      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
      if (moveInfo != null) {
        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
      }
      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
      String regionNameStr =
        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
      if (isOpening != null && isOpening) {
        throw new RegionOpeningException(
          "Region " + regionNameStr + " is opening on " + this.serverName);
      }
      throw new NotServingRegionException(
        "" + regionNameStr + " is not online on " + this.serverName);
    }
    return region;
  }
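  // Summary of the lookup above, for readers tracing client retry behaviour: a region is resolved
  // against the online map first; if it is absent, a hit in the moved-region cache yields a
  // RegionMovedException (carrying the new location), an in-flight open yields a
  // RegionOpeningException (worth retrying here shortly), and anything else yields a plain
  // NotServingRegionException, which typically prompts the client to relocate the region.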
  /**
   * Cleanup after a Throwable caught while invoking a method. Converts <code>t</code> to an IOE
   * if it isn't one already.
   * @param t   Throwable
   * @param msg Message to log in error. Can be null.
   * @return Throwable converted to an IOE; methods can only let out IOEs.
   */
  private Throwable cleanup(final Throwable t, final String msg) {
    // Don't log as error if NSRE; NSRE is 'normal' operation.
    if (t instanceof NotServingRegionException) {
      LOG.debug("NotServingRegionException; " + t.getMessage());
      return t;
    }
    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
    if (msg == null) {
      LOG.error("", e);
    } else {
      LOG.error(msg, e);
    }
    if (!rpcServices.checkOOME(t)) {
      checkFileSystem();
    }
    return t;
  }

  /**
   * @param msg Message to put in the new IOE if the passed <code>t</code> is not an IOE
   * @return <code>t</code> as an IOException, wrapped if it isn't one already.
   */
  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
    return (t instanceof IOException ? (IOException) t
      : msg == null || msg.length() == 0 ? new IOException(t)
      : new IOException(msg, t));
  }

  /**
   * Checks to see if the file system is still accessible. If not, sets abortRequested and
   * stopRequested.
   * @return false if file system is not available
   */
  boolean checkFileSystem() {
    if (this.dataFsOk && this.dataFs != null) {
      try {
        FSUtils.checkFileSystemAvailable(this.dataFs);
      } catch (IOException e) {
        abort("File System not available", e);
        this.dataFsOk = false;
      }
    }
    return this.dataFsOk;
  }

  @Override
  public void updateRegionFavoredNodesMapping(String encodedRegionName,
    List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
    Address[] addr = new Address[favoredNodes.size()];
    // Refer to the comment on the declaration of regionFavoredNodesMap on why
    // it is a map of region name to Address[]
    for (int i = 0; i < favoredNodes.size(); i++) {
      addr[i] =
        Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort());
    }
    regionFavoredNodesMap.put(encodedRegionName, addr);
  }

  /**
   * Return the favored nodes for a region given its encoded name. Look at the comment around
   * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here.
   * @param encodedRegionName the encoded region name.
   * @return array of favored locations
   */
  @Override
  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
  }

  @Override
  public ServerNonceManager getNonceManager() {
    return this.nonceManager;
  }

  private static class MovedRegionInfo {
    private final ServerName serverName;
    private final long seqNum;

    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
      this.serverName = serverName;
      this.seqNum = closeSeqNum;
    }

    public ServerName getServerName() {
      return serverName;
    }

    public long getSeqNum() {
      return seqNum;
    }
  }
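  // The moved-regions cache manipulated below backs the RegionMovedException path used in
  // getRegionByEncodedName(): when a request arrives for a region recently moved off this server,
  // the cached destination and close sequence id are returned to the client so it can update its
  // location cache without an extra round trip. Entries are only useful for a short window, hence
  // the timeout that follows.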
  /**
   * We need a timeout; otherwise there is a risk of handing out wrong information, which would
   * double the number of network calls instead of reducing them.
   */
  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);

  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
    boolean selfMove) {
    if (selfMove) {
      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
      return;
    }
    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid="
      + closeSeqNum);
    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
  }

  void removeFromMovedRegions(String encodedName) {
    movedRegionInfoCache.invalidate(encodedName);
  }

  @InterfaceAudience.Private
  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
    return movedRegionInfoCache.getIfPresent(encodedRegionName);
  }

  @InterfaceAudience.Private
  public int movedRegionCacheExpiredTime() {
    return TIMEOUT_REGION_MOVED;
  }

  private String getMyEphemeralNodePath() {
    return zooKeeper.getZNodePaths().getRsPath(serverName);
  }

  private boolean isHealthCheckerConfigured() {
    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
  }

  /** Returns the underlying {@link CompactSplit} for this server */
  public CompactSplit getCompactSplitThread() {
    return this.compactSplitThread;
  }

  CoprocessorServiceResponse execRegionServerService(
    @SuppressWarnings("UnusedParameters") final RpcController controller,
    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
    try {
      ServerRpcController serviceController = new ServerRpcController();
      CoprocessorServiceCall call = serviceRequest.getCall();
      String serviceName = call.getServiceName();
      Service service = coprocessorServiceHandlers.get(serviceName);
      if (service == null) {
        throw new UnknownProtocolException(null,
          "No registered coprocessor executorService found for " + serviceName);
      }
      ServiceDescriptor serviceDesc = service.getDescriptorForType();

      String methodName = call.getMethodName();
      MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
      if (methodDesc == null) {
        throw new UnknownProtocolException(service.getClass(),
          "Unknown method " + methodName + " called on executorService " + serviceName);
      }

      Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
      final Message.Builder responseBuilder =
        service.getResponsePrototype(methodDesc).newBuilderForType();
      service.callMethod(methodDesc, serviceController, request, message -> {
        if (message != null) {
          responseBuilder.mergeFrom(message);
        }
      });
      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
      if (exception != null) {
        throw exception;
      }
      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
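  // Dispatch summary for the endpoint call above: the request names a Service and a method; the
  // handler is looked up in coprocessorServiceHandlers, the protobuf method descriptor is resolved
  // by name, the request message is decoded, the method is invoked synchronously, and either the
  // merged response or the controller's IOException is propagated back to the caller.
  // Region-server-scoped endpoints are registered into that map (typically by the region server
  // coprocessor host), as opposed to per-region endpoints, which are executed against an
  // individual HRegion.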
  /**
   * May be empty if this is a master which does not carry tables.
   * @return The block cache instance used by the regionserver.
   */
  @Override
  public Optional<BlockCache> getBlockCache() {
    return Optional.ofNullable(this.blockCache);
  }

  /**
   * May be empty if this is a master which does not carry tables.
   * @return The cache for mob files used by the regionserver.
   */
  @Override
  public Optional<MobFileCache> getMobFileCache() {
    return Optional.ofNullable(this.mobFileCache);
  }

  /** Returns the ConfigurationManager object; exposed for testing purposes. */
  ConfigurationManager getConfigurationManager() {
    return configurationManager;
  }

  CacheEvictionStats clearRegionBlockCache(Region region) {
    long evictedBlocks = 0;

    for (Store store : region.getStores()) {
      for (StoreFile hFile : store.getStorefiles()) {
        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
      }
    }

    return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build();
  }

  @Override
  public double getCompactionPressure() {
    double max = 0;
    for (Region region : onlineRegions.values()) {
      for (Store store : region.getStores()) {
        double normCount = store.getCompactionPressure();
        if (normCount > max) {
          max = normCount;
        }
      }
    }
    return max;
  }

  @Override
  public HeapMemoryManager getHeapMemoryManager() {
    return hMemManager;
  }

  public MemStoreFlusher getMemStoreFlusher() {
    return cacheFlusher;
  }

  /**
   * For testing.
   * @return whether all WAL roll requests have finished for this regionserver
   */
  @InterfaceAudience.Private
  public boolean walRollRequestFinished() {
    return this.walRoller.walRollFinished();
  }

  @Override
  public ThroughputController getFlushThroughputController() {
    return flushThroughputController;
  }

  @Override
  public double getFlushPressure() {
    if (getRegionServerAccounting() == null || cacheFlusher == null) {
      // Return 0 during RS initialization.
      return 0.0;
    }
    return getRegionServerAccounting().getFlushPressure();
  }
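  // The callback below is this server's dynamic-configuration hook: when a configuration reload
  // is pushed to the region server, the flush throughput controller is rebuilt, the superuser
  // list is refreshed, and the region server coprocessor host is recreated if its key changed.
  // A minimal sketch of triggering a reload from client code, assuming an existing Connection
  // (the shell's update_config command takes the same path through the Admin API):
  //
  //   try (Admin admin = connection.getAdmin()) {
  //     admin.updateConfiguration(serverName); // ServerName of the region server to refresh
  //   }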
  @Override
  public void onConfigurationChange(Configuration newConf) {
    ThroughputController old = this.flushThroughputController;
    if (old != null) {
      old.stop("configuration change");
    }
    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
    try {
      Superusers.initialize(newConf);
    } catch (IOException e) {
      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
    }

    // update region server coprocessor if the configuration has changed.
    if (
      CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
        CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY)
    ) {
      LOG.info("Update region server coprocessors because the configuration has changed");
      this.rsHost = new RegionServerCoprocessorHost(this, newConf);
    }
  }

  @Override
  public MetricsRegionServer getMetrics() {
    return metricsRegionServer;
  }

  @Override
  public SecureBulkLoadManager getSecureBulkLoadManager() {
    return this.secureBulkLoadManager;
  }

  @Override
  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
    final Abortable abort) {
    final LockServiceClient client =
      new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
    return client.regionLock(regionInfo, description, abort);
  }

  @Override
  public void unassign(byte[] regionName) throws IOException {
    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
  }

  @Override
  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
    return this.rsSpaceQuotaManager;
  }

  @Override
  public boolean reportFileArchivalForQuotas(TableName tableName,
    Collection<Entry<String, Long>> archivedFiles) {
    if (TEST_SKIP_REPORTING_TRANSITION) {
      return false;
    }
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null || rsSpaceQuotaManager == null) {
      // the current server could be stopping.
      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
      return false;
    }
    try {
      RegionServerStatusProtos.FileArchiveNotificationRequest request =
        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
      rss.reportFileArchival(null, request);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
            + " This will be retried.", ioe);
        }
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return false;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // re-create the stub if we failed to report the archival
      createRegionServerStatusStub(true);
      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
      return false;
    }
    return true;
  }
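  // Remote procedure flow handled by the next few methods: the master ships a procedure to this
  // server, executeProcedure() wraps it in an RSProcedureHandler and hands it to the executor,
  // remoteProcedureComplete() queues the outcome with the procedure result reporter, and
  // reportProcedureDone() pushes that outcome back to the master over the RegionServerStatus
  // service, recreating the master stub if the RPC fails.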
  void executeProcedure(long procId, RSProcedureCallable callable) {
    executorService.submit(new RSProcedureHandler(this, procId, callable));
  }

  public void remoteProcedureComplete(long procId, Throwable error) {
    procedureResultReporter.complete(procId, error);
  }

  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
    RegionServerStatusService.BlockingInterface rss;
    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
    for (;;) {
      rss = rssStub;
      if (rss != null) {
        break;
      }
      createRegionServerStatusStub();
    }
    try {
      rss.reportProcedureDone(null, request);
    } catch (ServiceException se) {
      if (rssStub == rss) {
        rssStub = null;
      }
      throw ProtobufUtil.getRemoteException(se);
    }
  }

  /**
   * Will ignore open/close region procedures which were already submitted or executed. When the
   * master had an unfinished open/close region procedure and restarted, the new active master may
   * send a duplicate open/close region request to the regionserver. The open/close request is
   * submitted to a thread pool and executed, so we first need a cache of submitted open/close
   * region procedures. After the open/close region request has been executed and the region
   * transition has been reported successfully, we cache it in the executed region procedures
   * cache; see {@link #finishRegionProcedure(long)}. Once the region transition has been reported
   * successfully, the master will not send the open/close region request to the regionserver
   * again. We assume that an in-flight duplicate open/close region request will not be delayed by
   * more than 600 seconds, so the executed region procedures cache expires after 600 seconds. See
   * HBASE-22404 for more details.
   * @param procId the id of the open/close region procedure
   * @return true if the procedure can be submitted.
   */
  boolean submitRegionProcedure(long procId) {
    if (procId == -1) {
      return true;
    }
    // Ignore the region procedures which were already submitted.
    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
    if (previous != null) {
      LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
      return false;
    }
    // Ignore the region procedures which were already executed.
    if (executedRegionProcedures.getIfPresent(procId) != null) {
      LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
      return false;
    }
    return true;
  }

  /**
   * See {@link #submitRegionProcedure(long)}.
   * @param procId the id of the open/close region procedure
   */
  public void finishRegionProcedure(long procId) {
    executedRegionProcedures.put(procId, procId);
    submittedRegionProcedures.remove(procId);
  }
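  // Lifecycle of the de-duplication caches above, in the order a single open/close procedure
  // flows through them: submitRegionProcedure(procId) records the id in submittedRegionProcedures
  // when the request is accepted; once the handler has run and the region transition report has
  // succeeded, finishRegionProcedure(procId) moves the id into the time-bounded
  // executedRegionProcedures cache. A duplicate request from a restarted master is therefore
  // rejected whether it arrives while the original is still running or shortly after it has
  // completed.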
3483 + " Thread dump to stdout."); 3484 Threads.printThreadInfo(System.out, "Zombie HRegionServer"); 3485 Runtime.getRuntime().halt(1); 3486 } 3487 } 3488 3489 @InterfaceAudience.Private 3490 public CompactedHFilesDischarger getCompactedHFilesDischarger() { 3491 return compactedFileDischarger; 3492 } 3493 3494 /** 3495 * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}} 3496 * @return pause time 3497 */ 3498 @InterfaceAudience.Private 3499 public long getRetryPauseTime() { 3500 return this.retryPauseTime; 3501 } 3502 3503 @Override 3504 public Optional<ServerName> getActiveMaster() { 3505 return Optional.ofNullable(masterAddressTracker.getMasterAddress()); 3506 } 3507 3508 @Override 3509 public List<ServerName> getBackupMasters() { 3510 return masterAddressTracker.getBackupMasters(); 3511 } 3512 3513 @Override 3514 public Iterator<ServerName> getBootstrapNodes() { 3515 return bootstrapNodeManager.getBootstrapNodes().iterator(); 3516 } 3517 3518 @Override 3519 public List<HRegionLocation> getMetaLocations() { 3520 return metaRegionLocationCache.getMetaRegionLocations(); 3521 } 3522 3523 @Override 3524 protected NamedQueueRecorder createNamedQueueRecord() { 3525 final boolean isOnlineLogProviderEnabled = conf.getBoolean( 3526 HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); 3527 if (isOnlineLogProviderEnabled) { 3528 return NamedQueueRecorder.getInstance(conf); 3529 } else { 3530 return null; 3531 } 3532 } 3533 3534 @Override 3535 protected boolean clusterMode() { 3536 // this method will be called in the constructor of super class, so we can not return masterless 3537 // directly here, as it will always be false. 3538 return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false); 3539 } 3540 3541 @InterfaceAudience.Private 3542 public BrokenStoreFileCleaner getBrokenStoreFileCleaner() { 3543 return brokenStoreFileCleaner; 3544 } 3545 3546 @InterfaceAudience.Private 3547 public RSMobFileCleanerChore getRSMobFileCleanerChore() { 3548 return rsMobFileCleanerChore; 3549 } 3550 3551 RSSnapshotVerifier getRsSnapshotVerifier() { 3552 return rsSnapshotVerifier; 3553 } 3554 3555 @Override 3556 protected void stopChores() { 3557 shutdownChore(nonceManagerChore); 3558 shutdownChore(compactionChecker); 3559 shutdownChore(compactedFileDischarger); 3560 shutdownChore(periodicFlusher); 3561 shutdownChore(healthCheckChore); 3562 shutdownChore(executorStatusChore); 3563 shutdownChore(storefileRefresher); 3564 shutdownChore(fsUtilizationChore); 3565 shutdownChore(slowLogTableOpsChore); 3566 shutdownChore(brokenStoreFileCleaner); 3567 shutdownChore(rsMobFileCleanerChore); 3568 } 3569 3570 @Override 3571 public RegionReplicationBufferManager getRegionReplicationBufferManager() { 3572 return regionReplicationBufferManager; 3573 } 3574}