001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.lang.Thread.UncaughtExceptionHandler;
022import java.lang.management.MemoryType;
023import java.lang.management.MemoryUsage;
024import java.lang.reflect.Constructor;
025import java.net.BindException;
026import java.net.InetAddress;
027import java.net.InetSocketAddress;
028import java.time.Duration;
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.Collections;
032import java.util.Comparator;
033import java.util.HashSet;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.Map.Entry;
038import java.util.Objects;
039import java.util.Set;
040import java.util.SortedMap;
041import java.util.Timer;
042import java.util.TimerTask;
043import java.util.TreeMap;
044import java.util.TreeSet;
045import java.util.concurrent.ConcurrentHashMap;
046import java.util.concurrent.ConcurrentMap;
047import java.util.concurrent.ConcurrentSkipListMap;
048import java.util.concurrent.atomic.AtomicBoolean;
049import java.util.concurrent.locks.ReentrantReadWriteLock;
050import java.util.function.Function;
051import javax.management.MalformedObjectNameException;
052import javax.servlet.http.HttpServlet;
053import org.apache.commons.lang3.RandomUtils;
054import org.apache.commons.lang3.StringUtils;
055import org.apache.commons.lang3.SystemUtils;
056import org.apache.hadoop.conf.Configuration;
057import org.apache.hadoop.fs.FileSystem;
058import org.apache.hadoop.fs.Path;
059import org.apache.hadoop.hbase.Abortable;
060import org.apache.hadoop.hbase.CacheEvictionStats;
061import org.apache.hadoop.hbase.ChoreService;
062import org.apache.hadoop.hbase.ClockOutOfSyncException;
063import org.apache.hadoop.hbase.CoordinatedStateManager;
064import org.apache.hadoop.hbase.DoNotRetryIOException;
065import org.apache.hadoop.hbase.HBaseConfiguration;
066import org.apache.hadoop.hbase.HBaseInterfaceAudience;
067import org.apache.hadoop.hbase.HConstants;
068import org.apache.hadoop.hbase.HealthCheckChore;
069import org.apache.hadoop.hbase.MetaTableAccessor;
070import org.apache.hadoop.hbase.NotServingRegionException;
071import org.apache.hadoop.hbase.PleaseHoldException;
072import org.apache.hadoop.hbase.ScheduledChore;
073import org.apache.hadoop.hbase.ServerName;
074import org.apache.hadoop.hbase.Stoppable;
075import org.apache.hadoop.hbase.TableDescriptors;
076import org.apache.hadoop.hbase.TableName;
077import org.apache.hadoop.hbase.YouAreDeadException;
078import org.apache.hadoop.hbase.ZNodeClearer;
079import org.apache.hadoop.hbase.client.ClusterConnection;
080import org.apache.hadoop.hbase.client.Connection;
081import org.apache.hadoop.hbase.client.ConnectionUtils;
082import org.apache.hadoop.hbase.client.RegionInfo;
083import org.apache.hadoop.hbase.client.RegionInfoBuilder;
084import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
085import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
086import org.apache.hadoop.hbase.client.locking.EntityLock;
087import org.apache.hadoop.hbase.client.locking.LockServiceClient;
088import org.apache.hadoop.hbase.conf.ConfigurationManager;
089import org.apache.hadoop.hbase.conf.ConfigurationObserver;
090import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
091import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
092import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
093import org.apache.hadoop.hbase.exceptions.RegionMovedException;
094import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
095import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
096import org.apache.hadoop.hbase.executor.ExecutorService;
097import org.apache.hadoop.hbase.executor.ExecutorType;
098import org.apache.hadoop.hbase.fs.HFileSystem;
099import org.apache.hadoop.hbase.http.InfoServer;
100import org.apache.hadoop.hbase.io.hfile.BlockCache;
101import org.apache.hadoop.hbase.io.hfile.CacheConfig;
102import org.apache.hadoop.hbase.io.hfile.HFile;
103import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
104import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
105import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
106import org.apache.hadoop.hbase.ipc.RpcClient;
107import org.apache.hadoop.hbase.ipc.RpcClientFactory;
108import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
109import org.apache.hadoop.hbase.ipc.RpcServer;
110import org.apache.hadoop.hbase.ipc.RpcServerInterface;
111import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
112import org.apache.hadoop.hbase.ipc.ServerRpcController;
113import org.apache.hadoop.hbase.log.HBaseMarkers;
114import org.apache.hadoop.hbase.master.HMaster;
115import org.apache.hadoop.hbase.master.LoadBalancer;
116import org.apache.hadoop.hbase.master.RegionState.State;
117import org.apache.hadoop.hbase.mob.MobCacheConfig;
118import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
119import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
120import org.apache.hadoop.hbase.quotas.QuotaUtil;
121import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
122import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
123import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
124import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
125import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
126import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
127import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
128import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
129import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
130import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
131import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
132import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
133import org.apache.hadoop.hbase.security.Superusers;
134import org.apache.hadoop.hbase.security.User;
135import org.apache.hadoop.hbase.security.UserProvider;
136import org.apache.hadoop.hbase.trace.SpanReceiverHost;
137import org.apache.hadoop.hbase.trace.TraceUtil;
138import org.apache.hadoop.hbase.util.Addressing;
139import org.apache.hadoop.hbase.util.Bytes;
140import org.apache.hadoop.hbase.util.CompressionTest;
141import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
142import org.apache.hadoop.hbase.util.FSTableDescriptors;
143import org.apache.hadoop.hbase.util.FSUtils;
144import org.apache.hadoop.hbase.util.HasThread;
145import org.apache.hadoop.hbase.util.JvmPauseMonitor;
146import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
147import org.apache.hadoop.hbase.util.Pair;
148import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
149import org.apache.hadoop.hbase.util.Sleeper;
150import org.apache.hadoop.hbase.util.Threads;
151import org.apache.hadoop.hbase.util.VersionInfo;
152import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
153import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
154import org.apache.hadoop.hbase.wal.WAL;
155import org.apache.hadoop.hbase.wal.WALFactory;
156import org.apache.hadoop.hbase.wal.WALProvider;
157import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
158import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
159import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
160import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
161import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
162import org.apache.hadoop.hbase.zookeeper.ZKUtil;
163import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
164import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
165import org.apache.hadoop.ipc.RemoteException;
166import org.apache.hadoop.util.ReflectionUtils;
167import org.apache.yetus.audience.InterfaceAudience;
168import org.apache.zookeeper.KeeperException;
169import org.slf4j.Logger;
170import org.slf4j.LoggerFactory;
171import sun.misc.Signal;
172import sun.misc.SignalHandler;
173
174import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
175import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
176import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
177import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
178import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
179import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
180import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
181import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
182import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
183
184import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
185import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
212
213/**
214 * HRegionServer makes a set of HRegions available to clients. It checks in with
215 * the HMaster. There are many HRegionServers in a single HBase deployment.
216 */
217@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
218@SuppressWarnings({ "deprecation"})
219public class HRegionServer extends HasThread implements
220    RegionServerServices, LastSequenceId, ConfigurationObserver {
221  // Time to pause if master says 'please hold'. Make configurable if needed.
222  private static final int INIT_PAUSE_TIME_MS = 1000;
223
224  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
225
226  /**
227   * For testing only!  Set to true to skip notifying region assignment to master .
228   */
229  @VisibleForTesting
230  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
231  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
232
233  //RegionName vs current action in progress
234  //true - if open region action in progress
235  //false - if close region action in progress
236  protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
237    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
238
239  // Cache flushing
240  protected MemStoreFlusher cacheFlusher;
241
242  protected HeapMemoryManager hMemManager;
243
244  /**
245   * Cluster connection to be shared by services.
246   * Initialized at server startup and closed when server shuts down.
247   * Clients must never close it explicitly.
248   * Clients hosted by this Server should make use of this clusterConnection rather than create
249   * their own; if they create their own, there is no way for the hosting server to shutdown
250   * ongoing client RPCs.
251   */
252  protected ClusterConnection clusterConnection;
253
254  /*
255   * Long-living meta table locator, which is created when the server is started and stopped
256   * when server shuts down. References to this locator shall be used to perform according
257   * operations in EventHandlers. Primary reason for this decision is to make it mockable
258   * for tests.
259   */
260  protected MetaTableLocator metaTableLocator;
261
262  /**
263   * Go here to get table descriptors.
264   */
265  protected TableDescriptors tableDescriptors;
266
267  // Replication services. If no replication, this handler will be null.
268  protected ReplicationSourceService replicationSourceHandler;
269  protected ReplicationSinkService replicationSinkHandler;
270
271  // Compactions
272  public CompactSplit compactSplitThread;
273
274  /**
275   * Map of regions currently being served by this region server. Key is the
276   * encoded region name.  All access should be synchronized.
277   */
278  protected final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
279
280  /**
281   * Map of encoded region names to the DataNode locations they should be hosted on
282   * We store the value as InetSocketAddress since this is used only in HDFS
283   * API (create() that takes favored nodes as hints for placing file blocks).
284   * We could have used ServerName here as the value class, but we'd need to
285   * convert it to InetSocketAddress at some point before the HDFS API call, and
286   * it seems a bit weird to store ServerName since ServerName refers to RegionServers
287   * and here we really mean DataNode locations.
288   */
289  protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
290      new ConcurrentHashMap<>();
291
292  // Leases
293  protected Leases leases;
294
295  // Instance of the hbase executor executorService.
296  protected ExecutorService executorService;
297
298  // If false, the file system has become unavailable
299  protected volatile boolean fsOk;
300  protected HFileSystem fs;
301  protected HFileSystem walFs;
302
303  // Set when a report to the master comes back with a message asking us to
304  // shutdown. Also set by call to stop when debugging or running unit tests
305  // of HRegionServer in isolation.
306  private volatile boolean stopped = false;
307
308  // Go down hard. Used if file system becomes unavailable and also in
309  // debugging and unit tests.
310  private volatile boolean abortRequested;
311  public static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
312  // Default abort timeout is 1200 seconds for safe
313  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
314  // Will run this task when abort timeout
315  public static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
316
317  ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<>();
318
319  // A state before we go into stopped state.  At this stage we're closing user
320  // space regions.
321  private boolean stopping = false;
322
323  volatile boolean killed = false;
324
325  protected final Configuration conf;
326
327  private Path rootDir;
328  private Path walRootDir;
329
330  protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
331
332  final int numRetries;
333  protected final int threadWakeFrequency;
334  protected final int msgInterval;
335
336  protected final int numRegionsToReport;
337
338  // Stub to do region server status calls against the master.
339  private volatile RegionServerStatusService.BlockingInterface rssStub;
340  private volatile LockService.BlockingInterface lockStub;
341  // RPC client. Used to make the stub above that does region server status checking.
342  RpcClient rpcClient;
343
344  private RpcRetryingCallerFactory rpcRetryingCallerFactory;
345  private RpcControllerFactory rpcControllerFactory;
346
347  private UncaughtExceptionHandler uncaughtExceptionHandler;
348
349  // Info server. Default access so can be used by unit tests. REGIONSERVER
350  // is name of the webapp and the attribute name used stuffing this instance
351  // into web context.
352  protected InfoServer infoServer;
353  private JvmPauseMonitor pauseMonitor;
354
355  /** region server process name */
356  public static final String REGIONSERVER = "regionserver";
357
358  MetricsRegionServer metricsRegionServer;
359  MetricsTable metricsTable;
360  private SpanReceiverHost spanReceiverHost;
361
362  /**
363   * ChoreService used to schedule tasks that we want to run periodically
364   */
365  private ChoreService choreService;
366
367  /*
368   * Check for compactions requests.
369   */
370  ScheduledChore compactionChecker;
371
372  /*
373   * Check for flushes
374   */
375  ScheduledChore periodicFlusher;
376
377  protected volatile WALFactory walFactory;
378
379  // WAL roller. log is protected rather than private to avoid
380  // eclipse warning when accessed by inner classes
381  protected LogRoller walRoller;
382
383  // flag set after we're done setting up server threads
384  final AtomicBoolean online = new AtomicBoolean(false);
385
386  // zookeeper connection and watcher
387  protected final ZKWatcher zooKeeper;
388
389  // master address tracker
390  private final MasterAddressTracker masterAddressTracker;
391
392  // Cluster Status Tracker
393  protected final ClusterStatusTracker clusterStatusTracker;
394
395  // Log Splitting Worker
396  private SplitLogWorker splitLogWorker;
397
398  // A sleeper that sleeps for msgInterval.
399  protected final Sleeper sleeper;
400
401  private final int operationTimeout;
402  private final int shortOperationTimeout;
403
404  private final RegionServerAccounting regionServerAccounting;
405
406  // Cache configuration and block cache reference
407  protected CacheConfig cacheConfig;
408  // Cache configuration for mob
409  final MobCacheConfig mobCacheConfig;
410
411  /** The health check chore. */
412  private HealthCheckChore healthCheckChore;
413
414  /** The nonce manager chore. */
415  private ScheduledChore nonceManagerChore;
416
417  private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();
418
419  /**
420   * The server name the Master sees us as.  Its made from the hostname the
421   * master passes us, port, and server startcode. Gets set after registration
422   * against  Master.
423   */
424  protected ServerName serverName;
425
426  /*
427   * hostname specified by hostname config
428   */
429  protected String useThisHostnameInstead;
430
431  // key to the config parameter of server hostname
432  // the specification of server hostname is optional. The hostname should be resolvable from
433  // both master and region server
434  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
435  final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
436  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
437  protected final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";
438
439  // HBASE-18226: This config and hbase.regionserver.hostname are mutually exclusive.
440  // Exception will be thrown if both are used.
441  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
442    "hbase.regionserver.hostname.disable.master.reversedns";
443
444  /**
445   * This servers startcode.
446   */
447  protected final long startcode;
448
449  /**
450   * Unique identifier for the cluster we are a part of.
451   */
452  protected String clusterId;
453
454  /**
455   * Chore to clean periodically the moved region list
456   */
457  private MovedRegionsCleaner movedRegionsCleaner;
458
459  // chore for refreshing store files for secondary regions
460  private StorefileRefresherChore storefileRefresher;
461
462  private RegionServerCoprocessorHost rsHost;
463
464  private RegionServerProcedureManagerHost rspmHost;
465
466  private RegionServerRpcQuotaManager rsQuotaManager;
467  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
468
469  /**
470   * Nonce manager. Nonces are used to make operations like increment and append idempotent
471   * in the case where client doesn't receive the response from a successful operation and
472   * retries. We track the successful ops for some time via a nonce sent by client and handle
473   * duplicate operations (currently, by failing them; in future we might use MVCC to return
474   * result). Nonces are also recovered from WAL during, recovery; however, the caveats (from
475   * HBASE-3787) are:
476   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
477   *   of past records. If we don't read the records, we don't read and recover the nonces.
478   *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
479   * - There's no WAL recovery during normal region move, so nonces will not be transfered.
480   * We can have separate additional "Nonce WAL". It will just contain bunch of numbers and
481   * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
482   * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
483   * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
484   * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
485   * latest nonce in it expired. It can also be recovered during move.
486   */
487  final ServerNonceManager nonceManager;
488
489  private UserProvider userProvider;
490
491  protected final RSRpcServices rpcServices;
492
493  protected CoordinatedStateManager csm;
494
495  /**
496   * Configuration manager is used to register/deregister and notify the configuration observers
497   * when the regionserver is notified that there was a change in the on disk configs.
498   */
499  protected final ConfigurationManager configurationManager;
500
501  @VisibleForTesting
502  CompactedHFilesDischarger compactedFileDischarger;
503
504  private volatile ThroughputController flushThroughputController;
505
506  protected SecureBulkLoadManager secureBulkLoadManager;
507
508  protected FileSystemUtilizationChore fsUtilizationChore;
509
510  private final NettyEventLoopGroupConfig eventLoopGroupConfig;
511
512  /**
513   * True if this RegionServer is coming up in a cluster where there is no Master;
514   * means it needs to just come up and make do without a Master to talk to: e.g. in test or
515   * HRegionServer is doing other than its usual duties: e.g. as an hollowed-out host whose only
516   * purpose is as a Replication-stream sink; see HBASE-18846 for more.
517   */
518  private final boolean masterless;
519  static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
520
521  /**
522   * Starts a HRegionServer at the default location
523   */
524  // Don't start any services or managers in here in the Constructor.
525  // Defer till after we register with the Master as much as possible. See #startServices.
526  public HRegionServer(Configuration conf) throws IOException {
527    super("RegionServer");  // thread name
528    TraceUtil.initTracer(conf);
529    try {
530      this.startcode = System.currentTimeMillis();
531      this.conf = conf;
532      this.fsOk = true;
533      this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
534      this.eventLoopGroupConfig = setupNetty(this.conf);
535      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
536      HFile.checkHFileVersion(this.conf);
537      checkCodecs(this.conf);
538      this.userProvider = UserProvider.instantiate(conf);
539      FSUtils.setupShortCircuitRead(this.conf);
540
541      // Disable usage of meta replicas in the regionserver
542      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
543      // Config'ed params
544      this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
545          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
546      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
547      this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
548
549      this.sleeper = new Sleeper(this.msgInterval, this);
550
551      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
552      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
553
554      this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10);
555
556      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
557          HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
558
559      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
560          HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
561
562      this.abortRequested = false;
563      this.stopped = false;
564
565      rpcServices = createRpcServices();
566      useThisHostnameInstead = getUseThisHostnameInstead(conf);
567      String hostName =
568          StringUtils.isBlank(useThisHostnameInstead) ? this.rpcServices.isa.getHostName()
569              : this.useThisHostnameInstead;
570      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);
571
572      rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
573      rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
574
575      // login the zookeeper client principal (if using security)
576      ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
577          HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
578      // login the server principal (if using secure Hadoop)
579      login(userProvider, hostName);
580      // init superusers and add the server principal (if using security)
581      // or process owner as default super user.
582      Superusers.initialize(conf);
583      regionServerAccounting = new RegionServerAccounting(conf);
584
585      boolean isMasterNotCarryTable =
586          this instanceof HMaster && !LoadBalancer.isTablesOnMaster(conf);
587      // no need to instantiate global block cache when master not carry table
588      if (!isMasterNotCarryTable) {
589        CacheConfig.instantiateBlockCache(conf);
590      }
591      cacheConfig = new CacheConfig(conf);
592      mobCacheConfig = new MobCacheConfig(conf);
593
594      uncaughtExceptionHandler = new UncaughtExceptionHandler() {
595        @Override
596        public void uncaughtException(Thread t, Throwable e) {
597          abort("Uncaught exception in executorService thread " + t.getName(), e);
598        }
599      };
600
601      initializeFileSystem();
602      spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
603
604      this.configurationManager = new ConfigurationManager();
605      setupWindows(getConfiguration(), getConfigurationManager());
606
607      // Some unit tests don't need a cluster, so no zookeeper at all
608      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
609        // Open connection to zookeeper and set primary watcher
610        zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
611          rpcServices.isa.getPort(), this, canCreateBaseZNode());
612        // If no master in cluster, skip trying to track one or look for a cluster status.
613        if (!this.masterless) {
614          this.csm = new ZkCoordinatedStateManager(this);
615
616          masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
617          masterAddressTracker.start();
618
619          clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
620          clusterStatusTracker.start();
621        } else {
622          masterAddressTracker = null;
623          clusterStatusTracker = null;
624        }
625      } else {
626        zooKeeper = null;
627        masterAddressTracker = null;
628        clusterStatusTracker = null;
629      }
630      this.rpcServices.start(zooKeeper);
631      // This violates 'no starting stuff in Constructor' but Master depends on the below chore
632      // and executor being created and takes a different startup route. Lots of overlap between HRS
633      // and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
634      // Master expects Constructor to put up web servers. Ugh.
635      // class HRS. TODO.
636      this.choreService = new ChoreService(getName(), true);
637      this.executorService = new ExecutorService(getName());
638      putUpWebUI();
639    } catch (Throwable t) {
640      // Make sure we log the exception. HRegionServer is often started via reflection and the
641      // cause of failed startup is lost.
642      LOG.error("Failed construction RegionServer", t);
643      throw t;
644    }
645  }
646
647  // HMaster should override this method to load the specific config for master
648  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
649    String hostname = conf.get(RS_HOSTNAME_KEY);
650    if (conf.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
651      if (!StringUtils.isBlank(hostname)) {
652        String msg = RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + RS_HOSTNAME_KEY +
653          " are mutually exclusive. Do not set " + RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY +
654          " to true while " + RS_HOSTNAME_KEY + " is used";
655        throw new IOException(msg);
656      } else {
657        return rpcServices.isa.getHostName();
658      }
659    } else {
660      return hostname;
661    }
662  }
663
664  /**
665   * If running on Windows, do windows-specific setup.
666   */
667  private static void setupWindows(final Configuration conf, ConfigurationManager cm) {
668    if (!SystemUtils.IS_OS_WINDOWS) {
669      Signal.handle(new Signal("HUP"), new SignalHandler() {
670        @Override
671        public void handle(Signal signal) {
672          conf.reloadConfiguration();
673          cm.notifyAllObservers(conf);
674        }
675      });
676    }
677  }
678
679  private static NettyEventLoopGroupConfig setupNetty(Configuration conf) {
680    // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL.
681    NettyEventLoopGroupConfig nelgc =
682      new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
683    NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
684    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
685    return nelgc;
686  }
687
688  private void initializeFileSystem() throws IOException {
689    // Get fs instance used by this RS.  Do we use checksum verification in the hbase? If hbase
690    // checksum verification enabled, then automatically switch off hdfs checksum verification.
691    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
692    FSUtils.setFsDefault(this.conf, FSUtils.getWALRootDir(this.conf));
693    this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
694    this.walRootDir = FSUtils.getWALRootDir(this.conf);
695    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
696    // underlying hadoop hdfs accessors will be going against wrong filesystem
697    // (unless all is set to defaults).
698    FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
699    this.fs = new HFileSystem(this.conf, useHBaseChecksum);
700    this.rootDir = FSUtils.getRootDir(this.conf);
701    this.tableDescriptors = getFsTableDescriptors();
702  }
703
704  protected TableDescriptors getFsTableDescriptors() throws IOException {
705    return new FSTableDescriptors(this.conf,
706      this.fs, this.rootDir, !canUpdateTableDescriptor(), false, getMetaTableObserver());
707  }
708
709  protected Function<TableDescriptorBuilder, TableDescriptorBuilder> getMetaTableObserver() {
710    return null;
711  }
712
713  protected void login(UserProvider user, String host) throws IOException {
714    user.login("hbase.regionserver.keytab.file",
715      "hbase.regionserver.kerberos.principal", host);
716  }
717
718
719  /**
720   * Wait for an active Master.
721   * See override in Master superclass for how it is used.
722   */
723  protected void waitForMasterActive() {}
724
725  protected String getProcessName() {
726    return REGIONSERVER;
727  }
728
729  protected boolean canCreateBaseZNode() {
730    return this.masterless;
731  }
732
733  protected boolean canUpdateTableDescriptor() {
734    return false;
735  }
736
737  protected RSRpcServices createRpcServices() throws IOException {
738    return new RSRpcServices(this);
739  }
740
741  protected void configureInfoServer() {
742    infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
743    infoServer.setAttribute(REGIONSERVER, this);
744  }
745
746  protected Class<? extends HttpServlet> getDumpServlet() {
747    return RSDumpServlet.class;
748  }
749
750  @Override
751  public boolean registerService(com.google.protobuf.Service instance) {
752    /*
753     * No stacking of instances is allowed for a single executorService name
754     */
755    com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
756        instance.getDescriptorForType();
757    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
758    if (coprocessorServiceHandlers.containsKey(serviceName)) {
759      LOG.error("Coprocessor executorService " + serviceName
760          + " already registered, rejecting request from " + instance);
761      return false;
762    }
763
764    coprocessorServiceHandlers.put(serviceName, instance);
765    if (LOG.isDebugEnabled()) {
766      LOG.debug("Registered regionserver coprocessor executorService: executorService=" + serviceName);
767    }
768    return true;
769  }
770
771  /**
772   * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to
773   * the local server; i.e. a short-circuit Connection. Safe to use going to local or remote
774   * server. Create this instance in a method can be intercepted and mocked in tests.
775   * @throws IOException
776   */
777  @VisibleForTesting
778  protected ClusterConnection createClusterConnection() throws IOException {
779    // Create a cluster connection that when appropriate, can short-circuit and go directly to the
780    // local server if the request is to the local server bypassing RPC. Can be used for both local
781    // and remote invocations.
782    return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
783      serverName, rpcServices, rpcServices);
784  }
785
786  /**
787   * Run test on configured codecs to make sure supporting libs are in place.
788   * @param c
789   * @throws IOException
790   */
791  private static void checkCodecs(final Configuration c) throws IOException {
792    // check to see if the codec list is available:
793    String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
794    if (codecs == null) return;
795    for (String codec : codecs) {
796      if (!CompressionTest.testCompression(codec)) {
797        throw new IOException("Compression codec " + codec +
798          " not supported, aborting RS construction");
799      }
800    }
801  }
802
803  public String getClusterId() {
804    return this.clusterId;
805  }
806
807  /**
808   * Setup our cluster connection if not already initialized.
809   * @throws IOException
810   */
811  protected synchronized void setupClusterConnection() throws IOException {
812    if (clusterConnection == null) {
813      clusterConnection = createClusterConnection();
814      metaTableLocator = new MetaTableLocator();
815    }
816  }
817
818  /**
819   * All initialization needed before we go register with Master.<br>
820   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
821   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
822   */
823  private void preRegistrationInitialization() {
824    try {
825      initializeZooKeeper();
826      setupClusterConnection();
827      // Setup RPC client for master communication
828      this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
829          this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
830    } catch (Throwable t) {
831      // Call stop if error or process will stick around for ever since server
832      // puts up non-daemon threads.
833      this.rpcServices.stop();
834      abort("Initialization of RS failed.  Hence aborting RS.", t);
835    }
836  }
837
838  /**
839   * Bring up connection to zk ensemble and then wait until a master for this cluster and then after
840   * that, wait until cluster 'up' flag has been set. This is the order in which master does things.
841   * <p>
842   * Finally open long-living server short-circuit connection.
843   */
844  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
845    justification="cluster Id znode read would give us correct response")
846  private void initializeZooKeeper() throws IOException, InterruptedException {
847    // Nothing to do in here if no Master in the mix.
848    if (this.masterless) {
849      return;
850    }
851
852    // Create the master address tracker, register with zk, and start it.  Then
853    // block until a master is available.  No point in starting up if no master
854    // running.
855    blockAndCheckIfStopped(this.masterAddressTracker);
856
857    // Wait on cluster being up.  Master will set this flag up in zookeeper
858    // when ready.
859    blockAndCheckIfStopped(this.clusterStatusTracker);
860
861    // If we are HMaster then the cluster id should have already been set.
862    if (clusterId == null) {
863      // Retrieve clusterId
864      // Since cluster status is now up
865      // ID should have already been set by HMaster
866      try {
867        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
868        if (clusterId == null) {
869          this.abort("Cluster ID has not been set");
870        }
871        LOG.info("ClusterId : " + clusterId);
872      } catch (KeeperException e) {
873        this.abort("Failed to retrieve Cluster ID", e);
874      }
875    }
876
877    waitForMasterActive();
878    if (isStopped() || isAborted()) {
879      return; // No need for further initialization
880    }
881
882    // watch for snapshots and other procedures
883    try {
884      rspmHost = new RegionServerProcedureManagerHost();
885      rspmHost.loadProcedures(conf);
886      rspmHost.initialize(this);
887    } catch (KeeperException e) {
888      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
889    }
890  }
891
892  /**
893   * Utilty method to wait indefinitely on a znode availability while checking
894   * if the region server is shut down
895   * @param tracker znode tracker to use
896   * @throws IOException any IO exception, plus if the RS is stopped
897   * @throws InterruptedException
898   */
899  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
900      throws IOException, InterruptedException {
901    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
902      if (this.stopped) {
903        throw new IOException("Received the shutdown message while waiting.");
904      }
905    }
906  }
907
908  /**
909   * @return True if the cluster is up.
910   */
911  @Override
912  public boolean isClusterUp() {
913    return this.masterless ||
914        (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
915  }
916
917  /**
918   * The HRegionServer sticks in this loop until closed.
919   */
920  @Override
921  public void run() {
922    try {
923      // Do pre-registration initializations; zookeeper, lease threads, etc.
924      preRegistrationInitialization();
925    } catch (Throwable e) {
926      abort("Fatal exception during initialization", e);
927    }
928
929    try {
930      if (!isStopped() && !isAborted()) {
931        ShutdownHook.install(conf, fs, this, Thread.currentThread());
932        // Initialize the RegionServerCoprocessorHost now that our ephemeral
933        // node was created, in case any coprocessors want to use ZooKeeper
934        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
935      }
936
937      // Try and register with the Master; tell it we are here.  Break if
938      // server is stopped or the clusterup flag is down or hdfs went wacky.
939      // Once registered successfully, go ahead and start up all Services.
940      LOG.debug("About to register with Master.");
941      while (keepLooping()) {
942        RegionServerStartupResponse w = reportForDuty();
943        if (w == null) {
944          LOG.warn("reportForDuty failed; sleeping and then retrying.");
945          this.sleeper.sleep();
946        } else {
947          handleReportForDutyResponse(w);
948          break;
949        }
950      }
951
952      if (!isStopped() && isHealthy()) {
953        // start the snapshot handler and other procedure handlers,
954        // since the server is ready to run
955        if (this.rspmHost != null) {
956          this.rspmHost.start();
957        }
958        // Start the Quota Manager
959        if (this.rsQuotaManager != null) {
960          rsQuotaManager.start(getRpcServer().getScheduler());
961        }
962        if (this.rsSpaceQuotaManager != null) {
963          this.rsSpaceQuotaManager.start();
964        }
965      }
966
967      // We registered with the Master.  Go into run mode.
968      long lastMsg = System.currentTimeMillis();
969      long oldRequestCount = -1;
970      // The main run loop.
971      while (!isStopped() && isHealthy()) {
972        if (!isClusterUp()) {
973          if (isOnlineRegionsEmpty()) {
974            stop("Exiting; cluster shutdown set and not carrying any regions");
975          } else if (!this.stopping) {
976            this.stopping = true;
977            LOG.info("Closing user regions");
978            closeUserRegions(this.abortRequested);
979          } else if (this.stopping) {
980            boolean allUserRegionsOffline = areAllUserRegionsOffline();
981            if (allUserRegionsOffline) {
982              // Set stopped if no more write requests tp meta tables
983              // since last time we went around the loop.  Any open
984              // meta regions will be closed on our way out.
985              if (oldRequestCount == getWriteRequestCount()) {
986                stop("Stopped; only catalog regions remaining online");
987                break;
988              }
989              oldRequestCount = getWriteRequestCount();
990            } else {
991              // Make sure all regions have been closed -- some regions may
992              // have not got it because we were splitting at the time of
993              // the call to closeUserRegions.
994              closeUserRegions(this.abortRequested);
995            }
996            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
997          }
998        }
999        long now = System.currentTimeMillis();
1000        if ((now - lastMsg) >= msgInterval) {
1001          tryRegionServerReport(lastMsg, now);
1002          lastMsg = System.currentTimeMillis();
1003        }
1004        if (!isStopped() && !isAborted()) {
1005          this.sleeper.sleep();
1006        }
1007      } // for
1008    } catch (Throwable t) {
1009      if (!rpcServices.checkOOME(t)) {
1010        String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
1011        abort(prefix + t.getMessage(), t);
1012      }
1013    }
1014
1015    if (abortRequested) {
1016      Timer abortMonitor = new Timer("Abort regionserver monitor", true);
1017      TimerTask abortTimeoutTask = null;
1018      try {
1019        abortTimeoutTask =
1020            Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
1021                .asSubclass(TimerTask.class).getDeclaredConstructor().newInstance();
1022      } catch (Exception e) {
1023        LOG.warn("Initialize abort timeout task failed", e);
1024      }
1025      if (abortTimeoutTask != null) {
1026        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
1027      }
1028    }
1029
1030    if (this.leases != null) {
1031      this.leases.closeAfterLeasesExpire();
1032    }
1033    if (this.splitLogWorker != null) {
1034      splitLogWorker.stop();
1035    }
1036    if (this.infoServer != null) {
1037      LOG.info("Stopping infoServer");
1038      try {
1039        this.infoServer.stop();
1040      } catch (Exception e) {
1041        LOG.error("Failed to stop infoServer", e);
1042      }
1043    }
1044    // Send cache a shutdown.
1045    if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
1046      cacheConfig.getBlockCache().shutdown();
1047    }
1048    mobCacheConfig.getMobFileCache().shutdown();
1049
1050    if (movedRegionsCleaner != null) {
1051      movedRegionsCleaner.stop("Region Server stopping");
1052    }
1053
1054    // Send interrupts to wake up threads if sleeping so they notice shutdown.
1055    // TODO: Should we check they are alive? If OOME could have exited already
1056    if (this.hMemManager != null) this.hMemManager.stop();
1057    if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
1058    if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
1059    if (this.compactionChecker != null) this.compactionChecker.cancel(true);
1060    if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
1061    if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
1062    if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
1063    sendShutdownInterrupt();
1064
1065    // Stop the quota manager
1066    if (rsQuotaManager != null) {
1067      rsQuotaManager.stop();
1068    }
1069    if (rsSpaceQuotaManager != null) {
1070      rsSpaceQuotaManager.stop();
1071      rsSpaceQuotaManager = null;
1072    }
1073
1074    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
1075    if (rspmHost != null) {
1076      rspmHost.stop(this.abortRequested || this.killed);
1077    }
1078
1079    if (this.killed) {
1080      // Just skip out w/o closing regions.  Used when testing.
1081    } else if (abortRequested) {
1082      if (this.fsOk) {
1083        closeUserRegions(abortRequested); // Don't leave any open file handles
1084      }
1085      LOG.info("aborting server " + this.serverName);
1086    } else {
1087      closeUserRegions(abortRequested);
1088      LOG.info("stopping server " + this.serverName);
1089    }
1090
1091    // so callers waiting for meta without timeout can stop
1092    if (this.metaTableLocator != null) this.metaTableLocator.stop();
1093    if (this.clusterConnection != null && !clusterConnection.isClosed()) {
1094      try {
1095        this.clusterConnection.close();
1096      } catch (IOException e) {
1097        // Although the {@link Closeable} interface throws an {@link
1098        // IOException}, in reality, the implementation would never do that.
1099        LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
1100      }
1101    }
1102
1103    // Closing the compactSplit thread before closing meta regions
1104    if (!this.killed && containsMetaTableRegions()) {
1105      if (!abortRequested || this.fsOk) {
1106        if (this.compactSplitThread != null) {
1107          this.compactSplitThread.join();
1108          this.compactSplitThread = null;
1109        }
1110        closeMetaTableRegions(abortRequested);
1111      }
1112    }
1113
1114    if (!this.killed && this.fsOk) {
1115      waitOnAllRegionsToClose(abortRequested);
1116      LOG.info("stopping server " + this.serverName + "; all regions closed.");
1117    }
1118
1119    //fsOk flag may be changed when closing regions throws exception.
1120    if (this.fsOk) {
1121      shutdownWAL(!abortRequested);
1122    }
1123
1124    // Make sure the proxy is down.
1125    if (this.rssStub != null) {
1126      this.rssStub = null;
1127    }
1128    if (this.lockStub != null) {
1129      this.lockStub = null;
1130    }
1131    if (this.rpcClient != null) {
1132      this.rpcClient.close();
1133    }
1134    if (this.leases != null) {
1135      this.leases.close();
1136    }
1137    if (this.pauseMonitor != null) {
1138      this.pauseMonitor.stop();
1139    }
1140
1141    if (!killed) {
1142      stopServiceThreads();
1143    }
1144
1145    if (this.rpcServices != null) {
1146      this.rpcServices.stop();
1147    }
1148
1149    try {
1150      deleteMyEphemeralNode();
1151    } catch (KeeperException.NoNodeException nn) {
1152    } catch (KeeperException e) {
1153      LOG.warn("Failed deleting my ephemeral node", e);
1154    }
1155    // We may have failed to delete the znode at the previous step, but
1156    //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1157    ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1158
1159    if (this.zooKeeper != null) {
1160      this.zooKeeper.close();
1161    }
1162    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
1163  }
1164
1165  private boolean containsMetaTableRegions() {
1166    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
1167  }
1168
1169  private boolean areAllUserRegionsOffline() {
1170    if (getNumberOfOnlineRegions() > 2) return false;
1171    boolean allUserRegionsOffline = true;
1172    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1173      if (!e.getValue().getRegionInfo().isMetaRegion()) {
1174        allUserRegionsOffline = false;
1175        break;
1176      }
1177    }
1178    return allUserRegionsOffline;
1179  }
1180
1181  /**
1182   * @return Current write count for all online regions.
1183   */
1184  private long getWriteRequestCount() {
1185    long writeCount = 0;
1186    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1187      writeCount += e.getValue().getWriteRequestsCount();
1188    }
1189    return writeCount;
1190  }
1191
1192  @VisibleForTesting
1193  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1194      throws IOException {
1195    RegionServerStatusService.BlockingInterface rss = rssStub;
1196    if (rss == null) {
1197      // the current server could be stopping.
1198      return;
1199    }
1200    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1201    try {
1202      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1203      request.setServer(ProtobufUtil.toServerName(this.serverName));
1204      request.setLoad(sl);
1205      rss.regionServerReport(null, request.build());
1206    } catch (ServiceException se) {
1207      IOException ioe = ProtobufUtil.getRemoteException(se);
1208      if (ioe instanceof YouAreDeadException) {
1209        // This will be caught and handled as a fatal error in run()
1210        throw ioe;
1211      }
1212      if (rssStub == rss) {
1213        rssStub = null;
1214      }
1215      // Couldn't connect to the master, get location from zk and reconnect
1216      // Method blocks until new master is found or we are stopped
1217      createRegionServerStatusStub(true);
1218    }
1219  }
1220
1221  /**
1222   * Reports the given map of Regions and their size on the filesystem to the active Master.
1223   *
1224   * @param onlineRegionSizes A map of region info to size in bytes
1225   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1226   */
1227  public boolean reportRegionSizesForQuotas(final Map<RegionInfo, Long> onlineRegionSizes) {
1228    RegionServerStatusService.BlockingInterface rss = rssStub;
1229    if (rss == null) {
1230      // the current server could be stopping.
1231      LOG.trace("Skipping Region size report to HMaster as stub is null");
1232      return true;
1233    }
1234    try {
1235      RegionSpaceUseReportRequest request = buildRegionSpaceUseReportRequest(
1236          Objects.requireNonNull(onlineRegionSizes));
1237      rss.reportRegionSpaceUse(null, request);
1238    } catch (ServiceException se) {
1239      IOException ioe = ProtobufUtil.getRemoteException(se);
1240      if (ioe instanceof PleaseHoldException) {
1241        LOG.trace("Failed to report region sizes to Master because it is initializing."
1242            + " This will be retried.", ioe);
1243        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1244        return true;
1245      }
1246      if (rssStub == rss) {
1247        rssStub = null;
1248      }
1249      createRegionServerStatusStub(true);
1250      if (ioe instanceof DoNotRetryIOException) {
1251        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1252        if (doNotRetryEx.getCause() != null) {
1253          Throwable t = doNotRetryEx.getCause();
1254          if (t instanceof UnsupportedOperationException) {
1255            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1256            return false;
1257          }
1258        }
1259      }
1260      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1261    }
1262    return true;
1263  }
1264
1265  /**
1266   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1267   *
1268   * @param regionSizes Map of region info to size in bytes.
1269   * @return The corresponding protocol buffer message.
1270   */
1271  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(Map<RegionInfo,Long> regionSizes) {
1272    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1273    for (Entry<RegionInfo, Long> entry : Objects.requireNonNull(regionSizes).entrySet()) {
1274      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue()));
1275    }
1276    return request.build();
1277  }
1278
1279  /**
1280   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse}
1281   * protobuf message.
1282   *
1283   * @param regionInfo The RegionInfo
1284   * @param sizeInBytes The size in bytes of the Region
1285   * @return The protocol buffer
1286   */
1287  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1288    return RegionSpaceUse.newBuilder()
1289        .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1290        .setRegionSize(Objects.requireNonNull(sizeInBytes))
1291        .build();
1292  }
1293
1294  ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1295      throws IOException {
1296    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1297    // per second, and other metrics  As long as metrics are part of ServerLoad it's best to use
1298    // the wrapper to compute those numbers in one place.
1299    // In the long term most of these should be moved off of ServerLoad and the heart beat.
1300    // Instead they should be stored in an HBase table so that external visibility into HBase is
1301    // improved; Additionally the load balancer will be able to take advantage of a more complete
1302    // history.
1303    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1304    Collection<HRegion> regions = getOnlineRegionsLocalContext();
1305    long usedMemory = -1L;
1306    long maxMemory = -1L;
1307    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
1308    if (usage != null) {
1309      usedMemory = usage.getUsed();
1310      maxMemory = usage.getMax();
1311    }
1312
1313    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
1314    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1315    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
1316    serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
1317    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
1318    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1319    Builder coprocessorBuilder = Coprocessor.newBuilder();
1320    for (String coprocessor : coprocessors) {
1321      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1322    }
1323    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1324    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1325    for (HRegion region : regions) {
1326      if (region.getCoprocessorHost() != null) {
1327        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1328        Iterator<String> iterator = regionCoprocessors.iterator();
1329        while (iterator.hasNext()) {
1330          serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build());
1331        }
1332      }
1333      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1334      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1335          .getCoprocessors()) {
1336        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1337      }
1338    }
1339    serverLoad.setReportStartTime(reportStartTime);
1340    serverLoad.setReportEndTime(reportEndTime);
1341    if (this.infoServer != null) {
1342      serverLoad.setInfoServerPort(this.infoServer.getPort());
1343    } else {
1344      serverLoad.setInfoServerPort(-1);
1345    }
1346
1347    // for the replicationLoad purpose. Only need to get from one executorService
1348    // either source or sink will get the same info
1349    ReplicationSourceService rsources = getReplicationSourceService();
1350
1351    if (rsources != null) {
1352      // always refresh first to get the latest value
1353      ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
1354      if (rLoad != null) {
1355        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1356        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
1357          serverLoad.addReplLoadSource(rLS);
1358        }
1359      }
1360    }
1361
1362    return serverLoad.build();
1363  }
1364
1365  String getOnlineRegionsAsPrintableString() {
1366    StringBuilder sb = new StringBuilder();
1367    for (Region r: this.onlineRegions.values()) {
1368      if (sb.length() > 0) sb.append(", ");
1369      sb.append(r.getRegionInfo().getEncodedName());
1370    }
1371    return sb.toString();
1372  }
1373
1374  /**
1375   * Wait on regions close.
1376   */
1377  private void waitOnAllRegionsToClose(final boolean abort) {
1378    // Wait till all regions are closed before going out.
1379    int lastCount = -1;
1380    long previousLogTime = 0;
1381    Set<String> closedRegions = new HashSet<>();
1382    boolean interrupted = false;
1383    try {
1384      while (!isOnlineRegionsEmpty()) {
1385        int count = getNumberOfOnlineRegions();
1386        // Only print a message if the count of regions has changed.
1387        if (count != lastCount) {
1388          // Log every second at most
1389          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1390            previousLogTime = System.currentTimeMillis();
1391            lastCount = count;
1392            LOG.info("Waiting on " + count + " regions to close");
1393            // Only print out regions still closing if a small number else will
1394            // swamp the log.
1395            if (count < 10 && LOG.isDebugEnabled()) {
1396              LOG.debug("Online Regions=" + this.onlineRegions);
1397            }
1398          }
1399        }
1400        // Ensure all user regions have been sent a close. Use this to
1401        // protect against the case where an open comes in after we start the
1402        // iterator of onlineRegions to close all user regions.
1403        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1404          RegionInfo hri = e.getValue().getRegionInfo();
1405          if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) &&
1406              !closedRegions.contains(hri.getEncodedName())) {
1407            closedRegions.add(hri.getEncodedName());
1408            // Don't update zk with this close transition; pass false.
1409            closeRegionIgnoreErrors(hri, abort);
1410          }
1411        }
1412        // No regions in RIT, we could stop waiting now.
1413        if (this.regionsInTransitionInRS.isEmpty()) {
1414          if (!isOnlineRegionsEmpty()) {
1415            LOG.info("We were exiting though online regions are not empty," +
1416                " because some regions failed closing");
1417          }
1418          break;
1419        }
1420        if (sleep(200)) {
1421          interrupted = true;
1422        }
1423      }
1424    } finally {
1425      if (interrupted) {
1426        Thread.currentThread().interrupt();
1427      }
1428    }
1429  }
1430
1431  private boolean sleep(long millis) {
1432    boolean interrupted = false;
1433    try {
1434      Thread.sleep(millis);
1435    } catch (InterruptedException e) {
1436      LOG.warn("Interrupted while sleeping");
1437      interrupted = true;
1438    }
1439    return interrupted;
1440  }
1441
1442  private void shutdownWAL(final boolean close) {
1443    if (this.walFactory != null) {
1444      try {
1445        if (close) {
1446          walFactory.close();
1447        } else {
1448          walFactory.shutdown();
1449        }
1450      } catch (Throwable e) {
1451        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1452        LOG.error("Shutdown / close of WAL failed: " + e);
1453        LOG.debug("Shutdown / close exception details:", e);
1454      }
1455    }
1456  }
1457
1458  /*
1459   * Run init. Sets up wal and starts up all server threads.
1460   *
1461   * @param c Extra configuration.
1462   */
1463  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1464  throws IOException {
1465    try {
1466      boolean updateRootDir = false;
1467      for (NameStringPair e : c.getMapEntriesList()) {
1468        String key = e.getName();
1469        // The hostname the master sees us as.
1470        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1471          String hostnameFromMasterPOV = e.getValue();
1472          this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(),
1473              this.startcode);
1474          if (!StringUtils.isBlank(useThisHostnameInstead) &&
1475              !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1476            String msg = "Master passed us a different hostname to use; was=" +
1477                this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1478            LOG.error(msg);
1479            throw new IOException(msg);
1480          }
1481          if (StringUtils.isBlank(useThisHostnameInstead) &&
1482              !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1483            String msg = "Master passed us a different hostname to use; was=" +
1484                rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1485            LOG.error(msg);
1486          }
1487          continue;
1488        }
1489
1490        String value = e.getValue();
1491        if (key.equals(HConstants.HBASE_DIR)) {
1492          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1493            updateRootDir = true;
1494          }
1495        }
1496
1497        if (LOG.isDebugEnabled()) {
1498          LOG.debug("Config from master: " + key + "=" + value);
1499        }
1500        this.conf.set(key, value);
1501      }
1502      // Set our ephemeral znode up in zookeeper now we have a name.
1503      createMyEphemeralNode();
1504
1505      if (updateRootDir) {
1506        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1507        initializeFileSystem();
1508      }
1509
1510      // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1511      // config param for task trackers, but we can piggyback off of it.
1512      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1513        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1514      }
1515
1516      // Save it in a file, this will allow to see if we crash
1517      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1518
1519      // This call sets up an initialized replication and WAL. Later we start it up.
1520      setupWALAndReplication();
1521      // Init in here rather than in constructor after thread name has been set
1522      this.metricsRegionServer = new MetricsRegionServer(
1523          new MetricsRegionServerWrapperImpl(this), conf);
1524      this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1525      // Now that we have a metrics source, start the pause monitor
1526      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1527      pauseMonitor.start();
1528
1529      // There is a rare case where we do NOT want services to start. Check config.
1530      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1531        startServices();
1532      }
1533      // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
1534      // or make sense of it.
1535      startReplicationService();
1536
1537
1538      // Set up ZK
1539      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa +
1540          ", sessionid=0x" +
1541          Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1542
1543      // Wake up anyone waiting for this server to online
1544      synchronized (online) {
1545        online.set(true);
1546        online.notifyAll();
1547      }
1548    } catch (Throwable e) {
1549      stop("Failed initialization");
1550      throw convertThrowableToIOE(cleanup(e, "Failed init"),
1551          "Region server startup failed");
1552    } finally {
1553      sleeper.skipSleepCycle();
1554    }
1555  }
1556
1557  protected void initializeMemStoreChunkCreator() {
1558    if (MemStoreLAB.isEnabled(conf)) {
1559      // MSLAB is enabled. So initialize MemStoreChunkPool
1560      // By this time, the MemstoreFlusher is already initialized. We can get the global limits from
1561      // it.
1562      Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
1563      long globalMemStoreSize = pair.getFirst();
1564      boolean offheap = this.regionServerAccounting.isOffheap();
1565      // When off heap memstore in use, take full area for chunk pool.
1566      float poolSizePercentage = offheap? 1.0F:
1567          conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
1568      float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
1569          MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
1570      int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
1571      // init the chunkCreator
1572      ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
1573        initialCountPercentage, this.hMemManager);
1574    }
1575  }
1576
1577  private void startHeapMemoryManager() {
1578    this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this,
1579        this.regionServerAccounting);
1580    if (this.hMemManager != null) {
1581      this.hMemManager.start(getChoreService());
1582    }
1583  }
1584
1585  private void createMyEphemeralNode() throws KeeperException, IOException {
1586    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1587    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1588    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1589    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1590    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1591  }
1592
1593  private void deleteMyEphemeralNode() throws KeeperException {
1594    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1595  }
1596
1597  @Override
1598  public RegionServerAccounting getRegionServerAccounting() {
1599    return regionServerAccounting;
1600  }
1601
1602  /*
1603   * @param r Region to get RegionLoad for.
1604   * @param regionLoadBldr the RegionLoad.Builder, can be null
1605   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1606   * @return RegionLoad instance.
1607   *
1608   * @throws IOException
1609   */
1610  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1611      RegionSpecifier.Builder regionSpecifier) throws IOException {
1612    byte[] name = r.getRegionInfo().getRegionName();
1613    int stores = 0;
1614    int storefiles = 0;
1615    int storeUncompressedSizeMB = 0;
1616    int storefileSizeMB = 0;
1617    int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024);
1618    long storefileIndexSizeKB = 0;
1619    int rootLevelIndexSizeKB = 0;
1620    int totalStaticIndexSizeKB = 0;
1621    int totalStaticBloomSizeKB = 0;
1622    long totalCompactingKVs = 0;
1623    long currentCompactedKVs = 0;
1624    List<HStore> storeList = r.getStores();
1625    stores += storeList.size();
1626    for (HStore store : storeList) {
1627      storefiles += store.getStorefilesCount();
1628      storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1629      storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1630      //TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
1631      storefileIndexSizeKB += store.getStorefilesRootLevelIndexSize() / 1024;
1632      CompactionProgress progress = store.getCompactionProgress();
1633      if (progress != null) {
1634        totalCompactingKVs += progress.getTotalCompactingKVs();
1635        currentCompactedKVs += progress.currentCompactedKVs;
1636      }
1637      rootLevelIndexSizeKB += (int) (store.getStorefilesRootLevelIndexSize() / 1024);
1638      totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1639      totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1640    }
1641
1642    float dataLocality =
1643        r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1644    if (regionLoadBldr == null) {
1645      regionLoadBldr = RegionLoad.newBuilder();
1646    }
1647    if (regionSpecifier == null) {
1648      regionSpecifier = RegionSpecifier.newBuilder();
1649    }
1650    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1651    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1652    regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1653      .setStores(stores)
1654      .setStorefiles(storefiles)
1655      .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1656      .setStorefileSizeMB(storefileSizeMB)
1657      .setMemStoreSizeMB(memstoreSizeMB)
1658      .setStorefileIndexSizeKB(storefileIndexSizeKB)
1659      .setRootIndexSizeKB(rootLevelIndexSizeKB)
1660      .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1661      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1662      .setReadRequestsCount(r.getReadRequestsCount())
1663      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1664      .setWriteRequestsCount(r.getWriteRequestsCount())
1665      .setTotalCompactingKVs(totalCompactingKVs)
1666      .setCurrentCompactedKVs(currentCompactedKVs)
1667      .setDataLocality(dataLocality)
1668      .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1669    r.setCompleteSequenceId(regionLoadBldr);
1670
1671    return regionLoadBldr.build();
1672  }
1673
1674  /**
1675   * @param encodedRegionName
1676   * @return An instance of RegionLoad.
1677   */
1678  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1679    HRegion r = onlineRegions.get(encodedRegionName);
1680    return r != null ? createRegionLoad(r, null, null) : null;
1681  }
1682
1683  /*
1684   * Inner class that runs on a long period checking if regions need compaction.
1685   */
1686  private static class CompactionChecker extends ScheduledChore {
1687    private final HRegionServer instance;
1688    private final int majorCompactPriority;
1689    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1690    //Iteration is 1-based rather than 0-based so we don't check for compaction
1691    // immediately upon region server startup
1692    private long iteration = 1;
1693
1694    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1695      super("CompactionChecker", stopper, sleepTime);
1696      this.instance = h;
1697      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1698
1699      /* MajorCompactPriority is configurable.
1700       * If not set, the compaction will use default priority.
1701       */
1702      this.majorCompactPriority = this.instance.conf.
1703          getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1704              DEFAULT_PRIORITY);
1705    }
1706
1707    @Override
1708    protected void chore() {
1709      for (Region r : this.instance.onlineRegions.values()) {
1710        if (r == null) {
1711          continue;
1712        }
1713        HRegion hr = (HRegion) r;
1714        for (HStore s : hr.stores.values()) {
1715          try {
1716            long multiplier = s.getCompactionCheckMultiplier();
1717            assert multiplier > 0;
1718            if (iteration % multiplier != 0) {
1719              continue;
1720            }
1721            if (s.needsCompaction()) {
1722              // Queue a compaction. Will recognize if major is needed.
1723              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1724                getName() + " requests compaction");
1725            } else if (s.shouldPerformMajorCompaction()) {
1726              s.triggerMajorCompaction();
1727              if (majorCompactPriority == DEFAULT_PRIORITY ||
1728                  majorCompactPriority > hr.getCompactPriority()) {
1729                this.instance.compactSplitThread.requestCompaction(hr, s,
1730                    getName() + " requests major compaction; use default priority",
1731                    Store.NO_PRIORITY,
1732                CompactionLifeCycleTracker.DUMMY, null);
1733              } else {
1734                this.instance.compactSplitThread.requestCompaction(hr, s,
1735                    getName() + " requests major compaction; use configured priority",
1736                    this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1737              }
1738            }
1739          } catch (IOException e) {
1740            LOG.warn("Failed major compaction check on " + r, e);
1741          }
1742        }
1743      }
1744      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1745    }
1746  }
1747
1748  static class PeriodicMemStoreFlusher extends ScheduledChore {
1749    final HRegionServer server;
1750    final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
1751    final static int MIN_DELAY_TIME = 0; // millisec
1752    public PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1753      super("MemstoreFlusherChore", server, cacheFlushInterval);
1754      this.server = server;
1755    }
1756
1757    @Override
1758    protected void chore() {
1759      final StringBuilder whyFlush = new StringBuilder();
1760      for (HRegion r : this.server.onlineRegions.values()) {
1761        if (r == null) continue;
1762        if (r.shouldFlush(whyFlush)) {
1763          FlushRequester requester = server.getFlushRequester();
1764          if (requester != null) {
1765            long randomDelay = (long) RandomUtils.nextInt(0, RANGE_OF_DELAY) + MIN_DELAY_TIME;
1766            LOG.info(getName() + " requesting flush of " +
1767              r.getRegionInfo().getRegionNameAsString() + " because " +
1768              whyFlush.toString() +
1769              " after random delay " + randomDelay + "ms");
1770            //Throttle the flushes by putting a delay. If we don't throttle, and there
1771            //is a balanced write-load on the regions in a table, we might end up
1772            //overwhelming the filesystem with too many flushes at once.
1773            requester.requestDelayedFlush(r, randomDelay, false);
1774          }
1775        }
1776      }
1777    }
1778  }
1779
1780  /**
1781   * Report the status of the server. A server is online once all the startup is
1782   * completed (setting up filesystem, starting executorService threads, etc.). This
1783   * method is designed mostly to be useful in tests.
1784   *
1785   * @return true if online, false if not.
1786   */
1787  public boolean isOnline() {
1788    return online.get();
1789  }
1790
1791  /**
1792   * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
1793   * be hooked up to WAL.
1794   */
1795  private void setupWALAndReplication() throws IOException {
1796    WALFactory factory = new WALFactory(conf, serverName.toString());
1797
1798    // TODO Replication make assumptions here based on the default filesystem impl
1799    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1800    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1801
1802    Path logDir = new Path(walRootDir, logName);
1803    LOG.debug("logDir={}", logDir);
1804    if (this.walFs.exists(logDir)) {
1805      throw new RegionServerRunningException(
1806          "Region server has already created directory at " + this.serverName.toString());
1807    }
1808    // Instantiate replication if replication enabled. Pass it the log directories.
1809    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
1810      factory.getWALProvider());
1811    this.walFactory = factory;
1812  }
1813
1814  /**
1815   * Start up replication source and sink handlers.
1816   * @throws IOException
1817   */
1818  private void startReplicationService() throws IOException {
1819    if (this.replicationSourceHandler == this.replicationSinkHandler &&
1820        this.replicationSourceHandler != null) {
1821      this.replicationSourceHandler.startReplicationService();
1822    } else {
1823      if (this.replicationSourceHandler != null) {
1824        this.replicationSourceHandler.startReplicationService();
1825      }
1826      if (this.replicationSinkHandler != null) {
1827        this.replicationSinkHandler.startReplicationService();
1828      }
1829    }
1830  }
1831
1832
1833  public MetricsRegionServer getRegionServerMetrics() {
1834    return this.metricsRegionServer;
1835  }
1836
1837  /**
1838   * @return Master address tracker instance.
1839   */
1840  public MasterAddressTracker getMasterAddressTracker() {
1841    return this.masterAddressTracker;
1842  }
1843
1844  /*
1845   * Start maintenance Threads, Server, Worker and lease checker threads.
1846   * Start all threads we need to run. This is called after we've successfully
1847   * registered with the Master.
1848   * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
1849   * get an unhandled exception. We cannot set the handler on all threads.
1850   * Server's internal Listener thread is off limits. For Server, if an OOME, it
1851   * waits a while then retries. Meantime, a flush or a compaction that tries to
1852   * run should trigger same critical condition and the shutdown will run. On
1853   * its way out, this server will shut down Server. Leases are sort of
1854   * inbetween. It has an internal thread that while it inherits from Chore, it
1855   * keeps its own internal stop mechanism so needs to be stopped by this
1856   * hosting server. Worker logs the exception and exits.
1857   */
1858  private void startServices() throws IOException {
1859    if (!isStopped() && !isAborted()) {
1860      initializeThreads();
1861    }
1862    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection);
1863    this.secureBulkLoadManager.start();
1864
1865    // Health checker thread.
1866    if (isHealthCheckerConfigured()) {
1867      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
1868      HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1869      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1870    }
1871
1872    this.walRoller = new LogRoller(this, this);
1873    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1874
1875    // Create the CompactedFileDischarger chore executorService. This chore helps to
1876    // remove the compacted files that will no longer be used in reads.
1877    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
1878    // 2 mins so that compacted files can be archived before the TTLCleaner runs
1879    int cleanerInterval =
1880    conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
1881    this.compactedFileDischarger =
1882    new CompactedHFilesDischarger(cleanerInterval, this, this);
1883    choreService.scheduleChore(compactedFileDischarger);
1884
1885    // Start executor services
1886    this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION,
1887        conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1888    this.executorService.startExecutorService(ExecutorType.RS_OPEN_META,
1889        conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1890    this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
1891        conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
1892    this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1893        conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1894    this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META,
1895        conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1896    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1897      this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1898          conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1899    }
1900    this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
1901    "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
1902    // Start the threads for compacted files discharger
1903    this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
1904        conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
1905    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1906      this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
1907          conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1908              conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
1909    }
1910
1911    Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
1912    uncaughtExceptionHandler);
1913    this.cacheFlusher.start(uncaughtExceptionHandler);
1914
1915    if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
1916    if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
1917    if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
1918    if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
1919    if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
1920    if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
1921    if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore);
1922
1923    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1924    // an unhandled exception, it will just exit.
1925    Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
1926    uncaughtExceptionHandler);
1927
1928    // Create the log splitting worker and start it
1929    // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
1930    // quite a while inside Connection layer. The worker won't be available for other
1931    // tasks even after current task is preempted after a split task times out.
1932    Configuration sinkConf = HBaseConfiguration.create(conf);
1933    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1934        conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1935    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1936        conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1937    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
1938    if (this.csm != null) {
1939      // SplitLogWorker needs csm. If none, don't start this.
1940      this.splitLogWorker = new SplitLogWorker(this, sinkConf, this,
1941          this, walFactory);
1942      splitLogWorker.start();
1943    } else {
1944      LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null");
1945    }
1946
1947    // Memstore services.
1948    startHeapMemoryManager();
1949    // Call it after starting HeapMemoryManager.
1950    initializeMemStoreChunkCreator();
1951  }
1952
1953  private void initializeThreads() throws IOException {
1954    // Cache flushing thread.
1955    this.cacheFlusher = new MemStoreFlusher(conf, this);
1956
1957    // Compaction thread
1958    this.compactSplitThread = new CompactSplit(this);
1959
1960    // Background thread to check for compactions; needed if region has not gotten updates
1961    // in a while. It will take care of not checking too frequently on store-by-store basis.
1962    this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
1963    this.periodicFlusher = new PeriodicMemStoreFlusher(this.threadWakeFrequency, this);
1964    this.leases = new Leases(this.threadWakeFrequency);
1965
1966    // Create the thread to clean the moved regions list
1967    movedRegionsCleaner = MovedRegionsCleaner.create(this);
1968
1969    if (this.nonceManager != null) {
1970      // Create the scheduled chore that cleans up nonces.
1971      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
1972    }
1973
1974    // Setup the Quota Manager
1975    rsQuotaManager = new RegionServerRpcQuotaManager(this);
1976    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
1977
1978    if (QuotaUtil.isQuotaEnabled(conf)) {
1979      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
1980    }
1981
1982
1983    boolean onlyMetaRefresh = false;
1984    int storefileRefreshPeriod = conf.getInt(
1985        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
1986        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
1987    if (storefileRefreshPeriod == 0) {
1988      storefileRefreshPeriod = conf.getInt(
1989          StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
1990          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
1991      onlyMetaRefresh = true;
1992    }
1993    if (storefileRefreshPeriod > 0) {
1994      this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
1995          onlyMetaRefresh, this, this);
1996    }
1997    registerConfigurationObservers();
1998  }
1999
2000  private void registerConfigurationObservers() {
2001    // Registering the compactSplitThread object with the ConfigurationManager.
2002    configurationManager.registerObserver(this.compactSplitThread);
2003    configurationManager.registerObserver(this.rpcServices);
2004    configurationManager.registerObserver(this);
2005  }
2006
2007  /**
2008   * Puts up the webui.
2009   * @return Returns final port -- maybe different from what we started with.
2010   * @throws IOException
2011   */
2012  private int putUpWebUI() throws IOException {
2013    int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2014      HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2015    String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
2016
2017    if(this instanceof HMaster) {
2018      port = conf.getInt(HConstants.MASTER_INFO_PORT,
2019          HConstants.DEFAULT_MASTER_INFOPORT);
2020      addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
2021    }
2022    // -1 is for disabling info server
2023    if (port < 0) return port;
2024
2025    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
2026      String msg =
2027          "Failed to start http info server. Address " + addr
2028              + " does not belong to this host. Correct configuration parameter: "
2029              + "hbase.regionserver.info.bindAddress";
2030      LOG.error(msg);
2031      throw new IOException(msg);
2032    }
2033    // check if auto port bind enabled
2034    boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
2035        false);
2036    while (true) {
2037      try {
2038        this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
2039        infoServer.addServlet("dump", "/dump", getDumpServlet());
2040        configureInfoServer();
2041        this.infoServer.start();
2042        break;
2043      } catch (BindException e) {
2044        if (!auto) {
2045          // auto bind disabled throw BindException
2046          LOG.error("Failed binding http info server to port: " + port);
2047          throw e;
2048        }
2049        // auto bind enabled, try to use another port
2050        LOG.info("Failed binding http info server to port: " + port);
2051        port++;
2052      }
2053    }
2054    port = this.infoServer.getPort();
2055    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
2056    int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
2057      HConstants.DEFAULT_MASTER_INFOPORT);
2058    conf.setInt("hbase.master.info.port.orig", masterInfoPort);
2059    conf.setInt(HConstants.MASTER_INFO_PORT, port);
2060    return port;
2061  }
2062
2063  /*
2064   * Verify that server is healthy
2065   */
2066  private boolean isHealthy() {
2067    if (!fsOk) {
2068      // File system problem
2069      return false;
2070    }
2071    // Verify that all threads are alive
2072    boolean healthy = (this.leases == null || this.leases.isAlive())
2073        && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2074        && (this.walRoller == null || this.walRoller.isAlive())
2075        && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2076        && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2077    if (!healthy) {
2078      stop("One or more threads are no longer alive -- stop");
2079    }
2080    return healthy;
2081  }
2082
2083  @Override
2084  public List<WAL> getWALs() throws IOException {
2085    return walFactory.getWALs();
2086  }
2087
2088  @Override
2089  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2090    WAL wal = walFactory.getWAL(regionInfo);
2091    if (this.walRoller != null) {
2092      this.walRoller.addWAL(wal);
2093    }
2094    return wal;
2095  }
2096
2097  public LogRoller getWalRoller() {
2098    return walRoller;
2099  }
2100
2101  @Override
2102  public Connection getConnection() {
2103    return getClusterConnection();
2104  }
2105
2106  @Override
2107  public ClusterConnection getClusterConnection() {
2108    return this.clusterConnection;
2109  }
2110
2111  @Override
2112  public MetaTableLocator getMetaTableLocator() {
2113    return this.metaTableLocator;
2114  }
2115
2116  @Override
2117  public void stop(final String msg) {
2118    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2119  }
2120
2121  /**
2122   * Stops the regionserver.
2123   * @param msg Status message
2124   * @param force True if this is a regionserver abort
2125   * @param user The user executing the stop request, or null if no user is associated
2126   */
2127  public void stop(final String msg, final boolean force, final User user) {
2128    if (!this.stopped) {
2129      LOG.info("***** STOPPING region server '" + this + "' *****");
2130      if (this.rsHost != null) {
2131        // when forced via abort don't allow CPs to override
2132        try {
2133          this.rsHost.preStop(msg, user);
2134        } catch (IOException ioe) {
2135          if (!force) {
2136            LOG.warn("The region server did not stop", ioe);
2137            return;
2138          }
2139          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2140        }
2141      }
2142      this.stopped = true;
2143      LOG.info("STOPPED: " + msg);
2144      // Wakes run() if it is sleeping
2145      sleeper.skipSleepCycle();
2146    }
2147  }
2148
2149  public void waitForServerOnline(){
2150    while (!isStopped() && !isOnline()) {
2151      synchronized (online) {
2152        try {
2153          online.wait(msgInterval);
2154        } catch (InterruptedException ie) {
2155          Thread.currentThread().interrupt();
2156          break;
2157        }
2158      }
2159    }
2160  }
2161
2162  @Override
2163  public void postOpenDeployTasks(final PostOpenDeployContext context)
2164      throws KeeperException, IOException {
2165    HRegion r = context.getRegion();
2166    long masterSystemTime = context.getMasterSystemTime();
2167    rpcServices.checkOpen();
2168    LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
2169    // Do checks to see if we need to compact (references or too many files)
2170    for (HStore s : r.stores.values()) {
2171      if (s.hasReferences() || s.needsCompaction()) {
2172        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2173      }
2174    }
2175    long openSeqNum = r.getOpenSeqNum();
2176    if (openSeqNum == HConstants.NO_SEQNUM) {
2177      // If we opened a region, we should have read some sequence number from it.
2178      LOG.error("No sequence number found when opening " +
2179        r.getRegionInfo().getRegionNameAsString());
2180      openSeqNum = 0;
2181    }
2182
2183    // Notify master
2184    if (!reportRegionStateTransition(new RegionStateTransitionContext(
2185        TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
2186      throw new IOException("Failed to report opened region to master: "
2187        + r.getRegionInfo().getRegionNameAsString());
2188    }
2189
2190    triggerFlushInPrimaryRegion(r);
2191
2192    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2193  }
2194
2195  @Override
2196  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2197    TransitionCode code = context.getCode();
2198    long openSeqNum = context.getOpenSeqNum();
2199    long masterSystemTime = context.getMasterSystemTime();
2200    RegionInfo[] hris = context.getHris();
2201
2202    if (TEST_SKIP_REPORTING_TRANSITION) {
2203      // This is for testing only in case there is no master
2204      // to handle the region transition report at all.
2205      if (code == TransitionCode.OPENED) {
2206        Preconditions.checkArgument(hris != null && hris.length == 1);
2207        if (hris[0].isMetaRegion()) {
2208          try {
2209            MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
2210                hris[0].getReplicaId(),State.OPEN);
2211          } catch (KeeperException e) {
2212            LOG.info("Failed to update meta location", e);
2213            return false;
2214          }
2215        } else {
2216          try {
2217            MetaTableAccessor.updateRegionLocation(clusterConnection,
2218              hris[0], serverName, openSeqNum, masterSystemTime);
2219          } catch (IOException e) {
2220            LOG.info("Failed to update meta", e);
2221            return false;
2222          }
2223        }
2224      }
2225      return true;
2226    }
2227
2228    ReportRegionStateTransitionRequest.Builder builder =
2229      ReportRegionStateTransitionRequest.newBuilder();
2230    builder.setServer(ProtobufUtil.toServerName(serverName));
2231    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2232    transition.setTransitionCode(code);
2233    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2234      transition.setOpenSeqNum(openSeqNum);
2235    }
2236    for (RegionInfo hri: hris) {
2237      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2238    }
2239    ReportRegionStateTransitionRequest request = builder.build();
2240    int tries = 0;
2241    long pauseTime = INIT_PAUSE_TIME_MS;
2242    // Keep looping till we get an error. We want to send reports even though server is going down.
2243    // Only go down if clusterConnection is null. It is set to null almost as last thing as the
2244    // HRegionServer does down.
2245    while (this.clusterConnection != null && !this.clusterConnection.isClosed()) {
2246      RegionServerStatusService.BlockingInterface rss = rssStub;
2247      try {
2248        if (rss == null) {
2249          createRegionServerStatusStub();
2250          continue;
2251        }
2252        ReportRegionStateTransitionResponse response =
2253          rss.reportRegionStateTransition(null, request);
2254        if (response.hasErrorMessage()) {
2255          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2256          break;
2257        }
2258        // Log if we had to retry else don't log unless TRACE. We want to
2259        // know if were successful after an attempt showed in logs as failed.
2260        if (tries > 0 || LOG.isTraceEnabled()) {
2261          LOG.info("TRANSITION REPORTED " + request);
2262        }
2263        // NOTE: Return mid-method!!!
2264        return true;
2265      } catch (ServiceException se) {
2266        IOException ioe = ProtobufUtil.getRemoteException(se);
2267        boolean pause = ioe instanceof ServerNotRunningYetException ||
2268            ioe instanceof PleaseHoldException;
2269        if (pause) {
2270          // Do backoff else we flood the Master with requests.
2271          pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
2272        } else {
2273          pauseTime = INIT_PAUSE_TIME_MS; // Reset.
2274        }
2275        LOG.info("Failed report transition " +
2276          TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" +
2277            (pause?
2278                " after " + pauseTime + "ms delay (Master is coming online...).":
2279                " immediately."),
2280            ioe);
2281        if (pause) Threads.sleep(pauseTime);
2282        tries++;
2283        if (rssStub == rss) {
2284          rssStub = null;
2285        }
2286      }
2287    }
2288    return false;
2289  }
2290
2291  /**
2292   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2293   * block this thread. See RegionReplicaFlushHandler for details.
2294   */
2295  void triggerFlushInPrimaryRegion(final HRegion region) {
2296    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2297      return;
2298    }
2299    if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
2300        !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
2301          region.conf)) {
2302      region.setReadsEnabled(true);
2303      return;
2304    }
2305
2306    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2307    // RegionReplicaFlushHandler might reset this.
2308
2309    // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2310    if (this.executorService != null) {
2311      this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
2312          rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2313    }
2314  }
2315
2316  @Override
2317  public RpcServerInterface getRpcServer() {
2318    return rpcServices.rpcServer;
2319  }
2320
2321  @VisibleForTesting
2322  public RSRpcServices getRSRpcServices() {
2323    return rpcServices;
2324  }
2325
2326  /**
2327   * Cause the server to exit without closing the regions it is serving, the log
2328   * it is using and without notifying the master. Used unit testing and on
2329   * catastrophic events such as HDFS is yanked out from under hbase or we OOME.
2330   *
2331   * @param reason
2332   *          the reason we are aborting
2333   * @param cause
2334   *          the exception that caused the abort, or null
2335   */
2336  @Override
2337  public void abort(String reason, Throwable cause) {
2338    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2339    if (cause != null) {
2340      LOG.error(HBaseMarkers.FATAL, msg, cause);
2341    } else {
2342      LOG.error(HBaseMarkers.FATAL, msg);
2343    }
2344    this.abortRequested = true;
2345    // HBASE-4014: show list of coprocessors that were loaded to help debug
2346    // regionserver crashes.Note that we're implicitly using
2347    // java.util.HashSet's toString() method to print the coprocessor names.
2348    LOG.error(HBaseMarkers.FATAL, "RegionServer abort: loaded coprocessors are: " +
2349        CoprocessorHost.getLoadedCoprocessors());
2350    // Try and dump metrics if abort -- might give clue as to how fatal came about....
2351    try {
2352      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2353    } catch (MalformedObjectNameException | IOException e) {
2354      LOG.warn("Failed dumping metrics", e);
2355    }
2356
2357    // Do our best to report our abort to the master, but this may not work
2358    try {
2359      if (cause != null) {
2360        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2361      }
2362      // Report to the master but only if we have already registered with the master.
2363      if (rssStub != null && this.serverName != null) {
2364        ReportRSFatalErrorRequest.Builder builder =
2365          ReportRSFatalErrorRequest.newBuilder();
2366        builder.setServer(ProtobufUtil.toServerName(this.serverName));
2367        builder.setErrorMessage(msg);
2368        rssStub.reportRSFatalError(null, builder.build());
2369      }
2370    } catch (Throwable t) {
2371      LOG.warn("Unable to report fatal error to master", t);
2372    }
2373    // shutdown should be run as the internal user
2374    stop(reason, true, null);
2375  }
2376
2377  /**
2378   * @see HRegionServer#abort(String, Throwable)
2379   */
2380  public void abort(String reason) {
2381    abort(reason, null);
2382  }
2383
2384  @Override
2385  public boolean isAborted() {
2386    return this.abortRequested;
2387  }
2388
2389  /*
2390   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup
2391   * logs but it does close socket in case want to bring up server on old
2392   * hostname+port immediately.
2393   */
2394  @VisibleForTesting
2395  protected void kill() {
2396    this.killed = true;
2397    abort("Simulated kill");
2398  }
2399
2400  /**
2401   * Called on stop/abort before closing the cluster connection and meta locator.
2402   */
2403  protected void sendShutdownInterrupt() {
2404  }
2405
2406  /**
2407   * Wait on all threads to finish. Presumption is that all closes and stops
2408   * have already been called.
2409   */
2410  protected void stopServiceThreads() {
2411    // clean up the scheduled chores
2412    if (this.choreService != null) choreService.shutdown();
2413    if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
2414    if (this.compactionChecker != null) compactionChecker.cancel(true);
2415    if (this.periodicFlusher != null) periodicFlusher.cancel(true);
2416    if (this.healthCheckChore != null) healthCheckChore.cancel(true);
2417    if (this.storefileRefresher != null) storefileRefresher.cancel(true);
2418    if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);
2419    if (this.fsUtilizationChore != null) fsUtilizationChore.cancel(true);
2420
2421    if (this.cacheFlusher != null) {
2422      this.cacheFlusher.join();
2423    }
2424
2425    if (this.spanReceiverHost != null) {
2426      this.spanReceiverHost.closeReceivers();
2427    }
2428    if (this.walRoller != null) {
2429      this.walRoller.close();
2430    }
2431    if (this.compactSplitThread != null) {
2432      this.compactSplitThread.join();
2433    }
2434    if (this.executorService != null) this.executorService.shutdown();
2435    if (this.replicationSourceHandler != null &&
2436        this.replicationSourceHandler == this.replicationSinkHandler) {
2437      this.replicationSourceHandler.stopReplicationService();
2438    } else {
2439      if (this.replicationSourceHandler != null) {
2440        this.replicationSourceHandler.stopReplicationService();
2441      }
2442      if (this.replicationSinkHandler != null) {
2443        this.replicationSinkHandler.stopReplicationService();
2444      }
2445    }
2446  }
2447
2448  /**
2449   * @return Return the object that implements the replication
2450   * source executorService.
2451   */
2452  @VisibleForTesting
2453  public ReplicationSourceService getReplicationSourceService() {
2454    return replicationSourceHandler;
2455  }
2456
2457  /**
2458   * @return Return the object that implements the replication
2459   * sink executorService.
2460   */
2461  ReplicationSinkService getReplicationSinkService() {
2462    return replicationSinkHandler;
2463  }
2464
2465  /**
2466   * Get the current master from ZooKeeper and open the RPC connection to it.
2467   * To get a fresh connection, the current rssStub must be null.
2468   * Method will block until a master is available. You can break from this
2469   * block by requesting the server stop.
2470   *
2471   * @return master + port, or null if server has been stopped
2472   */
2473  @VisibleForTesting
2474  protected synchronized ServerName createRegionServerStatusStub() {
2475    // Create RS stub without refreshing the master node from ZK, use cached data
2476    return createRegionServerStatusStub(false);
2477  }
2478
2479  /**
2480   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2481   * connection, the current rssStub must be null. Method will block until a master is available.
2482   * You can break from this block by requesting the server stop.
2483   * @param refresh If true then master address will be read from ZK, otherwise use cached data
2484   * @return master + port, or null if server has been stopped
2485   */
2486  @VisibleForTesting
2487  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2488    if (rssStub != null) {
2489      return masterAddressTracker.getMasterAddress();
2490    }
2491    ServerName sn = null;
2492    long previousLogTime = 0;
2493    RegionServerStatusService.BlockingInterface intRssStub = null;
2494    LockService.BlockingInterface intLockStub = null;
2495    boolean interrupted = false;
2496    try {
2497      while (keepLooping()) {
2498        sn = this.masterAddressTracker.getMasterAddress(refresh);
2499        if (sn == null) {
2500          if (!keepLooping()) {
2501            // give up with no connection.
2502            LOG.debug("No master found and cluster is stopped; bailing out");
2503            return null;
2504          }
2505          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2506            LOG.debug("No master found; retry");
2507            previousLogTime = System.currentTimeMillis();
2508          }
2509          refresh = true; // let's try pull it from ZK directly
2510          if (sleep(200)) {
2511            interrupted = true;
2512          }
2513          continue;
2514        }
2515
2516        // If we are on the active master, use the shortcut
2517        if (this instanceof HMaster && sn.equals(getServerName())) {
2518          intRssStub = ((HMaster)this).getMasterRpcServices();
2519          intLockStub = ((HMaster)this).getMasterRpcServices();
2520          break;
2521        }
2522        try {
2523          BlockingRpcChannel channel =
2524            this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2525              shortOperationTimeout);
2526          intRssStub = RegionServerStatusService.newBlockingStub(channel);
2527          intLockStub = LockService.newBlockingStub(channel);
2528          break;
2529        } catch (IOException e) {
2530          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2531            e = e instanceof RemoteException ?
2532              ((RemoteException)e).unwrapRemoteException() : e;
2533            if (e instanceof ServerNotRunningYetException) {
2534              LOG.info("Master isn't available yet, retrying");
2535            } else {
2536              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2537            }
2538            previousLogTime = System.currentTimeMillis();
2539          }
2540          if (sleep(200)) {
2541            interrupted = true;
2542          }
2543        }
2544      }
2545    } finally {
2546      if (interrupted) {
2547        Thread.currentThread().interrupt();
2548      }
2549    }
2550    this.rssStub = intRssStub;
2551    this.lockStub = intLockStub;
2552    return sn;
2553  }
2554
2555  /**
2556   * @return True if we should break loop because cluster is going down or
2557   * this server has been stopped or hdfs has gone bad.
2558   */
2559  private boolean keepLooping() {
2560    return !this.stopped && isClusterUp();
2561  }
2562
2563  /*
2564   * Let the master know we're here Run initialization using parameters passed
2565   * us by the master.
2566   * @return A Map of key/value configurations we got from the Master else
2567   * null if we failed to register.
2568   * @throws IOException
2569   */
2570  private RegionServerStartupResponse reportForDuty() throws IOException {
2571    if (this.masterless) return RegionServerStartupResponse.getDefaultInstance();
2572    ServerName masterServerName = createRegionServerStatusStub(true);
2573    if (masterServerName == null) return null;
2574    RegionServerStartupResponse result = null;
2575    try {
2576      rpcServices.requestCount.reset();
2577      rpcServices.rpcGetRequestCount.reset();
2578      rpcServices.rpcScanRequestCount.reset();
2579      rpcServices.rpcMultiRequestCount.reset();
2580      rpcServices.rpcMutateRequestCount.reset();
2581      LOG.info("reportForDuty to master=" + masterServerName + " with port="
2582        + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2583      long now = EnvironmentEdgeManager.currentTime();
2584      int port = rpcServices.isa.getPort();
2585      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2586      if (!StringUtils.isBlank(useThisHostnameInstead)) {
2587        request.setUseThisHostnameInstead(useThisHostnameInstead);
2588      }
2589      request.setPort(port);
2590      request.setServerStartCode(this.startcode);
2591      request.setServerCurrentTime(now);
2592      result = this.rssStub.regionServerStartup(null, request.build());
2593    } catch (ServiceException se) {
2594      IOException ioe = ProtobufUtil.getRemoteException(se);
2595      if (ioe instanceof ClockOutOfSyncException) {
2596        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync",
2597            ioe);
2598        // Re-throw IOE will cause RS to abort
2599        throw ioe;
2600      } else if (ioe instanceof ServerNotRunningYetException) {
2601        LOG.debug("Master is not running yet");
2602      } else {
2603        LOG.warn("error telling master we are up", se);
2604      }
2605      rssStub = null;
2606    }
2607    return result;
2608  }
2609
2610  @Override
2611  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2612    try {
2613      GetLastFlushedSequenceIdRequest req =
2614          RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2615      RegionServerStatusService.BlockingInterface rss = rssStub;
2616      if (rss == null) { // Try to connect one more time
2617        createRegionServerStatusStub();
2618        rss = rssStub;
2619        if (rss == null) {
2620          // Still no luck, we tried
2621          LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2622          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2623              .build();
2624        }
2625      }
2626      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2627      return RegionStoreSequenceIds.newBuilder()
2628          .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2629          .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2630    } catch (ServiceException e) {
2631      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2632      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2633          .build();
2634    }
2635  }
2636
2637  /**
2638   * Closes all regions.  Called on our way out.
2639   * Assumes that its not possible for new regions to be added to onlineRegions
2640   * while this method runs.
2641   */
2642  protected void closeAllRegions(final boolean abort) {
2643    closeUserRegions(abort);
2644    closeMetaTableRegions(abort);
2645  }
2646
2647  /**
2648   * Close meta region if we carry it
2649   * @param abort Whether we're running an abort.
2650   */
2651  void closeMetaTableRegions(final boolean abort) {
2652    HRegion meta = null;
2653    this.lock.writeLock().lock();
2654    try {
2655      for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
2656        RegionInfo hri = e.getValue().getRegionInfo();
2657        if (hri.isMetaRegion()) {
2658          meta = e.getValue();
2659        }
2660        if (meta != null) break;
2661      }
2662    } finally {
2663      this.lock.writeLock().unlock();
2664    }
2665    if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2666  }
2667
2668  /**
2669   * Schedule closes on all user regions.
2670   * Should be safe calling multiple times because it wont' close regions
2671   * that are already closed or that are closing.
2672   * @param abort Whether we're running an abort.
2673   */
2674  void closeUserRegions(final boolean abort) {
2675    this.lock.writeLock().lock();
2676    try {
2677      for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
2678        HRegion r = e.getValue();
2679        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2680          // Don't update zk with this close transition; pass false.
2681          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2682        }
2683      }
2684    } finally {
2685      this.lock.writeLock().unlock();
2686    }
2687  }
2688
2689  /** @return the info server */
2690  public InfoServer getInfoServer() {
2691    return infoServer;
2692  }
2693
2694  /**
2695   * @return true if a stop has been requested.
2696   */
2697  @Override
2698  public boolean isStopped() {
2699    return this.stopped;
2700  }
2701
2702  @Override
2703  public boolean isStopping() {
2704    return this.stopping;
2705  }
2706
2707  /**
2708   *
2709   * @return the configuration
2710   */
2711  @Override
2712  public Configuration getConfiguration() {
2713    return conf;
2714  }
2715
2716  /** @return the write lock for the server */
2717  ReentrantReadWriteLock.WriteLock getWriteLock() {
2718    return lock.writeLock();
2719  }
2720
2721  public int getNumberOfOnlineRegions() {
2722    return this.onlineRegions.size();
2723  }
2724
2725  boolean isOnlineRegionsEmpty() {
2726    return this.onlineRegions.isEmpty();
2727  }
2728
2729  /**
2730   * For tests, web ui and metrics.
2731   * This method will only work if HRegionServer is in the same JVM as client;
2732   * HRegion cannot be serialized to cross an rpc.
2733   */
2734  public Collection<HRegion> getOnlineRegionsLocalContext() {
2735    Collection<HRegion> regions = this.onlineRegions.values();
2736    return Collections.unmodifiableCollection(regions);
2737  }
2738
2739  @Override
2740  public void addRegion(HRegion region) {
2741    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2742    configurationManager.registerObserver(region);
2743  }
2744
2745  /**
2746   * @return A new Map of online regions sorted by region off-heap size with the first entry being
2747   *   the biggest.  If two regions are the same size, then the last one found wins; i.e. this
2748   *   method may NOT return all regions.
2749   */
2750  SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2751    // we'll sort the regions in reverse
2752    SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(
2753        new Comparator<Long>() {
2754          @Override
2755          public int compare(Long a, Long b) {
2756            return -1 * a.compareTo(b);
2757          }
2758        });
2759    // Copy over all regions. Regions are sorted by size with biggest first.
2760    for (HRegion region : this.onlineRegions.values()) {
2761      sortedRegions.put(region.getMemStoreOffHeapSize(), region);
2762    }
2763    return sortedRegions;
2764  }
2765
2766  /**
2767   * @return A new Map of online regions sorted by region heap size with the first entry being the
2768   *   biggest.  If two regions are the same size, then the last one found wins; i.e. this method
2769   *   may NOT return all regions.
2770   */
2771  SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2772    // we'll sort the regions in reverse
2773    SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(
2774        new Comparator<Long>() {
2775          @Override
2776          public int compare(Long a, Long b) {
2777            return -1 * a.compareTo(b);
2778          }
2779        });
2780    // Copy over all regions. Regions are sorted by size with biggest first.
2781    for (HRegion region : this.onlineRegions.values()) {
2782      sortedRegions.put(region.getMemStoreHeapSize(), region);
2783    }
2784    return sortedRegions;
2785  }
2786
2787  /**
2788   * @return time stamp in millis of when this region server was started
2789   */
2790  public long getStartcode() {
2791    return this.startcode;
2792  }
2793
2794  /** @return reference to FlushRequester */
2795  @Override
2796  public FlushRequester getFlushRequester() {
2797    return this.cacheFlusher;
2798  }
2799
2800  @Override
2801  public CompactionRequester getCompactionRequestor() {
2802    return this.compactSplitThread;
2803  }
2804
2805  /**
2806   * Get the top N most loaded regions this server is serving so we can tell the
2807   * master which regions it can reallocate if we're overloaded. TODO: actually
2808   * calculate which regions are most loaded. (Right now, we're just grabbing
2809   * the first N regions being served regardless of load.)
2810   */
2811  protected RegionInfo[] getMostLoadedRegions() {
2812    ArrayList<RegionInfo> regions = new ArrayList<>();
2813    for (Region r : onlineRegions.values()) {
2814      if (!r.isAvailable()) {
2815        continue;
2816      }
2817      if (regions.size() < numRegionsToReport) {
2818        regions.add(r.getRegionInfo());
2819      } else {
2820        break;
2821      }
2822    }
2823    return regions.toArray(new RegionInfo[regions.size()]);
2824  }
2825
2826  @Override
2827  public Leases getLeases() {
2828    return leases;
2829  }
2830
2831  /**
2832   * @return Return the rootDir.
2833   */
2834  protected Path getRootDir() {
2835    return rootDir;
2836  }
2837
2838  /**
2839   * @return Return the fs.
2840   */
2841  @Override
2842  public FileSystem getFileSystem() {
2843    return fs;
2844  }
2845
2846  /**
2847   * @return Return the walRootDir.
2848   */
2849  public Path getWALRootDir() {
2850    return walRootDir;
2851  }
2852
2853  /**
2854   * @return Return the walFs.
2855   */
2856  public FileSystem getWALFileSystem() {
2857    return walFs;
2858  }
2859
2860  @Override
2861  public String toString() {
2862    return getServerName().toString();
2863  }
2864
2865  /**
2866   * Interval at which threads should run
2867   *
2868   * @return the interval
2869   */
2870  public int getThreadWakeFrequency() {
2871    return threadWakeFrequency;
2872  }
2873
2874  @Override
2875  public ZKWatcher getZooKeeper() {
2876    return zooKeeper;
2877  }
2878
2879  @Override
2880  public CoordinatedStateManager getCoordinatedStateManager() {
2881    return csm;
2882  }
2883
2884  @Override
2885  public ServerName getServerName() {
2886    return serverName;
2887  }
2888
2889  public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2890    return this.rsHost;
2891  }
2892
2893  @Override
2894  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2895    return this.regionsInTransitionInRS;
2896  }
2897
2898  @Override
2899  public ExecutorService getExecutorService() {
2900    return executorService;
2901  }
2902
2903  @Override
2904  public ChoreService getChoreService() {
2905    return choreService;
2906  }
2907
2908  @Override
2909  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
2910    return rsQuotaManager;
2911  }
2912
2913  //
2914  // Main program and support routines
2915  //
2916  /**
2917   * Load the replication executorService objects, if any
2918   */
2919  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
2920      FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException {
2921    if ((server instanceof HMaster) &&
2922      (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) {
2923      return;
2924    }
2925
2926    // read in the name of the source replication class from the config file.
2927    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2928      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2929
2930    // read in the name of the sink replication class from the config file.
2931    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2932      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2933
2934    // If both the sink and the source class names are the same, then instantiate
2935    // only one object.
2936    if (sourceClassname.equals(sinkClassname)) {
2937      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2938        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
2939      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
2940    } else {
2941      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2942        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
2943      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
2944        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
2945    }
2946  }
2947
2948  private static <T extends ReplicationService> T newReplicationInstance(String classname,
2949      Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
2950      Path oldLogDir, WALProvider walProvider) throws IOException {
2951    Class<? extends T> clazz = null;
2952    try {
2953      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2954      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
2955    } catch (java.lang.ClassNotFoundException nfe) {
2956      throw new IOException("Could not find class for " + classname);
2957    }
2958    T service = ReflectionUtils.newInstance(clazz, conf);
2959    service.initialize(server, walFs, logDir, oldLogDir, walProvider);
2960    return service;
2961  }
2962
2963  /**
2964   * Utility for constructing an instance of the passed HRegionServer class.
2965   *
2966   * @param regionServerClass
2967   * @param conf2
2968   * @return HRegionServer instance.
2969   */
2970  public static HRegionServer constructRegionServer(
2971      Class<? extends HRegionServer> regionServerClass,
2972      final Configuration conf2) {
2973    try {
2974      Constructor<? extends HRegionServer> c = regionServerClass
2975          .getConstructor(Configuration.class);
2976      return c.newInstance(conf2);
2977    } catch (Exception e) {
2978      throw new RuntimeException("Failed construction of " + "Regionserver: "
2979          + regionServerClass.toString(), e);
2980    }
2981  }
2982
2983  /**
2984   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2985   */
2986  public static void main(String[] args) throws Exception {
2987    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
2988    VersionInfo.logVersion();
2989    Configuration conf = HBaseConfiguration.create();
2990    @SuppressWarnings("unchecked")
2991    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2992        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2993
2994    new HRegionServerCommandLine(regionServerClass).doMain(args);
2995  }
2996
2997  /**
2998   * Gets the online regions of the specified table.
2999   * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
3000   * Only returns <em>online</em> regions.  If a region on this table has been
3001   * closed during a disable, etc., it will not be included in the returned list.
3002   * So, the returned list may not necessarily be ALL regions in this table, its
3003   * all the ONLINE regions in the table.
3004   * @param tableName
3005   * @return Online regions from <code>tableName</code>
3006   */
3007  @Override
3008  public List<HRegion> getRegions(TableName tableName) {
3009     List<HRegion> tableRegions = new ArrayList<>();
3010     synchronized (this.onlineRegions) {
3011       for (HRegion region: this.onlineRegions.values()) {
3012         RegionInfo regionInfo = region.getRegionInfo();
3013         if(regionInfo.getTable().equals(tableName)) {
3014           tableRegions.add(region);
3015         }
3016       }
3017     }
3018     return tableRegions;
3019   }
3020
3021  @Override
3022  public List<HRegion> getRegions() {
3023    List<HRegion> allRegions = new ArrayList<>();
3024    synchronized (this.onlineRegions) {
3025      // Return a clone copy of the onlineRegions
3026      allRegions.addAll(onlineRegions.values());
3027    }
3028    return allRegions;
3029  }
3030
3031  /**
3032   * Gets the online tables in this RS.
3033   * This method looks at the in-memory onlineRegions.
3034   * @return all the online tables in this RS
3035   */
3036  public Set<TableName> getOnlineTables() {
3037    Set<TableName> tables = new HashSet<>();
3038    synchronized (this.onlineRegions) {
3039      for (Region region: this.onlineRegions.values()) {
3040        tables.add(region.getTableDescriptor().getTableName());
3041      }
3042    }
3043    return tables;
3044  }
3045
3046  // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
3047  public String[] getRegionServerCoprocessors() {
3048    TreeSet<String> coprocessors = new TreeSet<>();
3049    try {
3050      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
3051    } catch (IOException exception) {
3052      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
3053          "skipping.");
3054      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3055    }
3056    Collection<HRegion> regions = getOnlineRegionsLocalContext();
3057    for (HRegion region: regions) {
3058      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
3059      try {
3060        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
3061      } catch (IOException exception) {
3062        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
3063            "; skipping.");
3064        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3065      }
3066    }
3067    coprocessors.addAll(rsHost.getCoprocessors());
3068    return coprocessors.toArray(new String[coprocessors.size()]);
3069  }
3070
3071  /**
3072   * Try to close the region, logs a warning on failure but continues.
3073   * @param region Region to close
3074   */
3075  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
3076    try {
3077      if (!closeRegion(region.getEncodedName(), abort, null)) {
3078        LOG.warn("Failed to close " + region.getRegionNameAsString() +
3079            " - ignoring and continuing");
3080      }
3081    } catch (IOException e) {
3082      LOG.warn("Failed to close " + region.getRegionNameAsString() +
3083          " - ignoring and continuing", e);
3084    }
3085  }
3086
3087  /**
3088   * Close asynchronously a region, can be called from the master or internally by the regionserver
3089   * when stopping. If called from the master, the region will update the znode status.
3090   *
3091   * <p>
3092   * If an opening was in progress, this method will cancel it, but will not start a new close. The
3093   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
3094   * </p>
3095
3096   * <p>
3097   *   If a close was in progress, this new request will be ignored, and an exception thrown.
3098   * </p>
3099   *
3100   * @param encodedName Region to close
3101   * @param abort True if we are aborting
3102   * @return True if closed a region.
3103   * @throws NotServingRegionException if the region is not online
3104   */
3105  protected boolean closeRegion(String encodedName, final boolean abort, final ServerName sn)
3106      throws NotServingRegionException {
3107    //Check for permissions to close.
3108    HRegion actualRegion = this.getRegion(encodedName);
3109    // Can be null if we're calling close on a region that's not online
3110    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
3111      try {
3112        actualRegion.getCoprocessorHost().preClose(false);
3113      } catch (IOException exp) {
3114        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
3115        return false;
3116      }
3117    }
3118
3119    final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName),
3120        Boolean.FALSE);
3121
3122    if (Boolean.TRUE.equals(previous)) {
3123      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
3124          "trying to OPEN. Cancelling OPENING.");
3125      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
3126        // The replace failed. That should be an exceptional case, but theoretically it can happen.
3127        // We're going to try to do a standard close then.
3128        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
3129            " Doing a standard close now");
3130        return closeRegion(encodedName, abort, sn);
3131      }
3132      // Let's get the region from the online region list again
3133      actualRegion = this.getRegion(encodedName);
3134      if (actualRegion == null) { // If already online, we still need to close it.
3135        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3136        // The master deletes the znode when it receives this exception.
3137        throw new NotServingRegionException("The region " + encodedName +
3138          " was opening but not yet served. Opening is cancelled.");
3139      }
3140    } else if (Boolean.FALSE.equals(previous)) {
3141      LOG.info("Received CLOSE for the region: " + encodedName +
3142        ", which we are already trying to CLOSE, but not completed yet");
3143      return true;
3144    }
3145
3146    if (actualRegion == null) {
3147      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3148      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
3149      // The master deletes the znode when it receives this exception.
3150      throw new NotServingRegionException("The region " + encodedName +
3151          " is not online, and is not opening.");
3152    }
3153
3154    CloseRegionHandler crh;
3155    final RegionInfo hri = actualRegion.getRegionInfo();
3156    if (hri.isMetaRegion()) {
3157      crh = new CloseMetaHandler(this, this, hri, abort);
3158    } else {
3159      crh = new CloseRegionHandler(this, this, hri, abort, sn);
3160    }
3161    this.executorService.submit(crh);
3162    return true;
3163  }
3164
3165  /**
3166   * Close and offline the region for split or merge
3167   *
3168   * @param regionEncodedName the name of the region(s) to close
3169   * @return true if closed the region successfully.
3170   * @throws IOException
3171  */
3172  protected boolean closeAndOfflineRegionForSplitOrMerge(final List<String> regionEncodedName)
3173      throws IOException {
3174    for (int i = 0; i < regionEncodedName.size(); ++i) {
3175      HRegion regionToClose = this.getRegion(regionEncodedName.get(i));
3176      if (regionToClose != null) {
3177        Map<byte[], List<HStoreFile>> hstoreFiles = null;
3178        Exception exceptionToThrow = null;
3179        try {
3180          hstoreFiles = regionToClose.close(false);
3181        } catch (Exception e) {
3182          exceptionToThrow = e;
3183        }
3184        if (exceptionToThrow == null && hstoreFiles == null) {
3185          // The region was closed by someone else
3186          exceptionToThrow =
3187            new IOException("Failed to close region: already closed by another thread");
3188        }
3189        if (exceptionToThrow != null) {
3190          if (exceptionToThrow instanceof IOException) {
3191            throw (IOException) exceptionToThrow;
3192          }
3193          throw new IOException(exceptionToThrow);
3194        }
3195        // Offline the region
3196        this.removeRegion(regionToClose, null);
3197      }
3198    }
3199    return true;
3200  }
3201
3202   /**
3203   * @return HRegion for the passed binary <code>regionName</code> or null if
3204   *         named region is not member of the online regions.
3205   */
3206  public HRegion getOnlineRegion(final byte[] regionName) {
3207    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3208    return this.onlineRegions.get(encodedRegionName);
3209  }
3210
3211  public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
3212    return this.regionFavoredNodesMap.get(encodedRegionName);
3213  }
3214
3215  @Override
3216  public HRegion getRegion(final String encodedRegionName) {
3217    return this.onlineRegions.get(encodedRegionName);
3218  }
3219
3220
3221  @Override
3222  public boolean removeRegion(final HRegion r, ServerName destination) {
3223    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3224    if (destination != null) {
3225      long closeSeqNum = r.getMaxFlushedSeqId();
3226      if (closeSeqNum == HConstants.NO_SEQNUM) {
3227        // No edits in WAL for this region; get the sequence number when the region was opened.
3228        closeSeqNum = r.getOpenSeqNum();
3229        if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
3230      }
3231      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
3232    }
3233    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3234    return toReturn != null;
3235  }
3236
3237  /**
3238   * Protected Utility method for safely obtaining an HRegion handle.
3239   *
3240   * @param regionName
3241   *          Name of online {@link HRegion} to return
3242   * @return {@link HRegion} for <code>regionName</code>
3243   * @throws NotServingRegionException
3244   */
3245  protected HRegion getRegion(final byte[] regionName)
3246      throws NotServingRegionException {
3247    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3248    return getRegionByEncodedName(regionName, encodedRegionName);
3249  }
3250
3251  public HRegion getRegionByEncodedName(String encodedRegionName)
3252      throws NotServingRegionException {
3253    return getRegionByEncodedName(null, encodedRegionName);
3254  }
3255
3256  protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3257    throws NotServingRegionException {
3258    HRegion region = this.onlineRegions.get(encodedRegionName);
3259    if (region == null) {
3260      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3261      if (moveInfo != null) {
3262        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3263      }
3264      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3265      String regionNameStr = regionName == null?
3266        encodedRegionName: Bytes.toStringBinary(regionName);
3267      if (isOpening != null && isOpening.booleanValue()) {
3268        throw new RegionOpeningException("Region " + regionNameStr +
3269          " is opening on " + this.serverName);
3270      }
3271      throw new NotServingRegionException("" + regionNameStr +
3272        " is not online on " + this.serverName);
3273    }
3274    return region;
3275  }
3276
3277  /*
3278   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
3279   * IOE if it isn't already.
3280   *
3281   * @param t Throwable
3282   *
3283   * @param msg Message to log in error. Can be null.
3284   *
3285   * @return Throwable converted to an IOE; methods can only let out IOEs.
3286   */
3287  private Throwable cleanup(final Throwable t, final String msg) {
3288    // Don't log as error if NSRE; NSRE is 'normal' operation.
3289    if (t instanceof NotServingRegionException) {
3290      LOG.debug("NotServingRegionException; " + t.getMessage());
3291      return t;
3292    }
3293    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3294    if (msg == null) {
3295      LOG.error("", e);
3296    } else {
3297      LOG.error(msg, e);
3298    }
3299    if (!rpcServices.checkOOME(t)) {
3300      checkFileSystem();
3301    }
3302    return t;
3303  }
3304
3305  /*
3306   * @param t
3307   *
3308   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
3309   *
3310   * @return Make <code>t</code> an IOE if it isn't already.
3311   */
3312  protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
3313    return (t instanceof IOException ? (IOException) t : msg == null
3314        || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
3315  }
3316
3317  /**
3318   * Checks to see if the file system is still accessible. If not, sets
3319   * abortRequested and stopRequested
3320   *
3321   * @return false if file system is not available
3322   */
3323  public boolean checkFileSystem() {
3324    if (this.fsOk && this.fs != null) {
3325      try {
3326        FSUtils.checkFileSystemAvailable(this.fs);
3327      } catch (IOException e) {
3328        abort("File System not available", e);
3329        this.fsOk = false;
3330      }
3331    }
3332    return this.fsOk;
3333  }
3334
3335  @Override
3336  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3337      List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3338    InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
3339    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3340    // it is a map of region name to InetSocketAddress[]
3341    for (int i = 0; i < favoredNodes.size(); i++) {
3342      addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
3343          favoredNodes.get(i).getPort());
3344    }
3345    regionFavoredNodesMap.put(encodedRegionName, addr);
3346  }
3347
3348  /**
3349   * Return the favored nodes for a region given its encoded name. Look at the
3350   * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
3351   * @param encodedRegionName
3352   * @return array of favored locations
3353   */
3354  @Override
3355  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3356    return regionFavoredNodesMap.get(encodedRegionName);
3357  }
3358
3359  @Override
3360  public ServerNonceManager getNonceManager() {
3361    return this.nonceManager;
3362  }
3363
3364  private static class MovedRegionInfo {
3365    private final ServerName serverName;
3366    private final long seqNum;
3367    private final long ts;
3368
3369    public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3370      this.serverName = serverName;
3371      this.seqNum = closeSeqNum;
3372      ts = EnvironmentEdgeManager.currentTime();
3373     }
3374
3375    public ServerName getServerName() {
3376      return serverName;
3377    }
3378
3379    public long getSeqNum() {
3380      return seqNum;
3381    }
3382
3383    public long getMoveTime() {
3384      return ts;
3385    }
3386  }
3387
3388  // This map will contains all the regions that we closed for a move.
3389  // We add the time it was moved as we don't want to keep too old information
3390  protected Map<String, MovedRegionInfo> movedRegions = new ConcurrentHashMap<>(3000);
3391
3392  // We need a timeout. If not there is a risk of giving a wrong information: this would double
3393  //  the number of network calls instead of reducing them.
3394  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3395
3396  protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
3397    if (ServerName.isSameAddress(destination, this.getServerName())) {
3398      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3399      return;
3400    }
3401    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" +
3402        closeSeqNum);
3403    movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3404  }
3405
3406  void removeFromMovedRegions(String encodedName) {
3407    movedRegions.remove(encodedName);
3408  }
3409
3410  private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
3411    MovedRegionInfo dest = movedRegions.get(encodedRegionName);
3412
3413    long now = EnvironmentEdgeManager.currentTime();
3414    if (dest != null) {
3415      if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
3416        return dest;
3417      } else {
3418        movedRegions.remove(encodedRegionName);
3419      }
3420    }
3421
3422    return null;
3423  }
3424
3425  /**
3426   * Remove the expired entries from the moved regions list.
3427   */
3428  protected void cleanMovedRegions() {
3429    final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
3430    Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
3431
3432    while (it.hasNext()){
3433      Map.Entry<String, MovedRegionInfo> e = it.next();
3434      if (e.getValue().getMoveTime() < cutOff) {
3435        it.remove();
3436      }
3437    }
3438  }
3439
3440  /*
3441   * Use this to allow tests to override and schedule more frequently.
3442   */
3443
3444  protected int movedRegionCleanerPeriod() {
3445        return TIMEOUT_REGION_MOVED;
3446  }
3447
3448  /**
3449   * Creates a Chore thread to clean the moved region cache.
3450   */
3451  protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
3452    private HRegionServer regionServer;
3453    Stoppable stoppable;
3454
3455    private MovedRegionsCleaner(
3456      HRegionServer regionServer, Stoppable stoppable){
3457      super("MovedRegionsCleaner for region " + regionServer, stoppable,
3458          regionServer.movedRegionCleanerPeriod());
3459      this.regionServer = regionServer;
3460      this.stoppable = stoppable;
3461    }
3462
3463    static MovedRegionsCleaner create(HRegionServer rs){
3464      Stoppable stoppable = new Stoppable() {
3465        private volatile boolean isStopped = false;
3466        @Override public void stop(String why) { isStopped = true;}
3467        @Override public boolean isStopped() {return isStopped;}
3468      };
3469
3470      return new MovedRegionsCleaner(rs, stoppable);
3471    }
3472
3473    @Override
3474    protected void chore() {
3475      regionServer.cleanMovedRegions();
3476    }
3477
3478    @Override
3479    public void stop(String why) {
3480      stoppable.stop(why);
3481    }
3482
3483    @Override
3484    public boolean isStopped() {
3485      return stoppable.isStopped();
3486    }
3487  }
3488
3489  private String getMyEphemeralNodePath() {
3490    return ZNodePaths.joinZNode(this.zooKeeper.znodePaths.rsZNode, getServerName().toString());
3491  }
3492
3493  private boolean isHealthCheckerConfigured() {
3494    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3495    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
3496  }
3497
3498  /**
3499   * @return the underlying {@link CompactSplit} for the servers
3500   */
3501  public CompactSplit getCompactSplitThread() {
3502    return this.compactSplitThread;
3503  }
3504
3505  public CoprocessorServiceResponse execRegionServerService(
3506      @SuppressWarnings("UnusedParameters") final RpcController controller,
3507      final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3508    try {
3509      ServerRpcController serviceController = new ServerRpcController();
3510      CoprocessorServiceCall call = serviceRequest.getCall();
3511      String serviceName = call.getServiceName();
3512      com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName);
3513      if (service == null) {
3514        throw new UnknownProtocolException(null, "No registered coprocessor executorService found for " +
3515            serviceName);
3516      }
3517      com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
3518          service.getDescriptorForType();
3519
3520      String methodName = call.getMethodName();
3521      com.google.protobuf.Descriptors.MethodDescriptor methodDesc =
3522          serviceDesc.findMethodByName(methodName);
3523      if (methodDesc == null) {
3524        throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName +
3525            " called on executorService " + serviceName);
3526      }
3527
3528      com.google.protobuf.Message request =
3529          CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3530      final com.google.protobuf.Message.Builder responseBuilder =
3531          service.getResponsePrototype(methodDesc).newBuilderForType();
3532      service.callMethod(methodDesc, serviceController, request,
3533          new com.google.protobuf.RpcCallback<com.google.protobuf.Message>() {
3534        @Override
3535        public void run(com.google.protobuf.Message message) {
3536          if (message != null) {
3537            responseBuilder.mergeFrom(message);
3538          }
3539        }
3540      });
3541      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3542      if (exception != null) {
3543        throw exception;
3544      }
3545      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3546    } catch (IOException ie) {
3547      throw new ServiceException(ie);
3548    }
3549  }
3550
3551  /**
3552   * @return The cache config instance used by the regionserver.
3553   */
3554  public CacheConfig getCacheConfig() {
3555    return this.cacheConfig;
3556  }
3557
3558  /**
3559   * @return : Returns the ConfigurationManager object for testing purposes.
3560   */
3561  protected ConfigurationManager getConfigurationManager() {
3562    return configurationManager;
3563  }
3564
3565  /**
3566   * @return Return table descriptors implementation.
3567   */
3568  public TableDescriptors getTableDescriptors() {
3569    return this.tableDescriptors;
3570  }
3571
3572  /**
3573   * Reload the configuration from disk.
3574   */
3575  public void updateConfiguration() {
3576    LOG.info("Reloading the configuration from disk.");
3577    // Reload the configuration from disk.
3578    conf.reloadConfiguration();
3579    configurationManager.notifyAllObservers(conf);
3580  }
3581
3582  public CacheEvictionStats clearRegionBlockCache(Region region) {
3583    BlockCache blockCache = this.getCacheConfig().getBlockCache();
3584    long evictedBlocks = 0;
3585
3586    for(Store store : region.getStores()) {
3587      for(StoreFile hFile : store.getStorefiles()) {
3588        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3589      }
3590    }
3591
3592    return CacheEvictionStats.builder()
3593        .withEvictedBlocks(evictedBlocks)
3594        .build();
3595  }
3596
3597  @Override
3598  public double getCompactionPressure() {
3599    double max = 0;
3600    for (Region region : onlineRegions.values()) {
3601      for (Store store : region.getStores()) {
3602        double normCount = store.getCompactionPressure();
3603        if (normCount > max) {
3604          max = normCount;
3605        }
3606      }
3607    }
3608    return max;
3609  }
3610
3611  @Override
3612  public HeapMemoryManager getHeapMemoryManager() {
3613    return hMemManager;
3614  }
3615
3616  /**
3617   * For testing
3618   * @return whether all wal roll request finished for this regionserver
3619   */
3620  @VisibleForTesting
3621  public boolean walRollRequestFinished() {
3622    return this.walRoller.walRollFinished();
3623  }
3624
3625  @Override
3626  public ThroughputController getFlushThroughputController() {
3627    return flushThroughputController;
3628  }
3629
3630  @Override
3631  public double getFlushPressure() {
3632    if (getRegionServerAccounting() == null || cacheFlusher == null) {
3633      // return 0 during RS initialization
3634      return 0.0;
3635    }
3636    return getRegionServerAccounting().getFlushPressure();
3637  }
3638
3639  @Override
3640  public void onConfigurationChange(Configuration newConf) {
3641    ThroughputController old = this.flushThroughputController;
3642    if (old != null) {
3643      old.stop("configuration change");
3644    }
3645    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3646  }
3647
3648  @Override
3649  public MetricsRegionServer getMetrics() {
3650    return metricsRegionServer;
3651  }
3652
3653  @Override
3654  public SecureBulkLoadManager getSecureBulkLoadManager() {
3655    return this.secureBulkLoadManager;
3656  }
3657
3658  @Override
3659  public EntityLock regionLock(List<RegionInfo> regionInfos, String description,
3660      Abortable abort) throws IOException {
3661    return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
3662      .regionLock(regionInfos, description, abort);
3663  }
3664
3665  @Override
3666  public void unassign(byte[] regionName) throws IOException {
3667    clusterConnection.getAdmin().unassign(regionName, false);
3668  }
3669
3670  @Override
3671  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3672    return this.rsSpaceQuotaManager;
3673  }
3674
3675  public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
3676    return eventLoopGroupConfig;
3677  }
3678
3679  @Override
3680  public Connection createConnection(Configuration conf) throws IOException {
3681    User user = UserProvider.instantiate(conf).getCurrent();
3682    return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName,
3683        this.rpcServices, this.rpcServices);
3684  }
3685
3686  /**
3687   * Force to terminate region server when abort timeout.
3688   */
3689  private static class SystemExitWhenAbortTimeout extends TimerTask {
3690
3691    public SystemExitWhenAbortTimeout() {
3692
3693    }
3694
3695    @Override
3696    public void run() {
3697      LOG.warn("Aborting region server timed out, terminating forcibly" +
3698          " and does not wait for any running shutdown hooks or finalizers to finish their work." +
3699          " Thread dump to stdout.");
3700      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3701      Runtime.getRuntime().halt(1);
3702    }
3703  }
3704}