/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.util.DNS.RS_HOSTNAME_KEY;

import java.io.IOException;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExecutorStatusChore;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterRpcServicesVersionWrapper;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionSize;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Signal;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
 * the HMaster. There are many HRegionServers in a single HBase deployment.
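 * <p>
 * A minimal sketch (in the style of the tests) of constructing and starting a region
 * server embedded in a JVM; it assumes a reachable ZooKeeper quorum and an active
 * Master behind the supplied configuration:
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * HRegionServer rs = new HRegionServer(conf); // the constructor only wires things up
 * rs.start(); // run() registers with the Master and enters the main loop
 * }</pre>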
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation"})
public class HRegionServer extends Thread implements
    RegionServerServices, LastSequenceId, ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

  /**
   * For testing only! Set to true to skip notifying the master of region assignment.
   */
  @VisibleForTesting
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  /**
   * A map from RegionName to the action currently in progress for it. The boolean value
   * indicates:
   * true - an open-region action is in progress
   * false - a close-region action is in progress
   */
  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * Used to cache the open/close region procedures which have already been submitted.
   * See {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
  /**
   * Used to cache the open/close region procedures which have already been executed.
   * See {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
      CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

  /**
   * Used to cache the moved-out regions.
   */
  private final Cache<String, MovedRegionInfo> movedRegionInfoCache =
      CacheBuilder.newBuilder().expireAfterWrite(movedRegionCacheExpiredTime(),
        TimeUnit.MILLISECONDS).build();

  private MemStoreFlusher cacheFlusher;

  private HeapMemoryManager hMemManager;

  /**
   * The asynchronous cluster connection to be shared by services.
   */
  protected AsyncClusterConnection asyncClusterConnection;

  /**
   * Go here to get table descriptors.
   */
  protected TableDescriptors tableDescriptors;

  // Replication services. If no replication, this handler will be null.
  private ReplicationSourceService replicationSourceHandler;
  private ReplicationSinkService replicationSinkHandler;
  private boolean sameReplicationSourceAndSink;

  // Compactions
  public CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the
   * encoded region name.  All access should be synchronized.
   */
  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
  /**
   * Lock for gating access to {@link #onlineRegions}.
   * TODO: If this map is gated by a lock, does it need to be a ConcurrentHashMap?
   */
  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on.
   * We store the value as InetSocketAddress since this is used only in the HDFS
   * API (create() that takes favored nodes as hints for placing file blocks).
   * We could have used ServerName here as the value class, but we'd need to
   * convert it to InetSocketAddress at some point before the HDFS API call, and
   * it seems a bit weird to store ServerName since ServerName refers to RegionServers
   * and here we really mean DataNode locations.
   */
  private final Map<String, InetSocketAddress[]> regionFavoredNodesMap = new ConcurrentHashMap<>();

  private LeaseManager leaseManager;

  // Instance of the hbase executor executorService.
  protected ExecutorService executorService;

  private volatile boolean dataFsOk;
  private HFileSystem dataFs;
  private HFileSystem walFs;

  // Set when a report to the master comes back with a message asking us to
  // shutdown. Also set by call to stop when debugging or running unit tests
  // of HRegionServer in isolation.
  private volatile boolean stopped = false;

  // Go down hard. Used if file system becomes unavailable and also in
  // debugging and unit tests.
  private AtomicBoolean abortRequested;
  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds, to be safe.
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Task to run when the abort timeout is reached.
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
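  // A hedged example (the value is hypothetical): the abort timeout can be shortened
  // in hbase-site.xml like so:
  //   <property>
  //     <name>hbase.regionserver.abort.timeout</name>
  //     <value>120000</value> <!-- milliseconds -->
  //   </property>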

  // A state before we go into stopped state.  At this stage we're closing user
  // space regions.
  private boolean stopping = false;
  private volatile boolean killed = false;
  private volatile boolean shutDown = false;

  protected final Configuration conf;

  private Path dataRootDir;
  private Path walRootDir;

  private final int threadWakeFrequency;
  final int msgInterval;

  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  private RpcClient rpcClient;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  // Info server. Default access so it can be used by unit tests. REGIONSERVER
  // is the name of the webapp and the attribute name used to stuff this
  // instance into the web context.
  protected InfoServer infoServer;
  private JvmPauseMonitor pauseMonitor;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";


  private MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;
  private SpanReceiverHost spanReceiverHost;

  /**
   * ChoreService used to schedule tasks that we want to run periodically
   */
  private ChoreService choreService;

  /**
   * Check for compaction requests.
   */
  private ScheduledChore compactionChecker;

  /**
   * Check for flushes
   */
  private ScheduledChore periodicFlusher;

  private volatile WALFactory walFactory;

  private LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // zookeeper connection and watcher
  protected final ZKWatcher zooKeeper;

  // master address tracker
  private final MasterAddressTracker masterAddressTracker;

  // Cluster Status Tracker
  protected final ClusterStatusTracker clusterStatusTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  // A sleeper that sleeps for msgInterval.
  protected final Sleeper sleeper;

  private final int shortOperationTimeout;

  private final RegionServerAccounting regionServerAccounting;

  private SlowLogTableOpsChore slowLogTableOpsChore = null;

  // Block cache
  private BlockCache blockCache;
  // The cache for mob files
  private MobFileCache mobFileCache;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The Executor status collect chore. */
  private ExecutorStatusChore executorStatusChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * The server name the Master sees us as. It's made from the hostname the
   * master passes us, port, and server startcode. Gets set after registration
   * against the Master.
   */
  protected ServerName serverName;

  /**
   * Hostname specified by the hbase.regionserver.hostname config, if any.
   */
  protected String useThisHostnameInstead;

  /**
   * HBASE-18226: This config and hbase.regionserver.hostname are mutually exclusive.
   * An exception will be thrown if both are used.
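   * <p>
   * For illustration, this combination (values are hypothetical) fails region server
   * startup with an IOException from {@link #getUseThisHostnameInstead(Configuration)}:
   * <pre>
   * hbase.regionserver.hostname = rs1.example.com
   * hbase.regionserver.hostname.disable.master.reversedns = true
   * </pre>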
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.regionserver.hostname.disable.master.reversedns";

  /**
   * This server's startcode.
   */
  protected final long startcode;

  /**
   * Unique identifier for the cluster we are a part of.
   */
  protected String clusterId;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent
   * in the case where the client doesn't receive the response from a successful operation and
   * retries. We track the successful ops for some time via a nonce sent by the client and handle
   * duplicate operations (currently, by failing them; in the future we might use MVCC to return
   * the result). Nonces are also recovered from the WAL during recovery; however, the caveats
   * (from HBASE-3787) are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
   *   of past records. If we don't read the records, we don't read and recover the nonces.
   *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
   * We could have a separate, additional "nonce WAL". It would just contain a bunch of numbers
   * and wouldn't be flushed on the main path - because the WAL itself also contains nonces, if
   * we only flush it before the memstore flush, for a given nonce we will either see it in the
   * WAL (if it was never flushed to disk, it will be part of recovery), or we'll see it as part
   * of the nonce log (or both occasionally, which doesn't matter). The nonce log file can be
   * deleted after the latest nonce in it expires. It can also be recovered during move.
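   * <p>
   * A rough sketch of how a server-side operation would consult the manager (the method
   * names follow {@link ServerNonceManager}; surrounding error handling is elided):
   * <pre>{@code
   * if (nonceManager.startOperation(nonceGroup, nonce, stoppable)) {
   *   boolean success = false;
   *   try {
   *     // ... apply the increment/append ...
   *     success = true;
   *   } finally {
   *     nonceManager.endOperation(nonceGroup, nonce, success);
   *   }
   * } // else this nonce was already seen: a duplicate retry, so skip/fail it
   * }</pre>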
   */
  final ServerNonceManager nonceManager;

  private UserProvider userProvider;

  protected final RSRpcServices rpcServices;

  private CoordinatedStateManager csm;

  /**
   * Configuration manager is used to register/deregister and notify the configuration observers
   * when the regionserver is notified that there was a change in the on disk configs.
   */
  protected final ConfigurationManager configurationManager;

  @VisibleForTesting
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  private SecureBulkLoadManager secureBulkLoadManager;

  private FileSystemUtilizationChore fsUtilizationChore;

  private final NettyEventLoopGroupConfig eventLoopGroupConfig;

  /**
   * Provide online slow log responses from ringbuffer
   */
  private NamedQueueRecorder namedQueueRecorder = null;

  /**
   * True if this RegionServer is coming up in a cluster where there is no Master;
   * it needs to just come up and make do without a Master to talk to: e.g. in tests, or when
   * HRegionServer is doing something other than its usual duties: e.g. as a hollowed-out host
   * whose only purpose is as a Replication-stream sink; see HBASE-18846 for more.
   * TODO: can this replace {@link #TEST_SKIP_REPORTING_TRANSITION} ?
   */
  private final boolean masterless;
  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /** regionserver codec list **/
  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";

  // A timer to shutdown the process if abort takes too long
  private Timer abortMonitor;

  /**
   * Starts an HRegionServer at the default location.
   * <p/>
   * Don't start any services or managers in here in the Constructor.
   * Defer till after we register with the Master as much as possible. See {@link #startServices}.
   */
  public HRegionServer(final Configuration conf) throws IOException {
    super("RegionServer");  // thread name
    TraceUtil.initTracer(conf);
    try {
      this.startcode = System.currentTimeMillis();
      this.conf = conf;
      this.dataFsOk = true;
      this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
      this.eventLoopGroupConfig = setupNetty(this.conf);
      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      this.userProvider = UserProvider.instantiate(conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
      this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

      this.sleeper = new Sleeper(this.msgInterval, this);

      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
          HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.abortRequested = new AtomicBoolean(false);
      this.stopped = false;

      initNamedQueueRecorder(conf);
      rpcServices = createRpcServices();
      useThisHostnameInstead = getUseThisHostnameInstead(conf);
      String hostName =
          StringUtils.isBlank(useThisHostnameInstead) ? this.rpcServices.isa.getHostName()
              : this.useThisHostnameInstead;
      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);

      // login the zookeeper client principal (if using security)
      ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
          HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
      // login the server principal (if using secure Hadoop)
      login(userProvider, hostName);
      // init superusers and add the server principal (if using security)
      // or process owner as default super user.
      Superusers.initialize(conf);
      regionServerAccounting = new RegionServerAccounting(conf);

      boolean isMasterNotCarryTable =
          this instanceof HMaster && !LoadBalancer.isTablesOnMaster(conf);

      // no need to instantiate block cache and mob file cache when the master doesn't carry tables
      if (!isMasterNotCarryTable) {
        blockCache = BlockCacheFactory.createBlockCache(conf);
        mobFileCache = new MobFileCache(conf);
      }

      uncaughtExceptionHandler =
        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);

      initializeFileSystem();
      spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

      this.configurationManager = new ConfigurationManager();
      setupWindows(getConfiguration(), getConfigurationManager());

      // Some unit tests don't need a cluster, so no zookeeper at all
      // Open connection to zookeeper and set primary watcher
      zooKeeper = new ZKWatcher(conf, getProcessName() + ":" + rpcServices.isa.getPort(), this,
        canCreateBaseZNode());
      // If no master in cluster, skip trying to track one or look for a cluster status.
      if (!this.masterless) {
        if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
          DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
          this.csm = new ZkCoordinatedStateManager(this);
        }

        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        masterAddressTracker.start();

        clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
        clusterStatusTracker.start();
      } else {
        masterAddressTracker = null;
        clusterStatusTracker = null;
      }
      this.rpcServices.start(zooKeeper);
      // This violates 'no starting stuff in Constructor' but Master depends on the below chore
      // and executor being created and takes a different startup route. Lots of overlap between HRS
      // and M (An M IS A HRS now). Need to refactor so there is less duplication between M and its
      // super class HRS. Master expects the Constructor to put up web servers. Ugh. TODO.
      this.choreService = new ChoreService(getName(), true);
      this.executorService = new ExecutorService(getName());
      putUpWebUI();
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      LOG.error("Failed construction RegionServer", t);
      throw t;
    }
  }

  private void initNamedQueueRecorder(Configuration conf) {
    if (!(this instanceof HMaster)) {
      final boolean isOnlineLogProviderEnabled = conf.getBoolean(
        HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
        HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
      if (isOnlineLogProviderEnabled) {
        this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
      }
    } else {
      final boolean isBalancerDecisionRecording = conf
        .getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
          BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
      if (isBalancerDecisionRecording) {
        this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
      }
    }
  }

  // HMaster should override this method to load the specific config for master
  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
    String hostname = conf.get(RS_HOSTNAME_KEY);
    if (conf.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
      if (!StringUtils.isBlank(hostname)) {
        String msg = RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + RS_HOSTNAME_KEY +
          " are mutually exclusive. Do not set " + RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY +
          " to true while " + RS_HOSTNAME_KEY + " is used";
        throw new IOException(msg);
      } else {
        return rpcServices.isa.getHostName();
      }
    } else {
      return hostname;
    }
  }

  /**
   * Install a HUP signal handler so the on-disk configuration can be reloaded at runtime.
   * Skipped when running on Windows, where this style of signal handling is unsupported.
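   * <p>
   * For example, an operator on Linux could trigger a live configuration reload with
   * (the pid placeholder is hypothetical):
   * <pre>
   * kill -HUP &lt;regionserver-pid&gt;
   * </pre>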
   */
  private static void setupWindows(final Configuration conf, ConfigurationManager cm) {
    if (!SystemUtils.IS_OS_WINDOWS) {
      Signal.handle(new Signal("HUP"), signal -> {
        conf.reloadConfiguration();
        cm.notifyAllObservers(conf);
      });
    }
  }

  private static NettyEventLoopGroupConfig setupNetty(Configuration conf) {
    // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL.
    NettyEventLoopGroupConfig nelgc =
      new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
    NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
    return nelgc;
  }

  private void initializeFileSystem() throws IOException {
    // Get the fs instance used by this RS. If HBase checksum verification is enabled,
    // automatically switch off hdfs checksum verification.
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    CommonFSUtils.setFsDefault(this.conf, CommonFSUtils.getWALRootDir(this.conf));
    this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.walRootDir = CommonFSUtils.getWALRootDir(this.conf);
    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
    // underlying hadoop hdfs accessors will be going against wrong filesystem
    // (unless all is set to defaults).
    CommonFSUtils.setFsDefault(this.conf, CommonFSUtils.getRootDir(this.conf));
    this.dataFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.dataRootDir = CommonFSUtils.getRootDir(this.conf);
    this.tableDescriptors = new FSTableDescriptors(this.dataFs, this.dataRootDir,
      !canUpdateTableDescriptor(), cacheTableDescriptor());
  }

  protected void login(UserProvider user, String host) throws IOException {
    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
  }

  /**
   * Wait for an active Master.
   * See the override in the HMaster subclass for how it is used.
   */
  protected void waitForMasterActive() {}

  protected String getProcessName() {
    return REGIONSERVER;
  }

  protected boolean canCreateBaseZNode() {
    return this.masterless;
  }

  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  protected boolean cacheTableDescriptor() {
    return false;
  }

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  protected void configureInfoServer() {
    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }

  @Override
  public boolean registerService(Service instance) {
    // No stacking of instances is allowed for a single executorService name
    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor executorService " + serviceName +
        " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
    }
    return true;
  }

  private Configuration cleanupConfiguration() {
    Configuration conf = this.conf;
    // We use ZKConnectionRegistry for all the internal communication, primarily for these reasons:
    // - Decouples RS and master life cycles. RegionServers can continue to be up independent of
    //   masters' availability.
    // - Configuration management for region servers (cluster internal) is much simpler when adding
    //   new masters or removing existing masters, since only clients' config needs to be updated.
    // - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for
    //   other internal connections too.
    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
    if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      // Use the server ZK cluster for server-issued connections, so we clone
      // the conf and unset the client ZK related properties
      conf = new Configuration(this.conf);
      conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
    return conf;
  }

  /**
   * Run test on configured codecs to make sure supporting libs are in place.
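   * <p>
   * Example (the codec names are illustrative): make the region server fail fast at
   * startup unless the snappy and lz4 codecs can actually be loaded, via hbase-site.xml:
   * <pre>
   * hbase.regionserver.codecs = snappy,lz4
   * </pre>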
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String [] codecs = c.getStrings(REGIONSERVER_CODEC, (String[])null);
    if (codecs == null) return;
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException("Compression codec " + codec +
          " not supported, aborting RS construction");
      }
    }
  }

  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * Setup our cluster connection if not already initialized.
   */
  protected final synchronized void setupClusterConnection() throws IOException {
    if (asyncClusterConnection == null) {
      Configuration conf = cleanupConfiguration();
      InetSocketAddress localAddress = new InetSocketAddress(this.rpcServices.isa.getAddress(), 0);
      User user = userProvider.getCurrent();
      asyncClusterConnection =
        ClusterConnectionFactory.createAsyncClusterConnection(conf, localAddress, user);
    }
  }

  /**
   * All initialization needed before we go register with Master.<br>
   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
   */
  private void preRegistrationInitialization() {
    try {
      initializeZooKeeper();
      setupClusterConnection();
      // Setup RPC client for master communication
      this.rpcClient = asyncClusterConnection.getRpcClient();
    } catch (Throwable t) {
      // Call stop if error or the process will stick around forever since the server
      // puts up non-daemon threads.
      this.rpcServices.stop();
      abort("Initialization of RS failed.  Hence aborting RS.", t);
    }
  }

  /**
   * Bring up the connection to the zk ensemble, wait until a master is available for this
   * cluster, and then wait until the cluster 'up' flag has been set. This is the order in
   * which the master does things.
   * <p>
   * Finally open the long-living server short-circuit connection.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
    justification="cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Nothing to do in here if no Master in the mix.
    if (this.masterless) {
      return;
    }

    // Create the master address tracker, register with zk, and start it.  Then
    // block until a master is available.  No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up.  Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // If we are HMaster then the cluster id should have already been set.
    if (clusterId == null) {
      // Retrieve the clusterId; since the cluster status is now up, the id should
      // already have been set by the HMaster.
      try {
        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
        if (clusterId == null) {
          this.abort("Cluster ID has not been set");
        }
        LOG.info("ClusterId : " + clusterId);
      } catch (KeeperException e) {
        this.abort("Failed to retrieve Cluster ID", e);
      }
    }

    waitForMasterActive();
    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }

  /**
   * Utility method to wait indefinitely on a znode's availability while checking
   * if the region server is shut down
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException if the waiting thread is interrupted
   */
  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
      throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /**
   * @return True if the cluster is up.
   */
  @Override
  public boolean isClusterUp() {
    return this.masterless ||
        (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
  }

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    if (isStopped()) {
      LOG.info("Skipping run; stopped");
      return;
    }
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        ShutdownHook.install(conf, dataFs, this, Thread.currentThread());
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

        // Try and register with the Master; tell it we are here.  Break if server is stopped or
        // the cluster-up flag is down or hdfs went wacky. Once registered successfully, go ahead
        // and start up all Services. Use RetryCounter to get backoff in case Master is struggling
        // to come up.
        LOG.debug("About to register with Master.");
        RetryCounterFactory rcf =
          new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
        RetryCounter rc = rcf.create();
        while (keepLooping()) {
          RegionServerStartupResponse w = reportForDuty();
          if (w == null) {
            long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
            LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
            this.sleeper.sleep(sleepTime);
          } else {
            handleReportForDutyResponse(w);
            break;
          }
        }
      }

      if (!isStopped() && isHealthy()) {
        // start the snapshot handler and other procedure handlers,
        // since the server is ready to run
        if (this.rspmHost != null) {
          this.rspmHost.start();
        }
        // Start the Quota Manager
        if (this.rsQuotaManager != null) {
          rsQuotaManager.start(getRpcServer().getScheduler());
        }
        if (this.rsSpaceQuotaManager != null) {
          this.rsSpaceQuotaManager.start();
        }
      }

      // We registered with the Master.  Go into run mode.
      long lastMsg = System.currentTimeMillis();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (onlineRegions.isEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(this.abortRequested.get());
          } else {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if there have been no more write requests to meta tables
              // since last time we went around the loop.  Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // not have gotten the close request because we were splitting at
              // the time of the call to closeUserRegions.
              closeUserRegions(this.abortRequested.get());
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = System.currentTimeMillis();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = System.currentTimeMillis();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // while
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }

    if (this.leaseManager != null) {
      this.leaseManager.closeAfterLeasesExpire();
    }
    if (this.splitLogWorker != null) {
      splitLogWorker.stop();
    }
    if (this.infoServer != null) {
      LOG.info("Stopping infoServer");
      try {
        this.infoServer.stop();
      } catch (Exception e) {
        LOG.error("Failed to stop infoServer", e);
      }
    }
    // Send cache a shutdown.
    if (blockCache != null) {
      blockCache.shutdown();
    }
    if (mobFileCache != null) {
      mobFileCache.shutdown();
    }

    // Send interrupts to wake up threads if sleeping so they notice shutdown.
    // TODO: Should we check they are alive? If OOME could have exited already
    if (this.hMemManager != null) this.hMemManager.stop();
    if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
    if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();

    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
    if (rspmHost != null) {
      rspmHost.stop(this.abortRequested.get() || this.killed);
    }

    if (this.killed) {
      // Just skip out w/o closing regions.  Used when testing.
    } else if (abortRequested.get()) {
      if (this.dataFsOk) {
        closeUserRegions(abortRequested.get()); // Don't leave any open file handles
      }
      LOG.info("aborting server " + this.serverName);
    } else {
      closeUserRegions(abortRequested.get());
      LOG.info("stopping server " + this.serverName);
    }

    if (this.asyncClusterConnection != null) {
      try {
        this.asyncClusterConnection.close();
      } catch (IOException e) {
        // Although the {@link Closeable} interface throws an {@link
        // IOException}, in reality, the implementation would never do that.
        LOG.warn("Attempt to close server's AsyncClusterConnection failed.", e);
      }
    }
    // Closing the compactSplit thread before closing meta regions
    if (!this.killed && containsMetaTableRegions()) {
      if (!abortRequested.get() || this.dataFsOk) {
        if (this.compactSplitThread != null) {
          this.compactSplitThread.join();
          this.compactSplitThread = null;
        }
        closeMetaTableRegions(abortRequested.get());
      }
    }

    if (!this.killed && this.dataFsOk) {
      waitOnAllRegionsToClose(abortRequested.get());
      LOG.info("stopping server " + this.serverName + "; all regions closed.");
    }

    // Stop the quota manager
    if (rsQuotaManager != null) {
      rsQuotaManager.stop();
    }
    if (rsSpaceQuotaManager != null) {
      rsSpaceQuotaManager.stop();
      rsSpaceQuotaManager = null;
    }

    // The flag may be changed when closing regions throws an exception.
    if (this.dataFsOk) {
      shutdownWAL(!abortRequested.get());
    }

    // Make sure the proxy is down.
    if (this.rssStub != null) {
      this.rssStub = null;
    }
    if (this.lockStub != null) {
      this.lockStub = null;
    }
    if (this.rpcClient != null) {
      this.rpcClient.close();
    }
    if (this.leaseManager != null) {
      this.leaseManager.close();
    }
    if (this.pauseMonitor != null) {
      this.pauseMonitor.stop();
    }

    if (!killed) {
      stopServiceThreads();
    }

    if (this.rpcServices != null) {
      this.rpcServices.stop();
    }

    try {
      deleteMyEphemeralNode();
    } catch (KeeperException.NoNodeException nn) {
      // pass
    } catch (KeeperException e) {
      LOG.warn("Failed deleting my ephemeral node", e);
    }
    // We may have failed to delete the znode at the previous step, but
    //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
    ZNodeClearer.deleteMyEphemeralNodeOnDisk();

    if (this.zooKeeper != null) {
      this.zooKeeper.close();
    }
    this.shutDown = true;
    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
  }

  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  private boolean areAllUserRegionsOffline() {
    if (getNumberOfOnlineRegions() > 2) return false;
    boolean allUserRegionsOffline = true;
    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
      if (!e.getValue().getRegionInfo().isMetaRegion()) {
        allUserRegionsOffline = false;
        break;
      }
    }
    return allUserRegionsOffline;
  }

  /**
   * @return Current write count for all online regions.
   */
  private long getWriteRequestCount() {
    long writeCount = 0;
    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
      writeCount += e.getValue().getWriteRequestsCount();
    }
    return writeCount;
  }

  @VisibleForTesting
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
      throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    try {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      request.setServer(ProtobufUtil.toServerName(this.serverName));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        throw ioe;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub(true);
    }
  }
1245
1246  /**
1247   * Reports the given map of Regions and their size on the filesystem to the active Master.
1248   *
1249   * @param regionSizeStore The store containing region sizes
1250   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1251   */
1252  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
1253    RegionServerStatusService.BlockingInterface rss = rssStub;
1254    if (rss == null) {
1255      // the current server could be stopping.
1256      LOG.trace("Skipping Region size report to HMaster as stub is null");
1257      return true;
1258    }
1259    try {
1260      buildReportAndSend(rss, regionSizeStore);
1261    } catch (ServiceException se) {
1262      IOException ioe = ProtobufUtil.getRemoteException(se);
1263      if (ioe instanceof PleaseHoldException) {
1264        LOG.trace("Failed to report region sizes to Master because it is initializing."
1265            + " This will be retried.", ioe);
1266        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1267        return true;
1268      }
1269      if (rssStub == rss) {
1270        rssStub = null;
1271      }
1272      createRegionServerStatusStub(true);
1273      if (ioe instanceof DoNotRetryIOException) {
1274        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1275        if (doNotRetryEx.getCause() != null) {
1276          Throwable t = doNotRetryEx.getCause();
1277          if (t instanceof UnsupportedOperationException) {
1278            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1279            return false;
1280          }
1281        }
1282      }
1283      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1284    }
1285    return true;
1286  }
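
  // Illustrative sketch (comment only, not part of this class): a caller such as a chore is
  // expected to honor the boolean returned above and pause its reporting when the Master does
  // not support the RPC. The names "server" and "sizeStore" are assumptions for the example:
  //
  //   RegionSizeStore sizeStore =
  //       server.getRegionServerSpaceQuotaManager().getRegionSizeStore();
  //   if (!server.reportRegionSizesForQuotas(sizeStore)) {
  //     // pause reporting, as FileSystemUtilizationChore does
  //   }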
1287
1288  /**
1289   * Builds the region size report and sends it to the master. Upon successful sending of the
1290   * report, the region sizes that were sent are marked as sent.
1291   *
1292   * @param rss The stub to send to the Master
1293   * @param regionSizeStore The store containing region sizes
1294   */
1295  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
1296      RegionSizeStore regionSizeStore) throws ServiceException {
1297    RegionSpaceUseReportRequest request =
1298        buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
1299    rss.reportRegionSpaceUse(null, request);
1300    // Record the number of size reports sent
1301    if (metricsRegionServer != null) {
1302      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
1303    }
1304  }
1305
1306  /**
1307   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1308   *
1309   * @param regionSizes The size in bytes of regions
1310   * @return The corresponding protocol buffer message.
1311   */
1312  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
1313    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1314    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
1315      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
1316    }
1317    return request.build();
1318  }
1319
1320  /**
1321   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse}
1322   * protobuf message.
1323   *
1324   * @param regionInfo The RegionInfo
1325   * @param sizeInBytes The size in bytes of the Region
1326   * @return The protocol buffer
1327   */
1328  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1329    return RegionSpaceUse.newBuilder()
1330        .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1331        .setRegionSize(Objects.requireNonNull(sizeInBytes))
1332        .build();
1333  }
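
  // Illustrative sketch of how the two builders above compose: each (RegionInfo, size) pair
  // becomes one RegionSpaceUse entry in the report. The names and values are assumed:
  //
  //   RegionSpaceUse use = convertRegionSize(regionInfo, 128L * 1024 * 1024); // 128 MB
  //   RegionSpaceUseReportRequest report = buildRegionSpaceUseReportRequest(regionSizeStore);
  //   rss.reportRegionSpaceUse(null, report);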
1334
1335  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1336      throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second and other metrics. As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heartbeat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved; additionally the load balancer will be able to take advantage of a more complete
    // history.
1344    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1345    Collection<HRegion> regions = getOnlineRegionsLocalContext();
1346    long usedMemory = -1L;
1347    long maxMemory = -1L;
1348    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
1349    if (usage != null) {
1350      usedMemory = usage.getUsed();
1351      maxMemory = usage.getMax();
1352    }
1353
1354    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
1355    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1356    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
1357    serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
1358    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
1359    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1360    Builder coprocessorBuilder = Coprocessor.newBuilder();
1361    for (String coprocessor : coprocessors) {
1362      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1363    }
1364    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1365    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1366    for (HRegion region : regions) {
1367      if (region.getCoprocessorHost() != null) {
1368        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1369        for (String regionCoprocessor : regionCoprocessors) {
1370          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
1371        }
1372      }
1373      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1374      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1375          .getCoprocessors()) {
1376        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1377      }
1378    }
1379    serverLoad.setReportStartTime(reportStartTime);
1380    serverLoad.setReportEndTime(reportEndTime);
1381    if (this.infoServer != null) {
1382      serverLoad.setInfoServerPort(this.infoServer.getPort());
1383    } else {
1384      serverLoad.setInfoServerPort(-1);
1385    }
1386    MetricsUserAggregateSource userSource =
1387        metricsRegionServer.getMetricsUserAggregate().getSource();
1388    if (userSource != null) {
1389      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
1390      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
1391        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
1392      }
1393    }
1394
1395    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
1396      // always refresh first to get the latest value
1397      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1398      if (rLoad != null) {
1399        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1400        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1401          .getReplicationLoadSourceEntries()) {
1402          serverLoad.addReplLoadSource(rLS);
1403        }
1404      }
1405    } else {
1406      if (replicationSourceHandler != null) {
1407        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1408        if (rLoad != null) {
1409          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1410            .getReplicationLoadSourceEntries()) {
1411            serverLoad.addReplLoadSource(rLS);
1412          }
1413        }
1414      }
1415      if (replicationSinkHandler != null) {
1416        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
1417        if (rLoad != null) {
1418          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1419        }
1420      }
1421    }
1422
1423    return serverLoad.build();
1424  }
1425
1426  private String getOnlineRegionsAsPrintableString() {
1427    StringBuilder sb = new StringBuilder();
1428    for (Region r: this.onlineRegions.values()) {
1429      if (sb.length() > 0) sb.append(", ");
1430      sb.append(r.getRegionInfo().getEncodedName());
1431    }
1432    return sb.toString();
1433  }
1434
1435  /**
1436   * Wait on regions close.
1437   */
1438  private void waitOnAllRegionsToClose(final boolean abort) {
1439    // Wait till all regions are closed before going out.
1440    int lastCount = -1;
1441    long previousLogTime = 0;
1442    Set<String> closedRegions = new HashSet<>();
1443    boolean interrupted = false;
1444    try {
1445      while (!onlineRegions.isEmpty()) {
1446        int count = getNumberOfOnlineRegions();
1447        // Only print a message if the count of regions has changed.
1448        if (count != lastCount) {
1449          // Log every second at most
1450          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1451            previousLogTime = System.currentTimeMillis();
1452            lastCount = count;
1453            LOG.info("Waiting on " + count + " regions to close");
            // Only print out regions still closing if there are few of them;
            // otherwise we will swamp the log.
1456            if (count < 10 && LOG.isDebugEnabled()) {
1457              LOG.debug("Online Regions=" + this.onlineRegions);
1458            }
1459          }
1460        }
1461        // Ensure all user regions have been sent a close. Use this to
1462        // protect against the case where an open comes in after we start the
1463        // iterator of onlineRegions to close all user regions.
1464        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1465          RegionInfo hri = e.getValue().getRegionInfo();
1466          if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) &&
1467              !closedRegions.contains(hri.getEncodedName())) {
1468            closedRegions.add(hri.getEncodedName());
1469            // Don't update zk with this close transition; pass false.
1470            closeRegionIgnoreErrors(hri, abort);
1471          }
1472        }
1473        // No regions in RIT, we could stop waiting now.
1474        if (this.regionsInTransitionInRS.isEmpty()) {
1475          if (!onlineRegions.isEmpty()) {
1476            LOG.info("We were exiting though online regions are not empty," +
1477                " because some regions failed closing");
1478          }
1479          break;
1480        } else {
1481          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream().
1482            map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1483        }
1484        if (sleepInterrupted(200)) {
1485          interrupted = true;
1486        }
1487      }
1488    } finally {
1489      if (interrupted) {
1490        Thread.currentThread().interrupt();
1491      }
1492    }
1493  }
1494
1495  private static boolean sleepInterrupted(long millis) {
1496    boolean interrupted = false;
1497    try {
1498      Thread.sleep(millis);
1499    } catch (InterruptedException e) {
1500      LOG.warn("Interrupted while sleeping");
1501      interrupted = true;
1502    }
1503    return interrupted;
1504  }
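
  // Usage note (illustrative sketch): callers accumulate the flag returned above and restore
  // the thread's interrupt status once their loop exits, as waitOnAllRegionsToClose() does.
  // "done" stands in for the caller's own loop condition:
  //
  //   boolean interrupted = false;
  //   while (!done) {
  //     interrupted |= sleepInterrupted(200);
  //   }
  //   if (interrupted) {
  //     Thread.currentThread().interrupt();
  //   }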
1505
1506  private void shutdownWAL(final boolean close) {
1507    if (this.walFactory != null) {
1508      try {
1509        if (close) {
1510          walFactory.close();
1511        } else {
1512          walFactory.shutdown();
1513        }
1514      } catch (Throwable e) {
1515        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1516        LOG.error("Shutdown / close of WAL failed: " + e);
1517        LOG.debug("Shutdown / close exception details:", e);
1518      }
1519    }
1520  }
1521
  /**
   * Get the NamedQueueRecorder used to add different log events to the ring buffer.
   *
   * @return NamedQueueRecorder
   */
1527  public NamedQueueRecorder getNamedQueueRecorder() {
1528    return this.namedQueueRecorder;
1529  }
1530
  /*
   * Run init. Sets up WAL and starts up all server threads.
   *
   * @param c Extra configuration.
   */
  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
      throws IOException {
1538    try {
1539      boolean updateRootDir = false;
1540      for (NameStringPair e : c.getMapEntriesList()) {
1541        String key = e.getName();
1542        // The hostname the master sees us as.
1543        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1544          String hostnameFromMasterPOV = e.getValue();
1545          this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(),
1546              this.startcode);
1547          if (!StringUtils.isBlank(useThisHostnameInstead) &&
1548              !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1549            String msg = "Master passed us a different hostname to use; was=" +
1550                this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1551            LOG.error(msg);
1552            throw new IOException(msg);
1553          }
1554          if (StringUtils.isBlank(useThisHostnameInstead) &&
1555              !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1556            String msg = "Master passed us a different hostname to use; was=" +
1557                rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1558            LOG.error(msg);
1559          }
1560          continue;
1561        }
1562
1563        String value = e.getValue();
1564        if (key.equals(HConstants.HBASE_DIR)) {
1565          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1566            updateRootDir = true;
1567          }
1568        }
1569
1570        if (LOG.isDebugEnabled()) {
1571          LOG.debug("Config from master: " + key + "=" + value);
1572        }
1573        this.conf.set(key, value);
1574      }
      // Set our ephemeral znode up in zookeeper now that we have a name.
1576      createMyEphemeralNode();
1577
1578      if (updateRootDir) {
1579        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1580        initializeFileSystem();
1581      }
1582
      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
      // config param for task trackers, but we can piggyback off of it.
1585      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1586        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1587      }
1588
      // Save it in a file; this will allow us to see if we crashed.
1590      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1591
1592      // This call sets up an initialized replication and WAL. Later we start it up.
1593      setupWALAndReplication();
      // Init in here rather than in the constructor, after the thread name has been set.
1595      final MetricsTable metricsTable =
1596          new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1597      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1598      this.metricsRegionServer = new MetricsRegionServer(
1599          metricsRegionServerImpl, conf, metricsTable);
1600      // Now that we have a metrics source, start the pause monitor
1601      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1602      pauseMonitor.start();
1603
1604      // There is a rare case where we do NOT want services to start. Check config.
1605      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1606        startServices();
1607      }
      // In here we start up the replication service. Above we initialized it. TODO: reconcile
      // the initialization and startup steps.
1610      startReplicationService();

      // Announce ourselves and our ZooKeeper session.
1614      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa +
1615          ", sessionid=0x" +
1616          Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1617
      // Wake up anyone waiting for this server to come online.
1619      synchronized (online) {
1620        online.set(true);
1621        online.notifyAll();
1622      }
1623    } catch (Throwable e) {
1624      stop("Failed initialization");
1625      throw convertThrowableToIOE(cleanup(e, "Failed init"),
1626          "Region server startup failed");
1627    } finally {
1628      sleeper.skipSleepCycle();
1629    }
1630  }
1631
1632  protected void initializeMemStoreChunkCreator() {
1633    if (MemStoreLAB.isEnabled(conf)) {
      // MSLAB is enabled, so initialize the MemStoreChunkPool.
      // By this time, the MemstoreFlusher is already initialized. We can get the global
      // limits from it.
1637      Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
1638      long globalMemStoreSize = pair.getFirst();
1639      boolean offheap = this.regionServerAccounting.isOffheap();
      // When an off-heap memstore is in use, let the chunk pool take the full area.
1641      float poolSizePercentage = offheap ? 1.0F :
1642        conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
1643      float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
1644        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
1645      int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
1646      float indexChunkSizePercent = conf.getFloat(MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_KEY,
1647        MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
1648      // init the chunkCreator
1649      ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
1650        initialCountPercentage, this.hMemManager, indexChunkSizePercent);
1651    }
1652  }
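
  // Illustrative configuration sketch: the chunk creator is tuned entirely through the
  // MemStoreLAB keys read above. The values here are assumptions for the example, not
  // recommendations:
  //
  //   conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);           // enable MSLAB
  //   conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.5f);   // pool up to 50% of memstore
  //   conf.setInt(MemStoreLAB.CHUNK_SIZE_KEY, 2 * 1024 * 1024);  // 2 MB chunks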
1653
1654  private void startHeapMemoryManager() {
1655    if (this.blockCache != null) {
1656      this.hMemManager =
1657          new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1658      this.hMemManager.start(getChoreService());
1659    }
1660  }
1661
1662  private void createMyEphemeralNode() throws KeeperException {
1663    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1664    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1665    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1666    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1667    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1668  }
1669
1670  private void deleteMyEphemeralNode() throws KeeperException {
1671    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1672  }
1673
1674  @Override
1675  public RegionServerAccounting getRegionServerAccounting() {
1676    return regionServerAccounting;
1677  }
1678
1679  /**
1680   * @param r Region to get RegionLoad for.
1681   * @param regionLoadBldr the RegionLoad.Builder, can be null
1682   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1683   * @return RegionLoad instance.
1684   */
1685  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1686      RegionSpecifier.Builder regionSpecifier) throws IOException {
1687    byte[] name = r.getRegionInfo().getRegionName();
1688    int stores = 0;
1689    int storefiles = 0;
1690    int storeRefCount = 0;
1691    int maxCompactedStoreFileRefCount = 0;
1692    int storeUncompressedSizeMB = 0;
1693    int storefileSizeMB = 0;
1694    int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024);
1695    long storefileIndexSizeKB = 0;
1696    int rootLevelIndexSizeKB = 0;
1697    int totalStaticIndexSizeKB = 0;
1698    int totalStaticBloomSizeKB = 0;
1699    long totalCompactingKVs = 0;
1700    long currentCompactedKVs = 0;
1701    List<HStore> storeList = r.getStores();
1702    stores += storeList.size();
1703    for (HStore store : storeList) {
1704      storefiles += store.getStorefilesCount();
1705      int currentStoreRefCount = store.getStoreRefCount();
1706      storeRefCount += currentStoreRefCount;
1707      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1708      maxCompactedStoreFileRefCount = Math.max(maxCompactedStoreFileRefCount,
1709        currentMaxCompactedStoreFileRefCount);
1710      storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1711      storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
      // TODO: is storefileIndexSizeKB the same as rootLevelIndexSizeKB?
1713      storefileIndexSizeKB += store.getStorefilesRootLevelIndexSize() / 1024;
1714      CompactionProgress progress = store.getCompactionProgress();
1715      if (progress != null) {
1716        totalCompactingKVs += progress.getTotalCompactingKVs();
1717        currentCompactedKVs += progress.currentCompactedKVs;
1718      }
1719      rootLevelIndexSizeKB += (int) (store.getStorefilesRootLevelIndexSize() / 1024);
1720      totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1721      totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1722    }
1723
1724    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
1725    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
1726    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
1727    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
1728    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
1729    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
1730    if (regionLoadBldr == null) {
1731      regionLoadBldr = RegionLoad.newBuilder();
1732    }
1733    if (regionSpecifier == null) {
1734      regionSpecifier = RegionSpecifier.newBuilder();
1735    }
1736    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1737    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1738    regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1739      .setStores(stores)
1740      .setStorefiles(storefiles)
1741      .setStoreRefCount(storeRefCount)
1742      .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1743      .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1744      .setStorefileSizeMB(storefileSizeMB)
1745      .setMemStoreSizeMB(memstoreSizeMB)
1746      .setStorefileIndexSizeKB(storefileIndexSizeKB)
1747      .setRootIndexSizeKB(rootLevelIndexSizeKB)
1748      .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1749      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1750      .setReadRequestsCount(r.getReadRequestsCount())
1751      .setCpRequestsCount(r.getCpRequestsCount())
1752      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1753      .setWriteRequestsCount(r.getWriteRequestsCount())
1754      .setTotalCompactingKVs(totalCompactingKVs)
1755      .setCurrentCompactedKVs(currentCompactedKVs)
1756      .setDataLocality(dataLocality)
1757      .setDataLocalityForSsd(dataLocalityForSsd)
1758      .setBlocksLocalWeight(blocksLocalWeight)
1759      .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight)
1760      .setBlocksTotalWeight(blocksTotalWeight)
1761      .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1762    r.setCompleteSequenceId(regionLoadBldr);
1763
1764    return regionLoadBldr.build();
1765  }
1766
1767  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1768    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1769    userLoadBldr.setUserName(user);
1770    userSource.getClientMetrics().values().stream().map(
1771      clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1772            .setHostName(clientMetrics.getHostName())
1773            .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1774            .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1775            .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
1776        .forEach(userLoadBldr::addClientMetrics);
1777    return userLoadBldr.build();
1778  }
1779
1780  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1781    HRegion r = onlineRegions.get(encodedRegionName);
1782    return r != null ? createRegionLoad(r, null, null) : null;
1783  }
1784
  /**
   * Inner class that periodically checks whether regions need compaction.
   */
1788  private static class CompactionChecker extends ScheduledChore {
1789    private final HRegionServer instance;
1790    private final int majorCompactPriority;
1791    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
    // Iteration is 1-based rather than 0-based so we don't check for compaction
    // immediately upon region server startup.
1794    private long iteration = 1;
1795
1796    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1797      super("CompactionChecker", stopper, sleepTime);
1798      this.instance = h;
1799      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1800
1801      /* MajorCompactPriority is configurable.
1802       * If not set, the compaction will use default priority.
1803       */
1804      this.majorCompactPriority = this.instance.conf.
1805          getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1806              DEFAULT_PRIORITY);
1807    }
1808
1809    @Override
1810    protected void chore() {
1811      for (Region r : this.instance.onlineRegions.values()) {
1812        // Skip compaction if region is read only
1813        if (r == null || r.isReadOnly()) {
1814          continue;
1815        }
1816
1817        HRegion hr = (HRegion) r;
1818        for (HStore s : hr.stores.values()) {
1819          try {
1820            long multiplier = s.getCompactionCheckMultiplier();
1821            assert multiplier > 0;
1822            if (iteration % multiplier != 0) {
1823              continue;
1824            }
1825            if (s.needsCompaction()) {
1826              // Queue a compaction. Will recognize if major is needed.
1827              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1828                getName() + " requests compaction");
1829            } else if (s.shouldPerformMajorCompaction()) {
1830              s.triggerMajorCompaction();
1831              if (majorCompactPriority == DEFAULT_PRIORITY ||
1832                  majorCompactPriority > hr.getCompactPriority()) {
1833                this.instance.compactSplitThread.requestCompaction(hr, s,
1834                    getName() + " requests major compaction; use default priority",
1835                    Store.NO_PRIORITY,
1836                CompactionLifeCycleTracker.DUMMY, null);
1837              } else {
1838                this.instance.compactSplitThread.requestCompaction(hr, s,
1839                    getName() + " requests major compaction; use configured priority",
1840                    this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1841              }
1842            }
1843          } catch (IOException e) {
1844            LOG.warn("Failed major compaction check on " + r, e);
1845          }
1846        }
1847      }
1848      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1849    }
1850  }
1851
1852  private static class PeriodicMemStoreFlusher extends ScheduledChore {
1853    private final HRegionServer server;
    private final static int RANGE_OF_DELAY = 5 * 60; // 5 minutes, in seconds
    private final static int MIN_DELAY_TIME = 0; // milliseconds
1856    private final long rangeOfDelayMs;
1857
1858    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1859      super("MemstoreFlusherChore", server, cacheFlushInterval);
1860      this.server = server;
1861
1862      final long configuredRangeOfDelay = server.getConfiguration().getInt(
1863          "hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
1864      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
1865    }
1866
1867    @Override
1868    protected void chore() {
1869      final StringBuilder whyFlush = new StringBuilder();
1870      for (HRegion r : this.server.onlineRegions.values()) {
1871        if (r == null) continue;
1872        if (r.shouldFlush(whyFlush)) {
1873          FlushRequester requester = server.getFlushRequester();
1874          if (requester != null) {
1875            long randomDelay = RandomUtils.nextLong(0, rangeOfDelayMs) + MIN_DELAY_TIME;
            // Throttle the flushes by adding a delay. If we don't throttle, and there
            // is a balanced write-load on the regions in a table, we might end up
            // overwhelming the filesystem with too many flushes at once.
1879            if (requester.requestDelayedFlush(r, randomDelay)) {
1880              LOG.info("{} requesting flush of {} because {} after random delay {} ms",
1881                  getName(), r.getRegionInfo().getRegionNameAsString(),  whyFlush.toString(),
1882                  randomDelay);
1883            }
1884          }
1885        }
1886      }
1887    }
1888  }
1889
  /**
   * Report the status of the server. A server is online once all of its startup has
   * completed (setting up the filesystem, starting executorService threads, etc.). This
   * method is designed mostly to be useful in tests.
   *
   * @return true if online, false if not.
   */
1897  public boolean isOnline() {
1898    return online.get();
1899  }
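
  // Illustrative test usage (names assumed): wait for the server to come online before
  // asserting against it.
  //
  //   HRegionServer rs = cluster.getRegionServer(0);
  //   rs.waitForServerOnline();
  //   assertTrue(rs.isOnline());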
1900
  /**
   * Set up the WAL and replication if enabled. Replication setup is done in here because it
   * needs to be hooked up to the WAL.
   */
1905  private void setupWALAndReplication() throws IOException {
1906    boolean isMasterNoTableOrSystemTableOnly = this instanceof HMaster &&
1907        !LoadBalancer.isMasterCanHostUserRegions(conf);
1908    WALFactory factory =
1909        new WALFactory(conf, serverName.toString(), !isMasterNoTableOrSystemTableOnly);
1910    if (!isMasterNoTableOrSystemTableOnly) {
      // TODO: Replication makes assumptions here based on the default filesystem impl.
1912      Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1913      String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1914
1915      Path logDir = new Path(walRootDir, logName);
1916      LOG.debug("logDir={}", logDir);
1917      if (this.walFs.exists(logDir)) {
1918        throw new RegionServerRunningException(
1919            "Region server has already created directory at " + this.serverName.toString());
1920      }
      // Always create the WAL directory, as we now need it when the master restarts to find
      // out the live region servers.
1923      if (!this.walFs.mkdirs(logDir)) {
1924        throw new IOException("Can not create wal directory " + logDir);
1925      }
1926      // Instantiate replication if replication enabled. Pass it the log directories.
1927      createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
1928        factory.getWALProvider());
1929    }
1930    this.walFactory = factory;
1931  }
1932
1933  /**
1934   * Start up replication source and sink handlers.
1935   */
1936  private void startReplicationService() throws IOException {
1937    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
1938      this.replicationSourceHandler.startReplicationService();
1939    } else {
1940      if (this.replicationSourceHandler != null) {
1941        this.replicationSourceHandler.startReplicationService();
1942      }
1943      if (this.replicationSinkHandler != null) {
1944        this.replicationSinkHandler.startReplicationService();
1945      }
1946    }
1947  }
1948
1949  /**
1950   * @return Master address tracker instance.
1951   */
1952  public MasterAddressTracker getMasterAddressTracker() {
1953    return this.masterAddressTracker;
1954  }
1955
  /**
   * Start maintenance threads: Server, Worker, and the lease checker; that is, all threads we
   * need to run. This is called after we've successfully registered with the Master.
   * Installs an UncaughtExceptionHandler that calls abort on the RegionServer if we get an
   * unhandled exception. We cannot set the handler on all threads; the Server's internal
   * Listener thread is off limits. If the Server hits an OOME, it waits a while and then
   * retries. Meanwhile, a flush or a compaction that tries to run should trigger the same
   * critical condition and the shutdown will run. On its way out, this server will shut down
   * the Server. Leases are somewhere in between: the lease checker runs an internal thread
   * that, while it inherits from Chore, keeps its own internal stop mechanism, so it needs to
   * be stopped by this hosting server. The Worker logs the exception and exits.
   */
1970  private void startServices() throws IOException {
1971    if (!isStopped() && !isAborted()) {
1972      initializeThreads();
1973    }
1974    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
1975    this.secureBulkLoadManager.start();
1976
1977    // Health checker thread.
1978    if (isHealthCheckerConfigured()) {
1979      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
1980      HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1981      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1982    }
1983    // Executor status collect thread.
1984    if (this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
1985        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)) {
1986      int sleepTime = this.conf.getInt(ExecutorStatusChore.WAKE_FREQ,
1987          ExecutorStatusChore.DEFAULT_WAKE_FREQ);
1988      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
1989          this.metricsRegionServer.getMetricsSource());
1990    }
1991
1992    this.walRoller = new LogRoller(this);
1993    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1994    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
1995
    // Create the CompactedFileDischarger chore executorService. This chore helps to
    // remove compacted files that will no longer be used in reads.
    // Default is 2 mins; the TTLCleaner default is 5 mins, so this is set to 2 mins so
    // that compacted files can be archived before the TTLCleaner runs.
2000    int cleanerInterval =
2001      conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
2002    this.compactedFileDischarger =
2003      new CompactedHFilesDischarger(cleanerInterval, this, this);
2004    choreService.scheduleChore(compactedFileDischarger);
2005
2006    // Start executor services
2007    this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION,
2008        conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
2009    this.executorService.startExecutorService(ExecutorType.RS_OPEN_META,
2010        conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
2011    this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
2012        conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
2013    this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION,
2014        conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
2015    this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META,
2016        conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
2017    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
2018      this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
2019          conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
2020    }
2021    this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
2022        HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
2023    // Start the threads for compacted files discharger
2024    this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
2025        conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
2026    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
2027      this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
2028          conf.getInt("hbase.regionserver.region.replica.flusher.threads",
2029              conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
2030    }
2031    this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
2032      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
2033    this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL,
2034      conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1));
2035    this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE,
2036      conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1));
2037
2038    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
2039        uncaughtExceptionHandler);
2040    if (this.cacheFlusher != null) {
2041      this.cacheFlusher.start(uncaughtExceptionHandler);
2042    }
2043    Threads.setDaemonThreadRunning(this.procedureResultReporter,
2044      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
2045
2046    if (this.compactionChecker != null) {
2047      choreService.scheduleChore(compactionChecker);
2048    }
2049    if (this.periodicFlusher != null) {
2050      choreService.scheduleChore(periodicFlusher);
2051    }
2052    if (this.healthCheckChore != null) {
2053      choreService.scheduleChore(healthCheckChore);
2054    }
2055    if (this.executorStatusChore != null) {
2056      choreService.scheduleChore(executorStatusChore);
2057    }
2058    if (this.nonceManagerChore != null) {
2059      choreService.scheduleChore(nonceManagerChore);
2060    }
2061    if (this.storefileRefresher != null) {
2062      choreService.scheduleChore(storefileRefresher);
2063    }
2064    if (this.fsUtilizationChore != null) {
2065      choreService.scheduleChore(fsUtilizationChore);
2066    }
2067    if (this.slowLogTableOpsChore != null) {
2068      choreService.scheduleChore(slowLogTableOpsChore);
2069    }
2070
    // The lease manager is not a Thread. Internally it runs a daemon thread. If it
    // gets an unhandled exception, it will just exit.
2073    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
2074        uncaughtExceptionHandler);
2075
    // Create the log-splitting worker and start it.
    // Set a lower retry count to fail fast; otherwise the SplitLogWorker could be blocked
    // inside the Connection layer for quite a while, and the worker won't be available for
    // other tasks even after the current task is preempted when a split task times out.
2080    Configuration sinkConf = HBaseConfiguration.create(conf);
2081    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2082        conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
2083    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2084        conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
2085    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
2086    if (this.csm != null && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
2087      DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
2088      // SplitLogWorker needs csm. If none, don't start this.
2089      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
2090      splitLogWorker.start();
2091      LOG.debug("SplitLogWorker started");
2092    }
2093
2094    // Memstore services.
2095    startHeapMemoryManager();
2096    // Call it after starting HeapMemoryManager.
2097    initializeMemStoreChunkCreator();
2098  }
2099
2100  private void initializeThreads() {
2101    // Cache flushing thread.
2102    this.cacheFlusher = new MemStoreFlusher(conf, this);
2103
2104    // Compaction thread
2105    this.compactSplitThread = new CompactSplit(this);
2106
    // Background thread to check for compactions; needed if a region has not gotten updates
    // in a while. It takes care of not checking too frequently, on a store-by-store basis.
2109    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
2110    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
2111    this.leaseManager = new LeaseManager(this.threadWakeFrequency);
2112
2113    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
2114      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
2115    if (isSlowLogTableEnabled) {
2116      // default chore duration: 10 min
2117      final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
2118      slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
2119    }
2120
2121    if (this.nonceManager != null) {
2122      // Create the scheduled chore that cleans up nonces.
2123      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
2124    }
2125
2126    // Setup the Quota Manager
2127    rsQuotaManager = new RegionServerRpcQuotaManager(this);
2128    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
2129
2130    if (QuotaUtil.isQuotaEnabled(conf)) {
2131      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
2132    }
2135    boolean onlyMetaRefresh = false;
2136    int storefileRefreshPeriod = conf.getInt(
2137        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
2138        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2139    if (storefileRefreshPeriod == 0) {
2140      storefileRefreshPeriod = conf.getInt(
2141          StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
2142          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2143      onlyMetaRefresh = true;
2144    }
2145    if (storefileRefreshPeriod > 0) {
2146      this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
2147          onlyMetaRefresh, this, this);
2148    }
2149    registerConfigurationObservers();
2150  }
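
  // Illustrative sketch of the storefile refresh fallback above: setting the general period to
  // 0 makes the chore refresh only hbase:meta, using the meta-specific period instead. The
  // values are assumptions for the example:
  //
  //   conf.setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
  //   conf.setInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, 30000);
  //   // -> onlyMetaRefresh == true, storefileRefreshPeriod == 30000 ms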
2151
2152  private void registerConfigurationObservers() {
2153    // Registering the compactSplitThread object with the ConfigurationManager.
2154    configurationManager.registerObserver(this.compactSplitThread);
2155    configurationManager.registerObserver(this.rpcServices);
2156    configurationManager.registerObserver(this);
2157  }
2158
2159  /**
2160   * Puts up the webui.
2161   */
2162  private void putUpWebUI() throws IOException {
2163    int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2164      HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2165    String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
2166
    if (this instanceof HMaster) {
2168      port = conf.getInt(HConstants.MASTER_INFO_PORT,
2169          HConstants.DEFAULT_MASTER_INFOPORT);
2170      addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
2171    }
2172    // -1 is for disabling info server
2173    if (port < 0) {
2174      return;
2175    }
2176
2177    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
2178      String msg =
2179          "Failed to start http info server. Address " + addr
2180              + " does not belong to this host. Correct configuration parameter: "
2181              + "hbase.regionserver.info.bindAddress";
2182      LOG.error(msg);
2183      throw new IOException(msg);
2184    }
2185    // check if auto port bind enabled
2186    boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, false);
2187    while (true) {
2188      try {
2189        this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
2190        infoServer.addPrivilegedServlet("dump", "/dump", getDumpServlet());
2191        configureInfoServer();
2192        this.infoServer.start();
2193        break;
2194      } catch (BindException e) {
2195        if (!auto) {
          // auto bind disabled; rethrow the BindException
2197          LOG.error("Failed binding http info server to port: " + port);
2198          throw e;
2199        }
2200        // auto bind enabled, try to use another port
2201        LOG.info("Failed binding http info server to port: " + port);
2202        port++;
2203      }
2204    }
2205    port = this.infoServer.getPort();
2206    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
2207    int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
2208      HConstants.DEFAULT_MASTER_INFOPORT);
2209    conf.setInt("hbase.master.info.port.orig", masterInfoPort);
2210    conf.setInt(HConstants.MASTER_INFO_PORT, port);
2211  }
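
  // Illustrative configuration sketch for the logic above (values assumed): disable the info
  // server entirely, or let it probe upward from the configured port on BindException.
  //
  //   conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);             // disable the web UI
  //   conf.setBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, true);  // retry port, port+1, ...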
2212
  /*
   * Verify that the server is healthy.
   */
2216  private boolean isHealthy() {
2217    if (!dataFsOk) {
2218      // File system problem
2219      return false;
2220    }
2221    // Verify that all threads are alive
2222    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
2223        && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2224        && (this.walRoller == null || this.walRoller.isAlive())
2225        && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2226        && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2227    if (!healthy) {
2228      stop("One or more threads are no longer alive -- stop");
2229    }
2230    return healthy;
2231  }
2232
2233  @Override
2234  public List<WAL> getWALs() {
2235    return walFactory.getWALs();
2236  }
2237
2238  @Override
2239  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2240    WAL wal = walFactory.getWAL(regionInfo);
2241    if (this.walRoller != null) {
2242      this.walRoller.addWAL(wal);
2243    }
2244    return wal;
2245  }
2246
2247  public LogRoller getWalRoller() {
2248    return walRoller;
2249  }
2250
2251  WALFactory getWalFactory() {
2252    return walFactory;
2253  }
2254
2255  @Override
2256  public Connection getConnection() {
2257    return getAsyncConnection().toConnection();
2258  }
2259
2260  @Override
2261  public void stop(final String msg) {
2262    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2263  }
2264
2265  /**
2266   * Stops the regionserver.
2267   * @param msg Status message
2268   * @param force True if this is a regionserver abort
2269   * @param user The user executing the stop request, or null if no user is associated
2270   */
2271  public void stop(final String msg, final boolean force, final User user) {
2272    if (!this.stopped) {
2273      LOG.info("***** STOPPING region server '" + this + "' *****");
2274      if (this.rsHost != null) {
2275        // when forced via abort don't allow CPs to override
2276        try {
2277          this.rsHost.preStop(msg, user);
2278        } catch (IOException ioe) {
2279          if (!force) {
2280            LOG.warn("The region server did not stop", ioe);
2281            return;
2282          }
2283          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2284        }
2285      }
2286      this.stopped = true;
2287      LOG.info("STOPPED: " + msg);
2288      // Wakes run() if it is sleeping
2289      sleeper.skipSleepCycle();
2290    }
2291  }
2292
  public void waitForServerOnline() {
2294    while (!isStopped() && !isOnline()) {
2295      synchronized (online) {
2296        try {
2297          online.wait(msgInterval);
2298        } catch (InterruptedException ie) {
2299          Thread.currentThread().interrupt();
2300          break;
2301        }
2302      }
2303    }
2304  }
2305
2306  @Override
2307  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2308    HRegion r = context.getRegion();
2309    long openProcId = context.getOpenProcId();
2310    long masterSystemTime = context.getMasterSystemTime();
2311    rpcServices.checkOpen();
2312    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2313      r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2314    // Do checks to see if we need to compact (references or too many files)
2315    // Skip compaction check if region is read only
2316    if (!r.isReadOnly()) {
2317      for (HStore s : r.stores.values()) {
2318        if (s.hasReferences() || s.needsCompaction()) {
2319          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2320        }
2321      }
2322    }
2323    long openSeqNum = r.getOpenSeqNum();
2324    if (openSeqNum == HConstants.NO_SEQNUM) {
2325      // If we opened a region, we should have read some sequence number from it.
2326      LOG.error(
2327        "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2328      openSeqNum = 0;
2329    }
2330
2331    // Notify master
2332    if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2333      openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))) {
2334      throw new IOException(
2335        "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2336    }
2337
2338    triggerFlushInPrimaryRegion(r);
2339
2340    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2341  }
2342
2343  /**
2344   * Helper method for use in tests. Skip the region transition report when there's no master
2345   * around to receive it.
2346   */
2347  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2348    final TransitionCode code = context.getCode();
2349    final long openSeqNum = context.getOpenSeqNum();
2350    long masterSystemTime = context.getMasterSystemTime();
2351    final RegionInfo[] hris = context.getHris();
2352
2353    if (code == TransitionCode.OPENED) {
2354      Preconditions.checkArgument(hris != null && hris.length == 1);
2355      if (hris[0].isMetaRegion()) {
2356        try {
2357          MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
2358              hris[0].getReplicaId(), RegionState.State.OPEN);
2359        } catch (KeeperException e) {
2360          LOG.info("Failed to update meta location", e);
2361          return false;
2362        }
2363      } else {
2364        try {
2365          MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
2366              serverName, openSeqNum, masterSystemTime);
2367        } catch (IOException e) {
2368          LOG.info("Failed to update meta", e);
2369          return false;
2370        }
2371      }
2372    }
2373    return true;
2374  }
2375
2376  private ReportRegionStateTransitionRequest createReportRegionStateTransitionRequest(
2377      final RegionStateTransitionContext context) {
2378    final TransitionCode code = context.getCode();
2379    final long openSeqNum = context.getOpenSeqNum();
2380    final RegionInfo[] hris = context.getHris();
2381    final long[] procIds = context.getProcIds();
2382
2383    ReportRegionStateTransitionRequest.Builder builder =
2384        ReportRegionStateTransitionRequest.newBuilder();
2385    builder.setServer(ProtobufUtil.toServerName(serverName));
2386    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2387    transition.setTransitionCode(code);
2388    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2389      transition.setOpenSeqNum(openSeqNum);
2390    }
2391    for (RegionInfo hri: hris) {
2392      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2393    }
2394    for (long procId: procIds) {
2395      transition.addProcId(procId);
2396    }
2397
2398    return builder.build();
2399  }
2400
2401  @Override
2402  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2403    if (TEST_SKIP_REPORTING_TRANSITION) {
2404      return skipReportingTransition(context);
2405    }
2406    final ReportRegionStateTransitionRequest request =
2407        createReportRegionStateTransitionRequest(context);
2408
2409    // Time to pause if master says 'please hold'. Make configurable if needed.
2410    final long initPauseTime = 1000;
2411    int tries = 0;
2412    long pauseTime;
    // Keep looping till we get an error. We want to send reports even though the server is going
    // down. Only give up if the cluster connection is null; it is set to null almost as the last
    // thing the HRegionServer does on the way down.
2416    while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
2417      RegionServerStatusService.BlockingInterface rss = rssStub;
2418      try {
2419        if (rss == null) {
2420          createRegionServerStatusStub();
2421          continue;
2422        }
2423        ReportRegionStateTransitionResponse response =
2424          rss.reportRegionStateTransition(null, request);
2425        if (response.hasErrorMessage()) {
2426          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2427          break;
2428        }
        // Log if we had to retry; else don't log unless TRACE. We want to
        // know if we were successful after an attempt showed in the logs as failed.
2431        if (tries > 0 || LOG.isTraceEnabled()) {
2432          LOG.info("TRANSITION REPORTED " + request);
2433        }
2434        // NOTE: Return mid-method!!!
2435        return true;
2436      } catch (ServiceException se) {
2437        IOException ioe = ProtobufUtil.getRemoteException(se);
2438        boolean pause =
2439            ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException
2440                || ioe instanceof CallQueueTooBigException;
2441        if (pause) {
2442          // Do backoff else we flood the Master with requests.
2443          pauseTime = ConnectionUtils.getPauseTime(initPauseTime, tries);
2444        } else {
2445          pauseTime = initPauseTime; // Reset.
2446        }
2447        LOG.info("Failed report transition " +
2448          TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" +
2449            (pause?
2450                " after " + pauseTime + "ms delay (Master is coming online...).":
2451                " immediately."),
2452            ioe);
2453        if (pause) Threads.sleep(pauseTime);
2454        tries++;
2455        if (rssStub == rss) {
2456          rssStub = null;
2457        }
2458      }
2459    }
2460    return false;
2461  }
2462
2463  /**
2464   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2465   * block this thread. See RegionReplicaFlushHandler for details.
2466   */
2467  private void triggerFlushInPrimaryRegion(final HRegion region) {
2468    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2469      return;
2470    }
2471    if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
2472        !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
2473          region.conf)) {
2474      region.setReadsEnabled(true);
2475      return;
2476    }
2477
2478    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2479    // RegionReplicaFlushHandler might reset this.
2480
2481    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2482    if (this.executorService != null) {
2483      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
2484    } else {
2485      LOG.info("Executor is null; not running flush of primary region replica for {}",
2486        region.getRegionInfo());
    }
2488  }
2489
2490  @Override
2491  public RpcServerInterface getRpcServer() {
2492    return rpcServices.rpcServer;
2493  }
2494
2495  @VisibleForTesting
2496  public RSRpcServices getRSRpcServices() {
2497    return rpcServices;
2498  }
2499
  /**
   * Cause the server to exit without closing the regions it is serving, without closing the
   * log it is using, and without notifying the master. Used for unit testing and on
   * catastrophic events such as HDFS being yanked out from under hbase, or an OOME.
   *
   * @param reason
   *          the reason we are aborting
   * @param cause
   *          the exception that caused the abort, or null
   */
2510  @Override
2511  public void abort(String reason, Throwable cause) {
2512    if (!setAbortRequested()) {
2513      // Abort already in progress, ignore the new request.
2514      LOG.debug(
2515          "Abort already in progress. Ignoring the current request with reason: {}", reason);
2516      return;
2517    }
2518    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2519    if (cause != null) {
2520      LOG.error(HBaseMarkers.FATAL, msg, cause);
2521    } else {
2522      LOG.error(HBaseMarkers.FATAL, msg);
2523    }
2524    // HBASE-4014: show list of coprocessors that were loaded to help debug
    // regionserver crashes. Note that we're implicitly using
2526    // java.util.HashSet's toString() method to print the coprocessor names.
2527    LOG.error(HBaseMarkers.FATAL, "RegionServer abort: loaded coprocessors are: " +
2528        CoprocessorHost.getLoadedCoprocessors());
    // Try and dump metrics if aborting -- the dump might give a clue as to how the fatal came about.
2530    try {
2531      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2532    } catch (MalformedObjectNameException | IOException e) {
2533      LOG.warn("Failed dumping metrics", e);
2534    }
2535
2536    // Do our best to report our abort to the master, but this may not work
2537    try {
2538      if (cause != null) {
2539        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2540      }
2541      // Report to the master but only if we have already registered with the master.
2542      RegionServerStatusService.BlockingInterface rss = rssStub;
2543      if (rss != null && this.serverName != null) {
2544        ReportRSFatalErrorRequest.Builder builder =
2545          ReportRSFatalErrorRequest.newBuilder();
2546        builder.setServer(ProtobufUtil.toServerName(this.serverName));
2547        builder.setErrorMessage(msg);
2548        rss.reportRSFatalError(null, builder.build());
2549      }
2550    } catch (Throwable t) {
2551      LOG.warn("Unable to report fatal error to master", t);
2552    }
2553
2554    scheduleAbortTimer();
2555    // shutdown should be run as the internal user
2556    stop(reason, true, null);
2557  }
2558
2559  /**
2560   * Sets the abort state if not already set.
   * @return True if the abort state was set by this call, false if an abort is already in
   * progress.
2563   */
2564  protected boolean setAbortRequested() {
2565    return abortRequested.compareAndSet(false, true);
2566  }
2567
2568  /**
2569   * @see HRegionServer#abort(String, Throwable)
2570   */
2571  public void abort(String reason) {
2572    abort(reason, null);
2573  }
2574
2575  @Override
2576  public boolean isAborted() {
2577    return abortRequested.get();
2578  }
2579
2580  /*
   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
   * logs, but it does close the socket in case we want to bring up a server on the
   * old hostname+port immediately.
2584   */
2585  @VisibleForTesting
2586  protected void kill() {
2587    this.killed = true;
2588    abort("Simulated kill");
2589  }
2590
2591  // Limits the time spent in the shutdown process.
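  // A custom timeout task can be plugged in via configuration. A minimal sketch, where
  // DumpAndHaltTask is illustrative (any TimerTask subclass with a no-arg constructor works):
  //   conf.set(ABORT_TIMEOUT_TASK, DumpAndHaltTask.class.getName());
  //   conf.setLong(ABORT_TIMEOUT, 120000); // milliseconds before the task fires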
2592  private void scheduleAbortTimer() {
2593    if (this.abortMonitor == null) {
2594      this.abortMonitor = new Timer("Abort regionserver monitor", true);
2595      TimerTask abortTimeoutTask = null;
2596      try {
2597        Constructor<? extends TimerTask> timerTaskCtor =
2598          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2599            .asSubclass(TimerTask.class).getDeclaredConstructor();
2600        timerTaskCtor.setAccessible(true);
2601        abortTimeoutTask = timerTaskCtor.newInstance();
2602      } catch (Exception e) {
        LOG.warn("Initializing the abort timeout task failed", e);
2604      }
2605      if (abortTimeoutTask != null) {
2606        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2607      }
2608    }
2609  }
2610
2611  /**
2612   * Wait on all threads to finish. Presumption is that all closes and stops
2613   * have already been called.
2614   */
2615  protected void stopServiceThreads() {
2616    // clean up the scheduled chores
2617    if (this.choreService != null) {
2618      choreService.cancelChore(nonceManagerChore);
2619      choreService.cancelChore(compactionChecker);
2620      choreService.cancelChore(periodicFlusher);
2621      choreService.cancelChore(healthCheckChore);
2622      choreService.cancelChore(executorStatusChore);
2623      choreService.cancelChore(storefileRefresher);
2624      choreService.cancelChore(fsUtilizationChore);
2625      choreService.cancelChore(slowLogTableOpsChore);
2626      // clean up the remaining scheduled chores (in case we missed out any)
2627      choreService.shutdown();
2628    }
2629
2630    if (this.cacheFlusher != null) {
2631      this.cacheFlusher.join();
2632    }
2633
2634    if (this.spanReceiverHost != null) {
2635      this.spanReceiverHost.closeReceivers();
2636    }
2637    if (this.walRoller != null) {
2638      this.walRoller.close();
2639    }
2640    if (this.compactSplitThread != null) {
2641      this.compactSplitThread.join();
2642    }
2643    if (this.executorService != null) {
2644      this.executorService.shutdown();
2645    }
2646    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
2647      this.replicationSourceHandler.stopReplicationService();
2648    } else {
2649      if (this.replicationSourceHandler != null) {
2650        this.replicationSourceHandler.stopReplicationService();
2651      }
2652      if (this.replicationSinkHandler != null) {
2653        this.replicationSinkHandler.stopReplicationService();
2654      }
2655    }
2656  }
2657
2658  /**
   * @return Return the object that implements the replication source service.
2661   */
2662  @Override
2663  public ReplicationSourceService getReplicationSourceService() {
2664    return replicationSourceHandler;
2665  }
2666
2667  /**
   * @return Return the object that implements the replication sink service.
2669   */
2670  public ReplicationSinkService getReplicationSinkService() {
2671    return replicationSinkHandler;
2672  }
2673
2674  /**
2675   * Get the current master from ZooKeeper and open the RPC connection to it.
2676   * To get a fresh connection, the current rssStub must be null.
2677   * Method will block until a master is available. You can break from this
2678   * block by requesting the server stop.
2679   *
2680   * @return master + port, or null if server has been stopped
2681   */
2682  private synchronized ServerName createRegionServerStatusStub() {
2683    // Create RS stub without refreshing the master node from ZK, use cached data
2684    return createRegionServerStatusStub(false);
2685  }
2686
2687  /**
2688   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2689   * connection, the current rssStub must be null. Method will block until a master is available.
2690   * You can break from this block by requesting the server stop.
2691   * @param refresh If true then master address will be read from ZK, otherwise use cached data
2692   * @return master + port, or null if server has been stopped
2693   */
2694  @VisibleForTesting
2695  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2696    if (rssStub != null) {
2697      return masterAddressTracker.getMasterAddress();
2698    }
2699    ServerName sn = null;
2700    long previousLogTime = 0;
2701    RegionServerStatusService.BlockingInterface intRssStub = null;
2702    LockService.BlockingInterface intLockStub = null;
2703    boolean interrupted = false;
2704    try {
2705      while (keepLooping()) {
2706        sn = this.masterAddressTracker.getMasterAddress(refresh);
2707        if (sn == null) {
2708          if (!keepLooping()) {
2709            // give up with no connection.
2710            LOG.debug("No master found and cluster is stopped; bailing out");
2711            return null;
2712          }
2713          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2714            LOG.debug("No master found; retry");
2715            previousLogTime = System.currentTimeMillis();
2716          }
          refresh = true; // let's try to pull it from ZK directly
2718          if (sleepInterrupted(200)) {
2719            interrupted = true;
2720          }
2721          continue;
2722        }
2723
2724        // If we are on the active master, use the shortcut
2725        if (this instanceof HMaster && sn.equals(getServerName())) {
2726          // Wrap the shortcut in a class providing our version to the calls where it's relevant.
2727          // Normally, RpcServer-based threadlocals do that.
2728          intRssStub = new MasterRpcServicesVersionWrapper(((HMaster)this).getMasterRpcServices());
2729          intLockStub = ((HMaster)this).getMasterRpcServices();
2730          break;
2731        }
2732        try {
2733          BlockingRpcChannel channel =
2734            this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2735              shortOperationTimeout);
2736          intRssStub = RegionServerStatusService.newBlockingStub(channel);
2737          intLockStub = LockService.newBlockingStub(channel);
2738          break;
2739        } catch (IOException e) {
2740          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2741            e = e instanceof RemoteException ?
2742              ((RemoteException)e).unwrapRemoteException() : e;
2743            if (e instanceof ServerNotRunningYetException) {
2744              LOG.info("Master isn't available yet, retrying");
2745            } else {
2746              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2747            }
2748            previousLogTime = System.currentTimeMillis();
2749          }
2750          if (sleepInterrupted(200)) {
2751            interrupted = true;
2752          }
2753        }
2754      }
2755    } finally {
2756      if (interrupted) {
2757        Thread.currentThread().interrupt();
2758      }
2759    }
2760    this.rssStub = intRssStub;
2761    this.lockStub = intLockStub;
2762    return sn;
2763  }
2764
2765  /**
   * @return True if we should keep looping; false if the cluster is going down or
   * this server has been stopped or hdfs has gone bad.
2768   */
2769  private boolean keepLooping() {
2770    return !this.stopped && isClusterUp();
2771  }
2772
2773  /*
   * Let the master know we're here, and run initialization using parameters passed
   * to us by the master.
   * @return A map of key/value configurations we got from the master, else
   * null if we failed to register.
2778   * @throws IOException
2779   */
2780  private RegionServerStartupResponse reportForDuty() throws IOException {
2781    if (this.masterless) return RegionServerStartupResponse.getDefaultInstance();
2782    ServerName masterServerName = createRegionServerStatusStub(true);
2783    RegionServerStatusService.BlockingInterface rss = rssStub;
2784    if (masterServerName == null || rss == null) return null;
2785    RegionServerStartupResponse result = null;
2786    try {
2787      rpcServices.requestCount.reset();
2788      rpcServices.rpcGetRequestCount.reset();
2789      rpcServices.rpcScanRequestCount.reset();
2790      rpcServices.rpcMultiRequestCount.reset();
2791      rpcServices.rpcMutateRequestCount.reset();
2792      LOG.info("reportForDuty to master=" + masterServerName + " with port="
2793        + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2794      long now = EnvironmentEdgeManager.currentTime();
2795      int port = rpcServices.isa.getPort();
2796      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2797      if (!StringUtils.isBlank(useThisHostnameInstead)) {
2798        request.setUseThisHostnameInstead(useThisHostnameInstead);
2799      }
2800      request.setPort(port);
2801      request.setServerStartCode(this.startcode);
2802      request.setServerCurrentTime(now);
2803      result = rss.regionServerStartup(null, request.build());
2804    } catch (ServiceException se) {
2805      IOException ioe = ProtobufUtil.getRemoteException(se);
2806      if (ioe instanceof ClockOutOfSyncException) {
2807        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync",
2808            ioe);
        // Re-throwing the IOE will cause the RS to abort.
2810        throw ioe;
2811      } else if (ioe instanceof ServerNotRunningYetException) {
2812        LOG.debug("Master is not running yet");
2813      } else {
2814        LOG.warn("error telling master we are up", se);
2815      }
2816      rssStub = null;
2817    }
2818    return result;
2819  }
2820
2821  @Override
2822  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2823    try {
2824      GetLastFlushedSequenceIdRequest req =
2825          RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2826      RegionServerStatusService.BlockingInterface rss = rssStub;
2827      if (rss == null) { // Try to connect one more time
2828        createRegionServerStatusStub();
2829        rss = rssStub;
2830        if (rss == null) {
2831          // Still no luck, we tried
          LOG.warn("Unable to connect to the master to check the last flushed sequence id");
2833          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2834              .build();
2835        }
2836      }
2837      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2838      return RegionStoreSequenceIds.newBuilder()
2839          .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2840          .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2841    } catch (ServiceException e) {
2842      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2843      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2844          .build();
2845    }
2846  }
2847
2848  /**
2849   * Close meta region if we carry it
2850   * @param abort Whether we're running an abort.
2851   */
2852  private void closeMetaTableRegions(final boolean abort) {
2853    HRegion meta = null;
2854    this.onlineRegionsLock.writeLock().lock();
2855    try {
2856      for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
2857        RegionInfo hri = e.getValue().getRegionInfo();
2858        if (hri.isMetaRegion()) {
2859          meta = e.getValue();
2860        }
2861        if (meta != null) break;
2862      }
2863    } finally {
2864      this.onlineRegionsLock.writeLock().unlock();
2865    }
2866    if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2867  }
2868
2869  /**
2870   * Schedule closes on all user regions.
   * Should be safe calling multiple times because it won't close regions
2872   * that are already closed or that are closing.
2873   * @param abort Whether we're running an abort.
2874   */
2875  private void closeUserRegions(final boolean abort) {
2876    this.onlineRegionsLock.writeLock().lock();
2877    try {
2878      for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
2879        HRegion r = e.getValue();
2880        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
          // Close the region, logging and ignoring any errors.
          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2883        }
2884      }
2885    } finally {
2886      this.onlineRegionsLock.writeLock().unlock();
2887    }
2888  }
2889
2890  /** @return the info server */
2891  public InfoServer getInfoServer() {
2892    return infoServer;
2893  }
2894
2895  /**
2896   * @return true if a stop has been requested.
2897   */
2898  @Override
2899  public boolean isStopped() {
2900    return this.stopped;
2901  }
2902
2903  @Override
2904  public boolean isStopping() {
2905    return this.stopping;
2906  }
2907
2908  @Override
2909  public Configuration getConfiguration() {
2910    return conf;
2911  }
2912
2913  protected Map<String, HRegion> getOnlineRegions() {
2914    return this.onlineRegions;
2915  }
2916
2917  public int getNumberOfOnlineRegions() {
2918    return this.onlineRegions.size();
2919  }
2920
2921  /**
2922   * For tests, web ui and metrics.
2923   * This method will only work if HRegionServer is in the same JVM as client;
2924   * HRegion cannot be serialized to cross an rpc.
2925   */
2926  public Collection<HRegion> getOnlineRegionsLocalContext() {
2927    Collection<HRegion> regions = this.onlineRegions.values();
2928    return Collections.unmodifiableCollection(regions);
2929  }
2930
2931  @Override
2932  public void addRegion(HRegion region) {
2933    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2934    configurationManager.registerObserver(region);
2935  }
2936
2937  private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
2938      long size) {
    sortedRegions.computeIfAbsent(size, k -> new ArrayList<>()).add(region);
2943  }
2944  /**
2945   * @return A new Map of online regions sorted by region off-heap size with the first entry being
2946   *   the biggest.
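   * <p>A usage sketch: the map is keyed by size, so equally-sized regions share a bucket
   * and the largest region (if any) is in the first entry:
   * <pre>
   * SortedMap&lt;Long, Collection&lt;HRegion&gt;&gt; m = getCopyOfOnlineRegionsSortedByOffHeapSize();
   * HRegion biggest = m.isEmpty() ? null : m.get(m.firstKey()).iterator().next();
   * </pre>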
2947   */
2948  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2949    // we'll sort the regions in reverse
2950    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2951    // Copy over all regions. Regions are sorted by size with biggest first.
2952    for (HRegion region : this.onlineRegions.values()) {
2953      addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
2954    }
2955    return sortedRegions;
2956  }
2957
2958  /**
2959   * @return A new Map of online regions sorted by region heap size with the first entry being the
2960   *   biggest.
2961   */
2962  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2963    // we'll sort the regions in reverse
2964    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2965    // Copy over all regions. Regions are sorted by size with biggest first.
2966    for (HRegion region : this.onlineRegions.values()) {
2967      addRegion(sortedRegions, region, region.getMemStoreHeapSize());
2968    }
2969    return sortedRegions;
2970  }
2971
2972  /**
2973   * @return time stamp in millis of when this region server was started
2974   */
2975  public long getStartcode() {
2976    return this.startcode;
2977  }
2978
2979  /** @return reference to FlushRequester */
2980  @Override
2981  public FlushRequester getFlushRequester() {
2982    return this.cacheFlusher;
2983  }
2984
2985  @Override
2986  public CompactionRequester getCompactionRequestor() {
2987    return this.compactSplitThread;
2988  }
2989
2990  @Override
2991  public LeaseManager getLeaseManager() {
2992    return leaseManager;
2993  }
2994
2995  /**
2996   * @return Return the rootDir.
2997   */
2998  protected Path getDataRootDir() {
2999    return dataRootDir;
3000  }
3001
3002  @Override
3003  public FileSystem getFileSystem() {
3004    return dataFs;
3005  }
3006
3007  /**
3008   * @return {@code true} when the data file system is available, {@code false} otherwise.
3009   */
3010  boolean isDataFileSystemOk() {
3011    return this.dataFsOk;
3012  }
3013
3014  /**
3015   * @return Return the walRootDir.
3016   */
3017  public Path getWALRootDir() {
3018    return walRootDir;
3019  }
3020
3021  /**
3022   * @return Return the walFs.
3023   */
3024  public FileSystem getWALFileSystem() {
3025    return walFs;
3026  }
3027
3028  @Override
3029  public String toString() {
3030    return getServerName().toString();
3031  }
3032
3033  @Override
3034  public ZKWatcher getZooKeeper() {
3035    return zooKeeper;
3036  }
3037
3038  @Override
3039  public CoordinatedStateManager getCoordinatedStateManager() {
3040    return csm;
3041  }
3042
3043  @Override
3044  public ServerName getServerName() {
3045    return serverName;
3046  }
3047
  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
3049    return this.rsHost;
3050  }
3051
3052  @Override
3053  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
3054    return this.regionsInTransitionInRS;
3055  }
3056
3057  @Override
3058  public ExecutorService getExecutorService() {
3059    return executorService;
3060  }
3061
3062  @Override
3063  public ChoreService getChoreService() {
3064    return choreService;
3065  }
3066
3067  @Override
3068  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
3069    return rsQuotaManager;
3070  }
3071
3072  //
3073  // Main program and support routines
3074  //
3075  /**
   * Load the replication service objects, if any.
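   * <p>A custom implementation can be plugged in via configuration; a minimal sketch, where
   * MyReplicationService is illustrative (it must implement ReplicationSourceService, and
   * also ReplicationSinkService if it is to serve as both):
   * <pre>
   * conf.set(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
   *   MyReplicationService.class.getName());
   * </pre>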
3077   */
3078  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
3079      FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException {
3080    // read in the name of the source replication class from the config file.
3081    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
3082      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
3083
3084    // read in the name of the sink replication class from the config file.
3085    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
3086      HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
3087
3088    // If both the sink and the source class names are the same, then instantiate
3089    // only one object.
3090    if (sourceClassname.equals(sinkClassname)) {
3091      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
3092        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
3093      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
3094      server.sameReplicationSourceAndSink = true;
3095    } else {
3096      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
3097        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
3098      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
3099        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
3100      server.sameReplicationSourceAndSink = false;
3101    }
3102  }
3103
3104  private static <T extends ReplicationService> T newReplicationInstance(String classname,
3105      Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
3106      Path oldLogDir, WALProvider walProvider) throws IOException {
3107    final Class<? extends T> clazz;
3108    try {
3109      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
3110      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
3111    } catch (java.lang.ClassNotFoundException nfe) {
3112      throw new IOException("Could not find class for " + classname);
3113    }
3114    T service = ReflectionUtils.newInstance(clazz, conf);
3115    service.initialize(server, walFs, logDir, oldLogDir, walProvider);
3116    return service;
3117  }
3118
  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
    if (!this.isOnline()) {
      return walGroupsReplicationStatus;
    }
3124    List<ReplicationSourceInterface> allSources = new ArrayList<>();
3125    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
3126    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
    for (ReplicationSourceInterface source : allSources) {
3128      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
3129    }
3130    return walGroupsReplicationStatus;
3131  }
3132
3133  /**
3134   * Utility for constructing an instance of the passed HRegionServer class.
3135   */
3136  static HRegionServer constructRegionServer(
3137      final Class<? extends HRegionServer> regionServerClass,
3138      final Configuration conf
3139  ) {
3140    try {
3141      Constructor<? extends HRegionServer> c =
3142          regionServerClass.getConstructor(Configuration.class);
3143      return c.newInstance(conf);
3144    } catch (Exception e) {
      throw new RuntimeException("Failed construction of RegionServer: "
          + regionServerClass.toString(), e);
3147    }
3148  }
3149
3150  /**
3151   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
3152   */
3153  public static void main(String[] args) {
    LOG.info("STARTING service " + HRegionServer.class.getSimpleName());
3155    VersionInfo.logVersion();
3156    Configuration conf = HBaseConfiguration.create();
3157    @SuppressWarnings("unchecked")
3158    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
3159        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
3160
3161    new HRegionServerCommandLine(regionServerClass).doMain(args);
3162  }
3163
3164  /**
3165   * Gets the online regions of the specified table.
3166   * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
   * Only returns <em>online</em> regions. If a region on this table has been
   * closed during a disable, etc., it will not be included in the returned list.
   * So, the returned list is not necessarily ALL regions in this table; it is
   * all the ONLINE regions in the table.
3171   * @param tableName table to limit the scope of the query
3172   * @return Online regions from <code>tableName</code>
3173   */
3174  @Override
3175  public List<HRegion> getRegions(TableName tableName) {
    List<HRegion> tableRegions = new ArrayList<>();
    synchronized (this.onlineRegions) {
      for (HRegion region : this.onlineRegions.values()) {
        RegionInfo regionInfo = region.getRegionInfo();
        if (regionInfo.getTable().equals(tableName)) {
          tableRegions.add(region);
        }
      }
    }
    return tableRegions;
  }
3187
3188  @Override
3189  public List<HRegion> getRegions() {
3190    List<HRegion> allRegions;
3191    synchronized (this.onlineRegions) {
3192      // Return a clone copy of the onlineRegions
3193      allRegions = new ArrayList<>(onlineRegions.values());
3194    }
3195    return allRegions;
3196  }
3197
3198  /**
3199   * Gets the online tables in this RS.
3200   * This method looks at the in-memory onlineRegions.
3201   * @return all the online tables in this RS
3202   */
3203  public Set<TableName> getOnlineTables() {
3204    Set<TableName> tables = new HashSet<>();
3205    synchronized (this.onlineRegions) {
3206      for (Region region: this.onlineRegions.values()) {
3207        tables.add(region.getTableDescriptor().getTableName());
3208      }
3209    }
3210    return tables;
3211  }
3212
3213  public String[] getRegionServerCoprocessors() {
3214    TreeSet<String> coprocessors = new TreeSet<>();
3215    try {
3216      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
3217    } catch (IOException exception) {
3218      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
3219          "skipping.");
3220      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3221    }
3222    Collection<HRegion> regions = getOnlineRegionsLocalContext();
3223    for (HRegion region: regions) {
3224      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
3225      try {
3226        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
3227      } catch (IOException exception) {
3228        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
3229            "; skipping.");
3230        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3231      }
3232    }
3233    coprocessors.addAll(rsHost.getCoprocessors());
3234    return coprocessors.toArray(new String[0]);
3235  }
3236
3237  /**
3238   * Try to close the region, logs a warning on failure but continues.
3239   * @param region Region to close
3240   */
3241  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
3242    try {
3243      if (!closeRegion(region.getEncodedName(), abort, null)) {
3244        LOG.warn("Failed to close " + region.getRegionNameAsString() +
3245            " - ignoring and continuing");
3246      }
3247    } catch (IOException e) {
3248      LOG.warn("Failed to close " + region.getRegionNameAsString() +
3249          " - ignoring and continuing", e);
3250    }
3251  }
3252
3253  /**
3254   * Close asynchronously a region, can be called from the master or internally by the regionserver
3255   * when stopping. If called from the master, the region will update the status.
3256   *
3257   * <p>
   * If an opening was in progress, this method will cancel it, but will not start a new close.
   * The coprocessors are not called in this case. A NotServingRegionException is thrown.
   * </p>
   *
   * <p>
   * If a close was already in progress, this new request will be ignored and the method
   * returns true.
3264   * </p>
3265   *
3266   * @param encodedName Region to close
3267   * @param abort True if we are aborting
   * @param destination Where the Region is being moved to; may be null if unknown.
3269   * @return True if closed a region.
3270   * @throws NotServingRegionException if the region is not online
3271   */
3272  protected boolean closeRegion(String encodedName, final boolean abort,
3273        final ServerName destination)
3274      throws NotServingRegionException {
    // Check for permissions to close.
3276    HRegion actualRegion = this.getRegion(encodedName);
3277    // Can be null if we're calling close on a region that's not online
3278    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
3279      try {
3280        actualRegion.getCoprocessorHost().preClose(false);
3281      } catch (IOException exp) {
3282        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
3283        return false;
3284      }
3285    }
3286
3287    // previous can come back 'null' if not in map.
3288    final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName),
3289        Boolean.FALSE);
3290
3291    if (Boolean.TRUE.equals(previous)) {
      LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " +
          "trying to OPEN. Cancelling OPENING.");
3294      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
3295        // The replace failed. That should be an exceptional case, but theoretically it can happen.
3296        // We're going to try to do a standard close then.
3297        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
3298            " Doing a standard close now");
3299        return closeRegion(encodedName, abort, destination);
3300      }
3301      // Let's get the region from the online region list again
3302      actualRegion = this.getRegion(encodedName);
3303      if (actualRegion == null) { // If already online, we still need to close it.
3304        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3305        // The master deletes the znode when it receives this exception.
3306        throw new NotServingRegionException("The region " + encodedName +
3307          " was opening but not yet served. Opening is cancelled.");
3308      }
3309    } else if (previous == null) {
3310      LOG.info("Received CLOSE for {}", encodedName);
3311    } else if (Boolean.FALSE.equals(previous)) {
3312      LOG.info("Received CLOSE for the region: " + encodedName +
3313        ", which we are already trying to CLOSE, but not completed yet");
3314      return true;
3315    }
3316
3317    if (actualRegion == null) {
3318      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3319      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
3320      // The master deletes the znode when it receives this exception.
3321      throw new NotServingRegionException("The region " + encodedName +
3322          " is not online, and is not opening.");
3323    }
3324
3325    CloseRegionHandler crh;
3326    final RegionInfo hri = actualRegion.getRegionInfo();
3327    if (hri.isMetaRegion()) {
3328      crh = new CloseMetaHandler(this, this, hri, abort);
3329    } else {
3330      crh = new CloseRegionHandler(this, this, hri, abort, destination);
3331    }
3332    this.executorService.submit(crh);
3333    return true;
3334  }
3335
3336   /**
3337   * @return HRegion for the passed binary <code>regionName</code> or null if
3338   *         named region is not member of the online regions.
3339   */
3340  public HRegion getOnlineRegion(final byte[] regionName) {
3341    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3342    return this.onlineRegions.get(encodedRegionName);
3343  }
3344
3345  @Override
3346  public HRegion getRegion(final String encodedRegionName) {
3347    return this.onlineRegions.get(encodedRegionName);
3348  }
3349
3351  @Override
3352  public boolean removeRegion(final HRegion r, ServerName destination) {
3353    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3354    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
3355    if (destination != null) {
3356      long closeSeqNum = r.getMaxFlushedSeqId();
3357      if (closeSeqNum == HConstants.NO_SEQNUM) {
3358        // No edits in WAL for this region; get the sequence number when the region was opened.
3359        closeSeqNum = r.getOpenSeqNum();
3360        if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
3361      }
3362      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
3363      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
3364      if (selfMove) {
        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
          r.getRegionInfo().getEncodedName(),
          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
3367      }
3368    }
3369    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3370    configurationManager.deregisterObserver(r);
3371    return toReturn != null;
3372  }
3373
3374  /**
   * Protected utility method for safely obtaining an HRegion handle.
3376   *
3377   * @param regionName Name of online {@link HRegion} to return
3378   * @return {@link HRegion} for <code>regionName</code>
3379   */
3380  protected HRegion getRegion(final byte[] regionName)
3381      throws NotServingRegionException {
3382    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3383    return getRegionByEncodedName(regionName, encodedRegionName);
3384  }
3385
3386  public HRegion getRegionByEncodedName(String encodedRegionName)
3387      throws NotServingRegionException {
3388    return getRegionByEncodedName(null, encodedRegionName);
3389  }
3390
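  /**
   * Look up an online region by its encoded name, translating a miss into the most specific
   * exception available: {@link RegionMovedException} if we recently moved the region
   * elsewhere, {@link RegionOpeningException} if an open is still in progress, otherwise
   * {@link NotServingRegionException}.
   */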
3391  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3392    throws NotServingRegionException {
3393    HRegion region = this.onlineRegions.get(encodedRegionName);
3394    if (region == null) {
3395      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3396      if (moveInfo != null) {
3397        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3398      }
3399      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
      String regionNameStr =
        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
3402      if (isOpening != null && isOpening) {
3403        throw new RegionOpeningException("Region " + regionNameStr +
3404          " is opening on " + this.serverName);
3405      }
      throw new NotServingRegionException(regionNameStr +
        " is not online on " + this.serverName);
3408    }
3409    return region;
3410  }
3411
3412  /**
   * Cleanup after a Throwable caught invoking a method. Unwraps remote exceptions and logs
   * the error, then checks for OOME and filesystem availability.
   *
   * @param t Throwable
   * @param msg Message to log in error. Can be null.
   * @return The passed <code>t</code>, unchanged; use {@link #convertThrowableToIOE} to turn
   *   it into an IOE (methods can only let out IOEs).
3419   */
3420  private Throwable cleanup(final Throwable t, final String msg) {
3421    // Don't log as error if NSRE; NSRE is 'normal' operation.
3422    if (t instanceof NotServingRegionException) {
3423      LOG.debug("NotServingRegionException; " + t.getMessage());
3424      return t;
3425    }
3426    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3427    if (msg == null) {
3428      LOG.error("", e);
3429    } else {
3430      LOG.error(msg, e);
3431    }
3432    if (!rpcServices.checkOOME(t)) {
3433      checkFileSystem();
3434    }
3435    return t;
3436  }
3437
3438  /**
3439   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
3440   * @return Make <code>t</code> an IOE if it isn't already.
3441   */
3442  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3443    return (t instanceof IOException ? (IOException) t : msg == null
3444        || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
3445  }
3446
3447  /**
3448   * Checks to see if the file system is still accessible. If not, sets
3449   * abortRequested and stopRequested
3450   *
3451   * @return false if file system is not available
3452   */
3453  boolean checkFileSystem() {
3454    if (this.dataFsOk && this.dataFs != null) {
3455      try {
3456        FSUtils.checkFileSystemAvailable(this.dataFs);
3457      } catch (IOException e) {
3458        abort("File System not available", e);
3459        this.dataFsOk = false;
3460      }
3461    }
3462    return this.dataFsOk;
3463  }
3464
3465  @Override
3466  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3467      List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3468    InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
3469    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3470    // it is a map of region name to InetSocketAddress[]
3471    for (int i = 0; i < favoredNodes.size(); i++) {
3472      addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
3473          favoredNodes.get(i).getPort());
3474    }
3475    regionFavoredNodesMap.put(encodedRegionName, addr);
3476  }
3477
3478  /**
3479   * Return the favored nodes for a region given its encoded name. Look at the
3480   * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
3481   *
3482   * @return array of favored locations
3483   */
3484  @Override
3485  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3486    return regionFavoredNodesMap.get(encodedRegionName);
3487  }
3488
3489  @Override
3490  public ServerNonceManager getNonceManager() {
3491    return this.nonceManager;
3492  }
3493
3494  private static class MovedRegionInfo {
3495    private final ServerName serverName;
3496    private final long seqNum;
3497
3498    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3499      this.serverName = serverName;
3500      this.seqNum = closeSeqNum;
3501     }
3502
3503    public ServerName getServerName() {
3504      return serverName;
3505    }
3506
3507    public long getSeqNum() {
3508      return seqNum;
3509    }
3510  }
3511
3512  /**
   * We need a timeout on the moved-regions cache. Without one we risk handing out stale
   * information, which would double the number of network calls instead of reducing them.
3515   */
3516  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3517
  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
      boolean selfMove) {
3519    if (selfMove) {
3520      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3521      return;
3522    }
3523    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" +
3524        closeSeqNum);
3525    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3526  }
3527
3528  void removeFromMovedRegions(String encodedName) {
3529    movedRegionInfoCache.invalidate(encodedName);
3530  }
3531
3532  @VisibleForTesting
3533  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
3534    return movedRegionInfoCache.getIfPresent(encodedRegionName);
3535  }
3536
3537  @VisibleForTesting
3538  public int movedRegionCacheExpiredTime() {
    return TIMEOUT_REGION_MOVED;
3540  }
3541
3542  private String getMyEphemeralNodePath() {
3543    return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode, getServerName().toString());
3544  }
3545
3546  private boolean isHealthCheckerConfigured() {
3547    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
    return StringUtils.isNotBlank(healthScriptLocation);
3549  }
3550
3551  /**
   * @return the underlying {@link CompactSplit} for this server
3553   */
3554  public CompactSplit getCompactSplitThread() {
3555    return this.compactSplitThread;
3556  }
3557
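  /**
   * Dispatch a coprocessor endpoint call to a {@link Service} registered at the region server
   * level: resolve the service by name, find the requested method, decode the request,
   * invoke it, and wrap the response (or any thrown IOException) for the RPC layer.
   */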
3558  CoprocessorServiceResponse execRegionServerService(
3559      @SuppressWarnings("UnusedParameters") final RpcController controller,
3560      final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3561    try {
3562      ServerRpcController serviceController = new ServerRpcController();
3563      CoprocessorServiceCall call = serviceRequest.getCall();
3564      String serviceName = call.getServiceName();
3565      Service service = coprocessorServiceHandlers.get(serviceName);
3566      if (service == null) {
        throw new UnknownProtocolException(null,
          "No registered coprocessor service found for " + serviceName);
3569      }
3570      ServiceDescriptor serviceDesc =
3571          service.getDescriptorForType();
3572
3573      String methodName = call.getMethodName();
3574      MethodDescriptor methodDesc =
3575          serviceDesc.findMethodByName(methodName);
3576      if (methodDesc == null) {
        throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName +
            " called on service " + serviceName);
3579      }
3580
3581      Message request =
3582          CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3583      final Message.Builder responseBuilder =
3584          service.getResponsePrototype(methodDesc).newBuilderForType();
3585      service.callMethod(methodDesc, serviceController, request, message -> {
3586        if (message != null) {
3587          responseBuilder.mergeFrom(message);
3588        }
3589      });
3590      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3591      if (exception != null) {
3592        throw exception;
3593      }
3594      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3595    } catch (IOException ie) {
3596      throw new ServiceException(ie);
3597    }
3598  }
3599
3600  /**
   * May be empty if this is a master that does not carry tables.
3602   *
3603   * @return The block cache instance used by the regionserver.
3604   */
3605  @Override
3606  public Optional<BlockCache> getBlockCache() {
3607    return Optional.ofNullable(this.blockCache);
3608  }
3609
3610  /**
   * May be empty if this is a master that does not carry tables.
3612   *
3613   * @return The cache for mob files used by the regionserver.
3614   */
3615  @Override
3616  public Optional<MobFileCache> getMobFileCache() {
3617    return Optional.ofNullable(this.mobFileCache);
3618  }
3619
3620  @Override
3621  public AccessChecker getAccessChecker() {
3622    return rpcServices.getAccessChecker();
3623  }
3624
3625  @Override
3626  public ZKPermissionWatcher getZKPermissionWatcher() {
3627    return rpcServices.getZkPermissionWatcher();
3628  }
3629
3630  /**
   * @return The ConfigurationManager object, for testing purposes.
3632   */
3633  @VisibleForTesting
3634  ConfigurationManager getConfigurationManager() {
3635    return configurationManager;
3636  }
3637
3638  /**
3639   * @return Return table descriptors implementation.
3640   */
3641  @Override
3642  public TableDescriptors getTableDescriptors() {
3643    return this.tableDescriptors;
3644  }
3645
3646  /**
3647   * Reload the configuration from disk.
3648   */
3649  void updateConfiguration() {
3650    LOG.info("Reloading the configuration from disk.");
3651    // Reload the configuration from disk.
3652    conf.reloadConfiguration();
3653    configurationManager.notifyAllObservers(conf);
3654  }
3655
3656  CacheEvictionStats clearRegionBlockCache(Region region) {
3657    long evictedBlocks = 0;
3658
    for (Store store : region.getStores()) {
      for (StoreFile hFile : store.getStorefiles()) {
3661        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3662      }
3663    }
3664
3665    return CacheEvictionStats.builder()
3666        .withEvictedBlocks(evictedBlocks)
3667        .build();
3668  }
3669
3670  @Override
3671  public double getCompactionPressure() {
3672    double max = 0;
3673    for (Region region : onlineRegions.values()) {
3674      for (Store store : region.getStores()) {
3675        double normCount = store.getCompactionPressure();
3676        if (normCount > max) {
3677          max = normCount;
3678        }
3679      }
3680    }
3681    return max;
3682  }
3683
3684  @Override
3685  public HeapMemoryManager getHeapMemoryManager() {
3686    return hMemManager;
3687  }
3688
3689  MemStoreFlusher getMemStoreFlusher() {
3690    return cacheFlusher;
3691  }
3692
3693  /**
   * For testing.
   * @return whether all wal roll requests have finished for this regionserver
3696   */
3697  @VisibleForTesting
3698  public boolean walRollRequestFinished() {
3699    return this.walRoller.walRollFinished();
3700  }
3701
3702  @Override
3703  public ThroughputController getFlushThroughputController() {
3704    return flushThroughputController;
3705  }
3706
3707  @Override
3708  public double getFlushPressure() {
3709    if (getRegionServerAccounting() == null || cacheFlusher == null) {
3710      // return 0 during RS initialization
3711      return 0.0;
3712    }
3713    return getRegionServerAccounting().getFlushPressure();
3714  }
3715
3716  @Override
3717  public void onConfigurationChange(Configuration newConf) {
3718    ThroughputController old = this.flushThroughputController;
3719    if (old != null) {
3720      old.stop("configuration change");
3721    }
3722    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3723    try {
3724      Superusers.initialize(newConf);
3725    } catch (IOException e) {
3726      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3727    }
3728  }
3729
3730  @Override
3731  public MetricsRegionServer getMetrics() {
3732    return metricsRegionServer;
3733  }
3734
3735  @Override
3736  public SecureBulkLoadManager getSecureBulkLoadManager() {
3737    return this.secureBulkLoadManager;
3738  }
3739
3740  @Override
3741  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
3742      final Abortable abort) {
3743    final LockServiceClient client =
3744        new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
3745    return client.regionLock(regionInfo, description, abort);
3746  }
3747
3748  @Override
3749  public void unassign(byte[] regionName) throws IOException {
3750    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
3751  }
3752
3753  @Override
3754  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3755    return this.rsSpaceQuotaManager;
3756  }
3757
3758  @Override
3759  public boolean reportFileArchivalForQuotas(TableName tableName,
3760      Collection<Entry<String, Long>> archivedFiles) {
3761    RegionServerStatusService.BlockingInterface rss = rssStub;
3762    if (rss == null || rsSpaceQuotaManager == null) {
3763      // the current server could be stopping.
3764      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3765      return false;
3766    }
3767    try {
3768      RegionServerStatusProtos.FileArchiveNotificationRequest request =
3769          rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3770      rss.reportFileArchival(null, request);
3771    } catch (ServiceException se) {
3772      IOException ioe = ProtobufUtil.getRemoteException(se);
3773      if (ioe instanceof PleaseHoldException) {
3774        if (LOG.isTraceEnabled()) {
3775          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3776              + " This will be retried.", ioe);
3777        }
3778        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3779        return false;
3780      }
3781      if (rssStub == rss) {
3782        rssStub = null;
3783      }
3784      // re-create the stub if we failed to report the archival
3785      createRegionServerStatusStub(true);
3786      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3787      return false;
3788    }
3789    return true;
3790  }
3791
3792  public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
3793    return eventLoopGroupConfig;
3794  }
3795
3796  @Override
3797  public Connection createConnection(Configuration conf) throws IOException {
3798    User user = UserProvider.instantiate(conf).getCurrent();
3799    return ConnectionFactory.createConnection(conf, null, user);
3800  }
3801
3802  void executeProcedure(long procId, RSProcedureCallable callable) {
3803    executorService.submit(new RSProcedureHandler(this, procId, callable));
3804  }
3805
3806  public void remoteProcedureComplete(long procId, Throwable error) {
3807    procedureResultReporter.complete(procId, error);
3808  }
3809
3810  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3811    RegionServerStatusService.BlockingInterface rss;
3812    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3813    for (;;) {
3814      rss = rssStub;
3815      if (rss != null) {
3816        break;
3817      }
3818      createRegionServerStatusStub();
3819    }
3820    try {
3821      rss.reportProcedureDone(null, request);
3822    } catch (ServiceException se) {
3823      if (rssStub == rss) {
3824        rssStub = null;
3825      }
3826      throw ProtobufUtil.getRemoteException(se);
3827    }
3828  }
3829
3830  /**
3831   * Will ignore the open/close region procedures which already submitted or executed.
3832   *
   * When the master has an unfinished open/close region procedure and restarts, the new active
   * master may send duplicate open/close region requests to the regionserver. Each open/close
   * request is submitted to a thread pool and executed, so we first need a cache of submitted
   * open/close region procedures.
   *
   * After an open/close region request has executed and the region transition has been reported
   * successfully, we cache it in the executed region procedures cache. See
   * {@link #finishRegionProcedure(long)}. Once the transition has been reported, the master will
   * not send the open/close region request to the regionserver again. We assume an ongoing
   * duplicate open/close region request will not be delayed by more than 600 seconds, so the
   * executed region procedures cache expires after 600 seconds.
3842   *
3843   * See HBASE-22404 for more details.
3844   *
3845   * @param procId the id of the open/close region procedure
3846   * @return true if the procedure can be submitted.
3847   */
3848  boolean submitRegionProcedure(long procId) {
3849    if (procId == -1) {
3850      return true;
3851    }
3852    // Ignore the region procedures which already submitted.
3853    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3854    if (previous != null) {
      LOG.warn("Received procedure pid={}, which has already been submitted; ignoring it", procId);
3856      return false;
3857    }
3858    // Ignore the region procedures which already executed.
3859    if (executedRegionProcedures.getIfPresent(procId) != null) {
      LOG.warn("Received procedure pid={}, which has already been executed; ignoring it", procId);
3861      return false;
3862    }
3863    return true;
3864  }
3865
3866  /**
3867   * See {@link #submitRegionProcedure(long)}.
3868   * @param procId the id of the open/close region procedure
3869   */
3870  public void finishRegionProcedure(long procId) {
3871    executedRegionProcedures.put(procId, procId);
3872    submittedRegionProcedures.remove(procId);
3873  }
3874
3875  public boolean isShutDown() {
3876    return shutDown;
3877  }
3878
3879  /**
   * Forcibly terminate the region server if the abort times out.
3881   */
3882  private static class SystemExitWhenAbortTimeout extends TimerTask {
3883
3884    public SystemExitWhenAbortTimeout() {
3885    }
3886
3887    @Override
3888    public void run() {
      LOG.warn("Aborting region server timed out; terminating forcibly and not waiting for" +
          " any running shutdown hooks or finalizers to finish their work." +
          " Thread dump to stdout.");
3892      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3893      Runtime.getRuntime().halt(1);
3894    }
3895  }
3896
3897  @Override
3898  public AsyncClusterConnection getAsyncClusterConnection() {
3899    return asyncClusterConnection;
3900  }
3901
3902  @VisibleForTesting
3903  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3904    return compactedFileDischarger;
3905  }
3906}