001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION;
023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
024import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
025import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
026import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
027import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT;
028import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
029import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT;
030import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
031import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
032import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
033import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
034
035import io.opentelemetry.api.trace.Span;
036import io.opentelemetry.api.trace.StatusCode;
037import io.opentelemetry.context.Scope;
038import java.io.IOException;
039import java.io.PrintWriter;
040import java.lang.management.MemoryUsage;
041import java.lang.reflect.Constructor;
042import java.net.InetSocketAddress;
043import java.time.Duration;
044import java.util.ArrayList;
045import java.util.Collection;
046import java.util.Collections;
047import java.util.Comparator;
048import java.util.HashSet;
049import java.util.Iterator;
050import java.util.List;
051import java.util.Map;
052import java.util.Map.Entry;
053import java.util.Objects;
054import java.util.Optional;
055import java.util.Set;
056import java.util.SortedMap;
057import java.util.Timer;
058import java.util.TimerTask;
059import java.util.TreeMap;
060import java.util.TreeSet;
061import java.util.concurrent.ConcurrentHashMap;
062import java.util.concurrent.ConcurrentMap;
063import java.util.concurrent.ConcurrentSkipListMap;
064import java.util.concurrent.ThreadLocalRandom;
065import java.util.concurrent.TimeUnit;
066import java.util.concurrent.atomic.AtomicBoolean;
067import java.util.concurrent.locks.ReentrantReadWriteLock;
068import java.util.stream.Collectors;
069import javax.management.MalformedObjectNameException;
070import javax.servlet.http.HttpServlet;
071import org.apache.commons.lang3.StringUtils;
072import org.apache.commons.lang3.mutable.MutableFloat;
073import org.apache.hadoop.conf.Configuration;
074import org.apache.hadoop.fs.FileSystem;
075import org.apache.hadoop.fs.Path;
076import org.apache.hadoop.hbase.Abortable;
077import org.apache.hadoop.hbase.CacheEvictionStats;
078import org.apache.hadoop.hbase.CallQueueTooBigException;
079import org.apache.hadoop.hbase.ClockOutOfSyncException;
080import org.apache.hadoop.hbase.DoNotRetryIOException;
081import org.apache.hadoop.hbase.ExecutorStatusChore;
082import org.apache.hadoop.hbase.HBaseConfiguration;
083import org.apache.hadoop.hbase.HBaseInterfaceAudience;
084import org.apache.hadoop.hbase.HBaseServerBase;
085import org.apache.hadoop.hbase.HConstants;
086import org.apache.hadoop.hbase.HDFSBlocksDistribution;
087import org.apache.hadoop.hbase.HRegionLocation;
088import org.apache.hadoop.hbase.HealthCheckChore;
089import org.apache.hadoop.hbase.MetaTableAccessor;
090import org.apache.hadoop.hbase.NotServingRegionException;
091import org.apache.hadoop.hbase.PleaseHoldException;
092import org.apache.hadoop.hbase.ScheduledChore;
093import org.apache.hadoop.hbase.ServerName;
094import org.apache.hadoop.hbase.Stoppable;
095import org.apache.hadoop.hbase.TableName;
096import org.apache.hadoop.hbase.YouAreDeadException;
097import org.apache.hadoop.hbase.ZNodeClearer;
098import org.apache.hadoop.hbase.client.ConnectionUtils;
099import org.apache.hadoop.hbase.client.RegionInfo;
100import org.apache.hadoop.hbase.client.RegionInfoBuilder;
101import org.apache.hadoop.hbase.client.locking.EntityLock;
102import org.apache.hadoop.hbase.client.locking.LockServiceClient;
103import org.apache.hadoop.hbase.conf.ConfigurationObserver;
104import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
105import org.apache.hadoop.hbase.exceptions.RegionMovedException;
106import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
107import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
108import org.apache.hadoop.hbase.executor.ExecutorType;
109import org.apache.hadoop.hbase.http.InfoServer;
110import org.apache.hadoop.hbase.io.hfile.BlockCache;
111import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
112import org.apache.hadoop.hbase.io.hfile.HFile;
113import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
114import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
115import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
116import org.apache.hadoop.hbase.ipc.RpcClient;
117import org.apache.hadoop.hbase.ipc.RpcServer;
118import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
119import org.apache.hadoop.hbase.ipc.ServerRpcController;
120import org.apache.hadoop.hbase.log.HBaseMarkers;
121import org.apache.hadoop.hbase.mob.MobFileCache;
122import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
123import org.apache.hadoop.hbase.monitoring.TaskMonitor;
124import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
125import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore;
126import org.apache.hadoop.hbase.net.Address;
127import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
128import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
129import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
130import org.apache.hadoop.hbase.quotas.QuotaUtil;
131import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
132import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
133import org.apache.hadoop.hbase.quotas.RegionSize;
134import org.apache.hadoop.hbase.quotas.RegionSizeStore;
135import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
136import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
137import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
138import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
139import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
140import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
141import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
142import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
143import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
144import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
145import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
146import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
147import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
148import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
149import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
150import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
151import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
152import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
153import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
154import org.apache.hadoop.hbase.security.SecurityConstants;
155import org.apache.hadoop.hbase.security.Superusers;
156import org.apache.hadoop.hbase.security.User;
157import org.apache.hadoop.hbase.security.UserProvider;
158import org.apache.hadoop.hbase.trace.TraceUtil;
159import org.apache.hadoop.hbase.util.Bytes;
160import org.apache.hadoop.hbase.util.CompressionTest;
161import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
162import org.apache.hadoop.hbase.util.DNS;
163import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
164import org.apache.hadoop.hbase.util.FSUtils;
165import org.apache.hadoop.hbase.util.FutureUtils;
166import org.apache.hadoop.hbase.util.JvmPauseMonitor;
167import org.apache.hadoop.hbase.util.Pair;
168import org.apache.hadoop.hbase.util.RetryCounter;
169import org.apache.hadoop.hbase.util.RetryCounterFactory;
170import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
171import org.apache.hadoop.hbase.util.Strings;
172import org.apache.hadoop.hbase.util.Threads;
173import org.apache.hadoop.hbase.util.VersionInfo;
174import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
175import org.apache.hadoop.hbase.wal.WAL;
176import org.apache.hadoop.hbase.wal.WALFactory;
177import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
178import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
179import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
180import org.apache.hadoop.hbase.zookeeper.ZKUtil;
181import org.apache.hadoop.ipc.RemoteException;
182import org.apache.hadoop.util.ReflectionUtils;
183import org.apache.yetus.audience.InterfaceAudience;
184import org.apache.zookeeper.KeeperException;
185import org.slf4j.Logger;
186import org.slf4j.LoggerFactory;
187
188import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
189import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
190import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
191import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
192import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
193import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
194import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
195import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
196import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
197import org.apache.hbase.thirdparty.com.google.protobuf.Message;
198import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
199import org.apache.hbase.thirdparty.com.google.protobuf.Service;
200import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
201import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
202import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
203
204import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
205import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
234
235/**
236 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
237 * are many HRegionServers in a single HBase deployment.
238 */
239@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
240@SuppressWarnings({ "deprecation" })
241public class HRegionServer extends HBaseServerBase<RSRpcServices>
242  implements RegionServerServices, LastSequenceId {
243
  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

  // Unit multipliers: 1024 * 1024 for MB and 1024 for KB.
  // NOTE(review): presumably used to scale byte counts into MB/KB; the usages are outside this
  // part of the file - confirm.
  int unitMB = 1024 * 1024;
  int unitKB = 1024;

  /**
   * For testing only! Set to true to skip reporting region assignment to the master.
   */
  @InterfaceAudience.Private
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
255
  /**
   * A map from RegionName to the current action in progress. The Boolean value indicates:
   * <ul>
   * <li>true - an open region action is in progress</li>
   * <li>false - a close region action is in progress</li>
   * </ul>
   * Keys are ordered with {@link Bytes#BYTES_COMPARATOR}.
   */
  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * Used to cache the open/close region procedures which have already been submitted. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
  /**
   * Used to cache the open/close region procedures which have already been executed. Entries
   * expire 600 seconds after they were last accessed. See {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

  /**
   * Used to cache the moved-out regions. Entries expire after being written; the expiry, in
   * milliseconds, is supplied by {@code movedRegionCacheExpiredTime()}.
   */
  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();
280
  private MemStoreFlusher cacheFlusher;

  private HeapMemoryManager hMemManager;

  // Replication services. If there is no replication, these handlers will be null.
  private ReplicationSourceService replicationSourceHandler;
  private ReplicationSinkService replicationSinkHandler;
  // NOTE(review): presumably true when one object serves as both source and sink handler - confirm
  // against the code that assigns it.
  private boolean sameReplicationSourceAndSink;

  // Compactions
  private CompactSplit compactSplitThread;
292
  /**
   * Map of regions currently being served by this region server. Key is the encoded region name.
   * All access should be synchronized.
   */
  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
  /**
   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
   * need to be a ConcurrentHashMap?
   */
  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API
   * call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and
   * here we really mean DataNode locations. We don't store it as InetSocketAddress here because the
   * conversion on demand from Address to InetSocketAddress will guarantee the resolution results
   * will be fresh when we need it.
   */
  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();
315
  private LeaseManager leaseManager;

  // Initialised to true by the constructor.
  private volatile boolean dataFsOk;
  private volatile boolean isGlobalReadOnlyEnabled;

  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds (expressed here in milliseconds), to be safe
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Will run this task when the abort times out
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;
  private volatile boolean killed = false;

  // Read from HConstants.THREAD_WAKE_FREQUENCY in the constructor; defaults to 10 * 1000.
  private final int threadWakeFrequency;

  // The two check periods below are read from their configuration keys in the constructor and
  // default to threadWakeFrequency when the key is not set.
  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;
338
  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  private RpcClient rpcClient;

  // Assigned in the constructor to a handler that aborts this server, naming the thread in which
  // the uncaught exception occurred.
  private UncaughtExceptionHandler uncaughtExceptionHandler;

  private JvmPauseMonitor pauseMonitor;

  // Created in the constructor from the server configuration.
  private RSSnapshotVerifier rsSnapshotVerifier;

  /**
   * Region server process name. Also used as the info server attribute name under which this
   * server instance is registered.
   */
  public static final String REGIONSERVER = "regionserver";

  private MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;
356
  /**
   * Checks for compaction requests.
   */
  private ScheduledChore compactionChecker;

  /**
   * Checks for flushes.
   */
  private ScheduledChore periodicFlusher;

  private volatile WALFactory walFactory;

  private LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // Master address tracker. The constructor leaves this null when the server is masterless.
  private final MasterAddressTracker masterAddressTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  // Read from HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY in the constructor.
  private final int shortOperationTimeout;

  // Time to pause if master says 'please hold'
  private final long retryPauseTime;

  private final RegionServerAccounting regionServerAccounting;

  private NamedQueueServiceChore namedQueueServiceChore = null;

  // Block cache
  private BlockCache blockCache;
  // The cache for mob files
  private MobFileCache mobFileCache;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The Executor status collect chore. */
  private ExecutorStatusChore executorStatusChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  // Coprocessor services registered through registerService(Service), keyed by service name.
  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
407
  /**
   * Legacy name of the configuration key that disables the master's reverse DNS lookup.
   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
   */
  @Deprecated
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.regionserver.hostname.disable.master.reversedns";

  /**
   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive. An
   * exception will be thrown if both are used.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";
425
  /**
   * Unique identifier for the cluster we are a part of.
   */
  private String clusterId;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private volatile RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
   * case where the client doesn't receive the response from a successful operation and retries. We
   * track the successful ops for some time via a nonce sent by the client and handle duplicate
   * operations (currently, by failing them; in future we might use MVCC to return the result).
   * Nonces are also recovered from the WAL during recovery; however, the caveats (from HBASE-3787)
   * are:
   * <ul>
   * <li>WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth of
   * past records. If we don't read the records, we don't read and recover the nonces. Some WALs
   * within nonce-timeout at recovery may not even be present due to rolling/cleanup.</li>
   * <li>There's no WAL recovery during normal region move, so nonces will not be transferred.</li>
   * </ul>
   * We can have a separate additional "Nonce WAL". It will just contain a bunch of numbers and
   * won't be flushed on the main path - because the WAL itself also contains nonces, if we only
   * flush it before memstore flush, for a given nonce we will either see it in the WAL (if it was
   * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce log
   * (or both occasionally, which doesn't matter). The nonce log file can be deleted after the
   * latest nonce in it has expired. It can also be recovered during move.
   * <p>
   * This field is null when {@code HConstants.HBASE_RS_NONCES_ENABLED} is set to false.
   */
  final ServerNonceManager nonceManager;

  private BrokenStoreFileCleaner brokenStoreFileCleaner;

  private RSMobFileCleanerChore rsMobFileCleanerChore;

  @InterfaceAudience.Private
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  private SecureBulkLoadManager secureBulkLoadManager;

  private FileSystemUtilizationChore fsUtilizationChore;

  private BootstrapNodeManager bootstrapNodeManager;
474
  /**
   * True if this RegionServer is coming up in a cluster where there is no Master; means it needs to
   * just come up and make do without a Master to talk to: e.g. in test or HRegionServer is doing
   * other than its usual duties: e.g. as a hollowed-out host whose only purpose is as a
   * Replication-stream sink; see HBASE-18846 for more. The constructor sets this to the negation of
   * {@code clusterMode()}. TODO: can this replace {@link #TEST_SKIP_REPORTING_TRANSITION} ?
   */
  private final boolean masterless;
  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /** Configuration key holding the list of codecs that checkCodecs tests at construction. */
  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";

  // A timer to shut down the process if abort takes too long
  private Timer abortMonitor;

  private RegionReplicationBufferManager regionReplicationBufferManager;

  /*
   * Chore that creates replication marker rows.
   */
  private ReplicationMarkerChore replicationMarkerChore;

  // A timer that submits requests to the PrefetchExecutor
  private PrefetchExecutorNotifier prefetchExecutorNotifier;
500
  /**
   * Starts a HRegionServer at the default location.
   * <p/>
   * Don't start any services or managers in here in the Constructor. Defer till after we register
   * with the Master as much as possible. See {@link #startServices}.
   * <p/>
   * The whole body runs inside a tracing span named "HRegionServer.cxtor". Any failure is recorded
   * on the span, logged, and rethrown; the span is always ended.
   * @param conf the configuration used to build this server
   * @throws IOException if construction fails
   */
  public HRegionServer(final Configuration conf) throws IOException {
    super(conf, "RegionServer"); // thread name
    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
    try (Scope ignored = span.makeCurrent()) {
      this.dataFsOk = true;
      // Masterless is simply "not running in cluster mode".
      this.masterless = !clusterMode();
      // Up-front validation of the heap allocation, HFile version, configured codecs and
      // short-circuit read setup, before any of the fields below are populated.
      // NOTE(review): this.conf is presumably the configuration held by the base class - confirm
      // how it relates to the constructor argument.
      MemorySizeUtil.validateRegionServerHeapMemoryAllocation(conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      // Both check periods fall back to the thread wake frequency when not configured.
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);

      // The nonce manager is left null when nonces are disabled by configuration.
      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);

      regionServerAccounting = new RegionServerAccounting(conf);

      blockCache = BlockCacheFactory.createBlockCache(conf);
      // The call below, instantiates the DataTieringManager only when
      // the configuration "hbase.regionserver.datatiering.enable" is set to true.
      DataTieringManager.instantiate(conf, onlineRegions);

      mobFileCache = new MobFileCache(conf);

      rsSnapshotVerifier = new RSSnapshotVerifier(conf);

      // Abort the server on an uncaught exception, naming the thread it occurred in.
      uncaughtExceptionHandler =
        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);

      // If no master in cluster, skip trying to track one or look for a cluster status.
      if (!this.masterless) {
        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        masterAddressTracker.start();
      } else {
        masterAddressTracker = null;
      }
      this.rpcServices.start(zooKeeper);
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      TraceUtil.setError(span, t);
      LOG.error("Failed construction RegionServer", t);
      throw t;
    } finally {
      span.end();
    }
  }
567
568  // HMaster should override this method to load the specific config for master
569  @Override
570  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
571    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
572    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
573      if (!StringUtils.isBlank(hostname)) {
574        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
575          + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
576          + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
577          + UNSAFE_RS_HOSTNAME_KEY + " is used";
578        throw new IOException(msg);
579      } else {
580        return DNS.getHostname(conf, DNS.ServerType.REGIONSERVER);
581      }
582    } else {
583      return hostname;
584    }
585  }
586
  /**
   * Identifies this server as a region server for DNS purposes.
   * @return always {@link DNS.ServerType#REGIONSERVER}
   */
  @Override
  protected DNS.ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }
591
  /**
   * Logs in through the given provider, passing the region server keytab-file and principal
   * constants from {@link SecurityConstants} together with the host.
   * @param user the provider used to perform the login
   * @param host the host name passed through to the login call
   * @throws IOException if the login call fails
   */
  @Override
  protected void login(UserProvider user, String host) throws IOException {
    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
  }
597
  /**
   * Returns the process name of this server.
   * @return {@link #REGIONSERVER}, i.e. "regionserver"
   */
  @Override
  protected String getProcessName() {
    return REGIONSERVER;
  }
602
  /**
   * Returns the coprocessor host of this server by delegating to
   * {@code getRegionServerCoprocessorHost()}.
   * @return whatever {@code getRegionServerCoprocessorHost()} returns
   */
  @Override
  protected RegionServerCoprocessorHost getCoprocessorHost() {
    return getRegionServerCoprocessorHost();
  }
607
  /**
   * Tells whether this server may create the base znode.
   * @return the negation of {@code clusterMode()}: true only when not running in cluster mode
   */
  @Override
  protected boolean canCreateBaseZNode() {
    return !clusterMode();
  }
612
  /**
   * Tells whether this server may update table descriptors.
   * @return always false for a region server
   */
  @Override
  protected boolean canUpdateTableDescriptor() {
    return false;
  }
617
  /**
   * Tells whether table descriptors should be cached.
   * @return always false for a region server
   */
  @Override
  protected boolean cacheTableDescriptor() {
    return false;
  }
622
  /**
   * Creates the RPC services object for this server.
   * @return a new {@link RSRpcServices} bound to this server
   * @throws IOException if the RPC services cannot be constructed
   */
  // NOTE(review): the neighbouring template methods all carry @Override and this one does not. It
  // presumably implements a factory method declared by HBaseServerBase - confirm, and add
  // @Override if so.
  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }
626
  /**
   * Adds the region-server-specific pieces to the info server: registers {@link RSStatusServlet}
   * as an unprivileged servlet named "rs-status" at path "/rs-status", and stores this server as
   * an attribute under the {@link #REGIONSERVER} name.
   * @param infoServer the info server to configure
   */
  @Override
  protected void configureInfoServer(InfoServer infoServer) {
    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }
632
  /**
   * Returns the servlet class used to produce the debug dump of this server.
   * @return {@link RSDumpServlet}
   */
  @Override
  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }
637
638  /**
639   * Used by {@link RSDumpServlet} to generate debugging information.
640   */
641  public void dumpRowLocks(final PrintWriter out) {
642    StringBuilder sb = new StringBuilder();
643    for (HRegion region : getRegions()) {
644      if (region.getLockedRows().size() > 0) {
645        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
646          sb.setLength(0);
647          sb.append(region.getTableDescriptor().getTableName()).append(",")
648            .append(region.getRegionInfo().getEncodedName()).append(",");
649          sb.append(rowLockContext.toString());
650          out.println(sb);
651        }
652      }
653    }
654  }
655
656  @Override
657  public boolean registerService(Service instance) {
658    // No stacking of instances is allowed for a single executorService name
659    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
660    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
661    if (coprocessorServiceHandlers.containsKey(serviceName)) {
662      LOG.error("Coprocessor executorService " + serviceName
663        + " already registered, rejecting request from " + instance);
664      return false;
665    }
666
667    coprocessorServiceHandlers.put(serviceName, instance);
668    if (LOG.isDebugEnabled()) {
669      LOG.debug(
670        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
671    }
672    return true;
673  }
674
675  /**
676   * Run test on configured codecs to make sure supporting libs are in place.
677   */
678  private static void checkCodecs(final Configuration c) throws IOException {
679    // check to see if the codec list is available:
680    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
681    if (codecs == null) {
682      return;
683    }
684    for (String codec : codecs) {
685      if (!CompressionTest.testCompression(codec)) {
686        throw new IOException(
687          "Compression codec " + codec + " not supported, aborting RS construction");
688      }
689    }
690  }
691
692  public String getClusterId() {
693    return this.clusterId;
694  }
695
696  /**
697   * All initialization needed before we go register with Master.<br>
698   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
699   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
700   */
701  private void preRegistrationInitialization() {
702    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
703    try (Scope ignored = span.makeCurrent()) {
704      initializeZooKeeper();
705      setupClusterConnection();
706      bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
707      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
708      // Setup RPC client for master communication
709      this.rpcClient = asyncClusterConnection.getRpcClient();
710      span.setStatus(StatusCode.OK);
711    } catch (Throwable t) {
712      // Call stop if error or process will stick around for ever since server
713      // puts up non-daemon threads.
714      TraceUtil.setError(span, t);
715      this.rpcServices.stop();
716      abort("Initialization of RS failed.  Hence aborting RS.", t);
717    } finally {
718      span.end();
719    }
720  }
721
722  /**
723   * Bring up connection to zk ensemble and then wait until a master for this cluster and then after
724   * that, wait until cluster 'up' flag has been set. This is the order in which master does things.
725   * <p>
726   * Finally open long-living server short-circuit connection.
727   */
728  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
729      justification = "cluster Id znode read would give us correct response")
730  private void initializeZooKeeper() throws IOException, InterruptedException {
731    // Nothing to do in here if no Master in the mix.
732    if (this.masterless) {
733      return;
734    }
735
736    // Create the master address tracker, register with zk, and start it. Then
737    // block until a master is available. No point in starting up if no master
738    // running.
739    blockAndCheckIfStopped(this.masterAddressTracker);
740
741    // Wait on cluster being up. Master will set this flag up in zookeeper
742    // when ready.
743    blockAndCheckIfStopped(this.clusterStatusTracker);
744
745    // If we are HMaster then the cluster id should have already been set.
746    if (clusterId == null) {
747      // Retrieve clusterId
748      // Since cluster status is now up
749      // ID should have already been set by HMaster
750      try {
751        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
752        if (clusterId == null) {
753          this.abort("Cluster ID has not been set");
754        }
755        LOG.info("ClusterId : " + clusterId);
756      } catch (KeeperException e) {
757        this.abort("Failed to retrieve Cluster ID", e);
758      }
759    }
760
761    if (isStopped() || isAborted()) {
762      return; // No need for further initialization
763    }
764
765    // watch for snapshots and other procedures
766    try {
767      rspmHost = new RegionServerProcedureManagerHost();
768      rspmHost.loadProcedures(conf);
769      rspmHost.initialize(this);
770    } catch (KeeperException e) {
771      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
772    }
773  }
774
775  /**
776   * Utilty method to wait indefinitely on a znode availability while checking if the region server
777   * is shut down
778   * @param tracker znode tracker to use
779   * @throws IOException          any IO exception, plus if the RS is stopped
780   * @throws InterruptedException if the waiting thread is interrupted
781   */
782  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
783    throws IOException, InterruptedException {
784    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
785      if (this.stopped) {
786        throw new IOException("Received the shutdown message while waiting.");
787      }
788    }
789  }
790
791  /** Returns True if the cluster is up. */
792  @Override
793  public boolean isClusterUp() {
794    return this.masterless
795      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
796  }
797
798  private void initializeReplicationMarkerChore() {
799    boolean replicationMarkerEnabled =
800      conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
801    // If replication or replication marker is not enabled then return immediately.
802    if (replicationMarkerEnabled) {
803      int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
804        REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
805      replicationMarkerChore = new ReplicationMarkerChore(this, this, period, conf);
806    }
807  }
808
809  @Override
810  public boolean isStopping() {
811    return stopping;
812  }
813
814  /**
815   * The HRegionServer sticks in this loop until closed.
816   */
817  @Override
818  public void run() {
819    if (isStopped()) {
820      LOG.info("Skipping run; stopped");
821      return;
822    }
823    try {
824      // Do pre-registration initializations; zookeeper, lease threads, etc.
825      preRegistrationInitialization();
826    } catch (Throwable e) {
827      abort("Fatal exception during initialization", e);
828    }
829
830    try {
831      if (!isStopped() && !isAborted()) {
832        installShutdownHook();
833
834        CoprocessorConfigurationUtil.syncReadOnlyConfigurations(conf,
835          CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY);
836
837        // Initialize the RegionServerCoprocessorHost now that our ephemeral
838        // node was created, in case any coprocessors want to use ZooKeeper
839        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
840
841        // Try and register with the Master; tell it we are here. Break if server is stopped or
842        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
843        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
844        // come up.
845        LOG.debug("About to register with Master.");
846        TraceUtil.trace(() -> {
847          RetryCounterFactory rcf =
848            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
849          RetryCounter rc = rcf.create();
850          while (keepLooping()) {
851            RegionServerStartupResponse w = reportForDuty();
852            if (w == null) {
853              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
854              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
855              this.sleeper.sleep(sleepTime);
856            } else {
857              handleReportForDutyResponse(w);
858              break;
859            }
860          }
861        }, "HRegionServer.registerWithMaster");
862      }
863
864      if (!isStopped() && isHealthy()) {
865        TraceUtil.trace(() -> {
866          // start the snapshot handler and other procedure handlers,
867          // since the server is ready to run
868          if (this.rspmHost != null) {
869            this.rspmHost.start();
870          }
871          // Start the Quota Manager
872          if (this.rsQuotaManager != null) {
873            rsQuotaManager.start(getRpcServer().getScheduler());
874          }
875          if (this.rsSpaceQuotaManager != null) {
876            this.rsSpaceQuotaManager.start();
877          }
878        }, "HRegionServer.startup");
879      }
880
881      // We registered with the Master. Go into run mode.
882      long lastMsg = EnvironmentEdgeManager.currentTime();
883      long oldRequestCount = -1;
884      // The main run loop.
885      while (!isStopped() && isHealthy()) {
886        if (!isClusterUp()) {
887          if (onlineRegions.isEmpty()) {
888            stop("Exiting; cluster shutdown set and not carrying any regions");
889          } else if (!this.stopping) {
890            this.stopping = true;
891            LOG.info("Closing user regions");
892            closeUserRegions(isAborted());
893          } else {
894            boolean allUserRegionsOffline = areAllUserRegionsOffline();
895            if (allUserRegionsOffline) {
896              // Set stopped if no more write requests tp meta tables
897              // since last time we went around the loop. Any open
898              // meta regions will be closed on our way out.
899              if (oldRequestCount == getWriteRequestCount()) {
900                stop("Stopped; only catalog regions remaining online");
901                break;
902              }
903              oldRequestCount = getWriteRequestCount();
904            } else {
905              // Make sure all regions have been closed -- some regions may
906              // have not got it because we were splitting at the time of
907              // the call to closeUserRegions.
908              closeUserRegions(this.abortRequested.get());
909            }
910            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
911          }
912        }
913        long now = EnvironmentEdgeManager.currentTime();
914        if ((now - lastMsg) >= msgInterval) {
915          tryRegionServerReport(lastMsg, now);
916          lastMsg = EnvironmentEdgeManager.currentTime();
917        }
918        if (!isStopped() && !isAborted()) {
919          this.sleeper.sleep();
920        }
921      } // for
922    } catch (Throwable t) {
923      if (!rpcServices.checkOOME(t)) {
924        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
925        abort(prefix + t.getMessage(), t);
926      }
927    }
928
929    final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
930    try (Scope ignored = span.makeCurrent()) {
931      if (this.leaseManager != null) {
932        this.leaseManager.closeAfterLeasesExpire();
933      }
934      if (this.splitLogWorker != null) {
935        splitLogWorker.stop();
936      }
937      stopInfoServer();
938      // Send cache a shutdown.
939      if (blockCache != null) {
940        blockCache.shutdown();
941      }
942      if (mobFileCache != null) {
943        mobFileCache.shutdown();
944      }
945
946      // Send interrupts to wake up threads if sleeping so they notice shutdown.
947      // TODO: Should we check they are alive? If OOME could have exited already
948      if (this.hMemManager != null) {
949        this.hMemManager.stop();
950      }
951      if (this.cacheFlusher != null) {
952        this.cacheFlusher.interruptIfNecessary();
953      }
954      if (this.compactSplitThread != null) {
955        this.compactSplitThread.interruptIfNecessary();
956      }
957
958      // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
959      if (rspmHost != null) {
960        rspmHost.stop(this.abortRequested.get() || this.killed);
961      }
962
963      if (this.killed) {
964        // Just skip out w/o closing regions. Used when testing.
965      } else if (abortRequested.get()) {
966        if (this.dataFsOk) {
967          closeUserRegions(abortRequested.get()); // Don't leave any open file handles
968        }
969        LOG.info("aborting server " + this.serverName);
970      } else {
971        closeUserRegions(abortRequested.get());
972        LOG.info("stopping server " + this.serverName);
973      }
974      regionReplicationBufferManager.stop();
975      closeClusterConnection();
976      // Closing the compactSplit thread before closing meta regions
977      if (!this.killed && containsMetaTableRegions()) {
978        if (!abortRequested.get() || this.dataFsOk) {
979          if (this.compactSplitThread != null) {
980            this.compactSplitThread.join();
981            this.compactSplitThread = null;
982          }
983          closeMetaTableRegions(abortRequested.get());
984        }
985      }
986
987      if (!this.killed && this.dataFsOk) {
988        waitOnAllRegionsToClose(abortRequested.get());
989        LOG.info("stopping server " + this.serverName + "; all regions closed.");
990      }
991
992      // Stop the quota manager
993      if (rsQuotaManager != null) {
994        rsQuotaManager.stop();
995      }
996      if (rsSpaceQuotaManager != null) {
997        rsSpaceQuotaManager.stop();
998        rsSpaceQuotaManager = null;
999      }
1000
1001      // flag may be changed when closing regions throws exception.
1002      if (this.dataFsOk) {
1003        shutdownWAL(!abortRequested.get());
1004      }
1005
1006      // Make sure the proxy is down.
1007      if (this.rssStub != null) {
1008        this.rssStub = null;
1009      }
1010      if (this.lockStub != null) {
1011        this.lockStub = null;
1012      }
1013      if (this.rpcClient != null) {
1014        this.rpcClient.close();
1015      }
1016      if (this.leaseManager != null) {
1017        this.leaseManager.close();
1018      }
1019      if (this.pauseMonitor != null) {
1020        this.pauseMonitor.stop();
1021      }
1022
1023      if (!killed) {
1024        stopServiceThreads();
1025      }
1026
1027      if (this.rpcServices != null) {
1028        this.rpcServices.stop();
1029      }
1030
1031      try {
1032        deleteMyEphemeralNode();
1033      } catch (KeeperException.NoNodeException nn) {
1034        // pass
1035      } catch (KeeperException e) {
1036        LOG.warn("Failed deleting my ephemeral node", e);
1037      }
1038      // We may have failed to delete the znode at the previous step, but
1039      // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1040      ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1041
1042      closeZooKeeper();
1043      closeTableDescriptors();
1044      LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
1045      span.setStatus(StatusCode.OK);
1046    } finally {
1047      span.end();
1048    }
1049  }
1050
1051  private boolean containsMetaTableRegions() {
1052    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
1053  }
1054
1055  private boolean areAllUserRegionsOffline() {
1056    if (getNumberOfOnlineRegions() > 2) {
1057      return false;
1058    }
1059    boolean allUserRegionsOffline = true;
1060    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1061      if (!e.getValue().getRegionInfo().isMetaRegion()) {
1062        allUserRegionsOffline = false;
1063        break;
1064      }
1065    }
1066    return allUserRegionsOffline;
1067  }
1068
1069  /** Returns Current write count for all online regions. */
1070  private long getWriteRequestCount() {
1071    long writeCount = 0;
1072    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1073      writeCount += e.getValue().getWriteRequestsCount();
1074    }
1075    return writeCount;
1076  }
1077
1078  @InterfaceAudience.Private
1079  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1080    throws IOException {
1081    RegionServerStatusService.BlockingInterface rss = rssStub;
1082    if (rss == null) {
1083      // the current server could be stopping.
1084      return;
1085    }
1086    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1087    final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport");
1088    try (Scope ignored = span.makeCurrent()) {
1089      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1090      request.setServer(ProtobufUtil.toServerName(this.serverName));
1091      request.setLoad(sl);
1092      rss.regionServerReport(null, request.build());
1093      span.setStatus(StatusCode.OK);
1094    } catch (ServiceException se) {
1095      IOException ioe = ProtobufUtil.getRemoteException(se);
1096      if (ioe instanceof YouAreDeadException) {
1097        // This will be caught and handled as a fatal error in run()
1098        TraceUtil.setError(span, ioe);
1099        throw ioe;
1100      }
1101      if (rssStub == rss) {
1102        rssStub = null;
1103      }
1104      TraceUtil.setError(span, se);
1105      // Couldn't connect to the master, get location from zk and reconnect
1106      // Method blocks until new master is found or we are stopped
1107      createRegionServerStatusStub(true);
1108    } finally {
1109      span.end();
1110    }
1111  }
1112
1113  /**
1114   * Reports the given map of Regions and their size on the filesystem to the active Master.
1115   * @param regionSizeStore The store containing region sizes
1116   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1117   */
1118  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
1119    RegionServerStatusService.BlockingInterface rss = rssStub;
1120    if (rss == null) {
1121      // the current server could be stopping.
1122      LOG.trace("Skipping Region size report to HMaster as stub is null");
1123      return true;
1124    }
1125    try {
1126      buildReportAndSend(rss, regionSizeStore);
1127    } catch (ServiceException se) {
1128      IOException ioe = ProtobufUtil.getRemoteException(se);
1129      if (ioe instanceof PleaseHoldException) {
1130        LOG.trace("Failed to report region sizes to Master because it is initializing."
1131          + " This will be retried.", ioe);
1132        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1133        return true;
1134      }
1135      if (rssStub == rss) {
1136        rssStub = null;
1137      }
1138      createRegionServerStatusStub(true);
1139      if (ioe instanceof DoNotRetryIOException) {
1140        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1141        if (doNotRetryEx.getCause() != null) {
1142          Throwable t = doNotRetryEx.getCause();
1143          if (t instanceof UnsupportedOperationException) {
1144            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1145            return false;
1146          }
1147        }
1148      }
1149      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1150    }
1151    return true;
1152  }
1153
1154  /**
1155   * Builds the region size report and sends it to the master. Upon successful sending of the
1156   * report, the region sizes that were sent are marked as sent.
1157   * @param rss             The stub to send to the Master
1158   * @param regionSizeStore The store containing region sizes
1159   */
1160  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
1161    RegionSizeStore regionSizeStore) throws ServiceException {
1162    RegionSpaceUseReportRequest request =
1163      buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
1164    rss.reportRegionSpaceUse(null, request);
1165    // Record the number of size reports sent
1166    if (metricsRegionServer != null) {
1167      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
1168    }
1169  }
1170
1171  /**
1172   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1173   * @param regionSizes The size in bytes of regions
1174   * @return The corresponding protocol buffer message.
1175   */
1176  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
1177    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1178    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
1179      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
1180    }
1181    return request.build();
1182  }
1183
1184  /**
1185   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf
1186   * message.
1187   * @param regionInfo  The RegionInfo
1188   * @param sizeInBytes The size in bytes of the Region
1189   * @return The protocol buffer
1190   */
1191  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1192    return RegionSpaceUse.newBuilder()
1193      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1194      .setRegionSize(Objects.requireNonNull(sizeInBytes)).build();
1195  }
1196
1197  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1198    throws IOException {
1199    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1200    // per second, and other metrics As long as metrics are part of ServerLoad it's best to use
1201    // the wrapper to compute those numbers in one place.
1202    // In the long term most of these should be moved off of ServerLoad and the heart beat.
1203    // Instead they should be stored in an HBase table so that external visibility into HBase is
1204    // improved; Additionally the load balancer will be able to take advantage of a more complete
1205    // history.
1206    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1207    Collection<HRegion> regions = getOnlineRegionsLocalContext();
1208    long usedMemory = -1L;
1209    long maxMemory = -1L;
1210    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
1211    if (usage != null) {
1212      usedMemory = usage.getUsed();
1213      maxMemory = usage.getMax();
1214    }
1215
1216    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
1217    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1218    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
1219    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
1220    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
1221    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
1222    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
1223    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1224    Coprocessor.Builder coprocessorBuilder = Coprocessor.newBuilder();
1225    for (String coprocessor : coprocessors) {
1226      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1227    }
1228    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1229    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1230    for (HRegion region : regions) {
1231      if (region.getCoprocessorHost() != null) {
1232        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1233        for (String regionCoprocessor : regionCoprocessors) {
1234          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
1235        }
1236      }
1237      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1238      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1239        .getCoprocessors()) {
1240        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1241      }
1242    }
1243
1244    getBlockCache().ifPresent(cache -> {
1245      cache.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
1246        regionCachedInfo.forEach((regionName, prefetchSize) -> {
1247          serverLoad.putRegionCachedInfo(regionName, roundSize(prefetchSize, unitMB));
1248        });
1249      });
1250    });
1251    serverLoad.setCacheFreeSize(regionServerWrapper.getBlockCacheFreeSize());
1252    if (DataTieringManager.getInstance() != null) {
1253      DataTieringManager.getInstance().getRegionColdDataSize()
1254        .forEach((regionName, coldDataSize) -> serverLoad.putRegionColdData(regionName,
1255          roundSize(coldDataSize.getSecond(), unitMB)));
1256    }
1257    serverLoad.setReportStartTime(reportStartTime);
1258    serverLoad.setReportEndTime(reportEndTime);
1259    if (this.infoServer != null) {
1260      serverLoad.setInfoServerPort(this.infoServer.getPort());
1261    } else {
1262      serverLoad.setInfoServerPort(-1);
1263    }
1264    MetricsUserAggregateSource userSource =
1265      metricsRegionServer.getMetricsUserAggregate().getSource();
1266    if (userSource != null) {
1267      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
1268      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
1269        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
1270      }
1271    }
1272
1273    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
1274      // always refresh first to get the latest value
1275      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1276      if (rLoad != null) {
1277        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1278        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1279          .getReplicationLoadSourceEntries()) {
1280          serverLoad.addReplLoadSource(rLS);
1281        }
1282      }
1283    } else {
1284      if (replicationSourceHandler != null) {
1285        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1286        if (rLoad != null) {
1287          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1288            .getReplicationLoadSourceEntries()) {
1289            serverLoad.addReplLoadSource(rLS);
1290          }
1291        }
1292      }
1293      if (replicationSinkHandler != null) {
1294        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
1295        if (rLoad != null) {
1296          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1297        }
1298      }
1299    }
1300
1301    TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask
1302      .newBuilder().setDescription(task.getDescription())
1303      .setStatus(task.getStatus() != null ? task.getStatus() : "")
1304      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
1305      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build()));
1306
1307    return serverLoad.build();
1308  }
1309
1310  private String getOnlineRegionsAsPrintableString() {
1311    StringBuilder sb = new StringBuilder();
1312    for (Region r : this.onlineRegions.values()) {
1313      if (sb.length() > 0) {
1314        sb.append(", ");
1315      }
1316      sb.append(r.getRegionInfo().getEncodedName());
1317    }
1318    return sb.toString();
1319  }
1320
1321  /**
1322   * Wait on regions close.
1323   */
1324  private void waitOnAllRegionsToClose(final boolean abort) {
1325    // Wait till all regions are closed before going out.
1326    int lastCount = -1;
1327    long previousLogTime = 0;
1328    Set<String> closedRegions = new HashSet<>();
1329    boolean interrupted = false;
1330    try {
1331      while (!onlineRegions.isEmpty()) {
1332        int count = getNumberOfOnlineRegions();
1333        // Only print a message if the count of regions has changed.
1334        if (count != lastCount) {
1335          // Log every second at most
1336          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
1337            previousLogTime = EnvironmentEdgeManager.currentTime();
1338            lastCount = count;
1339            LOG.info("Waiting on " + count + " regions to close");
1340            // Only print out regions still closing if a small number else will
1341            // swamp the log.
1342            if (count < 10 && LOG.isDebugEnabled()) {
1343              LOG.debug("Online Regions=" + this.onlineRegions);
1344            }
1345          }
1346        }
1347        // Ensure all user regions have been sent a close. Use this to
1348        // protect against the case where an open comes in after we start the
1349        // iterator of onlineRegions to close all user regions.
1350        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1351          RegionInfo hri = e.getValue().getRegionInfo();
1352          if (
1353            !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1354              && !closedRegions.contains(hri.getEncodedName())
1355          ) {
1356            closedRegions.add(hri.getEncodedName());
1357            // Don't update zk with this close transition; pass false.
1358            closeRegionIgnoreErrors(hri, abort);
1359          }
1360        }
1361        // No regions in RIT, we could stop waiting now.
1362        if (this.regionsInTransitionInRS.isEmpty()) {
1363          if (!onlineRegions.isEmpty()) {
1364            LOG.info("We were exiting though online regions are not empty,"
1365              + " because some regions failed closing");
1366          }
1367          break;
1368        } else {
1369          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
1370            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1371        }
1372        if (sleepInterrupted(200)) {
1373          interrupted = true;
1374        }
1375      }
1376    } finally {
1377      if (interrupted) {
1378        Thread.currentThread().interrupt();
1379      }
1380    }
1381  }
1382
1383  private static boolean sleepInterrupted(long millis) {
1384    boolean interrupted = false;
1385    try {
1386      Thread.sleep(millis);
1387    } catch (InterruptedException e) {
1388      LOG.warn("Interrupted while sleeping");
1389      interrupted = true;
1390    }
1391    return interrupted;
1392  }
1393
1394  private void shutdownWAL(final boolean close) {
1395    if (this.walFactory != null) {
1396      try {
1397        if (close) {
1398          walFactory.close();
1399        } else {
1400          walFactory.shutdown();
1401        }
1402      } catch (Throwable e) {
1403        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1404        LOG.error("Shutdown / close of WAL failed: " + e);
1405        LOG.debug("Shutdown / close exception details:", e);
1406      }
1407    }
1408  }
1409
  /**
   * Run init. Sets up wal and starts up all server threads.
   * <p>
   * Walks the map entries of the master's startup response: the entry keyed by
   * {@link HConstants#KEY_FOR_HOSTNAME_SEEN_BY_MASTER} is used to rebuild {@code serverName} and
   * is checked against the hostname this server expects; every other entry is copied into the
   * local configuration. It then creates the ephemeral znode, re-initializes the file system if
   * the root dir changed, sets up WAL/replication and metrics, starts services, and finally marks
   * the server online and wakes up waiters.
   * @param c Startup response from the master; its map entries carry the hostname the master sees
   *          us as plus extra configuration.
   * @throws IOException if the master's hostname does not match the expected one, or wrapping any
   *                     other failure during initialization (the server is stopped first).
   */
  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
    throws IOException {
    try {
      // Set when the master hands us an hbase.rootdir different from the one in our conf.
      boolean updateRootDir = false;
      for (NameStringPair e : c.getMapEntriesList()) {
        String key = e.getName();
        // The hostname the master sees us as.
        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
          String hostnameFromMasterPOV = e.getValue();
          // Rebuild our server name from the master's view of our hostname, our RPC port and our
          // start code.
          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
            rpcServices.getSocketAddress().getPort(), this.startcode);
          String expectedHostName = rpcServices.getSocketAddress().getHostName();
          // if Master use-ip is enabled, RegionServer use-ip will be enabled by default even if it
          // is set to disable. so we will use the ip of the RegionServer to compare with the
          // hostname passed by the Master, see HBASE-27304 for details.
          if (
            StringUtils.isBlank(useThisHostnameInstead) && getActiveMaster().isPresent()
              && InetAddresses.isInetAddress(getActiveMaster().get().getHostname())
          ) {
            expectedHostName = rpcServices.getSocketAddress().getAddress().getHostAddress();
          }
          // An explicitly configured hostname, when present, takes precedence over the one
          // derived from the RPC socket address.
          boolean isHostnameConsist = StringUtils.isBlank(useThisHostnameInstead)
            ? Strings.hostnamesEqual(hostnameFromMasterPOV, expectedHostName)
            : Strings.hostnamesEqual(hostnameFromMasterPOV, useThisHostnameInstead);

          if (!isHostnameConsist) {
            String msg = "Master passed us a different hostname to use; was="
              + (StringUtils.isBlank(useThisHostnameInstead)
                ? expectedHostName
                : this.useThisHostnameInstead)
              + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
            throw new IOException(msg);
          }
          // The hostname entry is not a configuration value; do not copy it into conf.
          continue;
        }

        String value = e.getValue();
        if (key.equals(HConstants.HBASE_DIR)) {
          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
            updateRootDir = true;
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("Config from master: " + key + "=" + value);
        }
        // Values from the master override whatever is in the local configuration.
        this.conf.set(key, value);
      }
      // Set our ephemeral znode up in zookeeper now we have a name.
      createMyEphemeralNode();

      if (updateRootDir) {
        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
        initializeFileSystem();
      }

      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
      // config param for task trackers, but we can piggyback off of it.
      if (this.conf.get("mapreduce.task.attempt.id") == null) {
        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
      }

      // Save it in a file, this will allow to see if we crash
      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

      // This call sets up an initialized replication and WAL. Later we start it up.
      setupWALAndReplication();
      // Init in here rather than in constructor after thread name has been set
      final MetricsTable metricsTable =
        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
      this.metricsRegionServer =
        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
      // Now that we have a metrics source, start the pause monitor
      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
      pauseMonitor.start();

      // There is a rare case where we do NOT want services to start. Check config.
      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
        startServices();
      }
      // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
      // or make sense of it.
      startReplicationService();

      // Log the name and RPC address we are serving as, together with our ZooKeeper session id.
      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress()
        + ", sessionid=0x"
        + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));

      // Wake up anyone waiting for this server to online
      synchronized (online) {
        online.set(true);
        online.notifyAll();
      }
    } catch (Throwable e) {
      // Any failure above, including the hostname mismatch, stops the server before the
      // converted exception is rethrown to the caller.
      stop("Failed initialization");
      throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed");
    } finally {
      // Runs on success and on failure alike.
      sleeper.skipSleepCycle();
    }
  }
1517
1518  private void startHeapMemoryManager() {
1519    if (this.blockCache != null) {
1520      this.hMemManager =
1521        new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1522      this.hMemManager.start(getChoreService());
1523    }
1524  }
1525
1526  private void createMyEphemeralNode() throws KeeperException {
1527    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1528    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1529    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1530    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1531    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1532  }
1533
1534  private void deleteMyEphemeralNode() throws KeeperException {
1535    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1536  }
1537
1538  @Override
1539  public RegionServerAccounting getRegionServerAccounting() {
1540    return regionServerAccounting;
1541  }
1542
1543  // Round the size with KB or MB.
1544  // A trick here is that if the sizeInBytes is less than sizeUnit, we will round the size to 1
1545  // instead of 0 if it is not 0, to avoid some schedulers think the region has no data. See
1546  // HBASE-26340 for more details on why this is important.
1547  private static int roundSize(long sizeInByte, int sizeUnit) {
1548    if (sizeInByte == 0) {
1549      return 0;
1550    } else if (sizeInByte < sizeUnit) {
1551      return 1;
1552    } else {
1553      return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE);
1554    }
1555  }
1556
1557  /**
1558   * @param r               Region to get RegionLoad for.
1559   * @param regionLoadBldr  the RegionLoad.Builder, can be null
1560   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1561   * @return RegionLoad instance.
1562   */
1563  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1564    RegionSpecifier.Builder regionSpecifier) throws IOException {
1565    byte[] name = r.getRegionInfo().getRegionName();
1566    String regionEncodedName = r.getRegionInfo().getEncodedName();
1567    int stores = 0;
1568    int storefiles = 0;
1569    int storeRefCount = 0;
1570    int maxCompactedStoreFileRefCount = 0;
1571    long storeUncompressedSize = 0L;
1572    long storefileSize = 0L;
1573    long storefileIndexSize = 0L;
1574    long rootLevelIndexSize = 0L;
1575    long totalStaticIndexSize = 0L;
1576    long totalStaticBloomSize = 0L;
1577    long totalCompactingKVs = 0L;
1578    long currentCompactedKVs = 0L;
1579    long totalRegionSize = 0L;
1580    List<HStore> storeList = r.getStores();
1581    stores += storeList.size();
1582    for (HStore store : storeList) {
1583      storefiles += store.getStorefilesCount();
1584      int currentStoreRefCount = store.getStoreRefCount();
1585      storeRefCount += currentStoreRefCount;
1586      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1587      maxCompactedStoreFileRefCount =
1588        Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount);
1589      storeUncompressedSize += store.getStoreSizeUncompressed();
1590      storefileSize += store.getStorefilesSize();
1591      totalRegionSize += store.getHFilesSize();
1592      // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
1593      storefileIndexSize += store.getStorefilesRootLevelIndexSize();
1594      CompactionProgress progress = store.getCompactionProgress();
1595      if (progress != null) {
1596        totalCompactingKVs += progress.getTotalCompactingKVs();
1597        currentCompactedKVs += progress.currentCompactedKVs;
1598      }
1599      rootLevelIndexSize += store.getStorefilesRootLevelIndexSize();
1600      totalStaticIndexSize += store.getTotalStaticIndexSize();
1601      totalStaticBloomSize += store.getTotalStaticBloomSize();
1602    }
1603
1604    int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB);
1605    int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB);
1606    int storefileSizeMB = roundSize(storefileSize, unitMB);
1607    int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB);
1608    int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB);
1609    int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB);
1610    int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB);
1611    int regionSizeMB = roundSize(totalRegionSize, unitMB);
1612    final MutableFloat currentRegionCachedRatio = new MutableFloat(0.0f);
1613    getBlockCache().ifPresent(bc -> {
1614      bc.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
1615        if (regionCachedInfo.containsKey(regionEncodedName)) {
1616          currentRegionCachedRatio.setValue(regionSizeMB == 0
1617            ? 0.0f
1618            : (float) roundSize(regionCachedInfo.get(regionEncodedName), unitMB) / regionSizeMB);
1619        }
1620      });
1621    });
1622    final MutableFloat currentRegionColdDataRatio = new MutableFloat(0.0f);
1623    if (DataTieringManager.getInstance() != null) {
1624      DataTieringManager.getInstance().getRegionColdDataSize().computeIfPresent(regionEncodedName,
1625        (k, v) -> {
1626          int coldSizeMB = roundSize(v.getSecond(), unitMB);
1627          currentRegionColdDataRatio
1628            .setValue(regionSizeMB == 0 ? 0.0f : (float) coldSizeMB / regionSizeMB);
1629          return v;
1630        });
1631    }
1632
1633    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
1634    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
1635    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
1636    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
1637    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
1638    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
1639    if (regionLoadBldr == null) {
1640      regionLoadBldr = RegionLoad.newBuilder();
1641    }
1642    if (regionSpecifier == null) {
1643      regionSpecifier = RegionSpecifier.newBuilder();
1644    }
1645
1646    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1647    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1648    regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores)
1649      .setStorefiles(storefiles).setStoreRefCount(storeRefCount)
1650      .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1651      .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB)
1652      .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB)
1653      .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1654      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1655      .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount())
1656      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1657      .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs)
1658      .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality)
1659      .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight)
1660      .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight)
1661      .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
1662      .setLastMajorCompactionTs(r.getOldestHfileTs(true)).setRegionSizeMB(regionSizeMB)
1663      .setCurrentRegionCachedRatio(currentRegionCachedRatio.floatValue())
1664      .setCurrentRegionColdDataRatio(currentRegionColdDataRatio.floatValue());
1665    r.setCompleteSequenceId(regionLoadBldr);
1666    return regionLoadBldr.build();
1667  }
1668
1669  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1670    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1671    userLoadBldr.setUserName(user);
1672    userSource.getClientMetrics().values().stream()
1673      .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1674        .setHostName(clientMetrics.getHostName())
1675        .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1676        .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1677        .setReadRequestsCount(clientMetrics.getReadRequestsCount())
1678        .setHostAddress(clientMetrics.getHostAddress()).setUserName(clientMetrics.getUserName())
1679        .setClientVersion(clientMetrics.getClientVersion())
1680        .setServiceName(clientMetrics.getServiceName())
1681        .setClientVersion(clientMetrics.getClientVersion()).build())
1682      .forEach(userLoadBldr::addClientMetrics);
1683    return userLoadBldr.build();
1684  }
1685
1686  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1687    HRegion r = onlineRegions.get(encodedRegionName);
1688    return r != null ? createRegionLoad(r, null, null) : null;
1689  }
1690
1691  /**
1692   * Inner class that runs on a long period checking if regions need compaction.
1693   */
1694  private static class CompactionChecker extends ScheduledChore {
1695    private final HRegionServer instance;
1696    private final int majorCompactPriority;
1697    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1698    // Iteration is 1-based rather than 0-based so we don't check for compaction
1699    // immediately upon region server startup
1700    private long iteration = 1;
1701
1702    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1703      super("CompactionChecker", stopper, sleepTime);
1704      this.instance = h;
1705      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1706
1707      /*
1708       * MajorCompactPriority is configurable. If not set, the compaction will use default priority.
1709       */
1710      this.majorCompactPriority = this.instance.conf
1711        .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY);
1712    }
1713
1714    @Override
1715    protected void chore() {
1716      for (HRegion hr : this.instance.onlineRegions.values()) {
1717        // If region is read only or compaction is disabled at table level, there's no need to
1718        // iterate through region's stores
1719        if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) {
1720          continue;
1721        }
1722
1723        for (HStore s : hr.stores.values()) {
1724          try {
1725            long multiplier = s.getCompactionCheckMultiplier();
1726            assert multiplier > 0;
1727            if (iteration % multiplier != 0) {
1728              continue;
1729            }
1730            if (s.needsCompaction()) {
1731              // Queue a compaction. Will recognize if major is needed.
1732              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1733                getName() + " requests compaction");
1734            } else if (s.shouldPerformMajorCompaction()) {
1735              s.triggerMajorCompaction();
1736              if (
1737                majorCompactPriority == DEFAULT_PRIORITY
1738                  || majorCompactPriority > hr.getCompactPriority()
1739              ) {
1740                this.instance.compactSplitThread.requestCompaction(hr, s,
1741                  getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
1742                  CompactionLifeCycleTracker.DUMMY, null);
1743              } else {
1744                this.instance.compactSplitThread.requestCompaction(hr, s,
1745                  getName() + " requests major compaction; use configured priority",
1746                  this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1747              }
1748            }
1749          } catch (IOException e) {
1750            LOG.warn("Failed major compaction check on " + hr, e);
1751          }
1752        }
1753      }
1754      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1755    }
1756  }
1757
1758  private static class PeriodicMemStoreFlusher extends ScheduledChore {
1759    private final HRegionServer server;
1760    private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
1761    private final static int MIN_DELAY_TIME = 0; // millisec
1762    private final long rangeOfDelayMs;
1763
1764    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1765      super("MemstoreFlusherChore", server, cacheFlushInterval);
1766      this.server = server;
1767
1768      final long configuredRangeOfDelay = server.getConfiguration()
1769        .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
1770      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
1771    }
1772
1773    @Override
1774    protected void chore() {
1775      final StringBuilder whyFlush = new StringBuilder();
1776      for (HRegion r : this.server.onlineRegions.values()) {
1777        if (r == null) {
1778          continue;
1779        }
1780        if (r.shouldFlush(whyFlush)) {
1781          FlushRequester requester = server.getFlushRequester();
1782          if (requester != null) {
1783            long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME;
1784            // Throttle the flushes by putting a delay. If we don't throttle, and there
1785            // is a balanced write-load on the regions in a table, we might end up
1786            // overwhelming the filesystem with too many flushes at once.
1787            if (requester.requestDelayedFlush(r, delay)) {
1788              LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(),
1789                r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay);
1790            }
1791          }
1792        }
1793      }
1794    }
1795  }
1796
1797  /**
1798   * Report the status of the server. A server is online once all the startup is completed (setting
1799   * up filesystem, starting executorService threads, etc.). This method is designed mostly to be
1800   * useful in tests.
1801   * @return true if online, false if not.
1802   */
1803  public boolean isOnline() {
1804    return online.get();
1805  }
1806
1807  /**
1808   * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
1809   * be hooked up to WAL.
1810   */
1811  private void setupWALAndReplication() throws IOException {
1812    WALFactory factory = new WALFactory(conf, serverName, this);
1813    // TODO Replication make assumptions here based on the default filesystem impl
1814    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1815    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1816
1817    Path logDir = new Path(walRootDir, logName);
1818    LOG.debug("logDir={}", logDir);
1819    if (this.walFs.exists(logDir)) {
1820      throw new RegionServerRunningException(
1821        "Region server has already created directory at " + this.serverName.toString());
1822    }
1823    // Create wal directory here and we will never create it again in other places. This is
1824    // important to make sure that our fencing way takes effect. See HBASE-29797 for more details.
1825    if (!this.walFs.mkdirs(logDir)) {
1826      throw new IOException("Can not create wal directory " + logDir);
1827    }
1828    // Instantiate replication if replication enabled. Pass it the log directories.
1829    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
1830
1831    WALActionsListener walEventListener = getWALEventTrackerListener(conf);
1832    if (walEventListener != null && factory.getWALProvider() != null) {
1833      factory.getWALProvider().addWALActionsListener(walEventListener);
1834    }
1835    this.walFactory = factory;
1836  }
1837
1838  private WALActionsListener getWALEventTrackerListener(Configuration conf) {
1839    if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) {
1840      WALEventTrackerListener listener =
1841        new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName());
1842      return listener;
1843    }
1844    return null;
1845  }
1846
1847  /**
1848   * Start up replication source and sink handlers.
1849   */
1850  private void startReplicationService() throws IOException {
1851    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
1852      this.replicationSourceHandler.startReplicationService();
1853    } else {
1854      if (this.replicationSourceHandler != null) {
1855        this.replicationSourceHandler.startReplicationService();
1856      }
1857      if (this.replicationSinkHandler != null) {
1858        this.replicationSinkHandler.startReplicationService();
1859      }
1860    }
1861  }
1862
1863  /** Returns Master address tracker instance. */
1864  public MasterAddressTracker getMasterAddressTracker() {
1865    return this.masterAddressTracker;
1866  }
1867
1868  /**
1869   * Start maintenance Threads, Server, Worker and lease checker threads. Start all threads we need
1870   * to run. This is called after we've successfully registered with the Master. Install an
1871   * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception. We
1872   * cannot set the handler on all threads. Server's internal Listener thread is off limits. For
1873   * Server, if an OOME, it waits a while then retries. Meantime, a flush or a compaction that tries
1874   * to run should trigger same critical condition and the shutdown will run. On its way out, this
1875   * server will shut down Server. Leases are sort of inbetween. It has an internal thread that
1876   * while it inherits from Chore, it keeps its own internal stop mechanism so needs to be stopped
1877   * by this hosting server. Worker logs the exception and exits.
1878   */
1879  private void startServices() throws IOException {
1880    if (!isStopped() && !isAborted()) {
1881      initializeThreads();
1882    }
1883    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
1884    this.secureBulkLoadManager.start();
1885
1886    // Health checker thread.
1887    if (isHealthCheckerConfigured()) {
1888      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
1889        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1890      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1891    }
1892    // Executor status collect thread.
1893    if (
1894      this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
1895        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)
1896    ) {
1897      int sleepTime =
1898        this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ);
1899      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
1900        this.metricsRegionServer.getMetricsSource());
1901    }
1902
1903    this.walRoller = new LogRoller(this);
1904    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1905    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
1906
1907    // Create the CompactedFileDischarger chore executorService. This chore helps to
1908    // remove the compacted files that will no longer be used in reads.
1909    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
1910    // 2 mins so that compacted files can be archived before the TTLCleaner runs
1911    int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
1912    this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this);
1913    choreService.scheduleChore(compactedFileDischarger);
1914
1915    // Start executor services
1916    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
1917    executorService.startExecutorService(executorService.new ExecutorConfig()
1918      .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
1919    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
1920    executorService.startExecutorService(executorService.new ExecutorConfig()
1921      .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
1922    final int openPriorityRegionThreads =
1923      conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
1924    executorService.startExecutorService(
1925      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION)
1926        .setCorePoolSize(openPriorityRegionThreads));
1927    final int closeRegionThreads =
1928      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
1929    executorService.startExecutorService(executorService.new ExecutorConfig()
1930      .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
1931    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
1932    executorService.startExecutorService(executorService.new ExecutorConfig()
1933      .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
1934    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1935      final int storeScannerParallelSeekThreads =
1936        conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
1937      executorService.startExecutorService(
1938        executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK)
1939          .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true));
1940    }
1941    final int logReplayOpsThreads =
1942      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
1943    executorService.startExecutorService(
1944      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS)
1945        .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
1946    // Start the threads for compacted files discharger
1947    final int compactionDischargerThreads =
1948      conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
1949    executorService.startExecutorService(executorService.new ExecutorConfig()
1950      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)
1951      .setCorePoolSize(compactionDischargerThreads));
1952    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1953      final int regionReplicaFlushThreads =
1954        conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1955          conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1956      executorService.startExecutorService(executorService.new ExecutorConfig()
1957        .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS)
1958        .setCorePoolSize(regionReplicaFlushThreads));
1959    }
1960    final int refreshPeerThreads =
1961      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
1962    executorService.startExecutorService(executorService.new ExecutorConfig()
1963      .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
1964    final int replaySyncReplicationWALThreads =
1965      conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
1966    executorService.startExecutorService(executorService.new ExecutorConfig()
1967      .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL)
1968      .setCorePoolSize(replaySyncReplicationWALThreads));
1969    final int switchRpcThrottleThreads =
1970      conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
1971    executorService.startExecutorService(
1972      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE)
1973        .setCorePoolSize(switchRpcThrottleThreads));
1974    final int claimReplicationQueueThreads =
1975      conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
1976    executorService.startExecutorService(
1977      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE)
1978        .setCorePoolSize(claimReplicationQueueThreads));
1979    final int rsSnapshotOperationThreads =
1980      conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
1981    executorService.startExecutorService(
1982      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
1983        .setCorePoolSize(rsSnapshotOperationThreads));
1984    final int rsFlushOperationThreads =
1985      conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3);
1986    executorService.startExecutorService(executorService.new ExecutorConfig()
1987      .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads));
1988    final int rsRefreshQuotasThreads =
1989      conf.getInt("hbase.regionserver.executor.refresh.quotas.threads", 1);
1990    executorService.startExecutorService(
1991      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS)
1992        .setCorePoolSize(rsRefreshQuotasThreads));
1993    final int logRollThreads = conf.getInt("hbase.regionserver.executor.log.roll.threads", 1);
1994    executorService.startExecutorService(executorService.new ExecutorConfig()
1995      .setExecutorType(ExecutorType.RS_LOG_ROLL).setCorePoolSize(logRollThreads));
1996    final int rsRefreshHFilesThreads =
1997      conf.getInt("hbase.regionserver.executor.refresh.hfiles.threads", 3);
1998    executorService.startExecutorService(executorService.new ExecutorConfig()
1999      .setExecutorType(ExecutorType.RS_REFRESH_HFILES).setCorePoolSize(rsRefreshHFilesThreads));
2000
2001    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
2002      uncaughtExceptionHandler);
2003    if (this.cacheFlusher != null) {
2004      this.cacheFlusher.start(uncaughtExceptionHandler);
2005    }
2006    Threads.setDaemonThreadRunning(this.procedureResultReporter,
2007      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
2008
2009    if (this.compactionChecker != null) {
2010      choreService.scheduleChore(compactionChecker);
2011    }
2012    if (this.periodicFlusher != null) {
2013      choreService.scheduleChore(periodicFlusher);
2014    }
2015    if (this.healthCheckChore != null) {
2016      choreService.scheduleChore(healthCheckChore);
2017    }
2018    if (this.executorStatusChore != null) {
2019      choreService.scheduleChore(executorStatusChore);
2020    }
2021    if (this.nonceManagerChore != null) {
2022      choreService.scheduleChore(nonceManagerChore);
2023    }
2024    if (this.storefileRefresher != null) {
2025      choreService.scheduleChore(storefileRefresher);
2026    }
2027    if (this.fsUtilizationChore != null) {
2028      choreService.scheduleChore(fsUtilizationChore);
2029    }
2030    if (this.namedQueueServiceChore != null) {
2031      choreService.scheduleChore(namedQueueServiceChore);
2032    }
2033    if (this.brokenStoreFileCleaner != null) {
2034      choreService.scheduleChore(brokenStoreFileCleaner);
2035    }
2036    if (this.rsMobFileCleanerChore != null) {
2037      choreService.scheduleChore(rsMobFileCleanerChore);
2038    }
2039    if (replicationMarkerChore != null) {
2040      LOG.info("Starting replication marker chore");
2041      choreService.scheduleChore(replicationMarkerChore);
2042    }
2043
2044    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
2045    // an unhandled exception, it will just exit.
2046    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
2047      uncaughtExceptionHandler);
2048
2049    // Create the log splitting worker and start it
2050    // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
2051    // quite a while inside Connection layer. The worker won't be available for other
2052    // tasks even after current task is preempted after a split task times out.
2053    Configuration sinkConf = HBaseConfiguration.create(conf);
2054    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2055      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
2056    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2057      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
2058    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
2059    if (
2060      this.csm != null
2061        && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
2062    ) {
2063      // SplitLogWorker needs csm. If none, don't start this.
2064      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
2065      splitLogWorker.start();
2066      LOG.debug("SplitLogWorker started");
2067    }
2068
2069    // Memstore services.
2070    startHeapMemoryManager();
2071    // Call it after starting HeapMemoryManager.
2072    initializeMemStoreChunkCreator(hMemManager);
2073  }
2074
2075  private void initializeThreads() {
2076    // Cache flushing thread.
2077    this.cacheFlusher = new MemStoreFlusher(conf, this);
2078
2079    // Compaction thread
2080    this.compactSplitThread = new CompactSplit(this);
2081
2082    // Prefetch Notifier
2083    this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
2084
2085    // Background thread to check for compactions; needed if region has not gotten updates
2086    // in a while. It will take care of not checking too frequently on store-by-store basis.
2087    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
2088    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
2089    this.leaseManager = new LeaseManager(this.threadWakeFrequency);
2090
2091    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
2092      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
2093    final boolean walEventTrackerEnabled =
2094      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
2095
2096    if (isSlowLogTableEnabled || walEventTrackerEnabled) {
2097      // default chore duration: 10 min
2098      // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property
2099      final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY,
2100        DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION);
2101
2102      final int namedQueueChoreDuration =
2103        conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT);
2104      // Considering min of slowLogChoreDuration and namedQueueChoreDuration
2105      int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration);
2106
2107      namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration,
2108        this.namedQueueRecorder, this.getConnection());
2109    }
2110
2111    if (this.nonceManager != null) {
2112      // Create the scheduled chore that cleans up nonces.
2113      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
2114    }
2115
2116    // Setup the Quota Manager
2117    rsQuotaManager = new RegionServerRpcQuotaManager(this);
2118    configurationManager.registerObserver(rsQuotaManager);
2119    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
2120
2121    if (QuotaUtil.isQuotaEnabled(conf)) {
2122      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
2123    }
2124
2125    boolean onlyMetaRefresh = false;
2126    int storefileRefreshPeriod =
2127      conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
2128        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2129    if (storefileRefreshPeriod == 0) {
2130      storefileRefreshPeriod =
2131        conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
2132          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2133      onlyMetaRefresh = true;
2134    }
2135    if (storefileRefreshPeriod > 0) {
2136      this.storefileRefresher =
2137        new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this);
2138    }
2139
2140    int brokenStoreFileCleanerPeriod =
2141      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
2142        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
2143    int brokenStoreFileCleanerDelay =
2144      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
2145        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
2146    double brokenStoreFileCleanerDelayJitter =
2147      conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
2148        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
2149    double jitterRate =
2150      (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
2151    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
2152    this.brokenStoreFileCleaner =
2153      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
2154        brokenStoreFileCleanerPeriod, this, conf, this);
2155
2156    this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
2157
2158    registerConfigurationObservers();
2159    initializeReplicationMarkerChore();
2160  }
2161
2162  private void registerConfigurationObservers() {
2163    // Register Replication if possible, as now we support recreating replication peer storage, for
2164    // migrating across different replication peer storages online
2165    if (replicationSourceHandler instanceof ConfigurationObserver) {
2166      configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
2167    }
2168    if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) {
2169      configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
2170    }
2171    // Registering the compactSplitThread object with the ConfigurationManager.
2172    configurationManager.registerObserver(this.compactSplitThread);
2173    configurationManager.registerObserver(this.cacheFlusher);
2174    configurationManager.registerObserver(this.rpcServices);
2175    configurationManager.registerObserver(this.prefetchExecutorNotifier);
2176    configurationManager.registerObserver(this);
2177  }
2178
2179  /*
2180   * Verify that server is healthy
2181   */
2182  private boolean isHealthy() {
2183    if (!dataFsOk) {
2184      // File system problem
2185      return false;
2186    }
2187    // Verify that all threads are alive
2188    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
2189      && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2190      && (this.walRoller == null || this.walRoller.isAlive())
2191      && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2192      && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2193    if (!healthy) {
2194      stop("One or more threads are no longer alive -- stop");
2195    }
2196    return healthy;
2197  }
2198
2199  @Override
2200  public List<WAL> getWALs() {
2201    return walFactory.getWALs();
2202  }
2203
2204  @Override
2205  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2206    WAL wal = walFactory.getWAL(regionInfo);
2207    if (this.walRoller != null) {
2208      this.walRoller.addWAL(wal);
2209    }
2210    return wal;
2211  }
2212
2213  public LogRoller getWalRoller() {
2214    return walRoller;
2215  }
2216
2217  public WALFactory getWalFactory() {
2218    return walFactory;
2219  }
2220
2221  @Override
2222  public void stop(final String msg) {
2223    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2224  }
2225
2226  /**
2227   * Stops the regionserver.
2228   * @param msg   Status message
2229   * @param force True if this is a regionserver abort
2230   * @param user  The user executing the stop request, or null if no user is associated
2231   */
2232  public void stop(final String msg, final boolean force, final User user) {
2233    if (!this.stopped) {
2234      LOG.info("***** STOPPING region server '{}' *****", this);
2235      if (this.rsHost != null) {
2236        // when forced via abort don't allow CPs to override
2237        try {
2238          this.rsHost.preStop(msg, user);
2239        } catch (IOException ioe) {
2240          if (!force) {
2241            LOG.warn("The region server did not stop", ioe);
2242            return;
2243          }
2244          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2245        }
2246      }
2247      this.stopped = true;
2248      LOG.info("STOPPED: " + msg);
2249      // Wakes run() if it is sleeping
2250      sleeper.skipSleepCycle();
2251    }
2252  }
2253
2254  public void waitForServerOnline() {
2255    while (!isStopped() && !isOnline()) {
2256      synchronized (online) {
2257        try {
2258          online.wait(msgInterval);
2259        } catch (InterruptedException ie) {
2260          Thread.currentThread().interrupt();
2261          break;
2262        }
2263      }
2264    }
2265  }
2266
2267  @Override
2268  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2269    HRegion r = context.getRegion();
2270    long openProcId = context.getOpenProcId();
2271    long masterSystemTime = context.getMasterSystemTime();
2272    long initiatingMasterActiveTime = context.getInitiatingMasterActiveTime();
2273    rpcServices.checkOpen();
2274    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2275      r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2276    // Do checks to see if we need to compact (references or too many files)
2277    // Skip compaction check if region is read only
2278    if (!r.isReadOnly()) {
2279      for (HStore s : r.stores.values()) {
2280        if (s.hasReferences() || s.needsCompaction()) {
2281          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2282        }
2283      }
2284    }
2285    long openSeqNum = r.getOpenSeqNum();
2286    if (openSeqNum == HConstants.NO_SEQNUM) {
2287      // If we opened a region, we should have read some sequence number from it.
2288      LOG.error(
2289        "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2290      openSeqNum = 0;
2291    }
2292
2293    // Notify master
2294    if (
2295      !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2296        openSeqNum, openProcId, masterSystemTime, r.getRegionInfo(), initiatingMasterActiveTime))
2297    ) {
2298      throw new IOException(
2299        "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2300    }
2301
2302    triggerFlushInPrimaryRegion(r);
2303
2304    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2305  }
2306
2307  /**
2308   * Helper method for use in tests. Skip the region transition report when there's no master around
2309   * to receive it.
2310   */
2311  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2312    final TransitionCode code = context.getCode();
2313    final long openSeqNum = context.getOpenSeqNum();
2314    long masterSystemTime = context.getMasterSystemTime();
2315    final RegionInfo[] hris = context.getHris();
2316
2317    if (code == TransitionCode.OPENED) {
2318      Preconditions.checkArgument(hris != null && hris.length == 1);
2319      if (hris[0].isMetaRegion()) {
2320        LOG.warn(
2321          "meta table location is stored in master local store, so we can not skip reporting");
2322        return false;
2323      } else {
2324        try {
2325          MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
2326            serverName, openSeqNum, masterSystemTime);
2327        } catch (IOException e) {
2328          LOG.info("Failed to update meta", e);
2329          return false;
2330        }
2331      }
2332    }
2333    return true;
2334  }
2335
2336  private ReportRegionStateTransitionRequest
2337    createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) {
2338    final TransitionCode code = context.getCode();
2339    final long openSeqNum = context.getOpenSeqNum();
2340    final RegionInfo[] hris = context.getHris();
2341    final long[] procIds = context.getProcIds();
2342
2343    ReportRegionStateTransitionRequest.Builder builder =
2344      ReportRegionStateTransitionRequest.newBuilder();
2345    builder.setServer(ProtobufUtil.toServerName(serverName));
2346    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2347    transition.setTransitionCode(code);
2348    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2349      transition.setOpenSeqNum(openSeqNum);
2350    }
2351    for (RegionInfo hri : hris) {
2352      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2353    }
2354    for (long procId : procIds) {
2355      transition.addProcId(procId);
2356    }
2357    transition.setInitiatingMasterActiveTime(context.getInitiatingMasterActiveTime());
2358
2359    return builder.build();
2360  }
2361
2362  @Override
2363  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2364    if (TEST_SKIP_REPORTING_TRANSITION) {
2365      return skipReportingTransition(context);
2366    }
2367    final ReportRegionStateTransitionRequest request =
2368      createReportRegionStateTransitionRequest(context);
2369
2370    int tries = 0;
2371    long pauseTime = this.retryPauseTime;
2372    // Keep looping till we get an error. We want to send reports even though server is going down.
2373    // Only go down if clusterConnection is null. It is set to null almost as last thing as the
2374    // HRegionServer does down.
2375    while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
2376      RegionServerStatusService.BlockingInterface rss = rssStub;
2377      try {
2378        if (rss == null) {
2379          createRegionServerStatusStub();
2380          continue;
2381        }
2382        ReportRegionStateTransitionResponse response =
2383          rss.reportRegionStateTransition(null, request);
2384        if (response.hasErrorMessage()) {
2385          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2386          break;
2387        }
2388        // Log if we had to retry else don't log unless TRACE. We want to
2389        // know if were successful after an attempt showed in logs as failed.
2390        if (tries > 0 || LOG.isTraceEnabled()) {
2391          LOG.info("TRANSITION REPORTED " + request);
2392        }
2393        // NOTE: Return mid-method!!!
2394        return true;
2395      } catch (ServiceException se) {
2396        IOException ioe = ProtobufUtil.getRemoteException(se);
2397        boolean pause = ioe instanceof ServerNotRunningYetException
2398          || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException;
2399        if (pause) {
2400          // Do backoff else we flood the Master with requests.
2401          pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
2402        } else {
2403          pauseTime = this.retryPauseTime; // Reset.
2404        }
2405        LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#"
2406          + tries + ")"
2407          + (pause
2408            ? " after " + pauseTime + "ms delay (Master is coming online...)."
2409            : " immediately."),
2410          ioe);
2411        if (pause) {
2412          Threads.sleep(pauseTime);
2413        }
2414        tries++;
2415        if (rssStub == rss) {
2416          rssStub = null;
2417        }
2418      }
2419    }
2420    return false;
2421  }
2422
2423  /**
2424   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2425   * block this thread. See RegionReplicaFlushHandler for details.
2426   */
2427  private void triggerFlushInPrimaryRegion(final HRegion region) {
2428    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2429      return;
2430    }
2431    TableName tn = region.getTableDescriptor().getTableName();
2432    if (
2433      !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn)
2434        || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) ||
2435        // If the memstore replication not setup, we do not have to wait for observing a flush event
2436        // from primary before starting to serve reads, because gaps from replication is not
2437        // applicable,this logic is from
2438        // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by
2439        // HBASE-13063
2440        !region.getTableDescriptor().hasRegionMemStoreReplication()
2441    ) {
2442      region.setReadsEnabled(true);
2443      return;
2444    }
2445
2446    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2447    // RegionReplicaFlushHandler might reset this.
2448
2449    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2450    if (this.executorService != null) {
2451      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
2452    } else {
2453      LOG.info("Executor is null; not running flush of primary region replica for {}",
2454        region.getRegionInfo());
2455    }
2456  }
2457
2458  @InterfaceAudience.Private
2459  public RSRpcServices getRSRpcServices() {
2460    return rpcServices;
2461  }
2462
2463  /**
2464   * Cause the server to exit without closing the regions it is serving, the log it is using and
2465   * without notifying the master. Used unit testing and on catastrophic events such as HDFS is
2466   * yanked out from under hbase or we OOME. the reason we are aborting the exception that caused
2467   * the abort, or null
2468   */
2469  @Override
2470  public void abort(String reason, Throwable cause) {
2471    if (!setAbortRequested()) {
2472      // Abort already in progress, ignore the new request.
2473      LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason);
2474      return;
2475    }
2476    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2477    if (cause != null) {
2478      LOG.error(HBaseMarkers.FATAL, msg, cause);
2479    } else {
2480      LOG.error(HBaseMarkers.FATAL, msg);
2481    }
2482    // HBASE-4014: show list of coprocessors that were loaded to help debug
2483    // regionserver crashes.Note that we're implicitly using
2484    // java.util.HashSet's toString() method to print the coprocessor names.
2485    LOG.error(HBaseMarkers.FATAL,
2486      "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors());
2487    // Try and dump metrics if abort -- might give clue as to how fatal came about....
2488    try {
2489      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2490    } catch (MalformedObjectNameException | IOException e) {
2491      LOG.warn("Failed dumping metrics", e);
2492    }
2493
2494    // Do our best to report our abort to the master, but this may not work
2495    try {
2496      if (cause != null) {
2497        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2498      }
2499      // Report to the master but only if we have already registered with the master.
2500      RegionServerStatusService.BlockingInterface rss = rssStub;
2501      if (rss != null && this.serverName != null) {
2502        ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder();
2503        builder.setServer(ProtobufUtil.toServerName(this.serverName));
2504        builder.setErrorMessage(msg);
2505        rss.reportRSFatalError(null, builder.build());
2506      }
2507    } catch (Throwable t) {
2508      LOG.warn("Unable to report fatal error to master", t);
2509    }
2510
2511    scheduleAbortTimer();
2512    // shutdown should be run as the internal user
2513    stop(reason, true, null);
2514  }
2515
2516  /*
2517   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup logs but it does
2518   * close socket in case want to bring up server on old hostname+port immediately.
2519   */
2520  @InterfaceAudience.Private
2521  protected void kill() {
2522    this.killed = true;
2523    abort("Simulated kill");
2524  }
2525
2526  // Limits the time spent in the shutdown process.
2527  private void scheduleAbortTimer() {
2528    if (this.abortMonitor == null) {
2529      this.abortMonitor = new Timer("Abort regionserver monitor", true);
2530      TimerTask abortTimeoutTask = null;
2531      try {
2532        Constructor<? extends TimerTask> timerTaskCtor =
2533          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2534            .asSubclass(TimerTask.class).getDeclaredConstructor();
2535        timerTaskCtor.setAccessible(true);
2536        abortTimeoutTask = timerTaskCtor.newInstance();
2537      } catch (Exception e) {
2538        LOG.warn("Initialize abort timeout task failed", e);
2539      }
2540      if (abortTimeoutTask != null) {
2541        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2542      }
2543    }
2544  }
2545
2546  /**
2547   * Wait on all threads to finish. Presumption is that all closes and stops have already been
2548   * called.
2549   */
2550  protected void stopServiceThreads() {
2551    // clean up the scheduled chores
2552    stopChoreService();
2553    if (bootstrapNodeManager != null) {
2554      bootstrapNodeManager.stop();
2555    }
2556    if (this.cacheFlusher != null) {
2557      this.cacheFlusher.shutdown();
2558    }
2559    if (this.walRoller != null) {
2560      this.walRoller.close();
2561    }
2562    if (this.compactSplitThread != null) {
2563      this.compactSplitThread.join();
2564    }
2565    stopExecutorService();
2566    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
2567      this.replicationSourceHandler.stopReplicationService();
2568    } else {
2569      if (this.replicationSourceHandler != null) {
2570        this.replicationSourceHandler.stopReplicationService();
2571      }
2572      if (this.replicationSinkHandler != null) {
2573        this.replicationSinkHandler.stopReplicationService();
2574      }
2575    }
2576  }
2577
2578  /** Returns Return the object that implements the replication source executorService. */
2579  @Override
2580  public ReplicationSourceService getReplicationSourceService() {
2581    return replicationSourceHandler;
2582  }
2583
2584  /** Returns Return the object that implements the replication sink executorService. */
2585  public ReplicationSinkService getReplicationSinkService() {
2586    return replicationSinkHandler;
2587  }
2588
2589  /**
2590   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2591   * connection, the current rssStub must be null. Method will block until a master is available.
2592   * You can break from this block by requesting the server stop.
2593   * @return master + port, or null if server has been stopped
2594   */
2595  private synchronized ServerName createRegionServerStatusStub() {
2596    // Create RS stub without refreshing the master node from ZK, use cached data
2597    return createRegionServerStatusStub(false);
2598  }
2599
2600  /**
2601   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2602   * connection, the current rssStub must be null. Method will block until a master is available.
2603   * You can break from this block by requesting the server stop.
2604   * @param refresh If true then master address will be read from ZK, otherwise use cached data
2605   * @return master + port, or null if server has been stopped
2606   */
2607  @InterfaceAudience.Private
2608  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2609    if (rssStub != null) {
2610      return masterAddressTracker.getMasterAddress();
2611    }
2612    ServerName sn = null;
2613    long previousLogTime = 0;
2614    RegionServerStatusService.BlockingInterface intRssStub = null;
2615    LockService.BlockingInterface intLockStub = null;
2616    boolean interrupted = false;
2617    try {
2618      while (keepLooping()) {
2619        sn = this.masterAddressTracker.getMasterAddress(refresh);
2620        if (sn == null) {
2621          if (!keepLooping()) {
2622            // give up with no connection.
2623            LOG.debug("No master found and cluster is stopped; bailing out");
2624            return null;
2625          }
2626          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2627            LOG.debug("No master found; retry");
2628            previousLogTime = EnvironmentEdgeManager.currentTime();
2629          }
2630          refresh = true; // let's try pull it from ZK directly
2631          if (sleepInterrupted(200)) {
2632            interrupted = true;
2633          }
2634          continue;
2635        }
2636        try {
2637          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
2638            userProvider.getCurrent(), shortOperationTimeout);
2639          intRssStub = RegionServerStatusService.newBlockingStub(channel);
2640          intLockStub = LockService.newBlockingStub(channel);
2641          break;
2642        } catch (IOException e) {
2643          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2644            e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
2645            if (e instanceof ServerNotRunningYetException) {
2646              LOG.info("Master isn't available yet, retrying");
2647            } else {
2648              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2649            }
2650            previousLogTime = EnvironmentEdgeManager.currentTime();
2651          }
2652          if (sleepInterrupted(200)) {
2653            interrupted = true;
2654          }
2655        }
2656      }
2657    } finally {
2658      if (interrupted) {
2659        Thread.currentThread().interrupt();
2660      }
2661    }
2662    this.rssStub = intRssStub;
2663    this.lockStub = intLockStub;
2664    return sn;
2665  }
2666
2667  /**
2668   * @return True if we should break loop because cluster is going down or this server has been
2669   *         stopped or hdfs has gone bad.
2670   */
2671  private boolean keepLooping() {
2672    return !this.stopped && isClusterUp();
2673  }
2674
2675  /*
2676   * Let the master know we're here Run initialization using parameters passed us by the master.
2677   * @return A Map of key/value configurations we got from the Master else null if we failed to
2678   * register.
2679   */
2680  private RegionServerStartupResponse reportForDuty() throws IOException {
2681    if (this.masterless) {
2682      return RegionServerStartupResponse.getDefaultInstance();
2683    }
2684    ServerName masterServerName = createRegionServerStatusStub(true);
2685    RegionServerStatusService.BlockingInterface rss = rssStub;
2686    if (masterServerName == null || rss == null) {
2687      return null;
2688    }
2689    RegionServerStartupResponse result = null;
2690    try {
2691      rpcServices.requestCount.reset();
2692      rpcServices.rpcGetRequestCount.reset();
2693      rpcServices.rpcScanRequestCount.reset();
2694      rpcServices.rpcFullScanRequestCount.reset();
2695      rpcServices.rpcMultiRequestCount.reset();
2696      rpcServices.rpcMutateRequestCount.reset();
2697      LOG.info("reportForDuty to master=" + masterServerName + " with port="
2698        + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode);
2699      long now = EnvironmentEdgeManager.currentTime();
2700      int port = rpcServices.getSocketAddress().getPort();
2701      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2702      if (!StringUtils.isBlank(useThisHostnameInstead)) {
2703        request.setUseThisHostnameInstead(useThisHostnameInstead);
2704      }
2705      request.setPort(port);
2706      request.setServerStartCode(this.startcode);
2707      request.setServerCurrentTime(now);
2708      result = rss.regionServerStartup(null, request.build());
2709    } catch (ServiceException se) {
2710      IOException ioe = ProtobufUtil.getRemoteException(se);
2711      if (ioe instanceof ClockOutOfSyncException) {
2712        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
2713        // Re-throw IOE will cause RS to abort
2714        throw ioe;
2715      } else if (ioe instanceof DecommissionedHostRejectedException) {
2716        LOG.error(HBaseMarkers.FATAL,
2717          "Master rejected startup because the host is considered decommissioned", ioe);
2718        // Re-throw IOE will cause RS to abort
2719        throw ioe;
2720      } else if (ioe instanceof ServerNotRunningYetException) {
2721        LOG.debug("Master is not running yet");
2722      } else {
2723        LOG.warn("error telling master we are up", se);
2724      }
2725      rssStub = null;
2726    }
2727    return result;
2728  }
2729
2730  @Override
2731  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2732    try {
2733      GetLastFlushedSequenceIdRequest req =
2734        RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2735      RegionServerStatusService.BlockingInterface rss = rssStub;
2736      if (rss == null) { // Try to connect one more time
2737        createRegionServerStatusStub();
2738        rss = rssStub;
2739        if (rss == null) {
2740          // Still no luck, we tried
2741          LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2742          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2743            .build();
2744        }
2745      }
2746      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2747      return RegionStoreSequenceIds.newBuilder()
2748        .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2749        .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2750    } catch (ServiceException e) {
2751      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2752      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2753        .build();
2754    }
2755  }
2756
2757  /**
2758   * Close meta region if we carry it
2759   * @param abort Whether we're running an abort.
2760   */
2761  private void closeMetaTableRegions(final boolean abort) {
2762    HRegion meta = null;
2763    this.onlineRegionsLock.writeLock().lock();
2764    try {
2765      for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
2766        RegionInfo hri = e.getValue().getRegionInfo();
2767        if (hri.isMetaRegion()) {
2768          meta = e.getValue();
2769        }
2770        if (meta != null) {
2771          break;
2772        }
2773      }
2774    } finally {
2775      this.onlineRegionsLock.writeLock().unlock();
2776    }
2777    if (meta != null) {
2778      closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2779    }
2780  }
2781
2782  /**
2783   * Schedule closes on all user regions. Should be safe calling multiple times because it wont'
2784   * close regions that are already closed or that are closing.
2785   * @param abort Whether we're running an abort.
2786   */
2787  private void closeUserRegions(final boolean abort) {
2788    this.onlineRegionsLock.writeLock().lock();
2789    try {
2790      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
2791        HRegion r = e.getValue();
2792        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2793          // Don't update zk with this close transition; pass false.
2794          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2795        }
2796      }
2797    } finally {
2798      this.onlineRegionsLock.writeLock().unlock();
2799    }
2800  }
2801
2802  protected Map<String, HRegion> getOnlineRegions() {
2803    return this.onlineRegions;
2804  }
2805
2806  public int getNumberOfOnlineRegions() {
2807    return this.onlineRegions.size();
2808  }
2809
2810  /**
2811   * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM
2812   * as client; HRegion cannot be serialized to cross an rpc.
2813   */
2814  public Collection<HRegion> getOnlineRegionsLocalContext() {
2815    Collection<HRegion> regions = this.onlineRegions.values();
2816    return Collections.unmodifiableCollection(regions);
2817  }
2818
2819  @Override
2820  public void addRegion(HRegion region) {
2821    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2822    configurationManager.registerObserver(region);
2823  }
2824
2825  private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
2826    long size) {
2827    if (!sortedRegions.containsKey(size)) {
2828      sortedRegions.put(size, new ArrayList<>());
2829    }
2830    sortedRegions.get(size).add(region);
2831  }
2832
2833  /**
2834   * @return A new Map of online regions sorted by region off-heap size with the first entry being
2835   *         the biggest.
2836   */
2837  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2838    // we'll sort the regions in reverse
2839    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2840    // Copy over all regions. Regions are sorted by size with biggest first.
2841    for (HRegion region : this.onlineRegions.values()) {
2842      addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
2843    }
2844    return sortedRegions;
2845  }
2846
2847  /**
2848   * @return A new Map of online regions sorted by region heap size with the first entry being the
2849   *         biggest.
2850   */
2851  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2852    // we'll sort the regions in reverse
2853    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2854    // Copy over all regions. Regions are sorted by size with biggest first.
2855    for (HRegion region : this.onlineRegions.values()) {
2856      addRegion(sortedRegions, region, region.getMemStoreHeapSize());
2857    }
2858    return sortedRegions;
2859  }
2860
2861  /** Returns reference to FlushRequester */
2862  @Override
2863  public FlushRequester getFlushRequester() {
2864    return this.cacheFlusher;
2865  }
2866
2867  @Override
2868  public CompactionRequester getCompactionRequestor() {
2869    return this.compactSplitThread;
2870  }
2871
2872  @Override
2873  public LeaseManager getLeaseManager() {
2874    return leaseManager;
2875  }
2876
2877  /** Returns {@code true} when the data file system is available, {@code false} otherwise. */
2878  boolean isDataFileSystemOk() {
2879    return this.dataFsOk;
2880  }
2881
2882  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
2883    return this.rsHost;
2884  }
2885
2886  @Override
2887  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2888    return this.regionsInTransitionInRS;
2889  }
2890
2891  @Override
2892  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
2893    return rsQuotaManager;
2894  }
2895
2896  //
2897  // Main program and support routines
2898  //
2899  /**
2900   * Load the replication executorService objects, if any
2901   */
2902  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
2903    FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
2904    // read in the name of the source replication class from the config file.
2905    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2906      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2907
2908    // read in the name of the sink replication class from the config file.
2909    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2910      HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
2911
2912    // If both the sink and the source class names are the same, then instantiate
2913    // only one object.
2914    if (sourceClassname.equals(sinkClassname)) {
2915      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2916        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2917      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
2918      server.sameReplicationSourceAndSink = true;
2919    } else {
2920      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2921        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2922      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
2923        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2924      server.sameReplicationSourceAndSink = false;
2925    }
2926  }
2927
2928  private static <T extends ReplicationService> T newReplicationInstance(String classname,
2929    Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
2930    Path oldLogDir, WALFactory walFactory) throws IOException {
2931    final Class<? extends T> clazz;
2932    try {
2933      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2934      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
2935    } catch (java.lang.ClassNotFoundException nfe) {
2936      throw new IOException("Could not find class for " + classname);
2937    }
2938    T service = ReflectionUtils.newInstance(clazz, conf);
2939    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
2940    return service;
2941  }
2942
2943  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
2944    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
2945    if (!this.isOnline()) {
2946      return walGroupsReplicationStatus;
2947    }
2948    List<ReplicationSourceInterface> allSources = new ArrayList<>();
2949    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
2950    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
2951    for (ReplicationSourceInterface source : allSources) {
2952      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
2953    }
2954    return walGroupsReplicationStatus;
2955  }
2956
2957  /**
2958   * Utility for constructing an instance of the passed HRegionServer class.
2959   */
2960  static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass,
2961    final Configuration conf) {
2962    try {
2963      Constructor<? extends HRegionServer> c =
2964        regionServerClass.getConstructor(Configuration.class);
2965      return c.newInstance(conf);
2966    } catch (Exception e) {
2967      throw new RuntimeException(
2968        "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e);
2969    }
2970  }
2971
2972  /**
2973   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2974   */
2975  public static void main(String[] args) {
2976    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
2977    VersionInfo.logVersion();
2978    Configuration conf = HBaseConfiguration.create();
2979    @SuppressWarnings("unchecked")
2980    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2981      .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2982
2983    new HRegionServerCommandLine(regionServerClass).doMain(args);
2984  }
2985
2986  /**
2987   * Gets the online regions of the specified table. This method looks at the in-memory
2988   * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions.
2989   * If a region on this table has been closed during a disable, etc., it will not be included in
2990   * the returned list. So, the returned list may not necessarily be ALL regions in this table, its
2991   * all the ONLINE regions in the table.
2992   * @param tableName table to limit the scope of the query
2993   * @return Online regions from <code>tableName</code>
2994   */
2995  @Override
2996  public List<HRegion> getRegions(TableName tableName) {
2997    List<HRegion> tableRegions = new ArrayList<>();
2998    synchronized (this.onlineRegions) {
2999      for (HRegion region : this.onlineRegions.values()) {
3000        RegionInfo regionInfo = region.getRegionInfo();
3001        if (regionInfo.getTable().equals(tableName)) {
3002          tableRegions.add(region);
3003        }
3004      }
3005    }
3006    return tableRegions;
3007  }
3008
3009  @Override
3010  public List<HRegion> getRegions() {
3011    List<HRegion> allRegions;
3012    synchronized (this.onlineRegions) {
3013      // Return a clone copy of the onlineRegions
3014      allRegions = new ArrayList<>(onlineRegions.values());
3015    }
3016    return allRegions;
3017  }
3018
3019  /**
3020   * Gets the online tables in this RS. This method looks at the in-memory onlineRegions.
3021   * @return all the online tables in this RS
3022   */
3023  public Set<TableName> getOnlineTables() {
3024    Set<TableName> tables = new HashSet<>();
3025    synchronized (this.onlineRegions) {
3026      for (Region region : this.onlineRegions.values()) {
3027        tables.add(region.getTableDescriptor().getTableName());
3028      }
3029    }
3030    return tables;
3031  }
3032
3033  public String[] getRegionServerCoprocessors() {
3034    TreeSet<String> coprocessors = new TreeSet<>();
3035    try {
3036      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
3037    } catch (IOException exception) {
3038      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; "
3039        + "skipping.");
3040      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3041    }
3042    Collection<HRegion> regions = getOnlineRegionsLocalContext();
3043    for (HRegion region : regions) {
3044      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
3045      try {
3046        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
3047      } catch (IOException exception) {
3048        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region
3049          + "; skipping.");
3050        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3051      }
3052    }
3053    coprocessors.addAll(rsHost.getCoprocessors());
3054    return coprocessors.toArray(new String[0]);
3055  }
3056
3057  /**
3058   * Try to close the region, logs a warning on failure but continues.
3059   * @param region Region to close
3060   */
3061  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
3062    try {
3063      if (!closeRegion(region.getEncodedName(), abort, null)) {
3064        LOG
3065          .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing");
3066      }
3067    } catch (IOException e) {
3068      LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing",
3069        e);
3070    }
3071  }
3072
3073  /**
3074   * Close asynchronously a region, can be called from the master or internally by the regionserver
3075   * when stopping. If called from the master, the region will update the status.
3076   * <p>
3077   * If an opening was in progress, this method will cancel it, but will not start a new close. The
3078   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
3079   * </p>
3080   * <p>
3081   * If a close was in progress, this new request will be ignored, and an exception thrown.
3082   * </p>
3083   * <p>
3084   * Provides additional flag to indicate if this region blocks should be evicted from the cache.
3085   * </p>
3086   * @param encodedName Region to close
3087   * @param abort       True if we are aborting
3088   * @param destination Where the Region is being moved too... maybe null if unknown.
3089   * @return True if closed a region.
3090   * @throws NotServingRegionException if the region is not online
3091   */
3092  protected boolean closeRegion(String encodedName, final boolean abort,
3093    final ServerName destination) throws NotServingRegionException {
3094    // Check for permissions to close.
3095    HRegion actualRegion = this.getRegion(encodedName);
3096    // Can be null if we're calling close on a region that's not online
3097    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
3098      try {
3099        actualRegion.getCoprocessorHost().preClose(false);
3100      } catch (IOException exp) {
3101        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
3102        return false;
3103      }
3104    }
3105
3106    // previous can come back 'null' if not in map.
3107    final Boolean previous =
3108      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);
3109
3110    if (Boolean.TRUE.equals(previous)) {
3111      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already "
3112        + "trying to OPEN. Cancelling OPENING.");
3113      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
3114        // The replace failed. That should be an exceptional case, but theoretically it can happen.
3115        // We're going to try to do a standard close then.
3116        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
3117          + " Doing a standard close now");
3118        return closeRegion(encodedName, abort, destination);
3119      }
3120      // Let's get the region from the online region list again
3121      actualRegion = this.getRegion(encodedName);
3122      if (actualRegion == null) { // If already online, we still need to close it.
3123        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3124        // The master deletes the znode when it receives this exception.
3125        throw new NotServingRegionException(
3126          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
3127      }
3128    } else if (previous == null) {
3129      LOG.info("Received CLOSE for {}", encodedName);
3130    } else if (Boolean.FALSE.equals(previous)) {
3131      LOG.info("Received CLOSE for the region: " + encodedName
3132        + ", which we are already trying to CLOSE, but not completed yet");
3133      return true;
3134    }
3135
3136    if (actualRegion == null) {
3137      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3138      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
3139      // The master deletes the znode when it receives this exception.
3140      throw new NotServingRegionException(
3141        "The region " + encodedName + " is not online, and is not opening.");
3142    }
3143
3144    CloseRegionHandler crh;
3145    final RegionInfo hri = actualRegion.getRegionInfo();
3146    if (hri.isMetaRegion()) {
3147      crh = new CloseMetaHandler(this, this, hri, abort);
3148    } else {
3149      crh = new CloseRegionHandler(this, this, hri, abort, destination);
3150    }
3151    this.executorService.submit(crh);
3152    return true;
3153  }
3154
3155  /**
3156   * @return HRegion for the passed binary <code>regionName</code> or null if named region is not
3157   *         member of the online regions.
3158   */
3159  public HRegion getOnlineRegion(final byte[] regionName) {
3160    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3161    return this.onlineRegions.get(encodedRegionName);
3162  }
3163
3164  @Override
3165  public HRegion getRegion(final String encodedRegionName) {
3166    return this.onlineRegions.get(encodedRegionName);
3167  }
3168
3169  @Override
3170  public boolean removeRegion(final HRegion r, ServerName destination) {
3171    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3172    if (DataTieringManager.getInstance() != null) {
3173      DataTieringManager.getInstance().getRegionColdDataSize()
3174        .remove(r.getRegionInfo().getEncodedName());
3175    }
3176    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
3177    if (destination != null) {
3178      long closeSeqNum = r.getMaxFlushedSeqId();
3179      if (closeSeqNum == HConstants.NO_SEQNUM) {
3180        // No edits in WAL for this region; get the sequence number when the region was opened.
3181        closeSeqNum = r.getOpenSeqNum();
3182        if (closeSeqNum == HConstants.NO_SEQNUM) {
3183          closeSeqNum = 0;
3184        }
3185      }
3186      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
3187      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
3188      if (selfMove) {
3189        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
3190          r.getRegionInfo().getEncodedName(),
3191          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
3192      }
3193    }
3194    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3195    configurationManager.deregisterObserver(r);
3196    return toReturn != null;
3197  }
3198
3199  /**
3200   * Protected Utility method for safely obtaining an HRegion handle.
3201   * @param regionName Name of online {@link HRegion} to return
3202   * @return {@link HRegion} for <code>regionName</code>
3203   */
3204  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
3205    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3206    return getRegionByEncodedName(regionName, encodedRegionName);
3207  }
3208
3209  public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException {
3210    return getRegionByEncodedName(null, encodedRegionName);
3211  }
3212
3213  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3214    throws NotServingRegionException {
3215    HRegion region = this.onlineRegions.get(encodedRegionName);
3216    if (region == null) {
3217      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3218      if (moveInfo != null) {
3219        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3220      }
3221      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3222      String regionNameStr =
3223        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
3224      if (isOpening != null && isOpening) {
3225        throw new RegionOpeningException(
3226          "Region " + regionNameStr + " is opening on " + this.serverName);
3227      }
3228      throw new NotServingRegionException(
3229        "" + regionNameStr + " is not online on " + this.serverName);
3230    }
3231    return region;
3232  }
3233
3234  /**
3235   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't
3236   * already.
3237   * @param t   Throwable
3238   * @param msg Message to log in error. Can be null.
3239   * @return Throwable converted to an IOE; methods can only let out IOEs.
3240   */
3241  private Throwable cleanup(final Throwable t, final String msg) {
3242    // Don't log as error if NSRE; NSRE is 'normal' operation.
3243    if (t instanceof NotServingRegionException) {
3244      LOG.debug("NotServingRegionException; " + t.getMessage());
3245      return t;
3246    }
3247    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3248    if (msg == null) {
3249      LOG.error("", e);
3250    } else {
3251      LOG.error(msg, e);
3252    }
3253    if (!rpcServices.checkOOME(t)) {
3254      checkFileSystem();
3255    }
3256    return t;
3257  }
3258
3259  /**
3260   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
3261   * @return Make <code>t</code> an IOE if it isn't already.
3262   */
3263  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3264    return (t instanceof IOException ? (IOException) t
3265      : msg == null || msg.length() == 0 ? new IOException(t)
3266      : new IOException(msg, t));
3267  }
3268
3269  /**
3270   * Checks to see if the file system is still accessible. If not, sets abortRequested and
3271   * stopRequested
3272   * @return false if file system is not available
3273   */
3274  boolean checkFileSystem() {
3275    if (this.dataFsOk && this.dataFs != null) {
3276      try {
3277        FSUtils.checkFileSystemAvailable(this.dataFs);
3278      } catch (IOException e) {
3279        abort("File System not available", e);
3280        this.dataFsOk = false;
3281      }
3282    }
3283    return this.dataFsOk;
3284  }
3285
3286  @Override
3287  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3288    List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3289    Address[] addr = new Address[favoredNodes.size()];
3290    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3291    // it is a map of region name to Address[]
3292    for (int i = 0; i < favoredNodes.size(); i++) {
3293      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort());
3294    }
3295    regionFavoredNodesMap.put(encodedRegionName, addr);
3296  }
3297
3298  /**
3299   * Return the favored nodes for a region given its encoded name. Look at the comment around
3300   * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here.
3301   * @param encodedRegionName the encoded region name.
3302   * @return array of favored locations
3303   */
3304  @Override
3305  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3306    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
3307  }
3308
3309  @Override
3310  public ServerNonceManager getNonceManager() {
3311    return this.nonceManager;
3312  }
3313
3314  private static class MovedRegionInfo {
3315    private final ServerName serverName;
3316    private final long seqNum;
3317
3318    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3319      this.serverName = serverName;
3320      this.seqNum = closeSeqNum;
3321    }
3322
3323    public ServerName getServerName() {
3324      return serverName;
3325    }
3326
3327    public long getSeqNum() {
3328      return seqNum;
3329    }
3330  }
3331
3332  /**
3333   * We need a timeout. If not there is a risk of giving a wrong information: this would double the
3334   * number of network calls instead of reducing them.
3335   */
3336  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3337
3338  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
3339    boolean selfMove) {
3340    if (selfMove) {
3341      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3342      return;
3343    }
3344    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid="
3345      + closeSeqNum);
3346    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3347  }
3348
3349  // public for being called in tests
3350  @InterfaceAudience.Private
3351  public void removeFromMovedRegions(String encodedName) {
3352    movedRegionInfoCache.invalidate(encodedName);
3353  }
3354
3355  @InterfaceAudience.Private
3356  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
3357    return movedRegionInfoCache.getIfPresent(encodedRegionName);
3358  }
3359
3360  @InterfaceAudience.Private
3361  public int movedRegionCacheExpiredTime() {
3362    return TIMEOUT_REGION_MOVED;
3363  }
3364
3365  private String getMyEphemeralNodePath() {
3366    return zooKeeper.getZNodePaths().getRsPath(serverName);
3367  }
3368
3369  private boolean isHealthCheckerConfigured() {
3370    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3371    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
3372  }
3373
3374  /** Returns the underlying {@link CompactSplit} for the servers */
3375  public CompactSplit getCompactSplitThread() {
3376    return this.compactSplitThread;
3377  }
3378
  /**
   * Executes a coprocessor service call against a service registered in
   * {@code coprocessorServiceHandlers}.
   * @param controller     unused
   * @param serviceRequest request naming the service and method to call and carrying the
   *                       serialized method request
   * @return the response built from whatever message the service passed to its callback
   * @throws ServiceException wrapping any {@link IOException}, including the
   *                          {@link UnknownProtocolException} raised for an unknown service or
   *                          method and any exception the service set on its controller
   */
  CoprocessorServiceResponse execRegionServerService(
    @SuppressWarnings("UnusedParameters") final RpcController controller,
    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
    try {
      ServerRpcController serviceController = new ServerRpcController();
      CoprocessorServiceCall call = serviceRequest.getCall();
      // Resolve the target service by name.
      String serviceName = call.getServiceName();
      Service service = coprocessorServiceHandlers.get(serviceName);
      if (service == null) {
        throw new UnknownProtocolException(null,
          "No registered coprocessor executorService found for " + serviceName);
      }
      ServiceDescriptor serviceDesc = service.getDescriptorForType();

      // Resolve the target method on that service.
      String methodName = call.getMethodName();
      MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
      if (methodDesc == null) {
        throw new UnknownProtocolException(service.getClass(),
          "Unknown method " + methodName + " called on executorService " + serviceName);
      }

      Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
      final Message.Builder responseBuilder =
        service.getResponsePrototype(methodDesc).newBuilderForType();
      // The callback merges the service's reply into the builder; a null reply is ignored.
      service.callMethod(methodDesc, serviceController, request, message -> {
        if (message != null) {
          responseBuilder.mergeFrom(message);
        }
      });
      // Failures are reported through the controller, so check it after the call returns.
      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
      if (exception != null) {
        throw exception;
      }
      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
3417
3418  /**
3419   * May be null if this is a master which not carry table.
3420   * @return The block cache instance used by the regionserver.
3421   */
3422  @Override
3423  public Optional<BlockCache> getBlockCache() {
3424    return Optional.ofNullable(this.blockCache);
3425  }
3426
3427  /**
3428   * May be null if this is a master which not carry table.
3429   * @return The cache for mob files used by the regionserver.
3430   */
3431  @Override
3432  public Optional<MobFileCache> getMobFileCache() {
3433    return Optional.ofNullable(this.mobFileCache);
3434  }
3435
3436  CacheEvictionStats clearRegionBlockCache(Region region) {
3437    long evictedBlocks = 0;
3438
3439    for (Store store : region.getStores()) {
3440      for (StoreFile hFile : store.getStorefiles()) {
3441        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3442      }
3443    }
3444
3445    return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build();
3446  }
3447
3448  @Override
3449  public double getCompactionPressure() {
3450    double max = 0;
3451    for (Region region : onlineRegions.values()) {
3452      for (Store store : region.getStores()) {
3453        double normCount = store.getCompactionPressure();
3454        if (normCount > max) {
3455          max = normCount;
3456        }
3457      }
3458    }
3459    return max;
3460  }
3461
3462  @Override
3463  public HeapMemoryManager getHeapMemoryManager() {
3464    return hMemManager;
3465  }
3466
3467  public MemStoreFlusher getMemStoreFlusher() {
3468    return cacheFlusher;
3469  }
3470
3471  /**
3472   * For testing
3473   * @return whether all wal roll request finished for this regionserver
3474   */
3475  @InterfaceAudience.Private
3476  public boolean walRollRequestFinished() {
3477    return this.walRoller.walRollFinished();
3478  }
3479
3480  @Override
3481  public ThroughputController getFlushThroughputController() {
3482    return flushThroughputController;
3483  }
3484
3485  @Override
3486  public double getFlushPressure() {
3487    if (getRegionServerAccounting() == null || cacheFlusher == null) {
3488      // return 0 during RS initialization
3489      return 0.0;
3490    }
3491    return getRegionServerAccounting().getFlushPressure();
3492  }
3493
3494  /**
3495   * Dynamically updates HRegionServer's configuration. Since HRegionServer inherits from
3496   * {@link HBaseServerBase}, the {@code updatedConf} parameter references the same
3497   * {@link Configuration} object as HRegionServer's {@code this.conf} instance variable in a real
3498   * HBase deployment. This isn't necessarily the case in unit tests.
3499   * @param updatedConf the dynamically updated configuration
3500   */
3501  @Override
3502  public void onConfigurationChange(Configuration updatedConf) {
3503    ThroughputController old = this.flushThroughputController;
3504    if (old != null) {
3505      old.stop("configuration change");
3506    }
3507    this.flushThroughputController = FlushThroughputControllerFactory.create(this, updatedConf);
3508    try {
3509      Superusers.initialize(updatedConf);
3510    } catch (IOException e) {
3511      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3512    }
3513
3514    boolean originalIsReadOnlyEnabled = CoprocessorConfigurationUtil
3515      .areReadOnlyCoprocessorsLoaded(this.conf, CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY);
3516
3517    // updatedConf and this.conf reference the same Configuration object in an actual HBase
3518    // deployment. However, in unit test cases they reference different Configuration objects, so
3519    // this.conf needs to be updated.
3520    CoprocessorConfigurationUtil.maybeUpdateCoprocessors(updatedConf, this.conf,
3521      originalIsReadOnlyEnabled, this.rsHost, CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
3522      false, this.toString(), conf -> this.rsHost = new RegionServerCoprocessorHost(this, conf));
3523  }
3524
3525  @Override
3526  public MetricsRegionServer getMetrics() {
3527    return metricsRegionServer;
3528  }
3529
3530  @Override
3531  public SecureBulkLoadManager getSecureBulkLoadManager() {
3532    return this.secureBulkLoadManager;
3533  }
3534
3535  @Override
3536  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
3537    final Abortable abort) {
3538    final LockServiceClient client =
3539      new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
3540    return client.regionLock(regionInfo, description, abort);
3541  }
3542
3543  @Override
3544  public void unassign(byte[] regionName) throws IOException {
3545    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
3546  }
3547
3548  @Override
3549  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3550    return this.rsSpaceQuotaManager;
3551  }
3552
3553  @Override
3554  public boolean reportFileArchivalForQuotas(TableName tableName,
3555    Collection<Entry<String, Long>> archivedFiles) {
3556    if (TEST_SKIP_REPORTING_TRANSITION) {
3557      return false;
3558    }
3559    RegionServerStatusService.BlockingInterface rss = rssStub;
3560    if (rss == null || rsSpaceQuotaManager == null) {
3561      // the current server could be stopping.
3562      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3563      return false;
3564    }
3565    try {
3566      RegionServerStatusProtos.FileArchiveNotificationRequest request =
3567        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3568      rss.reportFileArchival(null, request);
3569    } catch (ServiceException se) {
3570      IOException ioe = ProtobufUtil.getRemoteException(se);
3571      if (ioe instanceof PleaseHoldException) {
3572        if (LOG.isTraceEnabled()) {
3573          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3574            + " This will be retried.", ioe);
3575        }
3576        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3577        return false;
3578      }
3579      if (rssStub == rss) {
3580        rssStub = null;
3581      }
3582      // re-create the stub if we failed to report the archival
3583      createRegionServerStatusStub(true);
3584      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3585      return false;
3586    }
3587    return true;
3588  }
3589
3590  void executeProcedure(long procId, long initiatingMasterActiveTime,
3591    RSProcedureCallable callable) {
3592    executorService
3593      .submit(new RSProcedureHandler(this, procId, initiatingMasterActiveTime, callable));
3594  }
3595
3596  public void remoteProcedureComplete(long procId, long initiatingMasterActiveTime, Throwable error,
3597    byte[] procResultData) {
3598    procedureResultReporter.complete(procId, initiatingMasterActiveTime, error, procResultData);
3599  }
3600
3601  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3602    RegionServerStatusService.BlockingInterface rss;
3603    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3604    for (;;) {
3605      rss = rssStub;
3606      if (rss != null) {
3607        break;
3608      }
3609      createRegionServerStatusStub();
3610    }
3611    try {
3612      rss.reportProcedureDone(null, request);
3613    } catch (ServiceException se) {
3614      if (rssStub == rss) {
3615        rssStub = null;
3616      }
3617      throw ProtobufUtil.getRemoteException(se);
3618    }
3619  }
3620
3621  /**
3622   * Will ignore the open/close region procedures which already submitted or executed. When master
3623   * had unfinished open/close region procedure and restarted, new active master may send duplicate
3624   * open/close region request to regionserver. The open/close request is submitted to a thread pool
3625   * and execute. So first need a cache for submitted open/close region procedures. After the
3626   * open/close region request executed and report region transition succeed, cache it in executed
3627   * region procedures cache. See {@link #finishRegionProcedure(long)}. After report region
3628   * transition succeed, master will not send the open/close region request to regionserver again.
3629   * And we thought that the ongoing duplicate open/close region request should not be delayed more
3630   * than 600 seconds. So the executed region procedures cache will expire after 600 seconds. See
3631   * HBASE-22404 for more details.
3632   * @param procId the id of the open/close region procedure
3633   * @return true if the procedure can be submitted.
3634   */
3635  boolean submitRegionProcedure(long procId) {
3636    if (procId == -1) {
3637      return true;
3638    }
3639    // Ignore the region procedures which already submitted.
3640    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3641    if (previous != null) {
3642      LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
3643      return false;
3644    }
3645    // Ignore the region procedures which already executed.
3646    if (executedRegionProcedures.getIfPresent(procId) != null) {
3647      LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
3648      return false;
3649    }
3650    return true;
3651  }
3652
3653  /**
3654   * See {@link #submitRegionProcedure(long)}.
3655   * @param procId the id of the open/close region procedure
3656   */
3657  public void finishRegionProcedure(long procId) {
3658    executedRegionProcedures.put(procId, procId);
3659    submittedRegionProcedures.remove(procId);
3660  }
3661
3662  /**
3663   * Force to terminate region server when abort timeout.
3664   */
3665  private static class SystemExitWhenAbortTimeout extends TimerTask {
3666
3667    public SystemExitWhenAbortTimeout() {
3668    }
3669
3670    @Override
3671    public void run() {
3672      LOG.warn("Aborting region server timed out, terminating forcibly"
3673        + " and does not wait for any running shutdown hooks or finalizers to finish their work."
3674        + " Thread dump to stdout.");
3675      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3676      Runtime.getRuntime().halt(1);
3677    }
3678  }
3679
3680  @InterfaceAudience.Private
3681  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3682    return compactedFileDischarger;
3683  }
3684
3685  /**
3686   * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}}
3687   * @return pause time
3688   */
3689  @InterfaceAudience.Private
3690  public long getRetryPauseTime() {
3691    return this.retryPauseTime;
3692  }
3693
3694  @Override
3695  public Optional<ServerName> getActiveMaster() {
3696    return Optional.ofNullable(masterAddressTracker.getMasterAddress());
3697  }
3698
3699  @Override
3700  public List<ServerName> getBackupMasters() {
3701    return masterAddressTracker.getBackupMasters();
3702  }
3703
3704  @Override
3705  public Iterator<ServerName> getBootstrapNodes() {
3706    return bootstrapNodeManager.getBootstrapNodes().iterator();
3707  }
3708
3709  @Override
3710  public List<HRegionLocation> getMetaLocations() {
3711    return metaRegionLocationCache.getMetaRegionLocations();
3712  }
3713
3714  @Override
3715  protected NamedQueueRecorder createNamedQueueRecord() {
3716    return NamedQueueRecorder.getInstance(conf);
3717  }
3718
3719  @Override
3720  protected boolean clusterMode() {
3721    // this method will be called in the constructor of super class, so we can not return masterless
3722    // directly here, as it will always be false.
3723    return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
3724  }
3725
3726  @InterfaceAudience.Private
3727  public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
3728    return brokenStoreFileCleaner;
3729  }
3730
3731  @InterfaceAudience.Private
3732  public RSMobFileCleanerChore getRSMobFileCleanerChore() {
3733    return rsMobFileCleanerChore;
3734  }
3735
3736  RSSnapshotVerifier getRsSnapshotVerifier() {
3737    return rsSnapshotVerifier;
3738  }
3739
3740  @Override
3741  protected void stopChores() {
3742    shutdownChore(nonceManagerChore);
3743    shutdownChore(compactionChecker);
3744    shutdownChore(compactedFileDischarger);
3745    shutdownChore(periodicFlusher);
3746    shutdownChore(healthCheckChore);
3747    shutdownChore(executorStatusChore);
3748    shutdownChore(storefileRefresher);
3749    shutdownChore(fsUtilizationChore);
3750    shutdownChore(namedQueueServiceChore);
3751    shutdownChore(brokenStoreFileCleaner);
3752    shutdownChore(rsMobFileCleanerChore);
3753    shutdownChore(replicationMarkerChore);
3754  }
3755
3756  @Override
3757  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
3758    return regionReplicationBufferManager;
3759  }
3760}