001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION;
023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
024import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
025import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
026import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
027import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT;
028import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
029import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT;
030import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
031import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
032import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
033import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
034
035import io.opentelemetry.api.trace.Span;
036import io.opentelemetry.api.trace.StatusCode;
037import io.opentelemetry.context.Scope;
038import java.io.IOException;
039import java.io.PrintWriter;
040import java.lang.management.MemoryUsage;
041import java.lang.reflect.Constructor;
042import java.net.InetSocketAddress;
043import java.time.Duration;
044import java.util.ArrayList;
045import java.util.Collection;
046import java.util.Collections;
047import java.util.Comparator;
048import java.util.HashSet;
049import java.util.Iterator;
050import java.util.List;
051import java.util.Map;
052import java.util.Map.Entry;
053import java.util.Objects;
054import java.util.Optional;
055import java.util.Set;
056import java.util.SortedMap;
057import java.util.Timer;
058import java.util.TimerTask;
059import java.util.TreeMap;
060import java.util.TreeSet;
061import java.util.concurrent.ConcurrentHashMap;
062import java.util.concurrent.ConcurrentMap;
063import java.util.concurrent.ConcurrentSkipListMap;
064import java.util.concurrent.ThreadLocalRandom;
065import java.util.concurrent.TimeUnit;
066import java.util.concurrent.atomic.AtomicBoolean;
067import java.util.concurrent.locks.ReentrantReadWriteLock;
068import java.util.function.Consumer;
069import java.util.stream.Collectors;
070import javax.management.MalformedObjectNameException;
071import javax.servlet.http.HttpServlet;
072import org.apache.commons.lang3.StringUtils;
073import org.apache.commons.lang3.mutable.MutableFloat;
074import org.apache.hadoop.conf.Configuration;
075import org.apache.hadoop.fs.FileSystem;
076import org.apache.hadoop.fs.Path;
077import org.apache.hadoop.hbase.Abortable;
078import org.apache.hadoop.hbase.CacheEvictionStats;
079import org.apache.hadoop.hbase.CallQueueTooBigException;
080import org.apache.hadoop.hbase.ClockOutOfSyncException;
081import org.apache.hadoop.hbase.DoNotRetryIOException;
082import org.apache.hadoop.hbase.ExecutorStatusChore;
083import org.apache.hadoop.hbase.HBaseConfiguration;
084import org.apache.hadoop.hbase.HBaseInterfaceAudience;
085import org.apache.hadoop.hbase.HBaseServerBase;
086import org.apache.hadoop.hbase.HConstants;
087import org.apache.hadoop.hbase.HDFSBlocksDistribution;
088import org.apache.hadoop.hbase.HRegionLocation;
089import org.apache.hadoop.hbase.HealthCheckChore;
090import org.apache.hadoop.hbase.MetaTableAccessor;
091import org.apache.hadoop.hbase.NotServingRegionException;
092import org.apache.hadoop.hbase.PleaseHoldException;
093import org.apache.hadoop.hbase.ScheduledChore;
094import org.apache.hadoop.hbase.ServerName;
095import org.apache.hadoop.hbase.Stoppable;
096import org.apache.hadoop.hbase.TableName;
097import org.apache.hadoop.hbase.YouAreDeadException;
098import org.apache.hadoop.hbase.ZNodeClearer;
099import org.apache.hadoop.hbase.client.ConnectionUtils;
100import org.apache.hadoop.hbase.client.RegionInfo;
101import org.apache.hadoop.hbase.client.RegionInfoBuilder;
102import org.apache.hadoop.hbase.client.locking.EntityLock;
103import org.apache.hadoop.hbase.client.locking.LockServiceClient;
104import org.apache.hadoop.hbase.conf.ConfigurationObserver;
105import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
106import org.apache.hadoop.hbase.exceptions.RegionMovedException;
107import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
108import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
109import org.apache.hadoop.hbase.executor.ExecutorType;
110import org.apache.hadoop.hbase.http.InfoServer;
111import org.apache.hadoop.hbase.io.hfile.BlockCache;
112import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
113import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
114import org.apache.hadoop.hbase.io.hfile.HFile;
115import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
116import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
117import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
118import org.apache.hadoop.hbase.ipc.RpcClient;
119import org.apache.hadoop.hbase.ipc.RpcServer;
120import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
121import org.apache.hadoop.hbase.ipc.ServerRpcController;
122import org.apache.hadoop.hbase.log.HBaseMarkers;
123import org.apache.hadoop.hbase.mob.MobFileCache;
124import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
125import org.apache.hadoop.hbase.monitoring.TaskMonitor;
126import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
127import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore;
128import org.apache.hadoop.hbase.net.Address;
129import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
130import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
131import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
132import org.apache.hadoop.hbase.quotas.QuotaUtil;
133import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
134import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
135import org.apache.hadoop.hbase.quotas.RegionSize;
136import org.apache.hadoop.hbase.quotas.RegionSizeStore;
137import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
138import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
139import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
140import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
141import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
142import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
143import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
144import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
145import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
146import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
147import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
148import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
149import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
150import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
151import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
152import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
153import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
154import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
155import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
156import org.apache.hadoop.hbase.security.SecurityConstants;
157import org.apache.hadoop.hbase.security.Superusers;
158import org.apache.hadoop.hbase.security.User;
159import org.apache.hadoop.hbase.security.UserProvider;
160import org.apache.hadoop.hbase.trace.TraceUtil;
161import org.apache.hadoop.hbase.util.Bytes;
162import org.apache.hadoop.hbase.util.CompressionTest;
163import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
164import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
165import org.apache.hadoop.hbase.util.FSUtils;
166import org.apache.hadoop.hbase.util.FutureUtils;
167import org.apache.hadoop.hbase.util.JvmPauseMonitor;
168import org.apache.hadoop.hbase.util.Pair;
169import org.apache.hadoop.hbase.util.RetryCounter;
170import org.apache.hadoop.hbase.util.RetryCounterFactory;
171import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
172import org.apache.hadoop.hbase.util.Threads;
173import org.apache.hadoop.hbase.util.VersionInfo;
174import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
175import org.apache.hadoop.hbase.wal.WAL;
176import org.apache.hadoop.hbase.wal.WALFactory;
177import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
178import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
179import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
180import org.apache.hadoop.hbase.zookeeper.ZKUtil;
181import org.apache.hadoop.ipc.RemoteException;
182import org.apache.hadoop.util.ReflectionUtils;
183import org.apache.yetus.audience.InterfaceAudience;
184import org.apache.zookeeper.KeeperException;
185import org.slf4j.Logger;
186import org.slf4j.LoggerFactory;
187
188import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
189import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
190import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
191import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
192import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
193import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
194import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
195import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
196import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
197import org.apache.hbase.thirdparty.com.google.protobuf.Message;
198import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
199import org.apache.hbase.thirdparty.com.google.protobuf.Service;
200import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
201import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
202import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
203
204import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
205import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
234
235/**
236 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
237 * are many HRegionServers in a single HBase deployment.
238 */
239@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
240@SuppressWarnings({ "deprecation" })
241public class HRegionServer extends HBaseServerBase<RSRpcServices>
242  implements RegionServerServices, LastSequenceId {
243
244  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
245
246  int unitMB = 1024 * 1024;
247  int unitKB = 1024;
248
249  /**
250   * For testing only! Set to true to skip notifying region assignment to master .
251   */
252  @InterfaceAudience.Private
253  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
254  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
255
256  /**
257   * A map from RegionName to current action in progress. Boolean value indicates: true - if open
258   * region action in progress false - if close region action in progress
259   */
260  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
261    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
262
263  /**
264   * Used to cache the open/close region procedures which already submitted. See
265   * {@link #submitRegionProcedure(long)}.
266   */
267  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
268  /**
269   * Used to cache the open/close region procedures which already executed. See
270   * {@link #submitRegionProcedure(long)}.
271   */
272  private final Cache<Long, Long> executedRegionProcedures =
273    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
274
275  /**
276   * Used to cache the moved-out regions
277   */
278  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
279    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();
280
281  private MemStoreFlusher cacheFlusher;
282
283  private HeapMemoryManager hMemManager;
284
285  // Replication services. If no replication, this handler will be null.
286  private ReplicationSourceService replicationSourceHandler;
287  private ReplicationSinkService replicationSinkHandler;
288  private boolean sameReplicationSourceAndSink;
289
290  // Compactions
291  private CompactSplit compactSplitThread;
292
293  /**
294   * Map of regions currently being served by this region server. Key is the encoded region name.
295   * All access should be synchronized.
296   */
297  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
298  /**
299   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
300   * need to be a ConcurrentHashMap?
301   */
302  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();
303
304  /**
305   * Map of encoded region names to the DataNode locations they should be hosted on We store the
306   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
307   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
308   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API
309   * call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and
310   * here we really mean DataNode locations. We don't store it as InetSocketAddress here because the
311   * conversion on demand from Address to InetSocketAddress will guarantee the resolution results
312   * will be fresh when we need it.
313   */
314  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();
315
316  private LeaseManager leaseManager;
317
318  private volatile boolean dataFsOk;
319
320  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
321  // Default abort timeout is 1200 seconds for safe
322  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
323  // Will run this task when abort timeout
324  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
325
326  // A state before we go into stopped state. At this stage we're closing user
327  // space regions.
328  private boolean stopping = false;
329  private volatile boolean killed = false;
330
331  private final int threadWakeFrequency;
332
333  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
334  private final int compactionCheckFrequency;
335  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
336  private final int flushCheckFrequency;
337
338  // Stub to do region server status calls against the master.
339  private volatile RegionServerStatusService.BlockingInterface rssStub;
340  private volatile LockService.BlockingInterface lockStub;
341  // RPC client. Used to make the stub above that does region server status checking.
342  private RpcClient rpcClient;
343
344  private UncaughtExceptionHandler uncaughtExceptionHandler;
345
346  private JvmPauseMonitor pauseMonitor;
347
348  private RSSnapshotVerifier rsSnapshotVerifier;
349
350  /** region server process name */
351  public static final String REGIONSERVER = "regionserver";
352
353  private MetricsRegionServer metricsRegionServer;
354  MetricsRegionServerWrapperImpl metricsRegionServerImpl;
355
356  /**
357   * Check for compactions requests.
358   */
359  private ScheduledChore compactionChecker;
360
361  /**
362   * Check for flushes
363   */
364  private ScheduledChore periodicFlusher;
365
366  private volatile WALFactory walFactory;
367
368  private LogRoller walRoller;
369
370  // A thread which calls reportProcedureDone
371  private RemoteProcedureResultReporter procedureResultReporter;
372
373  // flag set after we're done setting up server threads
374  final AtomicBoolean online = new AtomicBoolean(false);
375
376  // master address tracker
377  private final MasterAddressTracker masterAddressTracker;
378
379  // Log Splitting Worker
380  private SplitLogWorker splitLogWorker;
381
382  private final int shortOperationTimeout;
383
384  // Time to pause if master says 'please hold'
385  private final long retryPauseTime;
386
387  private final RegionServerAccounting regionServerAccounting;
388
389  private NamedQueueServiceChore namedQueueServiceChore = null;
390
391  // Block cache
392  private BlockCache blockCache;
393  // The cache for mob files
394  private MobFileCache mobFileCache;
395
396  /** The health check chore. */
397  private HealthCheckChore healthCheckChore;
398
399  /** The Executor status collect chore. */
400  private ExecutorStatusChore executorStatusChore;
401
402  /** The nonce manager chore. */
403  private ScheduledChore nonceManagerChore;
404
405  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
406
407  /**
408   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
409   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
410   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
411   */
412  @Deprecated
413  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
414  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
415    "hbase.regionserver.hostname.disable.master.reversedns";
416
417  /**
418   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
419   * Exception will be thrown if both are used.
420   */
421  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
422  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
423    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";
424
425  /**
426   * Unique identifier for the cluster we are a part of.
427   */
428  private String clusterId;
429
430  // chore for refreshing store files for secondary regions
431  private StorefileRefresherChore storefileRefresher;
432
433  private volatile RegionServerCoprocessorHost rsHost;
434
435  private RegionServerProcedureManagerHost rspmHost;
436
437  private RegionServerRpcQuotaManager rsQuotaManager;
438  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
439
440  /**
441   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
442   * case where client doesn't receive the response from a successful operation and retries. We
443   * track the successful ops for some time via a nonce sent by client and handle duplicate
444   * operations (currently, by failing them; in future we might use MVCC to return result). Nonces
445   * are also recovered from WAL during, recovery; however, the caveats (from HBASE-3787) are: - WAL
446   * recovery is optimized, and under high load we won't read nearly nonce-timeout worth of past
447   * records. If we don't read the records, we don't read and recover the nonces. Some WALs within
448   * nonce-timeout at recovery may not even be present due to rolling/cleanup. - There's no WAL
449   * recovery during normal region move, so nonces will not be transfered. We can have separate
450   * additional "Nonce WAL". It will just contain bunch of numbers and won't be flushed on main path
451   * - because WAL itself also contains nonces, if we only flush it before memstore flush, for a
452   * given nonce we will either see it in the WAL (if it was never flushed to disk, it will be part
453   * of recovery), or we'll see it as part of the nonce log (or both occasionally, which doesn't
454   * matter). Nonce log file can be deleted after the latest nonce in it expired. It can also be
455   * recovered during move.
456   */
457  final ServerNonceManager nonceManager;
458
459  private BrokenStoreFileCleaner brokenStoreFileCleaner;
460
461  private RSMobFileCleanerChore rsMobFileCleanerChore;
462
463  @InterfaceAudience.Private
464  CompactedHFilesDischarger compactedFileDischarger;
465
466  private volatile ThroughputController flushThroughputController;
467
468  private SecureBulkLoadManager secureBulkLoadManager;
469
470  private FileSystemUtilizationChore fsUtilizationChore;
471
472  private BootstrapNodeManager bootstrapNodeManager;
473
474  /**
475   * True if this RegionServer is coming up in a cluster where there is no Master; means it needs to
476   * just come up and make do without a Master to talk to: e.g. in test or HRegionServer is doing
477   * other than its usual duties: e.g. as an hollowed-out host whose only purpose is as a
478   * Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
479   * {@link #TEST_SKIP_REPORTING_TRANSITION} ?
480   */
481  private final boolean masterless;
482  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
483
484  /** regionserver codec list **/
485  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";
486
487  // A timer to shutdown the process if abort takes too long
488  private Timer abortMonitor;
489
490  private RegionReplicationBufferManager regionReplicationBufferManager;
491
492  /*
493   * Chore that creates replication marker rows.
494   */
495  private ReplicationMarkerChore replicationMarkerChore;
496
497  /**
498   * Starts a HRegionServer at the default location.
499   * <p/>
500   * Don't start any services or managers in here in the Constructor. Defer till after we register
501   * with the Master as much as possible. See {@link #startServices}.
502   */
503  public HRegionServer(final Configuration conf) throws IOException {
504    super(conf, "RegionServer"); // thread name
505    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
506    try (Scope ignored = span.makeCurrent()) {
507      this.dataFsOk = true;
508      this.masterless = !clusterMode();
509      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
510      HFile.checkHFileVersion(this.conf);
511      checkCodecs(this.conf);
512      FSUtils.setupShortCircuitRead(this.conf);
513
514      // Disable usage of meta replicas in the regionserver
515      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
516      // Config'ed params
517      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
518      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
519      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
520
521      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
522      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
523
524      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
525        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
526
527      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
528        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);
529
530      regionServerAccounting = new RegionServerAccounting(conf);
531
532      blockCache = BlockCacheFactory.createBlockCache(conf);
533      mobFileCache = new MobFileCache(conf);
534
535      rsSnapshotVerifier = new RSSnapshotVerifier(conf);
536
537      uncaughtExceptionHandler =
538        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);
539
540      // If no master in cluster, skip trying to track one or look for a cluster status.
541      if (!this.masterless) {
542        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
543        masterAddressTracker.start();
544      } else {
545        masterAddressTracker = null;
546      }
547      this.rpcServices.start(zooKeeper);
548      span.setStatus(StatusCode.OK);
549    } catch (Throwable t) {
550      // Make sure we log the exception. HRegionServer is often started via reflection and the
551      // cause of failed startup is lost.
552      TraceUtil.setError(span, t);
553      LOG.error("Failed construction RegionServer", t);
554      throw t;
555    } finally {
556      span.end();
557    }
558  }
559
560  // HMaster should override this method to load the specific config for master
561  @Override
562  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
563    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
564    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
565      if (!StringUtils.isBlank(hostname)) {
566        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
567          + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
568          + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
569          + UNSAFE_RS_HOSTNAME_KEY + " is used";
570        throw new IOException(msg);
571      } else {
572        return rpcServices.getSocketAddress().getHostName();
573      }
574    } else {
575      return hostname;
576    }
577  }
578
579  @Override
580  protected void login(UserProvider user, String host) throws IOException {
581    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
582      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
583  }
584
585  @Override
586  protected String getProcessName() {
587    return REGIONSERVER;
588  }
589
590  @Override
591  protected RegionServerCoprocessorHost getCoprocessorHost() {
592    return getRegionServerCoprocessorHost();
593  }
594
595  @Override
596  protected boolean canCreateBaseZNode() {
597    return !clusterMode();
598  }
599
600  @Override
601  protected boolean canUpdateTableDescriptor() {
602    return false;
603  }
604
605  @Override
606  protected boolean cacheTableDescriptor() {
607    return false;
608  }
609
610  protected RSRpcServices createRpcServices() throws IOException {
611    return new RSRpcServices(this);
612  }
613
614  @Override
615  protected void configureInfoServer(InfoServer infoServer) {
616    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
617    infoServer.setAttribute(REGIONSERVER, this);
618  }
619
620  @Override
621  protected Class<? extends HttpServlet> getDumpServlet() {
622    return RSDumpServlet.class;
623  }
624
625  /**
626   * Used by {@link RSDumpServlet} to generate debugging information.
627   */
628  public void dumpRowLocks(final PrintWriter out) {
629    StringBuilder sb = new StringBuilder();
630    for (HRegion region : getRegions()) {
631      if (region.getLockedRows().size() > 0) {
632        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
633          sb.setLength(0);
634          sb.append(region.getTableDescriptor().getTableName()).append(",")
635            .append(region.getRegionInfo().getEncodedName()).append(",");
636          sb.append(rowLockContext.toString());
637          out.println(sb);
638        }
639      }
640    }
641  }
642
643  @Override
644  public boolean registerService(Service instance) {
645    // No stacking of instances is allowed for a single executorService name
646    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
647    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
648    if (coprocessorServiceHandlers.containsKey(serviceName)) {
649      LOG.error("Coprocessor executorService " + serviceName
650        + " already registered, rejecting request from " + instance);
651      return false;
652    }
653
654    coprocessorServiceHandlers.put(serviceName, instance);
655    if (LOG.isDebugEnabled()) {
656      LOG.debug(
657        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
658    }
659    return true;
660  }
661
662  /**
663   * Run test on configured codecs to make sure supporting libs are in place.
664   */
665  private static void checkCodecs(final Configuration c) throws IOException {
666    // check to see if the codec list is available:
667    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
668    if (codecs == null) {
669      return;
670    }
671    for (String codec : codecs) {
672      if (!CompressionTest.testCompression(codec)) {
673        throw new IOException(
674          "Compression codec " + codec + " not supported, aborting RS construction");
675      }
676    }
677  }
678
679  public String getClusterId() {
680    return this.clusterId;
681  }
682
683  /**
684   * All initialization needed before we go register with Master.<br>
685   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
686   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
687   */
688  private void preRegistrationInitialization() {
689    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
690    try (Scope ignored = span.makeCurrent()) {
691      initializeZooKeeper();
692      setupClusterConnection();
693      bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
694      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
695      // Setup RPC client for master communication
696      this.rpcClient = asyncClusterConnection.getRpcClient();
697      span.setStatus(StatusCode.OK);
698    } catch (Throwable t) {
699      // Call stop if error or process will stick around for ever since server
700      // puts up non-daemon threads.
701      TraceUtil.setError(span, t);
702      this.rpcServices.stop();
703      abort("Initialization of RS failed.  Hence aborting RS.", t);
704    } finally {
705      span.end();
706    }
707  }
708
709  /**
710   * Bring up connection to zk ensemble and then wait until a master for this cluster and then after
711   * that, wait until cluster 'up' flag has been set. This is the order in which master does things.
712   * <p>
713   * Finally open long-living server short-circuit connection.
714   */
715  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
716      justification = "cluster Id znode read would give us correct response")
717  private void initializeZooKeeper() throws IOException, InterruptedException {
718    // Nothing to do in here if no Master in the mix.
719    if (this.masterless) {
720      return;
721    }
722
723    // Create the master address tracker, register with zk, and start it. Then
724    // block until a master is available. No point in starting up if no master
725    // running.
726    blockAndCheckIfStopped(this.masterAddressTracker);
727
728    // Wait on cluster being up. Master will set this flag up in zookeeper
729    // when ready.
730    blockAndCheckIfStopped(this.clusterStatusTracker);
731
732    // If we are HMaster then the cluster id should have already been set.
733    if (clusterId == null) {
734      // Retrieve clusterId
735      // Since cluster status is now up
736      // ID should have already been set by HMaster
737      try {
738        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
739        if (clusterId == null) {
740          this.abort("Cluster ID has not been set");
741        }
742        LOG.info("ClusterId : " + clusterId);
743      } catch (KeeperException e) {
744        this.abort("Failed to retrieve Cluster ID", e);
745      }
746    }
747
748    if (isStopped() || isAborted()) {
749      return; // No need for further initialization
750    }
751
752    // watch for snapshots and other procedures
753    try {
754      rspmHost = new RegionServerProcedureManagerHost();
755      rspmHost.loadProcedures(conf);
756      rspmHost.initialize(this);
757    } catch (KeeperException e) {
758      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
759    }
760  }
761
762  /**
763   * Utilty method to wait indefinitely on a znode availability while checking if the region server
764   * is shut down
765   * @param tracker znode tracker to use
766   * @throws IOException          any IO exception, plus if the RS is stopped
767   * @throws InterruptedException if the waiting thread is interrupted
768   */
769  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
770    throws IOException, InterruptedException {
771    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
772      if (this.stopped) {
773        throw new IOException("Received the shutdown message while waiting.");
774      }
775    }
776  }
777
778  /** Returns True if the cluster is up. */
779  @Override
780  public boolean isClusterUp() {
781    return this.masterless
782      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
783  }
784
785  private void initializeReplicationMarkerChore() {
786    boolean replicationMarkerEnabled =
787      conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
788    // If replication or replication marker is not enabled then return immediately.
789    if (replicationMarkerEnabled) {
790      int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
791        REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
792      replicationMarkerChore = new ReplicationMarkerChore(this, this, period, conf);
793    }
794  }
795
796  @Override
797  public boolean isStopping() {
798    return stopping;
799  }
800
801  /**
802   * The HRegionServer sticks in this loop until closed.
803   */
804  @Override
805  public void run() {
806    if (isStopped()) {
807      LOG.info("Skipping run; stopped");
808      return;
809    }
810    try {
811      // Do pre-registration initializations; zookeeper, lease threads, etc.
812      preRegistrationInitialization();
813    } catch (Throwable e) {
814      abort("Fatal exception during initialization", e);
815    }
816
817    try {
818      if (!isStopped() && !isAborted()) {
819        installShutdownHook();
820        // Initialize the RegionServerCoprocessorHost now that our ephemeral
821        // node was created, in case any coprocessors want to use ZooKeeper
822        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
823
824        // Try and register with the Master; tell it we are here. Break if server is stopped or
825        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
826        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
827        // come up.
828        LOG.debug("About to register with Master.");
829        TraceUtil.trace(() -> {
830          RetryCounterFactory rcf =
831            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
832          RetryCounter rc = rcf.create();
833          while (keepLooping()) {
834            RegionServerStartupResponse w = reportForDuty();
835            if (w == null) {
836              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
837              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
838              this.sleeper.sleep(sleepTime);
839            } else {
840              handleReportForDutyResponse(w);
841              break;
842            }
843          }
844        }, "HRegionServer.registerWithMaster");
845      }
846
847      if (!isStopped() && isHealthy()) {
848        TraceUtil.trace(() -> {
849          // start the snapshot handler and other procedure handlers,
850          // since the server is ready to run
851          if (this.rspmHost != null) {
852            this.rspmHost.start();
853          }
854          // Start the Quota Manager
855          if (this.rsQuotaManager != null) {
856            rsQuotaManager.start(getRpcServer().getScheduler());
857          }
858          if (this.rsSpaceQuotaManager != null) {
859            this.rsSpaceQuotaManager.start();
860          }
861        }, "HRegionServer.startup");
862      }
863
864      // We registered with the Master. Go into run mode.
865      long lastMsg = EnvironmentEdgeManager.currentTime();
866      long oldRequestCount = -1;
867      // The main run loop.
868      while (!isStopped() && isHealthy()) {
869        if (!isClusterUp()) {
870          if (onlineRegions.isEmpty()) {
871            stop("Exiting; cluster shutdown set and not carrying any regions");
872          } else if (!this.stopping) {
873            this.stopping = true;
874            LOG.info("Closing user regions");
875            closeUserRegions(isAborted());
876          } else {
877            boolean allUserRegionsOffline = areAllUserRegionsOffline();
878            if (allUserRegionsOffline) {
879              // Set stopped if no more write requests tp meta tables
880              // since last time we went around the loop. Any open
881              // meta regions will be closed on our way out.
882              if (oldRequestCount == getWriteRequestCount()) {
883                stop("Stopped; only catalog regions remaining online");
884                break;
885              }
886              oldRequestCount = getWriteRequestCount();
887            } else {
888              // Make sure all regions have been closed -- some regions may
889              // have not got it because we were splitting at the time of
890              // the call to closeUserRegions.
891              closeUserRegions(this.abortRequested.get());
892            }
893            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
894          }
895        }
896        long now = EnvironmentEdgeManager.currentTime();
897        if ((now - lastMsg) >= msgInterval) {
898          tryRegionServerReport(lastMsg, now);
899          lastMsg = EnvironmentEdgeManager.currentTime();
900        }
901        if (!isStopped() && !isAborted()) {
902          this.sleeper.sleep();
903        }
904      } // for
905    } catch (Throwable t) {
906      if (!rpcServices.checkOOME(t)) {
907        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
908        abort(prefix + t.getMessage(), t);
909      }
910    }
911
912    final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
913    try (Scope ignored = span.makeCurrent()) {
914      if (this.leaseManager != null) {
915        this.leaseManager.closeAfterLeasesExpire();
916      }
917      if (this.splitLogWorker != null) {
918        splitLogWorker.stop();
919      }
920      stopInfoServer();
921      // Send cache a shutdown.
922      if (blockCache != null) {
923        blockCache.shutdown();
924      }
925      if (mobFileCache != null) {
926        mobFileCache.shutdown();
927      }
928
929      // Send interrupts to wake up threads if sleeping so they notice shutdown.
930      // TODO: Should we check they are alive? If OOME could have exited already
931      if (this.hMemManager != null) {
932        this.hMemManager.stop();
933      }
934      if (this.cacheFlusher != null) {
935        this.cacheFlusher.interruptIfNecessary();
936      }
937      if (this.compactSplitThread != null) {
938        this.compactSplitThread.interruptIfNecessary();
939      }
940
941      // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
942      if (rspmHost != null) {
943        rspmHost.stop(this.abortRequested.get() || this.killed);
944      }
945
946      if (this.killed) {
947        // Just skip out w/o closing regions. Used when testing.
948      } else if (abortRequested.get()) {
949        if (this.dataFsOk) {
950          closeUserRegions(abortRequested.get()); // Don't leave any open file handles
951        }
952        LOG.info("aborting server " + this.serverName);
953      } else {
954        closeUserRegions(abortRequested.get());
955        LOG.info("stopping server " + this.serverName);
956      }
957      regionReplicationBufferManager.stop();
958      closeClusterConnection();
959      // Closing the compactSplit thread before closing meta regions
960      if (!this.killed && containsMetaTableRegions()) {
961        if (!abortRequested.get() || this.dataFsOk) {
962          if (this.compactSplitThread != null) {
963            this.compactSplitThread.join();
964            this.compactSplitThread = null;
965          }
966          closeMetaTableRegions(abortRequested.get());
967        }
968      }
969
970      if (!this.killed && this.dataFsOk) {
971        waitOnAllRegionsToClose(abortRequested.get());
972        LOG.info("stopping server " + this.serverName + "; all regions closed.");
973      }
974
975      // Stop the quota manager
976      if (rsQuotaManager != null) {
977        rsQuotaManager.stop();
978      }
979      if (rsSpaceQuotaManager != null) {
980        rsSpaceQuotaManager.stop();
981        rsSpaceQuotaManager = null;
982      }
983
984      // flag may be changed when closing regions throws exception.
985      if (this.dataFsOk) {
986        shutdownWAL(!abortRequested.get());
987      }
988
989      // Make sure the proxy is down.
990      if (this.rssStub != null) {
991        this.rssStub = null;
992      }
993      if (this.lockStub != null) {
994        this.lockStub = null;
995      }
996      if (this.rpcClient != null) {
997        this.rpcClient.close();
998      }
999      if (this.leaseManager != null) {
1000        this.leaseManager.close();
1001      }
1002      if (this.pauseMonitor != null) {
1003        this.pauseMonitor.stop();
1004      }
1005
1006      if (!killed) {
1007        stopServiceThreads();
1008      }
1009
1010      if (this.rpcServices != null) {
1011        this.rpcServices.stop();
1012      }
1013
1014      try {
1015        deleteMyEphemeralNode();
1016      } catch (KeeperException.NoNodeException nn) {
1017        // pass
1018      } catch (KeeperException e) {
1019        LOG.warn("Failed deleting my ephemeral node", e);
1020      }
1021      // We may have failed to delete the znode at the previous step, but
1022      // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1023      ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1024
1025      closeZooKeeper();
1026      closeTableDescriptors();
1027      LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
1028      span.setStatus(StatusCode.OK);
1029    } finally {
1030      span.end();
1031    }
1032  }
1033
1034  private boolean containsMetaTableRegions() {
1035    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
1036  }
1037
1038  private boolean areAllUserRegionsOffline() {
1039    if (getNumberOfOnlineRegions() > 2) {
1040      return false;
1041    }
1042    boolean allUserRegionsOffline = true;
1043    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1044      if (!e.getValue().getRegionInfo().isMetaRegion()) {
1045        allUserRegionsOffline = false;
1046        break;
1047      }
1048    }
1049    return allUserRegionsOffline;
1050  }
1051
1052  /** Returns Current write count for all online regions. */
1053  private long getWriteRequestCount() {
1054    long writeCount = 0;
1055    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1056      writeCount += e.getValue().getWriteRequestsCount();
1057    }
1058    return writeCount;
1059  }
1060
1061  @InterfaceAudience.Private
1062  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1063    throws IOException {
1064    RegionServerStatusService.BlockingInterface rss = rssStub;
1065    if (rss == null) {
1066      // the current server could be stopping.
1067      return;
1068    }
1069    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1070    final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport");
1071    try (Scope ignored = span.makeCurrent()) {
1072      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1073      request.setServer(ProtobufUtil.toServerName(this.serverName));
1074      request.setLoad(sl);
1075      rss.regionServerReport(null, request.build());
1076      span.setStatus(StatusCode.OK);
1077    } catch (ServiceException se) {
1078      IOException ioe = ProtobufUtil.getRemoteException(se);
1079      if (ioe instanceof YouAreDeadException) {
1080        // This will be caught and handled as a fatal error in run()
1081        TraceUtil.setError(span, ioe);
1082        throw ioe;
1083      }
1084      if (rssStub == rss) {
1085        rssStub = null;
1086      }
1087      TraceUtil.setError(span, se);
1088      // Couldn't connect to the master, get location from zk and reconnect
1089      // Method blocks until new master is found or we are stopped
1090      createRegionServerStatusStub(true);
1091    } finally {
1092      span.end();
1093    }
1094  }
1095
1096  /**
1097   * Reports the given map of Regions and their size on the filesystem to the active Master.
1098   * @param regionSizeStore The store containing region sizes
1099   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1100   */
1101  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
1102    RegionServerStatusService.BlockingInterface rss = rssStub;
1103    if (rss == null) {
1104      // the current server could be stopping.
1105      LOG.trace("Skipping Region size report to HMaster as stub is null");
1106      return true;
1107    }
1108    try {
1109      buildReportAndSend(rss, regionSizeStore);
1110    } catch (ServiceException se) {
1111      IOException ioe = ProtobufUtil.getRemoteException(se);
1112      if (ioe instanceof PleaseHoldException) {
1113        LOG.trace("Failed to report region sizes to Master because it is initializing."
1114          + " This will be retried.", ioe);
1115        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1116        return true;
1117      }
1118      if (rssStub == rss) {
1119        rssStub = null;
1120      }
1121      createRegionServerStatusStub(true);
1122      if (ioe instanceof DoNotRetryIOException) {
1123        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1124        if (doNotRetryEx.getCause() != null) {
1125          Throwable t = doNotRetryEx.getCause();
1126          if (t instanceof UnsupportedOperationException) {
1127            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1128            return false;
1129          }
1130        }
1131      }
1132      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1133    }
1134    return true;
1135  }
1136
1137  /**
1138   * Builds the region size report and sends it to the master. Upon successful sending of the
1139   * report, the region sizes that were sent are marked as sent.
1140   * @param rss             The stub to send to the Master
1141   * @param regionSizeStore The store containing region sizes
1142   */
1143  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
1144    RegionSizeStore regionSizeStore) throws ServiceException {
1145    RegionSpaceUseReportRequest request =
1146      buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
1147    rss.reportRegionSpaceUse(null, request);
1148    // Record the number of size reports sent
1149    if (metricsRegionServer != null) {
1150      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
1151    }
1152  }
1153
1154  /**
1155   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1156   * @param regionSizes The size in bytes of regions
1157   * @return The corresponding protocol buffer message.
1158   */
1159  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
1160    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1161    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
1162      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
1163    }
1164    return request.build();
1165  }
1166
1167  /**
1168   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf
1169   * message.
1170   * @param regionInfo  The RegionInfo
1171   * @param sizeInBytes The size in bytes of the Region
1172   * @return The protocol buffer
1173   */
1174  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1175    return RegionSpaceUse.newBuilder()
1176      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1177      .setRegionSize(Objects.requireNonNull(sizeInBytes)).build();
1178  }
1179
1180  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1181    throws IOException {
1182    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1183    // per second, and other metrics As long as metrics are part of ServerLoad it's best to use
1184    // the wrapper to compute those numbers in one place.
1185    // In the long term most of these should be moved off of ServerLoad and the heart beat.
1186    // Instead they should be stored in an HBase table so that external visibility into HBase is
1187    // improved; Additionally the load balancer will be able to take advantage of a more complete
1188    // history.
1189    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1190    Collection<HRegion> regions = getOnlineRegionsLocalContext();
1191    long usedMemory = -1L;
1192    long maxMemory = -1L;
1193    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
1194    if (usage != null) {
1195      usedMemory = usage.getUsed();
1196      maxMemory = usage.getMax();
1197    }
1198
1199    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
1200    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1201    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
1202    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
1203    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
1204    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
1205    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
1206    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1207    Coprocessor.Builder coprocessorBuilder = Coprocessor.newBuilder();
1208    for (String coprocessor : coprocessors) {
1209      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1210    }
1211    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1212    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1213    for (HRegion region : regions) {
1214      if (region.getCoprocessorHost() != null) {
1215        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1216        for (String regionCoprocessor : regionCoprocessors) {
1217          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
1218        }
1219      }
1220      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1221      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1222        .getCoprocessors()) {
1223        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1224      }
1225    }
1226
1227    getBlockCache().ifPresent(cache -> {
1228      cache.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
1229        regionCachedInfo.forEach((regionName, prefetchSize) -> {
1230          serverLoad.putRegionCachedInfo(regionName, roundSize(prefetchSize, unitMB));
1231        });
1232      });
1233    });
1234
1235    serverLoad.setReportStartTime(reportStartTime);
1236    serverLoad.setReportEndTime(reportEndTime);
1237    if (this.infoServer != null) {
1238      serverLoad.setInfoServerPort(this.infoServer.getPort());
1239    } else {
1240      serverLoad.setInfoServerPort(-1);
1241    }
1242    MetricsUserAggregateSource userSource =
1243      metricsRegionServer.getMetricsUserAggregate().getSource();
1244    if (userSource != null) {
1245      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
1246      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
1247        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
1248      }
1249    }
1250
1251    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
1252      // always refresh first to get the latest value
1253      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1254      if (rLoad != null) {
1255        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1256        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1257          .getReplicationLoadSourceEntries()) {
1258          serverLoad.addReplLoadSource(rLS);
1259        }
1260      }
1261    } else {
1262      if (replicationSourceHandler != null) {
1263        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1264        if (rLoad != null) {
1265          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1266            .getReplicationLoadSourceEntries()) {
1267            serverLoad.addReplLoadSource(rLS);
1268          }
1269        }
1270      }
1271      if (replicationSinkHandler != null) {
1272        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
1273        if (rLoad != null) {
1274          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1275        }
1276      }
1277    }
1278
1279    TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask
1280      .newBuilder().setDescription(task.getDescription())
1281      .setStatus(task.getStatus() != null ? task.getStatus() : "")
1282      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
1283      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build()));
1284
1285    return serverLoad.build();
1286  }
1287
1288  private String getOnlineRegionsAsPrintableString() {
1289    StringBuilder sb = new StringBuilder();
1290    for (Region r : this.onlineRegions.values()) {
1291      if (sb.length() > 0) {
1292        sb.append(", ");
1293      }
1294      sb.append(r.getRegionInfo().getEncodedName());
1295    }
1296    return sb.toString();
1297  }
1298
1299  /**
1300   * Wait on all regions to close.
1301   */
1302  private void waitOnAllRegionsToClose(final boolean abort) {
1303    // Wait till all regions are closed before going out.
1304    int lastCount = -1;
1305    long previousLogTime = 0;
1306    Set<String> closedRegions = new HashSet<>();
1307    boolean interrupted = false;
1308    try {
1309      while (!onlineRegions.isEmpty()) {
1310        int count = getNumberOfOnlineRegions();
1311        // Only print a message if the count of regions has changed.
1312        if (count != lastCount) {
1313          // Log every second at most
1314          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
1315            previousLogTime = EnvironmentEdgeManager.currentTime();
1316            lastCount = count;
1317            LOG.info("Waiting on " + count + " regions to close");
1318            // Only print out regions still closing if there are just a few, else we will
1319            // swamp the log.
1320            if (count < 10 && LOG.isDebugEnabled()) {
1321              LOG.debug("Online Regions=" + this.onlineRegions);
1322            }
1323          }
1324        }
1325        // Ensure all user regions have been sent a close. Use this to
1326        // protect against the case where an open comes in after we start
1327        // iterating over onlineRegions to close all user regions.
1328        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1329          RegionInfo hri = e.getValue().getRegionInfo();
1330          if (
1331            !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1332              && !closedRegions.contains(hri.getEncodedName())
1333          ) {
1334            closedRegions.add(hri.getEncodedName());
1335            // Don't update zk with this close transition; pass false.
1336            closeRegionIgnoreErrors(hri, abort);
1337          }
1338        }
1339        // No regions in RIT, we could stop waiting now.
1340        if (this.regionsInTransitionInRS.isEmpty()) {
1341          if (!onlineRegions.isEmpty()) {
1342            LOG.info("Exiting though online regions are not empty,"
1343              + " because some regions failed to close");
1344          }
1345          break;
1346        } else {
1347          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
1348            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1349        }
1350        if (sleepInterrupted(200)) {
1351          interrupted = true;
1352        }
1353      }
1354    } finally {
1355      if (interrupted) {
1356        Thread.currentThread().interrupt();
1357      }
1358    }
1359  }
1360
1361  private static boolean sleepInterrupted(long millis) {
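    // Swallow the InterruptedException here and report it via the return value; the caller can
    // restore the thread's interrupt status once, after its wait loop finishes.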
1362    boolean interrupted = false;
1363    try {
1364      Thread.sleep(millis);
1365    } catch (InterruptedException e) {
1366      LOG.warn("Interrupted while sleeping");
1367      interrupted = true;
1368    }
1369    return interrupted;
1370  }
1371
1372  private void shutdownWAL(final boolean close) {
1373    if (this.walFactory != null) {
1374      try {
1375        if (close) {
1376          walFactory.close();
1377        } else {
1378          walFactory.shutdown();
1379        }
1380      } catch (Throwable e) {
1381        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1382        LOG.error("Shutdown / close of WAL failed: " + e);
1383        LOG.debug("Shutdown / close exception details:", e);
1384      }
1385    }
1386  }
1387
1388  /**
1389   * Run init. Sets up wal and starts up all server threads.
1390   * @param c Extra configuration.
1391   */
1392  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1393    throws IOException {
1394    try {
1395      boolean updateRootDir = false;
1396      for (NameStringPair e : c.getMapEntriesList()) {
1397        String key = e.getName();
1398        // The hostname the master sees us as.
1399        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1400          String hostnameFromMasterPOV = e.getValue();
1401          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1402            rpcServices.getSocketAddress().getPort(), this.startcode);
1403          String expectedHostName = rpcServices.getSocketAddress().getHostName();
1404          // If Master use-ip is enabled, RegionServer use-ip will be enabled by default even if
1405          // it is explicitly disabled, so we use the IP of the RegionServer to compare with the
1406          // hostname passed by the Master. See HBASE-27304 for details.
1407          if (
1408            StringUtils.isBlank(useThisHostnameInstead) && getActiveMaster().isPresent()
1409              && InetAddresses.isInetAddress(getActiveMaster().get().getHostname())
1410          ) {
1411            expectedHostName = rpcServices.getSocketAddress().getAddress().getHostAddress();
1412          }
1413          boolean isHostnameConsistent = StringUtils.isBlank(useThisHostnameInstead)
1414            ? hostnameFromMasterPOV.equals(expectedHostName)
1415            : hostnameFromMasterPOV.equals(useThisHostnameInstead);
1416          if (!isHostnameConsistent) {
1417            String msg = "Master passed us a different hostname to use; was="
1418              + (StringUtils.isBlank(useThisHostnameInstead)
1419                ? rpcServices.getSocketAddress().getHostName()
1420                : this.useThisHostnameInstead)
1421              + ", but now=" + hostnameFromMasterPOV;
1422            LOG.error(msg);
1423            throw new IOException(msg);
1424          }
1425          continue;
1426        }
1427
1428        String value = e.getValue();
1429        if (key.equals(HConstants.HBASE_DIR)) {
1430          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1431            updateRootDir = true;
1432          }
1433        }
1434
1435        if (LOG.isDebugEnabled()) {
1436          LOG.debug("Config from master: " + key + "=" + value);
1437        }
1438        this.conf.set(key, value);
1439      }
1440      // Set our ephemeral znode up in zookeeper now we have a name.
1441      createMyEphemeralNode();
1442
1443      if (updateRootDir) {
1444        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1445        initializeFileSystem();
1446      }
1447
1448      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
1449      // config param for task trackers, but we can piggyback off of it.
1450      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1451        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1452      }
1453
1454      // Save it in a file; this will allow us to see if we crashed
1455      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1456
1457      // This call sets up an initialized replication and WAL. Later we start it up.
1458      setupWALAndReplication();
1459      // Init in here rather than in constructor after thread name has been set
1460      final MetricsTable metricsTable =
1461        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1462      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1463      this.metricsRegionServer =
1464        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
1465      // Now that we have a metrics source, start the pause monitor
1466      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1467      pauseMonitor.start();
1468
1469      // There is a rare case where we do NOT want services to start. Check config.
1470      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1471        startServices();
1472      }
1473      // In here we start up the replication service. Above we initialized it.
1474      // TODO: Reconcile, or make sense of it.
1475      startReplicationService();
1476
1477      // Set up ZK
1478      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress()
1479        + ", sessionid=0x"
1480        + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1481
1482      // Wake up anyone waiting for this server to come online
1483      synchronized (online) {
1484        online.set(true);
1485        online.notifyAll();
1486      }
1487    } catch (Throwable e) {
1488      stop("Failed initialization");
1489      throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed");
1490    } finally {
1491      sleeper.skipSleepCycle();
1492    }
1493  }
1494
1495  private void startHeapMemoryManager() {
1496    if (this.blockCache != null) {
1497      this.hMemManager =
1498        new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1499      this.hMemManager.start(getChoreService());
1500    }
1501  }
1502
1503  private void createMyEphemeralNode() throws KeeperException {
1504    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1505    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1506    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1507    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1508    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1509  }
1510
1511  private void deleteMyEphemeralNode() throws KeeperException {
1512    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1513  }
1514
1515  @Override
1516  public RegionServerAccounting getRegionServerAccounting() {
1517    return regionServerAccounting;
1518  }
1519
1520  // Round the size to KB or MB.
1521  // A trick here is that if sizeInBytes is less than sizeUnit but not 0, we round the size to 1
1522  // instead of 0, to avoid some schedulers thinking the region has no data. See
1523  // HBASE-26340 for more details on why this is important.
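  // For example, with sizeUnit = unitMB (1 MB): 0 bytes -> 0, 1 byte -> 1, and 5 GB -> 5120.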
1524  private static int roundSize(long sizeInByte, int sizeUnit) {
1525    if (sizeInByte == 0) {
1526      return 0;
1527    } else if (sizeInByte < sizeUnit) {
1528      return 1;
1529    } else {
1530      return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE);
1531    }
1532  }
1533
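  // Runs the given computation only when the block cache is a CombinedBlockCache whose L2 cache
  // is a persistent BucketCache; in all other cache configurations this is a no-op.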
1534  private void computeIfPersistentBucketCache(Consumer<BucketCache> computation) {
1535    if (blockCache instanceof CombinedBlockCache) {
1536      BlockCache l2 = ((CombinedBlockCache) blockCache).getSecondLevelCache();
1537      if (l2 instanceof BucketCache && ((BucketCache) l2).isCachePersistent()) {
1538        computation.accept((BucketCache) l2);
1539      }
1540    }
1541  }
1542
1543  /**
1544   * @param r               Region to get RegionLoad for.
1545   * @param regionLoadBldr  the RegionLoad.Builder, can be null
1546   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1547   * @return RegionLoad instance.
1548   */
1549  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1550    RegionSpecifier.Builder regionSpecifier) throws IOException {
1551    byte[] name = r.getRegionInfo().getRegionName();
1552    String regionEncodedName = r.getRegionInfo().getEncodedName();
1553    int stores = 0;
1554    int storefiles = 0;
1555    int storeRefCount = 0;
1556    int maxCompactedStoreFileRefCount = 0;
1557    long storeUncompressedSize = 0L;
1558    long storefileSize = 0L;
1559    long storefileIndexSize = 0L;
1560    long rootLevelIndexSize = 0L;
1561    long totalStaticIndexSize = 0L;
1562    long totalStaticBloomSize = 0L;
1563    long totalCompactingKVs = 0L;
1564    long currentCompactedKVs = 0L;
1565    long totalRegionSize = 0L;
1566    List<HStore> storeList = r.getStores();
1567    stores += storeList.size();
1568    for (HStore store : storeList) {
1569      storefiles += store.getStorefilesCount();
1570      int currentStoreRefCount = store.getStoreRefCount();
1571      storeRefCount += currentStoreRefCount;
1572      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1573      maxCompactedStoreFileRefCount =
1574        Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount);
1575      storeUncompressedSize += store.getStoreSizeUncompressed();
1576      storefileSize += store.getStorefilesSize();
1577      totalRegionSize += store.getHFilesSize();
1578      // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
1579      storefileIndexSize += store.getStorefilesRootLevelIndexSize();
1580      CompactionProgress progress = store.getCompactionProgress();
1581      if (progress != null) {
1582        totalCompactingKVs += progress.getTotalCompactingKVs();
1583        currentCompactedKVs += progress.currentCompactedKVs;
1584      }
1585      rootLevelIndexSize += store.getStorefilesRootLevelIndexSize();
1586      totalStaticIndexSize += store.getTotalStaticIndexSize();
1587      totalStaticBloomSize += store.getTotalStaticBloomSize();
1588    }
1589
1590    int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB);
1591    int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB);
1592    int storefileSizeMB = roundSize(storefileSize, unitMB);
1593    int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB);
1594    int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB);
1595    int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB);
1596    int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB);
1597    int regionSizeMB = roundSize(totalRegionSize, unitMB);
1598    final MutableFloat currentRegionCachedRatio = new MutableFloat(0.0f);
1599    getBlockCache().ifPresent(bc -> {
1600      bc.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
1601        if (regionCachedInfo.containsKey(regionEncodedName)) {
1602          currentRegionCachedRatio.setValue(regionSizeMB == 0
1603            ? 0.0f
1604            : (float) roundSize(regionCachedInfo.get(regionEncodedName), unitMB) / regionSizeMB);
1605        }
1606      });
1607    });
1608
1609    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
1610    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
1611    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
1612    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
1613    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
1614    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
1615    if (regionLoadBldr == null) {
1616      regionLoadBldr = RegionLoad.newBuilder();
1617    }
1618    if (regionSpecifier == null) {
1619      regionSpecifier = RegionSpecifier.newBuilder();
1620    }
1621
1622    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1623    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1624    regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores)
1625      .setStorefiles(storefiles).setStoreRefCount(storeRefCount)
1626      .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1627      .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB)
1628      .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB)
1629      .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1630      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1631      .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount())
1632      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1633      .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs)
1634      .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality)
1635      .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight)
1636      .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight)
1637      .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
1638      .setLastMajorCompactionTs(r.getOldestHfileTs(true)).setRegionSizeMB(regionSizeMB)
1639      .setCurrentRegionCachedRatio(currentRegionCachedRatio.floatValue());
1640    r.setCompleteSequenceId(regionLoadBldr);
1641    return regionLoadBldr.build();
1642  }
1643
1644  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1645    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1646    userLoadBldr.setUserName(user);
1647    userSource.getClientMetrics().values().stream()
1648      .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1649        .setHostName(clientMetrics.getHostName())
1650        .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1651        .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1652        .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
1653      .forEach(userLoadBldr::addClientMetrics);
1654    return userLoadBldr.build();
1655  }
1656
1657  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1658    HRegion r = onlineRegions.get(encodedRegionName);
1659    return r != null ? createRegionLoad(r, null, null) : null;
1660  }
1661
1662  /**
1663   * Inner class that runs on a long period checking if regions need compaction.
1664   */
1665  private static class CompactionChecker extends ScheduledChore {
1666    private final HRegionServer instance;
1667    private final int majorCompactPriority;
1668    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1669    // Iteration is 1-based rather than 0-based so we don't check for compaction
1670    // immediately upon region server startup
1671    private long iteration = 1;
1672
1673    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1674      super("CompactionChecker", stopper, sleepTime);
1675      this.instance = h;
1676      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1677
1678      /*
1679       * MajorCompactPriority is configurable. If not set, the compaction will use default priority.
1680       */
1681      this.majorCompactPriority = this.instance.conf
1682        .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY);
1683    }
1684
1685    @Override
1686    protected void chore() {
1687      for (HRegion hr : this.instance.onlineRegions.values()) {
1688        // If region is read only or compaction is disabled at table level, there's no need to
1689        // iterate through region's stores
1690        if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) {
1691          continue;
1692        }
1693
1694        for (HStore s : hr.stores.values()) {
1695          try {
1696            long multiplier = s.getCompactionCheckMultiplier();
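            // Only check this store every 'multiplier' iterations of the chore; a store can slow
            // down its compaction checks by reporting a larger check multiplier.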
1697            assert multiplier > 0;
1698            if (iteration % multiplier != 0) {
1699              continue;
1700            }
1701            if (s.needsCompaction()) {
1702              // Queue a compaction. Will recognize if major is needed.
1703              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1704                getName() + " requests compaction");
1705            } else if (s.shouldPerformMajorCompaction()) {
1706              s.triggerMajorCompaction();
1707              if (
1708                majorCompactPriority == DEFAULT_PRIORITY
1709                  || majorCompactPriority > hr.getCompactPriority()
1710              ) {
1711                this.instance.compactSplitThread.requestCompaction(hr, s,
1712                  getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
1713                  CompactionLifeCycleTracker.DUMMY, null);
1714              } else {
1715                this.instance.compactSplitThread.requestCompaction(hr, s,
1716                  getName() + " requests major compaction; use configured priority",
1717                  this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1718              }
1719            }
1720          } catch (IOException e) {
1721            LOG.warn("Failed major compaction check on " + hr, e);
1722          }
1723        }
1724      }
1725      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1726    }
1727  }
1728
1729  private static class PeriodicMemStoreFlusher extends ScheduledChore {
1730    private final HRegionServer server;
1731    private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
1732    private final static int MIN_DELAY_TIME = 0; // millisec
1733    private final long rangeOfDelayMs;
1734
1735    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1736      super("MemstoreFlusherChore", server, cacheFlushInterval);
1737      this.server = server;
1738
1739      final long configuredRangeOfDelay = server.getConfiguration()
1740        .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
1741      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
1742    }
1743
1744    @Override
1745    protected void chore() {
1746      final StringBuilder whyFlush = new StringBuilder();
1747      for (HRegion r : this.server.onlineRegions.values()) {
1748        if (r == null) {
1749          continue;
1750        }
1751        if (r.shouldFlush(whyFlush)) {
1752          FlushRequester requester = server.getFlushRequester();
1753          if (requester != null) {
1754            long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME;
1755            // Throttle the flushes by putting a delay. If we don't throttle, and there
1756            // is a balanced write-load on the regions in a table, we might end up
1757            // overwhelming the filesystem with too many flushes at once.
1758            if (requester.requestDelayedFlush(r, delay)) {
1759              LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(),
1760                r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay);
1761            }
1762          }
1763        }
1764      }
1765    }
1766  }
1767
1768  /**
1769   * Report the status of the server. A server is online once all the startup is completed (setting
1770   * up filesystem, starting executorService threads, etc.). This method is designed mostly to be
1771   * useful in tests.
1772   * @return true if online, false if not.
1773   */
1774  public boolean isOnline() {
1775    return online.get();
1776  }
1777
1778  /**
1779   * Set up the WAL and replication if enabled. Replication setup is done in here because it needs
1780   * to be hooked up to the WAL.
1781   */
1782  private void setupWALAndReplication() throws IOException {
1783    WALFactory factory = new WALFactory(conf, serverName, this);
1784    // TODO Replication make assumptions here based on the default filesystem impl
1785    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1786    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1787
1788    Path logDir = new Path(walRootDir, logName);
1789    LOG.debug("logDir={}", logDir);
1790    if (this.walFs.exists(logDir)) {
1791      throw new RegionServerRunningException(
1792        "Region server has already created directory at " + this.serverName.toString());
1793    }
1794    // Always create the wal directory, as we now need it when the master restarts to find out
1795    // the live region servers.
1796    if (!this.walFs.mkdirs(logDir)) {
1797      throw new IOException("Can not create wal directory " + logDir);
1798    }
1799    // Instantiate replication if replication enabled. Pass it the log directories.
1800    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
1801
1802    WALActionsListener walEventListener = getWALEventTrackerListener(conf);
1803    if (walEventListener != null && factory.getWALProvider() != null) {
1804      factory.getWALProvider().addWALActionsListener(walEventListener);
1805    }
1806    this.walFactory = factory;
1807  }
1808
1809  private WALActionsListener getWALEventTrackerListener(Configuration conf) {
1810    if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) {
1811      WALEventTrackerListener listener =
1812        new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName());
1813      return listener;
1814    }
1815    return null;
1816  }
1817
1818  /**
1819   * Start up replication source and sink handlers.
1820   */
1821  private void startReplicationService() throws IOException {
1822    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
1823      this.replicationSourceHandler.startReplicationService();
1824    } else {
1825      if (this.replicationSourceHandler != null) {
1826        this.replicationSourceHandler.startReplicationService();
1827      }
1828      if (this.replicationSinkHandler != null) {
1829        this.replicationSinkHandler.startReplicationService();
1830      }
1831    }
1832  }
1833
1834  /** Returns Master address tracker instance. */
1835  public MasterAddressTracker getMasterAddressTracker() {
1836    return this.masterAddressTracker;
1837  }
1838
1839  /**
1840   * Start maintenance threads, Server, Worker and lease checker threads; start all threads we
1841   * need to run. This is called after we've successfully registered with the Master. Install an
1842   * UncaughtExceptionHandler that calls abort on the RegionServer if we get an unhandled
1843   * exception. We cannot set the handler on all threads: the Server's internal Listener thread is
1844   * off limits. For the Server, on an OOME it waits a while then retries; meantime, a flush or a
1845   * compaction that tries to run should trigger the same critical condition and the shutdown will
1846   * run. On its way out, this server will shut down the Server. Leases are sort of in between:
1847   * although it inherits from Chore, it has an internal thread with its own stop mechanism, so it
1848   * needs to be stopped by this hosting server. Worker logs the exception and exits.
1849   */
1850  private void startServices() throws IOException {
1851    if (!isStopped() && !isAborted()) {
1852      initializeThreads();
1853    }
1854    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
1855    this.secureBulkLoadManager.start();
1856
1857    // Health checker thread.
1858    if (isHealthCheckerConfigured()) {
1859      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
1860        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1861      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1862    }
1863    // Executor status collect thread.
1864    if (
1865      this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
1866        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)
1867    ) {
1868      int sleepTime =
1869        this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ);
1870      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
1871        this.metricsRegionServer.getMetricsSource());
1872    }
1873
1874    this.walRoller = new LogRoller(this);
1875    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1876    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
1877
1878    // Create the CompactedFileDischarger chore executorService. This chore helps to
1879    // remove the compacted files that will no longer be used in reads.
1880    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
1881    // 2 mins so that compacted files can be archived before the TTLCleaner runs
1882    int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
1883    this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this);
1884    choreService.scheduleChore(compactedFileDischarger);
1885
1886    // Start executor services
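    // Each pool below is sized from its own config key; the ExecutorConfig builder binds the core
    // pool size (and, for some pools, a core-thread timeout) to a specific ExecutorType.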
1887    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
1888    executorService.startExecutorService(executorService.new ExecutorConfig()
1889      .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
1890    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
1891    executorService.startExecutorService(executorService.new ExecutorConfig()
1892      .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
1893    final int openPriorityRegionThreads =
1894      conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
1895    executorService.startExecutorService(
1896      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION)
1897        .setCorePoolSize(openPriorityRegionThreads));
1898    final int closeRegionThreads =
1899      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
1900    executorService.startExecutorService(executorService.new ExecutorConfig()
1901      .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
1902    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
1903    executorService.startExecutorService(executorService.new ExecutorConfig()
1904      .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
1905    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1906      final int storeScannerParallelSeekThreads =
1907        conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
1908      executorService.startExecutorService(
1909        executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK)
1910          .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true));
1911    }
1912    final int logReplayOpsThreads =
1913      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
1914    executorService.startExecutorService(
1915      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS)
1916        .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
1917    // Start the threads for compacted files discharger
1918    final int compactionDischargerThreads =
1919      conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
1920    executorService.startExecutorService(executorService.new ExecutorConfig()
1921      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)
1922      .setCorePoolSize(compactionDischargerThreads));
1923    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1924      final int regionReplicaFlushThreads =
1925        conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1926          conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1927      executorService.startExecutorService(executorService.new ExecutorConfig()
1928        .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS)
1929        .setCorePoolSize(regionReplicaFlushThreads));
1930    }
1931    final int refreshPeerThreads =
1932      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
1933    executorService.startExecutorService(executorService.new ExecutorConfig()
1934      .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
1935    final int replaySyncReplicationWALThreads =
1936      conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
1937    executorService.startExecutorService(executorService.new ExecutorConfig()
1938      .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL)
1939      .setCorePoolSize(replaySyncReplicationWALThreads));
1940    final int switchRpcThrottleThreads =
1941      conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
1942    executorService.startExecutorService(
1943      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE)
1944        .setCorePoolSize(switchRpcThrottleThreads));
1945    final int claimReplicationQueueThreads =
1946      conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
1947    executorService.startExecutorService(
1948      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE)
1949        .setCorePoolSize(claimReplicationQueueThreads));
1950    final int rsSnapshotOperationThreads =
1951      conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
1952    executorService.startExecutorService(
1953      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
1954        .setCorePoolSize(rsSnapshotOperationThreads));
1955    final int rsFlushOperationThreads =
1956      conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3);
1957    executorService.startExecutorService(executorService.new ExecutorConfig()
1958      .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads));
1959
1960    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
1961      uncaughtExceptionHandler);
1962    if (this.cacheFlusher != null) {
1963      this.cacheFlusher.start(uncaughtExceptionHandler);
1964    }
1965    Threads.setDaemonThreadRunning(this.procedureResultReporter,
1966      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
1967
1968    if (this.compactionChecker != null) {
1969      choreService.scheduleChore(compactionChecker);
1970    }
1971    if (this.periodicFlusher != null) {
1972      choreService.scheduleChore(periodicFlusher);
1973    }
1974    if (this.healthCheckChore != null) {
1975      choreService.scheduleChore(healthCheckChore);
1976    }
1977    if (this.executorStatusChore != null) {
1978      choreService.scheduleChore(executorStatusChore);
1979    }
1980    if (this.nonceManagerChore != null) {
1981      choreService.scheduleChore(nonceManagerChore);
1982    }
1983    if (this.storefileRefresher != null) {
1984      choreService.scheduleChore(storefileRefresher);
1985    }
1986    if (this.fsUtilizationChore != null) {
1987      choreService.scheduleChore(fsUtilizationChore);
1988    }
1989    if (this.namedQueueServiceChore != null) {
1990      choreService.scheduleChore(namedQueueServiceChore);
1991    }
1992    if (this.brokenStoreFileCleaner != null) {
1993      choreService.scheduleChore(brokenStoreFileCleaner);
1994    }
1995    if (this.rsMobFileCleanerChore != null) {
1996      choreService.scheduleChore(rsMobFileCleanerChore);
1997    }
1998    if (replicationMarkerChore != null) {
1999      LOG.info("Starting replication marker chore");
2000      choreService.scheduleChore(replicationMarkerChore);
2001    }
2002
2003    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
2004    // an unhandled exception, it will just exit.
2005    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
2006      uncaughtExceptionHandler);
2007
2008    // Create the log splitting worker and start it
2009    // Set a smaller number of retries to fail fast; otherwise the SplitLogWorker could be blocked
2010    // for quite a while inside the Connection layer. The worker won't be available for other
2011    // tasks even after the current task is preempted when a split task times out.
2012    Configuration sinkConf = HBaseConfiguration.create(conf);
2013    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2014      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
2015    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2016      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
2017    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
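    // Keeping the server-side retries multiplier at 1 means the retry count configured above is
    // used as-is, rather than being multiplied for server-to-server connections.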
2018    if (
2019      this.csm != null
2020        && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
2021    ) {
2022      // SplitLogWorker needs csm. If none, don't start this.
2023      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
2024      splitLogWorker.start();
2025      LOG.debug("SplitLogWorker started");
2026    }
2027
2028    // Memstore services.
2029    startHeapMemoryManager();
2030    // Call it after starting HeapMemoryManager.
2031    initializeMemStoreChunkCreator(hMemManager);
2032  }
2033
2034  private void initializeThreads() {
2035    // Cache flushing thread.
2036    this.cacheFlusher = new MemStoreFlusher(conf, this);
2037
2038    // Compaction thread
2039    this.compactSplitThread = new CompactSplit(this);
2040
2041    // Background thread to check for compactions; needed if region has not gotten updates
2042    // in a while. It will take care of not checking too frequently on store-by-store basis.
2043    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
2044    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
2045    this.leaseManager = new LeaseManager(this.threadWakeFrequency);
2046
2047    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
2048      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
2049    final boolean walEventTrackerEnabled =
2050      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
2051
2052    if (isSlowLogTableEnabled || walEventTrackerEnabled) {
2053      // default chore duration: 10 min
2054      // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property
2055      final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY,
2056        DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION);
2057
2058      final int namedQueueChoreDuration =
2059        conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT);
2060      // Use the minimum of slowLogChoreDuration and namedQueueChoreDuration
2061      int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration);
2062
2063      namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration,
2064        this.namedQueueRecorder, this.getConnection());
2065    }
2066
2067    if (this.nonceManager != null) {
2068      // Create the scheduled chore that cleans up nonces.
2069      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
2070    }
2071
2072    // Setup the Quota Manager
2073    rsQuotaManager = new RegionServerRpcQuotaManager(this);
2074    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
2075
2076    if (QuotaUtil.isQuotaEnabled(conf)) {
2077      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
2078    }
2079
2080    boolean onlyMetaRefresh = false;
2081    int storefileRefreshPeriod =
2082      conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
2083        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2084    if (storefileRefreshPeriod == 0) {
2085      storefileRefreshPeriod =
2086        conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
2087          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2088      onlyMetaRefresh = true;
2089    }
2090    if (storefileRefreshPeriod > 0) {
2091      this.storefileRefresher =
2092        new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this);
2093    }
2094
2095    int brokenStoreFileCleanerPeriod =
2096      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
2097        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
2098    int brokenStoreFileCleanerDelay =
2099      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
2100        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
2101    double brokenStoreFileCleanerDelayJitter =
2102      conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
2103        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
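    // jitterRate is drawn uniformly from [-jitter/2, +jitter/2); for example, a jitter setting of
    // 0.5 shifts the configured delay by up to +/- 25% before the first cleaner run.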
2104    double jitterRate =
2105      (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
2106    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
2107    this.brokenStoreFileCleaner =
2108      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
2109        brokenStoreFileCleanerPeriod, this, conf, this);
2110
2111    this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
2112
2113    registerConfigurationObservers();
2114    initializeReplicationMarkerChore();
2115  }
2116
2117  private void registerConfigurationObservers() {
2118    // Register Replication if possible, as we now support recreating the replication peer storage
2119    // for migrating across different replication peer storages online
2120    if (replicationSourceHandler instanceof ConfigurationObserver) {
2121      configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
2122    }
2123    if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) {
2124      configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
2125    }
2126    // Registering the compactSplitThread object with the ConfigurationManager.
2127    configurationManager.registerObserver(this.compactSplitThread);
2128    configurationManager.registerObserver(this.cacheFlusher);
2129    configurationManager.registerObserver(this.rpcServices);
2130    configurationManager.registerObserver(this);
2131  }
2132
2133  /*
2134   * Verify that the server is healthy.
2135   */
2136  private boolean isHealthy() {
2137    if (!dataFsOk) {
2138      // File system problem
2139      return false;
2140    }
2141    // Verify that all threads are alive
2142    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
2143      && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2144      && (this.walRoller == null || this.walRoller.isAlive())
2145      && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2146      && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2147    if (!healthy) {
2148      stop("One or more threads are no longer alive -- stop");
2149    }
2150    return healthy;
2151  }
2152
2153  @Override
2154  public List<WAL> getWALs() {
2155    return walFactory.getWALs();
2156  }
2157
2158  @Override
2159  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2160    WAL wal = walFactory.getWAL(regionInfo);
2161    if (this.walRoller != null) {
2162      this.walRoller.addWAL(wal);
2163    }
2164    return wal;
2165  }
2166
2167  public LogRoller getWalRoller() {
2168    return walRoller;
2169  }
2170
2171  public WALFactory getWalFactory() {
2172    return walFactory;
2173  }
2174
2175  @Override
2176  public void stop(final String msg) {
2177    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2178  }
2179
2180  /**
2181   * Stops the regionserver.
2182   * @param msg   Status message
2183   * @param force True if this is a regionserver abort
2184   * @param user  The user executing the stop request, or null if no user is associated
2185   */
2186  public void stop(final String msg, final boolean force, final User user) {
2187    if (!this.stopped) {
2188      LOG.info("***** STOPPING region server '" + this + "' *****");
2189      if (this.rsHost != null) {
2190        // when forced via abort don't allow CPs to override
2191        try {
2192          this.rsHost.preStop(msg, user);
2193        } catch (IOException ioe) {
2194          if (!force) {
2195            LOG.warn("The region server did not stop", ioe);
2196            return;
2197          }
2198          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2199        }
2200      }
2201      this.stopped = true;
2202      LOG.info("STOPPED: " + msg);
2203      // Wakes run() if it is sleeping
2204      sleeper.skipSleepCycle();
2205    }
2206  }
2207
2208  public void waitForServerOnline() {
2209    while (!isStopped() && !isOnline()) {
2210      synchronized (online) {
2211        try {
2212          online.wait(msgInterval);
2213        } catch (InterruptedException ie) {
2214          Thread.currentThread().interrupt();
2215          break;
2216        }
2217      }
2218    }
2219  }
2220
2221  @Override
2222  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2223    HRegion r = context.getRegion();
2224    long openProcId = context.getOpenProcId();
2225    long masterSystemTime = context.getMasterSystemTime();
2226    rpcServices.checkOpen();
2227    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2228      r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2229    // Do checks to see if we need to compact (references or too many files)
2230    // Skip compaction check if region is read only
2231    if (!r.isReadOnly()) {
2232      for (HStore s : r.stores.values()) {
2233        if (s.hasReferences() || s.needsCompaction()) {
2234          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2235        }
2236      }
2237    }
2238    long openSeqNum = r.getOpenSeqNum();
2239    if (openSeqNum == HConstants.NO_SEQNUM) {
2240      // If we opened a region, we should have read some sequence number from it.
2241      LOG.error(
2242        "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2243      openSeqNum = 0;
2244    }
2245
2246    // Notify master
2247    if (
2248      !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2249        openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))
2250    ) {
2251      throw new IOException(
2252        "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2253    }
2254
2255    triggerFlushInPrimaryRegion(r);
2256
2257    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2258  }
2259
2260  /**
2261   * Helper method for use in tests. Skip the region transition report when there's no master around
2262   * to receive it.
2263   */
2264  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2265    final TransitionCode code = context.getCode();
2266    final long openSeqNum = context.getOpenSeqNum();
2267    long masterSystemTime = context.getMasterSystemTime();
2268    final RegionInfo[] hris = context.getHris();
2269
2270    if (code == TransitionCode.OPENED) {
2271      Preconditions.checkArgument(hris != null && hris.length == 1);
2272      if (hris[0].isMetaRegion()) {
2273        LOG.warn(
2274          "meta table location is stored in master local store, so we can not skip reporting");
2275        return false;
2276      } else {
2277        try {
2278          MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
2279            serverName, openSeqNum, masterSystemTime);
2280        } catch (IOException e) {
2281          LOG.info("Failed to update meta", e);
2282          return false;
2283        }
2284      }
2285    }
2286    return true;
2287  }
2288
2289  private ReportRegionStateTransitionRequest
2290    createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) {
2291    final TransitionCode code = context.getCode();
2292    final long openSeqNum = context.getOpenSeqNum();
2293    final RegionInfo[] hris = context.getHris();
2294    final long[] procIds = context.getProcIds();
2295
2296    ReportRegionStateTransitionRequest.Builder builder =
2297      ReportRegionStateTransitionRequest.newBuilder();
2298    builder.setServer(ProtobufUtil.toServerName(serverName));
2299    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2300    transition.setTransitionCode(code);
2301    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2302      transition.setOpenSeqNum(openSeqNum);
2303    }
2304    for (RegionInfo hri : hris) {
2305      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2306    }
2307    for (long procId : procIds) {
2308      transition.addProcId(procId);
2309    }
2310
2311    return builder.build();
2312  }
2313
2314  @Override
2315  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2316    if (TEST_SKIP_REPORTING_TRANSITION) {
2317      return skipReportingTransition(context);
2318    }
2319    final ReportRegionStateTransitionRequest request =
2320      createReportRegionStateTransitionRequest(context);
2321
2322    int tries = 0;
2323    long pauseTime = this.retryPauseTime;
2324    // Keep looping till we get an error. We want to send reports even though the server is going
2325    // down. Only stop if the clusterConnection is null; it is set to null almost as the last thing
2326    // the HRegionServer does on the way down.
2327    while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
2328      RegionServerStatusService.BlockingInterface rss = rssStub;
2329      try {
2330        if (rss == null) {
2331          createRegionServerStatusStub();
2332          continue;
2333        }
2334        ReportRegionStateTransitionResponse response =
2335          rss.reportRegionStateTransition(null, request);
2336        if (response.hasErrorMessage()) {
2337          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2338          break;
2339        }
2340        // Log if we had to retry; else don't log unless TRACE. We want to
2341        // know if we were successful after an attempt showed up in the logs as failed.
2342        if (tries > 0 || LOG.isTraceEnabled()) {
2343          LOG.info("TRANSITION REPORTED " + request);
2344        }
2345        // NOTE: Return mid-method!!!
2346        return true;
2347      } catch (ServiceException se) {
2348        IOException ioe = ProtobufUtil.getRemoteException(se);
2349        boolean pause = ioe instanceof ServerNotRunningYetException
2350          || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException;
2351        if (pause) {
2352          // Do backoff else we flood the Master with requests.
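          // getPauseTime scales the base retryPauseTime by a backoff factor derived from the
          // attempt count, so repeated failures back off progressively instead of retrying at a
          // fixed interval.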
2353          pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
2354        } else {
2355          pauseTime = this.retryPauseTime; // Reset.
2356        }
2357        LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#"
2358          + tries + ")"
2359          + (pause
2360            ? " after " + pauseTime + "ms delay (Master is coming online...)."
2361            : " immediately."),
2362          ioe);
2363        if (pause) {
2364          Threads.sleep(pauseTime);
2365        }
2366        tries++;
2367        if (rssStub == rss) {
2368          rssStub = null;
2369        }
2370      }
2371    }
2372    return false;
2373  }
2374
2375  /**
2376   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2377   * block this thread. See RegionReplicaFlushHandler for details.
2378   */
2379  private void triggerFlushInPrimaryRegion(final HRegion region) {
2380    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2381      return;
2382    }
2383    TableName tn = region.getTableDescriptor().getTableName();
2384    if (
2385      !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn)
2386        || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) ||
2387        // If memstore replication is not set up, we do not have to wait to observe a flush event
2388        // from the primary before starting to serve reads, because gaps from replication are not
2389        // applicable. This logic is from
2390        // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by
2391        // HBASE-13063
2392        !region.getTableDescriptor().hasRegionMemStoreReplication()
2393    ) {
2394      region.setReadsEnabled(true);
2395      return;
2396    }
2397
2398    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2399    // RegionReplicaFlushHandler might reset this.
2400
2401    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2402    if (this.executorService != null) {
2403      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
2404    } else {
2405      LOG.info("Executor is null; not running flush of primary region replica for {}",
2406        region.getRegionInfo());
2407    }
2408  }
2409
2410  @InterfaceAudience.Private
2411  public RSRpcServices getRSRpcServices() {
2412    return rpcServices;
2413  }
2414
2415  /**
2416   * Cause the server to exit without closing the regions it is serving, the log it is using and
2417   * without notifying the master. Used in unit testing and on catastrophic events such as HDFS
2418   * being yanked out from under hbase or an OOME.
2419   * @param reason the reason we are aborting @param cause the exception that caused the abort, or null
2420   */
2421  @Override
2422  public void abort(String reason, Throwable cause) {
2423    if (!setAbortRequested()) {
2424      // Abort already in progress, ignore the new request.
2425      LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason);
2426      return;
2427    }
2428    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2429    if (cause != null) {
2430      LOG.error(HBaseMarkers.FATAL, msg, cause);
2431    } else {
2432      LOG.error(HBaseMarkers.FATAL, msg);
2433    }
2434    // HBASE-4014: show list of coprocessors that were loaded to help debug
    // regionserver crashes. Note that we're implicitly using
2436    // java.util.HashSet's toString() method to print the coprocessor names.
2437    LOG.error(HBaseMarkers.FATAL,
2438      "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors());
2439    // Try and dump metrics if abort -- might give clue as to how fatal came about....
2440    try {
2441      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2442    } catch (MalformedObjectNameException | IOException e) {
2443      LOG.warn("Failed dumping metrics", e);
2444    }
2445
2446    // Do our best to report our abort to the master, but this may not work
2447    try {
2448      if (cause != null) {
2449        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2450      }
2451      // Report to the master but only if we have already registered with the master.
2452      RegionServerStatusService.BlockingInterface rss = rssStub;
2453      if (rss != null && this.serverName != null) {
2454        ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder();
2455        builder.setServer(ProtobufUtil.toServerName(this.serverName));
2456        builder.setErrorMessage(msg);
2457        rss.reportRSFatalError(null, builder.build());
2458      }
2459    } catch (Throwable t) {
2460      LOG.warn("Unable to report fatal error to master", t);
2461    }
2462
2463    scheduleAbortTimer();
2464    // shutdown should be run as the internal user
2465    stop(reason, true, null);
2466  }
2467
2468  /*
2469   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup logs but it does
2470   * close socket in case want to bring up server on old hostname+port immediately.
2471   */
2472  @InterfaceAudience.Private
2473  protected void kill() {
2474    this.killed = true;
2475    abort("Simulated kill");
2476  }
2477
2478  // Limits the time spent in the shutdown process.
2479  private void scheduleAbortTimer() {
2480    if (this.abortMonitor == null) {
2481      this.abortMonitor = new Timer("Abort regionserver monitor", true);
2482      TimerTask abortTimeoutTask = null;
2483      try {
2484        Constructor<? extends TimerTask> timerTaskCtor =
2485          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2486            .asSubclass(TimerTask.class).getDeclaredConstructor();
2487        timerTaskCtor.setAccessible(true);
2488        abortTimeoutTask = timerTaskCtor.newInstance();
2489      } catch (Exception e) {
2490        LOG.warn("Initialize abort timeout task failed", e);
2491      }
2492      if (abortTimeoutTask != null) {
2493        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2494      }
2495    }
2496  }
2497
2498  /**
2499   * Wait on all threads to finish. Presumption is that all closes and stops have already been
2500   * called.
2501   */
2502  protected void stopServiceThreads() {
2503    // clean up the scheduled chores
2504    stopChoreService();
2505    if (bootstrapNodeManager != null) {
2506      bootstrapNodeManager.stop();
2507    }
2508    if (this.cacheFlusher != null) {
2509      this.cacheFlusher.shutdown();
2510    }
2511    if (this.walRoller != null) {
2512      this.walRoller.close();
2513    }
2514    if (this.compactSplitThread != null) {
2515      this.compactSplitThread.join();
2516    }
2517    stopExecutorService();
2518    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
2519      this.replicationSourceHandler.stopReplicationService();
2520    } else {
2521      if (this.replicationSourceHandler != null) {
2522        this.replicationSourceHandler.stopReplicationService();
2523      }
2524      if (this.replicationSinkHandler != null) {
2525        this.replicationSinkHandler.stopReplicationService();
2526      }
2527    }
2528  }
2529
  /** Returns the object that implements the replication source service. */
2531  @Override
2532  public ReplicationSourceService getReplicationSourceService() {
2533    return replicationSourceHandler;
2534  }
2535
  /** Returns the object that implements the replication sink service. */
2537  public ReplicationSinkService getReplicationSinkService() {
2538    return replicationSinkHandler;
2539  }
2540
2541  /**
2542   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2543   * connection, the current rssStub must be null. Method will block until a master is available.
2544   * You can break from this block by requesting the server stop.
2545   * @return master + port, or null if server has been stopped
2546   */
2547  private synchronized ServerName createRegionServerStatusStub() {
2548    // Create RS stub without refreshing the master node from ZK, use cached data
2549    return createRegionServerStatusStub(false);
2550  }
2551
2552  /**
2553   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2554   * connection, the current rssStub must be null. Method will block until a master is available.
2555   * You can break from this block by requesting the server stop.
2556   * @param refresh If true then master address will be read from ZK, otherwise use cached data
2557   * @return master + port, or null if server has been stopped
2558   */
2559  @InterfaceAudience.Private
2560  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2561    if (rssStub != null) {
2562      return masterAddressTracker.getMasterAddress();
2563    }
2564    ServerName sn = null;
2565    long previousLogTime = 0;
2566    RegionServerStatusService.BlockingInterface intRssStub = null;
2567    LockService.BlockingInterface intLockStub = null;
2568    boolean interrupted = false;
2569    try {
2570      while (keepLooping()) {
2571        sn = this.masterAddressTracker.getMasterAddress(refresh);
2572        if (sn == null) {
2573          if (!keepLooping()) {
2574            // give up with no connection.
2575            LOG.debug("No master found and cluster is stopped; bailing out");
2576            return null;
2577          }
2578          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2579            LOG.debug("No master found; retry");
2580            previousLogTime = EnvironmentEdgeManager.currentTime();
2581          }
2582          refresh = true; // let's try pull it from ZK directly
2583          if (sleepInterrupted(200)) {
2584            interrupted = true;
2585          }
2586          continue;
2587        }
2588        try {
2589          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
2590            userProvider.getCurrent(), shortOperationTimeout);
2591          intRssStub = RegionServerStatusService.newBlockingStub(channel);
2592          intLockStub = LockService.newBlockingStub(channel);
2593          break;
2594        } catch (IOException e) {
2595          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2596            e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
2597            if (e instanceof ServerNotRunningYetException) {
2598              LOG.info("Master isn't available yet, retrying");
2599            } else {
2600              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2601            }
2602            previousLogTime = EnvironmentEdgeManager.currentTime();
2603          }
2604          if (sleepInterrupted(200)) {
2605            interrupted = true;
2606          }
2607        }
2608      }
2609    } finally {
2610      if (interrupted) {
2611        Thread.currentThread().interrupt();
2612      }
2613    }
2614    this.rssStub = intRssStub;
2615    this.lockStub = intLockStub;
2616    return sn;
2617  }
2618
2619  /**
2620   * @return True if we should break loop because cluster is going down or this server has been
2621   *         stopped or hdfs has gone bad.
2622   */
2623  private boolean keepLooping() {
2624    return !this.stopped && isClusterUp();
2625  }
2626
2627  /*
2628   * Let the master know we're here Run initialization using parameters passed us by the master.
2629   * @return A Map of key/value configurations we got from the Master else null if we failed to
2630   * register.
2631   */
2632  private RegionServerStartupResponse reportForDuty() throws IOException {
2633    if (this.masterless) {
2634      return RegionServerStartupResponse.getDefaultInstance();
2635    }
2636    ServerName masterServerName = createRegionServerStatusStub(true);
2637    RegionServerStatusService.BlockingInterface rss = rssStub;
2638    if (masterServerName == null || rss == null) {
2639      return null;
2640    }
2641    RegionServerStartupResponse result = null;
2642    try {
2643      rpcServices.requestCount.reset();
2644      rpcServices.rpcGetRequestCount.reset();
2645      rpcServices.rpcScanRequestCount.reset();
2646      rpcServices.rpcFullScanRequestCount.reset();
2647      rpcServices.rpcMultiRequestCount.reset();
2648      rpcServices.rpcMutateRequestCount.reset();
2649      LOG.info("reportForDuty to master=" + masterServerName + " with port="
2650        + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode);
2651      long now = EnvironmentEdgeManager.currentTime();
2652      int port = rpcServices.getSocketAddress().getPort();
2653      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2654      if (!StringUtils.isBlank(useThisHostnameInstead)) {
2655        request.setUseThisHostnameInstead(useThisHostnameInstead);
2656      }
2657      request.setPort(port);
2658      request.setServerStartCode(this.startcode);
2659      request.setServerCurrentTime(now);
2660      result = rss.regionServerStartup(null, request.build());
2661    } catch (ServiceException se) {
2662      IOException ioe = ProtobufUtil.getRemoteException(se);
2663      if (ioe instanceof ClockOutOfSyncException) {
2664        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
2665        // Re-throw IOE will cause RS to abort
2666        throw ioe;
2667      } else if (ioe instanceof ServerNotRunningYetException) {
2668        LOG.debug("Master is not running yet");
2669      } else {
2670        LOG.warn("error telling master we are up", se);
2671      }
2672      rssStub = null;
2673    }
2674    return result;
2675  }
2676
2677  @Override
2678  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2679    try {
2680      GetLastFlushedSequenceIdRequest req =
2681        RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2682      RegionServerStatusService.BlockingInterface rss = rssStub;
2683      if (rss == null) { // Try to connect one more time
2684        createRegionServerStatusStub();
2685        rss = rssStub;
2686        if (rss == null) {
2687          // Still no luck, we tried
2688          LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2689          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2690            .build();
2691        }
2692      }
2693      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2694      return RegionStoreSequenceIds.newBuilder()
2695        .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2696        .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2697    } catch (ServiceException e) {
2698      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2699      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2700        .build();
2701    }
2702  }
2703
2704  /**
2705   * Close meta region if we carry it
2706   * @param abort Whether we're running an abort.
2707   */
2708  private void closeMetaTableRegions(final boolean abort) {
2709    HRegion meta = null;
2710    this.onlineRegionsLock.writeLock().lock();
2711    try {
2712      for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
2713        RegionInfo hri = e.getValue().getRegionInfo();
2714        if (hri.isMetaRegion()) {
2715          meta = e.getValue();
2716        }
2717        if (meta != null) {
2718          break;
2719        }
2720      }
2721    } finally {
2722      this.onlineRegionsLock.writeLock().unlock();
2723    }
2724    if (meta != null) {
2725      closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2726    }
2727  }
2728
2729  /**
   * Schedule closes on all user regions. Should be safe calling multiple times because it won't
2731   * close regions that are already closed or that are closing.
2732   * @param abort Whether we're running an abort.
2733   */
2734  private void closeUserRegions(final boolean abort) {
2735    this.onlineRegionsLock.writeLock().lock();
2736    try {
2737      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
2738        HRegion r = e.getValue();
2739        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2740          // Don't update zk with this close transition; pass false.
2741          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2742        }
2743      }
2744    } finally {
2745      this.onlineRegionsLock.writeLock().unlock();
2746    }
2747  }
2748
2749  protected Map<String, HRegion> getOnlineRegions() {
2750    return this.onlineRegions;
2751  }
2752
2753  public int getNumberOfOnlineRegions() {
2754    return this.onlineRegions.size();
2755  }
2756
2757  /**
2758   * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM
2759   * as client; HRegion cannot be serialized to cross an rpc.
2760   */
2761  public Collection<HRegion> getOnlineRegionsLocalContext() {
2762    Collection<HRegion> regions = this.onlineRegions.values();
2763    return Collections.unmodifiableCollection(regions);
2764  }
2765
2766  @Override
2767  public void addRegion(HRegion region) {
2768    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2769    configurationManager.registerObserver(region);
2770  }
2771
2772  private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
2773    long size) {
2774    if (!sortedRegions.containsKey(size)) {
2775      sortedRegions.put(size, new ArrayList<>());
2776    }
2777    sortedRegions.get(size).add(region);
2778  }
2779
2780  /**
2781   * @return A new Map of online regions sorted by region off-heap size with the first entry being
2782   *         the biggest.
2783   */
2784  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2785    // we'll sort the regions in reverse
2786    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2787    // Copy over all regions. Regions are sorted by size with biggest first.
2788    for (HRegion region : this.onlineRegions.values()) {
2789      addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
2790    }
2791    return sortedRegions;
2792  }
2793
2794  /**
2795   * @return A new Map of online regions sorted by region heap size with the first entry being the
2796   *         biggest.
2797   */
2798  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2799    // we'll sort the regions in reverse
2800    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2801    // Copy over all regions. Regions are sorted by size with biggest first.
2802    for (HRegion region : this.onlineRegions.values()) {
2803      addRegion(sortedRegions, region, region.getMemStoreHeapSize());
2804    }
2805    return sortedRegions;
2806  }
2807
2808  /** Returns reference to FlushRequester */
2809  @Override
2810  public FlushRequester getFlushRequester() {
2811    return this.cacheFlusher;
2812  }
2813
2814  @Override
2815  public CompactionRequester getCompactionRequestor() {
2816    return this.compactSplitThread;
2817  }
2818
2819  @Override
2820  public LeaseManager getLeaseManager() {
2821    return leaseManager;
2822  }
2823
2824  /** Returns {@code true} when the data file system is available, {@code false} otherwise. */
2825  boolean isDataFileSystemOk() {
2826    return this.dataFsOk;
2827  }
2828
2829  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
2830    return this.rsHost;
2831  }
2832
2833  @Override
2834  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2835    return this.regionsInTransitionInRS;
2836  }
2837
2838  @Override
2839  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
2840    return rsQuotaManager;
2841  }
2842
2843  //
2844  // Main program and support routines
2845  //
2846  /**
2847   * Load the replication executorService objects, if any
2848   */
2849  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
2850    FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
2851    // read in the name of the source replication class from the config file.
2852    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2853      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2854
2855    // read in the name of the sink replication class from the config file.
2856    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2857      HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
2858
2859    // If both the sink and the source class names are the same, then instantiate
2860    // only one object.
2861    if (sourceClassname.equals(sinkClassname)) {
2862      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2863        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2864      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
2865      server.sameReplicationSourceAndSink = true;
2866    } else {
2867      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2868        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2869      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
2870        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2871      server.sameReplicationSourceAndSink = false;
2872    }
2873  }
2874
2875  private static <T extends ReplicationService> T newReplicationInstance(String classname,
2876    Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
2877    Path oldLogDir, WALFactory walFactory) throws IOException {
2878    final Class<? extends T> clazz;
2879    try {
2880      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2881      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
2882    } catch (java.lang.ClassNotFoundException nfe) {
2883      throw new IOException("Could not find class for " + classname);
2884    }
2885    T service = ReflectionUtils.newInstance(clazz, conf);
2886    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
2887    return service;
2888  }
2889
2890  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
2891    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
2892    if (!this.isOnline()) {
2893      return walGroupsReplicationStatus;
2894    }
2895    List<ReplicationSourceInterface> allSources = new ArrayList<>();
2896    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
2897    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
2898    for (ReplicationSourceInterface source : allSources) {
2899      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
2900    }
2901    return walGroupsReplicationStatus;
2902  }
2903
2904  /**
2905   * Utility for constructing an instance of the passed HRegionServer class.
2906   */
2907  static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass,
2908    final Configuration conf) {
2909    try {
2910      Constructor<? extends HRegionServer> c =
2911        regionServerClass.getConstructor(Configuration.class);
2912      return c.newInstance(conf);
2913    } catch (Exception e) {
      throw new RuntimeException(
        "Failed construction of RegionServer: " + regionServerClass.toString(), e);
2916    }
2917  }
2918
2919  /**
2920   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2921   */
2922  public static void main(String[] args) {
2923    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
2924    VersionInfo.logVersion();
2925    Configuration conf = HBaseConfiguration.create();
2926    @SuppressWarnings("unchecked")
2927    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2928      .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2929
2930    new HRegionServerCommandLine(regionServerClass).doMain(args);
2931  }
2932
2933  /**
2934   * Gets the online regions of the specified table. This method looks at the in-memory
2935   * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions.
2936   * If a region on this table has been closed during a disable, etc., it will not be included in
   * the returned list. So, the returned list may not necessarily be ALL regions in this table; it's
2938   * all the ONLINE regions in the table.
2939   * @param tableName table to limit the scope of the query
2940   * @return Online regions from <code>tableName</code>
2941   */
2942  @Override
2943  public List<HRegion> getRegions(TableName tableName) {
2944    List<HRegion> tableRegions = new ArrayList<>();
2945    synchronized (this.onlineRegions) {
2946      for (HRegion region : this.onlineRegions.values()) {
2947        RegionInfo regionInfo = region.getRegionInfo();
2948        if (regionInfo.getTable().equals(tableName)) {
2949          tableRegions.add(region);
2950        }
2951      }
2952    }
2953    return tableRegions;
2954  }
2955
2956  @Override
2957  public List<HRegion> getRegions() {
2958    List<HRegion> allRegions;
2959    synchronized (this.onlineRegions) {
2960      // Return a clone copy of the onlineRegions
2961      allRegions = new ArrayList<>(onlineRegions.values());
2962    }
2963    return allRegions;
2964  }
2965
2966  /**
2967   * Gets the online tables in this RS. This method looks at the in-memory onlineRegions.
2968   * @return all the online tables in this RS
2969   */
2970  public Set<TableName> getOnlineTables() {
2971    Set<TableName> tables = new HashSet<>();
2972    synchronized (this.onlineRegions) {
2973      for (Region region : this.onlineRegions.values()) {
2974        tables.add(region.getTableDescriptor().getTableName());
2975      }
2976    }
2977    return tables;
2978  }
2979
2980  public String[] getRegionServerCoprocessors() {
2981    TreeSet<String> coprocessors = new TreeSet<>();
2982    try {
2983      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2984    } catch (IOException exception) {
2985      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; "
2986        + "skipping.");
2987      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2988    }
2989    Collection<HRegion> regions = getOnlineRegionsLocalContext();
2990    for (HRegion region : regions) {
2991      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2992      try {
2993        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2994      } catch (IOException exception) {
2995        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region
2996          + "; skipping.");
2997        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2998      }
2999    }
3000    coprocessors.addAll(rsHost.getCoprocessors());
3001    return coprocessors.toArray(new String[0]);
3002  }
3003
3004  /**
3005   * Try to close the region, logs a warning on failure but continues.
3006   * @param region Region to close
3007   */
3008  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
3009    try {
3010      if (!closeRegion(region.getEncodedName(), abort, null)) {
        LOG.warn(
          "Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing");
3013      }
3014    } catch (IOException e) {
3015      LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing",
3016        e);
3017    }
3018  }
3019
3020  /**
3021   * Close asynchronously a region, can be called from the master or internally by the regionserver
3022   * when stopping. If called from the master, the region will update the status.
3023   * <p>
3024   * If an opening was in progress, this method will cancel it, but will not start a new close. The
3025   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
3026   * </p>
3027   * <p>
3028   * If a close was in progress, this new request will be ignored, and an exception thrown.
3029   * </p>
3030   * <p>
3031   * Provides additional flag to indicate if this region blocks should be evicted from the cache.
3032   * </p>
3033   * @param encodedName Region to close
3034   * @param abort       True if we are aborting
3035   * @param destination Where the Region is being moved too... maybe null if unknown.
3036   * @return True if closed a region.
3037   * @throws NotServingRegionException if the region is not online
3038   */
3039  protected boolean closeRegion(String encodedName, final boolean abort,
3040    final ServerName destination) throws NotServingRegionException {
3041    // Check for permissions to close.
3042    HRegion actualRegion = this.getRegion(encodedName);
3043    // Can be null if we're calling close on a region that's not online
3044    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
3045      try {
3046        actualRegion.getCoprocessorHost().preClose(false);
3047      } catch (IOException exp) {
3048        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
3049        return false;
3050      }
3051    }
3052
3053    // previous can come back 'null' if not in map.
3054    final Boolean previous =
3055      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);
3056
3057    if (Boolean.TRUE.equals(previous)) {
3058      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already "
3059        + "trying to OPEN. Cancelling OPENING.");
3060      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
3061        // The replace failed. That should be an exceptional case, but theoretically it can happen.
3062        // We're going to try to do a standard close then.
3063        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
3064          + " Doing a standard close now");
3065        return closeRegion(encodedName, abort, destination);
3066      }
3067      // Let's get the region from the online region list again
3068      actualRegion = this.getRegion(encodedName);
3069      if (actualRegion == null) { // If already online, we still need to close it.
3070        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3071        // The master deletes the znode when it receives this exception.
3072        throw new NotServingRegionException(
3073          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
3074      }
3075    } else if (previous == null) {
3076      LOG.info("Received CLOSE for {}", encodedName);
3077    } else if (Boolean.FALSE.equals(previous)) {
3078      LOG.info("Received CLOSE for the region: " + encodedName
3079        + ", which we are already trying to CLOSE, but not completed yet");
3080      return true;
3081    }
3082
3083    if (actualRegion == null) {
3084      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3085      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
3086      // The master deletes the znode when it receives this exception.
3087      throw new NotServingRegionException(
3088        "The region " + encodedName + " is not online, and is not opening.");
3089    }
3090
3091    CloseRegionHandler crh;
3092    final RegionInfo hri = actualRegion.getRegionInfo();
3093    if (hri.isMetaRegion()) {
3094      crh = new CloseMetaHandler(this, this, hri, abort);
3095    } else {
3096      crh = new CloseRegionHandler(this, this, hri, abort, destination);
3097    }
3098    this.executorService.submit(crh);
3099    return true;
3100  }
3101
3102  /**
   * @return HRegion for the passed binary <code>regionName</code> or null if the named region is
   *         not a member of the online regions.
3105   */
3106  public HRegion getOnlineRegion(final byte[] regionName) {
3107    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3108    return this.onlineRegions.get(encodedRegionName);
3109  }
3110
3111  @Override
3112  public HRegion getRegion(final String encodedRegionName) {
3113    return this.onlineRegions.get(encodedRegionName);
3114  }
3115
3116  @Override
3117  public boolean removeRegion(final HRegion r, ServerName destination) {
3118    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3119    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
3120    if (destination != null) {
3121      long closeSeqNum = r.getMaxFlushedSeqId();
3122      if (closeSeqNum == HConstants.NO_SEQNUM) {
3123        // No edits in WAL for this region; get the sequence number when the region was opened.
3124        closeSeqNum = r.getOpenSeqNum();
3125        if (closeSeqNum == HConstants.NO_SEQNUM) {
3126          closeSeqNum = 0;
3127        }
3128      }
3129      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
3130      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
3131      if (selfMove) {
3132        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
3133          r.getRegionInfo().getEncodedName(),
3134          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
3135      }
3136    }
3137    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3138    configurationManager.deregisterObserver(r);
3139    return toReturn != null;
3140  }
3141
3142  /**
   * Protected utility method for safely obtaining an HRegion handle.
3144   * @param regionName Name of online {@link HRegion} to return
3145   * @return {@link HRegion} for <code>regionName</code>
3146   */
3147  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
3148    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3149    return getRegionByEncodedName(regionName, encodedRegionName);
3150  }
3151
3152  public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException {
3153    return getRegionByEncodedName(null, encodedRegionName);
3154  }
3155
3156  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3157    throws NotServingRegionException {
3158    HRegion region = this.onlineRegions.get(encodedRegionName);
3159    if (region == null) {
3160      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3161      if (moveInfo != null) {
3162        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3163      }
3164      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3165      String regionNameStr =
3166        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
3167      if (isOpening != null && isOpening) {
3168        throw new RegionOpeningException(
3169          "Region " + regionNameStr + " is opening on " + this.serverName);
3170      }
3171      throw new NotServingRegionException(
3172        "" + regionNameStr + " is not online on " + this.serverName);
3173    }
3174    return region;
3175  }
3176
3177  /**
3178   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't
3179   * already.
3180   * @param t   Throwable
3181   * @param msg Message to log in error. Can be null.
3182   * @return Throwable converted to an IOE; methods can only let out IOEs.
3183   */
3184  private Throwable cleanup(final Throwable t, final String msg) {
3185    // Don't log as error if NSRE; NSRE is 'normal' operation.
3186    if (t instanceof NotServingRegionException) {
3187      LOG.debug("NotServingRegionException; " + t.getMessage());
3188      return t;
3189    }
3190    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3191    if (msg == null) {
3192      LOG.error("", e);
3193    } else {
3194      LOG.error(msg, e);
3195    }
3196    if (!rpcServices.checkOOME(t)) {
3197      checkFileSystem();
3198    }
3199    return t;
3200  }
3201
3202  /**
3203   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
3204   * @return Make <code>t</code> an IOE if it isn't already.
3205   */
3206  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3207    return (t instanceof IOException ? (IOException) t
3208      : msg == null || msg.length() == 0 ? new IOException(t)
3209      : new IOException(msg, t));
3210  }
3211
3212  /**
   * Checks to see if the file system is still accessible. If not, sets abortRequested and
   * stopRequested.
3215   * @return false if file system is not available
3216   */
3217  boolean checkFileSystem() {
3218    if (this.dataFsOk && this.dataFs != null) {
3219      try {
3220        FSUtils.checkFileSystemAvailable(this.dataFs);
3221      } catch (IOException e) {
3222        abort("File System not available", e);
3223        this.dataFsOk = false;
3224      }
3225    }
3226    return this.dataFsOk;
3227  }
3228
3229  @Override
3230  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3231    List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3232    Address[] addr = new Address[favoredNodes.size()];
3233    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3234    // it is a map of region name to Address[]
3235    for (int i = 0; i < favoredNodes.size(); i++) {
3236      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort());
3237    }
3238    regionFavoredNodesMap.put(encodedRegionName, addr);
3239  }
3240
3241  /**
3242   * Return the favored nodes for a region given its encoded name. Look at the comment around
3243   * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here.
3244   * @param encodedRegionName the encoded region name.
3245   * @return array of favored locations
3246   */
3247  @Override
3248  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3249    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
3250  }
3251
3252  @Override
3253  public ServerNonceManager getNonceManager() {
3254    return this.nonceManager;
3255  }
3256
3257  private static class MovedRegionInfo {
3258    private final ServerName serverName;
3259    private final long seqNum;
3260
3261    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3262      this.serverName = serverName;
3263      this.seqNum = closeSeqNum;
3264    }
3265
3266    public ServerName getServerName() {
3267      return serverName;
3268    }
3269
3270    public long getSeqNum() {
3271      return seqNum;
3272    }
3273  }
3274
3275  /**
3276   * We need a timeout. If not there is a risk of giving a wrong information: this would double the
3277   * number of network calls instead of reducing them.
3278   */
3279  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3280
3281  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
3282    boolean selfMove) {
3283    if (selfMove) {
3284      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3285      return;
3286    }
3287    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid="
3288      + closeSeqNum);
3289    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3290  }
3291
3292  void removeFromMovedRegions(String encodedName) {
3293    movedRegionInfoCache.invalidate(encodedName);
3294  }
3295
3296  @InterfaceAudience.Private
3297  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
3298    return movedRegionInfoCache.getIfPresent(encodedRegionName);
3299  }
3300
3301  @InterfaceAudience.Private
3302  public int movedRegionCacheExpiredTime() {
3303    return TIMEOUT_REGION_MOVED;
3304  }
3305
3306  private String getMyEphemeralNodePath() {
3307    return zooKeeper.getZNodePaths().getRsPath(serverName);
3308  }
3309
3310  private boolean isHealthCheckerConfigured() {
3311    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3312    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
3313  }
3314
  /** Returns the underlying {@link CompactSplit} for this server */
3316  public CompactSplit getCompactSplitThread() {
3317    return this.compactSplitThread;
3318  }
3319
3320  CoprocessorServiceResponse execRegionServerService(
3321    @SuppressWarnings("UnusedParameters") final RpcController controller,
3322    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3323    try {
3324      ServerRpcController serviceController = new ServerRpcController();
3325      CoprocessorServiceCall call = serviceRequest.getCall();
3326      String serviceName = call.getServiceName();
3327      Service service = coprocessorServiceHandlers.get(serviceName);
3328      if (service == null) {
3329        throw new UnknownProtocolException(null,
3330          "No registered coprocessor executorService found for " + serviceName);
3331      }
3332      ServiceDescriptor serviceDesc = service.getDescriptorForType();
3333
3334      String methodName = call.getMethodName();
3335      MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3336      if (methodDesc == null) {
3337        throw new UnknownProtocolException(service.getClass(),
3338          "Unknown method " + methodName + " called on executorService " + serviceName);
3339      }
3340
3341      Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3342      final Message.Builder responseBuilder =
3343        service.getResponsePrototype(methodDesc).newBuilderForType();
3344      service.callMethod(methodDesc, serviceController, request, message -> {
3345        if (message != null) {
3346          responseBuilder.mergeFrom(message);
3347        }
3348      });
3349      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3350      if (exception != null) {
3351        throw exception;
3352      }
3353      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3354    } catch (IOException ie) {
3355      throw new ServiceException(ie);
3356    }
3357  }
3358
3359  /**
3360   * May be null if this is a master which not carry table.
3361   * @return The block cache instance used by the regionserver.
3362   */
3363  @Override
3364  public Optional<BlockCache> getBlockCache() {
3365    return Optional.ofNullable(this.blockCache);
3366  }
3367
3368  /**
3369   * May be null if this is a master which not carry table.
3370   * @return The cache for mob files used by the regionserver.
3371   */
3372  @Override
3373  public Optional<MobFileCache> getMobFileCache() {
3374    return Optional.ofNullable(this.mobFileCache);
3375  }
3376
3377  CacheEvictionStats clearRegionBlockCache(Region region) {
3378    long evictedBlocks = 0;
3379
3380    for (Store store : region.getStores()) {
3381      for (StoreFile hFile : store.getStorefiles()) {
3382        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3383      }
3384    }
3385
3386    return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build();
3387  }
3388
3389  @Override
3390  public double getCompactionPressure() {
3391    double max = 0;
3392    for (Region region : onlineRegions.values()) {
3393      for (Store store : region.getStores()) {
3394        double normCount = store.getCompactionPressure();
3395        if (normCount > max) {
3396          max = normCount;
3397        }
3398      }
3399    }
3400    return max;
3401  }
3402
3403  @Override
3404  public HeapMemoryManager getHeapMemoryManager() {
3405    return hMemManager;
3406  }
3407
3408  public MemStoreFlusher getMemStoreFlusher() {
3409    return cacheFlusher;
3410  }
3411
3412  /**
3413   * For testing
3414   * @return whether all wal roll request finished for this regionserver
3415   */
3416  @InterfaceAudience.Private
3417  public boolean walRollRequestFinished() {
3418    return this.walRoller.walRollFinished();
3419  }
3420
3421  @Override
3422  public ThroughputController getFlushThroughputController() {
3423    return flushThroughputController;
3424  }
3425
3426  @Override
3427  public double getFlushPressure() {
3428    if (getRegionServerAccounting() == null || cacheFlusher == null) {
3429      // return 0 during RS initialization
3430      return 0.0;
3431    }
3432    return getRegionServerAccounting().getFlushPressure();
3433  }
3434
3435  @Override
3436  public void onConfigurationChange(Configuration newConf) {
3437    ThroughputController old = this.flushThroughputController;
3438    if (old != null) {
3439      old.stop("configuration change");
3440    }
3441    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3442    try {
3443      Superusers.initialize(newConf);
3444    } catch (IOException e) {
3445      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3446    }
3447
3448    // update region server coprocessor if the configuration has changed.
3449    if (
3450      CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
3451        CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY)
3452    ) {
3453      LOG.info("Update region server coprocessors because the configuration has changed");
3454      this.rsHost = new RegionServerCoprocessorHost(this, newConf);
3455    }
3456  }
3457
3458  @Override
3459  public MetricsRegionServer getMetrics() {
3460    return metricsRegionServer;
3461  }
3462
3463  @Override
3464  public SecureBulkLoadManager getSecureBulkLoadManager() {
3465    return this.secureBulkLoadManager;
3466  }
3467
3468  @Override
3469  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
3470    final Abortable abort) {
3471    final LockServiceClient client =
3472      new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
3473    return client.regionLock(regionInfo, description, abort);
3474  }
3475
3476  @Override
3477  public void unassign(byte[] regionName) throws IOException {
3478    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
3479  }
3480
3481  @Override
3482  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3483    return this.rsSpaceQuotaManager;
3484  }
3485
3486  @Override
3487  public boolean reportFileArchivalForQuotas(TableName tableName,
3488    Collection<Entry<String, Long>> archivedFiles) {
3489    if (TEST_SKIP_REPORTING_TRANSITION) {
3490      return false;
3491    }
3492    RegionServerStatusService.BlockingInterface rss = rssStub;
3493    if (rss == null || rsSpaceQuotaManager == null) {
3494      // the current server could be stopping.
3495      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3496      return false;
3497    }
3498    try {
3499      RegionServerStatusProtos.FileArchiveNotificationRequest request =
3500        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3501      rss.reportFileArchival(null, request);
3502    } catch (ServiceException se) {
3503      IOException ioe = ProtobufUtil.getRemoteException(se);
3504      if (ioe instanceof PleaseHoldException) {
3505        if (LOG.isTraceEnabled()) {
3506          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3507            + " This will be retried.", ioe);
3508        }
3509        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3510        return false;
3511      }
3512      if (rssStub == rss) {
3513        rssStub = null;
3514      }
3515      // re-create the stub if we failed to report the archival
3516      createRegionServerStatusStub(true);
3517      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3518      return false;
3519    }
3520    return true;
3521  }
3522
3523  void executeProcedure(long procId, RSProcedureCallable callable) {
3524    executorService.submit(new RSProcedureHandler(this, procId, callable));
3525  }
3526
3527  public void remoteProcedureComplete(long procId, Throwable error) {
3528    procedureResultReporter.complete(procId, error);
3529  }
3530
3531  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3532    RegionServerStatusService.BlockingInterface rss;
3533    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3534    for (;;) {
3535      rss = rssStub;
3536      if (rss != null) {
3537        break;
3538      }
3539      createRegionServerStatusStub();
3540    }
3541    try {
3542      rss.reportProcedureDone(null, request);
3543    } catch (ServiceException se) {
3544      if (rssStub == rss) {
3545        rssStub = null;
3546      }
3547      throw ProtobufUtil.getRemoteException(se);
3548    }
3549  }
3550
3551  /**
3552   * Will ignore the open/close region procedures which already submitted or executed. When master
3553   * had unfinished open/close region procedure and restarted, new active master may send duplicate
3554   * open/close region request to regionserver. The open/close request is submitted to a thread pool
3555   * and execute. So first need a cache for submitted open/close region procedures. After the
3556   * open/close region request executed and report region transition succeed, cache it in executed
3557   * region procedures cache. See {@link #finishRegionProcedure(long)}. After report region
3558   * transition succeed, master will not send the open/close region request to regionserver again.
3559   * And we thought that the ongoing duplicate open/close region request should not be delayed more
3560   * than 600 seconds. So the executed region procedures cache will expire after 600 seconds. See
3561   * HBASE-22404 for more details.
3562   * @param procId the id of the open/close region procedure
3563   * @return true if the procedure can be submitted.
3564   */
3565  boolean submitRegionProcedure(long procId) {
3566    if (procId == -1) {
3567      return true;
3568    }
    // Ignore the region procedures which were already submitted.
    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
    if (previous != null) {
      LOG.warn("Received procedure pid={}, which was already submitted, just ignore it", procId);
3573      return false;
3574    }
    // Ignore the region procedures which were already executed.
    if (executedRegionProcedures.getIfPresent(procId) != null) {
      LOG.warn("Received procedure pid={}, which was already executed, just ignore it", procId);
3578      return false;
3579    }
3580    return true;
3581  }
3582
3583  /**
3584   * See {@link #submitRegionProcedure(long)}.
3585   * @param procId the id of the open/close region procedure
3586   */
3587  public void finishRegionProcedure(long procId) {
3588    executedRegionProcedures.put(procId, procId);
3589    submittedRegionProcedures.remove(procId);
3590  }
3591
3592  /**
3593   * Force to terminate region server when abort timeout.
3594   */
3595  private static class SystemExitWhenAbortTimeout extends TimerTask {
3596
3597    public SystemExitWhenAbortTimeout() {
3598    }
3599
3600    @Override
3601    public void run() {
3602      LOG.warn("Aborting region server timed out, terminating forcibly"
3603        + " and does not wait for any running shutdown hooks or finalizers to finish their work."
3604        + " Thread dump to stdout.");
3605      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3606      Runtime.getRuntime().halt(1);
3607    }
3608  }
3609
3610  @InterfaceAudience.Private
3611  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3612    return compactedFileDischarger;
3613  }
3614
3615  /**
   * Return the pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}
3617   * @return pause time
3618   */
3619  @InterfaceAudience.Private
3620  public long getRetryPauseTime() {
3621    return this.retryPauseTime;
3622  }
3623
3624  @Override
3625  public Optional<ServerName> getActiveMaster() {
3626    return Optional.ofNullable(masterAddressTracker.getMasterAddress());
3627  }
3628
3629  @Override
3630  public List<ServerName> getBackupMasters() {
3631    return masterAddressTracker.getBackupMasters();
3632  }
3633
3634  @Override
3635  public Iterator<ServerName> getBootstrapNodes() {
3636    return bootstrapNodeManager.getBootstrapNodes().iterator();
3637  }
3638
3639  @Override
3640  public List<HRegionLocation> getMetaLocations() {
3641    return metaRegionLocationCache.getMetaRegionLocations();
3642  }
3643
3644  @Override
3645  protected NamedQueueRecorder createNamedQueueRecord() {
3646    return NamedQueueRecorder.getInstance(conf);
3647  }
3648
3649  @Override
3650  protected boolean clusterMode() {
    // this method will be called in the constructor of the super class, so we cannot return
    // masterless directly here, as it will always be false.
3653    return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
3654  }
3655
3656  @InterfaceAudience.Private
3657  public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
3658    return brokenStoreFileCleaner;
3659  }
3660
3661  @InterfaceAudience.Private
3662  public RSMobFileCleanerChore getRSMobFileCleanerChore() {
3663    return rsMobFileCleanerChore;
3664  }
3665
3666  RSSnapshotVerifier getRsSnapshotVerifier() {
3667    return rsSnapshotVerifier;
3668  }
3669
3670  @Override
3671  protected void stopChores() {
3672    shutdownChore(nonceManagerChore);
3673    shutdownChore(compactionChecker);
3674    shutdownChore(compactedFileDischarger);
3675    shutdownChore(periodicFlusher);
3676    shutdownChore(healthCheckChore);
3677    shutdownChore(executorStatusChore);
3678    shutdownChore(storefileRefresher);
3679    shutdownChore(fsUtilizationChore);
3680    shutdownChore(namedQueueServiceChore);
3681    shutdownChore(brokenStoreFileCleaner);
3682    shutdownChore(rsMobFileCleanerChore);
3683    shutdownChore(replicationMarkerChore);
3684  }
3685
3686  @Override
3687  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
3688    return regionReplicationBufferManager;
3689  }
3690}