001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION;
023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
024import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
025import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
026import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
027import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT;
028import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
029import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT;
030import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
031import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
032import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
033import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
034
035import io.opentelemetry.api.trace.Span;
036import io.opentelemetry.api.trace.StatusCode;
037import io.opentelemetry.context.Scope;
038import java.io.IOException;
039import java.io.PrintWriter;
040import java.lang.management.MemoryUsage;
041import java.lang.reflect.Constructor;
042import java.net.InetSocketAddress;
043import java.time.Duration;
044import java.util.ArrayList;
045import java.util.Collection;
046import java.util.Collections;
047import java.util.Comparator;
048import java.util.HashSet;
049import java.util.Iterator;
050import java.util.List;
051import java.util.Map;
052import java.util.Map.Entry;
053import java.util.Objects;
054import java.util.Optional;
055import java.util.Set;
056import java.util.SortedMap;
057import java.util.Timer;
058import java.util.TimerTask;
059import java.util.TreeMap;
060import java.util.TreeSet;
061import java.util.concurrent.ConcurrentHashMap;
062import java.util.concurrent.ConcurrentMap;
063import java.util.concurrent.ConcurrentSkipListMap;
064import java.util.concurrent.ThreadLocalRandom;
065import java.util.concurrent.TimeUnit;
066import java.util.concurrent.atomic.AtomicBoolean;
067import java.util.concurrent.locks.ReentrantReadWriteLock;
068import java.util.function.Consumer;
069import java.util.stream.Collectors;
070import javax.management.MalformedObjectNameException;
071import javax.servlet.http.HttpServlet;
072import org.apache.commons.lang3.StringUtils;
073import org.apache.commons.lang3.mutable.MutableFloat;
074import org.apache.hadoop.conf.Configuration;
075import org.apache.hadoop.fs.FileSystem;
076import org.apache.hadoop.fs.Path;
077import org.apache.hadoop.hbase.Abortable;
078import org.apache.hadoop.hbase.CacheEvictionStats;
079import org.apache.hadoop.hbase.CallQueueTooBigException;
080import org.apache.hadoop.hbase.ClockOutOfSyncException;
081import org.apache.hadoop.hbase.DoNotRetryIOException;
082import org.apache.hadoop.hbase.ExecutorStatusChore;
083import org.apache.hadoop.hbase.HBaseConfiguration;
084import org.apache.hadoop.hbase.HBaseInterfaceAudience;
085import org.apache.hadoop.hbase.HBaseServerBase;
086import org.apache.hadoop.hbase.HConstants;
087import org.apache.hadoop.hbase.HDFSBlocksDistribution;
088import org.apache.hadoop.hbase.HRegionLocation;
089import org.apache.hadoop.hbase.HealthCheckChore;
090import org.apache.hadoop.hbase.MetaTableAccessor;
091import org.apache.hadoop.hbase.NotServingRegionException;
092import org.apache.hadoop.hbase.PleaseHoldException;
093import org.apache.hadoop.hbase.ScheduledChore;
094import org.apache.hadoop.hbase.ServerName;
095import org.apache.hadoop.hbase.Stoppable;
096import org.apache.hadoop.hbase.TableName;
097import org.apache.hadoop.hbase.YouAreDeadException;
098import org.apache.hadoop.hbase.ZNodeClearer;
099import org.apache.hadoop.hbase.client.ConnectionUtils;
100import org.apache.hadoop.hbase.client.RegionInfo;
101import org.apache.hadoop.hbase.client.RegionInfoBuilder;
102import org.apache.hadoop.hbase.client.locking.EntityLock;
103import org.apache.hadoop.hbase.client.locking.LockServiceClient;
104import org.apache.hadoop.hbase.conf.ConfigurationObserver;
105import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
106import org.apache.hadoop.hbase.exceptions.RegionMovedException;
107import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
108import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
109import org.apache.hadoop.hbase.executor.ExecutorType;
110import org.apache.hadoop.hbase.http.InfoServer;
111import org.apache.hadoop.hbase.io.hfile.BlockCache;
112import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
113import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
114import org.apache.hadoop.hbase.io.hfile.HFile;
115import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
116import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
117import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
118import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
119import org.apache.hadoop.hbase.ipc.RpcClient;
120import org.apache.hadoop.hbase.ipc.RpcServer;
121import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
122import org.apache.hadoop.hbase.ipc.ServerRpcController;
123import org.apache.hadoop.hbase.log.HBaseMarkers;
124import org.apache.hadoop.hbase.mob.MobFileCache;
125import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
126import org.apache.hadoop.hbase.monitoring.TaskMonitor;
127import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
128import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore;
129import org.apache.hadoop.hbase.net.Address;
130import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
131import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
132import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
133import org.apache.hadoop.hbase.quotas.QuotaUtil;
134import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
135import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
136import org.apache.hadoop.hbase.quotas.RegionSize;
137import org.apache.hadoop.hbase.quotas.RegionSizeStore;
138import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
139import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
140import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
141import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
142import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
143import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
144import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
145import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
146import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
147import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
148import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
149import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
150import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
151import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
152import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
153import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
154import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
155import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
156import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
157import org.apache.hadoop.hbase.security.SecurityConstants;
158import org.apache.hadoop.hbase.security.Superusers;
159import org.apache.hadoop.hbase.security.User;
160import org.apache.hadoop.hbase.security.UserProvider;
161import org.apache.hadoop.hbase.trace.TraceUtil;
162import org.apache.hadoop.hbase.util.Bytes;
163import org.apache.hadoop.hbase.util.CompressionTest;
164import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
165import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
166import org.apache.hadoop.hbase.util.FSUtils;
167import org.apache.hadoop.hbase.util.FutureUtils;
168import org.apache.hadoop.hbase.util.JvmPauseMonitor;
169import org.apache.hadoop.hbase.util.Pair;
170import org.apache.hadoop.hbase.util.RetryCounter;
171import org.apache.hadoop.hbase.util.RetryCounterFactory;
172import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
173import org.apache.hadoop.hbase.util.Threads;
174import org.apache.hadoop.hbase.util.VersionInfo;
175import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
176import org.apache.hadoop.hbase.wal.WAL;
177import org.apache.hadoop.hbase.wal.WALFactory;
178import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
179import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
180import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
181import org.apache.hadoop.hbase.zookeeper.ZKUtil;
182import org.apache.hadoop.ipc.RemoteException;
183import org.apache.hadoop.util.ReflectionUtils;
184import org.apache.yetus.audience.InterfaceAudience;
185import org.apache.zookeeper.KeeperException;
186import org.slf4j.Logger;
187import org.slf4j.LoggerFactory;
188
189import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
190import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
191import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
192import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
193import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
194import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
195import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
196import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
197import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
198import org.apache.hbase.thirdparty.com.google.protobuf.Message;
199import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
200import org.apache.hbase.thirdparty.com.google.protobuf.Service;
201import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
202import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
203import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
204
205import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
206import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
235
236/**
237 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
238 * are many HRegionServers in a single HBase deployment.
239 */
240@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
241@SuppressWarnings({ "deprecation" })
242public class HRegionServer extends HBaseServerBase<RSRpcServices>
243  implements RegionServerServices, LastSequenceId {
244
245  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
246
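  // Byte-size divisors used when rounding sizes for reports (for example, the per-region
  // cached-size info added to the server load is rounded to MB below).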
247  int unitMB = 1024 * 1024;
248  int unitKB = 1024;
249
  /**
   * For testing only! Set to true to skip notifying the master of region assignment.
   */
253  @InterfaceAudience.Private
254  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
255  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
256
  /**
   * A map from region name to the action currently in progress on that region. The boolean value
   * indicates: true if an open-region action is in progress, false if a close-region action is in
   * progress.
   */
261  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
262    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
263
  /**
   * Used to cache the open/close region procedures which have already been submitted. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
  /**
   * Used to cache the open/close region procedures which have already been executed. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
275
  /**
   * Used to cache the moved-out regions.
   */
279  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
280    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();
281
282  private MemStoreFlusher cacheFlusher;
283
284  private HeapMemoryManager hMemManager;
285
286  // Replication services. If no replication, this handler will be null.
287  private ReplicationSourceService replicationSourceHandler;
288  private ReplicationSinkService replicationSinkHandler;
289  private boolean sameReplicationSourceAndSink;
290
291  // Compactions
292  private CompactSplit compactSplitThread;
293
294  /**
295   * Map of regions currently being served by this region server. Key is the encoded region name.
296   * All access should be synchronized.
297   */
298  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
299  /**
300   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
301   * need to be a ConcurrentHashMap?
302   */
303  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();
304
  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
   * value as Address since InetSocketAddress is required by the HDFS API (the create() call that
   * takes favored nodes as hints for placing file blocks). We could have used ServerName as the
   * value class, but we would need to convert it to InetSocketAddress at some point before the
   * HDFS API call, and it seems odd to store ServerName since ServerName refers to RegionServers
   * while here we really mean DataNode locations. We also don't store InetSocketAddress directly,
   * because converting on demand from Address to InetSocketAddress guarantees the resolution
   * results are fresh when we need them.
   */
315  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();
316
317  private LeaseManager leaseManager;
318
319  private volatile boolean dataFsOk;
320
  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds, to be safe.
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Task to run when the abort timeout is reached.
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
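  // These settings are consumed by the abortMonitor Timer declared further down, which runs the
  // configured timeout task if an abort has not completed within the timeout.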
326
327  // A state before we go into stopped state. At this stage we're closing user
328  // space regions.
329  private boolean stopping = false;
330  private volatile boolean killed = false;
331
332  private final int threadWakeFrequency;
333
334  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
335  private final int compactionCheckFrequency;
336  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
337  private final int flushCheckFrequency;
338
339  // Stub to do region server status calls against the master.
340  private volatile RegionServerStatusService.BlockingInterface rssStub;
341  private volatile LockService.BlockingInterface lockStub;
342  // RPC client. Used to make the stub above that does region server status checking.
343  private RpcClient rpcClient;
344
345  private UncaughtExceptionHandler uncaughtExceptionHandler;
346
347  private JvmPauseMonitor pauseMonitor;
348
349  private RSSnapshotVerifier rsSnapshotVerifier;
350
351  /** region server process name */
352  public static final String REGIONSERVER = "regionserver";
353
354  private MetricsRegionServer metricsRegionServer;
355  MetricsRegionServerWrapperImpl metricsRegionServerImpl;
356
  /**
   * Check for compaction requests.
   */
360  private ScheduledChore compactionChecker;
361
362  /**
363   * Check for flushes
364   */
365  private ScheduledChore periodicFlusher;
366
367  private volatile WALFactory walFactory;
368
369  private LogRoller walRoller;
370
371  // A thread which calls reportProcedureDone
372  private RemoteProcedureResultReporter procedureResultReporter;
373
374  // flag set after we're done setting up server threads
375  final AtomicBoolean online = new AtomicBoolean(false);
376
377  // master address tracker
378  private final MasterAddressTracker masterAddressTracker;
379
380  // Log Splitting Worker
381  private SplitLogWorker splitLogWorker;
382
383  private final int shortOperationTimeout;
384
385  // Time to pause if master says 'please hold'
386  private final long retryPauseTime;
387
388  private final RegionServerAccounting regionServerAccounting;
389
390  private NamedQueueServiceChore namedQueueServiceChore = null;
391
392  // Block cache
393  private BlockCache blockCache;
394  // The cache for mob files
395  private MobFileCache mobFileCache;
396
397  /** The health check chore. */
398  private HealthCheckChore healthCheckChore;
399
400  /** The Executor status collect chore. */
401  private ExecutorStatusChore executorStatusChore;
402
403  /** The nonce manager chore. */
404  private ScheduledChore nonceManagerChore;
405
406  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
407
408  /**
409   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
410   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
411   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
412   */
413  @Deprecated
414  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
415  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
416    "hbase.regionserver.hostname.disable.master.reversedns";
417
  /**
   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive. An
   * exception will be thrown if both are set.
   */
422  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
423  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
424    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";
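  // Illustrative usage (not a default): setting the property above to true makes the RS report the
  // hostname of its RPC bind address instead of relying on the Master's reverse DNS lookup; it
  // must not be combined with hbase.unsafe.regionserver.hostname (see getUseThisHostnameInstead).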
425
426  /**
427   * Unique identifier for the cluster we are a part of.
428   */
429  private String clusterId;
430
431  // chore for refreshing store files for secondary regions
432  private StorefileRefresherChore storefileRefresher;
433
434  private volatile RegionServerCoprocessorHost rsHost;
435
436  private RegionServerProcedureManagerHost rspmHost;
437
438  private RegionServerRpcQuotaManager rsQuotaManager;
439  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
440
  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
   * case where the client doesn't receive the response from a successful operation and retries. We
   * track the successful ops for some time via a nonce sent by the client and handle duplicate
   * operations (currently by failing them; in the future we might use MVCC to return the result).
   * Nonces are also recovered from the WAL during recovery; however, the caveats (from HBASE-3787)
   * are:
   * <ul>
   * <li>WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth of
   * past records. If we don't read the records, we don't read and recover the nonces. Some WALs
   * within nonce-timeout at recovery may not even be present due to rolling/cleanup.</li>
   * <li>There's no WAL recovery during a normal region move, so nonces will not be
   * transferred.</li>
   * </ul>
   * We could have a separate, additional "Nonce WAL". It would just contain a bunch of numbers and
   * would not be flushed on the main path, because the WAL itself also contains nonces: if we only
   * flush it before a memstore flush, for a given nonce we will either see it in the WAL (if it was
   * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce log
   * (or both occasionally, which doesn't matter). The nonce log file can be deleted after the
   * latest nonce in it has expired. It can also be recovered during a move.
   */
458  final ServerNonceManager nonceManager;
459
460  private BrokenStoreFileCleaner brokenStoreFileCleaner;
461
462  private RSMobFileCleanerChore rsMobFileCleanerChore;
463
464  @InterfaceAudience.Private
465  CompactedHFilesDischarger compactedFileDischarger;
466
467  private volatile ThroughputController flushThroughputController;
468
469  private SecureBulkLoadManager secureBulkLoadManager;
470
471  private FileSystemUtilizationChore fsUtilizationChore;
472
473  private BootstrapNodeManager bootstrapNodeManager;
474
  /**
   * True if this RegionServer is coming up in a cluster where there is no Master; this means it
   * needs to just come up and make do without a Master to talk to, e.g. in tests, or when
   * HRegionServer is doing something other than its usual duties, e.g. acting as a hollowed-out
   * host whose only purpose is to be a replication-stream sink; see HBASE-18846 for more. TODO:
   * can this replace {@link #TEST_SKIP_REPORTING_TRANSITION} ?
   */
482  private final boolean masterless;
483  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
484
485  /** regionserver codec list **/
486  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";
487
488  // A timer to shutdown the process if abort takes too long
489  private Timer abortMonitor;
490
491  private RegionReplicationBufferManager regionReplicationBufferManager;
492
493  /*
494   * Chore that creates replication marker rows.
495   */
496  private ReplicationMarkerChore replicationMarkerChore;
497
  // A notifier that submits requests to the PrefetchExecutor
  private PrefetchExecutorNotifier prefetchExecutorNotifier;
500
  /**
   * Starts an HRegionServer at the default location.
   * <p/>
   * Don't start any services or managers in the constructor. Defer as much as possible until after
   * we register with the Master. See {@link #startServices}.
   */
507  public HRegionServer(final Configuration conf) throws IOException {
508    super(conf, "RegionServer"); // thread name
509    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
510    try (Scope ignored = span.makeCurrent()) {
511      this.dataFsOk = true;
512      this.masterless = !clusterMode();
513      MemorySizeUtil.validateRegionServerHeapMemoryAllocation(conf);
514      HFile.checkHFileVersion(this.conf);
515      checkCodecs(this.conf);
516      FSUtils.setupShortCircuitRead(this.conf);
517
518      // Disable usage of meta replicas in the regionserver
519      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
520      // Config'ed params
521      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
522      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
523      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
524
525      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
526      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
527
528      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
529        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
530
531      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
532        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);
533
534      regionServerAccounting = new RegionServerAccounting(conf);
535
536      blockCache = BlockCacheFactory.createBlockCache(conf);
      // The call below instantiates the DataTieringManager only when
      // the configuration "hbase.regionserver.datatiering.enable" is set to true.
539      DataTieringManager.instantiate(conf, onlineRegions);
540
541      mobFileCache = new MobFileCache(conf);
542
543      rsSnapshotVerifier = new RSSnapshotVerifier(conf);
544
545      uncaughtExceptionHandler =
546        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);
547
548      // If no master in cluster, skip trying to track one or look for a cluster status.
549      if (!this.masterless) {
550        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
551        masterAddressTracker.start();
552      } else {
553        masterAddressTracker = null;
554      }
555      this.rpcServices.start(zooKeeper);
556      span.setStatus(StatusCode.OK);
557    } catch (Throwable t) {
558      // Make sure we log the exception. HRegionServer is often started via reflection and the
559      // cause of failed startup is lost.
560      TraceUtil.setError(span, t);
561      LOG.error("Failed construction RegionServer", t);
562      throw t;
563    } finally {
564      span.end();
565    }
566  }
567
568  // HMaster should override this method to load the specific config for master
569  @Override
570  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
571    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
572    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
573      if (!StringUtils.isBlank(hostname)) {
574        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
575          + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
576          + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
577          + UNSAFE_RS_HOSTNAME_KEY + " is used";
578        throw new IOException(msg);
579      } else {
580        return rpcServices.getSocketAddress().getHostName();
581      }
582    } else {
583      return hostname;
584    }
585  }
586
587  @Override
588  protected void login(UserProvider user, String host) throws IOException {
589    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
590      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
591  }
592
593  @Override
594  protected String getProcessName() {
595    return REGIONSERVER;
596  }
597
598  @Override
599  protected RegionServerCoprocessorHost getCoprocessorHost() {
600    return getRegionServerCoprocessorHost();
601  }
602
603  @Override
604  protected boolean canCreateBaseZNode() {
605    return !clusterMode();
606  }
607
608  @Override
609  protected boolean canUpdateTableDescriptor() {
610    return false;
611  }
612
613  @Override
614  protected boolean cacheTableDescriptor() {
615    return false;
616  }
617
618  protected RSRpcServices createRpcServices() throws IOException {
619    return new RSRpcServices(this);
620  }
621
622  @Override
623  protected void configureInfoServer(InfoServer infoServer) {
624    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
625    infoServer.setAttribute(REGIONSERVER, this);
626  }
627
628  @Override
629  protected Class<? extends HttpServlet> getDumpServlet() {
630    return RSDumpServlet.class;
631  }
632
633  /**
634   * Used by {@link RSDumpServlet} to generate debugging information.
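   * Each output line has the form {@code tableName,encodedRegionName,rowLockContext}, one line per
   * currently held row lock (as assembled in the loop below).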
635   */
636  public void dumpRowLocks(final PrintWriter out) {
637    StringBuilder sb = new StringBuilder();
638    for (HRegion region : getRegions()) {
639      if (region.getLockedRows().size() > 0) {
640        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
641          sb.setLength(0);
642          sb.append(region.getTableDescriptor().getTableName()).append(",")
643            .append(region.getRegionInfo().getEncodedName()).append(",");
644          sb.append(rowLockContext.toString());
645          out.println(sb);
646        }
647      }
648    }
649  }
650
  @Override
  public boolean registerService(Service instance) {
    // No stacking of instances is allowed for a single service name
    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor service " + serviceName
        + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Registered regionserver coprocessor service: service=" + serviceName);
    }
    return true;
  }
669
670  /**
671   * Run test on configured codecs to make sure supporting libs are in place.
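   * For example (illustrative value, not a default): setting {@code hbase.regionserver.codecs} to
   * {@code snappy,lz4} makes startup fail fast unless native support for both codecs is available.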
672   */
673  private static void checkCodecs(final Configuration c) throws IOException {
674    // check to see if the codec list is available:
675    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
676    if (codecs == null) {
677      return;
678    }
679    for (String codec : codecs) {
680      if (!CompressionTest.testCompression(codec)) {
681        throw new IOException(
682          "Compression codec " + codec + " not supported, aborting RS construction");
683      }
684    }
685  }
686
687  public String getClusterId() {
688    return this.clusterId;
689  }
690
  /**
   * All initialization needed before we go register with the Master.<br>
   * Do the bare minimum; do the bulk of initialization AFTER we've connected to the Master.<br>
   * In here we just put up the RpcServer, and set up the Connection and ZooKeeper.
   */
696  private void preRegistrationInitialization() {
697    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
698    try (Scope ignored = span.makeCurrent()) {
699      initializeZooKeeper();
700      setupClusterConnection();
701      bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
702      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
703      // Setup RPC client for master communication
704      this.rpcClient = asyncClusterConnection.getRpcClient();
705      span.setStatus(StatusCode.OK);
706    } catch (Throwable t) {
      // Call stop on error, or the process will stick around forever since the server
      // puts up non-daemon threads.
709      TraceUtil.setError(span, t);
710      this.rpcServices.stop();
711      abort("Initialization of RS failed.  Hence aborting RS.", t);
712    } finally {
713      span.end();
714    }
715  }
716
  /**
   * Bring up the connection to the zk ensemble, then wait until a master is available for this
   * cluster, and after that wait until the cluster 'up' flag has been set. This is the order in
   * which the master does things.
   * <p>
   * Finally, open the long-living server short-circuit connection.
   */
723  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
724      justification = "cluster Id znode read would give us correct response")
725  private void initializeZooKeeper() throws IOException, InterruptedException {
726    // Nothing to do in here if no Master in the mix.
727    if (this.masterless) {
728      return;
729    }
730
    // The master address tracker was created and started in the constructor. Here we block
    // until a master is available. No point in starting up if no master is running.
734    blockAndCheckIfStopped(this.masterAddressTracker);
735
736    // Wait on cluster being up. Master will set this flag up in zookeeper
737    // when ready.
738    blockAndCheckIfStopped(this.clusterStatusTracker);
739
740    // If we are HMaster then the cluster id should have already been set.
741    if (clusterId == null) {
742      // Retrieve clusterId
743      // Since cluster status is now up
744      // ID should have already been set by HMaster
745      try {
746        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
747        if (clusterId == null) {
748          this.abort("Cluster ID has not been set");
749        }
750        LOG.info("ClusterId : " + clusterId);
751      } catch (KeeperException e) {
752        this.abort("Failed to retrieve Cluster ID", e);
753      }
754    }
755
756    if (isStopped() || isAborted()) {
757      return; // No need for further initialization
758    }
759
760    // watch for snapshots and other procedures
761    try {
762      rspmHost = new RegionServerProcedureManagerHost();
763      rspmHost.loadProcedures(conf);
764      rspmHost.initialize(this);
765    } catch (KeeperException e) {
766      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
767    }
768  }
769
  /**
   * Utility method to wait indefinitely for a znode to become available while checking whether the
   * region server is shut down.
   * @param tracker znode tracker to use
   * @throws IOException          any IO exception; also thrown if the RS is stopped
   * @throws InterruptedException if the waiting thread is interrupted
   */
777  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
778    throws IOException, InterruptedException {
779    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
780      if (this.stopped) {
781        throw new IOException("Received the shutdown message while waiting.");
782      }
783    }
784  }
785
786  /** Returns True if the cluster is up. */
787  @Override
788  public boolean isClusterUp() {
789    return this.masterless
790      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
791  }
792
793  private void initializeReplicationMarkerChore() {
794    boolean replicationMarkerEnabled =
795      conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
    // Only create the chore if the replication marker feature is enabled.
797    if (replicationMarkerEnabled) {
798      int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
799        REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
800      replicationMarkerChore = new ReplicationMarkerChore(this, this, period, conf);
801    }
802  }
803
804  @Override
805  public boolean isStopping() {
806    return stopping;
807  }
808
809  /**
810   * The HRegionServer sticks in this loop until closed.
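   * <p>
   * Broad phases, as implemented below: pre-registration initialization (ZooKeeper and the cluster
   * connection), registration with the Master via reportForDuty, startup of the procedure and
   * quota managers, the main loop that periodically sends region server reports, and finally the
   * shutdown/cleanup sequence once the loop exits.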
811   */
812  @Override
813  public void run() {
814    if (isStopped()) {
815      LOG.info("Skipping run; stopped");
816      return;
817    }
818    try {
819      // Do pre-registration initializations; zookeeper, lease threads, etc.
820      preRegistrationInitialization();
821    } catch (Throwable e) {
822      abort("Fatal exception during initialization", e);
823    }
824
825    try {
826      if (!isStopped() && !isAborted()) {
827        installShutdownHook();
828        // Initialize the RegionServerCoprocessorHost now that our ephemeral
829        // node was created, in case any coprocessors want to use ZooKeeper
830        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
831
832        // Try and register with the Master; tell it we are here. Break if server is stopped or
833        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
834        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
835        // come up.
836        LOG.debug("About to register with Master.");
837        TraceUtil.trace(() -> {
838          RetryCounterFactory rcf =
839            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
840          RetryCounter rc = rcf.create();
841          while (keepLooping()) {
842            RegionServerStartupResponse w = reportForDuty();
843            if (w == null) {
844              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
845              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
846              this.sleeper.sleep(sleepTime);
847            } else {
848              handleReportForDutyResponse(w);
849              break;
850            }
851          }
852        }, "HRegionServer.registerWithMaster");
853      }
854
855      if (!isStopped() && isHealthy()) {
856        TraceUtil.trace(() -> {
857          // start the snapshot handler and other procedure handlers,
858          // since the server is ready to run
859          if (this.rspmHost != null) {
860            this.rspmHost.start();
861          }
862          // Start the Quota Manager
863          if (this.rsQuotaManager != null) {
864            rsQuotaManager.start(getRpcServer().getScheduler());
865          }
866          if (this.rsSpaceQuotaManager != null) {
867            this.rsSpaceQuotaManager.start();
868          }
869        }, "HRegionServer.startup");
870      }
871
872      // We registered with the Master. Go into run mode.
873      long lastMsg = EnvironmentEdgeManager.currentTime();
874      long oldRequestCount = -1;
875      // The main run loop.
876      while (!isStopped() && isHealthy()) {
877        if (!isClusterUp()) {
878          if (onlineRegions.isEmpty()) {
879            stop("Exiting; cluster shutdown set and not carrying any regions");
880          } else if (!this.stopping) {
881            this.stopping = true;
882            LOG.info("Closing user regions");
883            closeUserRegions(isAborted());
884          } else {
885            boolean allUserRegionsOffline = areAllUserRegionsOffline();
886            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
888              // since last time we went around the loop. Any open
889              // meta regions will be closed on our way out.
890              if (oldRequestCount == getWriteRequestCount()) {
891                stop("Stopped; only catalog regions remaining online");
892                break;
893              }
894              oldRequestCount = getWriteRequestCount();
895            } else {
896              // Make sure all regions have been closed -- some regions may
897              // have not got it because we were splitting at the time of
898              // the call to closeUserRegions.
899              closeUserRegions(this.abortRequested.get());
900            }
901            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
902          }
903        }
904        long now = EnvironmentEdgeManager.currentTime();
905        if ((now - lastMsg) >= msgInterval) {
906          tryRegionServerReport(lastMsg, now);
907          lastMsg = EnvironmentEdgeManager.currentTime();
908        }
909        if (!isStopped() && !isAborted()) {
910          this.sleeper.sleep();
911        }
      } // while
913    } catch (Throwable t) {
914      if (!rpcServices.checkOOME(t)) {
915        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
916        abort(prefix + t.getMessage(), t);
917      }
918    }
919
920    final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
921    try (Scope ignored = span.makeCurrent()) {
922      if (this.leaseManager != null) {
923        this.leaseManager.closeAfterLeasesExpire();
924      }
925      if (this.splitLogWorker != null) {
926        splitLogWorker.stop();
927      }
928      stopInfoServer();
929      // Send cache a shutdown.
930      if (blockCache != null) {
931        blockCache.shutdown();
932      }
933      if (mobFileCache != null) {
934        mobFileCache.shutdown();
935      }
936
937      // Send interrupts to wake up threads if sleeping so they notice shutdown.
      // TODO: Should we check they are alive? If an OOME occurred, they could have exited already.
939      if (this.hMemManager != null) {
940        this.hMemManager.stop();
941      }
942      if (this.cacheFlusher != null) {
943        this.cacheFlusher.interruptIfNecessary();
944      }
945      if (this.compactSplitThread != null) {
946        this.compactSplitThread.interruptIfNecessary();
947      }
948
949      // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
950      if (rspmHost != null) {
951        rspmHost.stop(this.abortRequested.get() || this.killed);
952      }
953
954      if (this.killed) {
955        // Just skip out w/o closing regions. Used when testing.
956      } else if (abortRequested.get()) {
957        if (this.dataFsOk) {
958          closeUserRegions(abortRequested.get()); // Don't leave any open file handles
959        }
960        LOG.info("aborting server " + this.serverName);
961      } else {
962        closeUserRegions(abortRequested.get());
963        LOG.info("stopping server " + this.serverName);
964      }
965      regionReplicationBufferManager.stop();
966      closeClusterConnection();
967      // Closing the compactSplit thread before closing meta regions
968      if (!this.killed && containsMetaTableRegions()) {
969        if (!abortRequested.get() || this.dataFsOk) {
970          if (this.compactSplitThread != null) {
971            this.compactSplitThread.join();
972            this.compactSplitThread = null;
973          }
974          closeMetaTableRegions(abortRequested.get());
975        }
976      }
977
978      if (!this.killed && this.dataFsOk) {
979        waitOnAllRegionsToClose(abortRequested.get());
980        LOG.info("stopping server " + this.serverName + "; all regions closed.");
981      }
982
983      // Stop the quota manager
984      if (rsQuotaManager != null) {
985        rsQuotaManager.stop();
986      }
987      if (rsSpaceQuotaManager != null) {
988        rsSpaceQuotaManager.stop();
989        rsSpaceQuotaManager = null;
990      }
991
      // The dataFsOk flag may have been cleared if closing regions threw an exception.
993      if (this.dataFsOk) {
994        shutdownWAL(!abortRequested.get());
995      }
996
997      // Make sure the proxy is down.
998      if (this.rssStub != null) {
999        this.rssStub = null;
1000      }
1001      if (this.lockStub != null) {
1002        this.lockStub = null;
1003      }
1004      if (this.rpcClient != null) {
1005        this.rpcClient.close();
1006      }
1007      if (this.leaseManager != null) {
1008        this.leaseManager.close();
1009      }
1010      if (this.pauseMonitor != null) {
1011        this.pauseMonitor.stop();
1012      }
1013
1014      if (!killed) {
1015        stopServiceThreads();
1016      }
1017
1018      if (this.rpcServices != null) {
1019        this.rpcServices.stop();
1020      }
1021
1022      try {
1023        deleteMyEphemeralNode();
1024      } catch (KeeperException.NoNodeException nn) {
1025        // pass
1026      } catch (KeeperException e) {
1027        LOG.warn("Failed deleting my ephemeral node", e);
1028      }
1029      // We may have failed to delete the znode at the previous step, but
1030      // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1031      ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1032
1033      closeZooKeeper();
1034      closeTableDescriptors();
1035      LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
1036      span.setStatus(StatusCode.OK);
1037    } finally {
1038      span.end();
1039    }
1040  }
1041
1042  private boolean containsMetaTableRegions() {
1043    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
1044  }
1045
1046  private boolean areAllUserRegionsOffline() {
1047    if (getNumberOfOnlineRegions() > 2) {
1048      return false;
1049    }
1050    boolean allUserRegionsOffline = true;
1051    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1052      if (!e.getValue().getRegionInfo().isMetaRegion()) {
1053        allUserRegionsOffline = false;
1054        break;
1055      }
1056    }
1057    return allUserRegionsOffline;
1058  }
1059
1060  /** Returns Current write count for all online regions. */
1061  private long getWriteRequestCount() {
1062    long writeCount = 0;
1063    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1064      writeCount += e.getValue().getWriteRequestsCount();
1065    }
1066    return writeCount;
1067  }
1068
1069  @InterfaceAudience.Private
1070  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1071    throws IOException {
1072    RegionServerStatusService.BlockingInterface rss = rssStub;
1073    if (rss == null) {
1074      // the current server could be stopping.
1075      return;
1076    }
1077    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1078    final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport");
1079    try (Scope ignored = span.makeCurrent()) {
1080      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1081      request.setServer(ProtobufUtil.toServerName(this.serverName));
1082      request.setLoad(sl);
1083      rss.regionServerReport(null, request.build());
1084      span.setStatus(StatusCode.OK);
1085    } catch (ServiceException se) {
1086      IOException ioe = ProtobufUtil.getRemoteException(se);
1087      if (ioe instanceof YouAreDeadException) {
1088        // This will be caught and handled as a fatal error in run()
1089        TraceUtil.setError(span, ioe);
1090        throw ioe;
1091      }
1092      if (rssStub == rss) {
1093        rssStub = null;
1094      }
1095      TraceUtil.setError(span, se);
1096      // Couldn't connect to the master, get location from zk and reconnect
1097      // Method blocks until new master is found or we are stopped
1098      createRegionServerStatusStub(true);
1099    } finally {
1100      span.end();
1101    }
1102  }
1103
  /**
   * Reports the given map of Regions and their size on the filesystem to the active Master.
   * @param regionSizeStore The store containing region sizes
   * @return false if the FileSystemUtilizationChore should pause reporting to the master, true
   *         otherwise
   */
1109  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
1110    RegionServerStatusService.BlockingInterface rss = rssStub;
1111    if (rss == null) {
1112      // the current server could be stopping.
1113      LOG.trace("Skipping Region size report to HMaster as stub is null");
1114      return true;
1115    }
1116    try {
1117      buildReportAndSend(rss, regionSizeStore);
1118    } catch (ServiceException se) {
1119      IOException ioe = ProtobufUtil.getRemoteException(se);
1120      if (ioe instanceof PleaseHoldException) {
1121        LOG.trace("Failed to report region sizes to Master because it is initializing."
1122          + " This will be retried.", ioe);
1123        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1124        return true;
1125      }
1126      if (rssStub == rss) {
1127        rssStub = null;
1128      }
1129      createRegionServerStatusStub(true);
1130      if (ioe instanceof DoNotRetryIOException) {
1131        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1132        if (doNotRetryEx.getCause() != null) {
1133          Throwable t = doNotRetryEx.getCause();
1134          if (t instanceof UnsupportedOperationException) {
1135            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1136            return false;
1137          }
1138        }
1139      }
1140      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1141    }
1142    return true;
1143  }
1144
1145  /**
1146   * Builds the region size report and sends it to the master. Upon successful sending of the
1147   * report, the region sizes that were sent are marked as sent.
1148   * @param rss             The stub to send to the Master
1149   * @param regionSizeStore The store containing region sizes
1150   */
1151  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
1152    RegionSizeStore regionSizeStore) throws ServiceException {
1153    RegionSpaceUseReportRequest request =
1154      buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
1155    rss.reportRegionSpaceUse(null, request);
1156    // Record the number of size reports sent
1157    if (metricsRegionServer != null) {
1158      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
1159    }
1160  }
1161
1162  /**
1163   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1164   * @param regionSizes The size in bytes of regions
1165   * @return The corresponding protocol buffer message.
1166   */
1167  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
1168    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1169    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
1170      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
1171    }
1172    return request.build();
1173  }
1174
1175  /**
1176   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf
1177   * message.
1178   * @param regionInfo  The RegionInfo
1179   * @param sizeInBytes The size in bytes of the Region
1180   * @return The protocol buffer
1181   */
1182  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1183    return RegionSpaceUse.newBuilder()
1184      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1185      .setRegionSize(Objects.requireNonNull(sizeInBytes)).build();
1186  }
1187
1188  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1189    throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second and other metrics. As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heartbeat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved. Additionally, the load balancer will be able to take advantage of a more complete
    // history.
1197    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1198    Collection<HRegion> regions = getOnlineRegionsLocalContext();
1199    long usedMemory = -1L;
1200    long maxMemory = -1L;
1201    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
1202    if (usage != null) {
1203      usedMemory = usage.getUsed();
1204      maxMemory = usage.getMax();
1205    }
1206
1207    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
1208    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1209    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
1210    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
1211    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
1212    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
1213    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
1214    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1215    Coprocessor.Builder coprocessorBuilder = Coprocessor.newBuilder();
1216    for (String coprocessor : coprocessors) {
1217      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1218    }
1219    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1220    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1221    for (HRegion region : regions) {
1222      if (region.getCoprocessorHost() != null) {
1223        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1224        for (String regionCoprocessor : regionCoprocessors) {
1225          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
1226        }
1227      }
1228      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1229      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1230        .getCoprocessors()) {
1231        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1232      }
1233    }
1234
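    // If the block cache can report per-region cached sizes, include each region's cached data
    // size, rounded to MB.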
1235    getBlockCache().ifPresent(cache -> {
1236      cache.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
1237        regionCachedInfo.forEach((regionName, prefetchSize) -> {
1238          serverLoad.putRegionCachedInfo(regionName, roundSize(prefetchSize, unitMB));
1239        });
1240      });
1241    });
1242
1243    serverLoad.setReportStartTime(reportStartTime);
1244    serverLoad.setReportEndTime(reportEndTime);
1245    if (this.infoServer != null) {
1246      serverLoad.setInfoServerPort(this.infoServer.getPort());
1247    } else {
1248      serverLoad.setInfoServerPort(-1);
1249    }
1250    MetricsUserAggregateSource userSource =
1251      metricsRegionServer.getMetricsUserAggregate().getSource();
1252    if (userSource != null) {
1253      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
1254      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
1255        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
1256      }
1257    }
1258
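    // Attach replication load. If the same handler acts as both source and sink we query it once;
    // otherwise the source and sink handlers are queried separately for their loads.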
1259    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
1260      // always refresh first to get the latest value
1261      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1262      if (rLoad != null) {
1263        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1264        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1265          .getReplicationLoadSourceEntries()) {
1266          serverLoad.addReplLoadSource(rLS);
1267        }
1268      }
1269    } else {
1270      if (replicationSourceHandler != null) {
1271        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1272        if (rLoad != null) {
1273          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1274            .getReplicationLoadSourceEntries()) {
1275            serverLoad.addReplLoadSource(rLS);
1276          }
1277        }
1278      }
1279      if (replicationSinkHandler != null) {
1280        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
1281        if (rLoad != null) {
1282          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1283        }
1284      }
1285    }
1286
1287    TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask
1288      .newBuilder().setDescription(task.getDescription())
1289      .setStatus(task.getStatus() != null ? task.getStatus() : "")
1290      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
1291      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build()));
1292
1293    return serverLoad.build();
1294  }
1295
1296  private String getOnlineRegionsAsPrintableString() {
1297    StringBuilder sb = new StringBuilder();
1298    for (Region r : this.onlineRegions.values()) {
1299      if (sb.length() > 0) {
1300        sb.append(", ");
1301      }
1302      sb.append(r.getRegionInfo().getEncodedName());
1303    }
1304    return sb.toString();
1305  }
1306
1307  /**
   * Wait on regions to close.
1309   */
1310  private void waitOnAllRegionsToClose(final boolean abort) {
1311    // Wait till all regions are closed before going out.
1312    int lastCount = -1;
1313    long previousLogTime = 0;
1314    Set<String> closedRegions = new HashSet<>();
1315    boolean interrupted = false;
1316    try {
1317      while (!onlineRegions.isEmpty()) {
1318        int count = getNumberOfOnlineRegions();
1319        // Only print a message if the count of regions has changed.
1320        if (count != lastCount) {
1321          // Log every second at most
1322          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
1323            previousLogTime = EnvironmentEdgeManager.currentTime();
1324            lastCount = count;
1325            LOG.info("Waiting on " + count + " regions to close");
            // Only print out the regions still closing if there are just a few, else we will
            // swamp the log.
1328            if (count < 10 && LOG.isDebugEnabled()) {
1329              LOG.debug("Online Regions=" + this.onlineRegions);
1330            }
1331          }
1332        }
        // Ensure all user regions have been sent a close. Use this to protect against the case
        // where an open comes in after we start iterating over onlineRegions to close all user
        // regions.
1336        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1337          RegionInfo hri = e.getValue().getRegionInfo();
1338          if (
1339            !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1340              && !closedRegions.contains(hri.getEncodedName())
1341          ) {
1342            closedRegions.add(hri.getEncodedName());
            // Don't update zk with this close transition.
1344            closeRegionIgnoreErrors(hri, abort);
1345          }
1346        }
        // If no regions are in RIT, we can stop waiting now.
1348        if (this.regionsInTransitionInRS.isEmpty()) {
1349          if (!onlineRegions.isEmpty()) {
            LOG.info("Exiting even though online regions are not empty,"
              + " because some regions failed to close");
1352          }
1353          break;
1354        } else {
1355          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
1356            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1357        }
1358        if (sleepInterrupted(200)) {
1359          interrupted = true;
1360        }
1361      }
1362    } finally {
1363      if (interrupted) {
1364        Thread.currentThread().interrupt();
1365      }
1366    }
1367  }
1368
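  /**
   * Sleep for the given number of milliseconds, returning true if the sleep was interrupted. The
   * interrupt status is not restored here; callers are expected to re-interrupt themselves.
   */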
1369  private static boolean sleepInterrupted(long millis) {
1370    boolean interrupted = false;
1371    try {
1372      Thread.sleep(millis);
1373    } catch (InterruptedException e) {
1374      LOG.warn("Interrupted while sleeping");
1375      interrupted = true;
1376    }
1377    return interrupted;
1378  }
1379
1380  private void shutdownWAL(final boolean close) {
1381    if (this.walFactory != null) {
1382      try {
1383        if (close) {
1384          walFactory.close();
1385        } else {
1386          walFactory.shutdown();
1387        }
1388      } catch (Throwable e) {
1389        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1390        LOG.error("Shutdown / close of WAL failed: " + e);
1391        LOG.debug("Shutdown / close exception details:", e);
1392      }
1393    }
1394  }
1395
1396  /**
   * Run init. Sets up the WAL and starts up all server threads.
   * @param c The startup response from the master, carrying extra configuration to apply.
1399   */
1400  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1401    throws IOException {
1402    try {
1403      boolean updateRootDir = false;
1404      for (NameStringPair e : c.getMapEntriesList()) {
1405        String key = e.getName();
1406        // The hostname the master sees us as.
1407        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1408          String hostnameFromMasterPOV = e.getValue();
1409          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1410            rpcServices.getSocketAddress().getPort(), this.startcode);
1411          String expectedHostName = rpcServices.getSocketAddress().getHostName();
          // If the Master's use-ip setting is enabled, the RegionServer's use-ip will be enabled
          // by default even if it is explicitly disabled, so we use the RegionServer's IP to
          // compare with the hostname passed by the Master. See HBASE-27304 for details.
1415          if (
1416            StringUtils.isBlank(useThisHostnameInstead) && getActiveMaster().isPresent()
1417              && InetAddresses.isInetAddress(getActiveMaster().get().getHostname())
1418          ) {
1419            expectedHostName = rpcServices.getSocketAddress().getAddress().getHostAddress();
1420          }
          boolean isHostnameConsistent = StringUtils.isBlank(useThisHostnameInstead)
            ? hostnameFromMasterPOV.equals(expectedHostName)
            : hostnameFromMasterPOV.equals(useThisHostnameInstead);
          if (!isHostnameConsistent) {
1425            String msg = "Master passed us a different hostname to use; was="
1426              + (StringUtils.isBlank(useThisHostnameInstead)
1427                ? expectedHostName
1428                : this.useThisHostnameInstead)
1429              + ", but now=" + hostnameFromMasterPOV;
1430            LOG.error(msg);
1431            throw new IOException(msg);
1432          }
1433          continue;
1434        }
1435
1436        String value = e.getValue();
1437        if (key.equals(HConstants.HBASE_DIR)) {
1438          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1439            updateRootDir = true;
1440          }
1441        }
1442
1443        if (LOG.isDebugEnabled()) {
1444          LOG.debug("Config from master: " + key + "=" + value);
1445        }
1446        this.conf.set(key, value);
1447      }
1448      // Set our ephemeral znode up in zookeeper now we have a name.
1449      createMyEphemeralNode();
1450
1451      if (updateRootDir) {
        // re-initialize the file system using the fs.defaultFS and hbase.rootdir passed by the master
1453        initializeFileSystem();
1454      }
1455
1456      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
1457      // config param for task trackers, but we can piggyback off of it.
1458      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1459        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1460      }
1461
      // Save it in a file; this will let us see later whether we crashed
1463      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1464
1465      // This call sets up an initialized replication and WAL. Later we start it up.
1466      setupWALAndReplication();
      // Initialize these here rather than in the constructor, now that the thread name has been set
1468      final MetricsTable metricsTable =
1469        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1470      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1471      this.metricsRegionServer =
1472        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
1473      // Now that we have a metrics source, start the pause monitor
1474      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1475      pauseMonitor.start();
1476
1477      // There is a rare case where we do NOT want services to start. Check config.
1478      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1479        startServices();
1480      }
      // Here we start up the replication service that was initialized above. TODO: reconcile the
      // two steps, or make better sense of the split.
1483      startReplicationService();
1484
      // ZK is set up; log our server name, RPC address and session id.
1486      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress()
1487        + ", sessionid=0x"
1488        + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1489
      // Wake up anyone waiting for this server to come online
1491      synchronized (online) {
1492        online.set(true);
1493        online.notifyAll();
1494      }
1495    } catch (Throwable e) {
1496      stop("Failed initialization");
1497      throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed");
1498    } finally {
1499      sleeper.skipSleepCycle();
1500    }
1501  }
1502
1503  private void startHeapMemoryManager() {
1504    if (this.blockCache != null) {
1505      this.hMemManager =
1506        new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1507      this.hMemManager.start(getChoreService());
1508    }
1509  }
1510
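  // The ephemeral znode content is the PB-magic-prefixed RegionServerInfo, carrying our info
  // server port and version info.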
1511  private void createMyEphemeralNode() throws KeeperException {
1512    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1513    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1514    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1515    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1516    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1517  }
1518
1519  private void deleteMyEphemeralNode() throws KeeperException {
1520    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1521  }
1522
1523  @Override
1524  public RegionServerAccounting getRegionServerAccounting() {
1525    return regionServerAccounting;
1526  }
1527
  // Round the size with KB or MB.
  // A trick here is that if the sizeInBytes is less than sizeUnit, we round the size up to 1
  // instead of 0 (as long as it is not 0), to avoid some schedulers thinking the region has no
  // data. See HBASE-26340 for more details on why this is important.
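  // For example, with sizeUnit == unitMB: 0 bytes -> 0, any non-zero size below one unit -> 1,
  // and larger sizes truncate to whole units (capped at Integer.MAX_VALUE).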
1532  private static int roundSize(long sizeInByte, int sizeUnit) {
1533    if (sizeInByte == 0) {
1534      return 0;
1535    } else if (sizeInByte < sizeUnit) {
1536      return 1;
1537    } else {
1538      return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE);
1539    }
1540  }
1541
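  // Runs the given computation against the second-level BucketCache, but only when the block
  // cache is a CombinedBlockCache whose L2 is a BucketCache configured for persistence.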
1542  private void computeIfPersistentBucketCache(Consumer<BucketCache> computation) {
1543    if (blockCache instanceof CombinedBlockCache) {
1544      BlockCache l2 = ((CombinedBlockCache) blockCache).getSecondLevelCache();
1545      if (l2 instanceof BucketCache && ((BucketCache) l2).isCachePersistent()) {
1546        computation.accept((BucketCache) l2);
1547      }
1548    }
1549  }
1550
1551  /**
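   * Build a RegionLoad protobuf for the given region, covering store counts and sizes, request
   * counts, compaction progress, data locality and the region's cached-data ratio.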
1552   * @param r               Region to get RegionLoad for.
1553   * @param regionLoadBldr  the RegionLoad.Builder, can be null
1554   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1555   * @return RegionLoad instance.
1556   */
1557  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1558    RegionSpecifier.Builder regionSpecifier) throws IOException {
1559    byte[] name = r.getRegionInfo().getRegionName();
1560    String regionEncodedName = r.getRegionInfo().getEncodedName();
1561    int stores = 0;
1562    int storefiles = 0;
1563    int storeRefCount = 0;
1564    int maxCompactedStoreFileRefCount = 0;
1565    long storeUncompressedSize = 0L;
1566    long storefileSize = 0L;
1567    long storefileIndexSize = 0L;
1568    long rootLevelIndexSize = 0L;
1569    long totalStaticIndexSize = 0L;
1570    long totalStaticBloomSize = 0L;
1571    long totalCompactingKVs = 0L;
1572    long currentCompactedKVs = 0L;
1573    long totalRegionSize = 0L;
1574    List<HStore> storeList = r.getStores();
1575    stores += storeList.size();
1576    for (HStore store : storeList) {
1577      storefiles += store.getStorefilesCount();
1578      int currentStoreRefCount = store.getStoreRefCount();
1579      storeRefCount += currentStoreRefCount;
1580      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1581      maxCompactedStoreFileRefCount =
1582        Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount);
1583      storeUncompressedSize += store.getStoreSizeUncompressed();
1584      storefileSize += store.getStorefilesSize();
1585      totalRegionSize += store.getHFilesSize();
      // TODO: is storefileIndexSizeKB the same as rootLevelIndexSizeKB?
1587      storefileIndexSize += store.getStorefilesRootLevelIndexSize();
1588      CompactionProgress progress = store.getCompactionProgress();
1589      if (progress != null) {
1590        totalCompactingKVs += progress.getTotalCompactingKVs();
1591        currentCompactedKVs += progress.currentCompactedKVs;
1592      }
1593      rootLevelIndexSize += store.getStorefilesRootLevelIndexSize();
1594      totalStaticIndexSize += store.getTotalStaticIndexSize();
1595      totalStaticBloomSize += store.getTotalStaticBloomSize();
1596    }
1597
1598    int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB);
1599    int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB);
1600    int storefileSizeMB = roundSize(storefileSize, unitMB);
1601    int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB);
1602    int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB);
1603    int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB);
1604    int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB);
1605    int regionSizeMB = roundSize(totalRegionSize, unitMB);
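    // Fraction of this region's size (in MB) that is currently in the block cache; left at 0 when
    // the region size is 0 or the cache has no entry for this region.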
1606    final MutableFloat currentRegionCachedRatio = new MutableFloat(0.0f);
1607    getBlockCache().ifPresent(bc -> {
1608      bc.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
1609        if (regionCachedInfo.containsKey(regionEncodedName)) {
1610          currentRegionCachedRatio.setValue(regionSizeMB == 0
1611            ? 0.0f
1612            : (float) roundSize(regionCachedInfo.get(regionEncodedName), unitMB) / regionSizeMB);
1613        }
1614      });
1615    });
1616
1617    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
1618    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
1619    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
1620    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
1621    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
1622    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
1623    if (regionLoadBldr == null) {
1624      regionLoadBldr = RegionLoad.newBuilder();
1625    }
1626    if (regionSpecifier == null) {
1627      regionSpecifier = RegionSpecifier.newBuilder();
1628    }
1629
1630    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1631    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1632    regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores)
1633      .setStorefiles(storefiles).setStoreRefCount(storeRefCount)
1634      .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1635      .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB)
1636      .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB)
1637      .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1638      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1639      .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount())
1640      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1641      .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs)
1642      .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality)
1643      .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight)
1644      .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight)
1645      .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
1646      .setLastMajorCompactionTs(r.getOldestHfileTs(true)).setRegionSizeMB(regionSizeMB)
1647      .setCurrentRegionCachedRatio(currentRegionCachedRatio.floatValue());
1648    r.setCompleteSequenceId(regionLoadBldr);
1649    return regionLoadBldr.build();
1650  }
1651
1652  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1653    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1654    userLoadBldr.setUserName(user);
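    // Each client metrics entry recorded for this user becomes one ClientMetrics message.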
1655    userSource.getClientMetrics().values().stream()
1656      .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1657        .setHostName(clientMetrics.getHostName())
1658        .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1659        .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1660        .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
1661      .forEach(userLoadBldr::addClientMetrics);
1662    return userLoadBldr.build();
1663  }
1664
1665  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1666    HRegion r = onlineRegions.get(encodedRegionName);
1667    return r != null ? createRegionLoad(r, null, null) : null;
1668  }
1669
1670  /**
   * Inner class that runs on a long period, checking whether regions need compaction.
1672   */
1673  private static class CompactionChecker extends ScheduledChore {
1674    private final HRegionServer instance;
1675    private final int majorCompactPriority;
1676    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1677    // Iteration is 1-based rather than 0-based so we don't check for compaction
1678    // immediately upon region server startup
1679    private long iteration = 1;
1680
1681    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1682      super("CompactionChecker", stopper, sleepTime);
1683      this.instance = h;
1684      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1685
1686      /*
1687       * MajorCompactPriority is configurable. If not set, the compaction will use default priority.
1688       */
1689      this.majorCompactPriority = this.instance.conf
1690        .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY);
1691    }
1692
1693    @Override
1694    protected void chore() {
1695      for (HRegion hr : this.instance.onlineRegions.values()) {
1696        // If region is read only or compaction is disabled at table level, there's no need to
1697        // iterate through region's stores
1698        if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) {
1699          continue;
1700        }
1701
1702        for (HStore s : hr.stores.values()) {
1703          try {
1704            long multiplier = s.getCompactionCheckMultiplier();
1705            assert multiplier > 0;
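            // Only check this store on every multiplier-th iteration of the chore.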
1706            if (iteration % multiplier != 0) {
1707              continue;
1708            }
1709            if (s.needsCompaction()) {
1710              // Queue a compaction. Will recognize if major is needed.
1711              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1712                getName() + " requests compaction");
1713            } else if (s.shouldPerformMajorCompaction()) {
1714              s.triggerMajorCompaction();
1715              if (
1716                majorCompactPriority == DEFAULT_PRIORITY
1717                  || majorCompactPriority > hr.getCompactPriority()
1718              ) {
1719                this.instance.compactSplitThread.requestCompaction(hr, s,
1720                  getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
1721                  CompactionLifeCycleTracker.DUMMY, null);
1722              } else {
1723                this.instance.compactSplitThread.requestCompaction(hr, s,
1724                  getName() + " requests major compaction; use configured priority",
1725                  this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1726              }
1727            }
1728          } catch (IOException e) {
1729            LOG.warn("Failed major compaction check on " + hr, e);
1730          }
1731        }
1732      }
1733      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1734    }
1735  }
1736
1737  private static class PeriodicMemStoreFlusher extends ScheduledChore {
1738    private final HRegionServer server;
1739    private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
1740    private final static int MIN_DELAY_TIME = 0; // millisec
1741    private final long rangeOfDelayMs;
1742
1743    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1744      super("MemstoreFlusherChore", server, cacheFlushInterval);
1745      this.server = server;
1746
1747      final long configuredRangeOfDelay = server.getConfiguration()
1748        .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
1749      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
1750    }
1751
1752    @Override
1753    protected void chore() {
1754      final StringBuilder whyFlush = new StringBuilder();
1755      for (HRegion r : this.server.onlineRegions.values()) {
1756        if (r == null) {
1757          continue;
1758        }
1759        if (r.shouldFlush(whyFlush)) {
1760          FlushRequester requester = server.getFlushRequester();
1761          if (requester != null) {
1762            long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME;
1763            // Throttle the flushes by putting a delay. If we don't throttle, and there
1764            // is a balanced write-load on the regions in a table, we might end up
1765            // overwhelming the filesystem with too many flushes at once.
1766            if (requester.requestDelayedFlush(r, delay)) {
1767              LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(),
1768                r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay);
1769            }
1770          }
1771        }
1772      }
1773    }
1774  }
1775
1776  /**
1777   * Report the status of the server. A server is online once all the startup is completed (setting
1778   * up filesystem, starting executorService threads, etc.). This method is designed mostly to be
1779   * useful in tests.
1780   * @return true if online, false if not.
1781   */
1782  public boolean isOnline() {
1783    return online.get();
1784  }
1785
1786  /**
   * Set up the WAL and replication if enabled. Replication setup is done in here because it needs
   * to be hooked up to the WAL.
1789   */
1790  private void setupWALAndReplication() throws IOException {
1791    WALFactory factory = new WALFactory(conf, serverName, this);
    // TODO: Replication makes assumptions here based on the default filesystem impl
1793    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1794    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1795
1796    Path logDir = new Path(walRootDir, logName);
1797    LOG.debug("logDir={}", logDir);
1798    if (this.walFs.exists(logDir)) {
1799      throw new RegionServerRunningException(
1800        "Region server has already created directory at " + this.serverName.toString());
1801    }
    // Always create the wal directory, as the master now needs it when it restarts in order to
    // find the live region servers.
1804    if (!this.walFs.mkdirs(logDir)) {
1805      throw new IOException("Can not create wal directory " + logDir);
1806    }
1807    // Instantiate replication if replication enabled. Pass it the log directories.
1808    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
1809
1810    WALActionsListener walEventListener = getWALEventTrackerListener(conf);
1811    if (walEventListener != null && factory.getWALProvider() != null) {
1812      factory.getWALProvider().addWALActionsListener(walEventListener);
1813    }
1814    this.walFactory = factory;
1815  }
1816
1817  private WALActionsListener getWALEventTrackerListener(Configuration conf) {
1818    if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) {
1819      WALEventTrackerListener listener =
1820        new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName());
1821      return listener;
1822    }
1823    return null;
1824  }
1825
1826  /**
1827   * Start up replication source and sink handlers.
1828   */
1829  private void startReplicationService() throws IOException {
1830    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
1831      this.replicationSourceHandler.startReplicationService();
1832    } else {
1833      if (this.replicationSourceHandler != null) {
1834        this.replicationSourceHandler.startReplicationService();
1835      }
1836      if (this.replicationSinkHandler != null) {
1837        this.replicationSinkHandler.startReplicationService();
1838      }
1839    }
1840  }
1841
1842  /** Returns Master address tracker instance. */
1843  public MasterAddressTracker getMasterAddressTracker() {
1844    return this.masterAddressTracker;
1845  }
1846
1847  /**
   * Start maintenance threads: Server, Worker and lease checker threads; start all threads we need
   * to run. This is called after we've successfully registered with the Master. Installs an
   * UncaughtExceptionHandler that calls abort on the RegionServer if we get an unhandled
   * exception. We cannot set the handler on all threads; the Server's internal Listener thread is
   * off limits. For the Server, on an OOME it waits a while then retries; meantime, a flush or a
   * compaction that tries to run should trigger the same critical condition and the shutdown will
   * run. On its way out, this server will shut down the Server. Leases are sort of in between:
   * although the lease checker inherits from Chore, it keeps its own internal stop mechanism, so
   * it needs to be stopped by this hosting server. The Worker logs the exception and exits.
1857   */
1858  private void startServices() throws IOException {
1859    if (!isStopped() && !isAborted()) {
1860      initializeThreads();
1861    }
1862    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
1863    this.secureBulkLoadManager.start();
1864
1865    // Health checker thread.
1866    if (isHealthCheckerConfigured()) {
1867      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
1868        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1869      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1870    }
1871    // Executor status collect thread.
1872    if (
1873      this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
1874        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)
1875    ) {
1876      int sleepTime =
1877        this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ);
1878      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
1879        this.metricsRegionServer.getMetricsSource());
1880    }
1881
1882    this.walRoller = new LogRoller(this);
1883    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1884    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
1885
    // Create the CompactedFileDischarger chore. This chore helps to remove the compacted files
    // that will no longer be used in reads.
    // The default is 2 mins; the default for the TTLCleaner is 5 mins, so we use 2 mins here so
    // that compacted files can be archived before the TTLCleaner runs.
1890    int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
1891    this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this);
1892    choreService.scheduleChore(compactedFileDischarger);
1893
1894    // Start executor services
1895    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
1896    executorService.startExecutorService(executorService.new ExecutorConfig()
1897      .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
1898    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
1899    executorService.startExecutorService(executorService.new ExecutorConfig()
1900      .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
1901    final int openPriorityRegionThreads =
1902      conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
1903    executorService.startExecutorService(
1904      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION)
1905        .setCorePoolSize(openPriorityRegionThreads));
1906    final int closeRegionThreads =
1907      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
1908    executorService.startExecutorService(executorService.new ExecutorConfig()
1909      .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
1910    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
1911    executorService.startExecutorService(executorService.new ExecutorConfig()
1912      .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
1913    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1914      final int storeScannerParallelSeekThreads =
1915        conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
1916      executorService.startExecutorService(
1917        executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK)
1918          .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true));
1919    }
1920    final int logReplayOpsThreads =
1921      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
1922    executorService.startExecutorService(
1923      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS)
1924        .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
1925    // Start the threads for compacted files discharger
1926    final int compactionDischargerThreads =
1927      conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
1928    executorService.startExecutorService(executorService.new ExecutorConfig()
1929      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)
1930      .setCorePoolSize(compactionDischargerThreads));
1931    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1932      final int regionReplicaFlushThreads =
1933        conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1934          conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1935      executorService.startExecutorService(executorService.new ExecutorConfig()
1936        .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS)
1937        .setCorePoolSize(regionReplicaFlushThreads));
1938    }
1939    final int refreshPeerThreads =
1940      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
1941    executorService.startExecutorService(executorService.new ExecutorConfig()
1942      .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
1943    final int replaySyncReplicationWALThreads =
1944      conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
1945    executorService.startExecutorService(executorService.new ExecutorConfig()
1946      .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL)
1947      .setCorePoolSize(replaySyncReplicationWALThreads));
1948    final int switchRpcThrottleThreads =
1949      conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
1950    executorService.startExecutorService(
1951      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE)
1952        .setCorePoolSize(switchRpcThrottleThreads));
1953    final int claimReplicationQueueThreads =
1954      conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
1955    executorService.startExecutorService(
1956      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE)
1957        .setCorePoolSize(claimReplicationQueueThreads));
1958    final int rsSnapshotOperationThreads =
1959      conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
1960    executorService.startExecutorService(
1961      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
1962        .setCorePoolSize(rsSnapshotOperationThreads));
1963    final int rsFlushOperationThreads =
1964      conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3);
1965    executorService.startExecutorService(executorService.new ExecutorConfig()
1966      .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads));
1967    final int rsRefreshQuotasThreads =
1968      conf.getInt("hbase.regionserver.executor.refresh.quotas.threads", 1);
1969    executorService.startExecutorService(
1970      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS)
1971        .setCorePoolSize(rsRefreshQuotasThreads));
1972    final int logRollThreads = conf.getInt("hbase.regionserver.executor.log.roll.threads", 1);
1973    executorService.startExecutorService(executorService.new ExecutorConfig()
1974      .setExecutorType(ExecutorType.RS_LOG_ROLL).setCorePoolSize(logRollThreads));
1975
1976    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
1977      uncaughtExceptionHandler);
1978    if (this.cacheFlusher != null) {
1979      this.cacheFlusher.start(uncaughtExceptionHandler);
1980    }
1981    Threads.setDaemonThreadRunning(this.procedureResultReporter,
1982      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
1983
1984    if (this.compactionChecker != null) {
1985      choreService.scheduleChore(compactionChecker);
1986    }
1987    if (this.periodicFlusher != null) {
1988      choreService.scheduleChore(periodicFlusher);
1989    }
1990    if (this.healthCheckChore != null) {
1991      choreService.scheduleChore(healthCheckChore);
1992    }
1993    if (this.executorStatusChore != null) {
1994      choreService.scheduleChore(executorStatusChore);
1995    }
1996    if (this.nonceManagerChore != null) {
1997      choreService.scheduleChore(nonceManagerChore);
1998    }
1999    if (this.storefileRefresher != null) {
2000      choreService.scheduleChore(storefileRefresher);
2001    }
2002    if (this.fsUtilizationChore != null) {
2003      choreService.scheduleChore(fsUtilizationChore);
2004    }
2005    if (this.namedQueueServiceChore != null) {
2006      choreService.scheduleChore(namedQueueServiceChore);
2007    }
2008    if (this.brokenStoreFileCleaner != null) {
2009      choreService.scheduleChore(brokenStoreFileCleaner);
2010    }
2011    if (this.rsMobFileCleanerChore != null) {
2012      choreService.scheduleChore(rsMobFileCleanerChore);
2013    }
2014    if (replicationMarkerChore != null) {
2015      LOG.info("Starting replication marker chore");
2016      choreService.scheduleChore(replicationMarkerChore);
2017    }
2018
    // The lease manager is not a Thread itself; internally it runs a daemon thread. If that
    // thread gets an unhandled exception, it will just exit.
2021    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
2022      uncaughtExceptionHandler);
2023
    // Create the log splitting worker and start it.
    // Set a smaller number of retries to fail fast; otherwise the SplitLogWorker could be blocked
    // for quite a while inside the Connection layer, and the worker would not be available for
    // other tasks even after the current task is preempted when a split task times out.
2028    Configuration sinkConf = HBaseConfiguration.create(conf);
2029    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2030      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
2031    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2032      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
2033    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
2034    if (
2035      this.csm != null
2036        && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
2037    ) {
2038      // SplitLogWorker needs csm. If none, don't start this.
2039      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
2040      splitLogWorker.start();
2041      LOG.debug("SplitLogWorker started");
2042    }
2043
2044    // Memstore services.
2045    startHeapMemoryManager();
2046    // Call it after starting HeapMemoryManager.
2047    initializeMemStoreChunkCreator(hMemManager);
2048  }
2049
2050  private void initializeThreads() {
2051    // Cache flushing thread.
2052    this.cacheFlusher = new MemStoreFlusher(conf, this);
2053
2054    // Compaction thread
2055    this.compactSplitThread = new CompactSplit(this);
2056
2057    // Prefetch Notifier
2058    this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
2059
    // Background thread to check for compactions; needed if a region has not gotten updates
    // in a while. It takes care of not checking too frequently on a store-by-store basis.
2062    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
2063    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
2064    this.leaseManager = new LeaseManager(this.threadWakeFrequency);
2065
2066    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
2067      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
2068    final boolean walEventTrackerEnabled =
2069      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
2070
2071    if (isSlowLogTableEnabled || walEventTrackerEnabled) {
2072      // default chore duration: 10 min
2073      // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property
2074      final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY,
2075        DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION);
2076
2077      final int namedQueueChoreDuration =
2078        conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT);
      // Use the smaller of slowLogChoreDuration and namedQueueChoreDuration
2080      int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration);
2081
2082      namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration,
2083        this.namedQueueRecorder, this.getConnection());
2084    }
2085
2086    if (this.nonceManager != null) {
2087      // Create the scheduled chore that cleans up nonces.
2088      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
2089    }
2090
2091    // Setup the Quota Manager
2092    rsQuotaManager = new RegionServerRpcQuotaManager(this);
2093    configurationManager.registerObserver(rsQuotaManager);
2094    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
2095
2096    if (QuotaUtil.isQuotaEnabled(conf)) {
2097      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
2098    }
2099
2100    boolean onlyMetaRefresh = false;
2101    int storefileRefreshPeriod =
2102      conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
2103        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2104    if (storefileRefreshPeriod == 0) {
2105      storefileRefreshPeriod =
2106        conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
2107          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2108      onlyMetaRefresh = true;
2109    }
2110    if (storefileRefreshPeriod > 0) {
2111      this.storefileRefresher =
2112        new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this);
2113    }
2114
2115    int brokenStoreFileCleanerPeriod =
2116      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
2117        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
2118    int brokenStoreFileCleanerDelay =
2119      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
2120        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
2121    double brokenStoreFileCleanerDelayJitter =
2122      conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
2123        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
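    // Apply a random jitter of roughly +/- (delay * jitter / 2) to the initial delay, which helps
    // keep the cleaner's first run from lining up across region servers.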
2124    double jitterRate =
2125      (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
2126    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
2127    this.brokenStoreFileCleaner =
2128      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
2129        brokenStoreFileCleanerPeriod, this, conf, this);
2130
2131    this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
2132
2133    registerConfigurationObservers();
2134    initializeReplicationMarkerChore();
2135  }
2136
2137  private void registerConfigurationObservers() {
    // Register replication handlers if possible, as we now support recreating the replication
    // peer storage, for migrating across different replication peer storages online
2140    if (replicationSourceHandler instanceof ConfigurationObserver) {
2141      configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
2142    }
2143    if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) {
2144      configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
2145    }
2146    // Registering the compactSplitThread object with the ConfigurationManager.
2147    configurationManager.registerObserver(this.compactSplitThread);
2148    configurationManager.registerObserver(this.cacheFlusher);
2149    configurationManager.registerObserver(this.rpcServices);
2150    configurationManager.registerObserver(this.prefetchExecutorNotifier);
2151    configurationManager.registerObserver(this);
2152  }
2153
2154  /*
2155   * Verify that server is healthy
2156   */
2157  private boolean isHealthy() {
2158    if (!dataFsOk) {
2159      // File system problem
2160      return false;
2161    }
2162    // Verify that all threads are alive
2163    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
2164      && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2165      && (this.walRoller == null || this.walRoller.isAlive())
2166      && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2167      && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2168    if (!healthy) {
2169      stop("One or more threads are no longer alive -- stop");
2170    }
2171    return healthy;
2172  }
2173
2174  @Override
2175  public List<WAL> getWALs() {
2176    return walFactory.getWALs();
2177  }
2178
2179  @Override
2180  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2181    WAL wal = walFactory.getWAL(regionInfo);
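    // Register the WAL with the roller so it participates in log rolling.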
2182    if (this.walRoller != null) {
2183      this.walRoller.addWAL(wal);
2184    }
2185    return wal;
2186  }
2187
2188  public LogRoller getWalRoller() {
2189    return walRoller;
2190  }
2191
2192  public WALFactory getWalFactory() {
2193    return walFactory;
2194  }
2195
2196  @Override
2197  public void stop(final String msg) {
2198    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2199  }
2200
2201  /**
2202   * Stops the regionserver.
2203   * @param msg   Status message
2204   * @param force True if this is a regionserver abort
2205   * @param user  The user executing the stop request, or null if no user is associated
2206   */
2207  public void stop(final String msg, final boolean force, final User user) {
2208    if (!this.stopped) {
2209      LOG.info("***** STOPPING region server '{}' *****", this);
2210      if (this.rsHost != null) {
2211        // when forced via abort don't allow CPs to override
2212        try {
2213          this.rsHost.preStop(msg, user);
2214        } catch (IOException ioe) {
2215          if (!force) {
2216            LOG.warn("The region server did not stop", ioe);
2217            return;
2218          }
2219          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2220        }
2221      }
2222      this.stopped = true;
2223      LOG.info("STOPPED: " + msg);
2224      // Wakes run() if it is sleeping
2225      sleeper.skipSleepCycle();
2226    }
2227  }
2228
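  /** Block until this server is online or has been stopped, re-checking every msgInterval ms. */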
2229  public void waitForServerOnline() {
2230    while (!isStopped() && !isOnline()) {
2231      synchronized (online) {
2232        try {
2233          online.wait(msgInterval);
2234        } catch (InterruptedException ie) {
2235          Thread.currentThread().interrupt();
2236          break;
2237        }
2238      }
2239    }
2240  }
2241
2242  @Override
2243  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2244    HRegion r = context.getRegion();
2245    long openProcId = context.getOpenProcId();
2246    long masterSystemTime = context.getMasterSystemTime();
2247    long initiatingMasterActiveTime = context.getInitiatingMasterActiveTime();
2248    rpcServices.checkOpen();
2249    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2250      r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2251    // Do checks to see if we need to compact (references or too many files)
2252    // Skip compaction check if region is read only
2253    if (!r.isReadOnly()) {
2254      for (HStore s : r.stores.values()) {
2255        if (s.hasReferences() || s.needsCompaction()) {
2256          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2257        }
2258      }
2259    }
2260    long openSeqNum = r.getOpenSeqNum();
2261    if (openSeqNum == HConstants.NO_SEQNUM) {
2262      // If we opened a region, we should have read some sequence number from it.
2263      LOG.error(
2264        "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2265      openSeqNum = 0;
2266    }
2267
2268    // Notify master
2269    if (
2270      !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2271        openSeqNum, openProcId, masterSystemTime, r.getRegionInfo(), initiatingMasterActiveTime))
2272    ) {
2273      throw new IOException(
2274        "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2275    }
2276
2277    triggerFlushInPrimaryRegion(r);
2278
2279    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2280  }
2281
2282  /**
2283   * Helper method for use in tests. Skip the region transition report when there's no master around
2284   * to receive it.
2285   */
2286  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2287    final TransitionCode code = context.getCode();
2288    final long openSeqNum = context.getOpenSeqNum();
2289    long masterSystemTime = context.getMasterSystemTime();
2290    final RegionInfo[] hris = context.getHris();
2291
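    // For an OPENED transition we update the region location in meta ourselves, since in this
    // test-only mode there is no master around to do it.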
2292    if (code == TransitionCode.OPENED) {
2293      Preconditions.checkArgument(hris != null && hris.length == 1);
2294      if (hris[0].isMetaRegion()) {
2295        LOG.warn(
2296          "meta table location is stored in master local store, so we can not skip reporting");
2297        return false;
2298      } else {
2299        try {
2300          MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
2301            serverName, openSeqNum, masterSystemTime);
2302        } catch (IOException e) {
2303          LOG.info("Failed to update meta", e);
2304          return false;
2305        }
2306      }
2307    }
2308    return true;
2309  }
2310
2311  private ReportRegionStateTransitionRequest
2312    createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) {
2313    final TransitionCode code = context.getCode();
2314    final long openSeqNum = context.getOpenSeqNum();
2315    final RegionInfo[] hris = context.getHris();
2316    final long[] procIds = context.getProcIds();
2317
2318    ReportRegionStateTransitionRequest.Builder builder =
2319      ReportRegionStateTransitionRequest.newBuilder();
2320    builder.setServer(ProtobufUtil.toServerName(serverName));
2321    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2322    transition.setTransitionCode(code);
2323    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2324      transition.setOpenSeqNum(openSeqNum);
2325    }
2326    for (RegionInfo hri : hris) {
2327      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2328    }
2329    for (long procId : procIds) {
2330      transition.addProcId(procId);
2331    }
2332    transition.setInitiatingMasterActiveTime(context.getInitiatingMasterActiveTime());
2333
2334    return builder.build();
2335  }
2336
2337  @Override
2338  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2339    if (TEST_SKIP_REPORTING_TRANSITION) {
2340      return skipReportingTransition(context);
2341    }
2342    final ReportRegionStateTransitionRequest request =
2343      createReportRegionStateTransitionRequest(context);
2344
2345    int tries = 0;
2346    long pauseTime = this.retryPauseTime;
    // Keep looping until we hit an error. We want to send reports even though the server is going
    // down. Only give up if the clusterConnection is null; it is set to null almost as the last
    // thing the HRegionServer does on its way down.
2350    while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
2351      RegionServerStatusService.BlockingInterface rss = rssStub;
2352      try {
2353        if (rss == null) {
2354          createRegionServerStatusStub();
2355          continue;
2356        }
2357        ReportRegionStateTransitionResponse response =
2358          rss.reportRegionStateTransition(null, request);
2359        if (response.hasErrorMessage()) {
2360          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2361          break;
2362        }
        // Log if we had to retry, else don't log unless TRACE. We want to
        // know if we were successful after an attempt showed in the logs as failed.
2365        if (tries > 0 || LOG.isTraceEnabled()) {
2366          LOG.info("TRANSITION REPORTED " + request);
2367        }
2368        // NOTE: Return mid-method!!!
2369        return true;
2370      } catch (ServiceException se) {
2371        IOException ioe = ProtobufUtil.getRemoteException(se);
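        // These exceptions mean the master is not ready yet or is overloaded, so back off before
        // retrying rather than hammering it.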
2372        boolean pause = ioe instanceof ServerNotRunningYetException
2373          || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException;
2374        if (pause) {
2375          // Do backoff else we flood the Master with requests.
2376          pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
2377        } else {
2378          pauseTime = this.retryPauseTime; // Reset.
2379        }
2380        LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#"
2381          + tries + ")"
2382          + (pause
2383            ? " after " + pauseTime + "ms delay (Master is coming online...)."
2384            : " immediately."),
2385          ioe);
2386        if (pause) {
2387          Threads.sleep(pauseTime);
2388        }
2389        tries++;
2390        if (rssStub == rss) {
2391          rssStub = null;
2392        }
2393      }
2394    }
2395    return false;
2396  }
2397
2398  /**
2399   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2400   * block this thread. See RegionReplicaFlushHandler for details.
2401   */
2402  private void triggerFlushInPrimaryRegion(final HRegion region) {
2403    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2404      return;
2405    }
2406    TableName tn = region.getTableDescriptor().getTableName();
2407    if (
2408      !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn)
2409        || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) ||
        // If memstore replication is not set up, we do not have to wait for a flush event from the
        // primary before starting to serve reads, because replication gaps are not applicable.
        // This logic is from
        // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by
        // HBASE-13063
2415        !region.getTableDescriptor().hasRegionMemStoreReplication()
2416    ) {
2417      region.setReadsEnabled(true);
2418      return;
2419    }
2420
2421    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2422    // RegionReplicaFlushHandler might reset this.
2423
2424    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2425    if (this.executorService != null) {
2426      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
2427    } else {
2428      LOG.info("Executor is null; not running flush of primary region replica for {}",
2429        region.getRegionInfo());
2430    }
2431  }
2432
2433  @InterfaceAudience.Private
2434  public RSRpcServices getRSRpcServices() {
2435    return rpcServices;
2436  }
2437
  /**
   * Cause the server to exit without closing the regions it is serving, without closing the log it
   * is using and without notifying the master. Used for unit testing and on catastrophic events
   * such as HDFS being yanked out from under hbase or an OOME.
   * @param reason the reason we are aborting
   * @param cause  the exception that caused the abort, or null
   */
2444  @Override
2445  public void abort(String reason, Throwable cause) {
2446    if (!setAbortRequested()) {
2447      // Abort already in progress, ignore the new request.
2448      LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason);
2449      return;
2450    }
2451    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2452    if (cause != null) {
2453      LOG.error(HBaseMarkers.FATAL, msg, cause);
2454    } else {
2455      LOG.error(HBaseMarkers.FATAL, msg);
2456    }
    // HBASE-4014: show the list of coprocessors that were loaded to help debug
    // regionserver crashes. Note that we're implicitly using
    // java.util.HashSet's toString() method to print the coprocessor names.
2460    LOG.error(HBaseMarkers.FATAL,
2461      "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors());
    // Try to dump metrics on abort -- it might give a clue as to how the fatal error came about.
2463    try {
2464      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2465    } catch (MalformedObjectNameException | IOException e) {
2466      LOG.warn("Failed dumping metrics", e);
2467    }
2468
2469    // Do our best to report our abort to the master, but this may not work
2470    try {
2471      if (cause != null) {
2472        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2473      }
2474      // Report to the master but only if we have already registered with the master.
2475      RegionServerStatusService.BlockingInterface rss = rssStub;
2476      if (rss != null && this.serverName != null) {
2477        ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder();
2478        builder.setServer(ProtobufUtil.toServerName(this.serverName));
2479        builder.setErrorMessage(msg);
2480        rss.reportRSFatalError(null, builder.build());
2481      }
2482    } catch (Throwable t) {
2483      LOG.warn("Unable to report fatal error to master", t);
2484    }
2485
2486    scheduleAbortTimer();
2487    // shutdown should be run as the internal user
2488    stop(reason, true, null);
2489  }
2490
  /*
   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up logs, but it does
   * close the socket in case we want to bring up a server on the old hostname+port immediately.
   */
2495  @InterfaceAudience.Private
2496  protected void kill() {
2497    this.killed = true;
2498    abort("Simulated kill");
2499  }
2500
2501  // Limits the time spent in the shutdown process.
2502  private void scheduleAbortTimer() {
2503    if (this.abortMonitor == null) {
2504      this.abortMonitor = new Timer("Abort regionserver monitor", true);
2505      TimerTask abortTimeoutTask = null;
2506      try {
2507        Constructor<? extends TimerTask> timerTaskCtor =
2508          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2509            .asSubclass(TimerTask.class).getDeclaredConstructor();
2510        timerTaskCtor.setAccessible(true);
2511        abortTimeoutTask = timerTaskCtor.newInstance();
2512      } catch (Exception e) {
2513        LOG.warn("Initialize abort timeout task failed", e);
2514      }
2515      if (abortTimeoutTask != null) {
2516        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2517      }
2518    }
2519  }
2520
2521  /**
2522   * Wait on all threads to finish. Presumption is that all closes and stops have already been
2523   * called.
2524   */
2525  protected void stopServiceThreads() {
2526    // clean up the scheduled chores
2527    stopChoreService();
2528    if (bootstrapNodeManager != null) {
2529      bootstrapNodeManager.stop();
2530    }
2531    if (this.cacheFlusher != null) {
2532      this.cacheFlusher.shutdown();
2533    }
2534    if (this.walRoller != null) {
2535      this.walRoller.close();
2536    }
2537    if (this.compactSplitThread != null) {
2538      this.compactSplitThread.join();
2539    }
2540    stopExecutorService();
2541    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
2542      this.replicationSourceHandler.stopReplicationService();
2543    } else {
2544      if (this.replicationSourceHandler != null) {
2545        this.replicationSourceHandler.stopReplicationService();
2546      }
2547      if (this.replicationSinkHandler != null) {
2548        this.replicationSinkHandler.stopReplicationService();
2549      }
2550    }
2551  }
2552
  /** Returns the object that implements the replication source service. */
2554  @Override
2555  public ReplicationSourceService getReplicationSourceService() {
2556    return replicationSourceHandler;
2557  }
2558
  /** Returns the object that implements the replication sink service. */
2560  public ReplicationSinkService getReplicationSinkService() {
2561    return replicationSinkHandler;
2562  }
2563
2564  /**
2565   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2566   * connection, the current rssStub must be null. Method will block until a master is available.
2567   * You can break from this block by requesting the server stop.
2568   * @return master + port, or null if server has been stopped
2569   */
2570  private synchronized ServerName createRegionServerStatusStub() {
2571    // Create RS stub without refreshing the master node from ZK, use cached data
2572    return createRegionServerStatusStub(false);
2573  }
2574
2575  /**
2576   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2577   * connection, the current rssStub must be null. Method will block until a master is available.
2578   * You can break from this block by requesting the server stop.
2579   * @param refresh If true then master address will be read from ZK, otherwise use cached data
2580   * @return master + port, or null if server has been stopped
2581   */
2582  @InterfaceAudience.Private
2583  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2584    if (rssStub != null) {
2585      return masterAddressTracker.getMasterAddress();
2586    }
2587    ServerName sn = null;
2588    long previousLogTime = 0;
2589    RegionServerStatusService.BlockingInterface intRssStub = null;
2590    LockService.BlockingInterface intLockStub = null;
2591    boolean interrupted = false;
2592    try {
2593      while (keepLooping()) {
2594        sn = this.masterAddressTracker.getMasterAddress(refresh);
2595        if (sn == null) {
2596          if (!keepLooping()) {
2597            // give up with no connection.
2598            LOG.debug("No master found and cluster is stopped; bailing out");
2599            return null;
2600          }
2601          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2602            LOG.debug("No master found; retry");
2603            previousLogTime = EnvironmentEdgeManager.currentTime();
2604          }
2605          refresh = true; // let's try pull it from ZK directly
2606          if (sleepInterrupted(200)) {
2607            interrupted = true;
2608          }
2609          continue;
2610        }
2611        try {
2612          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
2613            userProvider.getCurrent(), shortOperationTimeout);
2614          intRssStub = RegionServerStatusService.newBlockingStub(channel);
2615          intLockStub = LockService.newBlockingStub(channel);
2616          break;
2617        } catch (IOException e) {
2618          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2619            e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
2620            if (e instanceof ServerNotRunningYetException) {
2621              LOG.info("Master isn't available yet, retrying");
2622            } else {
2623              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2624            }
2625            previousLogTime = EnvironmentEdgeManager.currentTime();
2626          }
2627          if (sleepInterrupted(200)) {
2628            interrupted = true;
2629          }
2630        }
2631      }
2632    } finally {
2633      if (interrupted) {
2634        Thread.currentThread().interrupt();
2635      }
2636    }
2637    this.rssStub = intRssStub;
2638    this.lockStub = intLockStub;
2639    return sn;
2640  }
2641
  /**
   * @return True if we should keep looping, i.e. the cluster is up and this server has not been
   *         stopped.
   */
2646  private boolean keepLooping() {
2647    return !this.stopped && isClusterUp();
2648  }
2649
  /*
   * Let the master know we're here. Run initialization using parameters passed to us by the
   * master.
   * @return A Map of key/value configurations we got from the Master, else null if we failed to
   * register.
   */
2655  private RegionServerStartupResponse reportForDuty() throws IOException {
2656    if (this.masterless) {
2657      return RegionServerStartupResponse.getDefaultInstance();
2658    }
2659    ServerName masterServerName = createRegionServerStatusStub(true);
2660    RegionServerStatusService.BlockingInterface rss = rssStub;
2661    if (masterServerName == null || rss == null) {
2662      return null;
2663    }
2664    RegionServerStartupResponse result = null;
2665    try {
2666      rpcServices.requestCount.reset();
2667      rpcServices.rpcGetRequestCount.reset();
2668      rpcServices.rpcScanRequestCount.reset();
2669      rpcServices.rpcFullScanRequestCount.reset();
2670      rpcServices.rpcMultiRequestCount.reset();
2671      rpcServices.rpcMutateRequestCount.reset();
2672      LOG.info("reportForDuty to master=" + masterServerName + " with port="
2673        + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode);
2674      long now = EnvironmentEdgeManager.currentTime();
2675      int port = rpcServices.getSocketAddress().getPort();
2676      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2677      if (!StringUtils.isBlank(useThisHostnameInstead)) {
2678        request.setUseThisHostnameInstead(useThisHostnameInstead);
2679      }
2680      request.setPort(port);
2681      request.setServerStartCode(this.startcode);
2682      request.setServerCurrentTime(now);
2683      result = rss.regionServerStartup(null, request.build());
2684    } catch (ServiceException se) {
2685      IOException ioe = ProtobufUtil.getRemoteException(se);
2686      if (ioe instanceof ClockOutOfSyncException) {
2687        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
2688        // Re-throw IOE will cause RS to abort
2689        throw ioe;
2690      } else if (ioe instanceof DecommissionedHostRejectedException) {
2691        LOG.error(HBaseMarkers.FATAL,
2692          "Master rejected startup because the host is considered decommissioned", ioe);
2693        // Re-throw IOE will cause RS to abort
2694        throw ioe;
2695      } else if (ioe instanceof ServerNotRunningYetException) {
2696        LOG.debug("Master is not running yet");
2697      } else {
2698        LOG.warn("error telling master we are up", se);
2699      }
2700      rssStub = null;
2701    }
2702    return result;
2703  }
2704
2705  @Override
2706  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2707    try {
2708      GetLastFlushedSequenceIdRequest req =
2709        RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2710      RegionServerStatusService.BlockingInterface rss = rssStub;
2711      if (rss == null) { // Try to connect one more time
2712        createRegionServerStatusStub();
2713        rss = rssStub;
2714        if (rss == null) {
2715          // Still no luck, we tried
2716          LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2717          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2718            .build();
2719        }
2720      }
2721      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2722      return RegionStoreSequenceIds.newBuilder()
2723        .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2724        .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2725    } catch (ServiceException e) {
2726      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2727      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2728        .build();
2729    }
2730  }
2731
  /**
   * Close the meta region if we are carrying it.
   * @param abort Whether we're running an abort.
   */
2736  private void closeMetaTableRegions(final boolean abort) {
2737    HRegion meta = null;
2738    this.onlineRegionsLock.writeLock().lock();
2739    try {
2740      for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
2741        RegionInfo hri = e.getValue().getRegionInfo();
2742        if (hri.isMetaRegion()) {
2743          meta = e.getValue();
2744        }
2745        if (meta != null) {
2746          break;
2747        }
2748      }
2749    } finally {
2750      this.onlineRegionsLock.writeLock().unlock();
2751    }
2752    if (meta != null) {
2753      closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2754    }
2755  }
2756
  /**
   * Schedule closes on all user regions. Should be safe to call multiple times because it won't
   * close regions that are already closed or that are closing.
   * @param abort Whether we're running an abort.
   */
2762  private void closeUserRegions(final boolean abort) {
2763    this.onlineRegionsLock.writeLock().lock();
2764    try {
2765      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
2766        HRegion r = e.getValue();
2767        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2768          // Don't update zk with this close transition; pass false.
2769          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2770        }
2771      }
2772    } finally {
2773      this.onlineRegionsLock.writeLock().unlock();
2774    }
2775  }
2776
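  /**
   * Returns the live map of regions currently online on this server, keyed by encoded region name.
   */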
2777  protected Map<String, HRegion> getOnlineRegions() {
2778    return this.onlineRegions;
2779  }
2780
2781  public int getNumberOfOnlineRegions() {
2782    return this.onlineRegions.size();
2783  }
2784
  /**
   * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM
   * as the client; an HRegion cannot be serialized to cross an RPC boundary.
   */
2789  public Collection<HRegion> getOnlineRegionsLocalContext() {
2790    Collection<HRegion> regions = this.onlineRegions.values();
2791    return Collections.unmodifiableCollection(regions);
2792  }
2793
2794  @Override
2795  public void addRegion(HRegion region) {
2796    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2797    configurationManager.registerObserver(region);
2798  }
2799
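  /**
   * Buckets {@code region} under its {@code size} key in {@code sortedRegions}, creating the
   * bucket on first use so regions of equal size share one entry.
   */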
2800  private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
2801    long size) {
2802    if (!sortedRegions.containsKey(size)) {
2803      sortedRegions.put(size, new ArrayList<>());
2804    }
2805    sortedRegions.get(size).add(region);
2806  }
2807
2808  /**
2809   * @return A new Map of online regions sorted by region off-heap size with the first entry being
2810   *         the biggest.
2811   */
2812  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2813    // we'll sort the regions in reverse
2814    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2815    // Copy over all regions. Regions are sorted by size with biggest first.
2816    for (HRegion region : this.onlineRegions.values()) {
2817      addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
2818    }
2819    return sortedRegions;
2820  }
2821
2822  /**
2823   * @return A new Map of online regions sorted by region heap size with the first entry being the
2824   *         biggest.
2825   */
2826  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2827    // we'll sort the regions in reverse
2828    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2829    // Copy over all regions. Regions are sorted by size with biggest first.
2830    for (HRegion region : this.onlineRegions.values()) {
2831      addRegion(sortedRegions, region, region.getMemStoreHeapSize());
2832    }
2833    return sortedRegions;
2834  }
2835
2836  /** Returns reference to FlushRequester */
2837  @Override
2838  public FlushRequester getFlushRequester() {
2839    return this.cacheFlusher;
2840  }
2841
2842  @Override
2843  public CompactionRequester getCompactionRequestor() {
2844    return this.compactSplitThread;
2845  }
2846
2847  @Override
2848  public LeaseManager getLeaseManager() {
2849    return leaseManager;
2850  }
2851
2852  /** Returns {@code true} when the data file system is available, {@code false} otherwise. */
2853  boolean isDataFileSystemOk() {
2854    return this.dataFsOk;
2855  }
2856
2857  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
2858    return this.rsHost;
2859  }
2860
2861  @Override
2862  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2863    return this.regionsInTransitionInRS;
2864  }
2865
2866  @Override
2867  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
2868    return rsQuotaManager;
2869  }
2870
2871  //
2872  // Main program and support routines
2873  //
  /**
   * Load the replication service objects, if any.
   */
2877  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
2878    FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
2879    // read in the name of the source replication class from the config file.
2880    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2881      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2882
2883    // read in the name of the sink replication class from the config file.
2884    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2885      HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
2886
2887    // If both the sink and the source class names are the same, then instantiate
2888    // only one object.
2889    if (sourceClassname.equals(sinkClassname)) {
2890      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2891        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2892      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
2893      server.sameReplicationSourceAndSink = true;
2894    } else {
2895      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2896        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2897      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
2898        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2899      server.sameReplicationSourceAndSink = false;
2900    }
2901  }
2902
2903  private static <T extends ReplicationService> T newReplicationInstance(String classname,
2904    Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
2905    Path oldLogDir, WALFactory walFactory) throws IOException {
2906    final Class<? extends T> clazz;
2907    try {
2908      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2909      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
2910    } catch (java.lang.ClassNotFoundException nfe) {
2911      throw new IOException("Could not find class for " + classname);
2912    }
2913    T service = ReflectionUtils.newInstance(clazz, conf);
2914    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
2915    return service;
2916  }
2917
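  /**
   * Returns replication status per WAL group, aggregated over all current and recovered (old)
   * replication sources. Empty if this server is not yet online.
   */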
2918  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
2919    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
2920    if (!this.isOnline()) {
2921      return walGroupsReplicationStatus;
2922    }
2923    List<ReplicationSourceInterface> allSources = new ArrayList<>();
2924    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
2925    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
2926    for (ReplicationSourceInterface source : allSources) {
2927      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
2928    }
2929    return walGroupsReplicationStatus;
2930  }
2931
2932  /**
2933   * Utility for constructing an instance of the passed HRegionServer class.
2934   */
2935  static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass,
2936    final Configuration conf) {
2937    try {
2938      Constructor<? extends HRegionServer> c =
2939        regionServerClass.getConstructor(Configuration.class);
2940      return c.newInstance(conf);
2941    } catch (Exception e) {
2942      throw new RuntimeException(
2943        "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e);
2944    }
2945  }
2946
2947  /**
2948   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2949   */
2950  public static void main(String[] args) {
2951    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
2952    VersionInfo.logVersion();
2953    Configuration conf = HBaseConfiguration.create();
2954    @SuppressWarnings("unchecked")
2955    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2956      .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2957
2958    new HRegionServerCommandLine(regionServerClass).doMain(args);
2959  }
2960
2961  /**
2962   * Gets the online regions of the specified table. This method looks at the in-memory
2963   * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions.
2964   * If a region on this table has been closed during a disable, etc., it will not be included in
   * the returned list. So, the returned list is not necessarily ALL the regions in this table; it
   * is only the ONLINE regions in the table.
2967   * @param tableName table to limit the scope of the query
2968   * @return Online regions from <code>tableName</code>
2969   */
2970  @Override
2971  public List<HRegion> getRegions(TableName tableName) {
2972    List<HRegion> tableRegions = new ArrayList<>();
2973    synchronized (this.onlineRegions) {
2974      for (HRegion region : this.onlineRegions.values()) {
2975        RegionInfo regionInfo = region.getRegionInfo();
2976        if (regionInfo.getTable().equals(tableName)) {
2977          tableRegions.add(region);
2978        }
2979      }
2980    }
2981    return tableRegions;
2982  }
2983
2984  @Override
2985  public List<HRegion> getRegions() {
2986    List<HRegion> allRegions;
2987    synchronized (this.onlineRegions) {
2988      // Return a clone copy of the onlineRegions
2989      allRegions = new ArrayList<>(onlineRegions.values());
2990    }
2991    return allRegions;
2992  }
2993
2994  /**
2995   * Gets the online tables in this RS. This method looks at the in-memory onlineRegions.
2996   * @return all the online tables in this RS
2997   */
2998  public Set<TableName> getOnlineTables() {
2999    Set<TableName> tables = new HashSet<>();
3000    synchronized (this.onlineRegions) {
3001      for (Region region : this.onlineRegions.values()) {
3002        tables.add(region.getTableDescriptor().getTableName());
3003      }
3004    }
3005    return tables;
3006  }
3007
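  /**
   * Returns the sorted, de-duplicated names of the coprocessors loaded on this server, covering
   * the region server host, the WAL(s) and every online region.
   */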
3008  public String[] getRegionServerCoprocessors() {
3009    TreeSet<String> coprocessors = new TreeSet<>();
3010    try {
3011      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
3012    } catch (IOException exception) {
3013      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; "
3014        + "skipping.");
3015      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3016    }
3017    Collection<HRegion> regions = getOnlineRegionsLocalContext();
3018    for (HRegion region : regions) {
3019      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
3020      try {
3021        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
3022      } catch (IOException exception) {
3023        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region
3024          + "; skipping.");
3025        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3026      }
3027    }
3028    coprocessors.addAll(rsHost.getCoprocessors());
3029    return coprocessors.toArray(new String[0]);
3030  }
3031
  /**
   * Try to close the region; logs a warning on failure but continues.
   * @param region Region to close
   * @param abort  Whether we're running an abort.
   */
3036  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
3037    try {
3038      if (!closeRegion(region.getEncodedName(), abort, null)) {
3039        LOG
3040          .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing");
3041      }
3042    } catch (IOException e) {
3043      LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing",
3044        e);
3045    }
3046  }
3047
3048  /**
   * Asynchronously close a region; this can be called from the master or internally by the
   * regionserver when stopping. If called from the master, the region will update the status.
   * <p>
   * If an opening was in progress, this method will cancel it, but will not start a new close. The
   * coprocessors are not called in this case. A NotServingRegionException is thrown.
   * </p>
   * <p>
   * If a close was in progress, this new request will be ignored, and an exception thrown.
   * </p>
   * <p>
   * Provides an additional flag to indicate whether this region's blocks should be evicted from
   * the cache.
   * </p>
3061   * @param encodedName Region to close
3062   * @param abort       True if we are aborting
   * @param destination Where the Region is being moved to... may be null if unknown.
3064   * @return True if closed a region.
3065   * @throws NotServingRegionException if the region is not online
3066   */
3067  protected boolean closeRegion(String encodedName, final boolean abort,
3068    final ServerName destination) throws NotServingRegionException {
3069    // Check for permissions to close.
3070    HRegion actualRegion = this.getRegion(encodedName);
3071    // Can be null if we're calling close on a region that's not online
3072    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
3073      try {
3074        actualRegion.getCoprocessorHost().preClose(false);
3075      } catch (IOException exp) {
3076        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
3077        return false;
3078      }
3079    }
3080
3081    // previous can come back 'null' if not in map.
3082    final Boolean previous =
3083      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);
3084
3085    if (Boolean.TRUE.equals(previous)) {
3086      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already "
3087        + "trying to OPEN. Cancelling OPENING.");
3088      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
3089        // The replace failed. That should be an exceptional case, but theoretically it can happen.
3090        // We're going to try to do a standard close then.
3091        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
3092          + " Doing a standard close now");
3093        return closeRegion(encodedName, abort, destination);
3094      }
3095      // Let's get the region from the online region list again
3096      actualRegion = this.getRegion(encodedName);
3097      if (actualRegion == null) { // If already online, we still need to close it.
3098        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3099        // The master deletes the znode when it receives this exception.
3100        throw new NotServingRegionException(
3101          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
3102      }
3103    } else if (previous == null) {
3104      LOG.info("Received CLOSE for {}", encodedName);
3105    } else if (Boolean.FALSE.equals(previous)) {
3106      LOG.info("Received CLOSE for the region: " + encodedName
3107        + ", which we are already trying to CLOSE, but not completed yet");
3108      return true;
3109    }
3110
3111    if (actualRegion == null) {
3112      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3113      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
3114      // The master deletes the znode when it receives this exception.
3115      throw new NotServingRegionException(
3116        "The region " + encodedName + " is not online, and is not opening.");
3117    }
3118
3119    CloseRegionHandler crh;
3120    final RegionInfo hri = actualRegion.getRegionInfo();
3121    if (hri.isMetaRegion()) {
3122      crh = new CloseMetaHandler(this, this, hri, abort);
3123    } else {
3124      crh = new CloseRegionHandler(this, this, hri, abort, destination);
3125    }
3126    this.executorService.submit(crh);
3127    return true;
3128  }
3129
  /**
   * @return HRegion for the passed binary <code>regionName</code> or null if the named region is
   *         not a member of the online regions.
   */
3134  public HRegion getOnlineRegion(final byte[] regionName) {
3135    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3136    return this.onlineRegions.get(encodedRegionName);
3137  }
3138
3139  @Override
3140  public HRegion getRegion(final String encodedRegionName) {
3141    return this.onlineRegions.get(encodedRegionName);
3142  }
3143
3144  @Override
3145  public boolean removeRegion(final HRegion r, ServerName destination) {
3146    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3147    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
3148    if (destination != null) {
3149      long closeSeqNum = r.getMaxFlushedSeqId();
3150      if (closeSeqNum == HConstants.NO_SEQNUM) {
3151        // No edits in WAL for this region; get the sequence number when the region was opened.
3152        closeSeqNum = r.getOpenSeqNum();
3153        if (closeSeqNum == HConstants.NO_SEQNUM) {
3154          closeSeqNum = 0;
3155        }
3156      }
3157      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
3158      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
3159      if (selfMove) {
3160        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
3161          r.getRegionInfo().getEncodedName(),
3162          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
3163      }
3164    }
3165    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3166    configurationManager.deregisterObserver(r);
3167    return toReturn != null;
3168  }
3169
  /**
   * Protected utility method for safely obtaining an HRegion handle.
   * @param regionName Name of the online {@link HRegion} to return
   * @return {@link HRegion} for <code>regionName</code>
   */
3175  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
3176    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3177    return getRegionByEncodedName(regionName, encodedRegionName);
3178  }
3179
3180  public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException {
3181    return getRegionByEncodedName(null, encodedRegionName);
3182  }
3183
3184  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3185    throws NotServingRegionException {
3186    HRegion region = this.onlineRegions.get(encodedRegionName);
3187    if (region == null) {
3188      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3189      if (moveInfo != null) {
3190        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3191      }
3192      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3193      String regionNameStr =
3194        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
3195      if (isOpening != null && isOpening) {
3196        throw new RegionOpeningException(
3197          "Region " + regionNameStr + " is opening on " + this.serverName);
3198      }
3199      throw new NotServingRegionException(
3200        "" + regionNameStr + " is not online on " + this.serverName);
3201    }
3202    return region;
3203  }
3204
  /**
   * Cleanup after a Throwable caught while invoking a method. Converts <code>t</code> to an IOE if
   * it isn't one already.
   * @param t   Throwable
   * @param msg Message to log in error. Can be null.
   * @return Throwable converted to an IOE; methods can only let out IOEs.
   */
3212  private Throwable cleanup(final Throwable t, final String msg) {
3213    // Don't log as error if NSRE; NSRE is 'normal' operation.
3214    if (t instanceof NotServingRegionException) {
3215      LOG.debug("NotServingRegionException; " + t.getMessage());
3216      return t;
3217    }
3218    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3219    if (msg == null) {
3220      LOG.error("", e);
3221    } else {
3222      LOG.error(msg, e);
3223    }
3224    if (!rpcServices.checkOOME(t)) {
3225      checkFileSystem();
3226    }
3227    return t;
3228  }
3229
  /**
   * @param msg Message to put in a new IOE if the passed <code>t</code> is not an IOE
   * @return <code>t</code> as an IOE; wrapped in a new IOE if it isn't one already.
   */
3234  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3235    return (t instanceof IOException ? (IOException) t
3236      : msg == null || msg.length() == 0 ? new IOException(t)
3237      : new IOException(msg, t));
3238  }
3239
3240  /**
3241   * Checks to see if the file system is still accessible. If not, sets abortRequested and
3242   * stopRequested
3243   * @return false if file system is not available
3244   */
3245  boolean checkFileSystem() {
3246    if (this.dataFsOk && this.dataFs != null) {
3247      try {
3248        FSUtils.checkFileSystemAvailable(this.dataFs);
3249      } catch (IOException e) {
3250        abort("File System not available", e);
3251        this.dataFsOk = false;
3252      }
3253    }
3254    return this.dataFsOk;
3255  }
3256
3257  @Override
3258  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3259    List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3260    Address[] addr = new Address[favoredNodes.size()];
3261    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3262    // it is a map of region name to Address[]
3263    for (int i = 0; i < favoredNodes.size(); i++) {
3264      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort());
3265    }
3266    regionFavoredNodesMap.put(encodedRegionName, addr);
3267  }
3268
3269  /**
3270   * Return the favored nodes for a region given its encoded name. Look at the comment around
3271   * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here.
3272   * @param encodedRegionName the encoded region name.
3273   * @return array of favored locations
3274   */
3275  @Override
3276  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3277    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
3278  }
3279
3280  @Override
3281  public ServerNonceManager getNonceManager() {
3282    return this.nonceManager;
3283  }
3284
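  /**
   * Records where a region was moved to and the sequence id it was closed at, so clients that
   * still address this server can be redirected via {@link RegionMovedException}.
   */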
3285  private static class MovedRegionInfo {
3286    private final ServerName serverName;
3287    private final long seqNum;
3288
3289    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3290      this.serverName = serverName;
3291      this.seqNum = closeSeqNum;
3292    }
3293
3294    public ServerName getServerName() {
3295      return serverName;
3296    }
3297
3298    public long getSeqNum() {
3299      return seqNum;
3300    }
3301  }
3302
  /**
   * We need a timeout. If not, there is a risk of giving out wrong information: this would double
   * the number of network calls instead of reducing them.
   */
3307  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3308
3309  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
3310    boolean selfMove) {
3311    if (selfMove) {
3312      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3313      return;
3314    }
3315    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid="
3316      + closeSeqNum);
3317    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3318  }
3319
3320  void removeFromMovedRegions(String encodedName) {
3321    movedRegionInfoCache.invalidate(encodedName);
3322  }
3323
3324  @InterfaceAudience.Private
3325  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
3326    return movedRegionInfoCache.getIfPresent(encodedRegionName);
3327  }
3328
3329  @InterfaceAudience.Private
3330  public int movedRegionCacheExpiredTime() {
3331    return TIMEOUT_REGION_MOVED;
3332  }
3333
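  /** Returns the path of this server's ephemeral znode under the rs znode in ZooKeeper. */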
3334  private String getMyEphemeralNodePath() {
3335    return zooKeeper.getZNodePaths().getRsPath(serverName);
3336  }
3337
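  /**
   * Returns true if a health check script location is configured via
   * {@link HConstants#HEALTH_SCRIPT_LOC}.
   */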
3338  private boolean isHealthCheckerConfigured() {
3339    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3340    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
3341  }
3342
  /** Returns the underlying {@link CompactSplit} for the server */
3344  public CompactSplit getCompactSplitThread() {
3345    return this.compactSplitThread;
3346  }
3347
3348  CoprocessorServiceResponse execRegionServerService(
3349    @SuppressWarnings("UnusedParameters") final RpcController controller,
3350    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3351    try {
3352      ServerRpcController serviceController = new ServerRpcController();
3353      CoprocessorServiceCall call = serviceRequest.getCall();
3354      String serviceName = call.getServiceName();
3355      Service service = coprocessorServiceHandlers.get(serviceName);
3356      if (service == null) {
        throw new UnknownProtocolException(null,
          "No registered coprocessor service found for " + serviceName);
3359      }
3360      ServiceDescriptor serviceDesc = service.getDescriptorForType();
3361
3362      String methodName = call.getMethodName();
3363      MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3364      if (methodDesc == null) {
        throw new UnknownProtocolException(service.getClass(),
          "Unknown method " + methodName + " called on service " + serviceName);
3367      }
3368
3369      Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3370      final Message.Builder responseBuilder =
3371        service.getResponsePrototype(methodDesc).newBuilderForType();
3372      service.callMethod(methodDesc, serviceController, request, message -> {
3373        if (message != null) {
3374          responseBuilder.mergeFrom(message);
3375        }
3376      });
3377      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3378      if (exception != null) {
3379        throw exception;
3380      }
3381      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3382    } catch (IOException ie) {
3383      throw new ServiceException(ie);
3384    }
3385  }
3386
  /**
   * May be absent if this is a master that does not carry tables.
   * @return The block cache instance used by the regionserver.
   */
3391  @Override
3392  public Optional<BlockCache> getBlockCache() {
3393    return Optional.ofNullable(this.blockCache);
3394  }
3395
  /**
   * May be absent if this is a master that does not carry tables.
   * @return The cache for mob files used by the regionserver.
   */
3400  @Override
3401  public Optional<MobFileCache> getMobFileCache() {
3402    return Optional.ofNullable(this.mobFileCache);
3403  }
3404
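  /**
   * Evicts all block cache entries belonging to the store files of the given region and returns
   * the eviction statistics.
   */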
3405  CacheEvictionStats clearRegionBlockCache(Region region) {
3406    long evictedBlocks = 0;
3407
3408    for (Store store : region.getStores()) {
3409      for (StoreFile hFile : store.getStorefiles()) {
3410        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3411      }
3412    }
3413
3414    return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build();
3415  }
3416
3417  @Override
3418  public double getCompactionPressure() {
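    // The server-wide compaction pressure is the maximum pressure across all stores of all
    // online regions.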
3419    double max = 0;
3420    for (Region region : onlineRegions.values()) {
3421      for (Store store : region.getStores()) {
3422        double normCount = store.getCompactionPressure();
3423        if (normCount > max) {
3424          max = normCount;
3425        }
3426      }
3427    }
3428    return max;
3429  }
3430
3431  @Override
3432  public HeapMemoryManager getHeapMemoryManager() {
3433    return hMemManager;
3434  }
3435
3436  public MemStoreFlusher getMemStoreFlusher() {
3437    return cacheFlusher;
3438  }
3439
  /**
   * For testing
   * @return whether all WAL roll requests have finished for this regionserver
   */
3444  @InterfaceAudience.Private
3445  public boolean walRollRequestFinished() {
3446    return this.walRoller.walRollFinished();
3447  }
3448
3449  @Override
3450  public ThroughputController getFlushThroughputController() {
3451    return flushThroughputController;
3452  }
3453
3454  @Override
3455  public double getFlushPressure() {
3456    if (getRegionServerAccounting() == null || cacheFlusher == null) {
3457      // return 0 during RS initialization
3458      return 0.0;
3459    }
3460    return getRegionServerAccounting().getFlushPressure();
3461  }
3462
3463  @Override
3464  public void onConfigurationChange(Configuration newConf) {
3465    ThroughputController old = this.flushThroughputController;
3466    if (old != null) {
3467      old.stop("configuration change");
3468    }
3469    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3470    try {
3471      Superusers.initialize(newConf);
3472    } catch (IOException e) {
3473      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3474    }
3475
3476    // update region server coprocessor if the configuration has changed.
3477    if (
3478      CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
3479        CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY)
3480    ) {
3481      LOG.info("Update region server coprocessors because the configuration has changed");
3482      this.rsHost = new RegionServerCoprocessorHost(this, newConf);
3483    }
3484  }
3485
3486  @Override
3487  public MetricsRegionServer getMetrics() {
3488    return metricsRegionServer;
3489  }
3490
3491  @Override
3492  public SecureBulkLoadManager getSecureBulkLoadManager() {
3493    return this.secureBulkLoadManager;
3494  }
3495
3496  @Override
3497  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
3498    final Abortable abort) {
3499    final LockServiceClient client =
3500      new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
3501    return client.regionLock(regionInfo, description, abort);
3502  }
3503
3504  @Override
3505  public void unassign(byte[] regionName) throws IOException {
3506    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
3507  }
3508
3509  @Override
3510  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3511    return this.rsSpaceQuotaManager;
3512  }
3513
3514  @Override
3515  public boolean reportFileArchivalForQuotas(TableName tableName,
3516    Collection<Entry<String, Long>> archivedFiles) {
3517    if (TEST_SKIP_REPORTING_TRANSITION) {
3518      return false;
3519    }
3520    RegionServerStatusService.BlockingInterface rss = rssStub;
3521    if (rss == null || rsSpaceQuotaManager == null) {
3522      // the current server could be stopping.
3523      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3524      return false;
3525    }
3526    try {
3527      RegionServerStatusProtos.FileArchiveNotificationRequest request =
3528        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3529      rss.reportFileArchival(null, request);
3530    } catch (ServiceException se) {
3531      IOException ioe = ProtobufUtil.getRemoteException(se);
3532      if (ioe instanceof PleaseHoldException) {
3533        if (LOG.isTraceEnabled()) {
3534          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3535            + " This will be retried.", ioe);
3536        }
3537        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3538        return false;
3539      }
3540      if (rssStub == rss) {
3541        rssStub = null;
3542      }
3543      // re-create the stub if we failed to report the archival
3544      createRegionServerStatusStub(true);
3545      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3546      return false;
3547    }
3548    return true;
3549  }
3550
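  /**
   * Executes a master-initiated procedure asynchronously by handing it off to the executor service
   * wrapped in an {@link RSProcedureHandler}.
   */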
3551  void executeProcedure(long procId, long initiatingMasterActiveTime,
3552    RSProcedureCallable callable) {
3553    executorService
3554      .submit(new RSProcedureHandler(this, procId, initiatingMasterActiveTime, callable));
3555  }
3556
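  /**
   * Reports the outcome (success or failure) of a remote procedure back to the master via the
   * procedure result reporter.
   */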
3557  public void remoteProcedureComplete(long procId, long initiatingMasterActiveTime, Throwable error,
3558    byte[] procResultData) {
3559    procedureResultReporter.complete(procId, initiatingMasterActiveTime, error, procResultData);
3560  }
3561
3562  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3563    RegionServerStatusService.BlockingInterface rss;
3564    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3565    for (;;) {
3566      rss = rssStub;
3567      if (rss != null) {
3568        break;
3569      }
3570      createRegionServerStatusStub();
3571    }
3572    try {
3573      rss.reportProcedureDone(null, request);
3574    } catch (ServiceException se) {
3575      if (rssStub == rss) {
3576        rssStub = null;
3577      }
3578      throw ProtobufUtil.getRemoteException(se);
3579    }
3580  }
3581
3582  /**
   * Will ignore open/close region procedures which were already submitted or executed. When the
   * master has an unfinished open/close region procedure and is restarted, the new active master
   * may send a duplicate open/close region request to the regionserver. The open/close request is
   * submitted to a thread pool and executed, so we first need a cache of submitted open/close
   * region procedures. After the open/close region request is executed and the region transition
   * is reported successfully, we cache it in the executed region procedures cache. See
   * {@link #finishRegionProcedure(long)}. After the region transition is reported successfully,
   * the master will not send the open/close region request to the regionserver again. We assume
   * that an ongoing duplicate open/close region request will not be delayed for more than 600
   * seconds, so the executed region procedures cache expires after 600 seconds. See HBASE-22404
   * for more details.
3593   * @param procId the id of the open/close region procedure
3594   * @return true if the procedure can be submitted.
3595   */
3596  boolean submitRegionProcedure(long procId) {
3597    if (procId == -1) {
3598      return true;
3599    }
3600    // Ignore the region procedures which already submitted.
3601    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3602    if (previous != null) {
3603      LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
3604      return false;
3605    }
3606    // Ignore the region procedures which already executed.
3607    if (executedRegionProcedures.getIfPresent(procId) != null) {
3608      LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
3609      return false;
3610    }
3611    return true;
3612  }
3613
3614  /**
3615   * See {@link #submitRegionProcedure(long)}.
3616   * @param procId the id of the open/close region procedure
3617   */
3618  public void finishRegionProcedure(long procId) {
3619    executedRegionProcedures.put(procId, procId);
3620    submittedRegionProcedures.remove(procId);
3621  }
3622
  /**
   * Forcibly terminate the region server when an abort times out.
   */
3626  private static class SystemExitWhenAbortTimeout extends TimerTask {
3627
3628    public SystemExitWhenAbortTimeout() {
3629    }
3630
3631    @Override
3632    public void run() {
3633      LOG.warn("Aborting region server timed out, terminating forcibly"
3634        + " and does not wait for any running shutdown hooks or finalizers to finish their work."
3635        + " Thread dump to stdout.");
3636      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3637      Runtime.getRuntime().halt(1);
3638    }
3639  }
3640
3641  @InterfaceAudience.Private
3642  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3643    return compactedFileDischarger;
3644  }
3645
  /**
   * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}
   * @return pause time
   */
3650  @InterfaceAudience.Private
3651  public long getRetryPauseTime() {
3652    return this.retryPauseTime;
3653  }
3654
3655  @Override
3656  public Optional<ServerName> getActiveMaster() {
3657    return Optional.ofNullable(masterAddressTracker.getMasterAddress());
3658  }
3659
3660  @Override
3661  public List<ServerName> getBackupMasters() {
3662    return masterAddressTracker.getBackupMasters();
3663  }
3664
3665  @Override
3666  public Iterator<ServerName> getBootstrapNodes() {
3667    return bootstrapNodeManager.getBootstrapNodes().iterator();
3668  }
3669
3670  @Override
3671  public List<HRegionLocation> getMetaLocations() {
3672    return metaRegionLocationCache.getMetaRegionLocations();
3673  }
3674
3675  @Override
3676  protected NamedQueueRecorder createNamedQueueRecord() {
3677    return NamedQueueRecorder.getInstance(conf);
3678  }
3679
3680  @Override
3681  protected boolean clusterMode() {
    // this method will be called in the constructor of the super class, so we cannot return
    // masterless directly here, as it would always be false at that point.
3684    return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
3685  }
3686
3687  @InterfaceAudience.Private
3688  public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
3689    return brokenStoreFileCleaner;
3690  }
3691
3692  @InterfaceAudience.Private
3693  public RSMobFileCleanerChore getRSMobFileCleanerChore() {
3694    return rsMobFileCleanerChore;
3695  }
3696
3697  RSSnapshotVerifier getRsSnapshotVerifier() {
3698    return rsSnapshotVerifier;
3699  }
3700
3701  @Override
3702  protected void stopChores() {
3703    shutdownChore(nonceManagerChore);
3704    shutdownChore(compactionChecker);
3705    shutdownChore(compactedFileDischarger);
3706    shutdownChore(periodicFlusher);
3707    shutdownChore(healthCheckChore);
3708    shutdownChore(executorStatusChore);
3709    shutdownChore(storefileRefresher);
3710    shutdownChore(fsUtilizationChore);
3711    shutdownChore(namedQueueServiceChore);
3712    shutdownChore(brokenStoreFileCleaner);
3713    shutdownChore(rsMobFileCleanerChore);
3714    shutdownChore(replicationMarkerChore);
3715  }
3716
3717  @Override
3718  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
3719    return regionReplicationBufferManager;
3720  }
3721}