001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; 021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; 022import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION; 023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; 024import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 025import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT; 026import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY; 027import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT; 028import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY; 029import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT; 030import static 
org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY; 031import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT; 032import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; 033import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY; 034 035import io.opentelemetry.api.trace.Span; 036import io.opentelemetry.api.trace.StatusCode; 037import io.opentelemetry.context.Scope; 038import java.io.IOException; 039import java.io.PrintWriter; 040import java.lang.management.MemoryUsage; 041import java.lang.reflect.Constructor; 042import java.net.InetSocketAddress; 043import java.time.Duration; 044import java.util.ArrayList; 045import java.util.Collection; 046import java.util.Collections; 047import java.util.Comparator; 048import java.util.HashSet; 049import java.util.Iterator; 050import java.util.List; 051import java.util.Map; 052import java.util.Map.Entry; 053import java.util.Objects; 054import java.util.Optional; 055import java.util.Set; 056import java.util.SortedMap; 057import java.util.Timer; 058import java.util.TimerTask; 059import java.util.TreeMap; 060import java.util.TreeSet; 061import java.util.concurrent.ConcurrentHashMap; 062import java.util.concurrent.ConcurrentMap; 063import java.util.concurrent.ConcurrentSkipListMap; 064import java.util.concurrent.ThreadLocalRandom; 065import java.util.concurrent.TimeUnit; 066import java.util.concurrent.atomic.AtomicBoolean; 067import java.util.concurrent.locks.ReentrantReadWriteLock; 068import java.util.stream.Collectors; 069import javax.management.MalformedObjectNameException; 070import javax.servlet.http.HttpServlet; 071import org.apache.commons.lang3.StringUtils; 072import org.apache.commons.lang3.mutable.MutableFloat; 073import org.apache.hadoop.conf.Configuration; 074import org.apache.hadoop.fs.FileSystem; 075import 
org.apache.hadoop.fs.Path; 076import org.apache.hadoop.hbase.Abortable; 077import org.apache.hadoop.hbase.CacheEvictionStats; 078import org.apache.hadoop.hbase.CallQueueTooBigException; 079import org.apache.hadoop.hbase.ClockOutOfSyncException; 080import org.apache.hadoop.hbase.DoNotRetryIOException; 081import org.apache.hadoop.hbase.ExecutorStatusChore; 082import org.apache.hadoop.hbase.HBaseConfiguration; 083import org.apache.hadoop.hbase.HBaseInterfaceAudience; 084import org.apache.hadoop.hbase.HBaseServerBase; 085import org.apache.hadoop.hbase.HConstants; 086import org.apache.hadoop.hbase.HDFSBlocksDistribution; 087import org.apache.hadoop.hbase.HRegionLocation; 088import org.apache.hadoop.hbase.HealthCheckChore; 089import org.apache.hadoop.hbase.MetaTableAccessor; 090import org.apache.hadoop.hbase.NotServingRegionException; 091import org.apache.hadoop.hbase.PleaseHoldException; 092import org.apache.hadoop.hbase.ScheduledChore; 093import org.apache.hadoop.hbase.ServerName; 094import org.apache.hadoop.hbase.Stoppable; 095import org.apache.hadoop.hbase.TableName; 096import org.apache.hadoop.hbase.YouAreDeadException; 097import org.apache.hadoop.hbase.ZNodeClearer; 098import org.apache.hadoop.hbase.client.ConnectionUtils; 099import org.apache.hadoop.hbase.client.RegionInfo; 100import org.apache.hadoop.hbase.client.RegionInfoBuilder; 101import org.apache.hadoop.hbase.client.locking.EntityLock; 102import org.apache.hadoop.hbase.client.locking.LockServiceClient; 103import org.apache.hadoop.hbase.conf.ConfigurationObserver; 104import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 105import org.apache.hadoop.hbase.exceptions.RegionMovedException; 106import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 107import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 108import org.apache.hadoop.hbase.executor.ExecutorType; 109import org.apache.hadoop.hbase.http.InfoServer; 110import org.apache.hadoop.hbase.io.hfile.BlockCache; 111import 
org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 112import org.apache.hadoop.hbase.io.hfile.HFile; 113import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 114import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 115import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException; 116import org.apache.hadoop.hbase.ipc.RpcClient; 117import org.apache.hadoop.hbase.ipc.RpcServer; 118import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 119import org.apache.hadoop.hbase.ipc.ServerRpcController; 120import org.apache.hadoop.hbase.log.HBaseMarkers; 121import org.apache.hadoop.hbase.mob.MobFileCache; 122import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore; 123import org.apache.hadoop.hbase.monitoring.TaskMonitor; 124import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 125import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore; 126import org.apache.hadoop.hbase.net.Address; 127import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; 128import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 129import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore; 130import org.apache.hadoop.hbase.quotas.QuotaUtil; 131import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 132import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 133import org.apache.hadoop.hbase.quotas.RegionSize; 134import org.apache.hadoop.hbase.quotas.RegionSizeStore; 135import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 136import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 137import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; 138import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 139import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; 140import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; 141import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; 
142import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; 143import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet; 144import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet; 145import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager; 146import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; 147import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 148import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 149import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; 150import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; 151import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore; 152import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; 153import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; 154import org.apache.hadoop.hbase.security.SecurityConstants; 155import org.apache.hadoop.hbase.security.Superusers; 156import org.apache.hadoop.hbase.security.User; 157import org.apache.hadoop.hbase.security.UserProvider; 158import org.apache.hadoop.hbase.trace.TraceUtil; 159import org.apache.hadoop.hbase.util.Bytes; 160import org.apache.hadoop.hbase.util.CompressionTest; 161import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil; 162import org.apache.hadoop.hbase.util.DNS; 163import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 164import org.apache.hadoop.hbase.util.FSUtils; 165import org.apache.hadoop.hbase.util.FutureUtils; 166import org.apache.hadoop.hbase.util.JvmPauseMonitor; 167import org.apache.hadoop.hbase.util.Pair; 168import org.apache.hadoop.hbase.util.RetryCounter; 169import org.apache.hadoop.hbase.util.RetryCounterFactory; 170import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 171import org.apache.hadoop.hbase.util.Threads; 172import org.apache.hadoop.hbase.util.VersionInfo; 173import 
org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 174import org.apache.hadoop.hbase.wal.WAL; 175import org.apache.hadoop.hbase.wal.WALFactory; 176import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; 177import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 178import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker; 179import org.apache.hadoop.hbase.zookeeper.ZKUtil; 180import org.apache.hadoop.ipc.RemoteException; 181import org.apache.hadoop.util.ReflectionUtils; 182import org.apache.yetus.audience.InterfaceAudience; 183import org.apache.zookeeper.KeeperException; 184import org.slf4j.Logger; 185import org.slf4j.LoggerFactory; 186 187import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 188import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 189import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 190import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 191import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 192import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses; 193import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 194import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 195import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor; 196import org.apache.hbase.thirdparty.com.google.protobuf.Message; 197import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 198import org.apache.hbase.thirdparty.com.google.protobuf.Service; 199import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 200import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 201import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 202 203import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 204import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 205import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; 225import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 233 234/** 235 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There 236 * are many HRegionServers in a single HBase deployment. 237 */ 238@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 239@SuppressWarnings({ "deprecation" }) 240public class HRegionServer extends HBaseServerBase<RSRpcServices> 241 implements RegionServerServices, LastSequenceId { 242 243 private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class); 244 245 int unitMB = 1024 * 1024; 246 int unitKB = 1024; 247 248 /** 249 * For testing only! Set to true to skip notifying region assignment to master . 250 */ 251 @InterfaceAudience.Private 252 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL") 253 public static boolean TEST_SKIP_REPORTING_TRANSITION = false; 254 255 /** 256 * A map from RegionName to current action in progress. 
Boolean value indicates: true - if open 257 * region action in progress false - if close region action in progress 258 */ 259 private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS = 260 new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); 261 262 /** 263 * Used to cache the open/close region procedures which already submitted. See 264 * {@link #submitRegionProcedure(long)}. 265 */ 266 private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>(); 267 /** 268 * Used to cache the open/close region procedures which already executed. See 269 * {@link #submitRegionProcedure(long)}. 270 */ 271 private final Cache<Long, Long> executedRegionProcedures = 272 CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build(); 273 274 /** 275 * Used to cache the moved-out regions 276 */ 277 private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder() 278 .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build(); 279 280 private MemStoreFlusher cacheFlusher; 281 282 private HeapMemoryManager hMemManager; 283 284 // Replication services. If no replication, this handler will be null. 285 private ReplicationSourceService replicationSourceHandler; 286 private ReplicationSinkService replicationSinkHandler; 287 private boolean sameReplicationSourceAndSink; 288 289 // Compactions 290 private CompactSplit compactSplitThread; 291 292 /** 293 * Map of regions currently being served by this region server. Key is the encoded region name. 294 * All access should be synchronized. 295 */ 296 private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>(); 297 /** 298 * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it 299 * need to be a ConcurrentHashMap? 
300 */ 301 private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock(); 302 303 /** 304 * Map of encoded region names to the DataNode locations they should be hosted on We store the 305 * value as Address since InetSocketAddress is required by the HDFS API (create() that takes 306 * favored nodes as hints for placing file blocks). We could have used ServerName here as the 307 * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API 308 * call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and 309 * here we really mean DataNode locations. We don't store it as InetSocketAddress here because the 310 * conversion on demand from Address to InetSocketAddress will guarantee the resolution results 311 * will be fresh when we need it. 312 */ 313 private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>(); 314 315 private LeaseManager leaseManager; 316 317 private volatile boolean dataFsOk; 318 private volatile boolean isGlobalReadOnlyEnabled; 319 320 static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout"; 321 // Default abort timeout is 1200 seconds for safe 322 private static final long DEFAULT_ABORT_TIMEOUT = 1200000; 323 // Will run this task when abort timeout 324 static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task"; 325 326 // A state before we go into stopped state. At this stage we're closing user 327 // space regions. 328 private boolean stopping = false; 329 private volatile boolean killed = false; 330 331 private final int threadWakeFrequency; 332 333 private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period"; 334 private final int compactionCheckFrequency; 335 private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period"; 336 private final int flushCheckFrequency; 337 338 // Stub to do region server status calls against the master. 
339 private volatile RegionServerStatusService.BlockingInterface rssStub; 340 private volatile LockService.BlockingInterface lockStub; 341 // RPC client. Used to make the stub above that does region server status checking. 342 private RpcClient rpcClient; 343 344 private UncaughtExceptionHandler uncaughtExceptionHandler; 345 346 private JvmPauseMonitor pauseMonitor; 347 348 private RSSnapshotVerifier rsSnapshotVerifier; 349 350 /** region server process name */ 351 public static final String REGIONSERVER = "regionserver"; 352 353 private MetricsRegionServer metricsRegionServer; 354 MetricsRegionServerWrapperImpl metricsRegionServerImpl; 355 356 /** 357 * Check for compactions requests. 358 */ 359 private ScheduledChore compactionChecker; 360 361 /** 362 * Check for flushes 363 */ 364 private ScheduledChore periodicFlusher; 365 366 private volatile WALFactory walFactory; 367 368 private LogRoller walRoller; 369 370 // A thread which calls reportProcedureDone 371 private RemoteProcedureResultReporter procedureResultReporter; 372 373 // flag set after we're done setting up server threads 374 final AtomicBoolean online = new AtomicBoolean(false); 375 376 // master address tracker 377 private final MasterAddressTracker masterAddressTracker; 378 379 // Log Splitting Worker 380 private SplitLogWorker splitLogWorker; 381 382 private final int shortOperationTimeout; 383 384 // Time to pause if master says 'please hold' 385 private final long retryPauseTime; 386 387 private final RegionServerAccounting regionServerAccounting; 388 389 private NamedQueueServiceChore namedQueueServiceChore = null; 390 391 // Block cache 392 private BlockCache blockCache; 393 // The cache for mob files 394 private MobFileCache mobFileCache; 395 396 /** The health check chore. */ 397 private HealthCheckChore healthCheckChore; 398 399 /** The Executor status collect chore. */ 400 private ExecutorStatusChore executorStatusChore; 401 402 /** The nonce manager chore. 
*/ 403 private ScheduledChore nonceManagerChore; 404 405 private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap(); 406 407 /** 408 * @deprecated since 2.4.0 and will be removed in 4.0.0. Use 409 * {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead. 410 * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a> 411 */ 412 @Deprecated 413 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 414 final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY = 415 "hbase.regionserver.hostname.disable.master.reversedns"; 416 417 /** 418 * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive. 419 * Exception will be thrown if both are used. 420 */ 421 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 422 final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY = 423 "hbase.unsafe.regionserver.hostname.disable.master.reversedns"; 424 425 /** 426 * Unique identifier for the cluster we are a part of. 427 */ 428 private String clusterId; 429 430 // chore for refreshing store files for secondary regions 431 private StorefileRefresherChore storefileRefresher; 432 433 private volatile RegionServerCoprocessorHost rsHost; 434 435 private RegionServerProcedureManagerHost rspmHost; 436 437 private RegionServerRpcQuotaManager rsQuotaManager; 438 private RegionServerSpaceQuotaManager rsSpaceQuotaManager; 439 440 /** 441 * Nonce manager. Nonces are used to make operations like increment and append idempotent in the 442 * case where client doesn't receive the response from a successful operation and retries. We 443 * track the successful ops for some time via a nonce sent by client and handle duplicate 444 * operations (currently, by failing them; in future we might use MVCC to return result). 
Nonces 445 * are also recovered from WAL during, recovery; however, the caveats (from HBASE-3787) are: - WAL 446 * recovery is optimized, and under high load we won't read nearly nonce-timeout worth of past 447 * records. If we don't read the records, we don't read and recover the nonces. Some WALs within 448 * nonce-timeout at recovery may not even be present due to rolling/cleanup. - There's no WAL 449 * recovery during normal region move, so nonces will not be transfered. We can have separate 450 * additional "Nonce WAL". It will just contain bunch of numbers and won't be flushed on main path 451 * - because WAL itself also contains nonces, if we only flush it before memstore flush, for a 452 * given nonce we will either see it in the WAL (if it was never flushed to disk, it will be part 453 * of recovery), or we'll see it as part of the nonce log (or both occasionally, which doesn't 454 * matter). Nonce log file can be deleted after the latest nonce in it expired. It can also be 455 * recovered during move. 456 */ 457 final ServerNonceManager nonceManager; 458 459 private BrokenStoreFileCleaner brokenStoreFileCleaner; 460 461 private RSMobFileCleanerChore rsMobFileCleanerChore; 462 463 @InterfaceAudience.Private 464 CompactedHFilesDischarger compactedFileDischarger; 465 466 private volatile ThroughputController flushThroughputController; 467 468 private SecureBulkLoadManager secureBulkLoadManager; 469 470 private FileSystemUtilizationChore fsUtilizationChore; 471 472 private BootstrapNodeManager bootstrapNodeManager; 473 474 /** 475 * True if this RegionServer is coming up in a cluster where there is no Master; means it needs to 476 * just come up and make do without a Master to talk to: e.g. in test or HRegionServer is doing 477 * other than its usual duties: e.g. as an hollowed-out host whose only purpose is as a 478 * Replication-stream sink; see HBASE-18846 for more. TODO: can this replace 479 * {@link #TEST_SKIP_REPORTING_TRANSITION} ? 
480 */ 481 private final boolean masterless; 482 private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless"; 483 484 /** regionserver codec list **/ 485 private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs"; 486 487 // A timer to shutdown the process if abort takes too long 488 private Timer abortMonitor; 489 490 private RegionReplicationBufferManager regionReplicationBufferManager; 491 492 /* 493 * Chore that creates replication marker rows. 494 */ 495 private ReplicationMarkerChore replicationMarkerChore; 496 497 // A timer submit requests to the PrefetchExecutor 498 private PrefetchExecutorNotifier prefetchExecutorNotifier; 499 500 /** 501 * Starts a HRegionServer at the default location. 502 * <p/> 503 * Don't start any services or managers in here in the Constructor. Defer till after we register 504 * with the Master as much as possible. See {@link #startServices}. 505 */ 506 public HRegionServer(final Configuration conf) throws IOException { 507 super(conf, "RegionServer"); // thread name 508 final Span span = TraceUtil.createSpan("HRegionServer.cxtor"); 509 try (Scope ignored = span.makeCurrent()) { 510 this.dataFsOk = true; 511 this.masterless = !clusterMode(); 512 MemorySizeUtil.validateRegionServerHeapMemoryAllocation(conf); 513 HFile.checkHFileVersion(this.conf); 514 checkCodecs(this.conf); 515 FSUtils.setupShortCircuitRead(this.conf); 516 517 // Disable usage of meta replicas in the regionserver 518 this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); 519 // Config'ed params 520 this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); 521 this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency); 522 this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency); 523 524 boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true); 525 this.nonceManager = isNoncesEnabled ? 
new ServerNonceManager(this.conf) : null; 526 527 this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, 528 HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); 529 530 this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME, 531 HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME); 532 533 regionServerAccounting = new RegionServerAccounting(conf); 534 535 blockCache = BlockCacheFactory.createBlockCache(conf); 536 // The call below, instantiates the DataTieringManager only when 537 // the configuration "hbase.regionserver.datatiering.enable" is set to true. 538 DataTieringManager.instantiate(conf, onlineRegions); 539 540 mobFileCache = new MobFileCache(conf); 541 542 rsSnapshotVerifier = new RSSnapshotVerifier(conf); 543 544 uncaughtExceptionHandler = 545 (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e); 546 547 // If no master in cluster, skip trying to track one or look for a cluster status. 548 if (!this.masterless) { 549 masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); 550 masterAddressTracker.start(); 551 } else { 552 masterAddressTracker = null; 553 } 554 this.rpcServices.start(zooKeeper); 555 span.setStatus(StatusCode.OK); 556 } catch (Throwable t) { 557 // Make sure we log the exception. HRegionServer is often started via reflection and the 558 // cause of failed startup is lost. 
559 TraceUtil.setError(span, t); 560 LOG.error("Failed construction RegionServer", t); 561 throw t; 562 } finally { 563 span.end(); 564 } 565 } 566 567 // HMaster should override this method to load the specific config for master 568 @Override 569 protected String getUseThisHostnameInstead(Configuration conf) throws IOException { 570 String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY); 571 if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) { 572 if (!StringUtils.isBlank(hostname)) { 573 String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " 574 + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set " 575 + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while " 576 + UNSAFE_RS_HOSTNAME_KEY + " is used"; 577 throw new IOException(msg); 578 } else { 579 return DNS.getHostname(conf, DNS.ServerType.REGIONSERVER); 580 } 581 } else { 582 return hostname; 583 } 584 } 585 586 @Override 587 protected DNS.ServerType getDNSServerType() { 588 return DNS.ServerType.REGIONSERVER; 589 } 590 591 @Override 592 protected void login(UserProvider user, String host) throws IOException { 593 user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE, 594 SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host); 595 } 596 597 @Override 598 protected String getProcessName() { 599 return REGIONSERVER; 600 } 601 602 @Override 603 protected RegionServerCoprocessorHost getCoprocessorHost() { 604 return getRegionServerCoprocessorHost(); 605 } 606 607 @Override 608 protected boolean canCreateBaseZNode() { 609 return !clusterMode(); 610 } 611 612 @Override 613 protected boolean canUpdateTableDescriptor() { 614 return false; 615 } 616 617 @Override 618 protected boolean cacheTableDescriptor() { 619 return false; 620 } 621 622 protected RSRpcServices createRpcServices() throws IOException { 623 return new RSRpcServices(this); 624 } 625 626 @Override 627 protected void configureInfoServer(InfoServer infoServer) { 628 
infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class); 629 infoServer.setAttribute(REGIONSERVER, this); 630 } 631 632 @Override 633 protected Class<? extends HttpServlet> getDumpServlet() { 634 return RSDumpServlet.class; 635 } 636 637 /** 638 * Used by {@link RSDumpServlet} to generate debugging information. 639 */ 640 public void dumpRowLocks(final PrintWriter out) { 641 StringBuilder sb = new StringBuilder(); 642 for (HRegion region : getRegions()) { 643 if (region.getLockedRows().size() > 0) { 644 for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) { 645 sb.setLength(0); 646 sb.append(region.getTableDescriptor().getTableName()).append(",") 647 .append(region.getRegionInfo().getEncodedName()).append(","); 648 sb.append(rowLockContext.toString()); 649 out.println(sb); 650 } 651 } 652 } 653 } 654 655 @Override 656 public boolean registerService(Service instance) { 657 // No stacking of instances is allowed for a single executorService name 658 ServiceDescriptor serviceDesc = instance.getDescriptorForType(); 659 String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc); 660 if (coprocessorServiceHandlers.containsKey(serviceName)) { 661 LOG.error("Coprocessor executorService " + serviceName 662 + " already registered, rejecting request from " + instance); 663 return false; 664 } 665 666 coprocessorServiceHandlers.put(serviceName, instance); 667 if (LOG.isDebugEnabled()) { 668 LOG.debug( 669 "Registered regionserver coprocessor executorService: executorService=" + serviceName); 670 } 671 return true; 672 } 673 674 /** 675 * Run test on configured codecs to make sure supporting libs are in place. 
676 */ 677 private static void checkCodecs(final Configuration c) throws IOException { 678 // check to see if the codec list is available: 679 String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null); 680 if (codecs == null) { 681 return; 682 } 683 for (String codec : codecs) { 684 if (!CompressionTest.testCompression(codec)) { 685 throw new IOException( 686 "Compression codec " + codec + " not supported, aborting RS construction"); 687 } 688 } 689 } 690 691 public String getClusterId() { 692 return this.clusterId; 693 } 694 695 /** 696 * All initialization needed before we go register with Master.<br> 697 * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br> 698 * In here we just put up the RpcServer, setup Connection, and ZooKeeper. 699 */ 700 private void preRegistrationInitialization() { 701 final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization"); 702 try (Scope ignored = span.makeCurrent()) { 703 initializeZooKeeper(); 704 setupClusterConnection(); 705 bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker); 706 regionReplicationBufferManager = new RegionReplicationBufferManager(this); 707 // Setup RPC client for master communication 708 this.rpcClient = asyncClusterConnection.getRpcClient(); 709 span.setStatus(StatusCode.OK); 710 } catch (Throwable t) { 711 // Call stop if error or process will stick around for ever since server 712 // puts up non-daemon threads. 713 TraceUtil.setError(span, t); 714 this.rpcServices.stop(); 715 abort("Initialization of RS failed. Hence aborting RS.", t); 716 } finally { 717 span.end(); 718 } 719 } 720 721 /** 722 * Bring up connection to zk ensemble and then wait until a master for this cluster and then after 723 * that, wait until cluster 'up' flag has been set. This is the order in which master does things. 724 * <p> 725 * Finally open long-living server short-circuit connection. 
726 */ 727 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE", 728 justification = "cluster Id znode read would give us correct response") 729 private void initializeZooKeeper() throws IOException, InterruptedException { 730 // Nothing to do in here if no Master in the mix. 731 if (this.masterless) { 732 return; 733 } 734 735 // Create the master address tracker, register with zk, and start it. Then 736 // block until a master is available. No point in starting up if no master 737 // running. 738 blockAndCheckIfStopped(this.masterAddressTracker); 739 740 // Wait on cluster being up. Master will set this flag up in zookeeper 741 // when ready. 742 blockAndCheckIfStopped(this.clusterStatusTracker); 743 744 // If we are HMaster then the cluster id should have already been set. 745 if (clusterId == null) { 746 // Retrieve clusterId 747 // Since cluster status is now up 748 // ID should have already been set by HMaster 749 try { 750 clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper); 751 if (clusterId == null) { 752 this.abort("Cluster ID has not been set"); 753 } 754 LOG.info("ClusterId : " + clusterId); 755 } catch (KeeperException e) { 756 this.abort("Failed to retrieve Cluster ID", e); 757 } 758 } 759 760 if (isStopped() || isAborted()) { 761 return; // No need for further initialization 762 } 763 764 // watch for snapshots and other procedures 765 try { 766 rspmHost = new RegionServerProcedureManagerHost(); 767 rspmHost.loadProcedures(conf); 768 rspmHost.initialize(this); 769 } catch (KeeperException e) { 770 this.abort("Failed to reach coordination cluster when creating procedure handler.", e); 771 } 772 } 773 774 /** 775 * Utilty method to wait indefinitely on a znode availability while checking if the region server 776 * is shut down 777 * @param tracker znode tracker to use 778 * @throws IOException any IO exception, plus if the RS is stopped 779 * @throws InterruptedException if the waiting thread is 
interrupted 780 */ 781 private void blockAndCheckIfStopped(ZKNodeTracker tracker) 782 throws IOException, InterruptedException { 783 while (tracker.blockUntilAvailable(this.msgInterval, false) == null) { 784 if (this.stopped) { 785 throw new IOException("Received the shutdown message while waiting."); 786 } 787 } 788 } 789 790 /** Returns True if the cluster is up. */ 791 @Override 792 public boolean isClusterUp() { 793 return this.masterless 794 || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp()); 795 } 796 797 private void initializeReplicationMarkerChore() { 798 boolean replicationMarkerEnabled = 799 conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT); 800 // If replication or replication marker is not enabled then return immediately. 801 if (replicationMarkerEnabled) { 802 int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY, 803 REPLICATION_MARKER_CHORE_DURATION_DEFAULT); 804 replicationMarkerChore = new ReplicationMarkerChore(this, this, period, conf); 805 } 806 } 807 808 @Override 809 public boolean isStopping() { 810 return stopping; 811 } 812 813 /** 814 * The HRegionServer sticks in this loop until closed. 815 */ 816 @Override 817 public void run() { 818 if (isStopped()) { 819 LOG.info("Skipping run; stopped"); 820 return; 821 } 822 try { 823 // Do pre-registration initializations; zookeeper, lease threads, etc. 
824 preRegistrationInitialization(); 825 } catch (Throwable e) { 826 abort("Fatal exception during initialization", e); 827 } 828 829 try { 830 if (!isStopped() && !isAborted()) { 831 installShutdownHook(); 832 833 CoprocessorConfigurationUtil.syncReadOnlyConfigurations(conf, 834 CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY); 835 836 // Initialize the RegionServerCoprocessorHost now that our ephemeral 837 // node was created, in case any coprocessors want to use ZooKeeper 838 this.rsHost = new RegionServerCoprocessorHost(this, this.conf); 839 840 // Try and register with the Master; tell it we are here. Break if server is stopped or 841 // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and 842 // start up all Services. Use RetryCounter to get backoff in case Master is struggling to 843 // come up. 844 LOG.debug("About to register with Master."); 845 TraceUtil.trace(() -> { 846 RetryCounterFactory rcf = 847 new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5); 848 RetryCounter rc = rcf.create(); 849 while (keepLooping()) { 850 RegionServerStartupResponse w = reportForDuty(); 851 if (w == null) { 852 long sleepTime = rc.getBackoffTimeAndIncrementAttempts(); 853 LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime); 854 this.sleeper.sleep(sleepTime); 855 } else { 856 handleReportForDutyResponse(w); 857 break; 858 } 859 } 860 }, "HRegionServer.registerWithMaster"); 861 } 862 863 if (!isStopped() && isHealthy()) { 864 TraceUtil.trace(() -> { 865 // start the snapshot handler and other procedure handlers, 866 // since the server is ready to run 867 if (this.rspmHost != null) { 868 this.rspmHost.start(); 869 } 870 // Start the Quota Manager 871 if (this.rsQuotaManager != null) { 872 rsQuotaManager.start(getRpcServer().getScheduler()); 873 } 874 if (this.rsSpaceQuotaManager != null) { 875 this.rsSpaceQuotaManager.start(); 876 } 877 }, "HRegionServer.startup"); 878 } 879 880 
// We registered with the Master. Go into run mode. 881 long lastMsg = EnvironmentEdgeManager.currentTime(); 882 long oldRequestCount = -1; 883 // The main run loop. 884 while (!isStopped() && isHealthy()) { 885 if (!isClusterUp()) { 886 if (onlineRegions.isEmpty()) { 887 stop("Exiting; cluster shutdown set and not carrying any regions"); 888 } else if (!this.stopping) { 889 this.stopping = true; 890 LOG.info("Closing user regions"); 891 closeUserRegions(isAborted()); 892 } else { 893 boolean allUserRegionsOffline = areAllUserRegionsOffline(); 894 if (allUserRegionsOffline) { 895 // Set stopped if no more write requests tp meta tables 896 // since last time we went around the loop. Any open 897 // meta regions will be closed on our way out. 898 if (oldRequestCount == getWriteRequestCount()) { 899 stop("Stopped; only catalog regions remaining online"); 900 break; 901 } 902 oldRequestCount = getWriteRequestCount(); 903 } else { 904 // Make sure all regions have been closed -- some regions may 905 // have not got it because we were splitting at the time of 906 // the call to closeUserRegions. 907 closeUserRegions(this.abortRequested.get()); 908 } 909 LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString()); 910 } 911 } 912 long now = EnvironmentEdgeManager.currentTime(); 913 if ((now - lastMsg) >= msgInterval) { 914 tryRegionServerReport(lastMsg, now); 915 lastMsg = EnvironmentEdgeManager.currentTime(); 916 } 917 if (!isStopped() && !isAborted()) { 918 this.sleeper.sleep(); 919 } 920 } // for 921 } catch (Throwable t) { 922 if (!rpcServices.checkOOME(t)) { 923 String prefix = t instanceof YouAreDeadException ? 
"" : "Unhandled: "; 924 abort(prefix + t.getMessage(), t); 925 } 926 } 927 928 final Span span = TraceUtil.createSpan("HRegionServer exiting main loop"); 929 try (Scope ignored = span.makeCurrent()) { 930 if (this.leaseManager != null) { 931 this.leaseManager.closeAfterLeasesExpire(); 932 } 933 if (this.splitLogWorker != null) { 934 splitLogWorker.stop(); 935 } 936 stopInfoServer(); 937 // Send cache a shutdown. 938 if (blockCache != null) { 939 blockCache.shutdown(); 940 } 941 if (mobFileCache != null) { 942 mobFileCache.shutdown(); 943 } 944 945 // Send interrupts to wake up threads if sleeping so they notice shutdown. 946 // TODO: Should we check they are alive? If OOME could have exited already 947 if (this.hMemManager != null) { 948 this.hMemManager.stop(); 949 } 950 if (this.cacheFlusher != null) { 951 this.cacheFlusher.interruptIfNecessary(); 952 } 953 if (this.compactSplitThread != null) { 954 this.compactSplitThread.interruptIfNecessary(); 955 } 956 957 // Stop the snapshot and other procedure handlers, forcefully killing all running tasks 958 if (rspmHost != null) { 959 rspmHost.stop(this.abortRequested.get() || this.killed); 960 } 961 962 if (this.killed) { 963 // Just skip out w/o closing regions. Used when testing. 
964 } else if (abortRequested.get()) { 965 if (this.dataFsOk) { 966 closeUserRegions(abortRequested.get()); // Don't leave any open file handles 967 } 968 LOG.info("aborting server " + this.serverName); 969 } else { 970 closeUserRegions(abortRequested.get()); 971 LOG.info("stopping server " + this.serverName); 972 } 973 regionReplicationBufferManager.stop(); 974 closeClusterConnection(); 975 // Closing the compactSplit thread before closing meta regions 976 if (!this.killed && containsMetaTableRegions()) { 977 if (!abortRequested.get() || this.dataFsOk) { 978 if (this.compactSplitThread != null) { 979 this.compactSplitThread.join(); 980 this.compactSplitThread = null; 981 } 982 closeMetaTableRegions(abortRequested.get()); 983 } 984 } 985 986 if (!this.killed && this.dataFsOk) { 987 waitOnAllRegionsToClose(abortRequested.get()); 988 LOG.info("stopping server " + this.serverName + "; all regions closed."); 989 } 990 991 // Stop the quota manager 992 if (rsQuotaManager != null) { 993 rsQuotaManager.stop(); 994 } 995 if (rsSpaceQuotaManager != null) { 996 rsSpaceQuotaManager.stop(); 997 rsSpaceQuotaManager = null; 998 } 999 1000 // flag may be changed when closing regions throws exception. 1001 if (this.dataFsOk) { 1002 shutdownWAL(!abortRequested.get()); 1003 } 1004 1005 // Make sure the proxy is down. 
1006 if (this.rssStub != null) { 1007 this.rssStub = null; 1008 } 1009 if (this.lockStub != null) { 1010 this.lockStub = null; 1011 } 1012 if (this.rpcClient != null) { 1013 this.rpcClient.close(); 1014 } 1015 if (this.leaseManager != null) { 1016 this.leaseManager.close(); 1017 } 1018 if (this.pauseMonitor != null) { 1019 this.pauseMonitor.stop(); 1020 } 1021 1022 if (!killed) { 1023 stopServiceThreads(); 1024 } 1025 1026 if (this.rpcServices != null) { 1027 this.rpcServices.stop(); 1028 } 1029 1030 try { 1031 deleteMyEphemeralNode(); 1032 } catch (KeeperException.NoNodeException nn) { 1033 // pass 1034 } catch (KeeperException e) { 1035 LOG.warn("Failed deleting my ephemeral node", e); 1036 } 1037 // We may have failed to delete the znode at the previous step, but 1038 // we delete the file anyway: a second attempt to delete the znode is likely to fail again. 1039 ZNodeClearer.deleteMyEphemeralNodeOnDisk(); 1040 1041 closeZooKeeper(); 1042 closeTableDescriptors(); 1043 LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed."); 1044 span.setStatus(StatusCode.OK); 1045 } finally { 1046 span.end(); 1047 } 1048 } 1049 1050 private boolean containsMetaTableRegions() { 1051 return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 1052 } 1053 1054 private boolean areAllUserRegionsOffline() { 1055 if (getNumberOfOnlineRegions() > 2) { 1056 return false; 1057 } 1058 boolean allUserRegionsOffline = true; 1059 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1060 if (!e.getValue().getRegionInfo().isMetaRegion()) { 1061 allUserRegionsOffline = false; 1062 break; 1063 } 1064 } 1065 return allUserRegionsOffline; 1066 } 1067 1068 /** Returns Current write count for all online regions. 
*/ 1069 private long getWriteRequestCount() { 1070 long writeCount = 0; 1071 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1072 writeCount += e.getValue().getWriteRequestsCount(); 1073 } 1074 return writeCount; 1075 } 1076 1077 @InterfaceAudience.Private 1078 protected void tryRegionServerReport(long reportStartTime, long reportEndTime) 1079 throws IOException { 1080 RegionServerStatusService.BlockingInterface rss = rssStub; 1081 if (rss == null) { 1082 // the current server could be stopping. 1083 return; 1084 } 1085 ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime); 1086 final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport"); 1087 try (Scope ignored = span.makeCurrent()) { 1088 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder(); 1089 request.setServer(ProtobufUtil.toServerName(this.serverName)); 1090 request.setLoad(sl); 1091 rss.regionServerReport(null, request.build()); 1092 span.setStatus(StatusCode.OK); 1093 } catch (ServiceException se) { 1094 IOException ioe = ProtobufUtil.getRemoteException(se); 1095 if (ioe instanceof YouAreDeadException) { 1096 // This will be caught and handled as a fatal error in run() 1097 TraceUtil.setError(span, ioe); 1098 throw ioe; 1099 } 1100 if (rssStub == rss) { 1101 rssStub = null; 1102 } 1103 TraceUtil.setError(span, se); 1104 // Couldn't connect to the master, get location from zk and reconnect 1105 // Method blocks until new master is found or we are stopped 1106 createRegionServerStatusStub(true); 1107 } finally { 1108 span.end(); 1109 } 1110 } 1111 1112 /** 1113 * Reports the given map of Regions and their size on the filesystem to the active Master. 1114 * @param regionSizeStore The store containing region sizes 1115 * @return false if FileSystemUtilizationChore should pause reporting to master. 
true otherwise 1116 */ 1117 public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) { 1118 RegionServerStatusService.BlockingInterface rss = rssStub; 1119 if (rss == null) { 1120 // the current server could be stopping. 1121 LOG.trace("Skipping Region size report to HMaster as stub is null"); 1122 return true; 1123 } 1124 try { 1125 buildReportAndSend(rss, regionSizeStore); 1126 } catch (ServiceException se) { 1127 IOException ioe = ProtobufUtil.getRemoteException(se); 1128 if (ioe instanceof PleaseHoldException) { 1129 LOG.trace("Failed to report region sizes to Master because it is initializing." 1130 + " This will be retried.", ioe); 1131 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 1132 return true; 1133 } 1134 if (rssStub == rss) { 1135 rssStub = null; 1136 } 1137 createRegionServerStatusStub(true); 1138 if (ioe instanceof DoNotRetryIOException) { 1139 DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe; 1140 if (doNotRetryEx.getCause() != null) { 1141 Throwable t = doNotRetryEx.getCause(); 1142 if (t instanceof UnsupportedOperationException) { 1143 LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying"); 1144 return false; 1145 } 1146 } 1147 } 1148 LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe); 1149 } 1150 return true; 1151 } 1152 1153 /** 1154 * Builds the region size report and sends it to the master. Upon successful sending of the 1155 * report, the region sizes that were sent are marked as sent. 
1156 * @param rss The stub to send to the Master 1157 * @param regionSizeStore The store containing region sizes 1158 */ 1159 private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss, 1160 RegionSizeStore regionSizeStore) throws ServiceException { 1161 RegionSpaceUseReportRequest request = 1162 buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore)); 1163 rss.reportRegionSpaceUse(null, request); 1164 // Record the number of size reports sent 1165 if (metricsRegionServer != null) { 1166 metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size()); 1167 } 1168 } 1169 1170 /** 1171 * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map. 1172 * @param regionSizes The size in bytes of regions 1173 * @return The corresponding protocol buffer message. 1174 */ 1175 RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) { 1176 RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder(); 1177 for (Entry<RegionInfo, RegionSize> entry : regionSizes) { 1178 request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize())); 1179 } 1180 return request.build(); 1181 } 1182 1183 /** 1184 * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf 1185 * message. 
* @param regionInfo The RegionInfo
   * @param sizeInBytes The size in bytes of the Region
   * @return The protocol buffer
   */
  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
    return RegionSpaceUse.newBuilder()
      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
      .setRegionSize(Objects.requireNonNull(sizeInBytes)).build();
  }

  /**
   * Assembles the {@code ServerLoad} message for one heartbeat: request counters, heap usage,
   * coprocessor names (WAL and per-region), per-region loads, block-cache and cold-data sizes, info
   * server port, per-user loads, replication source/sink load and monitored tasks.
   * @param reportStartTime start of the reporting window, copied into the message
   * @param reportEndTime end of the reporting window, copied into the message
   * @return the built message
   */
  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
    throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second, and other metrics. As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heart beat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved; Additionally the load balancer will be able to take advantage of a more complete
    // history.
    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    // -1 marks "heap usage unavailable".
    long usedMemory = -1L;
    long maxMemory = -1L;
    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
      usedMemory = usage.getUsed();
      maxMemory = usage.getMax();
    }

    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    // The same builder instance is reused for every coprocessor entry.
    Coprocessor.Builder coprocessorBuilder = Coprocessor.newBuilder();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
    }
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (HRegion region : regions) {
      if (region.getCoprocessorHost() != null) {
        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
        for (String regionCoprocessor : regionCoprocessors) {
          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
        }
      }
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
        .getCoprocessors()) {
        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
      }
    }

    getBlockCache().ifPresent(cache -> {
      cache.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
        regionCachedInfo.forEach((regionName, prefetchSize) -> {
          serverLoad.putRegionCachedInfo(regionName, roundSize(prefetchSize, unitMB));
        });
      });
    });
    serverLoad.setCacheFreeSize(regionServerWrapper.getBlockCacheFreeSize());
    if (DataTieringManager.getInstance() != null) {
      DataTieringManager.getInstance().getRegionColdDataSize()
        .forEach((regionName, coldDataSize) -> serverLoad.putRegionColdData(regionName,
          roundSize(coldDataSize.getSecond(), unitMB)));
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    // -1 is reported when there is no info server.
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }
    MetricsUserAggregateSource userSource =
      metricsRegionServer.getMetricsUserAggregate().getSource();
    if (userSource != null) {
      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
      }
    }

    // When source and sink share one handler a single refresh supplies both halves of the load;
    // otherwise each handler is refreshed and queried separately.
    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
      // always refresh first to get the latest value
      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
      if (rLoad != null) {
        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
          .getReplicationLoadSourceEntries()) {
          serverLoad.addReplLoadSource(rLS);
        }
      }
    } else {
      if (replicationSourceHandler != null) {
        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
            .getReplicationLoadSourceEntries()) {
            serverLoad.addReplLoadSource(rLS);
          }
        }
      }
      if (replicationSinkHandler != null) {
        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        }
      }
    }

    // A null task status is sent as the empty string.
    TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask
      .newBuilder().setDescription(task.getDescription())
      .setStatus(task.getStatus() != null ? task.getStatus() : "")
      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build()));

    return serverLoad.build();
  }

  /** Returns the encoded names of all online regions joined with {@code ", "}. */
  private String getOnlineRegionsAsPrintableString() {
    StringBuilder sb = new StringBuilder();
    for (Region r : this.onlineRegions.values()) {
      if (sb.length() > 0) {
        sb.append(", ");
      }
      sb.append(r.getRegionInfo().getEncodedName());
    }
    return sb.toString();
  }

  /**
   * Wait on regions close. Loops until no regions are online or no region is in transition,
   * issuing a close for any online region not yet asked to close. An interrupt received while
   * sleeping is remembered and restored on the thread before returning.
   * @param abort passed through to {@code closeRegionIgnoreErrors}
   */
  private void waitOnAllRegionsToClose(final boolean abort) {
    // Wait till all regions are closed before going out.
    int lastCount = -1;
    long previousLogTime = 0;
    // Encoded names of regions this method has already asked to close.
    Set<String> closedRegions = new HashSet<>();
    boolean interrupted = false;
    try {
      while (!onlineRegions.isEmpty()) {
        int count = getNumberOfOnlineRegions();
        // Only print a message if the count of regions has changed.
        if (count != lastCount) {
          // Log every second at most
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            previousLogTime = EnvironmentEdgeManager.currentTime();
            lastCount = count;
            LOG.info("Waiting on " + count + " regions to close");
            // Only print out regions still closing if a small number else will
            // swamp the log.
            if (count < 10 && LOG.isDebugEnabled()) {
              LOG.debug("Online Regions=" + this.onlineRegions);
            }
          }
        }
        // Ensure all user regions have been sent a close. Use this to
        // protect against the case where an open comes in after we start the
        // iterator of onlineRegions to close all user regions.
        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
          RegionInfo hri = e.getValue().getRegionInfo();
          if (
            !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
              && !closedRegions.contains(hri.getEncodedName())
          ) {
            closedRegions.add(hri.getEncodedName());
            // Don't update zk with this close transition; pass false.
            closeRegionIgnoreErrors(hri, abort);
          }
        }
        // No regions in RIT, we could stop waiting now.
        if (this.regionsInTransitionInRS.isEmpty()) {
          if (!onlineRegions.isEmpty()) {
            LOG.info("We were exiting though online regions are not empty,"
              + " because some regions failed closing");
          }
          break;
        } else {
          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
        }
        if (sleepInterrupted(200)) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Sleeps for the given time without propagating an interrupt.
   * @param millis time to sleep in milliseconds
   * @return true if the sleep was cut short by an interrupt, false otherwise
   */
  private static boolean sleepInterrupted(long millis) {
    boolean interrupted = false;
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while sleeping");
      interrupted = true;
    }
    return interrupted;
  }

  /**
   * Closes or shuts down the WAL factory, if there is one. Failures are logged and not rethrown.
   * @param close true to call {@code close()}, false to call {@code shutdown()}
   */
  private void shutdownWAL(final boolean close) {
    if (this.walFactory != null) {
      try {
        if (close) {
          walFactory.close();
        } else {
          walFactory.shutdown();
        }
      } catch (Throwable e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        // One-line summary at error level; the full stack trace only at debug level.
        LOG.error("Shutdown / close of WAL failed: " + e);
        LOG.debug("Shutdown / close exception details:", e);
      }
    }
  }

  /**
   * Run init. Sets up wal and starts up all server threads.
   * <p>
   * Applies the configuration entries sent by the Master, verifies that the hostname the Master
   * sees matches the one expected locally, creates the ephemeral znode, sets up WAL, replication
   * and metrics, starts services and finally marks the server online. Any failure stops the
   * server and is rethrown as an IOException.
   * @param c Extra configuration.
   */
  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
    throws IOException {
    try {
      boolean updateRootDir = false;
      for (NameStringPair e : c.getMapEntriesList()) {
        String key = e.getName();
        // The hostname the master sees us as.
        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
          String hostnameFromMasterPOV = e.getValue();
          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
            rpcServices.getSocketAddress().getPort(), this.startcode);
          String expectedHostName = rpcServices.getSocketAddress().getHostName();
          // if Master use-ip is enabled, RegionServer use-ip will be enabled by default even if it
          // is set to disable. so we will use the ip of the RegionServer to compare with the
          // hostname passed by the Master, see HBASE-27304 for details.
          if (
            StringUtils.isBlank(useThisHostnameInstead) && getActiveMaster().isPresent()
              && InetAddresses.isInetAddress(getActiveMaster().get().getHostname())
          ) {
            expectedHostName = rpcServices.getSocketAddress().getAddress().getHostAddress();
          }
          boolean isHostnameConsist = StringUtils.isBlank(useThisHostnameInstead)
            ? hostnameFromMasterPOV.equals(expectedHostName)
            : hostnameFromMasterPOV.equals(useThisHostnameInstead);
          if (!isHostnameConsist) {
            String msg = "Master passed us a different hostname to use; was="
              + (StringUtils.isBlank(useThisHostnameInstead)
                ? expectedHostName
                : this.useThisHostnameInstead)
              + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
            throw new IOException(msg);
          }
          // The hostname entry is not copied into conf.
          continue;
        }

        String value = e.getValue();
        if (key.equals(HConstants.HBASE_DIR)) {
          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
            updateRootDir = true;
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("Config from master: " + key + "=" + value);
        }
        this.conf.set(key, value);
      }
      // Set our ephemeral znode up in zookeeper now we have a name.
      createMyEphemeralNode();

      if (updateRootDir) {
        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
        initializeFileSystem();
      }

      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
      // config param for task trackers, but we can piggyback off of it.
      if (this.conf.get("mapreduce.task.attempt.id") == null) {
        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
      }

      // Save it in a file, this will allow to see if we crash
      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

      // This call sets up an initialized replication and WAL. Later we start it up.
      setupWALAndReplication();
      // Init in here rather than in constructor after thread name has been set
      final MetricsTable metricsTable =
        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
      this.metricsRegionServer =
        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
      // Now that we have a metrics source, start the pause monitor
      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
      pauseMonitor.start();

      // There is a rare case where we do NOT want services to start. Check config.
      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
        startServices();
      }
      // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
      // or make sense of it.
      startReplicationService();

      // Set up ZK
      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress()
        + ", sessionid=0x"
        + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));

      // Wake up anyone waiting for this server to online
      synchronized (online) {
        online.set(true);
        online.notifyAll();
      }
    } catch (Throwable e) {
      stop("Failed initialization");
      throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed");
    } finally {
      sleeper.skipSleepCycle();
    }
  }

  /** Creates and starts the heap memory manager; a no-op when there is no block cache. */
  private void startHeapMemoryManager() {
    if (this.blockCache != null) {
      this.hMemManager =
        new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
      this.hMemManager.start(getChoreService());
    }
  }

  /**
   * Creates this server's ephemeral znode, storing the info server port (-1 when there is no info
   * server) and the version info as its payload.
   */
  private void createMyEphemeralNode() throws KeeperException {
    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
  }

  /** Deletes this server's ephemeral znode. */
  private void deleteMyEphemeralNode() throws KeeperException {
    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
  }

  /** Returns the accounting object held by this server. */
  @Override
  public RegionServerAccounting getRegionServerAccounting() {
    return regionServerAccounting;
  }

  // Round the size with KB or MB.
  // A trick here is that if the sizeInBytes is less than sizeUnit, we will round the size to 1
  // instead of 0 if it is not 0, to avoid some schedulers think the region has no data. See
  // HBASE-26340 for more details on why this is important.
1545 private static int roundSize(long sizeInByte, int sizeUnit) { 1546 if (sizeInByte == 0) { 1547 return 0; 1548 } else if (sizeInByte < sizeUnit) { 1549 return 1; 1550 } else { 1551 return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE); 1552 } 1553 } 1554 1555 /** 1556 * @param r Region to get RegionLoad for. 1557 * @param regionLoadBldr the RegionLoad.Builder, can be null 1558 * @param regionSpecifier the RegionSpecifier.Builder, can be null 1559 * @return RegionLoad instance. 1560 */ 1561 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, 1562 RegionSpecifier.Builder regionSpecifier) throws IOException { 1563 byte[] name = r.getRegionInfo().getRegionName(); 1564 String regionEncodedName = r.getRegionInfo().getEncodedName(); 1565 int stores = 0; 1566 int storefiles = 0; 1567 int storeRefCount = 0; 1568 int maxCompactedStoreFileRefCount = 0; 1569 long storeUncompressedSize = 0L; 1570 long storefileSize = 0L; 1571 long storefileIndexSize = 0L; 1572 long rootLevelIndexSize = 0L; 1573 long totalStaticIndexSize = 0L; 1574 long totalStaticBloomSize = 0L; 1575 long totalCompactingKVs = 0L; 1576 long currentCompactedKVs = 0L; 1577 long totalRegionSize = 0L; 1578 List<HStore> storeList = r.getStores(); 1579 stores += storeList.size(); 1580 for (HStore store : storeList) { 1581 storefiles += store.getStorefilesCount(); 1582 int currentStoreRefCount = store.getStoreRefCount(); 1583 storeRefCount += currentStoreRefCount; 1584 int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount(); 1585 maxCompactedStoreFileRefCount = 1586 Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount); 1587 storeUncompressedSize += store.getStoreSizeUncompressed(); 1588 storefileSize += store.getStorefilesSize(); 1589 totalRegionSize += store.getHFilesSize(); 1590 // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB? 
1591 storefileIndexSize += store.getStorefilesRootLevelIndexSize(); 1592 CompactionProgress progress = store.getCompactionProgress(); 1593 if (progress != null) { 1594 totalCompactingKVs += progress.getTotalCompactingKVs(); 1595 currentCompactedKVs += progress.currentCompactedKVs; 1596 } 1597 rootLevelIndexSize += store.getStorefilesRootLevelIndexSize(); 1598 totalStaticIndexSize += store.getTotalStaticIndexSize(); 1599 totalStaticBloomSize += store.getTotalStaticBloomSize(); 1600 } 1601 1602 int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB); 1603 int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB); 1604 int storefileSizeMB = roundSize(storefileSize, unitMB); 1605 int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB); 1606 int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB); 1607 int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB); 1608 int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB); 1609 int regionSizeMB = roundSize(totalRegionSize, unitMB); 1610 final MutableFloat currentRegionCachedRatio = new MutableFloat(0.0f); 1611 getBlockCache().ifPresent(bc -> { 1612 bc.getRegionCachedInfo().ifPresent(regionCachedInfo -> { 1613 if (regionCachedInfo.containsKey(regionEncodedName)) { 1614 currentRegionCachedRatio.setValue(regionSizeMB == 0 1615 ? 0.0f 1616 : (float) roundSize(regionCachedInfo.get(regionEncodedName), unitMB) / regionSizeMB); 1617 } 1618 }); 1619 }); 1620 final MutableFloat currentRegionColdDataRatio = new MutableFloat(0.0f); 1621 if (DataTieringManager.getInstance() != null) { 1622 DataTieringManager.getInstance().getRegionColdDataSize().computeIfPresent(regionEncodedName, 1623 (k, v) -> { 1624 int coldSizeMB = roundSize(v.getSecond(), unitMB); 1625 currentRegionColdDataRatio 1626 .setValue(regionSizeMB == 0 ? 
0.0f : (float) coldSizeMB / regionSizeMB); 1627 return v; 1628 }); 1629 } 1630 1631 HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution(); 1632 float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname()); 1633 float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname()); 1634 long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight(); 1635 long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname()); 1636 long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname()); 1637 if (regionLoadBldr == null) { 1638 regionLoadBldr = RegionLoad.newBuilder(); 1639 } 1640 if (regionSpecifier == null) { 1641 regionSpecifier = RegionSpecifier.newBuilder(); 1642 } 1643 1644 regionSpecifier.setType(RegionSpecifierType.REGION_NAME); 1645 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name)); 1646 regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores) 1647 .setStorefiles(storefiles).setStoreRefCount(storeRefCount) 1648 .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount) 1649 .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB) 1650 .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB) 1651 .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB) 1652 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB) 1653 .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount()) 1654 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount()) 1655 .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs) 1656 .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality) 1657 .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight) 1658 .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight) 1659 
.setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState())) 1660 .setLastMajorCompactionTs(r.getOldestHfileTs(true)).setRegionSizeMB(regionSizeMB) 1661 .setCurrentRegionCachedRatio(currentRegionCachedRatio.floatValue()) 1662 .setCurrentRegionColdDataRatio(currentRegionColdDataRatio.floatValue()); 1663 r.setCompleteSequenceId(regionLoadBldr); 1664 return regionLoadBldr.build(); 1665 } 1666 1667 private UserLoad createUserLoad(String user, MetricsUserSource userSource) { 1668 UserLoad.Builder userLoadBldr = UserLoad.newBuilder(); 1669 userLoadBldr.setUserName(user); 1670 userSource.getClientMetrics().values().stream() 1671 .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder() 1672 .setHostName(clientMetrics.getHostName()) 1673 .setWriteRequestsCount(clientMetrics.getWriteRequestsCount()) 1674 .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests()) 1675 .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build()) 1676 .forEach(userLoadBldr::addClientMetrics); 1677 return userLoadBldr.build(); 1678 } 1679 1680 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { 1681 HRegion r = onlineRegions.get(encodedRegionName); 1682 return r != null ? createRegionLoad(r, null, null) : null; 1683 } 1684 1685 /** 1686 * Inner class that runs on a long period checking if regions need compaction. 
1687 */ 1688 private static class CompactionChecker extends ScheduledChore { 1689 private final HRegionServer instance; 1690 private final int majorCompactPriority; 1691 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; 1692 // Iteration is 1-based rather than 0-based so we don't check for compaction 1693 // immediately upon region server startup 1694 private long iteration = 1; 1695 1696 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { 1697 super("CompactionChecker", stopper, sleepTime); 1698 this.instance = h; 1699 LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime)); 1700 1701 /* 1702 * MajorCompactPriority is configurable. If not set, the compaction will use default priority. 1703 */ 1704 this.majorCompactPriority = this.instance.conf 1705 .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY); 1706 } 1707 1708 @Override 1709 protected void chore() { 1710 for (HRegion hr : this.instance.onlineRegions.values()) { 1711 // If region is read only or compaction is disabled at table level, there's no need to 1712 // iterate through region's stores 1713 if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) { 1714 continue; 1715 } 1716 1717 for (HStore s : hr.stores.values()) { 1718 try { 1719 long multiplier = s.getCompactionCheckMultiplier(); 1720 assert multiplier > 0; 1721 if (iteration % multiplier != 0) { 1722 continue; 1723 } 1724 if (s.needsCompaction()) { 1725 // Queue a compaction. Will recognize if major is needed. 
1726 this.instance.compactSplitThread.requestSystemCompaction(hr, s, 1727 getName() + " requests compaction"); 1728 } else if (s.shouldPerformMajorCompaction()) { 1729 s.triggerMajorCompaction(); 1730 if ( 1731 majorCompactPriority == DEFAULT_PRIORITY 1732 || majorCompactPriority > hr.getCompactPriority() 1733 ) { 1734 this.instance.compactSplitThread.requestCompaction(hr, s, 1735 getName() + " requests major compaction; use default priority", Store.NO_PRIORITY, 1736 CompactionLifeCycleTracker.DUMMY, null); 1737 } else { 1738 this.instance.compactSplitThread.requestCompaction(hr, s, 1739 getName() + " requests major compaction; use configured priority", 1740 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); 1741 } 1742 } 1743 } catch (IOException e) { 1744 LOG.warn("Failed major compaction check on " + hr, e); 1745 } 1746 } 1747 } 1748 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); 1749 } 1750 } 1751 1752 private static class PeriodicMemStoreFlusher extends ScheduledChore { 1753 private final HRegionServer server; 1754 private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds 1755 private final static int MIN_DELAY_TIME = 0; // millisec 1756 private final long rangeOfDelayMs; 1757 1758 PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { 1759 super("MemstoreFlusherChore", server, cacheFlushInterval); 1760 this.server = server; 1761 1762 final long configuredRangeOfDelay = server.getConfiguration() 1763 .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY); 1764 this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay); 1765 } 1766 1767 @Override 1768 protected void chore() { 1769 final StringBuilder whyFlush = new StringBuilder(); 1770 for (HRegion r : this.server.onlineRegions.values()) { 1771 if (r == null) { 1772 continue; 1773 } 1774 if (r.shouldFlush(whyFlush)) { 1775 FlushRequester requester = server.getFlushRequester(); 1776 if 
(requester != null) { 1777 long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME; 1778 // Throttle the flushes by putting a delay. If we don't throttle, and there 1779 // is a balanced write-load on the regions in a table, we might end up 1780 // overwhelming the filesystem with too many flushes at once. 1781 if (requester.requestDelayedFlush(r, delay)) { 1782 LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(), 1783 r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay); 1784 } 1785 } 1786 } 1787 } 1788 } 1789 } 1790 1791 /** 1792 * Report the status of the server. A server is online once all the startup is completed (setting 1793 * up filesystem, starting executorService threads, etc.). This method is designed mostly to be 1794 * useful in tests. 1795 * @return true if online, false if not. 1796 */ 1797 public boolean isOnline() { 1798 return online.get(); 1799 } 1800 1801 /** 1802 * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to 1803 * be hooked up to WAL. 1804 */ 1805 private void setupWALAndReplication() throws IOException { 1806 WALFactory factory = new WALFactory(conf, serverName, this); 1807 // TODO Replication make assumptions here based on the default filesystem impl 1808 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 1809 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); 1810 1811 Path logDir = new Path(walRootDir, logName); 1812 LOG.debug("logDir={}", logDir); 1813 if (this.walFs.exists(logDir)) { 1814 throw new RegionServerRunningException( 1815 "Region server has already created directory at " + this.serverName.toString()); 1816 } 1817 // Create wal directory here and we will never create it again in other places. This is 1818 // important to make sure that our fencing way takes effect. See HBASE-29797 for more details. 
1819 if (!this.walFs.mkdirs(logDir)) { 1820 throw new IOException("Can not create wal directory " + logDir); 1821 } 1822 // Instantiate replication if replication enabled. Pass it the log directories. 1823 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory); 1824 1825 WALActionsListener walEventListener = getWALEventTrackerListener(conf); 1826 if (walEventListener != null && factory.getWALProvider() != null) { 1827 factory.getWALProvider().addWALActionsListener(walEventListener); 1828 } 1829 this.walFactory = factory; 1830 } 1831 1832 private WALActionsListener getWALEventTrackerListener(Configuration conf) { 1833 if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) { 1834 WALEventTrackerListener listener = 1835 new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName()); 1836 return listener; 1837 } 1838 return null; 1839 } 1840 1841 /** 1842 * Start up replication source and sink handlers. 1843 */ 1844 private void startReplicationService() throws IOException { 1845 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { 1846 this.replicationSourceHandler.startReplicationService(); 1847 } else { 1848 if (this.replicationSourceHandler != null) { 1849 this.replicationSourceHandler.startReplicationService(); 1850 } 1851 if (this.replicationSinkHandler != null) { 1852 this.replicationSinkHandler.startReplicationService(); 1853 } 1854 } 1855 } 1856 1857 /** Returns Master address tracker instance. */ 1858 public MasterAddressTracker getMasterAddressTracker() { 1859 return this.masterAddressTracker; 1860 } 1861 1862 /** 1863 * Start maintenance Threads, Server, Worker and lease checker threads. Start all threads we need 1864 * to run. This is called after we've successfully registered with the Master. Install an 1865 * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception. We 1866 * cannot set the handler on all threads. 
Server's internal Listener thread is off limits. For
 * Server, if an OOME, it waits a while then retries. Meantime, a flush or a compaction that tries
 * to run should trigger same critical condition and the shutdown will run. On its way out, this
 * server will shut down Server. Leases are sort of inbetween. It has an internal thread that
 * while it inherits from Chore, it keeps its own internal stop mechanism so needs to be stopped
 * by this hosting server. Worker logs the exception and exits.
 */
private void startServices() throws IOException {
  // The worker objects are only constructed when the server is neither stopped nor aborted.
  if (!isStopped() && !isAborted()) {
    initializeThreads();
  }
  this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
  this.secureBulkLoadManager.start();

  // Health checker thread.
  if (isHealthCheckerConfigured()) {
    int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
      HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
    healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
  }
  // Executor status collect thread.
  if (
    this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
      HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)
  ) {
    int sleepTime =
      this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ);
    executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
      this.metricsRegionServer.getMetricsSource());
  }

  this.walRoller = new LogRoller(this);
  this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
  this.procedureResultReporter = new RemoteProcedureResultReporter(this);

  // Create the CompactedFileDischarger chore executorService. This chore helps to
  // remove the compacted files that will no longer be used in reads.
  // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
  // 2 mins so that compacted files can be archived before the TTLCleaner runs
  int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
  this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this);
  choreService.scheduleChore(compactedFileDischarger);

  // Start executor services
  // Each executor type below gets its core pool size from its own configuration key, with the
  // literal shown as the default.
  final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
  final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
  final int openPriorityRegionThreads =
    conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
  executorService.startExecutorService(
    executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION)
      .setCorePoolSize(openPriorityRegionThreads));
  final int closeRegionThreads =
    conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
  final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
  // The parallel-seek executor is only started when the feature flag is on.
  if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
    final int storeScannerParallelSeekThreads =
      conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
    executorService.startExecutorService(
      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK)
        .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true));
  }
  final int logReplayOpsThreads =
    conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
  executorService.startExecutorService(
    executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS)
      .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
  // Start the threads for compacted files discharger
  final int compactionDischargerThreads =
    conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)
    .setCorePoolSize(compactionDischargerThreads));
  if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
    // Falls back to the open-region thread count when no replica-flusher count is configured.
    final int regionReplicaFlushThreads =
      conf.getInt("hbase.regionserver.region.replica.flusher.threads",
        conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
    executorService.startExecutorService(executorService.new ExecutorConfig()
      .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS)
      .setCorePoolSize(regionReplicaFlushThreads));
  }
  final int refreshPeerThreads =
    conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
  final int replaySyncReplicationWALThreads =
    conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL)
    .setCorePoolSize(replaySyncReplicationWALThreads));
  final int switchRpcThrottleThreads =
    conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
  executorService.startExecutorService(
    executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE)
      .setCorePoolSize(switchRpcThrottleThreads));
  final int claimReplicationQueueThreads =
    conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
  executorService.startExecutorService(
    executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE)
      .setCorePoolSize(claimReplicationQueueThreads));
  final int rsSnapshotOperationThreads =
    conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
  executorService.startExecutorService(
    executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
      .setCorePoolSize(rsSnapshotOperationThreads));
  final int rsFlushOperationThreads =
    conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads));
  final int rsRefreshQuotasThreads =
    conf.getInt("hbase.regionserver.executor.refresh.quotas.threads", 1);
  executorService.startExecutorService(
    executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS)
      .setCorePoolSize(rsRefreshQuotasThreads));
  final int logRollThreads = conf.getInt("hbase.regionserver.executor.log.roll.threads", 1);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_LOG_ROLL).setCorePoolSize(logRollThreads));
  final int rsRefreshHFilesThreads =
    conf.getInt("hbase.regionserver.executor.refresh.hfiles.threads", 3);
  executorService.startExecutorService(executorService.new ExecutorConfig()
    .setExecutorType(ExecutorType.RS_REFRESH_HFILES).setCorePoolSize(rsRefreshHFilesThreads));

  // Start the long-running daemon threads, each with the shared uncaught exception handler.
  Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
    uncaughtExceptionHandler);
  if (this.cacheFlusher != null) {
    this.cacheFlusher.start(uncaughtExceptionHandler);
  }
  Threads.setDaemonThreadRunning(this.procedureResultReporter,
    getName() + ".procedureResultReporter", uncaughtExceptionHandler);

  // Schedule every chore that was actually created; a null field means it was not created.
  if (this.compactionChecker != null) {
    choreService.scheduleChore(compactionChecker);
  }
  if (this.periodicFlusher != null) {
    choreService.scheduleChore(periodicFlusher);
  }
  if (this.healthCheckChore != null) {
    choreService.scheduleChore(healthCheckChore);
  }
  if (this.executorStatusChore != null) {
    choreService.scheduleChore(executorStatusChore);
  }
  if (this.nonceManagerChore != null) {
    choreService.scheduleChore(nonceManagerChore);
  }
  if (this.storefileRefresher != null) {
    choreService.scheduleChore(storefileRefresher);
  }
  if (this.fsUtilizationChore != null) {
    choreService.scheduleChore(fsUtilizationChore);
  }
  if (this.namedQueueServiceChore != null) {
    choreService.scheduleChore(namedQueueServiceChore);
  }
  if (this.brokenStoreFileCleaner != null) {
    choreService.scheduleChore(brokenStoreFileCleaner);
  }
  if (this.rsMobFileCleanerChore != null) {
    choreService.scheduleChore(rsMobFileCleanerChore);
  }
  if (replicationMarkerChore != null) {
    LOG.info("Starting replication marker chore");
    choreService.scheduleChore(replicationMarkerChore);
  }

  // Leases is not a Thread. Internally it runs a daemon thread. If it gets
  // an unhandled exception, it will just exit.
  Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
    uncaughtExceptionHandler);

  // Create the log splitting worker and start it
  // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
  // quite a while inside Connection layer. The worker won't be available for other
  // tasks even after current task is preempted after a split task times out.
  Configuration sinkConf = HBaseConfiguration.create(conf);
  sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
    conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
  sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
    conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
  sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
  if (
    this.csm != null
      && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
  ) {
    // SplitLogWorker needs csm. If none, don't start this.
    this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
    splitLogWorker.start();
    LOG.debug("SplitLogWorker started");
  }

  // Memstore services.
  startHeapMemoryManager();
  // Call it after starting HeapMemoryManager.
  initializeMemStoreChunkCreator(hMemManager);
}

/**
 * Constructs the flusher, compaction, lease, quota and chore objects that
 * {@code startServices()} later starts or schedules, then registers the configuration observers
 * and initializes the replication marker chore. No chore is scheduled in this method.
 */
private void initializeThreads() {
  // Cache flushing thread.
  this.cacheFlusher = new MemStoreFlusher(conf, this);

  // Compaction thread
  this.compactSplitThread = new CompactSplit(this);

  // Prefetch Notifier
  this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);

  // Background thread to check for compactions; needed if region has not gotten updates
  // in a while. It will take care of not checking too frequently on store-by-store basis.
  this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
  this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
  this.leaseManager = new LeaseManager(this.threadWakeFrequency);

  final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
    HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
  final boolean walEventTrackerEnabled =
    conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);

  // The named queue chore is created when either feature is enabled.
  if (isSlowLogTableEnabled || walEventTrackerEnabled) {
    // default chore duration: 10 min
    // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property
    final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY,
      DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION);

    final int namedQueueChoreDuration =
      conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT);
    // Considering min of slowLogChoreDuration and namedQueueChoreDuration
    int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration);

    namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration,
      this.namedQueueRecorder, this.getConnection());
  }

  if (this.nonceManager != null) {
    // Create the scheduled chore that cleans up nonces.
    nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
  }

  // Setup the Quota Manager
  rsQuotaManager = new RegionServerRpcQuotaManager(this);
  configurationManager.registerObserver(rsQuotaManager);
  rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);

  if (QuotaUtil.isQuotaEnabled(conf)) {
    this.fsUtilizationChore = new FileSystemUtilizationChore(this);
  }

  // A general refresh period of 0 falls back to the meta-only period; if that is also not
  // positive, no refresher chore is created.
  boolean onlyMetaRefresh = false;
  int storefileRefreshPeriod =
    conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
  if (storefileRefreshPeriod == 0) {
    storefileRefreshPeriod =
      conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
    onlyMetaRefresh = true;
  }
  if (storefileRefreshPeriod > 0) {
    this.storefileRefresher =
      new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this);
  }

  int brokenStoreFileCleanerPeriod =
    conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
  int brokenStoreFileCleanerDelay =
    conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
  double brokenStoreFileCleanerDelayJitter =
    conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
  // nextDouble() is in [0, 1), so jitterRate lies in [-0.5, 0.5) times the configured jitter;
  // the initial delay is shifted by that fraction of itself, up or down.
  double jitterRate =
    (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
  long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
  this.brokenStoreFileCleaner =
    new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
      brokenStoreFileCleanerPeriod, this, conf, this);

  this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);

  registerConfigurationObservers();
  initializeReplicationMarkerChore();
}

/**
 * Registers with the configuration manager: the replication handlers that implement
 * ConfigurationObserver, the compaction thread, the cache flusher, the RPC services, the
 * prefetch notifier and this server itself.
 */
private void registerConfigurationObservers() {
  // Register Replication if possible, as now we support recreating replication peer storage, for
  // migrating across different replication peer storages online
  if (replicationSourceHandler instanceof ConfigurationObserver) {
    configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
  }
  // When source and sink are the same object it has already been registered above.
  if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) {
    configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
  }
  // Registering the compactSplitThread object with the ConfigurationManager.
  configurationManager.registerObserver(this.compactSplitThread);
  configurationManager.registerObserver(this.cacheFlusher);
  configurationManager.registerObserver(this.rpcServices);
  configurationManager.registerObserver(this.prefetchExecutorNotifier);
  configurationManager.registerObserver(this);
}

/*
 * Verify that server is healthy. Returns false when dataFsOk is false. Otherwise checks that the
 * lease manager, cache flusher and WAL roller are alive and that the compaction checker and
 * periodic flusher are scheduled (a null field counts as healthy), and calls stop() when any of
 * those checks fails.
 */
private boolean isHealthy() {
  if (!dataFsOk) {
    // File system problem
    return false;
  }
  // Verify that all threads are alive
  boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
    && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
    && (this.walRoller == null || this.walRoller.isAlive())
    && (this.compactionChecker == null || this.compactionChecker.isScheduled())
    && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
  if (!healthy) {
    stop("One or more threads are no longer alive -- stop");
  }
  return healthy;
}

/** Returns the WALs reported by this server's WAL factory. */
@Override
public List<WAL> getWALs() {
  return walFactory.getWALs();
}

/**
 * Returns the WAL for the given region from the WAL factory, and hands it to the WAL roller
 * when a roller exists.
 */
@Override
public WAL getWAL(RegionInfo regionInfo) throws IOException {
  WAL wal = walFactory.getWAL(regionInfo);
  if (this.walRoller != null) {
    this.walRoller.addWAL(wal);
  }
  return wal;
}

/** Returns the WAL roller; the field is assigned in startServices(). */
public LogRoller getWalRoller() {
  return walRoller;
}

/** Returns the WAL factory; the field is assigned in setupWALAndReplication(). */
public WALFactory getWalFactory() {
  return walFactory;
}

/**
 * Non-forced stop on behalf of {@code RpcServer.getRequestUser()} when that is present,
 * otherwise with a null user.
 */
@Override
public void stop(final String msg) {
  stop(msg, false, RpcServer.getRequestUser().orElse(null));
}

/**
 * Stops the regionserver.
 * @param msg   Status message
 * @param force True if this is a regionserver abort
 * @param user  The user executing the stop request, or null if no user is associated
 */
public void stop(final String msg, final boolean force, final User user) {
  // Does nothing when the server is already marked stopped.
  if (!this.stopped) {
    LOG.info("***** STOPPING region server '{}' *****", this);
    if (this.rsHost != null) {
      // when forced via abort don't allow CPs to override
      try {
        this.rsHost.preStop(msg, user);
      } catch (IOException ioe) {
        if (!force) {
          // A failing preStop hook cancels a non-forced stop: stopped stays false.
          LOG.warn("The region server did not stop", ioe);
          return;
        }
        LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
      }
    }
    this.stopped = true;
    LOG.info("STOPPED: " + msg);
    // Wakes run() if it is sleeping
    sleeper.skipSleepCycle();
  }
}

/**
 * Blocks the caller until the server is online or stopped, waiting on the {@code online}
 * monitor for up to {@code msgInterval} per wait. If the thread is interrupted, the interrupt
 * flag is restored and the method returns.
 */
public void waitForServerOnline() {
  while (!isStopped() && !isOnline()) {
    synchronized (online) {
      try {
        online.wait(msgInterval);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        break;
      }
    }
  }
}

/**
 * Tasks run after a region has been opened: request compactions for stores that have references
 * or need compaction (skipped for read-only regions), report the OPENED transition to the
 * master, and trigger a flush in the primary region.
 * @throws IOException if the OPENED transition could not be reported to the master; the calls
 *                     made here may throw it as well
 */
@Override
public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
  HRegion r = context.getRegion();
  long openProcId = context.getOpenProcId();
  long masterSystemTime = context.getMasterSystemTime();
  long initiatingMasterActiveTime = context.getInitiatingMasterActiveTime();
  rpcServices.checkOpen();
  LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
    r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
  // Do checks to see if we need to compact (references or too many files)
  // Skip compaction check if region is read only
  if (!r.isReadOnly()) {
    for (HStore s : r.stores.values()) {
      if (s.hasReferences() || s.needsCompaction()) {
        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
      }
    }
  }
  long openSeqNum = r.getOpenSeqNum();
  if (openSeqNum == HConstants.NO_SEQNUM) {
    // If we opened a region, we should have read some sequence number from it.
    LOG.error(
      "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
    // Report 0 instead of the NO_SEQNUM sentinel.
    openSeqNum = 0;
  }

  // Notify master
  if (
    !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
      openSeqNum, openProcId, masterSystemTime, r.getRegionInfo(), initiatingMasterActiveTime))
  ) {
    throw new IOException(
      "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
  }

  triggerFlushInPrimaryRegion(r);

  LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
}

/**
 * Helper method for use in tests. Skip the region transition report when there's no master around
 * to receive it.
2304 */ 2305 private boolean skipReportingTransition(final RegionStateTransitionContext context) { 2306 final TransitionCode code = context.getCode(); 2307 final long openSeqNum = context.getOpenSeqNum(); 2308 long masterSystemTime = context.getMasterSystemTime(); 2309 final RegionInfo[] hris = context.getHris(); 2310 2311 if (code == TransitionCode.OPENED) { 2312 Preconditions.checkArgument(hris != null && hris.length == 1); 2313 if (hris[0].isMetaRegion()) { 2314 LOG.warn( 2315 "meta table location is stored in master local store, so we can not skip reporting"); 2316 return false; 2317 } else { 2318 try { 2319 MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0], 2320 serverName, openSeqNum, masterSystemTime); 2321 } catch (IOException e) { 2322 LOG.info("Failed to update meta", e); 2323 return false; 2324 } 2325 } 2326 } 2327 return true; 2328 } 2329 2330 private ReportRegionStateTransitionRequest 2331 createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) { 2332 final TransitionCode code = context.getCode(); 2333 final long openSeqNum = context.getOpenSeqNum(); 2334 final RegionInfo[] hris = context.getHris(); 2335 final long[] procIds = context.getProcIds(); 2336 2337 ReportRegionStateTransitionRequest.Builder builder = 2338 ReportRegionStateTransitionRequest.newBuilder(); 2339 builder.setServer(ProtobufUtil.toServerName(serverName)); 2340 RegionStateTransition.Builder transition = builder.addTransitionBuilder(); 2341 transition.setTransitionCode(code); 2342 if (code == TransitionCode.OPENED && openSeqNum >= 0) { 2343 transition.setOpenSeqNum(openSeqNum); 2344 } 2345 for (RegionInfo hri : hris) { 2346 transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri)); 2347 } 2348 for (long procId : procIds) { 2349 transition.addProcId(procId); 2350 } 2351 transition.setInitiatingMasterActiveTime(context.getInitiatingMasterActiveTime()); 2352 2353 return builder.build(); 2354 } 2355 2356 @Override 2357 public 
boolean reportRegionStateTransition(final RegionStateTransitionContext context) { 2358 if (TEST_SKIP_REPORTING_TRANSITION) { 2359 return skipReportingTransition(context); 2360 } 2361 final ReportRegionStateTransitionRequest request = 2362 createReportRegionStateTransitionRequest(context); 2363 2364 int tries = 0; 2365 long pauseTime = this.retryPauseTime; 2366 // Keep looping till we get an error. We want to send reports even though server is going down. 2367 // Only go down if clusterConnection is null. It is set to null almost as last thing as the 2368 // HRegionServer does down. 2369 while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) { 2370 RegionServerStatusService.BlockingInterface rss = rssStub; 2371 try { 2372 if (rss == null) { 2373 createRegionServerStatusStub(); 2374 continue; 2375 } 2376 ReportRegionStateTransitionResponse response = 2377 rss.reportRegionStateTransition(null, request); 2378 if (response.hasErrorMessage()) { 2379 LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage()); 2380 break; 2381 } 2382 // Log if we had to retry else don't log unless TRACE. We want to 2383 // know if were successful after an attempt showed in logs as failed. 2384 if (tries > 0 || LOG.isTraceEnabled()) { 2385 LOG.info("TRANSITION REPORTED " + request); 2386 } 2387 // NOTE: Return mid-method!!! 2388 return true; 2389 } catch (ServiceException se) { 2390 IOException ioe = ProtobufUtil.getRemoteException(se); 2391 boolean pause = ioe instanceof ServerNotRunningYetException 2392 || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException; 2393 if (pause) { 2394 // Do backoff else we flood the Master with requests. 2395 pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries); 2396 } else { 2397 pauseTime = this.retryPauseTime; // Reset. 2398 } 2399 LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#" 2400 + tries + ")" 2401 + (pause 2402 ? 
" after " + pauseTime + "ms delay (Master is coming online...)." 2403 : " immediately."), 2404 ioe); 2405 if (pause) { 2406 Threads.sleep(pauseTime); 2407 } 2408 tries++; 2409 if (rssStub == rss) { 2410 rssStub = null; 2411 } 2412 } 2413 } 2414 return false; 2415 } 2416 2417 /** 2418 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not 2419 * block this thread. See RegionReplicaFlushHandler for details. 2420 */ 2421 private void triggerFlushInPrimaryRegion(final HRegion region) { 2422 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 2423 return; 2424 } 2425 TableName tn = region.getTableDescriptor().getTableName(); 2426 if ( 2427 !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) 2428 || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) || 2429 // If the memstore replication not setup, we do not have to wait for observing a flush event 2430 // from primary before starting to serve reads, because gaps from replication is not 2431 // applicable,this logic is from 2432 // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by 2433 // HBASE-13063 2434 !region.getTableDescriptor().hasRegionMemStoreReplication() 2435 ) { 2436 region.setReadsEnabled(true); 2437 return; 2438 } 2439 2440 region.setReadsEnabled(false); // disable reads before marking the region as opened. 2441 // RegionReplicaFlushHandler might reset this. 
2442 2443 // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler 2444 if (this.executorService != null) { 2445 this.executorService.submit(new RegionReplicaFlushHandler(this, region)); 2446 } else { 2447 LOG.info("Executor is null; not running flush of primary region replica for {}", 2448 region.getRegionInfo()); 2449 } 2450 } 2451 2452 @InterfaceAudience.Private 2453 public RSRpcServices getRSRpcServices() { 2454 return rpcServices; 2455 } 2456 2457 /** 2458 * Cause the server to exit without closing the regions it is serving, the log it is using and 2459 * without notifying the master. Used unit testing and on catastrophic events such as HDFS is 2460 * yanked out from under hbase or we OOME. the reason we are aborting the exception that caused 2461 * the abort, or null 2462 */ 2463 @Override 2464 public void abort(String reason, Throwable cause) { 2465 if (!setAbortRequested()) { 2466 // Abort already in progress, ignore the new request. 2467 LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason); 2468 return; 2469 } 2470 String msg = "***** ABORTING region server " + this + ": " + reason + " *****"; 2471 if (cause != null) { 2472 LOG.error(HBaseMarkers.FATAL, msg, cause); 2473 } else { 2474 LOG.error(HBaseMarkers.FATAL, msg); 2475 } 2476 // HBASE-4014: show list of coprocessors that were loaded to help debug 2477 // regionserver crashes.Note that we're implicitly using 2478 // java.util.HashSet's toString() method to print the coprocessor names. 2479 LOG.error(HBaseMarkers.FATAL, 2480 "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors()); 2481 // Try and dump metrics if abort -- might give clue as to how fatal came about.... 
2482 try { 2483 LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics()); 2484 } catch (MalformedObjectNameException | IOException e) { 2485 LOG.warn("Failed dumping metrics", e); 2486 } 2487 2488 // Do our best to report our abort to the master, but this may not work 2489 try { 2490 if (cause != null) { 2491 msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause); 2492 } 2493 // Report to the master but only if we have already registered with the master. 2494 RegionServerStatusService.BlockingInterface rss = rssStub; 2495 if (rss != null && this.serverName != null) { 2496 ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder(); 2497 builder.setServer(ProtobufUtil.toServerName(this.serverName)); 2498 builder.setErrorMessage(msg); 2499 rss.reportRSFatalError(null, builder.build()); 2500 } 2501 } catch (Throwable t) { 2502 LOG.warn("Unable to report fatal error to master", t); 2503 } 2504 2505 scheduleAbortTimer(); 2506 // shutdown should be run as the internal user 2507 stop(reason, true, null); 2508 } 2509 2510 /* 2511 * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup logs but it does 2512 * close socket in case want to bring up server on old hostname+port immediately. 2513 */ 2514 @InterfaceAudience.Private 2515 protected void kill() { 2516 this.killed = true; 2517 abort("Simulated kill"); 2518 } 2519 2520 // Limits the time spent in the shutdown process. 2521 private void scheduleAbortTimer() { 2522 if (this.abortMonitor == null) { 2523 this.abortMonitor = new Timer("Abort regionserver monitor", true); 2524 TimerTask abortTimeoutTask = null; 2525 try { 2526 Constructor<? 
extends TimerTask> timerTaskCtor = 2527 Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName())) 2528 .asSubclass(TimerTask.class).getDeclaredConstructor(); 2529 timerTaskCtor.setAccessible(true); 2530 abortTimeoutTask = timerTaskCtor.newInstance(); 2531 } catch (Exception e) { 2532 LOG.warn("Initialize abort timeout task failed", e); 2533 } 2534 if (abortTimeoutTask != null) { 2535 abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT)); 2536 } 2537 } 2538 } 2539 2540 /** 2541 * Wait on all threads to finish. Presumption is that all closes and stops have already been 2542 * called. 2543 */ 2544 protected void stopServiceThreads() { 2545 // clean up the scheduled chores 2546 stopChoreService(); 2547 if (bootstrapNodeManager != null) { 2548 bootstrapNodeManager.stop(); 2549 } 2550 if (this.cacheFlusher != null) { 2551 this.cacheFlusher.shutdown(); 2552 } 2553 if (this.walRoller != null) { 2554 this.walRoller.close(); 2555 } 2556 if (this.compactSplitThread != null) { 2557 this.compactSplitThread.join(); 2558 } 2559 stopExecutorService(); 2560 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { 2561 this.replicationSourceHandler.stopReplicationService(); 2562 } else { 2563 if (this.replicationSourceHandler != null) { 2564 this.replicationSourceHandler.stopReplicationService(); 2565 } 2566 if (this.replicationSinkHandler != null) { 2567 this.replicationSinkHandler.stopReplicationService(); 2568 } 2569 } 2570 } 2571 2572 /** Returns Return the object that implements the replication source executorService. */ 2573 @Override 2574 public ReplicationSourceService getReplicationSourceService() { 2575 return replicationSourceHandler; 2576 } 2577 2578 /** Returns Return the object that implements the replication sink executorService. 
*/ 2579 public ReplicationSinkService getReplicationSinkService() { 2580 return replicationSinkHandler; 2581 } 2582 2583 /** 2584 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2585 * connection, the current rssStub must be null. Method will block until a master is available. 2586 * You can break from this block by requesting the server stop. 2587 * @return master + port, or null if server has been stopped 2588 */ 2589 private synchronized ServerName createRegionServerStatusStub() { 2590 // Create RS stub without refreshing the master node from ZK, use cached data 2591 return createRegionServerStatusStub(false); 2592 } 2593 2594 /** 2595 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2596 * connection, the current rssStub must be null. Method will block until a master is available. 2597 * You can break from this block by requesting the server stop. 2598 * @param refresh If true then master address will be read from ZK, otherwise use cached data 2599 * @return master + port, or null if server has been stopped 2600 */ 2601 @InterfaceAudience.Private 2602 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { 2603 if (rssStub != null) { 2604 return masterAddressTracker.getMasterAddress(); 2605 } 2606 ServerName sn = null; 2607 long previousLogTime = 0; 2608 RegionServerStatusService.BlockingInterface intRssStub = null; 2609 LockService.BlockingInterface intLockStub = null; 2610 boolean interrupted = false; 2611 try { 2612 while (keepLooping()) { 2613 sn = this.masterAddressTracker.getMasterAddress(refresh); 2614 if (sn == null) { 2615 if (!keepLooping()) { 2616 // give up with no connection. 
2617 LOG.debug("No master found and cluster is stopped; bailing out"); 2618 return null; 2619 } 2620 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 2621 LOG.debug("No master found; retry"); 2622 previousLogTime = EnvironmentEdgeManager.currentTime(); 2623 } 2624 refresh = true; // let's try pull it from ZK directly 2625 if (sleepInterrupted(200)) { 2626 interrupted = true; 2627 } 2628 continue; 2629 } 2630 try { 2631 BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, 2632 userProvider.getCurrent(), shortOperationTimeout); 2633 intRssStub = RegionServerStatusService.newBlockingStub(channel); 2634 intLockStub = LockService.newBlockingStub(channel); 2635 break; 2636 } catch (IOException e) { 2637 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 2638 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 2639 if (e instanceof ServerNotRunningYetException) { 2640 LOG.info("Master isn't available yet, retrying"); 2641 } else { 2642 LOG.warn("Unable to connect to master. Retrying. Error was:", e); 2643 } 2644 previousLogTime = EnvironmentEdgeManager.currentTime(); 2645 } 2646 if (sleepInterrupted(200)) { 2647 interrupted = true; 2648 } 2649 } 2650 } 2651 } finally { 2652 if (interrupted) { 2653 Thread.currentThread().interrupt(); 2654 } 2655 } 2656 this.rssStub = intRssStub; 2657 this.lockStub = intLockStub; 2658 return sn; 2659 } 2660 2661 /** 2662 * @return True if we should break loop because cluster is going down or this server has been 2663 * stopped or hdfs has gone bad. 2664 */ 2665 private boolean keepLooping() { 2666 return !this.stopped && isClusterUp(); 2667 } 2668 2669 /* 2670 * Let the master know we're here Run initialization using parameters passed us by the master. 2671 * @return A Map of key/value configurations we got from the Master else null if we failed to 2672 * register. 
2673 */ 2674 private RegionServerStartupResponse reportForDuty() throws IOException { 2675 if (this.masterless) { 2676 return RegionServerStartupResponse.getDefaultInstance(); 2677 } 2678 ServerName masterServerName = createRegionServerStatusStub(true); 2679 RegionServerStatusService.BlockingInterface rss = rssStub; 2680 if (masterServerName == null || rss == null) { 2681 return null; 2682 } 2683 RegionServerStartupResponse result = null; 2684 try { 2685 rpcServices.requestCount.reset(); 2686 rpcServices.rpcGetRequestCount.reset(); 2687 rpcServices.rpcScanRequestCount.reset(); 2688 rpcServices.rpcFullScanRequestCount.reset(); 2689 rpcServices.rpcMultiRequestCount.reset(); 2690 rpcServices.rpcMutateRequestCount.reset(); 2691 LOG.info("reportForDuty to master=" + masterServerName + " with port=" 2692 + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode); 2693 long now = EnvironmentEdgeManager.currentTime(); 2694 int port = rpcServices.getSocketAddress().getPort(); 2695 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder(); 2696 if (!StringUtils.isBlank(useThisHostnameInstead)) { 2697 request.setUseThisHostnameInstead(useThisHostnameInstead); 2698 } 2699 request.setPort(port); 2700 request.setServerStartCode(this.startcode); 2701 request.setServerCurrentTime(now); 2702 result = rss.regionServerStartup(null, request.build()); 2703 } catch (ServiceException se) { 2704 IOException ioe = ProtobufUtil.getRemoteException(se); 2705 if (ioe instanceof ClockOutOfSyncException) { 2706 LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe); 2707 // Re-throw IOE will cause RS to abort 2708 throw ioe; 2709 } else if (ioe instanceof DecommissionedHostRejectedException) { 2710 LOG.error(HBaseMarkers.FATAL, 2711 "Master rejected startup because the host is considered decommissioned", ioe); 2712 // Re-throw IOE will cause RS to abort 2713 throw ioe; 2714 } else if (ioe instanceof 
ServerNotRunningYetException) { 2715 LOG.debug("Master is not running yet"); 2716 } else { 2717 LOG.warn("error telling master we are up", se); 2718 } 2719 rssStub = null; 2720 } 2721 return result; 2722 } 2723 2724 @Override 2725 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { 2726 try { 2727 GetLastFlushedSequenceIdRequest req = 2728 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName); 2729 RegionServerStatusService.BlockingInterface rss = rssStub; 2730 if (rss == null) { // Try to connect one more time 2731 createRegionServerStatusStub(); 2732 rss = rssStub; 2733 if (rss == null) { 2734 // Still no luck, we tried 2735 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id"); 2736 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2737 .build(); 2738 } 2739 } 2740 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req); 2741 return RegionStoreSequenceIds.newBuilder() 2742 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId()) 2743 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build(); 2744 } catch (ServiceException e) { 2745 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e); 2746 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 2747 .build(); 2748 } 2749 } 2750 2751 /** 2752 * Close meta region if we carry it 2753 * @param abort Whether we're running an abort. 
2754 */ 2755 private void closeMetaTableRegions(final boolean abort) { 2756 HRegion meta = null; 2757 this.onlineRegionsLock.writeLock().lock(); 2758 try { 2759 for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) { 2760 RegionInfo hri = e.getValue().getRegionInfo(); 2761 if (hri.isMetaRegion()) { 2762 meta = e.getValue(); 2763 } 2764 if (meta != null) { 2765 break; 2766 } 2767 } 2768 } finally { 2769 this.onlineRegionsLock.writeLock().unlock(); 2770 } 2771 if (meta != null) { 2772 closeRegionIgnoreErrors(meta.getRegionInfo(), abort); 2773 } 2774 } 2775 2776 /** 2777 * Schedule closes on all user regions. Should be safe calling multiple times because it wont' 2778 * close regions that are already closed or that are closing. 2779 * @param abort Whether we're running an abort. 2780 */ 2781 private void closeUserRegions(final boolean abort) { 2782 this.onlineRegionsLock.writeLock().lock(); 2783 try { 2784 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 2785 HRegion r = e.getValue(); 2786 if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) { 2787 // Don't update zk with this close transition; pass false. 2788 closeRegionIgnoreErrors(r.getRegionInfo(), abort); 2789 } 2790 } 2791 } finally { 2792 this.onlineRegionsLock.writeLock().unlock(); 2793 } 2794 } 2795 2796 protected Map<String, HRegion> getOnlineRegions() { 2797 return this.onlineRegions; 2798 } 2799 2800 public int getNumberOfOnlineRegions() { 2801 return this.onlineRegions.size(); 2802 } 2803 2804 /** 2805 * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM 2806 * as client; HRegion cannot be serialized to cross an rpc. 
2807 */ 2808 public Collection<HRegion> getOnlineRegionsLocalContext() { 2809 Collection<HRegion> regions = this.onlineRegions.values(); 2810 return Collections.unmodifiableCollection(regions); 2811 } 2812 2813 @Override 2814 public void addRegion(HRegion region) { 2815 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); 2816 configurationManager.registerObserver(region); 2817 } 2818 2819 private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region, 2820 long size) { 2821 if (!sortedRegions.containsKey(size)) { 2822 sortedRegions.put(size, new ArrayList<>()); 2823 } 2824 sortedRegions.get(size).add(region); 2825 } 2826 2827 /** 2828 * @return A new Map of online regions sorted by region off-heap size with the first entry being 2829 * the biggest. 2830 */ 2831 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() { 2832 // we'll sort the regions in reverse 2833 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2834 // Copy over all regions. Regions are sorted by size with biggest first. 2835 for (HRegion region : this.onlineRegions.values()) { 2836 addRegion(sortedRegions, region, region.getMemStoreOffHeapSize()); 2837 } 2838 return sortedRegions; 2839 } 2840 2841 /** 2842 * @return A new Map of online regions sorted by region heap size with the first entry being the 2843 * biggest. 2844 */ 2845 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() { 2846 // we'll sort the regions in reverse 2847 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 2848 // Copy over all regions. Regions are sorted by size with biggest first. 
2849 for (HRegion region : this.onlineRegions.values()) { 2850 addRegion(sortedRegions, region, region.getMemStoreHeapSize()); 2851 } 2852 return sortedRegions; 2853 } 2854 2855 /** Returns reference to FlushRequester */ 2856 @Override 2857 public FlushRequester getFlushRequester() { 2858 return this.cacheFlusher; 2859 } 2860 2861 @Override 2862 public CompactionRequester getCompactionRequestor() { 2863 return this.compactSplitThread; 2864 } 2865 2866 @Override 2867 public LeaseManager getLeaseManager() { 2868 return leaseManager; 2869 } 2870 2871 /** Returns {@code true} when the data file system is available, {@code false} otherwise. */ 2872 boolean isDataFileSystemOk() { 2873 return this.dataFsOk; 2874 } 2875 2876 public RegionServerCoprocessorHost getRegionServerCoprocessorHost() { 2877 return this.rsHost; 2878 } 2879 2880 @Override 2881 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() { 2882 return this.regionsInTransitionInRS; 2883 } 2884 2885 @Override 2886 public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { 2887 return rsQuotaManager; 2888 } 2889 2890 // 2891 // Main program and support routines 2892 // 2893 /** 2894 * Load the replication executorService objects, if any 2895 */ 2896 private static void createNewReplicationInstance(Configuration conf, HRegionServer server, 2897 FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException { 2898 // read in the name of the source replication class from the config file. 2899 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, 2900 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 2901 2902 // read in the name of the sink replication class from the config file. 2903 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, 2904 HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT); 2905 2906 // If both the sink and the source class names are the same, then instantiate 2907 // only one object. 
2908 if (sourceClassname.equals(sinkClassname)) { 2909 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 2910 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2911 server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; 2912 server.sameReplicationSourceAndSink = true; 2913 } else { 2914 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 2915 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2916 server.replicationSinkHandler = newReplicationInstance(sinkClassname, 2917 ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 2918 server.sameReplicationSourceAndSink = false; 2919 } 2920 } 2921 2922 private static <T extends ReplicationService> T newReplicationInstance(String classname, 2923 Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, 2924 Path oldLogDir, WALFactory walFactory) throws IOException { 2925 final Class<? 
extends T> clazz; 2926 try { 2927 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 2928 clazz = Class.forName(classname, true, classLoader).asSubclass(xface); 2929 } catch (java.lang.ClassNotFoundException nfe) { 2930 throw new IOException("Could not find class for " + classname); 2931 } 2932 T service = ReflectionUtils.newInstance(clazz, conf); 2933 service.initialize(server, walFs, logDir, oldLogDir, walFactory); 2934 return service; 2935 } 2936 2937 public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() { 2938 Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>(); 2939 if (!this.isOnline()) { 2940 return walGroupsReplicationStatus; 2941 } 2942 List<ReplicationSourceInterface> allSources = new ArrayList<>(); 2943 allSources.addAll(replicationSourceHandler.getReplicationManager().getSources()); 2944 allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources()); 2945 for (ReplicationSourceInterface source : allSources) { 2946 walGroupsReplicationStatus.putAll(source.getWalGroupStatus()); 2947 } 2948 return walGroupsReplicationStatus; 2949 } 2950 2951 /** 2952 * Utility for constructing an instance of the passed HRegionServer class. 2953 */ 2954 static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass, 2955 final Configuration conf) { 2956 try { 2957 Constructor<? 
extends HRegionServer> c = 2958 regionServerClass.getConstructor(Configuration.class); 2959 return c.newInstance(conf); 2960 } catch (Exception e) { 2961 throw new RuntimeException( 2962 "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e); 2963 } 2964 } 2965 2966 /** 2967 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine 2968 */ 2969 public static void main(String[] args) { 2970 LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); 2971 VersionInfo.logVersion(); 2972 Configuration conf = HBaseConfiguration.create(); 2973 @SuppressWarnings("unchecked") 2974 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf 2975 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); 2976 2977 new HRegionServerCommandLine(regionServerClass).doMain(args); 2978 } 2979 2980 /** 2981 * Gets the online regions of the specified table. This method looks at the in-memory 2982 * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions. 2983 * If a region on this table has been closed during a disable, etc., it will not be included in 2984 * the returned list. So, the returned list may not necessarily be ALL regions in this table, its 2985 * all the ONLINE regions in the table. 
2986 * @param tableName table to limit the scope of the query 2987 * @return Online regions from <code>tableName</code> 2988 */ 2989 @Override 2990 public List<HRegion> getRegions(TableName tableName) { 2991 List<HRegion> tableRegions = new ArrayList<>(); 2992 synchronized (this.onlineRegions) { 2993 for (HRegion region : this.onlineRegions.values()) { 2994 RegionInfo regionInfo = region.getRegionInfo(); 2995 if (regionInfo.getTable().equals(tableName)) { 2996 tableRegions.add(region); 2997 } 2998 } 2999 } 3000 return tableRegions; 3001 } 3002 3003 @Override 3004 public List<HRegion> getRegions() { 3005 List<HRegion> allRegions; 3006 synchronized (this.onlineRegions) { 3007 // Return a clone copy of the onlineRegions 3008 allRegions = new ArrayList<>(onlineRegions.values()); 3009 } 3010 return allRegions; 3011 } 3012 3013 /** 3014 * Gets the online tables in this RS. This method looks at the in-memory onlineRegions. 3015 * @return all the online tables in this RS 3016 */ 3017 public Set<TableName> getOnlineTables() { 3018 Set<TableName> tables = new HashSet<>(); 3019 synchronized (this.onlineRegions) { 3020 for (Region region : this.onlineRegions.values()) { 3021 tables.add(region.getTableDescriptor().getTableName()); 3022 } 3023 } 3024 return tables; 3025 } 3026 3027 public String[] getRegionServerCoprocessors() { 3028 TreeSet<String> coprocessors = new TreeSet<>(); 3029 try { 3030 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors()); 3031 } catch (IOException exception) { 3032 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " 3033 + "skipping."); 3034 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3035 } 3036 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 3037 for (HRegion region : regions) { 3038 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors()); 3039 try { 3040 
coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()); 3041 } catch (IOException exception) { 3042 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region 3043 + "; skipping."); 3044 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3045 } 3046 } 3047 coprocessors.addAll(rsHost.getCoprocessors()); 3048 return coprocessors.toArray(new String[0]); 3049 } 3050 3051 /** 3052 * Try to close the region, logs a warning on failure but continues. 3053 * @param region Region to close 3054 */ 3055 private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) { 3056 try { 3057 if (!closeRegion(region.getEncodedName(), abort, null)) { 3058 LOG 3059 .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing"); 3060 } 3061 } catch (IOException e) { 3062 LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing", 3063 e); 3064 } 3065 } 3066 3067 /** 3068 * Close asynchronously a region, can be called from the master or internally by the regionserver 3069 * when stopping. If called from the master, the region will update the status. 3070 * <p> 3071 * If an opening was in progress, this method will cancel it, but will not start a new close. The 3072 * coprocessors are not called in this case. A NotServingRegionException exception is thrown. 3073 * </p> 3074 * <p> 3075 * If a close was in progress, this new request will be ignored, and an exception thrown. 3076 * </p> 3077 * <p> 3078 * Provides additional flag to indicate if this region blocks should be evicted from the cache. 3079 * </p> 3080 * @param encodedName Region to close 3081 * @param abort True if we are aborting 3082 * @param destination Where the Region is being moved too... maybe null if unknown. 3083 * @return True if closed a region. 
3084 * @throws NotServingRegionException if the region is not online 3085 */ 3086 protected boolean closeRegion(String encodedName, final boolean abort, 3087 final ServerName destination) throws NotServingRegionException { 3088 // Check for permissions to close. 3089 HRegion actualRegion = this.getRegion(encodedName); 3090 // Can be null if we're calling close on a region that's not online 3091 if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) { 3092 try { 3093 actualRegion.getCoprocessorHost().preClose(false); 3094 } catch (IOException exp) { 3095 LOG.warn("Unable to close region: the coprocessor launched an error ", exp); 3096 return false; 3097 } 3098 } 3099 3100 // previous can come back 'null' if not in map. 3101 final Boolean previous = 3102 this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE); 3103 3104 if (Boolean.TRUE.equals(previous)) { 3105 LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " 3106 + "trying to OPEN. Cancelling OPENING."); 3107 if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) { 3108 // The replace failed. That should be an exceptional case, but theoretically it can happen. 3109 // We're going to try to do a standard close then. 3110 LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." 3111 + " Doing a standard close now"); 3112 return closeRegion(encodedName, abort, destination); 3113 } 3114 // Let's get the region from the online region list again 3115 actualRegion = this.getRegion(encodedName); 3116 if (actualRegion == null) { // If already online, we still need to close it. 3117 LOG.info("The opening previously in progress has been cancelled by a CLOSE request."); 3118 // The master deletes the znode when it receives this exception. 3119 throw new NotServingRegionException( 3120 "The region " + encodedName + " was opening but not yet served. 
Opening is cancelled."); 3121 } 3122 } else if (previous == null) { 3123 LOG.info("Received CLOSE for {}", encodedName); 3124 } else if (Boolean.FALSE.equals(previous)) { 3125 LOG.info("Received CLOSE for the region: " + encodedName 3126 + ", which we are already trying to CLOSE, but not completed yet"); 3127 return true; 3128 } 3129 3130 if (actualRegion == null) { 3131 LOG.debug("Received CLOSE for a region which is not online, and we're not opening."); 3132 this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName)); 3133 // The master deletes the znode when it receives this exception. 3134 throw new NotServingRegionException( 3135 "The region " + encodedName + " is not online, and is not opening."); 3136 } 3137 3138 CloseRegionHandler crh; 3139 final RegionInfo hri = actualRegion.getRegionInfo(); 3140 if (hri.isMetaRegion()) { 3141 crh = new CloseMetaHandler(this, this, hri, abort); 3142 } else { 3143 crh = new CloseRegionHandler(this, this, hri, abort, destination); 3144 } 3145 this.executorService.submit(crh); 3146 return true; 3147 } 3148 3149 /** 3150 * @return HRegion for the passed binary <code>regionName</code> or null if named region is not 3151 * member of the online regions. 
3152 */ 3153 public HRegion getOnlineRegion(final byte[] regionName) { 3154 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3155 return this.onlineRegions.get(encodedRegionName); 3156 } 3157 3158 @Override 3159 public HRegion getRegion(final String encodedRegionName) { 3160 return this.onlineRegions.get(encodedRegionName); 3161 } 3162 3163 @Override 3164 public boolean removeRegion(final HRegion r, ServerName destination) { 3165 HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); 3166 if (DataTieringManager.getInstance() != null) { 3167 DataTieringManager.getInstance().getRegionColdDataSize() 3168 .remove(r.getRegionInfo().getEncodedName()); 3169 } 3170 metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName()); 3171 if (destination != null) { 3172 long closeSeqNum = r.getMaxFlushedSeqId(); 3173 if (closeSeqNum == HConstants.NO_SEQNUM) { 3174 // No edits in WAL for this region; get the sequence number when the region was opened. 3175 closeSeqNum = r.getOpenSeqNum(); 3176 if (closeSeqNum == HConstants.NO_SEQNUM) { 3177 closeSeqNum = 0; 3178 } 3179 } 3180 boolean selfMove = ServerName.isSameAddress(destination, this.getServerName()); 3181 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove); 3182 if (selfMove) { 3183 this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put( 3184 r.getRegionInfo().getEncodedName(), 3185 new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount())); 3186 } 3187 } 3188 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName()); 3189 configurationManager.deregisterObserver(r); 3190 return toReturn != null; 3191 } 3192 3193 /** 3194 * Protected Utility method for safely obtaining an HRegion handle. 
3195 * @param regionName Name of online {@link HRegion} to return 3196 * @return {@link HRegion} for <code>regionName</code> 3197 */ 3198 protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException { 3199 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3200 return getRegionByEncodedName(regionName, encodedRegionName); 3201 } 3202 3203 public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException { 3204 return getRegionByEncodedName(null, encodedRegionName); 3205 } 3206 3207 private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) 3208 throws NotServingRegionException { 3209 HRegion region = this.onlineRegions.get(encodedRegionName); 3210 if (region == null) { 3211 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName); 3212 if (moveInfo != null) { 3213 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum()); 3214 } 3215 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName)); 3216 String regionNameStr = 3217 regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName); 3218 if (isOpening != null && isOpening) { 3219 throw new RegionOpeningException( 3220 "Region " + regionNameStr + " is opening on " + this.serverName); 3221 } 3222 throw new NotServingRegionException( 3223 "" + regionNameStr + " is not online on " + this.serverName); 3224 } 3225 return region; 3226 } 3227 3228 /** 3229 * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't 3230 * already. 3231 * @param t Throwable 3232 * @param msg Message to log in error. Can be null. 3233 * @return Throwable converted to an IOE; methods can only let out IOEs. 3234 */ 3235 private Throwable cleanup(final Throwable t, final String msg) { 3236 // Don't log as error if NSRE; NSRE is 'normal' operation. 
3237 if (t instanceof NotServingRegionException) { 3238 LOG.debug("NotServingRegionException; " + t.getMessage()); 3239 return t; 3240 } 3241 Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t; 3242 if (msg == null) { 3243 LOG.error("", e); 3244 } else { 3245 LOG.error(msg, e); 3246 } 3247 if (!rpcServices.checkOOME(t)) { 3248 checkFileSystem(); 3249 } 3250 return t; 3251 } 3252 3253 /** 3254 * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE 3255 * @return Make <code>t</code> an IOE if it isn't already. 3256 */ 3257 private IOException convertThrowableToIOE(final Throwable t, final String msg) { 3258 return (t instanceof IOException ? (IOException) t 3259 : msg == null || msg.length() == 0 ? new IOException(t) 3260 : new IOException(msg, t)); 3261 } 3262 3263 /** 3264 * Checks to see if the file system is still accessible. If not, sets abortRequested and 3265 * stopRequested 3266 * @return false if file system is not available 3267 */ 3268 boolean checkFileSystem() { 3269 if (this.dataFsOk && this.dataFs != null) { 3270 try { 3271 FSUtils.checkFileSystemAvailable(this.dataFs); 3272 } catch (IOException e) { 3273 abort("File System not available", e); 3274 this.dataFsOk = false; 3275 } 3276 } 3277 return this.dataFsOk; 3278 } 3279 3280 @Override 3281 public void updateRegionFavoredNodesMapping(String encodedRegionName, 3282 List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) { 3283 Address[] addr = new Address[favoredNodes.size()]; 3284 // Refer to the comment on the declaration of regionFavoredNodesMap on why 3285 // it is a map of region name to Address[] 3286 for (int i = 0; i < favoredNodes.size(); i++) { 3287 addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort()); 3288 } 3289 regionFavoredNodesMap.put(encodedRegionName, addr); 3290 } 3291 3292 /** 3293 * Return the favored nodes for a region given its encoded 
name. Look at the comment around 3294 * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here. 3295 * @param encodedRegionName the encoded region name. 3296 * @return array of favored locations 3297 */ 3298 @Override 3299 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) { 3300 return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName)); 3301 } 3302 3303 @Override 3304 public ServerNonceManager getNonceManager() { 3305 return this.nonceManager; 3306 } 3307 3308 private static class MovedRegionInfo { 3309 private final ServerName serverName; 3310 private final long seqNum; 3311 3312 MovedRegionInfo(ServerName serverName, long closeSeqNum) { 3313 this.serverName = serverName; 3314 this.seqNum = closeSeqNum; 3315 } 3316 3317 public ServerName getServerName() { 3318 return serverName; 3319 } 3320 3321 public long getSeqNum() { 3322 return seqNum; 3323 } 3324 } 3325 3326 /** 3327 * We need a timeout. If not there is a risk of giving a wrong information: this would double the 3328 * number of network calls instead of reducing them. 
3329 */ 3330 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000); 3331 3332 private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum, 3333 boolean selfMove) { 3334 if (selfMove) { 3335 LOG.warn("Not adding moved region record: " + encodedName + " to self."); 3336 return; 3337 } 3338 LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" 3339 + closeSeqNum); 3340 movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum)); 3341 } 3342 3343 void removeFromMovedRegions(String encodedName) { 3344 movedRegionInfoCache.invalidate(encodedName); 3345 } 3346 3347 @InterfaceAudience.Private 3348 public MovedRegionInfo getMovedRegion(String encodedRegionName) { 3349 return movedRegionInfoCache.getIfPresent(encodedRegionName); 3350 } 3351 3352 @InterfaceAudience.Private 3353 public int movedRegionCacheExpiredTime() { 3354 return TIMEOUT_REGION_MOVED; 3355 } 3356 3357 private String getMyEphemeralNodePath() { 3358 return zooKeeper.getZNodePaths().getRsPath(serverName); 3359 } 3360 3361 private boolean isHealthCheckerConfigured() { 3362 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC); 3363 return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation); 3364 } 3365 3366 /** Returns the underlying {@link CompactSplit} for the servers */ 3367 public CompactSplit getCompactSplitThread() { 3368 return this.compactSplitThread; 3369 } 3370 3371 CoprocessorServiceResponse execRegionServerService( 3372 @SuppressWarnings("UnusedParameters") final RpcController controller, 3373 final CoprocessorServiceRequest serviceRequest) throws ServiceException { 3374 try { 3375 ServerRpcController serviceController = new ServerRpcController(); 3376 CoprocessorServiceCall call = serviceRequest.getCall(); 3377 String serviceName = call.getServiceName(); 3378 Service service = coprocessorServiceHandlers.get(serviceName); 3379 if (service == null) { 3380 
throw new UnknownProtocolException(null, 3381 "No registered coprocessor executorService found for " + serviceName); 3382 } 3383 ServiceDescriptor serviceDesc = service.getDescriptorForType(); 3384 3385 String methodName = call.getMethodName(); 3386 MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName); 3387 if (methodDesc == null) { 3388 throw new UnknownProtocolException(service.getClass(), 3389 "Unknown method " + methodName + " called on executorService " + serviceName); 3390 } 3391 3392 Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest()); 3393 final Message.Builder responseBuilder = 3394 service.getResponsePrototype(methodDesc).newBuilderForType(); 3395 service.callMethod(methodDesc, serviceController, request, message -> { 3396 if (message != null) { 3397 responseBuilder.mergeFrom(message); 3398 } 3399 }); 3400 IOException exception = CoprocessorRpcUtils.getControllerException(serviceController); 3401 if (exception != null) { 3402 throw exception; 3403 } 3404 return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY); 3405 } catch (IOException ie) { 3406 throw new ServiceException(ie); 3407 } 3408 } 3409 3410 /** 3411 * May be null if this is a master which not carry table. 3412 * @return The block cache instance used by the regionserver. 3413 */ 3414 @Override 3415 public Optional<BlockCache> getBlockCache() { 3416 return Optional.ofNullable(this.blockCache); 3417 } 3418 3419 /** 3420 * May be null if this is a master which not carry table. 3421 * @return The cache for mob files used by the regionserver. 
3422 */ 3423 @Override 3424 public Optional<MobFileCache> getMobFileCache() { 3425 return Optional.ofNullable(this.mobFileCache); 3426 } 3427 3428 CacheEvictionStats clearRegionBlockCache(Region region) { 3429 long evictedBlocks = 0; 3430 3431 for (Store store : region.getStores()) { 3432 for (StoreFile hFile : store.getStorefiles()) { 3433 evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName()); 3434 } 3435 } 3436 3437 return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build(); 3438 } 3439 3440 @Override 3441 public double getCompactionPressure() { 3442 double max = 0; 3443 for (Region region : onlineRegions.values()) { 3444 for (Store store : region.getStores()) { 3445 double normCount = store.getCompactionPressure(); 3446 if (normCount > max) { 3447 max = normCount; 3448 } 3449 } 3450 } 3451 return max; 3452 } 3453 3454 @Override 3455 public HeapMemoryManager getHeapMemoryManager() { 3456 return hMemManager; 3457 } 3458 3459 public MemStoreFlusher getMemStoreFlusher() { 3460 return cacheFlusher; 3461 } 3462 3463 /** 3464 * For testing 3465 * @return whether all wal roll request finished for this regionserver 3466 */ 3467 @InterfaceAudience.Private 3468 public boolean walRollRequestFinished() { 3469 return this.walRoller.walRollFinished(); 3470 } 3471 3472 @Override 3473 public ThroughputController getFlushThroughputController() { 3474 return flushThroughputController; 3475 } 3476 3477 @Override 3478 public double getFlushPressure() { 3479 if (getRegionServerAccounting() == null || cacheFlusher == null) { 3480 // return 0 during RS initialization 3481 return 0.0; 3482 } 3483 return getRegionServerAccounting().getFlushPressure(); 3484 } 3485 3486 @Override 3487 public void onConfigurationChange(Configuration newConf) { 3488 ThroughputController old = this.flushThroughputController; 3489 if (old != null) { 3490 old.stop("configuration change"); 3491 } 3492 this.flushThroughputController = 
FlushThroughputControllerFactory.create(this, newConf); 3493 try { 3494 Superusers.initialize(newConf); 3495 } catch (IOException e) { 3496 LOG.warn("Failed to initialize SuperUsers on reloading of the configuration"); 3497 } 3498 3499 boolean originalIsReadOnlyEnabled = CoprocessorConfigurationUtil 3500 .areReadOnlyCoprocessorsLoaded(this.conf, CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY); 3501 3502 CoprocessorConfigurationUtil.maybeUpdateCoprocessors(newConf, originalIsReadOnlyEnabled, 3503 this.rsHost, CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, false, this.toString(), 3504 conf -> { 3505 this.rsHost = new RegionServerCoprocessorHost(this, conf); 3506 CoprocessorConfigurationUtil.updateCoprocessorListInConf(this.conf, conf, 3507 CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY); 3508 }); 3509 } 3510 3511 @Override 3512 public MetricsRegionServer getMetrics() { 3513 return metricsRegionServer; 3514 } 3515 3516 @Override 3517 public SecureBulkLoadManager getSecureBulkLoadManager() { 3518 return this.secureBulkLoadManager; 3519 } 3520 3521 @Override 3522 public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description, 3523 final Abortable abort) { 3524 final LockServiceClient client = 3525 new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator()); 3526 return client.regionLock(regionInfo, description, abort); 3527 } 3528 3529 @Override 3530 public void unassign(byte[] regionName) throws IOException { 3531 FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false)); 3532 } 3533 3534 @Override 3535 public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() { 3536 return this.rsSpaceQuotaManager; 3537 } 3538 3539 @Override 3540 public boolean reportFileArchivalForQuotas(TableName tableName, 3541 Collection<Entry<String, Long>> archivedFiles) { 3542 if (TEST_SKIP_REPORTING_TRANSITION) { 3543 return false; 3544 } 3545 RegionServerStatusService.BlockingInterface rss = rssStub; 
3546 if (rss == null || rsSpaceQuotaManager == null) { 3547 // the current server could be stopping. 3548 LOG.trace("Skipping file archival reporting to HMaster as stub is null"); 3549 return false; 3550 } 3551 try { 3552 RegionServerStatusProtos.FileArchiveNotificationRequest request = 3553 rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles); 3554 rss.reportFileArchival(null, request); 3555 } catch (ServiceException se) { 3556 IOException ioe = ProtobufUtil.getRemoteException(se); 3557 if (ioe instanceof PleaseHoldException) { 3558 if (LOG.isTraceEnabled()) { 3559 LOG.trace("Failed to report file archival(s) to Master because it is initializing." 3560 + " This will be retried.", ioe); 3561 } 3562 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 3563 return false; 3564 } 3565 if (rssStub == rss) { 3566 rssStub = null; 3567 } 3568 // re-create the stub if we failed to report the archival 3569 createRegionServerStatusStub(true); 3570 LOG.debug("Failed to report file archival(s) to Master. 
This will be retried.", ioe); 3571 return false; 3572 } 3573 return true; 3574 } 3575 3576 void executeProcedure(long procId, long initiatingMasterActiveTime, 3577 RSProcedureCallable callable) { 3578 executorService 3579 .submit(new RSProcedureHandler(this, procId, initiatingMasterActiveTime, callable)); 3580 } 3581 3582 public void remoteProcedureComplete(long procId, long initiatingMasterActiveTime, Throwable error, 3583 byte[] procResultData) { 3584 procedureResultReporter.complete(procId, initiatingMasterActiveTime, error, procResultData); 3585 } 3586 3587 void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException { 3588 RegionServerStatusService.BlockingInterface rss; 3589 // TODO: juggling class state with an instance variable, outside of a synchronized block :'( 3590 for (;;) { 3591 rss = rssStub; 3592 if (rss != null) { 3593 break; 3594 } 3595 createRegionServerStatusStub(); 3596 } 3597 try { 3598 rss.reportProcedureDone(null, request); 3599 } catch (ServiceException se) { 3600 if (rssStub == rss) { 3601 rssStub = null; 3602 } 3603 throw ProtobufUtil.getRemoteException(se); 3604 } 3605 } 3606 3607 /** 3608 * Will ignore the open/close region procedures which already submitted or executed. When master 3609 * had unfinished open/close region procedure and restarted, new active master may send duplicate 3610 * open/close region request to regionserver. The open/close request is submitted to a thread pool 3611 * and execute. So first need a cache for submitted open/close region procedures. After the 3612 * open/close region request executed and report region transition succeed, cache it in executed 3613 * region procedures cache. See {@link #finishRegionProcedure(long)}. After report region 3614 * transition succeed, master will not send the open/close region request to regionserver again. 3615 * And we thought that the ongoing duplicate open/close region request should not be delayed more 3616 * than 600 seconds. 
So the executed region procedures cache will expire after 600 seconds. See 3617 * HBASE-22404 for more details. 3618 * @param procId the id of the open/close region procedure 3619 * @return true if the procedure can be submitted. 3620 */ 3621 boolean submitRegionProcedure(long procId) { 3622 if (procId == -1) { 3623 return true; 3624 } 3625 // Ignore the region procedures which already submitted. 3626 Long previous = submittedRegionProcedures.putIfAbsent(procId, procId); 3627 if (previous != null) { 3628 LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId); 3629 return false; 3630 } 3631 // Ignore the region procedures which already executed. 3632 if (executedRegionProcedures.getIfPresent(procId) != null) { 3633 LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId); 3634 return false; 3635 } 3636 return true; 3637 } 3638 3639 /** 3640 * See {@link #submitRegionProcedure(long)}. 3641 * @param procId the id of the open/close region procedure 3642 */ 3643 public void finishRegionProcedure(long procId) { 3644 executedRegionProcedures.put(procId, procId); 3645 submittedRegionProcedures.remove(procId); 3646 } 3647 3648 /** 3649 * Force to terminate region server when abort timeout. 3650 */ 3651 private static class SystemExitWhenAbortTimeout extends TimerTask { 3652 3653 public SystemExitWhenAbortTimeout() { 3654 } 3655 3656 @Override 3657 public void run() { 3658 LOG.warn("Aborting region server timed out, terminating forcibly" 3659 + " and does not wait for any running shutdown hooks or finalizers to finish their work." 
3660 + " Thread dump to stdout."); 3661 Threads.printThreadInfo(System.out, "Zombie HRegionServer"); 3662 Runtime.getRuntime().halt(1); 3663 } 3664 } 3665 3666 @InterfaceAudience.Private 3667 public CompactedHFilesDischarger getCompactedHFilesDischarger() { 3668 return compactedFileDischarger; 3669 } 3670 3671 /** 3672 * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}} 3673 * @return pause time 3674 */ 3675 @InterfaceAudience.Private 3676 public long getRetryPauseTime() { 3677 return this.retryPauseTime; 3678 } 3679 3680 @Override 3681 public Optional<ServerName> getActiveMaster() { 3682 return Optional.ofNullable(masterAddressTracker.getMasterAddress()); 3683 } 3684 3685 @Override 3686 public List<ServerName> getBackupMasters() { 3687 return masterAddressTracker.getBackupMasters(); 3688 } 3689 3690 @Override 3691 public Iterator<ServerName> getBootstrapNodes() { 3692 return bootstrapNodeManager.getBootstrapNodes().iterator(); 3693 } 3694 3695 @Override 3696 public List<HRegionLocation> getMetaLocations() { 3697 return metaRegionLocationCache.getMetaRegionLocations(); 3698 } 3699 3700 @Override 3701 protected NamedQueueRecorder createNamedQueueRecord() { 3702 return NamedQueueRecorder.getInstance(conf); 3703 } 3704 3705 @Override 3706 protected boolean clusterMode() { 3707 // this method will be called in the constructor of super class, so we can not return masterless 3708 // directly here, as it will always be false. 
3709 return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false); 3710 } 3711 3712 @InterfaceAudience.Private 3713 public BrokenStoreFileCleaner getBrokenStoreFileCleaner() { 3714 return brokenStoreFileCleaner; 3715 } 3716 3717 @InterfaceAudience.Private 3718 public RSMobFileCleanerChore getRSMobFileCleanerChore() { 3719 return rsMobFileCleanerChore; 3720 } 3721 3722 RSSnapshotVerifier getRsSnapshotVerifier() { 3723 return rsSnapshotVerifier; 3724 } 3725 3726 @Override 3727 protected void stopChores() { 3728 shutdownChore(nonceManagerChore); 3729 shutdownChore(compactionChecker); 3730 shutdownChore(compactedFileDischarger); 3731 shutdownChore(periodicFlusher); 3732 shutdownChore(healthCheckChore); 3733 shutdownChore(executorStatusChore); 3734 shutdownChore(storefileRefresher); 3735 shutdownChore(fsUtilizationChore); 3736 shutdownChore(namedQueueServiceChore); 3737 shutdownChore(brokenStoreFileCleaner); 3738 shutdownChore(rsMobFileCleanerChore); 3739 shutdownChore(replicationMarkerChore); 3740 } 3741 3742 @Override 3743 public RegionReplicationBufferManager getRegionReplicationBufferManager() { 3744 return regionReplicationBufferManager; 3745 } 3746}