/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionSize;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Signal;
import sun.misc.SignalHandler;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
 * the HMaster. There are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation"})
public class HRegionServer extends HasThread implements
    RegionServerServices, LastSequenceId, ConfigurationObserver {
  // Time to pause if master says 'please hold'. Make configurable if needed.
  private static final int INIT_PAUSE_TIME_MS = 1000;

  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

  /**
   * For testing only! Set to true to skip notifying region assignment to the master.
   */
  @VisibleForTesting
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  //RegionName vs current action in progress
  //true - if open region action in progress
  //false - if close region action in progress
  protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  // Cache flushing
  protected MemStoreFlusher cacheFlusher;

  protected HeapMemoryManager hMemManager;

  /**
   * Cluster connection to be shared by services.
   * Initialized at server startup and closed when the server shuts down.
   * Clients must never close it explicitly.
   * Clients hosted by this Server should make use of this clusterConnection rather than create
   * their own; if they create their own, there is no way for the hosting server to shut down
   * ongoing client RPCs.
   */
  protected ClusterConnection clusterConnection;

  /*
   * Long-living meta table locator, which is created when the server is started, and stopped
   * when the server shuts down. References to this locator shall be used to perform the
   * corresponding operations in EventHandlers. The primary reason for this decision is to make
   * it mockable for tests.
   */
  protected MetaTableLocator metaTableLocator;

  /**
   * Go here to get table descriptors.
   */
  protected TableDescriptors tableDescriptors;

  // Replication services. If no replication, this handler will be null.
  protected ReplicationSourceService replicationSourceHandler;
  protected ReplicationSinkService replicationSinkHandler;

  // Compactions
  public CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the
   * encoded region name.  All access should be synchronized.
   */
  protected final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on.
   * We store the value as InetSocketAddress since this is used only in the HDFS
   * API (create() takes favored nodes as hints for placing file blocks).
   * We could have used ServerName here as the value class, but we'd need to
   * convert it to InetSocketAddress at some point before the HDFS API call, and
   * it seems a bit weird to store ServerName since ServerName refers to RegionServers
   * while here we really mean DataNode locations.
   */
  protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
      new ConcurrentHashMap<>();

  // Leases
  protected Leases leases;

  // Instance of the hbase executor service.
  protected ExecutorService executorService;

  // If false, the file system has become unavailable
  protected volatile boolean fsOk;
  protected HFileSystem fs;
  protected HFileSystem walFs;

  // Set when a report to the master comes back with a message asking us to
  // shutdown. Also set by call to stop when debugging or running unit tests
  // of HRegionServer in isolation.
  private volatile boolean stopped = false;

  // Go down hard. Used if file system becomes unavailable and also in
  // debugging and unit tests.
  private volatile boolean abortRequested;
  public static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds, to be safe.
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Task to run when the abort timeout is reached.
  public static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
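  // For example (illustrative values; SystemExitWhenAbortTimeout is the default task,
  // wired up in run() below), an operator could shorten the window before a hung abort
  // is escalated:
  //   hbase.regionserver.abort.timeout = 300000
  //   hbase.regionserver.abort.timeout.task =
  //       org.apache.hadoop.hbase.regionserver.HRegionServer$SystemExitWhenAbortTimeout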

  ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<>();

  // A state before we go into stopped state.  At this stage we're closing user
  // space regions.
  private boolean stopping = false;

  volatile boolean killed = false;

  private volatile boolean shutDown = false;

  protected final Configuration conf;

  private Path rootDir;
  private Path walRootDir;

  protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  final int numRetries;
  protected final int threadWakeFrequency;
  protected final int msgInterval;

  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  protected final int numRegionsToReport;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  RpcClient rpcClient;

  private RpcRetryingCallerFactory rpcRetryingCallerFactory;
  private RpcControllerFactory rpcControllerFactory;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  // Info server. Default access so it can be used by unit tests. REGIONSERVER
  // is the name of the webapp and the attribute name used to stuff this instance
  // into the web context.
  protected InfoServer infoServer;
  private JvmPauseMonitor pauseMonitor;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;
  MetricsTable metricsTable;
  private SpanReceiverHost spanReceiverHost;

  /**
   * ChoreService used to schedule tasks that we want to run periodically
   */
  private ChoreService choreService;

  /*
   * Check for compaction requests.
   */
  ScheduledChore compactionChecker;

  /*
   * Check for flushes
   */
  ScheduledChore periodicFlusher;

  protected volatile WALFactory walFactory;

  // WAL roller. log is protected rather than private to avoid an Eclipse
  // warning when accessed by inner classes.
  protected LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // zookeeper connection and watcher
  protected final ZKWatcher zooKeeper;

  // master address tracker
  private final MasterAddressTracker masterAddressTracker;

  // Cluster Status Tracker
  protected final ClusterStatusTracker clusterStatusTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  // A sleeper that sleeps for msgInterval.
  protected final Sleeper sleeper;

  private final int operationTimeout;
  private final int shortOperationTimeout;

  private final RegionServerAccounting regionServerAccounting;

  // Cache configuration and block cache reference
  protected CacheConfig cacheConfig;
  // Cache configuration for mob
  final MobCacheConfig mobCacheConfig;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * The server name the Master sees us as. It's made from the hostname the
   * master passes us, the port, and the server startcode. Gets set after registration
   * against the Master.
   */
  protected ServerName serverName;

  /*
   * hostname specified by hostname config
   */
  protected String useThisHostnameInstead;

  // key to the config parameter of server hostname
  // the specification of server hostname is optional. The hostname should be resolvable from
  // both master and region server
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  protected final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";

  // HBASE-18226: This config and hbase.regionserver.hostname are mutually exclusive.
  // An exception will be thrown if both are used.
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.regionserver.hostname.disable.master.reversedns";

  /**
   * This server's startcode.
   */
  protected final long startcode;

  /**
   * Unique identifier for the cluster we are a part of.
   */
  protected String clusterId;

  /**
   * Chore to periodically clean the moved region list.
   */
  private MovedRegionsCleaner movedRegionsCleaner;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent
   * in the case where the client doesn't receive the response from a successful operation and
   * retries. We track the successful ops for some time via a nonce sent by the client and handle
   * duplicate operations (currently, by failing them; in the future we might use MVCC to return
   * the result). Nonces are also recovered from the WAL during recovery; however, the caveats
   * (from HBASE-3787) are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
   *   of past records. If we don't read the records, we don't read and recover the nonces.
   *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
   * We could have a separate additional "Nonce WAL". It would just contain a bunch of numbers and
   * wouldn't be flushed on the main path - because the WAL itself also contains nonces, if we only
   * flush it before memstore flush, for a given nonce we will either see it in the WAL (if it was
   * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
   * log (or both occasionally, which doesn't matter). The nonce log file can be deleted after the
   * latest nonce in it expires. It can also be recovered during move.
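   * <p>
   * A minimal sketch of the server-side pattern this enables (a sketch only; see
   * {@link ServerNonceManager} for the authoritative contract):
   * <pre>{@code
   * if (nonceManager.startOperation(nonceGroup, nonce, stoppable)) {
   *   boolean success = false;
   *   try {
   *     // ... apply the increment/append ...
   *     success = true;
   *   } finally {
   *     nonceManager.endOperation(nonceGroup, nonce, success);
   *   }
   * } // else: a retry of an operation we already saw; fail it rather than re-apply it
   * }</pre>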
   */
  final ServerNonceManager nonceManager;

  private UserProvider userProvider;

  protected final RSRpcServices rpcServices;

  protected CoordinatedStateManager csm;

  /**
   * Configuration manager is used to register/deregister and notify the configuration observers
   * when the regionserver is notified that there was a change in the on disk configs.
   */
  protected final ConfigurationManager configurationManager;

  @VisibleForTesting
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  protected SecureBulkLoadManager secureBulkLoadManager;

  protected FileSystemUtilizationChore fsUtilizationChore;

  private final NettyEventLoopGroupConfig eventLoopGroupConfig;

  /**
   * True if this RegionServer is coming up in a cluster where there is no Master;
   * means it needs to just come up and make do without a Master to talk to: e.g. in test, or
   * when HRegionServer is doing something other than its usual duties: e.g. as a hollowed-out
   * host whose only purpose is as a Replication-stream sink; see HBASE-18846 for more.
   */
  private final boolean masterless;
  static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
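  // e.g. (illustrative) setting hbase.masterless = true brings the RS up with no Master to
  // talk to, as when it serves purely as a replication-stream sink per the javadoc above.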

  /**
   * Starts an HRegionServer at the default location.
   */
  // Don't start any services or managers in here in the Constructor.
  // Defer till after we register with the Master as much as possible. See #startServices.
  public HRegionServer(Configuration conf) throws IOException {
    super("RegionServer");  // thread name
    TraceUtil.initTracer(conf);
    try {
      this.startcode = System.currentTimeMillis();
      this.conf = conf;
      this.fsOk = true;
      this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
      this.eventLoopGroupConfig = setupNetty(this.conf);
      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      this.userProvider = UserProvider.instantiate(conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
      this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

      this.sleeper = new Sleeper(this.msgInterval, this);

      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10);

      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
          HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
          HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.abortRequested = false;
      this.stopped = false;

      rpcServices = createRpcServices();
      useThisHostnameInstead = getUseThisHostnameInstead(conf);
      String hostName =
          StringUtils.isBlank(useThisHostnameInstead) ? this.rpcServices.isa.getHostName()
              : this.useThisHostnameInstead;
      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);

      rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
      rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

      // login the zookeeper client principal (if using security)
      ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
          HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
      // login the server principal (if using secure Hadoop)
      login(userProvider, hostName);
      // init superusers and add the server principal (if using security)
      // or process owner as default super user.
      Superusers.initialize(conf);
      regionServerAccounting = new RegionServerAccounting(conf);

      boolean isMasterNotCarryTable =
          this instanceof HMaster && !LoadBalancer.isTablesOnMaster(conf);
      // no need to instantiate global block cache when master not carry table
      if (!isMasterNotCarryTable) {
        CacheConfig.instantiateBlockCache(conf);
      }
      cacheConfig = new CacheConfig(conf);
      mobCacheConfig = new MobCacheConfig(conf);

      uncaughtExceptionHandler = new UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
          abort("Uncaught exception in executorService thread " + t.getName(), e);
        }
      };

      initializeFileSystem();
      spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

      this.configurationManager = new ConfigurationManager();
      setupWindows(getConfiguration(), getConfigurationManager());

      // Some unit tests don't need a cluster, so no zookeeper at all
      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
        // Open connection to zookeeper and set primary watcher
        zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
          rpcServices.isa.getPort(), this, canCreateBaseZNode());
        // If no master in cluster, skip trying to track one or look for a cluster status.
        if (!this.masterless) {
          this.csm = new ZkCoordinatedStateManager(this);

          masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
          masterAddressTracker.start();

          clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
          clusterStatusTracker.start();
        } else {
          masterAddressTracker = null;
          clusterStatusTracker = null;
        }
      } else {
        zooKeeper = null;
        masterAddressTracker = null;
        clusterStatusTracker = null;
      }
      this.rpcServices.start(zooKeeper);
      // This violates 'no starting stuff in Constructor' but Master depends on the below chore
      // and executor being created and takes a different startup route. Lots of overlap between HRS
      // and M (an M IS A HRS now). Need to refactor so there is less duplication between M and its
      // superclass HRS. TODO. Also, Master expects the Constructor to put up web servers. Ugh.
      this.choreService = new ChoreService(getName(), true);
      this.executorService = new ExecutorService(getName());
      putUpWebUI();
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      LOG.error("Failed construction RegionServer", t);
      throw t;
    }
  }

  // HMaster should override this method to load the specific config for master
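  // For example (illustrative values), operators either pin the reported hostname:
  //   hbase.regionserver.hostname = rs1.example.com
  // or have the master trust the hostname the RS reports, skipping reverse DNS:
  //   hbase.regionserver.hostname.disable.master.reversedns = true
  // Setting both at once is rejected below with an IOException.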
  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
    String hostname = conf.get(RS_HOSTNAME_KEY);
    if (conf.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
      if (!StringUtils.isBlank(hostname)) {
        String msg = RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + RS_HOSTNAME_KEY +
          " are mutually exclusive. Do not set " + RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY +
          " to true while " + RS_HOSTNAME_KEY + " is used";
        throw new IOException(msg);
      } else {
        return rpcServices.isa.getHostName();
      }
    } else {
      return hostname;
    }
  }

  /**
   * On platforms other than Windows, install a SIGHUP handler that reloads the
   * configuration and notifies all registered observers. (No-op on Windows,
   * which has no SIGHUP.)
   */
  private static void setupWindows(final Configuration conf, ConfigurationManager cm) {
    if (!SystemUtils.IS_OS_WINDOWS) {
      Signal.handle(new Signal("HUP"), new SignalHandler() {
        @Override
        public void handle(Signal signal) {
          conf.reloadConfiguration();
          cm.notifyAllObservers(conf);
        }
      });
    }
  }
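  // Usage sketch (POSIX shells only; <pid> stands for the RegionServer JVM's process id):
  //   kill -HUP <pid>   # triggers conf.reloadConfiguration() and the observer notification above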

  private static NettyEventLoopGroupConfig setupNetty(Configuration conf) {
    // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL.
    NettyEventLoopGroupConfig nelgc =
      new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
    NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
    return nelgc;
  }

  private void initializeFileSystem() throws IOException {
    // Get the fs instance used by this RS. If HBase-level checksum verification is
    // enabled, automatically switch off HDFS checksum verification.
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    FSUtils.setFsDefault(this.conf, FSUtils.getWALRootDir(this.conf));
    this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.walRootDir = FSUtils.getWALRootDir(this.conf);
    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
    // underlying hadoop hdfs accessors will be going against wrong filesystem
    // (unless all is set to defaults).
    FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
    this.fs = new HFileSystem(this.conf, useHBaseChecksum);
    this.rootDir = FSUtils.getRootDir(this.conf);
    this.tableDescriptors = getFsTableDescriptors();
  }

  protected TableDescriptors getFsTableDescriptors() throws IOException {
    return new FSTableDescriptors(this.conf,
      this.fs, this.rootDir, !canUpdateTableDescriptor(), false, getMetaTableObserver());
  }

  protected Function<TableDescriptorBuilder, TableDescriptorBuilder> getMetaTableObserver() {
    return null;
  }

  protected void login(UserProvider user, String host) throws IOException {
    user.login("hbase.regionserver.keytab.file",
      "hbase.regionserver.kerberos.principal", host);
  }


  /**
   * Wait for an active Master.
   * See the override in the HMaster subclass for how it is used.
   */
  protected void waitForMasterActive() {}

  protected String getProcessName() {
    return REGIONSERVER;
  }

  protected boolean canCreateBaseZNode() {
    return this.masterless;
  }

  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  protected void configureInfoServer() {
    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }

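  /**
   * Registers a protobuf {@code Service} against this region server. Only one instance may
   * be registered per service name. A hypothetical usage sketch ({@code MyPingService} is
   * illustrative, not part of HBase):
   * <pre>{@code
   * com.google.protobuf.Service ping = new MyPingService() { ... };
   * if (!regionServer.registerService(ping)) {
   *   // a service with the same name was registered earlier; this instance was rejected
   * }
   * }</pre>
   */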
  @Override
  public boolean registerService(com.google.protobuf.Service instance) {
    /*
     * No stacking of instances is allowed for a single service name
     */
    com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
        instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor executorService " + serviceName
          + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Registered regionserver coprocessor executorService: executorService="
          + serviceName);
    }
    return true;
  }

  /**
   * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to
   * the local server; i.e. a short-circuit Connection. Safe to use going to a local or remote
   * server. Creating this instance in a method allows it to be intercepted and mocked in tests.
   * @throws IOException if the connection cannot be created
   */
  @VisibleForTesting
  protected ClusterConnection createClusterConnection() throws IOException {
    Configuration conf = this.conf;
    if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      // Use server ZK cluster for server-issued connections, so we clone
      // the conf and unset the client ZK related properties
      conf = new Configuration(this.conf);
      conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
    // Create a cluster connection that when appropriate, can short-circuit and go directly to the
    // local server if the request is to the local server bypassing RPC. Can be used for both local
    // and remote invocations.
    return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
      serverName, rpcServices, rpcServices);
  }

  /**
   * Run test on configured codecs to make sure supporting libs are in place.
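   * <p>
   * For example (illustrative values), failing fast at startup when native codec support
   * is missing can be arranged in hbase-site.xml:
   * <pre>{@code
   * <property>
   *   <name>hbase.regionserver.codecs</name>
   *   <value>snappy,lz4</value>
   * </property>
   * }</pre>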
   * @param c configuration holding the {@code hbase.regionserver.codecs} list
   * @throws IOException if any configured codec fails the compression test
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
    if (codecs == null) return;
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException("Compression codec " + codec +
          " not supported, aborting RS construction");
      }
    }
  }

  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * Setup our cluster connection if not already initialized.
   * @throws IOException
   */
  protected synchronized void setupClusterConnection() throws IOException {
    if (clusterConnection == null) {
      clusterConnection = createClusterConnection();
      metaTableLocator = new MetaTableLocator();
    }
  }

  /**
   * All initialization needed before we go register with Master.<br>
   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
   */
  private void preRegistrationInitialization() {
    try {
      initializeZooKeeper();
      setupClusterConnection();
      // Setup RPC client for master communication
      this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
          this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
    } catch (Throwable t) {
      // Call stop on error, or the process will stick around forever, since the
      // server puts up non-daemon threads.
      this.rpcServices.stop();
      abort("Initialization of RS failed.  Hence aborting RS.", t);
    }
  }

  /**
   * Bring up the connection to the zk ensemble, then wait until a master is available for this
   * cluster, and after that wait until the cluster 'up' flag has been set. This is the order in
   * which the master does things.
   * <p>
   * Finally open long-living server short-circuit connection.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
    justification="cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Nothing to do in here if no Master in the mix.
    if (this.masterless) {
      return;
    }

    // Create the master address tracker, register with zk, and start it.  Then
    // block until a master is available.  No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up.  Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // If we are HMaster then the cluster id should have already been set.
    if (clusterId == null) {
      // Retrieve clusterId
      // Since cluster status is now up
      // ID should have already been set by HMaster
      try {
        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
        if (clusterId == null) {
          this.abort("Cluster ID has not been set");
        }
        LOG.info("ClusterId : " + clusterId);
      } catch (KeeperException e) {
        this.abort("Failed to retrieve Cluster ID", e);
      }
    }

    waitForMasterActive();
    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }

  /**
   * Utility method to wait indefinitely for a znode to become available, while
   * checking whether the region server is shut down.
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException if interrupted while waiting
   */
  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
      throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /**
   * @return True if the cluster is up.
   */
  @Override
  public boolean isClusterUp() {
    return this.masterless ||
        (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
  }

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        ShutdownHook.install(conf, fs, this, Thread.currentThread());
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
      }

      // Try and register with the Master; tell it we are here.  Break if server is stopped or the
      // clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and start
      // up all Services. Use RetryCounter to get backoff in case Master is struggling to come up.
      LOG.debug("About to register with Master.");
      RetryCounterFactory rcf = new RetryCounterFactory(Integer.MAX_VALUE,
          this.sleeper.getPeriod(), 1000 * 60 * 5);
      RetryCounter rc = rcf.create();
      while (keepLooping()) {
        RegionServerStartupResponse w = reportForDuty();
        if (w == null) {
          long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
          LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
          this.sleeper.sleep(sleepTime);
        } else {
          handleReportForDutyResponse(w);
          break;
        }
      }

      if (!isStopped() && isHealthy()) {
        // start the snapshot handler and other procedure handlers,
        // since the server is ready to run
        if (this.rspmHost != null) {
          this.rspmHost.start();
        }
        // Start the Quota Manager
        if (this.rsQuotaManager != null) {
          rsQuotaManager.start(getRpcServer().getScheduler());
        }
        if (this.rsSpaceQuotaManager != null) {
          this.rsSpaceQuotaManager.start();
        }
      }

      // We registered with the Master.  Go into run mode.
      long lastMsg = System.currentTimeMillis();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (isOnlineRegionsEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(this.abortRequested);
          } else if (this.stopping) {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if there are no more write requests to meta tables
              // since last time we went around the loop.  Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested);
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = System.currentTimeMillis();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = System.currentTimeMillis();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // while
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }

    if (abortRequested) {
      Timer abortMonitor = new Timer("Abort regionserver monitor", true);
      TimerTask abortTimeoutTask = null;
      try {
        Constructor<? extends TimerTask> timerTaskCtor =
          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
            .asSubclass(TimerTask.class).getDeclaredConstructor();
        timerTaskCtor.setAccessible(true);
        abortTimeoutTask = timerTaskCtor.newInstance();
      } catch (Exception e) {
        LOG.warn("Initialize abort timeout task failed", e);
      }
      if (abortTimeoutTask != null) {
        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
      }
    }

    if (this.leases != null) {
      this.leases.closeAfterLeasesExpire();
    }
    if (this.splitLogWorker != null) {
      splitLogWorker.stop();
    }
    if (this.infoServer != null) {
      LOG.info("Stopping infoServer");
      try {
        this.infoServer.stop();
      } catch (Exception e) {
        LOG.error("Failed to stop infoServer", e);
      }
    }
    // Send cache a shutdown.
    if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
      cacheConfig.getBlockCache().shutdown();
    }
    mobCacheConfig.getMobFileCache().shutdown();

    if (movedRegionsCleaner != null) {
      movedRegionsCleaner.stop("Region Server stopping");
    }

    // Send interrupts to wake up threads if sleeping so they notice shutdown.
    // TODO: Should we check they are alive? If OOME could have exited already
    if (this.hMemManager != null) this.hMemManager.stop();
    if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
    if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
    sendShutdownInterrupt();

    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
    if (rspmHost != null) {
      rspmHost.stop(this.abortRequested || this.killed);
    }

    if (this.killed) {
      // Just skip out w/o closing regions.  Used when testing.
    } else if (abortRequested) {
      if (this.fsOk) {
        closeUserRegions(abortRequested); // Don't leave any open file handles
      }
      LOG.info("aborting server " + this.serverName);
    } else {
      closeUserRegions(abortRequested);
      LOG.info("stopping server " + this.serverName);
    }

    // so callers waiting for meta without timeout can stop
    if (this.metaTableLocator != null) this.metaTableLocator.stop();
    if (this.clusterConnection != null && !clusterConnection.isClosed()) {
      try {
        this.clusterConnection.close();
      } catch (IOException e) {
        // Although the {@link Closeable} interface throws an {@link
        // IOException}, in reality, the implementation would never do that.
        LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
      }
    }

    // Closing the compactSplit thread before closing meta regions
    if (!this.killed && containsMetaTableRegions()) {
      if (!abortRequested || this.fsOk) {
        if (this.compactSplitThread != null) {
          this.compactSplitThread.join();
          this.compactSplitThread = null;
        }
        closeMetaTableRegions(abortRequested);
      }
    }

    if (!this.killed && this.fsOk) {
      waitOnAllRegionsToClose(abortRequested);
      LOG.info("stopping server " + this.serverName + "; all regions closed.");
    }

    // Stop the quota manager
    if (rsQuotaManager != null) {
      rsQuotaManager.stop();
    }
    if (rsSpaceQuotaManager != null) {
      rsSpaceQuotaManager.stop();
      rsSpaceQuotaManager = null;
    }

    //fsOk flag may be changed when closing regions throws exception.
    if (this.fsOk) {
      shutdownWAL(!abortRequested);
    }

    // Make sure the proxy is down.
    if (this.rssStub != null) {
      this.rssStub = null;
    }
    if (this.lockStub != null) {
      this.lockStub = null;
    }
    if (this.rpcClient != null) {
      this.rpcClient.close();
    }
    if (this.leases != null) {
      this.leases.close();
    }
    if (this.pauseMonitor != null) {
      this.pauseMonitor.stop();
    }

    if (!killed) {
      stopServiceThreads();
    }

    if (this.rpcServices != null) {
      this.rpcServices.stop();
    }

    try {
      deleteMyEphemeralNode();
    } catch (KeeperException.NoNodeException nn) {
      // Ignore; our ephemeral node is already gone.
    } catch (KeeperException e) {
      LOG.warn("Failed deleting my ephemeral node", e);
    }
    // We may have failed to delete the znode at the previous step, but
    //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
    ZNodeClearer.deleteMyEphemeralNodeOnDisk();

    if (this.zooKeeper != null) {
      this.zooKeeper.close();
    }
    this.shutDown = true;
    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
  }

  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  private boolean areAllUserRegionsOffline() {
    if (getNumberOfOnlineRegions() > 2) return false;
    boolean allUserRegionsOffline = true;
    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
      if (!e.getValue().getRegionInfo().isMetaRegion()) {
        allUserRegionsOffline = false;
        break;
      }
    }
    return allUserRegionsOffline;
  }

  /**
   * @return Current write count for all online regions.
   */
  private long getWriteRequestCount() {
    long writeCount = 0;
    for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
      writeCount += e.getValue().getWriteRequestsCount();
    }
    return writeCount;
  }

  @VisibleForTesting
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
      throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    try {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      request.setServer(ProtobufUtil.toServerName(this.serverName));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        throw ioe;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub(true);
    }
  }

  /**
   * Reports the given map of Regions and their size on the filesystem to the active Master.
   *
   * @param regionSizeStore The store containing region sizes
   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
   */
  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      LOG.trace("Skipping Region size report to HMaster as stub is null");
      return true;
    }
    try {
      buildReportAndSend(rss, regionSizeStore);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        LOG.trace("Failed to report region sizes to Master because it is initializing."
            + " This will be retried.", ioe);
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return true;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      createRegionServerStatusStub(true);
      if (ioe instanceof DoNotRetryIOException) {
        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
        if (doNotRetryEx.getCause() != null) {
          Throwable t = doNotRetryEx.getCause();
          if (t instanceof UnsupportedOperationException) {
            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
            return false;
          }
        }
      }
      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
    }
    return true;
  }

  /**
   * Builds the region size report and sends it to the master. Upon successful sending of the
   * report, the region sizes that were sent are marked as sent.
   *
   * @param rss The stub to send to the Master
   * @param regionSizeStore The store containing region sizes
   */
  void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
      RegionSizeStore regionSizeStore) throws ServiceException {
    RegionSpaceUseReportRequest request =
        buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
    rss.reportRegionSpaceUse(null, request);
    // Record the number of size reports sent
    if (metricsRegionServer != null) {
      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
    }
  }

  /**
   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
   *
   * @param regionSizes The store containing region sizes, in bytes
1320   * @return The corresponding protocol buffer message.
1321   */
1322  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
1323    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1324    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
1325      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
1326    }
1327    return request.build();
1328  }
1329
1330  /**
1331   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse}
1332   * protobuf message.
1333   *
1334   * @param regionInfo The RegionInfo
1335   * @param sizeInBytes The size in bytes of the Region
1336   * @return The protocol buffer
1337   */
1338  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1339    return RegionSpaceUse.newBuilder()
1340        .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1341        .setRegionSize(Objects.requireNonNull(sizeInBytes))
1342        .build();
1343  }
1344
1345  ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1346      throws IOException {
1347    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second, and other metrics. As long as metrics are part of ServerLoad it's best to use
1349    // the wrapper to compute those numbers in one place.
1350    // In the long term most of these should be moved off of ServerLoad and the heart beat.
1351    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved. Additionally, the load balancer will be able to take advantage of a more complete
1353    // history.
1354    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1355    Collection<HRegion> regions = getOnlineRegionsLocalContext();
1356    long usedMemory = -1L;
1357    long maxMemory = -1L;
1358    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
1359    if (usage != null) {
1360      usedMemory = usage.getUsed();
1361      maxMemory = usage.getMax();
1362    }
1363
1364    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
1365    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1366    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
1367    serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
1368    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
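    // Aggregate coprocessor names: server-wide ones from the common WAL here, then per-region
    // and per-region-WAL ones in the loop below.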
1369    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1370    Builder coprocessorBuilder = Coprocessor.newBuilder();
1371    for (String coprocessor : coprocessors) {
1372      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1373    }
1374    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1375    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1376    for (HRegion region : regions) {
1377      if (region.getCoprocessorHost() != null) {
1378        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1379        Iterator<String> iterator = regionCoprocessors.iterator();
1380        while (iterator.hasNext()) {
1381          serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build());
1382        }
1383      }
1384      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1385      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1386          .getCoprocessors()) {
1387        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1388      }
1389    }
1390    serverLoad.setReportStartTime(reportStartTime);
1391    serverLoad.setReportEndTime(reportEndTime);
1392    if (this.infoServer != null) {
1393      serverLoad.setInfoServerPort(this.infoServer.getPort());
1394    } else {
1395      serverLoad.setInfoServerPort(-1);
1396    }
1397
    // For the replicationLoad purpose, we only need to get it from one executorService;
    // either source or sink will give the same info.
1400    ReplicationSourceService rsources = getReplicationSourceService();
1401
1402    if (rsources != null) {
1403      // always refresh first to get the latest value
1404      ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
1405      if (rLoad != null) {
1406        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1407        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
1408          serverLoad.addReplLoadSource(rLS);
1409        }
1410      }
1411    }
1412
1413    return serverLoad.build();
1414  }
1415
1416  String getOnlineRegionsAsPrintableString() {
1417    StringBuilder sb = new StringBuilder();
1418    for (Region r: this.onlineRegions.values()) {
1419      if (sb.length() > 0) sb.append(", ");
1420      sb.append(r.getRegionInfo().getEncodedName());
1421    }
1422    return sb.toString();
1423  }
1424
1425  /**
1426   * Wait on regions close.
1427   */
1428  private void waitOnAllRegionsToClose(final boolean abort) {
1429    // Wait till all regions are closed before going out.
1430    int lastCount = -1;
1431    long previousLogTime = 0;
1432    Set<String> closedRegions = new HashSet<>();
1433    boolean interrupted = false;
1434    try {
1435      while (!isOnlineRegionsEmpty()) {
1436        int count = getNumberOfOnlineRegions();
1437        // Only print a message if the count of regions has changed.
1438        if (count != lastCount) {
1439          // Log every second at most
1440          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1441            previousLogTime = System.currentTimeMillis();
1442            lastCount = count;
1443            LOG.info("Waiting on " + count + " regions to close");
            // Only print out the regions still closing if there is a small number, else we
            // will swamp the log.
1446            if (count < 10 && LOG.isDebugEnabled()) {
1447              LOG.debug("Online Regions=" + this.onlineRegions);
1448            }
1449          }
1450        }
        // Ensure all user regions have been sent a close. Use this to
        // protect against the case where an open comes in after we start
        // iterating over onlineRegions to close all user regions.
1454        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1455          RegionInfo hri = e.getValue().getRegionInfo();
1456          if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) &&
1457              !closedRegions.contains(hri.getEncodedName())) {
1458            closedRegions.add(hri.getEncodedName());
1459            // Don't update zk with this close transition; pass false.
1460            closeRegionIgnoreErrors(hri, abort);
1461          }
1462        }
1463        // No regions in RIT, we could stop waiting now.
1464        if (this.regionsInTransitionInRS.isEmpty()) {
1465          if (!isOnlineRegionsEmpty()) {
1466            LOG.info("We were exiting though online regions are not empty," +
1467                " because some regions failed closing");
1468          }
1469          break;
1470        }
1471        if (sleep(200)) {
1472          interrupted = true;
1473        }
1474      }
1475    } finally {
1476      if (interrupted) {
1477        Thread.currentThread().interrupt();
1478      }
1479    }
1480  }
1481
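  /**
   * Sleep for the given number of milliseconds.
   * @return true if the sleep was interrupted, false otherwise
   */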
1482  private boolean sleep(long millis) {
1483    boolean interrupted = false;
1484    try {
1485      Thread.sleep(millis);
1486    } catch (InterruptedException e) {
1487      LOG.warn("Interrupted while sleeping");
1488      interrupted = true;
1489    }
1490    return interrupted;
1491  }
1492
1493  private void shutdownWAL(final boolean close) {
1494    if (this.walFactory != null) {
1495      try {
1496        if (close) {
1497          walFactory.close();
1498        } else {
1499          walFactory.shutdown();
1500        }
1501      } catch (Throwable e) {
1502        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1503        LOG.error("Shutdown / close of WAL failed: " + e);
1504        LOG.debug("Shutdown / close exception details:", e);
1505      }
1506    }
1507  }
1508
1509  /*
1510   * Run init. Sets up wal and starts up all server threads.
1511   *
1512   * @param c Extra configuration.
1513   */
1514  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
      throws IOException {
1516    try {
1517      boolean updateRootDir = false;
1518      for (NameStringPair e : c.getMapEntriesList()) {
1519        String key = e.getName();
1520        // The hostname the master sees us as.
1521        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1522          String hostnameFromMasterPOV = e.getValue();
1523          this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(),
1524              this.startcode);
1525          if (!StringUtils.isBlank(useThisHostnameInstead) &&
1526              !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1527            String msg = "Master passed us a different hostname to use; was=" +
1528                this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1529            LOG.error(msg);
1530            throw new IOException(msg);
1531          }
1532          if (StringUtils.isBlank(useThisHostnameInstead) &&
1533              !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1534            String msg = "Master passed us a different hostname to use; was=" +
1535                rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1536            LOG.error(msg);
1537          }
1538          continue;
1539        }
1540
1541        String value = e.getValue();
1542        if (key.equals(HConstants.HBASE_DIR)) {
1543          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1544            updateRootDir = true;
1545          }
1546        }
1547
1548        if (LOG.isDebugEnabled()) {
1549          LOG.debug("Config from master: " + key + "=" + value);
1550        }
1551        this.conf.set(key, value);
1552      }
      // Set our ephemeral znode up in zookeeper now that we have a name.
1554      createMyEphemeralNode();
1555
1556      if (updateRootDir) {
1557        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1558        initializeFileSystem();
1559      }
1560
      // Hack! Maps DFSClient => RegionServer for logs. HDFS made this config param for task
      // trackers, but we can piggyback off of it.
1563      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1564        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1565      }
1566
      // Save it in a file; this will allow us to detect if we crash.
1568      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1569
1570      // This call sets up an initialized replication and WAL. Later we start it up.
1571      setupWALAndReplication();
1572      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1573      // Init in here rather than in constructor after thread name has been set
1574      this.metricsRegionServer = new MetricsRegionServer(metricsRegionServerImpl, conf);
1575      this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1576      // Now that we have a metrics source, start the pause monitor
1577      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1578      pauseMonitor.start();
1579
1580      // There is a rare case where we do NOT want services to start. Check config.
1581      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1582        startServices();
1583      }
      // Start up the replication service here; we initialized it above. TODO: reconcile the
      // split between initialization and startup, or make sense of it.
1586      startReplicationService();
1587
1589      // Set up ZK
1590      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa +
1591          ", sessionid=0x" +
1592          Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1593
      // Wake up anyone waiting for this server to come online.
1595      synchronized (online) {
1596        online.set(true);
1597        online.notifyAll();
1598      }
1599    } catch (Throwable e) {
1600      stop("Failed initialization");
1601      throw convertThrowableToIOE(cleanup(e, "Failed init"),
1602          "Region server startup failed");
1603    } finally {
1604      sleeper.skipSleepCycle();
1605    }
1606  }
1607
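  /**
   * Initializes the global ChunkCreator that backs MemStoreLAB, sizing the chunk pool from the
   * global memstore limit. Does nothing when MSLAB is disabled.
   */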
1608  protected void initializeMemStoreChunkCreator() {
1609    if (MemStoreLAB.isEnabled(conf)) {
      // MSLAB is enabled, so initialize the MemStoreChunkPool. By this time, the
      // MemStoreFlusher is already initialized, so we can get the global limits from it.
1613      Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
1614      long globalMemStoreSize = pair.getFirst();
1615      boolean offheap = this.regionServerAccounting.isOffheap();
      // When the off-heap memstore is in use, give the whole area to the chunk pool.
      float poolSizePercentage = offheap ? 1.0F :
1618          conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
1619      float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
1620          MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
1621      int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
1622      // init the chunkCreator
1623      ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
1624        initialCountPercentage, this.hMemManager);
1625    }
1626  }
1627
1628  private void startHeapMemoryManager() {
1629    this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this,
1630        this.regionServerAccounting);
1631    if (this.hMemManager != null) {
1632      this.hMemManager.start(getChoreService());
1633    }
1634  }
1635
1636  private void createMyEphemeralNode() throws KeeperException, IOException {
1637    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1638    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1639    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1640    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1641    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1642  }
1643
1644  private void deleteMyEphemeralNode() throws KeeperException {
1645    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1646  }
1647
1648  @Override
1649  public RegionServerAccounting getRegionServerAccounting() {
1650    return regionServerAccounting;
1651  }
1652
1653  /*
1654   * @param r Region to get RegionLoad for.
1655   * @param regionLoadBldr the RegionLoad.Builder, can be null
1656   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1657   * @return RegionLoad instance.
1658   *
1659   * @throws IOException
1660   */
1661  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1662      RegionSpecifier.Builder regionSpecifier) throws IOException {
1663    byte[] name = r.getRegionInfo().getRegionName();
1664    int stores = 0;
1665    int storefiles = 0;
1666    int storeUncompressedSizeMB = 0;
1667    int storefileSizeMB = 0;
1668    int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024);
1669    long storefileIndexSizeKB = 0;
1670    int rootLevelIndexSizeKB = 0;
1671    int totalStaticIndexSizeKB = 0;
1672    int totalStaticBloomSizeKB = 0;
1673    long totalCompactingKVs = 0;
1674    long currentCompactedKVs = 0;
1675    List<HStore> storeList = r.getStores();
1676    stores += storeList.size();
1677    for (HStore store : storeList) {
1678      storefiles += store.getStorefilesCount();
1679      storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1680      storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
      // TODO: is storefileIndexSizeKB the same as rootLevelIndexSizeKB?
1682      storefileIndexSizeKB += store.getStorefilesRootLevelIndexSize() / 1024;
1683      CompactionProgress progress = store.getCompactionProgress();
1684      if (progress != null) {
1685        totalCompactingKVs += progress.getTotalCompactingKVs();
1686        currentCompactedKVs += progress.currentCompactedKVs;
1687      }
1688      rootLevelIndexSizeKB += (int) (store.getStorefilesRootLevelIndexSize() / 1024);
1689      totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1690      totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1691    }
1692
1693    float dataLocality =
1694        r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1695    if (regionLoadBldr == null) {
1696      regionLoadBldr = RegionLoad.newBuilder();
1697    }
1698    if (regionSpecifier == null) {
1699      regionSpecifier = RegionSpecifier.newBuilder();
1700    }
1701    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1702    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1703    regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1704      .setStores(stores)
1705      .setStorefiles(storefiles)
1706      .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1707      .setStorefileSizeMB(storefileSizeMB)
1708      .setMemStoreSizeMB(memstoreSizeMB)
1709      .setStorefileIndexSizeKB(storefileIndexSizeKB)
1710      .setRootIndexSizeKB(rootLevelIndexSizeKB)
1711      .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1712      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1713      .setReadRequestsCount(r.getReadRequestsCount())
1714      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1715      .setWriteRequestsCount(r.getWriteRequestsCount())
1716      .setTotalCompactingKVs(totalCompactingKVs)
1717      .setCurrentCompactedKVs(currentCompactedKVs)
1718      .setDataLocality(dataLocality)
1719      .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1720    r.setCompleteSequenceId(regionLoadBldr);
1721
1722    return regionLoadBldr.build();
1723  }
1724
1725  /**
   * @param encodedRegionName The encoded name of the region to create the load report for.
   * @return An instance of RegionLoad, or null if the region is not online.
1728   */
1729  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1730    HRegion r = onlineRegions.get(encodedRegionName);
1731    return r != null ? createRegionLoad(r, null, null) : null;
1732  }
1733
1734  /*
   * Inner class that runs periodically, checking whether regions need compaction.
1736   */
1737  private static class CompactionChecker extends ScheduledChore {
1738    private final HRegionServer instance;
1739    private final int majorCompactPriority;
1740    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
    // Iteration is 1-based rather than 0-based so we don't check for compaction
    // immediately upon region server startup.
1743    private long iteration = 1;
1744
1745    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1746      super("CompactionChecker", stopper, sleepTime);
1747      this.instance = h;
1748      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1749
1750      /* MajorCompactPriority is configurable.
1751       * If not set, the compaction will use default priority.
1752       */
1753      this.majorCompactPriority = this.instance.conf.
1754          getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1755              DEFAULT_PRIORITY);
1756    }
1757
1758    @Override
1759    protected void chore() {
1760      for (Region r : this.instance.onlineRegions.values()) {
1761        if (r == null) {
1762          continue;
1763        }
1764        HRegion hr = (HRegion) r;
1765        for (HStore s : hr.stores.values()) {
1766          try {
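            // A store can ask to be checked less often via its compaction check multiplier;
            // only examine it on every multiplier-th iteration.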
1767            long multiplier = s.getCompactionCheckMultiplier();
1768            assert multiplier > 0;
1769            if (iteration % multiplier != 0) {
1770              continue;
1771            }
1772            if (s.needsCompaction()) {
1773              // Queue a compaction. Will recognize if major is needed.
1774              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1775                getName() + " requests compaction");
1776            } else if (s.shouldPerformMajorCompaction()) {
1777              s.triggerMajorCompaction();
1778              if (majorCompactPriority == DEFAULT_PRIORITY ||
1779                  majorCompactPriority > hr.getCompactPriority()) {
1780                this.instance.compactSplitThread.requestCompaction(hr, s,
1781                    getName() + " requests major compaction; use default priority",
1782                    Store.NO_PRIORITY,
1783                CompactionLifeCycleTracker.DUMMY, null);
1784              } else {
1785                this.instance.compactSplitThread.requestCompaction(hr, s,
1786                    getName() + " requests major compaction; use configured priority",
1787                    this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1788              }
1789            }
1790          } catch (IOException e) {
1791            LOG.warn("Failed major compaction check on " + r, e);
1792          }
1793        }
1794      }
1795      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1796    }
1797  }
1798
1799  static class PeriodicMemStoreFlusher extends ScheduledChore {
1800    final HRegionServer server;
1801    final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
1802    final static int MIN_DELAY_TIME = 0; // millisec
1803
1804    final int rangeOfDelay;
1805    public PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1806      super("MemstoreFlusherChore", server, cacheFlushInterval);
1807      this.server = server;
1808
      this.rangeOfDelay = this.server.conf.getInt(
          "hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY) * 1000;
1811    }
1812
1813    @Override
1814    protected void chore() {
1815      final StringBuilder whyFlush = new StringBuilder();
1816      for (HRegion r : this.server.onlineRegions.values()) {
1817        if (r == null) continue;
1818        if (r.shouldFlush(whyFlush)) {
1819          FlushRequester requester = server.getFlushRequester();
1820          if (requester != null) {
1821            long randomDelay = (long) RandomUtils.nextInt(0, rangeOfDelay) + MIN_DELAY_TIME;
            // Throttle the flushes by putting a delay. If we don't throttle, and there
            // is a balanced write-load on the regions in a table, we might end up
            // overwhelming the filesystem with too many flushes at once.
1825            if (requester.requestDelayedFlush(r, randomDelay, false)) {
1826              LOG.info("{} requesting flush of {} because {} after random delay {} ms",
1827                  getName(), r.getRegionInfo().getRegionNameAsString(),  whyFlush.toString(),
1828                  randomDelay);
1829            }
1830          }
1831        }
1832      }
1833    }
1834  }
1835
1836  /**
   * Report the status of the server. A server is online once startup has
   * completed (setting up the filesystem, starting executorService threads, etc.). This
   * method is designed mostly to be useful in tests.
1840   *
1841   * @return true if online, false if not.
1842   */
1843  public boolean isOnline() {
1844    return online.get();
1845  }
1846
1847  /**
   * Set up the WAL and replication if enabled. Replication setup is done here because it needs to
   * be hooked up to the WAL.
1850   */
1851  private void setupWALAndReplication() throws IOException {
1852    WALFactory factory = new WALFactory(conf, serverName.toString());
1853
1854    // TODO Replication make assumptions here based on the default filesystem impl
1855    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1856    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1857
1858    Path logDir = new Path(walRootDir, logName);
1859    LOG.debug("logDir={}", logDir);
1860    if (this.walFs.exists(logDir)) {
1861      throw new RegionServerRunningException(
1862          "Region server has already created directory at " + this.serverName.toString());
1863    }
    // Always create the wal directory, as the master now needs it when it restarts in order to
    // find the live region servers.
1866    if (!this.walFs.mkdirs(logDir)) {
1867      throw new IOException("Can not create wal directory " + logDir);
1868    }
1869    // Instantiate replication if replication enabled. Pass it the log directories.
1870    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
1871      factory.getWALProvider());
1872    this.walFactory = factory;
1873  }
1874
1875  /**
1876   * Start up replication source and sink handlers.
1877   * @throws IOException
1878   */
1879  private void startReplicationService() throws IOException {
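    // If a single object serves as both the source and the sink, start it only once.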
1880    if (this.replicationSourceHandler == this.replicationSinkHandler &&
1881        this.replicationSourceHandler != null) {
1882      this.replicationSourceHandler.startReplicationService();
1883    } else {
1884      if (this.replicationSourceHandler != null) {
1885        this.replicationSourceHandler.startReplicationService();
1886      }
1887      if (this.replicationSinkHandler != null) {
1888        this.replicationSinkHandler.startReplicationService();
1889      }
1890    }
1891  }
1892
1893
1894  public MetricsRegionServer getRegionServerMetrics() {
1895    return this.metricsRegionServer;
1896  }
1897
1898  /**
1899   * @return Master address tracker instance.
1900   */
1901  public MasterAddressTracker getMasterAddressTracker() {
1902    return this.masterAddressTracker;
1903  }
1904
1905  /*
1906   * Start maintenance Threads, Server, Worker and lease checker threads.
1907   * Start all threads we need to run. This is called after we've successfully
1908   * registered with the Master.
1909   * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
1910   * get an unhandled exception. We cannot set the handler on all threads.
1911   * Server's internal Listener thread is off limits. For Server, if an OOME, it
1912   * waits a while then retries. Meantime, a flush or a compaction that tries to
1913   * run should trigger same critical condition and the shutdown will run. On
1914   * its way out, this server will shut down Server. Leases are sort of
1915   * inbetween. It has an internal thread that while it inherits from Chore, it
1916   * keeps its own internal stop mechanism so needs to be stopped by this
1917   * hosting server. Worker logs the exception and exits.
1918   */
1919  private void startServices() throws IOException {
1920    if (!isStopped() && !isAborted()) {
1921      initializeThreads();
1922    }
1923    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection);
1924    this.secureBulkLoadManager.start();
1925
1926    // Health checker thread.
1927    if (isHealthCheckerConfigured()) {
      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
          HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1930      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1931    }
1932
1933    this.walRoller = new LogRoller(this);
1934    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1935    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
1936
1937    // Create the CompactedFileDischarger chore executorService. This chore helps to
1938    // remove the compacted files that will no longer be used in reads.
    // Default is 2 mins; the default for the TTLCleaner is 5 mins, so this gives compacted
    // files a chance to be archived before the TTLCleaner runs.
    int cleanerInterval =
        conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
    this.compactedFileDischarger =
        new CompactedHFilesDischarger(cleanerInterval, this, this);
1945    choreService.scheduleChore(compactedFileDischarger);
1946
1947    // Start executor services
1948    this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION,
1949        conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1950    this.executorService.startExecutorService(ExecutorType.RS_OPEN_META,
1951        conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1952    this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
1953        conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
1954    this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1955        conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1956    this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META,
1957        conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1958    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1959      this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1960          conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1961    }
    this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
        "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
1964    // Start the threads for compacted files discharger
1965    this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
1966        conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
1967    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1968      this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
1969          conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1970              conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
1971    }
1972    this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
1973      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
1974
    Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
        uncaughtExceptionHandler);
1977    this.cacheFlusher.start(uncaughtExceptionHandler);
1978    Threads.setDaemonThreadRunning(this.procedureResultReporter,
1979      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
1980
1981    if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
1982    if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
1983    if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
1984    if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
1985    if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
1986    if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
1987    if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore);
1988
1989    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1990    // an unhandled exception, it will just exit.
    Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
        uncaughtExceptionHandler);
1993
    // Create the log splitting worker and start it.
    // Set a smaller number of retries to fail fast; otherwise the SplitLogWorker could be
    // blocked for quite a while inside the Connection layer, and the worker would not be
    // available for other tasks even after the current task is preempted when a split task
    // times out.
1998    Configuration sinkConf = HBaseConfiguration.create(conf);
1999    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2000        conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
2001    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2002        conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
2003    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
2004    if (this.csm != null) {
2005      // SplitLogWorker needs csm. If none, don't start this.
2006      this.splitLogWorker = new SplitLogWorker(this, sinkConf, this,
2007          this, walFactory);
2008      splitLogWorker.start();
2009    } else {
2010      LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null");
2011    }
2012
2013    // Memstore services.
2014    startHeapMemoryManager();
2015    // Call it after starting HeapMemoryManager.
2016    initializeMemStoreChunkCreator();
2017  }
2018
2019  private void initializeThreads() throws IOException {
2020    // Cache flushing thread.
2021    this.cacheFlusher = new MemStoreFlusher(conf, this);
2022
2023    // Compaction thread
2024    this.compactSplitThread = new CompactSplit(this);
2025
    // Background thread to check for compactions; needed if a region has not gotten updates
    // in a while. It takes care of not checking too frequently, on a store-by-store basis.
2028    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
2029    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
2030    this.leases = new Leases(this.threadWakeFrequency);
2031
2032    // Create the thread to clean the moved regions list
2033    movedRegionsCleaner = MovedRegionsCleaner.create(this);
2034
2035    if (this.nonceManager != null) {
2036      // Create the scheduled chore that cleans up nonces.
2037      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
2038    }
2039
2040    // Setup the Quota Manager
2041    rsQuotaManager = new RegionServerRpcQuotaManager(this);
2042    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
2043
2044    if (QuotaUtil.isQuotaEnabled(conf)) {
2045      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
2046    }
2047
2049    boolean onlyMetaRefresh = false;
2050    int storefileRefreshPeriod = conf.getInt(
2051        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
2052        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2053    if (storefileRefreshPeriod == 0) {
2054      storefileRefreshPeriod = conf.getInt(
2055          StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
2056          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2057      onlyMetaRefresh = true;
2058    }
2059    if (storefileRefreshPeriod > 0) {
2060      this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
2061          onlyMetaRefresh, this, this);
2062    }
2063    registerConfigurationObservers();
2064  }
2065
2066  private void registerConfigurationObservers() {
2067    // Registering the compactSplitThread object with the ConfigurationManager.
2068    configurationManager.registerObserver(this.compactSplitThread);
2069    configurationManager.registerObserver(this.rpcServices);
2070    configurationManager.registerObserver(this);
2071  }
2072
2073  /**
2074   * Puts up the webui.
   * @return The final port -- maybe different from what we started with.
2076   * @throws IOException
2077   */
2078  private int putUpWebUI() throws IOException {
2079    int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2080      HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2081    String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
2082
    if (this instanceof HMaster) {
2084      port = conf.getInt(HConstants.MASTER_INFO_PORT,
2085          HConstants.DEFAULT_MASTER_INFOPORT);
2086      addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
2087    }
2088    // -1 is for disabling info server
2089    if (port < 0) return port;
2090
2091    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
2092      String msg =
2093          "Failed to start http info server. Address " + addr
2094              + " does not belong to this host. Correct configuration parameter: "
2095              + "hbase.regionserver.info.bindAddress";
2096      LOG.error(msg);
2097      throw new IOException(msg);
2098    }
2099    // check if auto port bind enabled
2100    boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
2101        false);
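    // Keep trying successive ports when auto bind is enabled; otherwise fail on the first
    // BindException.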
2102    while (true) {
2103      try {
2104        this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
2105        infoServer.addPrivilegedServlet("dump", "/dump", getDumpServlet());
2106        configureInfoServer();
2107        this.infoServer.start();
2108        break;
2109      } catch (BindException e) {
2110        if (!auto) {
          // auto bind disabled; rethrow the BindException
2112          LOG.error("Failed binding http info server to port: " + port);
2113          throw e;
2114        }
2115        // auto bind enabled, try to use another port
2116        LOG.info("Failed binding http info server to port: " + port);
2117        port++;
2118      }
2119    }
2120    port = this.infoServer.getPort();
2121    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
2122    int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
2123      HConstants.DEFAULT_MASTER_INFOPORT);
2124    conf.setInt("hbase.master.info.port.orig", masterInfoPort);
2125    conf.setInt(HConstants.MASTER_INFO_PORT, port);
2126    return port;
2127  }
2128
2129  /*
   * Verify that the server is healthy.
2131   */
2132  private boolean isHealthy() {
2133    if (!fsOk) {
2134      // File system problem
2135      return false;
2136    }
2137    // Verify that all threads are alive
2138    boolean healthy = (this.leases == null || this.leases.isAlive())
2139        && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2140        && (this.walRoller == null || this.walRoller.isAlive())
2141        && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2142        && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2143    if (!healthy) {
2144      stop("One or more threads are no longer alive -- stop");
2145    }
2146    return healthy;
2147  }
2148
2149  @Override
2150  public List<WAL> getWALs() throws IOException {
2151    return walFactory.getWALs();
2152  }
2153
2154  @Override
2155  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2156    try {
2157      WAL wal = walFactory.getWAL(regionInfo);
2158      if (this.walRoller != null) {
2159        this.walRoller.addWAL(wal);
2160      }
2161      return wal;
    } catch (FailedCloseWALAfterInitializedErrorException ex) {
2163      // see HBASE-21751 for details
2164      abort("wal can not clean up after init failed", ex);
2165      throw ex;
2166    }
2167  }
2168
2169  public LogRoller getWalRoller() {
2170    return walRoller;
2171  }
2172
2173  @Override
2174  public Connection getConnection() {
2175    return getClusterConnection();
2176  }
2177
2178  @Override
2179  public ClusterConnection getClusterConnection() {
2180    return this.clusterConnection;
2181  }
2182
2183  @Override
2184  public MetaTableLocator getMetaTableLocator() {
2185    return this.metaTableLocator;
2186  }
2187
2188  @Override
2189  public void stop(final String msg) {
2190    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2191  }
2192
2193  /**
2194   * Stops the regionserver.
2195   * @param msg Status message
2196   * @param force True if this is a regionserver abort
2197   * @param user The user executing the stop request, or null if no user is associated
2198   */
2199  public void stop(final String msg, final boolean force, final User user) {
2200    if (!this.stopped) {
2201      LOG.info("***** STOPPING region server '" + this + "' *****");
2202      if (this.rsHost != null) {
        // When forced via abort, don't allow CPs to override.
2204        try {
2205          this.rsHost.preStop(msg, user);
2206        } catch (IOException ioe) {
2207          if (!force) {
2208            LOG.warn("The region server did not stop", ioe);
2209            return;
2210          }
2211          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2212        }
2213      }
2214      this.stopped = true;
2215      LOG.info("STOPPED: " + msg);
2216      // Wakes run() if it is sleeping
2217      sleeper.skipSleepCycle();
2218    }
2219  }
2220
  public void waitForServerOnline() {
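    // Spin until we come online or are stopped; wake periodically in case we miss a notify.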
2222    while (!isStopped() && !isOnline()) {
2223      synchronized (online) {
2224        try {
2225          online.wait(msgInterval);
2226        } catch (InterruptedException ie) {
2227          Thread.currentThread().interrupt();
2228          break;
2229        }
2230      }
2231    }
2232  }
2233
2234  @Override
2235  public void postOpenDeployTasks(final PostOpenDeployContext context)
2236      throws KeeperException, IOException {
2237    HRegion r = context.getRegion();
2238    long masterSystemTime = context.getMasterSystemTime();
2239    rpcServices.checkOpen();
2240    LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
2241    // Do checks to see if we need to compact (references or too many files)
2242    for (HStore s : r.stores.values()) {
2243      if (s.hasReferences() || s.needsCompaction()) {
2244        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2245      }
2246    }
2247    long openSeqNum = r.getOpenSeqNum();
2248    if (openSeqNum == HConstants.NO_SEQNUM) {
2249      // If we opened a region, we should have read some sequence number from it.
2250      LOG.error("No sequence number found when opening " +
2251        r.getRegionInfo().getRegionNameAsString());
2252      openSeqNum = 0;
2253    }
2254
2255    // Notify master
2256    if (!reportRegionStateTransition(new RegionStateTransitionContext(
2257        TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
2258      throw new IOException("Failed to report opened region to master: "
2259        + r.getRegionInfo().getRegionNameAsString());
2260    }
2261
2262    triggerFlushInPrimaryRegion(r);
2263
2264    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2265  }
2266
2267  @Override
2268  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2269    TransitionCode code = context.getCode();
2270    long openSeqNum = context.getOpenSeqNum();
2271    long masterSystemTime = context.getMasterSystemTime();
2272    RegionInfo[] hris = context.getHris();
2273
2274    if (TEST_SKIP_REPORTING_TRANSITION) {
2275      // This is for testing only in case there is no master
2276      // to handle the region transition report at all.
2277      if (code == TransitionCode.OPENED) {
2278        Preconditions.checkArgument(hris != null && hris.length == 1);
2279        if (hris[0].isMetaRegion()) {
2280          try {
2281            MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
                hris[0].getReplicaId(), State.OPEN);
2283          } catch (KeeperException e) {
2284            LOG.info("Failed to update meta location", e);
2285            return false;
2286          }
2287        } else {
2288          try {
2289            MetaTableAccessor.updateRegionLocation(clusterConnection,
2290              hris[0], serverName, openSeqNum, masterSystemTime);
2291          } catch (IOException e) {
2292            LOG.info("Failed to update meta", e);
2293            return false;
2294          }
2295        }
2296      }
2297      return true;
2298    }
2299
2300    ReportRegionStateTransitionRequest.Builder builder =
2301      ReportRegionStateTransitionRequest.newBuilder();
2302    builder.setServer(ProtobufUtil.toServerName(serverName));
2303    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2304    transition.setTransitionCode(code);
2305    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2306      transition.setOpenSeqNum(openSeqNum);
2307    }
2308    for (RegionInfo hri: hris) {
2309      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2310    }
2311    ReportRegionStateTransitionRequest request = builder.build();
2312    int tries = 0;
2313    long pauseTime = INIT_PAUSE_TIME_MS;
    // Keep looping till we get an error. We want to send reports even though the server is going
    // down. Only go down if clusterConnection is null; it is set to null almost as the last thing
    // the HRegionServer does on its way down.
2317    while (this.clusterConnection != null && !this.clusterConnection.isClosed()) {
2318      RegionServerStatusService.BlockingInterface rss = rssStub;
2319      try {
2320        if (rss == null) {
2321          createRegionServerStatusStub();
2322          continue;
2323        }
2324        ReportRegionStateTransitionResponse response =
2325          rss.reportRegionStateTransition(null, request);
2326        if (response.hasErrorMessage()) {
2327          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2328          break;
2329        }
        // Log if we had to retry, else don't log unless TRACE. We want to
        // know if we were successful after an attempt showed in the logs as failed.
2332        if (tries > 0 || LOG.isTraceEnabled()) {
2333          LOG.info("TRANSITION REPORTED " + request);
2334        }
2335        // NOTE: Return mid-method!!!
2336        return true;
2337      } catch (ServiceException se) {
2338        IOException ioe = ProtobufUtil.getRemoteException(se);
2339        boolean pause =
2340            ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException
2341                || ioe instanceof CallQueueTooBigException;
2342        if (pause) {
2343          // Do backoff else we flood the Master with requests.
2344          pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
2345        } else {
2346          pauseTime = INIT_PAUSE_TIME_MS; // Reset.
2347        }
2348        LOG.info("Failed report transition " +
2349          TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" +
            (pause ?
                " after " + pauseTime + "ms delay (Master is coming online...)." :
                " immediately."),
2353            ioe);
2354        if (pause) Threads.sleep(pauseTime);
2355        tries++;
2356        if (rssStub == rss) {
2357          rssStub = null;
2358        }
2359      }
2360    }
2361    return false;
2362  }
2363
2364  /**
2365   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2366   * block this thread. See RegionReplicaFlushHandler for details.
2367   */
2368  void triggerFlushInPrimaryRegion(final HRegion region) {
2369    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2370      return;
2371    }
2372    if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
2373        !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
2374          region.conf)) {
2375      region.setReadsEnabled(true);
2376      return;
2377    }
2378
2379    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2380    // RegionReplicaFlushHandler might reset this.
2381
2382    // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2383    if (this.executorService != null) {
2384      this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
2385          rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2386    }
2387  }
2388
2389  @Override
2390  public RpcServerInterface getRpcServer() {
2391    return rpcServices.rpcServer;
2392  }
2393
2394  @VisibleForTesting
2395  public RSRpcServices getRSRpcServices() {
2396    return rpcServices;
2397  }
2398
2399  /**
   * Cause the server to exit without closing the regions it is serving, without closing the
   * log it is using, and without notifying the master. Used in unit testing and on
   * catastrophic events such as HDFS being yanked out from under hbase or an OOME.
2403   *
2404   * @param reason
2405   *          the reason we are aborting
2406   * @param cause
2407   *          the exception that caused the abort, or null
2408   */
2409  @Override
2410  public void abort(String reason, Throwable cause) {
2411    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2412    if (cause != null) {
2413      LOG.error(HBaseMarkers.FATAL, msg, cause);
2414    } else {
2415      LOG.error(HBaseMarkers.FATAL, msg);
2416    }
2417    setAbortRequested();
2418    // HBASE-4014: show list of coprocessors that were loaded to help debug
    // regionserver crashes. Note that we're implicitly using
2420    // java.util.HashSet's toString() method to print the coprocessor names.
2421    LOG.error(HBaseMarkers.FATAL, "RegionServer abort: loaded coprocessors are: " +
2422        CoprocessorHost.getLoadedCoprocessors());
    // Try and dump metrics on abort -- it might give a clue as to how the fatal error came about.
2424    try {
2425      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2426    } catch (MalformedObjectNameException | IOException e) {
2427      LOG.warn("Failed dumping metrics", e);
2428    }
2429
2430    // Do our best to report our abort to the master, but this may not work
2431    try {
2432      if (cause != null) {
2433        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2434      }
2435      // Report to the master but only if we have already registered with the master.
2436      if (rssStub != null && this.serverName != null) {
2437        ReportRSFatalErrorRequest.Builder builder =
2438          ReportRSFatalErrorRequest.newBuilder();
2439        builder.setServer(ProtobufUtil.toServerName(this.serverName));
2440        builder.setErrorMessage(msg);
2441        rssStub.reportRSFatalError(null, builder.build());
2442      }
2443    } catch (Throwable t) {
2444      LOG.warn("Unable to report fatal error to master", t);
2445    }
2446    // shutdown should be run as the internal user
2447    stop(reason, true, null);
2448  }
2449
2450  protected final void setAbortRequested() {
2451    this.abortRequested = true;
2452  }
2453
2454  /**
2455   * @see HRegionServer#abort(String, Throwable)
2456   */
2457  public void abort(String reason) {
2458    abort(reason, null);
2459  }
2460
2461  @Override
2462  public boolean isAborted() {
2463    return this.abortRequested;
2464  }
2465
2466  /*
   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
   * logs, but it does close the socket in case we want to bring up a server on the old
   * hostname+port immediately.
2470   */
2471  @VisibleForTesting
2472  protected void kill() {
2473    this.killed = true;
2474    abort("Simulated kill");
2475  }
2476
2477  /**
2478   * Called on stop/abort before closing the cluster connection and meta locator.
2479   */
2480  protected void sendShutdownInterrupt() {
2481  }
2482
2483  /**
2484   * Wait on all threads to finish. Presumption is that all closes and stops
2485   * have already been called.
2486   */
2487  protected void stopServiceThreads() {
2488    // clean up the scheduled chores
2489    if (this.choreService != null) {
2490      choreService.cancelChore(nonceManagerChore);
2491      choreService.cancelChore(compactionChecker);
2492      choreService.cancelChore(periodicFlusher);
2493      choreService.cancelChore(healthCheckChore);
2494      choreService.cancelChore(storefileRefresher);
2495      choreService.cancelChore(movedRegionsCleaner);
2496      choreService.cancelChore(fsUtilizationChore);
      // clean up the remaining scheduled chores (in case we missed any)
2498      choreService.shutdown();
2499    }
2500
2501    if (this.cacheFlusher != null) {
2502      this.cacheFlusher.join();
2503    }
2504
2505    if (this.spanReceiverHost != null) {
2506      this.spanReceiverHost.closeReceivers();
2507    }
2508    if (this.walRoller != null) {
2509      this.walRoller.close();
2510    }
2511    if (this.compactSplitThread != null) {
2512      this.compactSplitThread.join();
2513    }
2514    if (this.executorService != null) this.executorService.shutdown();
2515    if (this.replicationSourceHandler != null &&
2516        this.replicationSourceHandler == this.replicationSinkHandler) {
2517      this.replicationSourceHandler.stopReplicationService();
2518    } else {
2519      if (this.replicationSourceHandler != null) {
2520        this.replicationSourceHandler.stopReplicationService();
2521      }
2522      if (this.replicationSinkHandler != null) {
2523        this.replicationSinkHandler.stopReplicationService();
2524      }
2525    }
2526  }
2527
2528  /**
   * @return The object that implements the replication source executorService.
2531   */
2532  @VisibleForTesting
2533  public ReplicationSourceService getReplicationSourceService() {
2534    return replicationSourceHandler;
2535  }
2536
2537  /**
   * @return The object that implements the replication sink executorService.
2540   */
2541  ReplicationSinkService getReplicationSinkService() {
2542    return replicationSinkHandler;
2543  }

  /**
   * Get the current master from ZooKeeper and open the RPC connection to it.
   * To get a fresh connection, the current rssStub must be null.
   * Method will block until a master is available. You can break from this
   * block by requesting the server stop.
   *
   * @return master + port, or null if server has been stopped
   */
  @VisibleForTesting
  protected synchronized ServerName createRegionServerStatusStub() {
    // Create RS stub without refreshing the master node from ZK, use cached data
    return createRegionServerStatusStub(false);
  }

  /**
   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
   * connection, the current rssStub must be null. Method will block until a master is available.
   * You can break from this block by requesting the server stop.
   * @param refresh If true then master address will be read from ZK, otherwise use cached data
   * @return master + port, or null if server has been stopped
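   *
   * <p>An illustrative calling pattern (a sketch only; the retry and error handling already
   * in this class is the authoritative usage):
   * <pre>{@code
   * // Force a re-read of the master location from ZK after a stale stub was dropped.
   * ServerName master = createRegionServerStatusStub(true);
   * if (master == null) {
   *   // The server was asked to stop while we waited for a master; abandon the attempt.
   * }
   * }</pre>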
   */
  @VisibleForTesting
  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
    if (rssStub != null) {
      return masterAddressTracker.getMasterAddress();
    }
    ServerName sn = null;
    long previousLogTime = 0;
    RegionServerStatusService.BlockingInterface intRssStub = null;
    LockService.BlockingInterface intLockStub = null;
    boolean interrupted = false;
    try {
      while (keepLooping()) {
        sn = this.masterAddressTracker.getMasterAddress(refresh);
        if (sn == null) {
          if (!keepLooping()) {
            // give up with no connection.
            LOG.debug("No master found and cluster is stopped; bailing out");
            return null;
          }
          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
            LOG.debug("No master found; retry");
            previousLogTime = System.currentTimeMillis();
          }
          refresh = true; // let's try to pull it from ZK directly
          if (sleep(200)) {
            interrupted = true;
          }
          continue;
        }

        // If we are on the active master, use the shortcut
        if (this instanceof HMaster && sn.equals(getServerName())) {
          intRssStub = ((HMaster)this).getMasterRpcServices();
          intLockStub = ((HMaster)this).getMasterRpcServices();
          break;
        }
        try {
          BlockingRpcChannel channel =
            this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
              shortOperationTimeout);
          intRssStub = RegionServerStatusService.newBlockingStub(channel);
          intLockStub = LockService.newBlockingStub(channel);
          break;
        } catch (IOException e) {
          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
            e = e instanceof RemoteException ?
              ((RemoteException)e).unwrapRemoteException() : e;
            if (e instanceof ServerNotRunningYetException) {
              LOG.info("Master isn't available yet, retrying");
            } else {
              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
            }
            previousLogTime = System.currentTimeMillis();
          }
          if (sleep(200)) {
            interrupted = true;
          }
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
    this.rssStub = intRssStub;
    this.lockStub = intLockStub;
    return sn;
  }

  /**
   * @return True if we should keep looping: the cluster is up and neither has this
   * server been stopped nor has hdfs gone bad.
   */
  private boolean keepLooping() {
    return !this.stopped && isClusterUp();
  }

  /*
   * Let the master know we're here. Run initialization using parameters passed to us
   * by the master.
   * @return A Map of key/value configurations we got from the Master else
   * null if we failed to register.
   * @throws IOException
   */
  private RegionServerStartupResponse reportForDuty() throws IOException {
    if (this.masterless) {
      return RegionServerStartupResponse.getDefaultInstance();
    }
    ServerName masterServerName = createRegionServerStatusStub(true);
    if (masterServerName == null) {
      return null;
    }
    RegionServerStartupResponse result = null;
    try {
      rpcServices.requestCount.reset();
      rpcServices.rpcGetRequestCount.reset();
      rpcServices.rpcScanRequestCount.reset();
      rpcServices.rpcMultiRequestCount.reset();
      rpcServices.rpcMutateRequestCount.reset();
      LOG.info("reportForDuty to master=" + masterServerName + " with port="
        + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
      long now = EnvironmentEdgeManager.currentTime();
      int port = rpcServices.isa.getPort();
      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
      if (!StringUtils.isBlank(useThisHostnameInstead)) {
        request.setUseThisHostnameInstead(useThisHostnameInstead);
      }
      request.setPort(port);
      request.setServerStartCode(this.startcode);
      request.setServerCurrentTime(now);
      result = this.rssStub.regionServerStartup(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof ClockOutOfSyncException) {
        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync",
            ioe);
        // Re-throwing the IOE will cause the RS to abort
        throw ioe;
      } else if (ioe instanceof ServerNotRunningYetException) {
        LOG.debug("Master is not running yet");
      } else {
        LOG.warn("Error telling master we are up", se);
      }
      rssStub = null;
    }
    return result;
  }

  @Override
  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
    try {
      GetLastFlushedSequenceIdRequest req =
          RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
      RegionServerStatusService.BlockingInterface rss = rssStub;
      if (rss == null) { // Try to connect one more time
        createRegionServerStatusStub();
        rss = rssStub;
        if (rss == null) {
          // Still no luck, we tried
          LOG.warn("Unable to connect to the master to check the last flushed sequence id");
          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
              .build();
        }
      }
      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
      return RegionStoreSequenceIds.newBuilder()
          .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
          .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
    } catch (ServiceException e) {
      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
          .build();
    }
  }

  /**
   * Closes all regions.  Called on our way out.
   * Assumes that it's not possible for new regions to be added to onlineRegions
   * while this method runs.
   */
  protected void closeAllRegions(final boolean abort) {
    closeUserRegions(abort);
    closeMetaTableRegions(abort);
  }

  /**
   * Close meta region if we carry it
   * @param abort Whether we're running an abort.
   */
  void closeMetaTableRegions(final boolean abort) {
    HRegion meta = null;
    this.lock.writeLock().lock();
    try {
      for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
        RegionInfo hri = e.getValue().getRegionInfo();
        if (hri.isMetaRegion()) {
          meta = e.getValue();
          break;
        }
      }
    } finally {
      this.lock.writeLock().unlock();
    }
    if (meta != null) {
      closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
    }
  }

  /**
   * Schedule closes on all user regions.
   * Should be safe calling multiple times because it won't close regions
   * that are already closed or that are closing.
   * @param abort Whether we're running an abort.
   */
  void closeUserRegions(final boolean abort) {
    this.lock.writeLock().lock();
    try {
      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
        HRegion r = e.getValue();
        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
          // Don't update zk with this close transition; pass false.
          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
        }
      }
    } finally {
      this.lock.writeLock().unlock();
    }
  }

  /** @return the info server */
  public InfoServer getInfoServer() {
    return infoServer;
  }

  /**
   * @return true if a stop has been requested.
   */
  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  @Override
  public boolean isStopping() {
    return this.stopping;
  }

  /**
   * @return the configuration
   */
  @Override
  public Configuration getConfiguration() {
    return conf;
  }

  /** @return the write lock for the server */
  ReentrantReadWriteLock.WriteLock getWriteLock() {
    return lock.writeLock();
  }

  public int getNumberOfOnlineRegions() {
    return this.onlineRegions.size();
  }

  boolean isOnlineRegionsEmpty() {
    return this.onlineRegions.isEmpty();
  }

  /**
   * For tests, web ui and metrics.
   * This method will only work if HRegionServer is in the same JVM as client;
   * HRegion cannot be serialized to cross an rpc.
   */
  public Collection<HRegion> getOnlineRegionsLocalContext() {
    Collection<HRegion> regions = this.onlineRegions.values();
    return Collections.unmodifiableCollection(regions);
  }

  @Override
  public void addRegion(HRegion region) {
    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
    configurationManager.registerObserver(region);
  }

  /**
   * @return A new Map of online regions sorted by region off-heap size with the first entry being
   *   the biggest.  If two regions are the same size, then the last one found wins; i.e. this
   *   method may NOT return all regions.
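   *   <p>For example, two regions holding exactly the same number of off-heap bytes map to a
   *   single key, and only the region encountered last in the iteration is kept.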
   */
  SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOffHeapSize() {
    // we'll sort the regions in reverse
    SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(Comparator.<Long>reverseOrder());
    // Copy over all regions. Regions are sorted by size with biggest first.
    for (HRegion region : this.onlineRegions.values()) {
      sortedRegions.put(region.getMemStoreOffHeapSize(), region);
    }
    return sortedRegions;
  }

  /**
   * @return A new Map of online regions sorted by region heap size with the first entry being the
   *   biggest.  If two regions are the same size, then the last one found wins; i.e. this method
   *   may NOT return all regions.
   */
  SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOnHeapSize() {
    // we'll sort the regions in reverse
    SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(Comparator.<Long>reverseOrder());
    // Copy over all regions. Regions are sorted by size with biggest first.
    for (HRegion region : this.onlineRegions.values()) {
      sortedRegions.put(region.getMemStoreHeapSize(), region);
    }
    return sortedRegions;
  }

  /**
   * @return time stamp in millis of when this region server was started
   */
  public long getStartcode() {
    return this.startcode;
  }

  /** @return reference to FlushRequester */
  @Override
  public FlushRequester getFlushRequester() {
    return this.cacheFlusher;
  }

  @Override
  public CompactionRequester getCompactionRequestor() {
    return this.compactSplitThread;
  }

  @Override
  public Leases getLeases() {
    return leases;
  }

  /**
   * @return Return the rootDir.
   */
  protected Path getRootDir() {
    return rootDir;
  }

  /**
   * @return Return the fs.
   */
  @Override
  public FileSystem getFileSystem() {
    return fs;
  }

  /**
   * @return Return the walRootDir.
   */
  public Path getWALRootDir() {
    return walRootDir;
  }

  /**
   * @return Return the walFs.
   */
  public FileSystem getWALFileSystem() {
    return walFs;
  }

  @Override
  public String toString() {
    return getServerName().toString();
  }

  /**
   * Interval at which threads should run
   *
   * @return the interval
   */
  public int getThreadWakeFrequency() {
    return threadWakeFrequency;
  }

  @Override
  public ZKWatcher getZooKeeper() {
    return zooKeeper;
  }

  @Override
  public CoordinatedStateManager getCoordinatedStateManager() {
    return csm;
  }

  @Override
  public ServerName getServerName() {
    return serverName;
  }

  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
    return this.rsHost;
  }

  @Override
  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
    return this.regionsInTransitionInRS;
  }

  @Override
  public ExecutorService getExecutorService() {
    return executorService;
  }

  @Override
  public ChoreService getChoreService() {
    return choreService;
  }

  @Override
  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
    return rsQuotaManager;
  }

  //
  // Main program and support routines
  //
  /**
   * Load the replication service objects, if any
   */
  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
      FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException {
    if ((server instanceof HMaster) &&
      (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) {
      return;
    }

    // read in the name of the source replication class from the config file.
    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);

    // read in the name of the sink replication class from the config file.
    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);

    // If both the sink and the source class names are the same, then instantiate
    // only one object.
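    // For illustration only ("com.example.MyReplication" is a hypothetical class): pointing
    // both keys at the same implementation makes the single instance created below serve as
    // source and sink at once:
    //   conf.set(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, "com.example.MyReplication");
    //   conf.set(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, "com.example.MyReplication");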
    if (sourceClassname.equals(sinkClassname)) {
      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
    } else {
      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
    }
  }

  private static <T extends ReplicationService> T newReplicationInstance(String classname,
      Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
      Path oldLogDir, WALProvider walProvider) throws IOException {
    Class<? extends T> clazz = null;
    try {
      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
    } catch (ClassNotFoundException nfe) {
      // Preserve the original failure as the cause so misconfiguration is easier to debug.
      throw new IOException("Could not find class for " + classname, nfe);
    }
    T service = ReflectionUtils.newInstance(clazz, conf);
    service.initialize(server, walFs, logDir, oldLogDir, walProvider);
    return service;
  }

  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
    if (!this.isOnline()) {
      return walGroupsReplicationStatus;
    }
    List<ReplicationSourceInterface> allSources = new ArrayList<>();
    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
    for (ReplicationSourceInterface source : allSources) {
      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
    }
    return walGroupsReplicationStatus;
  }

  /**
   * Utility for constructing an instance of the passed HRegionServer class.
   *
   * @param regionServerClass the HRegionServer implementation to instantiate
   * @param conf2 configuration to pass to the single-argument constructor
   * @return HRegionServer instance.
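   *
   * <p>A minimal usage sketch (assumes an HBase configuration is available on the classpath):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * HRegionServer rs = HRegionServer.constructRegionServer(HRegionServer.class, conf);
   * }</pre>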
   */
  public static HRegionServer constructRegionServer(
      Class<? extends HRegionServer> regionServerClass,
      final Configuration conf2) {
    try {
      Constructor<? extends HRegionServer> c = regionServerClass
          .getConstructor(Configuration.class);
      return c.newInstance(conf2);
    } catch (Exception e) {
      throw new RuntimeException("Failed construction of RegionServer: "
          + regionServerClass.toString(), e);
    }
  }

  /**
   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
   */
  public static void main(String[] args) throws Exception {
    LOG.info("STARTING service " + HRegionServer.class.getSimpleName());
    VersionInfo.logVersion();
    Configuration conf = HBaseConfiguration.create();
    @SuppressWarnings("unchecked")
    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);

    new HRegionServerCommandLine(regionServerClass).doMain(args);
  }

  /**
   * Gets the online regions of the specified table.
   * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
   * Only returns <em>online</em> regions.  If a region on this table has been
   * closed during a disable, etc., it will not be included in the returned list.
   * So, the returned list may not necessarily be ALL regions in this table, it's
   * all the ONLINE regions in the table.
   * @param tableName table to limit the scope of the query
   * @return Online regions from <code>tableName</code>
   */
  @Override
  public List<HRegion> getRegions(TableName tableName) {
    List<HRegion> tableRegions = new ArrayList<>();
    synchronized (this.onlineRegions) {
      for (HRegion region : this.onlineRegions.values()) {
        RegionInfo regionInfo = region.getRegionInfo();
        if (regionInfo.getTable().equals(tableName)) {
          tableRegions.add(region);
        }
      }
    }
    return tableRegions;
  }

  @Override
  public List<HRegion> getRegions() {
    List<HRegion> allRegions = new ArrayList<>();
    synchronized (this.onlineRegions) {
      // Return a clone copy of the onlineRegions
      allRegions.addAll(onlineRegions.values());
    }
    return allRegions;
  }

  /**
   * Gets the online tables in this RS.
   * This method looks at the in-memory onlineRegions.
   * @return all the online tables in this RS
   */
  public Set<TableName> getOnlineTables() {
    Set<TableName> tables = new HashSet<>();
    synchronized (this.onlineRegions) {
      for (Region region : this.onlineRegions.values()) {
        tables.add(region.getTableDescriptor().getTableName());
      }
    }
    return tables;
  }

  // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
  public String[] getRegionServerCoprocessors() {
    TreeSet<String> coprocessors = new TreeSet<>();
    try {
      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
    } catch (IOException exception) {
      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
          "skipping.");
      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
    }
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    for (HRegion region : regions) {
      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
      try {
        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
      } catch (IOException exception) {
        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
            "; skipping.");
        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
      }
    }
    coprocessors.addAll(rsHost.getCoprocessors());
    return coprocessors.toArray(new String[coprocessors.size()]);
  }

  /**
   * Try to close the region, logs a warning on failure but continues.
   * @param region Region to close
   */
  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
    try {
      if (!closeRegion(region.getEncodedName(), abort, null)) {
        LOG.warn("Failed to close " + region.getRegionNameAsString() +
            " - ignoring and continuing");
      }
    } catch (IOException e) {
      LOG.warn("Failed to close " + region.getRegionNameAsString() +
          " - ignoring and continuing", e);
    }
  }

  /**
   * Asynchronously close a region; can be called from the master or internally by the
   * regionserver when stopping. If called from the master, the region will update the znode
   * status.
   *
   * <p>
   * If an opening was in progress, this method will cancel it, but will not start a new close.
   * The coprocessors are not called in this case. A NotServingRegionException is thrown.
   * </p>
   *
   * <p>
   *   If a close was in progress, this new request will be ignored, and an exception thrown.
   * </p>
   *
   * @param encodedName Region to close
   * @param abort True if we are aborting
   * @return True if closed a region.
   * @throws NotServingRegionException if the region is not online
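   *
   * <p>Illustrative internal use only (a sketch; see closeRegionIgnoreErrors for the real
   * call site in this class):
   * <pre>{@code
   * // Schedule a close without notifying the master (sn == null) while aborting:
   * closeRegion(hri.getEncodedName(), true, null);
   * }</pre>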
   */
  protected boolean closeRegion(String encodedName, final boolean abort, final ServerName sn)
      throws NotServingRegionException {
    // Check for permissions to close.
    HRegion actualRegion = this.getRegion(encodedName);
    // Can be null if we're calling close on a region that's not online
    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
      try {
        actualRegion.getCoprocessorHost().preClose(false);
      } catch (IOException exp) {
        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
        return false;
      }
    }

    final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName),
        Boolean.FALSE);

    if (Boolean.TRUE.equals(previous)) {
      LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " +
          "trying to OPEN. Cancelling OPENING.");
      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
        // The replace failed. That should be an exceptional case, but theoretically it can happen.
        // We're going to try to do a standard close then.
        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
            " Doing a standard close now");
        return closeRegion(encodedName, abort, sn);
      }
      // Let's get the region from the online region list again
      actualRegion = this.getRegion(encodedName);
      if (actualRegion == null) { // The cancellation succeeded; the region never came online.
        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
        // The master deletes the znode when it receives this exception.
        throw new NotServingRegionException("The region " + encodedName +
          " was opening but not yet served. Opening is cancelled.");
      }
    } else if (Boolean.FALSE.equals(previous)) {
      LOG.info("Received CLOSE for the region: " + encodedName +
        ", which we are already trying to CLOSE, but not completed yet");
      return true;
    }

    if (actualRegion == null) {
      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
      // The master deletes the znode when it receives this exception.
      throw new NotServingRegionException("The region " + encodedName +
          " is not online, and is not opening.");
    }

    CloseRegionHandler crh;
    final RegionInfo hri = actualRegion.getRegionInfo();
    if (hri.isMetaRegion()) {
      crh = new CloseMetaHandler(this, this, hri, abort);
    } else {
      crh = new CloseRegionHandler(this, this, hri, abort, sn);
    }
    this.executorService.submit(crh);
    return true;
  }

  /**
   * Close and offline the region for split or merge
   *
   * @param regionEncodedName encoded names of the regions to close
   * @return true if the regions were closed successfully.
   * @throws IOException
   */
  protected boolean closeAndOfflineRegionForSplitOrMerge(final List<String> regionEncodedName)
      throws IOException {
    for (String encodedName : regionEncodedName) {
      HRegion regionToClose = this.getRegion(encodedName);
      if (regionToClose != null) {
        Map<byte[], List<HStoreFile>> hstoreFiles = null;
        Exception exceptionToThrow = null;
        try {
          hstoreFiles = regionToClose.close(false);
        } catch (Exception e) {
          exceptionToThrow = e;
        }
        if (exceptionToThrow == null && hstoreFiles == null) {
          // The region was closed by someone else
          exceptionToThrow =
            new IOException("Failed to close region: already closed by another thread");
        }
        if (exceptionToThrow != null) {
          if (exceptionToThrow instanceof IOException) {
            throw (IOException) exceptionToThrow;
          }
          throw new IOException(exceptionToThrow);
        }
        // Offline the region
        this.removeRegion(regionToClose, null);
      }
    }
    return true;
  }

  /**
   * @return HRegion for the passed binary <code>regionName</code> or null if
   *         named region is not member of the online regions.
   */
  public HRegion getOnlineRegion(final byte[] regionName) {
    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
    return this.onlineRegions.get(encodedRegionName);
  }

  public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
    return this.regionFavoredNodesMap.get(encodedRegionName);
  }

  @Override
  public HRegion getRegion(final String encodedRegionName) {
    return this.onlineRegions.get(encodedRegionName);
  }

  @Override
  public boolean removeRegion(final HRegion r, ServerName destination) {
    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
    if (destination != null) {
      long closeSeqNum = r.getMaxFlushedSeqId();
      if (closeSeqNum == HConstants.NO_SEQNUM) {
        // No edits in WAL for this region; get the sequence number when the region was opened.
        closeSeqNum = r.getOpenSeqNum();
        if (closeSeqNum == HConstants.NO_SEQNUM) {
          closeSeqNum = 0;
        }
      }
      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
    }
    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
    return toReturn != null;
  }

  /**
   * Protected utility method for safely obtaining an HRegion handle.
   *
   * @param regionName
   *          Name of online {@link HRegion} to return
   * @return {@link HRegion} for <code>regionName</code>
   * @throws NotServingRegionException if the named region is not online
   */
  protected HRegion getRegion(final byte[] regionName)
      throws NotServingRegionException {
    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
    return getRegionByEncodedName(regionName, encodedRegionName);
  }

  public HRegion getRegionByEncodedName(String encodedRegionName)
      throws NotServingRegionException {
    return getRegionByEncodedName(null, encodedRegionName);
  }

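  /**
   * Looks up a region by its encoded name and translates "not here" conditions into the
   * exceptions clients key their retries off of: a RegionMovedException if we recently closed
   * the region for a move, a RegionOpeningException if the region is still opening, and a
   * plain {@link NotServingRegionException} otherwise.
   */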
  protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
    throws NotServingRegionException {
    HRegion region = this.onlineRegions.get(encodedRegionName);
    if (region == null) {
      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
      if (moveInfo != null) {
        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
      }
      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
      String regionNameStr = regionName == null ?
        encodedRegionName : Bytes.toStringBinary(regionName);
      if (isOpening != null && isOpening.booleanValue()) {
        throw new RegionOpeningException("Region " + regionNameStr +
          " is opening on " + this.serverName);
      }
      throw new NotServingRegionException(regionNameStr +
        " is not online on " + this.serverName);
    }
    return region;
  }

  /*
   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
   * IOE if it isn't already.
   *
   * @param t Throwable
   * @param msg Message to log in error. Can be null.
   * @return Throwable converted to an IOE; methods can only let out IOEs.
   */
  private Throwable cleanup(final Throwable t, final String msg) {
    // Don't log as error if NSRE; NSRE is 'normal' operation.
    if (t instanceof NotServingRegionException) {
      LOG.debug("NotServingRegionException; " + t.getMessage());
      return t;
    }
    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
    if (msg == null) {
      LOG.error("", e);
    } else {
      LOG.error(msg, e);
    }
    if (!rpcServices.checkOOME(t)) {
      checkFileSystem();
    }
    return t;
  }

  /*
   * @param t
   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
   * @return Make <code>t</code> an IOE if it isn't already.
   */
  protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
    if (t instanceof IOException) {
      return (IOException) t;
    }
    return msg == null || msg.length() == 0 ? new IOException(t) : new IOException(msg, t);
  }

  /**
   * Checks to see if the file system is still accessible. If not, marks it as
   * unavailable and aborts the region server.
   *
   * @return false if file system is not available
   */
  public boolean checkFileSystem() {
    if (this.fsOk && this.fs != null) {
      try {
        FSUtils.checkFileSystemAvailable(this.fs);
      } catch (IOException e) {
        abort("File System not available", e);
        this.fsOk = false;
      }
    }
    return this.fsOk;
  }

  @Override
  public void updateRegionFavoredNodesMapping(String encodedRegionName,
      List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
    InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
    // Refer to the comment on the declaration of regionFavoredNodesMap on why
    // it is a map of region name to InetSocketAddress[]
    for (int i = 0; i < favoredNodes.size(); i++) {
      addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
          favoredNodes.get(i).getPort());
    }
    regionFavoredNodesMap.put(encodedRegionName, addr);
  }

  /**
   * Return the favored nodes for a region given its encoded name. Look at the
   * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
   * @param encodedRegionName region whose favored nodes to look up
   * @return array of favored locations
   */
  @Override
  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
    return regionFavoredNodesMap.get(encodedRegionName);
  }

  @Override
  public ServerNonceManager getNonceManager() {
    return this.nonceManager;
  }

  private static class MovedRegionInfo {
    private final ServerName serverName;
    private final long seqNum;
    private final long ts;

    public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
      this.serverName = serverName;
      this.seqNum = closeSeqNum;
      ts = EnvironmentEdgeManager.currentTime();
    }

    public ServerName getServerName() {
      return serverName;
    }

    public long getSeqNum() {
      return seqNum;
    }

    public long getMoveTime() {
      return ts;
    }
  }

  // This map contains all the regions we closed for a move. We record the time of the
  // move so we don't keep stale entries around indefinitely.
  protected Map<String, MovedRegionInfo> movedRegions = new ConcurrentHashMap<>(3000);
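  // Lifecycle sketch: removeRegion() records a move via addToMovedRegions(); lookups through
  // getRegionByEncodedName() then answer with a RegionMovedException pointing at the new
  // server until the entry expires (TIMEOUT_REGION_MOVED) or the cleaner chore removes it.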

  // We need a timeout, otherwise we risk handing out stale information: that would double
  // the number of network calls instead of reducing them.
  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000); // two minutes

  protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
    if (ServerName.isSameAddress(destination, this.getServerName())) {
      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
      return;
    }
    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" +
        closeSeqNum);
    movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
  }

  void removeFromMovedRegions(String encodedName) {
    movedRegions.remove(encodedName);
  }

  private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
    MovedRegionInfo dest = movedRegions.get(encodedRegionName);

    long now = EnvironmentEdgeManager.currentTime();
    if (dest != null) {
      if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
        return dest;
      } else {
        movedRegions.remove(encodedRegionName);
      }
    }

    return null;
  }

  /**
   * Remove the expired entries from the moved regions list.
   */
  protected void cleanMovedRegions() {
    // Use EnvironmentEdgeManager for consistency with getMovedRegion() and MovedRegionInfo.
    final long cutOff = EnvironmentEdgeManager.currentTime() - TIMEOUT_REGION_MOVED;
    Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();

    while (it.hasNext()) {
      Map.Entry<String, MovedRegionInfo> e = it.next();
      if (e.getValue().getMoveTime() < cutOff) {
        it.remove();
      }
    }
  }

  /*
   * Use this to allow tests to override and schedule more frequently.
   */
  protected int movedRegionCleanerPeriod() {
    return TIMEOUT_REGION_MOVED;
  }

  /**
   * Chore that periodically cleans the moved-region cache.
   */
  protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
    private HRegionServer regionServer;
    Stoppable stoppable;

    private MovedRegionsCleaner(HRegionServer regionServer, Stoppable stoppable) {
      super("MovedRegionsCleaner for region " + regionServer, stoppable,
          regionServer.movedRegionCleanerPeriod());
      this.regionServer = regionServer;
      this.stoppable = stoppable;
    }

    static MovedRegionsCleaner create(HRegionServer rs) {
      Stoppable stoppable = new Stoppable() {
        private volatile boolean isStopped = false;

        @Override
        public void stop(String why) {
          isStopped = true;
        }

        @Override
        public boolean isStopped() {
          return isStopped;
        }
      };

      return new MovedRegionsCleaner(rs, stoppable);
    }

    @Override
    protected void chore() {
      regionServer.cleanMovedRegions();
    }

    @Override
    public void stop(String why) {
      stoppable.stop(why);
    }

    @Override
    public boolean isStopped() {
      return stoppable.isStopped();
    }
  }

  private String getMyEphemeralNodePath() {
    return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode, getServerName().toString());
  }

  private boolean isHealthCheckerConfigured() {
    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
    return StringUtils.isNotBlank(healthScriptLocation);
  }

  /**
   * @return the underlying {@link CompactSplit} for this server
   */
  public CompactSplit getCompactSplitThread() {
    return this.compactSplitThread;
  }

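  /**
   * Executes a coprocessor endpoint registered at the region-server scope (as opposed to a
   * per-region endpoint): resolves the named service and method from the request, invokes it,
   * and wraps the response, or any failure, for the RPC layer.
   */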
  public CoprocessorServiceResponse execRegionServerService(
      @SuppressWarnings("UnusedParameters") final RpcController controller,
      final CoprocessorServiceRequest serviceRequest) throws ServiceException {
    try {
      ServerRpcController serviceController = new ServerRpcController();
      CoprocessorServiceCall call = serviceRequest.getCall();
      String serviceName = call.getServiceName();
      com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName);
      if (service == null) {
        throw new UnknownProtocolException(null,
            "No registered coprocessor service found for " + serviceName);
      }
      com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
          service.getDescriptorForType();

      String methodName = call.getMethodName();
      com.google.protobuf.Descriptors.MethodDescriptor methodDesc =
          serviceDesc.findMethodByName(methodName);
      if (methodDesc == null) {
        throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName +
            " called on service " + serviceName);
      }

      com.google.protobuf.Message request =
          CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
      final com.google.protobuf.Message.Builder responseBuilder =
          service.getResponsePrototype(methodDesc).newBuilderForType();
      service.callMethod(methodDesc, serviceController, request,
          new com.google.protobuf.RpcCallback<com.google.protobuf.Message>() {
        @Override
        public void run(com.google.protobuf.Message message) {
          if (message != null) {
            responseBuilder.mergeFrom(message);
          }
        }
      });
      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
      if (exception != null) {
        throw exception;
      }
      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * @return The cache config instance used by the regionserver.
   */
  public CacheConfig getCacheConfig() {
    return this.cacheConfig;
  }

  /**
   * @return the ConfigurationManager object, for testing purposes.
   */
  protected ConfigurationManager getConfigurationManager() {
    return configurationManager;
  }

  /**
   * @return Return table descriptors implementation.
   */
  public TableDescriptors getTableDescriptors() {
    return this.tableDescriptors;
  }

  /**
   * Reload the configuration from disk.
   */
  public void updateConfiguration() {
    LOG.info("Reloading the configuration from disk.");
    conf.reloadConfiguration();
    configurationManager.notifyAllObservers(conf);
  }

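  /**
   * Evicts every cached block belonging to the given region's store files from the block
   * cache and reports how many blocks were evicted.
   */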
  public CacheEvictionStats clearRegionBlockCache(Region region) {
    BlockCache blockCache = this.getCacheConfig().getBlockCache();
    long evictedBlocks = 0;

    for (Store store : region.getStores()) {
      for (StoreFile hFile : store.getStorefiles()) {
        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
      }
    }

    return CacheEvictionStats.builder()
        .withEvictedBlocks(evictedBlocks)
        .build();
  }

  @Override
  public double getCompactionPressure() {
    double max = 0;
    for (Region region : onlineRegions.values()) {
      for (Store store : region.getStores()) {
        double normCount = store.getCompactionPressure();
        if (normCount > max) {
          max = normCount;
        }
      }
    }
    return max;
  }

  @Override
  public HeapMemoryManager getHeapMemoryManager() {
    return hMemManager;
  }

  /**
   * For testing
   * @return whether all WAL roll requests have finished for this regionserver
   */
  @VisibleForTesting
  public boolean walRollRequestFinished() {
    return this.walRoller.walRollFinished();
  }

  @Override
  public ThroughputController getFlushThroughputController() {
    return flushThroughputController;
  }

  @Override
  public double getFlushPressure() {
    if (getRegionServerAccounting() == null || cacheFlusher == null) {
      // return 0 during RS initialization
      return 0.0;
    }
    return getRegionServerAccounting().getFlushPressure();
  }

  @Override
  public void onConfigurationChange(Configuration newConf) {
    ThroughputController old = this.flushThroughputController;
    if (old != null) {
      old.stop("configuration change");
    }
    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
    try {
      Superusers.initialize(newConf);
    } catch (IOException e) {
      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration", e);
    }
  }

  @Override
  public MetricsRegionServer getMetrics() {
    return metricsRegionServer;
  }

  @Override
  public SecureBulkLoadManager getSecureBulkLoadManager() {
    return this.secureBulkLoadManager;
  }

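  /**
   * Builds a distributed lock over the given regions, funneled through the master's lock
   * service via the {@code lockStub} created alongside the master status stub.
   */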
  @Override
  public EntityLock regionLock(List<RegionInfo> regionInfos, String description,
      Abortable abort) throws IOException {
    return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
      .regionLock(regionInfos, description, abort);
  }

  @Override
  public void unassign(byte[] regionName) throws IOException {
    clusterConnection.getAdmin().unassign(regionName, false);
  }

  @Override
  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
    return this.rsSpaceQuotaManager;
  }

  @Override
  public boolean reportFileArchivalForQuotas(TableName tableName,
      Collection<Entry<String, Long>> archivedFiles) {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null || rsSpaceQuotaManager == null) {
      // the current server could be stopping.
      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
      return false;
    }
    try {
      RegionServerStatusProtos.FileArchiveNotificationRequest request =
          rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
      rss.reportFileArchival(null, request);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
              + " This will be retried.", ioe);
        }
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return false;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // re-create the stub if we failed to report the archival
      createRegionServerStatusStub(true);
      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
      return false;
    }
    return true;
  }

  public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
    return eventLoopGroupConfig;
  }

  @Override
  public Connection createConnection(Configuration conf) throws IOException {
    User user = UserProvider.instantiate(conf).getCurrent();
    return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName,
        this.rpcServices, this.rpcServices);
  }

  public void executeProcedure(long procId, RSProcedureCallable callable) {
    executorService.submit(new RSProcedureHandler(this, procId, callable));
  }

  public void remoteProcedureComplete(long procId, Throwable error) {
    procedureResultReporter.complete(procId, error);
  }

  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
    RegionServerStatusService.BlockingInterface rss;
    // Loop until we manage to (re)create a master stub.
    for (;;) {
      rss = rssStub;
      if (rss != null) {
        break;
      }
      createRegionServerStatusStub();
    }
    try {
      rss.reportProcedureDone(null, request);
    } catch (ServiceException se) {
      if (rssStub == rss) {
        rssStub = null;
      }
      throw ProtobufUtil.getRemoteException(se);
    }
  }

  public boolean isShutDown() {
    return shutDown;
  }

  /**
   * Forcibly terminate the region server when an abort times out.
   */
  private static class SystemExitWhenAbortTimeout extends TimerTask {

    public SystemExitWhenAbortTimeout() {
    }

    @Override
    public void run() {
      LOG.warn("Aborting region server timed out; terminating forcibly without waiting" +
          " for any running shutdown hooks or finalizers to finish their work." +
          " Thread dump to stdout.");
      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
      Runtime.getRuntime().halt(1);
    }
  }
}