/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExecutorStatusChore;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseServerBase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionSize;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
 * are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation" })
public class HRegionServer extends HBaseServerBase<RSRpcServices>
  implements RegionServerServices, LastSequenceId {

  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

  /**
   * For testing only! Set to true to skip notifying region assignment to the master.
   */
  @InterfaceAudience.Private
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  /**
   * A map from RegionName to current action in progress. The boolean value indicates: true if an
   * open-region action is in progress, false if a close-region action is in progress.
   */
  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * Used to cache the open/close region procedures which have already been submitted. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
  /**
   * Used to cache the open/close region procedures which have already been executed. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

  /**
   * Used to cache the moved-out regions.
   */
  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();

  private MemStoreFlusher cacheFlusher;

  private HeapMemoryManager hMemManager;

  // Replication services. If no replication, this handler will be null.
  private ReplicationSourceService replicationSourceHandler;
  private ReplicationSinkService replicationSinkHandler;
  private boolean sameReplicationSourceAndSink;

  // Compactions
  private CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the encoded region name.
   * All access should be synchronized.
   */
  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
  /**
   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
   * need to be a ConcurrentHashMap?
   */
  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API
   * call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and
   * here we really mean DataNode locations. We don't store it as InetSocketAddress here because the
   * conversion on demand from Address to InetSocketAddress will guarantee the resolution results
   * will be fresh when we need it.
   */
  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();

  private LeaseManager leaseManager;

  private volatile boolean dataFsOk;

  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds, to be safe
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Will run this task when the abort times out
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;
  private volatile boolean killed = false;

  private final int threadWakeFrequency;

  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  private RpcClient rpcClient;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  private JvmPauseMonitor pauseMonitor;

  private RSSnapshotVerifier rsSnapshotVerifier;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  private MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;

  /**
   * Check for compaction requests.
   */
  private ScheduledChore compactionChecker;

  /**
   * Check for flushes.
   */
  private ScheduledChore periodicFlusher;

  private volatile WALFactory walFactory;

  private LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // master address tracker
  private final MasterAddressTracker masterAddressTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  private final int shortOperationTimeout;

  // Time to pause if master says 'please hold'
  private final long retryPauseTime;

  private final RegionServerAccounting regionServerAccounting;

  private SlowLogTableOpsChore slowLogTableOpsChore = null;

  // Block cache
  private BlockCache blockCache;
  // The cache for mob files
  private MobFileCache mobFileCache;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The Executor status collect chore. */
  private ExecutorStatusChore executorStatusChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
   */
  @Deprecated
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.regionserver.hostname.disable.master.reversedns";

  /**
   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
   * Exception will be thrown if both are used.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";

  /**
   * Unique identifier for the cluster we are a part of.
   */
  private String clusterId;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private volatile RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
   * case where the client doesn't receive the response from a successful operation and retries. We
   * track the successful ops for some time via a nonce sent by the client and handle duplicate
   * operations (currently, by failing them; in the future we might use MVCC to return the result).
   * Nonces are also recovered from WAL during recovery; however, the caveats (from HBASE-3787) are:
   * <ul>
   * <li>WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth of
   * past records. If we don't read the records, we don't read and recover the nonces. Some WALs
   * within nonce-timeout at recovery may not even be present due to rolling/cleanup.</li>
   * <li>There's no WAL recovery during normal region move, so nonces will not be transferred.</li>
   * </ul>
   * We can have a separate additional "Nonce WAL". It will just contain a bunch of numbers and
   * won't be flushed on the main path - because the WAL itself also contains nonces, if we only
   * flush it before the memstore flush, for a given nonce we will either see it in the WAL (if it
   * was never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
   * log (or both occasionally, which doesn't matter). The nonce log file can be deleted after the
   * latest nonce in it has expired. It can also be recovered during move.
   */
  final ServerNonceManager nonceManager;

  private BrokenStoreFileCleaner brokenStoreFileCleaner;

  private RSMobFileCleanerChore rsMobFileCleanerChore;

  @InterfaceAudience.Private
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  private SecureBulkLoadManager secureBulkLoadManager;

  private FileSystemUtilizationChore fsUtilizationChore;

  private BootstrapNodeManager bootstrapNodeManager;

  /**
   * True if this RegionServer is coming up in a cluster where there is no Master; it then needs to
   * just come up and make do without a Master to talk to: e.g. in test, or when HRegionServer is
   * doing something other than its usual duties, e.g. as a hollowed-out host whose only purpose is
   * as a Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
   * {@link #TEST_SKIP_REPORTING_TRANSITION} ?
   */
  private final boolean masterless;
  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /** regionserver codec list **/
  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";

  // A timer to shut down the process if abort takes too long
  private Timer abortMonitor;

  private RegionReplicationBufferManager regionReplicationBufferManager;

  /**
   * Starts an HRegionServer at the default location.
   * <p/>
   * Don't start any services or managers in the constructor. Defer as much as possible until after
   * we register with the Master. See {@link #startServices}.
   */
  public HRegionServer(final Configuration conf) throws IOException {
    super(conf, "RegionServer"); // thread name
    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
    try (Scope ignored = span.makeCurrent()) {
      this.dataFsOk = true;
      this.masterless = !clusterMode();
      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);

      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);

      regionServerAccounting = new RegionServerAccounting(conf);

      blockCache = BlockCacheFactory.createBlockCache(conf);
      mobFileCache = new MobFileCache(conf);

      rsSnapshotVerifier = new RSSnapshotVerifier(conf);

      uncaughtExceptionHandler =
        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);

      // If no master in cluster, skip trying to track one or look for a cluster status.
      if (!this.masterless) {
        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        masterAddressTracker.start();
      } else {
        masterAddressTracker = null;
      }
      this.rpcServices.start(zooKeeper);
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      TraceUtil.setError(span, t);
      LOG.error("Failed construction RegionServer", t);
      throw t;
    } finally {
      span.end();
    }
  }

  // HMaster should override this method to load the specific config for master
  @Override
  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
      if (!StringUtils.isBlank(hostname)) {
        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
          + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
          + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
          + UNSAFE_RS_HOSTNAME_KEY + " is used";
        throw new IOException(msg);
      } else {
        return rpcServices.getSocketAddress().getHostName();
      }
    } else {
      return hostname;
    }
  }

  @Override
  protected void login(UserProvider user, String host) throws IOException {
    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
  }

  @Override
  protected String getProcessName() {
    return REGIONSERVER;
  }

  @Override
  protected boolean canCreateBaseZNode() {
    return !clusterMode();
  }

  @Override
  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  @Override
  protected boolean cacheTableDescriptor() {
    return false;
  }

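  /**
   * Create the RPC services for this server; subclasses may override to supply a specialized
   * implementation.
   */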
  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  @Override
  protected void configureInfoServer(InfoServer infoServer) {
    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  @Override
  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }

  /**
   * Used by {@link RSDumpServlet} to generate debugging information.
   */
  public void dumpRowLocks(final PrintWriter out) {
    StringBuilder sb = new StringBuilder();
    for (HRegion region : getRegions()) {
      if (region.getLockedRows().size() > 0) {
        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
          sb.setLength(0);
          sb.append(region.getTableDescriptor().getTableName()).append(",")
            .append(region.getRegionInfo().getEncodedName()).append(",");
          sb.append(rowLockContext.toString());
          out.println(sb);
        }
      }
    }
  }

  @Override
  public boolean registerService(Service instance) {
    // No stacking of instances is allowed for a single executorService name
    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor executorService " + serviceName
        + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
    }
    return true;
  }

  /**
   * Run test on configured codecs to make sure supporting libs are in place.
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
    if (codecs == null) {
      return;
    }
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException(
          "Compression codec " + codec + " not supported, aborting RS construction");
      }
    }
  }

  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * All initialization needed before we go register with the Master.<br>
   * Do the bare minimum. Do the bulk of initializations AFTER we've connected to the Master.<br>
   * In here we just put up the RpcServer and set up the Connection and ZooKeeper.
   */
  private void preRegistrationInitialization() {
    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
    try (Scope ignored = span.makeCurrent()) {
      initializeZooKeeper();
      setupClusterConnection();
      bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
      // Setup RPC client for master communication
      this.rpcClient = asyncClusterConnection.getRpcClient();
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Call stop if there is an error, or the process will stick around forever since the
      // server puts up non-daemon threads.
      TraceUtil.setError(span, t);
      this.rpcServices.stop();
      abort("Initialization of RS failed.  Hence aborting RS.", t);
    } finally {
      span.end();
    }
  }

  /**
   * Bring up the connection to the zk ensemble, then wait until a master is available for this
   * cluster, and after that wait until the cluster 'up' flag has been set. This is the order in
   * which the master does things.
   * <p>
   * Finally open the long-living server short-circuit connection.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
      justification = "cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Nothing to do in here if no Master in the mix.
    if (this.masterless) {
      return;
    }

    // Create the master address tracker, register with zk, and start it. Then
    // block until a master is available. No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up. Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // If we are HMaster then the cluster id should have already been set.
    if (clusterId == null) {
      // Retrieve clusterId
      // Since cluster status is now up
      // ID should have already been set by HMaster
      try {
        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
        if (clusterId == null) {
          this.abort("Cluster ID has not been set");
        }
        LOG.info("ClusterId : " + clusterId);
      } catch (KeeperException e) {
        this.abort("Failed to retrieve Cluster ID", e);
      }
    }

    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }

  /**
   * Utility method to wait indefinitely on znode availability while checking whether the region
   * server is shut down.
   * @param tracker znode tracker to use
   * @throws IOException          any IO exception, plus if the RS is stopped
   * @throws InterruptedException if the waiting thread is interrupted
   */
  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
    throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /**
   * @return True if the cluster is up.
   */
  @Override
  public boolean isClusterUp() {
    return this.masterless
      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
  }

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    if (isStopped()) {
      LOG.info("Skipping run; stopped");
      return;
    }
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        installShutdownHook();
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

        // Try and register with the Master; tell it we are here. Break if server is stopped or
        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
        // come up.
        LOG.debug("About to register with Master.");
        TraceUtil.trace(() -> {
          RetryCounterFactory rcf =
            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
          RetryCounter rc = rcf.create();
          while (keepLooping()) {
            RegionServerStartupResponse w = reportForDuty();
            if (w == null) {
              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
              this.sleeper.sleep(sleepTime);
            } else {
              handleReportForDutyResponse(w);
              break;
            }
          }
        }, "HRegionServer.registerWithMaster");
      }

      if (!isStopped() && isHealthy()) {
        TraceUtil.trace(() -> {
          // start the snapshot handler and other procedure handlers,
          // since the server is ready to run
          if (this.rspmHost != null) {
            this.rspmHost.start();
          }
          // Start the Quota Manager
          if (this.rsQuotaManager != null) {
            rsQuotaManager.start(getRpcServer().getScheduler());
          }
          if (this.rsSpaceQuotaManager != null) {
            this.rsSpaceQuotaManager.start();
          }
        }, "HRegionServer.startup");
      }

      // We registered with the Master. Go into run mode.
      long lastMsg = EnvironmentEdgeManager.currentTime();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (onlineRegions.isEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(isAborted());
          } else {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
              // since last time we went around the loop. Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested.get());
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = EnvironmentEdgeManager.currentTime();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = EnvironmentEdgeManager.currentTime();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // while
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }

    final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
    try (Scope ignored = span.makeCurrent()) {
      if (this.leaseManager != null) {
        this.leaseManager.closeAfterLeasesExpire();
      }
      if (this.splitLogWorker != null) {
        splitLogWorker.stop();
      }
      stopInfoServer();
      // Send cache a shutdown.
      if (blockCache != null) {
        blockCache.shutdown();
      }
      if (mobFileCache != null) {
        mobFileCache.shutdown();
      }

      // Send interrupts to wake up threads if sleeping so they notice shutdown.
      // TODO: Should we check they are alive? If OOME could have exited already
      if (this.hMemManager != null) {
        this.hMemManager.stop();
      }
      if (this.cacheFlusher != null) {
        this.cacheFlusher.interruptIfNecessary();
      }
      if (this.compactSplitThread != null) {
        this.compactSplitThread.interruptIfNecessary();
      }

      // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
      if (rspmHost != null) {
        rspmHost.stop(this.abortRequested.get() || this.killed);
      }

      if (this.killed) {
        // Just skip out w/o closing regions. Used when testing.
      } else if (abortRequested.get()) {
        if (this.dataFsOk) {
          closeUserRegions(abortRequested.get()); // Don't leave any open file handles
        }
        LOG.info("aborting server " + this.serverName);
      } else {
        closeUserRegions(abortRequested.get());
        LOG.info("stopping server " + this.serverName);
      }
      regionReplicationBufferManager.stop();
      closeClusterConnection();
      // Closing the compactSplit thread before closing meta regions
      if (!this.killed && containsMetaTableRegions()) {
        if (!abortRequested.get() || this.dataFsOk) {
          if (this.compactSplitThread != null) {
            this.compactSplitThread.join();
            this.compactSplitThread = null;
          }
          closeMetaTableRegions(abortRequested.get());
        }
      }

      if (!this.killed && this.dataFsOk) {
        waitOnAllRegionsToClose(abortRequested.get());
        LOG.info("stopping server " + this.serverName + "; all regions closed.");
      }

      // Stop the quota manager
      if (rsQuotaManager != null) {
        rsQuotaManager.stop();
      }
      if (rsSpaceQuotaManager != null) {
        rsSpaceQuotaManager.stop();
        rsSpaceQuotaManager = null;
      }

      // flag may be changed when closing regions throws exception.
      if (this.dataFsOk) {
        shutdownWAL(!abortRequested.get());
      }

      // Make sure the proxy is down.
      if (this.rssStub != null) {
        this.rssStub = null;
      }
      if (this.lockStub != null) {
        this.lockStub = null;
      }
      if (this.rpcClient != null) {
        this.rpcClient.close();
      }
      if (this.leaseManager != null) {
        this.leaseManager.close();
      }
      if (this.pauseMonitor != null) {
        this.pauseMonitor.stop();
      }

      if (!killed) {
        stopServiceThreads();
      }

      if (this.rpcServices != null) {
        this.rpcServices.stop();
      }

      try {
        deleteMyEphemeralNode();
      } catch (KeeperException.NoNodeException nn) {
        // pass
      } catch (KeeperException e) {
        LOG.warn("Failed deleting my ephemeral node", e);
      }
      // We may have failed to delete the znode at the previous step, but
      // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
      ZNodeClearer.deleteMyEphemeralNodeOnDisk();

      closeZooKeeper();
      closeTableDescriptors();
      LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
      span.setStatus(StatusCode.OK);
    } finally {
      span.end();
    }
  }

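  /** Returns true if this server is currently carrying the first hbase:meta region. */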
  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  private boolean areAllUserRegionsOffline() {
    if (getNumberOfOnlineRegions() > 2) {
      return false;
    }
    boolean allUserRegionsOffline = true;
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      if (!e.getValue().getRegionInfo().isMetaRegion()) {
        allUserRegionsOffline = false;
        break;
      }
    }
    return allUserRegionsOffline;
  }

  /**
   * @return Current write count for all online regions.
   */
  private long getWriteRequestCount() {
    long writeCount = 0;
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      writeCount += e.getValue().getWriteRequestsCount();
    }
    return writeCount;
  }

  @InterfaceAudience.Private
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
    throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport");
    try (Scope ignored = span.makeCurrent()) {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      request.setServer(ProtobufUtil.toServerName(this.serverName));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
      span.setStatus(StatusCode.OK);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        TraceUtil.setError(span, ioe);
        throw ioe;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      TraceUtil.setError(span, se);
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub(true);
    } finally {
      span.end();
    }
  }

  /**
   * Reports the given map of Regions and their size on the filesystem to the active Master.
   * @param regionSizeStore The store containing region sizes
   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
   */
  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      LOG.trace("Skipping Region size report to HMaster as stub is null");
      return true;
    }
    try {
      buildReportAndSend(rss, regionSizeStore);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        LOG.trace("Failed to report region sizes to Master because it is initializing."
          + " This will be retried.", ioe);
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return true;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      createRegionServerStatusStub(true);
      if (ioe instanceof DoNotRetryIOException) {
        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
        if (doNotRetryEx.getCause() != null) {
          Throwable t = doNotRetryEx.getCause();
          if (t instanceof UnsupportedOperationException) {
            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
            return false;
          }
        }
      }
      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
    }
    return true;
  }

  /**
   * Builds the region size report and sends it to the master. Upon successful sending of the
   * report, the region sizes that were sent are marked as sent.
   * @param rss             The stub to send to the Master
   * @param regionSizeStore The store containing region sizes
   */
  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
    RegionSizeStore regionSizeStore) throws ServiceException {
    RegionSpaceUseReportRequest request =
      buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
    rss.reportRegionSpaceUse(null, request);
    // Record the number of size reports sent
    if (metricsRegionServer != null) {
      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
    }
  }

  /**
   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
   * @param regionSizes The size in bytes of regions
   * @return The corresponding protocol buffer message.
   */
  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
    }
    return request.build();
  }

  /**
   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf
   * message.
   * @param regionInfo  The RegionInfo
   * @param sizeInBytes The size in bytes of the Region
   * @return The protocol buffer
   */
  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
    return RegionSpaceUse.newBuilder()
      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
      .setRegionSize(Objects.requireNonNull(sizeInBytes)).build();
  }

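  /**
   * Build the {@link ClusterStatusProtos.ServerLoad} message sent to the Master in the periodic
   * regionserver report; see {@link #tryRegionServerReport(long, long)}.
   */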
  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
    throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second and other metrics. As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heartbeat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved; additionally the load balancer will be able to take advantage of a more complete
    // history.
    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    long usedMemory = -1L;
    long maxMemory = -1L;
    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
      usedMemory = usage.getUsed();
      maxMemory = usage.getMax();
    }

    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    Builder coprocessorBuilder = Coprocessor.newBuilder();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
    }
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (HRegion region : regions) {
      if (region.getCoprocessorHost() != null) {
        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
        for (String regionCoprocessor : regionCoprocessors) {
          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
        }
      }
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
        .getCoprocessors()) {
        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
      }
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }
    MetricsUserAggregateSource userSource =
      metricsRegionServer.getMetricsUserAggregate().getSource();
    if (userSource != null) {
      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
      }
    }

    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
      // always refresh first to get the latest value
      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
      if (rLoad != null) {
        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
          .getReplicationLoadSourceEntries()) {
          serverLoad.addReplLoadSource(rLS);
        }
      }
    } else {
      if (replicationSourceHandler != null) {
        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
            .getReplicationLoadSourceEntries()) {
            serverLoad.addReplLoadSource(rLS);
          }
        }
      }
      if (replicationSinkHandler != null) {
        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        }
      }
    }

    TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask
      .newBuilder().setDescription(task.getDescription())
      .setStatus(task.getStatus() != null ? task.getStatus() : "")
      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build()));

    return serverLoad.build();
  }

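  /** Returns a comma-separated list of the encoded names of the currently online regions. */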
1238  private String getOnlineRegionsAsPrintableString() {
1239    StringBuilder sb = new StringBuilder();
1240    for (Region r : this.onlineRegions.values()) {
1241      if (sb.length() > 0) {
1242        sb.append(", ");
1243      }
1244      sb.append(r.getRegionInfo().getEncodedName());
1245    }
1246    return sb.toString();
1247  }
1248
1249  /**
1250   * Wait on regions close.
1251   */
1252  private void waitOnAllRegionsToClose(final boolean abort) {
1253    // Wait till all regions are closed before going out.
1254    int lastCount = -1;
1255    long previousLogTime = 0;
1256    Set<String> closedRegions = new HashSet<>();
1257    boolean interrupted = false;
1258    try {
1259      while (!onlineRegions.isEmpty()) {
1260        int count = getNumberOfOnlineRegions();
1261        // Only print a message if the count of regions has changed.
1262        if (count != lastCount) {
1263          // Log every second at most
1264          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
1265            previousLogTime = EnvironmentEdgeManager.currentTime();
1266            lastCount = count;
1267            LOG.info("Waiting on " + count + " regions to close");
            // Only print out the regions still closing if there are just a few,
            // else we will swamp the log.
1270            if (count < 10 && LOG.isDebugEnabled()) {
1271              LOG.debug("Online Regions=" + this.onlineRegions);
1272            }
1273          }
1274        }
        // Ensure all user regions have been sent a close. Use this to
        // protect against the case where an open comes in after we start
        // iterating over onlineRegions to close all user regions.
1278        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1279          RegionInfo hri = e.getValue().getRegionInfo();
1280          if (
1281            !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1282              && !closedRegions.contains(hri.getEncodedName())
1283          ) {
1284            closedRegions.add(hri.getEncodedName());
1285            // Don't update zk with this close transition; pass false.
1286            closeRegionIgnoreErrors(hri, abort);
1287          }
1288        }
1289        // No regions in RIT, we could stop waiting now.
1290        if (this.regionsInTransitionInRS.isEmpty()) {
1291          if (!onlineRegions.isEmpty()) {
1292            LOG.info("We were exiting though online regions are not empty,"
1293              + " because some regions failed closing");
1294          }
1295          break;
1296        } else {
1297          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
1298            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1299        }
1300        if (sleepInterrupted(200)) {
1301          interrupted = true;
1302        }
1303      }
1304    } finally {
1305      if (interrupted) {
1306        Thread.currentThread().interrupt();
1307      }
1308    }
1309  }
1310
1311  private static boolean sleepInterrupted(long millis) {
1312    boolean interrupted = false;
1313    try {
1314      Thread.sleep(millis);
1315    } catch (InterruptedException e) {
1316      LOG.warn("Interrupted while sleeping");
1317      interrupted = true;
1318    }
1319    return interrupted;
1320  }
1321
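  // Note: walFactory.close() fully closes the WALs (clean shutdown), while walFactory.shutdown()
  // stops them but leaves the WAL files in place so they can be split/replayed after an abort
  // (see WALFactory for the exact semantics).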
1322  private void shutdownWAL(final boolean close) {
1323    if (this.walFactory != null) {
1324      try {
1325        if (close) {
1326          walFactory.close();
1327        } else {
1328          walFactory.shutdown();
1329        }
1330      } catch (Throwable e) {
1331        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1332        LOG.error("Shutdown / close of WAL failed: " + e);
1333        LOG.debug("Shutdown / close exception details:", e);
1334      }
1335    }
1336  }
1337
1338  /**
   * Run init. Sets up the WAL and starts up all server threads.
1340   * @param c Extra configuration.
1341   */
1342  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1343    throws IOException {
1344    try {
1345      boolean updateRootDir = false;
1346      for (NameStringPair e : c.getMapEntriesList()) {
1347        String key = e.getName();
1348        // The hostname the master sees us as.
1349        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1350          String hostnameFromMasterPOV = e.getValue();
1351          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1352            rpcServices.getSocketAddress().getPort(), this.startcode);
1353          if (
1354            !StringUtils.isBlank(useThisHostnameInstead)
1355              && !hostnameFromMasterPOV.equals(useThisHostnameInstead)
1356          ) {
1357            String msg = "Master passed us a different hostname to use; was="
1358              + this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1359            LOG.error(msg);
1360            throw new IOException(msg);
1361          }
1362          if (
1363            StringUtils.isBlank(useThisHostnameInstead)
1364              && !hostnameFromMasterPOV.equals(rpcServices.getSocketAddress().getHostName())
1365          ) {
1366            String msg = "Master passed us a different hostname to use; was="
1367              + rpcServices.getSocketAddress().getHostName() + ", but now=" + hostnameFromMasterPOV;
1368            LOG.error(msg);
1369          }
1370          continue;
1371        }
1372
1373        String value = e.getValue();
1374        if (key.equals(HConstants.HBASE_DIR)) {
1375          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1376            updateRootDir = true;
1377          }
1378        }
1379
1380        if (LOG.isDebugEnabled()) {
1381          LOG.debug("Config from master: " + key + "=" + value);
1382        }
1383        this.conf.set(key, value);
1384      }
1385      // Set our ephemeral znode up in zookeeper now we have a name.
1386      createMyEphemeralNode();
1387
1388      if (updateRootDir) {
1389        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1390        initializeFileSystem();
1391      }
1392
1393      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
1394      // config param for task trackers, but we can piggyback off of it.
1395      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1396        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1397      }
1398
      // Save it in a file; this will allow us to see if we crashed.
1400      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1401
1402      // This call sets up an initialized replication and WAL. Later we start it up.
1403      setupWALAndReplication();
      // Init here rather than in the constructor, after the thread name has been set.
1405      final MetricsTable metricsTable =
1406        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1407      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1408      this.metricsRegionServer =
1409        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
1410      // Now that we have a metrics source, start the pause monitor
1411      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1412      pauseMonitor.start();
1413
1414      // There is a rare case where we do NOT want services to start. Check config.
1415      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1416        startServices();
1417      }
      // Here we start up the replication service. Above we initialized it. TODO: reconcile
      // the two steps, or make sense of the split.
1420      startReplicationService();
1421
      // Log our identity: server name, RPC address, and ZooKeeper session id.
1423      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress()
1424        + ", sessionid=0x"
1425        + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1426
      // Wake up anyone waiting for this server to come online.
1428      synchronized (online) {
1429        online.set(true);
1430        online.notifyAll();
1431      }
1432    } catch (Throwable e) {
1433      stop("Failed initialization");
1434      throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed");
1435    } finally {
1436      sleeper.skipSleepCycle();
1437    }
1438  }
1439
1440  private void startHeapMemoryManager() {
1441    if (this.blockCache != null) {
1442      this.hMemManager =
1443        new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1444      this.hMemManager.start(getChoreService());
1445    }
1446  }
1447
1448  private void createMyEphemeralNode() throws KeeperException {
1449    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1450    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1451    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1452    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
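    // The znode payload is the PB-magic-prefixed RegionServerInfo (info port plus version info).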
1453    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1454  }
1455
1456  private void deleteMyEphemeralNode() throws KeeperException {
1457    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1458  }
1459
1460  @Override
1461  public RegionServerAccounting getRegionServerAccounting() {
1462    return regionServerAccounting;
1463  }
1464
  // Round the size to KB or MB.
  // A trick here is that if sizeInByte is less than sizeUnit but not 0, we round the size to 1
  // instead of 0, to avoid some schedulers thinking the region has no data. See
  // HBASE-26340 for more details on why this is important.
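  // For example: roundSize(0, 1024) -> 0, roundSize(500, 1024) -> 1, roundSize(3 * 1024, 1024) -> 3.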
1469  private static int roundSize(long sizeInByte, int sizeUnit) {
1470    if (sizeInByte == 0) {
1471      return 0;
1472    } else if (sizeInByte < sizeUnit) {
1473      return 1;
1474    } else {
1475      return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE);
1476    }
1477  }
1478
1479  /**
1480   * @param r               Region to get RegionLoad for.
1481   * @param regionLoadBldr  the RegionLoad.Builder, can be null
1482   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1483   * @return RegionLoad instance.
1484   */
1485  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1486    RegionSpecifier.Builder regionSpecifier) throws IOException {
1487    byte[] name = r.getRegionInfo().getRegionName();
1488    int stores = 0;
1489    int storefiles = 0;
1490    int storeRefCount = 0;
1491    int maxCompactedStoreFileRefCount = 0;
1492    long storeUncompressedSize = 0L;
1493    long storefileSize = 0L;
1494    long storefileIndexSize = 0L;
1495    long rootLevelIndexSize = 0L;
1496    long totalStaticIndexSize = 0L;
1497    long totalStaticBloomSize = 0L;
1498    long totalCompactingKVs = 0L;
1499    long currentCompactedKVs = 0L;
1500    List<HStore> storeList = r.getStores();
1501    stores += storeList.size();
1502    for (HStore store : storeList) {
1503      storefiles += store.getStorefilesCount();
1504      int currentStoreRefCount = store.getStoreRefCount();
1505      storeRefCount += currentStoreRefCount;
1506      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1507      maxCompactedStoreFileRefCount =
1508        Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount);
1509      storeUncompressedSize += store.getStoreSizeUncompressed();
1510      storefileSize += store.getStorefilesSize();
      // TODO: is storefileIndexSizeKB the same as rootLevelIndexSizeKB?
1512      storefileIndexSize += store.getStorefilesRootLevelIndexSize();
1513      CompactionProgress progress = store.getCompactionProgress();
1514      if (progress != null) {
1515        totalCompactingKVs += progress.getTotalCompactingKVs();
1516        currentCompactedKVs += progress.currentCompactedKVs;
1517      }
1518      rootLevelIndexSize += store.getStorefilesRootLevelIndexSize();
1519      totalStaticIndexSize += store.getTotalStaticIndexSize();
1520      totalStaticBloomSize += store.getTotalStaticBloomSize();
1521    }
1522
1523    int unitMB = 1024 * 1024;
1524    int unitKB = 1024;
1525
1526    int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB);
1527    int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB);
1528    int storefileSizeMB = roundSize(storefileSize, unitMB);
1529    int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB);
1530    int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB);
1531    int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB);
1532    int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB);
1533
1534    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
1535    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
1536    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
1537    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
1538    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
1539    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
1540    if (regionLoadBldr == null) {
1541      regionLoadBldr = RegionLoad.newBuilder();
1542    }
1543    if (regionSpecifier == null) {
1544      regionSpecifier = RegionSpecifier.newBuilder();
1545    }
1546
1547    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1548    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1549    regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores)
1550      .setStorefiles(storefiles).setStoreRefCount(storeRefCount)
1551      .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1552      .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB)
1553      .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB)
1554      .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1555      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1556      .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount())
1557      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1558      .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs)
1559      .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality)
1560      .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight)
1561      .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight)
1562      .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
1563      .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1564    r.setCompleteSequenceId(regionLoadBldr);
1565    return regionLoadBldr.build();
1566  }
1567
1568  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1569    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1570    userLoadBldr.setUserName(user);
1571    userSource.getClientMetrics().values().stream()
1572      .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1573        .setHostName(clientMetrics.getHostName())
1574        .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1575        .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1576        .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
1577      .forEach(userLoadBldr::addClientMetrics);
1578    return userLoadBldr.build();
1579  }
1580
1581  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1582    HRegion r = onlineRegions.get(encodedRegionName);
1583    return r != null ? createRegionLoad(r, null, null) : null;
1584  }
1585
1586  /**
   * Inner class that periodically checks whether regions need compaction.
1588   */
1589  private static class CompactionChecker extends ScheduledChore {
1590    private final HRegionServer instance;
1591    private final int majorCompactPriority;
1592    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1593    // Iteration is 1-based rather than 0-based so we don't check for compaction
1594    // immediately upon region server startup
1595    private long iteration = 1;
1596
1597    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1598      super("CompactionChecker", stopper, sleepTime);
1599      this.instance = h;
1600      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1601
1602      /*
1603       * MajorCompactPriority is configurable. If not set, the compaction will use default priority.
1604       */
1605      this.majorCompactPriority = this.instance.conf
1606        .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY);
1607    }
1608
1609    @Override
1610    protected void chore() {
1611      for (Region r : this.instance.onlineRegions.values()) {
1612        // Skip compaction if region is read only
1613        if (r == null || r.isReadOnly()) {
1614          continue;
1615        }
1616
1617        HRegion hr = (HRegion) r;
1618        for (HStore s : hr.stores.values()) {
1619          try {
1620            long multiplier = s.getCompactionCheckMultiplier();
1621            assert multiplier > 0;
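            // Only check this store for compaction every 'multiplier' iterations of the chore.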
1622            if (iteration % multiplier != 0) {
1623              continue;
1624            }
1625            if (s.needsCompaction()) {
1626              // Queue a compaction. Will recognize if major is needed.
1627              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1628                getName() + " requests compaction");
1629            } else if (s.shouldPerformMajorCompaction()) {
1630              s.triggerMajorCompaction();
1631              if (
1632                majorCompactPriority == DEFAULT_PRIORITY
1633                  || majorCompactPriority > hr.getCompactPriority()
1634              ) {
1635                this.instance.compactSplitThread.requestCompaction(hr, s,
1636                  getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
1637                  CompactionLifeCycleTracker.DUMMY, null);
1638              } else {
1639                this.instance.compactSplitThread.requestCompaction(hr, s,
1640                  getName() + " requests major compaction; use configured priority",
1641                  this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1642              }
1643            }
1644          } catch (IOException e) {
1645            LOG.warn("Failed major compaction check on " + r, e);
1646          }
1647        }
1648      }
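      // Advance the iteration counter, wrapping around to 0 at Long.MAX_VALUE to avoid overflow.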
1649      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1650    }
1651  }
1652
1653  private static class PeriodicMemStoreFlusher extends ScheduledChore {
1654    private final HRegionServer server;
1655    private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
1656    private final static int MIN_DELAY_TIME = 0; // millisec
1657    private final long rangeOfDelayMs;
1658
1659    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1660      super("MemstoreFlusherChore", server, cacheFlushInterval);
1661      this.server = server;
1662
1663      final long configuredRangeOfDelay = server.getConfiguration()
1664        .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
1665      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
1666    }
1667
1668    @Override
1669    protected void chore() {
1670      final StringBuilder whyFlush = new StringBuilder();
1671      for (HRegion r : this.server.onlineRegions.values()) {
1672        if (r == null) {
1673          continue;
1674        }
1675        if (r.shouldFlush(whyFlush)) {
1676          FlushRequester requester = server.getFlushRequester();
1677          if (requester != null) {
1678            long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME;
1679            // Throttle the flushes by putting a delay. If we don't throttle, and there
1680            // is a balanced write-load on the regions in a table, we might end up
1681            // overwhelming the filesystem with too many flushes at once.
1682            if (requester.requestDelayedFlush(r, delay)) {
1683              LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(),
1684                r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay);
1685            }
1686          }
1687        }
1688      }
1689    }
1690  }
1691
1692  /**
   * Report the status of the server. A server is online once all of the startup has completed
   * (setting up the filesystem, starting executorService threads, etc.). This method is designed
   * mostly to be useful in tests.
1696   * @return true if online, false if not.
1697   */
1698  public boolean isOnline() {
1699    return online.get();
1700  }
1701
1702  /**
   * Set up the WAL and replication if enabled. Replication setup is done here because it needs to
   * be hooked up to the WAL.
1705   */
1706  private void setupWALAndReplication() throws IOException {
1707    WALFactory factory = new WALFactory(conf, serverName.toString(), this, true);
1708    // TODO Replication make assumptions here based on the default filesystem impl
1709    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1710    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1711
1712    Path logDir = new Path(walRootDir, logName);
1713    LOG.debug("logDir={}", logDir);
1714    if (this.walFs.exists(logDir)) {
1715      throw new RegionServerRunningException(
1716        "Region server has already created directory at " + this.serverName.toString());
1717    }
    // Always create the WAL directory, as the master now needs it when it restarts to find out
    // the live region servers.
1720    if (!this.walFs.mkdirs(logDir)) {
1721      throw new IOException("Can not create wal directory " + logDir);
1722    }
1723    // Instantiate replication if replication enabled. Pass it the log directories.
1724    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
1725    this.walFactory = factory;
1726  }
1727
1728  /**
1729   * Start up replication source and sink handlers.
1730   */
1731  private void startReplicationService() throws IOException {
1732    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
1733      this.replicationSourceHandler.startReplicationService();
1734    } else {
1735      if (this.replicationSourceHandler != null) {
1736        this.replicationSourceHandler.startReplicationService();
1737      }
1738      if (this.replicationSinkHandler != null) {
1739        this.replicationSinkHandler.startReplicationService();
1740      }
1741    }
1742  }
1743
1744  /**
1745   * @return Master address tracker instance.
1746   */
1747  public MasterAddressTracker getMasterAddressTracker() {
1748    return this.masterAddressTracker;
1749  }
1750
1751  /**
   * Start maintenance threads: Server, Worker and lease checker threads; all threads we need to
   * run. This is called after we've successfully registered with the Master. Install an
   * UncaughtExceptionHandler that calls abort on the RegionServer if we get an unhandled
   * exception. We cannot set the handler on all threads; the Server's internal Listener thread is
   * off limits. For the Server, on an OOME it waits a while then retries; meantime, a flush or a
   * compaction that tries to run should trigger the same critical condition and the shutdown will
   * run. On its way out, this server will shut down the Server. Leases are sort of in between:
   * although the lease checker inherits from Chore, it keeps its own internal stop mechanism, so
   * it needs to be stopped by this hosting server. The Worker logs the exception and exits.
1761   */
1762  private void startServices() throws IOException {
1763    if (!isStopped() && !isAborted()) {
1764      initializeThreads();
1765    }
1766    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
1767    this.secureBulkLoadManager.start();
1768
1769    // Health checker thread.
1770    if (isHealthCheckerConfigured()) {
1771      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
1772        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1773      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1774    }
1775    // Executor status collect thread.
1776    if (
1777      this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
1778        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)
1779    ) {
1780      int sleepTime =
1781        this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ);
1782      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
1783        this.metricsRegionServer.getMetricsSource());
1784    }
1785
1786    this.walRoller = new LogRoller(this);
1787    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1788    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
1789
1790    // Create the CompactedFileDischarger chore executorService. This chore helps to
1791    // remove the compacted files that will no longer be used in reads.
1792    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
1793    // 2 mins so that compacted files can be archived before the TTLCleaner runs
1794    int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
1795    this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this);
1796    choreService.scheduleChore(compactedFileDischarger);
1797
1798    // Start executor services
1799    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
1800    executorService.startExecutorService(executorService.new ExecutorConfig()
1801      .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
1802    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
1803    executorService.startExecutorService(executorService.new ExecutorConfig()
1804      .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
1805    final int openPriorityRegionThreads =
1806      conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
1807    executorService.startExecutorService(
1808      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION)
1809        .setCorePoolSize(openPriorityRegionThreads));
1810    final int closeRegionThreads =
1811      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
1812    executorService.startExecutorService(executorService.new ExecutorConfig()
1813      .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
1814    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
1815    executorService.startExecutorService(executorService.new ExecutorConfig()
1816      .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
1817    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1818      final int storeScannerParallelSeekThreads =
1819        conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
1820      executorService.startExecutorService(
1821        executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK)
1822          .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true));
1823    }
1824    final int logReplayOpsThreads =
1825      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
1826    executorService.startExecutorService(
1827      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS)
1828        .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
1829    // Start the threads for compacted files discharger
1830    final int compactionDischargerThreads =
1831      conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
1832    executorService.startExecutorService(executorService.new ExecutorConfig()
1833      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)
1834      .setCorePoolSize(compactionDischargerThreads));
1835    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1836      final int regionReplicaFlushThreads =
1837        conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1838          conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1839      executorService.startExecutorService(executorService.new ExecutorConfig()
1840        .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS)
1841        .setCorePoolSize(regionReplicaFlushThreads));
1842    }
1843    final int refreshPeerThreads =
1844      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
1845    executorService.startExecutorService(executorService.new ExecutorConfig()
1846      .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
1847    final int replaySyncReplicationWALThreads =
1848      conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
1849    executorService.startExecutorService(executorService.new ExecutorConfig()
1850      .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL)
1851      .setCorePoolSize(replaySyncReplicationWALThreads));
1852    final int switchRpcThrottleThreads =
1853      conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
1854    executorService.startExecutorService(
1855      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE)
1856        .setCorePoolSize(switchRpcThrottleThreads));
1857    final int claimReplicationQueueThreads =
1858      conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
1859    executorService.startExecutorService(
1860      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE)
1861        .setCorePoolSize(claimReplicationQueueThreads));
1862    final int rsSnapshotOperationThreads =
1863      conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
1864    executorService.startExecutorService(
1865      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
1866        .setCorePoolSize(rsSnapshotOperationThreads));
1867
1868    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
1869      uncaughtExceptionHandler);
1870    if (this.cacheFlusher != null) {
1871      this.cacheFlusher.start(uncaughtExceptionHandler);
1872    }
1873    Threads.setDaemonThreadRunning(this.procedureResultReporter,
1874      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
1875
1876    if (this.compactionChecker != null) {
1877      choreService.scheduleChore(compactionChecker);
1878    }
1879    if (this.periodicFlusher != null) {
1880      choreService.scheduleChore(periodicFlusher);
1881    }
1882    if (this.healthCheckChore != null) {
1883      choreService.scheduleChore(healthCheckChore);
1884    }
1885    if (this.executorStatusChore != null) {
1886      choreService.scheduleChore(executorStatusChore);
1887    }
1888    if (this.nonceManagerChore != null) {
1889      choreService.scheduleChore(nonceManagerChore);
1890    }
1891    if (this.storefileRefresher != null) {
1892      choreService.scheduleChore(storefileRefresher);
1893    }
1894    if (this.fsUtilizationChore != null) {
1895      choreService.scheduleChore(fsUtilizationChore);
1896    }
1897    if (this.slowLogTableOpsChore != null) {
1898      choreService.scheduleChore(slowLogTableOpsChore);
1899    }
1900    if (this.brokenStoreFileCleaner != null) {
1901      choreService.scheduleChore(brokenStoreFileCleaner);
1902    }
1903
1904    if (this.rsMobFileCleanerChore != null) {
1905      choreService.scheduleChore(rsMobFileCleanerChore);
1906    }
1907
    // The lease manager is not a Thread. Internally it runs a daemon thread. If it gets
    // an unhandled exception, it will just exit.
1910    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
1911      uncaughtExceptionHandler);
1912
    // Create the log splitting worker and start it.
    // Set a smaller retry count to fail fast, otherwise the SplitLogWorker could be blocked for
    // quite a while inside the Connection layer. The worker wouldn't be available for other
    // tasks even after the current task is preempted when a split task times out.
1917    Configuration sinkConf = HBaseConfiguration.create(conf);
1918    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1919      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1920    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1921      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1922    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
1923    if (
1924      this.csm != null
1925        && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
1926    ) {
1927      // SplitLogWorker needs csm. If none, don't start this.
1928      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
1929      splitLogWorker.start();
1930      LOG.debug("SplitLogWorker started");
1931    }
1932
1933    // Memstore services.
1934    startHeapMemoryManager();
1935    // Call it after starting HeapMemoryManager.
1936    initializeMemStoreChunkCreator(hMemManager);
1937  }
1938
1939  private void initializeThreads() {
1940    // Cache flushing thread.
1941    this.cacheFlusher = new MemStoreFlusher(conf, this);
1942
1943    // Compaction thread
1944    this.compactSplitThread = new CompactSplit(this);
1945
    // Background thread to check for compactions; needed if a region has not gotten updates
    // in a while. It will take care of not checking too frequently, on a store-by-store basis.
1948    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
1949    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
1950    this.leaseManager = new LeaseManager(this.threadWakeFrequency);
1951
1952    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
1953      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
1954    if (isSlowLogTableEnabled) {
1955      // default chore duration: 10 min
1956      final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
1957      slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
1958    }
1959
1960    if (this.nonceManager != null) {
1961      // Create the scheduled chore that cleans up nonces.
1962      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
1963    }
1964
1965    // Setup the Quota Manager
1966    rsQuotaManager = new RegionServerRpcQuotaManager(this);
1967    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
1968
1969    if (QuotaUtil.isQuotaEnabled(conf)) {
1970      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
1971    }
1972
1973    boolean onlyMetaRefresh = false;
1974    int storefileRefreshPeriod =
1975      conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
1976        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
1977    if (storefileRefreshPeriod == 0) {
1978      storefileRefreshPeriod =
1979        conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
1980          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
1981      onlyMetaRefresh = true;
1982    }
1983    if (storefileRefreshPeriod > 0) {
1984      this.storefileRefresher =
1985        new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this);
1986    }
1987
1988    int brokenStoreFileCleanerPeriod =
1989      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
1990        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
1991    int brokenStoreFileCleanerDelay =
1992      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
1993        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
1994    double brokenStoreFileCleanerDelayJitter =
1995      conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
1996        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
1997    double jitterRate =
1998      (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
1999    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
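    // jitterRate falls in [-jitter/2, +jitter/2), so the effective initial delay is
    // brokenStoreFileCleanerDelay * (1 + jitterRate).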
2000    this.brokenStoreFileCleaner =
2001      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
2002        brokenStoreFileCleanerPeriod, this, conf, this);
2003
2004    this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
2005
2006    registerConfigurationObservers();
2007  }
2008
2009  private void registerConfigurationObservers() {
2010    // Registering the compactSplitThread object with the ConfigurationManager.
2011    configurationManager.registerObserver(this.compactSplitThread);
2012    configurationManager.registerObserver(this.rpcServices);
2013    configurationManager.registerObserver(this);
2014  }
2015
2016  /*
2017   * Verify that server is healthy
2018   */
2019  private boolean isHealthy() {
2020    if (!dataFsOk) {
2021      // File system problem
2022      return false;
2023    }
2024    // Verify that all threads are alive
2025    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
2026      && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2027      && (this.walRoller == null || this.walRoller.isAlive())
2028      && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2029      && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2030    if (!healthy) {
2031      stop("One or more threads are no longer alive -- stop");
2032    }
2033    return healthy;
2034  }
2035
2036  @Override
2037  public List<WAL> getWALs() {
2038    return walFactory.getWALs();
2039  }
2040
2041  @Override
2042  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2043    WAL wal = walFactory.getWAL(regionInfo);
2044    if (this.walRoller != null) {
2045      this.walRoller.addWAL(wal);
2046    }
2047    return wal;
2048  }
2049
2050  public LogRoller getWalRoller() {
2051    return walRoller;
2052  }
2053
2054  WALFactory getWalFactory() {
2055    return walFactory;
2056  }
2057
2058  @Override
2059  public void stop(final String msg) {
2060    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2061  }
2062
2063  /**
2064   * Stops the regionserver.
2065   * @param msg   Status message
2066   * @param force True if this is a regionserver abort
2067   * @param user  The user executing the stop request, or null if no user is associated
2068   */
2069  public void stop(final String msg, final boolean force, final User user) {
2070    if (!this.stopped) {
2071      LOG.info("***** STOPPING region server '" + this + "' *****");
2072      if (this.rsHost != null) {
2073        // when forced via abort don't allow CPs to override
2074        try {
2075          this.rsHost.preStop(msg, user);
2076        } catch (IOException ioe) {
2077          if (!force) {
2078            LOG.warn("The region server did not stop", ioe);
2079            return;
2080          }
2081          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2082        }
2083      }
2084      this.stopped = true;
2085      LOG.info("STOPPED: " + msg);
2086      // Wakes run() if it is sleeping
2087      sleeper.skipSleepCycle();
2088    }
2089  }
2090
2091  public void waitForServerOnline() {
2092    while (!isStopped() && !isOnline()) {
2093      synchronized (online) {
2094        try {
2095          online.wait(msgInterval);
2096        } catch (InterruptedException ie) {
2097          Thread.currentThread().interrupt();
2098          break;
2099        }
2100      }
2101    }
2102  }
2103
2104  @Override
2105  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2106    HRegion r = context.getRegion();
2107    long openProcId = context.getOpenProcId();
2108    long masterSystemTime = context.getMasterSystemTime();
2109    rpcServices.checkOpen();
2110    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2111      r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2112    // Do checks to see if we need to compact (references or too many files)
2113    // Skip compaction check if region is read only
2114    if (!r.isReadOnly()) {
2115      for (HStore s : r.stores.values()) {
2116        if (s.hasReferences() || s.needsCompaction()) {
2117          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2118        }
2119      }
2120    }
2121    long openSeqNum = r.getOpenSeqNum();
2122    if (openSeqNum == HConstants.NO_SEQNUM) {
2123      // If we opened a region, we should have read some sequence number from it.
2124      LOG.error(
2125        "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2126      openSeqNum = 0;
2127    }
2128
2129    // Notify master
2130    if (
2131      !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2132        openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))
2133    ) {
2134      throw new IOException(
2135        "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2136    }
2137
2138    triggerFlushInPrimaryRegion(r);
2139
2140    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2141  }
2142
2143  /**
2144   * Helper method for use in tests. Skip the region transition report when there's no master around
2145   * to receive it.
2146   */
2147  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2148    final TransitionCode code = context.getCode();
2149    final long openSeqNum = context.getOpenSeqNum();
2150    long masterSystemTime = context.getMasterSystemTime();
2151    final RegionInfo[] hris = context.getHris();
2152
2153    if (code == TransitionCode.OPENED) {
2154      Preconditions.checkArgument(hris != null && hris.length == 1);
2155      if (hris[0].isMetaRegion()) {
2156        LOG.warn(
2157          "meta table location is stored in master local store, so we can not skip reporting");
2158        return false;
2159      } else {
2160        try {
2161          MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
2162            serverName, openSeqNum, masterSystemTime);
2163        } catch (IOException e) {
2164          LOG.info("Failed to update meta", e);
2165          return false;
2166        }
2167      }
2168    }
2169    return true;
2170  }
2171
2172  private ReportRegionStateTransitionRequest
2173    createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) {
2174    final TransitionCode code = context.getCode();
2175    final long openSeqNum = context.getOpenSeqNum();
2176    final RegionInfo[] hris = context.getHris();
2177    final long[] procIds = context.getProcIds();
2178
2179    ReportRegionStateTransitionRequest.Builder builder =
2180      ReportRegionStateTransitionRequest.newBuilder();
2181    builder.setServer(ProtobufUtil.toServerName(serverName));
2182    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2183    transition.setTransitionCode(code);
2184    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2185      transition.setOpenSeqNum(openSeqNum);
2186    }
2187    for (RegionInfo hri : hris) {
2188      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2189    }
2190    for (long procId : procIds) {
2191      transition.addProcId(procId);
2192    }
2193
2194    return builder.build();
2195  }
2196
2197  @Override
2198  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2199    if (TEST_SKIP_REPORTING_TRANSITION) {
2200      return skipReportingTransition(context);
2201    }
2202    final ReportRegionStateTransitionRequest request =
2203      createReportRegionStateTransitionRequest(context);
2204
2205    int tries = 0;
2206    long pauseTime = this.retryPauseTime;
    // Keep looping till we get an error. We want to send reports even though the server is going
    // down. Only go down if the clusterConnection is null; it is set to null almost as the last
    // thing the HRegionServer does as it goes down.
2210    while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
2211      RegionServerStatusService.BlockingInterface rss = rssStub;
2212      try {
2213        if (rss == null) {
2214          createRegionServerStatusStub();
2215          continue;
2216        }
2217        ReportRegionStateTransitionResponse response =
2218          rss.reportRegionStateTransition(null, request);
2219        if (response.hasErrorMessage()) {
2220          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2221          break;
2222        }
        // Log if we had to retry, else don't log unless TRACE. We want to
        // know if we were successful after an attempt showed up in the logs as failed.
2225        if (tries > 0 || LOG.isTraceEnabled()) {
2226          LOG.info("TRANSITION REPORTED " + request);
2227        }
2228        // NOTE: Return mid-method!!!
2229        return true;
2230      } catch (ServiceException se) {
2231        IOException ioe = ProtobufUtil.getRemoteException(se);
2232        boolean pause = ioe instanceof ServerNotRunningYetException
2233          || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException;
2234        if (pause) {
2235          // Do backoff else we flood the Master with requests.
2236          pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
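          // getPauseTime scales retryPauseTime by a backoff factor that grows with the retry count.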
2237        } else {
2238          pauseTime = this.retryPauseTime; // Reset.
2239        }
2240        LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#"
2241          + tries + ")"
2242          + (pause
2243            ? " after " + pauseTime + "ms delay (Master is coming online...)."
2244            : " immediately."),
2245          ioe);
2246        if (pause) {
2247          Threads.sleep(pauseTime);
2248        }
2249        tries++;
2250        if (rssStub == rss) {
2251          rssStub = null;
2252        }
2253      }
2254    }
2255    return false;
2256  }
2257
2258  /**
2259   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2260   * block this thread. See RegionReplicaFlushHandler for details.
2261   */
2262  private void triggerFlushInPrimaryRegion(final HRegion region) {
2263    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2264      return;
2265    }
2266    TableName tn = region.getTableDescriptor().getTableName();
2267    if (
2268      !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn)
2269        || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) ||
        // If memstore replication is not set up, we do not have to wait to observe a flush event
        // from the primary before starting to serve reads, because gaps from replication are not
        // applicable; this logic is from
        // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by
        // HBASE-13063
2275        !region.getTableDescriptor().hasRegionMemStoreReplication()
2276    ) {
2277      region.setReadsEnabled(true);
2278      return;
2279    }
2280
2281    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2282    // RegionReplicaFlushHandler might reset this.
2283
2284    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2285    if (this.executorService != null) {
2286      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
2287    } else {
2288      LOG.info("Executor is null; not running flush of primary region replica for {}",
2289        region.getRegionInfo());
2290    }
2291  }
2292
2293  @InterfaceAudience.Private
2294  public RSRpcServices getRSRpcServices() {
2295    return rpcServices;
2296  }
2297
2298  /**
   * Cause the server to exit without closing the regions it is serving, the log it is using and
   * without notifying the master. Used in unit testing and on catastrophic events such as HDFS
   * being yanked out from under hbase, or an OOME.
   * @param reason the reason we are aborting
   * @param cause  the exception that caused the abort, or null
2303   */
2304  @Override
2305  public void abort(String reason, Throwable cause) {
2306    if (!setAbortRequested()) {
2307      // Abort already in progress, ignore the new request.
2308      LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason);
2309      return;
2310    }
2311    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2312    if (cause != null) {
2313      LOG.error(HBaseMarkers.FATAL, msg, cause);
2314    } else {
2315      LOG.error(HBaseMarkers.FATAL, msg);
2316    }
2317    // HBASE-4014: show list of coprocessors that were loaded to help debug
    // regionserver crashes. Note that we're implicitly using
2319    // java.util.HashSet's toString() method to print the coprocessor names.
2320    LOG.error(HBaseMarkers.FATAL,
2321      "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors());
    // Try to dump metrics on abort; it might give a clue as to how the fatal error came about.
2323    try {
2324      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2325    } catch (MalformedObjectNameException | IOException e) {
2326      LOG.warn("Failed dumping metrics", e);
2327    }
2328
2329    // Do our best to report our abort to the master, but this may not work
2330    try {
2331      if (cause != null) {
2332        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2333      }
2334      // Report to the master but only if we have already registered with the master.
2335      RegionServerStatusService.BlockingInterface rss = rssStub;
2336      if (rss != null && this.serverName != null) {
2337        ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder();
2338        builder.setServer(ProtobufUtil.toServerName(this.serverName));
2339        builder.setErrorMessage(msg);
2340        rss.reportRSFatalError(null, builder.build());
2341      }
2342    } catch (Throwable t) {
2343      LOG.warn("Unable to report fatal error to master", t);
2344    }
2345
2346    scheduleAbortTimer();
2347    // shutdown should be run as the internal user
2348    stop(reason, true, null);
2349  }
2350
2351  /*
   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up logs, but it does
   * close the socket in case we want to bring up a server on the old hostname+port immediately.
2354   */
2355  @InterfaceAudience.Private
2356  protected void kill() {
2357    this.killed = true;
2358    abort("Simulated kill");
2359  }
2360
2361  // Limits the time spent in the shutdown process.
2362  private void scheduleAbortTimer() {
2363    if (this.abortMonitor == null) {
2364      this.abortMonitor = new Timer("Abort regionserver monitor", true);
2365      TimerTask abortTimeoutTask = null;
2366      try {
2367        Constructor<? extends TimerTask> timerTaskCtor =
2368          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2369            .asSubclass(TimerTask.class).getDeclaredConstructor();
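        // The configured task (default: SystemExitWhenAbortTimeout) exits the JVM if the abort has
        // not completed within the timeout scheduled below.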
2370        timerTaskCtor.setAccessible(true);
2371        abortTimeoutTask = timerTaskCtor.newInstance();
2372      } catch (Exception e) {
2373        LOG.warn("Initialize abort timeout task failed", e);
2374      }
2375      if (abortTimeoutTask != null) {
2376        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2377      }
2378    }
2379  }
2380
2381  /**
2382   * Wait on all threads to finish. Presumption is that all closes and stops have already been
2383   * called.
2384   */
2385  protected void stopServiceThreads() {
2386    // clean up the scheduled chores
2387    stopChoreService();
2388    if (bootstrapNodeManager != null) {
2389      bootstrapNodeManager.stop();
2390    }
2391    if (this.cacheFlusher != null) {
2392      this.cacheFlusher.join();
2393    }
2394    if (this.walRoller != null) {
2395      this.walRoller.close();
2396    }
2397    if (this.compactSplitThread != null) {
2398      this.compactSplitThread.join();
2399    }
2400    stopExecutorService();
2401    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
2402      this.replicationSourceHandler.stopReplicationService();
2403    } else {
2404      if (this.replicationSourceHandler != null) {
2405        this.replicationSourceHandler.stopReplicationService();
2406      }
2407      if (this.replicationSinkHandler != null) {
2408        this.replicationSinkHandler.stopReplicationService();
2409      }
2410    }
2411  }
2412
2413  /**
   * @return the object that implements the replication source service.
2415   */
2416  @Override
2417  public ReplicationSourceService getReplicationSourceService() {
2418    return replicationSourceHandler;
2419  }
2420
2421  /**
   * @return the object that implements the replication sink service.
2423   */
2424  public ReplicationSinkService getReplicationSinkService() {
2425    return replicationSinkHandler;
2426  }
2427
2428  /**
2429   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2430   * connection, the current rssStub must be null. Method will block until a master is available.
2431   * You can break from this block by requesting the server stop.
2432   * @return master + port, or null if server has been stopped
2433   */
2434  private synchronized ServerName createRegionServerStatusStub() {
2435    // Create RS stub without refreshing the master node from ZK, use cached data
2436    return createRegionServerStatusStub(false);
2437  }
2438
2439  /**
2440   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2441   * connection, the current rssStub must be null. Method will block until a master is available.
2442   * You can break from this block by requesting the server stop.
2443   * @param refresh If true then master address will be read from ZK, otherwise use cached data
2444   * @return master + port, or null if server has been stopped
2445   */
2446  @InterfaceAudience.Private
2447  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2448    if (rssStub != null) {
2449      return masterAddressTracker.getMasterAddress();
2450    }
2451    ServerName sn = null;
2452    long previousLogTime = 0;
2453    RegionServerStatusService.BlockingInterface intRssStub = null;
2454    LockService.BlockingInterface intLockStub = null;
2455    boolean interrupted = false;
2456    try {
2457      while (keepLooping()) {
2458        sn = this.masterAddressTracker.getMasterAddress(refresh);
2459        if (sn == null) {
2460          if (!keepLooping()) {
2461            // give up with no connection.
2462            LOG.debug("No master found and cluster is stopped; bailing out");
2463            return null;
2464          }
2465          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2466            LOG.debug("No master found; retry");
2467            previousLogTime = EnvironmentEdgeManager.currentTime();
2468          }
          refresh = true; // let's try to pull it from ZK directly
2470          if (sleepInterrupted(200)) {
2471            interrupted = true;
2472          }
2473          continue;
2474        }
2475        try {
2476          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
2477            userProvider.getCurrent(), shortOperationTimeout);
2478          intRssStub = RegionServerStatusService.newBlockingStub(channel);
2479          intLockStub = LockService.newBlockingStub(channel);
2480          break;
2481        } catch (IOException e) {
2482          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2483            e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
2484            if (e instanceof ServerNotRunningYetException) {
2485              LOG.info("Master isn't available yet, retrying");
2486            } else {
2487              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2488            }
2489            previousLogTime = EnvironmentEdgeManager.currentTime();
2490          }
2491          if (sleepInterrupted(200)) {
2492            interrupted = true;
2493          }
2494        }
2495      }
2496    } finally {
2497      if (interrupted) {
2498        Thread.currentThread().interrupt();
2499      }
2500    }
2501    this.rssStub = intRssStub;
2502    this.lockStub = intLockStub;
2503    return sn;
2504  }
2505
2506  /**
   * @return True if we should keep looping, i.e. this server has not been stopped and the cluster
   *         is still up.
2509   */
2510  private boolean keepLooping() {
2511    return !this.stopped && isClusterUp();
2512  }
2513
2514  /*
2515   * Let the master know we're here Run initialization using parameters passed us by the master.
2516   * @return A Map of key/value configurations we got from the Master else null if we failed to
2517   * register. n
2518   */
2519  private RegionServerStartupResponse reportForDuty() throws IOException {
2520    if (this.masterless) {
2521      return RegionServerStartupResponse.getDefaultInstance();
2522    }
2523    ServerName masterServerName = createRegionServerStatusStub(true);
2524    RegionServerStatusService.BlockingInterface rss = rssStub;
2525    if (masterServerName == null || rss == null) {
2526      return null;
2527    }
2528    RegionServerStartupResponse result = null;
2529    try {
2530      rpcServices.requestCount.reset();
2531      rpcServices.rpcGetRequestCount.reset();
2532      rpcServices.rpcScanRequestCount.reset();
2533      rpcServices.rpcFullScanRequestCount.reset();
2534      rpcServices.rpcMultiRequestCount.reset();
2535      rpcServices.rpcMutateRequestCount.reset();
2536      LOG.info("reportForDuty to master=" + masterServerName + " with port="
2537        + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode);
2538      long now = EnvironmentEdgeManager.currentTime();
2539      int port = rpcServices.getSocketAddress().getPort();
2540      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2541      if (!StringUtils.isBlank(useThisHostnameInstead)) {
2542        request.setUseThisHostnameInstead(useThisHostnameInstead);
2543      }
2544      request.setPort(port);
2545      request.setServerStartCode(this.startcode);
2546      request.setServerCurrentTime(now);
2547      result = rss.regionServerStartup(null, request.build());
2548    } catch (ServiceException se) {
2549      IOException ioe = ProtobufUtil.getRemoteException(se);
2550      if (ioe instanceof ClockOutOfSyncException) {
2551        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
        // Re-throwing the IOE will cause the RS to abort.
2553        throw ioe;
2554      } else if (ioe instanceof ServerNotRunningYetException) {
2555        LOG.debug("Master is not running yet");
2556      } else {
2557        LOG.warn("error telling master we are up", se);
2558      }
2559      rssStub = null;
2560    }
2561    return result;
2562  }
2563
2564  @Override
2565  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2566    try {
2567      GetLastFlushedSequenceIdRequest req =
2568        RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2569      RegionServerStatusService.BlockingInterface rss = rssStub;
2570      if (rss == null) { // Try to connect one more time
2571        createRegionServerStatusStub();
2572        rss = rssStub;
2573        if (rss == null) {
2574          // Still no luck, we tried
2575          LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2576          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2577            .build();
2578        }
2579      }
2580      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2581      return RegionStoreSequenceIds.newBuilder()
2582        .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2583        .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2584    } catch (ServiceException e) {
2585      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2586      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2587        .build();
2588    }
2589  }
2590
2591  /**
2592   * Close meta region if we carry it
2593   * @param abort Whether we're running an abort.
2594   */
2595  private void closeMetaTableRegions(final boolean abort) {
2596    HRegion meta = null;
2597    this.onlineRegionsLock.writeLock().lock();
2598    try {
2599      for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
2600        RegionInfo hri = e.getValue().getRegionInfo();
2601        if (hri.isMetaRegion()) {
2602          meta = e.getValue();
2603        }
2604        if (meta != null) {
2605          break;
2606        }
2607      }
2608    } finally {
2609      this.onlineRegionsLock.writeLock().unlock();
2610    }
2611    if (meta != null) {
2612      closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2613    }
2614  }
2615
2616  /**
   * Schedule closes on all user regions. Should be safe to call multiple times because it won't
   * close regions that are already closed or that are closing.
2619   * @param abort Whether we're running an abort.
2620   */
2621  private void closeUserRegions(final boolean abort) {
2622    this.onlineRegionsLock.writeLock().lock();
2623    try {
2624      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
2625        HRegion r = e.getValue();
2626        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2627          // Don't update zk with this close transition; pass false.
2628          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2629        }
2630      }
2631    } finally {
2632      this.onlineRegionsLock.writeLock().unlock();
2633    }
2634  }
2635
2636  protected Map<String, HRegion> getOnlineRegions() {
2637    return this.onlineRegions;
2638  }
2639
2640  public int getNumberOfOnlineRegions() {
2641    return this.onlineRegions.size();
2642  }
2643
2644  /**
   * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM
   * as the client; HRegion cannot be serialized to cross an RPC boundary.
2647   */
2648  public Collection<HRegion> getOnlineRegionsLocalContext() {
2649    Collection<HRegion> regions = this.onlineRegions.values();
2650    return Collections.unmodifiableCollection(regions);
2651  }
2652
2653  @Override
2654  public void addRegion(HRegion region) {
2655    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2656    configurationManager.registerObserver(region);
2657  }
2658
2659  private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
2660    long size) {
2661    if (!sortedRegions.containsKey(size)) {
2662      sortedRegions.put(size, new ArrayList<>());
2663    }
2664    sortedRegions.get(size).add(region);
2665  }
2666
2667  /**
2668   * @return A new Map of online regions sorted by region off-heap size with the first entry being
2669   *         the biggest.
2670   */
2671  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2672    // we'll sort the regions in reverse
2673    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2674    // Copy over all regions. Regions are sorted by size with biggest first.
2675    for (HRegion region : this.onlineRegions.values()) {
2676      addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
2677    }
2678    return sortedRegions;
2679  }
2680
2681  /**
2682   * @return A new Map of online regions sorted by region heap size with the first entry being the
2683   *         biggest.
2684   */
2685  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2686    // we'll sort the regions in reverse
2687    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2688    // Copy over all regions. Regions are sorted by size with biggest first.
2689    for (HRegion region : this.onlineRegions.values()) {
2690      addRegion(sortedRegions, region, region.getMemStoreHeapSize());
2691    }
2692    return sortedRegions;
2693  }
2694
2695  /** @return reference to FlushRequester */
2696  @Override
2697  public FlushRequester getFlushRequester() {
2698    return this.cacheFlusher;
2699  }
2700
2701  @Override
2702  public CompactionRequester getCompactionRequestor() {
2703    return this.compactSplitThread;
2704  }
2705
2706  @Override
2707  public LeaseManager getLeaseManager() {
2708    return leaseManager;
2709  }
2710
2711  /**
2712   * @return {@code true} when the data file system is available, {@code false} otherwise.
2713   */
2714  boolean isDataFileSystemOk() {
2715    return this.dataFsOk;
2716  }
2717
2718  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
2719    return this.rsHost;
2720  }
2721
2722  @Override
2723  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2724    return this.regionsInTransitionInRS;
2725  }
2726
2727  @Override
2728  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
2729    return rsQuotaManager;
2730  }
2731
2732  //
2733  // Main program and support routines
2734  //
2735  /**
   * Load the replication source and sink service objects, if any. The implementation classes are
   * read from the configuration; if the source and sink name the same class, a single instance is
   * created and used for both roles.
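   * <p>
   * A minimal configuration sketch (the implementation class name below is hypothetical):
   * </p>
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * // Naming the same class for source and sink means only one instance is instantiated.
   * conf.set(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, "org.example.MyReplicationService");
   * conf.set(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, "org.example.MyReplicationService");
   * }</pre>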
2737   */
2738  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
2739    FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
2740    // read in the name of the source replication class from the config file.
2741    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2742      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2743
2744    // read in the name of the sink replication class from the config file.
2745    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2746      HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
2747
2748    // If both the sink and the source class names are the same, then instantiate
2749    // only one object.
2750    if (sourceClassname.equals(sinkClassname)) {
2751      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2752        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2753      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
2754      server.sameReplicationSourceAndSink = true;
2755    } else {
2756      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2757        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2758      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
2759        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2760      server.sameReplicationSourceAndSink = false;
2761    }
2762  }
2763
2764  private static <T extends ReplicationService> T newReplicationInstance(String classname,
2765    Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
2766    Path oldLogDir, WALFactory walFactory) throws IOException {
2767    final Class<? extends T> clazz;
2768    try {
2769      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2770      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
2771    } catch (java.lang.ClassNotFoundException nfe) {
2772      throw new IOException("Could not find class for " + classname);
2773    }
2774    T service = ReflectionUtils.newInstance(clazz, conf);
2775    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
2776    return service;
2777  }
2778
2779  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
2780    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
2781    if (!this.isOnline()) {
2782      return walGroupsReplicationStatus;
2783    }
2784    List<ReplicationSourceInterface> allSources = new ArrayList<>();
2785    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
2786    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
2787    for (ReplicationSourceInterface source : allSources) {
2788      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
2789    }
2790    return walGroupsReplicationStatus;
2791  }
2792
2793  /**
2794   * Utility for constructing an instance of the passed HRegionServer class.
2795   */
2796  static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass,
2797    final Configuration conf) {
2798    try {
2799      Constructor<? extends HRegionServer> c =
2800        regionServerClass.getConstructor(Configuration.class);
2801      return c.newInstance(conf);
2802    } catch (Exception e) {
      throw new RuntimeException(
        "Failed construction of RegionServer: " + regionServerClass.toString(), e);
2805    }
2806  }
2807
2808  /**
2809   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2810   */
2811  public static void main(String[] args) {
2812    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
2813    VersionInfo.logVersion();
2814    Configuration conf = HBaseConfiguration.create();
2815    @SuppressWarnings("unchecked")
2816    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2817      .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2818
2819    new HRegionServerCommandLine(regionServerClass).doMain(args);
2820  }
2821
2822  /**
2823   * Gets the online regions of the specified table. This method looks at the in-memory
2824   * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions.
2825   * If a region on this table has been closed during a disable, etc., it will not be included in
2826   * the returned list. So, the returned list may not necessarily be ALL regions in this table, its
2827   * all the ONLINE regions in the table.
2828   * @param tableName table to limit the scope of the query
2829   * @return Online regions from <code>tableName</code>
2830   */
2831  @Override
2832  public List<HRegion> getRegions(TableName tableName) {
2833    List<HRegion> tableRegions = new ArrayList<>();
2834    synchronized (this.onlineRegions) {
2835      for (HRegion region : this.onlineRegions.values()) {
2836        RegionInfo regionInfo = region.getRegionInfo();
2837        if (regionInfo.getTable().equals(tableName)) {
2838          tableRegions.add(region);
2839        }
2840      }
2841    }
2842    return tableRegions;
2843  }
2844
2845  @Override
2846  public List<HRegion> getRegions() {
2847    List<HRegion> allRegions;
2848    synchronized (this.onlineRegions) {
2849      // Return a clone copy of the onlineRegions
2850      allRegions = new ArrayList<>(onlineRegions.values());
2851    }
2852    return allRegions;
2853  }
2854
2855  /**
2856   * Gets the online tables in this RS. This method looks at the in-memory onlineRegions.
2857   * @return all the online tables in this RS
2858   */
2859  public Set<TableName> getOnlineTables() {
2860    Set<TableName> tables = new HashSet<>();
2861    synchronized (this.onlineRegions) {
2862      for (Region region : this.onlineRegions.values()) {
2863        tables.add(region.getTableDescriptor().getTableName());
2864      }
2865    }
2866    return tables;
2867  }
2868
2869  public String[] getRegionServerCoprocessors() {
2870    TreeSet<String> coprocessors = new TreeSet<>();
2871    try {
2872      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2873    } catch (IOException exception) {
2874      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; "
2875        + "skipping.");
2876      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2877    }
2878    Collection<HRegion> regions = getOnlineRegionsLocalContext();
2879    for (HRegion region : regions) {
2880      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2881      try {
2882        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2883      } catch (IOException exception) {
2884        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region
2885          + "; skipping.");
2886        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2887      }
2888    }
2889    coprocessors.addAll(rsHost.getCoprocessors());
2890    return coprocessors.toArray(new String[0]);
2891  }
2892
2893  /**
   * Try to close the region; log a warning on failure but continue.
   * @param region Region to close
   * @param abort  Whether we're running an abort.
2896   */
2897  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
2898    try {
2899      if (!closeRegion(region.getEncodedName(), abort, null)) {
        LOG.warn(
          "Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing");
2902      }
2903    } catch (IOException e) {
2904      LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing",
2905        e);
2906    }
2907  }
2908
2909  /**
   * Close a region asynchronously; can be called from the master or internally by the regionserver
   * when stopping. If called from the master, the region will update its status.
   * <p>
   * If an opening was in progress, this method will cancel it, but will not start a new close. The
   * coprocessors are not called in this case, and a NotServingRegionException is thrown.
2915   * </p>
2916   * <p>
2917   * If a close was in progress, this new request will be ignored, and an exception thrown.
2918   * </p>
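   * <p>
   * Illustrative call only (a hypothetical shutdown path, not a prescribed usage):
   * </p>
   *
   * <pre>{@code
   * // Schedule a non-abort close with no known destination server.
   * if (!closeRegion(hri.getEncodedName(), false, null)) {
   *   LOG.warn("Close of " + hri.getEncodedName() + " was rejected by a coprocessor");
   * }
   * }</pre>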
2919   * @param encodedName Region to close
2920   * @param abort       True if we are aborting
   * @param destination Where the region is being moved to; may be null if unknown.
   * @return True if we scheduled a close for the region (or one was already in progress).
2923   * @throws NotServingRegionException if the region is not online
2924   */
2925  protected boolean closeRegion(String encodedName, final boolean abort,
2926    final ServerName destination) throws NotServingRegionException {
2927    // Check for permissions to close.
2928    HRegion actualRegion = this.getRegion(encodedName);
2929    // Can be null if we're calling close on a region that's not online
2930    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2931      try {
2932        actualRegion.getCoprocessorHost().preClose(false);
2933      } catch (IOException exp) {
2934        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2935        return false;
2936      }
2937    }
2938
2939    // previous can come back 'null' if not in map.
2940    final Boolean previous =
2941      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);
2942
2943    if (Boolean.TRUE.equals(previous)) {
2944      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already "
2945        + "trying to OPEN. Cancelling OPENING.");
2946      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
2947        // The replace failed. That should be an exceptional case, but theoretically it can happen.
2948        // We're going to try to do a standard close then.
2949        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
2950          + " Doing a standard close now");
2951        return closeRegion(encodedName, abort, destination);
2952      }
2953      // Let's get the region from the online region list again
2954      actualRegion = this.getRegion(encodedName);
      // If the region never came online, the cancelled opening means there is nothing to close.
      if (actualRegion == null) {
2956        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2957        // The master deletes the znode when it receives this exception.
2958        throw new NotServingRegionException(
2959          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
2960      }
2961    } else if (previous == null) {
2962      LOG.info("Received CLOSE for {}", encodedName);
2963    } else if (Boolean.FALSE.equals(previous)) {
2964      LOG.info("Received CLOSE for the region: " + encodedName
2965        + ", which we are already trying to CLOSE, but not completed yet");
2966      return true;
2967    }
2968
2969    if (actualRegion == null) {
2970      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
2971      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
2972      // The master deletes the znode when it receives this exception.
2973      throw new NotServingRegionException(
2974        "The region " + encodedName + " is not online, and is not opening.");
2975    }
2976
2977    CloseRegionHandler crh;
2978    final RegionInfo hri = actualRegion.getRegionInfo();
2979    if (hri.isMetaRegion()) {
2980      crh = new CloseMetaHandler(this, this, hri, abort);
2981    } else {
2982      crh = new CloseRegionHandler(this, this, hri, abort, destination);
2983    }
2984    this.executorService.submit(crh);
2985    return true;
2986  }
2987
2988  /**
   * @return HRegion for the passed binary <code>regionName</code> or null if the named region is
   *         not a member of the online regions.
2991   */
2992  public HRegion getOnlineRegion(final byte[] regionName) {
2993    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
2994    return this.onlineRegions.get(encodedRegionName);
2995  }
2996
2997  @Override
2998  public HRegion getRegion(final String encodedRegionName) {
2999    return this.onlineRegions.get(encodedRegionName);
3000  }
3001
3002  @Override
3003  public boolean removeRegion(final HRegion r, ServerName destination) {
3004    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3005    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
3006    if (destination != null) {
3007      long closeSeqNum = r.getMaxFlushedSeqId();
3008      if (closeSeqNum == HConstants.NO_SEQNUM) {
3009        // No edits in WAL for this region; get the sequence number when the region was opened.
3010        closeSeqNum = r.getOpenSeqNum();
3011        if (closeSeqNum == HConstants.NO_SEQNUM) {
3012          closeSeqNum = 0;
3013        }
3014      }
3015      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
3016      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
3017      if (selfMove) {
3018        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
3019          r.getRegionInfo().getEncodedName(),
3020          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
3021      }
3022    }
3023    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3024    configurationManager.deregisterObserver(r);
3025    return toReturn != null;
3026  }
3027
3028  /**
   * Protected utility method for safely obtaining an HRegion handle.
   * @param regionName Name of online {@link HRegion} to return
   * @return {@link HRegion} for <code>regionName</code>
   * @throws NotServingRegionException if the region is not online
3032   */
3033  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
3034    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3035    return getRegionByEncodedName(regionName, encodedRegionName);
3036  }
3037
3038  public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException {
3039    return getRegionByEncodedName(null, encodedRegionName);
3040  }
3041
3042  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3043    throws NotServingRegionException {
3044    HRegion region = this.onlineRegions.get(encodedRegionName);
3045    if (region == null) {
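      // The region is not online on this server. Distinguish between "recently moved", "still
      // opening" and "unknown" so clients receive an actionable exception.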
3046      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3047      if (moveInfo != null) {
3048        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3049      }
3050      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3051      String regionNameStr =
3052        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
3053      if (isOpening != null && isOpening) {
3054        throw new RegionOpeningException(
3055          "Region " + regionNameStr + " is opening on " + this.serverName);
3056      }
3057      throw new NotServingRegionException(
3058        "" + regionNameStr + " is not online on " + this.serverName);
3059    }
3060    return region;
3061  }
3062
3063  /**
   * Cleanup after a Throwable caught while invoking a method: log it (unwrapping RemoteExceptions
   * for the log), check for OOME and file system availability, then hand it back to the caller.
   * @param t   Throwable
   * @param msg Message to log in error. Can be null.
   * @return The passed <code>t</code>, unchanged, so callers can rethrow or convert it to an IOE.
3069   */
3070  private Throwable cleanup(final Throwable t, final String msg) {
3071    // Don't log as error if NSRE; NSRE is 'normal' operation.
3072    if (t instanceof NotServingRegionException) {
3073      LOG.debug("NotServingRegionException; " + t.getMessage());
3074      return t;
3075    }
3076    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3077    if (msg == null) {
3078      LOG.error("", e);
3079    } else {
3080      LOG.error(msg, e);
3081    }
3082    if (!rpcServices.checkOOME(t)) {
3083      checkFileSystem();
3084    }
3085    return t;
3086  }
3087
3088  /**
   * @param t   Throwable to convert
   * @param msg Message to put in the new IOE if the passed <code>t</code> is not already an IOE
   * @return <code>t</code> as an IOException, wrapping it if it is not one already.
3091   */
3092  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3093    return (t instanceof IOException ? (IOException) t
3094      : msg == null || msg.length() == 0 ? new IOException(t)
3095      : new IOException(msg, t));
3096  }
3097
3098  /**
   * Checks to see if the file system is still accessible. If not, aborts the server and marks the
   * data file system as unavailable.
   * @return false if the file system is not available
3102   */
3103  boolean checkFileSystem() {
3104    if (this.dataFsOk && this.dataFs != null) {
3105      try {
3106        FSUtils.checkFileSystemAvailable(this.dataFs);
3107      } catch (IOException e) {
3108        abort("File System not available", e);
3109        this.dataFsOk = false;
3110      }
3111    }
3112    return this.dataFsOk;
3113  }
3114
3115  @Override
3116  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3117    List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3118    Address[] addr = new Address[favoredNodes.size()];
3119    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3120    // it is a map of region name to Address[]
3121    for (int i = 0; i < favoredNodes.size(); i++) {
3122      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort());
3123    }
3124    regionFavoredNodesMap.put(encodedRegionName, addr);
3125  }
3126
3127  /**
3128   * Return the favored nodes for a region given its encoded name. Look at the comment around
3129   * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here.
3130   * @param encodedRegionName the encoded region name.
3131   * @return array of favored locations
3132   */
3133  @Override
3134  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3135    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
3136  }
3137
3138  @Override
3139  public ServerNonceManager getNonceManager() {
3140    return this.nonceManager;
3141  }
3142
3143  private static class MovedRegionInfo {
3144    private final ServerName serverName;
3145    private final long seqNum;
3146
3147    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3148      this.serverName = serverName;
3149      this.seqNum = closeSeqNum;
3150    }
3151
3152    public ServerName getServerName() {
3153      return serverName;
3154    }
3155
3156    public long getSeqNum() {
3157      return seqNum;
3158    }
3159  }
3160
3161  /**
   * Entries in the moved-regions cache must expire. Without a timeout there is a risk of handing
   * out stale information, which would double the number of network calls instead of reducing
   * them.
3164   */
3165  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3166
3167  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
3168    boolean selfMove) {
3169    if (selfMove) {
3170      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3171      return;
3172    }
3173    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid="
3174      + closeSeqNum);
3175    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3176  }
3177
3178  void removeFromMovedRegions(String encodedName) {
3179    movedRegionInfoCache.invalidate(encodedName);
3180  }
3181
3182  @InterfaceAudience.Private
3183  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
3184    return movedRegionInfoCache.getIfPresent(encodedRegionName);
3185  }
3186
3187  @InterfaceAudience.Private
3188  public int movedRegionCacheExpiredTime() {
3189    return TIMEOUT_REGION_MOVED;
3190  }
3191
3192  private String getMyEphemeralNodePath() {
3193    return zooKeeper.getZNodePaths().getRsPath(serverName);
3194  }
3195
3196  private boolean isHealthCheckerConfigured() {
3197    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3198    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
3199  }
3200
3201  /**
   * @return the underlying {@link CompactSplit} for this server
3203   */
3204  public CompactSplit getCompactSplitThread() {
3205    return this.compactSplitThread;
3206  }
3207
3208  CoprocessorServiceResponse execRegionServerService(
3209    @SuppressWarnings("UnusedParameters") final RpcController controller,
3210    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3211    try {
3212      ServerRpcController serviceController = new ServerRpcController();
3213      CoprocessorServiceCall call = serviceRequest.getCall();
3214      String serviceName = call.getServiceName();
3215      Service service = coprocessorServiceHandlers.get(serviceName);
3216      if (service == null) {
3217        throw new UnknownProtocolException(null,
3218          "No registered coprocessor executorService found for " + serviceName);
3219      }
3220      ServiceDescriptor serviceDesc = service.getDescriptorForType();
3221
3222      String methodName = call.getMethodName();
3223      MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3224      if (methodDesc == null) {
3225        throw new UnknownProtocolException(service.getClass(),
3226          "Unknown method " + methodName + " called on executorService " + serviceName);
3227      }
3228
3229      Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3230      final Message.Builder responseBuilder =
3231        service.getResponsePrototype(methodDesc).newBuilderForType();
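      // Region server coprocessor endpoints are expected to complete the callback synchronously,
      // so the response (if any) is available in the builder as soon as callMethod returns.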
3232      service.callMethod(methodDesc, serviceController, request, message -> {
3233        if (message != null) {
3234          responseBuilder.mergeFrom(message);
3235        }
3236      });
3237      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3238      if (exception != null) {
3239        throw exception;
3240      }
3241      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3242    } catch (IOException ie) {
3243      throw new ServiceException(ie);
3244    }
3245  }
3246
3247  /**
   * May be empty if this is a master that does not carry tables.
   * @return The block cache instance used by the regionserver, if any.
3250   */
3251  @Override
3252  public Optional<BlockCache> getBlockCache() {
3253    return Optional.ofNullable(this.blockCache);
3254  }
3255
3256  /**
   * May be empty if this is a master that does not carry tables.
   * @return The cache for mob files used by the regionserver, if any.
3259   */
3260  @Override
3261  public Optional<MobFileCache> getMobFileCache() {
3262    return Optional.ofNullable(this.mobFileCache);
3263  }
3264
3265  /**
   * @return The ConfigurationManager object, exposed for testing purposes.
3267   */
3268  ConfigurationManager getConfigurationManager() {
3269    return configurationManager;
3270  }
3271
3272  CacheEvictionStats clearRegionBlockCache(Region region) {
3273    long evictedBlocks = 0;
3274
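    // Evict, by store file name, every block cached for the region and report the total count.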
3275    for (Store store : region.getStores()) {
3276      for (StoreFile hFile : store.getStorefiles()) {
3277        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3278      }
3279    }
3280
3281    return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build();
3282  }
3283
3284  @Override
3285  public double getCompactionPressure() {
3286    double max = 0;
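    // Report the highest compaction pressure seen across all stores; callers such as the
    // pressure-aware compaction throughput controller throttle based on this worst case.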
3287    for (Region region : onlineRegions.values()) {
3288      for (Store store : region.getStores()) {
3289        double normCount = store.getCompactionPressure();
3290        if (normCount > max) {
3291          max = normCount;
3292        }
3293      }
3294    }
3295    return max;
3296  }
3297
3298  @Override
3299  public HeapMemoryManager getHeapMemoryManager() {
3300    return hMemManager;
3301  }
3302
3303  public MemStoreFlusher getMemStoreFlusher() {
3304    return cacheFlusher;
3305  }
3306
3307  /**
   * For testing.
   * @return whether all WAL roll requests have finished for this regionserver
3310   */
3311  @InterfaceAudience.Private
3312  public boolean walRollRequestFinished() {
3313    return this.walRoller.walRollFinished();
3314  }
3315
3316  @Override
3317  public ThroughputController getFlushThroughputController() {
3318    return flushThroughputController;
3319  }
3320
3321  @Override
3322  public double getFlushPressure() {
3323    if (getRegionServerAccounting() == null || cacheFlusher == null) {
3324      // return 0 during RS initialization
3325      return 0.0;
3326    }
3327    return getRegionServerAccounting().getFlushPressure();
3328  }
3329
3330  @Override
3331  public void onConfigurationChange(Configuration newConf) {
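    // Recreate the flush throughput controller so the new configuration values take effect,
    // stopping the old one first.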
3332    ThroughputController old = this.flushThroughputController;
3333    if (old != null) {
3334      old.stop("configuration change");
3335    }
3336    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3337    try {
3338      Superusers.initialize(newConf);
3339    } catch (IOException e) {
3340      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3341    }
3342
3343    // update region server coprocessor if the configuration has changed.
3344    if (
3345      CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
3346        CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY)
3347    ) {
3348      LOG.info("Update region server coprocessors because the configuration has changed");
3349      this.rsHost = new RegionServerCoprocessorHost(this, newConf);
3350    }
3351  }
3352
3353  @Override
3354  public MetricsRegionServer getMetrics() {
3355    return metricsRegionServer;
3356  }
3357
3358  @Override
3359  public SecureBulkLoadManager getSecureBulkLoadManager() {
3360    return this.secureBulkLoadManager;
3361  }
3362
3363  @Override
3364  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
3365    final Abortable abort) {
3366    final LockServiceClient client =
3367      new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
3368    return client.regionLock(regionInfo, description, abort);
3369  }
3370
3371  @Override
3372  public void unassign(byte[] regionName) throws IOException {
3373    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
3374  }
3375
3376  @Override
3377  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3378    return this.rsSpaceQuotaManager;
3379  }
3380
3381  @Override
3382  public boolean reportFileArchivalForQuotas(TableName tableName,
3383    Collection<Entry<String, Long>> archivedFiles) {
3384    if (TEST_SKIP_REPORTING_TRANSITION) {
3385      return false;
3386    }
3387    RegionServerStatusService.BlockingInterface rss = rssStub;
3388    if (rss == null || rsSpaceQuotaManager == null) {
3389      // the current server could be stopping.
3390      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3391      return false;
3392    }
3393    try {
3394      RegionServerStatusProtos.FileArchiveNotificationRequest request =
3395        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3396      rss.reportFileArchival(null, request);
3397    } catch (ServiceException se) {
3398      IOException ioe = ProtobufUtil.getRemoteException(se);
3399      if (ioe instanceof PleaseHoldException) {
3400        if (LOG.isTraceEnabled()) {
3401          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3402            + " This will be retried.", ioe);
3403        }
3404        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3405        return false;
3406      }
3407      if (rssStub == rss) {
3408        rssStub = null;
3409      }
3410      // re-create the stub if we failed to report the archival
3411      createRegionServerStatusStub(true);
3412      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3413      return false;
3414    }
3415    return true;
3416  }
3417
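  /**
   * Run a generic remote procedure dispatched by the master (for example a replication peer
   * refresh or a WAL split task) on this region server's executor pool, reporting completion back
   * through {@link #remoteProcedureComplete(long, Throwable)}.
   */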
3418  void executeProcedure(long procId, RSProcedureCallable callable) {
3419    executorService.submit(new RSProcedureHandler(this, procId, callable));
3420  }
3421
3422  public void remoteProcedureComplete(long procId, Throwable error) {
3423    procedureResultReporter.complete(procId, error);
3424  }
3425
3426  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3427    RegionServerStatusService.BlockingInterface rss;
3428    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3429    for (;;) {
3430      rss = rssStub;
3431      if (rss != null) {
3432        break;
3433      }
3434      createRegionServerStatusStub();
3435    }
3436    try {
3437      rss.reportProcedureDone(null, request);
3438    } catch (ServiceException se) {
3439      if (rssStub == rss) {
3440        rssStub = null;
3441      }
3442      throw ProtobufUtil.getRemoteException(se);
3443    }
3444  }
3445
3446  /**
   * Will ignore the open/close region procedures which were already submitted or executed. When
   * the master had an unfinished open/close region procedure and restarted, the new active master
   * may send a duplicate open/close region request to the regionserver. The open/close request is
   * submitted to a thread pool and executed, so we first need a cache of submitted open/close
   * region procedures. After the open/close region request is executed and the region transition
   * is reported successfully, we cache it in the executed region procedures cache. See
   * {@link #finishRegionProcedure(long)}. After the region transition is reported successfully,
   * the master will not send the open/close region request to the regionserver again. We assume
   * that an ongoing duplicate open/close region request will not be delayed by more than 600
   * seconds, so the executed region procedures cache expires after 600 seconds. See HBASE-22404
   * for more details.
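   * <p>
   * Sketch of the intended call pattern (illustrative only):
   * </p>
   *
   * <pre>{@code
   * if (regionServer.submitRegionProcedure(procId)) {
   *   // ... run the open/close handler and report the region transition to the master ...
   *   regionServer.finishRegionProcedure(procId);
   * }
   * }</pre>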
3457   * @param procId the id of the open/close region procedure
3458   * @return true if the procedure can be submitted.
3459   */
3460  boolean submitRegionProcedure(long procId) {
3461    if (procId == -1) {
3462      return true;
3463    }
3464    // Ignore the region procedures which already submitted.
3465    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3466    if (previous != null) {
3467      LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
3468      return false;
3469    }
3470    // Ignore the region procedures which already executed.
3471    if (executedRegionProcedures.getIfPresent(procId) != null) {
3472      LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
3473      return false;
3474    }
3475    return true;
3476  }
3477
3478  /**
3479   * See {@link #submitRegionProcedure(long)}.
3480   * @param procId the id of the open/close region procedure
3481   */
3482  public void finishRegionProcedure(long procId) {
3483    executedRegionProcedures.put(procId, procId);
3484    submittedRegionProcedures.remove(procId);
3485  }
3486
3487  /**
   * Forcibly terminate the region server when an abort times out.
3489   */
3490  private static class SystemExitWhenAbortTimeout extends TimerTask {
3491
3492    public SystemExitWhenAbortTimeout() {
3493    }
3494
3495    @Override
3496    public void run() {
3497      LOG.warn("Aborting region server timed out, terminating forcibly"
3498        + " and does not wait for any running shutdown hooks or finalizers to finish their work."
3499        + " Thread dump to stdout.");
3500      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3501      Runtime.getRuntime().halt(1);
3502    }
3503  }
3504
3505  @InterfaceAudience.Private
3506  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3507    return compactedFileDischarger;
3508  }
3509
3510  /**
   * Return the pause time configured in
   * {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}.
   * @return pause time
3513   */
3514  @InterfaceAudience.Private
3515  public long getRetryPauseTime() {
3516    return this.retryPauseTime;
3517  }
3518
3519  @Override
3520  public Optional<ServerName> getActiveMaster() {
3521    return Optional.ofNullable(masterAddressTracker.getMasterAddress());
3522  }
3523
3524  @Override
3525  public List<ServerName> getBackupMasters() {
3526    return masterAddressTracker.getBackupMasters();
3527  }
3528
3529  @Override
3530  public Iterator<ServerName> getBootstrapNodes() {
3531    return bootstrapNodeManager.getBootstrapNodes().iterator();
3532  }
3533
3534  @Override
3535  public List<HRegionLocation> getMetaLocations() {
3536    return metaRegionLocationCache.getMetaRegionLocations();
3537  }
3538
3539  @Override
3540  protected NamedQueueRecorder createNamedQueueRecord() {
3541    final boolean isOnlineLogProviderEnabled = conf.getBoolean(
3542      HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
3543    if (isOnlineLogProviderEnabled) {
3544      return NamedQueueRecorder.getInstance(conf);
3545    } else {
3546      return null;
3547    }
3548  }
3549
3550  @Override
3551  protected boolean clusterMode() {
    // This method is called from the constructor of the super class, so we cannot return the
    // masterless field directly here, as it will always be false at that point.
3554    return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
3555  }
3556
3557  @InterfaceAudience.Private
3558  public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
3559    return brokenStoreFileCleaner;
3560  }
3561
3562  @InterfaceAudience.Private
3563  public RSMobFileCleanerChore getRSMobFileCleanerChore() {
3564    return rsMobFileCleanerChore;
3565  }
3566
3567  RSSnapshotVerifier getRsSnapshotVerifier() {
3568    return rsSnapshotVerifier;
3569  }
3570
3571  @Override
3572  protected void stopChores() {
3573    shutdownChore(nonceManagerChore);
3574    shutdownChore(compactionChecker);
3575    shutdownChore(compactedFileDischarger);
3576    shutdownChore(periodicFlusher);
3577    shutdownChore(healthCheckChore);
3578    shutdownChore(executorStatusChore);
3579    shutdownChore(storefileRefresher);
3580    shutdownChore(fsUtilizationChore);
3581    shutdownChore(slowLogTableOpsChore);
3582    shutdownChore(brokenStoreFileCleaner);
3583    shutdownChore(rsMobFileCleanerChore);
3584  }
3585
3586  @Override
3587  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
3588    return regionReplicationBufferManager;
3589  }
3590}