/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExecutorStatusChore;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseServerBase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionSize;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
 * are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation" })
public class HRegionServer extends HBaseServerBase<RSRpcServices>
  implements RegionServerServices, LastSequenceId {

  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

  /**
   * For testing only! Set to true to skip notifying region assignment to the master.
   */
  @InterfaceAudience.Private
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  /**
   * A map from RegionName to the action currently in progress. The boolean value indicates: true if
   * an open-region action is in progress, false if a close-region action is in progress.
   */
  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * Used to cache the open/close region procedures which have already been submitted. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
  /**
   * Used to cache the open/close region procedures which have already been executed. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

  /**
   * Used to cache the moved-out regions
   */
  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();

  private MemStoreFlusher cacheFlusher;

  private HeapMemoryManager hMemManager;

  // Replication services. If no replication, this handler will be null.
  private ReplicationSourceService replicationSourceHandler;
  private ReplicationSinkService replicationSinkHandler;
  private boolean sameReplicationSourceAndSink;

  // Compactions
  private CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the encoded region name.
   * All access should be synchronized.
   */
  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
  /**
   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
   * need to be a ConcurrentHashMap?
   */
  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API
   * call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and
   * here we really mean DataNode locations. We don't store it as InetSocketAddress here because
   * converting from Address to InetSocketAddress on demand guarantees the resolution results will
   * be fresh when we need them.
   */
  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();

  private LeaseManager leaseManager;

  private volatile boolean dataFsOk;

  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds (20 minutes), to be safe
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Will run this task when the abort times out
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;
  private volatile boolean killed = false;

  private final int threadWakeFrequency;

  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  private RpcClient rpcClient;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  private JvmPauseMonitor pauseMonitor;

  private RSSnapshotVerifier rsSnapshotVerifier;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  private MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;

  /**
   * Check for compaction requests.
   */
  private ScheduledChore compactionChecker;

  /**
   * Check for flushes
   */
  private ScheduledChore periodicFlusher;

  private volatile WALFactory walFactory;

  private LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // master address tracker
  private final MasterAddressTracker masterAddressTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  private final int shortOperationTimeout;

  // Time to pause if master says 'please hold'
  private final long retryPauseTime;

  private final RegionServerAccounting regionServerAccounting;

  private SlowLogTableOpsChore slowLogTableOpsChore = null;

  // Block cache
  private BlockCache blockCache;
  // The cache for mob files
  private MobFileCache mobFileCache;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The Executor status collect chore. */
  private ExecutorStatusChore executorStatusChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
   */
  @Deprecated
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.regionserver.hostname.disable.master.reversedns";

  /**
   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive. An
   * exception will be thrown if both are used.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";
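
  // Illustrative (not normative) hbase-site.xml usage of the hostname settings referenced above:
  // either pin the reported hostname explicitly, e.g.
  //   <property><name>hbase.unsafe.regionserver.hostname</name><value>rs1.example.com</value></property>
  // or have the RegionServer report the name its RPC socket resolves to, e.g.
  //   <property><name>hbase.unsafe.regionserver.hostname.disable.master.reversedns</name><value>true</value></property>
  // Setting both at once is rejected; see getUseThisHostnameInstead(Configuration) below.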

  /**
   * Unique identifier for the cluster we are a part of.
   */
  private String clusterId;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private volatile RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
   * case where the client doesn't receive the response from a successful operation and retries. We
   * track the successful ops for some time via a nonce sent by the client and handle duplicate
   * operations (currently, by failing them; in the future we might use MVCC to return the result).
   * Nonces are also recovered from the WAL during recovery; however, the caveats (from HBASE-3787)
   * are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth of
   * past records. If we don't read the records, we don't read and recover the nonces. Some WALs
   * within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during a normal region move, so nonces will not be transferred.
   * We could have a separate additional "Nonce WAL". It would just contain a bunch of numbers and
   * wouldn't be flushed on the main path - because the WAL itself also contains nonces, if we only
   * flush it before the memstore flush, for a given nonce we will either see it in the WAL (if it
   * was never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
   * log (or both occasionally, which doesn't matter). The nonce log file can be deleted after the
   * latest nonce in it has expired. It can also be recovered during a move.
   */
  final ServerNonceManager nonceManager;
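
  // In short (a sketch of the flow described above): a client sends a (nonce group, nonce) pair
  // with operations such as increment/append; if the client retries after missing a response, the
  // ServerNonceManager recognizes the nonce as already applied and fails the duplicate instead of
  // re-applying it.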

  private BrokenStoreFileCleaner brokenStoreFileCleaner;

  private RSMobFileCleanerChore rsMobFileCleanerChore;

  @InterfaceAudience.Private
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  private SecureBulkLoadManager secureBulkLoadManager;

  private FileSystemUtilizationChore fsUtilizationChore;

  private BootstrapNodeManager bootstrapNodeManager;

  /**
   * True if this RegionServer is coming up in a cluster where there is no Master; it means it needs
   * to just come up and make do without a Master to talk to: e.g. in tests, or when HRegionServer
   * is doing something other than its usual duties: e.g. as a hollowed-out host whose only purpose
   * is as a Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
   * {@link #TEST_SKIP_REPORTING_TRANSITION} ?
   */
  private final boolean masterless;
  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /** regionserver codec list **/
  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";

  // A timer to shutdown the process if abort takes too long
  private Timer abortMonitor;

  private RegionReplicationBufferManager regionReplicationBufferManager;

  /**
   * Starts an HRegionServer at the default location.
   * <p/>
   * Don't start any services or managers in the constructor. Defer as much as possible until after
   * we register with the Master. See {@link #startServices}.
   */
  public HRegionServer(final Configuration conf) throws IOException {
    super(conf, "RegionServer"); // thread name
    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
    try (Scope ignored = span.makeCurrent()) {
      this.dataFsOk = true;
      this.masterless = !clusterMode();
      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);

      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);

      regionServerAccounting = new RegionServerAccounting(conf);

      blockCache = BlockCacheFactory.createBlockCache(conf);
      mobFileCache = new MobFileCache(conf);

      rsSnapshotVerifier = new RSSnapshotVerifier(conf);

      uncaughtExceptionHandler =
        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);

      // If no master in cluster, skip trying to track one or look for a cluster status.
      if (!this.masterless) {
        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        masterAddressTracker.start();
      } else {
        masterAddressTracker = null;
      }
      this.rpcServices.start(zooKeeper);
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of a failed startup is lost.
      TraceUtil.setError(span, t);
      LOG.error("Failed construction RegionServer", t);
      throw t;
    } finally {
      span.end();
    }
  }

  // HMaster should override this method to load the specific config for master
  @Override
  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
      if (!StringUtils.isBlank(hostname)) {
        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
          + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
          + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
          + UNSAFE_RS_HOSTNAME_KEY + " is used";
        throw new IOException(msg);
      } else {
        return rpcServices.getSocketAddress().getHostName();
      }
    } else {
      return hostname;
    }
  }

  @Override
  protected void login(UserProvider user, String host) throws IOException {
    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
  }

  @Override
  protected String getProcessName() {
    return REGIONSERVER;
  }

  @Override
  protected boolean canCreateBaseZNode() {
    return !clusterMode();
  }

  @Override
  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  @Override
  protected boolean cacheTableDescriptor() {
    return false;
  }

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  @Override
  protected void configureInfoServer(InfoServer infoServer) {
    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  @Override
  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }

  /**
   * Used by {@link RSDumpServlet} to generate debugging information.
   */
  public void dumpRowLocks(final PrintWriter out) {
    StringBuilder sb = new StringBuilder();
    for (HRegion region : getRegions()) {
      if (region.getLockedRows().size() > 0) {
        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
          sb.setLength(0);
          sb.append(region.getTableDescriptor().getTableName()).append(",")
            .append(region.getRegionInfo().getEncodedName()).append(",");
          sb.append(rowLockContext.toString());
          out.println(sb);
        }
      }
    }
  }

  @Override
  public boolean registerService(Service instance) {
    // No stacking of instances is allowed for a single service name
    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor service " + serviceName
        + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Registered regionserver coprocessor service: service=" + serviceName);
    }
    return true;
  }
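
  // Rough usage sketch (names hypothetical): a coprocessor exposing a region-server-level endpoint
  // would end up having its generated protobuf service registered here, conceptually something like
  //   registerService(MyServiceProtos.MyService.newReflectiveService(new MyServiceImpl()));
  // after which it is reachable through the coprocessor service RPC path (note the
  // CoprocessorServiceRequest/CoprocessorServiceResponse imports above).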

  /**
   * Run test on configured codecs to make sure supporting libs are in place.
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
    if (codecs == null) {
      return;
    }
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException(
          "Compression codec " + codec + " not supported, aborting RS construction");
      }
    }
  }
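
  // Illustrative use of the codec check above: operators can list codecs that must be loadable at
  // startup in hbase-site.xml, e.g.
  //   <property><name>hbase.regionserver.codecs</name><value>snappy,lz4</value></property>
  // and construction of the RegionServer fails fast if any listed codec cannot be exercised (for
  // example when native compression libraries are missing).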

  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * All initialization needed before we go register with Master.<br>
   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
   */
  private void preRegistrationInitialization() {
    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
    try (Scope ignored = span.makeCurrent()) {
      initializeZooKeeper();
      setupClusterConnection();
      bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
      // Setup RPC client for master communication
      this.rpcClient = asyncClusterConnection.getRpcClient();
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Call stop on error, or the process will stick around forever since the server
      // puts up non-daemon threads.
      TraceUtil.setError(span, t);
      this.rpcServices.stop();
      abort("Initialization of RS failed.  Hence aborting RS.", t);
    } finally {
      span.end();
    }
  }

  /**
   * Bring up the connection to the zk ensemble, then wait until a master is available for this
   * cluster, and after that wait until the cluster 'up' flag has been set. This is the order in
   * which the master does things.
   * <p>
   * Finally, open the long-lived server short-circuit connection.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
      justification = "cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Nothing to do in here if no Master in the mix.
    if (this.masterless) {
      return;
    }

    // Create the master address tracker, register with zk, and start it. Then
    // block until a master is available. No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up. Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // If we are HMaster then the cluster id should have already been set.
    if (clusterId == null) {
      // Retrieve clusterId
      // Since cluster status is now up
      // ID should have already been set by HMaster
      try {
        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
        if (clusterId == null) {
          this.abort("Cluster ID has not been set");
        }
        LOG.info("ClusterId : " + clusterId);
      } catch (KeeperException e) {
        this.abort("Failed to retrieve Cluster ID", e);
      }
    }

    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }

  /**
   * Utility method to wait indefinitely for a znode to become available while checking whether the
   * region server has been shut down.
   * @param tracker znode tracker to use
   * @throws IOException          any IO exception, plus if the RS is stopped
   * @throws InterruptedException if the waiting thread is interrupted
   */
  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
    throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /** Returns True if the cluster is up. */
  @Override
  public boolean isClusterUp() {
    return this.masterless
      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
  }

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    if (isStopped()) {
      LOG.info("Skipping run; stopped");
      return;
    }
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        installShutdownHook();
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

        // Try and register with the Master; tell it we are here. Break if server is stopped or
        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
        // come up.
        LOG.debug("About to register with Master.");
        TraceUtil.trace(() -> {
          RetryCounterFactory rcf =
            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
          RetryCounter rc = rcf.create();
          while (keepLooping()) {
            RegionServerStartupResponse w = reportForDuty();
            if (w == null) {
              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
              this.sleeper.sleep(sleepTime);
            } else {
              handleReportForDutyResponse(w);
              break;
            }
          }
        }, "HRegionServer.registerWithMaster");
      }

      if (!isStopped() && isHealthy()) {
        TraceUtil.trace(() -> {
          // start the snapshot handler and other procedure handlers,
          // since the server is ready to run
          if (this.rspmHost != null) {
            this.rspmHost.start();
          }
          // Start the Quota Manager
          if (this.rsQuotaManager != null) {
            rsQuotaManager.start(getRpcServer().getScheduler());
          }
          if (this.rsSpaceQuotaManager != null) {
            this.rsSpaceQuotaManager.start();
          }
        }, "HRegionServer.startup");
      }

      // We registered with the Master. Go into run mode.
      long lastMsg = EnvironmentEdgeManager.currentTime();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (onlineRegions.isEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(isAborted());
          } else {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if there have been no more write requests to meta tables
              // since last time we went around the loop. Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // not have gotten it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested.get());
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = EnvironmentEdgeManager.currentTime();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = EnvironmentEdgeManager.currentTime();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // while
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }

    final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
    try (Scope ignored = span.makeCurrent()) {
      if (this.leaseManager != null) {
        this.leaseManager.closeAfterLeasesExpire();
      }
      if (this.splitLogWorker != null) {
        splitLogWorker.stop();
      }
      stopInfoServer();
      // Send cache a shutdown.
      if (blockCache != null) {
        blockCache.shutdown();
      }
      if (mobFileCache != null) {
        mobFileCache.shutdown();
      }

      // Send interrupts to wake up threads if sleeping so they notice shutdown.
      // TODO: Should we check they are alive? If OOME could have exited already
      if (this.hMemManager != null) {
        this.hMemManager.stop();
      }
      if (this.cacheFlusher != null) {
        this.cacheFlusher.interruptIfNecessary();
      }
      if (this.compactSplitThread != null) {
        this.compactSplitThread.interruptIfNecessary();
      }

      // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
      if (rspmHost != null) {
        rspmHost.stop(this.abortRequested.get() || this.killed);
      }

      if (this.killed) {
        // Just skip out w/o closing regions. Used when testing.
      } else if (abortRequested.get()) {
        if (this.dataFsOk) {
          closeUserRegions(abortRequested.get()); // Don't leave any open file handles
        }
        LOG.info("aborting server " + this.serverName);
      } else {
        closeUserRegions(abortRequested.get());
        LOG.info("stopping server " + this.serverName);
      }
      regionReplicationBufferManager.stop();
      closeClusterConnection();
      // Closing the compactSplit thread before closing meta regions
      if (!this.killed && containsMetaTableRegions()) {
        if (!abortRequested.get() || this.dataFsOk) {
          if (this.compactSplitThread != null) {
            this.compactSplitThread.join();
            this.compactSplitThread = null;
          }
          closeMetaTableRegions(abortRequested.get());
        }
      }

      if (!this.killed && this.dataFsOk) {
        waitOnAllRegionsToClose(abortRequested.get());
        LOG.info("stopping server " + this.serverName + "; all regions closed.");
      }

      // Stop the quota manager
      if (rsQuotaManager != null) {
        rsQuotaManager.stop();
      }
      if (rsSpaceQuotaManager != null) {
        rsSpaceQuotaManager.stop();
        rsSpaceQuotaManager = null;
      }

      // flag may be changed when closing regions throws exception.
      if (this.dataFsOk) {
        shutdownWAL(!abortRequested.get());
      }

      // Make sure the proxy is down.
      if (this.rssStub != null) {
        this.rssStub = null;
      }
      if (this.lockStub != null) {
        this.lockStub = null;
      }
      if (this.rpcClient != null) {
        this.rpcClient.close();
      }
      if (this.leaseManager != null) {
        this.leaseManager.close();
      }
      if (this.pauseMonitor != null) {
        this.pauseMonitor.stop();
      }

      if (!killed) {
        stopServiceThreads();
      }

      if (this.rpcServices != null) {
        this.rpcServices.stop();
      }

      try {
        deleteMyEphemeralNode();
      } catch (KeeperException.NoNodeException nn) {
        // pass
      } catch (KeeperException e) {
        LOG.warn("Failed deleting my ephemeral node", e);
      }
      // We may have failed to delete the znode at the previous step, but
      // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
      ZNodeClearer.deleteMyEphemeralNodeOnDisk();

      closeZooKeeper();
      closeTableDescriptors();
      LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
      span.setStatus(StatusCode.OK);
    } finally {
      span.end();
    }
  }

  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  private boolean areAllUserRegionsOffline() {
    if (getNumberOfOnlineRegions() > 2) {
      return false;
    }
    boolean allUserRegionsOffline = true;
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      if (!e.getValue().getRegionInfo().isMetaRegion()) {
        allUserRegionsOffline = false;
        break;
      }
    }
    return allUserRegionsOffline;
  }

  /** Returns Current write count for all online regions. */
  private long getWriteRequestCount() {
    long writeCount = 0;
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      writeCount += e.getValue().getWriteRequestsCount();
    }
    return writeCount;
  }

  @InterfaceAudience.Private
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
    throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport");
    try (Scope ignored = span.makeCurrent()) {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      request.setServer(ProtobufUtil.toServerName(this.serverName));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
      span.setStatus(StatusCode.OK);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        TraceUtil.setError(span, ioe);
        throw ioe;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      TraceUtil.setError(span, se);
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub(true);
    } finally {
      span.end();
    }
  }

  /**
   * Reports the given map of Regions and their size on the filesystem to the active Master.
   * @param regionSizeStore The store containing region sizes
   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
   */
  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      LOG.trace("Skipping Region size report to HMaster as stub is null");
      return true;
    }
    try {
      buildReportAndSend(rss, regionSizeStore);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        LOG.trace("Failed to report region sizes to Master because it is initializing."
          + " This will be retried.", ioe);
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return true;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      createRegionServerStatusStub(true);
      if (ioe instanceof DoNotRetryIOException) {
        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
        if (doNotRetryEx.getCause() != null) {
          Throwable t = doNotRetryEx.getCause();
          if (t instanceof UnsupportedOperationException) {
            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
            return false;
          }
        }
      }
      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
    }
    return true;
  }

  /**
   * Builds the region size report and sends it to the master. Upon successful sending of the
   * report, the region sizes that were sent are marked as sent.
   * @param rss             The stub to send to the Master
   * @param regionSizeStore The store containing region sizes
   */
  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
    RegionSizeStore regionSizeStore) throws ServiceException {
    RegionSpaceUseReportRequest request =
      buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
    rss.reportRegionSpaceUse(null, request);
    // Record the number of size reports sent
    if (metricsRegionServer != null) {
      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
    }
  }

  /**
   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
   * @param regionSizes The size in bytes of regions
   * @return The corresponding protocol buffer message.
   */
  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
    }
    return request.build();
  }

  /**
   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf
   * message.
   * @param regionInfo  The RegionInfo
   * @param sizeInBytes The size in bytes of the Region
   * @return The protocol buffer
   */
  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
    return RegionSpaceUse.newBuilder()
      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
      .setRegionSize(Objects.requireNonNull(sizeInBytes)).build();
  }

  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
    throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests per
    // second and other metrics. As long as metrics are part of ServerLoad it's best to use the
    // wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heartbeat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved; additionally, the load balancer will be able to take advantage of a more complete
    // history.
    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    long usedMemory = -1L;
    long maxMemory = -1L;
    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
      usedMemory = usage.getUsed();
      maxMemory = usage.getMax();
    }

    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    Builder coprocessorBuilder = Coprocessor.newBuilder();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
    }
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (HRegion region : regions) {
      if (region.getCoprocessorHost() != null) {
        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
        for (String regionCoprocessor : regionCoprocessors) {
          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
        }
      }
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
        .getCoprocessors()) {
        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
      }
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }
    MetricsUserAggregateSource userSource =
      metricsRegionServer.getMetricsUserAggregate().getSource();
    if (userSource != null) {
      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
      }
    }

    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
      // always refresh first to get the latest value
      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
      if (rLoad != null) {
        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
          .getReplicationLoadSourceEntries()) {
          serverLoad.addReplLoadSource(rLS);
        }
      }
    } else {
      if (replicationSourceHandler != null) {
        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
            .getReplicationLoadSourceEntries()) {
            serverLoad.addReplLoadSource(rLS);
          }
        }
      }
      if (replicationSinkHandler != null) {
        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        }
      }
    }

    TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask
      .newBuilder().setDescription(task.getDescription())
      .setStatus(task.getStatus() != null ? task.getStatus() : "")
      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build()));

    return serverLoad.build();
  }

  private String getOnlineRegionsAsPrintableString() {
    StringBuilder sb = new StringBuilder();
    for (Region r : this.onlineRegions.values()) {
      if (sb.length() > 0) {
        sb.append(", ");
      }
      sb.append(r.getRegionInfo().getEncodedName());
    }
    return sb.toString();
  }

1245  /**
1246   * Wait on regions close.
1247   */
1248  private void waitOnAllRegionsToClose(final boolean abort) {
1249    // Wait till all regions are closed before going out.
1250    int lastCount = -1;
1251    long previousLogTime = 0;
1252    Set<String> closedRegions = new HashSet<>();
1253    boolean interrupted = false;
1254    try {
1255      while (!onlineRegions.isEmpty()) {
1256        int count = getNumberOfOnlineRegions();
1257        // Only print a message if the count of regions has changed.
1258        if (count != lastCount) {
1259          // Log every second at most
1260          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
1261            previousLogTime = EnvironmentEdgeManager.currentTime();
1262            lastCount = count;
1263            LOG.info("Waiting on " + count + " regions to close");
1264            // Only print out regions still closing if a small number else will
1265            // swamp the log.
1266            if (count < 10 && LOG.isDebugEnabled()) {
1267              LOG.debug("Online Regions=" + this.onlineRegions);
1268            }
1269          }
1270        }
1271        // Ensure all user regions have been sent a close. Use this to
1272        // protect against the case where an open comes in after we start the
1273        // iterator of onlineRegions to close all user regions.
1274        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1275          RegionInfo hri = e.getValue().getRegionInfo();
1276          if (
1277            !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1278              && !closedRegions.contains(hri.getEncodedName())
1279          ) {
1280            closedRegions.add(hri.getEncodedName());
            // Ask the region to close, ignoring errors; abort controls whether it flushes first.
1282            closeRegionIgnoreErrors(hri, abort);
1283          }
1284        }
1285        // No regions in RIT, we could stop waiting now.
1286        if (this.regionsInTransitionInRS.isEmpty()) {
1287          if (!onlineRegions.isEmpty()) {
            LOG.info("Exiting even though online regions are not empty,"
              + " because some regions failed to close");
1290          }
1291          break;
1292        } else {
1293          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
1294            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1295        }
1296        if (sleepInterrupted(200)) {
1297          interrupted = true;
1298        }
1299      }
1300    } finally {
1301      if (interrupted) {
1302        Thread.currentThread().interrupt();
1303      }
1304    }
1305  }
1306
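  /**
   * Sleep for the given number of milliseconds. Returns true if the sleep was interrupted; the
   * interrupt status is not re-asserted here so callers can restore it once their wait loop is
   * done (see the finally blocks in the callers).
   */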
1307  private static boolean sleepInterrupted(long millis) {
1308    boolean interrupted = false;
1309    try {
1310      Thread.sleep(millis);
1311    } catch (InterruptedException e) {
1312      LOG.warn("Interrupted while sleeping");
1313      interrupted = true;
1314    }
1315    return interrupted;
1316  }
1317
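  /**
   * Shut down the WAL. If {@code close} is true the WALs are fully closed and underlying state is
   * cleaned up; otherwise the providers are only shut down, which leaves the WAL files in place
   * (typically the abort path, so the files remain available for splitting and replay).
   */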
1318  private void shutdownWAL(final boolean close) {
1319    if (this.walFactory != null) {
1320      try {
1321        if (close) {
1322          walFactory.close();
1323        } else {
1324          walFactory.shutdown();
1325        }
1326      } catch (Throwable e) {
1327        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1328        LOG.error("Shutdown / close of WAL failed: " + e);
1329        LOG.debug("Shutdown / close exception details:", e);
1330      }
1331    }
1332  }
1333
  /**
   * Run init. Sets up the WAL and starts up all server threads.
   * @param c Extra configuration from the master.
   */
1338  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1339    throws IOException {
1340    try {
1341      boolean updateRootDir = false;
1342      for (NameStringPair e : c.getMapEntriesList()) {
1343        String key = e.getName();
1344        // The hostname the master sees us as.
1345        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1346          String hostnameFromMasterPOV = e.getValue();
1347          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1348            rpcServices.getSocketAddress().getPort(), this.startcode);
1349          if (
1350            !StringUtils.isBlank(useThisHostnameInstead)
1351              && !hostnameFromMasterPOV.equals(useThisHostnameInstead)
1352          ) {
1353            String msg = "Master passed us a different hostname to use; was="
1354              + this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1355            LOG.error(msg);
1356            throw new IOException(msg);
1357          }
1358          if (
1359            StringUtils.isBlank(useThisHostnameInstead)
1360              && !hostnameFromMasterPOV.equals(rpcServices.getSocketAddress().getHostName())
1361          ) {
1362            String msg = "Master passed us a different hostname to use; was="
1363              + rpcServices.getSocketAddress().getHostName() + ", but now=" + hostnameFromMasterPOV;
1364            LOG.error(msg);
1365          }
1366          continue;
1367        }
1368
1369        String value = e.getValue();
1370        if (key.equals(HConstants.HBASE_DIR)) {
1371          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1372            updateRootDir = true;
1373          }
1374        }
1375
1376        if (LOG.isDebugEnabled()) {
1377          LOG.debug("Config from master: " + key + "=" + value);
1378        }
1379        this.conf.set(key, value);
1380      }
1381      // Set our ephemeral znode up in zookeeper now we have a name.
1382      createMyEphemeralNode();
1383
1384      if (updateRootDir) {
1385        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1386        initializeFileSystem();
1387      }
1388
1389      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
1390      // config param for task trackers, but we can piggyback off of it.
1391      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1392        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1393      }
1394
      // Save the ephemeral znode path in a file; this will allow us to see if we crash.
1396      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1397
1398      // This call sets up an initialized replication and WAL. Later we start it up.
1399      setupWALAndReplication();
1400      // Init in here rather than in constructor after thread name has been set
1401      final MetricsTable metricsTable =
1402        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1403      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1404      this.metricsRegionServer =
1405        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
1406      // Now that we have a metrics source, start the pause monitor
1407      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1408      pauseMonitor.start();
1409
1410      // There is a rare case where we do NOT want services to start. Check config.
1411      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1412        startServices();
1413      }
      // In here we start up the replication service. Above we initialized it. TODO: reconcile or
      // make sense of it.
1416      startReplicationService();
1417
      // Log our identity and the ZooKeeper session now that we are up.
1419      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress()
1420        + ", sessionid=0x"
1421        + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1422
      // Wake up anyone waiting for this server to come online
1424      synchronized (online) {
1425        online.set(true);
1426        online.notifyAll();
1427      }
1428    } catch (Throwable e) {
1429      stop("Failed initialization");
1430      throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed");
1431    } finally {
1432      sleeper.skipSleepCycle();
1433    }
1434  }
1435
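  /**
   * Start the HeapMemoryManager, which tunes memstore and block cache sizes within the heap. It is
   * only started when a block cache is configured.
   */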
1436  private void startHeapMemoryManager() {
1437    if (this.blockCache != null) {
1438      this.hMemManager =
1439        new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1440      this.hMemManager.start(getChoreService());
1441    }
1442  }
1443
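  /**
   * Create our ephemeral znode under the region servers znode in ZooKeeper. The node data is a
   * PB-magic-prefixed RegionServerInfo carrying the info server port and version info; the master
   * watches these nodes to track live region servers.
   */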
1444  private void createMyEphemeralNode() throws KeeperException {
1445    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1446    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1447    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1448    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1449    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1450  }
1451
1452  private void deleteMyEphemeralNode() throws KeeperException {
1453    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1454  }
1455
1456  @Override
1457  public RegionServerAccounting getRegionServerAccounting() {
1458    return regionServerAccounting;
1459  }
1460
1461  // Round the size with KB or MB.
1462  // A trick here is that if the sizeInBytes is less than sizeUnit, we will round the size to 1
  // instead of 0 if it is not 0, to avoid some schedulers thinking the region has no data. See
1464  // HBASE-26340 for more details on why this is important.
1465  private static int roundSize(long sizeInByte, int sizeUnit) {
1466    if (sizeInByte == 0) {
1467      return 0;
1468    } else if (sizeInByte < sizeUnit) {
1469      return 1;
1470    } else {
1471      return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE);
1472    }
1473  }
1474
1475  /**
1476   * @param r               Region to get RegionLoad for.
1477   * @param regionLoadBldr  the RegionLoad.Builder, can be null
1478   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1479   * @return RegionLoad instance.
1480   */
1481  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1482    RegionSpecifier.Builder regionSpecifier) throws IOException {
1483    byte[] name = r.getRegionInfo().getRegionName();
1484    int stores = 0;
1485    int storefiles = 0;
1486    int storeRefCount = 0;
1487    int maxCompactedStoreFileRefCount = 0;
1488    long storeUncompressedSize = 0L;
1489    long storefileSize = 0L;
1490    long storefileIndexSize = 0L;
1491    long rootLevelIndexSize = 0L;
1492    long totalStaticIndexSize = 0L;
1493    long totalStaticBloomSize = 0L;
1494    long totalCompactingKVs = 0L;
1495    long currentCompactedKVs = 0L;
1496    List<HStore> storeList = r.getStores();
1497    stores += storeList.size();
1498    for (HStore store : storeList) {
1499      storefiles += store.getStorefilesCount();
1500      int currentStoreRefCount = store.getStoreRefCount();
1501      storeRefCount += currentStoreRefCount;
1502      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1503      maxCompactedStoreFileRefCount =
1504        Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount);
1505      storeUncompressedSize += store.getStoreSizeUncompressed();
1506      storefileSize += store.getStorefilesSize();
      // TODO: storefileIndexSizeKB is the same as rootLevelIndexSizeKB?
1508      storefileIndexSize += store.getStorefilesRootLevelIndexSize();
1509      CompactionProgress progress = store.getCompactionProgress();
1510      if (progress != null) {
1511        totalCompactingKVs += progress.getTotalCompactingKVs();
1512        currentCompactedKVs += progress.currentCompactedKVs;
1513      }
1514      rootLevelIndexSize += store.getStorefilesRootLevelIndexSize();
1515      totalStaticIndexSize += store.getTotalStaticIndexSize();
1516      totalStaticBloomSize += store.getTotalStaticBloomSize();
1517    }
1518
1519    int unitMB = 1024 * 1024;
1520    int unitKB = 1024;
1521
1522    int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB);
1523    int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB);
1524    int storefileSizeMB = roundSize(storefileSize, unitMB);
1525    int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB);
1526    int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB);
1527    int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB);
1528    int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB);
1529
1530    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
1531    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
1532    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
1533    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
1534    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
1535    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
1536    if (regionLoadBldr == null) {
1537      regionLoadBldr = RegionLoad.newBuilder();
1538    }
1539    if (regionSpecifier == null) {
1540      regionSpecifier = RegionSpecifier.newBuilder();
1541    }
1542
1543    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1544    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1545    regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores)
1546      .setStorefiles(storefiles).setStoreRefCount(storeRefCount)
1547      .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1548      .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB)
1549      .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB)
1550      .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1551      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1552      .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount())
1553      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1554      .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs)
1555      .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality)
1556      .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight)
1557      .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight)
1558      .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
1559      .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1560    r.setCompleteSequenceId(regionLoadBldr);
1561    return regionLoadBldr.build();
1562  }
1563
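  /**
   * Build the per-user UserLoad message for the server load report: one ClientMetrics entry per
   * client host with its read, write and filtered-read request counts.
   */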
1564  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1565    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1566    userLoadBldr.setUserName(user);
1567    userSource.getClientMetrics().values().stream()
1568      .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1569        .setHostName(clientMetrics.getHostName())
1570        .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1571        .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1572        .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
1573      .forEach(userLoadBldr::addClientMetrics);
1574    return userLoadBldr.build();
1575  }
1576
1577  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1578    HRegion r = onlineRegions.get(encodedRegionName);
1579    return r != null ? createRegionLoad(r, null, null) : null;
1580  }
1581
1582  /**
1583   * Inner class that runs on a long period checking if regions need compaction.
1584   */
1585  private static class CompactionChecker extends ScheduledChore {
1586    private final HRegionServer instance;
1587    private final int majorCompactPriority;
1588    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1589    // Iteration is 1-based rather than 0-based so we don't check for compaction
1590    // immediately upon region server startup
1591    private long iteration = 1;
1592
1593    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1594      super("CompactionChecker", stopper, sleepTime);
1595      this.instance = h;
1596      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1597
1598      /*
1599       * MajorCompactPriority is configurable. If not set, the compaction will use default priority.
1600       */
1601      this.majorCompactPriority = this.instance.conf
1602        .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY);
1603    }
1604
1605    @Override
1606    protected void chore() {
1607      for (Region r : this.instance.onlineRegions.values()) {
1608        // Skip compaction if region is read only
1609        if (r == null || r.isReadOnly()) {
1610          continue;
1611        }
1612
1613        HRegion hr = (HRegion) r;
1614        for (HStore s : hr.stores.values()) {
1615          try {
1616            long multiplier = s.getCompactionCheckMultiplier();
1617            assert multiplier > 0;
1618            if (iteration % multiplier != 0) {
1619              continue;
1620            }
1621            if (s.needsCompaction()) {
1622              // Queue a compaction. Will recognize if major is needed.
1623              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1624                getName() + " requests compaction");
1625            } else if (s.shouldPerformMajorCompaction()) {
1626              s.triggerMajorCompaction();
1627              if (
1628                majorCompactPriority == DEFAULT_PRIORITY
1629                  || majorCompactPriority > hr.getCompactPriority()
1630              ) {
1631                this.instance.compactSplitThread.requestCompaction(hr, s,
1632                  getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
1633                  CompactionLifeCycleTracker.DUMMY, null);
1634              } else {
1635                this.instance.compactSplitThread.requestCompaction(hr, s,
1636                  getName() + " requests major compaction; use configured priority",
1637                  this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1638              }
1639            }
1640          } catch (IOException e) {
1641            LOG.warn("Failed major compaction check on " + r, e);
1642          }
1643        }
1644      }
1645      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1646    }
1647  }
1648
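  /**
   * Chore that periodically checks every online region and, if the region reports it should be
   * flushed, requests a delayed flush with a random delay so the flushes are spread out rather
   * than hitting the filesystem all at once.
   */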
1649  private static class PeriodicMemStoreFlusher extends ScheduledChore {
1650    private final HRegionServer server;
1651    private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
1652    private final static int MIN_DELAY_TIME = 0; // millisec
1653    private final long rangeOfDelayMs;
1654
1655    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1656      super("MemstoreFlusherChore", server, cacheFlushInterval);
1657      this.server = server;
1658
1659      final long configuredRangeOfDelay = server.getConfiguration()
1660        .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
1661      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
1662    }
1663
1664    @Override
1665    protected void chore() {
1666      final StringBuilder whyFlush = new StringBuilder();
1667      for (HRegion r : this.server.onlineRegions.values()) {
1668        if (r == null) {
1669          continue;
1670        }
1671        if (r.shouldFlush(whyFlush)) {
1672          FlushRequester requester = server.getFlushRequester();
1673          if (requester != null) {
1674            long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME;
1675            // Throttle the flushes by putting a delay. If we don't throttle, and there
1676            // is a balanced write-load on the regions in a table, we might end up
1677            // overwhelming the filesystem with too many flushes at once.
1678            if (requester.requestDelayedFlush(r, delay)) {
1679              LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(),
1680                r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay);
1681            }
1682          }
1683        }
1684      }
1685    }
1686  }
1687
  /**
   * Report the status of the server. A server is online once all of its startup has completed
   * (setting up the filesystem, starting service threads, etc.). This method is designed mostly to
   * be useful in tests.
   * @return true if online, false if not.
   */
1694  public boolean isOnline() {
1695    return online.get();
1696  }
1697
  /**
   * Set up the WAL and replication if enabled. Replication setup is done in here because it wants
   * to be hooked up to the WAL.
   */
1702  private void setupWALAndReplication() throws IOException {
1703    WALFactory factory = new WALFactory(conf, serverName.toString(), this, true);
1704    // TODO Replication make assumptions here based on the default filesystem impl
1705    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1706    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1707
1708    Path logDir = new Path(walRootDir, logName);
1709    LOG.debug("logDir={}", logDir);
1710    if (this.walFs.exists(logDir)) {
1711      throw new RegionServerRunningException(
1712        "Region server has already created directory at " + this.serverName.toString());
1713    }
1714    // Always create wal directory as now we need this when master restarts to find out the live
1715    // region servers.
1716    if (!this.walFs.mkdirs(logDir)) {
1717      throw new IOException("Can not create wal directory " + logDir);
1718    }
1719    // Instantiate replication if replication enabled. Pass it the log directories.
1720    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
1721    this.walFactory = factory;
1722  }
1723
1724  /**
1725   * Start up replication source and sink handlers.
1726   */
1727  private void startReplicationService() throws IOException {
1728    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
1729      this.replicationSourceHandler.startReplicationService();
1730    } else {
1731      if (this.replicationSourceHandler != null) {
1732        this.replicationSourceHandler.startReplicationService();
1733      }
1734      if (this.replicationSinkHandler != null) {
1735        this.replicationSinkHandler.startReplicationService();
1736      }
1737    }
1738  }
1739
1740  /** Returns Master address tracker instance. */
1741  public MasterAddressTracker getMasterAddressTracker() {
1742    return this.masterAddressTracker;
1743  }
1744
  /**
   * Start maintenance threads, server, worker and lease checker threads. Start all threads we need
   * to run. This is called after we've successfully registered with the Master. Install an
   * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception. We
   * cannot set the handler on all threads. Server's internal Listener thread is off limits. For
   * Server, if an OOME, it waits a while then retries. Meantime, a flush or a compaction that tries
   * to run should trigger the same critical condition and the shutdown will run. On its way out,
   * this server will shut down Server. Leases are sort of in between: the lease checker runs an
   * internal thread and, while it inherits from Chore, it keeps its own stop mechanism, so it needs
   * to be stopped by this hosting server. Worker logs the exception and exits.
   */
1756  private void startServices() throws IOException {
1757    if (!isStopped() && !isAborted()) {
1758      initializeThreads();
1759    }
1760    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
1761    this.secureBulkLoadManager.start();
1762
1763    // Health checker thread.
1764    if (isHealthCheckerConfigured()) {
1765      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
1766        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1767      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1768    }
1769    // Executor status collect thread.
1770    if (
1771      this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
1772        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)
1773    ) {
1774      int sleepTime =
1775        this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ);
1776      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
1777        this.metricsRegionServer.getMetricsSource());
1778    }
1779
1780    this.walRoller = new LogRoller(this);
1781    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1782    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
1783
    // Create the CompactedFileDischarger chore. This chore helps to
1785    // remove the compacted files that will no longer be used in reads.
1786    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
1787    // 2 mins so that compacted files can be archived before the TTLCleaner runs
1788    int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
1789    this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this);
1790    choreService.scheduleChore(compactedFileDischarger);
1791
1792    // Start executor services
1793    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
1794    executorService.startExecutorService(executorService.new ExecutorConfig()
1795      .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
1796    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
1797    executorService.startExecutorService(executorService.new ExecutorConfig()
1798      .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
1799    final int openPriorityRegionThreads =
1800      conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
1801    executorService.startExecutorService(
1802      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION)
1803        .setCorePoolSize(openPriorityRegionThreads));
1804    final int closeRegionThreads =
1805      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
1806    executorService.startExecutorService(executorService.new ExecutorConfig()
1807      .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
1808    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
1809    executorService.startExecutorService(executorService.new ExecutorConfig()
1810      .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
1811    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1812      final int storeScannerParallelSeekThreads =
1813        conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
1814      executorService.startExecutorService(
1815        executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK)
1816          .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true));
1817    }
1818    final int logReplayOpsThreads =
1819      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
1820    executorService.startExecutorService(
1821      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS)
1822        .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
1823    // Start the threads for compacted files discharger
1824    final int compactionDischargerThreads =
1825      conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
1826    executorService.startExecutorService(executorService.new ExecutorConfig()
1827      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)
1828      .setCorePoolSize(compactionDischargerThreads));
1829    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1830      final int regionReplicaFlushThreads =
1831        conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1832          conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1833      executorService.startExecutorService(executorService.new ExecutorConfig()
1834        .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS)
1835        .setCorePoolSize(regionReplicaFlushThreads));
1836    }
1837    final int refreshPeerThreads =
1838      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
1839    executorService.startExecutorService(executorService.new ExecutorConfig()
1840      .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
1841    final int replaySyncReplicationWALThreads =
1842      conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
1843    executorService.startExecutorService(executorService.new ExecutorConfig()
1844      .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL)
1845      .setCorePoolSize(replaySyncReplicationWALThreads));
1846    final int switchRpcThrottleThreads =
1847      conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
1848    executorService.startExecutorService(
1849      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE)
1850        .setCorePoolSize(switchRpcThrottleThreads));
1851    final int claimReplicationQueueThreads =
1852      conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
1853    executorService.startExecutorService(
1854      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE)
1855        .setCorePoolSize(claimReplicationQueueThreads));
1856    final int rsSnapshotOperationThreads =
1857      conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
1858    executorService.startExecutorService(
1859      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
1860        .setCorePoolSize(rsSnapshotOperationThreads));
1861
1862    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
1863      uncaughtExceptionHandler);
1864    if (this.cacheFlusher != null) {
1865      this.cacheFlusher.start(uncaughtExceptionHandler);
1866    }
1867    Threads.setDaemonThreadRunning(this.procedureResultReporter,
1868      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
1869
1870    if (this.compactionChecker != null) {
1871      choreService.scheduleChore(compactionChecker);
1872    }
1873    if (this.periodicFlusher != null) {
1874      choreService.scheduleChore(periodicFlusher);
1875    }
1876    if (this.healthCheckChore != null) {
1877      choreService.scheduleChore(healthCheckChore);
1878    }
1879    if (this.executorStatusChore != null) {
1880      choreService.scheduleChore(executorStatusChore);
1881    }
1882    if (this.nonceManagerChore != null) {
1883      choreService.scheduleChore(nonceManagerChore);
1884    }
1885    if (this.storefileRefresher != null) {
1886      choreService.scheduleChore(storefileRefresher);
1887    }
1888    if (this.fsUtilizationChore != null) {
1889      choreService.scheduleChore(fsUtilizationChore);
1890    }
1891    if (this.slowLogTableOpsChore != null) {
1892      choreService.scheduleChore(slowLogTableOpsChore);
1893    }
1894    if (this.brokenStoreFileCleaner != null) {
1895      choreService.scheduleChore(brokenStoreFileCleaner);
1896    }
1897
1898    if (this.rsMobFileCleanerChore != null) {
1899      choreService.scheduleChore(rsMobFileCleanerChore);
1900    }
1901
    // The LeaseManager is not a Thread itself; it is run here in a daemon thread. If it gets
    // an unhandled exception, it will just exit.
1904    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
1905      uncaughtExceptionHandler);
1906
1907    // Create the log splitting worker and start it
1908    // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
1909    // quite a while inside Connection layer. The worker won't be available for other
1910    // tasks even after current task is preempted after a split task times out.
1911    Configuration sinkConf = HBaseConfiguration.create(conf);
1912    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1913      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1914    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1915      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1916    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
1917    if (
1918      this.csm != null
1919        && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
1920    ) {
1921      // SplitLogWorker needs csm. If none, don't start this.
1922      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
1923      splitLogWorker.start();
1924      LOG.debug("SplitLogWorker started");
1925    }
1926
1927    // Memstore services.
1928    startHeapMemoryManager();
1929    // Call it after starting HeapMemoryManager.
1930    initializeMemStoreChunkCreator(hMemManager);
1931  }
1932
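  /**
   * Construct (but do not start) the core worker threads and chores: memstore flusher, compaction
   * threads, compaction checker, periodic flusher, lease manager, quota managers and various
   * cleanup chores. They are started and scheduled later in startServices().
   */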
1933  private void initializeThreads() {
1934    // Cache flushing thread.
1935    this.cacheFlusher = new MemStoreFlusher(conf, this);
1936
1937    // Compaction thread
1938    this.compactSplitThread = new CompactSplit(this);
1939
1940    // Background thread to check for compactions; needed if region has not gotten updates
1941    // in a while. It will take care of not checking too frequently on store-by-store basis.
1942    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
1943    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
1944    this.leaseManager = new LeaseManager(this.threadWakeFrequency);
1945
1946    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
1947      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
1948    if (isSlowLogTableEnabled) {
1949      // default chore duration: 10 min
1950      final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
1951      slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
1952    }
1953
1954    if (this.nonceManager != null) {
1955      // Create the scheduled chore that cleans up nonces.
1956      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
1957    }
1958
1959    // Setup the Quota Manager
1960    rsQuotaManager = new RegionServerRpcQuotaManager(this);
1961    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
1962
1963    if (QuotaUtil.isQuotaEnabled(conf)) {
1964      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
1965    }
1966
1967    boolean onlyMetaRefresh = false;
1968    int storefileRefreshPeriod =
1969      conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
1970        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
1971    if (storefileRefreshPeriod == 0) {
1972      storefileRefreshPeriod =
1973        conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
1974          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
1975      onlyMetaRefresh = true;
1976    }
1977    if (storefileRefreshPeriod > 0) {
1978      this.storefileRefresher =
1979        new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this);
1980    }
1981
1982    int brokenStoreFileCleanerPeriod =
1983      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
1984        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
1985    int brokenStoreFileCleanerDelay =
1986      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
1987        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
1988    double brokenStoreFileCleanerDelayJitter =
1989      conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
1990        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
1991    double jitterRate =
1992      (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
1993    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
1994    this.brokenStoreFileCleaner =
1995      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
1996        brokenStoreFileCleanerPeriod, this, conf, this);
1997
1998    this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
1999
2000    registerConfigurationObservers();
2001  }
2002
2003  private void registerConfigurationObservers() {
2004    // Registering the compactSplitThread object with the ConfigurationManager.
2005    configurationManager.registerObserver(this.compactSplitThread);
2006    configurationManager.registerObserver(this.rpcServices);
2007    configurationManager.registerObserver(this);
2008  }
2009
2010  /*
2011   * Verify that server is healthy
2012   */
2013  private boolean isHealthy() {
2014    if (!dataFsOk) {
2015      // File system problem
2016      return false;
2017    }
2018    // Verify that all threads are alive
2019    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
2020      && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2021      && (this.walRoller == null || this.walRoller.isAlive())
2022      && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2023      && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2024    if (!healthy) {
2025      stop("One or more threads are no longer alive -- stop");
2026    }
2027    return healthy;
2028  }
2029
2030  @Override
2031  public List<WAL> getWALs() {
2032    return walFactory.getWALs();
2033  }
2034
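  /**
   * Get the WAL for the given region from the WAL factory and register it with the log roller so
   * it takes part in periodic rolling.
   */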
2035  @Override
2036  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2037    WAL wal = walFactory.getWAL(regionInfo);
2038    if (this.walRoller != null) {
2039      this.walRoller.addWAL(wal);
2040    }
2041    return wal;
2042  }
2043
2044  public LogRoller getWalRoller() {
2045    return walRoller;
2046  }
2047
2048  WALFactory getWalFactory() {
2049    return walFactory;
2050  }
2051
2052  @Override
2053  public void stop(final String msg) {
2054    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2055  }
2056
2057  /**
2058   * Stops the regionserver.
2059   * @param msg   Status message
2060   * @param force True if this is a regionserver abort
2061   * @param user  The user executing the stop request, or null if no user is associated
2062   */
2063  public void stop(final String msg, final boolean force, final User user) {
2064    if (!this.stopped) {
2065      LOG.info("***** STOPPING region server '" + this + "' *****");
2066      if (this.rsHost != null) {
2067        // when forced via abort don't allow CPs to override
2068        try {
2069          this.rsHost.preStop(msg, user);
2070        } catch (IOException ioe) {
2071          if (!force) {
2072            LOG.warn("The region server did not stop", ioe);
2073            return;
2074          }
2075          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2076        }
2077      }
2078      this.stopped = true;
2079      LOG.info("STOPPED: " + msg);
2080      // Wakes run() if it is sleeping
2081      sleeper.skipSleepCycle();
2082    }
2083  }
2084
2085  public void waitForServerOnline() {
2086    while (!isStopped() && !isOnline()) {
2087      synchronized (online) {
2088        try {
2089          online.wait(msgInterval);
2090        } catch (InterruptedException ie) {
2091          Thread.currentThread().interrupt();
2092          break;
2093        }
2094      }
2095    }
2096  }
2097
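  /**
   * Tasks run after a region has been opened here: request a system compaction if any store has
   * references or needs compacting, sanity-check the open sequence id, report the OPENED
   * transition to the master (failing the open if the report fails), and, for secondary replicas,
   * trigger a flush of the primary region before enabling reads.
   */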
2098  @Override
2099  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2100    HRegion r = context.getRegion();
2101    long openProcId = context.getOpenProcId();
2102    long masterSystemTime = context.getMasterSystemTime();
2103    rpcServices.checkOpen();
2104    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2105      r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2106    // Do checks to see if we need to compact (references or too many files)
2107    // Skip compaction check if region is read only
2108    if (!r.isReadOnly()) {
2109      for (HStore s : r.stores.values()) {
2110        if (s.hasReferences() || s.needsCompaction()) {
2111          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2112        }
2113      }
2114    }
2115    long openSeqNum = r.getOpenSeqNum();
2116    if (openSeqNum == HConstants.NO_SEQNUM) {
2117      // If we opened a region, we should have read some sequence number from it.
2118      LOG.error(
2119        "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2120      openSeqNum = 0;
2121    }
2122
2123    // Notify master
2124    if (
2125      !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2126        openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))
2127    ) {
2128      throw new IOException(
2129        "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2130    }
2131
2132    triggerFlushInPrimaryRegion(r);
2133
2134    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2135  }
2136
2137  /**
2138   * Helper method for use in tests. Skip the region transition report when there's no master around
2139   * to receive it.
2140   */
2141  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2142    final TransitionCode code = context.getCode();
2143    final long openSeqNum = context.getOpenSeqNum();
2144    long masterSystemTime = context.getMasterSystemTime();
2145    final RegionInfo[] hris = context.getHris();
2146
2147    if (code == TransitionCode.OPENED) {
2148      Preconditions.checkArgument(hris != null && hris.length == 1);
2149      if (hris[0].isMetaRegion()) {
2150        LOG.warn(
2151          "meta table location is stored in master local store, so we can not skip reporting");
2152        return false;
2153      } else {
2154        try {
2155          MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
2156            serverName, openSeqNum, masterSystemTime);
2157        } catch (IOException e) {
2158          LOG.info("Failed to update meta", e);
2159          return false;
2160        }
2161      }
2162    }
2163    return true;
2164  }
2165
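  /**
   * Build the protobuf request used to report a region state transition to the master: our server
   * name, the transition code, the open sequence number (for OPENED transitions), the affected
   * regions and the associated procedure ids.
   */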
2166  private ReportRegionStateTransitionRequest
2167    createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) {
2168    final TransitionCode code = context.getCode();
2169    final long openSeqNum = context.getOpenSeqNum();
2170    final RegionInfo[] hris = context.getHris();
2171    final long[] procIds = context.getProcIds();
2172
2173    ReportRegionStateTransitionRequest.Builder builder =
2174      ReportRegionStateTransitionRequest.newBuilder();
2175    builder.setServer(ProtobufUtil.toServerName(serverName));
2176    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2177    transition.setTransitionCode(code);
2178    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2179      transition.setOpenSeqNum(openSeqNum);
2180    }
2181    for (RegionInfo hri : hris) {
2182      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2183    }
2184    for (long procId : procIds) {
2185      transition.addProcId(procId);
2186    }
2187
2188    return builder.build();
2189  }
2190
2191  @Override
2192  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2193    if (TEST_SKIP_REPORTING_TRANSITION) {
2194      return skipReportingTransition(context);
2195    }
2196    final ReportRegionStateTransitionRequest request =
2197      createReportRegionStateTransitionRequest(context);
2198
2199    int tries = 0;
2200    long pauseTime = this.retryPauseTime;
    // Keep looping till we get an error. We want to send reports even though the server is going
    // down. Only give up when the cluster connection is null or closed; it is torn down almost as
    // the last thing the HRegionServer does on its way down.
2204    while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
2205      RegionServerStatusService.BlockingInterface rss = rssStub;
2206      try {
2207        if (rss == null) {
2208          createRegionServerStatusStub();
2209          continue;
2210        }
2211        ReportRegionStateTransitionResponse response =
2212          rss.reportRegionStateTransition(null, request);
2213        if (response.hasErrorMessage()) {
2214          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2215          break;
2216        }
        // Log if we had to retry, else don't log unless TRACE. We want to
        // know if we were successful after an attempt showed in the logs as failed.
2219        if (tries > 0 || LOG.isTraceEnabled()) {
2220          LOG.info("TRANSITION REPORTED " + request);
2221        }
2222        // NOTE: Return mid-method!!!
2223        return true;
2224      } catch (ServiceException se) {
2225        IOException ioe = ProtobufUtil.getRemoteException(se);
2226        boolean pause = ioe instanceof ServerNotRunningYetException
2227          || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException;
2228        if (pause) {
2229          // Do backoff else we flood the Master with requests.
2230          pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
2231        } else {
2232          pauseTime = this.retryPauseTime; // Reset.
2233        }
2234        LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#"
2235          + tries + ")"
2236          + (pause
2237            ? " after " + pauseTime + "ms delay (Master is coming online...)."
2238            : " immediately."),
2239          ioe);
2240        if (pause) {
2241          Threads.sleep(pauseTime);
2242        }
2243        tries++;
2244        if (rssStub == rss) {
2245          rssStub = null;
2246        }
2247      }
2248    }
2249    return false;
2250  }
2251
2252  /**
2253   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2254   * block this thread. See RegionReplicaFlushHandler for details.
2255   */
2256  private void triggerFlushInPrimaryRegion(final HRegion region) {
2257    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2258      return;
2259    }
2260    TableName tn = region.getTableDescriptor().getTableName();
2261    if (
2262      !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn)
2263        || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) ||
2264        // If the memstore replication not setup, we do not have to wait for observing a flush event
2265        // from primary before starting to serve reads, because gaps from replication is not
2266        // applicable,this logic is from
2267        // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by
2268        // HBASE-13063
2269        !region.getTableDescriptor().hasRegionMemStoreReplication()
2270    ) {
2271      region.setReadsEnabled(true);
2272      return;
2273    }
2274
2275    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2276    // RegionReplicaFlushHandler might reset this.
2277
2278    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2279    if (this.executorService != null) {
2280      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
2281    } else {
2282      LOG.info("Executor is null; not running flush of primary region replica for {}",
2283        region.getRegionInfo());
2284    }
2285  }
2286
2287  @InterfaceAudience.Private
2288  public RSRpcServices getRSRpcServices() {
2289    return rpcServices;
2290  }
2291
  /**
   * Cause the server to exit without closing the regions it is serving, the log it is using and
   * without notifying the master. Used in unit testing and on catastrophic events such as HDFS
   * being yanked out from under hbase or we OOME.
   * @param reason the reason we are aborting
   * @param cause  the exception that caused the abort, or null
   */
2298  @Override
2299  public void abort(String reason, Throwable cause) {
2300    if (!setAbortRequested()) {
2301      // Abort already in progress, ignore the new request.
2302      LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason);
2303      return;
2304    }
2305    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2306    if (cause != null) {
2307      LOG.error(HBaseMarkers.FATAL, msg, cause);
2308    } else {
2309      LOG.error(HBaseMarkers.FATAL, msg);
2310    }
2311    // HBASE-4014: show list of coprocessors that were loaded to help debug
    // regionserver crashes. Note that we're implicitly using
2313    // java.util.HashSet's toString() method to print the coprocessor names.
2314    LOG.error(HBaseMarkers.FATAL,
2315      "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors());
2316    // Try and dump metrics if abort -- might give clue as to how fatal came about....
2317    try {
2318      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2319    } catch (MalformedObjectNameException | IOException e) {
2320      LOG.warn("Failed dumping metrics", e);
2321    }
2322
2323    // Do our best to report our abort to the master, but this may not work
2324    try {
2325      if (cause != null) {
2326        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2327      }
2328      // Report to the master but only if we have already registered with the master.
2329      RegionServerStatusService.BlockingInterface rss = rssStub;
2330      if (rss != null && this.serverName != null) {
2331        ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder();
2332        builder.setServer(ProtobufUtil.toServerName(this.serverName));
2333        builder.setErrorMessage(msg);
2334        rss.reportRSFatalError(null, builder.build());
2335      }
2336    } catch (Throwable t) {
2337      LOG.warn("Unable to report fatal error to master", t);
2338    }
2339
2340    scheduleAbortTimer();
2341    // shutdown should be run as the internal user
2342    stop(reason, true, null);
2343  }
2344
  /*
   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up logs, but it does
   * close the socket in case we want to bring up a server on the old hostname+port immediately.
   */
2349  @InterfaceAudience.Private
2350  protected void kill() {
2351    this.killed = true;
2352    abort("Simulated kill");
2353  }
2354
2355  // Limits the time spent in the shutdown process.
2356  private void scheduleAbortTimer() {
2357    if (this.abortMonitor == null) {
2358      this.abortMonitor = new Timer("Abort regionserver monitor", true);
2359      TimerTask abortTimeoutTask = null;
2360      try {
2361        Constructor<? extends TimerTask> timerTaskCtor =
2362          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2363            .asSubclass(TimerTask.class).getDeclaredConstructor();
2364        timerTaskCtor.setAccessible(true);
2365        abortTimeoutTask = timerTaskCtor.newInstance();
2366      } catch (Exception e) {
2367        LOG.warn("Initialize abort timeout task failed", e);
2368      }
2369      if (abortTimeoutTask != null) {
2370        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2371      }
2372    }
2373  }
2374
2375  /**
2376   * Wait on all threads to finish. Presumption is that all closes and stops have already been
2377   * called.
2378   */
2379  protected void stopServiceThreads() {
2380    // clean up the scheduled chores
2381    stopChoreService();
2382    if (bootstrapNodeManager != null) {
2383      bootstrapNodeManager.stop();
2384    }
2385    if (this.cacheFlusher != null) {
2386      this.cacheFlusher.join();
2387    }
2388    if (this.walRoller != null) {
2389      this.walRoller.close();
2390    }
2391    if (this.compactSplitThread != null) {
2392      this.compactSplitThread.join();
2393    }
2394    stopExecutorService();
2395    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
2396      this.replicationSourceHandler.stopReplicationService();
2397    } else {
2398      if (this.replicationSourceHandler != null) {
2399        this.replicationSourceHandler.stopReplicationService();
2400      }
2401      if (this.replicationSinkHandler != null) {
2402        this.replicationSinkHandler.stopReplicationService();
2403      }
2404    }
2405  }
2406
  /** Returns the object that implements the replication source service. */
2408  @Override
2409  public ReplicationSourceService getReplicationSourceService() {
2410    return replicationSourceHandler;
2411  }
2412
  /** Returns the object that implements the replication sink service. */
2414  public ReplicationSinkService getReplicationSinkService() {
2415    return replicationSinkHandler;
2416  }
2417
2418  /**
2419   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2420   * connection, the current rssStub must be null. Method will block until a master is available.
2421   * You can break from this block by requesting the server stop.
2422   * @return master + port, or null if server has been stopped
2423   */
2424  private synchronized ServerName createRegionServerStatusStub() {
2425    // Create RS stub without refreshing the master node from ZK, use cached data
2426    return createRegionServerStatusStub(false);
2427  }
2428
2429  /**
2430   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2431   * connection, the current rssStub must be null. Method will block until a master is available.
2432   * You can break from this block by requesting the server stop.
2433   * @param refresh If true then master address will be read from ZK, otherwise use cached data
2434   * @return master + port, or null if server has been stopped
2435   */
2436  @InterfaceAudience.Private
2437  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2438    if (rssStub != null) {
2439      return masterAddressTracker.getMasterAddress();
2440    }
2441    ServerName sn = null;
2442    long previousLogTime = 0;
2443    RegionServerStatusService.BlockingInterface intRssStub = null;
2444    LockService.BlockingInterface intLockStub = null;
2445    boolean interrupted = false;
2446    try {
2447      while (keepLooping()) {
2448        sn = this.masterAddressTracker.getMasterAddress(refresh);
2449        if (sn == null) {
2450          if (!keepLooping()) {
2451            // give up with no connection.
2452            LOG.debug("No master found and cluster is stopped; bailing out");
2453            return null;
2454          }
2455          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2456            LOG.debug("No master found; retry");
2457            previousLogTime = EnvironmentEdgeManager.currentTime();
2458          }
2459          refresh = true; // let's try pull it from ZK directly
2460          if (sleepInterrupted(200)) {
2461            interrupted = true;
2462          }
2463          continue;
2464        }
2465        try {
2466          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
2467            userProvider.getCurrent(), shortOperationTimeout);
2468          intRssStub = RegionServerStatusService.newBlockingStub(channel);
2469          intLockStub = LockService.newBlockingStub(channel);
2470          break;
2471        } catch (IOException e) {
2472          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2473            e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
2474            if (e instanceof ServerNotRunningYetException) {
2475              LOG.info("Master isn't available yet, retrying");
2476            } else {
2477              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2478            }
2479            previousLogTime = EnvironmentEdgeManager.currentTime();
2480          }
2481          if (sleepInterrupted(200)) {
2482            interrupted = true;
2483          }
2484        }
2485      }
2486    } finally {
2487      if (interrupted) {
2488        Thread.currentThread().interrupt();
2489      }
2490    }
2491    this.rssStub = intRssStub;
2492    this.lockStub = intLockStub;
2493    return sn;
2494  }
2495
2496  /**
2497   * @return True if we should break loop because cluster is going down or this server has been
2498   *         stopped or hdfs has gone bad.
2499   */
2500  private boolean keepLooping() {
2501    return !this.stopped && isClusterUp();
2502  }
2503
2504  /*
   * Let the master know we're here. Run initialization using parameters passed to us by the
   * master.
   * @return A Map of key/value configurations we got from the Master, else null if we failed to
   * register.
2508   */
2509  private RegionServerStartupResponse reportForDuty() throws IOException {
2510    if (this.masterless) {
2511      return RegionServerStartupResponse.getDefaultInstance();
2512    }
2513    ServerName masterServerName = createRegionServerStatusStub(true);
2514    RegionServerStatusService.BlockingInterface rss = rssStub;
2515    if (masterServerName == null || rss == null) {
2516      return null;
2517    }
2518    RegionServerStartupResponse result = null;
2519    try {
2520      rpcServices.requestCount.reset();
2521      rpcServices.rpcGetRequestCount.reset();
2522      rpcServices.rpcScanRequestCount.reset();
2523      rpcServices.rpcFullScanRequestCount.reset();
2524      rpcServices.rpcMultiRequestCount.reset();
2525      rpcServices.rpcMutateRequestCount.reset();
2526      LOG.info("reportForDuty to master=" + masterServerName + " with port="
2527        + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode);
2528      long now = EnvironmentEdgeManager.currentTime();
2529      int port = rpcServices.getSocketAddress().getPort();
2530      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2531      if (!StringUtils.isBlank(useThisHostnameInstead)) {
2532        request.setUseThisHostnameInstead(useThisHostnameInstead);
2533      }
2534      request.setPort(port);
2535      request.setServerStartCode(this.startcode);
2536      request.setServerCurrentTime(now);
2537      result = rss.regionServerStartup(null, request.build());
2538    } catch (ServiceException se) {
2539      IOException ioe = ProtobufUtil.getRemoteException(se);
2540      if (ioe instanceof ClockOutOfSyncException) {
2541        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
        // Re-throwing the IOE will cause the RS to abort.
2543        throw ioe;
2544      } else if (ioe instanceof ServerNotRunningYetException) {
2545        LOG.debug("Master is not running yet");
2546      } else {
2547        LOG.warn("error telling master we are up", se);
2548      }
2549      rssStub = null;
2550    }
2551    return result;
2552  }
2553
2554  @Override
2555  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2556    try {
2557      GetLastFlushedSequenceIdRequest req =
2558        RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2559      RegionServerStatusService.BlockingInterface rss = rssStub;
2560      if (rss == null) { // Try to connect one more time
2561        createRegionServerStatusStub();
2562        rss = rssStub;
2563        if (rss == null) {
2564          // Still no luck, we tried
          LOG.warn("Unable to connect to the master to check the last flushed sequence id");
2566          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2567            .build();
2568        }
2569      }
2570      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2571      return RegionStoreSequenceIds.newBuilder()
2572        .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2573        .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2574    } catch (ServiceException e) {
2575      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2576      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2577        .build();
2578    }
2579  }
2580
2581  /**
2582   * Close meta region if we carry it
2583   * @param abort Whether we're running an abort.
2584   */
2585  private void closeMetaTableRegions(final boolean abort) {
2586    HRegion meta = null;
2587    this.onlineRegionsLock.writeLock().lock();
2588    try {
2589      for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
2590        RegionInfo hri = e.getValue().getRegionInfo();
2591        if (hri.isMetaRegion()) {
2592          meta = e.getValue();
2593        }
2594        if (meta != null) {
2595          break;
2596        }
2597      }
2598    } finally {
2599      this.onlineRegionsLock.writeLock().unlock();
2600    }
2601    if (meta != null) {
2602      closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2603    }
2604  }
2605
2606  /**
   * Schedule closes on all user regions. Should be safe calling multiple times because it won't
   * close regions that are already closed or that are closing.
2609   * @param abort Whether we're running an abort.
2610   */
2611  private void closeUserRegions(final boolean abort) {
2612    this.onlineRegionsLock.writeLock().lock();
2613    try {
2614      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
2615        HRegion r = e.getValue();
2616        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2617          // Don't update zk with this close transition; pass false.
2618          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2619        }
2620      }
2621    } finally {
2622      this.onlineRegionsLock.writeLock().unlock();
2623    }
2624  }
2625
2626  protected Map<String, HRegion> getOnlineRegions() {
2627    return this.onlineRegions;
2628  }
2629
2630  public int getNumberOfOnlineRegions() {
2631    return this.onlineRegions.size();
2632  }
2633
2634  /**
   * For tests, web UI and metrics. This method will only work if HRegionServer is in the same JVM
   * as the client; HRegion cannot be serialized to cross an RPC.
2637   */
2638  public Collection<HRegion> getOnlineRegionsLocalContext() {
2639    Collection<HRegion> regions = this.onlineRegions.values();
2640    return Collections.unmodifiableCollection(regions);
2641  }
2642
2643  @Override
2644  public void addRegion(HRegion region) {
2645    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2646    configurationManager.registerObserver(region);
2647  }
2648
2649  private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
2650    long size) {
2651    if (!sortedRegions.containsKey(size)) {
2652      sortedRegions.put(size, new ArrayList<>());
2653    }
2654    sortedRegions.get(size).add(region);
2655  }
2656
2657  /**
2658   * @return A new Map of online regions sorted by region off-heap size with the first entry being
2659   *         the biggest.
2660   */
2661  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2662    // we'll sort the regions in reverse
2663    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2664    // Copy over all regions. Regions are sorted by size with biggest first.
2665    for (HRegion region : this.onlineRegions.values()) {
2666      addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
2667    }
2668    return sortedRegions;
2669  }
2670
2671  /**
2672   * @return A new Map of online regions sorted by region heap size with the first entry being the
2673   *         biggest.
2674   */
2675  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2676    // we'll sort the regions in reverse
2677    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2678    // Copy over all regions. Regions are sorted by size with biggest first.
2679    for (HRegion region : this.onlineRegions.values()) {
2680      addRegion(sortedRegions, region, region.getMemStoreHeapSize());
2681    }
2682    return sortedRegions;
2683  }
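
  // Illustrative sketch (not called anywhere; assumes at least one region is online). Because the
  // map is sorted with the biggest key first, the region carrying the largest memstore heap
  // footprint can be picked like this:
  //
  //   SortedMap<Long, Collection<HRegion>> bySize = getCopyOfOnlineRegionsSortedByOnHeapSize();
  //   if (!bySize.isEmpty()) {
  //     HRegion biggest = bySize.get(bySize.firstKey()).iterator().next();
  //   }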
2684
2685  /** Returns reference to FlushRequester */
2686  @Override
2687  public FlushRequester getFlushRequester() {
2688    return this.cacheFlusher;
2689  }
2690
2691  @Override
2692  public CompactionRequester getCompactionRequestor() {
2693    return this.compactSplitThread;
2694  }
2695
2696  @Override
2697  public LeaseManager getLeaseManager() {
2698    return leaseManager;
2699  }
2700
2701  /** Returns {@code true} when the data file system is available, {@code false} otherwise. */
2702  boolean isDataFileSystemOk() {
2703    return this.dataFsOk;
2704  }
2705
2706  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
2707    return this.rsHost;
2708  }
2709
2710  @Override
2711  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2712    return this.regionsInTransitionInRS;
2713  }
2714
2715  @Override
2716  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
2717    return rsQuotaManager;
2718  }
2719
2720  //
2721  // Main program and support routines
2722  //
2723  /**
   * Load the replication service objects, if any.
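   * <p>
   * A minimal configuration sketch (the class name below is hypothetical); when both keys point at
   * the same class, a single instance is used for both the source and the sink role:
   * <pre>{@code
   * conf.set(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, "org.example.MyReplicationService");
   * conf.set(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, "org.example.MyReplicationService");
   * }</pre>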
2725   */
2726  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
2727    FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
2728    // read in the name of the source replication class from the config file.
2729    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2730      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2731
2732    // read in the name of the sink replication class from the config file.
2733    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2734      HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
2735
2736    // If both the sink and the source class names are the same, then instantiate
2737    // only one object.
2738    if (sourceClassname.equals(sinkClassname)) {
2739      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2740        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2741      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
2742      server.sameReplicationSourceAndSink = true;
2743    } else {
2744      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2745        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2746      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
2747        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2748      server.sameReplicationSourceAndSink = false;
2749    }
2750  }
2751
2752  private static <T extends ReplicationService> T newReplicationInstance(String classname,
2753    Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
2754    Path oldLogDir, WALFactory walFactory) throws IOException {
2755    final Class<? extends T> clazz;
2756    try {
2757      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2758      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
2759    } catch (java.lang.ClassNotFoundException nfe) {
2760      throw new IOException("Could not find class for " + classname);
2761    }
2762    T service = ReflectionUtils.newInstance(clazz, conf);
2763    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
2764    return service;
2765  }
2766
2767  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
2768    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
2769    if (!this.isOnline()) {
2770      return walGroupsReplicationStatus;
2771    }
2772    List<ReplicationSourceInterface> allSources = new ArrayList<>();
2773    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
2774    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
2775    for (ReplicationSourceInterface source : allSources) {
2776      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
2777    }
2778    return walGroupsReplicationStatus;
2779  }
2780
2781  /**
2782   * Utility for constructing an instance of the passed HRegionServer class.
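   * <p>
   * Illustrative sketch (the subclass name is hypothetical); in practice the class is selected via
   * the {@link HConstants#REGION_SERVER_IMPL} configuration key (see {@link #main(String[])}):
   * <pre>{@code
   * HRegionServer rs = constructRegionServer(MyCustomRegionServer.class, conf);
   * }</pre>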
2783   */
2784  static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass,
2785    final Configuration conf) {
2786    try {
2787      Constructor<? extends HRegionServer> c =
2788        regionServerClass.getConstructor(Configuration.class);
2789      return c.newInstance(conf);
2790    } catch (Exception e) {
      throw new RuntimeException(
        "Failed construction of RegionServer: " + regionServerClass.toString(), e);
2793    }
2794  }
2795
2796  /**
2797   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2798   */
2799  public static void main(String[] args) {
    LOG.info("STARTING service " + HRegionServer.class.getSimpleName());
2801    VersionInfo.logVersion();
2802    Configuration conf = HBaseConfiguration.create();
2803    @SuppressWarnings("unchecked")
2804    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2805      .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2806
2807    new HRegionServerCommandLine(regionServerClass).doMain(args);
2808  }
2809
2810  /**
2811   * Gets the online regions of the specified table. This method looks at the in-memory
2812   * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions.
   * If a region of this table has been closed during a disable, etc., it will not be included in
   * the returned list. So, the returned list may not necessarily be ALL regions of this table; it
   * is only the ONLINE regions of the table.
2816   * @param tableName table to limit the scope of the query
2817   * @return Online regions from <code>tableName</code>
2818   */
2819  @Override
2820  public List<HRegion> getRegions(TableName tableName) {
2821    List<HRegion> tableRegions = new ArrayList<>();
2822    synchronized (this.onlineRegions) {
2823      for (HRegion region : this.onlineRegions.values()) {
2824        RegionInfo regionInfo = region.getRegionInfo();
2825        if (regionInfo.getTable().equals(tableName)) {
2826          tableRegions.add(region);
2827        }
2828      }
2829    }
2830    return tableRegions;
2831  }
2832
2833  @Override
2834  public List<HRegion> getRegions() {
2835    List<HRegion> allRegions;
2836    synchronized (this.onlineRegions) {
2837      // Return a clone copy of the onlineRegions
2838      allRegions = new ArrayList<>(onlineRegions.values());
2839    }
2840    return allRegions;
2841  }
2842
2843  /**
2844   * Gets the online tables in this RS. This method looks at the in-memory onlineRegions.
2845   * @return all the online tables in this RS
2846   */
2847  public Set<TableName> getOnlineTables() {
2848    Set<TableName> tables = new HashSet<>();
2849    synchronized (this.onlineRegions) {
2850      for (Region region : this.onlineRegions.values()) {
2851        tables.add(region.getTableDescriptor().getTableName());
2852      }
2853    }
2854    return tables;
2855  }
2856
2857  public String[] getRegionServerCoprocessors() {
2858    TreeSet<String> coprocessors = new TreeSet<>();
2859    try {
2860      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2861    } catch (IOException exception) {
2862      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; "
2863        + "skipping.");
2864      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2865    }
2866    Collection<HRegion> regions = getOnlineRegionsLocalContext();
2867    for (HRegion region : regions) {
2868      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2869      try {
2870        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2871      } catch (IOException exception) {
2872        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region
2873          + "; skipping.");
2874        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2875      }
2876    }
2877    coprocessors.addAll(rsHost.getCoprocessors());
2878    return coprocessors.toArray(new String[0]);
2879  }
2880
2881  /**
2882   * Try to close the region, logs a warning on failure but continues.
2883   * @param region Region to close
2884   */
2885  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
2886    try {
2887      if (!closeRegion(region.getEncodedName(), abort, null)) {
2888        LOG
2889          .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing");
2890      }
2891    } catch (IOException e) {
2892      LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing",
2893        e);
2894    }
2895  }
2896
2897  /**
   * Close a region asynchronously; this can be called from the master or internally by the
   * regionserver when stopping. If called from the master, the region will update the status.
2900   * <p>
2901   * If an opening was in progress, this method will cancel it, but will not start a new close. The
2902   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
2903   * </p>
2904   * <p>
2905   * If a close was in progress, this new request will be ignored, and an exception thrown.
2906   * </p>
2907   * @param encodedName Region to close
2908   * @param abort       True if we are aborting
   * @param destination Where the Region is being moved to; may be null if unknown.
2910   * @return True if closed a region.
2911   * @throws NotServingRegionException if the region is not online
2912   */
2913  protected boolean closeRegion(String encodedName, final boolean abort,
2914    final ServerName destination) throws NotServingRegionException {
2915    // Check for permissions to close.
2916    HRegion actualRegion = this.getRegion(encodedName);
2917    // Can be null if we're calling close on a region that's not online
2918    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2919      try {
2920        actualRegion.getCoprocessorHost().preClose(false);
2921      } catch (IOException exp) {
2922        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2923        return false;
2924      }
2925    }
2926
2927    // previous can come back 'null' if not in map.
2928    final Boolean previous =
2929      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);
2930
2931    if (Boolean.TRUE.equals(previous)) {
      LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already "
        + "trying to OPEN. Cancelling OPENING.");
2934      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
2935        // The replace failed. That should be an exceptional case, but theoretically it can happen.
2936        // We're going to try to do a standard close then.
2937        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
2938          + " Doing a standard close now");
2939        return closeRegion(encodedName, abort, destination);
2940      }
2941      // Let's get the region from the online region list again
2942      actualRegion = this.getRegion(encodedName);
2943      if (actualRegion == null) { // If already online, we still need to close it.
2944        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2945        // The master deletes the znode when it receives this exception.
2946        throw new NotServingRegionException(
2947          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
2948      }
2949    } else if (previous == null) {
2950      LOG.info("Received CLOSE for {}", encodedName);
2951    } else if (Boolean.FALSE.equals(previous)) {
2952      LOG.info("Received CLOSE for the region: " + encodedName
2953        + ", which we are already trying to CLOSE, but not completed yet");
2954      return true;
2955    }
2956
2957    if (actualRegion == null) {
2958      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
2959      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
2960      // The master deletes the znode when it receives this exception.
2961      throw new NotServingRegionException(
2962        "The region " + encodedName + " is not online, and is not opening.");
2963    }
2964
2965    CloseRegionHandler crh;
2966    final RegionInfo hri = actualRegion.getRegionInfo();
2967    if (hri.isMetaRegion()) {
2968      crh = new CloseMetaHandler(this, this, hri, abort);
2969    } else {
2970      crh = new CloseRegionHandler(this, this, hri, abort, destination);
2971    }
2972    this.executorService.submit(crh);
2973    return true;
2974  }
2975
2976  /**
2977   * @return HRegion for the passed binary <code>regionName</code> or null if named region is not
2978   *         member of the online regions.
2979   */
2980  public HRegion getOnlineRegion(final byte[] regionName) {
2981    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
2982    return this.onlineRegions.get(encodedRegionName);
2983  }
2984
2985  @Override
2986  public HRegion getRegion(final String encodedRegionName) {
2987    return this.onlineRegions.get(encodedRegionName);
2988  }
2989
2990  @Override
2991  public boolean removeRegion(final HRegion r, ServerName destination) {
2992    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2993    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
2994    if (destination != null) {
2995      long closeSeqNum = r.getMaxFlushedSeqId();
2996      if (closeSeqNum == HConstants.NO_SEQNUM) {
2997        // No edits in WAL for this region; get the sequence number when the region was opened.
2998        closeSeqNum = r.getOpenSeqNum();
2999        if (closeSeqNum == HConstants.NO_SEQNUM) {
3000          closeSeqNum = 0;
3001        }
3002      }
3003      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
3004      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
3005      if (selfMove) {
3006        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
3007          r.getRegionInfo().getEncodedName(),
3008          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
3009      }
3010    }
3011    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3012    configurationManager.deregisterObserver(r);
3013    return toReturn != null;
3014  }
3015
3016  /**
   * Protected utility method for safely obtaining an HRegion handle.
3018   * @param regionName Name of online {@link HRegion} to return
3019   * @return {@link HRegion} for <code>regionName</code>
3020   */
3021  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
3022    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3023    return getRegionByEncodedName(regionName, encodedRegionName);
3024  }
3025
3026  public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException {
3027    return getRegionByEncodedName(null, encodedRegionName);
3028  }
3029
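  /**
   * Look up an online region by its encoded name. Never returns null; if the region is not online
   * this throws {@link RegionMovedException} when we have a record of recently moving the region
   * to another server, {@link RegionOpeningException} when an open is still in progress, and
   * {@link NotServingRegionException} otherwise.
   * @param regionName full region name, used only for a readable error message; may be null
   * @param encodedRegionName encoded region name to look up
   */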
3030  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3031    throws NotServingRegionException {
3032    HRegion region = this.onlineRegions.get(encodedRegionName);
3033    if (region == null) {
3034      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3035      if (moveInfo != null) {
3036        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3037      }
3038      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3039      String regionNameStr =
3040        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
3041      if (isOpening != null && isOpening) {
3042        throw new RegionOpeningException(
3043          "Region " + regionNameStr + " is opening on " + this.serverName);
3044      }
3045      throw new NotServingRegionException(
        regionNameStr + " is not online on " + this.serverName);
3047    }
3048    return region;
3049  }
3050
3051  /**
3052   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't
3053   * already.
3054   * @param t   Throwable
3055   * @param msg Message to log in error. Can be null.
3056   * @return Throwable converted to an IOE; methods can only let out IOEs.
3057   */
3058  private Throwable cleanup(final Throwable t, final String msg) {
3059    // Don't log as error if NSRE; NSRE is 'normal' operation.
3060    if (t instanceof NotServingRegionException) {
3061      LOG.debug("NotServingRegionException; " + t.getMessage());
3062      return t;
3063    }
3064    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3065    if (msg == null) {
3066      LOG.error("", e);
3067    } else {
3068      LOG.error(msg, e);
3069    }
3070    if (!rpcServices.checkOOME(t)) {
3071      checkFileSystem();
3072    }
3073    return t;
3074  }
3075
3076  /**
   * @param msg Message to put in the new IOE if the passed <code>t</code> is not already an IOE
   * @return <code>t</code> converted to an IOE if it isn't one already.
3079   */
3080  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3081    return (t instanceof IOException ? (IOException) t
3082      : msg == null || msg.length() == 0 ? new IOException(t)
3083      : new IOException(msg, t));
3084  }
3085
3086  /**
3087   * Checks to see if the file system is still accessible. If not, sets abortRequested and
3088   * stopRequested
3089   * @return false if file system is not available
3090   */
3091  boolean checkFileSystem() {
3092    if (this.dataFsOk && this.dataFs != null) {
3093      try {
3094        FSUtils.checkFileSystemAvailable(this.dataFs);
3095      } catch (IOException e) {
3096        abort("File System not available", e);
3097        this.dataFsOk = false;
3098      }
3099    }
3100    return this.dataFsOk;
3101  }
3102
3103  @Override
3104  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3105    List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3106    Address[] addr = new Address[favoredNodes.size()];
3107    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3108    // it is a map of region name to Address[]
3109    for (int i = 0; i < favoredNodes.size(); i++) {
3110      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort());
3111    }
3112    regionFavoredNodesMap.put(encodedRegionName, addr);
3113  }
3114
3115  /**
3116   * Return the favored nodes for a region given its encoded name. Look at the comment around
3117   * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here.
3118   * @param encodedRegionName the encoded region name.
3119   * @return array of favored locations
3120   */
3121  @Override
3122  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3123    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
3124  }
3125
3126  @Override
3127  public ServerNonceManager getNonceManager() {
3128    return this.nonceManager;
3129  }
3130
3131  private static class MovedRegionInfo {
3132    private final ServerName serverName;
3133    private final long seqNum;
3134
3135    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3136      this.serverName = serverName;
3137      this.seqNum = closeSeqNum;
3138    }
3139
3140    public ServerName getServerName() {
3141      return serverName;
3142    }
3143
3144    public long getSeqNum() {
3145      return seqNum;
3146    }
3147  }
3148
3149  /**
   * We need a timeout. Otherwise there is a risk of giving out wrong information: this would
   * double the number of network calls instead of reducing them.
3152   */
3153  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3154
3155  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
3156    boolean selfMove) {
3157    if (selfMove) {
3158      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3159      return;
3160    }
3161    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid="
3162      + closeSeqNum);
3163    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3164  }
3165
3166  void removeFromMovedRegions(String encodedName) {
3167    movedRegionInfoCache.invalidate(encodedName);
3168  }
3169
3170  @InterfaceAudience.Private
3171  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
3172    return movedRegionInfoCache.getIfPresent(encodedRegionName);
3173  }
3174
3175  @InterfaceAudience.Private
3176  public int movedRegionCacheExpiredTime() {
3177    return TIMEOUT_REGION_MOVED;
3178  }
3179
3180  private String getMyEphemeralNodePath() {
3181    return zooKeeper.getZNodePaths().getRsPath(serverName);
3182  }
3183
3184  private boolean isHealthCheckerConfigured() {
3185    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3186    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
3187  }
3188
  /** Returns the underlying {@link CompactSplit} for this server */
3190  public CompactSplit getCompactSplitThread() {
3191    return this.compactSplitThread;
3192  }
3193
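  /**
   * Dispatch a coprocessor endpoint call registered at the region server scope (as opposed to a
   * per-region endpoint). The request names a service and a method; we look the service up among
   * the registered handlers, decode the request message, invoke the method and return the merged
   * response, rethrowing any controller failure as a {@link ServiceException}.
   */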
3194  CoprocessorServiceResponse execRegionServerService(
3195    @SuppressWarnings("UnusedParameters") final RpcController controller,
3196    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3197    try {
3198      ServerRpcController serviceController = new ServerRpcController();
3199      CoprocessorServiceCall call = serviceRequest.getCall();
3200      String serviceName = call.getServiceName();
3201      Service service = coprocessorServiceHandlers.get(serviceName);
3202      if (service == null) {
        throw new UnknownProtocolException(null,
          "No registered coprocessor service found for " + serviceName);
3205      }
3206      ServiceDescriptor serviceDesc = service.getDescriptorForType();
3207
3208      String methodName = call.getMethodName();
3209      MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3210      if (methodDesc == null) {
        throw new UnknownProtocolException(service.getClass(),
          "Unknown method " + methodName + " called on service " + serviceName);
3213      }
3214
3215      Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3216      final Message.Builder responseBuilder =
3217        service.getResponsePrototype(methodDesc).newBuilderForType();
3218      service.callMethod(methodDesc, serviceController, request, message -> {
3219        if (message != null) {
3220          responseBuilder.mergeFrom(message);
3221        }
3222      });
3223      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3224      if (exception != null) {
3225        throw exception;
3226      }
3227      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3228    } catch (IOException ie) {
3229      throw new ServiceException(ie);
3230    }
3231  }
3232
3233  /**
   * May be empty if this is a master which does not carry tables.
3235   * @return The block cache instance used by the regionserver.
3236   */
3237  @Override
3238  public Optional<BlockCache> getBlockCache() {
3239    return Optional.ofNullable(this.blockCache);
3240  }
3241
3242  /**
   * May be empty if this is a master which does not carry tables.
3244   * @return The cache for mob files used by the regionserver.
3245   */
3246  @Override
3247  public Optional<MobFileCache> getMobFileCache() {
3248    return Optional.ofNullable(this.mobFileCache);
3249  }
3250
  /** Returns the ConfigurationManager object for testing purposes. */
3252  ConfigurationManager getConfigurationManager() {
3253    return configurationManager;
3254  }
3255
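  /**
   * Evict every cached block belonging to the given region's store files from the block cache and
   * report how many blocks were evicted.
   */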
3256  CacheEvictionStats clearRegionBlockCache(Region region) {
3257    long evictedBlocks = 0;
3258
3259    for (Store store : region.getStores()) {
3260      for (StoreFile hFile : store.getStorefiles()) {
3261        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3262      }
3263    }
3264
3265    return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build();
3266  }
3267
3268  @Override
3269  public double getCompactionPressure() {
3270    double max = 0;
3271    for (Region region : onlineRegions.values()) {
3272      for (Store store : region.getStores()) {
3273        double normCount = store.getCompactionPressure();
3274        if (normCount > max) {
3275          max = normCount;
3276        }
3277      }
3278    }
3279    return max;
3280  }
3281
3282  @Override
3283  public HeapMemoryManager getHeapMemoryManager() {
3284    return hMemManager;
3285  }
3286
3287  public MemStoreFlusher getMemStoreFlusher() {
3288    return cacheFlusher;
3289  }
3290
3291  /**
3292   * For testing
3293   * @return whether all wal roll request finished for this regionserver
3294   */
3295  @InterfaceAudience.Private
3296  public boolean walRollRequestFinished() {
3297    return this.walRoller.walRollFinished();
3298  }
3299
3300  @Override
3301  public ThroughputController getFlushThroughputController() {
3302    return flushThroughputController;
3303  }
3304
3305  @Override
3306  public double getFlushPressure() {
3307    if (getRegionServerAccounting() == null || cacheFlusher == null) {
3308      // return 0 during RS initialization
3309      return 0.0;
3310    }
3311    return getRegionServerAccounting().getFlushPressure();
3312  }
3313
3314  @Override
3315  public void onConfigurationChange(Configuration newConf) {
3316    ThroughputController old = this.flushThroughputController;
3317    if (old != null) {
3318      old.stop("configuration change");
3319    }
3320    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3321    try {
3322      Superusers.initialize(newConf);
3323    } catch (IOException e) {
3324      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3325    }
3326
3327    // update region server coprocessor if the configuration has changed.
3328    if (
3329      CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
3330        CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY)
3331    ) {
3332      LOG.info("Update region server coprocessors because the configuration has changed");
3333      this.rsHost = new RegionServerCoprocessorHost(this, newConf);
3334    }
3335  }
3336
3337  @Override
3338  public MetricsRegionServer getMetrics() {
3339    return metricsRegionServer;
3340  }
3341
3342  @Override
3343  public SecureBulkLoadManager getSecureBulkLoadManager() {
3344    return this.secureBulkLoadManager;
3345  }
3346
3347  @Override
3348  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
3349    final Abortable abort) {
3350    final LockServiceClient client =
3351      new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
3352    return client.regionLock(regionInfo, description, abort);
3353  }
3354
3355  @Override
3356  public void unassign(byte[] regionName) throws IOException {
3357    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
3358  }
3359
3360  @Override
3361  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3362    return this.rsSpaceQuotaManager;
3363  }
3364
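  /**
   * Report to the active master that store files of the given table have been archived so space
   * quota accounting can be updated. Best effort: returns false, without throwing, when no master
   * stub is available, when the master is still initializing, or when the report fails.
   */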
3365  @Override
3366  public boolean reportFileArchivalForQuotas(TableName tableName,
3367    Collection<Entry<String, Long>> archivedFiles) {
3368    if (TEST_SKIP_REPORTING_TRANSITION) {
3369      return false;
3370    }
3371    RegionServerStatusService.BlockingInterface rss = rssStub;
3372    if (rss == null || rsSpaceQuotaManager == null) {
3373      // the current server could be stopping.
3374      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3375      return false;
3376    }
3377    try {
3378      RegionServerStatusProtos.FileArchiveNotificationRequest request =
3379        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3380      rss.reportFileArchival(null, request);
3381    } catch (ServiceException se) {
3382      IOException ioe = ProtobufUtil.getRemoteException(se);
3383      if (ioe instanceof PleaseHoldException) {
3384        if (LOG.isTraceEnabled()) {
3385          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3386            + " This will be retried.", ioe);
3387        }
3388        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3389        return false;
3390      }
3391      if (rssStub == rss) {
3392        rssStub = null;
3393      }
3394      // re-create the stub if we failed to report the archival
3395      createRegionServerStatusStub(true);
3396      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3397      return false;
3398    }
3399    return true;
3400  }
3401
3402  void executeProcedure(long procId, RSProcedureCallable callable) {
3403    executorService.submit(new RSProcedureHandler(this, procId, callable));
3404  }
3405
3406  public void remoteProcedureComplete(long procId, Throwable error) {
3407    procedureResultReporter.complete(procId, error);
3408  }
3409
3410  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3411    RegionServerStatusService.BlockingInterface rss;
3412    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3413    for (;;) {
3414      rss = rssStub;
3415      if (rss != null) {
3416        break;
3417      }
3418      createRegionServerStatusStub();
3419    }
3420    try {
3421      rss.reportProcedureDone(null, request);
3422    } catch (ServiceException se) {
3423      if (rssStub == rss) {
3424        rssStub = null;
3425      }
3426      throw ProtobufUtil.getRemoteException(se);
3427    }
3428  }
3429
3430  /**
   * Will ignore the open/close region procedures which were already submitted or executed. When
   * the master had an unfinished open/close region procedure and restarted, the new active master
   * may send a duplicate open/close region request to the regionserver. The open/close request is
   * submitted to a thread pool and executed, so we first need a cache of submitted open/close
   * region procedures. After the open/close region request has been executed and the region
   * transition has been reported successfully, we cache it in the executed region procedures
   * cache; see {@link #finishRegionProcedure(long)}. After the region transition has been reported
   * successfully, the master will not send the open/close region request to the regionserver
   * again. We assume that an ongoing duplicate open/close region request will not be delayed by
   * more than 600 seconds, so the executed region procedures cache expires after 600 seconds. See
   * HBASE-22404 for more details.
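   * <p>
   * Sketch of the intended flow (simplified; the real callers are the open/close region handlers):
   * <pre>{@code
   * if (submitRegionProcedure(procId)) {
   *   // ... execute the open/close work and report the region transition to the master ...
   *   finishRegionProcedure(procId); // a late duplicate request for procId is now ignored
   * }
   * }</pre>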
3441   * @param procId the id of the open/close region procedure
3442   * @return true if the procedure can be submitted.
3443   */
3444  boolean submitRegionProcedure(long procId) {
3445    if (procId == -1) {
3446      return true;
3447    }
3448    // Ignore the region procedures which already submitted.
3449    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3450    if (previous != null) {
3451      LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
3452      return false;
3453    }
3454    // Ignore the region procedures which already executed.
3455    if (executedRegionProcedures.getIfPresent(procId) != null) {
3456      LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
3457      return false;
3458    }
3459    return true;
3460  }
3461
3462  /**
3463   * See {@link #submitRegionProcedure(long)}.
3464   * @param procId the id of the open/close region procedure
3465   */
3466  public void finishRegionProcedure(long procId) {
3467    executedRegionProcedures.put(procId, procId);
3468    submittedRegionProcedures.remove(procId);
3469  }
3470
3471  /**
3472   * Force to terminate region server when abort timeout.
3473   */
3474  private static class SystemExitWhenAbortTimeout extends TimerTask {
3475
3476    public SystemExitWhenAbortTimeout() {
3477    }
3478
3479    @Override
3480    public void run() {
      LOG.warn("Aborting region server timed out; terminating forcibly without waiting for any"
        + " running shutdown hooks or finalizers to finish their work. Thread dump to stdout.");
3484      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3485      Runtime.getRuntime().halt(1);
3486    }
3487  }
3488
3489  @InterfaceAudience.Private
3490  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3491    return compactedFileDischarger;
3492  }
3493
3494  /**
   * Return the pause time configured in
   * {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}
3496   * @return pause time
3497   */
3498  @InterfaceAudience.Private
3499  public long getRetryPauseTime() {
3500    return this.retryPauseTime;
3501  }
3502
3503  @Override
3504  public Optional<ServerName> getActiveMaster() {
3505    return Optional.ofNullable(masterAddressTracker.getMasterAddress());
3506  }
3507
3508  @Override
3509  public List<ServerName> getBackupMasters() {
3510    return masterAddressTracker.getBackupMasters();
3511  }
3512
3513  @Override
3514  public Iterator<ServerName> getBootstrapNodes() {
3515    return bootstrapNodeManager.getBootstrapNodes().iterator();
3516  }
3517
3518  @Override
3519  public List<HRegionLocation> getMetaLocations() {
3520    return metaRegionLocationCache.getMetaRegionLocations();
3521  }
3522
3523  @Override
3524  protected NamedQueueRecorder createNamedQueueRecord() {
3525    final boolean isOnlineLogProviderEnabled = conf.getBoolean(
3526      HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
3527    if (isOnlineLogProviderEnabled) {
3528      return NamedQueueRecorder.getInstance(conf);
3529    } else {
3530      return null;
3531    }
3532  }
3533
3534  @Override
3535  protected boolean clusterMode() {
    // This method is called from the constructor of the super class, so we can not return the
    // masterless field directly here; it has not been initialized yet and would always be false.
3538    return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
3539  }
3540
3541  @InterfaceAudience.Private
3542  public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
3543    return brokenStoreFileCleaner;
3544  }
3545
3546  @InterfaceAudience.Private
3547  public RSMobFileCleanerChore getRSMobFileCleanerChore() {
3548    return rsMobFileCleanerChore;
3549  }
3550
3551  RSSnapshotVerifier getRsSnapshotVerifier() {
3552    return rsSnapshotVerifier;
3553  }
3554
3555  @Override
3556  protected void stopChores() {
3557    shutdownChore(nonceManagerChore);
3558    shutdownChore(compactionChecker);
3559    shutdownChore(compactedFileDischarger);
3560    shutdownChore(periodicFlusher);
3561    shutdownChore(healthCheckChore);
3562    shutdownChore(executorStatusChore);
3563    shutdownChore(storefileRefresher);
3564    shutdownChore(fsUtilizationChore);
3565    shutdownChore(slowLogTableOpsChore);
3566    shutdownChore(brokenStoreFileCleaner);
3567    shutdownChore(rsMobFileCleanerChore);
3568  }
3569
3570  @Override
3571  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
3572    return regionReplicationBufferManager;
3573  }
3574}