001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
022import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
024import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
025
026import java.io.IOException;
027import java.io.PrintWriter;
028import java.lang.management.MemoryType;
029import java.lang.management.MemoryUsage;
030import java.lang.reflect.Constructor;
031import java.net.BindException;
032import java.net.InetAddress;
033import java.net.InetSocketAddress;
034import java.time.Duration;
035import java.util.ArrayList;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.Comparator;
039import java.util.HashSet;
040import java.util.List;
041import java.util.Map;
042import java.util.Map.Entry;
043import java.util.Objects;
044import java.util.Optional;
045import java.util.Set;
046import java.util.SortedMap;
047import java.util.Timer;
048import java.util.TimerTask;
049import java.util.TreeMap;
050import java.util.TreeSet;
051import java.util.concurrent.ConcurrentHashMap;
052import java.util.concurrent.ConcurrentMap;
053import java.util.concurrent.ConcurrentSkipListMap;
054import java.util.concurrent.TimeUnit;
055import java.util.concurrent.atomic.AtomicBoolean;
056import java.util.concurrent.locks.ReentrantReadWriteLock;
057import java.util.stream.Collectors;
058import javax.management.MalformedObjectNameException;
059import javax.servlet.http.HttpServlet;
060import org.apache.commons.lang3.RandomUtils;
061import org.apache.commons.lang3.StringUtils;
062import org.apache.commons.lang3.SystemUtils;
063import org.apache.hadoop.conf.Configuration;
064import org.apache.hadoop.fs.FileSystem;
065import org.apache.hadoop.fs.Path;
066import org.apache.hadoop.hbase.Abortable;
067import org.apache.hadoop.hbase.CacheEvictionStats;
068import org.apache.hadoop.hbase.CallQueueTooBigException;
069import org.apache.hadoop.hbase.ChoreService;
070import org.apache.hadoop.hbase.ClockOutOfSyncException;
071import org.apache.hadoop.hbase.CoordinatedStateManager;
072import org.apache.hadoop.hbase.DoNotRetryIOException;
073import org.apache.hadoop.hbase.ExecutorStatusChore;
074import org.apache.hadoop.hbase.HBaseConfiguration;
075import org.apache.hadoop.hbase.HBaseInterfaceAudience;
076import org.apache.hadoop.hbase.HConstants;
077import org.apache.hadoop.hbase.HDFSBlocksDistribution;
078import org.apache.hadoop.hbase.HealthCheckChore;
079import org.apache.hadoop.hbase.MetaRegionLocationCache;
080import org.apache.hadoop.hbase.MetaTableAccessor;
081import org.apache.hadoop.hbase.NotServingRegionException;
082import org.apache.hadoop.hbase.PleaseHoldException;
083import org.apache.hadoop.hbase.ScheduledChore;
084import org.apache.hadoop.hbase.ServerName;
085import org.apache.hadoop.hbase.Stoppable;
086import org.apache.hadoop.hbase.TableDescriptors;
087import org.apache.hadoop.hbase.TableName;
088import org.apache.hadoop.hbase.YouAreDeadException;
089import org.apache.hadoop.hbase.ZNodeClearer;
090import org.apache.hadoop.hbase.client.AsyncClusterConnection;
091import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
092import org.apache.hadoop.hbase.client.Connection;
093import org.apache.hadoop.hbase.client.ConnectionFactory;
094import org.apache.hadoop.hbase.client.ConnectionUtils;
095import org.apache.hadoop.hbase.client.RegionInfo;
096import org.apache.hadoop.hbase.client.RegionInfoBuilder;
097import org.apache.hadoop.hbase.client.locking.EntityLock;
098import org.apache.hadoop.hbase.client.locking.LockServiceClient;
099import org.apache.hadoop.hbase.conf.ConfigurationManager;
100import org.apache.hadoop.hbase.conf.ConfigurationObserver;
101import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
102import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
103import org.apache.hadoop.hbase.exceptions.RegionMovedException;
104import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
105import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
106import org.apache.hadoop.hbase.executor.ExecutorService;
107import org.apache.hadoop.hbase.executor.ExecutorType;
108import org.apache.hadoop.hbase.fs.HFileSystem;
109import org.apache.hadoop.hbase.http.InfoServer;
110import org.apache.hadoop.hbase.io.hfile.BlockCache;
111import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
112import org.apache.hadoop.hbase.io.hfile.HFile;
113import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
114import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
115import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
116import org.apache.hadoop.hbase.ipc.RpcClient;
117import org.apache.hadoop.hbase.ipc.RpcServer;
118import org.apache.hadoop.hbase.ipc.RpcServerInterface;
119import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
120import org.apache.hadoop.hbase.ipc.ServerRpcController;
121import org.apache.hadoop.hbase.log.HBaseMarkers;
122import org.apache.hadoop.hbase.master.HMaster;
123import org.apache.hadoop.hbase.master.MasterRpcServicesVersionWrapper;
124import org.apache.hadoop.hbase.master.RegionState;
125import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
126import org.apache.hadoop.hbase.mob.MobFileCache;
127import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
128import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
129import org.apache.hadoop.hbase.net.Address;
130import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
131import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
132import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
133import org.apache.hadoop.hbase.quotas.QuotaUtil;
134import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
135import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
136import org.apache.hadoop.hbase.quotas.RegionSize;
137import org.apache.hadoop.hbase.quotas.RegionSizeStore;
138import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
139import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
140import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
141import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
142import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
143import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
144import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
145import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
146import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
147import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
148import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
149import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
150import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
151import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
152import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
153import org.apache.hadoop.hbase.security.SecurityConstants;
154import org.apache.hadoop.hbase.security.Superusers;
155import org.apache.hadoop.hbase.security.User;
156import org.apache.hadoop.hbase.security.UserProvider;
157import org.apache.hadoop.hbase.security.access.AccessChecker;
158import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
159import org.apache.hadoop.hbase.util.Addressing;
160import org.apache.hadoop.hbase.util.Bytes;
161import org.apache.hadoop.hbase.util.CommonFSUtils;
162import org.apache.hadoop.hbase.util.CompressionTest;
163import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
164import org.apache.hadoop.hbase.util.FSTableDescriptors;
165import org.apache.hadoop.hbase.util.FSUtils;
166import org.apache.hadoop.hbase.util.FutureUtils;
167import org.apache.hadoop.hbase.util.JvmPauseMonitor;
168import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
169import org.apache.hadoop.hbase.util.Pair;
170import org.apache.hadoop.hbase.util.RetryCounter;
171import org.apache.hadoop.hbase.util.RetryCounterFactory;
172import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
173import org.apache.hadoop.hbase.util.Sleeper;
174import org.apache.hadoop.hbase.util.Threads;
175import org.apache.hadoop.hbase.util.VersionInfo;
176import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
177import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
178import org.apache.hadoop.hbase.wal.WAL;
179import org.apache.hadoop.hbase.wal.WALFactory;
180import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
181import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
182import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
183import org.apache.hadoop.hbase.zookeeper.RegionServerAddressTracker;
184import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
185import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
186import org.apache.hadoop.hbase.zookeeper.ZKUtil;
187import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
188import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
189import org.apache.hadoop.ipc.RemoteException;
190import org.apache.hadoop.util.ReflectionUtils;
191import org.apache.yetus.audience.InterfaceAudience;
192import org.apache.zookeeper.KeeperException;
193import org.slf4j.Logger;
194import org.slf4j.LoggerFactory;
195import sun.misc.Signal;
196
197import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
198import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
199import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
200import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
201import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
202import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
203import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
204import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
205import org.apache.hbase.thirdparty.com.google.protobuf.Message;
206import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
207import org.apache.hbase.thirdparty.com.google.protobuf.Service;
208import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
209import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
210import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
211
212import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
213import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
243
244/**
245 * HRegionServer makes a set of HRegions available to clients. It checks in with
246 * the HMaster. There are many HRegionServers in a single HBase deployment.
247 */
248@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
249@SuppressWarnings({ "deprecation"})
250public class HRegionServer extends Thread implements
251    RegionServerServices, LastSequenceId, ConfigurationObserver {
252  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
253
254  /**
255   * For testing only!  Set to true to skip notifying region assignment to master .
256   */
257  @InterfaceAudience.Private
258  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
259  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
260
261  /**
262   * A map from RegionName to current action in progress. Boolean value indicates:
263   * true - if open region action in progress
264   * false - if close region action in progress
265   */
266  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
267    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
268
269  /**
270   * Used to cache the open/close region procedures which already submitted.
271   * See {@link #submitRegionProcedure(long)}.
272   */
273  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
274  /**
275   * Used to cache the open/close region procedures which already executed.
276   * See {@link #submitRegionProcedure(long)}.
277   */
278  private final Cache<Long, Long> executedRegionProcedures =
279      CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
280
281  /**
282   * Used to cache the moved-out regions
283   */
284  private final Cache<String, MovedRegionInfo> movedRegionInfoCache =
285      CacheBuilder.newBuilder().expireAfterWrite(movedRegionCacheExpiredTime(),
286        TimeUnit.MILLISECONDS).build();
287
288  private MemStoreFlusher cacheFlusher;
289
290  private HeapMemoryManager hMemManager;
291
292  /**
293   * The asynchronous cluster connection to be shared by services.
294   */
295  protected AsyncClusterConnection asyncClusterConnection;
296
297  /**
298   * Go here to get table descriptors.
299   */
300  protected TableDescriptors tableDescriptors;
301
302  // Replication services. If no replication, this handler will be null.
303  private ReplicationSourceService replicationSourceHandler;
304  private ReplicationSinkService replicationSinkHandler;
305  private boolean sameReplicationSourceAndSink;
306
307  // Compactions
308  private CompactSplit compactSplitThread;
309
310  /**
311   * Map of regions currently being served by this region server. Key is the
312   * encoded region name.  All access should be synchronized.
313   */
314  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
315  /**
316   * Lock for gating access to {@link #onlineRegions}.
317   * TODO: If this map is gated by a lock, does it need to be a ConcurrentHashMap?
318   */
319  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();
320
321  /**
322   * Map of encoded region names to the DataNode locations they should be hosted on
323   * We store the value as Address since InetSocketAddress is required by the HDFS
324   * API (create() that takes favored nodes as hints for placing file blocks).
325   * We could have used ServerName here as the value class, but we'd need to
326   * convert it to InetSocketAddress at some point before the HDFS API call, and
327   * it seems a bit weird to store ServerName since ServerName refers to RegionServers
328   * and here we really mean DataNode locations. We don't store it as InetSocketAddress
329   * here because the conversion on demand from Address to InetSocketAddress will
330   * guarantee the resolution results will be fresh when we need it.
331   */
332  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();
333
334  private LeaseManager leaseManager;
335
336  // Instance of the hbase executor executorService.
337  protected ExecutorService executorService;
338
339  private volatile boolean dataFsOk;
340  private HFileSystem dataFs;
341  private HFileSystem walFs;
342
343  // Set when a report to the master comes back with a message asking us to
344  // shutdown. Also set by call to stop when debugging or running unit tests
345  // of HRegionServer in isolation.
346  private volatile boolean stopped = false;
347
348  // Go down hard. Used if file system becomes unavailable and also in
349  // debugging and unit tests.
350  private AtomicBoolean abortRequested;
351  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
352  // Default abort timeout is 1200 seconds for safe
353  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
354  // Will run this task when abort timeout
355  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
356
357  // A state before we go into stopped state.  At this stage we're closing user
358  // space regions.
359  private boolean stopping = false;
360  private volatile boolean killed = false;
361  private volatile boolean shutDown = false;
362
363  protected final Configuration conf;
364
365  private Path dataRootDir;
366  private Path walRootDir;
367
368  private final int threadWakeFrequency;
369  final int msgInterval;
370
371  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
372  private final int compactionCheckFrequency;
373  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
374  private final int flushCheckFrequency;
375
376  // Stub to do region server status calls against the master.
377  private volatile RegionServerStatusService.BlockingInterface rssStub;
378  private volatile LockService.BlockingInterface lockStub;
379  // RPC client. Used to make the stub above that does region server status checking.
380  private RpcClient rpcClient;
381
382  private UncaughtExceptionHandler uncaughtExceptionHandler;
383
384  // Info server. Default access so can be used by unit tests. REGIONSERVER
385  // is name of the webapp and the attribute name used stuffing this instance
386  // into web context.
387  protected InfoServer infoServer;
388  private JvmPauseMonitor pauseMonitor;
389
390  /** region server process name */
391  public static final String REGIONSERVER = "regionserver";
392
393
394  private MetricsRegionServer metricsRegionServer;
395  MetricsRegionServerWrapperImpl metricsRegionServerImpl;
396
397  /**
398   * ChoreService used to schedule tasks that we want to run periodically
399   */
400  private ChoreService choreService;
401
402  /**
403   * Check for compactions requests.
404   */
405  private ScheduledChore compactionChecker;
406
407  /**
408   * Check for flushes
409   */
410  private ScheduledChore periodicFlusher;
411
412  private volatile WALFactory walFactory;
413
414  private LogRoller walRoller;
415
416  // A thread which calls reportProcedureDone
417  private RemoteProcedureResultReporter procedureResultReporter;
418
419  // flag set after we're done setting up server threads
420  final AtomicBoolean online = new AtomicBoolean(false);
421
422  // zookeeper connection and watcher
423  protected final ZKWatcher zooKeeper;
424
425  // master address tracker
426  private final MasterAddressTracker masterAddressTracker;
427
428  /**
429   * Cache for the meta region replica's locations. Also tracks their changes to avoid stale cache
430   * entries. Used for serving ClientMetaService.
431   */
432  private final MetaRegionLocationCache metaRegionLocationCache;
433  /**
434   * Cache for all the region servers in the cluster. Used for serving ClientMetaService.
435   */
436  private final RegionServerAddressTracker regionServerAddressTracker;
437
438  // Cluster Status Tracker
439  protected final ClusterStatusTracker clusterStatusTracker;
440
441  // Log Splitting Worker
442  private SplitLogWorker splitLogWorker;
443
444  // A sleeper that sleeps for msgInterval.
445  protected final Sleeper sleeper;
446
447  private final int shortOperationTimeout;
448
449  // Time to pause if master says 'please hold'
450  private final long retryPauseTime;
451
452  private final RegionServerAccounting regionServerAccounting;
453
454  private SlowLogTableOpsChore slowLogTableOpsChore = null;
455
456  // Block cache
457  private BlockCache blockCache;
458  // The cache for mob files
459  private MobFileCache mobFileCache;
460
461  /** The health check chore. */
462  private HealthCheckChore healthCheckChore;
463
464  /** The Executor status collect chore. */
465  private ExecutorStatusChore executorStatusChore;
466
467  /** The nonce manager chore. */
468  private ScheduledChore nonceManagerChore;
469
470  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
471
472  /**
473   * The server name the Master sees us as.  Its made from the hostname the
474   * master passes us, port, and server startcode. Gets set after registration
475   * against Master.
476   */
477  protected ServerName serverName;
478
479  /**
480   * hostname specified by hostname config
481   */
482  protected String useThisHostnameInstead;
483
484  /**
485   * @deprecated since 2.4.0 and will be removed in 4.0.0.
486   * Use {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
487   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
488   */
489  @Deprecated
490  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
491  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
492    "hbase.regionserver.hostname.disable.master.reversedns";
493
494  /**
495   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
496   * Exception will be thrown if both are used.
497   */
498  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
499  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
500    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";
501
502  /**
503   * This servers startcode.
504   */
505  protected final long startcode;
506
507  /**
508   * Unique identifier for the cluster we are a part of.
509   */
510  protected String clusterId;
511
512  // chore for refreshing store files for secondary regions
513  private StorefileRefresherChore storefileRefresher;
514
515  private RegionServerCoprocessorHost rsHost;
516
517  private RegionServerProcedureManagerHost rspmHost;
518
519  private RegionServerRpcQuotaManager rsQuotaManager;
520  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
521
522  /**
523   * Nonce manager. Nonces are used to make operations like increment and append idempotent
524   * in the case where client doesn't receive the response from a successful operation and
525   * retries. We track the successful ops for some time via a nonce sent by client and handle
526   * duplicate operations (currently, by failing them; in future we might use MVCC to return
527   * result). Nonces are also recovered from WAL during, recovery; however, the caveats (from
528   * HBASE-3787) are:
529   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
530   *   of past records. If we don't read the records, we don't read and recover the nonces.
531   *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
532   * - There's no WAL recovery during normal region move, so nonces will not be transfered.
533   * We can have separate additional "Nonce WAL". It will just contain bunch of numbers and
534   * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
535   * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
536   * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
537   * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
538   * latest nonce in it expired. It can also be recovered during move.
539   */
540  final ServerNonceManager nonceManager;
541
542  private UserProvider userProvider;
543
544  protected final RSRpcServices rpcServices;
545
546  private CoordinatedStateManager csm;
547
548  /**
549   * Configuration manager is used to register/deregister and notify the configuration observers
550   * when the regionserver is notified that there was a change in the on disk configs.
551   */
552  protected final ConfigurationManager configurationManager;
553
554  @InterfaceAudience.Private
555  CompactedHFilesDischarger compactedFileDischarger;
556
557  private volatile ThroughputController flushThroughputController;
558
559  private SecureBulkLoadManager secureBulkLoadManager;
560
561  private FileSystemUtilizationChore fsUtilizationChore;
562
563  private final NettyEventLoopGroupConfig eventLoopGroupConfig;
564
565  /**
566   * Provide online slow log responses from ringbuffer
567   */
568  private NamedQueueRecorder namedQueueRecorder = null;
569
570  /**
571   * True if this RegionServer is coming up in a cluster where there is no Master;
572   * means it needs to just come up and make do without a Master to talk to: e.g. in test or
573   * HRegionServer is doing other than its usual duties: e.g. as an hollowed-out host whose only
574   * purpose is as a Replication-stream sink; see HBASE-18846 for more.
575   * TODO: can this replace {@link #TEST_SKIP_REPORTING_TRANSITION} ?
576   */
577  private final boolean masterless;
578  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
579
580  /**regionserver codec list **/
581  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";
582
583  // A timer to shutdown the process if abort takes too long
584  private Timer abortMonitor;
585
586  /**
587   * Starts a HRegionServer at the default location.
588   * <p/>
589   * Don't start any services or managers in here in the Constructor.
590   * Defer till after we register with the Master as much as possible. See {@link #startServices}.
591   */
592  public HRegionServer(final Configuration conf) throws IOException {
593    super("RegionServer");  // thread name
594    try {
595      this.startcode = EnvironmentEdgeManager.currentTime();
596      this.conf = conf;
597      this.dataFsOk = true;
598      this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
599      this.eventLoopGroupConfig = setupNetty(this.conf);
600      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
601      HFile.checkHFileVersion(this.conf);
602      checkCodecs(this.conf);
603      this.userProvider = UserProvider.instantiate(conf);
604      FSUtils.setupShortCircuitRead(this.conf);
605
606      // Disable usage of meta replicas in the regionserver
607      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
608      // Config'ed params
609      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
610      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
611      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
612      this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
613
614      this.sleeper = new Sleeper(this.msgInterval, this);
615
616      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
617      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
618
619      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
620          HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
621
622      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
623        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);
624
625      this.abortRequested = new AtomicBoolean(false);
626      this.stopped = false;
627
628      initNamedQueueRecorder(conf);
629      rpcServices = createRpcServices();
630      useThisHostnameInstead = getUseThisHostnameInstead(conf);
631      String hostName =
632          StringUtils.isBlank(useThisHostnameInstead) ? this.rpcServices.isa.getHostName()
633              : this.useThisHostnameInstead;
634      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);
635
636      // login the zookeeper client principal (if using security)
637      ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
638          HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
639      // login the server principal (if using secure Hadoop)
640      login(userProvider, hostName);
641      // init superusers and add the server principal (if using security)
642      // or process owner as default super user.
643      Superusers.initialize(conf);
644      regionServerAccounting = new RegionServerAccounting(conf);
645
646      boolean isMasterNotCarryTable =
647        this instanceof HMaster && !((HMaster) this).isInMaintenanceMode();
648
649      // no need to instantiate block cache and mob file cache when master not carry table
650      if (!isMasterNotCarryTable) {
651        blockCache = BlockCacheFactory.createBlockCache(conf);
652        mobFileCache = new MobFileCache(conf);
653      }
654
655      uncaughtExceptionHandler =
656        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);
657
658      initializeFileSystem();
659
660      this.configurationManager = new ConfigurationManager();
661      setupWindows(getConfiguration(), getConfigurationManager());
662
663      // Some unit tests don't need a cluster, so no zookeeper at all
664      // Open connection to zookeeper and set primary watcher
665      zooKeeper = new ZKWatcher(conf, getProcessName() + ":" + rpcServices.isa.getPort(), this,
666        canCreateBaseZNode());
667      // If no master in cluster, skip trying to track one or look for a cluster status.
668      if (!this.masterless) {
669        if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
670          DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
671          this.csm = new ZkCoordinatedStateManager(this);
672        }
673
674        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
675        masterAddressTracker.start();
676
677        clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
678        clusterStatusTracker.start();
679      } else {
680        masterAddressTracker = null;
681        clusterStatusTracker = null;
682      }
683      this.rpcServices.start(zooKeeper);
684      this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
685      this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this);
686      // This violates 'no starting stuff in Constructor' but Master depends on the below chore
687      // and executor being created and takes a different startup route. Lots of overlap between HRS
688      // and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
689      // Master expects Constructor to put up web servers. Ugh.
690      // class HRS. TODO.
691      this.choreService = new ChoreService(getName(), true);
692      this.executorService = new ExecutorService(getName());
693      putUpWebUI();
694    } catch (Throwable t) {
695      // Make sure we log the exception. HRegionServer is often started via reflection and the
696      // cause of failed startup is lost.
697      LOG.error("Failed construction RegionServer", t);
698      throw t;
699    }
700  }
701
702  private void initNamedQueueRecorder(Configuration conf) {
703    if (!(this instanceof HMaster)) {
704      final boolean isOnlineLogProviderEnabled = conf.getBoolean(
705        HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
706        HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
707      if (isOnlineLogProviderEnabled) {
708        this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
709      }
710    } else {
711      final boolean isBalancerDecisionRecording = conf
712        .getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
713          BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
714      final boolean isBalancerRejectionRecording = conf
715        .getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
716          BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
717      if (isBalancerDecisionRecording || isBalancerRejectionRecording) {
718        this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
719      }
720    }
721  }
722
723  // HMaster should override this method to load the specific config for master
724  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
725    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
726    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
727      if (!StringUtils.isBlank(hostname)) {
728        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + UNSAFE_RS_HOSTNAME_KEY +
729          " are mutually exclusive. Do not set " + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY +
730          " to true while " + UNSAFE_RS_HOSTNAME_KEY + " is used";
731        throw new IOException(msg);
732      } else {
733        return rpcServices.isa.getHostName();
734      }
735    } else {
736      return hostname;
737    }
738  }
739
740  /**
741   * If running on Windows, do windows-specific setup.
742   */
743  private static void setupWindows(final Configuration conf, ConfigurationManager cm) {
744    if (!SystemUtils.IS_OS_WINDOWS) {
745      Signal.handle(new Signal("HUP"), signal -> {
746        conf.reloadConfiguration();
747        cm.notifyAllObservers(conf);
748      });
749    }
750  }
751
752  private static NettyEventLoopGroupConfig setupNetty(Configuration conf) {
753    // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL.
754    NettyEventLoopGroupConfig nelgc =
755      new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
756    NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
757    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
758    return nelgc;
759  }
760
761  private void initializeFileSystem() throws IOException {
762    // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
763    // checksum verification enabled, then automatically switch off hdfs checksum verification.
764    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
765    String walDirUri = CommonFSUtils.getDirUri(this.conf,
766      new Path(conf.get(CommonFSUtils.HBASE_WAL_DIR, conf.get(HConstants.HBASE_DIR))));
767    // set WAL's uri
768    if (walDirUri != null) {
769      CommonFSUtils.setFsDefault(this.conf, walDirUri);
770    }
771    // init the WALFs
772    this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
773    this.walRootDir = CommonFSUtils.getWALRootDir(this.conf);
774    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
775    // underlying hadoop hdfs accessors will be going against wrong filesystem
776    // (unless all is set to defaults).
777    String rootDirUri =
778        CommonFSUtils.getDirUri(this.conf, new Path(conf.get(HConstants.HBASE_DIR)));
779    if (rootDirUri != null) {
780      CommonFSUtils.setFsDefault(this.conf, rootDirUri);
781    }
782    // init the filesystem
783    this.dataFs = new HFileSystem(this.conf, useHBaseChecksum);
784    this.dataRootDir = CommonFSUtils.getRootDir(this.conf);
785    this.tableDescriptors = new FSTableDescriptors(this.dataFs, this.dataRootDir,
786        !canUpdateTableDescriptor(), cacheTableDescriptor());
787  }
788
789  protected void login(UserProvider user, String host) throws IOException {
790    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
791      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
792  }
793
794  /**
795   * Wait for an active Master.
796   * See override in Master superclass for how it is used.
797   */
798  protected void waitForMasterActive() {}
799
800  protected String getProcessName() {
801    return REGIONSERVER;
802  }
803
804  protected boolean canCreateBaseZNode() {
805    return this.masterless;
806  }
807
808  protected boolean canUpdateTableDescriptor() {
809    return false;
810  }
811
812  protected boolean cacheTableDescriptor() {
813    return false;
814  }
815
816  protected RSRpcServices createRpcServices() throws IOException {
817    return new RSRpcServices(this);
818  }
819
820  protected void configureInfoServer() {
821    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
822    infoServer.setAttribute(REGIONSERVER, this);
823  }
824
825  protected Class<? extends HttpServlet> getDumpServlet() {
826    return RSDumpServlet.class;
827  }
828
829  /**
830   * Used by {@link RSDumpServlet} to generate debugging information.
831   */
832  public void dumpRowLocks(final PrintWriter out) {
833    StringBuilder sb = new StringBuilder();
834    for (HRegion region : getRegions()) {
835      if (region.getLockedRows().size() > 0) {
836        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
837          sb.setLength(0);
838          sb.append(region.getTableDescriptor().getTableName()).append(",")
839            .append(region.getRegionInfo().getEncodedName()).append(",");
840          sb.append(rowLockContext.toString());
841          out.println(sb);
842        }
843      }
844    }
845  }
846
847  @Override
848  public boolean registerService(Service instance) {
849    // No stacking of instances is allowed for a single executorService name
850    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
851    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
852    if (coprocessorServiceHandlers.containsKey(serviceName)) {
853      LOG.error("Coprocessor executorService " + serviceName +
854        " already registered, rejecting request from " + instance);
855      return false;
856    }
857
858    coprocessorServiceHandlers.put(serviceName, instance);
859    if (LOG.isDebugEnabled()) {
860      LOG.debug(
861        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
862    }
863    return true;
864  }
865
866  private Configuration cleanupConfiguration() {
867    Configuration conf = this.conf;
868    // We use ZKConnectionRegistry for all the internal communication, primarily for these reasons:
869    // - Decouples RS and master life cycles. RegionServers can continue be up independent of
870    //   masters' availability.
871    // - Configuration management for region servers (cluster internal) is much simpler when adding
872    //   new masters or removing existing masters, since only clients' config needs to be updated.
873    // - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for
874    //   other internal connections too.
875    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
876        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
877    if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
878      // Use server ZK cluster for server-issued connections, so we clone
879      // the conf and unset the client ZK related properties
880      conf = new Configuration(this.conf);
881      conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
882    }
883    return conf;
884  }
885
886  /**
887   * Run test on configured codecs to make sure supporting libs are in place.
888   */
889  private static void checkCodecs(final Configuration c) throws IOException {
890    // check to see if the codec list is available:
891    String [] codecs = c.getStrings(REGIONSERVER_CODEC, (String[])null);
892    if (codecs == null) return;
893    for (String codec : codecs) {
894      if (!CompressionTest.testCompression(codec)) {
895        throw new IOException("Compression codec " + codec +
896          " not supported, aborting RS construction");
897      }
898    }
899  }
900
901  public String getClusterId() {
902    return this.clusterId;
903  }
904
905  /**
906   * Setup our cluster connection if not already initialized.
907   */
908  protected final synchronized void setupClusterConnection() throws IOException {
909    if (asyncClusterConnection == null) {
910      Configuration conf = cleanupConfiguration();
911      InetSocketAddress localAddress = new InetSocketAddress(this.rpcServices.isa.getAddress(), 0);
912      User user = userProvider.getCurrent();
913      asyncClusterConnection =
914        ClusterConnectionFactory.createAsyncClusterConnection(conf, localAddress, user);
915    }
916  }
917
918  /**
919   * All initialization needed before we go register with Master.<br>
920   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
921   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
922   */
923  private void preRegistrationInitialization() {
924    try {
925      initializeZooKeeper();
926      setupClusterConnection();
927      // Setup RPC client for master communication
928      this.rpcClient = asyncClusterConnection.getRpcClient();
929    } catch (Throwable t) {
930      // Call stop if error or process will stick around for ever since server
931      // puts up non-daemon threads.
932      this.rpcServices.stop();
933      abort("Initialization of RS failed.  Hence aborting RS.", t);
934    }
935  }
936
937  /**
938   * Bring up connection to zk ensemble and then wait until a master for this cluster and then after
939   * that, wait until cluster 'up' flag has been set. This is the order in which master does things.
940   * <p>
941   * Finally open long-living server short-circuit connection.
942   */
943  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
944    justification="cluster Id znode read would give us correct response")
945  private void initializeZooKeeper() throws IOException, InterruptedException {
946    // Nothing to do in here if no Master in the mix.
947    if (this.masterless) {
948      return;
949    }
950
951    // Create the master address tracker, register with zk, and start it.  Then
952    // block until a master is available.  No point in starting up if no master
953    // running.
954    blockAndCheckIfStopped(this.masterAddressTracker);
955
956    // Wait on cluster being up.  Master will set this flag up in zookeeper
957    // when ready.
958    blockAndCheckIfStopped(this.clusterStatusTracker);
959
960    // If we are HMaster then the cluster id should have already been set.
961    if (clusterId == null) {
962      // Retrieve clusterId
963      // Since cluster status is now up
964      // ID should have already been set by HMaster
965      try {
966        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
967        if (clusterId == null) {
968          this.abort("Cluster ID has not been set");
969        }
970        LOG.info("ClusterId : " + clusterId);
971      } catch (KeeperException e) {
972        this.abort("Failed to retrieve Cluster ID", e);
973      }
974    }
975
976    waitForMasterActive();
977    if (isStopped() || isAborted()) {
978      return; // No need for further initialization
979    }
980
981    // watch for snapshots and other procedures
982    try {
983      rspmHost = new RegionServerProcedureManagerHost();
984      rspmHost.loadProcedures(conf);
985      rspmHost.initialize(this);
986    } catch (KeeperException e) {
987      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
988    }
989  }
990
991  /**
992   * Utilty method to wait indefinitely on a znode availability while checking
993   * if the region server is shut down
994   * @param tracker znode tracker to use
995   * @throws IOException any IO exception, plus if the RS is stopped
996   * @throws InterruptedException if the waiting thread is interrupted
997   */
998  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
999      throws IOException, InterruptedException {
1000    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
1001      if (this.stopped) {
1002        throw new IOException("Received the shutdown message while waiting.");
1003      }
1004    }
1005  }
1006
1007  /**
1008   * @return True if the cluster is up.
1009   */
1010  @Override
1011  public boolean isClusterUp() {
1012    return this.masterless ||
1013        (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
1014  }
1015
1016  /**
1017   * The HRegionServer sticks in this loop until closed.
1018   */
1019  @Override
1020  public void run() {
1021    if (isStopped()) {
1022      LOG.info("Skipping run; stopped");
1023      return;
1024    }
1025    try {
1026      // Do pre-registration initializations; zookeeper, lease threads, etc.
1027      preRegistrationInitialization();
1028    } catch (Throwable e) {
1029      abort("Fatal exception during initialization", e);
1030    }
1031
1032    try {
1033      if (!isStopped() && !isAborted()) {
1034        ShutdownHook.install(conf, dataFs, this, Thread.currentThread());
1035        // Initialize the RegionServerCoprocessorHost now that our ephemeral
1036        // node was created, in case any coprocessors want to use ZooKeeper
1037        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
1038
1039        // Try and register with the Master; tell it we are here.  Break if server is stopped or
1040        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
1041        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
1042        // come up.
1043        LOG.debug("About to register with Master.");
1044        RetryCounterFactory rcf =
1045          new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
1046        RetryCounter rc = rcf.create();
1047        while (keepLooping()) {
1048          RegionServerStartupResponse w = reportForDuty();
1049          if (w == null) {
1050            long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
1051            LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
1052            this.sleeper.sleep(sleepTime);
1053          } else {
1054            handleReportForDutyResponse(w);
1055            break;
1056          }
1057        }
1058      }
1059
1060      if (!isStopped() && isHealthy()) {
1061        // start the snapshot handler and other procedure handlers,
1062        // since the server is ready to run
1063        if (this.rspmHost != null) {
1064          this.rspmHost.start();
1065        }
1066        // Start the Quota Manager
1067        if (this.rsQuotaManager != null) {
1068          rsQuotaManager.start(getRpcServer().getScheduler());
1069        }
1070        if (this.rsSpaceQuotaManager != null) {
1071          this.rsSpaceQuotaManager.start();
1072        }
1073      }
1074
1075      // We registered with the Master.  Go into run mode.
1076      long lastMsg = EnvironmentEdgeManager.currentTime();
1077      long oldRequestCount = -1;
1078      // The main run loop.
1079      while (!isStopped() && isHealthy()) {
1080        if (!isClusterUp()) {
1081          if (onlineRegions.isEmpty()) {
1082            stop("Exiting; cluster shutdown set and not carrying any regions");
1083          } else if (!this.stopping) {
1084            this.stopping = true;
1085            LOG.info("Closing user regions");
1086            closeUserRegions(this.abortRequested.get());
1087          } else {
1088            boolean allUserRegionsOffline = areAllUserRegionsOffline();
1089            if (allUserRegionsOffline) {
1090              // Set stopped if no more write requests tp meta tables
1091              // since last time we went around the loop.  Any open
1092              // meta regions will be closed on our way out.
1093              if (oldRequestCount == getWriteRequestCount()) {
1094                stop("Stopped; only catalog regions remaining online");
1095                break;
1096              }
1097              oldRequestCount = getWriteRequestCount();
1098            } else {
1099              // Make sure all regions have been closed -- some regions may
1100              // have not got it because we were splitting at the time of
1101              // the call to closeUserRegions.
1102              closeUserRegions(this.abortRequested.get());
1103            }
1104            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
1105          }
1106        }
1107        long now = EnvironmentEdgeManager.currentTime();
1108        if ((now - lastMsg) >= msgInterval) {
1109          tryRegionServerReport(lastMsg, now);
1110          lastMsg = EnvironmentEdgeManager.currentTime();
1111        }
1112        if (!isStopped() && !isAborted()) {
1113          this.sleeper.sleep();
1114        }
1115      } // for
1116    } catch (Throwable t) {
1117      if (!rpcServices.checkOOME(t)) {
1118        String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
1119        abort(prefix + t.getMessage(), t);
1120      }
1121    }
1122
1123    if (this.leaseManager != null) {
1124      this.leaseManager.closeAfterLeasesExpire();
1125    }
1126    if (this.splitLogWorker != null) {
1127      splitLogWorker.stop();
1128    }
1129    if (this.infoServer != null) {
1130      LOG.info("Stopping infoServer");
1131      try {
1132        this.infoServer.stop();
1133      } catch (Exception e) {
1134        LOG.error("Failed to stop infoServer", e);
1135      }
1136    }
1137    // Send cache a shutdown.
1138    if (blockCache != null) {
1139      blockCache.shutdown();
1140    }
1141    if (mobFileCache != null) {
1142      mobFileCache.shutdown();
1143    }
1144
1145    // Send interrupts to wake up threads if sleeping so they notice shutdown.
1146    // TODO: Should we check they are alive? If OOME could have exited already
1147    if (this.hMemManager != null) this.hMemManager.stop();
1148    if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
1149    if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
1150
1151    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
1152    if (rspmHost != null) {
1153      rspmHost.stop(this.abortRequested.get() || this.killed);
1154    }
1155
1156    if (this.killed) {
1157      // Just skip out w/o closing regions.  Used when testing.
1158    } else if (abortRequested.get()) {
1159      if (this.dataFsOk) {
1160        closeUserRegions(abortRequested.get()); // Don't leave any open file handles
1161      }
1162      LOG.info("aborting server " + this.serverName);
1163    } else {
1164      closeUserRegions(abortRequested.get());
1165      LOG.info("stopping server " + this.serverName);
1166    }
1167
1168    if (this.asyncClusterConnection != null) {
1169      try {
1170        this.asyncClusterConnection.close();
1171      } catch (IOException e) {
1172        // Although the {@link Closeable} interface throws an {@link
1173        // IOException}, in reality, the implementation would never do that.
1174        LOG.warn("Attempt to close server's AsyncClusterConnection failed.", e);
1175      }
1176    }
1177    // Closing the compactSplit thread before closing meta regions
1178    if (!this.killed && containsMetaTableRegions()) {
1179      if (!abortRequested.get() || this.dataFsOk) {
1180        if (this.compactSplitThread != null) {
1181          this.compactSplitThread.join();
1182          this.compactSplitThread = null;
1183        }
1184        closeMetaTableRegions(abortRequested.get());
1185      }
1186    }
1187
1188    if (!this.killed && this.dataFsOk) {
1189      waitOnAllRegionsToClose(abortRequested.get());
1190      LOG.info("stopping server " + this.serverName + "; all regions closed.");
1191    }
1192
1193    // Stop the quota manager
1194    if (rsQuotaManager != null) {
1195      rsQuotaManager.stop();
1196    }
1197    if (rsSpaceQuotaManager != null) {
1198      rsSpaceQuotaManager.stop();
1199      rsSpaceQuotaManager = null;
1200    }
1201
1202    // flag may be changed when closing regions throws exception.
1203    if (this.dataFsOk) {
1204      shutdownWAL(!abortRequested.get());
1205    }
1206
1207    // Make sure the proxy is down.
1208    if (this.rssStub != null) {
1209      this.rssStub = null;
1210    }
1211    if (this.lockStub != null) {
1212      this.lockStub = null;
1213    }
1214    if (this.rpcClient != null) {
1215      this.rpcClient.close();
1216    }
1217    if (this.leaseManager != null) {
1218      this.leaseManager.close();
1219    }
1220    if (this.pauseMonitor != null) {
1221      this.pauseMonitor.stop();
1222    }
1223
1224    if (!killed) {
1225      stopServiceThreads();
1226    }
1227
1228    if (this.rpcServices != null) {
1229      this.rpcServices.stop();
1230    }
1231
1232    try {
1233      deleteMyEphemeralNode();
1234    } catch (KeeperException.NoNodeException nn) {
1235      // pass
1236    } catch (KeeperException e) {
1237      LOG.warn("Failed deleting my ephemeral node", e);
1238    }
1239    // We may have failed to delete the znode at the previous step, but
1240    //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1241    ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1242
1243    if (this.zooKeeper != null) {
1244      this.zooKeeper.close();
1245    }
1246    this.shutDown = true;
1247    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
1248  }
1249
1250  private boolean containsMetaTableRegions() {
1251    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
1252  }
1253
1254  private boolean areAllUserRegionsOffline() {
1255    if (getNumberOfOnlineRegions() > 2) return false;
1256    boolean allUserRegionsOffline = true;
1257    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1258      if (!e.getValue().getRegionInfo().isMetaRegion()) {
1259        allUserRegionsOffline = false;
1260        break;
1261      }
1262    }
1263    return allUserRegionsOffline;
1264  }
1265
1266  /**
1267   * @return Current write count for all online regions.
1268   */
1269  private long getWriteRequestCount() {
1270    long writeCount = 0;
1271    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1272      writeCount += e.getValue().getWriteRequestsCount();
1273    }
1274    return writeCount;
1275  }
1276
1277  @InterfaceAudience.Private
1278  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1279      throws IOException {
1280    RegionServerStatusService.BlockingInterface rss = rssStub;
1281    if (rss == null) {
1282      // the current server could be stopping.
1283      return;
1284    }
1285    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1286    try {
1287      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1288      request.setServer(ProtobufUtil.toServerName(this.serverName));
1289      request.setLoad(sl);
1290      rss.regionServerReport(null, request.build());
1291    } catch (ServiceException se) {
1292      IOException ioe = ProtobufUtil.getRemoteException(se);
1293      if (ioe instanceof YouAreDeadException) {
1294        // This will be caught and handled as a fatal error in run()
1295        throw ioe;
1296      }
1297      if (rssStub == rss) {
1298        rssStub = null;
1299      }
1300      // Couldn't connect to the master, get location from zk and reconnect
1301      // Method blocks until new master is found or we are stopped
1302      createRegionServerStatusStub(true);
1303    }
1304  }
1305
1306  /**
1307   * Reports the given map of Regions and their size on the filesystem to the active Master.
1308   *
1309   * @param regionSizeStore The store containing region sizes
1310   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1311   */
1312  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
1313    RegionServerStatusService.BlockingInterface rss = rssStub;
1314    if (rss == null) {
1315      // the current server could be stopping.
1316      LOG.trace("Skipping Region size report to HMaster as stub is null");
1317      return true;
1318    }
1319    try {
1320      buildReportAndSend(rss, regionSizeStore);
1321    } catch (ServiceException se) {
1322      IOException ioe = ProtobufUtil.getRemoteException(se);
1323      if (ioe instanceof PleaseHoldException) {
1324        LOG.trace("Failed to report region sizes to Master because it is initializing."
1325            + " This will be retried.", ioe);
1326        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1327        return true;
1328      }
1329      if (rssStub == rss) {
1330        rssStub = null;
1331      }
1332      createRegionServerStatusStub(true);
1333      if (ioe instanceof DoNotRetryIOException) {
1334        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1335        if (doNotRetryEx.getCause() != null) {
1336          Throwable t = doNotRetryEx.getCause();
1337          if (t instanceof UnsupportedOperationException) {
1338            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1339            return false;
1340          }
1341        }
1342      }
1343      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1344    }
1345    return true;
1346  }
1347
1348  /**
1349   * Builds the region size report and sends it to the master. Upon successful sending of the
1350   * report, the region sizes that were sent are marked as sent.
1351   *
1352   * @param rss The stub to send to the Master
1353   * @param regionSizeStore The store containing region sizes
1354   */
1355  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
1356      RegionSizeStore regionSizeStore) throws ServiceException {
1357    RegionSpaceUseReportRequest request =
1358        buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
1359    rss.reportRegionSpaceUse(null, request);
1360    // Record the number of size reports sent
1361    if (metricsRegionServer != null) {
1362      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
1363    }
1364  }
1365
1366  /**
1367   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1368   *
1369   * @param regionSizes The size in bytes of regions
1370   * @return The corresponding protocol buffer message.
1371   */
1372  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
1373    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1374    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
1375      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
1376    }
1377    return request.build();
1378  }
1379
1380  /**
1381   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse}
1382   * protobuf message.
1383   *
1384   * @param regionInfo The RegionInfo
1385   * @param sizeInBytes The size in bytes of the Region
1386   * @return The protocol buffer
1387   */
1388  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1389    return RegionSpaceUse.newBuilder()
1390        .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1391        .setRegionSize(Objects.requireNonNull(sizeInBytes))
1392        .build();
1393  }
1394
1395  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1396      throws IOException {
1397    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1398    // per second, and other metrics  As long as metrics are part of ServerLoad it's best to use
1399    // the wrapper to compute those numbers in one place.
1400    // In the long term most of these should be moved off of ServerLoad and the heart beat.
1401    // Instead they should be stored in an HBase table so that external visibility into HBase is
1402    // improved; Additionally the load balancer will be able to take advantage of a more complete
1403    // history.
1404    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1405    Collection<HRegion> regions = getOnlineRegionsLocalContext();
1406    long usedMemory = -1L;
1407    long maxMemory = -1L;
1408    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
1409    if (usage != null) {
1410      usedMemory = usage.getUsed();
1411      maxMemory = usage.getMax();
1412    }
1413
1414    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
1415    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1416    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
1417    serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
1418    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
1419    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
1420    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
1421    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1422    Builder coprocessorBuilder = Coprocessor.newBuilder();
1423    for (String coprocessor : coprocessors) {
1424      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1425    }
1426    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1427    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1428    for (HRegion region : regions) {
1429      if (region.getCoprocessorHost() != null) {
1430        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1431        for (String regionCoprocessor : regionCoprocessors) {
1432          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
1433        }
1434      }
1435      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1436      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1437          .getCoprocessors()) {
1438        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1439      }
1440    }
1441    serverLoad.setReportStartTime(reportStartTime);
1442    serverLoad.setReportEndTime(reportEndTime);
1443    if (this.infoServer != null) {
1444      serverLoad.setInfoServerPort(this.infoServer.getPort());
1445    } else {
1446      serverLoad.setInfoServerPort(-1);
1447    }
1448    MetricsUserAggregateSource userSource =
1449        metricsRegionServer.getMetricsUserAggregate().getSource();
1450    if (userSource != null) {
1451      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
1452      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
1453        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
1454      }
1455    }
1456
1457    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
1458      // always refresh first to get the latest value
1459      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1460      if (rLoad != null) {
1461        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1462        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1463          .getReplicationLoadSourceEntries()) {
1464          serverLoad.addReplLoadSource(rLS);
1465        }
1466      }
1467    } else {
1468      if (replicationSourceHandler != null) {
1469        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1470        if (rLoad != null) {
1471          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1472            .getReplicationLoadSourceEntries()) {
1473            serverLoad.addReplLoadSource(rLS);
1474          }
1475        }
1476      }
1477      if (replicationSinkHandler != null) {
1478        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
1479        if (rLoad != null) {
1480          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1481        }
1482      }
1483    }
1484
1485    return serverLoad.build();
1486  }
1487
1488  private String getOnlineRegionsAsPrintableString() {
1489    StringBuilder sb = new StringBuilder();
1490    for (Region r: this.onlineRegions.values()) {
1491      if (sb.length() > 0) sb.append(", ");
1492      sb.append(r.getRegionInfo().getEncodedName());
1493    }
1494    return sb.toString();
1495  }
1496
1497  /**
1498   * Wait on regions close.
1499   */
1500  private void waitOnAllRegionsToClose(final boolean abort) {
1501    // Wait till all regions are closed before going out.
1502    int lastCount = -1;
1503    long previousLogTime = 0;
1504    Set<String> closedRegions = new HashSet<>();
1505    boolean interrupted = false;
1506    try {
1507      while (!onlineRegions.isEmpty()) {
1508        int count = getNumberOfOnlineRegions();
1509        // Only print a message if the count of regions has changed.
1510        if (count != lastCount) {
1511          // Log every second at most
1512          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
1513            previousLogTime = EnvironmentEdgeManager.currentTime();
1514            lastCount = count;
1515            LOG.info("Waiting on " + count + " regions to close");
1516            // Only print out regions still closing if a small number else will
1517            // swamp the log.
1518            if (count < 10 && LOG.isDebugEnabled()) {
1519              LOG.debug("Online Regions=" + this.onlineRegions);
1520            }
1521          }
1522        }
1523        // Ensure all user regions have been sent a close. Use this to
1524        // protect against the case where an open comes in after we start the
1525        // iterator of onlineRegions to close all user regions.
1526        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1527          RegionInfo hri = e.getValue().getRegionInfo();
1528          if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) &&
1529              !closedRegions.contains(hri.getEncodedName())) {
1530            closedRegions.add(hri.getEncodedName());
1531            // Don't update zk with this close transition; pass false.
1532            closeRegionIgnoreErrors(hri, abort);
1533          }
1534        }
1535        // No regions in RIT, we could stop waiting now.
1536        if (this.regionsInTransitionInRS.isEmpty()) {
1537          if (!onlineRegions.isEmpty()) {
1538            LOG.info("We were exiting though online regions are not empty," +
1539                " because some regions failed closing");
1540          }
1541          break;
1542        } else {
1543          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream().
1544            map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1545        }
1546        if (sleepInterrupted(200)) {
1547          interrupted = true;
1548        }
1549      }
1550    } finally {
1551      if (interrupted) {
1552        Thread.currentThread().interrupt();
1553      }
1554    }
1555  }
1556
1557  private static boolean sleepInterrupted(long millis) {
1558    boolean interrupted = false;
1559    try {
1560      Thread.sleep(millis);
1561    } catch (InterruptedException e) {
1562      LOG.warn("Interrupted while sleeping");
1563      interrupted = true;
1564    }
1565    return interrupted;
1566  }
1567
1568  private void shutdownWAL(final boolean close) {
1569    if (this.walFactory != null) {
1570      try {
1571        if (close) {
1572          walFactory.close();
1573        } else {
1574          walFactory.shutdown();
1575        }
1576      } catch (Throwable e) {
1577        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1578        LOG.error("Shutdown / close of WAL failed: " + e);
1579        LOG.debug("Shutdown / close exception details:", e);
1580      }
1581    }
1582  }
1583
1584  /**
1585   * get NamedQueue Provider to add different logs to ringbuffer
1586   *
1587   * @return NamedQueueRecorder
1588   */
1589  public NamedQueueRecorder getNamedQueueRecorder() {
1590    return this.namedQueueRecorder;
1591  }
1592
1593  /*
1594   * Run init. Sets up wal and starts up all server threads.
1595   *
1596   * @param c Extra configuration.
1597   */
1598  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1599  throws IOException {
1600    try {
1601      boolean updateRootDir = false;
1602      for (NameStringPair e : c.getMapEntriesList()) {
1603        String key = e.getName();
1604        // The hostname the master sees us as.
1605        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1606          String hostnameFromMasterPOV = e.getValue();
1607          this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(),
1608              this.startcode);
1609          if (!StringUtils.isBlank(useThisHostnameInstead) &&
1610              !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1611            String msg = "Master passed us a different hostname to use; was=" +
1612                this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1613            LOG.error(msg);
1614            throw new IOException(msg);
1615          }
1616          if (StringUtils.isBlank(useThisHostnameInstead) &&
1617              !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1618            String msg = "Master passed us a different hostname to use; was=" +
1619                rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1620            LOG.error(msg);
1621          }
1622          continue;
1623        }
1624
1625        String value = e.getValue();
1626        if (key.equals(HConstants.HBASE_DIR)) {
1627          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1628            updateRootDir = true;
1629          }
1630        }
1631
1632        if (LOG.isDebugEnabled()) {
1633          LOG.debug("Config from master: " + key + "=" + value);
1634        }
1635        this.conf.set(key, value);
1636      }
1637      // Set our ephemeral znode up in zookeeper now we have a name.
1638      createMyEphemeralNode();
1639
1640      if (updateRootDir) {
1641        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1642        initializeFileSystem();
1643      }
1644
1645      // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1646      // config param for task trackers, but we can piggyback off of it.
1647      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1648        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1649      }
1650
1651      // Save it in a file, this will allow to see if we crash
1652      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1653
1654      // This call sets up an initialized replication and WAL. Later we start it up.
1655      setupWALAndReplication();
1656      // Init in here rather than in constructor after thread name has been set
1657      final MetricsTable metricsTable =
1658          new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1659      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1660      this.metricsRegionServer = new MetricsRegionServer(
1661          metricsRegionServerImpl, conf, metricsTable);
1662      // Now that we have a metrics source, start the pause monitor
1663      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1664      pauseMonitor.start();
1665
1666      // There is a rare case where we do NOT want services to start. Check config.
1667      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1668        startServices();
1669      }
1670      // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
1671      // or make sense of it.
1672      startReplicationService();
1673
1674
1675      // Set up ZK
1676      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa +
1677          ", sessionid=0x" +
1678          Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1679
1680      // Wake up anyone waiting for this server to online
1681      synchronized (online) {
1682        online.set(true);
1683        online.notifyAll();
1684      }
1685    } catch (Throwable e) {
1686      stop("Failed initialization");
1687      throw convertThrowableToIOE(cleanup(e, "Failed init"),
1688          "Region server startup failed");
1689    } finally {
1690      sleeper.skipSleepCycle();
1691    }
1692  }
1693
1694  protected void initializeMemStoreChunkCreator() {
1695    if (MemStoreLAB.isEnabled(conf)) {
1696      // MSLAB is enabled. So initialize MemStoreChunkPool
1697      // By this time, the MemstoreFlusher is already initialized. We can get the global limits from
1698      // it.
1699      Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
1700      long globalMemStoreSize = pair.getFirst();
1701      boolean offheap = this.regionServerAccounting.isOffheap();
1702      // When off heap memstore in use, take full area for chunk pool.
1703      float poolSizePercentage = offheap ? 1.0F :
1704        conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
1705      float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
1706        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
1707      int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
1708      float indexChunkSizePercent = conf.getFloat(MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_KEY,
1709        MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
1710      // init the chunkCreator
1711      ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
1712        initialCountPercentage, this.hMemManager, indexChunkSizePercent);
1713    }
1714  }
1715
1716  private void startHeapMemoryManager() {
1717    if (this.blockCache != null) {
1718      this.hMemManager =
1719          new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1720      this.hMemManager.start(getChoreService());
1721    }
1722  }
1723
1724  private void createMyEphemeralNode() throws KeeperException {
1725    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1726    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1727    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1728    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1729    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1730  }
1731
1732  private void deleteMyEphemeralNode() throws KeeperException {
1733    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1734  }
1735
1736  @Override
1737  public RegionServerAccounting getRegionServerAccounting() {
1738    return regionServerAccounting;
1739  }
1740
1741  /**
1742   * @param r Region to get RegionLoad for.
1743   * @param regionLoadBldr the RegionLoad.Builder, can be null
1744   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1745   * @return RegionLoad instance.
1746   */
1747  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1748      RegionSpecifier.Builder regionSpecifier) throws IOException {
1749    byte[] name = r.getRegionInfo().getRegionName();
1750    int stores = 0;
1751    int storefiles = 0;
1752    int storeRefCount = 0;
1753    int maxCompactedStoreFileRefCount = 0;
1754    int storeUncompressedSizeMB = 0;
1755    int storefileSizeMB = 0;
1756    int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024);
1757    long storefileIndexSizeKB = 0;
1758    int rootLevelIndexSizeKB = 0;
1759    int totalStaticIndexSizeKB = 0;
1760    int totalStaticBloomSizeKB = 0;
1761    long totalCompactingKVs = 0;
1762    long currentCompactedKVs = 0;
1763    List<HStore> storeList = r.getStores();
1764    stores += storeList.size();
1765    for (HStore store : storeList) {
1766      storefiles += store.getStorefilesCount();
1767      int currentStoreRefCount = store.getStoreRefCount();
1768      storeRefCount += currentStoreRefCount;
1769      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1770      maxCompactedStoreFileRefCount = Math.max(maxCompactedStoreFileRefCount,
1771        currentMaxCompactedStoreFileRefCount);
1772      storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1773      storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1774      //TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
1775      storefileIndexSizeKB += store.getStorefilesRootLevelIndexSize() / 1024;
1776      CompactionProgress progress = store.getCompactionProgress();
1777      if (progress != null) {
1778        totalCompactingKVs += progress.getTotalCompactingKVs();
1779        currentCompactedKVs += progress.currentCompactedKVs;
1780      }
1781      rootLevelIndexSizeKB += (int) (store.getStorefilesRootLevelIndexSize() / 1024);
1782      totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1783      totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1784    }
1785
1786    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
1787    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
1788    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
1789    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
1790    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
1791    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
1792    if (regionLoadBldr == null) {
1793      regionLoadBldr = RegionLoad.newBuilder();
1794    }
1795    if (regionSpecifier == null) {
1796      regionSpecifier = RegionSpecifier.newBuilder();
1797    }
1798    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1799    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1800    regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1801      .setStores(stores)
1802      .setStorefiles(storefiles)
1803      .setStoreRefCount(storeRefCount)
1804      .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1805      .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1806      .setStorefileSizeMB(storefileSizeMB)
1807      .setMemStoreSizeMB(memstoreSizeMB)
1808      .setStorefileIndexSizeKB(storefileIndexSizeKB)
1809      .setRootIndexSizeKB(rootLevelIndexSizeKB)
1810      .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1811      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1812      .setReadRequestsCount(r.getReadRequestsCount())
1813      .setCpRequestsCount(r.getCpRequestsCount())
1814      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1815      .setWriteRequestsCount(r.getWriteRequestsCount())
1816      .setTotalCompactingKVs(totalCompactingKVs)
1817      .setCurrentCompactedKVs(currentCompactedKVs)
1818      .setDataLocality(dataLocality)
1819      .setDataLocalityForSsd(dataLocalityForSsd)
1820      .setBlocksLocalWeight(blocksLocalWeight)
1821      .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight)
1822      .setBlocksTotalWeight(blocksTotalWeight)
1823      .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
1824      .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1825    r.setCompleteSequenceId(regionLoadBldr);
1826    return regionLoadBldr.build();
1827  }
1828
1829  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1830    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1831    userLoadBldr.setUserName(user);
1832    userSource.getClientMetrics().values().stream().map(
1833      clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1834            .setHostName(clientMetrics.getHostName())
1835            .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1836            .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1837            .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
1838        .forEach(userLoadBldr::addClientMetrics);
1839    return userLoadBldr.build();
1840  }
1841
1842  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1843    HRegion r = onlineRegions.get(encodedRegionName);
1844    return r != null ? createRegionLoad(r, null, null) : null;
1845  }
1846
1847  /**
1848   * Inner class that runs on a long period checking if regions need compaction.
1849   */
1850  private static class CompactionChecker extends ScheduledChore {
1851    private final HRegionServer instance;
1852    private final int majorCompactPriority;
1853    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1854    //Iteration is 1-based rather than 0-based so we don't check for compaction
1855    // immediately upon region server startup
1856    private long iteration = 1;
1857
1858    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1859      super("CompactionChecker", stopper, sleepTime);
1860      this.instance = h;
1861      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1862
1863      /* MajorCompactPriority is configurable.
1864       * If not set, the compaction will use default priority.
1865       */
1866      this.majorCompactPriority = this.instance.conf.
1867          getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1868              DEFAULT_PRIORITY);
1869    }
1870
1871    @Override
1872    protected void chore() {
1873      for (Region r : this.instance.onlineRegions.values()) {
1874        // Skip compaction if region is read only
1875        if (r == null || r.isReadOnly()) {
1876          continue;
1877        }
1878
1879        HRegion hr = (HRegion) r;
1880        for (HStore s : hr.stores.values()) {
1881          try {
1882            long multiplier = s.getCompactionCheckMultiplier();
1883            assert multiplier > 0;
1884            if (iteration % multiplier != 0) {
1885              continue;
1886            }
1887            if (s.needsCompaction()) {
1888              // Queue a compaction. Will recognize if major is needed.
1889              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1890                getName() + " requests compaction");
1891            } else if (s.shouldPerformMajorCompaction()) {
1892              s.triggerMajorCompaction();
1893              if (majorCompactPriority == DEFAULT_PRIORITY ||
1894                  majorCompactPriority > hr.getCompactPriority()) {
1895                this.instance.compactSplitThread.requestCompaction(hr, s,
1896                    getName() + " requests major compaction; use default priority",
1897                    Store.NO_PRIORITY,
1898                CompactionLifeCycleTracker.DUMMY, null);
1899              } else {
1900                this.instance.compactSplitThread.requestCompaction(hr, s,
1901                    getName() + " requests major compaction; use configured priority",
1902                    this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1903              }
1904            }
1905          } catch (IOException e) {
1906            LOG.warn("Failed major compaction check on " + r, e);
1907          }
1908        }
1909      }
1910      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1911    }
1912  }
1913
1914  private static class PeriodicMemStoreFlusher extends ScheduledChore {
1915    private final HRegionServer server;
1916    private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
1917    private final static int MIN_DELAY_TIME = 0; // millisec
1918    private final long rangeOfDelayMs;
1919
1920    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1921      super("MemstoreFlusherChore", server, cacheFlushInterval);
1922      this.server = server;
1923
1924      final long configuredRangeOfDelay = server.getConfiguration().getInt(
1925          "hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
1926      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
1927    }
1928
1929    @Override
1930    protected void chore() {
1931      final StringBuilder whyFlush = new StringBuilder();
1932      for (HRegion r : this.server.onlineRegions.values()) {
1933        if (r == null) continue;
1934        if (r.shouldFlush(whyFlush)) {
1935          FlushRequester requester = server.getFlushRequester();
1936          if (requester != null) {
1937            long randomDelay = RandomUtils.nextLong(0, rangeOfDelayMs) + MIN_DELAY_TIME;
1938            //Throttle the flushes by putting a delay. If we don't throttle, and there
1939            //is a balanced write-load on the regions in a table, we might end up
1940            //overwhelming the filesystem with too many flushes at once.
1941            if (requester.requestDelayedFlush(r, randomDelay)) {
1942              LOG.info("{} requesting flush of {} because {} after random delay {} ms",
1943                  getName(), r.getRegionInfo().getRegionNameAsString(),  whyFlush.toString(),
1944                  randomDelay);
1945            }
1946          }
1947        }
1948      }
1949    }
1950  }
1951
1952  /**
1953   * Report the status of the server. A server is online once all the startup is
1954   * completed (setting up filesystem, starting executorService threads, etc.). This
1955   * method is designed mostly to be useful in tests.
1956   *
1957   * @return true if online, false if not.
1958   */
1959  public boolean isOnline() {
1960    return online.get();
1961  }
1962
1963  /**
1964   * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
1965   * be hooked up to WAL.
1966   */
1967  private void setupWALAndReplication() throws IOException {
1968    boolean isMaster = this instanceof HMaster;
1969    WALFactory factory =
1970        new WALFactory(conf, serverName.toString(), this, !isMaster);
1971    if (!isMaster) {
1972      // TODO Replication make assumptions here based on the default filesystem impl
1973      Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1974      String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1975
1976      Path logDir = new Path(walRootDir, logName);
1977      LOG.debug("logDir={}", logDir);
1978      if (this.walFs.exists(logDir)) {
1979        throw new RegionServerRunningException(
1980            "Region server has already created directory at " + this.serverName.toString());
1981      }
1982      // Always create wal directory as now we need this when master restarts to find out the live
1983      // region servers.
1984      if (!this.walFs.mkdirs(logDir)) {
1985        throw new IOException("Can not create wal directory " + logDir);
1986      }
1987      // Instantiate replication if replication enabled. Pass it the log directories.
1988      createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
1989    }
1990    this.walFactory = factory;
1991  }
1992
1993  /**
1994   * Start up replication source and sink handlers.
1995   */
1996  private void startReplicationService() throws IOException {
1997    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
1998      this.replicationSourceHandler.startReplicationService();
1999    } else {
2000      if (this.replicationSourceHandler != null) {
2001        this.replicationSourceHandler.startReplicationService();
2002      }
2003      if (this.replicationSinkHandler != null) {
2004        this.replicationSinkHandler.startReplicationService();
2005      }
2006    }
2007  }
2008
2009  /**
2010   * @return Master address tracker instance.
2011   */
2012  public MasterAddressTracker getMasterAddressTracker() {
2013    return this.masterAddressTracker;
2014  }
2015
2016  /**
2017   * Start maintenance Threads, Server, Worker and lease checker threads.
2018   * Start all threads we need to run. This is called after we've successfully
2019   * registered with the Master.
2020   * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
2021   * get an unhandled exception. We cannot set the handler on all threads.
2022   * Server's internal Listener thread is off limits. For Server, if an OOME, it
2023   * waits a while then retries. Meantime, a flush or a compaction that tries to
2024   * run should trigger same critical condition and the shutdown will run. On
2025   * its way out, this server will shut down Server. Leases are sort of
2026   * inbetween. It has an internal thread that while it inherits from Chore, it
2027   * keeps its own internal stop mechanism so needs to be stopped by this
2028   * hosting server. Worker logs the exception and exits.
2029   */
2030  private void startServices() throws IOException {
2031    if (!isStopped() && !isAborted()) {
2032      initializeThreads();
2033    }
2034    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
2035    this.secureBulkLoadManager.start();
2036
2037    // Health checker thread.
2038    if (isHealthCheckerConfigured()) {
2039      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
2040      HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
2041      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
2042    }
2043    // Executor status collect thread.
2044    if (this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
2045        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)) {
2046      int sleepTime = this.conf.getInt(ExecutorStatusChore.WAKE_FREQ,
2047          ExecutorStatusChore.DEFAULT_WAKE_FREQ);
2048      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
2049          this.metricsRegionServer.getMetricsSource());
2050    }
2051
2052    this.walRoller = new LogRoller(this);
2053    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
2054    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
2055
2056    // Create the CompactedFileDischarger chore executorService. This chore helps to
2057    // remove the compacted files that will no longer be used in reads.
2058    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
2059    // 2 mins so that compacted files can be archived before the TTLCleaner runs
2060    int cleanerInterval =
2061      conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
2062    this.compactedFileDischarger =
2063      new CompactedHFilesDischarger(cleanerInterval, this, this);
2064    choreService.scheduleChore(compactedFileDischarger);
2065
2066    // Start executor services
2067    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
2068    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2069        ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
2070    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
2071    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2072        ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
2073    final int openPriorityRegionThreads =
2074        conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
2075    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2076        ExecutorType.RS_OPEN_PRIORITY_REGION).setCorePoolSize(openPriorityRegionThreads));
2077    final int closeRegionThreads =
2078        conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
2079    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2080        ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
2081    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
2082    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2083        ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
2084    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
2085      final int storeScannerParallelSeekThreads =
2086          conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
2087      executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2088          ExecutorType.RS_PARALLEL_SEEK).setCorePoolSize(storeScannerParallelSeekThreads)
2089          .setAllowCoreThreadTimeout(true));
2090    }
2091    final int logReplayOpsThreads = conf.getInt(
2092        HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
2093    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2094        ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(logReplayOpsThreads)
2095        .setAllowCoreThreadTimeout(true));
2096    // Start the threads for compacted files discharger
2097    final int compactionDischargerThreads =
2098        conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
2099    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2100        ExecutorType.RS_COMPACTED_FILES_DISCHARGER).setCorePoolSize(compactionDischargerThreads));
2101    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
2102      final int regionReplicaFlushThreads = conf.getInt(
2103          "hbase.regionserver.region.replica.flusher.threads", conf.getInt(
2104              "hbase.regionserver.executor.openregion.threads", 3));
2105      executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2106          ExecutorType.RS_REGION_REPLICA_FLUSH_OPS).setCorePoolSize(regionReplicaFlushThreads));
2107    }
2108    final int refreshPeerThreads =
2109        conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
2110    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2111        ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
2112    final int replaySyncReplicationWALThreads =
2113        conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
2114    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2115        ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL).setCorePoolSize(
2116            replaySyncReplicationWALThreads));
2117    final int switchRpcThrottleThreads =
2118        conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
2119    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2120        ExecutorType.RS_SWITCH_RPC_THROTTLE).setCorePoolSize(switchRpcThrottleThreads));
2121    final int claimReplicationQueueThreads =
2122      conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
2123    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2124        ExecutorType.RS_CLAIM_REPLICATION_QUEUE).setCorePoolSize(claimReplicationQueueThreads));
2125
2126    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
2127        uncaughtExceptionHandler);
2128    if (this.cacheFlusher != null) {
2129      this.cacheFlusher.start(uncaughtExceptionHandler);
2130    }
2131    Threads.setDaemonThreadRunning(this.procedureResultReporter,
2132      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
2133
2134    if (this.compactionChecker != null) {
2135      choreService.scheduleChore(compactionChecker);
2136    }
2137    if (this.periodicFlusher != null) {
2138      choreService.scheduleChore(periodicFlusher);
2139    }
2140    if (this.healthCheckChore != null) {
2141      choreService.scheduleChore(healthCheckChore);
2142    }
2143    if (this.executorStatusChore != null) {
2144      choreService.scheduleChore(executorStatusChore);
2145    }
2146    if (this.nonceManagerChore != null) {
2147      choreService.scheduleChore(nonceManagerChore);
2148    }
2149    if (this.storefileRefresher != null) {
2150      choreService.scheduleChore(storefileRefresher);
2151    }
2152    if (this.fsUtilizationChore != null) {
2153      choreService.scheduleChore(fsUtilizationChore);
2154    }
2155    if (this.slowLogTableOpsChore != null) {
2156      choreService.scheduleChore(slowLogTableOpsChore);
2157    }
2158
2159    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
2160    // an unhandled exception, it will just exit.
2161    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
2162        uncaughtExceptionHandler);
2163
2164    // Create the log splitting worker and start it
2165    // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
2166    // quite a while inside Connection layer. The worker won't be available for other
2167    // tasks even after current task is preempted after a split task times out.
2168    Configuration sinkConf = HBaseConfiguration.create(conf);
2169    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2170        conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
2171    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2172        conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
2173    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
2174    if (this.csm != null && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
2175      DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
2176      // SplitLogWorker needs csm. If none, don't start this.
2177      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
2178      splitLogWorker.start();
2179      LOG.debug("SplitLogWorker started");
2180    }
2181
2182    // Memstore services.
2183    startHeapMemoryManager();
2184    // Call it after starting HeapMemoryManager.
2185    initializeMemStoreChunkCreator();
2186  }
2187
2188  private void initializeThreads() {
2189    // Cache flushing thread.
2190    this.cacheFlusher = new MemStoreFlusher(conf, this);
2191
2192    // Compaction thread
2193    this.compactSplitThread = new CompactSplit(this);
2194
2195    // Background thread to check for compactions; needed if region has not gotten updates
2196    // in a while. It will take care of not checking too frequently on store-by-store basis.
2197    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
2198    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
2199    this.leaseManager = new LeaseManager(this.threadWakeFrequency);
2200
2201    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
2202      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
2203    if (isSlowLogTableEnabled) {
2204      // default chore duration: 10 min
2205      final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
2206      slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
2207    }
2208
2209    if (this.nonceManager != null) {
2210      // Create the scheduled chore that cleans up nonces.
2211      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
2212    }
2213
2214    // Setup the Quota Manager
2215    rsQuotaManager = new RegionServerRpcQuotaManager(this);
2216    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
2217
2218    if (QuotaUtil.isQuotaEnabled(conf)) {
2219      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
2220    }
2221
2222
2223    boolean onlyMetaRefresh = false;
2224    int storefileRefreshPeriod = conf.getInt(
2225        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
2226        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2227    if (storefileRefreshPeriod == 0) {
2228      storefileRefreshPeriod = conf.getInt(
2229          StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
2230          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2231      onlyMetaRefresh = true;
2232    }
2233    if (storefileRefreshPeriod > 0) {
2234      this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
2235          onlyMetaRefresh, this, this);
2236    }
2237    registerConfigurationObservers();
2238  }
2239
2240  private void registerConfigurationObservers() {
2241    // Registering the compactSplitThread object with the ConfigurationManager.
2242    configurationManager.registerObserver(this.compactSplitThread);
2243    configurationManager.registerObserver(this.rpcServices);
2244    configurationManager.registerObserver(this);
2245  }
2246
2247  /**
2248   * Puts up the webui.
2249   */
2250  private void putUpWebUI() throws IOException {
2251    int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2252      HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2253    String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
2254
2255    if(this instanceof HMaster) {
2256      port = conf.getInt(HConstants.MASTER_INFO_PORT,
2257          HConstants.DEFAULT_MASTER_INFOPORT);
2258      addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
2259    }
2260    // -1 is for disabling info server
2261    if (port < 0) {
2262      return;
2263    }
2264
2265    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
2266      String msg =
2267          "Failed to start http info server. Address " + addr
2268              + " does not belong to this host. Correct configuration parameter: "
2269              + "hbase.regionserver.info.bindAddress";
2270      LOG.error(msg);
2271      throw new IOException(msg);
2272    }
2273    // check if auto port bind enabled
2274    boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, false);
2275    while (true) {
2276      try {
2277        this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
2278        infoServer.addPrivilegedServlet("dump", "/dump", getDumpServlet());
2279        configureInfoServer();
2280        this.infoServer.start();
2281        break;
2282      } catch (BindException e) {
2283        if (!auto) {
2284          // auto bind disabled throw BindException
2285          LOG.error("Failed binding http info server to port: " + port);
2286          throw e;
2287        }
2288        // auto bind enabled, try to use another port
2289        LOG.info("Failed binding http info server to port: " + port);
2290        port++;
2291        LOG.info("Retry starting http info server with port: " + port);
2292      }
2293    }
2294    port = this.infoServer.getPort();
2295    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
2296    int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
2297      HConstants.DEFAULT_MASTER_INFOPORT);
2298    conf.setInt("hbase.master.info.port.orig", masterInfoPort);
2299    conf.setInt(HConstants.MASTER_INFO_PORT, port);
2300  }
2301
2302  /*
2303   * Verify that server is healthy
2304   */
2305  private boolean isHealthy() {
2306    if (!dataFsOk) {
2307      // File system problem
2308      return false;
2309    }
2310    // Verify that all threads are alive
2311    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
2312        && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2313        && (this.walRoller == null || this.walRoller.isAlive())
2314        && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2315        && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2316    if (!healthy) {
2317      stop("One or more threads are no longer alive -- stop");
2318    }
2319    return healthy;
2320  }
2321
2322  @Override
2323  public List<WAL> getWALs() {
2324    return walFactory.getWALs();
2325  }
2326
2327  @Override
2328  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2329    WAL wal = walFactory.getWAL(regionInfo);
2330    if (this.walRoller != null) {
2331      this.walRoller.addWAL(wal);
2332    }
2333    return wal;
2334  }
2335
2336  public LogRoller getWalRoller() {
2337    return walRoller;
2338  }
2339
2340  WALFactory getWalFactory() {
2341    return walFactory;
2342  }
2343
2344  @Override
2345  public Connection getConnection() {
2346    return getAsyncConnection().toConnection();
2347  }
2348
2349  @Override
2350  public void stop(final String msg) {
2351    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2352  }
2353
2354  /**
2355   * Stops the regionserver.
2356   * @param msg Status message
2357   * @param force True if this is a regionserver abort
2358   * @param user The user executing the stop request, or null if no user is associated
2359   */
2360  public void stop(final String msg, final boolean force, final User user) {
2361    if (!this.stopped) {
2362      LOG.info("***** STOPPING region server '" + this + "' *****");
2363      if (this.rsHost != null) {
2364        // when forced via abort don't allow CPs to override
2365        try {
2366          this.rsHost.preStop(msg, user);
2367        } catch (IOException ioe) {
2368          if (!force) {
2369            LOG.warn("The region server did not stop", ioe);
2370            return;
2371          }
2372          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2373        }
2374      }
2375      this.stopped = true;
2376      LOG.info("STOPPED: " + msg);
2377      // Wakes run() if it is sleeping
2378      sleeper.skipSleepCycle();
2379    }
2380  }
2381
2382  public void waitForServerOnline(){
2383    while (!isStopped() && !isOnline()) {
2384      synchronized (online) {
2385        try {
2386          online.wait(msgInterval);
2387        } catch (InterruptedException ie) {
2388          Thread.currentThread().interrupt();
2389          break;
2390        }
2391      }
2392    }
2393  }
2394
2395  @Override
2396  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2397    HRegion r = context.getRegion();
2398    long openProcId = context.getOpenProcId();
2399    long masterSystemTime = context.getMasterSystemTime();
2400    rpcServices.checkOpen();
2401    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2402      r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2403    // Do checks to see if we need to compact (references or too many files)
2404    // Skip compaction check if region is read only
2405    if (!r.isReadOnly()) {
2406      for (HStore s : r.stores.values()) {
2407        if (s.hasReferences() || s.needsCompaction()) {
2408          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2409        }
2410      }
2411    }
2412    long openSeqNum = r.getOpenSeqNum();
2413    if (openSeqNum == HConstants.NO_SEQNUM) {
2414      // If we opened a region, we should have read some sequence number from it.
2415      LOG.error(
2416        "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2417      openSeqNum = 0;
2418    }
2419
2420    // Notify master
2421    if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2422      openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))) {
2423      throw new IOException(
2424        "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2425    }
2426
2427    triggerFlushInPrimaryRegion(r);
2428
2429    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2430  }
2431
2432  /**
2433   * Helper method for use in tests. Skip the region transition report when there's no master
2434   * around to receive it.
2435   */
2436  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2437    final TransitionCode code = context.getCode();
2438    final long openSeqNum = context.getOpenSeqNum();
2439    long masterSystemTime = context.getMasterSystemTime();
2440    final RegionInfo[] hris = context.getHris();
2441
2442    if (code == TransitionCode.OPENED) {
2443      Preconditions.checkArgument(hris != null && hris.length == 1);
2444      if (hris[0].isMetaRegion()) {
2445        try {
2446          MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
2447              hris[0].getReplicaId(), RegionState.State.OPEN);
2448        } catch (KeeperException e) {
2449          LOG.info("Failed to update meta location", e);
2450          return false;
2451        }
2452      } else {
2453        try {
2454          MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
2455              serverName, openSeqNum, masterSystemTime);
2456        } catch (IOException e) {
2457          LOG.info("Failed to update meta", e);
2458          return false;
2459        }
2460      }
2461    }
2462    return true;
2463  }
2464
2465  private ReportRegionStateTransitionRequest createReportRegionStateTransitionRequest(
2466      final RegionStateTransitionContext context) {
2467    final TransitionCode code = context.getCode();
2468    final long openSeqNum = context.getOpenSeqNum();
2469    final RegionInfo[] hris = context.getHris();
2470    final long[] procIds = context.getProcIds();
2471
2472    ReportRegionStateTransitionRequest.Builder builder =
2473        ReportRegionStateTransitionRequest.newBuilder();
2474    builder.setServer(ProtobufUtil.toServerName(serverName));
2475    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2476    transition.setTransitionCode(code);
2477    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2478      transition.setOpenSeqNum(openSeqNum);
2479    }
2480    for (RegionInfo hri: hris) {
2481      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2482    }
2483    for (long procId: procIds) {
2484      transition.addProcId(procId);
2485    }
2486
2487    return builder.build();
2488  }
2489
2490  @Override
2491  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2492    if (TEST_SKIP_REPORTING_TRANSITION) {
2493      return skipReportingTransition(context);
2494    }
2495    final ReportRegionStateTransitionRequest request =
2496        createReportRegionStateTransitionRequest(context);
2497
2498    int tries = 0;
2499    long pauseTime = this.retryPauseTime;
2500    // Keep looping till we get an error. We want to send reports even though server is going down.
2501    // Only go down if clusterConnection is null. It is set to null almost as last thing as the
2502    // HRegionServer does down.
2503    while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
2504      RegionServerStatusService.BlockingInterface rss = rssStub;
2505      try {
2506        if (rss == null) {
2507          createRegionServerStatusStub();
2508          continue;
2509        }
2510        ReportRegionStateTransitionResponse response =
2511          rss.reportRegionStateTransition(null, request);
2512        if (response.hasErrorMessage()) {
2513          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2514          break;
2515        }
2516        // Log if we had to retry else don't log unless TRACE. We want to
2517        // know if were successful after an attempt showed in logs as failed.
2518        if (tries > 0 || LOG.isTraceEnabled()) {
2519          LOG.info("TRANSITION REPORTED " + request);
2520        }
2521        // NOTE: Return mid-method!!!
2522        return true;
2523      } catch (ServiceException se) {
2524        IOException ioe = ProtobufUtil.getRemoteException(se);
2525        boolean pause =
2526            ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException
2527                || ioe instanceof CallQueueTooBigException;
2528        if (pause) {
2529          // Do backoff else we flood the Master with requests.
2530          pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
2531        } else {
2532          pauseTime = this.retryPauseTime; // Reset.
2533        }
2534        LOG.info("Failed report transition " +
2535          TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" +
2536            (pause?
2537                " after " + pauseTime + "ms delay (Master is coming online...).":
2538                " immediately."),
2539            ioe);
2540        if (pause) Threads.sleep(pauseTime);
2541        tries++;
2542        if (rssStub == rss) {
2543          rssStub = null;
2544        }
2545      }
2546    }
2547    return false;
2548  }
2549
2550  /**
2551   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2552   * block this thread. See RegionReplicaFlushHandler for details.
2553   */
2554  private void triggerFlushInPrimaryRegion(final HRegion region) {
2555    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2556      return;
2557    }
2558    TableName tn = region.getTableDescriptor().getTableName();
2559    if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) ||
2560        !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf)) {
2561      region.setReadsEnabled(true);
2562      return;
2563    }
2564
2565    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2566    // RegionReplicaFlushHandler might reset this.
2567
2568    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2569    if (this.executorService != null) {
2570      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
2571    } else {
2572      LOG.info("Executor is null; not running flush of primary region replica for {}",
2573        region.getRegionInfo());
2574     }
2575  }
2576
2577  @Override
2578  public RpcServerInterface getRpcServer() {
2579    return rpcServices.rpcServer;
2580  }
2581
2582  @InterfaceAudience.Private
2583  public RSRpcServices getRSRpcServices() {
2584    return rpcServices;
2585  }
2586
2587  /**
2588   * Cause the server to exit without closing the regions it is serving, the log
2589   * it is using and without notifying the master. Used unit testing and on
2590   * catastrophic events such as HDFS is yanked out from under hbase or we OOME.
2591   *
2592   * @param reason
2593   *          the reason we are aborting
2594   * @param cause
2595   *          the exception that caused the abort, or null
2596   */
2597  @Override
2598  public void abort(String reason, Throwable cause) {
2599    if (!setAbortRequested()) {
2600      // Abort already in progress, ignore the new request.
2601      LOG.debug(
2602          "Abort already in progress. Ignoring the current request with reason: {}", reason);
2603      return;
2604    }
2605    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2606    if (cause != null) {
2607      LOG.error(HBaseMarkers.FATAL, msg, cause);
2608    } else {
2609      LOG.error(HBaseMarkers.FATAL, msg);
2610    }
2611    // HBASE-4014: show list of coprocessors that were loaded to help debug
2612    // regionserver crashes.Note that we're implicitly using
2613    // java.util.HashSet's toString() method to print the coprocessor names.
2614    LOG.error(HBaseMarkers.FATAL, "RegionServer abort: loaded coprocessors are: " +
2615        CoprocessorHost.getLoadedCoprocessors());
2616    // Try and dump metrics if abort -- might give clue as to how fatal came about....
2617    try {
2618      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2619    } catch (MalformedObjectNameException | IOException e) {
2620      LOG.warn("Failed dumping metrics", e);
2621    }
2622
2623    // Do our best to report our abort to the master, but this may not work
2624    try {
2625      if (cause != null) {
2626        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2627      }
2628      // Report to the master but only if we have already registered with the master.
2629      RegionServerStatusService.BlockingInterface rss = rssStub;
2630      if (rss != null && this.serverName != null) {
2631        ReportRSFatalErrorRequest.Builder builder =
2632          ReportRSFatalErrorRequest.newBuilder();
2633        builder.setServer(ProtobufUtil.toServerName(this.serverName));
2634        builder.setErrorMessage(msg);
2635        rss.reportRSFatalError(null, builder.build());
2636      }
2637    } catch (Throwable t) {
2638      LOG.warn("Unable to report fatal error to master", t);
2639    }
2640
2641    scheduleAbortTimer();
2642    // shutdown should be run as the internal user
2643    stop(reason, true, null);
2644  }
2645
2646  /**
2647   * Sets the abort state if not already set.
2648   * @return True if abortRequested set to True successfully, false if an abort is already in
2649   * progress.
2650   */
2651  protected boolean setAbortRequested() {
2652    return abortRequested.compareAndSet(false, true);
2653  }
2654
2655  /**
2656   * @see HRegionServer#abort(String, Throwable)
2657   */
2658  public void abort(String reason) {
2659    abort(reason, null);
2660  }
2661
2662  @Override
2663  public boolean isAborted() {
2664    return abortRequested.get();
2665  }
2666
2667  /*
2668   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup
2669   * logs but it does close socket in case want to bring up server on old
2670   * hostname+port immediately.
2671   */
2672  @InterfaceAudience.Private
2673  protected void kill() {
2674    this.killed = true;
2675    abort("Simulated kill");
2676  }
2677
2678  // Limits the time spent in the shutdown process.
2679  private void scheduleAbortTimer() {
2680    if (this.abortMonitor == null) {
2681      this.abortMonitor = new Timer("Abort regionserver monitor", true);
2682      TimerTask abortTimeoutTask = null;
2683      try {
2684        Constructor<? extends TimerTask> timerTaskCtor =
2685          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2686            .asSubclass(TimerTask.class).getDeclaredConstructor();
2687        timerTaskCtor.setAccessible(true);
2688        abortTimeoutTask = timerTaskCtor.newInstance();
2689      } catch (Exception e) {
2690        LOG.warn("Initialize abort timeout task failed", e);
2691      }
2692      if (abortTimeoutTask != null) {
2693        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2694      }
2695    }
2696  }
2697
2698  protected final void shutdownChore(ScheduledChore chore) {
2699    if (chore != null) {
2700      chore.shutdown();
2701    }
2702  }
2703  /**
2704   * Wait on all threads to finish. Presumption is that all closes and stops
2705   * have already been called.
2706   */
2707  protected void stopServiceThreads() {
2708    // clean up the scheduled chores
2709    if (this.choreService != null) {
2710      shutdownChore(nonceManagerChore);
2711      shutdownChore(compactionChecker);
2712      shutdownChore(periodicFlusher);
2713      shutdownChore(healthCheckChore);
2714      shutdownChore(executorStatusChore);
2715      shutdownChore(storefileRefresher);
2716      shutdownChore(fsUtilizationChore);
2717      shutdownChore(slowLogTableOpsChore);
2718      // cancel the remaining scheduled chores (in case we missed out any)
2719      // TODO: cancel will not cleanup the chores, so we need make sure we do not miss any
2720      choreService.shutdown();
2721    }
2722
2723    if (this.cacheFlusher != null) {
2724      this.cacheFlusher.join();
2725    }
2726    if (this.walRoller != null) {
2727      this.walRoller.close();
2728    }
2729    if (this.compactSplitThread != null) {
2730      this.compactSplitThread.join();
2731    }
2732    if (this.executorService != null) {
2733      this.executorService.shutdown();
2734    }
2735    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
2736      this.replicationSourceHandler.stopReplicationService();
2737    } else {
2738      if (this.replicationSourceHandler != null) {
2739        this.replicationSourceHandler.stopReplicationService();
2740      }
2741      if (this.replicationSinkHandler != null) {
2742        this.replicationSinkHandler.stopReplicationService();
2743      }
2744    }
2745  }
2746
2747  /**
2748   * @return Return the object that implements the replication
2749   * source executorService.
2750   */
2751  @Override
2752  public ReplicationSourceService getReplicationSourceService() {
2753    return replicationSourceHandler;
2754  }
2755
2756  /**
2757   * @return Return the object that implements the replication sink executorService.
2758   */
2759  public ReplicationSinkService getReplicationSinkService() {
2760    return replicationSinkHandler;
2761  }
2762
2763  /**
2764   * Get the current master from ZooKeeper and open the RPC connection to it.
2765   * To get a fresh connection, the current rssStub must be null.
2766   * Method will block until a master is available. You can break from this
2767   * block by requesting the server stop.
2768   *
2769   * @return master + port, or null if server has been stopped
2770   */
2771  private synchronized ServerName createRegionServerStatusStub() {
2772    // Create RS stub without refreshing the master node from ZK, use cached data
2773    return createRegionServerStatusStub(false);
2774  }
2775
2776  /**
2777   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2778   * connection, the current rssStub must be null. Method will block until a master is available.
2779   * You can break from this block by requesting the server stop.
2780   * @param refresh If true then master address will be read from ZK, otherwise use cached data
2781   * @return master + port, or null if server has been stopped
2782   */
2783  @InterfaceAudience.Private
2784  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2785    if (rssStub != null) {
2786      return masterAddressTracker.getMasterAddress();
2787    }
2788    ServerName sn = null;
2789    long previousLogTime = 0;
2790    RegionServerStatusService.BlockingInterface intRssStub = null;
2791    LockService.BlockingInterface intLockStub = null;
2792    boolean interrupted = false;
2793    try {
2794      while (keepLooping()) {
2795        sn = this.masterAddressTracker.getMasterAddress(refresh);
2796        if (sn == null) {
2797          if (!keepLooping()) {
2798            // give up with no connection.
2799            LOG.debug("No master found and cluster is stopped; bailing out");
2800            return null;
2801          }
2802          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2803            LOG.debug("No master found; retry");
2804            previousLogTime = EnvironmentEdgeManager.currentTime();
2805          }
2806          refresh = true; // let's try pull it from ZK directly
2807          if (sleepInterrupted(200)) {
2808            interrupted = true;
2809          }
2810          continue;
2811        }
2812
2813        // If we are on the active master, use the shortcut
2814        if (this instanceof HMaster && sn.equals(getServerName())) {
2815          // Wrap the shortcut in a class providing our version to the calls where it's relevant.
2816          // Normally, RpcServer-based threadlocals do that.
2817          intRssStub = new MasterRpcServicesVersionWrapper(((HMaster)this).getMasterRpcServices());
2818          intLockStub = ((HMaster)this).getMasterRpcServices();
2819          break;
2820        }
2821        try {
2822          BlockingRpcChannel channel =
2823            this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2824              shortOperationTimeout);
2825          intRssStub = RegionServerStatusService.newBlockingStub(channel);
2826          intLockStub = LockService.newBlockingStub(channel);
2827          break;
2828        } catch (IOException e) {
2829          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2830            e = e instanceof RemoteException ?
2831              ((RemoteException)e).unwrapRemoteException() : e;
2832            if (e instanceof ServerNotRunningYetException) {
2833              LOG.info("Master isn't available yet, retrying");
2834            } else {
2835              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2836            }
2837            previousLogTime = EnvironmentEdgeManager.currentTime();
2838          }
2839          if (sleepInterrupted(200)) {
2840            interrupted = true;
2841          }
2842        }
2843      }
2844    } finally {
2845      if (interrupted) {
2846        Thread.currentThread().interrupt();
2847      }
2848    }
2849    this.rssStub = intRssStub;
2850    this.lockStub = intLockStub;
2851    return sn;
2852  }
2853
2854  /**
2855   * @return True if we should break loop because cluster is going down or
2856   * this server has been stopped or hdfs has gone bad.
2857   */
2858  private boolean keepLooping() {
2859    return !this.stopped && isClusterUp();
2860  }
2861
2862  /*
2863   * Let the master know we're here Run initialization using parameters passed
2864   * us by the master.
2865   * @return A Map of key/value configurations we got from the Master else
2866   * null if we failed to register.
2867   * @throws IOException
2868   */
2869  private RegionServerStartupResponse reportForDuty() throws IOException {
2870    if (this.masterless) return RegionServerStartupResponse.getDefaultInstance();
2871    ServerName masterServerName = createRegionServerStatusStub(true);
2872    RegionServerStatusService.BlockingInterface rss = rssStub;
2873    if (masterServerName == null || rss == null) return null;
2874    RegionServerStartupResponse result = null;
2875    try {
2876      rpcServices.requestCount.reset();
2877      rpcServices.rpcGetRequestCount.reset();
2878      rpcServices.rpcScanRequestCount.reset();
2879      rpcServices.rpcFullScanRequestCount.reset();
2880      rpcServices.rpcMultiRequestCount.reset();
2881      rpcServices.rpcMutateRequestCount.reset();
2882      LOG.info("reportForDuty to master=" + masterServerName + " with isa="
2883        + rpcServices.isa + ", startcode=" + this.startcode);
2884      long now = EnvironmentEdgeManager.currentTime();
2885      int port = rpcServices.isa.getPort();
2886      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2887      if (!StringUtils.isBlank(useThisHostnameInstead)) {
2888        request.setUseThisHostnameInstead(useThisHostnameInstead);
2889      }
2890      request.setPort(port);
2891      request.setServerStartCode(this.startcode);
2892      request.setServerCurrentTime(now);
2893      result = rss.regionServerStartup(null, request.build());
2894    } catch (ServiceException se) {
2895      IOException ioe = ProtobufUtil.getRemoteException(se);
2896      if (ioe instanceof ClockOutOfSyncException) {
2897        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync",
2898            ioe);
2899        // Re-throw IOE will cause RS to abort
2900        throw ioe;
2901      } else if (ioe instanceof ServerNotRunningYetException) {
2902        LOG.debug("Master is not running yet");
2903      } else {
2904        LOG.warn("error telling master we are up", se);
2905      }
2906      rssStub = null;
2907    }
2908    return result;
2909  }
2910
2911  @Override
2912  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2913    try {
2914      GetLastFlushedSequenceIdRequest req =
2915          RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2916      RegionServerStatusService.BlockingInterface rss = rssStub;
2917      if (rss == null) { // Try to connect one more time
2918        createRegionServerStatusStub();
2919        rss = rssStub;
2920        if (rss == null) {
2921          // Still no luck, we tried
2922          LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2923          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2924              .build();
2925        }
2926      }
2927      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2928      return RegionStoreSequenceIds.newBuilder()
2929          .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2930          .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2931    } catch (ServiceException e) {
2932      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2933      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2934          .build();
2935    }
2936  }
2937
2938  /**
2939   * Close meta region if we carry it
2940   * @param abort Whether we're running an abort.
2941   */
2942  private void closeMetaTableRegions(final boolean abort) {
2943    HRegion meta = null;
2944    this.onlineRegionsLock.writeLock().lock();
2945    try {
2946      for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
2947        RegionInfo hri = e.getValue().getRegionInfo();
2948        if (hri.isMetaRegion()) {
2949          meta = e.getValue();
2950        }
2951        if (meta != null) break;
2952      }
2953    } finally {
2954      this.onlineRegionsLock.writeLock().unlock();
2955    }
2956    if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2957  }
2958
2959  /**
2960   * Schedule closes on all user regions.
2961   * Should be safe calling multiple times because it wont' close regions
2962   * that are already closed or that are closing.
2963   * @param abort Whether we're running an abort.
2964   */
2965  private void closeUserRegions(final boolean abort) {
2966    this.onlineRegionsLock.writeLock().lock();
2967    try {
2968      for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
2969        HRegion r = e.getValue();
2970        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2971          // Don't update zk with this close transition; pass false.
2972          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2973        }
2974      }
2975    } finally {
2976      this.onlineRegionsLock.writeLock().unlock();
2977    }
2978  }
2979
2980  /** @return the info server */
2981  public InfoServer getInfoServer() {
2982    return infoServer;
2983  }
2984
2985  /**
2986   * @return true if a stop has been requested.
2987   */
2988  @Override
2989  public boolean isStopped() {
2990    return this.stopped;
2991  }
2992
2993  @Override
2994  public boolean isStopping() {
2995    return this.stopping;
2996  }
2997
2998  @Override
2999  public Configuration getConfiguration() {
3000    return conf;
3001  }
3002
3003  protected Map<String, HRegion> getOnlineRegions() {
3004    return this.onlineRegions;
3005  }
3006
3007  public int getNumberOfOnlineRegions() {
3008    return this.onlineRegions.size();
3009  }
3010
3011  /**
3012   * For tests, web ui and metrics.
3013   * This method will only work if HRegionServer is in the same JVM as client;
3014   * HRegion cannot be serialized to cross an rpc.
3015   */
3016  public Collection<HRegion> getOnlineRegionsLocalContext() {
3017    Collection<HRegion> regions = this.onlineRegions.values();
3018    return Collections.unmodifiableCollection(regions);
3019  }
3020
3021  @Override
3022  public void addRegion(HRegion region) {
3023    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
3024    configurationManager.registerObserver(region);
3025  }
3026
3027  private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
3028      long size) {
3029    if (!sortedRegions.containsKey(size)) {
3030      sortedRegions.put(size, new ArrayList<>());
3031    }
3032    sortedRegions.get(size).add(region);
3033  }
3034  /**
3035   * @return A new Map of online regions sorted by region off-heap size with the first entry being
3036   *   the biggest.
3037   */
3038  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
3039    // we'll sort the regions in reverse
3040    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
3041    // Copy over all regions. Regions are sorted by size with biggest first.
3042    for (HRegion region : this.onlineRegions.values()) {
3043      addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
3044    }
3045    return sortedRegions;
3046  }
3047
3048  /**
3049   * @return A new Map of online regions sorted by region heap size with the first entry being the
3050   *   biggest.
3051   */
3052  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
3053    // we'll sort the regions in reverse
3054    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
3055    // Copy over all regions. Regions are sorted by size with biggest first.
3056    for (HRegion region : this.onlineRegions.values()) {
3057      addRegion(sortedRegions, region, region.getMemStoreHeapSize());
3058    }
3059    return sortedRegions;
3060  }
3061
3062  /**
3063   * @return time stamp in millis of when this region server was started
3064   */
3065  public long getStartcode() {
3066    return this.startcode;
3067  }
3068
3069  /** @return reference to FlushRequester */
3070  @Override
3071  public FlushRequester getFlushRequester() {
3072    return this.cacheFlusher;
3073  }
3074
3075  @Override
3076  public CompactionRequester getCompactionRequestor() {
3077    return this.compactSplitThread;
3078  }
3079
3080  @Override
3081  public LeaseManager getLeaseManager() {
3082    return leaseManager;
3083  }
3084
3085  /**
3086   * @return Return the rootDir.
3087   */
3088  protected Path getDataRootDir() {
3089    return dataRootDir;
3090  }
3091
3092  @Override
3093  public FileSystem getFileSystem() {
3094    return dataFs;
3095  }
3096
3097  /**
3098   * @return {@code true} when the data file system is available, {@code false} otherwise.
3099   */
3100  boolean isDataFileSystemOk() {
3101    return this.dataFsOk;
3102  }
3103
3104  /**
3105   * @return Return the walRootDir.
3106   */
3107  public Path getWALRootDir() {
3108    return walRootDir;
3109  }
3110
3111  /**
3112   * @return Return the walFs.
3113   */
3114  public FileSystem getWALFileSystem() {
3115    return walFs;
3116  }
3117
3118  @Override
3119  public String toString() {
3120    return getServerName().toString();
3121  }
3122
3123  @Override
3124  public ZKWatcher getZooKeeper() {
3125    return zooKeeper;
3126  }
3127
3128  @Override
3129  public CoordinatedStateManager getCoordinatedStateManager() {
3130    return csm;
3131  }
3132
3133  @Override
3134  public ServerName getServerName() {
3135    return serverName;
3136  }
3137
3138  public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
3139    return this.rsHost;
3140  }
3141
3142  @Override
3143  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
3144    return this.regionsInTransitionInRS;
3145  }
3146
3147  @Override
3148  public ExecutorService getExecutorService() {
3149    return executorService;
3150  }
3151
3152  @Override
3153  public ChoreService getChoreService() {
3154    return choreService;
3155  }
3156
3157  @Override
3158  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
3159    return rsQuotaManager;
3160  }
3161
3162  //
3163  // Main program and support routines
3164  //
3165  /**
3166   * Load the replication executorService objects, if any
3167   */
3168  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
3169      FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
3170    // read in the name of the source replication class from the config file.
3171    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
3172      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
3173
3174    // read in the name of the sink replication class from the config file.
3175    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
3176      HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
3177
3178    // If both the sink and the source class names are the same, then instantiate
3179    // only one object.
3180    if (sourceClassname.equals(sinkClassname)) {
3181      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
3182        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
3183      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
3184      server.sameReplicationSourceAndSink = true;
3185    } else {
3186      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
3187        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
3188      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
3189        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
3190      server.sameReplicationSourceAndSink = false;
3191    }
3192  }
3193
3194  private static <T extends ReplicationService> T newReplicationInstance(String classname,
3195      Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
3196      Path oldLogDir, WALFactory walFactory) throws IOException {
3197    final Class<? extends T> clazz;
3198    try {
3199      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
3200      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
3201    } catch (java.lang.ClassNotFoundException nfe) {
3202      throw new IOException("Could not find class for " + classname);
3203    }
3204    T service = ReflectionUtils.newInstance(clazz, conf);
3205    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
3206    return service;
3207  }
3208
3209  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus(){
3210    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
3211    if(!this.isOnline()){
3212      return walGroupsReplicationStatus;
3213    }
3214    List<ReplicationSourceInterface> allSources = new ArrayList<>();
3215    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
3216    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
3217    for(ReplicationSourceInterface source: allSources){
3218      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
3219    }
3220    return walGroupsReplicationStatus;
3221  }
3222
3223  /**
3224   * Utility for constructing an instance of the passed HRegionServer class.
3225   */
3226  static HRegionServer constructRegionServer(
3227      final Class<? extends HRegionServer> regionServerClass,
3228      final Configuration conf
3229  ) {
3230    try {
3231      Constructor<? extends HRegionServer> c =
3232          regionServerClass.getConstructor(Configuration.class);
3233      return c.newInstance(conf);
3234    } catch (Exception e) {
3235      throw new RuntimeException("Failed construction of " + "Regionserver: "
3236          + regionServerClass.toString(), e);
3237    }
3238  }
3239
3240  /**
3241   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
3242   */
3243  public static void main(String[] args) {
3244    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
3245    VersionInfo.logVersion();
3246    Configuration conf = HBaseConfiguration.create();
3247    @SuppressWarnings("unchecked")
3248    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
3249        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
3250
3251    new HRegionServerCommandLine(regionServerClass).doMain(args);
3252  }
3253
3254  /**
3255   * Gets the online regions of the specified table.
3256   * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
3257   * Only returns <em>online</em> regions.  If a region on this table has been
3258   * closed during a disable, etc., it will not be included in the returned list.
3259   * So, the returned list may not necessarily be ALL regions in this table, its
3260   * all the ONLINE regions in the table.
3261   * @param tableName table to limit the scope of the query
3262   * @return Online regions from <code>tableName</code>
3263   */
3264  @Override
3265  public List<HRegion> getRegions(TableName tableName) {
3266     List<HRegion> tableRegions = new ArrayList<>();
3267     synchronized (this.onlineRegions) {
3268       for (HRegion region: this.onlineRegions.values()) {
3269         RegionInfo regionInfo = region.getRegionInfo();
3270         if(regionInfo.getTable().equals(tableName)) {
3271           tableRegions.add(region);
3272         }
3273       }
3274     }
3275     return tableRegions;
3276   }
3277
3278  @Override
3279  public List<HRegion> getRegions() {
3280    List<HRegion> allRegions;
3281    synchronized (this.onlineRegions) {
3282      // Return a clone copy of the onlineRegions
3283      allRegions = new ArrayList<>(onlineRegions.values());
3284    }
3285    return allRegions;
3286  }
3287
3288  /**
3289   * Gets the online tables in this RS.
3290   * This method looks at the in-memory onlineRegions.
3291   * @return all the online tables in this RS
3292   */
3293  public Set<TableName> getOnlineTables() {
3294    Set<TableName> tables = new HashSet<>();
3295    synchronized (this.onlineRegions) {
3296      for (Region region: this.onlineRegions.values()) {
3297        tables.add(region.getTableDescriptor().getTableName());
3298      }
3299    }
3300    return tables;
3301  }
3302
3303  public String[] getRegionServerCoprocessors() {
3304    TreeSet<String> coprocessors = new TreeSet<>();
3305    try {
3306      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
3307    } catch (IOException exception) {
3308      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
3309          "skipping.");
3310      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3311    }
3312    Collection<HRegion> regions = getOnlineRegionsLocalContext();
3313    for (HRegion region: regions) {
3314      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
3315      try {
3316        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
3317      } catch (IOException exception) {
3318        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
3319            "; skipping.");
3320        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3321      }
3322    }
3323    coprocessors.addAll(rsHost.getCoprocessors());
3324    return coprocessors.toArray(new String[0]);
3325  }
3326
3327  /**
3328   * Try to close the region, logs a warning on failure but continues.
3329   * @param region Region to close
3330   */
3331  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
3332    try {
3333      if (!closeRegion(region.getEncodedName(), abort, null)) {
3334        LOG.warn("Failed to close " + region.getRegionNameAsString() +
3335            " - ignoring and continuing");
3336      }
3337    } catch (IOException e) {
3338      LOG.warn("Failed to close " + region.getRegionNameAsString() +
3339          " - ignoring and continuing", e);
3340    }
3341  }
3342
3343  /**
3344   * Close asynchronously a region, can be called from the master or internally by the regionserver
3345   * when stopping. If called from the master, the region will update the status.
3346   *
3347   * <p>
3348   * If an opening was in progress, this method will cancel it, but will not start a new close. The
3349   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
3350   * </p>
3351
3352   * <p>
3353   *   If a close was in progress, this new request will be ignored, and an exception thrown.
3354   * </p>
3355   *
3356   * @param encodedName Region to close
3357   * @param abort True if we are aborting
3358   * @param destination Where the Region is being moved too... maybe null if unknown.
3359   * @return True if closed a region.
3360   * @throws NotServingRegionException if the region is not online
3361   */
3362  protected boolean closeRegion(String encodedName, final boolean abort,
3363        final ServerName destination)
3364      throws NotServingRegionException {
3365    //Check for permissions to close.
3366    HRegion actualRegion = this.getRegion(encodedName);
3367    // Can be null if we're calling close on a region that's not online
3368    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
3369      try {
3370        actualRegion.getCoprocessorHost().preClose(false);
3371      } catch (IOException exp) {
3372        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
3373        return false;
3374      }
3375    }
3376
3377    // previous can come back 'null' if not in map.
3378    final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName),
3379        Boolean.FALSE);
3380
3381    if (Boolean.TRUE.equals(previous)) {
3382      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
3383          "trying to OPEN. Cancelling OPENING.");
3384      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
3385        // The replace failed. That should be an exceptional case, but theoretically it can happen.
3386        // We're going to try to do a standard close then.
3387        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
3388            " Doing a standard close now");
3389        return closeRegion(encodedName, abort, destination);
3390      }
3391      // Let's get the region from the online region list again
3392      actualRegion = this.getRegion(encodedName);
3393      if (actualRegion == null) { // If already online, we still need to close it.
3394        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3395        // The master deletes the znode when it receives this exception.
3396        throw new NotServingRegionException("The region " + encodedName +
3397          " was opening but not yet served. Opening is cancelled.");
3398      }
3399    } else if (previous == null) {
3400      LOG.info("Received CLOSE for {}", encodedName);
3401    } else if (Boolean.FALSE.equals(previous)) {
3402      LOG.info("Received CLOSE for the region: " + encodedName +
3403        ", which we are already trying to CLOSE, but not completed yet");
3404      return true;
3405    }
3406
3407    if (actualRegion == null) {
3408      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3409      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
3410      // The master deletes the znode when it receives this exception.
3411      throw new NotServingRegionException("The region " + encodedName +
3412          " is not online, and is not opening.");
3413    }
3414
3415    CloseRegionHandler crh;
3416    final RegionInfo hri = actualRegion.getRegionInfo();
3417    if (hri.isMetaRegion()) {
3418      crh = new CloseMetaHandler(this, this, hri, abort);
3419    } else {
3420      crh = new CloseRegionHandler(this, this, hri, abort, destination);
3421    }
3422    this.executorService.submit(crh);
3423    return true;
3424  }
3425
3426   /**
3427   * @return HRegion for the passed binary <code>regionName</code> or null if
3428   *         named region is not member of the online regions.
3429   */
3430  public HRegion getOnlineRegion(final byte[] regionName) {
3431    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3432    return this.onlineRegions.get(encodedRegionName);
3433  }
3434
3435  @Override
3436  public HRegion getRegion(final String encodedRegionName) {
3437    return this.onlineRegions.get(encodedRegionName);
3438  }
3439
3440
3441  @Override
3442  public boolean removeRegion(final HRegion r, ServerName destination) {
3443    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3444    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
3445    if (destination != null) {
3446      long closeSeqNum = r.getMaxFlushedSeqId();
3447      if (closeSeqNum == HConstants.NO_SEQNUM) {
3448        // No edits in WAL for this region; get the sequence number when the region was opened.
3449        closeSeqNum = r.getOpenSeqNum();
3450        if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
3451      }
3452      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
3453      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
3454      if (selfMove) {
3455        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(r.getRegionInfo().getEncodedName()
3456          , new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
3457      }
3458    }
3459    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3460    configurationManager.deregisterObserver(r);
3461    return toReturn != null;
3462  }
3463
3464  /**
3465   * Protected Utility method for safely obtaining an HRegion handle.
3466   *
3467   * @param regionName Name of online {@link HRegion} to return
3468   * @return {@link HRegion} for <code>regionName</code>
3469   */
3470  protected HRegion getRegion(final byte[] regionName)
3471      throws NotServingRegionException {
3472    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3473    return getRegionByEncodedName(regionName, encodedRegionName);
3474  }
3475
3476  public HRegion getRegionByEncodedName(String encodedRegionName)
3477      throws NotServingRegionException {
3478    return getRegionByEncodedName(null, encodedRegionName);
3479  }
3480
3481  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3482    throws NotServingRegionException {
3483    HRegion region = this.onlineRegions.get(encodedRegionName);
3484    if (region == null) {
3485      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3486      if (moveInfo != null) {
3487        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3488      }
3489      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3490      String regionNameStr = regionName == null?
3491        encodedRegionName: Bytes.toStringBinary(regionName);
3492      if (isOpening != null && isOpening) {
3493        throw new RegionOpeningException("Region " + regionNameStr +
3494          " is opening on " + this.serverName);
3495      }
3496      throw new NotServingRegionException("" + regionNameStr +
3497        " is not online on " + this.serverName);
3498    }
3499    return region;
3500  }
3501
3502  /**
3503   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
3504   * IOE if it isn't already.
3505   *
3506   * @param t Throwable
3507   * @param msg Message to log in error. Can be null.
3508   * @return Throwable converted to an IOE; methods can only let out IOEs.
3509   */
3510  private Throwable cleanup(final Throwable t, final String msg) {
3511    // Don't log as error if NSRE; NSRE is 'normal' operation.
3512    if (t instanceof NotServingRegionException) {
3513      LOG.debug("NotServingRegionException; " + t.getMessage());
3514      return t;
3515    }
3516    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3517    if (msg == null) {
3518      LOG.error("", e);
3519    } else {
3520      LOG.error(msg, e);
3521    }
3522    if (!rpcServices.checkOOME(t)) {
3523      checkFileSystem();
3524    }
3525    return t;
3526  }
3527
3528  /**
3529   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
3530   * @return Make <code>t</code> an IOE if it isn't already.
3531   */
3532  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3533    return (t instanceof IOException ? (IOException) t : msg == null
3534        || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
3535  }
3536
3537  /**
3538   * Checks to see if the file system is still accessible. If not, sets
3539   * abortRequested and stopRequested
3540   *
3541   * @return false if file system is not available
3542   */
3543  boolean checkFileSystem() {
3544    if (this.dataFsOk && this.dataFs != null) {
3545      try {
3546        FSUtils.checkFileSystemAvailable(this.dataFs);
3547      } catch (IOException e) {
3548        abort("File System not available", e);
3549        this.dataFsOk = false;
3550      }
3551    }
3552    return this.dataFsOk;
3553  }
3554
3555  @Override
3556  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3557      List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3558    Address[] addr = new Address[favoredNodes.size()];
3559    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3560    // it is a map of region name to Address[]
3561    for (int i = 0; i < favoredNodes.size(); i++) {
3562      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(),
3563          favoredNodes.get(i).getPort());
3564    }
3565    regionFavoredNodesMap.put(encodedRegionName, addr);
3566  }
3567
3568  /**
3569   * Return the favored nodes for a region given its encoded name. Look at the
3570   * comment around {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[]
3571   * here.
3572   * @param encodedRegionName
3573   * @return array of favored locations
3574   */
3575  @Override
3576  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3577    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
3578  }
3579
3580  @Override
3581  public ServerNonceManager getNonceManager() {
3582    return this.nonceManager;
3583  }
3584
3585  private static class MovedRegionInfo {
3586    private final ServerName serverName;
3587    private final long seqNum;
3588
3589    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3590      this.serverName = serverName;
3591      this.seqNum = closeSeqNum;
3592     }
3593
3594    public ServerName getServerName() {
3595      return serverName;
3596    }
3597
3598    public long getSeqNum() {
3599      return seqNum;
3600    }
3601  }
3602
3603  /**
3604   * We need a timeout. If not there is a risk of giving a wrong information: this would double
3605   * the number of network calls instead of reducing them.
3606   */
3607  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3608
3609  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum, boolean selfMove) {
3610    if (selfMove) {
3611      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3612      return;
3613    }
3614    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" +
3615        closeSeqNum);
3616    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3617  }
3618
3619  void removeFromMovedRegions(String encodedName) {
3620    movedRegionInfoCache.invalidate(encodedName);
3621  }
3622
3623  @InterfaceAudience.Private
3624  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
3625    return movedRegionInfoCache.getIfPresent(encodedRegionName);
3626  }
3627
3628  @InterfaceAudience.Private
3629  public int movedRegionCacheExpiredTime() {
3630        return TIMEOUT_REGION_MOVED;
3631  }
3632
3633  private String getMyEphemeralNodePath() {
3634    return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode, getServerName().toString());
3635  }
3636
3637  private boolean isHealthCheckerConfigured() {
3638    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3639    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
3640  }
3641
3642  /**
3643   * @return the underlying {@link CompactSplit} for the servers
3644   */
3645  public CompactSplit getCompactSplitThread() {
3646    return this.compactSplitThread;
3647  }
3648
3649  CoprocessorServiceResponse execRegionServerService(
3650      @SuppressWarnings("UnusedParameters") final RpcController controller,
3651      final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3652    try {
3653      ServerRpcController serviceController = new ServerRpcController();
3654      CoprocessorServiceCall call = serviceRequest.getCall();
3655      String serviceName = call.getServiceName();
3656      Service service = coprocessorServiceHandlers.get(serviceName);
3657      if (service == null) {
3658        throw new UnknownProtocolException(null, "No registered coprocessor executorService found for " +
3659            serviceName);
3660      }
3661      ServiceDescriptor serviceDesc =
3662          service.getDescriptorForType();
3663
3664      String methodName = call.getMethodName();
3665      MethodDescriptor methodDesc =
3666          serviceDesc.findMethodByName(methodName);
3667      if (methodDesc == null) {
3668        throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName +
3669            " called on executorService " + serviceName);
3670      }
3671
3672      Message request =
3673          CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3674      final Message.Builder responseBuilder =
3675          service.getResponsePrototype(methodDesc).newBuilderForType();
3676      service.callMethod(methodDesc, serviceController, request, message -> {
3677        if (message != null) {
3678          responseBuilder.mergeFrom(message);
3679        }
3680      });
3681      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3682      if (exception != null) {
3683        throw exception;
3684      }
3685      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3686    } catch (IOException ie) {
3687      throw new ServiceException(ie);
3688    }
3689  }
3690
3691  /**
3692   * May be null if this is a master which not carry table.
3693   *
3694   * @return The block cache instance used by the regionserver.
3695   */
3696  @Override
3697  public Optional<BlockCache> getBlockCache() {
3698    return Optional.ofNullable(this.blockCache);
3699  }
3700
3701  /**
3702   * May be null if this is a master which not carry table.
3703   *
3704   * @return The cache for mob files used by the regionserver.
3705   */
3706  @Override
3707  public Optional<MobFileCache> getMobFileCache() {
3708    return Optional.ofNullable(this.mobFileCache);
3709  }
3710
3711  @Override
3712  public AccessChecker getAccessChecker() {
3713    return rpcServices.getAccessChecker();
3714  }
3715
3716  @Override
3717  public ZKPermissionWatcher getZKPermissionWatcher() {
3718    return rpcServices.getZkPermissionWatcher();
3719  }
3720
3721  /**
3722   * @return : Returns the ConfigurationManager object for testing purposes.
3723   */
3724  @InterfaceAudience.Private
3725  ConfigurationManager getConfigurationManager() {
3726    return configurationManager;
3727  }
3728
3729  /**
3730   * @return Return table descriptors implementation.
3731   */
3732  @Override
3733  public TableDescriptors getTableDescriptors() {
3734    return this.tableDescriptors;
3735  }
3736
3737  /**
3738   * Reload the configuration from disk.
3739   */
3740  void updateConfiguration() {
3741    LOG.info("Reloading the configuration from disk.");
3742    // Reload the configuration from disk.
3743    conf.reloadConfiguration();
3744    configurationManager.notifyAllObservers(conf);
3745  }
3746
3747  CacheEvictionStats clearRegionBlockCache(Region region) {
3748    long evictedBlocks = 0;
3749
3750    for(Store store : region.getStores()) {
3751      for(StoreFile hFile : store.getStorefiles()) {
3752        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3753      }
3754    }
3755
3756    return CacheEvictionStats.builder()
3757        .withEvictedBlocks(evictedBlocks)
3758        .build();
3759  }
3760
3761  @Override
3762  public double getCompactionPressure() {
3763    double max = 0;
3764    for (Region region : onlineRegions.values()) {
3765      for (Store store : region.getStores()) {
3766        double normCount = store.getCompactionPressure();
3767        if (normCount > max) {
3768          max = normCount;
3769        }
3770      }
3771    }
3772    return max;
3773  }
3774
3775  @Override
3776  public HeapMemoryManager getHeapMemoryManager() {
3777    return hMemManager;
3778  }
3779
3780  public MemStoreFlusher getMemStoreFlusher() {
3781    return cacheFlusher;
3782  }
3783
3784  /**
3785   * For testing
3786   * @return whether all wal roll request finished for this regionserver
3787   */
3788  @InterfaceAudience.Private
3789  public boolean walRollRequestFinished() {
3790    return this.walRoller.walRollFinished();
3791  }
3792
3793  @Override
3794  public ThroughputController getFlushThroughputController() {
3795    return flushThroughputController;
3796  }
3797
3798  @Override
3799  public double getFlushPressure() {
3800    if (getRegionServerAccounting() == null || cacheFlusher == null) {
3801      // return 0 during RS initialization
3802      return 0.0;
3803    }
3804    return getRegionServerAccounting().getFlushPressure();
3805  }
3806
3807  @Override
3808  public void onConfigurationChange(Configuration newConf) {
3809    ThroughputController old = this.flushThroughputController;
3810    if (old != null) {
3811      old.stop("configuration change");
3812    }
3813    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3814    try {
3815      Superusers.initialize(newConf);
3816    } catch (IOException e) {
3817      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3818    }
3819  }
3820
3821  @Override
3822  public MetricsRegionServer getMetrics() {
3823    return metricsRegionServer;
3824  }
3825
3826  @Override
3827  public SecureBulkLoadManager getSecureBulkLoadManager() {
3828    return this.secureBulkLoadManager;
3829  }
3830
3831  @Override
3832  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
3833      final Abortable abort) {
3834    final LockServiceClient client =
3835        new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
3836    return client.regionLock(regionInfo, description, abort);
3837  }
3838
3839  @Override
3840  public void unassign(byte[] regionName) throws IOException {
3841    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
3842  }
3843
3844  @Override
3845  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3846    return this.rsSpaceQuotaManager;
3847  }
3848
3849  @Override
3850  public boolean reportFileArchivalForQuotas(TableName tableName,
3851      Collection<Entry<String, Long>> archivedFiles) {
3852    if (TEST_SKIP_REPORTING_TRANSITION) {
3853      return false;
3854    }
3855    RegionServerStatusService.BlockingInterface rss = rssStub;
3856    if (rss == null || rsSpaceQuotaManager == null) {
3857      // the current server could be stopping.
3858      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3859      return false;
3860    }
3861    try {
3862      RegionServerStatusProtos.FileArchiveNotificationRequest request =
3863          rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3864      rss.reportFileArchival(null, request);
3865    } catch (ServiceException se) {
3866      IOException ioe = ProtobufUtil.getRemoteException(se);
3867      if (ioe instanceof PleaseHoldException) {
3868        if (LOG.isTraceEnabled()) {
3869          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3870              + " This will be retried.", ioe);
3871        }
3872        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3873        return false;
3874      }
3875      if (rssStub == rss) {
3876        rssStub = null;
3877      }
3878      // re-create the stub if we failed to report the archival
3879      createRegionServerStatusStub(true);
3880      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3881      return false;
3882    }
3883    return true;
3884  }
3885
3886  public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
3887    return eventLoopGroupConfig;
3888  }
3889
3890  @Override
3891  public Connection createConnection(Configuration conf) throws IOException {
3892    User user = UserProvider.instantiate(conf).getCurrent();
3893    return ConnectionFactory.createConnection(conf, null, user);
3894  }
3895
3896  void executeProcedure(long procId, RSProcedureCallable callable) {
3897    executorService.submit(new RSProcedureHandler(this, procId, callable));
3898  }
3899
3900  public void remoteProcedureComplete(long procId, Throwable error) {
3901    procedureResultReporter.complete(procId, error);
3902  }
3903
3904  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3905    RegionServerStatusService.BlockingInterface rss;
3906    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3907    for (;;) {
3908      rss = rssStub;
3909      if (rss != null) {
3910        break;
3911      }
3912      createRegionServerStatusStub();
3913    }
3914    try {
3915      rss.reportProcedureDone(null, request);
3916    } catch (ServiceException se) {
3917      if (rssStub == rss) {
3918        rssStub = null;
3919      }
3920      throw ProtobufUtil.getRemoteException(se);
3921    }
3922  }
3923
3924  /**
3925   * Will ignore the open/close region procedures which already submitted or executed.
3926   *
3927   * When master had unfinished open/close region procedure and restarted, new active master may
3928   * send duplicate open/close region request to regionserver. The open/close request is submitted
3929   * to a thread pool and execute. So first need a cache for submitted open/close region procedures.
3930   *
3931   * After the open/close region request executed and report region transition succeed, cache it in
3932   * executed region procedures cache. See {@link #finishRegionProcedure(long)}. After report region
3933   * transition succeed, master will not send the open/close region request to regionserver again.
3934   * And we thought that the ongoing duplicate open/close region request should not be delayed more
3935   * than 600 seconds. So the executed region procedures cache will expire after 600 seconds.
3936   *
3937   * See HBASE-22404 for more details.
3938   *
3939   * @param procId the id of the open/close region procedure
3940   * @return true if the procedure can be submitted.
3941   */
3942  boolean submitRegionProcedure(long procId) {
3943    if (procId == -1) {
3944      return true;
3945    }
3946    // Ignore the region procedures which already submitted.
3947    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3948    if (previous != null) {
3949      LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
3950      return false;
3951    }
3952    // Ignore the region procedures which already executed.
3953    if (executedRegionProcedures.getIfPresent(procId) != null) {
3954      LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
3955      return false;
3956    }
3957    return true;
3958  }
3959
3960  /**
3961   * See {@link #submitRegionProcedure(long)}.
3962   * @param procId the id of the open/close region procedure
3963   */
3964  public void finishRegionProcedure(long procId) {
3965    executedRegionProcedures.put(procId, procId);
3966    submittedRegionProcedures.remove(procId);
3967  }
3968
3969  public boolean isShutDown() {
3970    return shutDown;
3971  }
3972
3973  /**
3974   * Force to terminate region server when abort timeout.
3975   */
3976  private static class SystemExitWhenAbortTimeout extends TimerTask {
3977
3978    public SystemExitWhenAbortTimeout() {
3979    }
3980
3981    @Override
3982    public void run() {
3983      LOG.warn("Aborting region server timed out, terminating forcibly" +
3984          " and does not wait for any running shutdown hooks or finalizers to finish their work." +
3985          " Thread dump to stdout.");
3986      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3987      Runtime.getRuntime().halt(1);
3988    }
3989  }
3990
3991  @Override
3992  public AsyncClusterConnection getAsyncClusterConnection() {
3993    return asyncClusterConnection;
3994  }
3995
3996  @InterfaceAudience.Private
3997  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3998    return compactedFileDischarger;
3999  }
4000
4001  /**
4002   * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}}
4003   * @return pause time
4004   */
4005  @InterfaceAudience.Private
4006  public long getRetryPauseTime() {
4007    return this.retryPauseTime;
4008  }
4009
4010  public MetaRegionLocationCache getMetaRegionLocationCache() {
4011    return this.metaRegionLocationCache;
4012  }
4013
4014  RegionServerAddressTracker getRegionServerAddressTracker() {
4015    return regionServerAddressTracker;
4016  }
4017}