001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
022import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
024import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
025
026import java.io.IOException;
027import java.io.PrintWriter;
028import java.lang.management.MemoryUsage;
029import java.lang.reflect.Constructor;
030import java.net.InetSocketAddress;
031import java.time.Duration;
032import java.util.ArrayList;
033import java.util.Collection;
034import java.util.Collections;
035import java.util.Comparator;
036import java.util.HashSet;
037import java.util.Iterator;
038import java.util.List;
039import java.util.Map;
040import java.util.Map.Entry;
041import java.util.Objects;
042import java.util.Optional;
043import java.util.Set;
044import java.util.SortedMap;
045import java.util.Timer;
046import java.util.TimerTask;
047import java.util.TreeMap;
048import java.util.TreeSet;
049import java.util.concurrent.ConcurrentHashMap;
050import java.util.concurrent.ConcurrentMap;
051import java.util.concurrent.ConcurrentSkipListMap;
052import java.util.concurrent.TimeUnit;
053import java.util.concurrent.atomic.AtomicBoolean;
054import java.util.concurrent.locks.ReentrantReadWriteLock;
055import java.util.stream.Collectors;
056import javax.management.MalformedObjectNameException;
057import javax.servlet.http.HttpServlet;
058import org.apache.commons.lang3.RandomUtils;
059import org.apache.commons.lang3.StringUtils;
060import org.apache.hadoop.conf.Configuration;
061import org.apache.hadoop.fs.FileSystem;
062import org.apache.hadoop.fs.Path;
063import org.apache.hadoop.hbase.Abortable;
064import org.apache.hadoop.hbase.CacheEvictionStats;
065import org.apache.hadoop.hbase.CallQueueTooBigException;
066import org.apache.hadoop.hbase.ClockOutOfSyncException;
067import org.apache.hadoop.hbase.DoNotRetryIOException;
068import org.apache.hadoop.hbase.ExecutorStatusChore;
069import org.apache.hadoop.hbase.HBaseConfiguration;
070import org.apache.hadoop.hbase.HBaseInterfaceAudience;
071import org.apache.hadoop.hbase.HBaseServerBase;
072import org.apache.hadoop.hbase.HConstants;
073import org.apache.hadoop.hbase.HDFSBlocksDistribution;
074import org.apache.hadoop.hbase.HRegionLocation;
075import org.apache.hadoop.hbase.HealthCheckChore;
076import org.apache.hadoop.hbase.MetaTableAccessor;
077import org.apache.hadoop.hbase.NotServingRegionException;
078import org.apache.hadoop.hbase.PleaseHoldException;
079import org.apache.hadoop.hbase.ScheduledChore;
080import org.apache.hadoop.hbase.ServerName;
081import org.apache.hadoop.hbase.Stoppable;
082import org.apache.hadoop.hbase.TableName;
083import org.apache.hadoop.hbase.YouAreDeadException;
084import org.apache.hadoop.hbase.ZNodeClearer;
085import org.apache.hadoop.hbase.client.ConnectionUtils;
086import org.apache.hadoop.hbase.client.RegionInfo;
087import org.apache.hadoop.hbase.client.RegionInfoBuilder;
088import org.apache.hadoop.hbase.client.locking.EntityLock;
089import org.apache.hadoop.hbase.client.locking.LockServiceClient;
090import org.apache.hadoop.hbase.conf.ConfigurationManager;
091import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
092import org.apache.hadoop.hbase.exceptions.RegionMovedException;
093import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
094import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
095import org.apache.hadoop.hbase.executor.ExecutorType;
096import org.apache.hadoop.hbase.http.InfoServer;
097import org.apache.hadoop.hbase.io.hfile.BlockCache;
098import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
099import org.apache.hadoop.hbase.io.hfile.HFile;
100import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
101import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
102import org.apache.hadoop.hbase.ipc.RpcClient;
103import org.apache.hadoop.hbase.ipc.RpcServer;
104import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
105import org.apache.hadoop.hbase.ipc.ServerRpcController;
106import org.apache.hadoop.hbase.log.HBaseMarkers;
107import org.apache.hadoop.hbase.mob.MobFileCache;
108import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
109import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
110import org.apache.hadoop.hbase.net.Address;
111import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
112import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
113import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
114import org.apache.hadoop.hbase.quotas.QuotaUtil;
115import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
116import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
117import org.apache.hadoop.hbase.quotas.RegionSize;
118import org.apache.hadoop.hbase.quotas.RegionSizeStore;
119import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
120import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
121import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
122import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
123import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
124import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
125import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
126import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
127import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
128import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
129import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
130import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
131import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
132import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
133import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
134import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
135import org.apache.hadoop.hbase.security.SecurityConstants;
136import org.apache.hadoop.hbase.security.Superusers;
137import org.apache.hadoop.hbase.security.User;
138import org.apache.hadoop.hbase.security.UserProvider;
139import org.apache.hadoop.hbase.util.Bytes;
140import org.apache.hadoop.hbase.util.CompressionTest;
141import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
142import org.apache.hadoop.hbase.util.FSUtils;
143import org.apache.hadoop.hbase.util.FutureUtils;
144import org.apache.hadoop.hbase.util.JvmPauseMonitor;
145import org.apache.hadoop.hbase.util.Pair;
146import org.apache.hadoop.hbase.util.RetryCounter;
147import org.apache.hadoop.hbase.util.RetryCounterFactory;
148import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
149import org.apache.hadoop.hbase.util.Threads;
150import org.apache.hadoop.hbase.util.VersionInfo;
151import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
152import org.apache.hadoop.hbase.wal.WAL;
153import org.apache.hadoop.hbase.wal.WALFactory;
154import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
155import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
156import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
157import org.apache.hadoop.hbase.zookeeper.ZKUtil;
158import org.apache.hadoop.ipc.RemoteException;
159import org.apache.hadoop.util.ReflectionUtils;
160import org.apache.yetus.audience.InterfaceAudience;
161import org.apache.zookeeper.KeeperException;
162import org.slf4j.Logger;
163import org.slf4j.LoggerFactory;
164
165import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
166import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
167import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
168import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
169import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
170import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
171import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
172import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
173import org.apache.hbase.thirdparty.com.google.protobuf.Message;
174import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
175import org.apache.hbase.thirdparty.com.google.protobuf.Service;
176import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
177import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
178import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
179
180import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
181import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
211
212/**
213 * HRegionServer makes a set of HRegions available to clients. It checks in with
214 * the HMaster. There are many HRegionServers in a single HBase deployment.
215 */
216@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
217@SuppressWarnings({ "deprecation"})
218public class HRegionServer extends HBaseServerBase<RSRpcServices>
219  implements RegionServerServices, LastSequenceId {
220
221  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
222
223  /**
224   * For testing only!  Set to true to skip notifying region assignment to master .
225   */
226  @InterfaceAudience.Private
227  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
228  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
229
230  /**
231   * A map from RegionName to current action in progress. Boolean value indicates:
232   * true - if open region action in progress
233   * false - if close region action in progress
234   */
235  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
236    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
237
238  /**
239   * Used to cache the open/close region procedures which already submitted.
240   * See {@link #submitRegionProcedure(long)}.
241   */
242  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
243  /**
244   * Used to cache the open/close region procedures which already executed.
245   * See {@link #submitRegionProcedure(long)}.
246   */
247  private final Cache<Long, Long> executedRegionProcedures =
248      CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
249
250  /**
251   * Used to cache the moved-out regions
252   */
253  private final Cache<String, MovedRegionInfo> movedRegionInfoCache =
254      CacheBuilder.newBuilder().expireAfterWrite(movedRegionCacheExpiredTime(),
255        TimeUnit.MILLISECONDS).build();
256
257  private MemStoreFlusher cacheFlusher;
258
259  private HeapMemoryManager hMemManager;
260
261  // Replication services. If no replication, this handler will be null.
262  private ReplicationSourceService replicationSourceHandler;
263  private ReplicationSinkService replicationSinkHandler;
264  private boolean sameReplicationSourceAndSink;
265
266  // Compactions
267  private CompactSplit compactSplitThread;
268
269  /**
270   * Map of regions currently being served by this region server. Key is the
271   * encoded region name.  All access should be synchronized.
272   */
273  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
274  /**
275   * Lock for gating access to {@link #onlineRegions}.
276   * TODO: If this map is gated by a lock, does it need to be a ConcurrentHashMap?
277   */
278  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();
279
280  /**
281   * Map of encoded region names to the DataNode locations they should be hosted on
282   * We store the value as Address since InetSocketAddress is required by the HDFS
283   * API (create() that takes favored nodes as hints for placing file blocks).
284   * We could have used ServerName here as the value class, but we'd need to
285   * convert it to InetSocketAddress at some point before the HDFS API call, and
286   * it seems a bit weird to store ServerName since ServerName refers to RegionServers
287   * and here we really mean DataNode locations. We don't store it as InetSocketAddress
288   * here because the conversion on demand from Address to InetSocketAddress will
289   * guarantee the resolution results will be fresh when we need it.
290   */
291  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();
292
293  private LeaseManager leaseManager;
294
295  private volatile boolean dataFsOk;
296
297  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
298  // Default abort timeout is 1200 seconds for safe
299  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
300  // Will run this task when abort timeout
301  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
302
303  // A state before we go into stopped state.  At this stage we're closing user
304  // space regions.
305  private boolean stopping = false;
306  private volatile boolean killed = false;
307
308  private final int threadWakeFrequency;
309
310  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
311  private final int compactionCheckFrequency;
312  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
313  private final int flushCheckFrequency;
314
315  // Stub to do region server status calls against the master.
316  private volatile RegionServerStatusService.BlockingInterface rssStub;
317  private volatile LockService.BlockingInterface lockStub;
318  // RPC client. Used to make the stub above that does region server status checking.
319  private RpcClient rpcClient;
320
321  private UncaughtExceptionHandler uncaughtExceptionHandler;
322
323  private JvmPauseMonitor pauseMonitor;
324
325  /** region server process name */
326  public static final String REGIONSERVER = "regionserver";
327
328
329  private MetricsRegionServer metricsRegionServer;
330  MetricsRegionServerWrapperImpl metricsRegionServerImpl;
331
332  /**
333   * Check for compactions requests.
334   */
335  private ScheduledChore compactionChecker;
336
337  /**
338   * Check for flushes
339   */
340  private ScheduledChore periodicFlusher;
341
342  private volatile WALFactory walFactory;
343
344  private LogRoller walRoller;
345
346  // A thread which calls reportProcedureDone
347  private RemoteProcedureResultReporter procedureResultReporter;
348
349  // flag set after we're done setting up server threads
350  final AtomicBoolean online = new AtomicBoolean(false);
351
352  // master address tracker
353  private final MasterAddressTracker masterAddressTracker;
354
355  // Log Splitting Worker
356  private SplitLogWorker splitLogWorker;
357
358  private final int shortOperationTimeout;
359
360  // Time to pause if master says 'please hold'
361  private final long retryPauseTime;
362
363  private final RegionServerAccounting regionServerAccounting;
364
365  private SlowLogTableOpsChore slowLogTableOpsChore = null;
366
367  // Block cache
368  private BlockCache blockCache;
369  // The cache for mob files
370  private MobFileCache mobFileCache;
371
372  /** The health check chore. */
373  private HealthCheckChore healthCheckChore;
374
375  /** The Executor status collect chore. */
376  private ExecutorStatusChore executorStatusChore;
377
378  /** The nonce manager chore. */
379  private ScheduledChore nonceManagerChore;
380
381  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
382
383  /**
384   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
385   * {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
386   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
387   */
388  @Deprecated
389  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
390  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
391    "hbase.regionserver.hostname.disable.master.reversedns";
392
393  /**
394   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
395   * Exception will be thrown if both are used.
396   */
397  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
398  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
399    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";
400
401  /**
402   * Unique identifier for the cluster we are a part of.
403   */
404  private String clusterId;
405
406  // chore for refreshing store files for secondary regions
407  private StorefileRefresherChore storefileRefresher;
408
409  private RegionServerCoprocessorHost rsHost;
410
411  private RegionServerProcedureManagerHost rspmHost;
412
413  private RegionServerRpcQuotaManager rsQuotaManager;
414  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
415
416  /**
417   * Nonce manager. Nonces are used to make operations like increment and append idempotent
418   * in the case where client doesn't receive the response from a successful operation and
419   * retries. We track the successful ops for some time via a nonce sent by client and handle
420   * duplicate operations (currently, by failing them; in future we might use MVCC to return
421   * result). Nonces are also recovered from WAL during, recovery; however, the caveats (from
422   * HBASE-3787) are:
423   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
424   *   of past records. If we don't read the records, we don't read and recover the nonces.
425   *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
426   * - There's no WAL recovery during normal region move, so nonces will not be transfered.
427   * We can have separate additional "Nonce WAL". It will just contain bunch of numbers and
428   * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
429   * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
430   * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
431   * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
432   * latest nonce in it expired. It can also be recovered during move.
433   */
434  final ServerNonceManager nonceManager;
435
436  private BrokenStoreFileCleaner brokenStoreFileCleaner;
437
438  @InterfaceAudience.Private
439  CompactedHFilesDischarger compactedFileDischarger;
440
441  private volatile ThroughputController flushThroughputController;
442
443  private SecureBulkLoadManager secureBulkLoadManager;
444
445  private FileSystemUtilizationChore fsUtilizationChore;
446
447  private BootstrapNodeManager bootstrapNodeManager;
448
449  /**
450   * True if this RegionServer is coming up in a cluster where there is no Master;
451   * means it needs to just come up and make do without a Master to talk to: e.g. in test or
452   * HRegionServer is doing other than its usual duties: e.g. as an hollowed-out host whose only
453   * purpose is as a Replication-stream sink; see HBASE-18846 for more.
454   * TODO: can this replace {@link #TEST_SKIP_REPORTING_TRANSITION} ?
455   */
456  private final boolean masterless;
457  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
458
459  /**regionserver codec list **/
460  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";
461
462  // A timer to shutdown the process if abort takes too long
463  private Timer abortMonitor;
464
465  private RegionReplicationBufferManager regionReplicationBufferManager;
466  /**
467   * Starts a HRegionServer at the default location.
468   * <p/>
469   * Don't start any services or managers in here in the Constructor.
470   * Defer till after we register with the Master as much as possible. See {@link #startServices}.
471   */
472  public HRegionServer(final Configuration conf) throws IOException {
473    super(conf, "RegionServer");  // thread name
474    try {
475      this.dataFsOk = true;
476      this.masterless = !clusterMode();
477      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
478      HFile.checkHFileVersion(this.conf);
479      checkCodecs(this.conf);
480      FSUtils.setupShortCircuitRead(this.conf);
481
482      // Disable usage of meta replicas in the regionserver
483      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
484      // Config'ed params
485      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
486      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
487      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
488
489      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
490      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
491
492      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
493          HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
494
495      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
496        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);
497
498      regionServerAccounting = new RegionServerAccounting(conf);
499
500      blockCache = BlockCacheFactory.createBlockCache(conf);
501      mobFileCache = new MobFileCache(conf);
502
503      uncaughtExceptionHandler =
504        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);
505
506      // If no master in cluster, skip trying to track one or look for a cluster status.
507      if (!this.masterless) {
508        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
509        masterAddressTracker.start();
510      } else {
511        masterAddressTracker = null;
512      }
513      this.rpcServices.start(zooKeeper);
514    } catch (Throwable t) {
515      // Make sure we log the exception. HRegionServer is often started via reflection and the
516      // cause of failed startup is lost.
517      LOG.error("Failed construction RegionServer", t);
518      throw t;
519    }
520  }
521
522  // HMaster should override this method to load the specific config for master
523  @Override
524  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
525    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
526    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
527      if (!StringUtils.isBlank(hostname)) {
528        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " +
529          UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set " +
530          UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while " +
531          UNSAFE_RS_HOSTNAME_KEY + " is used";
532        throw new IOException(msg);
533      } else {
534        return rpcServices.getSocketAddress().getHostName();
535      }
536    } else {
537      return hostname;
538    }
539  }
540
541  @Override
542  protected void login(UserProvider user, String host) throws IOException {
543    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
544      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
545  }
546
547  @Override
548  protected String getProcessName() {
549    return REGIONSERVER;
550  }
551
552  @Override
553  protected boolean canCreateBaseZNode() {
554    return !clusterMode();
555  }
556
557  @Override
558  protected boolean canUpdateTableDescriptor() {
559    return false;
560  }
561
562  @Override
563  protected boolean cacheTableDescriptor() {
564    return false;
565  }
566
567  protected RSRpcServices createRpcServices() throws IOException {
568    return new RSRpcServices(this);
569  }
570
571  @Override
572  protected void configureInfoServer(InfoServer infoServer) {
573    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
574    infoServer.setAttribute(REGIONSERVER, this);
575  }
576
577  @Override
578  protected Class<? extends HttpServlet> getDumpServlet() {
579    return RSDumpServlet.class;
580  }
581
582  /**
583   * Used by {@link RSDumpServlet} to generate debugging information.
584   */
585  public void dumpRowLocks(final PrintWriter out) {
586    StringBuilder sb = new StringBuilder();
587    for (HRegion region : getRegions()) {
588      if (region.getLockedRows().size() > 0) {
589        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
590          sb.setLength(0);
591          sb.append(region.getTableDescriptor().getTableName()).append(",")
592            .append(region.getRegionInfo().getEncodedName()).append(",");
593          sb.append(rowLockContext.toString());
594          out.println(sb);
595        }
596      }
597    }
598  }
599
600  @Override
601  public boolean registerService(Service instance) {
602    // No stacking of instances is allowed for a single executorService name
603    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
604    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
605    if (coprocessorServiceHandlers.containsKey(serviceName)) {
606      LOG.error("Coprocessor executorService " + serviceName +
607        " already registered, rejecting request from " + instance);
608      return false;
609    }
610
611    coprocessorServiceHandlers.put(serviceName, instance);
612    if (LOG.isDebugEnabled()) {
613      LOG.debug(
614        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
615    }
616    return true;
617  }
618
619  /**
620   * Run test on configured codecs to make sure supporting libs are in place.
621   */
622  private static void checkCodecs(final Configuration c) throws IOException {
623    // check to see if the codec list is available:
624    String [] codecs = c.getStrings(REGIONSERVER_CODEC, (String[])null);
625    if (codecs == null) {
626      return;
627    }
628    for (String codec : codecs) {
629      if (!CompressionTest.testCompression(codec)) {
630        throw new IOException("Compression codec " + codec +
631          " not supported, aborting RS construction");
632      }
633    }
634  }
635
636  public String getClusterId() {
637    return this.clusterId;
638  }
639
640  /**
641   * All initialization needed before we go register with Master.<br>
642   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
643   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
644   */
645  private void preRegistrationInitialization() {
646    try {
647      initializeZooKeeper();
648      setupClusterConnection();
649      bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
650      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
651      // Setup RPC client for master communication
652      this.rpcClient = asyncClusterConnection.getRpcClient();
653    } catch (Throwable t) {
654      // Call stop if error or process will stick around for ever since server
655      // puts up non-daemon threads.
656      this.rpcServices.stop();
657      abort("Initialization of RS failed.  Hence aborting RS.", t);
658    }
659  }
660
661  /**
662   * Bring up connection to zk ensemble and then wait until a master for this cluster and then after
663   * that, wait until cluster 'up' flag has been set. This is the order in which master does things.
664   * <p>
665   * Finally open long-living server short-circuit connection.
666   */
667  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
668    justification="cluster Id znode read would give us correct response")
669  private void initializeZooKeeper() throws IOException, InterruptedException {
670    // Nothing to do in here if no Master in the mix.
671    if (this.masterless) {
672      return;
673    }
674
675    // Create the master address tracker, register with zk, and start it.  Then
676    // block until a master is available.  No point in starting up if no master
677    // running.
678    blockAndCheckIfStopped(this.masterAddressTracker);
679
680    // Wait on cluster being up.  Master will set this flag up in zookeeper
681    // when ready.
682    blockAndCheckIfStopped(this.clusterStatusTracker);
683
684    // If we are HMaster then the cluster id should have already been set.
685    if (clusterId == null) {
686      // Retrieve clusterId
687      // Since cluster status is now up
688      // ID should have already been set by HMaster
689      try {
690        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
691        if (clusterId == null) {
692          this.abort("Cluster ID has not been set");
693        }
694        LOG.info("ClusterId : " + clusterId);
695      } catch (KeeperException e) {
696        this.abort("Failed to retrieve Cluster ID", e);
697      }
698    }
699
700    if (isStopped() || isAborted()) {
701      return; // No need for further initialization
702    }
703
704    // watch for snapshots and other procedures
705    try {
706      rspmHost = new RegionServerProcedureManagerHost();
707      rspmHost.loadProcedures(conf);
708      rspmHost.initialize(this);
709    } catch (KeeperException e) {
710      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
711    }
712  }
713
714  /**
715   * Utilty method to wait indefinitely on a znode availability while checking
716   * if the region server is shut down
717   * @param tracker znode tracker to use
718   * @throws IOException any IO exception, plus if the RS is stopped
719   * @throws InterruptedException if the waiting thread is interrupted
720   */
721  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
722      throws IOException, InterruptedException {
723    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
724      if (this.stopped) {
725        throw new IOException("Received the shutdown message while waiting.");
726      }
727    }
728  }
729
730  /**
731   * @return True if the cluster is up.
732   */
733  @Override
734  public boolean isClusterUp() {
735    return this.masterless ||
736        (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
737  }
738
739  /**
740   * The HRegionServer sticks in this loop until closed.
741   */
742  @Override
743  public void run() {
744    if (isStopped()) {
745      LOG.info("Skipping run; stopped");
746      return;
747    }
748    try {
749      // Do pre-registration initializations; zookeeper, lease threads, etc.
750      preRegistrationInitialization();
751    } catch (Throwable e) {
752      abort("Fatal exception during initialization", e);
753    }
754
755    try {
756      if (!isStopped() && !isAborted()) {
757        ShutdownHook.install(conf, dataFs, this, Thread.currentThread());
758        // Initialize the RegionServerCoprocessorHost now that our ephemeral
759        // node was created, in case any coprocessors want to use ZooKeeper
760        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
761
762        // Try and register with the Master; tell it we are here.  Break if server is stopped or
763        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
764        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
765        // come up.
766        LOG.debug("About to register with Master.");
767        RetryCounterFactory rcf =
768          new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
769        RetryCounter rc = rcf.create();
770        while (keepLooping()) {
771          RegionServerStartupResponse w = reportForDuty();
772          if (w == null) {
773            long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
774            LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
775            this.sleeper.sleep(sleepTime);
776          } else {
777            handleReportForDutyResponse(w);
778            break;
779          }
780        }
781      }
782
783      if (!isStopped() && isHealthy()) {
784        // start the snapshot handler and other procedure handlers,
785        // since the server is ready to run
786        if (this.rspmHost != null) {
787          this.rspmHost.start();
788        }
789        // Start the Quota Manager
790        if (this.rsQuotaManager != null) {
791          rsQuotaManager.start(getRpcServer().getScheduler());
792        }
793        if (this.rsSpaceQuotaManager != null) {
794          this.rsSpaceQuotaManager.start();
795        }
796      }
797
798      // We registered with the Master.  Go into run mode.
799      long lastMsg = EnvironmentEdgeManager.currentTime();
800      long oldRequestCount = -1;
801      // The main run loop.
802      while (!isStopped() && isHealthy()) {
803        if (!isClusterUp()) {
804          if (onlineRegions.isEmpty()) {
805            stop("Exiting; cluster shutdown set and not carrying any regions");
806          } else if (!this.stopping) {
807            this.stopping = true;
808            LOG.info("Closing user regions");
809            closeUserRegions(isAborted());
810          } else {
811            boolean allUserRegionsOffline = areAllUserRegionsOffline();
812            if (allUserRegionsOffline) {
813              // Set stopped if no more write requests tp meta tables
814              // since last time we went around the loop.  Any open
815              // meta regions will be closed on our way out.
816              if (oldRequestCount == getWriteRequestCount()) {
817                stop("Stopped; only catalog regions remaining online");
818                break;
819              }
820              oldRequestCount = getWriteRequestCount();
821            } else {
822              // Make sure all regions have been closed -- some regions may
823              // have not got it because we were splitting at the time of
824              // the call to closeUserRegions.
825              closeUserRegions(this.abortRequested.get());
826            }
827            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
828          }
829        }
830        long now = EnvironmentEdgeManager.currentTime();
831        if ((now - lastMsg) >= msgInterval) {
832          tryRegionServerReport(lastMsg, now);
833          lastMsg = EnvironmentEdgeManager.currentTime();
834        }
835        if (!isStopped() && !isAborted()) {
836          this.sleeper.sleep();
837        }
838      } // for
839    } catch (Throwable t) {
840      if (!rpcServices.checkOOME(t)) {
841        String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
842        abort(prefix + t.getMessage(), t);
843      }
844    }
845
846    if (this.leaseManager != null) {
847      this.leaseManager.closeAfterLeasesExpire();
848    }
849    if (this.splitLogWorker != null) {
850      splitLogWorker.stop();
851    }
852    stopInfoServer();
853    // Send cache a shutdown.
854    if (blockCache != null) {
855      blockCache.shutdown();
856    }
857    if (mobFileCache != null) {
858      mobFileCache.shutdown();
859    }
860
861    // Send interrupts to wake up threads if sleeping so they notice shutdown.
862    // TODO: Should we check they are alive? If OOME could have exited already
863    if (this.hMemManager != null) {
864      this.hMemManager.stop();
865    }
866    if (this.cacheFlusher != null) {
867      this.cacheFlusher.interruptIfNecessary();
868    }
869    if (this.compactSplitThread != null) {
870      this.compactSplitThread.interruptIfNecessary();
871    }
872
873    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
874    if (rspmHost != null) {
875      rspmHost.stop(this.abortRequested.get() || this.killed);
876    }
877
878    if (this.killed) {
879      // Just skip out w/o closing regions.  Used when testing.
880    } else if (abortRequested.get()) {
881      if (this.dataFsOk) {
882        closeUserRegions(abortRequested.get()); // Don't leave any open file handles
883      }
884      LOG.info("aborting server " + this.serverName);
885    } else {
886      closeUserRegions(abortRequested.get());
887      LOG.info("stopping server " + this.serverName);
888    }
889    regionReplicationBufferManager.stop();
890    closeClusterConnection();
891    // Closing the compactSplit thread before closing meta regions
892    if (!this.killed && containsMetaTableRegions()) {
893      if (!abortRequested.get() || this.dataFsOk) {
894        if (this.compactSplitThread != null) {
895          this.compactSplitThread.join();
896          this.compactSplitThread = null;
897        }
898        closeMetaTableRegions(abortRequested.get());
899      }
900    }
901
902    if (!this.killed && this.dataFsOk) {
903      waitOnAllRegionsToClose(abortRequested.get());
904      LOG.info("stopping server " + this.serverName + "; all regions closed.");
905    }
906
907    // Stop the quota manager
908    if (rsQuotaManager != null) {
909      rsQuotaManager.stop();
910    }
911    if (rsSpaceQuotaManager != null) {
912      rsSpaceQuotaManager.stop();
913      rsSpaceQuotaManager = null;
914    }
915
916    // flag may be changed when closing regions throws exception.
917    if (this.dataFsOk) {
918      shutdownWAL(!abortRequested.get());
919    }
920
921    // Make sure the proxy is down.
922    if (this.rssStub != null) {
923      this.rssStub = null;
924    }
925    if (this.lockStub != null) {
926      this.lockStub = null;
927    }
928    if (this.rpcClient != null) {
929      this.rpcClient.close();
930    }
931    if (this.leaseManager != null) {
932      this.leaseManager.close();
933    }
934    if (this.pauseMonitor != null) {
935      this.pauseMonitor.stop();
936    }
937
938    if (!killed) {
939      stopServiceThreads();
940    }
941
942    if (this.rpcServices != null) {
943      this.rpcServices.stop();
944    }
945
946    try {
947      deleteMyEphemeralNode();
948    } catch (KeeperException.NoNodeException nn) {
949      // pass
950    } catch (KeeperException e) {
951      LOG.warn("Failed deleting my ephemeral node", e);
952    }
953    // We may have failed to delete the znode at the previous step, but
954    //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
955    ZNodeClearer.deleteMyEphemeralNodeOnDisk();
956
957    closeZooKeeper();
958    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
959  }
960
961  private boolean containsMetaTableRegions() {
962    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
963  }
964
965  private boolean areAllUserRegionsOffline() {
966    if (getNumberOfOnlineRegions() > 2) {
967      return false;
968    }
969    boolean allUserRegionsOffline = true;
970    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
971      if (!e.getValue().getRegionInfo().isMetaRegion()) {
972        allUserRegionsOffline = false;
973        break;
974      }
975    }
976    return allUserRegionsOffline;
977  }
978
979  /**
980   * @return Current write count for all online regions.
981   */
982  private long getWriteRequestCount() {
983    long writeCount = 0;
984    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
985      writeCount += e.getValue().getWriteRequestsCount();
986    }
987    return writeCount;
988  }
989
990  @InterfaceAudience.Private
991  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
992      throws IOException {
993    RegionServerStatusService.BlockingInterface rss = rssStub;
994    if (rss == null) {
995      // the current server could be stopping.
996      return;
997    }
998    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
999    try {
1000      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1001      request.setServer(ProtobufUtil.toServerName(this.serverName));
1002      request.setLoad(sl);
1003      rss.regionServerReport(null, request.build());
1004    } catch (ServiceException se) {
1005      IOException ioe = ProtobufUtil.getRemoteException(se);
1006      if (ioe instanceof YouAreDeadException) {
1007        // This will be caught and handled as a fatal error in run()
1008        throw ioe;
1009      }
1010      if (rssStub == rss) {
1011        rssStub = null;
1012      }
1013      // Couldn't connect to the master, get location from zk and reconnect
1014      // Method blocks until new master is found or we are stopped
1015      createRegionServerStatusStub(true);
1016    }
1017  }
1018
1019  /**
1020   * Reports the given map of Regions and their size on the filesystem to the active Master.
1021   *
1022   * @param regionSizeStore The store containing region sizes
1023   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1024   */
1025  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
1026    RegionServerStatusService.BlockingInterface rss = rssStub;
1027    if (rss == null) {
1028      // the current server could be stopping.
1029      LOG.trace("Skipping Region size report to HMaster as stub is null");
1030      return true;
1031    }
1032    try {
1033      buildReportAndSend(rss, regionSizeStore);
1034    } catch (ServiceException se) {
1035      IOException ioe = ProtobufUtil.getRemoteException(se);
1036      if (ioe instanceof PleaseHoldException) {
1037        LOG.trace("Failed to report region sizes to Master because it is initializing."
1038            + " This will be retried.", ioe);
1039        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1040        return true;
1041      }
1042      if (rssStub == rss) {
1043        rssStub = null;
1044      }
1045      createRegionServerStatusStub(true);
1046      if (ioe instanceof DoNotRetryIOException) {
1047        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1048        if (doNotRetryEx.getCause() != null) {
1049          Throwable t = doNotRetryEx.getCause();
1050          if (t instanceof UnsupportedOperationException) {
1051            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1052            return false;
1053          }
1054        }
1055      }
1056      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1057    }
1058    return true;
1059  }
1060
1061  /**
1062   * Builds the region size report and sends it to the master. Upon successful sending of the
1063   * report, the region sizes that were sent are marked as sent.
1064   *
1065   * @param rss The stub to send to the Master
1066   * @param regionSizeStore The store containing region sizes
1067   */
1068  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
1069      RegionSizeStore regionSizeStore) throws ServiceException {
1070    RegionSpaceUseReportRequest request =
1071        buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
1072    rss.reportRegionSpaceUse(null, request);
1073    // Record the number of size reports sent
1074    if (metricsRegionServer != null) {
1075      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
1076    }
1077  }
1078
1079  /**
1080   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1081   *
1082   * @param regionSizes The size in bytes of regions
1083   * @return The corresponding protocol buffer message.
1084   */
1085  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
1086    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1087    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
1088      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
1089    }
1090    return request.build();
1091  }
1092
1093  /**
1094   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse}
1095   * protobuf message.
1096   *
1097   * @param regionInfo The RegionInfo
1098   * @param sizeInBytes The size in bytes of the Region
1099   * @return The protocol buffer
1100   */
1101  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1102    return RegionSpaceUse.newBuilder()
1103        .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1104        .setRegionSize(Objects.requireNonNull(sizeInBytes))
1105        .build();
1106  }
1107
1108  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1109      throws IOException {
1110    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1111    // per second, and other metrics  As long as metrics are part of ServerLoad it's best to use
1112    // the wrapper to compute those numbers in one place.
1113    // In the long term most of these should be moved off of ServerLoad and the heart beat.
1114    // Instead they should be stored in an HBase table so that external visibility into HBase is
1115    // improved; Additionally the load balancer will be able to take advantage of a more complete
1116    // history.
1117    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1118    Collection<HRegion> regions = getOnlineRegionsLocalContext();
1119    long usedMemory = -1L;
1120    long maxMemory = -1L;
1121    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
1122    if (usage != null) {
1123      usedMemory = usage.getUsed();
1124      maxMemory = usage.getMax();
1125    }
1126
1127    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
1128    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1129    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
1130    serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
1131    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
1132    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
1133    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
1134    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1135    Builder coprocessorBuilder = Coprocessor.newBuilder();
1136    for (String coprocessor : coprocessors) {
1137      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1138    }
1139    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1140    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1141    for (HRegion region : regions) {
1142      if (region.getCoprocessorHost() != null) {
1143        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1144        for (String regionCoprocessor : regionCoprocessors) {
1145          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
1146        }
1147      }
1148      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1149      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1150          .getCoprocessors()) {
1151        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1152      }
1153    }
1154    serverLoad.setReportStartTime(reportStartTime);
1155    serverLoad.setReportEndTime(reportEndTime);
1156    if (this.infoServer != null) {
1157      serverLoad.setInfoServerPort(this.infoServer.getPort());
1158    } else {
1159      serverLoad.setInfoServerPort(-1);
1160    }
1161    MetricsUserAggregateSource userSource =
1162        metricsRegionServer.getMetricsUserAggregate().getSource();
1163    if (userSource != null) {
1164      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
1165      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
1166        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
1167      }
1168    }
1169
1170    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
1171      // always refresh first to get the latest value
1172      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1173      if (rLoad != null) {
1174        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1175        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1176          .getReplicationLoadSourceEntries()) {
1177          serverLoad.addReplLoadSource(rLS);
1178        }
1179      }
1180    } else {
1181      if (replicationSourceHandler != null) {
1182        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1183        if (rLoad != null) {
1184          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1185            .getReplicationLoadSourceEntries()) {
1186            serverLoad.addReplLoadSource(rLS);
1187          }
1188        }
1189      }
1190      if (replicationSinkHandler != null) {
1191        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
1192        if (rLoad != null) {
1193          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1194        }
1195      }
1196    }
1197
1198    return serverLoad.build();
1199  }
1200
1201  private String getOnlineRegionsAsPrintableString() {
1202    StringBuilder sb = new StringBuilder();
1203    for (Region r: this.onlineRegions.values()) {
1204      if (sb.length() > 0) {
1205        sb.append(", ");
1206      }
1207      sb.append(r.getRegionInfo().getEncodedName());
1208    }
1209    return sb.toString();
1210  }
1211
1212  /**
1213   * Wait on regions close.
1214   */
1215  private void waitOnAllRegionsToClose(final boolean abort) {
1216    // Wait till all regions are closed before going out.
1217    int lastCount = -1;
1218    long previousLogTime = 0;
1219    Set<String> closedRegions = new HashSet<>();
1220    boolean interrupted = false;
1221    try {
1222      while (!onlineRegions.isEmpty()) {
1223        int count = getNumberOfOnlineRegions();
1224        // Only print a message if the count of regions has changed.
1225        if (count != lastCount) {
1226          // Log every second at most
1227          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
1228            previousLogTime = EnvironmentEdgeManager.currentTime();
1229            lastCount = count;
1230            LOG.info("Waiting on " + count + " regions to close");
1231            // Only print out regions still closing if a small number else will
1232            // swamp the log.
1233            if (count < 10 && LOG.isDebugEnabled()) {
1234              LOG.debug("Online Regions=" + this.onlineRegions);
1235            }
1236          }
1237        }
1238        // Ensure all user regions have been sent a close. Use this to
1239        // protect against the case where an open comes in after we start the
1240        // iterator of onlineRegions to close all user regions.
1241        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1242          RegionInfo hri = e.getValue().getRegionInfo();
1243          if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) &&
1244              !closedRegions.contains(hri.getEncodedName())) {
1245            closedRegions.add(hri.getEncodedName());
1246            // Don't update zk with this close transition; pass false.
1247            closeRegionIgnoreErrors(hri, abort);
1248          }
1249        }
1250        // No regions in RIT, we could stop waiting now.
1251        if (this.regionsInTransitionInRS.isEmpty()) {
1252          if (!onlineRegions.isEmpty()) {
1253            LOG.info("We were exiting though online regions are not empty," +
1254                " because some regions failed closing");
1255          }
1256          break;
1257        } else {
1258          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream().
1259            map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1260        }
1261        if (sleepInterrupted(200)) {
1262          interrupted = true;
1263        }
1264      }
1265    } finally {
1266      if (interrupted) {
1267        Thread.currentThread().interrupt();
1268      }
1269    }
1270  }
1271
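  // Sleeps for the given number of milliseconds and returns true if the sleep was interrupted;
  // the interrupt flag is not restored here, so callers can decide when to re-interrupt.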
1272  private static boolean sleepInterrupted(long millis) {
1273    boolean interrupted = false;
1274    try {
1275      Thread.sleep(millis);
1276    } catch (InterruptedException e) {
1277      LOG.warn("Interrupted while sleeping");
1278      interrupted = true;
1279    }
1280    return interrupted;
1281  }
1282
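  // Shuts down (or, when 'close' is true, fully closes) the WAL factory; failures are logged
  // rather than propagated.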
1283  private void shutdownWAL(final boolean close) {
1284    if (this.walFactory != null) {
1285      try {
1286        if (close) {
1287          walFactory.close();
1288        } else {
1289          walFactory.shutdown();
1290        }
1291      } catch (Throwable e) {
1292        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1293        LOG.error("Shutdown / close of WAL failed: " + e);
1294        LOG.debug("Shutdown / close exception details:", e);
1295      }
1296    }
1297  }
1298
1299  /**
1300   * Run initialization. Sets up WAL and starts up all server threads.
1301   *
1302   * @param c Extra configuration.
1303   */
1304  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1305    throws IOException {
1306    try {
1307      boolean updateRootDir = false;
1308      for (NameStringPair e : c.getMapEntriesList()) {
1309        String key = e.getName();
1310        // The hostname the master sees us as.
1311        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1312          String hostnameFromMasterPOV = e.getValue();
1313          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1314            rpcServices.getSocketAddress().getPort(), this.startcode);
1315          if (!StringUtils.isBlank(useThisHostnameInstead) &&
1316            !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1317            String msg = "Master passed us a different hostname to use; was=" +
1318              this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1319            LOG.error(msg);
1320            throw new IOException(msg);
1321          }
1322          if (StringUtils.isBlank(useThisHostnameInstead) &&
1323            !hostnameFromMasterPOV.equals(rpcServices.getSocketAddress().getHostName())) {
1324            String msg = "Master passed us a different hostname to use; was=" +
1325              rpcServices.getSocketAddress().getHostName() + ", but now=" + hostnameFromMasterPOV;
1326            LOG.error(msg);
1327          }
1328          continue;
1329        }
1330
1331        String value = e.getValue();
1332        if (key.equals(HConstants.HBASE_DIR)) {
1333          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1334            updateRootDir = true;
1335          }
1336        }
1337
1338        if (LOG.isDebugEnabled()) {
1339          LOG.debug("Config from master: " + key + "=" + value);
1340        }
1341        this.conf.set(key, value);
1342      }
1343      // Set our ephemeral znode up in zookeeper now that we have a name.
1344      createMyEphemeralNode();
1345
1346      if (updateRootDir) {
1347        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1348        initializeFileSystem();
1349      }
1350
1351      // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1352      // config param for task trackers, but we can piggyback off of it.
1353      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1354        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1355      }
1356
1357      // Save it in a file; this will allow us to see if we crash
1358      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1359
1360      // This call initializes replication and the WAL. Later we start them up.
1361      setupWALAndReplication();
1362      // Init in here rather than in constructor after thread name has been set
1363      final MetricsTable metricsTable =
1364          new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1365      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1366      this.metricsRegionServer = new MetricsRegionServer(
1367          metricsRegionServerImpl, conf, metricsTable);
1368      // Now that we have a metrics source, start the pause monitor
1369      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1370      pauseMonitor.start();
1371
1372      // There is a rare case where we do NOT want services to start. Check config.
1373      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1374        startServices();
1375      }
1376      // In here we start up the replication service. Above we initialized it. TODO: reconcile,
1377      // or make sense of it.
1378      startReplicationService();
1379
1380      // Set up ZK
1381      LOG.info("Serving as " + this.serverName + ", RpcServer on " +
1382        rpcServices.getSocketAddress() + ", sessionid=0x" +
1383        Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1384
1385      // Wake up anyone waiting for this server to come online
1386      synchronized (online) {
1387        online.set(true);
1388        online.notifyAll();
1389      }
1390    } catch (Throwable e) {
1391      stop("Failed initialization");
1392      throw convertThrowableToIOE(cleanup(e, "Failed init"),
1393          "Region server startup failed");
1394    } finally {
1395      sleeper.skipSleepCycle();
1396    }
1397  }
1398
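  // Starts the HeapMemoryManager only when a block cache is configured.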
1399  private void startHeapMemoryManager() {
1400    if (this.blockCache != null) {
1401      this.hMemManager =
1402          new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1403      this.hMemManager.start(getChoreService());
1404    }
1405  }
1406
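  // Publishes this server's info port and version info as its ephemeral znode in ZooKeeper.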
1407  private void createMyEphemeralNode() throws KeeperException {
1408    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1409    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1410    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1411    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1412    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1413  }
1414
1415  private void deleteMyEphemeralNode() throws KeeperException {
1416    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1417  }
1418
1419  @Override
1420  public RegionServerAccounting getRegionServerAccounting() {
1421    return regionServerAccounting;
1422  }
1423
1424  // Round the size to KB or MB.
1425  // A trick here is that if the size in bytes is less than sizeUnit, we round the size up to 1
1426  // instead of 0 if it is not 0, so that schedulers do not think the region has no data. See
1427  // HBASE-26340 for more details on why this is important.
1428  private static int roundSize(long sizeInByte, int sizeUnit) {
1429    if (sizeInByte == 0) {
1430      return 0;
1431    } else if (sizeInByte < sizeUnit) {
1432      return 1;
1433    } else {
1434      return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE);
1435    }
1436  }
1437
1438  /**
1439   * @param r Region to get RegionLoad for.
1440   * @param regionLoadBldr the RegionLoad.Builder, can be null
1441   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1442   * @return RegionLoad instance.
1443   */
1444  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1445      RegionSpecifier.Builder regionSpecifier) throws IOException {
1446    byte[] name = r.getRegionInfo().getRegionName();
1447    int stores = 0;
1448    int storefiles = 0;
1449    int storeRefCount = 0;
1450    int maxCompactedStoreFileRefCount = 0;
1451    long storeUncompressedSize = 0L;
1452    long storefileSize = 0L;
1453    long storefileIndexSize = 0L;
1454    long rootLevelIndexSize = 0L;
1455    long totalStaticIndexSize = 0L;
1456    long totalStaticBloomSize = 0L;
1457    long totalCompactingKVs = 0L;
1458    long currentCompactedKVs = 0L;
1459    List<HStore> storeList = r.getStores();
1460    stores += storeList.size();
1461    for (HStore store : storeList) {
1462      storefiles += store.getStorefilesCount();
1463      int currentStoreRefCount = store.getStoreRefCount();
1464      storeRefCount += currentStoreRefCount;
1465      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1466      maxCompactedStoreFileRefCount = Math.max(maxCompactedStoreFileRefCount,
1467        currentMaxCompactedStoreFileRefCount);
1468      storeUncompressedSize += store.getStoreSizeUncompressed();
1469      storefileSize += store.getStorefilesSize();
1470      // TODO: storefileIndexSizeKB is the same as rootLevelIndexSizeKB?
1471      storefileIndexSize += store.getStorefilesRootLevelIndexSize();
1472      CompactionProgress progress = store.getCompactionProgress();
1473      if (progress != null) {
1474        totalCompactingKVs += progress.getTotalCompactingKVs();
1475        currentCompactedKVs += progress.currentCompactedKVs;
1476      }
1477      rootLevelIndexSize += store.getStorefilesRootLevelIndexSize();
1478      totalStaticIndexSize += store.getTotalStaticIndexSize();
1479      totalStaticBloomSize += store.getTotalStaticBloomSize();
1480    }
1481
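    // Convert the raw byte counts gathered above into the KB/MB units used by the RegionLoad
    // protobuf, rounding small non-zero sizes up to 1 (see roundSize above).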
1482    int unitMB = 1024 * 1024;
1483    int unitKB = 1024;
1484
1485    int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB);
1486    int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB);
1487    int storefileSizeMB = roundSize(storefileSize, unitMB);
1488    int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB);
1489    int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB);
1490    int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB);
1491    int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB);
1492
1493    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
1494    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
1495    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
1496    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
1497    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
1498    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
1499    if (regionLoadBldr == null) {
1500      regionLoadBldr = RegionLoad.newBuilder();
1501    }
1502    if (regionSpecifier == null) {
1503      regionSpecifier = RegionSpecifier.newBuilder();
1504    }
1505
1506    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1507    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1508    regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1509      .setStores(stores)
1510      .setStorefiles(storefiles)
1511      .setStoreRefCount(storeRefCount)
1512      .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1513      .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1514      .setStorefileSizeMB(storefileSizeMB)
1515      .setMemStoreSizeMB(memstoreSizeMB)
1516      .setStorefileIndexSizeKB(storefileIndexSizeKB)
1517      .setRootIndexSizeKB(rootLevelIndexSizeKB)
1518      .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1519      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1520      .setReadRequestsCount(r.getReadRequestsCount())
1521      .setCpRequestsCount(r.getCpRequestsCount())
1522      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1523      .setWriteRequestsCount(r.getWriteRequestsCount())
1524      .setTotalCompactingKVs(totalCompactingKVs)
1525      .setCurrentCompactedKVs(currentCompactedKVs)
1526      .setDataLocality(dataLocality)
1527      .setDataLocalityForSsd(dataLocalityForSsd)
1528      .setBlocksLocalWeight(blocksLocalWeight)
1529      .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight)
1530      .setBlocksTotalWeight(blocksTotalWeight)
1531      .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
1532      .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1533    r.setCompleteSequenceId(regionLoadBldr);
1534    return regionLoadBldr.build();
1535  }
1536
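  // Builds a UserLoad protobuf for the given user from the per-client metrics tracked for them.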
1537  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1538    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1539    userLoadBldr.setUserName(user);
1540    userSource.getClientMetrics().values().stream().map(
1541      clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1542            .setHostName(clientMetrics.getHostName())
1543            .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1544            .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1545            .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
1546        .forEach(userLoadBldr::addClientMetrics);
1547    return userLoadBldr.build();
1548  }
1549
1550  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1551    HRegion r = onlineRegions.get(encodedRegionName);
1552    return r != null ? createRegionLoad(r, null, null) : null;
1553  }
1554
1555  /**
1556   * Inner class that runs on a long period checking if regions need compaction.
1557   */
1558  private static class CompactionChecker extends ScheduledChore {
1559    private final HRegionServer instance;
1560    private final int majorCompactPriority;
1561    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1562    // Iteration is 1-based rather than 0-based so we don't check for compaction
1563    // immediately upon region server startup
1564    private long iteration = 1;
1565
1566    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1567      super("CompactionChecker", stopper, sleepTime);
1568      this.instance = h;
1569      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1570
1571      /* MajorCompactPriority is configurable.
1572       * If not set, the compaction will use default priority.
1573       */
1574      this.majorCompactPriority = this.instance.conf.
1575          getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1576              DEFAULT_PRIORITY);
1577    }
1578
1579    @Override
1580    protected void chore() {
1581      for (Region r : this.instance.onlineRegions.values()) {
1582        // Skip compaction if region is read only
1583        if (r == null || r.isReadOnly()) {
1584          continue;
1585        }
1586
1587        HRegion hr = (HRegion) r;
1588        for (HStore s : hr.stores.values()) {
1589          try {
1590            long multiplier = s.getCompactionCheckMultiplier();
1591            assert multiplier > 0;
1592            if (iteration % multiplier != 0) {
1593              continue;
1594            }
1595            if (s.needsCompaction()) {
1596              // Queue a compaction. Will recognize if major is needed.
1597              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1598                getName() + " requests compaction");
1599            } else if (s.shouldPerformMajorCompaction()) {
1600              s.triggerMajorCompaction();
1601              if (majorCompactPriority == DEFAULT_PRIORITY ||
1602                  majorCompactPriority > hr.getCompactPriority()) {
1603                this.instance.compactSplitThread.requestCompaction(hr, s,
1604                    getName() + " requests major compaction; use default priority",
1605                    Store.NO_PRIORITY,
1606                    CompactionLifeCycleTracker.DUMMY, null);
1607              } else {
1608                this.instance.compactSplitThread.requestCompaction(hr, s,
1609                    getName() + " requests major compaction; use configured priority",
1610                    this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1611              }
1612            }
1613          } catch (IOException e) {
1614            LOG.warn("Failed major compaction check on " + r, e);
1615          }
1616        }
1617      }
1618      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1619    }
1620  }
1621
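  /**
   * Inner class that periodically checks whether any online region should be flushed (as reported
   * by HRegion#shouldFlush) and, if so, requests a flush after a small random delay so that
   * flushes are spread out rather than hitting the filesystem all at once.
   */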
1622  private static class PeriodicMemStoreFlusher extends ScheduledChore {
1623    private final HRegionServer server;
1624    private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
1625    private final static int MIN_DELAY_TIME = 0; // millisec
1626    private final long rangeOfDelayMs;
1627
1628    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1629      super("MemstoreFlusherChore", server, cacheFlushInterval);
1630      this.server = server;
1631
1632      final long configuredRangeOfDelay = server.getConfiguration().getInt(
1633          "hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
1634      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
1635    }
1636
1637    @Override
1638    protected void chore() {
1639      final StringBuilder whyFlush = new StringBuilder();
1640      for (HRegion r : this.server.onlineRegions.values()) {
1641        if (r == null) {
1642          continue;
1643        }
1644        if (r.shouldFlush(whyFlush)) {
1645          FlushRequester requester = server.getFlushRequester();
1646          if (requester != null) {
1647            long randomDelay = RandomUtils.nextLong(0, rangeOfDelayMs) + MIN_DELAY_TIME;
1648            // Throttle the flushes by putting a delay. If we don't throttle, and there
1649            // is a balanced write-load on the regions in a table, we might end up
1650            // overwhelming the filesystem with too many flushes at once.
1651            if (requester.requestDelayedFlush(r, randomDelay)) {
1652              LOG.info("{} requesting flush of {} because {} after random delay {} ms",
1653                  getName(), r.getRegionInfo().getRegionNameAsString(),  whyFlush.toString(),
1654                  randomDelay);
1655            }
1656          }
1657        }
1658      }
1659    }
1660  }
1661
1662  /**
1663   * Report the status of the server. A server is online once all of its startup is
1664   * completed (setting up the filesystem, starting executorService threads, etc.). This
1665   * method is designed mostly to be useful in tests.
1666   *
1667   * @return true if online, false if not.
1668   */
1669  public boolean isOnline() {
1670    return online.get();
1671  }
1672
1673  /**
1674   * Set up the WAL and replication if enabled. Replication setup is done in here because it
1675   * needs to be hooked up to the WAL.
1676   */
1677  private void setupWALAndReplication() throws IOException {
1678    WALFactory factory = new WALFactory(conf, serverName.toString(), this, true);
1679    // TODO Replication make assumptions here based on the default filesystem impl
1680    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1681    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1682
1683    Path logDir = new Path(walRootDir, logName);
1684    LOG.debug("logDir={}", logDir);
1685    if (this.walFs.exists(logDir)) {
1686      throw new RegionServerRunningException(
1687        "Region server has already created directory at " + this.serverName.toString());
1688    }
1689    // Always create wal directory as now we need this when master restarts to find out the live
1690    // region servers.
1691    if (!this.walFs.mkdirs(logDir)) {
1692      throw new IOException("Cannot create WAL directory " + logDir);
1693    }
1694    // Instantiate replication if replication enabled. Pass it the log directories.
1695    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
1696    this.walFactory = factory;
1697  }
1698
1699  /**
1700   * Start up replication source and sink handlers.
1701   */
1702  private void startReplicationService() throws IOException {
1703    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
1704      this.replicationSourceHandler.startReplicationService();
1705    } else {
1706      if (this.replicationSourceHandler != null) {
1707        this.replicationSourceHandler.startReplicationService();
1708      }
1709      if (this.replicationSinkHandler != null) {
1710        this.replicationSinkHandler.startReplicationService();
1711      }
1712    }
1713  }
1714
1715  /**
1716   * @return Master address tracker instance.
1717   */
1718  public MasterAddressTracker getMasterAddressTracker() {
1719    return this.masterAddressTracker;
1720  }
1721
1722  /**
1723   * Start maintenance threads, Server, Worker and lease checker threads.
1724   * Starts all threads we need to run. This is called after we've successfully
1725   * registered with the Master.
1726   * Installs an UncaughtExceptionHandler that calls abort on the RegionServer if we
1727   * get an unhandled exception; we cannot set the handler on all threads because the
1728   * Server's internal Listener thread is off limits. For the Server, on an OOME it
1729   * waits a while then retries. Meantime, a flush or a compaction that tries to
1730   * run should trigger the same critical condition and the shutdown will run. On
1731   * its way out, this server will shut down the Server. Leases are somewhere in
1732   * between: it has an internal thread that, while it inherits from Chore, keeps
1733   * its own internal stop mechanism and so needs to be stopped by this
1734   * hosting server. The Worker logs the exception and exits.
1735   */
1736  private void startServices() throws IOException {
1737    if (!isStopped() && !isAborted()) {
1738      initializeThreads();
1739    }
1740    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
1741    this.secureBulkLoadManager.start();
1742
1743    // Health checker thread.
1744    if (isHealthCheckerConfigured()) {
1745      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
1746        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1747      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1748    }
1749    // Executor status collect thread.
1750    if (this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
1751        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)) {
1752      int sleepTime = this.conf.getInt(ExecutorStatusChore.WAKE_FREQ,
1753          ExecutorStatusChore.DEFAULT_WAKE_FREQ);
1754      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
1755          this.metricsRegionServer.getMetricsSource());
1756    }
1757
1758    this.walRoller = new LogRoller(this);
1759    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1760    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
1761
1762    // Create the CompactedFileDischarger chore executorService. This chore helps to
1763    // remove the compacted files that will no longer be used in reads.
1764    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
1765    // 2 mins so that compacted files can be archived before the TTLCleaner runs
1766    int cleanerInterval =
1767      conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
1768    this.compactedFileDischarger =
1769      new CompactedHFilesDischarger(cleanerInterval, this, this);
1770    choreService.scheduleChore(compactedFileDischarger);
1771
1772    // Start executor services
1773    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
1774    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1775        ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
1776    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
1777    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1778        ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
1779    final int openPriorityRegionThreads =
1780        conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
1781    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1782        ExecutorType.RS_OPEN_PRIORITY_REGION).setCorePoolSize(openPriorityRegionThreads));
1783    final int closeRegionThreads =
1784        conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
1785    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1786        ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
1787    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
1788    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1789        ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
1790    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1791      final int storeScannerParallelSeekThreads =
1792          conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
1793      executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1794          ExecutorType.RS_PARALLEL_SEEK).setCorePoolSize(storeScannerParallelSeekThreads)
1795          .setAllowCoreThreadTimeout(true));
1796    }
1797    final int logReplayOpsThreads = conf.getInt(
1798        HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
1799    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1800        ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(logReplayOpsThreads)
1801        .setAllowCoreThreadTimeout(true));
1802    // Start the threads for compacted files discharger
1803    final int compactionDischargerThreads =
1804        conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
1805    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1806        ExecutorType.RS_COMPACTED_FILES_DISCHARGER).setCorePoolSize(compactionDischargerThreads));
1807    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1808      final int regionReplicaFlushThreads = conf.getInt(
1809          "hbase.regionserver.region.replica.flusher.threads", conf.getInt(
1810              "hbase.regionserver.executor.openregion.threads", 3));
1811      executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1812          ExecutorType.RS_REGION_REPLICA_FLUSH_OPS).setCorePoolSize(regionReplicaFlushThreads));
1813    }
1814    final int refreshPeerThreads =
1815        conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
1816    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1817        ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
1818    final int replaySyncReplicationWALThreads =
1819        conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
1820    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1821        ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL).setCorePoolSize(
1822            replaySyncReplicationWALThreads));
1823    final int switchRpcThrottleThreads =
1824        conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
1825    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1826        ExecutorType.RS_SWITCH_RPC_THROTTLE).setCorePoolSize(switchRpcThrottleThreads));
1827    final int claimReplicationQueueThreads =
1828      conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
1829    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1830        ExecutorType.RS_CLAIM_REPLICATION_QUEUE).setCorePoolSize(claimReplicationQueueThreads));
1831
1832    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
1833        uncaughtExceptionHandler);
1834    if (this.cacheFlusher != null) {
1835      this.cacheFlusher.start(uncaughtExceptionHandler);
1836    }
1837    Threads.setDaemonThreadRunning(this.procedureResultReporter,
1838      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
1839
1840    if (this.compactionChecker != null) {
1841      choreService.scheduleChore(compactionChecker);
1842    }
1843    if (this.periodicFlusher != null) {
1844      choreService.scheduleChore(periodicFlusher);
1845    }
1846    if (this.healthCheckChore != null) {
1847      choreService.scheduleChore(healthCheckChore);
1848    }
1849    if (this.executorStatusChore != null) {
1850      choreService.scheduleChore(executorStatusChore);
1851    }
1852    if (this.nonceManagerChore != null) {
1853      choreService.scheduleChore(nonceManagerChore);
1854    }
1855    if (this.storefileRefresher != null) {
1856      choreService.scheduleChore(storefileRefresher);
1857    }
1858    if (this.fsUtilizationChore != null) {
1859      choreService.scheduleChore(fsUtilizationChore);
1860    }
1861    if (this.slowLogTableOpsChore != null) {
1862      choreService.scheduleChore(slowLogTableOpsChore);
1863    }
1864    if (this.brokenStoreFileCleaner != null) {
1865      choreService.scheduleChore(brokenStoreFileCleaner);
1866    }
1867
1868    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1869    // an unhandled exception, it will just exit.
1870    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
1871        uncaughtExceptionHandler);
1872
1873    // Create the log splitting worker and start it
1874    // Set a smaller retry count to fail fast, otherwise the SplitLogWorker could be blocked for
1875    // quite a while inside the Connection layer. The worker won't be available for other
1876    // tasks even after the current task is preempted when a split task times out.
1877    Configuration sinkConf = HBaseConfiguration.create(conf);
1878    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1879        conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1880    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1881        conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1882    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
1883    if (this.csm != null && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
1884      DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
1885      // SplitLogWorker needs csm. If none, don't start this.
1886      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
1887      splitLogWorker.start();
1888      LOG.debug("SplitLogWorker started");
1889    }
1890
1891    // Memstore services.
1892    startHeapMemoryManager();
1893    // Call it after starting HeapMemoryManager.
1894    initializeMemStoreChunkCreator(hMemManager);
1895  }
1896
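  // Creates (but does not yet start) the flusher, compaction, lease-manager and chore objects;
  // they are started/scheduled further down in startServices().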
1897  private void initializeThreads() {
1898    // Cache flushing thread.
1899    this.cacheFlusher = new MemStoreFlusher(conf, this);
1900
1901    // Compaction thread
1902    this.compactSplitThread = new CompactSplit(this);
1903
1904    // Background thread to check for compactions; needed if region has not gotten updates
1905    // in a while. It will take care of not checking too frequently on a store-by-store basis.
1906    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
1907    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
1908    this.leaseManager = new LeaseManager(this.threadWakeFrequency);
1909
1910    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
1911      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
1912    if (isSlowLogTableEnabled) {
1913      // default chore duration: 10 min
1914      final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
1915      slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
1916    }
1917
1918    if (this.nonceManager != null) {
1919      // Create the scheduled chore that cleans up nonces.
1920      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
1921    }
1922
1923    // Setup the Quota Manager
1924    rsQuotaManager = new RegionServerRpcQuotaManager(this);
1925    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
1926
1927    if (QuotaUtil.isQuotaEnabled(conf)) {
1928      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
1929    }
1930
1931
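    // A general refresh period of 0 disables refreshing for user regions; in that case fall back
    // to the meta-specific period and refresh only meta region storefiles.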
1932    boolean onlyMetaRefresh = false;
1933    int storefileRefreshPeriod = conf.getInt(
1934        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
1935        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
1936    if (storefileRefreshPeriod == 0) {
1937      storefileRefreshPeriod = conf.getInt(
1938          StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
1939          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
1940      onlyMetaRefresh = true;
1941    }
1942    if (storefileRefreshPeriod > 0) {
1943      this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
1944          onlyMetaRefresh, this, this);
1945    }
1946
1947    int brokenStoreFileCleanerPeriod  = conf.getInt(
1948      BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
1949      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
1950    int brokenStoreFileCleanerDelay  = conf.getInt(
1951      BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
1952      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
1953    double brokenStoreFileCleanerDelayJitter = conf.getDouble(
1954      BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
1955      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
1956    double jitterRate = (RandomUtils.nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
1957    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
1958    this.brokenStoreFileCleaner =
1959      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
1960        brokenStoreFileCleanerPeriod, this, conf, this);
1961
1962    registerConfigurationObservers();
1963  }
1964
1965  private void registerConfigurationObservers() {
1966    // Registering the compactSplitThread object with the ConfigurationManager.
1967    configurationManager.registerObserver(this.compactSplitThread);
1968    configurationManager.registerObserver(this.rpcServices);
1969    configurationManager.registerObserver(this);
1970  }
1971
1972  /*
1973   * Verify that server is healthy
1974   */
1975  private boolean isHealthy() {
1976    if (!dataFsOk) {
1977      // File system problem
1978      return false;
1979    }
1980    // Verify that all threads are alive
1981    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
1982        && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
1983        && (this.walRoller == null || this.walRoller.isAlive())
1984        && (this.compactionChecker == null || this.compactionChecker.isScheduled())
1985        && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
1986    if (!healthy) {
1987      stop("One or more threads are no longer alive -- stop");
1988    }
1989    return healthy;
1990  }
1991
1992  @Override
1993  public List<WAL> getWALs() {
1994    return walFactory.getWALs();
1995  }
1996
1997  @Override
1998  public WAL getWAL(RegionInfo regionInfo) throws IOException {
1999    WAL wal = walFactory.getWAL(regionInfo);
2000    if (this.walRoller != null) {
2001      this.walRoller.addWAL(wal);
2002    }
2003    return wal;
2004  }
2005
2006  public LogRoller getWalRoller() {
2007    return walRoller;
2008  }
2009
2010  WALFactory getWalFactory() {
2011    return walFactory;
2012  }
2013
2014  @Override
2015  public void stop(final String msg) {
2016    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2017  }
2018
2019  /**
2020   * Stops the regionserver.
2021   * @param msg Status message
2022   * @param force True if this is a regionserver abort
2023   * @param user The user executing the stop request, or null if no user is associated
2024   */
2025  public void stop(final String msg, final boolean force, final User user) {
2026    if (!this.stopped) {
2027      LOG.info("***** STOPPING region server '" + this + "' *****");
2028      if (this.rsHost != null) {
2029        // when forced via abort don't allow CPs to override
2030        try {
2031          this.rsHost.preStop(msg, user);
2032        } catch (IOException ioe) {
2033          if (!force) {
2034            LOG.warn("The region server did not stop", ioe);
2035            return;
2036          }
2037          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2038        }
2039      }
2040      this.stopped = true;
2041      LOG.info("STOPPED: " + msg);
2042      // Wakes run() if it is sleeping
2043      sleeper.skipSleepCycle();
2044    }
2045  }
2046
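  // Blocks until this server is online or has been stopped (or the waiting thread is
  // interrupted), re-checking every msgInterval milliseconds.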
2047  public void waitForServerOnline() {
2048    while (!isStopped() && !isOnline()) {
2049      synchronized (online) {
2050        try {
2051          online.wait(msgInterval);
2052        } catch (InterruptedException ie) {
2053          Thread.currentThread().interrupt();
2054          break;
2055        }
2056      }
2057    }
2058  }
2059
2060  @Override
2061  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2062    HRegion r = context.getRegion();
2063    long openProcId = context.getOpenProcId();
2064    long masterSystemTime = context.getMasterSystemTime();
2065    rpcServices.checkOpen();
2066    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2067      r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2068    // Do checks to see if we need to compact (references or too many files)
2069    // Skip compaction check if region is read only
2070    if (!r.isReadOnly()) {
2071      for (HStore s : r.stores.values()) {
2072        if (s.hasReferences() || s.needsCompaction()) {
2073          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2074        }
2075      }
2076    }
2077    long openSeqNum = r.getOpenSeqNum();
2078    if (openSeqNum == HConstants.NO_SEQNUM) {
2079      // If we opened a region, we should have read some sequence number from it.
2080      LOG.error(
2081        "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2082      openSeqNum = 0;
2083    }
2084
2085    // Notify master
2086    if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2087      openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))) {
2088      throw new IOException(
2089        "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2090    }
2091
2092    triggerFlushInPrimaryRegion(r);
2093
2094    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2095  }
2096
2097  /**
2098   * Helper method for use in tests. Skip the region transition report when there's no master
2099   * around to receive it.
2100   */
2101  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2102    final TransitionCode code = context.getCode();
2103    final long openSeqNum = context.getOpenSeqNum();
2104    long masterSystemTime = context.getMasterSystemTime();
2105    final RegionInfo[] hris = context.getHris();
2106
2107    if (code == TransitionCode.OPENED) {
2108      Preconditions.checkArgument(hris != null && hris.length == 1);
2109      if (hris[0].isMetaRegion()) {
2110        LOG.warn(
2111          "meta table location is stored in master local store, so we can not skip reporting");
2112        return false;
2113      } else {
2114        try {
2115          MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
2116              serverName, openSeqNum, masterSystemTime);
2117        } catch (IOException e) {
2118          LOG.info("Failed to update meta", e);
2119          return false;
2120        }
2121      }
2122    }
2123    return true;
2124  }
2125
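  // Converts a RegionStateTransitionContext into the protobuf request that is reported to the
  // master.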
2126  private ReportRegionStateTransitionRequest createReportRegionStateTransitionRequest(
2127      final RegionStateTransitionContext context) {
2128    final TransitionCode code = context.getCode();
2129    final long openSeqNum = context.getOpenSeqNum();
2130    final RegionInfo[] hris = context.getHris();
2131    final long[] procIds = context.getProcIds();
2132
2133    ReportRegionStateTransitionRequest.Builder builder =
2134        ReportRegionStateTransitionRequest.newBuilder();
2135    builder.setServer(ProtobufUtil.toServerName(serverName));
2136    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2137    transition.setTransitionCode(code);
2138    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2139      transition.setOpenSeqNum(openSeqNum);
2140    }
2141    for (RegionInfo hri: hris) {
2142      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2143    }
2144    for (long procId: procIds) {
2145      transition.addProcId(procId);
2146    }
2147
2148    return builder.build();
2149  }
2150
2151  @Override
2152  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2153    if (TEST_SKIP_REPORTING_TRANSITION) {
2154      return skipReportingTransition(context);
2155    }
2156    final ReportRegionStateTransitionRequest request =
2157        createReportRegionStateTransitionRequest(context);
2158
2159    int tries = 0;
2160    long pauseTime = this.retryPauseTime;
2161    // Keep looping till we get an error. We want to send reports even though the server is going down.
2162    // Only go down if the clusterConnection is null. It is set to null almost as the last thing the
2163    // HRegionServer does as it goes down.
2164    while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
2165      RegionServerStatusService.BlockingInterface rss = rssStub;
2166      try {
2167        if (rss == null) {
2168          createRegionServerStatusStub();
2169          continue;
2170        }
2171        ReportRegionStateTransitionResponse response =
2172          rss.reportRegionStateTransition(null, request);
2173        if (response.hasErrorMessage()) {
2174          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2175          break;
2176        }
2177        // Log if we had to retry, else don't log unless TRACE. We want to
2178        // know if we were successful after an attempt showed in the logs as failed.
2179        if (tries > 0 || LOG.isTraceEnabled()) {
2180          LOG.info("TRANSITION REPORTED " + request);
2181        }
2182        // NOTE: Return mid-method!!!
2183        return true;
2184      } catch (ServiceException se) {
2185        IOException ioe = ProtobufUtil.getRemoteException(se);
2186        boolean pause =
2187            ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException
2188                || ioe instanceof CallQueueTooBigException;
2189        if (pause) {
2190          // Do backoff else we flood the Master with requests.
2191          pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
2192        } else {
2193          pauseTime = this.retryPauseTime; // Reset.
2194        }
2195        LOG.info("Failed report transition " +
2196          TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" +
2197            (pause?
2198                " after " + pauseTime + "ms delay (Master is coming online...).":
2199                " immediately."),
2200            ioe);
2201        if (pause) {
2202          Threads.sleep(pauseTime);
2203        }
2204        tries++;
2205        if (rssStub == rss) {
2206          rssStub = null;
2207        }
2208      }
2209    }
2210    return false;
2211  }
2212
2213  /**
2214   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2215   * block this thread. See RegionReplicaFlushHandler for details.
2216   */
2217  private void triggerFlushInPrimaryRegion(final HRegion region) {
2218    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2219      return;
2220    }
2221    TableName tn = region.getTableDescriptor().getTableName();
2222    if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) ||
2223        !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf)) {
2224      region.setReadsEnabled(true);
2225      return;
2226    }
2227
2228    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2229    // RegionReplicaFlushHandler might reset this.
2230
2231    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2232    if (this.executorService != null) {
2233      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
2234    } else {
2235      LOG.info("Executor is null; not running flush of primary region replica for {}",
2236        region.getRegionInfo());
2237    }
2238  }
2239
2240  @InterfaceAudience.Private
2241  public RSRpcServices getRSRpcServices() {
2242    return rpcServices;
2243  }
2244
2245  /**
2246   * Cause the server to exit without closing the regions it is serving or the log
2247   * it is using, and without notifying the master. Used in unit testing and on
2248   * catastrophic events such as HDFS being yanked out from under hbase or an OOME.
2249   *
2250   * @param reason
2251   *          the reason we are aborting
2252   * @param cause
2253   *          the exception that caused the abort, or null
2254   */
2255  @Override
2256  public void abort(String reason, Throwable cause) {
2257    if (!setAbortRequested()) {
2258      // Abort already in progress, ignore the new request.
2259      LOG.debug(
2260          "Abort already in progress. Ignoring the current request with reason: {}", reason);
2261      return;
2262    }
2263    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2264    if (cause != null) {
2265      LOG.error(HBaseMarkers.FATAL, msg, cause);
2266    } else {
2267      LOG.error(HBaseMarkers.FATAL, msg);
2268    }
2269    // HBASE-4014: show list of coprocessors that were loaded to help debug
2270    // regionserver crashes. Note that we're implicitly using
2271    // java.util.HashSet's toString() method to print the coprocessor names.
2272    LOG.error(HBaseMarkers.FATAL, "RegionServer abort: loaded coprocessors are: " +
2273        CoprocessorHost.getLoadedCoprocessors());
2274    // Try to dump metrics on abort -- might give a clue as to how the fatal error came about....
2275    try {
2276      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2277    } catch (MalformedObjectNameException | IOException e) {
2278      LOG.warn("Failed dumping metrics", e);
2279    }
2280
2281    // Do our best to report our abort to the master, but this may not work
2282    try {
2283      if (cause != null) {
2284        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2285      }
2286      // Report to the master but only if we have already registered with the master.
2287      RegionServerStatusService.BlockingInterface rss = rssStub;
2288      if (rss != null && this.serverName != null) {
2289        ReportRSFatalErrorRequest.Builder builder =
2290          ReportRSFatalErrorRequest.newBuilder();
2291        builder.setServer(ProtobufUtil.toServerName(this.serverName));
2292        builder.setErrorMessage(msg);
2293        rss.reportRSFatalError(null, builder.build());
2294      }
2295    } catch (Throwable t) {
2296      LOG.warn("Unable to report fatal error to master", t);
2297    }
2298
2299    scheduleAbortTimer();
2300    // shutdown should be run as the internal user
2301    stop(reason, true, null);
2302  }
2303
2304  /*
2305   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
2306   * logs but it does close the socket in case we want to bring up a server on the old
2307   * hostname+port immediately.
2308   */
2309  @InterfaceAudience.Private
2310  protected void kill() {
2311    this.killed = true;
2312    abort("Simulated kill");
2313  }
2314
2315  // Limits the time spent in the shutdown process.
2316  private void scheduleAbortTimer() {
2317    if (this.abortMonitor == null) {
2318      this.abortMonitor = new Timer("Abort regionserver monitor", true);
2319      TimerTask abortTimeoutTask = null;
2320      try {
2321        Constructor<? extends TimerTask> timerTaskCtor =
2322          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2323            .asSubclass(TimerTask.class).getDeclaredConstructor();
2324        timerTaskCtor.setAccessible(true);
2325        abortTimeoutTask = timerTaskCtor.newInstance();
2326      } catch (Exception e) {
2327        LOG.warn("Initialize abort timeout task failed", e);
2328      }
2329      if (abortTimeoutTask != null) {
2330        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2331      }
2332    }
2333  }
2334
2335  /**
2336   * Wait on all threads to finish. Presumption is that all closes and stops
2337   * have already been called.
2338   */
2339  protected void stopServiceThreads() {
2340    // clean up the scheduled chores
2341    stopChoreService();
2342    if (bootstrapNodeManager != null) {
2343      bootstrapNodeManager.stop();
2344    }
2345    if (this.cacheFlusher != null) {
2346      this.cacheFlusher.join();
2347    }
2348    if (this.walRoller != null) {
2349      this.walRoller.close();
2350    }
2351    if (this.compactSplitThread != null) {
2352      this.compactSplitThread.join();
2353    }
2354    stopExecutorService();
2355    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
2356      this.replicationSourceHandler.stopReplicationService();
2357    } else {
2358      if (this.replicationSourceHandler != null) {
2359        this.replicationSourceHandler.stopReplicationService();
2360      }
2361      if (this.replicationSinkHandler != null) {
2362        this.replicationSinkHandler.stopReplicationService();
2363      }
2364    }
2365  }
2366
2367  /**
2368   * @return Return the object that implements the replication source executorService.
2369   */
2370  @Override
2371  public ReplicationSourceService getReplicationSourceService() {
2372    return replicationSourceHandler;
2373  }
2374
2375  /**
2376   * @return Return the object that implements the replication sink executorService.
2377   */
2378  public ReplicationSinkService getReplicationSinkService() {
2379    return replicationSinkHandler;
2380  }
2381
2382  /**
2383   * Get the current master from ZooKeeper and open the RPC connection to it.
2384   * To get a fresh connection, the current rssStub must be null.
2385   * Method will block until a master is available. You can break from this
2386   * block by requesting the server stop.
2387   *
2388   * @return master + port, or null if server has been stopped
2389   */
2390  private synchronized ServerName createRegionServerStatusStub() {
2391    // Create RS stub without refreshing the master node from ZK, use cached data
2392    return createRegionServerStatusStub(false);
2393  }
2394
2395  /**
2396   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2397   * connection, the current rssStub must be null. Method will block until a master is available.
2398   * You can break from this block by requesting the server stop.
2399   * @param refresh If true then master address will be read from ZK, otherwise use cached data
2400   * @return master + port, or null if server has been stopped
2401   */
2402  @InterfaceAudience.Private
2403  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2404    if (rssStub != null) {
2405      return masterAddressTracker.getMasterAddress();
2406    }
2407    ServerName sn = null;
2408    long previousLogTime = 0;
2409    RegionServerStatusService.BlockingInterface intRssStub = null;
2410    LockService.BlockingInterface intLockStub = null;
2411    boolean interrupted = false;
2412    try {
2413      while (keepLooping()) {
2414        sn = this.masterAddressTracker.getMasterAddress(refresh);
2415        if (sn == null) {
2416          if (!keepLooping()) {
2417            // give up with no connection.
2418            LOG.debug("No master found and cluster is stopped; bailing out");
2419            return null;
2420          }
2421          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2422            LOG.debug("No master found; retry");
2423            previousLogTime = EnvironmentEdgeManager.currentTime();
2424          }
2425          refresh = true; // let's try to pull it from ZK directly
2426          if (sleepInterrupted(200)) {
2427            interrupted = true;
2428          }
2429          continue;
2430        }
2431        try {
2432          BlockingRpcChannel channel =
2433            this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2434              shortOperationTimeout);
2435          intRssStub = RegionServerStatusService.newBlockingStub(channel);
2436          intLockStub = LockService.newBlockingStub(channel);
2437          break;
2438        } catch (IOException e) {
2439          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2440            e = e instanceof RemoteException ?
2441              ((RemoteException)e).unwrapRemoteException() : e;
2442            if (e instanceof ServerNotRunningYetException) {
2443              LOG.info("Master isn't available yet, retrying");
2444            } else {
2445              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2446            }
2447            previousLogTime = EnvironmentEdgeManager.currentTime();
2448          }
2449          if (sleepInterrupted(200)) {
2450            interrupted = true;
2451          }
2452        }
2453      }
2454    } finally {
2455      if (interrupted) {
2456        Thread.currentThread().interrupt();
2457      }
2458    }
2459    this.rssStub = intRssStub;
2460    this.lockStub = intLockStub;
2461    return sn;
2462  }
2463
2464  /**
2465   * @return True if we should break loop because cluster is going down or this server has been
2466   *    stopped or hdfs has gone bad.
2467   */
2468  private boolean keepLooping() {
2469    return !this.stopped && isClusterUp();
2470  }
2471
2472  /*
2473   * Let the master know we're here. Run initialization using parameters passed
2474   * to us by the master.
2475   * @return A Map of key/value configurations we got from the Master, else
2476   * null if we failed to register.
2477   * @throws IOException
2478   */
2479  private RegionServerStartupResponse reportForDuty() throws IOException {
2480    if (this.masterless) {
2481      return RegionServerStartupResponse.getDefaultInstance();
2482    }
2483    ServerName masterServerName = createRegionServerStatusStub(true);
2484    RegionServerStatusService.BlockingInterface rss = rssStub;
2485    if (masterServerName == null || rss == null) {
2486      return null;
2487    }
2488    RegionServerStartupResponse result = null;
2489    try {
2490      rpcServices.requestCount.reset();
2491      rpcServices.rpcGetRequestCount.reset();
2492      rpcServices.rpcScanRequestCount.reset();
2493      rpcServices.rpcFullScanRequestCount.reset();
2494      rpcServices.rpcMultiRequestCount.reset();
2495      rpcServices.rpcMutateRequestCount.reset();
2496      LOG.info("reportForDuty to master=" + masterServerName + " with port="
2497        + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode);
2498      long now = EnvironmentEdgeManager.currentTime();
2499      int port = rpcServices.getSocketAddress().getPort();
2500      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2501      if (!StringUtils.isBlank(useThisHostnameInstead)) {
2502        request.setUseThisHostnameInstead(useThisHostnameInstead);
2503      }
2504      request.setPort(port);
2505      request.setServerStartCode(this.startcode);
2506      request.setServerCurrentTime(now);
2507      result = rss.regionServerStartup(null, request.build());
2508    } catch (ServiceException se) {
2509      IOException ioe = ProtobufUtil.getRemoteException(se);
2510      if (ioe instanceof ClockOutOfSyncException) {
2511        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync",
2512            ioe);
2513        // Re-throw IOE will cause RS to abort
2514        throw ioe;
2515      } else if (ioe instanceof ServerNotRunningYetException) {
2516        LOG.debug("Master is not running yet");
2517      } else {
2518        LOG.warn("error telling master we are up", se);
2519      }
2520      rssStub = null;
2521    }
2522    return result;
2523  }
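
  // Illustrative sketch only, not the actual caller: the key/value pairs returned by the master
  // are typically folded back into this server's Configuration. Assuming the protobuf response
  // exposes them as NameStringPair entries, that looks roughly like:
  //
  //   RegionServerStartupResponse resp = reportForDuty();
  //   if (resp != null) {
  //     for (NameStringPair entry : resp.getMapEntriesList()) {
  //       conf.set(entry.getName(), entry.getValue());
  //     }
  //   }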
2524
2525  @Override
2526  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2527    try {
2528      GetLastFlushedSequenceIdRequest req =
2529          RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2530      RegionServerStatusService.BlockingInterface rss = rssStub;
2531      if (rss == null) { // Try to connect one more time
2532        createRegionServerStatusStub();
2533        rss = rssStub;
2534        if (rss == null) {
2535          // Still no luck, we tried
2536          LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2537          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2538              .build();
2539        }
2540      }
2541      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2542      return RegionStoreSequenceIds.newBuilder()
2543          .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2544          .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2545    } catch (ServiceException e) {
2546      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2547      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2548          .build();
2549    }
2550  }
2551
2552  /**
2553   * Close the meta region if this region server carries it.
2554   * @param abort Whether we're running an abort.
2555   */
2556  private void closeMetaTableRegions(final boolean abort) {
2557    HRegion meta = null;
2558    this.onlineRegionsLock.writeLock().lock();
2559    try {
2560      for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
2561        RegionInfo hri = e.getValue().getRegionInfo();
2562        if (hri.isMetaRegion()) {
2563          meta = e.getValue();
2564        }
2565        if (meta != null) {
2566          break;
2567        }
2568      }
2569    } finally {
2570      this.onlineRegionsLock.writeLock().unlock();
2571    }
2572    if (meta != null) {
2573      closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2574    }
2575  }
2576
2577  /**
2578   * Schedule closes on all user regions.
2579   * Safe to call multiple times because it won't close regions
2580   * that are already closed or that are closing.
2581   * @param abort Whether we're running an abort.
2582   */
2583  private void closeUserRegions(final boolean abort) {
2584    this.onlineRegionsLock.writeLock().lock();
2585    try {
2586      for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
2587        HRegion r = e.getValue();
2588        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2589          // Don't update zk with this close transition; pass false.
2590          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2591        }
2592      }
2593    } finally {
2594      this.onlineRegionsLock.writeLock().unlock();
2595    }
2596  }
2597
2598  protected Map<String, HRegion> getOnlineRegions() {
2599    return this.onlineRegions;
2600  }
2601
2602  public int getNumberOfOnlineRegions() {
2603    return this.onlineRegions.size();
2604  }
2605
2606  /**
2607   * For tests, web UI and metrics.
2608   * This method will only work if HRegionServer is in the same JVM as the client;
2609   * HRegion cannot be serialized to cross an RPC boundary.
2610   */
2611  public Collection<HRegion> getOnlineRegionsLocalContext() {
2612    Collection<HRegion> regions = this.onlineRegions.values();
2613    return Collections.unmodifiableCollection(regions);
2614  }
2615
2616  @Override
2617  public void addRegion(HRegion region) {
2618    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2619    configurationManager.registerObserver(region);
2620  }
2621
2622  private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
2623      long size) {
2624    if (!sortedRegions.containsKey(size)) {
2625      sortedRegions.put(size, new ArrayList<>());
2626    }
2627    sortedRegions.get(size).add(region);
2628  }

2629  /**
2630   * @return A new Map of online regions sorted by region off-heap size with the first entry being
2631   *   the biggest.
2632   */
2633  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2634    // we'll sort the regions in reverse
2635    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2636    // Copy over all regions. Regions are sorted by size with biggest first.
2637    for (HRegion region : this.onlineRegions.values()) {
2638      addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
2639    }
2640    return sortedRegions;
2641  }
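
  // Illustrative sketch only: consumers of the sorted map walk it from biggest to smallest, e.g.
  // a flush policy picking the region with the largest off-heap memstore first:
  //
  //   for (Map.Entry<Long, Collection<HRegion>> entry :
  //       getCopyOfOnlineRegionsSortedByOffHeapSize().entrySet()) {
  //     long memStoreOffHeapSize = entry.getKey();
  //     for (HRegion candidate : entry.getValue()) {
  //       // consider candidate for flushing ...
  //     }
  //   }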
2642
2643  /**
2644   * @return A new Map of online regions sorted by region heap size with the first entry being the
2645   *   biggest.
2646   */
2647  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2648    // we'll sort the regions in reverse
2649    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2650    // Copy over all regions. Regions are sorted by size with biggest first.
2651    for (HRegion region : this.onlineRegions.values()) {
2652      addRegion(sortedRegions, region, region.getMemStoreHeapSize());
2653    }
2654    return sortedRegions;
2655  }
2656
2657  /** @return reference to FlushRequester */
2658  @Override
2659  public FlushRequester getFlushRequester() {
2660    return this.cacheFlusher;
2661  }
2662
2663  @Override
2664  public CompactionRequester getCompactionRequestor() {
2665    return this.compactSplitThread;
2666  }
2667
2668  @Override
2669  public LeaseManager getLeaseManager() {
2670    return leaseManager;
2671  }
2672
2673  /**
2674   * @return {@code true} when the data file system is available, {@code false} otherwise.
2675   */
2676  boolean isDataFileSystemOk() {
2677    return this.dataFsOk;
2678  }
2679
2680  public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2681    return this.rsHost;
2682  }
2683
2684  @Override
2685  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2686    return this.regionsInTransitionInRS;
2687  }
2688
2689  @Override
2690  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
2691    return rsQuotaManager;
2692  }
2693
2694  //
2695  // Main program and support routines
2696  //
2697  /**
2698   * Load the replication service objects, if any.
2699   */
2700  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
2701      FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
2702    // read in the name of the source replication class from the config file.
2703    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2704      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2705
2706    // read in the name of the sink replication class from the config file.
2707    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2708      HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
2709
2710    // If both the sink and the source class names are the same, then instantiate
2711    // only one object.
2712    if (sourceClassname.equals(sinkClassname)) {
2713      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2714        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2715      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
2716      server.sameReplicationSourceAndSink = true;
2717    } else {
2718      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2719        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2720      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
2721        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2722      server.sameReplicationSourceAndSink = false;
2723    }
2724  }
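
  // Configuration sketch (illustrative; the implementation class names below are hypothetical):
  // a deployment can plug in custom replication endpoints by pointing the source/sink class keys
  // at its own implementations. When both keys resolve to the same class, only one instance is
  // created, as handled above.
  //
  //   conf.set(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
  //     "org.example.MyReplicationSourceService");
  //   conf.set(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
  //     "org.example.MyReplicationSinkService");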
2725
2726  private static <T extends ReplicationService> T newReplicationInstance(String classname,
2727      Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
2728      Path oldLogDir, WALFactory walFactory) throws IOException {
2729    final Class<? extends T> clazz;
2730    try {
2731      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2732      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
2733    } catch (java.lang.ClassNotFoundException nfe) {
2734      throw new IOException("Could not find class for " + classname);
2735    }
2736    T service = ReflectionUtils.newInstance(clazz, conf);
2737    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
2738    return service;
2739  }
2740
2741  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus(){
2742    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
2743    if(!this.isOnline()){
2744      return walGroupsReplicationStatus;
2745    }
2746    List<ReplicationSourceInterface> allSources = new ArrayList<>();
2747    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
2748    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
2749    for(ReplicationSourceInterface source: allSources){
2750      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
2751    }
2752    return walGroupsReplicationStatus;
2753  }
2754
2755  /**
2756   * Utility for constructing an instance of the passed HRegionServer class.
2757   */
2758  static HRegionServer constructRegionServer(
2759      final Class<? extends HRegionServer> regionServerClass,
2760      final Configuration conf
2761  ) {
2762    try {
2763      Constructor<? extends HRegionServer> c =
2764          regionServerClass.getConstructor(Configuration.class);
2765      return c.newInstance(conf);
2766    } catch (Exception e) {
2767      throw new RuntimeException("Failed construction of RegionServer: "
2768          + regionServerClass.toString(), e);
2769    }
2770  }
2771
2772  /**
2773   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2774   */
2775  public static void main(String[] args) {
2776    LOG.info("STARTING service " + HRegionServer.class.getSimpleName());
2777    VersionInfo.logVersion();
2778    Configuration conf = HBaseConfiguration.create();
2779    @SuppressWarnings("unchecked")
2780    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2781        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2782
2783    new HRegionServerCommandLine(regionServerClass).doMain(args);
2784  }
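
  // Illustrative only: HConstants.REGION_SERVER_IMPL lets a deployment substitute its own
  // HRegionServer subclass (the class name below is hypothetical); main() resolves it and
  // constructRegionServer() reflects it, expecting a public Configuration-arg constructor.
  //
  //   conf.setClass(HConstants.REGION_SERVER_IMPL, MyCustomRegionServer.class, HRegionServer.class);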
2785
2786  /**
2787   * Gets the online regions of the specified table.
2788   * This method looks at the in-memory onlineRegions. It does not go to <code>hbase:meta</code>.
2789   * Only returns <em>online</em> regions. If a region of this table has been
2790   * closed during a disable, etc., it will not be included in the returned list.
2791   * So, the returned list is not necessarily ALL regions of this table, only
2792   * the ONLINE regions of the table.
2793   * @param tableName table to limit the scope of the query
2794   * @return Online regions from <code>tableName</code>
2795   */
2796  @Override
2797  public List<HRegion> getRegions(TableName tableName) {
2798    List<HRegion> tableRegions = new ArrayList<>();
2799    synchronized (this.onlineRegions) {
2800      for (HRegion region: this.onlineRegions.values()) {
2801        RegionInfo regionInfo = region.getRegionInfo();
2802        if(regionInfo.getTable().equals(tableName)) {
2803          tableRegions.add(region);
2804        }
2805      }
2806    }
2807    return tableRegions;
2808  }
2809
2810  @Override
2811  public List<HRegion> getRegions() {
2812    List<HRegion> allRegions;
2813    synchronized (this.onlineRegions) {
2814      // Return a copy of the onlineRegions
2815      allRegions = new ArrayList<>(onlineRegions.values());
2816    }
2817    return allRegions;
2818  }
2819
2820  /**
2821   * Gets the online tables in this RS.
2822   * This method looks at the in-memory onlineRegions.
2823   * @return all the online tables in this RS
2824   */
2825  public Set<TableName> getOnlineTables() {
2826    Set<TableName> tables = new HashSet<>();
2827    synchronized (this.onlineRegions) {
2828      for (Region region: this.onlineRegions.values()) {
2829        tables.add(region.getTableDescriptor().getTableName());
2830      }
2831    }
2832    return tables;
2833  }
2834
2835  public String[] getRegionServerCoprocessors() {
2836    TreeSet<String> coprocessors = new TreeSet<>();
2837    try {
2838      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2839    } catch (IOException exception) {
2840      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2841          "skipping.");
2842      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2843    }
2844    Collection<HRegion> regions = getOnlineRegionsLocalContext();
2845    for (HRegion region: regions) {
2846      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2847      try {
2848        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2849      } catch (IOException exception) {
2850        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2851            "; skipping.");
2852        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2853      }
2854    }
2855    coprocessors.addAll(rsHost.getCoprocessors());
2856    return coprocessors.toArray(new String[0]);
2857  }
2858
2859  /**
2860   * Try to close the region, logs a warning on failure but continues.
2861   * @param region Region to close
2862   */
2863  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
2864    try {
2865      if (!closeRegion(region.getEncodedName(), abort, null)) {
2866        LOG.warn("Failed to close " + region.getRegionNameAsString() +
2867            " - ignoring and continuing");
2868      }
2869    } catch (IOException e) {
2870      LOG.warn("Failed to close " + region.getRegionNameAsString() +
2871          " - ignoring and continuing", e);
2872    }
2873  }
2874
2875  /**
2876   * Asynchronously close a region; can be called from the master or internally by the
2877   * regionserver when stopping. If called from the master, the region status will be updated.
2878   *
2879   * <p>
2880   * If an opening was in progress, this method will cancel it, but will not start a new close.
2881   * The coprocessors are not called in this case. A NotServingRegionException is thrown.
2882   * </p>
2883   *
2884   * <p>
2885   * If a close was in progress, this new request will be ignored, and an exception thrown.
2886   * </p>
2887   *
2888   * @param encodedName Region to close
2889   * @param abort True if we are aborting
2890   * @param destination Where the Region is being moved to; may be null if unknown.
2891   * @return True if we closed a region.
2892   * @throws NotServingRegionException if the region is not online
2893   */
2894  protected boolean closeRegion(String encodedName, final boolean abort,
2895        final ServerName destination)
2896      throws NotServingRegionException {
2897    //Check for permissions to close.
2898    HRegion actualRegion = this.getRegion(encodedName);
2899    // Can be null if we're calling close on a region that's not online
2900    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2901      try {
2902        actualRegion.getCoprocessorHost().preClose(false);
2903      } catch (IOException exp) {
2904        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2905        return false;
2906      }
2907    }
2908
2909    // previous can come back 'null' if not in map.
2910    final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName),
2911        Boolean.FALSE);
2912
2913    if (Boolean.TRUE.equals(previous)) {
2914      LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " +
2915          "trying to OPEN. Cancelling OPENING.");
2916      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
2917        // The replace failed. That should be an exceptional case, but theoretically it can happen.
2918        // We're going to try to do a standard close then.
2919        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2920            " Doing a standard close now");
2921        return closeRegion(encodedName, abort, destination);
2922      }
2923      // Let's get the region from the online region list again
2924      actualRegion = this.getRegion(encodedName);
2925      if (actualRegion == null) { // The open never completed, so there is nothing online to close.
2926        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2927        // The master deletes the znode when it receives this exception.
2928        throw new NotServingRegionException("The region " + encodedName +
2929          " was opening but not yet served. Opening is cancelled.");
2930      }
2931    } else if (previous == null) {
2932      LOG.info("Received CLOSE for {}", encodedName);
2933    } else if (Boolean.FALSE.equals(previous)) {
2934      LOG.info("Received CLOSE for the region: " + encodedName +
2935        ", which we are already trying to CLOSE, but not completed yet");
2936      return true;
2937    }
2938
2939    if (actualRegion == null) {
2940      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
2941      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
2942      // The master deletes the znode when it receives this exception.
2943      throw new NotServingRegionException("The region " + encodedName +
2944          " is not online, and is not opening.");
2945    }
2946
2947    CloseRegionHandler crh;
2948    final RegionInfo hri = actualRegion.getRegionInfo();
2949    if (hri.isMetaRegion()) {
2950      crh = new CloseMetaHandler(this, this, hri, abort);
2951    } else {
2952      crh = new CloseRegionHandler(this, this, hri, abort, destination);
2953    }
2954    this.executorService.submit(crh);
2955    return true;
2956  }
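
  // Descriptive note on the regionsInTransitionInRS bookkeeping used above: the map holds
  // Boolean.TRUE while a region is OPENING on this server and Boolean.FALSE while it is CLOSING;
  // putIfAbsent/replace let a CLOSE request either cancel an in-flight open or detect a close
  // that is already running.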
2957
2958  /**
2959   * @return HRegion for the passed binary <code>regionName</code> or null if the
2960   *         named region is not a member of the online regions.
2961   */
2962  public HRegion getOnlineRegion(final byte[] regionName) {
2963    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
2964    return this.onlineRegions.get(encodedRegionName);
2965  }
2966
2967  @Override
2968  public HRegion getRegion(final String encodedRegionName) {
2969    return this.onlineRegions.get(encodedRegionName);
2970  }
2971
2973  @Override
2974  public boolean removeRegion(final HRegion r, ServerName destination) {
2975    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2976    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
2977    if (destination != null) {
2978      long closeSeqNum = r.getMaxFlushedSeqId();
2979      if (closeSeqNum == HConstants.NO_SEQNUM) {
2980        // No edits in WAL for this region; get the sequence number when the region was opened.
2981        closeSeqNum = r.getOpenSeqNum();
2982        if (closeSeqNum == HConstants.NO_SEQNUM) {
2983          closeSeqNum = 0;
2984        }
2985      }
2986      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
2987      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
2988      if (selfMove) {
2989        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
2990          r.getRegionInfo().getEncodedName(),
2991          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
2992      }
2993    }
2994    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2995    configurationManager.deregisterObserver(r);
2996    return toReturn != null;
2997  }
2998
2999  /**
3000   * Protected utility method for safely obtaining an HRegion handle.
3001   *
3002   * @param regionName Name of online {@link HRegion} to return
3003   * @return {@link HRegion} for <code>regionName</code>
3004   */
3005  protected HRegion getRegion(final byte[] regionName)
3006      throws NotServingRegionException {
3007    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3008    return getRegionByEncodedName(regionName, encodedRegionName);
3009  }
3010
3011  public HRegion getRegionByEncodedName(String encodedRegionName)
3012      throws NotServingRegionException {
3013    return getRegionByEncodedName(null, encodedRegionName);
3014  }
3015
3016  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3017    throws NotServingRegionException {
3018    HRegion region = this.onlineRegions.get(encodedRegionName);
3019    if (region == null) {
3020      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3021      if (moveInfo != null) {
3022        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3023      }
3024      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3025      String regionNameStr = regionName == null ?
3026        encodedRegionName : Bytes.toStringBinary(regionName);
3027      if (isOpening != null && isOpening) {
3028        throw new RegionOpeningException("Region " + regionNameStr +
3029          " is opening on " + this.serverName);
3030      }
3031      throw new NotServingRegionException(regionNameStr +
3032        " is not online on " + this.serverName);
3033    }
3034    return region;
3035  }
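
  // Descriptive note: when the region is not served here, getRegionByEncodedName surfaces one of
  // three exceptions: RegionMovedException (we recently closed it and know the destination),
  // RegionOpeningException (it is still opening on this server), or NotServingRegionException
  // (we know nothing about it), so clients can retry against the right location.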
3036
3037  /**
3038   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
3039   * IOE if it isn't already.
3040   *
3041   * @param t Throwable
3042   * @param msg Message to log in error. Can be null.
3043   * @return Throwable converted to an IOE; methods can only let out IOEs.
3044   */
3045  private Throwable cleanup(final Throwable t, final String msg) {
3046    // Don't log as error if NSRE; NSRE is 'normal' operation.
3047    if (t instanceof NotServingRegionException) {
3048      LOG.debug("NotServingRegionException; " + t.getMessage());
3049      return t;
3050    }
3051    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3052    if (msg == null) {
3053      LOG.error("", e);
3054    } else {
3055      LOG.error(msg, e);
3056    }
3057    if (!rpcServices.checkOOME(t)) {
3058      checkFileSystem();
3059    }
3060    return t;
3061  }
3062
3063  /**
3064   * @param msg Message to put in the new IOE if the passed <code>t</code> is not an IOE
3065   * @return <code>t</code> as an IOE, wrapping it in a new IOException if it isn't one already.
3066   */
3067  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3068    return t instanceof IOException ? (IOException) t
3069        : (msg == null || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
3070  }
3071
3072  /**
3073   * Checks to see if the file system is still accessible. If not, aborts
3074   * the region server.
3075   *
3076   * @return false if the file system is not available
3077   */
3078  boolean checkFileSystem() {
3079    if (this.dataFsOk && this.dataFs != null) {
3080      try {
3081        FSUtils.checkFileSystemAvailable(this.dataFs);
3082      } catch (IOException e) {
3083        abort("File System not available", e);
3084        this.dataFsOk = false;
3085      }
3086    }
3087    return this.dataFsOk;
3088  }
3089
3090  @Override
3091  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3092      List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3093    Address[] addr = new Address[favoredNodes.size()];
3094    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3095    // it is a map of region name to Address[]
3096    for (int i = 0; i < favoredNodes.size(); i++) {
3097      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(),
3098          favoredNodes.get(i).getPort());
3099    }
3100    regionFavoredNodesMap.put(encodedRegionName, addr);
3101  }
3102
3103  /**
3104   * Return the favored nodes for a region given its encoded name. Look at the
3105   * comment around {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[]
3106   * here.
3107   * @param encodedRegionName the encoded region name.
3108   * @return array of favored locations
3109   */
3110  @Override
3111  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3112    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
3113  }
3114
3115  @Override
3116  public ServerNonceManager getNonceManager() {
3117    return this.nonceManager;
3118  }
3119
3120  private static class MovedRegionInfo {
3121    private final ServerName serverName;
3122    private final long seqNum;
3123
3124    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3125      this.serverName = serverName;
3126      this.seqNum = closeSeqNum;
3127    }
3128
3129    public ServerName getServerName() {
3130      return serverName;
3131    }
3132
3133    public long getSeqNum() {
3134      return seqNum;
3135    }
3136  }
3137
3138  /**
3139   * We need a timeout. Without one there is a risk of returning stale information, which would
3140   * double the number of network calls instead of reducing them.
3141   */
3142  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3143
3144  private void addToMovedRegions(String encodedName, ServerName destination,
3145    long closeSeqNum, boolean selfMove) {
3146    if (selfMove) {
3147      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3148      return;
3149    }
3150    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" +
3151        closeSeqNum);
3152    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3153  }
3154
3155  void removeFromMovedRegions(String encodedName) {
3156    movedRegionInfoCache.invalidate(encodedName);
3157  }
3158
3159  @InterfaceAudience.Private
3160  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
3161    return movedRegionInfoCache.getIfPresent(encodedRegionName);
3162  }
3163
3164  @InterfaceAudience.Private
3165  public int movedRegionCacheExpiredTime() {
3166    return TIMEOUT_REGION_MOVED;
3167  }
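
  // Descriptive note on the moved-regions cache: when a region is closed because it moved
  // (removeRegion with a non-null destination), addToMovedRegions records the destination and
  // the close sequence id. A later request for that region then surfaces a RegionMovedException
  // from getRegionByEncodedName, redirecting the client, and entries age out after
  // TIMEOUT_REGION_MOVED so stale redirects are not served indefinitely.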
3168
3169  private String getMyEphemeralNodePath() {
3170    return zooKeeper.getZNodePaths().getRsPath(serverName);
3171  }
3172
3173  private boolean isHealthCheckerConfigured() {
3174    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3175    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
3176  }
3177
3178  /**
3179   * @return the underlying {@link CompactSplit} for this server
3180   */
3181  public CompactSplit getCompactSplitThread() {
3182    return this.compactSplitThread;
3183  }
3184
3185  CoprocessorServiceResponse execRegionServerService(
3186      @SuppressWarnings("UnusedParameters") final RpcController controller,
3187      final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3188    try {
3189      ServerRpcController serviceController = new ServerRpcController();
3190      CoprocessorServiceCall call = serviceRequest.getCall();
3191      String serviceName = call.getServiceName();
3192      Service service = coprocessorServiceHandlers.get(serviceName);
3193      if (service == null) {
3194        throw new UnknownProtocolException(null,
3195          "No registered coprocessor service found for " + serviceName);
3196      }
3197      ServiceDescriptor serviceDesc =
3198          service.getDescriptorForType();
3199
3200      String methodName = call.getMethodName();
3201      MethodDescriptor methodDesc =
3202          serviceDesc.findMethodByName(methodName);
3203      if (methodDesc == null) {
3204        throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName +
3205            " called on service " + serviceName);
3206      }
3207
3208      Message request =
3209          CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3210      final Message.Builder responseBuilder =
3211          service.getResponsePrototype(methodDesc).newBuilderForType();
3212      service.callMethod(methodDesc, serviceController, request, message -> {
3213        if (message != null) {
3214          responseBuilder.mergeFrom(message);
3215        }
3216      });
3217      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3218      if (exception != null) {
3219        throw exception;
3220      }
3221      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3222    } catch (IOException ie) {
3223      throw new ServiceException(ie);
3224    }
3225  }
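
  // Descriptive note: execRegionServerService dispatches a generic coprocessor endpoint call in
  // four steps visible above: look up the registered Service by name, resolve the method
  // descriptor, decode the request message, then invoke the method and return the merged
  // response (rethrowing any controller-captured IOException as a ServiceException).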
3226
3227  /**
3228   * May be empty if this is a master that does not carry any tables.
3229   *
3230   * @return The block cache instance used by the regionserver.
3231   */
3232  @Override
3233  public Optional<BlockCache> getBlockCache() {
3234    return Optional.ofNullable(this.blockCache);
3235  }
3236
3237  /**
3238   * May be empty if this is a master that does not carry any tables.
3239   *
3240   * @return The cache for mob files used by the regionserver.
3241   */
3242  @Override
3243  public Optional<MobFileCache> getMobFileCache() {
3244    return Optional.ofNullable(this.mobFileCache);
3245  }
3246
3247  /**
3248   * @return the ConfigurationManager object, exposed for testing purposes.
3249   */
3250  ConfigurationManager getConfigurationManager() {
3251    return configurationManager;
3252  }
3253
3254  CacheEvictionStats clearRegionBlockCache(Region region) {
3255    long evictedBlocks = 0;
3256
3257    for(Store store : region.getStores()) {
3258      for(StoreFile hFile : store.getStorefiles()) {
3259        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3260      }
3261    }
3262
3263    return CacheEvictionStats.builder()
3264        .withEvictedBlocks(evictedBlocks)
3265        .build();
3266  }
3267
3268  @Override
3269  public double getCompactionPressure() {
3270    double max = 0;
3271    for (Region region : onlineRegions.values()) {
3272      for (Store store : region.getStores()) {
3273        double normCount = store.getCompactionPressure();
3274        if (normCount > max) {
3275          max = normCount;
3276        }
3277      }
3278    }
3279    return max;
3280  }
3281
3282  @Override
3283  public HeapMemoryManager getHeapMemoryManager() {
3284    return hMemManager;
3285  }
3286
3287  public MemStoreFlusher getMemStoreFlusher() {
3288    return cacheFlusher;
3289  }
3290
3291  /**
3292   * For testing
3293   * @return whether all WAL roll requests have finished for this regionserver
3294   */
3295  @InterfaceAudience.Private
3296  public boolean walRollRequestFinished() {
3297    return this.walRoller.walRollFinished();
3298  }
3299
3300  @Override
3301  public ThroughputController getFlushThroughputController() {
3302    return flushThroughputController;
3303  }
3304
3305  @Override
3306  public double getFlushPressure() {
3307    if (getRegionServerAccounting() == null || cacheFlusher == null) {
3308      // return 0 during RS initialization
3309      return 0.0;
3310    }
3311    return getRegionServerAccounting().getFlushPressure();
3312  }
3313
3314  @Override
3315  public void onConfigurationChange(Configuration newConf) {
3316    ThroughputController old = this.flushThroughputController;
3317    if (old != null) {
3318      old.stop("configuration change");
3319    }
3320    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3321    try {
3322      Superusers.initialize(newConf);
3323    } catch (IOException e) {
3324      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3325    }
3326  }
3327
3328  @Override
3329  public MetricsRegionServer getMetrics() {
3330    return metricsRegionServer;
3331  }
3332
3333  @Override
3334  public SecureBulkLoadManager getSecureBulkLoadManager() {
3335    return this.secureBulkLoadManager;
3336  }
3337
3338  @Override
3339  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
3340      final Abortable abort) {
3341    final LockServiceClient client =
3342        new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
3343    return client.regionLock(regionInfo, description, abort);
3344  }
3345
3346  @Override
3347  public void unassign(byte[] regionName) throws IOException {
3348    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
3349  }
3350
3351  @Override
3352  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3353    return this.rsSpaceQuotaManager;
3354  }
3355
3356  @Override
3357  public boolean reportFileArchivalForQuotas(TableName tableName,
3358      Collection<Entry<String, Long>> archivedFiles) {
3359    if (TEST_SKIP_REPORTING_TRANSITION) {
3360      return false;
3361    }
3362    RegionServerStatusService.BlockingInterface rss = rssStub;
3363    if (rss == null || rsSpaceQuotaManager == null) {
3364      // the current server could be stopping.
3365      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3366      return false;
3367    }
3368    try {
3369      RegionServerStatusProtos.FileArchiveNotificationRequest request =
3370          rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3371      rss.reportFileArchival(null, request);
3372    } catch (ServiceException se) {
3373      IOException ioe = ProtobufUtil.getRemoteException(se);
3374      if (ioe instanceof PleaseHoldException) {
3375        if (LOG.isTraceEnabled()) {
3376          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3377              + " This will be retried.", ioe);
3378        }
3379        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3380        return false;
3381      }
3382      if (rssStub == rss) {
3383        rssStub = null;
3384      }
3385      // re-create the stub if we failed to report the archival
3386      createRegionServerStatusStub(true);
3387      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3388      return false;
3389    }
3390    return true;
3391  }
3392
3393  void executeProcedure(long procId, RSProcedureCallable callable) {
3394    executorService.submit(new RSProcedureHandler(this, procId, callable));
3395  }
3396
3397  public void remoteProcedureComplete(long procId, Throwable error) {
3398    procedureResultReporter.complete(procId, error);
3399  }
3400
3401  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3402    RegionServerStatusService.BlockingInterface rss;
3403    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3404    for (;;) {
3405      rss = rssStub;
3406      if (rss != null) {
3407        break;
3408      }
3409      createRegionServerStatusStub();
3410    }
3411    try {
3412      rss.reportProcedureDone(null, request);
3413    } catch (ServiceException se) {
3414      if (rssStub == rss) {
3415        rssStub = null;
3416      }
3417      throw ProtobufUtil.getRemoteException(se);
3418    }
3419  }
3420
3421  /**
3422   * Will ignore the open/close region procedures which were already submitted or executed.
3423   *
3424   * When the master has an unfinished open/close region procedure and restarts, the new active
3425   * master may send a duplicate open/close region request to the regionserver. The open/close
3426   * request is submitted to a thread pool and executed, so we first need a cache of submitted
3427   * open/close region procedures.
3428   *
3429   * After the open/close request has executed and the region transition has been reported
3430   * successfully, we cache it in the executed region procedures cache; see
3431   * {@link #finishRegionProcedure(long)}. After that report succeeds, the master will not send
3432   * the request again. We assume an ongoing duplicate open/close region request will not be
3433   * delayed by more than 600 seconds, so the executed region procedures cache expires after
3434   * 600 seconds. See HBASE-22404 for more details.
3435   *
3436   * @param procId the id of the open/close region procedure
3437   * @return true if the procedure can be submitted.
3438   */
3439  boolean submitRegionProcedure(long procId) {
3440    if (procId == -1) {
3441      return true;
3442    }
3443    // Ignore the region procedures which already submitted.
3444    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3445    if (previous != null) {
3446      LOG.warn("Received procedure pid={}, which was already submitted, just ignore it", procId);
3447      return false;
3448    }
3449    // Ignore the region procedures which already executed.
3450    if (executedRegionProcedures.getIfPresent(procId) != null) {
3451      LOG.warn("Received procedure pid={}, which was already executed, just ignore it", procId);
3452      return false;
3453    }
3454    return true;
3455  }
3456
3457  /**
3458   * See {@link #submitRegionProcedure(long)}.
3459   * @param procId the id of the open/close region procedure
3460   */
3461  public void finishRegionProcedure(long procId) {
3462    executedRegionProcedures.put(procId, procId);
3463    submittedRegionProcedures.remove(procId);
3464  }
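
  // Illustrative pairing only (the real call sites live in the RPC layer): a handler is expected
  // to gate execution on submitRegionProcedure and acknowledge completion afterwards, e.g.
  //
  //   if (submitRegionProcedure(procId)) {
  //     executeProcedure(procId, callable);
  //     // ... and finishRegionProcedure(procId) is called once the transition is reported
  //   }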
3465
3466  /**
3467   * Forcibly terminate the region server when an abort times out.
3468   */
3469  private static class SystemExitWhenAbortTimeout extends TimerTask {
3470
3471    public SystemExitWhenAbortTimeout() {
3472    }
3473
3474    @Override
3475    public void run() {
3476      LOG.warn("Aborting region server timed out, terminating forcibly" +
3477          " without waiting for any running shutdown hooks or finalizers to finish their work." +
3478          " Thread dump to stdout.");
3479      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3480      Runtime.getRuntime().halt(1);
3481    }
3482  }
3483
3484  @InterfaceAudience.Private
3485  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3486    return compactedFileDischarger;
3487  }
3488
3489  /**
3490   * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}
3491   * @return pause time
3492   */
3493  @InterfaceAudience.Private
3494  public long getRetryPauseTime() {
3495    return this.retryPauseTime;
3496  }
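
  // Illustrative only: the pause between short-operation RPC retries is configuration driven; a
  // deployment can tune it via the key behind the constant referenced in the javadoc above
  // (the value is assumed here to be in milliseconds):
  //
  //   conf.setLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME, 2000L);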
3497
3498  @Override
3499  public Optional<ServerName> getActiveMaster() {
3500    return Optional.ofNullable(masterAddressTracker.getMasterAddress());
3501  }
3502
3503  @Override
3504  public List<ServerName> getBackupMasters() {
3505    return masterAddressTracker.getBackupMasters();
3506  }
3507
3508  @Override
3509  public Iterator<ServerName> getBootstrapNodes() {
3510    return bootstrapNodeManager.getBootstrapNodes().iterator();
3511  }
3512
3513  @Override
3514  public List<HRegionLocation> getMetaLocations() {
3515    return metaRegionLocationCache.getMetaRegionLocations();
3516  }
3517
3518  @Override
3519  protected NamedQueueRecorder createNamedQueueRecord() {
3520    final boolean isOnlineLogProviderEnabled = conf.getBoolean(
3521      HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
3522    if (isOnlineLogProviderEnabled) {
3523      return NamedQueueRecorder.getInstance(conf);
3524    } else {
3525      return null;
3526    }
3527  }
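
  // Illustrative only: the NamedQueueRecorder backing the online slow log provider is switched
  // on through the boolean key behind HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, e.g.
  //
  //   conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);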
3528
3529  @Override
3530  protected boolean clusterMode() {
3531    // this method will be called in the constructor of the super class, so we cannot return the
3532    // masterless field directly here, as it will always be false.
3533    return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
3534  }
3535
3536  @InterfaceAudience.Private
3537  public BrokenStoreFileCleaner getBrokenStoreFileCleaner(){
3538    return brokenStoreFileCleaner;
3539  }
3540
3541  @Override
3542  protected void stopChores() {
3543    shutdownChore(nonceManagerChore);
3544    shutdownChore(compactionChecker);
3545    shutdownChore(compactedFileDischarger);
3546    shutdownChore(periodicFlusher);
3547    shutdownChore(healthCheckChore);
3548    shutdownChore(executorStatusChore);
3549    shutdownChore(storefileRefresher);
3550    shutdownChore(fsUtilizationChore);
3551    shutdownChore(slowLogTableOpsChore);
3552    shutdownChore(brokenStoreFileCleaner);
3553  }
3554
3555  @Override
3556  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
3557    return regionReplicationBufferManager;
3558  }
3559}