001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
022import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
024import static org.apache.hadoop.hbase.util.DNS.RS_HOSTNAME_KEY;
025import java.io.IOException;
026import java.lang.management.MemoryType;
027import java.lang.management.MemoryUsage;
028import java.lang.reflect.Constructor;
029import java.net.BindException;
030import java.net.InetAddress;
031import java.net.InetSocketAddress;
032import java.time.Duration;
033import java.util.ArrayList;
034import java.util.Collection;
035import java.util.Collections;
036import java.util.Comparator;
037import java.util.HashSet;
038import java.util.List;
039import java.util.Map;
040import java.util.Map.Entry;
041import java.util.Objects;
042import java.util.Optional;
043import java.util.Set;
044import java.util.SortedMap;
045import java.util.Timer;
046import java.util.TimerTask;
047import java.util.TreeMap;
048import java.util.TreeSet;
049import java.util.concurrent.ConcurrentHashMap;
050import java.util.concurrent.ConcurrentMap;
051import java.util.concurrent.ConcurrentSkipListMap;
052import java.util.concurrent.TimeUnit;
053import java.util.concurrent.atomic.AtomicBoolean;
054import java.util.concurrent.locks.ReentrantReadWriteLock;
055import java.util.stream.Collectors;
056import javax.management.MalformedObjectNameException;
057import javax.servlet.http.HttpServlet;
058import org.apache.commons.lang3.RandomUtils;
059import org.apache.commons.lang3.StringUtils;
060import org.apache.commons.lang3.SystemUtils;
061import org.apache.hadoop.conf.Configuration;
062import org.apache.hadoop.fs.FileSystem;
063import org.apache.hadoop.fs.Path;
064import org.apache.hadoop.hbase.Abortable;
065import org.apache.hadoop.hbase.CacheEvictionStats;
066import org.apache.hadoop.hbase.CallQueueTooBigException;
067import org.apache.hadoop.hbase.ChoreService;
068import org.apache.hadoop.hbase.ClockOutOfSyncException;
069import org.apache.hadoop.hbase.CoordinatedStateManager;
070import org.apache.hadoop.hbase.DoNotRetryIOException;
071import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException;
072import org.apache.hadoop.hbase.HBaseConfiguration;
073import org.apache.hadoop.hbase.HBaseInterfaceAudience;
074import org.apache.hadoop.hbase.HConstants;
075import org.apache.hadoop.hbase.HealthCheckChore;
076import org.apache.hadoop.hbase.MetaTableAccessor;
077import org.apache.hadoop.hbase.NotServingRegionException;
078import org.apache.hadoop.hbase.PleaseHoldException;
079import org.apache.hadoop.hbase.ScheduledChore;
080import org.apache.hadoop.hbase.ServerName;
081import org.apache.hadoop.hbase.Stoppable;
082import org.apache.hadoop.hbase.TableDescriptors;
083import org.apache.hadoop.hbase.TableName;
084import org.apache.hadoop.hbase.YouAreDeadException;
085import org.apache.hadoop.hbase.ZNodeClearer;
086import org.apache.hadoop.hbase.client.ClusterConnection;
087import org.apache.hadoop.hbase.client.Connection;
088import org.apache.hadoop.hbase.client.ConnectionUtils;
089import org.apache.hadoop.hbase.client.RegionInfo;
090import org.apache.hadoop.hbase.client.RegionInfoBuilder;
091import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
092import org.apache.hadoop.hbase.client.locking.EntityLock;
093import org.apache.hadoop.hbase.client.locking.LockServiceClient;
094import org.apache.hadoop.hbase.conf.ConfigurationManager;
095import org.apache.hadoop.hbase.conf.ConfigurationObserver;
096import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
097import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
098import org.apache.hadoop.hbase.exceptions.RegionMovedException;
099import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
100import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
101import org.apache.hadoop.hbase.executor.ExecutorService;
102import org.apache.hadoop.hbase.executor.ExecutorType;
103import org.apache.hadoop.hbase.fs.HFileSystem;
104import org.apache.hadoop.hbase.http.InfoServer;
105import org.apache.hadoop.hbase.io.hfile.BlockCache;
106import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
107import org.apache.hadoop.hbase.io.hfile.HFile;
108import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
109import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
110import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
111import org.apache.hadoop.hbase.ipc.RpcClient;
112import org.apache.hadoop.hbase.ipc.RpcClientFactory;
113import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
114import org.apache.hadoop.hbase.ipc.RpcServer;
115import org.apache.hadoop.hbase.ipc.RpcServerInterface;
116import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
117import org.apache.hadoop.hbase.ipc.ServerRpcController;
118import org.apache.hadoop.hbase.log.HBaseMarkers;
119import org.apache.hadoop.hbase.master.HMaster;
120import org.apache.hadoop.hbase.master.LoadBalancer;
121import org.apache.hadoop.hbase.master.RegionState.State;
122import org.apache.hadoop.hbase.master.RegionState;
123import org.apache.hadoop.hbase.mob.MobFileCache;
124import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
125import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
126import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
127import org.apache.hadoop.hbase.quotas.QuotaUtil;
128import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
129import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
130import org.apache.hadoop.hbase.quotas.RegionSize;
131import org.apache.hadoop.hbase.quotas.RegionSizeStore;
132import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
133import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
134import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
135import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
136import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
137import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
138import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
139import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
140import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
141import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogTableOpsChore;
142import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
143import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
144import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
145import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
146import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
147import org.apache.hadoop.hbase.security.SecurityConstants;
148import org.apache.hadoop.hbase.security.Superusers;
149import org.apache.hadoop.hbase.security.User;
150import org.apache.hadoop.hbase.security.UserProvider;
151import org.apache.hadoop.hbase.security.access.AccessChecker;
152import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
153import org.apache.hadoop.hbase.trace.SpanReceiverHost;
154import org.apache.hadoop.hbase.trace.TraceUtil;
155import org.apache.hadoop.hbase.util.Addressing;
156import org.apache.hadoop.hbase.util.Bytes;
157import org.apache.hadoop.hbase.util.CommonFSUtils;
158import org.apache.hadoop.hbase.util.CompressionTest;
159import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
160import org.apache.hadoop.hbase.util.FSTableDescriptors;
161import org.apache.hadoop.hbase.util.FSUtils;
162import org.apache.hadoop.hbase.util.FutureUtils;
163import org.apache.hadoop.hbase.util.JvmPauseMonitor;
164import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
165import org.apache.hadoop.hbase.util.Pair;
166import org.apache.hadoop.hbase.util.RetryCounter;
167import org.apache.hadoop.hbase.util.RetryCounterFactory;
168import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
169import org.apache.hadoop.hbase.util.Sleeper;
170import org.apache.hadoop.hbase.util.Threads;
171import org.apache.hadoop.hbase.util.VersionInfo;
172import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
173import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
174import org.apache.hadoop.hbase.wal.WAL;
175import org.apache.hadoop.hbase.wal.WALFactory;
176import org.apache.hadoop.hbase.wal.WALProvider;
177import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
178import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
179import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
180import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
181import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
182import org.apache.hadoop.hbase.zookeeper.ZKUtil;
183import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
184import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
185import org.apache.hadoop.ipc.RemoteException;
186import org.apache.hadoop.util.ReflectionUtils;
187import org.apache.yetus.audience.InterfaceAudience;
188import org.apache.zookeeper.KeeperException;
189import org.slf4j.Logger;
190import org.slf4j.LoggerFactory;
191import sun.misc.Signal;
192import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
193import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
194import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
195import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
196import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
197import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
198import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
199import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
200import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
201import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
202import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
203import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
204import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
234
235/**
236 * HRegionServer makes a set of HRegions available to clients. It checks in with
237 * the HMaster. There are many HRegionServers in a single HBase deployment.
238 */
239@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
240@SuppressWarnings({ "deprecation"})
241public class HRegionServer extends Thread implements
242    RegionServerServices, LastSequenceId, ConfigurationObserver {
243  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
244
  /**
   * For testing only! Set to true to skip notifying the master of region assignment.
   */
248  @VisibleForTesting
249  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
250  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
251
252  /**
253   * A map from RegionName to current action in progress. Boolean value indicates:
254   * true - if open region action in progress
255   * false - if close region action in progress
256   */
257  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
258    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
259
260  /**
261   * Used to cache the open/close region procedures which already submitted.
262   * See {@link #submitRegionProcedure(long)}.
263   */
264  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
265  /**
266   * Used to cache the open/close region procedures which already executed.
267   * See {@link #submitRegionProcedure(long)}.
268   */
269  private final Cache<Long, Long> executedRegionProcedures =
270      CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
271
272  private MemStoreFlusher cacheFlusher;
273
274  private HeapMemoryManager hMemManager;
275
276  /**
277   * Cluster connection to be shared by services.
278   * Initialized at server startup and closed when server shuts down.
279   * Clients must never close it explicitly.
   * Clients hosted by this Server should make use of this clusterConnection rather than create
   * their own; if they create their own, there is no way for the hosting server to shut down
   * ongoing client RPCs.
283   */
284  protected ClusterConnection clusterConnection;
285
286  /**
287   * Go here to get table descriptors.
288   */
289  protected TableDescriptors tableDescriptors;
290
291  // Replication services. If no replication, this handler will be null.
292  private ReplicationSourceService replicationSourceHandler;
293  private ReplicationSinkService replicationSinkHandler;
294
295  // Compactions
296  public CompactSplit compactSplitThread;
297
298  /**
299   * Map of regions currently being served by this region server. Key is the
300   * encoded region name.  All access should be synchronized.
301   */
302  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
303  /**
304   * Lock for gating access to {@link #onlineRegions}.
305   * TODO: If this map is gated by a lock, does it need to be a ConcurrentHashMap?
306   */
307  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();
308
309  /**
   * Map of encoded region names to the DataNode locations they should be hosted on.
   * We store the value as InetSocketAddress since this is used only in the HDFS
312   * API (create() that takes favored nodes as hints for placing file blocks).
313   * We could have used ServerName here as the value class, but we'd need to
314   * convert it to InetSocketAddress at some point before the HDFS API call, and
315   * it seems a bit weird to store ServerName since ServerName refers to RegionServers
316   * and here we really mean DataNode locations.
317   */
318  private final Map<String, InetSocketAddress[]> regionFavoredNodesMap = new ConcurrentHashMap<>();
319
320  private LeaseManager leaseManager;
321
  // Instance of the hbase executor service.
323  protected ExecutorService executorService;
324
325  private volatile boolean dataFsOk;
326  private HFileSystem dataFs;
327  private HFileSystem walFs;
328
329  // Set when a report to the master comes back with a message asking us to
330  // shutdown. Also set by call to stop when debugging or running unit tests
331  // of HRegionServer in isolation.
332  private volatile boolean stopped = false;
333
334  // Go down hard. Used if file system becomes unavailable and also in
335  // debugging and unit tests.
336  private volatile boolean abortRequested;
337  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds, to be safe
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Task to run when the abort times out
341  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
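  // Hedged illustration (these keys are consulted by the abort handling elsewhere in this class;
  // the exact wiring is not shown here): an operator could shorten the watchdog via the
  // Configuration, e.g.
  //   conf.setLong(ABORT_TIMEOUT, 10 * 60 * 1000L);   // hypothetical 10 minute abort timeout
  // ABORT_TIMEOUT_TASK names the task to run if the abort exceeds that timeout.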
342
343  // A state before we go into stopped state.  At this stage we're closing user
344  // space regions.
345  private boolean stopping = false;
346  private volatile boolean killed = false;
347  private volatile boolean shutDown = false;
348
349  protected final Configuration conf;
350
351  private Path dataRootDir;
352  private Path walRootDir;
353
354  private final int threadWakeFrequency;
355  final int msgInterval;
356
357  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
358  private final int compactionCheckFrequency;
359  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
360  private final int flushCheckFrequency;
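  // Hedged example (the constructor below defaults both periods to threadWakeFrequency, i.e.
  // HConstants.THREAD_WAKE_FREQUENCY, 10 seconds unless overridden): to check for needed
  // compactions every 30 seconds one could set, purely for illustration,
  //   conf.setInt(PERIOD_COMPACTION, 30 * 1000);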
361
362  // Stub to do region server status calls against the master.
363  private volatile RegionServerStatusService.BlockingInterface rssStub;
364  private volatile LockService.BlockingInterface lockStub;
365  // RPC client. Used to make the stub above that does region server status checking.
366  private RpcClient rpcClient;
367
368  private RpcRetryingCallerFactory rpcRetryingCallerFactory;
369  private RpcControllerFactory rpcControllerFactory;
370
371  private UncaughtExceptionHandler uncaughtExceptionHandler;
372
  // Info server. Default access so it can be used by unit tests. REGIONSERVER
  // is the name of the webapp and the attribute name used when stuffing this instance
  // into the web context.
376  protected InfoServer infoServer;
377  private JvmPauseMonitor pauseMonitor;
378
379  /** region server process name */
380  public static final String REGIONSERVER = "regionserver";
381
382  private MetricsRegionServer metricsRegionServer;
383  MetricsRegionServerWrapperImpl metricsRegionServerImpl;
384  private SpanReceiverHost spanReceiverHost;
385
386  /**
387   * ChoreService used to schedule tasks that we want to run periodically
388   */
389  private ChoreService choreService;
390
391  /**
   * Check for compaction requests.
393   */
394  private ScheduledChore compactionChecker;
395
396  /**
397   * Check for flushes
398   */
399  private ScheduledChore periodicFlusher;
400
401  private volatile WALFactory walFactory;
402
403  private LogRoller walRoller;
404
405  // A thread which calls reportProcedureDone
406  private RemoteProcedureResultReporter procedureResultReporter;
407
408  // flag set after we're done setting up server threads
409  final AtomicBoolean online = new AtomicBoolean(false);
410
411  // zookeeper connection and watcher
412  protected final ZKWatcher zooKeeper;
413
414  // master address tracker
415  private final MasterAddressTracker masterAddressTracker;
416
417  // Cluster Status Tracker
418  protected final ClusterStatusTracker clusterStatusTracker;
419
420  // Log Splitting Worker
421  private SplitLogWorker splitLogWorker;
422
423  // A sleeper that sleeps for msgInterval.
424  protected final Sleeper sleeper;
425
426  private final int operationTimeout;
427  private final int shortOperationTimeout;
428
429  private final RegionServerAccounting regionServerAccounting;
430
431  private SlowLogTableOpsChore slowLogTableOpsChore = null;
432
433  // Block cache
434  private BlockCache blockCache;
435  // The cache for mob files
436  private MobFileCache mobFileCache;
437
438  /** The health check chore. */
439  private HealthCheckChore healthCheckChore;
440
441  /** The nonce manager chore. */
442  private ScheduledChore nonceManagerChore;
443
444  private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();
445
446  /**
   * The server name the Master sees us as. It's made from the hostname the
   * master passes us, port, and server startcode. Gets set after registration
   * against the Master.
450   */
451  protected ServerName serverName;
452
453  /**
454   * hostname specified by hostname config
455   */
456  protected String useThisHostnameInstead;
457
458  /**
459   * HBASE-18226: This config and hbase.regionserver.hostname are mutually exclusive.
460   * Exception will be thrown if both are used.
461   */
462  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
463  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
464    "hbase.regionserver.hostname.disable.master.reversedns";
465
466  /**
   * This server's startcode.
468   */
469  protected final long startcode;
470
471  /**
472   * Unique identifier for the cluster we are a part of.
473   */
474  protected String clusterId;
475
476  /**
   * Chore to periodically clean the moved region list.
478   */
479  private MovedRegionsCleaner movedRegionsCleaner;
480
481  // chore for refreshing store files for secondary regions
482  private StorefileRefresherChore storefileRefresher;
483
484  private RegionServerCoprocessorHost rsHost;
485
486  private RegionServerProcedureManagerHost rspmHost;
487
488  private RegionServerRpcQuotaManager rsQuotaManager;
489  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
490
491  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent
   * in the case where the client doesn't receive the response from a successful operation and
   * retries. We track the successful ops for some time via a nonce sent by the client and handle
   * duplicate operations (currently, by failing them; in the future we might use MVCC to return
   * the result). Nonces are also recovered from the WAL during recovery; however, the caveats
   * (from HBASE-3787) are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
   *   of past records. If we don't read the records, we don't read and recover the nonces.
   *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
   * We could have a separate additional "Nonce WAL". It would just contain a bunch of numbers and
   * wouldn't be flushed on the main path - because the WAL itself also contains nonces, if we only
   * flush it before the memstore flush, for a given nonce we will either see it in the WAL (if it
   * was never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
   * log (or both occasionally, which doesn't matter). The nonce log file can be deleted after the
   * latest nonce in it has expired. It can also be recovered during move.
508   */
509  final ServerNonceManager nonceManager;
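  // Hedged sketch of the flow described in the javadoc above (conceptual, not literal API calls):
  //   1. the client attaches (nonceGroup, nonce) to an increment/append mutation
  //   2. if the response is lost, the retry carries the same (nonceGroup, nonce)
  //   3. this manager recognizes the pair within the nonce timeout and fails the duplicate
  //      instead of applying the operation twice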
510
511  private UserProvider userProvider;
512
513  protected final RSRpcServices rpcServices;
514
515  private CoordinatedStateManager csm;
516
517  /**
518   * Configuration manager is used to register/deregister and notify the configuration observers
519   * when the regionserver is notified that there was a change in the on disk configs.
520   */
521  protected final ConfigurationManager configurationManager;
522
523  @VisibleForTesting
524  CompactedHFilesDischarger compactedFileDischarger;
525
526  private volatile ThroughputController flushThroughputController;
527
528  private SecureBulkLoadManager secureBulkLoadManager;
529
530  private FileSystemUtilizationChore fsUtilizationChore;
531
532  private final NettyEventLoopGroupConfig eventLoopGroupConfig;
533
534  /**
   * Provides online slow log responses from the ring buffer.
536   */
537  private SlowLogRecorder slowLogRecorder;
538
539  /**
   * True if this RegionServer is coming up in a cluster where there is no Master;
   * it means it needs to just come up and make do without a Master to talk to: e.g. in test, or
   * when the HRegionServer is doing something other than its usual duties: e.g. as a hollowed-out
   * host whose only purpose is as a Replication-stream sink; see HBASE-18846 for more.
   * TODO: can this replace {@link #TEST_SKIP_REPORTING_TRANSITION} ?
545   */
546  private final boolean masterless;
547  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
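  // Hedged illustration (see the javadoc above and HBASE-18846): a test or a pure
  // replication-sink deployment could bring this process up without a Master by setting
  //   conf.setBoolean(MASTERLESS_CONFIG_NAME, true);   // i.e. hbase.masterless=true
  // in which case the master address and cluster status ZK trackers are left null (see the
  // constructor below).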
548
  /** Regionserver codec list. */
550  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";
551
552  // A timer to shutdown the process if abort takes too long
553  private Timer abortMonitor;
554
555  /**
556   * Starts a HRegionServer at the default location.
557   * <p/>
558   * Don't start any services or managers in here in the Constructor.
559   * Defer till after we register with the Master as much as possible. See {@link #startServices}.
560   */
561  public HRegionServer(final Configuration conf) throws IOException {
562    super("RegionServer");  // thread name
563    TraceUtil.initTracer(conf);
564    try {
565      this.startcode = System.currentTimeMillis();
566      this.conf = conf;
567      this.dataFsOk = true;
568      this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
569      this.eventLoopGroupConfig = setupNetty(this.conf);
570      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
571      HFile.checkHFileVersion(this.conf);
572      checkCodecs(this.conf);
573      this.userProvider = UserProvider.instantiate(conf);
574      FSUtils.setupShortCircuitRead(this.conf);
575
576      // Disable usage of meta replicas in the regionserver
577      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
578      // Config'ed params
579      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
580      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
581      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
582      this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
583
584      this.sleeper = new Sleeper(this.msgInterval, this);
585
586      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
587      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
588
589      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
590          HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
591
592      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
593          HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
594
595      this.abortRequested = false;
596      this.stopped = false;
597
598      if (!(this instanceof HMaster)) {
599        this.slowLogRecorder = new SlowLogRecorder(this.conf);
600      }
601      rpcServices = createRpcServices();
602      useThisHostnameInstead = getUseThisHostnameInstead(conf);
603      String hostName =
604          StringUtils.isBlank(useThisHostnameInstead) ? this.rpcServices.isa.getHostName()
605              : this.useThisHostnameInstead;
606      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);
607
608      rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
609      rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
610
611      // login the zookeeper client principal (if using security)
612      ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
613          HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
614      // login the server principal (if using secure Hadoop)
615      login(userProvider, hostName);
616      // init superusers and add the server principal (if using security)
617      // or process owner as default super user.
618      Superusers.initialize(conf);
619      regionServerAccounting = new RegionServerAccounting(conf);
620
621      boolean isMasterNotCarryTable =
622          this instanceof HMaster && !LoadBalancer.isTablesOnMaster(conf);
623
      // No need to instantiate block cache and mob file cache when the master carries no tables
625      if (!isMasterNotCarryTable) {
626        blockCache = BlockCacheFactory.createBlockCache(conf);
627        mobFileCache = new MobFileCache(conf);
628      }
629
630      uncaughtExceptionHandler =
631        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);
632
633      initializeFileSystem();
634      spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
635
636      this.configurationManager = new ConfigurationManager();
637      setupWindows(getConfiguration(), getConfigurationManager());
638
639      // Some unit tests don't need a cluster, so no zookeeper at all
640      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
641        // Open connection to zookeeper and set primary watcher
642        zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
643          rpcServices.isa.getPort(), this, canCreateBaseZNode());
644        // If no master in cluster, skip trying to track one or look for a cluster status.
645        if (!this.masterless) {
646          if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
647            DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
648            this.csm = new ZkCoordinatedStateManager(this);
649          }
650
651          masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
652          masterAddressTracker.start();
653
654          clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
655          clusterStatusTracker.start();
656        } else {
657          masterAddressTracker = null;
658          clusterStatusTracker = null;
659        }
660      } else {
661        zooKeeper = null;
662        masterAddressTracker = null;
663        clusterStatusTracker = null;
664      }
665      this.rpcServices.start(zooKeeper);
      // This violates 'no starting stuff in Constructor' but Master depends on the below chore
      // and executor being created and takes a different startup route. Lots of overlap between HRS
      // and M (An M IS A HRS now). Need to refactor so there is less duplication between M and its
      // super class HRS. TODO.
      this.choreService = new ChoreService(getName(), true);
      this.executorService = new ExecutorService(getName());
      // Master expects the Constructor to put up web servers. Ugh.
      putUpWebUI();
674    } catch (Throwable t) {
675      // Make sure we log the exception. HRegionServer is often started via reflection and the
676      // cause of failed startup is lost.
677      LOG.error("Failed construction RegionServer", t);
678      throw t;
679    }
680  }
681
682  // HMaster should override this method to load the specific config for master
683  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
684    String hostname = conf.get(RS_HOSTNAME_KEY);
685    if (conf.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
686      if (!StringUtils.isBlank(hostname)) {
687        String msg = RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + RS_HOSTNAME_KEY +
688          " are mutually exclusive. Do not set " + RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY +
689          " to true while " + RS_HOSTNAME_KEY + " is used";
690        throw new IOException(msg);
691      } else {
692        return rpcServices.isa.getHostName();
693      }
694    } else {
695      return hostname;
696    }
697  }
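  // Hedged illustration of the HBASE-18226 rule enforced above (the host value is an example only):
  //   hbase.regionserver.hostname=rs1.example.com                  -> report this name to the Master
  //   hbase.regionserver.hostname.disable.master.reversedns=true   -> report the RPC bind hostname
  // Configuring both at once is rejected with an IOException.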
698
  /**
   * Registers a HUP signal handler that reloads the configuration and notifies the registered
   * configuration observers. Skipped on Windows, which does not support the HUP signal.
   */
702  private static void setupWindows(final Configuration conf, ConfigurationManager cm) {
703    if (!SystemUtils.IS_OS_WINDOWS) {
704      Signal.handle(new Signal("HUP"), signal -> {
705        conf.reloadConfiguration();
706        cm.notifyAllObservers(conf);
707      });
708    }
709  }
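  // Hedged usage note (assumes a Unix-like host where HUP is delivered to the JVM): with the
  // handler above registered, an operator can reload the on-disk configuration without a restart,
  // e.g.
  //   kill -HUP <regionserver pid>
  // which calls Configuration.reloadConfiguration() and notifies all registered observers.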
710
711  private static NettyEventLoopGroupConfig setupNetty(Configuration conf) {
712    // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL.
713    NettyEventLoopGroupConfig nelgc =
714      new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
715    NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
716    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
717    return nelgc;
718  }
719
720  private void initializeFileSystem() throws IOException {
    // Get fs instance used by this RS. Do we use checksum verification in HBase? If HBase
    // checksum verification is enabled, then automatically switch off hdfs checksum verification.
723    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
724    CommonFSUtils.setFsDefault(this.conf, CommonFSUtils.getWALRootDir(this.conf));
725    this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
726    this.walRootDir = CommonFSUtils.getWALRootDir(this.conf);
727    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
728    // underlying hadoop hdfs accessors will be going against wrong filesystem
729    // (unless all is set to defaults).
730    CommonFSUtils.setFsDefault(this.conf, CommonFSUtils.getRootDir(this.conf));
731    this.dataFs = new HFileSystem(this.conf, useHBaseChecksum);
732    this.dataRootDir = CommonFSUtils.getRootDir(this.conf);
733    this.tableDescriptors =
734        new FSTableDescriptors(this.dataFs, this.dataRootDir, !canUpdateTableDescriptor(), false);
735    if (this instanceof HMaster) {
736      FSTableDescriptors.tryUpdateMetaTableDescriptor(this.conf, this.dataFs, this.dataRootDir,
737        builder -> builder.setRegionReplication(
738          conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM)));
739    }
740  }
741
742  protected void login(UserProvider user, String host) throws IOException {
743    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
744      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
745  }
746
747  /**
748   * Wait for an active Master.
   * See the override in the HMaster subclass for how it is used.
750   */
751  protected void waitForMasterActive() {}
752
753  protected String getProcessName() {
754    return REGIONSERVER;
755  }
756
757  protected boolean canCreateBaseZNode() {
758    return this.masterless;
759  }
760
761  protected boolean canUpdateTableDescriptor() {
762    return false;
763  }
764
765  protected RSRpcServices createRpcServices() throws IOException {
766    return new RSRpcServices(this);
767  }
768
769  protected void configureInfoServer() {
770    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
771    infoServer.setAttribute(REGIONSERVER, this);
772  }
773
774  protected Class<? extends HttpServlet> getDumpServlet() {
775    return RSDumpServlet.class;
776  }
777
778  @Override
779  public boolean registerService(com.google.protobuf.Service instance) {
    /*
     * No stacking of instances is allowed for a single service name
     */
783    com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
784        instance.getDescriptorForType();
785    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
786    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor service " + serviceName
          + " already registered, rejecting request from " + instance);
789      return false;
790    }
791
792    coprocessorServiceHandlers.put(serviceName, instance);
793    if (LOG.isDebugEnabled()) {
      LOG.debug("Registered regionserver coprocessor service: service=" + serviceName);
795    }
796    return true;
797  }
798
799  protected ClusterConnection createClusterConnection() throws IOException {
800    Configuration conf = this.conf;
801    // We use ZKConnectionRegistry for all the internal communication, primarily for these reasons:
802    // - Decouples RS and master life cycles. RegionServers can continue be up independent of
803    //   masters' availability.
804    // - Configuration management for region servers (cluster internal) is much simpler when adding
805    //   new masters or removing existing masters, since only clients' config needs to be updated.
806    // - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for
807    //   other internal connections too.
808    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
809        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
810    if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
811      // Use server ZK cluster for server-issued connections, so we clone
812      // the conf and unset the client ZK related properties
813      conf = new Configuration(this.conf);
814      conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
815    }
816    // Create a cluster connection that when appropriate, can short-circuit and go directly to the
817    // local server if the request is to the local server bypassing RPC. Can be used for both local
818    // and remote invocations.
819    return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
820      serverName, rpcServices, rpcServices);
821  }
822
  /**
   * Run test on configured codecs to make sure supporting libs are in place.
   * @param c configuration to read the codec list from
   * @throws IOException if any configured codec is not supported
   */
828  private static void checkCodecs(final Configuration c) throws IOException {
829    // check to see if the codec list is available:
830    String [] codecs = c.getStrings(REGIONSERVER_CODEC, (String[])null);
831    if (codecs == null) return;
832    for (String codec : codecs) {
833      if (!CompressionTest.testCompression(codec)) {
834        throw new IOException("Compression codec " + codec +
835          " not supported, aborting RS construction");
836      }
837    }
838  }
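  // Hedged example (the property name is the REGIONSERVER_CODEC constant above; codec names depend
  // on what CompressionTest supports in a given build): listing required codecs makes the RS fail
  // fast at construction if the native libraries are missing, e.g. in hbase-site.xml:
  //   hbase.regionserver.codecs = snappy,lz4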
839
840  public String getClusterId() {
841    return this.clusterId;
842  }
843
844  /**
845   * Setup our cluster connection if not already initialized.
846   * @throws IOException
847   */
848  protected synchronized void setupClusterConnection() throws IOException {
849    if (clusterConnection == null) {
850      clusterConnection = createClusterConnection();
851    }
852  }
853
854  /**
855   * All initialization needed before we go register with Master.<br>
856   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
857   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
858   */
859  private void preRegistrationInitialization() {
860    try {
861      initializeZooKeeper();
862      setupClusterConnection();
863      // Setup RPC client for master communication
864      this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
865          this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
866    } catch (Throwable t) {
      // Call stop if error, or the process will stick around forever since the server
      // puts up non-daemon threads.
869      this.rpcServices.stop();
870      abort("Initialization of RS failed.  Hence aborting RS.", t);
871    }
872  }
873
874  /**
   * Bring up the connection to the zk ensemble, then wait until a master is available for this
   * cluster, and after that wait until the cluster 'up' flag has been set. This is the order in
   * which the master does things.
877   * <p>
878   * Finally open long-living server short-circuit connection.
879   */
880  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
881    justification="cluster Id znode read would give us correct response")
882  private void initializeZooKeeper() throws IOException, InterruptedException {
883    // Nothing to do in here if no Master in the mix.
884    if (this.masterless) {
885      return;
886    }
887
888    // Create the master address tracker, register with zk, and start it.  Then
889    // block until a master is available.  No point in starting up if no master
890    // running.
891    blockAndCheckIfStopped(this.masterAddressTracker);
892
893    // Wait on cluster being up.  Master will set this flag up in zookeeper
894    // when ready.
895    blockAndCheckIfStopped(this.clusterStatusTracker);
896
897    // If we are HMaster then the cluster id should have already been set.
898    if (clusterId == null) {
899      // Retrieve clusterId
900      // Since cluster status is now up
901      // ID should have already been set by HMaster
902      try {
903        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
904        if (clusterId == null) {
905          this.abort("Cluster ID has not been set");
906        }
907        LOG.info("ClusterId : " + clusterId);
908      } catch (KeeperException e) {
909        this.abort("Failed to retrieve Cluster ID", e);
910      }
911    }
912
913    waitForMasterActive();
914    if (isStopped() || isAborted()) {
915      return; // No need for further initialization
916    }
917
918    // watch for snapshots and other procedures
919    try {
920      rspmHost = new RegionServerProcedureManagerHost();
921      rspmHost.loadProcedures(conf);
922      rspmHost.initialize(this);
923    } catch (KeeperException e) {
924      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
925    }
926  }
927
928  /**
   * Utility method to wait indefinitely on a znode's availability while checking
   * whether the region server is shut down
931   * @param tracker znode tracker to use
932   * @throws IOException any IO exception, plus if the RS is stopped
933   * @throws InterruptedException if the waiting thread is interrupted
934   */
935  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
936      throws IOException, InterruptedException {
937    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
938      if (this.stopped) {
939        throw new IOException("Received the shutdown message while waiting.");
940      }
941    }
942  }
943
944  /**
945   * @return True if the cluster is up.
946   */
947  @Override
948  public boolean isClusterUp() {
949    return this.masterless ||
950        (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
951  }
952
953  /**
954   * The HRegionServer sticks in this loop until closed.
955   */
956  @Override
957  public void run() {
958    if (isStopped()) {
959      LOG.info("Skipping run; stopped");
960      return;
961    }
962    try {
963      // Do pre-registration initializations; zookeeper, lease threads, etc.
964      preRegistrationInitialization();
965    } catch (Throwable e) {
966      abort("Fatal exception during initialization", e);
967    }
968
969    try {
970      if (!isStopped() && !isAborted()) {
971        ShutdownHook.install(conf, dataFs, this, Thread.currentThread());
972        // Initialize the RegionServerCoprocessorHost now that our ephemeral
973        // node was created, in case any coprocessors want to use ZooKeeper
974        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
975
976        // Try and register with the Master; tell it we are here.  Break if server is stopped or
977        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
978        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
979        // come up.
980        LOG.debug("About to register with Master.");
981        RetryCounterFactory rcf =
982          new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
983        RetryCounter rc = rcf.create();
984        while (keepLooping()) {
985          RegionServerStartupResponse w = reportForDuty();
986          if (w == null) {
987            long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
988            LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
989            this.sleeper.sleep(sleepTime);
990          } else {
991            handleReportForDutyResponse(w);
992            break;
993          }
994        }
995      }
996
997      if (!isStopped() && isHealthy()) {
998        // start the snapshot handler and other procedure handlers,
999        // since the server is ready to run
1000        if (this.rspmHost != null) {
1001          this.rspmHost.start();
1002        }
1003        // Start the Quota Manager
1004        if (this.rsQuotaManager != null) {
1005          rsQuotaManager.start(getRpcServer().getScheduler());
1006        }
1007        if (this.rsSpaceQuotaManager != null) {
1008          this.rsSpaceQuotaManager.start();
1009        }
1010      }
1011
1012      // We registered with the Master.  Go into run mode.
1013      long lastMsg = System.currentTimeMillis();
1014      long oldRequestCount = -1;
1015      // The main run loop.
1016      while (!isStopped() && isHealthy()) {
1017        if (!isClusterUp()) {
1018          if (onlineRegions.isEmpty()) {
1019            stop("Exiting; cluster shutdown set and not carrying any regions");
1020          } else if (!this.stopping) {
1021            this.stopping = true;
1022            LOG.info("Closing user regions");
1023            closeUserRegions(this.abortRequested);
1024          } else {
1025            boolean allUserRegionsOffline = areAllUserRegionsOffline();
1026            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
1028              // since last time we went around the loop.  Any open
1029              // meta regions will be closed on our way out.
1030              if (oldRequestCount == getWriteRequestCount()) {
1031                stop("Stopped; only catalog regions remaining online");
1032                break;
1033              }
1034              oldRequestCount = getWriteRequestCount();
1035            } else {
              // Make sure all regions have been closed -- some regions may
              // not have gotten the close request because we were splitting at the
              // time of the call to closeUserRegions.
1039              closeUserRegions(this.abortRequested);
1040            }
1041            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
1042          }
1043        }
1044        long now = System.currentTimeMillis();
1045        if ((now - lastMsg) >= msgInterval) {
1046          tryRegionServerReport(lastMsg, now);
1047          lastMsg = System.currentTimeMillis();
1048        }
1049        if (!isStopped() && !isAborted()) {
1050          this.sleeper.sleep();
1051        }
    } // while
1053    } catch (Throwable t) {
1054      if (!rpcServices.checkOOME(t)) {
1055        String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
1056        abort(prefix + t.getMessage(), t);
1057      }
1058    }
1059
1060    if (this.leaseManager != null) {
1061      this.leaseManager.closeAfterLeasesExpire();
1062    }
1063    if (this.splitLogWorker != null) {
1064      splitLogWorker.stop();
1065    }
1066    if (this.infoServer != null) {
1067      LOG.info("Stopping infoServer");
1068      try {
1069        this.infoServer.stop();
1070      } catch (Exception e) {
1071        LOG.error("Failed to stop infoServer", e);
1072      }
1073    }
1074    // Send cache a shutdown.
1075    if (blockCache != null) {
1076      blockCache.shutdown();
1077    }
1078    if (mobFileCache != null) {
1079      mobFileCache.shutdown();
1080    }
1081
1082    if (movedRegionsCleaner != null) {
1083      movedRegionsCleaner.stop("Region Server stopping");
1084    }
1085
1086    // Send interrupts to wake up threads if sleeping so they notice shutdown.
    // TODO: Should we check they are alive? If they OOME'd they could have exited already.
1088    if (this.hMemManager != null) this.hMemManager.stop();
1089    if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
1090    if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
1091
1092    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
1093    if (rspmHost != null) {
1094      rspmHost.stop(this.abortRequested || this.killed);
1095    }
1096
1097    if (this.killed) {
1098      // Just skip out w/o closing regions.  Used when testing.
1099    } else if (abortRequested) {
1100      if (this.dataFsOk) {
1101        closeUserRegions(abortRequested); // Don't leave any open file handles
1102      }
1103      LOG.info("aborting server " + this.serverName);
1104    } else {
1105      closeUserRegions(abortRequested);
1106      LOG.info("stopping server " + this.serverName);
1107    }
1108
1109    if (this.clusterConnection != null && !clusterConnection.isClosed()) {
1110      try {
1111        this.clusterConnection.close();
1112      } catch (IOException e) {
1113        // Although the {@link Closeable} interface throws an {@link
1114        // IOException}, in reality, the implementation would never do that.
1115        LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
1116      }
1117    }
1118
1119    // Closing the compactSplit thread before closing meta regions
1120    if (!this.killed && containsMetaTableRegions()) {
1121      if (!abortRequested || this.dataFsOk) {
1122        if (this.compactSplitThread != null) {
1123          this.compactSplitThread.join();
1124          this.compactSplitThread = null;
1125        }
1126        closeMetaTableRegions(abortRequested);
1127      }
1128    }
1129
1130    if (!this.killed && this.dataFsOk) {
1131      waitOnAllRegionsToClose(abortRequested);
1132      LOG.info("stopping server " + this.serverName + "; all regions closed.");
1133    }
1134
1135    // Stop the quota manager
1136    if (rsQuotaManager != null) {
1137      rsQuotaManager.stop();
1138    }
1139    if (rsSpaceQuotaManager != null) {
1140      rsSpaceQuotaManager.stop();
1141      rsSpaceQuotaManager = null;
1142    }
1143
    // The dataFsOk flag may have been changed if closing regions threw an exception.
1145    if (this.dataFsOk) {
1146      shutdownWAL(!abortRequested);
1147    }
1148
1149    // Make sure the proxy is down.
1150    if (this.rssStub != null) {
1151      this.rssStub = null;
1152    }
1153    if (this.lockStub != null) {
1154      this.lockStub = null;
1155    }
1156    if (this.rpcClient != null) {
1157      this.rpcClient.close();
1158    }
1159    if (this.leaseManager != null) {
1160      this.leaseManager.close();
1161    }
1162    if (this.pauseMonitor != null) {
1163      this.pauseMonitor.stop();
1164    }
1165
1166    if (!killed) {
1167      stopServiceThreads();
1168    }
1169
1170    if (this.rpcServices != null) {
1171      this.rpcServices.stop();
1172    }
1173
1174    try {
1175      deleteMyEphemeralNode();
1176    } catch (KeeperException.NoNodeException nn) {
1177      // pass
1178    } catch (KeeperException e) {
1179      LOG.warn("Failed deleting my ephemeral node", e);
1180    }
1181    // We may have failed to delete the znode at the previous step, but
1182    //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1183    ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1184
1185    if (this.zooKeeper != null) {
1186      this.zooKeeper.close();
1187    }
1188    this.shutDown = true;
1189    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
1190  }
1191
1192  private boolean containsMetaTableRegions() {
1193    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
1194  }
1195
1196  private boolean areAllUserRegionsOffline() {
1197    if (getNumberOfOnlineRegions() > 2) return false;
1198    boolean allUserRegionsOffline = true;
1199    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1200      if (!e.getValue().getRegionInfo().isMetaRegion()) {
1201        allUserRegionsOffline = false;
1202        break;
1203      }
1204    }
1205    return allUserRegionsOffline;
1206  }
1207
1208  /**
1209   * @return Current write count for all online regions.
1210   */
1211  private long getWriteRequestCount() {
1212    long writeCount = 0;
1213    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1214      writeCount += e.getValue().getWriteRequestsCount();
1215    }
1216    return writeCount;
1217  }
1218
1219  @VisibleForTesting
1220  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1221      throws IOException {
1222    RegionServerStatusService.BlockingInterface rss = rssStub;
1223    if (rss == null) {
1224      // the current server could be stopping.
1225      return;
1226    }
1227    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1228    try {
1229      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1230      request.setServer(ProtobufUtil.toServerName(this.serverName));
1231      request.setLoad(sl);
1232      rss.regionServerReport(null, request.build());
1233    } catch (ServiceException se) {
1234      IOException ioe = ProtobufUtil.getRemoteException(se);
1235      if (ioe instanceof YouAreDeadException) {
1236        // This will be caught and handled as a fatal error in run()
1237        throw ioe;
1238      }
1239      if (rssStub == rss) {
1240        rssStub = null;
1241      }
1242      // Couldn't connect to the master, get location from zk and reconnect
1243      // Method blocks until new master is found or we are stopped
1244      createRegionServerStatusStub(true);
1245    }
1246  }
1247
1248  /**
1249   * Reports the given map of Regions and their size on the filesystem to the active Master.
1250   *
1251   * @param regionSizeStore The store containing region sizes
1252   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1253   */
1254  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
1255    RegionServerStatusService.BlockingInterface rss = rssStub;
1256    if (rss == null) {
1257      // the current server could be stopping.
1258      LOG.trace("Skipping Region size report to HMaster as stub is null");
1259      return true;
1260    }
1261    try {
1262      buildReportAndSend(rss, regionSizeStore);
1263    } catch (ServiceException se) {
1264      IOException ioe = ProtobufUtil.getRemoteException(se);
1265      if (ioe instanceof PleaseHoldException) {
1266        LOG.trace("Failed to report region sizes to Master because it is initializing."
1267            + " This will be retried.", ioe);
1268        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1269        return true;
1270      }
1271      if (rssStub == rss) {
1272        rssStub = null;
1273      }
1274      createRegionServerStatusStub(true);
1275      if (ioe instanceof DoNotRetryIOException) {
1276        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1277        if (doNotRetryEx.getCause() != null) {
1278          Throwable t = doNotRetryEx.getCause();
1279          if (t instanceof UnsupportedOperationException) {
1280            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1281            return false;
1282          }
1283        }
1284      }
1285      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1286    }
1287    return true;
1288  }
1289
1290  /**
1291   * Builds the region size report and sends it to the master. Upon successful sending of the
1292   * report, the region sizes that were sent are marked as sent.
1293   *
1294   * @param rss The stub to send to the Master
1295   * @param regionSizeStore The store containing region sizes
1296   */
1297  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
1298      RegionSizeStore regionSizeStore) throws ServiceException {
1299    RegionSpaceUseReportRequest request =
1300        buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
1301    rss.reportRegionSpaceUse(null, request);
1302    // Record the number of size reports sent
1303    if (metricsRegionServer != null) {
1304      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
1305    }
1306  }
1307
1308  /**
1309   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1310   *
1311   * @param regionSizes The size in bytes of regions
1312   * @return The corresponding protocol buffer message.
1313   */
1314  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
1315    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1316    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
1317      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
1318    }
1319    return request.build();
1320  }
1321
1322  /**
1323   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse}
1324   * protobuf message.
1325   *
1326   * @param regionInfo The RegionInfo
1327   * @param sizeInBytes The size in bytes of the Region
1328   * @return The protocol buffer
1329   */
1330  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1331    return RegionSpaceUse.newBuilder()
1332        .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1333        .setRegionSize(Objects.requireNonNull(sizeInBytes))
1334        .build();
1335  }
1336
1337  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1338      throws IOException {
1339    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1340    // per second, and other metrics. As long as metrics are part of ServerLoad it's best to use
1341    // the wrapper to compute those numbers in one place.
1342    // In the long term most of these should be moved off of ServerLoad and the heart beat.
1343    // Instead they should be stored in an HBase table so that external visibility into HBase is
1344    // improved. Additionally, the load balancer will be able to take advantage of a more complete
1345    // history.
1346    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1347    Collection<HRegion> regions = getOnlineRegionsLocalContext();
1348    long usedMemory = -1L;
1349    long maxMemory = -1L;
1350    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
1351    if (usage != null) {
1352      usedMemory = usage.getUsed();
1353      maxMemory = usage.getMax();
1354    }
1355
1356    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
1357    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1358    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
1359    serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
1360    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
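    // First report coprocessors loaded on the default WAL (getWAL(null)); per-region and
    // per-region-WAL coprocessors are added in the loop below.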
1361    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1362    Builder coprocessorBuilder = Coprocessor.newBuilder();
1363    for (String coprocessor : coprocessors) {
1364      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1365    }
1366    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1367    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1368    for (HRegion region : regions) {
1369      if (region.getCoprocessorHost() != null) {
1370        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1371        for (String regionCoprocessor : regionCoprocessors) {
1372          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
1373        }
1374      }
1375      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1376      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1377          .getCoprocessors()) {
1378        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1379      }
1380    }
1381    serverLoad.setReportStartTime(reportStartTime);
1382    serverLoad.setReportEndTime(reportEndTime);
1383    if (this.infoServer != null) {
1384      serverLoad.setInfoServerPort(this.infoServer.getPort());
1385    } else {
1386      serverLoad.setInfoServerPort(-1);
1387    }
1388    MetricsUserAggregateSource userSource =
1389        metricsRegionServer.getMetricsUserAggregate().getSource();
1390    if (userSource != null) {
1391      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
1392      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
1393        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
1394      }
1395    }
1396    // For the replication load we only need to query one service;
1397    // either the source or the sink will return the same info.
1398    ReplicationSourceService rsources = getReplicationSourceService();
1399
1400    if (rsources != null) {
1401      // always refresh first to get the latest value
1402      ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
1403      if (rLoad != null) {
1404        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1405        for (ClusterStatusProtos.ReplicationLoadSource rLS :
1406            rLoad.getReplicationLoadSourceEntries()) {
1407          serverLoad.addReplLoadSource(rLS);
1408        }
1409
1410      }
1411    }
1412
1413    return serverLoad.build();
1414  }
1415
1416  private String getOnlineRegionsAsPrintableString() {
1417    StringBuilder sb = new StringBuilder();
1418    for (Region r: this.onlineRegions.values()) {
1419      if (sb.length() > 0) sb.append(", ");
1420      sb.append(r.getRegionInfo().getEncodedName());
1421    }
1422    return sb.toString();
1423  }
1424
1425  /**
1426   * Wait on regions close.
1427   */
1428  private void waitOnAllRegionsToClose(final boolean abort) {
1429    // Wait till all regions are closed before going out.
1430    int lastCount = -1;
1431    long previousLogTime = 0;
1432    Set<String> closedRegions = new HashSet<>();
1433    boolean interrupted = false;
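    // Track interruption locally so the thread's interrupt status can be restored in the finally
    // block once we are done waiting.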
1434    try {
1435      while (!onlineRegions.isEmpty()) {
1436        int count = getNumberOfOnlineRegions();
1437        // Only print a message if the count of regions has changed.
1438        if (count != lastCount) {
1439          // Log every second at most
1440          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1441            previousLogTime = System.currentTimeMillis();
1442            lastCount = count;
1443            LOG.info("Waiting on " + count + " regions to close");
1444            // Only print out the regions still closing if there are only a few, else we will
1445            // swamp the log.
1446            if (count < 10 && LOG.isDebugEnabled()) {
1447              LOG.debug("Online Regions=" + this.onlineRegions);
1448            }
1449          }
1450        }
1451        // Ensure all user regions have been sent a close. Use this to
1452        // protect against the case where an open comes in after we start the
1453        // iterator of onlineRegions to close all user regions.
1454        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1455          RegionInfo hri = e.getValue().getRegionInfo();
1456          if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) &&
1457              !closedRegions.contains(hri.getEncodedName())) {
1458            closedRegions.add(hri.getEncodedName());
1459            // Don't update zk with this close transition; pass false.
1460            closeRegionIgnoreErrors(hri, abort);
1461          }
1462        }
1463        // No regions in RIT, we could stop waiting now.
1464        if (this.regionsInTransitionInRS.isEmpty()) {
1465          if (!onlineRegions.isEmpty()) {
1466            LOG.info("We were exiting though online regions are not empty," +
1467                " because some regions failed closing");
1468          }
1469          break;
1470        } else {
1471          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream().
1472            map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1473        }
1474        if (sleepInterrupted(200)) {
1475          interrupted = true;
1476        }
1477      }
1478    } finally {
1479      if (interrupted) {
1480        Thread.currentThread().interrupt();
1481      }
1482    }
1483  }
1484
1485  private static boolean sleepInterrupted(long millis) {
1486    boolean interrupted = false;
1487    try {
1488      Thread.sleep(millis);
1489    } catch (InterruptedException e) {
1490      LOG.warn("Interrupted while sleeping");
1491      interrupted = true;
1492    }
1493    return interrupted;
1494  }
1495
1496  private void shutdownWAL(final boolean close) {
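    // close() shuts the WALs down and also cleans up their underlying resources, while shutdown()
    // only stops them; either way a Throwable is logged rather than propagated since this runs
    // while the server is going down.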
1497    if (this.walFactory != null) {
1498      try {
1499        if (close) {
1500          walFactory.close();
1501        } else {
1502          walFactory.shutdown();
1503        }
1504      } catch (Throwable e) {
1505        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1506        LOG.error("Shutdown / close of WAL failed: " + e);
1507        LOG.debug("Shutdown / close exception details:", e);
1508      }
1509    }
1510  }
1511
1512  /**
1513   * Get the online SlowLog recorder used to add slow logs to the ring buffer.
1514   *
1515   * @return the online SlowLog recorder
1516   */
1517  public SlowLogRecorder getSlowLogRecorder() {
1518    return this.slowLogRecorder;
1519  }
1520
1521  /*
1522   * Run init. Sets up the WAL and starts up all server threads.
1523   *
1524   * @param c Extra configuration.
1525   */
1526  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1527  throws IOException {
1528    try {
1529      boolean updateRootDir = false;
1530      for (NameStringPair e : c.getMapEntriesList()) {
1531        String key = e.getName();
1532        // The hostname the master sees us as.
1533        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1534          String hostnameFromMasterPOV = e.getValue();
1535          this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(),
1536              this.startcode);
1537          if (!StringUtils.isBlank(useThisHostnameInstead) &&
1538              !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1539            String msg = "Master passed us a different hostname to use; was=" +
1540                this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1541            LOG.error(msg);
1542            throw new IOException(msg);
1543          }
1544          if (StringUtils.isBlank(useThisHostnameInstead) &&
1545              !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1546            String msg = "Master passed us a different hostname to use; was=" +
1547                rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1548            LOG.error(msg);
1549          }
1550          continue;
1551        }
1552
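        // Remaining entries are configuration overrides pushed down by the master; apply them to
        // the local conf so this server runs with the master's view of the settings.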
1553        String value = e.getValue();
1554        if (key.equals(HConstants.HBASE_DIR)) {
1555          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1556            updateRootDir = true;
1557          }
1558        }
1559
1560        if (LOG.isDebugEnabled()) {
1561          LOG.debug("Config from master: " + key + "=" + value);
1562        }
1563        this.conf.set(key, value);
1564      }
1565      // Set our ephemeral znode up in zookeeper now we have a name.
1566      createMyEphemeralNode();
1567
1568      if (updateRootDir) {
1569        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1570        initializeFileSystem();
1571      }
1572
1573      // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1574      // config param for task trackers, but we can piggyback off of it.
1575      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1576        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1577      }
1578
1579      // Save it in a file; this will allow us to see if we crashed.
1580      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1581
1582      // This call sets up an initialized replication and WAL. Later we start it up.
1583      setupWALAndReplication();
1584      // Init in here rather than in constructor after thread name has been set
1585      final MetricsTable metricsTable =
1586          new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1587      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1588      this.metricsRegionServer = new MetricsRegionServer(metricsRegionServerImpl,
1589          conf, metricsTable);
1590      // Now that we have a metrics source, start the pause monitor
1591      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1592      pauseMonitor.start();
1593
1594      // There is a rare case where we do NOT want services to start. Check config.
1595      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1596        startServices();
1597      }
1598      // In here we start up the replication service. Above we initialized it. TODO: reconcile
1599      // the two steps, or make better sense of the split.
1600      startReplicationService();
1601
1602
1603      // Set up ZK
1604      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa +
1605          ", sessionid=0x" +
1606          Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1607
1608      // Wake up anyone waiting for this server to come online.
1609      synchronized (online) {
1610        online.set(true);
1611        online.notifyAll();
1612      }
1613    } catch (Throwable e) {
1614      stop("Failed initialization");
1615      throw convertThrowableToIOE(cleanup(e, "Failed init"),
1616          "Region server startup failed");
1617    } finally {
1618      sleeper.skipSleepCycle();
1619    }
1620  }
1621
1622  protected void initializeMemStoreChunkCreator() {
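    // MSLAB copies incoming cell data into coarse-grained chunks so memstore allocations do not
    // fragment the heap; the ChunkCreator below manages (and optionally pools) those chunks,
    // sized against the global memstore limit.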
1623    if (MemStoreLAB.isEnabled(conf)) {
1624      // MSLAB is enabled. So initialize MemStoreChunkPool
1625      // By this time, the MemstoreFlusher is already initialized. We can get the global limits from
1626      // it.
1627      Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
1628      long globalMemStoreSize = pair.getFirst();
1629      boolean offheap = this.regionServerAccounting.isOffheap();
1630      // When the off-heap memstore is in use, give the chunk pool the full area.
1631      float poolSizePercentage = offheap ? 1.0F :
1632          conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
1633      float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
1634          MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
1635      int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
1636      // init the chunkCreator
1637      ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
1638        initialCountPercentage, this.hMemManager);
1639    }
1640  }
1641
1642  private void startHeapMemoryManager() {
1643    if (this.blockCache != null) {
1644      this.hMemManager =
1645          new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1646      this.hMemManager.start(getChoreService());
1647    }
1648  }
1649
1650  private void createMyEphemeralNode() throws KeeperException {
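    // The ephemeral znode advertises this server's info port and version. Because it is tied to
    // our ZooKeeper session, it disappears automatically if the session expires, which is how the
    // master notices a dead region server.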
1651    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1652    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1653    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1654    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1655    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1656  }
1657
1658  private void deleteMyEphemeralNode() throws KeeperException {
1659    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1660  }
1661
1662  @Override
1663  public RegionServerAccounting getRegionServerAccounting() {
1664    return regionServerAccounting;
1665  }
1666
1667  /**
1668   * @param r Region to get RegionLoad for.
1669   * @param regionLoadBldr the RegionLoad.Builder, can be null
1670   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1671   * @return RegionLoad instance.
1672   */
1673  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1674      RegionSpecifier.Builder regionSpecifier) throws IOException {
1675    byte[] name = r.getRegionInfo().getRegionName();
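    // Aggregate the per-store figures below; sizes are reported in MB/KB and truncated by the
    // integer division.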
1676    int stores = 0;
1677    int storefiles = 0;
1678    int storeRefCount = 0;
1679    int maxCompactedStoreFileRefCount = 0;
1680    int storeUncompressedSizeMB = 0;
1681    int storefileSizeMB = 0;
1682    int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024);
1683    long storefileIndexSizeKB = 0;
1684    int rootLevelIndexSizeKB = 0;
1685    int totalStaticIndexSizeKB = 0;
1686    int totalStaticBloomSizeKB = 0;
1687    long totalCompactingKVs = 0;
1688    long currentCompactedKVs = 0;
1689    List<HStore> storeList = r.getStores();
1690    stores += storeList.size();
1691    for (HStore store : storeList) {
1692      storefiles += store.getStorefilesCount();
1693      int currentStoreRefCount = store.getStoreRefCount();
1694      storeRefCount += currentStoreRefCount;
1695      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1696      maxCompactedStoreFileRefCount = Math.max(maxCompactedStoreFileRefCount,
1697        currentMaxCompactedStoreFileRefCount);
1698      storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1699      storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1700      //TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
1701      storefileIndexSizeKB += store.getStorefilesRootLevelIndexSize() / 1024;
1702      CompactionProgress progress = store.getCompactionProgress();
1703      if (progress != null) {
1704        totalCompactingKVs += progress.getTotalCompactingKVs();
1705        currentCompactedKVs += progress.currentCompactedKVs;
1706      }
1707      rootLevelIndexSizeKB += (int) (store.getStorefilesRootLevelIndexSize() / 1024);
1708      totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1709      totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1710    }
1711
1712    float dataLocality =
1713        r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1714    if (regionLoadBldr == null) {
1715      regionLoadBldr = RegionLoad.newBuilder();
1716    }
1717    if (regionSpecifier == null) {
1718      regionSpecifier = RegionSpecifier.newBuilder();
1719    }
1720    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1721    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1722    regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1723      .setStores(stores)
1724      .setStorefiles(storefiles)
1725      .setStoreRefCount(storeRefCount)
1726      .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1727      .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1728      .setStorefileSizeMB(storefileSizeMB)
1729      .setMemStoreSizeMB(memstoreSizeMB)
1730      .setStorefileIndexSizeKB(storefileIndexSizeKB)
1731      .setRootIndexSizeKB(rootLevelIndexSizeKB)
1732      .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1733      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1734      .setReadRequestsCount(r.getReadRequestsCount())
1735      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1736      .setWriteRequestsCount(r.getWriteRequestsCount())
1737      .setTotalCompactingKVs(totalCompactingKVs)
1738      .setCurrentCompactedKVs(currentCompactedKVs)
1739      .setDataLocality(dataLocality)
1740      .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1741    r.setCompleteSequenceId(regionLoadBldr);
1742
1743    return regionLoadBldr.build();
1744  }
1745
1746  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1747    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1748    userLoadBldr.setUserName(user);
1749    userSource.getClientMetrics().values().stream().map(
1750      clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1751            .setHostName(clientMetrics.getHostName())
1752            .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1753            .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1754            .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
1755        .forEach(userLoadBldr::addClientMetrics);
1756    return userLoadBldr.build();
1757  }
1758
1759  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1760    HRegion r = onlineRegions.get(encodedRegionName);
1761    return r != null ? createRegionLoad(r, null, null) : null;
1762  }
1763
1764  /**
1765   * Inner class that runs on a long period, checking whether regions need compaction.
1766   */
1767  private static class CompactionChecker extends ScheduledChore {
1768    private final HRegionServer instance;
1769    private final int majorCompactPriority;
1770    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1771    //Iteration is 1-based rather than 0-based so we don't check for compaction
1772    // immediately upon region server startup
1773    private long iteration = 1;
1774
1775    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1776      super("CompactionChecker", stopper, sleepTime);
1777      this.instance = h;
1778      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1779
1780      /* MajorCompactPriority is configurable.
1781       * If not set, the compaction will use default priority.
1782       */
1783      this.majorCompactPriority = this.instance.conf.
1784          getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1785              DEFAULT_PRIORITY);
1786    }
1787
1788    @Override
1789    protected void chore() {
1790      for (Region r : this.instance.onlineRegions.values()) {
1791        if (r == null) {
1792          continue;
1793        }
1794        HRegion hr = (HRegion) r;
1795        for (HStore s : hr.stores.values()) {
1796          try {
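            // A store can ask to be checked only every Nth chore run via its compaction check
            // multiplier; skip this store on other iterations.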
1797            long multiplier = s.getCompactionCheckMultiplier();
1798            assert multiplier > 0;
1799            if (iteration % multiplier != 0) {
1800              continue;
1801            }
1802            if (s.needsCompaction()) {
1803              // Queue a compaction. Will recognize if major is needed.
1804              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1805                getName() + " requests compaction");
1806            } else if (s.shouldPerformMajorCompaction()) {
1807              s.triggerMajorCompaction();
1808              if (majorCompactPriority == DEFAULT_PRIORITY ||
1809                  majorCompactPriority > hr.getCompactPriority()) {
1810                this.instance.compactSplitThread.requestCompaction(hr, s,
1811                    getName() + " requests major compaction; use default priority",
1812                    Store.NO_PRIORITY,
1813                CompactionLifeCycleTracker.DUMMY, null);
1814              } else {
1815                this.instance.compactSplitThread.requestCompaction(hr, s,
1816                    getName() + " requests major compaction; use configured priority",
1817                    this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1818              }
1819            }
1820          } catch (IOException e) {
1821            LOG.warn("Failed major compaction check on " + r, e);
1822          }
1823        }
1824      }
1825      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1826    }
1827  }
1828
1829  private static class PeriodicMemStoreFlusher extends ScheduledChore {
1830    private final HRegionServer server;
1831    private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
1832    private final static int MIN_DELAY_TIME = 0; // millisec
1833    private final long rangeOfDelayMs;
1834
1835    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1836      super("MemstoreFlusherChore", server, cacheFlushInterval);
1837      this.server = server;
1838
1839      final long configuredRangeOfDelay = server.getConfiguration().getInt(
1840          "hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
1841      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
1842    }
1843
1844    @Override
1845    protected void chore() {
1846      final StringBuilder whyFlush = new StringBuilder();
1847      for (HRegion r : this.server.onlineRegions.values()) {
1848        if (r == null) continue;
1849        if (r.shouldFlush(whyFlush)) {
1850          FlushRequester requester = server.getFlushRequester();
1851          if (requester != null) {
1852            long randomDelay = RandomUtils.nextLong(0, rangeOfDelayMs) + MIN_DELAY_TIME;
1853            //Throttle the flushes by putting a delay. If we don't throttle, and there
1854            //is a balanced write-load on the regions in a table, we might end up
1855            //overwhelming the filesystem with too many flushes at once.
1856            if (requester.requestDelayedFlush(r, randomDelay, false)) {
1857              LOG.info("{} requesting flush of {} because {} after random delay {} ms",
1858                  getName(), r.getRegionInfo().getRegionNameAsString(),  whyFlush.toString(),
1859                  randomDelay);
1860            }
1861          }
1862        }
1863      }
1864    }
1865  }
1866
1867  /**
1868   * Report the status of the server. A server is online once all the startup is
1869   * completed (setting up filesystem, starting executorService threads, etc.). This
1870   * method is designed mostly to be useful in tests.
1871   *
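   * For example, a test might poll it roughly like this ({@code rs} being a hypothetical
   * region server handle):
   * <pre>
   *   while (!rs.isOnline()) {
   *     Thread.sleep(100);
   *   }
   * </pre>
   *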
1872   * @return true if online, false if not.
1873   */
1874  public boolean isOnline() {
1875    return online.get();
1876  }
1877
1878  /**
1879   * Set up the WAL and replication if enabled. Replication setup is done in here because it needs to
1880   * be hooked up to the WAL.
1881   */
1882  private void setupWALAndReplication() throws IOException {
1883    WALFactory factory = new WALFactory(conf, serverName.toString());
1884
1885    // TODO Replication make assumptions here based on the default filesystem impl
1886    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1887    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1888
1889    Path logDir = new Path(walRootDir, logName);
1890    LOG.debug("logDir={}", logDir);
1891    if (this.walFs.exists(logDir)) {
1892      throw new RegionServerRunningException(
1893          "Region server has already created directory at " + this.serverName.toString());
1894    }
1895    // Always create wal directory as now we need this when master restarts to find out the live
1896    // region servers.
1897    if (!this.walFs.mkdirs(logDir)) {
1898      throw new IOException("Can not create wal directory " + logDir);
1899    }
1900    // Instantiate replication if replication enabled. Pass it the log directories.
1901    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
1902      factory.getWALProvider());
1903    this.walFactory = factory;
1904  }
1905
1906  /**
1907   * Start up replication source and sink handlers.
1908   */
1909  private void startReplicationService() throws IOException {
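    // The source and sink handlers may be backed by one and the same object; the identity check
    // below makes sure that shared service is only started once.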
1910    if (this.replicationSourceHandler == this.replicationSinkHandler &&
1911        this.replicationSourceHandler != null) {
1912      this.replicationSourceHandler.startReplicationService();
1913    } else {
1914      if (this.replicationSourceHandler != null) {
1915        this.replicationSourceHandler.startReplicationService();
1916      }
1917      if (this.replicationSinkHandler != null) {
1918        this.replicationSinkHandler.startReplicationService();
1919      }
1920    }
1921  }
1922
1923  /**
1924   * @return Master address tracker instance.
1925   */
1926  public MasterAddressTracker getMasterAddressTracker() {
1927    return this.masterAddressTracker;
1928  }
1929
1930  /**
1931   * Start maintenance threads: server, worker, and lease checker threads.
1932   * Start all threads we need to run. This is called after we've successfully
1933   * registered with the Master.
1934   * Install an UncaughtExceptionHandler that calls abort of the RegionServer if we
1935   * get an unhandled exception. We cannot set the handler on all threads.
1936   * The Server's internal Listener thread is off limits. For the Server, if an OOME occurs, it
1937   * waits a while then retries. Meantime, a flush or a compaction that tries to
1938   * run should trigger the same critical condition and the shutdown will run. On
1939   * its way out, this server will shut down the Server. Leases are somewhere
1940   * in between: the lease checker has its own internal thread that, while it inherits from Chore,
1941   * keeps its own internal stop mechanism and so needs to be stopped by this
1942   * hosting server. The Worker logs the exception and exits.
1943   */
1944  private void startServices() throws IOException {
1945    if (!isStopped() && !isAborted()) {
1946      initializeThreads();
1947    }
1948    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection);
1949    this.secureBulkLoadManager.start();
1950
1951    // Health checker thread.
1952    if (isHealthCheckerConfigured()) {
1953      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
1954      HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1955      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1956    }
1957
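    // Core write-path helpers: the WAL roller, the controller that throttles flush throughput, and
    // the reporter that sends remote procedure results back to the master.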
1958    this.walRoller = new LogRoller(this);
1959    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1960    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
1961
1962    // Create the CompactedFileDischarger chore executorService. This chore helps to
1963    // remove the compacted files that will no longer be used in reads.
1964    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
1965    // 2 mins so that compacted files can be archived before the TTLCleaner runs
1966    int cleanerInterval =
1967      conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
1968    this.compactedFileDischarger =
1969      new CompactedHFilesDischarger(cleanerInterval, this, this);
1970    choreService.scheduleChore(compactedFileDischarger);
1971
1972    // Start executor services
1973    this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION,
1974        conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1975    this.executorService.startExecutorService(ExecutorType.RS_OPEN_META,
1976        conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1977    this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
1978        conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
1979    this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1980        conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1981    this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META,
1982        conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1983    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1984      this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1985          conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1986    }
1987    this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
1988        HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
1989    // Start the threads for compacted files discharger
1990    this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
1991        conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
1992    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1993      this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
1994          conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1995              conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
1996    }
1997    this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
1998      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
1999    this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE,
2000      conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1));
2001
2002    Threads
2003      .setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", uncaughtExceptionHandler);
2004    if (this.cacheFlusher != null) {
2005      this.cacheFlusher.start(uncaughtExceptionHandler);
2006    }
2007    Threads.setDaemonThreadRunning(this.procedureResultReporter,
2008      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
2009
2010    if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
2011    if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
2012    if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
2013    if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
2014    if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
2015    if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
2016    if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore);
2017    if (this.slowLogTableOpsChore != null) {
2018      choreService.scheduleChore(slowLogTableOpsChore);
2019    }
2020
2021    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
2022    // an unhandled exception, it will just exit.
2023    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
2024        uncaughtExceptionHandler);
2025
2026    // Create the log splitting worker and start it
2027    // Use a smaller retry count to fail fast; otherwise the SplitLogWorker could be blocked for
2028    // quite a while inside the Connection layer. The worker would not be available for other
2029    // tasks even after the current task is preempted once a split task times out.
2030    Configuration sinkConf = HBaseConfiguration.create(conf);
2031    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2032        conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
2033    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2034        conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
2035    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
2036    if (this.csm != null && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
2037      DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
2038      // SplitLogWorker needs csm. If none, don't start this.
2039      this.splitLogWorker = new SplitLogWorker(sinkConf, this,
2040          this, walFactory);
2041      splitLogWorker.start();
2042      LOG.debug("SplitLogWorker started");
2043    }
2044
2045    // Memstore services.
2046    startHeapMemoryManager();
2047    // Call it after starting HeapMemoryManager.
2048    initializeMemStoreChunkCreator();
2049  }
2050
2051  private void initializeThreads() {
2052    // Cache flushing thread.
2053    this.cacheFlusher = new MemStoreFlusher(conf, this);
2054
2055    // Compaction thread
2056    this.compactSplitThread = new CompactSplit(this);
2057
2058    // Background thread to check for compactions; needed if region has not gotten updates
2059    // in a while. It will take care of not checking too frequently on store-by-store basis.
2060    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
2061    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
2062    this.leaseManager = new LeaseManager(this.threadWakeFrequency);
2063
2064    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
2065      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
2066    if (isSlowLogTableEnabled) {
2067      // default chore duration: 10 min
2068      final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
2069      slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.slowLogRecorder);
2070    }
2071
2072    // Create the thread to clean the moved regions list
2073    movedRegionsCleaner = MovedRegionsCleaner.create(this);
2074
2075    if (this.nonceManager != null) {
2076      // Create the scheduled chore that cleans up nonces.
2077      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
2078    }
2079
2080    // Setup the Quota Manager
2081    rsQuotaManager = new RegionServerRpcQuotaManager(this);
2082    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
2083
2084    if (QuotaUtil.isQuotaEnabled(conf)) {
2085      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
2086    }
2087
2088
2089    boolean onlyMetaRefresh = false;
2090    int storefileRefreshPeriod = conf.getInt(
2091        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
2092        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2093    if (storefileRefreshPeriod == 0) {
2094      storefileRefreshPeriod = conf.getInt(
2095          StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
2096          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2097      onlyMetaRefresh = true;
2098    }
2099    if (storefileRefreshPeriod > 0) {
2100      this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
2101          onlyMetaRefresh, this, this);
2102    }
2103    registerConfigurationObservers();
2104  }
2105
2106  private void registerConfigurationObservers() {
2107    // Registering the compactSplitThread object with the ConfigurationManager.
2108    configurationManager.registerObserver(this.compactSplitThread);
2109    configurationManager.registerObserver(this.rpcServices);
2110    configurationManager.registerObserver(this);
2111  }
2112
2113  /**
2114   * Puts up the webui.
2115   */
2116  private void putUpWebUI() throws IOException {
2117    int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2118      HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2119    String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
2120
2121    if (this instanceof HMaster) {
2122      port = conf.getInt(HConstants.MASTER_INFO_PORT,
2123          HConstants.DEFAULT_MASTER_INFOPORT);
2124      addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
2125    }
2126    // -1 is for disabling info server
2127    if (port < 0) {
2128      return;
2129    }
2130
2131    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
2132      String msg =
2133          "Failed to start http info server. Address " + addr
2134              + " does not belong to this host. Correct configuration parameter: "
2135              + "hbase.regionserver.info.bindAddress";
2136      LOG.error(msg);
2137      throw new IOException(msg);
2138    }
2139    // check if auto port bind enabled
2140    boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, false);
2141    while (true) {
2142      try {
2143        this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
2144        infoServer.addPrivilegedServlet("dump", "/dump", getDumpServlet());
2145        configureInfoServer();
2146        this.infoServer.start();
2147        break;
2148      } catch (BindException e) {
2149        if (!auto) {
2150          // auto bind disabled, throw BindException
2151          LOG.error("Failed binding http info server to port: " + port);
2152          throw e;
2153        }
2154        // auto bind enabled, try to use another port
2155        LOG.info("Failed binding http info server to port: " + port);
2156        port++;
2157      }
2158    }
2159    port = this.infoServer.getPort();
2160    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
2161    int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
2162      HConstants.DEFAULT_MASTER_INFOPORT);
2163    conf.setInt("hbase.master.info.port.orig", masterInfoPort);
2164    conf.setInt(HConstants.MASTER_INFO_PORT, port);
2165  }
2166
2167  /*
2168   * Verify that server is healthy
2169   */
2170  private boolean isHealthy() {
2171    if (!dataFsOk) {
2172      // File system problem
2173      return false;
2174    }
2175    // Verify that all threads are alive
2176    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
2177        && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2178        && (this.walRoller == null || this.walRoller.isAlive())
2179        && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2180        && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2181    if (!healthy) {
2182      stop("One or more threads are no longer alive -- stop");
2183    }
2184    return healthy;
2185  }
2186
2187  @Override
2188  public List<WAL> getWALs() {
2189    return walFactory.getWALs();
2190  }
2191
2192  @Override
2193  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2194    try {
2195      WAL wal = walFactory.getWAL(regionInfo);
2196      if (this.walRoller != null) {
2197        this.walRoller.addWAL(wal);
2198      }
2199      return wal;
2200    } catch (FailedCloseWALAfterInitializedErrorException ex) {
2201      // see HBASE-21751 for details
2202      abort("wal can not clean up after init failed", ex);
2203      throw ex;
2204    }
2205  }
2206
2207  public LogRoller getWalRoller() {
2208    return walRoller;
2209  }
2210
2211  WALFactory getWalFactory() {
2212    return walFactory;
2213  }
2214
2215  @Override
2216  public Connection getConnection() {
2217    return getClusterConnection();
2218  }
2219
2220  @Override
2221  public ClusterConnection getClusterConnection() {
2222    return this.clusterConnection;
2223  }
2224
2225  @Override
2226  public void stop(final String msg) {
2227    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2228  }
2229
2230  /**
2231   * Stops the regionserver.
2232   * @param msg Status message
2233   * @param force True if this is a regionserver abort
2234   * @param user The user executing the stop request, or null if no user is associated
2235   */
2236  public void stop(final String msg, final boolean force, final User user) {
2237    if (!this.stopped) {
2238      LOG.info("***** STOPPING region server '" + this + "' *****");
2239      if (this.rsHost != null) {
2240        // when forced via abort don't allow CPs to override
2241        try {
2242          this.rsHost.preStop(msg, user);
2243        } catch (IOException ioe) {
2244          if (!force) {
2245            LOG.warn("The region server did not stop", ioe);
2246            return;
2247          }
2248          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2249        }
2250      }
2251      this.stopped = true;
2252      LOG.info("STOPPED: " + msg);
2253      // Wakes run() if it is sleeping
2254      sleeper.skipSleepCycle();
2255    }
2256  }
2257
2258  public void waitForServerOnline() {
2259    while (!isStopped() && !isOnline()) {
2260      synchronized (online) {
2261        try {
2262          online.wait(msgInterval);
2263        } catch (InterruptedException ie) {
2264          Thread.currentThread().interrupt();
2265          break;
2266        }
2267      }
2268    }
2269  }
2270
2271  @Override
2272  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2273    HRegion r = context.getRegion();
2274    long openProcId = context.getOpenProcId();
2275    long masterSystemTime = context.getMasterSystemTime();
2276    rpcServices.checkOpen();
2277    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2278      r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2279    // Do checks to see if we need to compact (references or too many files)
2280    for (HStore s : r.stores.values()) {
2281      if (s.hasReferences() || s.needsCompaction()) {
2282        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2283      }
2284    }
2285    long openSeqNum = r.getOpenSeqNum();
2286    if (openSeqNum == HConstants.NO_SEQNUM) {
2287      // If we opened a region, we should have read some sequence number from it.
2288      LOG.error(
2289        "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2290      openSeqNum = 0;
2291    }
2292
2293    // Notify master
2294    if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2295      openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))) {
2296      throw new IOException(
2297        "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2298    }
2299
2300    triggerFlushInPrimaryRegion(r);
2301
2302    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2303  }
2304
2305  /**
2306   * Helper method for use in tests. Skip the region transition report when there's no master
2307   * around to receive it.
2308   */
2309  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2310    final TransitionCode code = context.getCode();
2311    final long openSeqNum = context.getOpenSeqNum();
2312    long masterSystemTime = context.getMasterSystemTime();
2313    final RegionInfo[] hris = context.getHris();
2314
2315    if (code == TransitionCode.OPENED) {
2316      Preconditions.checkArgument(hris != null && hris.length == 1);
2317      if (hris[0].isMetaRegion()) {
2318        try {
2319          MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
2320              hris[0].getReplicaId(), RegionState.State.OPEN);
2321        } catch (KeeperException e) {
2322          LOG.info("Failed to update meta location", e);
2323          return false;
2324        }
2325      } else {
2326        try {
2327          MetaTableAccessor.updateRegionLocation(clusterConnection,
2328              hris[0], serverName, openSeqNum, masterSystemTime);
2329        } catch (IOException e) {
2330          LOG.info("Failed to update meta", e);
2331          return false;
2332        }
2333      }
2334    }
2335    return true;
2336  }
2337
2338  private ReportRegionStateTransitionRequest createReportRegionStateTransitionRequest(
2339      final RegionStateTransitionContext context) {
2340    final TransitionCode code = context.getCode();
2341    final long openSeqNum = context.getOpenSeqNum();
2342    final RegionInfo[] hris = context.getHris();
2343    final long[] procIds = context.getProcIds();
2344
2345    ReportRegionStateTransitionRequest.Builder builder =
2346        ReportRegionStateTransitionRequest.newBuilder();
2347    builder.setServer(ProtobufUtil.toServerName(serverName));
2348    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2349    transition.setTransitionCode(code);
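    // Only OPENED transitions with a valid sequence id carry openSeqNum; otherwise it is left unset.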
2350    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2351      transition.setOpenSeqNum(openSeqNum);
2352    }
2353    for (RegionInfo hri: hris) {
2354      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2355    }
2356    for (long procId: procIds) {
2357      transition.addProcId(procId);
2358    }
2359
2360    return builder.build();
2361  }
2362
2363  @Override
2364  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2365    if (TEST_SKIP_REPORTING_TRANSITION) {
2366      return skipReportingTransition(context);
2367    }
2368    final ReportRegionStateTransitionRequest request =
2369        createReportRegionStateTransitionRequest(context);
2370
2371    // Time to pause if master says 'please hold'. Make configurable if needed.
2372    final long initPauseTime = 1000;
2373    int tries = 0;
2374    long pauseTime;
2375    // Keep looping till we get an error. We want to send reports even though server is going down.
2376    // Only go down if clusterConnection is null. It is set to null almost as the last thing the
2377    // HRegionServer does as it goes down.
2378    while (this.clusterConnection != null && !this.clusterConnection.isClosed()) {
2379      RegionServerStatusService.BlockingInterface rss = rssStub;
2380      try {
2381        if (rss == null) {
2382          createRegionServerStatusStub();
2383          continue;
2384        }
2385        ReportRegionStateTransitionResponse response =
2386          rss.reportRegionStateTransition(null, request);
2387        if (response.hasErrorMessage()) {
2388          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2389          break;
2390        }
2391        // Log if we had to retry, else don't log unless TRACE. We want to
2392        // know if we were successful after an attempt showed in the logs as failed.
2393        if (tries > 0 || LOG.isTraceEnabled()) {
2394          LOG.info("TRANSITION REPORTED " + request);
2395        }
2396        // NOTE: Return mid-method!!!
2397        return true;
2398      } catch (ServiceException se) {
2399        IOException ioe = ProtobufUtil.getRemoteException(se);
2400        boolean pause =
2401            ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException
2402                || ioe instanceof CallQueueTooBigException;
2403        if (pause) {
2404          // Do backoff else we flood the Master with requests.
2405          pauseTime = ConnectionUtils.getPauseTime(initPauseTime, tries);
2406        } else {
2407          pauseTime = initPauseTime; // Reset.
2408        }
2409        LOG.info("Failed report transition " +
2410          TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" +
2411            (pause?
2412                " after " + pauseTime + "ms delay (Master is coming online...).":
2413                " immediately."),
2414            ioe);
2415        if (pause) Threads.sleep(pauseTime);
2416        tries++;
2417        if (rssStub == rss) {
2418          rssStub = null;
2419        }
2420      }
2421    }
2422    return false;
2423  }
2424
2425  /**
2426   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2427   * block this thread. See RegionReplicaFlushHandler for details.
2428   */
2429  private void triggerFlushInPrimaryRegion(final HRegion region) {
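    // Secondary replicas should not serve reads until they have caught up with files flushed by
    // the primary, so reads stay disabled here and a flush of the primary is requested
    // asynchronously; RegionReplicaFlushHandler re-enables reads when it is safe.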
2430    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2431      return;
2432    }
2433    if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
2434        !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
2435          region.conf)) {
2436      region.setReadsEnabled(true);
2437      return;
2438    }
2439
2440    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2441    // RegionReplicaFlushHandler might reset this.
2442
2443    // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2444    if (this.executorService != null) {
2445      this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
2446          rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2447    }
2448  }
2449
2450  @Override
2451  public RpcServerInterface getRpcServer() {
2452    return rpcServices.rpcServer;
2453  }
2454
2455  @VisibleForTesting
2456  public RSRpcServices getRSRpcServices() {
2457    return rpcServices;
2458  }
2459
2460  /**
2461   * Cause the server to exit without closing the regions it is serving, without closing the log
2462   * it is using, and without notifying the master. Used in unit testing and on
2463   * catastrophic events such as HDFS being yanked out from under hbase or an OOME.
2464   *
2465   * @param reason
2466   *          the reason we are aborting
2467   * @param cause
2468   *          the exception that caused the abort, or null
2469   */
2470  @Override
2471  public void abort(String reason, Throwable cause) {
2472    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2473    if (cause != null) {
2474      LOG.error(HBaseMarkers.FATAL, msg, cause);
2475    } else {
2476      LOG.error(HBaseMarkers.FATAL, msg);
2477    }
2478    setAbortRequested();
2479    // HBASE-4014: show list of coprocessors that were loaded to help debug
2480    // regionserver crashes. Note that we're implicitly using
2481    // java.util.HashSet's toString() method to print the coprocessor names.
2482    LOG.error(HBaseMarkers.FATAL, "RegionServer abort: loaded coprocessors are: " +
2483        CoprocessorHost.getLoadedCoprocessors());
2484    // Try to dump metrics on abort -- might give a clue as to how the fatal condition came about.
2485    try {
2486      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2487    } catch (MalformedObjectNameException | IOException e) {
2488      LOG.warn("Failed dumping metrics", e);
2489    }
2490
2491    // Do our best to report our abort to the master, but this may not work
2492    try {
2493      if (cause != null) {
2494        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2495      }
2496      // Report to the master but only if we have already registered with the master.
2497      RegionServerStatusService.BlockingInterface rss = rssStub;
2498      if (rss != null && this.serverName != null) {
2499        ReportRSFatalErrorRequest.Builder builder =
2500          ReportRSFatalErrorRequest.newBuilder();
2501        builder.setServer(ProtobufUtil.toServerName(this.serverName));
2502        builder.setErrorMessage(msg);
2503        rss.reportRSFatalError(null, builder.build());
2504      }
2505    } catch (Throwable t) {
2506      LOG.warn("Unable to report fatal error to master", t);
2507    }
2508
2509    scheduleAbortTimer();
2510    // shutdown should be run as the internal user
2511    stop(reason, true, null);
2512  }
2513
2514  protected final void setAbortRequested() {
2515    this.abortRequested = true;
2516  }
2517
2518  /**
2519   * @see HRegionServer#abort(String, Throwable)
2520   */
2521  public void abort(String reason) {
2522    abort(reason, null);
2523  }
2524
2525  @Override
2526  public boolean isAborted() {
2527    return this.abortRequested;
2528  }
2529
2530  /*
2531   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
2532   * logs, but it does close the socket in case we want to bring up a server on the
2533   * old hostname+port immediately.
2534   */
2535  @VisibleForTesting
2536  protected void kill() {
2537    this.killed = true;
2538    abort("Simulated kill");
2539  }
2540
2541  // Limits the time spent in the shutdown process.
2542  private void scheduleAbortTimer() {
2543    if (this.abortMonitor == null) {
2544      this.abortMonitor = new Timer("Abort regionserver monitor", true);
2545      TimerTask abortTimeoutTask = null;
2546      try {
2547        Constructor<? extends TimerTask> timerTaskCtor =
2548          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2549            .asSubclass(TimerTask.class).getDeclaredConstructor();
2550        timerTaskCtor.setAccessible(true);
2551        abortTimeoutTask = timerTaskCtor.newInstance();
2552      } catch (Exception e) {
2553        LOG.warn("Initialize abort timeout task failed", e);
2554      }
2555      if (abortTimeoutTask != null) {
2556        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2557      }
2558    }
2559  }
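
  // A minimal sketch (illustrative; the task class below is hypothetical) of plugging in a
  // custom abort-timeout task via configuration. The default is SystemExitWhenAbortTimeout,
  // defined near the end of this class; the task needs a no-argument constructor.
  //
  //   public static class ThreadDumpOnAbortTimeout extends TimerTask {
  //     @Override
  //     public void run() {
  //       // dump threads but leave process termination to external supervision
  //       Threads.printThreadInfo(System.out, "Abort timed out");
  //     }
  //   }
  //   conf.set(ABORT_TIMEOUT_TASK, ThreadDumpOnAbortTimeout.class.getName());
  //   conf.setLong(ABORT_TIMEOUT, 2 * 60 * 1000); // milliseconds; passed to Timer#schedule above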
2560
2561  /**
2562   * Wait on all threads to finish. Presumption is that all closes and stops
2563   * have already been called.
2564   */
2565  protected void stopServiceThreads() {
2566    // clean up the scheduled chores
2567    if (this.choreService != null) {
2568      choreService.cancelChore(nonceManagerChore);
2569      choreService.cancelChore(compactionChecker);
2570      choreService.cancelChore(periodicFlusher);
2571      choreService.cancelChore(healthCheckChore);
2572      choreService.cancelChore(storefileRefresher);
2573      choreService.cancelChore(movedRegionsCleaner);
2574      choreService.cancelChore(fsUtilizationChore);
2575      choreService.cancelChore(slowLogTableOpsChore);
2576      // clean up the remaining scheduled chores (in case we missed out any)
2577      choreService.shutdown();
2578    }
2579
2580    if (this.cacheFlusher != null) {
2581      this.cacheFlusher.join();
2582    }
2583
2584    if (this.spanReceiverHost != null) {
2585      this.spanReceiverHost.closeReceivers();
2586    }
2587    if (this.walRoller != null) {
2588      this.walRoller.close();
2589    }
2590    if (this.compactSplitThread != null) {
2591      this.compactSplitThread.join();
2592    }
2593    if (this.executorService != null) this.executorService.shutdown();
2594    if (this.replicationSourceHandler != null &&
2595        this.replicationSourceHandler == this.replicationSinkHandler) {
2596      this.replicationSourceHandler.stopReplicationService();
2597    } else {
2598      if (this.replicationSourceHandler != null) {
2599        this.replicationSourceHandler.stopReplicationService();
2600      }
2601      if (this.replicationSinkHandler != null) {
2602        this.replicationSinkHandler.stopReplicationService();
2603      }
2604    }
2605  }
2606
2607  /**
2608   * @return The object that implements the replication
2609   * source service.
2610   */
2611  @VisibleForTesting
2612  public ReplicationSourceService getReplicationSourceService() {
2613    return replicationSourceHandler;
2614  }
2615
2616  /**
2617   * @return The object that implements the replication
2618   * sink service.
2619   */
2620  ReplicationSinkService getReplicationSinkService() {
2621    return replicationSinkHandler;
2622  }
2623
2624  /**
2625   * Get the current master from ZooKeeper and open the RPC connection to it.
2626   * To get a fresh connection, the current rssStub must be null.
2627   * Method will block until a master is available. You can break from this
2628   * block by requesting the server stop.
2629   *
2630   * @return master + port, or null if server has been stopped
2631   */
2632  private synchronized ServerName createRegionServerStatusStub() {
2633    // Create RS stub without refreshing the master node from ZK, use cached data
2634    return createRegionServerStatusStub(false);
2635  }
2636
2637  /**
2638   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2639   * connection, the current rssStub must be null. Method will block until a master is available.
2640   * You can break from this block by requesting the server stop.
2641   * @param refresh If true then master address will be read from ZK, otherwise use cached data
2642   * @return master + port, or null if server has been stopped
2643   */
2644  @VisibleForTesting
2645  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2646    if (rssStub != null) {
2647      return masterAddressTracker.getMasterAddress();
2648    }
2649    ServerName sn = null;
2650    long previousLogTime = 0;
2651    RegionServerStatusService.BlockingInterface intRssStub = null;
2652    LockService.BlockingInterface intLockStub = null;
2653    boolean interrupted = false;
2654    try {
2655      while (keepLooping()) {
2656        sn = this.masterAddressTracker.getMasterAddress(refresh);
2657        if (sn == null) {
2658          if (!keepLooping()) {
2659            // give up with no connection.
2660            LOG.debug("No master found and cluster is stopped; bailing out");
2661            return null;
2662          }
2663          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2664            LOG.debug("No master found; retry");
2665            previousLogTime = System.currentTimeMillis();
2666          }
2667          refresh = true; // let's try pull it from ZK directly
2668          if (sleepInterrupted(200)) {
2669            interrupted = true;
2670          }
2671          continue;
2672        }
2673
2674        // If we are on the active master, use the shortcut
2675        if (this instanceof HMaster && sn.equals(getServerName())) {
2676          intRssStub = ((HMaster)this).getMasterRpcServices();
2677          intLockStub = ((HMaster)this).getMasterRpcServices();
2678          break;
2679        }
2680        try {
2681          BlockingRpcChannel channel =
2682            this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2683              shortOperationTimeout);
2684          intRssStub = RegionServerStatusService.newBlockingStub(channel);
2685          intLockStub = LockService.newBlockingStub(channel);
2686          break;
2687        } catch (IOException e) {
2688          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2689            e = e instanceof RemoteException ?
2690              ((RemoteException)e).unwrapRemoteException() : e;
2691            if (e instanceof ServerNotRunningYetException) {
2692              LOG.info("Master isn't available yet, retrying");
2693            } else {
2694              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2695            }
2696            previousLogTime = System.currentTimeMillis();
2697          }
2698          if (sleepInterrupted(200)) {
2699            interrupted = true;
2700          }
2701        }
2702      }
2703    } finally {
2704      if (interrupted) {
2705        Thread.currentThread().interrupt();
2706      }
2707    }
2708    this.rssStub = intRssStub;
2709    this.lockStub = intLockStub;
2710    return sn;
2711  }
2712
2713  /**
2714   * @return True if we should keep looping; false if the cluster is going down,
2715   * this server has been stopped, or hdfs has gone bad.
2716   */
2717  private boolean keepLooping() {
2718    return !this.stopped && isClusterUp();
2719  }
2720
2721  /*
2722   * Let the master know we're here. Run initialization using parameters passed
2723   * to us by the master.
2724   * @return Key/value configurations we got from the Master, or null if we
2725   * failed to register.
2726   * @throws IOException
2727   */
2728  private RegionServerStartupResponse reportForDuty() throws IOException {
2729    if (this.masterless) return RegionServerStartupResponse.getDefaultInstance();
2730    ServerName masterServerName = createRegionServerStatusStub(true);
2731    RegionServerStatusService.BlockingInterface rss = rssStub;
2732    if (masterServerName == null || rss == null) return null;
2733    RegionServerStartupResponse result = null;
2734    try {
2735      rpcServices.requestCount.reset();
2736      rpcServices.rpcGetRequestCount.reset();
2737      rpcServices.rpcScanRequestCount.reset();
2738      rpcServices.rpcMultiRequestCount.reset();
2739      rpcServices.rpcMutateRequestCount.reset();
2740      LOG.info("reportForDuty to master=" + masterServerName + " with port="
2741        + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2742      long now = EnvironmentEdgeManager.currentTime();
2743      int port = rpcServices.isa.getPort();
2744      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2745      if (!StringUtils.isBlank(useThisHostnameInstead)) {
2746        request.setUseThisHostnameInstead(useThisHostnameInstead);
2747      }
2748      request.setPort(port);
2749      request.setServerStartCode(this.startcode);
2750      request.setServerCurrentTime(now);
2751      result = rss.regionServerStartup(null, request.build());
2752    } catch (ServiceException se) {
2753      IOException ioe = ProtobufUtil.getRemoteException(se);
2754      if (ioe instanceof ClockOutOfSyncException) {
2755        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync",
2756            ioe);
2757        // Re-throw IOE will cause RS to abort
2758        throw ioe;
2759      } else if (ioe instanceof ServerNotRunningYetException) {
2760        LOG.debug("Master is not running yet");
2761      } else {
2762        LOG.warn("error telling master we are up", se);
2763      }
2764      rssStub = null;
2765    }
2766    return result;
2767  }
2768
2769  @Override
2770  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2771    try {
2772      GetLastFlushedSequenceIdRequest req =
2773          RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2774      RegionServerStatusService.BlockingInterface rss = rssStub;
2775      if (rss == null) { // Try to connect one more time
2776        createRegionServerStatusStub();
2777        rss = rssStub;
2778        if (rss == null) {
2779          // Still no luck, we tried
2780          LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2781          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2782              .build();
2783        }
2784      }
2785      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2786      return RegionStoreSequenceIds.newBuilder()
2787          .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2788          .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2789    } catch (ServiceException e) {
2790      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2791      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2792          .build();
2793    }
2794  }
2795
2796  /**
2797   * Close meta region if we carry it
2798   * @param abort Whether we're running an abort.
2799   */
2800  private void closeMetaTableRegions(final boolean abort) {
2801    HRegion meta = null;
2802    this.onlineRegionsLock.writeLock().lock();
2803    try {
2804      for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
2805        RegionInfo hri = e.getValue().getRegionInfo();
2806        if (hri.isMetaRegion()) {
2807          meta = e.getValue();
2808        }
2809        if (meta != null) break;
2810      }
2811    } finally {
2812      this.onlineRegionsLock.writeLock().unlock();
2813    }
2814    if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2815  }
2816
2817  /**
2818   * Schedule closes on all user regions.
2819   * Should be safe calling multiple times because it won't close regions
2820   * that are already closed or that are closing.
2821   * @param abort Whether we're running an abort.
2822   */
2823  private void closeUserRegions(final boolean abort) {
2824    this.onlineRegionsLock.writeLock().lock();
2825    try {
2826      for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
2827        HRegion r = e.getValue();
2828        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2829          // Don't update zk with this close transition; pass false.
2830          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2831        }
2832      }
2833    } finally {
2834      this.onlineRegionsLock.writeLock().unlock();
2835    }
2836  }
2837
2838  /** @return the info server */
2839  public InfoServer getInfoServer() {
2840    return infoServer;
2841  }
2842
2843  /**
2844   * @return true if a stop has been requested.
2845   */
2846  @Override
2847  public boolean isStopped() {
2848    return this.stopped;
2849  }
2850
2851  @Override
2852  public boolean isStopping() {
2853    return this.stopping;
2854  }
2855
2856  @Override
2857  public Configuration getConfiguration() {
2858    return conf;
2859  }
2860
2861  protected Map<String, HRegion> getOnlineRegions() {
2862    return this.onlineRegions;
2863  }
2864
2865  public int getNumberOfOnlineRegions() {
2866    return this.onlineRegions.size();
2867  }
2868
2869  /**
2870   * For tests, web ui and metrics.
2871   * This method will only work if HRegionServer is in the same JVM as the client;
2872   * HRegion cannot be serialized to cross an rpc.
2873   */
2874  public Collection<HRegion> getOnlineRegionsLocalContext() {
2875    Collection<HRegion> regions = this.onlineRegions.values();
2876    return Collections.unmodifiableCollection(regions);
2877  }
2878
2879  @Override
2880  public void addRegion(HRegion region) {
2881    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2882    configurationManager.registerObserver(region);
2883  }
2884
2885  /**
2886   * @return A new Map of online regions sorted by region off-heap size with the first entry being
2887   *   the biggest.  If two regions are the same size, then the last one found wins; i.e. this
2888   *   method may NOT return all regions.
2889   */
2890  SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2891    // we'll sort the regions in reverse
2892    SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2893    // Copy over all regions. Regions are sorted by size with biggest first.
2894    for (HRegion region : this.onlineRegions.values()) {
2895      sortedRegions.put(region.getMemStoreOffHeapSize(), region);
2896    }
2897    return sortedRegions;
2898  }
2899
2900  /**
2901   * @return A new Map of online regions sorted by region heap size with the first entry being the
2902   *   biggest.  If two regions are the same size, then the last one found wins; i.e. this method
2903   *   may NOT return all regions.
2904   */
2905  SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2906    // we'll sort the regions in reverse
2907    SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2908    // Copy over all regions. Regions are sorted by size with biggest first.
2909    for (HRegion region : this.onlineRegions.values()) {
2910      sortedRegions.put(region.getMemStoreHeapSize(), region);
2911    }
2912    return sortedRegions;
2913  }
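
  // A minimal usage sketch (illustrative only): pick the region with the largest on-heap
  // memstore, e.g. as a flush candidate. Because the TreeMap is keyed by size, regions with
  // identical sizes collapse into a single entry, as noted in the javadoc above.
  //
  //   SortedMap<Long, HRegion> bySize = getCopyOfOnlineRegionsSortedByOnHeapSize();
  //   HRegion biggest = bySize.isEmpty() ? null : bySize.get(bySize.firstKey());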
2914
2915  /**
2916   * @return time stamp in millis of when this region server was started
2917   */
2918  public long getStartcode() {
2919    return this.startcode;
2920  }
2921
2922  /** @return reference to FlushRequester */
2923  @Override
2924  public FlushRequester getFlushRequester() {
2925    return this.cacheFlusher;
2926  }
2927
2928  @Override
2929  public CompactionRequester getCompactionRequestor() {
2930    return this.compactSplitThread;
2931  }
2932
2933  @Override
2934  public LeaseManager getLeaseManager() {
2935    return leaseManager;
2936  }
2937
2938  /**
2939   * @return Return the rootDir.
2940   */
2941  protected Path getDataRootDir() {
2942    return dataRootDir;
2943  }
2944
2945  @Override
2946  public FileSystem getFileSystem() {
2947    return dataFs;
2948  }
2949
2950  /**
2951   * @return {@code true} when the data file system is available, {@code false} otherwise.
2952   */
2953  boolean isDataFileSystemOk() {
2954    return this.dataFsOk;
2955  }
2956
2957  /**
2958   * @return Return the walRootDir.
2959   */
2960  public Path getWALRootDir() {
2961    return walRootDir;
2962  }
2963
2964  /**
2965   * @return Return the walFs.
2966   */
2967  public FileSystem getWALFileSystem() {
2968    return walFs;
2969  }
2970
2971  @Override
2972  public String toString() {
2973    return getServerName().toString();
2974  }
2975
2976  @Override
2977  public ZKWatcher getZooKeeper() {
2978    return zooKeeper;
2979  }
2980
2981  @Override
2982  public CoordinatedStateManager getCoordinatedStateManager() {
2983    return csm;
2984  }
2985
2986  @Override
2987  public ServerName getServerName() {
2988    return serverName;
2989  }
2990
2991  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
2992    return this.rsHost;
2993  }
2994
2995  @Override
2996  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2997    return this.regionsInTransitionInRS;
2998  }
2999
3000  @Override
3001  public ExecutorService getExecutorService() {
3002    return executorService;
3003  }
3004
3005  @Override
3006  public ChoreService getChoreService() {
3007    return choreService;
3008  }
3009
3010  @Override
3011  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
3012    return rsQuotaManager;
3013  }
3014
3015  //
3016  // Main program and support routines
3017  //
3018  /**
3019   * Load the replication service objects, if any.
3020   */
3021  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
3022      FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException {
3023    if ((server instanceof HMaster) &&
3024      (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) {
3025      return;
3026    }
3027
3028    // read in the name of the source replication class from the config file.
3029    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
3030      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
3031
3032    // read in the name of the sink replication class from the config file.
3033    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
3034      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
3035
3036    // If both the sink and the source class names are the same, then instantiate
3037    // only one object.
3038    if (sourceClassname.equals(sinkClassname)) {
3039      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
3040        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
3041      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
3042    } else {
3043      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
3044        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
3045      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
3046        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
3047    }
3048  }
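
  // A configuration sketch (illustrative; the class names are hypothetical). Both keys default
  // to HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT, and when the two resolved names are
  // equal a single instance serves as both source and sink, as in the method above.
  //
  //   conf.set(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
  //     "org.example.replication.MyReplicationSourceService");
  //   conf.set(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
  //     "org.example.replication.MyReplicationSinkService");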
3049
3050  private static <T extends ReplicationService> T newReplicationInstance(String classname,
3051      Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
3052      Path oldLogDir, WALProvider walProvider) throws IOException {
3053    final Class<? extends T> clazz;
3054    try {
3055      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
3056      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
3057    } catch (java.lang.ClassNotFoundException nfe) {
3058      throw new IOException("Could not find class for " + classname);
3059    }
3060    T service = ReflectionUtils.newInstance(clazz, conf);
3061    service.initialize(server, walFs, logDir, oldLogDir, walProvider);
3062    return service;
3063  }
3064
3065  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
3066    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
3067    if (!this.isOnline()) {
3068      return walGroupsReplicationStatus;
3069    }
3070    List<ReplicationSourceInterface> allSources = new ArrayList<>();
3071    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
3072    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
3073    for (ReplicationSourceInterface source : allSources) {
3074      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
3075    }
3076    return walGroupsReplicationStatus;
3077  }
3078
3079  /**
3080   * Utility for constructing an instance of the passed HRegionServer class.
3081   */
3082  static HRegionServer constructRegionServer(
3083      final Class<? extends HRegionServer> regionServerClass,
3084      final Configuration conf
3085  ) {
3086    try {
3087      Constructor<? extends HRegionServer> c =
3088          regionServerClass.getConstructor(Configuration.class);
3089      return c.newInstance(conf);
3090    } catch (Exception e) {
3091      throw new RuntimeException("Failed construction of " + "Regionserver: "
3092          + regionServerClass.toString(), e);
3093    }
3094  }
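
  // A minimal sketch (illustrative; the subclass is hypothetical) of substituting a custom
  // region server implementation. main() below resolves the class via
  // HConstants.REGION_SERVER_IMPL and hands it to constructRegionServer, so the subclass must
  // expose a public constructor taking a single Configuration argument.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setClass(HConstants.REGION_SERVER_IMPL, MyRegionServer.class, HRegionServer.class);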
3095
3096  /**
3097   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
3098   */
3099  public static void main(String[] args) {
3100    LOG.info("STARTING service " + HRegionServer.class.getSimpleName());
3101    VersionInfo.logVersion();
3102    Configuration conf = HBaseConfiguration.create();
3103    @SuppressWarnings("unchecked")
3104    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
3105        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
3106
3107    new HRegionServerCommandLine(regionServerClass).doMain(args);
3108  }
3109
3110  /**
3111   * Gets the online regions of the specified table.
3112   * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
3113   * Only returns <em>online</em> regions.  If a region of this table has been
3114   * closed during a disable, etc., it will not be included in the returned list.
3115   * So, the returned list may not necessarily be ALL the regions in this table; it is
3116   * all the ONLINE regions in the table.
3117   * @param tableName table to limit the scope of the query
3118   * @return Online regions from <code>tableName</code>
3119   */
3120  @Override
3121  public List<HRegion> getRegions(TableName tableName) {
3122    List<HRegion> tableRegions = new ArrayList<>();
3123    synchronized (this.onlineRegions) {
3124      for (HRegion region: this.onlineRegions.values()) {
3125        RegionInfo regionInfo = region.getRegionInfo();
3126        if (regionInfo.getTable().equals(tableName)) {
3127          tableRegions.add(region);
3128        }
3129      }
3130    }
3131    return tableRegions;
3132  }
3133
3134  @Override
3135  public List<HRegion> getRegions() {
3136    List<HRegion> allRegions;
3137    synchronized (this.onlineRegions) {
3138      // Return a clone copy of the onlineRegions
3139      allRegions = new ArrayList<>(onlineRegions.values());
3140    }
3141    return allRegions;
3142  }
3143
3144  /**
3145   * Gets the online tables in this RS.
3146   * This method looks at the in-memory onlineRegions.
3147   * @return all the online tables in this RS
3148   */
3149  public Set<TableName> getOnlineTables() {
3150    Set<TableName> tables = new HashSet<>();
3151    synchronized (this.onlineRegions) {
3152      for (Region region: this.onlineRegions.values()) {
3153        tables.add(region.getTableDescriptor().getTableName());
3154      }
3155    }
3156    return tables;
3157  }
3158
3159  public String[] getRegionServerCoprocessors() {
3160    TreeSet<String> coprocessors = new TreeSet<>();
3161    try {
3162      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
3163    } catch (IOException exception) {
3164      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
3165          "skipping.");
3166      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3167    }
3168    Collection<HRegion> regions = getOnlineRegionsLocalContext();
3169    for (HRegion region: regions) {
3170      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
3171      try {
3172        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
3173      } catch (IOException exception) {
3174        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
3175            "; skipping.");
3176        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3177      }
3178    }
3179    coprocessors.addAll(rsHost.getCoprocessors());
3180    return coprocessors.toArray(new String[0]);
3181  }
3182
3183  /**
3184   * Try to close the region; logs a warning on failure but continues.
3185   * @param region Region to close
3186   */
3187  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
3188    try {
3189      if (!closeRegion(region.getEncodedName(), abort, null)) {
3190        LOG.warn("Failed to close " + region.getRegionNameAsString() +
3191            " - ignoring and continuing");
3192      }
3193    } catch (IOException e) {
3194      LOG.warn("Failed to close " + region.getRegionNameAsString() +
3195          " - ignoring and continuing", e);
3196    }
3197  }
3198
3199  /**
3200   * Asynchronously close a region; can be called from the master or internally by the regionserver
3201   * when stopping. If called from the master, the region will update the status.
3202   *
3203   * <p>
3204   * If an opening was in progress, this method will cancel it, but will not start a new close. The
3205   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
3206   * </p>
3207   *
3208   * <p>
3209   *   If a close was in progress, this new request will be ignored, and an exception thrown.
3210   * </p>
3211   *
3212   * @param encodedName Region to close
3213   * @param abort True if we are aborting
3214   * @param destination Where the Region is being moved to... may be null if unknown.
3215   * @return True if we closed a region.
3216   * @throws NotServingRegionException if the region is not online
3217   */
3218  protected boolean closeRegion(String encodedName, final boolean abort,
3219        final ServerName destination)
3220      throws NotServingRegionException {
3221    //Check for permissions to close.
3222    HRegion actualRegion = this.getRegion(encodedName);
3223    // Can be null if we're calling close on a region that's not online
3224    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
3225      try {
3226        actualRegion.getCoprocessorHost().preClose(false);
3227      } catch (IOException exp) {
3228        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
3229        return false;
3230      }
3231    }
3232
3233    // previous can come back 'null' if not in map.
3234    final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName),
3235        Boolean.FALSE);
3236
3237    if (Boolean.TRUE.equals(previous)) {
3238      LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " +
3239          "trying to OPEN. Cancelling OPENING.");
3240      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
3241        // The replace failed. That should be an exceptional case, but theoretically it can happen.
3242        // We're going to try to do a standard close then.
3243        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
3244            " Doing a standard close now");
3245        return closeRegion(encodedName, abort, destination);
3246      }
3247      // Let's get the region from the online region list again
3248      actualRegion = this.getRegion(encodedName);
3249      if (actualRegion == null) { // If already online, we still need to close it.
3250        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3251        // The master deletes the znode when it receives this exception.
3252        throw new NotServingRegionException("The region " + encodedName +
3253          " was opening but not yet served. Opening is cancelled.");
3254      }
3255    } else if (previous == null) {
3256      LOG.info("Received CLOSE for {}", encodedName);
3257    } else if (Boolean.FALSE.equals(previous)) {
3258      LOG.info("Received CLOSE for the region: " + encodedName +
3259        ", which we are already trying to CLOSE, but not completed yet");
3260      return true;
3261    }
3262
3263    if (actualRegion == null) {
3264      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3265      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
3266      // The master deletes the znode when it receives this exception.
3267      throw new NotServingRegionException("The region " + encodedName +
3268          " is not online, and is not opening.");
3269    }
3270
3271    CloseRegionHandler crh;
3272    final RegionInfo hri = actualRegion.getRegionInfo();
3273    if (hri.isMetaRegion()) {
3274      crh = new CloseMetaHandler(this, this, hri, abort);
3275    } else {
3276      crh = new CloseRegionHandler(this, this, hri, abort, destination);
3277    }
3278    this.executorService.submit(crh);
3279    return true;
3280  }
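
  // An illustrative summary (a sketch, not an exhaustive state machine) of how closeRegion
  // interprets regionsInTransitionInRS for a given encoded region name:
  //
  //   Boolean state = regionsInTransitionInRS.get(Bytes.toBytes(encodedName));
  //   // state == null          -> nothing in flight; the CLOSE is recorded and scheduled
  //   // state == Boolean.TRUE  -> an OPEN is in flight; it is cancelled, or a standard close
  //   //                           is attempted if the open already completed
  //   // state == Boolean.FALSE -> a CLOSE is already in flight; the new request is a no-op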
3281
3282  /**
3283   * @return HRegion for the passed binary <code>regionName</code> or null if the
3284   *         named region is not a member of the online regions.
3285   */
3286  public HRegion getOnlineRegion(final byte[] regionName) {
3287    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3288    return this.onlineRegions.get(encodedRegionName);
3289  }
3290
3291  @Override
3292  public HRegion getRegion(final String encodedRegionName) {
3293    return this.onlineRegions.get(encodedRegionName);
3294  }
3295
3297  @Override
3298  public boolean removeRegion(final HRegion r, ServerName destination) {
3299    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3300    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
3301    if (destination != null) {
3302      long closeSeqNum = r.getMaxFlushedSeqId();
3303      if (closeSeqNum == HConstants.NO_SEQNUM) {
3304        // No edits in WAL for this region; get the sequence number when the region was opened.
3305        closeSeqNum = r.getOpenSeqNum();
3306        if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
3307      }
3308      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
3309      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
3310      if (selfMove) {
3311        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(r.getRegionInfo().getEncodedName()
3312          , new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
3313      }
3314    }
3315    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3316    return toReturn != null;
3317  }
3318
3319  /**
3320   * Protected utility method for safely obtaining an HRegion handle.
3321   *
3322   * @param regionName Name of online {@link HRegion} to return
3323   * @return {@link HRegion} for <code>regionName</code>
3324   */
3325  protected HRegion getRegion(final byte[] regionName)
3326      throws NotServingRegionException {
3327    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3328    return getRegionByEncodedName(regionName, encodedRegionName);
3329  }
3330
3331  public HRegion getRegionByEncodedName(String encodedRegionName)
3332      throws NotServingRegionException {
3333    return getRegionByEncodedName(null, encodedRegionName);
3334  }
3335
3336  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3337    throws NotServingRegionException {
3338    HRegion region = this.onlineRegions.get(encodedRegionName);
3339    if (region == null) {
3340      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3341      if (moveInfo != null) {
3342        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3343      }
3344      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3345      String regionNameStr = regionName == null ?
3346        encodedRegionName : Bytes.toStringBinary(regionName);
3347      if (isOpening != null && isOpening) {
3348        throw new RegionOpeningException("Region " + regionNameStr +
3349          " is opening on " + this.serverName);
3350      }
3351      throw new NotServingRegionException(regionNameStr +
3352        " is not online on " + this.serverName);
3353    }
3354    return region;
3355  }
3356
3357  /**
3358   * Cleanup after a Throwable caught while invoking a method: logs it (unwrapping any
3359   * RemoteException), checks for OOME and verifies that the file system is still available.
3360   *
3361   * @param t Throwable
3362   * @param msg Message to log in error. Can be null.
3363   * @return The passed <code>t</code>, unchanged; use convertThrowableToIOE to turn it into an IOE.
3364   */
3365  private Throwable cleanup(final Throwable t, final String msg) {
3366    // Don't log as error if NSRE; NSRE is 'normal' operation.
3367    if (t instanceof NotServingRegionException) {
3368      LOG.debug("NotServingRegionException; " + t.getMessage());
3369      return t;
3370    }
3371    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3372    if (msg == null) {
3373      LOG.error("", e);
3374    } else {
3375      LOG.error(msg, e);
3376    }
3377    if (!rpcServices.checkOOME(t)) {
3378      checkFileSystem();
3379    }
3380    return t;
3381  }
3382
3383  /**
3384   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
3385   * @return Make <code>t</code> an IOE if it isn't already.
3386   */
3387  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3388    return (t instanceof IOException ? (IOException) t : msg == null
3389        || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
3390  }
3391
3392  /**
3393   * Checks to see if the file system is still accessible. If not, sets
3394   * abortRequested and stopRequested
3395   *
3396   * @return false if file system is not available
3397   */
3398  boolean checkFileSystem() {
3399    if (this.dataFsOk && this.dataFs != null) {
3400      try {
3401        FSUtils.checkFileSystemAvailable(this.dataFs);
3402      } catch (IOException e) {
3403        abort("File System not available", e);
3404        this.dataFsOk = false;
3405      }
3406    }
3407    return this.dataFsOk;
3408  }
3409
3410  @Override
3411  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3412      List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3413    InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
3414    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3415    // it is a map of region name to InetSocketAddress[]
3416    for (int i = 0; i < favoredNodes.size(); i++) {
3417      addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
3418          favoredNodes.get(i).getPort());
3419    }
3420    regionFavoredNodesMap.put(encodedRegionName, addr);
3421  }
3422
3423  /**
3424   * Return the favored nodes for a region given its encoded name. Look at the
3425   * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
3426   *
3427   * @return array of favored locations
3428   */
3429  @Override
3430  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3431    return regionFavoredNodesMap.get(encodedRegionName);
3432  }
3433
3434  @Override
3435  public ServerNonceManager getNonceManager() {
3436    return this.nonceManager;
3437  }
3438
3439  private static class MovedRegionInfo {
3440    private final ServerName serverName;
3441    private final long seqNum;
3442    private final long moveTime;
3443
3444    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3445      this.serverName = serverName;
3446      this.seqNum = closeSeqNum;
3447      this.moveTime = EnvironmentEdgeManager.currentTime();
3448     }
3449
3450    public ServerName getServerName() {
3451      return serverName;
3452    }
3453
3454    public long getSeqNum() {
3455      return seqNum;
3456    }
3457
3458    long getMoveTime() {
3459      return moveTime;
3460    }
3461  }
3462
3463  /**
3464   * This map contains all the regions that we closed for a move.
3465   * We record the time of the move so that we do not keep stale information around.
3466   */
3467  private Map<String, MovedRegionInfo> movedRegions = new ConcurrentHashMap<>(3000);
3468
3469  /**
3470   * We need a timeout. If not, there is a risk of giving out wrong information: this would double
3471   * the number of network calls instead of reducing them.
3472   */
3473  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3474
3475  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum, boolean selfMove) {
3476    if (selfMove) {
3477      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3478      return;
3479    }
3480    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" +
3481        closeSeqNum);
3482    movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3483  }
3484
3485  void removeFromMovedRegions(String encodedName) {
3486    movedRegions.remove(encodedName);
3487  }
3488
3489  private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
3490    MovedRegionInfo dest = movedRegions.get(encodedRegionName);
3491
3492    long now = EnvironmentEdgeManager.currentTime();
3493    if (dest != null) {
3494      if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
3495        return dest;
3496      } else {
3497        movedRegions.remove(encodedRegionName);
3498      }
3499    }
3500
3501    return null;
3502  }
3503
3504  /**
3505   * Remove the expired entries from the moved regions list.
3506   */
3507  protected void cleanMovedRegions() {
3508    final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
3509
3510    movedRegions.entrySet().removeIf(e -> e.getValue().getMoveTime() < cutOff);
3511  }
3512
3513  /*
3514   * Use this to allow tests to override and schedule more frequently.
3515   */
3517  protected int movedRegionCleanerPeriod() {
3518    return TIMEOUT_REGION_MOVED;
3519  }
3520
3521  /**
3522   * A chore that periodically cleans the moved region cache.
3523   */
3524  protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
3525    private HRegionServer regionServer;
3526    Stoppable stoppable;
3527
3528    private MovedRegionsCleaner(
3529      HRegionServer regionServer, Stoppable stoppable) {
3530      super("MovedRegionsCleaner for region " + regionServer, stoppable,
3531          regionServer.movedRegionCleanerPeriod());
3532      this.regionServer = regionServer;
3533      this.stoppable = stoppable;
3534    }
3535
3536    static MovedRegionsCleaner create(HRegionServer rs){
3537      Stoppable stoppable = new Stoppable() {
3538        private volatile boolean isStopped = false;
3539        @Override public void stop(String why) { isStopped = true;}
3540        @Override public boolean isStopped() {return isStopped;}
3541      };
3542
3543      return new MovedRegionsCleaner(rs, stoppable);
3544    }
3545
3546    @Override
3547    protected void chore() {
3548      regionServer.cleanMovedRegions();
3549    }
3550
3551    @Override
3552    public void stop(String why) {
3553      stoppable.stop(why);
3554    }
3555
3556    @Override
3557    public boolean isStopped() {
3558      return stoppable.isStopped();
3559    }
3560  }
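
  // A minimal usage sketch (illustrative only): the cleaner is a ScheduledChore, so it is
  // expected to be created via its factory method and handed to the ChoreService, e.g.
  //
  //   MovedRegionsCleaner cleaner = MovedRegionsCleaner.create(this);
  //   getChoreService().scheduleChore(cleaner);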
3561
3562  private String getMyEphemeralNodePath() {
3563    return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode, getServerName().toString());
3564  }
3565
3566  private boolean isHealthCheckerConfigured() {
3567    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3568    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
3569  }
3570
3571  /**
3572   * @return the underlying {@link CompactSplit} for this server
3573   */
3574  public CompactSplit getCompactSplitThread() {
3575    return this.compactSplitThread;
3576  }
3577
3578  CoprocessorServiceResponse execRegionServerService(
3579      @SuppressWarnings("UnusedParameters") final RpcController controller,
3580      final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3581    try {
3582      ServerRpcController serviceController = new ServerRpcController();
3583      CoprocessorServiceCall call = serviceRequest.getCall();
3584      String serviceName = call.getServiceName();
3585      com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName);
3586      if (service == null) {
3587        throw new UnknownProtocolException(null, "No registered coprocessor service found for " +
3588            serviceName);
3589      }
3590      com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
3591          service.getDescriptorForType();
3592
3593      String methodName = call.getMethodName();
3594      com.google.protobuf.Descriptors.MethodDescriptor methodDesc =
3595          serviceDesc.findMethodByName(methodName);
3596      if (methodDesc == null) {
3597        throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName +
3598            " called on service " + serviceName);
3599      }
3600
3601      com.google.protobuf.Message request =
3602          CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3603      final com.google.protobuf.Message.Builder responseBuilder =
3604          service.getResponsePrototype(methodDesc).newBuilderForType();
3605      service.callMethod(methodDesc, serviceController, request, message -> {
3606        if (message != null) {
3607          responseBuilder.mergeFrom(message);
3608        }
3609      });
3610      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3611      if (exception != null) {
3612        throw exception;
3613      }
3614      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3615    } catch (IOException ie) {
3616      throw new ServiceException(ie);
3617    }
3618  }
3619
3620  /**
3621   * May be null if this is a master that does not carry tables.
3622   *
3623   * @return The block cache instance used by the regionserver.
3624   */
3625  @Override
3626  public Optional<BlockCache> getBlockCache() {
3627    return Optional.ofNullable(this.blockCache);
3628  }
3629
3630  /**
3631   * May be null if this is a master that does not carry tables.
3632   *
3633   * @return The cache for mob files used by the regionserver.
3634   */
3635  @Override
3636  public Optional<MobFileCache> getMobFileCache() {
3637    return Optional.ofNullable(this.mobFileCache);
3638  }
3639
3640  @Override
3641  public AccessChecker getAccessChecker() {
3642    return rpcServices.getAccessChecker();
3643  }
3644
3645  @Override
3646  public ZKPermissionWatcher getZKPermissionWatcher() {
3647    return rpcServices.getZkPermissionWatcher();
3648  }
3649
3650  /**
3651   * @return The ConfigurationManager object, for testing purposes.
3652   */
3653  @VisibleForTesting
3654  ConfigurationManager getConfigurationManager() {
3655    return configurationManager;
3656  }
3657
3658  /**
3659   * @return Return table descriptors implementation.
3660   */
3661  @Override
3662  public TableDescriptors getTableDescriptors() {
3663    return this.tableDescriptors;
3664  }
3665
3666  /**
3667   * Reload the configuration from disk.
3668   */
3669  void updateConfiguration() {
3670    LOG.info("Reloading the configuration from disk.");
3671    // Reload the configuration from disk.
3672    conf.reloadConfiguration();
3673    configurationManager.notifyAllObservers(conf);
3674  }
3675
3676  CacheEvictionStats clearRegionBlockCache(Region region) {
3677    long evictedBlocks = 0;
3678
3679    for(Store store : region.getStores()) {
3680      for(StoreFile hFile : store.getStorefiles()) {
3681        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3682      }
3683    }
3684
3685    return CacheEvictionStats.builder()
3686        .withEvictedBlocks(evictedBlocks)
3687        .build();
3688  }
3689
3690  @Override
3691  public double getCompactionPressure() {
3692    double max = 0;
3693    for (Region region : onlineRegions.values()) {
3694      for (Store store : region.getStores()) {
3695        double normCount = store.getCompactionPressure();
3696        if (normCount > max) {
3697          max = normCount;
3698        }
3699      }
3700    }
3701    return max;
3702  }
3703
3704  @Override
3705  public HeapMemoryManager getHeapMemoryManager() {
3706    return hMemManager;
3707  }
3708
3709  MemStoreFlusher getMemStoreFlusher() {
3710    return cacheFlusher;
3711  }
3712
3713  /**
3714   * For testing.
3715   * @return whether all wal roll requests have finished for this regionserver
3716   */
3717  @VisibleForTesting
3718  public boolean walRollRequestFinished() {
3719    return this.walRoller.walRollFinished();
3720  }
3721
3722  @Override
3723  public ThroughputController getFlushThroughputController() {
3724    return flushThroughputController;
3725  }
3726
3727  @Override
3728  public double getFlushPressure() {
3729    if (getRegionServerAccounting() == null || cacheFlusher == null) {
3730      // return 0 during RS initialization
3731      return 0.0;
3732    }
3733    return getRegionServerAccounting().getFlushPressure();
3734  }
3735
3736  @Override
3737  public void onConfigurationChange(Configuration newConf) {
3738    ThroughputController old = this.flushThroughputController;
3739    if (old != null) {
3740      old.stop("configuration change");
3741    }
3742    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3743    try {
3744      Superusers.initialize(newConf);
3745    } catch (IOException e) {
3746      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3747    }
3748  }
3749
3750  @Override
3751  public MetricsRegionServer getMetrics() {
3752    return metricsRegionServer;
3753  }
3754
3755  @Override
3756  public SecureBulkLoadManager getSecureBulkLoadManager() {
3757    return this.secureBulkLoadManager;
3758  }
3759
3760  @Override
3761  public EntityLock regionLock(final List<RegionInfo> regionInfos, final String description,
3762      final Abortable abort) {
3763    return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
3764      .regionLock(regionInfos, description, abort);
3765  }
3766
3767  @Override
3768  public void unassign(byte[] regionName) throws IOException {
3769    clusterConnection.getAdmin().unassign(regionName, false);
3770  }
3771
3772  @Override
3773  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3774    return this.rsSpaceQuotaManager;
3775  }
3776
3777  @Override
3778  public boolean reportFileArchivalForQuotas(TableName tableName,
3779      Collection<Entry<String, Long>> archivedFiles) {
3780    RegionServerStatusService.BlockingInterface rss = rssStub;
3781    if (rss == null || rsSpaceQuotaManager == null) {
3782      // the current server could be stopping.
3783      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3784      return false;
3785    }
3786    try {
3787      RegionServerStatusProtos.FileArchiveNotificationRequest request =
3788          rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3789      rss.reportFileArchival(null, request);
3790    } catch (ServiceException se) {
3791      IOException ioe = ProtobufUtil.getRemoteException(se);
3792      if (ioe instanceof PleaseHoldException) {
3793        if (LOG.isTraceEnabled()) {
3794          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3795              + " This will be retried.", ioe);
3796        }
3797        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3798        return false;
3799      }
3800      if (rssStub == rss) {
3801        rssStub = null;
3802      }
3803      // re-create the stub if we failed to report the archival
3804      createRegionServerStatusStub(true);
3805      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3806      return false;
3807    }
3808    return true;
3809  }
3810
3811  public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
3812    return eventLoopGroupConfig;
3813  }
3814
3815  @Override
3816  public Connection createConnection(Configuration conf) throws IOException {
3817    User user = UserProvider.instantiate(conf).getCurrent();
3818    return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName,
3819        this.rpcServices, this.rpcServices);
3820  }
3821
3822  void executeProcedure(long procId, RSProcedureCallable callable) {
3823    executorService.submit(new RSProcedureHandler(this, procId, callable));
3824  }
3825
3826  public void remoteProcedureComplete(long procId, Throwable error) {
3827    procedureResultReporter.complete(procId, error);
3828  }
3829
3830  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3831    RegionServerStatusService.BlockingInterface rss;
3832    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3833    for (;;) {
3834      rss = rssStub;
3835      if (rss != null) {
3836        break;
3837      }
3838      createRegionServerStatusStub();
3839    }
3840    try {
3841      rss.reportProcedureDone(null, request);
3842    } catch (ServiceException se) {
3843      if (rssStub == rss) {
3844        rssStub = null;
3845      }
3846      throw ProtobufUtil.getRemoteException(se);
3847    }
3848  }
3849
3850  /**
3851   * Will ignore the open/close region procedures which were already submitted or executed.
3852   *
3853   * When the master had an unfinished open/close region procedure and restarted, the new active
3854   * master may send a duplicate open/close region request to the regionserver. The open/close
3855   * request is submitted to a thread pool and executed, so we first need a cache for the
3856   * submitted open/close region procedures.
3857   *
3858   * After the open/close region request has executed and the region transition has been reported
3859   * successfully, cache it in the executed region procedures cache; see
3860   * {@link #finishRegionProcedure(long)}. Once the transition has been reported, the master will
3861   * not send the open/close region request again. We assume an ongoing duplicate open/close
3862   * request will not be delayed by more than 600 seconds, so the executed region procedures
3863   * cache expires after 600 seconds. See HBASE-22404 for more details.
3864   *
3865   * @param procId the id of the open/close region procedure
3866   * @return true if the procedure can be submitted.
3867   */
3868  boolean submitRegionProcedure(long procId) {
3869    if (procId == -1) {
3870      return true;
3871    }
3872    // Ignore the region procedures which already submitted.
3873    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3874    if (previous != null) {
3875      LOG.warn("Received procedure pid={}, which was already submitted, just ignore it", procId);
3876      return false;
3877    }
3878    // Ignore the region procedures which already executed.
3879    if (executedRegionProcedures.getIfPresent(procId) != null) {
3880      LOG.warn("Received procedure pid={}, which was already executed, just ignore it", procId);
3881      return false;
3882    }
3883    return true;
3884  }
3885
3886  /**
3887   * See {@link #submitRegionProcedure(long)}.
3888   * @param procId the id of the open/close region procedure
3889   */
3890  public void finishRegionProcedure(long procId) {
3891    executedRegionProcedures.put(procId, procId);
3892    submittedRegionProcedures.remove(procId);
3893  }
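
  // A sketch (illustrative only) of the intended pairing: a region procedure is executed only
  // when submitRegionProcedure admits it, and is recorded as executed once its transition has
  // been reported successfully.
  //
  //   if (submitRegionProcedure(procId)) {
  //     // ... open or close the region and report the transition to the master ...
  //     finishRegionProcedure(procId);
  //   }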
3894
3895  public boolean isShutDown() {
3896    return shutDown;
3897  }
3898
3899  /**
3900   * Forcibly terminate the region server when the abort times out.
3901   */
3902  private static class SystemExitWhenAbortTimeout extends TimerTask {
3903
3904    public SystemExitWhenAbortTimeout() {
3905    }
3906
3907    @Override
3908    public void run() {
3909      LOG.warn("Aborting region server timed out; terminating it forcibly" +
3910          " without waiting for any running shutdown hooks or finalizers to finish their work." +
3911          " Thread dump to stdout.");
3912      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3913      Runtime.getRuntime().halt(1);
3914    }
3915  }
3916
3917  @VisibleForTesting
3918  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3919    return compactedFileDischarger;
3920  }
3921}