001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION;
023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
024import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
025import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
026import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
027import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT;
028import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
029import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT;
030import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
031import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
032import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
033import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
034
035import io.opentelemetry.api.trace.Span;
036import io.opentelemetry.api.trace.StatusCode;
037import io.opentelemetry.context.Scope;
038import java.io.IOException;
039import java.io.PrintWriter;
040import java.lang.management.MemoryUsage;
041import java.lang.reflect.Constructor;
042import java.net.InetSocketAddress;
043import java.time.Duration;
044import java.util.ArrayList;
045import java.util.Collection;
046import java.util.Collections;
047import java.util.Comparator;
048import java.util.HashSet;
049import java.util.Iterator;
050import java.util.List;
051import java.util.Map;
052import java.util.Map.Entry;
053import java.util.Objects;
054import java.util.Optional;
055import java.util.Set;
056import java.util.SortedMap;
057import java.util.Timer;
058import java.util.TimerTask;
059import java.util.TreeMap;
060import java.util.TreeSet;
061import java.util.concurrent.ConcurrentHashMap;
062import java.util.concurrent.ConcurrentMap;
063import java.util.concurrent.ConcurrentSkipListMap;
064import java.util.concurrent.ThreadLocalRandom;
065import java.util.concurrent.TimeUnit;
066import java.util.concurrent.atomic.AtomicBoolean;
067import java.util.concurrent.locks.ReentrantReadWriteLock;
068import java.util.stream.Collectors;
069import javax.management.MalformedObjectNameException;
070import javax.servlet.http.HttpServlet;
071import org.apache.commons.lang3.StringUtils;
072import org.apache.commons.lang3.mutable.MutableFloat;
073import org.apache.hadoop.conf.Configuration;
074import org.apache.hadoop.fs.FileSystem;
075import org.apache.hadoop.fs.Path;
076import org.apache.hadoop.hbase.Abortable;
077import org.apache.hadoop.hbase.CacheEvictionStats;
078import org.apache.hadoop.hbase.CallQueueTooBigException;
079import org.apache.hadoop.hbase.ClockOutOfSyncException;
080import org.apache.hadoop.hbase.DoNotRetryIOException;
081import org.apache.hadoop.hbase.ExecutorStatusChore;
082import org.apache.hadoop.hbase.HBaseConfiguration;
083import org.apache.hadoop.hbase.HBaseInterfaceAudience;
084import org.apache.hadoop.hbase.HBaseServerBase;
085import org.apache.hadoop.hbase.HConstants;
086import org.apache.hadoop.hbase.HDFSBlocksDistribution;
087import org.apache.hadoop.hbase.HRegionLocation;
088import org.apache.hadoop.hbase.HealthCheckChore;
089import org.apache.hadoop.hbase.MetaTableAccessor;
090import org.apache.hadoop.hbase.NotServingRegionException;
091import org.apache.hadoop.hbase.PleaseHoldException;
092import org.apache.hadoop.hbase.ScheduledChore;
093import org.apache.hadoop.hbase.ServerName;
094import org.apache.hadoop.hbase.Stoppable;
095import org.apache.hadoop.hbase.TableName;
096import org.apache.hadoop.hbase.YouAreDeadException;
097import org.apache.hadoop.hbase.ZNodeClearer;
098import org.apache.hadoop.hbase.client.ConnectionUtils;
099import org.apache.hadoop.hbase.client.RegionInfo;
100import org.apache.hadoop.hbase.client.RegionInfoBuilder;
101import org.apache.hadoop.hbase.client.locking.EntityLock;
102import org.apache.hadoop.hbase.client.locking.LockServiceClient;
103import org.apache.hadoop.hbase.conf.ConfigurationObserver;
104import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
105import org.apache.hadoop.hbase.exceptions.RegionMovedException;
106import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
107import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
108import org.apache.hadoop.hbase.executor.ExecutorType;
109import org.apache.hadoop.hbase.http.InfoServer;
110import org.apache.hadoop.hbase.io.hfile.BlockCache;
111import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
112import org.apache.hadoop.hbase.io.hfile.HFile;
113import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
114import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
115import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
116import org.apache.hadoop.hbase.ipc.RpcClient;
117import org.apache.hadoop.hbase.ipc.RpcServer;
118import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
119import org.apache.hadoop.hbase.ipc.ServerRpcController;
120import org.apache.hadoop.hbase.log.HBaseMarkers;
121import org.apache.hadoop.hbase.mob.MobFileCache;
122import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
123import org.apache.hadoop.hbase.monitoring.TaskMonitor;
124import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
125import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore;
126import org.apache.hadoop.hbase.net.Address;
127import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
128import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
129import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
130import org.apache.hadoop.hbase.quotas.QuotaUtil;
131import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
132import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
133import org.apache.hadoop.hbase.quotas.RegionSize;
134import org.apache.hadoop.hbase.quotas.RegionSizeStore;
135import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
136import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
137import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
138import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
139import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
140import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
141import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
142import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
143import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
144import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
145import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
146import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
147import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
148import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
149import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
150import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
151import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
152import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
153import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
154import org.apache.hadoop.hbase.security.SecurityConstants;
155import org.apache.hadoop.hbase.security.Superusers;
156import org.apache.hadoop.hbase.security.User;
157import org.apache.hadoop.hbase.security.UserProvider;
158import org.apache.hadoop.hbase.trace.TraceUtil;
159import org.apache.hadoop.hbase.util.Bytes;
160import org.apache.hadoop.hbase.util.CompressionTest;
161import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
162import org.apache.hadoop.hbase.util.DNS;
163import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
164import org.apache.hadoop.hbase.util.FSUtils;
165import org.apache.hadoop.hbase.util.FutureUtils;
166import org.apache.hadoop.hbase.util.JvmPauseMonitor;
167import org.apache.hadoop.hbase.util.Pair;
168import org.apache.hadoop.hbase.util.RetryCounter;
169import org.apache.hadoop.hbase.util.RetryCounterFactory;
170import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
171import org.apache.hadoop.hbase.util.Threads;
172import org.apache.hadoop.hbase.util.VersionInfo;
173import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
174import org.apache.hadoop.hbase.wal.WAL;
175import org.apache.hadoop.hbase.wal.WALFactory;
176import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
177import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
178import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
179import org.apache.hadoop.hbase.zookeeper.ZKUtil;
180import org.apache.hadoop.ipc.RemoteException;
181import org.apache.hadoop.util.ReflectionUtils;
182import org.apache.yetus.audience.InterfaceAudience;
183import org.apache.zookeeper.KeeperException;
184import org.slf4j.Logger;
185import org.slf4j.LoggerFactory;
186
187import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
188import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
189import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
190import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
191import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
192import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
193import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
194import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
195import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
196import org.apache.hbase.thirdparty.com.google.protobuf.Message;
197import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
198import org.apache.hbase.thirdparty.com.google.protobuf.Service;
199import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
200import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
201import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
202
203import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
204import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
233
234/**
235 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
236 * are many HRegionServers in a single HBase deployment.
237 */
238@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
239@SuppressWarnings({ "deprecation" })
240public class HRegionServer extends HBaseServerBase<RSRpcServices>
241  implements RegionServerServices, LastSequenceId {
242
243  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
244
  // Number of bytes in one megabyte and in one kilobyte.
  // NOTE(review): presumably used as divisors when reporting sizes in MB/KB - confirm against the
  // usages, which are outside this part of the file.
  int unitMB = 1024 * 1024;
  int unitKB = 1024;
247
248  /**
   * For testing only! Set to true to skip notifying the master of region assignment.
250   */
251  @InterfaceAudience.Private
252  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
253  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
254
255  /**
   * A map from region name to the action currently in progress for that region. The Boolean value
   * indicates the action: true if an open-region action is in progress, false if a close-region
   * action is in progress.
258   */
259  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
260    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
261
262  /**
   * Used to cache the open/close region procedures which have already been submitted. See
264   * {@link #submitRegionProcedure(long)}.
265   */
266  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
267  /**
   * Used to cache the open/close region procedures which have already been executed. See
269   * {@link #submitRegionProcedure(long)}.
270   */
271  private final Cache<Long, Long> executedRegionProcedures =
272    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
273
274  /**
275   * Used to cache the moved-out regions
276   */
277  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
278    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();
279
280  private MemStoreFlusher cacheFlusher;
281
282  private HeapMemoryManager hMemManager;
283
284  // Replication services. If no replication, this handler will be null.
285  private ReplicationSourceService replicationSourceHandler;
286  private ReplicationSinkService replicationSinkHandler;
287  private boolean sameReplicationSourceAndSink;
288
289  // Compactions
290  private CompactSplit compactSplitThread;
291
292  /**
293   * Map of regions currently being served by this region server. Key is the encoded region name.
294   * All access should be synchronized.
295   */
296  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
297  /**
298   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
299   * need to be a ConcurrentHashMap?
300   */
301  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();
302
303  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
305   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
306   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
307   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API
308   * call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and
309   * here we really mean DataNode locations. We don't store it as InetSocketAddress here because the
310   * conversion on demand from Address to InetSocketAddress will guarantee the resolution results
311   * will be fresh when we need it.
312   */
313  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();
314
315  private LeaseManager leaseManager;
316
317  private volatile boolean dataFsOk;
318  private volatile boolean isGlobalReadOnlyEnabled;
319
320  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds (20 minutes), to be safe
322  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Will run this task when the abort times out
324  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
325
326  // A state before we go into stopped state. At this stage we're closing user
327  // space regions.
328  private boolean stopping = false;
329  private volatile boolean killed = false;
330
331  private final int threadWakeFrequency;
332
333  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
334  private final int compactionCheckFrequency;
335  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
336  private final int flushCheckFrequency;
337
338  // Stub to do region server status calls against the master.
339  private volatile RegionServerStatusService.BlockingInterface rssStub;
340  private volatile LockService.BlockingInterface lockStub;
341  // RPC client. Used to make the stub above that does region server status checking.
342  private RpcClient rpcClient;
343
344  private UncaughtExceptionHandler uncaughtExceptionHandler;
345
346  private JvmPauseMonitor pauseMonitor;
347
348  private RSSnapshotVerifier rsSnapshotVerifier;
349
350  /** region server process name */
351  public static final String REGIONSERVER = "regionserver";
352
353  private MetricsRegionServer metricsRegionServer;
354  MetricsRegionServerWrapperImpl metricsRegionServerImpl;
355
356  /**
357   * Check for compactions requests.
358   */
359  private ScheduledChore compactionChecker;
360
361  /**
362   * Check for flushes
363   */
364  private ScheduledChore periodicFlusher;
365
366  private volatile WALFactory walFactory;
367
368  private LogRoller walRoller;
369
370  // A thread which calls reportProcedureDone
371  private RemoteProcedureResultReporter procedureResultReporter;
372
373  // flag set after we're done setting up server threads
374  final AtomicBoolean online = new AtomicBoolean(false);
375
376  // master address tracker
377  private final MasterAddressTracker masterAddressTracker;
378
379  // Log Splitting Worker
380  private SplitLogWorker splitLogWorker;
381
382  private final int shortOperationTimeout;
383
384  // Time to pause if master says 'please hold'
385  private final long retryPauseTime;
386
387  private final RegionServerAccounting regionServerAccounting;
388
389  private NamedQueueServiceChore namedQueueServiceChore = null;
390
391  // Block cache
392  private BlockCache blockCache;
393  // The cache for mob files
394  private MobFileCache mobFileCache;
395
396  /** The health check chore. */
397  private HealthCheckChore healthCheckChore;
398
399  /** The Executor status collect chore. */
400  private ExecutorStatusChore executorStatusChore;
401
402  /** The nonce manager chore. */
403  private ScheduledChore nonceManagerChore;
404
405  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
406
407  /**
408   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
409   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
410   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
411   */
412  @Deprecated
413  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
414  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
415    "hbase.regionserver.hostname.disable.master.reversedns";
416
417  /**
418   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
419   * Exception will be thrown if both are used.
420   */
421  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
422  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
423    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";
424
425  /**
426   * Unique identifier for the cluster we are a part of.
427   */
428  private String clusterId;
429
430  // chore for refreshing store files for secondary regions
431  private StorefileRefresherChore storefileRefresher;
432
433  private volatile RegionServerCoprocessorHost rsHost;
434
435  private RegionServerProcedureManagerHost rspmHost;
436
437  private RegionServerRpcQuotaManager rsQuotaManager;
438  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
439
440  /**
441   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
442   * case where client doesn't receive the response from a successful operation and retries. We
443   * track the successful ops for some time via a nonce sent by client and handle duplicate
444   * operations (currently, by failing them; in future we might use MVCC to return result). Nonces
   * are also recovered from WAL during recovery; however, the caveats (from HBASE-3787) are: - WAL
446   * recovery is optimized, and under high load we won't read nearly nonce-timeout worth of past
447   * records. If we don't read the records, we don't read and recover the nonces. Some WALs within
448   * nonce-timeout at recovery may not even be present due to rolling/cleanup. - There's no WAL
   * recovery during normal region move, so nonces will not be transferred. We can have separate
450   * additional "Nonce WAL". It will just contain bunch of numbers and won't be flushed on main path
451   * - because WAL itself also contains nonces, if we only flush it before memstore flush, for a
452   * given nonce we will either see it in the WAL (if it was never flushed to disk, it will be part
453   * of recovery), or we'll see it as part of the nonce log (or both occasionally, which doesn't
454   * matter). Nonce log file can be deleted after the latest nonce in it expired. It can also be
455   * recovered during move.
456   */
457  final ServerNonceManager nonceManager;
458
459  private BrokenStoreFileCleaner brokenStoreFileCleaner;
460
461  private RSMobFileCleanerChore rsMobFileCleanerChore;
462
463  @InterfaceAudience.Private
464  CompactedHFilesDischarger compactedFileDischarger;
465
466  private volatile ThroughputController flushThroughputController;
467
468  private SecureBulkLoadManager secureBulkLoadManager;
469
470  private FileSystemUtilizationChore fsUtilizationChore;
471
472  private BootstrapNodeManager bootstrapNodeManager;
473
474  /**
475   * True if this RegionServer is coming up in a cluster where there is no Master; means it needs to
476   * just come up and make do without a Master to talk to: e.g. in test or HRegionServer is doing
   * other than its usual duties: e.g. as a hollowed-out host whose only purpose is as a
478   * Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
479   * {@link #TEST_SKIP_REPORTING_TRANSITION} ?
480   */
481  private final boolean masterless;
482  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
483
484  /** regionserver codec list **/
485  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";
486
487  // A timer to shutdown the process if abort takes too long
488  private Timer abortMonitor;
489
490  private RegionReplicationBufferManager regionReplicationBufferManager;
491
492  /*
493   * Chore that creates replication marker rows.
494   */
495  private ReplicationMarkerChore replicationMarkerChore;
496
  // A timer that submits requests to the PrefetchExecutor
498  private PrefetchExecutorNotifier prefetchExecutorNotifier;
499
  /**
   * Starts a HRegionServer at the default location.
   * <p/>
   * Don't start any services or managers in here in the Constructor. Defer till after we register
   * with the Master as much as possible. See {@link #startServices}.
   * <p/>
   * The whole body runs inside a tracing span; any failure is recorded on the span, logged, and
   * rethrown to the caller.
   * @param conf the configuration this region server reads its settings from
   * @throws IOException if construction fails, for example when one of the codecs listed under
   *                     {@code hbase.regionserver.codecs} fails its compression test
   */
  public HRegionServer(final Configuration conf) throws IOException {
    super(conf, "RegionServer"); // thread name
    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
    try (Scope ignored = span.makeCurrent()) {
      this.dataFsOk = true;
      // Masterless is the inverse of cluster mode.
      this.masterless = !clusterMode();
      // Configuration checks; checkCodecs throws if a configured codec is not usable.
      MemorySizeUtil.validateRegionServerHeapMemoryAllocation(conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      // Both check periods fall back to the thread wake frequency when not configured.
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);

      // nonceManager stays null when nonces are disabled through configuration.
      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);

      regionServerAccounting = new RegionServerAccounting(conf);

      blockCache = BlockCacheFactory.createBlockCache(conf);
      // The call below, instantiates the DataTieringManager only when
      // the configuration "hbase.regionserver.datatiering.enable" is set to true.
      DataTieringManager.instantiate(conf, onlineRegions);

      mobFileCache = new MobFileCache(conf);

      rsSnapshotVerifier = new RSSnapshotVerifier(conf);

      // Any exception reaching this handler aborts the region server.
      uncaughtExceptionHandler =
        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);

      // If no master in cluster, skip trying to track one or look for a cluster status.
      // masterAddressTracker is final, so it is assigned on both branches.
      if (!this.masterless) {
        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        masterAddressTracker.start();
      } else {
        masterAddressTracker = null;
      }
      this.rpcServices.start(zooKeeper);
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      TraceUtil.setError(span, t);
      LOG.error("Failed construction RegionServer", t);
      // Rethrown unchanged so the caller still sees the original failure.
      throw t;
    } finally {
      // The span is ended on both the success and the failure path.
      span.end();
    }
  }
566
567  // HMaster should override this method to load the specific config for master
568  @Override
569  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
570    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
571    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
572      if (!StringUtils.isBlank(hostname)) {
573        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
574          + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
575          + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
576          + UNSAFE_RS_HOSTNAME_KEY + " is used";
577        throw new IOException(msg);
578      } else {
579        return DNS.getHostname(conf, DNS.ServerType.REGIONSERVER);
580      }
581    } else {
582      return hostname;
583    }
584  }
585
586  @Override
587  protected DNS.ServerType getDNSServerType() {
588    return DNS.ServerType.REGIONSERVER;
589  }
590
591  @Override
592  protected void login(UserProvider user, String host) throws IOException {
593    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
594      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
595  }
596
597  @Override
598  protected String getProcessName() {
599    return REGIONSERVER;
600  }
601
602  @Override
603  protected RegionServerCoprocessorHost getCoprocessorHost() {
604    return getRegionServerCoprocessorHost();
605  }
606
607  @Override
608  protected boolean canCreateBaseZNode() {
609    return !clusterMode();
610  }
611
612  @Override
613  protected boolean canUpdateTableDescriptor() {
614    return false;
615  }
616
617  @Override
618  protected boolean cacheTableDescriptor() {
619    return false;
620  }
621
622  protected RSRpcServices createRpcServices() throws IOException {
623    return new RSRpcServices(this);
624  }
625
626  @Override
627  protected void configureInfoServer(InfoServer infoServer) {
628    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
629    infoServer.setAttribute(REGIONSERVER, this);
630  }
631
632  @Override
633  protected Class<? extends HttpServlet> getDumpServlet() {
634    return RSDumpServlet.class;
635  }
636
637  /**
638   * Used by {@link RSDumpServlet} to generate debugging information.
639   */
640  public void dumpRowLocks(final PrintWriter out) {
641    StringBuilder sb = new StringBuilder();
642    for (HRegion region : getRegions()) {
643      if (region.getLockedRows().size() > 0) {
644        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
645          sb.setLength(0);
646          sb.append(region.getTableDescriptor().getTableName()).append(",")
647            .append(region.getRegionInfo().getEncodedName()).append(",");
648          sb.append(rowLockContext.toString());
649          out.println(sb);
650        }
651      }
652    }
653  }
654
655  @Override
656  public boolean registerService(Service instance) {
657    // No stacking of instances is allowed for a single executorService name
658    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
659    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
660    if (coprocessorServiceHandlers.containsKey(serviceName)) {
661      LOG.error("Coprocessor executorService " + serviceName
662        + " already registered, rejecting request from " + instance);
663      return false;
664    }
665
666    coprocessorServiceHandlers.put(serviceName, instance);
667    if (LOG.isDebugEnabled()) {
668      LOG.debug(
669        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
670    }
671    return true;
672  }
673
674  /**
675   * Run test on configured codecs to make sure supporting libs are in place.
676   */
677  private static void checkCodecs(final Configuration c) throws IOException {
678    // check to see if the codec list is available:
679    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
680    if (codecs == null) {
681      return;
682    }
683    for (String codec : codecs) {
684      if (!CompressionTest.testCompression(codec)) {
685        throw new IOException(
686          "Compression codec " + codec + " not supported, aborting RS construction");
687      }
688    }
689  }
690
  /**
   * Returns the cluster ID currently held by this server.
   * @return the value of the {@code clusterId} field; NOTE(review): presumably {@code null} until
   *         it has been read from ZooKeeper during initialization -- confirm against callers
   */
  public String getClusterId() {
    return this.clusterId;
  }
694
  /**
   * All initialization needed before we go register with Master.<br>
   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
   * In here we initialize ZooKeeper, set up the cluster connection, create the bootstrap node
   * manager and the region replication buffer manager, and obtain the RPC client from the cluster
   * connection. The whole sequence runs inside a tracing span.
   * <p>
   * Any {@link Throwable} raised by these steps is recorded on the span, the RPC services are
   * stopped, and the server is aborted; nothing is rethrown to the caller.
   */
  private void preRegistrationInitialization() {
    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
    try (Scope ignored = span.makeCurrent()) {
      // Order matters: the managers below are built from the connection and trackers set up first.
      initializeZooKeeper();
      setupClusterConnection();
      bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
      // Setup RPC client for master communication
      this.rpcClient = asyncClusterConnection.getRpcClient();
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Call stop if error or process will stick around for ever since server
      // puts up non-daemon threads.
      TraceUtil.setError(span, t);
      this.rpcServices.stop();
      abort("Initialization of RS failed.  Hence aborting RS.", t);
    } finally {
      // The span is ended on both the success and the failure path.
      span.end();
    }
  }
720
721  /**
722   * Bring up connection to zk ensemble and then wait until a master for this cluster and then after
723   * that, wait until cluster 'up' flag has been set. This is the order in which master does things.
724   * <p>
725   * Finally open long-living server short-circuit connection.
726   */
727  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
728      justification = "cluster Id znode read would give us correct response")
729  private void initializeZooKeeper() throws IOException, InterruptedException {
730    // Nothing to do in here if no Master in the mix.
731    if (this.masterless) {
732      return;
733    }
734
735    // Create the master address tracker, register with zk, and start it. Then
736    // block until a master is available. No point in starting up if no master
737    // running.
738    blockAndCheckIfStopped(this.masterAddressTracker);
739
740    // Wait on cluster being up. Master will set this flag up in zookeeper
741    // when ready.
742    blockAndCheckIfStopped(this.clusterStatusTracker);
743
744    // If we are HMaster then the cluster id should have already been set.
745    if (clusterId == null) {
746      // Retrieve clusterId
747      // Since cluster status is now up
748      // ID should have already been set by HMaster
749      try {
750        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
751        if (clusterId == null) {
752          this.abort("Cluster ID has not been set");
753        }
754        LOG.info("ClusterId : " + clusterId);
755      } catch (KeeperException e) {
756        this.abort("Failed to retrieve Cluster ID", e);
757      }
758    }
759
760    if (isStopped() || isAborted()) {
761      return; // No need for further initialization
762    }
763
764    // watch for snapshots and other procedures
765    try {
766      rspmHost = new RegionServerProcedureManagerHost();
767      rspmHost.loadProcedures(conf);
768      rspmHost.initialize(this);
769    } catch (KeeperException e) {
770      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
771    }
772  }
773
774  /**
775   * Utilty method to wait indefinitely on a znode availability while checking if the region server
776   * is shut down
777   * @param tracker znode tracker to use
778   * @throws IOException          any IO exception, plus if the RS is stopped
779   * @throws InterruptedException if the waiting thread is interrupted
780   */
781  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
782    throws IOException, InterruptedException {
783    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
784      if (this.stopped) {
785        throw new IOException("Received the shutdown message while waiting.");
786      }
787    }
788  }
789
790  /** Returns True if the cluster is up. */
791  @Override
792  public boolean isClusterUp() {
793    return this.masterless
794      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
795  }
796
797  private void initializeReplicationMarkerChore() {
798    boolean replicationMarkerEnabled =
799      conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
800    // If replication or replication marker is not enabled then return immediately.
801    if (replicationMarkerEnabled) {
802      int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
803        REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
804      replicationMarkerChore = new ReplicationMarkerChore(this, this, period, conf);
805    }
806  }
807
  /**
   * Returns whether this server is in the process of stopping.
   * @return the current value of the {@code stopping} flag
   */
  @Override
  public boolean isStopping() {
    return stopping;
  }
812
813  /**
814   * The HRegionServer sticks in this loop until closed.
815   */
816  @Override
817  public void run() {
818    if (isStopped()) {
819      LOG.info("Skipping run; stopped");
820      return;
821    }
822    try {
823      // Do pre-registration initializations; zookeeper, lease threads, etc.
824      preRegistrationInitialization();
825    } catch (Throwable e) {
826      abort("Fatal exception during initialization", e);
827    }
828
829    try {
830      if (!isStopped() && !isAborted()) {
831        installShutdownHook();
832
833        CoprocessorConfigurationUtil.syncReadOnlyConfigurations(conf,
834          CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY);
835
836        // Initialize the RegionServerCoprocessorHost now that our ephemeral
837        // node was created, in case any coprocessors want to use ZooKeeper
838        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
839
840        // Try and register with the Master; tell it we are here. Break if server is stopped or
841        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
842        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
843        // come up.
844        LOG.debug("About to register with Master.");
845        TraceUtil.trace(() -> {
846          RetryCounterFactory rcf =
847            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
848          RetryCounter rc = rcf.create();
849          while (keepLooping()) {
850            RegionServerStartupResponse w = reportForDuty();
851            if (w == null) {
852              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
853              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
854              this.sleeper.sleep(sleepTime);
855            } else {
856              handleReportForDutyResponse(w);
857              break;
858            }
859          }
860        }, "HRegionServer.registerWithMaster");
861      }
862
863      if (!isStopped() && isHealthy()) {
864        TraceUtil.trace(() -> {
865          // start the snapshot handler and other procedure handlers,
866          // since the server is ready to run
867          if (this.rspmHost != null) {
868            this.rspmHost.start();
869          }
870          // Start the Quota Manager
871          if (this.rsQuotaManager != null) {
872            rsQuotaManager.start(getRpcServer().getScheduler());
873          }
874          if (this.rsSpaceQuotaManager != null) {
875            this.rsSpaceQuotaManager.start();
876          }
877        }, "HRegionServer.startup");
878      }
879
880      // We registered with the Master. Go into run mode.
881      long lastMsg = EnvironmentEdgeManager.currentTime();
882      long oldRequestCount = -1;
883      // The main run loop.
884      while (!isStopped() && isHealthy()) {
885        if (!isClusterUp()) {
886          if (onlineRegions.isEmpty()) {
887            stop("Exiting; cluster shutdown set and not carrying any regions");
888          } else if (!this.stopping) {
889            this.stopping = true;
890            LOG.info("Closing user regions");
891            closeUserRegions(isAborted());
892          } else {
893            boolean allUserRegionsOffline = areAllUserRegionsOffline();
894            if (allUserRegionsOffline) {
895              // Set stopped if no more write requests tp meta tables
896              // since last time we went around the loop. Any open
897              // meta regions will be closed on our way out.
898              if (oldRequestCount == getWriteRequestCount()) {
899                stop("Stopped; only catalog regions remaining online");
900                break;
901              }
902              oldRequestCount = getWriteRequestCount();
903            } else {
904              // Make sure all regions have been closed -- some regions may
905              // have not got it because we were splitting at the time of
906              // the call to closeUserRegions.
907              closeUserRegions(this.abortRequested.get());
908            }
909            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
910          }
911        }
912        long now = EnvironmentEdgeManager.currentTime();
913        if ((now - lastMsg) >= msgInterval) {
914          tryRegionServerReport(lastMsg, now);
915          lastMsg = EnvironmentEdgeManager.currentTime();
916        }
917        if (!isStopped() && !isAborted()) {
918          this.sleeper.sleep();
919        }
920      } // for
921    } catch (Throwable t) {
922      if (!rpcServices.checkOOME(t)) {
923        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
924        abort(prefix + t.getMessage(), t);
925      }
926    }
927
928    final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
929    try (Scope ignored = span.makeCurrent()) {
930      if (this.leaseManager != null) {
931        this.leaseManager.closeAfterLeasesExpire();
932      }
933      if (this.splitLogWorker != null) {
934        splitLogWorker.stop();
935      }
936      stopInfoServer();
937      // Send cache a shutdown.
938      if (blockCache != null) {
939        blockCache.shutdown();
940      }
941      if (mobFileCache != null) {
942        mobFileCache.shutdown();
943      }
944
945      // Send interrupts to wake up threads if sleeping so they notice shutdown.
946      // TODO: Should we check they are alive? If OOME could have exited already
947      if (this.hMemManager != null) {
948        this.hMemManager.stop();
949      }
950      if (this.cacheFlusher != null) {
951        this.cacheFlusher.interruptIfNecessary();
952      }
953      if (this.compactSplitThread != null) {
954        this.compactSplitThread.interruptIfNecessary();
955      }
956
957      // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
958      if (rspmHost != null) {
959        rspmHost.stop(this.abortRequested.get() || this.killed);
960      }
961
962      if (this.killed) {
963        // Just skip out w/o closing regions. Used when testing.
964      } else if (abortRequested.get()) {
965        if (this.dataFsOk) {
966          closeUserRegions(abortRequested.get()); // Don't leave any open file handles
967        }
968        LOG.info("aborting server " + this.serverName);
969      } else {
970        closeUserRegions(abortRequested.get());
971        LOG.info("stopping server " + this.serverName);
972      }
973      regionReplicationBufferManager.stop();
974      closeClusterConnection();
975      // Closing the compactSplit thread before closing meta regions
976      if (!this.killed && containsMetaTableRegions()) {
977        if (!abortRequested.get() || this.dataFsOk) {
978          if (this.compactSplitThread != null) {
979            this.compactSplitThread.join();
980            this.compactSplitThread = null;
981          }
982          closeMetaTableRegions(abortRequested.get());
983        }
984      }
985
986      if (!this.killed && this.dataFsOk) {
987        waitOnAllRegionsToClose(abortRequested.get());
988        LOG.info("stopping server " + this.serverName + "; all regions closed.");
989      }
990
991      // Stop the quota manager
992      if (rsQuotaManager != null) {
993        rsQuotaManager.stop();
994      }
995      if (rsSpaceQuotaManager != null) {
996        rsSpaceQuotaManager.stop();
997        rsSpaceQuotaManager = null;
998      }
999
1000      // flag may be changed when closing regions throws exception.
1001      if (this.dataFsOk) {
1002        shutdownWAL(!abortRequested.get());
1003      }
1004
1005      // Make sure the proxy is down.
1006      if (this.rssStub != null) {
1007        this.rssStub = null;
1008      }
1009      if (this.lockStub != null) {
1010        this.lockStub = null;
1011      }
1012      if (this.rpcClient != null) {
1013        this.rpcClient.close();
1014      }
1015      if (this.leaseManager != null) {
1016        this.leaseManager.close();
1017      }
1018      if (this.pauseMonitor != null) {
1019        this.pauseMonitor.stop();
1020      }
1021
1022      if (!killed) {
1023        stopServiceThreads();
1024      }
1025
1026      if (this.rpcServices != null) {
1027        this.rpcServices.stop();
1028      }
1029
1030      try {
1031        deleteMyEphemeralNode();
1032      } catch (KeeperException.NoNodeException nn) {
1033        // pass
1034      } catch (KeeperException e) {
1035        LOG.warn("Failed deleting my ephemeral node", e);
1036      }
1037      // We may have failed to delete the znode at the previous step, but
1038      // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1039      ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1040
1041      closeZooKeeper();
1042      closeTableDescriptors();
1043      LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
1044      span.setStatus(StatusCode.OK);
1045    } finally {
1046      span.end();
1047    }
1048  }
1049
1050  private boolean containsMetaTableRegions() {
1051    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
1052  }
1053
1054  private boolean areAllUserRegionsOffline() {
1055    if (getNumberOfOnlineRegions() > 2) {
1056      return false;
1057    }
1058    boolean allUserRegionsOffline = true;
1059    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1060      if (!e.getValue().getRegionInfo().isMetaRegion()) {
1061        allUserRegionsOffline = false;
1062        break;
1063      }
1064    }
1065    return allUserRegionsOffline;
1066  }
1067
1068  /** Returns Current write count for all online regions. */
1069  private long getWriteRequestCount() {
1070    long writeCount = 0;
1071    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1072      writeCount += e.getValue().getWriteRequestsCount();
1073    }
1074    return writeCount;
1075  }
1076
1077  @InterfaceAudience.Private
1078  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1079    throws IOException {
1080    RegionServerStatusService.BlockingInterface rss = rssStub;
1081    if (rss == null) {
1082      // the current server could be stopping.
1083      return;
1084    }
1085    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1086    final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport");
1087    try (Scope ignored = span.makeCurrent()) {
1088      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1089      request.setServer(ProtobufUtil.toServerName(this.serverName));
1090      request.setLoad(sl);
1091      rss.regionServerReport(null, request.build());
1092      span.setStatus(StatusCode.OK);
1093    } catch (ServiceException se) {
1094      IOException ioe = ProtobufUtil.getRemoteException(se);
1095      if (ioe instanceof YouAreDeadException) {
1096        // This will be caught and handled as a fatal error in run()
1097        TraceUtil.setError(span, ioe);
1098        throw ioe;
1099      }
1100      if (rssStub == rss) {
1101        rssStub = null;
1102      }
1103      TraceUtil.setError(span, se);
1104      // Couldn't connect to the master, get location from zk and reconnect
1105      // Method blocks until new master is found or we are stopped
1106      createRegionServerStatusStub(true);
1107    } finally {
1108      span.end();
1109    }
1110  }
1111
1112  /**
1113   * Reports the given map of Regions and their size on the filesystem to the active Master.
1114   * @param regionSizeStore The store containing region sizes
1115   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1116   */
1117  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
1118    RegionServerStatusService.BlockingInterface rss = rssStub;
1119    if (rss == null) {
1120      // the current server could be stopping.
1121      LOG.trace("Skipping Region size report to HMaster as stub is null");
1122      return true;
1123    }
1124    try {
1125      buildReportAndSend(rss, regionSizeStore);
1126    } catch (ServiceException se) {
1127      IOException ioe = ProtobufUtil.getRemoteException(se);
1128      if (ioe instanceof PleaseHoldException) {
1129        LOG.trace("Failed to report region sizes to Master because it is initializing."
1130          + " This will be retried.", ioe);
1131        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1132        return true;
1133      }
1134      if (rssStub == rss) {
1135        rssStub = null;
1136      }
1137      createRegionServerStatusStub(true);
1138      if (ioe instanceof DoNotRetryIOException) {
1139        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1140        if (doNotRetryEx.getCause() != null) {
1141          Throwable t = doNotRetryEx.getCause();
1142          if (t instanceof UnsupportedOperationException) {
1143            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1144            return false;
1145          }
1146        }
1147      }
1148      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1149    }
1150    return true;
1151  }
1152
1153  /**
1154   * Builds the region size report and sends it to the master. Upon successful sending of the
1155   * report, the region sizes that were sent are marked as sent.
1156   * @param rss             The stub to send to the Master
1157   * @param regionSizeStore The store containing region sizes
1158   */
1159  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
1160    RegionSizeStore regionSizeStore) throws ServiceException {
1161    RegionSpaceUseReportRequest request =
1162      buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
1163    rss.reportRegionSpaceUse(null, request);
1164    // Record the number of size reports sent
1165    if (metricsRegionServer != null) {
1166      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
1167    }
1168  }
1169
1170  /**
1171   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1172   * @param regionSizes The size in bytes of regions
1173   * @return The corresponding protocol buffer message.
1174   */
1175  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
1176    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1177    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
1178      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
1179    }
1180    return request.build();
1181  }
1182
1183  /**
1184   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf
1185   * message.
1186   * @param regionInfo  The RegionInfo
1187   * @param sizeInBytes The size in bytes of the Region
1188   * @return The protocol buffer
1189   */
1190  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1191    return RegionSpaceUse.newBuilder()
1192      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1193      .setRegionSize(Objects.requireNonNull(sizeInBytes)).build();
1194  }
1195
  /**
   * Assembles the {@code ServerLoad} protobuf message that is sent to the Master with each
   * report. The message combines request counters, heap usage, coprocessor names, per-region
   * loads, block cache and cold-data figures, the info server port, per-user loads, replication
   * load and the currently monitored tasks.
   * @param reportStartTime start of the reporting interval, stored as-is in the message
   * @param reportEndTime   end of the reporting interval, stored as-is in the message
   * @return the built server load message
   * @throws IOException if one of the lookups used while collecting the load fails
   */
  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
    throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second, and other metrics As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heart beat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved; Additionally the load balancer will be able to take advantage of a more complete
    // history.
    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    // -1 marks "unknown" when heap usage cannot be obtained.
    long usedMemory = -1L;
    long maxMemory = -1L;
    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
      usedMemory = usage.getUsed();
      maxMemory = usage.getMax();
    }

    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
    // Heap figures are converted from bytes to megabytes using integer division.
    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
    // Coprocessors of the WAL returned for a null region are added first.
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    Coprocessor.Builder coprocessorBuilder = Coprocessor.newBuilder();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
    }
    // The two builders below are handed to createRegionLoad for every region.
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (HRegion region : regions) {
      // Per region: its own coprocessors, its load, then the coprocessors of its WAL.
      if (region.getCoprocessorHost() != null) {
        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
        for (String regionCoprocessor : regionCoprocessors) {
          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
        }
      }
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
        .getCoprocessors()) {
        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
      }
    }

    // Per-region cached sizes, when a block cache exists and exposes them; each value is passed
    // through roundSize with unitMB.
    getBlockCache().ifPresent(cache -> {
      cache.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
        regionCachedInfo.forEach((regionName, prefetchSize) -> {
          serverLoad.putRegionCachedInfo(regionName, roundSize(prefetchSize, unitMB));
        });
      });
    });
    serverLoad.setCacheFreeSize(regionServerWrapper.getBlockCacheFreeSize());
    // Per-region cold data sizes, only when a DataTieringManager instance exists.
    if (DataTieringManager.getInstance() != null) {
      DataTieringManager.getInstance().getRegionColdDataSize()
        .forEach((regionName, coldDataSize) -> serverLoad.putRegionColdData(regionName,
          roundSize(coldDataSize.getSecond(), unitMB)));
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    // -1 is reported as the port when there is no info server.
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }
    MetricsUserAggregateSource userSource =
      metricsRegionServer.getMetricsUserAggregate().getSource();
    if (userSource != null) {
      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
      }
    }

    // Replication load: when source and sink share one handler, a single refresh supplies both
    // the sink and the source entries; otherwise each handler is refreshed and read separately.
    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
      // always refresh first to get the latest value
      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
      if (rLoad != null) {
        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
          .getReplicationLoadSourceEntries()) {
          serverLoad.addReplLoadSource(rLS);
        }
      }
    } else {
      if (replicationSourceHandler != null) {
        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
            .getReplicationLoadSourceEntries()) {
            serverLoad.addReplLoadSource(rLS);
          }
        }
      }
      if (replicationSinkHandler != null) {
        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        }
      }
    }

    // One ServerTask per monitored task; a null status is sent as the empty string.
    TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask
      .newBuilder().setDescription(task.getDescription())
      .setStatus(task.getStatus() != null ? task.getStatus() : "")
      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build()));

    return serverLoad.build();
  }
1308
1309  private String getOnlineRegionsAsPrintableString() {
1310    StringBuilder sb = new StringBuilder();
1311    for (Region r : this.onlineRegions.values()) {
1312      if (sb.length() > 0) {
1313        sb.append(", ");
1314      }
1315      sb.append(r.getRegionInfo().getEncodedName());
1316    }
1317    return sb.toString();
1318  }
1319
1320  /**
1321   * Wait on regions close.
1322   */
1323  private void waitOnAllRegionsToClose(final boolean abort) {
1324    // Wait till all regions are closed before going out.
1325    int lastCount = -1;
1326    long previousLogTime = 0;
1327    Set<String> closedRegions = new HashSet<>();
1328    boolean interrupted = false;
1329    try {
1330      while (!onlineRegions.isEmpty()) {
1331        int count = getNumberOfOnlineRegions();
1332        // Only print a message if the count of regions has changed.
1333        if (count != lastCount) {
1334          // Log every second at most
1335          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
1336            previousLogTime = EnvironmentEdgeManager.currentTime();
1337            lastCount = count;
1338            LOG.info("Waiting on " + count + " regions to close");
1339            // Only print out regions still closing if a small number else will
1340            // swamp the log.
1341            if (count < 10 && LOG.isDebugEnabled()) {
1342              LOG.debug("Online Regions=" + this.onlineRegions);
1343            }
1344          }
1345        }
1346        // Ensure all user regions have been sent a close. Use this to
1347        // protect against the case where an open comes in after we start the
1348        // iterator of onlineRegions to close all user regions.
1349        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1350          RegionInfo hri = e.getValue().getRegionInfo();
1351          if (
1352            !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1353              && !closedRegions.contains(hri.getEncodedName())
1354          ) {
1355            closedRegions.add(hri.getEncodedName());
1356            // Don't update zk with this close transition; pass false.
1357            closeRegionIgnoreErrors(hri, abort);
1358          }
1359        }
1360        // No regions in RIT, we could stop waiting now.
1361        if (this.regionsInTransitionInRS.isEmpty()) {
1362          if (!onlineRegions.isEmpty()) {
1363            LOG.info("We were exiting though online regions are not empty,"
1364              + " because some regions failed closing");
1365          }
1366          break;
1367        } else {
1368          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
1369            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1370        }
1371        if (sleepInterrupted(200)) {
1372          interrupted = true;
1373        }
1374      }
1375    } finally {
1376      if (interrupted) {
1377        Thread.currentThread().interrupt();
1378      }
1379    }
1380  }
1381
1382  private static boolean sleepInterrupted(long millis) {
1383    boolean interrupted = false;
1384    try {
1385      Thread.sleep(millis);
1386    } catch (InterruptedException e) {
1387      LOG.warn("Interrupted while sleeping");
1388      interrupted = true;
1389    }
1390    return interrupted;
1391  }
1392
1393  private void shutdownWAL(final boolean close) {
1394    if (this.walFactory != null) {
1395      try {
1396        if (close) {
1397          walFactory.close();
1398        } else {
1399          walFactory.shutdown();
1400        }
1401      } catch (Throwable e) {
1402        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1403        LOG.error("Shutdown / close of WAL failed: " + e);
1404        LOG.debug("Shutdown / close exception details:", e);
1405      }
1406    }
1407  }
1408
1409  /**
1410   * Run init. Sets up wal and starts up all server threads.
1411   * @param c Extra configuration.
1412   */
1413  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1414    throws IOException {
1415    try {
1416      boolean updateRootDir = false;
1417      for (NameStringPair e : c.getMapEntriesList()) {
1418        String key = e.getName();
1419        // The hostname the master sees us as.
1420        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1421          String hostnameFromMasterPOV = e.getValue();
1422          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1423            rpcServices.getSocketAddress().getPort(), this.startcode);
1424          String expectedHostName = rpcServices.getSocketAddress().getHostName();
1425          // if Master use-ip is enabled, RegionServer use-ip will be enabled by default even if it
1426          // is set to disable. so we will use the ip of the RegionServer to compare with the
1427          // hostname passed by the Master, see HBASE-27304 for details.
1428          if (
1429            StringUtils.isBlank(useThisHostnameInstead) && getActiveMaster().isPresent()
1430              && InetAddresses.isInetAddress(getActiveMaster().get().getHostname())
1431          ) {
1432            expectedHostName = rpcServices.getSocketAddress().getAddress().getHostAddress();
1433          }
1434          boolean isHostnameConsist = StringUtils.isBlank(useThisHostnameInstead)
1435            ? hostnameFromMasterPOV.equals(expectedHostName)
1436            : hostnameFromMasterPOV.equals(useThisHostnameInstead);
1437          if (!isHostnameConsist) {
1438            String msg = "Master passed us a different hostname to use; was="
1439              + (StringUtils.isBlank(useThisHostnameInstead)
1440                ? expectedHostName
1441                : this.useThisHostnameInstead)
1442              + ", but now=" + hostnameFromMasterPOV;
1443            LOG.error(msg);
1444            throw new IOException(msg);
1445          }
1446          continue;
1447        }
1448
1449        String value = e.getValue();
1450        if (key.equals(HConstants.HBASE_DIR)) {
1451          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1452            updateRootDir = true;
1453          }
1454        }
1455
1456        if (LOG.isDebugEnabled()) {
1457          LOG.debug("Config from master: " + key + "=" + value);
1458        }
1459        this.conf.set(key, value);
1460      }
1461      // Set our ephemeral znode up in zookeeper now we have a name.
1462      createMyEphemeralNode();
1463
1464      if (updateRootDir) {
1465        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1466        initializeFileSystem();
1467      }
1468
1469      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
1470      // config param for task trackers, but we can piggyback off of it.
1471      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1472        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1473      }
1474
1475      // Save it in a file, this will allow to see if we crash
1476      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1477
1478      // This call sets up an initialized replication and WAL. Later we start it up.
1479      setupWALAndReplication();
1480      // Init in here rather than in constructor after thread name has been set
1481      final MetricsTable metricsTable =
1482        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1483      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1484      this.metricsRegionServer =
1485        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
1486      // Now that we have a metrics source, start the pause monitor
1487      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1488      pauseMonitor.start();
1489
1490      // There is a rare case where we do NOT want services to start. Check config.
1491      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1492        startServices();
1493      }
1494      // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
1495      // or make sense of it.
1496      startReplicationService();
1497
1498      // Set up ZK
1499      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress()
1500        + ", sessionid=0x"
1501        + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1502
1503      // Wake up anyone waiting for this server to online
1504      synchronized (online) {
1505        online.set(true);
1506        online.notifyAll();
1507      }
1508    } catch (Throwable e) {
1509      stop("Failed initialization");
1510      throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed");
1511    } finally {
1512      sleeper.skipSleepCycle();
1513    }
1514  }
1515
1516  private void startHeapMemoryManager() {
1517    if (this.blockCache != null) {
1518      this.hMemManager =
1519        new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1520      this.hMemManager.start(getChoreService());
1521    }
1522  }
1523
1524  private void createMyEphemeralNode() throws KeeperException {
1525    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1526    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1527    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1528    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1529    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1530  }
1531
1532  private void deleteMyEphemeralNode() throws KeeperException {
1533    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1534  }
1535
1536  @Override
1537  public RegionServerAccounting getRegionServerAccounting() {
1538    return regionServerAccounting;
1539  }
1540
1541  // Round the size with KB or MB.
1542  // A trick here is that if the sizeInBytes is less than sizeUnit, we will round the size to 1
1543  // instead of 0 if it is not 0, to avoid some schedulers think the region has no data. See
1544  // HBASE-26340 for more details on why this is important.
1545  private static int roundSize(long sizeInByte, int sizeUnit) {
1546    if (sizeInByte == 0) {
1547      return 0;
1548    } else if (sizeInByte < sizeUnit) {
1549      return 1;
1550    } else {
1551      return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE);
1552    }
1553  }
1554
1555  /**
1556   * @param r               Region to get RegionLoad for.
1557   * @param regionLoadBldr  the RegionLoad.Builder, can be null
1558   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1559   * @return RegionLoad instance.
1560   */
1561  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1562    RegionSpecifier.Builder regionSpecifier) throws IOException {
1563    byte[] name = r.getRegionInfo().getRegionName();
1564    String regionEncodedName = r.getRegionInfo().getEncodedName();
1565    int stores = 0;
1566    int storefiles = 0;
1567    int storeRefCount = 0;
1568    int maxCompactedStoreFileRefCount = 0;
1569    long storeUncompressedSize = 0L;
1570    long storefileSize = 0L;
1571    long storefileIndexSize = 0L;
1572    long rootLevelIndexSize = 0L;
1573    long totalStaticIndexSize = 0L;
1574    long totalStaticBloomSize = 0L;
1575    long totalCompactingKVs = 0L;
1576    long currentCompactedKVs = 0L;
1577    long totalRegionSize = 0L;
1578    List<HStore> storeList = r.getStores();
1579    stores += storeList.size();
1580    for (HStore store : storeList) {
1581      storefiles += store.getStorefilesCount();
1582      int currentStoreRefCount = store.getStoreRefCount();
1583      storeRefCount += currentStoreRefCount;
1584      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1585      maxCompactedStoreFileRefCount =
1586        Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount);
1587      storeUncompressedSize += store.getStoreSizeUncompressed();
1588      storefileSize += store.getStorefilesSize();
1589      totalRegionSize += store.getHFilesSize();
1590      // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
1591      storefileIndexSize += store.getStorefilesRootLevelIndexSize();
1592      CompactionProgress progress = store.getCompactionProgress();
1593      if (progress != null) {
1594        totalCompactingKVs += progress.getTotalCompactingKVs();
1595        currentCompactedKVs += progress.currentCompactedKVs;
1596      }
1597      rootLevelIndexSize += store.getStorefilesRootLevelIndexSize();
1598      totalStaticIndexSize += store.getTotalStaticIndexSize();
1599      totalStaticBloomSize += store.getTotalStaticBloomSize();
1600    }
1601
1602    int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB);
1603    int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB);
1604    int storefileSizeMB = roundSize(storefileSize, unitMB);
1605    int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB);
1606    int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB);
1607    int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB);
1608    int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB);
1609    int regionSizeMB = roundSize(totalRegionSize, unitMB);
1610    final MutableFloat currentRegionCachedRatio = new MutableFloat(0.0f);
1611    getBlockCache().ifPresent(bc -> {
1612      bc.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
1613        if (regionCachedInfo.containsKey(regionEncodedName)) {
1614          currentRegionCachedRatio.setValue(regionSizeMB == 0
1615            ? 0.0f
1616            : (float) roundSize(regionCachedInfo.get(regionEncodedName), unitMB) / regionSizeMB);
1617        }
1618      });
1619    });
1620    final MutableFloat currentRegionColdDataRatio = new MutableFloat(0.0f);
1621    if (DataTieringManager.getInstance() != null) {
1622      DataTieringManager.getInstance().getRegionColdDataSize().computeIfPresent(regionEncodedName,
1623        (k, v) -> {
1624          int coldSizeMB = roundSize(v.getSecond(), unitMB);
1625          currentRegionColdDataRatio
1626            .setValue(regionSizeMB == 0 ? 0.0f : (float) coldSizeMB / regionSizeMB);
1627          return v;
1628        });
1629    }
1630
1631    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
1632    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
1633    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
1634    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
1635    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
1636    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
1637    if (regionLoadBldr == null) {
1638      regionLoadBldr = RegionLoad.newBuilder();
1639    }
1640    if (regionSpecifier == null) {
1641      regionSpecifier = RegionSpecifier.newBuilder();
1642    }
1643
1644    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1645    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1646    regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores)
1647      .setStorefiles(storefiles).setStoreRefCount(storeRefCount)
1648      .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1649      .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB)
1650      .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB)
1651      .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1652      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1653      .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount())
1654      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1655      .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs)
1656      .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality)
1657      .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight)
1658      .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight)
1659      .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
1660      .setLastMajorCompactionTs(r.getOldestHfileTs(true)).setRegionSizeMB(regionSizeMB)
1661      .setCurrentRegionCachedRatio(currentRegionCachedRatio.floatValue())
1662      .setCurrentRegionColdDataRatio(currentRegionColdDataRatio.floatValue());
1663    r.setCompleteSequenceId(regionLoadBldr);
1664    return regionLoadBldr.build();
1665  }
1666
1667  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1668    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1669    userLoadBldr.setUserName(user);
1670    userSource.getClientMetrics().values().stream()
1671      .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1672        .setHostName(clientMetrics.getHostName())
1673        .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1674        .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1675        .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
1676      .forEach(userLoadBldr::addClientMetrics);
1677    return userLoadBldr.build();
1678  }
1679
1680  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1681    HRegion r = onlineRegions.get(encodedRegionName);
1682    return r != null ? createRegionLoad(r, null, null) : null;
1683  }
1684
1685  /**
1686   * Inner class that runs on a long period checking if regions need compaction.
1687   */
1688  private static class CompactionChecker extends ScheduledChore {
1689    private final HRegionServer instance;
1690    private final int majorCompactPriority;
1691    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1692    // Iteration is 1-based rather than 0-based so we don't check for compaction
1693    // immediately upon region server startup
1694    private long iteration = 1;
1695
1696    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1697      super("CompactionChecker", stopper, sleepTime);
1698      this.instance = h;
1699      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1700
1701      /*
1702       * MajorCompactPriority is configurable. If not set, the compaction will use default priority.
1703       */
1704      this.majorCompactPriority = this.instance.conf
1705        .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY);
1706    }
1707
1708    @Override
1709    protected void chore() {
1710      for (HRegion hr : this.instance.onlineRegions.values()) {
1711        // If region is read only or compaction is disabled at table level, there's no need to
1712        // iterate through region's stores
1713        if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) {
1714          continue;
1715        }
1716
1717        for (HStore s : hr.stores.values()) {
1718          try {
1719            long multiplier = s.getCompactionCheckMultiplier();
1720            assert multiplier > 0;
1721            if (iteration % multiplier != 0) {
1722              continue;
1723            }
1724            if (s.needsCompaction()) {
1725              // Queue a compaction. Will recognize if major is needed.
1726              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1727                getName() + " requests compaction");
1728            } else if (s.shouldPerformMajorCompaction()) {
1729              s.triggerMajorCompaction();
1730              if (
1731                majorCompactPriority == DEFAULT_PRIORITY
1732                  || majorCompactPriority > hr.getCompactPriority()
1733              ) {
1734                this.instance.compactSplitThread.requestCompaction(hr, s,
1735                  getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
1736                  CompactionLifeCycleTracker.DUMMY, null);
1737              } else {
1738                this.instance.compactSplitThread.requestCompaction(hr, s,
1739                  getName() + " requests major compaction; use configured priority",
1740                  this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1741              }
1742            }
1743          } catch (IOException e) {
1744            LOG.warn("Failed major compaction check on " + hr, e);
1745          }
1746        }
1747      }
1748      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1749    }
1750  }
1751
1752  private static class PeriodicMemStoreFlusher extends ScheduledChore {
1753    private final HRegionServer server;
1754    private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
1755    private final static int MIN_DELAY_TIME = 0; // millisec
1756    private final long rangeOfDelayMs;
1757
1758    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1759      super("MemstoreFlusherChore", server, cacheFlushInterval);
1760      this.server = server;
1761
1762      final long configuredRangeOfDelay = server.getConfiguration()
1763        .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
1764      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
1765    }
1766
1767    @Override
1768    protected void chore() {
1769      final StringBuilder whyFlush = new StringBuilder();
1770      for (HRegion r : this.server.onlineRegions.values()) {
1771        if (r == null) {
1772          continue;
1773        }
1774        if (r.shouldFlush(whyFlush)) {
1775          FlushRequester requester = server.getFlushRequester();
1776          if (requester != null) {
1777            long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME;
1778            // Throttle the flushes by putting a delay. If we don't throttle, and there
1779            // is a balanced write-load on the regions in a table, we might end up
1780            // overwhelming the filesystem with too many flushes at once.
1781            if (requester.requestDelayedFlush(r, delay)) {
1782              LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(),
1783                r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay);
1784            }
1785          }
1786        }
1787      }
1788    }
1789  }
1790
1791  /**
1792   * Report the status of the server. A server is online once all the startup is completed (setting
1793   * up filesystem, starting executorService threads, etc.). This method is designed mostly to be
1794   * useful in tests.
1795   * @return true if online, false if not.
1796   */
1797  public boolean isOnline() {
1798    return online.get();
1799  }
1800
1801  /**
1802   * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
1803   * be hooked up to WAL.
1804   */
1805  private void setupWALAndReplication() throws IOException {
1806    WALFactory factory = new WALFactory(conf, serverName, this);
1807    // TODO Replication make assumptions here based on the default filesystem impl
1808    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1809    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1810
1811    Path logDir = new Path(walRootDir, logName);
1812    LOG.debug("logDir={}", logDir);
1813    if (this.walFs.exists(logDir)) {
1814      throw new RegionServerRunningException(
1815        "Region server has already created directory at " + this.serverName.toString());
1816    }
1817    // Create wal directory here and we will never create it again in other places. This is
1818    // important to make sure that our fencing way takes effect. See HBASE-29797 for more details.
1819    if (!this.walFs.mkdirs(logDir)) {
1820      throw new IOException("Can not create wal directory " + logDir);
1821    }
1822    // Instantiate replication if replication enabled. Pass it the log directories.
1823    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
1824
1825    WALActionsListener walEventListener = getWALEventTrackerListener(conf);
1826    if (walEventListener != null && factory.getWALProvider() != null) {
1827      factory.getWALProvider().addWALActionsListener(walEventListener);
1828    }
1829    this.walFactory = factory;
1830  }
1831
1832  private WALActionsListener getWALEventTrackerListener(Configuration conf) {
1833    if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) {
1834      WALEventTrackerListener listener =
1835        new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName());
1836      return listener;
1837    }
1838    return null;
1839  }
1840
1841  /**
1842   * Start up replication source and sink handlers.
1843   */
1844  private void startReplicationService() throws IOException {
1845    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
1846      this.replicationSourceHandler.startReplicationService();
1847    } else {
1848      if (this.replicationSourceHandler != null) {
1849        this.replicationSourceHandler.startReplicationService();
1850      }
1851      if (this.replicationSinkHandler != null) {
1852        this.replicationSinkHandler.startReplicationService();
1853      }
1854    }
1855  }
1856
1857  /** Returns Master address tracker instance. */
1858  public MasterAddressTracker getMasterAddressTracker() {
1859    return this.masterAddressTracker;
1860  }
1861
1862  /**
1863   * Start maintenance Threads, Server, Worker and lease checker threads. Start all threads we need
1864   * to run. This is called after we've successfully registered with the Master. Install an
1865   * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception. We
1866   * cannot set the handler on all threads. Server's internal Listener thread is off limits. For
1867   * Server, if an OOME, it waits a while then retries. Meantime, a flush or a compaction that tries
1868   * to run should trigger same critical condition and the shutdown will run. On its way out, this
1869   * server will shut down Server. Leases are sort of inbetween. It has an internal thread that
1870   * while it inherits from Chore, it keeps its own internal stop mechanism so needs to be stopped
1871   * by this hosting server. Worker logs the exception and exits.
1872   */
1873  private void startServices() throws IOException {
1874    if (!isStopped() && !isAborted()) {
1875      initializeThreads();
1876    }
1877    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
1878    this.secureBulkLoadManager.start();
1879
1880    // Health checker thread.
1881    if (isHealthCheckerConfigured()) {
1882      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
1883        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1884      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1885    }
1886    // Executor status collect thread.
1887    if (
1888      this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
1889        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)
1890    ) {
1891      int sleepTime =
1892        this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ);
1893      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
1894        this.metricsRegionServer.getMetricsSource());
1895    }
1896
1897    this.walRoller = new LogRoller(this);
1898    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1899    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
1900
1901    // Create the CompactedFileDischarger chore executorService. This chore helps to
1902    // remove the compacted files that will no longer be used in reads.
1903    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
1904    // 2 mins so that compacted files can be archived before the TTLCleaner runs
1905    int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
1906    this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this);
1907    choreService.scheduleChore(compactedFileDischarger);
1908
1909    // Start executor services
1910    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
1911    executorService.startExecutorService(executorService.new ExecutorConfig()
1912      .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
1913    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
1914    executorService.startExecutorService(executorService.new ExecutorConfig()
1915      .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
1916    final int openPriorityRegionThreads =
1917      conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
1918    executorService.startExecutorService(
1919      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION)
1920        .setCorePoolSize(openPriorityRegionThreads));
1921    final int closeRegionThreads =
1922      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
1923    executorService.startExecutorService(executorService.new ExecutorConfig()
1924      .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
1925    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
1926    executorService.startExecutorService(executorService.new ExecutorConfig()
1927      .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
1928    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1929      final int storeScannerParallelSeekThreads =
1930        conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
1931      executorService.startExecutorService(
1932        executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK)
1933          .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true));
1934    }
1935    final int logReplayOpsThreads =
1936      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
1937    executorService.startExecutorService(
1938      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS)
1939        .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
1940    // Start the threads for compacted files discharger
1941    final int compactionDischargerThreads =
1942      conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
1943    executorService.startExecutorService(executorService.new ExecutorConfig()
1944      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)
1945      .setCorePoolSize(compactionDischargerThreads));
1946    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1947      final int regionReplicaFlushThreads =
1948        conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1949          conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1950      executorService.startExecutorService(executorService.new ExecutorConfig()
1951        .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS)
1952        .setCorePoolSize(regionReplicaFlushThreads));
1953    }
1954    final int refreshPeerThreads =
1955      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
1956    executorService.startExecutorService(executorService.new ExecutorConfig()
1957      .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
1958    final int replaySyncReplicationWALThreads =
1959      conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
1960    executorService.startExecutorService(executorService.new ExecutorConfig()
1961      .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL)
1962      .setCorePoolSize(replaySyncReplicationWALThreads));
1963    final int switchRpcThrottleThreads =
1964      conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
1965    executorService.startExecutorService(
1966      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE)
1967        .setCorePoolSize(switchRpcThrottleThreads));
1968    final int claimReplicationQueueThreads =
1969      conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
1970    executorService.startExecutorService(
1971      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE)
1972        .setCorePoolSize(claimReplicationQueueThreads));
1973    final int rsSnapshotOperationThreads =
1974      conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
1975    executorService.startExecutorService(
1976      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
1977        .setCorePoolSize(rsSnapshotOperationThreads));
1978    final int rsFlushOperationThreads =
1979      conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3);
1980    executorService.startExecutorService(executorService.new ExecutorConfig()
1981      .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads));
1982    final int rsRefreshQuotasThreads =
1983      conf.getInt("hbase.regionserver.executor.refresh.quotas.threads", 1);
1984    executorService.startExecutorService(
1985      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS)
1986        .setCorePoolSize(rsRefreshQuotasThreads));
1987    final int logRollThreads = conf.getInt("hbase.regionserver.executor.log.roll.threads", 1);
1988    executorService.startExecutorService(executorService.new ExecutorConfig()
1989      .setExecutorType(ExecutorType.RS_LOG_ROLL).setCorePoolSize(logRollThreads));
1990    final int rsRefreshHFilesThreads =
1991      conf.getInt("hbase.regionserver.executor.refresh.hfiles.threads", 3);
1992    executorService.startExecutorService(executorService.new ExecutorConfig()
1993      .setExecutorType(ExecutorType.RS_REFRESH_HFILES).setCorePoolSize(rsRefreshHFilesThreads));
1994
1995    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
1996      uncaughtExceptionHandler);
1997    if (this.cacheFlusher != null) {
1998      this.cacheFlusher.start(uncaughtExceptionHandler);
1999    }
2000    Threads.setDaemonThreadRunning(this.procedureResultReporter,
2001      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
2002
2003    if (this.compactionChecker != null) {
2004      choreService.scheduleChore(compactionChecker);
2005    }
2006    if (this.periodicFlusher != null) {
2007      choreService.scheduleChore(periodicFlusher);
2008    }
2009    if (this.healthCheckChore != null) {
2010      choreService.scheduleChore(healthCheckChore);
2011    }
2012    if (this.executorStatusChore != null) {
2013      choreService.scheduleChore(executorStatusChore);
2014    }
2015    if (this.nonceManagerChore != null) {
2016      choreService.scheduleChore(nonceManagerChore);
2017    }
2018    if (this.storefileRefresher != null) {
2019      choreService.scheduleChore(storefileRefresher);
2020    }
2021    if (this.fsUtilizationChore != null) {
2022      choreService.scheduleChore(fsUtilizationChore);
2023    }
2024    if (this.namedQueueServiceChore != null) {
2025      choreService.scheduleChore(namedQueueServiceChore);
2026    }
2027    if (this.brokenStoreFileCleaner != null) {
2028      choreService.scheduleChore(brokenStoreFileCleaner);
2029    }
2030    if (this.rsMobFileCleanerChore != null) {
2031      choreService.scheduleChore(rsMobFileCleanerChore);
2032    }
2033    if (replicationMarkerChore != null) {
2034      LOG.info("Starting replication marker chore");
2035      choreService.scheduleChore(replicationMarkerChore);
2036    }
2037
2038    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
2039    // an unhandled exception, it will just exit.
2040    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
2041      uncaughtExceptionHandler);
2042
2043    // Create the log splitting worker and start it
2044    // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
2045    // quite a while inside Connection layer. The worker won't be available for other
2046    // tasks even after current task is preempted after a split task times out.
2047    Configuration sinkConf = HBaseConfiguration.create(conf);
2048    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2049      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
2050    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2051      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
2052    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
2053    if (
2054      this.csm != null
2055        && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
2056    ) {
2057      // SplitLogWorker needs csm. If none, don't start this.
2058      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
2059      splitLogWorker.start();
2060      LOG.debug("SplitLogWorker started");
2061    }
2062
2063    // Memstore services.
2064    startHeapMemoryManager();
2065    // Call it after starting HeapMemoryManager.
2066    initializeMemStoreChunkCreator(hMemManager);
2067  }
2068
  /**
   * Creates the worker objects and chores held by this region server (flusher, compaction,
   * lease manager, named-queue chore, nonce cleanup chore, quota managers, store file refresher,
   * broken store file cleaner, MOB file cleaner) and then registers the configuration observers
   * and initializes the replication marker chore. The chores are only instantiated here; they
   * are handed to the chore service elsewhere.
   */
  private void initializeThreads() {
    // Cache flushing thread.
    this.cacheFlusher = new MemStoreFlusher(conf, this);

    // Compaction thread
    this.compactSplitThread = new CompactSplit(this);

    // Prefetch Notifier
    this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);

    // Background thread to check for compactions; needed if region has not gotten updates
    // in a while. It will take care of not checking too frequently on store-by-store basis.
    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
    this.leaseManager = new LeaseManager(this.threadWakeFrequency);

    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
    final boolean walEventTrackerEnabled =
      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);

    // The named-queue chore is created when either the slow log system table or the WAL event
    // tracker is enabled.
    if (isSlowLogTableEnabled || walEventTrackerEnabled) {
      // default chore duration: 10 min
      // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property
      final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY,
        DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION);

      final int namedQueueChoreDuration =
        conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT);
      // Use the smaller of the legacy slow log duration and the named-queue duration.
      int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration);

      namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration,
        this.namedQueueRecorder, this.getConnection());
    }

    if (this.nonceManager != null) {
      // Create the scheduled chore that cleans up nonces.
      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
    }

    // Setup the Quota Manager
    rsQuotaManager = new RegionServerRpcQuotaManager(this);
    configurationManager.registerObserver(rsQuotaManager);
    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);

    // The file system utilization chore is only created when quotas are enabled.
    if (QuotaUtil.isQuotaEnabled(conf)) {
      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
    }

    // Store file refresher: a general refresh period of 0 falls back to the meta-only refresh
    // period (and marks the chore as meta-only). The chore is created only when the resulting
    // period is positive.
    boolean onlyMetaRefresh = false;
    int storefileRefreshPeriod =
      conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
    if (storefileRefreshPeriod == 0) {
      // NOTE(review): the meta-only lookup reuses the general default constant -- confirm this
      // is intended.
      storefileRefreshPeriod =
        conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
      onlyMetaRefresh = true;
    }
    if (storefileRefreshPeriod > 0) {
      this.storefileRefresher =
        new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this);
    }

    int brokenStoreFileCleanerPeriod =
      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
    int brokenStoreFileCleanerDelay =
      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
    double brokenStoreFileCleanerDelayJitter =
      conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
    // nextDouble() is in [0, 1), so jitterRate lies in [-jitter/2, +jitter/2) and the delay
    // passed to the cleaner is shifted by that fraction of the configured delay.
    double jitterRate =
      (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
    this.brokenStoreFileCleaner =
      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
        brokenStoreFileCleanerPeriod, this, conf, this);

    this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);

    // Must run after the fields above are assigned: the observers registered below include
    // compactSplitThread, cacheFlusher and prefetchExecutorNotifier.
    registerConfigurationObservers();
    initializeReplicationMarkerChore();
  }
2155
2156  private void registerConfigurationObservers() {
2157    // Register Replication if possible, as now we support recreating replication peer storage, for
2158    // migrating across different replication peer storages online
2159    if (replicationSourceHandler instanceof ConfigurationObserver) {
2160      configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
2161    }
2162    if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) {
2163      configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
2164    }
2165    // Registering the compactSplitThread object with the ConfigurationManager.
2166    configurationManager.registerObserver(this.compactSplitThread);
2167    configurationManager.registerObserver(this.cacheFlusher);
2168    configurationManager.registerObserver(this.rpcServices);
2169    configurationManager.registerObserver(this.prefetchExecutorNotifier);
2170    configurationManager.registerObserver(this);
2171  }
2172
2173  /*
2174   * Verify that server is healthy
2175   */
2176  private boolean isHealthy() {
2177    if (!dataFsOk) {
2178      // File system problem
2179      return false;
2180    }
2181    // Verify that all threads are alive
2182    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
2183      && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2184      && (this.walRoller == null || this.walRoller.isAlive())
2185      && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2186      && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2187    if (!healthy) {
2188      stop("One or more threads are no longer alive -- stop");
2189    }
2190    return healthy;
2191  }
2192
2193  @Override
2194  public List<WAL> getWALs() {
2195    return walFactory.getWALs();
2196  }
2197
2198  @Override
2199  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2200    WAL wal = walFactory.getWAL(regionInfo);
2201    if (this.walRoller != null) {
2202      this.walRoller.addWAL(wal);
2203    }
2204    return wal;
2205  }
2206
2207  public LogRoller getWalRoller() {
2208    return walRoller;
2209  }
2210
2211  public WALFactory getWalFactory() {
2212    return walFactory;
2213  }
2214
2215  @Override
2216  public void stop(final String msg) {
2217    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2218  }
2219
2220  /**
2221   * Stops the regionserver.
2222   * @param msg   Status message
2223   * @param force True if this is a regionserver abort
2224   * @param user  The user executing the stop request, or null if no user is associated
2225   */
2226  public void stop(final String msg, final boolean force, final User user) {
2227    if (!this.stopped) {
2228      LOG.info("***** STOPPING region server '{}' *****", this);
2229      if (this.rsHost != null) {
2230        // when forced via abort don't allow CPs to override
2231        try {
2232          this.rsHost.preStop(msg, user);
2233        } catch (IOException ioe) {
2234          if (!force) {
2235            LOG.warn("The region server did not stop", ioe);
2236            return;
2237          }
2238          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2239        }
2240      }
2241      this.stopped = true;
2242      LOG.info("STOPPED: " + msg);
2243      // Wakes run() if it is sleeping
2244      sleeper.skipSleepCycle();
2245    }
2246  }
2247
2248  public void waitForServerOnline() {
2249    while (!isStopped() && !isOnline()) {
2250      synchronized (online) {
2251        try {
2252          online.wait(msgInterval);
2253        } catch (InterruptedException ie) {
2254          Thread.currentThread().interrupt();
2255          break;
2256        }
2257      }
2258    }
2259  }
2260
2261  @Override
2262  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2263    HRegion r = context.getRegion();
2264    long openProcId = context.getOpenProcId();
2265    long masterSystemTime = context.getMasterSystemTime();
2266    long initiatingMasterActiveTime = context.getInitiatingMasterActiveTime();
2267    rpcServices.checkOpen();
2268    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2269      r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2270    // Do checks to see if we need to compact (references or too many files)
2271    // Skip compaction check if region is read only
2272    if (!r.isReadOnly()) {
2273      for (HStore s : r.stores.values()) {
2274        if (s.hasReferences() || s.needsCompaction()) {
2275          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2276        }
2277      }
2278    }
2279    long openSeqNum = r.getOpenSeqNum();
2280    if (openSeqNum == HConstants.NO_SEQNUM) {
2281      // If we opened a region, we should have read some sequence number from it.
2282      LOG.error(
2283        "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2284      openSeqNum = 0;
2285    }
2286
2287    // Notify master
2288    if (
2289      !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2290        openSeqNum, openProcId, masterSystemTime, r.getRegionInfo(), initiatingMasterActiveTime))
2291    ) {
2292      throw new IOException(
2293        "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2294    }
2295
2296    triggerFlushInPrimaryRegion(r);
2297
2298    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2299  }
2300
2301  /**
2302   * Helper method for use in tests. Skip the region transition report when there's no master around
2303   * to receive it.
2304   */
2305  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2306    final TransitionCode code = context.getCode();
2307    final long openSeqNum = context.getOpenSeqNum();
2308    long masterSystemTime = context.getMasterSystemTime();
2309    final RegionInfo[] hris = context.getHris();
2310
2311    if (code == TransitionCode.OPENED) {
2312      Preconditions.checkArgument(hris != null && hris.length == 1);
2313      if (hris[0].isMetaRegion()) {
2314        LOG.warn(
2315          "meta table location is stored in master local store, so we can not skip reporting");
2316        return false;
2317      } else {
2318        try {
2319          MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
2320            serverName, openSeqNum, masterSystemTime);
2321        } catch (IOException e) {
2322          LOG.info("Failed to update meta", e);
2323          return false;
2324        }
2325      }
2326    }
2327    return true;
2328  }
2329
2330  private ReportRegionStateTransitionRequest
2331    createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) {
2332    final TransitionCode code = context.getCode();
2333    final long openSeqNum = context.getOpenSeqNum();
2334    final RegionInfo[] hris = context.getHris();
2335    final long[] procIds = context.getProcIds();
2336
2337    ReportRegionStateTransitionRequest.Builder builder =
2338      ReportRegionStateTransitionRequest.newBuilder();
2339    builder.setServer(ProtobufUtil.toServerName(serverName));
2340    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2341    transition.setTransitionCode(code);
2342    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2343      transition.setOpenSeqNum(openSeqNum);
2344    }
2345    for (RegionInfo hri : hris) {
2346      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2347    }
2348    for (long procId : procIds) {
2349      transition.addProcId(procId);
2350    }
2351    transition.setInitiatingMasterActiveTime(context.getInitiatingMasterActiveTime());
2352
2353    return builder.build();
2354  }
2355
  /**
   * Reports a region state transition to the master, retrying on RPC failures for as long as the
   * cluster connection exists and is not closed. When {@code TEST_SKIP_REPORTING_TRANSITION} is
   * set, the report is replaced by {@link #skipReportingTransition}.
   * @param context the transition to report
   * @return {@code true} if the master accepted the report; {@code false} if the master answered
   *         with an error message or the connection was null or closed before a report succeeded
   */
  @Override
  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
    if (TEST_SKIP_REPORTING_TRANSITION) {
      return skipReportingTransition(context);
    }
    final ReportRegionStateTransitionRequest request =
      createReportRegionStateTransitionRequest(context);

    int tries = 0;
    long pauseTime = this.retryPauseTime;
    // Keep looping till we get an error. We want to send reports even though server is going down.
    // Only give up if the cluster connection is null or closed. It is set to null almost as the
    // last thing as the HRegionServer goes down.
    // NOTE(review): when no stub can be created (createRegionServerStatusStub() returns without
    // setting rssStub) this loop retries without a pause while the connection stays open --
    // confirm this is intended.
    while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
      // Snapshot the stub so a failure below only clears the stub this attempt actually used.
      RegionServerStatusService.BlockingInterface rss = rssStub;
      try {
        if (rss == null) {
          createRegionServerStatusStub();
          continue;
        }
        ReportRegionStateTransitionResponse response =
          rss.reportRegionStateTransition(null, request);
        if (response.hasErrorMessage()) {
          // The master rejected the transition; do not retry.
          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
          break;
        }
        // Log if we had to retry else don't log unless TRACE. We want to
        // know if we were successful after an attempt showed in logs as failed.
        if (tries > 0 || LOG.isTraceEnabled()) {
          LOG.info("TRANSITION REPORTED " + request);
        }
        // NOTE: Return mid-method!!!
        return true;
      } catch (ServiceException se) {
        IOException ioe = ProtobufUtil.getRemoteException(se);
        // These exceptions are retried after a backoff; anything else is retried immediately.
        boolean pause = ioe instanceof ServerNotRunningYetException
          || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException;
        if (pause) {
          // Do backoff else we flood the Master with requests.
          pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
        } else {
          pauseTime = this.retryPauseTime; // Reset.
        }
        LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#"
          + tries + ")"
          + (pause
            ? " after " + pauseTime + "ms delay (Master is coming online...)."
            : " immediately."),
          ioe);
        if (pause) {
          Threads.sleep(pauseTime);
        }
        tries++;
        // Drop the failed stub so the next iteration creates a new one, unless the field no
        // longer holds the stub this attempt used.
        if (rssStub == rss) {
          rssStub = null;
        }
      }
    }
    return false;
  }
2416
2417  /**
2418   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2419   * block this thread. See RegionReplicaFlushHandler for details.
2420   */
2421  private void triggerFlushInPrimaryRegion(final HRegion region) {
2422    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2423      return;
2424    }
2425    TableName tn = region.getTableDescriptor().getTableName();
2426    if (
2427      !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn)
2428        || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) ||
2429        // If the memstore replication not setup, we do not have to wait for observing a flush event
2430        // from primary before starting to serve reads, because gaps from replication is not
2431        // applicable,this logic is from
2432        // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by
2433        // HBASE-13063
2434        !region.getTableDescriptor().hasRegionMemStoreReplication()
2435    ) {
2436      region.setReadsEnabled(true);
2437      return;
2438    }
2439
2440    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2441    // RegionReplicaFlushHandler might reset this.
2442
2443    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2444    if (this.executorService != null) {
2445      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
2446    } else {
2447      LOG.info("Executor is null; not running flush of primary region replica for {}",
2448        region.getRegionInfo());
2449    }
2450  }
2451
2452  @InterfaceAudience.Private
2453  public RSRpcServices getRSRpcServices() {
2454    return rpcServices;
2455  }
2456
  /**
   * Aborts this region server. Used in unit testing and on catastrophic events such as HDFS
   * being yanked out from under HBase or an OOME. Only the first abort request is acted upon;
   * later requests are logged and ignored. The abort logs the reason, the loaded coprocessors
   * and a metrics dump, makes a best-effort attempt to report the fatal error to the master,
   * schedules the abort timeout timer and finally requests a forced stop with no user.
   * @param reason the reason we are aborting
   * @param cause  the exception that caused the abort, or null
   */
  @Override
  public void abort(String reason, Throwable cause) {
    if (!setAbortRequested()) {
      // Abort already in progress, ignore the new request.
      LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason);
      return;
    }
    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
    if (cause != null) {
      LOG.error(HBaseMarkers.FATAL, msg, cause);
    } else {
      LOG.error(HBaseMarkers.FATAL, msg);
    }
    // HBASE-4014: show list of coprocessors that were loaded to help debug
    // regionserver crashes. Note that we're implicitly using
    // java.util.HashSet's toString() method to print the coprocessor names.
    LOG.error(HBaseMarkers.FATAL,
      "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors());
    // Try and dump metrics if abort -- might give clue as to how fatal came about....
    try {
      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
    } catch (MalformedObjectNameException | IOException e) {
      LOG.warn("Failed dumping metrics", e);
    }

    // Do our best to report our abort to the master, but this may not work
    try {
      if (cause != null) {
        // The message sent to the master also carries the stack trace of the cause.
        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
      }
      // Report to the master but only if we have already registered with the master.
      RegionServerStatusService.BlockingInterface rss = rssStub;
      if (rss != null && this.serverName != null) {
        ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder();
        builder.setServer(ProtobufUtil.toServerName(this.serverName));
        builder.setErrorMessage(msg);
        rss.reportRSFatalError(null, builder.build());
      }
    } catch (Throwable t) {
      // Reporting is best effort only; a failure here must not prevent the abort.
      LOG.warn("Unable to report fatal error to master", t);
    }

    scheduleAbortTimer();
    // shutdown should be run as the internal user
    stop(reason, true, null);
  }
2509
2510  /*
2511   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup logs but it does
2512   * close socket in case want to bring up server on old hostname+port immediately.
2513   */
2514  @InterfaceAudience.Private
2515  protected void kill() {
2516    this.killed = true;
2517    abort("Simulated kill");
2518  }
2519
2520  // Limits the time spent in the shutdown process.
2521  private void scheduleAbortTimer() {
2522    if (this.abortMonitor == null) {
2523      this.abortMonitor = new Timer("Abort regionserver monitor", true);
2524      TimerTask abortTimeoutTask = null;
2525      try {
2526        Constructor<? extends TimerTask> timerTaskCtor =
2527          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2528            .asSubclass(TimerTask.class).getDeclaredConstructor();
2529        timerTaskCtor.setAccessible(true);
2530        abortTimeoutTask = timerTaskCtor.newInstance();
2531      } catch (Exception e) {
2532        LOG.warn("Initialize abort timeout task failed", e);
2533      }
2534      if (abortTimeoutTask != null) {
2535        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2536      }
2537    }
2538  }
2539
2540  /**
2541   * Wait on all threads to finish. Presumption is that all closes and stops have already been
2542   * called.
2543   */
2544  protected void stopServiceThreads() {
2545    // clean up the scheduled chores
2546    stopChoreService();
2547    if (bootstrapNodeManager != null) {
2548      bootstrapNodeManager.stop();
2549    }
2550    if (this.cacheFlusher != null) {
2551      this.cacheFlusher.shutdown();
2552    }
2553    if (this.walRoller != null) {
2554      this.walRoller.close();
2555    }
2556    if (this.compactSplitThread != null) {
2557      this.compactSplitThread.join();
2558    }
2559    stopExecutorService();
2560    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
2561      this.replicationSourceHandler.stopReplicationService();
2562    } else {
2563      if (this.replicationSourceHandler != null) {
2564        this.replicationSourceHandler.stopReplicationService();
2565      }
2566      if (this.replicationSinkHandler != null) {
2567        this.replicationSinkHandler.stopReplicationService();
2568      }
2569    }
2570  }
2571
2572  /** Returns Return the object that implements the replication source executorService. */
2573  @Override
2574  public ReplicationSourceService getReplicationSourceService() {
2575    return replicationSourceHandler;
2576  }
2577
2578  /** Returns Return the object that implements the replication sink executorService. */
2579  public ReplicationSinkService getReplicationSinkService() {
2580    return replicationSinkHandler;
2581  }
2582
2583  /**
2584   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2585   * connection, the current rssStub must be null. Method will block until a master is available.
2586   * You can break from this block by requesting the server stop.
2587   * @return master + port, or null if server has been stopped
2588   */
2589  private synchronized ServerName createRegionServerStatusStub() {
2590    // Create RS stub without refreshing the master node from ZK, use cached data
2591    return createRegionServerStatusStub(false);
2592  }
2593
  /**
   * Get the current master address and open the RPC connection to it. To get a fresh connection,
   * the current rssStub must be null; if a stub already exists the tracked master address is
   * returned without creating anything. Method will block until a master is available. You can
   * break from this block by requesting the server stop. On return both {@code rssStub} and
   * {@code lockStub} have been assigned, possibly to {@code null} if the loop ended before a
   * channel could be created, so callers should check the stub as well as the return value.
   * @param refresh If true then master address will be read from ZK, otherwise use cached data
   * @return master + port, or null if server has been stopped
   */
  @InterfaceAudience.Private
  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
    if (rssStub != null) {
      return masterAddressTracker.getMasterAddress();
    }
    ServerName sn = null;
    // Time of the last retry log line; used to log at most about once per second.
    long previousLogTime = 0;
    RegionServerStatusService.BlockingInterface intRssStub = null;
    LockService.BlockingInterface intLockStub = null;
    // Remembers an interrupt seen while sleeping so it can be restored once the loop is done.
    boolean interrupted = false;
    try {
      while (keepLooping()) {
        sn = this.masterAddressTracker.getMasterAddress(refresh);
        if (sn == null) {
          if (!keepLooping()) {
            // give up with no connection.
            LOG.debug("No master found and cluster is stopped; bailing out");
            return null;
          }
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            LOG.debug("No master found; retry");
            previousLogTime = EnvironmentEdgeManager.currentTime();
          }
          refresh = true; // let's try pull it from ZK directly
          if (sleepInterrupted(200)) {
            interrupted = true;
          }
          continue;
        }
        try {
          // Both stubs share one blocking channel to the master.
          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
            userProvider.getCurrent(), shortOperationTimeout);
          intRssStub = RegionServerStatusService.newBlockingStub(channel);
          intLockStub = LockService.newBlockingStub(channel);
          break;
        } catch (IOException e) {
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
            if (e instanceof ServerNotRunningYetException) {
              LOG.info("Master isn't available yet, retrying");
            } else {
              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
            }
            previousLogTime = EnvironmentEdgeManager.currentTime();
          }
          if (sleepInterrupted(200)) {
            interrupted = true;
          }
        }
      }
    } finally {
      // Restore the interrupt flag if an interrupt was noted while sleeping above.
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
    this.rssStub = intRssStub;
    this.lockStub = intLockStub;
    return sn;
  }
2660
2661  /**
2662   * @return True if we should break loop because cluster is going down or this server has been
2663   *         stopped or hdfs has gone bad.
2664   */
2665  private boolean keepLooping() {
2666    return !this.stopped && isClusterUp();
2667  }
2668
2669  /*
2670   * Let the master know we're here Run initialization using parameters passed us by the master.
2671   * @return A Map of key/value configurations we got from the Master else null if we failed to
2672   * register.
2673   */
2674  private RegionServerStartupResponse reportForDuty() throws IOException {
2675    if (this.masterless) {
2676      return RegionServerStartupResponse.getDefaultInstance();
2677    }
2678    ServerName masterServerName = createRegionServerStatusStub(true);
2679    RegionServerStatusService.BlockingInterface rss = rssStub;
2680    if (masterServerName == null || rss == null) {
2681      return null;
2682    }
2683    RegionServerStartupResponse result = null;
2684    try {
2685      rpcServices.requestCount.reset();
2686      rpcServices.rpcGetRequestCount.reset();
2687      rpcServices.rpcScanRequestCount.reset();
2688      rpcServices.rpcFullScanRequestCount.reset();
2689      rpcServices.rpcMultiRequestCount.reset();
2690      rpcServices.rpcMutateRequestCount.reset();
2691      LOG.info("reportForDuty to master=" + masterServerName + " with port="
2692        + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode);
2693      long now = EnvironmentEdgeManager.currentTime();
2694      int port = rpcServices.getSocketAddress().getPort();
2695      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2696      if (!StringUtils.isBlank(useThisHostnameInstead)) {
2697        request.setUseThisHostnameInstead(useThisHostnameInstead);
2698      }
2699      request.setPort(port);
2700      request.setServerStartCode(this.startcode);
2701      request.setServerCurrentTime(now);
2702      result = rss.regionServerStartup(null, request.build());
2703    } catch (ServiceException se) {
2704      IOException ioe = ProtobufUtil.getRemoteException(se);
2705      if (ioe instanceof ClockOutOfSyncException) {
2706        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
2707        // Re-throw IOE will cause RS to abort
2708        throw ioe;
2709      } else if (ioe instanceof DecommissionedHostRejectedException) {
2710        LOG.error(HBaseMarkers.FATAL,
2711          "Master rejected startup because the host is considered decommissioned", ioe);
2712        // Re-throw IOE will cause RS to abort
2713        throw ioe;
2714      } else if (ioe instanceof ServerNotRunningYetException) {
2715        LOG.debug("Master is not running yet");
2716      } else {
2717        LOG.warn("error telling master we are up", se);
2718      }
2719      rssStub = null;
2720    }
2721    return result;
2722  }
2723
2724  @Override
2725  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2726    try {
2727      GetLastFlushedSequenceIdRequest req =
2728        RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2729      RegionServerStatusService.BlockingInterface rss = rssStub;
2730      if (rss == null) { // Try to connect one more time
2731        createRegionServerStatusStub();
2732        rss = rssStub;
2733        if (rss == null) {
2734          // Still no luck, we tried
2735          LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2736          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2737            .build();
2738        }
2739      }
2740      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2741      return RegionStoreSequenceIds.newBuilder()
2742        .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2743        .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2744    } catch (ServiceException e) {
2745      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2746      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2747        .build();
2748    }
2749  }
2750
2751  /**
2752   * Close meta region if we carry it
2753   * @param abort Whether we're running an abort.
2754   */
2755  private void closeMetaTableRegions(final boolean abort) {
2756    HRegion meta = null;
2757    this.onlineRegionsLock.writeLock().lock();
2758    try {
2759      for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
2760        RegionInfo hri = e.getValue().getRegionInfo();
2761        if (hri.isMetaRegion()) {
2762          meta = e.getValue();
2763        }
2764        if (meta != null) {
2765          break;
2766        }
2767      }
2768    } finally {
2769      this.onlineRegionsLock.writeLock().unlock();
2770    }
2771    if (meta != null) {
2772      closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2773    }
2774  }
2775
2776  /**
2777   * Schedule closes on all user regions. Should be safe calling multiple times because it wont'
2778   * close regions that are already closed or that are closing.
2779   * @param abort Whether we're running an abort.
2780   */
2781  private void closeUserRegions(final boolean abort) {
2782    this.onlineRegionsLock.writeLock().lock();
2783    try {
2784      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
2785        HRegion r = e.getValue();
2786        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2787          // Don't update zk with this close transition; pass false.
2788          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2789        }
2790      }
2791    } finally {
2792      this.onlineRegionsLock.writeLock().unlock();
2793    }
2794  }
2795
2796  protected Map<String, HRegion> getOnlineRegions() {
2797    return this.onlineRegions;
2798  }
2799
2800  public int getNumberOfOnlineRegions() {
2801    return this.onlineRegions.size();
2802  }
2803
2804  /**
2805   * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM
2806   * as client; HRegion cannot be serialized to cross an rpc.
2807   */
2808  public Collection<HRegion> getOnlineRegionsLocalContext() {
2809    Collection<HRegion> regions = this.onlineRegions.values();
2810    return Collections.unmodifiableCollection(regions);
2811  }
2812
2813  @Override
2814  public void addRegion(HRegion region) {
2815    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2816    configurationManager.registerObserver(region);
2817  }
2818
2819  private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
2820    long size) {
2821    if (!sortedRegions.containsKey(size)) {
2822      sortedRegions.put(size, new ArrayList<>());
2823    }
2824    sortedRegions.get(size).add(region);
2825  }
2826
2827  /**
2828   * @return A new Map of online regions sorted by region off-heap size with the first entry being
2829   *         the biggest.
2830   */
2831  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2832    // we'll sort the regions in reverse
2833    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2834    // Copy over all regions. Regions are sorted by size with biggest first.
2835    for (HRegion region : this.onlineRegions.values()) {
2836      addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
2837    }
2838    return sortedRegions;
2839  }
2840
2841  /**
2842   * @return A new Map of online regions sorted by region heap size with the first entry being the
2843   *         biggest.
2844   */
2845  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2846    // we'll sort the regions in reverse
2847    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2848    // Copy over all regions. Regions are sorted by size with biggest first.
2849    for (HRegion region : this.onlineRegions.values()) {
2850      addRegion(sortedRegions, region, region.getMemStoreHeapSize());
2851    }
2852    return sortedRegions;
2853  }
2854
2855  /** Returns reference to FlushRequester */
2856  @Override
2857  public FlushRequester getFlushRequester() {
2858    return this.cacheFlusher;
2859  }
2860
2861  @Override
2862  public CompactionRequester getCompactionRequestor() {
2863    return this.compactSplitThread;
2864  }
2865
2866  @Override
2867  public LeaseManager getLeaseManager() {
2868    return leaseManager;
2869  }
2870
2871  /** Returns {@code true} when the data file system is available, {@code false} otherwise. */
2872  boolean isDataFileSystemOk() {
2873    return this.dataFsOk;
2874  }
2875
2876  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
2877    return this.rsHost;
2878  }
2879
2880  @Override
2881  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2882    return this.regionsInTransitionInRS;
2883  }
2884
2885  @Override
2886  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
2887    return rsQuotaManager;
2888  }
2889
2890  //
2891  // Main program and support routines
2892  //
2893  /**
2894   * Load the replication executorService objects, if any
2895   */
2896  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
2897    FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
2898    // read in the name of the source replication class from the config file.
2899    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2900      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2901
2902    // read in the name of the sink replication class from the config file.
2903    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2904      HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
2905
2906    // If both the sink and the source class names are the same, then instantiate
2907    // only one object.
2908    if (sourceClassname.equals(sinkClassname)) {
2909      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2910        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2911      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
2912      server.sameReplicationSourceAndSink = true;
2913    } else {
2914      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2915        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2916      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
2917        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2918      server.sameReplicationSourceAndSink = false;
2919    }
2920  }
2921
2922  private static <T extends ReplicationService> T newReplicationInstance(String classname,
2923    Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
2924    Path oldLogDir, WALFactory walFactory) throws IOException {
2925    final Class<? extends T> clazz;
2926    try {
2927      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2928      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
2929    } catch (java.lang.ClassNotFoundException nfe) {
2930      throw new IOException("Could not find class for " + classname);
2931    }
2932    T service = ReflectionUtils.newInstance(clazz, conf);
2933    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
2934    return service;
2935  }
2936
2937  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
2938    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
2939    if (!this.isOnline()) {
2940      return walGroupsReplicationStatus;
2941    }
2942    List<ReplicationSourceInterface> allSources = new ArrayList<>();
2943    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
2944    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
2945    for (ReplicationSourceInterface source : allSources) {
2946      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
2947    }
2948    return walGroupsReplicationStatus;
2949  }
2950
2951  /**
2952   * Utility for constructing an instance of the passed HRegionServer class.
2953   */
2954  static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass,
2955    final Configuration conf) {
2956    try {
2957      Constructor<? extends HRegionServer> c =
2958        regionServerClass.getConstructor(Configuration.class);
2959      return c.newInstance(conf);
2960    } catch (Exception e) {
2961      throw new RuntimeException(
2962        "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e);
2963    }
2964  }
2965
2966  /**
2967   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2968   */
2969  public static void main(String[] args) {
2970    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
2971    VersionInfo.logVersion();
2972    Configuration conf = HBaseConfiguration.create();
2973    @SuppressWarnings("unchecked")
2974    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2975      .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2976
2977    new HRegionServerCommandLine(regionServerClass).doMain(args);
2978  }
2979
2980  /**
2981   * Gets the online regions of the specified table. This method looks at the in-memory
2982   * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions.
2983   * If a region on this table has been closed during a disable, etc., it will not be included in
2984   * the returned list. So, the returned list may not necessarily be ALL regions in this table, its
2985   * all the ONLINE regions in the table.
2986   * @param tableName table to limit the scope of the query
2987   * @return Online regions from <code>tableName</code>
2988   */
2989  @Override
2990  public List<HRegion> getRegions(TableName tableName) {
2991    List<HRegion> tableRegions = new ArrayList<>();
2992    synchronized (this.onlineRegions) {
2993      for (HRegion region : this.onlineRegions.values()) {
2994        RegionInfo regionInfo = region.getRegionInfo();
2995        if (regionInfo.getTable().equals(tableName)) {
2996          tableRegions.add(region);
2997        }
2998      }
2999    }
3000    return tableRegions;
3001  }
3002
3003  @Override
3004  public List<HRegion> getRegions() {
3005    List<HRegion> allRegions;
3006    synchronized (this.onlineRegions) {
3007      // Return a clone copy of the onlineRegions
3008      allRegions = new ArrayList<>(onlineRegions.values());
3009    }
3010    return allRegions;
3011  }
3012
3013  /**
3014   * Gets the online tables in this RS. This method looks at the in-memory onlineRegions.
3015   * @return all the online tables in this RS
3016   */
3017  public Set<TableName> getOnlineTables() {
3018    Set<TableName> tables = new HashSet<>();
3019    synchronized (this.onlineRegions) {
3020      for (Region region : this.onlineRegions.values()) {
3021        tables.add(region.getTableDescriptor().getTableName());
3022      }
3023    }
3024    return tables;
3025  }
3026
3027  public String[] getRegionServerCoprocessors() {
3028    TreeSet<String> coprocessors = new TreeSet<>();
3029    try {
3030      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
3031    } catch (IOException exception) {
3032      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; "
3033        + "skipping.");
3034      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3035    }
3036    Collection<HRegion> regions = getOnlineRegionsLocalContext();
3037    for (HRegion region : regions) {
3038      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
3039      try {
3040        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
3041      } catch (IOException exception) {
3042        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region
3043          + "; skipping.");
3044        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3045      }
3046    }
3047    coprocessors.addAll(rsHost.getCoprocessors());
3048    return coprocessors.toArray(new String[0]);
3049  }
3050
3051  /**
3052   * Try to close the region, logs a warning on failure but continues.
3053   * @param region Region to close
3054   */
3055  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
3056    try {
3057      if (!closeRegion(region.getEncodedName(), abort, null)) {
3058        LOG
3059          .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing");
3060      }
3061    } catch (IOException e) {
3062      LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing",
3063        e);
3064    }
3065  }
3066
3067  /**
3068   * Close asynchronously a region, can be called from the master or internally by the regionserver
3069   * when stopping. If called from the master, the region will update the status.
3070   * <p>
3071   * If an opening was in progress, this method will cancel it, but will not start a new close. The
3072   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
3073   * </p>
3074   * <p>
3075   * If a close was in progress, this new request will be ignored, and an exception thrown.
3076   * </p>
3077   * <p>
3078   * Provides additional flag to indicate if this region blocks should be evicted from the cache.
3079   * </p>
3080   * @param encodedName Region to close
3081   * @param abort       True if we are aborting
3082   * @param destination Where the Region is being moved too... maybe null if unknown.
3083   * @return True if closed a region.
3084   * @throws NotServingRegionException if the region is not online
3085   */
3086  protected boolean closeRegion(String encodedName, final boolean abort,
3087    final ServerName destination) throws NotServingRegionException {
3088    // Check for permissions to close.
3089    HRegion actualRegion = this.getRegion(encodedName);
3090    // Can be null if we're calling close on a region that's not online
3091    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
3092      try {
3093        actualRegion.getCoprocessorHost().preClose(false);
3094      } catch (IOException exp) {
3095        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
3096        return false;
3097      }
3098    }
3099
3100    // previous can come back 'null' if not in map.
3101    final Boolean previous =
3102      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);
3103
3104    if (Boolean.TRUE.equals(previous)) {
3105      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already "
3106        + "trying to OPEN. Cancelling OPENING.");
3107      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
3108        // The replace failed. That should be an exceptional case, but theoretically it can happen.
3109        // We're going to try to do a standard close then.
3110        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
3111          + " Doing a standard close now");
3112        return closeRegion(encodedName, abort, destination);
3113      }
3114      // Let's get the region from the online region list again
3115      actualRegion = this.getRegion(encodedName);
3116      if (actualRegion == null) { // If already online, we still need to close it.
3117        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3118        // The master deletes the znode when it receives this exception.
3119        throw new NotServingRegionException(
3120          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
3121      }
3122    } else if (previous == null) {
3123      LOG.info("Received CLOSE for {}", encodedName);
3124    } else if (Boolean.FALSE.equals(previous)) {
3125      LOG.info("Received CLOSE for the region: " + encodedName
3126        + ", which we are already trying to CLOSE, but not completed yet");
3127      return true;
3128    }
3129
3130    if (actualRegion == null) {
3131      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3132      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
3133      // The master deletes the znode when it receives this exception.
3134      throw new NotServingRegionException(
3135        "The region " + encodedName + " is not online, and is not opening.");
3136    }
3137
3138    CloseRegionHandler crh;
3139    final RegionInfo hri = actualRegion.getRegionInfo();
3140    if (hri.isMetaRegion()) {
3141      crh = new CloseMetaHandler(this, this, hri, abort);
3142    } else {
3143      crh = new CloseRegionHandler(this, this, hri, abort, destination);
3144    }
3145    this.executorService.submit(crh);
3146    return true;
3147  }
3148
3149  /**
3150   * @return HRegion for the passed binary <code>regionName</code> or null if named region is not
3151   *         member of the online regions.
3152   */
3153  public HRegion getOnlineRegion(final byte[] regionName) {
3154    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3155    return this.onlineRegions.get(encodedRegionName);
3156  }
3157
3158  @Override
3159  public HRegion getRegion(final String encodedRegionName) {
3160    return this.onlineRegions.get(encodedRegionName);
3161  }
3162
3163  @Override
3164  public boolean removeRegion(final HRegion r, ServerName destination) {
3165    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3166    if (DataTieringManager.getInstance() != null) {
3167      DataTieringManager.getInstance().getRegionColdDataSize()
3168        .remove(r.getRegionInfo().getEncodedName());
3169    }
3170    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
3171    if (destination != null) {
3172      long closeSeqNum = r.getMaxFlushedSeqId();
3173      if (closeSeqNum == HConstants.NO_SEQNUM) {
3174        // No edits in WAL for this region; get the sequence number when the region was opened.
3175        closeSeqNum = r.getOpenSeqNum();
3176        if (closeSeqNum == HConstants.NO_SEQNUM) {
3177          closeSeqNum = 0;
3178        }
3179      }
3180      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
3181      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
3182      if (selfMove) {
3183        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
3184          r.getRegionInfo().getEncodedName(),
3185          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
3186      }
3187    }
3188    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3189    configurationManager.deregisterObserver(r);
3190    return toReturn != null;
3191  }
3192
3193  /**
3194   * Protected Utility method for safely obtaining an HRegion handle.
3195   * @param regionName Name of online {@link HRegion} to return
3196   * @return {@link HRegion} for <code>regionName</code>
3197   */
3198  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
3199    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3200    return getRegionByEncodedName(regionName, encodedRegionName);
3201  }
3202
3203  public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException {
3204    return getRegionByEncodedName(null, encodedRegionName);
3205  }
3206
3207  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3208    throws NotServingRegionException {
3209    HRegion region = this.onlineRegions.get(encodedRegionName);
3210    if (region == null) {
3211      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3212      if (moveInfo != null) {
3213        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3214      }
3215      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3216      String regionNameStr =
3217        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
3218      if (isOpening != null && isOpening) {
3219        throw new RegionOpeningException(
3220          "Region " + regionNameStr + " is opening on " + this.serverName);
3221      }
3222      throw new NotServingRegionException(
3223        "" + regionNameStr + " is not online on " + this.serverName);
3224    }
3225    return region;
3226  }
3227
3228  /**
3229   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't
3230   * already.
3231   * @param t   Throwable
3232   * @param msg Message to log in error. Can be null.
3233   * @return Throwable converted to an IOE; methods can only let out IOEs.
3234   */
3235  private Throwable cleanup(final Throwable t, final String msg) {
3236    // Don't log as error if NSRE; NSRE is 'normal' operation.
3237    if (t instanceof NotServingRegionException) {
3238      LOG.debug("NotServingRegionException; " + t.getMessage());
3239      return t;
3240    }
3241    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3242    if (msg == null) {
3243      LOG.error("", e);
3244    } else {
3245      LOG.error(msg, e);
3246    }
3247    if (!rpcServices.checkOOME(t)) {
3248      checkFileSystem();
3249    }
3250    return t;
3251  }
3252
3253  /**
3254   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
3255   * @return Make <code>t</code> an IOE if it isn't already.
3256   */
3257  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3258    return (t instanceof IOException ? (IOException) t
3259      : msg == null || msg.length() == 0 ? new IOException(t)
3260      : new IOException(msg, t));
3261  }
3262
3263  /**
3264   * Checks to see if the file system is still accessible. If not, sets abortRequested and
3265   * stopRequested
3266   * @return false if file system is not available
3267   */
3268  boolean checkFileSystem() {
3269    if (this.dataFsOk && this.dataFs != null) {
3270      try {
3271        FSUtils.checkFileSystemAvailable(this.dataFs);
3272      } catch (IOException e) {
3273        abort("File System not available", e);
3274        this.dataFsOk = false;
3275      }
3276    }
3277    return this.dataFsOk;
3278  }
3279
3280  @Override
3281  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3282    List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3283    Address[] addr = new Address[favoredNodes.size()];
3284    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3285    // it is a map of region name to Address[]
3286    for (int i = 0; i < favoredNodes.size(); i++) {
3287      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort());
3288    }
3289    regionFavoredNodesMap.put(encodedRegionName, addr);
3290  }
3291
3292  /**
3293   * Return the favored nodes for a region given its encoded name. Look at the comment around
3294   * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here.
3295   * @param encodedRegionName the encoded region name.
3296   * @return array of favored locations
3297   */
3298  @Override
3299  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3300    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
3301  }
3302
3303  @Override
3304  public ServerNonceManager getNonceManager() {
3305    return this.nonceManager;
3306  }
3307
3308  private static class MovedRegionInfo {
3309    private final ServerName serverName;
3310    private final long seqNum;
3311
3312    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3313      this.serverName = serverName;
3314      this.seqNum = closeSeqNum;
3315    }
3316
3317    public ServerName getServerName() {
3318      return serverName;
3319    }
3320
3321    public long getSeqNum() {
3322      return seqNum;
3323    }
3324  }
3325
3326  /**
3327   * We need a timeout. If not there is a risk of giving a wrong information: this would double the
3328   * number of network calls instead of reducing them.
3329   */
3330  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3331
3332  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
3333    boolean selfMove) {
3334    if (selfMove) {
3335      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3336      return;
3337    }
3338    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid="
3339      + closeSeqNum);
3340    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3341  }
3342
3343  void removeFromMovedRegions(String encodedName) {
3344    movedRegionInfoCache.invalidate(encodedName);
3345  }
3346
3347  @InterfaceAudience.Private
3348  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
3349    return movedRegionInfoCache.getIfPresent(encodedRegionName);
3350  }
3351
3352  @InterfaceAudience.Private
3353  public int movedRegionCacheExpiredTime() {
3354    return TIMEOUT_REGION_MOVED;
3355  }
3356
3357  private String getMyEphemeralNodePath() {
3358    return zooKeeper.getZNodePaths().getRsPath(serverName);
3359  }
3360
3361  private boolean isHealthCheckerConfigured() {
3362    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3363    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
3364  }
3365
3366  /** Returns the underlying {@link CompactSplit} for the servers */
3367  public CompactSplit getCompactSplitThread() {
3368    return this.compactSplitThread;
3369  }
3370
3371  CoprocessorServiceResponse execRegionServerService(
3372    @SuppressWarnings("UnusedParameters") final RpcController controller,
3373    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3374    try {
3375      ServerRpcController serviceController = new ServerRpcController();
3376      CoprocessorServiceCall call = serviceRequest.getCall();
3377      String serviceName = call.getServiceName();
3378      Service service = coprocessorServiceHandlers.get(serviceName);
3379      if (service == null) {
3380        throw new UnknownProtocolException(null,
3381          "No registered coprocessor executorService found for " + serviceName);
3382      }
3383      ServiceDescriptor serviceDesc = service.getDescriptorForType();
3384
3385      String methodName = call.getMethodName();
3386      MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3387      if (methodDesc == null) {
3388        throw new UnknownProtocolException(service.getClass(),
3389          "Unknown method " + methodName + " called on executorService " + serviceName);
3390      }
3391
3392      Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3393      final Message.Builder responseBuilder =
3394        service.getResponsePrototype(methodDesc).newBuilderForType();
3395      service.callMethod(methodDesc, serviceController, request, message -> {
3396        if (message != null) {
3397          responseBuilder.mergeFrom(message);
3398        }
3399      });
3400      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3401      if (exception != null) {
3402        throw exception;
3403      }
3404      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3405    } catch (IOException ie) {
3406      throw new ServiceException(ie);
3407    }
3408  }
3409
3410  /**
3411   * May be null if this is a master which not carry table.
3412   * @return The block cache instance used by the regionserver.
3413   */
3414  @Override
3415  public Optional<BlockCache> getBlockCache() {
3416    return Optional.ofNullable(this.blockCache);
3417  }
3418
3419  /**
3420   * May be null if this is a master which not carry table.
3421   * @return The cache for mob files used by the regionserver.
3422   */
3423  @Override
3424  public Optional<MobFileCache> getMobFileCache() {
3425    return Optional.ofNullable(this.mobFileCache);
3426  }
3427
3428  CacheEvictionStats clearRegionBlockCache(Region region) {
3429    long evictedBlocks = 0;
3430
3431    for (Store store : region.getStores()) {
3432      for (StoreFile hFile : store.getStorefiles()) {
3433        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3434      }
3435    }
3436
3437    return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build();
3438  }
3439
3440  @Override
3441  public double getCompactionPressure() {
3442    double max = 0;
3443    for (Region region : onlineRegions.values()) {
3444      for (Store store : region.getStores()) {
3445        double normCount = store.getCompactionPressure();
3446        if (normCount > max) {
3447          max = normCount;
3448        }
3449      }
3450    }
3451    return max;
3452  }
3453
3454  @Override
3455  public HeapMemoryManager getHeapMemoryManager() {
3456    return hMemManager;
3457  }
3458
3459  public MemStoreFlusher getMemStoreFlusher() {
3460    return cacheFlusher;
3461  }
3462
3463  /**
3464   * For testing
3465   * @return whether all wal roll request finished for this regionserver
3466   */
3467  @InterfaceAudience.Private
3468  public boolean walRollRequestFinished() {
3469    return this.walRoller.walRollFinished();
3470  }
3471
3472  @Override
3473  public ThroughputController getFlushThroughputController() {
3474    return flushThroughputController;
3475  }
3476
3477  @Override
3478  public double getFlushPressure() {
3479    if (getRegionServerAccounting() == null || cacheFlusher == null) {
3480      // return 0 during RS initialization
3481      return 0.0;
3482    }
3483    return getRegionServerAccounting().getFlushPressure();
3484  }
3485
3486  @Override
3487  public void onConfigurationChange(Configuration newConf) {
3488    ThroughputController old = this.flushThroughputController;
3489    if (old != null) {
3490      old.stop("configuration change");
3491    }
3492    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3493    try {
3494      Superusers.initialize(newConf);
3495    } catch (IOException e) {
3496      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3497    }
3498
3499    boolean originalIsReadOnlyEnabled = CoprocessorConfigurationUtil
3500      .areReadOnlyCoprocessorsLoaded(this.conf, CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY);
3501
3502    CoprocessorConfigurationUtil.maybeUpdateCoprocessors(newConf, originalIsReadOnlyEnabled,
3503      this.rsHost, CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, false, this.toString(),
3504      conf -> {
3505        this.rsHost = new RegionServerCoprocessorHost(this, conf);
3506        CoprocessorConfigurationUtil.updateCoprocessorListInConf(this.conf, conf,
3507          CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY);
3508      });
3509  }
3510
3511  @Override
3512  public MetricsRegionServer getMetrics() {
3513    return metricsRegionServer;
3514  }
3515
3516  @Override
3517  public SecureBulkLoadManager getSecureBulkLoadManager() {
3518    return this.secureBulkLoadManager;
3519  }
3520
3521  @Override
3522  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
3523    final Abortable abort) {
3524    final LockServiceClient client =
3525      new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
3526    return client.regionLock(regionInfo, description, abort);
3527  }
3528
3529  @Override
3530  public void unassign(byte[] regionName) throws IOException {
3531    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
3532  }
3533
3534  @Override
3535  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3536    return this.rsSpaceQuotaManager;
3537  }
3538
3539  @Override
3540  public boolean reportFileArchivalForQuotas(TableName tableName,
3541    Collection<Entry<String, Long>> archivedFiles) {
3542    if (TEST_SKIP_REPORTING_TRANSITION) {
3543      return false;
3544    }
3545    RegionServerStatusService.BlockingInterface rss = rssStub;
3546    if (rss == null || rsSpaceQuotaManager == null) {
3547      // the current server could be stopping.
3548      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3549      return false;
3550    }
3551    try {
3552      RegionServerStatusProtos.FileArchiveNotificationRequest request =
3553        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3554      rss.reportFileArchival(null, request);
3555    } catch (ServiceException se) {
3556      IOException ioe = ProtobufUtil.getRemoteException(se);
3557      if (ioe instanceof PleaseHoldException) {
3558        if (LOG.isTraceEnabled()) {
3559          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3560            + " This will be retried.", ioe);
3561        }
3562        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3563        return false;
3564      }
3565      if (rssStub == rss) {
3566        rssStub = null;
3567      }
3568      // re-create the stub if we failed to report the archival
3569      createRegionServerStatusStub(true);
3570      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3571      return false;
3572    }
3573    return true;
3574  }
3575
3576  void executeProcedure(long procId, long initiatingMasterActiveTime,
3577    RSProcedureCallable callable) {
3578    executorService
3579      .submit(new RSProcedureHandler(this, procId, initiatingMasterActiveTime, callable));
3580  }
3581
3582  public void remoteProcedureComplete(long procId, long initiatingMasterActiveTime, Throwable error,
3583    byte[] procResultData) {
3584    procedureResultReporter.complete(procId, initiatingMasterActiveTime, error, procResultData);
3585  }
3586
3587  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3588    RegionServerStatusService.BlockingInterface rss;
3589    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3590    for (;;) {
3591      rss = rssStub;
3592      if (rss != null) {
3593        break;
3594      }
3595      createRegionServerStatusStub();
3596    }
3597    try {
3598      rss.reportProcedureDone(null, request);
3599    } catch (ServiceException se) {
3600      if (rssStub == rss) {
3601        rssStub = null;
3602      }
3603      throw ProtobufUtil.getRemoteException(se);
3604    }
3605  }
3606
3607  /**
3608   * Will ignore the open/close region procedures which already submitted or executed. When master
3609   * had unfinished open/close region procedure and restarted, new active master may send duplicate
3610   * open/close region request to regionserver. The open/close request is submitted to a thread pool
3611   * and execute. So first need a cache for submitted open/close region procedures. After the
3612   * open/close region request executed and report region transition succeed, cache it in executed
3613   * region procedures cache. See {@link #finishRegionProcedure(long)}. After report region
3614   * transition succeed, master will not send the open/close region request to regionserver again.
3615   * And we thought that the ongoing duplicate open/close region request should not be delayed more
3616   * than 600 seconds. So the executed region procedures cache will expire after 600 seconds. See
3617   * HBASE-22404 for more details.
3618   * @param procId the id of the open/close region procedure
3619   * @return true if the procedure can be submitted.
3620   */
3621  boolean submitRegionProcedure(long procId) {
3622    if (procId == -1) {
3623      return true;
3624    }
3625    // Ignore the region procedures which already submitted.
3626    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3627    if (previous != null) {
3628      LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
3629      return false;
3630    }
3631    // Ignore the region procedures which already executed.
3632    if (executedRegionProcedures.getIfPresent(procId) != null) {
3633      LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
3634      return false;
3635    }
3636    return true;
3637  }
3638
3639  /**
3640   * See {@link #submitRegionProcedure(long)}.
3641   * @param procId the id of the open/close region procedure
3642   */
3643  public void finishRegionProcedure(long procId) {
3644    executedRegionProcedures.put(procId, procId);
3645    submittedRegionProcedures.remove(procId);
3646  }
3647
3648  /**
3649   * Force to terminate region server when abort timeout.
3650   */
3651  private static class SystemExitWhenAbortTimeout extends TimerTask {
3652
3653    public SystemExitWhenAbortTimeout() {
3654    }
3655
3656    @Override
3657    public void run() {
3658      LOG.warn("Aborting region server timed out, terminating forcibly"
3659        + " and does not wait for any running shutdown hooks or finalizers to finish their work."
3660        + " Thread dump to stdout.");
3661      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3662      Runtime.getRuntime().halt(1);
3663    }
3664  }
3665
3666  @InterfaceAudience.Private
3667  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3668    return compactedFileDischarger;
3669  }
3670
3671  /**
3672   * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}}
3673   * @return pause time
3674   */
3675  @InterfaceAudience.Private
3676  public long getRetryPauseTime() {
3677    return this.retryPauseTime;
3678  }
3679
3680  @Override
3681  public Optional<ServerName> getActiveMaster() {
3682    return Optional.ofNullable(masterAddressTracker.getMasterAddress());
3683  }
3684
3685  @Override
3686  public List<ServerName> getBackupMasters() {
3687    return masterAddressTracker.getBackupMasters();
3688  }
3689
3690  @Override
3691  public Iterator<ServerName> getBootstrapNodes() {
3692    return bootstrapNodeManager.getBootstrapNodes().iterator();
3693  }
3694
3695  @Override
3696  public List<HRegionLocation> getMetaLocations() {
3697    return metaRegionLocationCache.getMetaRegionLocations();
3698  }
3699
3700  @Override
3701  protected NamedQueueRecorder createNamedQueueRecord() {
3702    return NamedQueueRecorder.getInstance(conf);
3703  }
3704
3705  @Override
3706  protected boolean clusterMode() {
3707    // this method will be called in the constructor of super class, so we can not return masterless
3708    // directly here, as it will always be false.
3709    return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
3710  }
3711
3712  @InterfaceAudience.Private
3713  public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
3714    return brokenStoreFileCleaner;
3715  }
3716
3717  @InterfaceAudience.Private
3718  public RSMobFileCleanerChore getRSMobFileCleanerChore() {
3719    return rsMobFileCleanerChore;
3720  }
3721
3722  RSSnapshotVerifier getRsSnapshotVerifier() {
3723    return rsSnapshotVerifier;
3724  }
3725
3726  @Override
3727  protected void stopChores() {
3728    shutdownChore(nonceManagerChore);
3729    shutdownChore(compactionChecker);
3730    shutdownChore(compactedFileDischarger);
3731    shutdownChore(periodicFlusher);
3732    shutdownChore(healthCheckChore);
3733    shutdownChore(executorStatusChore);
3734    shutdownChore(storefileRefresher);
3735    shutdownChore(fsUtilizationChore);
3736    shutdownChore(namedQueueServiceChore);
3737    shutdownChore(brokenStoreFileCleaner);
3738    shutdownChore(rsMobFileCleanerChore);
3739    shutdownChore(replicationMarkerChore);
3740  }
3741
3742  @Override
3743  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
3744    return regionReplicationBufferManager;
3745  }
3746}