001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION;
023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
024import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
025import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
026import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
027import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT;
028import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
029import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT;
030import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
031import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
032import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
033import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
034
035import io.opentelemetry.api.trace.Span;
036import io.opentelemetry.api.trace.StatusCode;
037import io.opentelemetry.context.Scope;
038import java.io.IOException;
039import java.io.PrintWriter;
040import java.lang.management.MemoryUsage;
041import java.lang.reflect.Constructor;
042import java.net.InetSocketAddress;
043import java.time.Duration;
044import java.util.ArrayList;
045import java.util.Collection;
046import java.util.Collections;
047import java.util.Comparator;
048import java.util.HashSet;
049import java.util.Iterator;
050import java.util.List;
051import java.util.Map;
052import java.util.Map.Entry;
053import java.util.Objects;
054import java.util.Optional;
055import java.util.Set;
056import java.util.SortedMap;
057import java.util.Timer;
058import java.util.TimerTask;
059import java.util.TreeMap;
060import java.util.TreeSet;
061import java.util.concurrent.ConcurrentHashMap;
062import java.util.concurrent.ConcurrentMap;
063import java.util.concurrent.ConcurrentSkipListMap;
064import java.util.concurrent.ThreadLocalRandom;
065import java.util.concurrent.TimeUnit;
066import java.util.concurrent.atomic.AtomicBoolean;
067import java.util.concurrent.locks.ReentrantReadWriteLock;
068import java.util.function.Consumer;
069import java.util.stream.Collectors;
070import javax.management.MalformedObjectNameException;
071import javax.servlet.http.HttpServlet;
072import org.apache.commons.lang3.StringUtils;
073import org.apache.commons.lang3.mutable.MutableFloat;
074import org.apache.hadoop.conf.Configuration;
075import org.apache.hadoop.fs.FileSystem;
076import org.apache.hadoop.fs.Path;
077import org.apache.hadoop.hbase.Abortable;
078import org.apache.hadoop.hbase.CacheEvictionStats;
079import org.apache.hadoop.hbase.CallQueueTooBigException;
080import org.apache.hadoop.hbase.ClockOutOfSyncException;
081import org.apache.hadoop.hbase.DoNotRetryIOException;
082import org.apache.hadoop.hbase.ExecutorStatusChore;
083import org.apache.hadoop.hbase.HBaseConfiguration;
084import org.apache.hadoop.hbase.HBaseInterfaceAudience;
085import org.apache.hadoop.hbase.HBaseServerBase;
086import org.apache.hadoop.hbase.HConstants;
087import org.apache.hadoop.hbase.HDFSBlocksDistribution;
088import org.apache.hadoop.hbase.HRegionLocation;
089import org.apache.hadoop.hbase.HealthCheckChore;
090import org.apache.hadoop.hbase.MetaTableAccessor;
091import org.apache.hadoop.hbase.NotServingRegionException;
092import org.apache.hadoop.hbase.PleaseHoldException;
093import org.apache.hadoop.hbase.ScheduledChore;
094import org.apache.hadoop.hbase.ServerName;
095import org.apache.hadoop.hbase.Stoppable;
096import org.apache.hadoop.hbase.TableName;
097import org.apache.hadoop.hbase.YouAreDeadException;
098import org.apache.hadoop.hbase.ZNodeClearer;
099import org.apache.hadoop.hbase.client.ConnectionUtils;
100import org.apache.hadoop.hbase.client.RegionInfo;
101import org.apache.hadoop.hbase.client.RegionInfoBuilder;
102import org.apache.hadoop.hbase.client.locking.EntityLock;
103import org.apache.hadoop.hbase.client.locking.LockServiceClient;
104import org.apache.hadoop.hbase.conf.ConfigurationObserver;
105import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
106import org.apache.hadoop.hbase.exceptions.RegionMovedException;
107import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
108import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
109import org.apache.hadoop.hbase.executor.ExecutorType;
110import org.apache.hadoop.hbase.http.InfoServer;
111import org.apache.hadoop.hbase.io.hfile.BlockCache;
112import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
113import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
114import org.apache.hadoop.hbase.io.hfile.HFile;
115import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
116import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
117import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
118import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
119import org.apache.hadoop.hbase.ipc.RpcClient;
120import org.apache.hadoop.hbase.ipc.RpcServer;
121import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
122import org.apache.hadoop.hbase.ipc.ServerRpcController;
123import org.apache.hadoop.hbase.log.HBaseMarkers;
124import org.apache.hadoop.hbase.mob.MobFileCache;
125import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
126import org.apache.hadoop.hbase.monitoring.TaskMonitor;
127import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
128import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore;
129import org.apache.hadoop.hbase.net.Address;
130import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
131import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
132import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
133import org.apache.hadoop.hbase.quotas.QuotaUtil;
134import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
135import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
136import org.apache.hadoop.hbase.quotas.RegionSize;
137import org.apache.hadoop.hbase.quotas.RegionSizeStore;
138import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
139import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
140import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
141import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
142import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
143import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
144import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
145import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
146import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
147import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
148import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
149import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
150import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
151import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
152import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
153import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
154import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
155import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
156import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
157import org.apache.hadoop.hbase.security.SecurityConstants;
158import org.apache.hadoop.hbase.security.Superusers;
159import org.apache.hadoop.hbase.security.User;
160import org.apache.hadoop.hbase.security.UserProvider;
161import org.apache.hadoop.hbase.trace.TraceUtil;
162import org.apache.hadoop.hbase.util.Bytes;
163import org.apache.hadoop.hbase.util.CompressionTest;
164import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
165import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
166import org.apache.hadoop.hbase.util.FSUtils;
167import org.apache.hadoop.hbase.util.FutureUtils;
168import org.apache.hadoop.hbase.util.JvmPauseMonitor;
169import org.apache.hadoop.hbase.util.Pair;
170import org.apache.hadoop.hbase.util.RetryCounter;
171import org.apache.hadoop.hbase.util.RetryCounterFactory;
172import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
173import org.apache.hadoop.hbase.util.Threads;
174import org.apache.hadoop.hbase.util.VersionInfo;
175import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
176import org.apache.hadoop.hbase.wal.WAL;
177import org.apache.hadoop.hbase.wal.WALFactory;
178import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
179import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
180import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
181import org.apache.hadoop.hbase.zookeeper.ZKUtil;
182import org.apache.hadoop.ipc.RemoteException;
183import org.apache.hadoop.util.ReflectionUtils;
184import org.apache.yetus.audience.InterfaceAudience;
185import org.apache.zookeeper.KeeperException;
186import org.slf4j.Logger;
187import org.slf4j.LoggerFactory;
188
189import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
190import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
191import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
192import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
193import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
194import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
195import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
196import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
197import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
198import org.apache.hbase.thirdparty.com.google.protobuf.Message;
199import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
200import org.apache.hbase.thirdparty.com.google.protobuf.Service;
201import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
202import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
203import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
204
205import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
206import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
235
236/**
237 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
238 * are many HRegionServers in a single HBase deployment.
239 */
240@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
241@SuppressWarnings({ "deprecation" })
242public class HRegionServer extends HBaseServerBase<RSRpcServices>
243  implements RegionServerServices, LastSequenceId {
244
245  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
246
247  int unitMB = 1024 * 1024;
248  int unitKB = 1024;
249
250  /**
251   * For testing only! Set to true to skip notifying region assignment to master .
252   */
253  @InterfaceAudience.Private
254  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
255  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
256
257  /**
258   * A map from RegionName to current action in progress. Boolean value indicates: true - if open
259   * region action in progress false - if close region action in progress
260   */
261  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
262    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
263
264  /**
265   * Used to cache the open/close region procedures which already submitted. See
266   * {@link #submitRegionProcedure(long)}.
267   */
268  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
269  /**
270   * Used to cache the open/close region procedures which already executed. See
271   * {@link #submitRegionProcedure(long)}.
272   */
273  private final Cache<Long, Long> executedRegionProcedures =
274    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
275
276  /**
277   * Used to cache the moved-out regions
278   */
279  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
280    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();
281
282  private MemStoreFlusher cacheFlusher;
283
284  private HeapMemoryManager hMemManager;
285
286  // Replication services. If no replication, this handler will be null.
287  private ReplicationSourceService replicationSourceHandler;
288  private ReplicationSinkService replicationSinkHandler;
289  private boolean sameReplicationSourceAndSink;
290
291  // Compactions
292  private CompactSplit compactSplitThread;
293
294  /**
295   * Map of regions currently being served by this region server. Key is the encoded region name.
296   * All access should be synchronized.
297   */
298  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
299  /**
300   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
301   * need to be a ConcurrentHashMap?
302   */
303  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();
304
305  /**
306   * Map of encoded region names to the DataNode locations they should be hosted on We store the
307   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
308   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
309   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API
310   * call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and
311   * here we really mean DataNode locations. We don't store it as InetSocketAddress here because the
312   * conversion on demand from Address to InetSocketAddress will guarantee the resolution results
313   * will be fresh when we need it.
314   */
315  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();
316
317  private LeaseManager leaseManager;
318
319  private volatile boolean dataFsOk;
320
321  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
322  // Default abort timeout is 1200 seconds for safe
323  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
324  // Will run this task when abort timeout
325  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
326
327  // A state before we go into stopped state. At this stage we're closing user
328  // space regions.
329  private boolean stopping = false;
330  private volatile boolean killed = false;
331
332  private final int threadWakeFrequency;
333
334  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
335  private final int compactionCheckFrequency;
336  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
337  private final int flushCheckFrequency;
338
339  // Stub to do region server status calls against the master.
340  private volatile RegionServerStatusService.BlockingInterface rssStub;
341  private volatile LockService.BlockingInterface lockStub;
342  // RPC client. Used to make the stub above that does region server status checking.
343  private RpcClient rpcClient;
344
345  private UncaughtExceptionHandler uncaughtExceptionHandler;
346
347  private JvmPauseMonitor pauseMonitor;
348
349  private RSSnapshotVerifier rsSnapshotVerifier;
350
351  /** region server process name */
352  public static final String REGIONSERVER = "regionserver";
353
354  private MetricsRegionServer metricsRegionServer;
355  MetricsRegionServerWrapperImpl metricsRegionServerImpl;
356
357  /**
358   * Check for compactions requests.
359   */
360  private ScheduledChore compactionChecker;
361
362  /**
363   * Check for flushes
364   */
365  private ScheduledChore periodicFlusher;
366
367  private volatile WALFactory walFactory;
368
369  private LogRoller walRoller;
370
371  // A thread which calls reportProcedureDone
372  private RemoteProcedureResultReporter procedureResultReporter;
373
374  // flag set after we're done setting up server threads
375  final AtomicBoolean online = new AtomicBoolean(false);
376
377  // master address tracker
378  private final MasterAddressTracker masterAddressTracker;
379
380  // Log Splitting Worker
381  private SplitLogWorker splitLogWorker;
382
383  private final int shortOperationTimeout;
384
385  // Time to pause if master says 'please hold'
386  private final long retryPauseTime;
387
388  private final RegionServerAccounting regionServerAccounting;
389
390  private NamedQueueServiceChore namedQueueServiceChore = null;
391
392  // Block cache
393  private BlockCache blockCache;
394  // The cache for mob files
395  private MobFileCache mobFileCache;
396
397  /** The health check chore. */
398  private HealthCheckChore healthCheckChore;
399
400  /** The Executor status collect chore. */
401  private ExecutorStatusChore executorStatusChore;
402
403  /** The nonce manager chore. */
404  private ScheduledChore nonceManagerChore;
405
406  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
407
408  /**
409   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
410   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
411   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
412   */
413  @Deprecated
414  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
415  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
416    "hbase.regionserver.hostname.disable.master.reversedns";
417
418  /**
419   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
420   * Exception will be thrown if both are used.
421   */
422  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
423  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
424    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";
425
426  /**
427   * Unique identifier for the cluster we are a part of.
428   */
429  private String clusterId;
430
431  // chore for refreshing store files for secondary regions
432  private StorefileRefresherChore storefileRefresher;
433
434  private volatile RegionServerCoprocessorHost rsHost;
435
436  private RegionServerProcedureManagerHost rspmHost;
437
438  private RegionServerRpcQuotaManager rsQuotaManager;
439  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
440
441  /**
442   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
443   * case where client doesn't receive the response from a successful operation and retries. We
444   * track the successful ops for some time via a nonce sent by client and handle duplicate
445   * operations (currently, by failing them; in future we might use MVCC to return result). Nonces
446   * are also recovered from WAL during, recovery; however, the caveats (from HBASE-3787) are: - WAL
447   * recovery is optimized, and under high load we won't read nearly nonce-timeout worth of past
448   * records. If we don't read the records, we don't read and recover the nonces. Some WALs within
449   * nonce-timeout at recovery may not even be present due to rolling/cleanup. - There's no WAL
450   * recovery during normal region move, so nonces will not be transfered. We can have separate
451   * additional "Nonce WAL". It will just contain bunch of numbers and won't be flushed on main path
452   * - because WAL itself also contains nonces, if we only flush it before memstore flush, for a
453   * given nonce we will either see it in the WAL (if it was never flushed to disk, it will be part
454   * of recovery), or we'll see it as part of the nonce log (or both occasionally, which doesn't
455   * matter). Nonce log file can be deleted after the latest nonce in it expired. It can also be
456   * recovered during move.
457   */
458  final ServerNonceManager nonceManager;
459
460  private BrokenStoreFileCleaner brokenStoreFileCleaner;
461
462  private RSMobFileCleanerChore rsMobFileCleanerChore;
463
464  @InterfaceAudience.Private
465  CompactedHFilesDischarger compactedFileDischarger;
466
467  private volatile ThroughputController flushThroughputController;
468
469  private SecureBulkLoadManager secureBulkLoadManager;
470
471  private FileSystemUtilizationChore fsUtilizationChore;
472
473  private BootstrapNodeManager bootstrapNodeManager;
474
475  /**
476   * True if this RegionServer is coming up in a cluster where there is no Master; means it needs to
477   * just come up and make do without a Master to talk to: e.g. in test or HRegionServer is doing
478   * other than its usual duties: e.g. as an hollowed-out host whose only purpose is as a
479   * Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
480   * {@link #TEST_SKIP_REPORTING_TRANSITION} ?
481   */
482  private final boolean masterless;
483  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
484
485  /** regionserver codec list **/
486  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";
487
488  // A timer to shutdown the process if abort takes too long
489  private Timer abortMonitor;
490
491  private RegionReplicationBufferManager regionReplicationBufferManager;
492
493  /*
494   * Chore that creates replication marker rows.
495   */
496  private ReplicationMarkerChore replicationMarkerChore;
497
498  /**
499   * Starts a HRegionServer at the default location.
500   * <p/>
501   * Don't start any services or managers in here in the Constructor. Defer till after we register
502   * with the Master as much as possible. See {@link #startServices}.
503   */
504  public HRegionServer(final Configuration conf) throws IOException {
505    super(conf, "RegionServer"); // thread name
506    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
507    try (Scope ignored = span.makeCurrent()) {
508      this.dataFsOk = true;
509      this.masterless = !clusterMode();
510      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
511      HFile.checkHFileVersion(this.conf);
512      checkCodecs(this.conf);
513      FSUtils.setupShortCircuitRead(this.conf);
514
515      // Disable usage of meta replicas in the regionserver
516      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
517      // Config'ed params
518      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
519      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
520      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
521
522      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
523      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
524
525      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
526        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
527
528      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
529        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);
530
531      regionServerAccounting = new RegionServerAccounting(conf);
532
533      blockCache = BlockCacheFactory.createBlockCache(conf);
534      mobFileCache = new MobFileCache(conf);
535
536      rsSnapshotVerifier = new RSSnapshotVerifier(conf);
537
538      uncaughtExceptionHandler =
539        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);
540
541      // If no master in cluster, skip trying to track one or look for a cluster status.
542      if (!this.masterless) {
543        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
544        masterAddressTracker.start();
545      } else {
546        masterAddressTracker = null;
547      }
548      this.rpcServices.start(zooKeeper);
549      span.setStatus(StatusCode.OK);
550    } catch (Throwable t) {
551      // Make sure we log the exception. HRegionServer is often started via reflection and the
552      // cause of failed startup is lost.
553      TraceUtil.setError(span, t);
554      LOG.error("Failed construction RegionServer", t);
555      throw t;
556    } finally {
557      span.end();
558    }
559  }
560
561  // HMaster should override this method to load the specific config for master
562  @Override
563  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
564    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
565    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
566      if (!StringUtils.isBlank(hostname)) {
567        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
568          + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
569          + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
570          + UNSAFE_RS_HOSTNAME_KEY + " is used";
571        throw new IOException(msg);
572      } else {
573        return rpcServices.getSocketAddress().getHostName();
574      }
575    } else {
576      return hostname;
577    }
578  }
579
580  @Override
581  protected void login(UserProvider user, String host) throws IOException {
582    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
583      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
584  }
585
586  @Override
587  protected String getProcessName() {
588    return REGIONSERVER;
589  }
590
591  @Override
592  protected RegionServerCoprocessorHost getCoprocessorHost() {
593    return getRegionServerCoprocessorHost();
594  }
595
596  @Override
597  protected boolean canCreateBaseZNode() {
598    return !clusterMode();
599  }
600
601  @Override
602  protected boolean canUpdateTableDescriptor() {
603    return false;
604  }
605
606  @Override
607  protected boolean cacheTableDescriptor() {
608    return false;
609  }
610
611  protected RSRpcServices createRpcServices() throws IOException {
612    return new RSRpcServices(this);
613  }
614
615  @Override
616  protected void configureInfoServer(InfoServer infoServer) {
617    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
618    infoServer.setAttribute(REGIONSERVER, this);
619  }
620
621  @Override
622  protected Class<? extends HttpServlet> getDumpServlet() {
623    return RSDumpServlet.class;
624  }
625
626  /**
627   * Used by {@link RSDumpServlet} to generate debugging information.
628   */
629  public void dumpRowLocks(final PrintWriter out) {
630    StringBuilder sb = new StringBuilder();
631    for (HRegion region : getRegions()) {
632      if (region.getLockedRows().size() > 0) {
633        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
634          sb.setLength(0);
635          sb.append(region.getTableDescriptor().getTableName()).append(",")
636            .append(region.getRegionInfo().getEncodedName()).append(",");
637          sb.append(rowLockContext.toString());
638          out.println(sb);
639        }
640      }
641    }
642  }
643
644  @Override
645  public boolean registerService(Service instance) {
646    // No stacking of instances is allowed for a single executorService name
647    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
648    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
649    if (coprocessorServiceHandlers.containsKey(serviceName)) {
650      LOG.error("Coprocessor executorService " + serviceName
651        + " already registered, rejecting request from " + instance);
652      return false;
653    }
654
655    coprocessorServiceHandlers.put(serviceName, instance);
656    if (LOG.isDebugEnabled()) {
657      LOG.debug(
658        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
659    }
660    return true;
661  }
662
663  /**
664   * Run test on configured codecs to make sure supporting libs are in place.
665   */
666  private static void checkCodecs(final Configuration c) throws IOException {
667    // check to see if the codec list is available:
668    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
669    if (codecs == null) {
670      return;
671    }
672    for (String codec : codecs) {
673      if (!CompressionTest.testCompression(codec)) {
674        throw new IOException(
675          "Compression codec " + codec + " not supported, aborting RS construction");
676      }
677    }
678  }
679
680  public String getClusterId() {
681    return this.clusterId;
682  }
683
684  /**
685   * All initialization needed before we go register with Master.<br>
686   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
687   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
688   */
689  private void preRegistrationInitialization() {
690    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
691    try (Scope ignored = span.makeCurrent()) {
692      initializeZooKeeper();
693      setupClusterConnection();
694      bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
695      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
696      // Setup RPC client for master communication
697      this.rpcClient = asyncClusterConnection.getRpcClient();
698      span.setStatus(StatusCode.OK);
699    } catch (Throwable t) {
700      // Call stop if error or process will stick around for ever since server
701      // puts up non-daemon threads.
702      TraceUtil.setError(span, t);
703      this.rpcServices.stop();
704      abort("Initialization of RS failed.  Hence aborting RS.", t);
705    } finally {
706      span.end();
707    }
708  }
709
710  /**
711   * Bring up connection to zk ensemble and then wait until a master for this cluster and then after
712   * that, wait until cluster 'up' flag has been set. This is the order in which master does things.
713   * <p>
714   * Finally open long-living server short-circuit connection.
715   */
716  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
717      justification = "cluster Id znode read would give us correct response")
718  private void initializeZooKeeper() throws IOException, InterruptedException {
719    // Nothing to do in here if no Master in the mix.
720    if (this.masterless) {
721      return;
722    }
723
724    // Create the master address tracker, register with zk, and start it. Then
725    // block until a master is available. No point in starting up if no master
726    // running.
727    blockAndCheckIfStopped(this.masterAddressTracker);
728
729    // Wait on cluster being up. Master will set this flag up in zookeeper
730    // when ready.
731    blockAndCheckIfStopped(this.clusterStatusTracker);
732
733    // If we are HMaster then the cluster id should have already been set.
734    if (clusterId == null) {
735      // Retrieve clusterId
736      // Since cluster status is now up
737      // ID should have already been set by HMaster
738      try {
739        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
740        if (clusterId == null) {
741          this.abort("Cluster ID has not been set");
742        }
743        LOG.info("ClusterId : " + clusterId);
744      } catch (KeeperException e) {
745        this.abort("Failed to retrieve Cluster ID", e);
746      }
747    }
748
749    if (isStopped() || isAborted()) {
750      return; // No need for further initialization
751    }
752
753    // watch for snapshots and other procedures
754    try {
755      rspmHost = new RegionServerProcedureManagerHost();
756      rspmHost.loadProcedures(conf);
757      rspmHost.initialize(this);
758    } catch (KeeperException e) {
759      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
760    }
761  }
762
763  /**
764   * Utilty method to wait indefinitely on a znode availability while checking if the region server
765   * is shut down
766   * @param tracker znode tracker to use
767   * @throws IOException          any IO exception, plus if the RS is stopped
768   * @throws InterruptedException if the waiting thread is interrupted
769   */
770  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
771    throws IOException, InterruptedException {
772    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
773      if (this.stopped) {
774        throw new IOException("Received the shutdown message while waiting.");
775      }
776    }
777  }
778
779  /** Returns True if the cluster is up. */
780  @Override
781  public boolean isClusterUp() {
782    return this.masterless
783      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
784  }
785
786  private void initializeReplicationMarkerChore() {
787    boolean replicationMarkerEnabled =
788      conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
789    // If replication or replication marker is not enabled then return immediately.
790    if (replicationMarkerEnabled) {
791      int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
792        REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
793      replicationMarkerChore = new ReplicationMarkerChore(this, this, period, conf);
794    }
795  }
796
797  @Override
798  public boolean isStopping() {
799    return stopping;
800  }
801
802  /**
803   * The HRegionServer sticks in this loop until closed.
804   */
805  @Override
806  public void run() {
807    if (isStopped()) {
808      LOG.info("Skipping run; stopped");
809      return;
810    }
811    try {
812      // Do pre-registration initializations; zookeeper, lease threads, etc.
813      preRegistrationInitialization();
814    } catch (Throwable e) {
815      abort("Fatal exception during initialization", e);
816    }
817
818    try {
819      if (!isStopped() && !isAborted()) {
820        installShutdownHook();
821        // Initialize the RegionServerCoprocessorHost now that our ephemeral
822        // node was created, in case any coprocessors want to use ZooKeeper
823        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
824
825        // Try and register with the Master; tell it we are here. Break if server is stopped or
826        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
827        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
828        // come up.
829        LOG.debug("About to register with Master.");
830        TraceUtil.trace(() -> {
831          RetryCounterFactory rcf =
832            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
833          RetryCounter rc = rcf.create();
834          while (keepLooping()) {
835            RegionServerStartupResponse w = reportForDuty();
836            if (w == null) {
837              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
838              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
839              this.sleeper.sleep(sleepTime);
840            } else {
841              handleReportForDutyResponse(w);
842              break;
843            }
844          }
845        }, "HRegionServer.registerWithMaster");
846      }
847
848      if (!isStopped() && isHealthy()) {
849        TraceUtil.trace(() -> {
850          // start the snapshot handler and other procedure handlers,
851          // since the server is ready to run
852          if (this.rspmHost != null) {
853            this.rspmHost.start();
854          }
855          // Start the Quota Manager
856          if (this.rsQuotaManager != null) {
857            rsQuotaManager.start(getRpcServer().getScheduler());
858          }
859          if (this.rsSpaceQuotaManager != null) {
860            this.rsSpaceQuotaManager.start();
861          }
862        }, "HRegionServer.startup");
863      }
864
865      // We registered with the Master. Go into run mode.
866      long lastMsg = EnvironmentEdgeManager.currentTime();
867      long oldRequestCount = -1;
868      // The main run loop.
869      while (!isStopped() && isHealthy()) {
870        if (!isClusterUp()) {
871          if (onlineRegions.isEmpty()) {
872            stop("Exiting; cluster shutdown set and not carrying any regions");
873          } else if (!this.stopping) {
874            this.stopping = true;
875            LOG.info("Closing user regions");
876            closeUserRegions(isAborted());
877          } else {
878            boolean allUserRegionsOffline = areAllUserRegionsOffline();
879            if (allUserRegionsOffline) {
880              // Set stopped if no more write requests tp meta tables
881              // since last time we went around the loop. Any open
882              // meta regions will be closed on our way out.
883              if (oldRequestCount == getWriteRequestCount()) {
884                stop("Stopped; only catalog regions remaining online");
885                break;
886              }
887              oldRequestCount = getWriteRequestCount();
888            } else {
889              // Make sure all regions have been closed -- some regions may
890              // have not got it because we were splitting at the time of
891              // the call to closeUserRegions.
892              closeUserRegions(this.abortRequested.get());
893            }
894            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
895          }
896        }
897        long now = EnvironmentEdgeManager.currentTime();
898        if ((now - lastMsg) >= msgInterval) {
899          tryRegionServerReport(lastMsg, now);
900          lastMsg = EnvironmentEdgeManager.currentTime();
901        }
902        if (!isStopped() && !isAborted()) {
903          this.sleeper.sleep();
904        }
905      } // for
906    } catch (Throwable t) {
907      if (!rpcServices.checkOOME(t)) {
908        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
909        abort(prefix + t.getMessage(), t);
910      }
911    }
912
913    final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
914    try (Scope ignored = span.makeCurrent()) {
915      if (this.leaseManager != null) {
916        this.leaseManager.closeAfterLeasesExpire();
917      }
918      if (this.splitLogWorker != null) {
919        splitLogWorker.stop();
920      }
921      stopInfoServer();
922      // Send cache a shutdown.
923      if (blockCache != null) {
924        blockCache.shutdown();
925      }
926      if (mobFileCache != null) {
927        mobFileCache.shutdown();
928      }
929
930      // Send interrupts to wake up threads if sleeping so they notice shutdown.
931      // TODO: Should we check they are alive? If OOME could have exited already
932      if (this.hMemManager != null) {
933        this.hMemManager.stop();
934      }
935      if (this.cacheFlusher != null) {
936        this.cacheFlusher.interruptIfNecessary();
937      }
938      if (this.compactSplitThread != null) {
939        this.compactSplitThread.interruptIfNecessary();
940      }
941
942      // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
943      if (rspmHost != null) {
944        rspmHost.stop(this.abortRequested.get() || this.killed);
945      }
946
947      if (this.killed) {
948        // Just skip out w/o closing regions. Used when testing.
949      } else if (abortRequested.get()) {
950        if (this.dataFsOk) {
951          closeUserRegions(abortRequested.get()); // Don't leave any open file handles
952        }
953        LOG.info("aborting server " + this.serverName);
954      } else {
955        closeUserRegions(abortRequested.get());
956        LOG.info("stopping server " + this.serverName);
957      }
958      regionReplicationBufferManager.stop();
959      closeClusterConnection();
960      // Closing the compactSplit thread before closing meta regions
961      if (!this.killed && containsMetaTableRegions()) {
962        if (!abortRequested.get() || this.dataFsOk) {
963          if (this.compactSplitThread != null) {
964            this.compactSplitThread.join();
965            this.compactSplitThread = null;
966          }
967          closeMetaTableRegions(abortRequested.get());
968        }
969      }
970
971      if (!this.killed && this.dataFsOk) {
972        waitOnAllRegionsToClose(abortRequested.get());
973        LOG.info("stopping server " + this.serverName + "; all regions closed.");
974      }
975
976      // Stop the quota manager
977      if (rsQuotaManager != null) {
978        rsQuotaManager.stop();
979      }
980      if (rsSpaceQuotaManager != null) {
981        rsSpaceQuotaManager.stop();
982        rsSpaceQuotaManager = null;
983      }
984
985      // flag may be changed when closing regions throws exception.
986      if (this.dataFsOk) {
987        shutdownWAL(!abortRequested.get());
988      }
989
990      // Make sure the proxy is down.
991      if (this.rssStub != null) {
992        this.rssStub = null;
993      }
994      if (this.lockStub != null) {
995        this.lockStub = null;
996      }
997      if (this.rpcClient != null) {
998        this.rpcClient.close();
999      }
1000      if (this.leaseManager != null) {
1001        this.leaseManager.close();
1002      }
1003      if (this.pauseMonitor != null) {
1004        this.pauseMonitor.stop();
1005      }
1006
1007      if (!killed) {
1008        stopServiceThreads();
1009      }
1010
1011      if (this.rpcServices != null) {
1012        this.rpcServices.stop();
1013      }
1014
1015      try {
1016        deleteMyEphemeralNode();
1017      } catch (KeeperException.NoNodeException nn) {
1018        // pass
1019      } catch (KeeperException e) {
1020        LOG.warn("Failed deleting my ephemeral node", e);
1021      }
1022      // We may have failed to delete the znode at the previous step, but
1023      // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1024      ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1025
1026      closeZooKeeper();
1027      closeTableDescriptors();
1028      LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
1029      span.setStatus(StatusCode.OK);
1030    } finally {
1031      span.end();
1032    }
1033  }
1034
1035  private boolean containsMetaTableRegions() {
1036    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
1037  }
1038
1039  private boolean areAllUserRegionsOffline() {
1040    if (getNumberOfOnlineRegions() > 2) {
1041      return false;
1042    }
1043    boolean allUserRegionsOffline = true;
1044    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1045      if (!e.getValue().getRegionInfo().isMetaRegion()) {
1046        allUserRegionsOffline = false;
1047        break;
1048      }
1049    }
1050    return allUserRegionsOffline;
1051  }
1052
1053  /** Returns Current write count for all online regions. */
1054  private long getWriteRequestCount() {
1055    long writeCount = 0;
1056    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1057      writeCount += e.getValue().getWriteRequestsCount();
1058    }
1059    return writeCount;
1060  }
1061
1062  @InterfaceAudience.Private
1063  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1064    throws IOException {
1065    RegionServerStatusService.BlockingInterface rss = rssStub;
1066    if (rss == null) {
1067      // the current server could be stopping.
1068      return;
1069    }
1070    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1071    final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport");
1072    try (Scope ignored = span.makeCurrent()) {
1073      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1074      request.setServer(ProtobufUtil.toServerName(this.serverName));
1075      request.setLoad(sl);
1076      rss.regionServerReport(null, request.build());
1077      span.setStatus(StatusCode.OK);
1078    } catch (ServiceException se) {
1079      IOException ioe = ProtobufUtil.getRemoteException(se);
1080      if (ioe instanceof YouAreDeadException) {
1081        // This will be caught and handled as a fatal error in run()
1082        TraceUtil.setError(span, ioe);
1083        throw ioe;
1084      }
1085      if (rssStub == rss) {
1086        rssStub = null;
1087      }
1088      TraceUtil.setError(span, se);
1089      // Couldn't connect to the master, get location from zk and reconnect
1090      // Method blocks until new master is found or we are stopped
1091      createRegionServerStatusStub(true);
1092    } finally {
1093      span.end();
1094    }
1095  }
1096
1097  /**
1098   * Reports the given map of Regions and their size on the filesystem to the active Master.
1099   * @param regionSizeStore The store containing region sizes
1100   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1101   */
1102  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
1103    RegionServerStatusService.BlockingInterface rss = rssStub;
1104    if (rss == null) {
1105      // the current server could be stopping.
1106      LOG.trace("Skipping Region size report to HMaster as stub is null");
1107      return true;
1108    }
1109    try {
1110      buildReportAndSend(rss, regionSizeStore);
1111    } catch (ServiceException se) {
1112      IOException ioe = ProtobufUtil.getRemoteException(se);
1113      if (ioe instanceof PleaseHoldException) {
1114        LOG.trace("Failed to report region sizes to Master because it is initializing."
1115          + " This will be retried.", ioe);
1116        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1117        return true;
1118      }
1119      if (rssStub == rss) {
1120        rssStub = null;
1121      }
1122      createRegionServerStatusStub(true);
1123      if (ioe instanceof DoNotRetryIOException) {
1124        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1125        if (doNotRetryEx.getCause() != null) {
1126          Throwable t = doNotRetryEx.getCause();
1127          if (t instanceof UnsupportedOperationException) {
1128            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1129            return false;
1130          }
1131        }
1132      }
1133      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1134    }
1135    return true;
1136  }
1137
1138  /**
1139   * Builds the region size report and sends it to the master. Upon successful sending of the
1140   * report, the region sizes that were sent are marked as sent.
1141   * @param rss             The stub to send to the Master
1142   * @param regionSizeStore The store containing region sizes
1143   */
1144  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
1145    RegionSizeStore regionSizeStore) throws ServiceException {
1146    RegionSpaceUseReportRequest request =
1147      buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
1148    rss.reportRegionSpaceUse(null, request);
1149    // Record the number of size reports sent
1150    if (metricsRegionServer != null) {
1151      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
1152    }
1153  }
1154
1155  /**
1156   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1157   * @param regionSizes The size in bytes of regions
1158   * @return The corresponding protocol buffer message.
1159   */
1160  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
1161    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1162    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
1163      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
1164    }
1165    return request.build();
1166  }
1167
1168  /**
1169   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf
1170   * message.
1171   * @param regionInfo  The RegionInfo
1172   * @param sizeInBytes The size in bytes of the Region
1173   * @return The protocol buffer
1174   */
1175  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1176    return RegionSpaceUse.newBuilder()
1177      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1178      .setRegionSize(Objects.requireNonNull(sizeInBytes)).build();
1179  }
1180
1181  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1182    throws IOException {
1183    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1184    // per second, and other metrics As long as metrics are part of ServerLoad it's best to use
1185    // the wrapper to compute those numbers in one place.
1186    // In the long term most of these should be moved off of ServerLoad and the heart beat.
1187    // Instead they should be stored in an HBase table so that external visibility into HBase is
1188    // improved; Additionally the load balancer will be able to take advantage of a more complete
1189    // history.
1190    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1191    Collection<HRegion> regions = getOnlineRegionsLocalContext();
1192    long usedMemory = -1L;
1193    long maxMemory = -1L;
1194    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
1195    if (usage != null) {
1196      usedMemory = usage.getUsed();
1197      maxMemory = usage.getMax();
1198    }
1199
1200    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
1201    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1202    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
1203    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
1204    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
1205    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
1206    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
1207    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1208    Coprocessor.Builder coprocessorBuilder = Coprocessor.newBuilder();
1209    for (String coprocessor : coprocessors) {
1210      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1211    }
1212    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1213    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1214    for (HRegion region : regions) {
1215      if (region.getCoprocessorHost() != null) {
1216        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1217        for (String regionCoprocessor : regionCoprocessors) {
1218          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
1219        }
1220      }
1221      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1222      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1223        .getCoprocessors()) {
1224        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1225      }
1226    }
1227
1228    getBlockCache().ifPresent(cache -> {
1229      cache.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
1230        regionCachedInfo.forEach((regionName, prefetchSize) -> {
1231          serverLoad.putRegionCachedInfo(regionName, roundSize(prefetchSize, unitMB));
1232        });
1233      });
1234    });
1235
1236    serverLoad.setReportStartTime(reportStartTime);
1237    serverLoad.setReportEndTime(reportEndTime);
1238    if (this.infoServer != null) {
1239      serverLoad.setInfoServerPort(this.infoServer.getPort());
1240    } else {
1241      serverLoad.setInfoServerPort(-1);
1242    }
1243    MetricsUserAggregateSource userSource =
1244      metricsRegionServer.getMetricsUserAggregate().getSource();
1245    if (userSource != null) {
1246      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
1247      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
1248        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
1249      }
1250    }
1251
1252    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
1253      // always refresh first to get the latest value
1254      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1255      if (rLoad != null) {
1256        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1257        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1258          .getReplicationLoadSourceEntries()) {
1259          serverLoad.addReplLoadSource(rLS);
1260        }
1261      }
1262    } else {
1263      if (replicationSourceHandler != null) {
1264        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1265        if (rLoad != null) {
1266          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1267            .getReplicationLoadSourceEntries()) {
1268            serverLoad.addReplLoadSource(rLS);
1269          }
1270        }
1271      }
1272      if (replicationSinkHandler != null) {
1273        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
1274        if (rLoad != null) {
1275          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1276        }
1277      }
1278    }
1279
1280    TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask
1281      .newBuilder().setDescription(task.getDescription())
1282      .setStatus(task.getStatus() != null ? task.getStatus() : "")
1283      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
1284      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build()));
1285
1286    return serverLoad.build();
1287  }
1288
1289  private String getOnlineRegionsAsPrintableString() {
1290    StringBuilder sb = new StringBuilder();
1291    for (Region r : this.onlineRegions.values()) {
1292      if (sb.length() > 0) {
1293        sb.append(", ");
1294      }
1295      sb.append(r.getRegionInfo().getEncodedName());
1296    }
1297    return sb.toString();
1298  }
1299
1300  /**
1301   * Wait on regions close.
1302   */
1303  private void waitOnAllRegionsToClose(final boolean abort) {
1304    // Wait till all regions are closed before going out.
1305    int lastCount = -1;
1306    long previousLogTime = 0;
1307    Set<String> closedRegions = new HashSet<>();
1308    boolean interrupted = false;
1309    try {
1310      while (!onlineRegions.isEmpty()) {
1311        int count = getNumberOfOnlineRegions();
1312        // Only print a message if the count of regions has changed.
1313        if (count != lastCount) {
1314          // Log every second at most
1315          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
1316            previousLogTime = EnvironmentEdgeManager.currentTime();
1317            lastCount = count;
1318            LOG.info("Waiting on " + count + " regions to close");
1319            // Only print out regions still closing if a small number else will
1320            // swamp the log.
1321            if (count < 10 && LOG.isDebugEnabled()) {
1322              LOG.debug("Online Regions=" + this.onlineRegions);
1323            }
1324          }
1325        }
1326        // Ensure all user regions have been sent a close. Use this to
1327        // protect against the case where an open comes in after we start the
1328        // iterator of onlineRegions to close all user regions.
1329        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1330          RegionInfo hri = e.getValue().getRegionInfo();
1331          if (
1332            !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1333              && !closedRegions.contains(hri.getEncodedName())
1334          ) {
1335            closedRegions.add(hri.getEncodedName());
1336            // Don't update zk with this close transition; pass false.
1337            closeRegionIgnoreErrors(hri, abort);
1338          }
1339        }
1340        // No regions in RIT, we could stop waiting now.
1341        if (this.regionsInTransitionInRS.isEmpty()) {
1342          if (!onlineRegions.isEmpty()) {
1343            LOG.info("We were exiting though online regions are not empty,"
1344              + " because some regions failed closing");
1345          }
1346          break;
1347        } else {
1348          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
1349            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1350        }
1351        if (sleepInterrupted(200)) {
1352          interrupted = true;
1353        }
1354      }
1355    } finally {
1356      if (interrupted) {
1357        Thread.currentThread().interrupt();
1358      }
1359    }
1360  }
1361
1362  private static boolean sleepInterrupted(long millis) {
1363    boolean interrupted = false;
1364    try {
1365      Thread.sleep(millis);
1366    } catch (InterruptedException e) {
1367      LOG.warn("Interrupted while sleeping");
1368      interrupted = true;
1369    }
1370    return interrupted;
1371  }
1372
1373  private void shutdownWAL(final boolean close) {
1374    if (this.walFactory != null) {
1375      try {
1376        if (close) {
1377          walFactory.close();
1378        } else {
1379          walFactory.shutdown();
1380        }
1381      } catch (Throwable e) {
1382        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1383        LOG.error("Shutdown / close of WAL failed: " + e);
1384        LOG.debug("Shutdown / close exception details:", e);
1385      }
1386    }
1387  }
1388
1389  /**
1390   * Run init. Sets up wal and starts up all server threads.
1391   * @param c Extra configuration.
1392   */
1393  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1394    throws IOException {
1395    try {
1396      boolean updateRootDir = false;
1397      for (NameStringPair e : c.getMapEntriesList()) {
1398        String key = e.getName();
1399        // The hostname the master sees us as.
1400        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1401          String hostnameFromMasterPOV = e.getValue();
1402          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1403            rpcServices.getSocketAddress().getPort(), this.startcode);
1404          String expectedHostName = rpcServices.getSocketAddress().getHostName();
1405          // if Master use-ip is enabled, RegionServer use-ip will be enabled by default even if it
1406          // is set to disable. so we will use the ip of the RegionServer to compare with the
1407          // hostname passed by the Master, see HBASE-27304 for details.
1408          if (
1409            StringUtils.isBlank(useThisHostnameInstead) && getActiveMaster().isPresent()
1410              && InetAddresses.isInetAddress(getActiveMaster().get().getHostname())
1411          ) {
1412            expectedHostName = rpcServices.getSocketAddress().getAddress().getHostAddress();
1413          }
1414          boolean isHostnameConsist = StringUtils.isBlank(useThisHostnameInstead)
1415            ? hostnameFromMasterPOV.equals(expectedHostName)
1416            : hostnameFromMasterPOV.equals(useThisHostnameInstead);
1417          if (!isHostnameConsist) {
1418            String msg = "Master passed us a different hostname to use; was="
1419              + (StringUtils.isBlank(useThisHostnameInstead)
1420                ? rpcServices.getSocketAddress().getHostName()
1421                : this.useThisHostnameInstead)
1422              + ", but now=" + hostnameFromMasterPOV;
1423            LOG.error(msg);
1424            throw new IOException(msg);
1425          }
1426          continue;
1427        }
1428
1429        String value = e.getValue();
1430        if (key.equals(HConstants.HBASE_DIR)) {
1431          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1432            updateRootDir = true;
1433          }
1434        }
1435
1436        if (LOG.isDebugEnabled()) {
1437          LOG.debug("Config from master: " + key + "=" + value);
1438        }
1439        this.conf.set(key, value);
1440      }
1441      // Set our ephemeral znode up in zookeeper now we have a name.
1442      createMyEphemeralNode();
1443
1444      if (updateRootDir) {
1445        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1446        initializeFileSystem();
1447      }
1448
1449      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
1450      // config param for task trackers, but we can piggyback off of it.
1451      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1452        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1453      }
1454
1455      // Save it in a file, this will allow to see if we crash
1456      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1457
1458      // This call sets up an initialized replication and WAL. Later we start it up.
1459      setupWALAndReplication();
1460      // Init in here rather than in constructor after thread name has been set
1461      final MetricsTable metricsTable =
1462        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1463      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1464      this.metricsRegionServer =
1465        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
1466      // Now that we have a metrics source, start the pause monitor
1467      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1468      pauseMonitor.start();
1469
1470      // There is a rare case where we do NOT want services to start. Check config.
1471      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1472        startServices();
1473      }
1474      // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
1475      // or make sense of it.
1476      startReplicationService();
1477
1478      // Set up ZK
1479      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress()
1480        + ", sessionid=0x"
1481        + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1482
1483      // Wake up anyone waiting for this server to online
1484      synchronized (online) {
1485        online.set(true);
1486        online.notifyAll();
1487      }
1488    } catch (Throwable e) {
1489      stop("Failed initialization");
1490      throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed");
1491    } finally {
1492      sleeper.skipSleepCycle();
1493    }
1494  }
1495
1496  private void startHeapMemoryManager() {
1497    if (this.blockCache != null) {
1498      this.hMemManager =
1499        new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1500      this.hMemManager.start(getChoreService());
1501    }
1502  }
1503
1504  private void createMyEphemeralNode() throws KeeperException {
1505    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1506    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1507    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1508    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1509    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1510  }
1511
1512  private void deleteMyEphemeralNode() throws KeeperException {
1513    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1514  }
1515
1516  @Override
1517  public RegionServerAccounting getRegionServerAccounting() {
1518    return regionServerAccounting;
1519  }
1520
1521  // Round the size with KB or MB.
1522  // A trick here is that if the sizeInBytes is less than sizeUnit, we will round the size to 1
1523  // instead of 0 if it is not 0, to avoid some schedulers think the region has no data. See
1524  // HBASE-26340 for more details on why this is important.
1525  private static int roundSize(long sizeInByte, int sizeUnit) {
1526    if (sizeInByte == 0) {
1527      return 0;
1528    } else if (sizeInByte < sizeUnit) {
1529      return 1;
1530    } else {
1531      return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE);
1532    }
1533  }
1534
1535  private void computeIfPersistentBucketCache(Consumer<BucketCache> computation) {
1536    if (blockCache instanceof CombinedBlockCache) {
1537      BlockCache l2 = ((CombinedBlockCache) blockCache).getSecondLevelCache();
1538      if (l2 instanceof BucketCache && ((BucketCache) l2).isCachePersistent()) {
1539        computation.accept((BucketCache) l2);
1540      }
1541    }
1542  }
1543
1544  /**
1545   * @param r               Region to get RegionLoad for.
1546   * @param regionLoadBldr  the RegionLoad.Builder, can be null
1547   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1548   * @return RegionLoad instance.
1549   */
1550  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1551    RegionSpecifier.Builder regionSpecifier) throws IOException {
1552    byte[] name = r.getRegionInfo().getRegionName();
1553    String regionEncodedName = r.getRegionInfo().getEncodedName();
1554    int stores = 0;
1555    int storefiles = 0;
1556    int storeRefCount = 0;
1557    int maxCompactedStoreFileRefCount = 0;
1558    long storeUncompressedSize = 0L;
1559    long storefileSize = 0L;
1560    long storefileIndexSize = 0L;
1561    long rootLevelIndexSize = 0L;
1562    long totalStaticIndexSize = 0L;
1563    long totalStaticBloomSize = 0L;
1564    long totalCompactingKVs = 0L;
1565    long currentCompactedKVs = 0L;
1566    long totalRegionSize = 0L;
1567    List<HStore> storeList = r.getStores();
1568    stores += storeList.size();
1569    for (HStore store : storeList) {
1570      storefiles += store.getStorefilesCount();
1571      int currentStoreRefCount = store.getStoreRefCount();
1572      storeRefCount += currentStoreRefCount;
1573      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1574      maxCompactedStoreFileRefCount =
1575        Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount);
1576      storeUncompressedSize += store.getStoreSizeUncompressed();
1577      storefileSize += store.getStorefilesSize();
1578      totalRegionSize += store.getHFilesSize();
1579      // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
1580      storefileIndexSize += store.getStorefilesRootLevelIndexSize();
1581      CompactionProgress progress = store.getCompactionProgress();
1582      if (progress != null) {
1583        totalCompactingKVs += progress.getTotalCompactingKVs();
1584        currentCompactedKVs += progress.currentCompactedKVs;
1585      }
1586      rootLevelIndexSize += store.getStorefilesRootLevelIndexSize();
1587      totalStaticIndexSize += store.getTotalStaticIndexSize();
1588      totalStaticBloomSize += store.getTotalStaticBloomSize();
1589    }
1590
1591    int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB);
1592    int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB);
1593    int storefileSizeMB = roundSize(storefileSize, unitMB);
1594    int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB);
1595    int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB);
1596    int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB);
1597    int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB);
1598    int regionSizeMB = roundSize(totalRegionSize, unitMB);
1599    final MutableFloat currentRegionCachedRatio = new MutableFloat(0.0f);
1600    getBlockCache().ifPresent(bc -> {
1601      bc.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
1602        if (regionCachedInfo.containsKey(regionEncodedName)) {
1603          currentRegionCachedRatio.setValue(regionSizeMB == 0
1604            ? 0.0f
1605            : (float) roundSize(regionCachedInfo.get(regionEncodedName), unitMB) / regionSizeMB);
1606        }
1607      });
1608    });
1609
1610    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
1611    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
1612    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
1613    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
1614    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
1615    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
1616    if (regionLoadBldr == null) {
1617      regionLoadBldr = RegionLoad.newBuilder();
1618    }
1619    if (regionSpecifier == null) {
1620      regionSpecifier = RegionSpecifier.newBuilder();
1621    }
1622
1623    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1624    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1625    regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores)
1626      .setStorefiles(storefiles).setStoreRefCount(storeRefCount)
1627      .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1628      .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB)
1629      .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB)
1630      .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1631      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1632      .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount())
1633      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1634      .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs)
1635      .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality)
1636      .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight)
1637      .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight)
1638      .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
1639      .setLastMajorCompactionTs(r.getOldestHfileTs(true)).setRegionSizeMB(regionSizeMB)
1640      .setCurrentRegionCachedRatio(currentRegionCachedRatio.floatValue());
1641    r.setCompleteSequenceId(regionLoadBldr);
1642    return regionLoadBldr.build();
1643  }
1644
1645  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1646    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1647    userLoadBldr.setUserName(user);
1648    userSource.getClientMetrics().values().stream()
1649      .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1650        .setHostName(clientMetrics.getHostName())
1651        .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1652        .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1653        .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
1654      .forEach(userLoadBldr::addClientMetrics);
1655    return userLoadBldr.build();
1656  }
1657
1658  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1659    HRegion r = onlineRegions.get(encodedRegionName);
1660    return r != null ? createRegionLoad(r, null, null) : null;
1661  }
1662
1663  /**
1664   * Inner class that runs on a long period checking if regions need compaction.
1665   */
1666  private static class CompactionChecker extends ScheduledChore {
1667    private final HRegionServer instance;
1668    private final int majorCompactPriority;
1669    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1670    // Iteration is 1-based rather than 0-based so we don't check for compaction
1671    // immediately upon region server startup
1672    private long iteration = 1;
1673
1674    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1675      super("CompactionChecker", stopper, sleepTime);
1676      this.instance = h;
1677      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1678
1679      /*
1680       * MajorCompactPriority is configurable. If not set, the compaction will use default priority.
1681       */
1682      this.majorCompactPriority = this.instance.conf
1683        .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY);
1684    }
1685
1686    @Override
1687    protected void chore() {
1688      for (HRegion hr : this.instance.onlineRegions.values()) {
1689        // If region is read only or compaction is disabled at table level, there's no need to
1690        // iterate through region's stores
1691        if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) {
1692          continue;
1693        }
1694
1695        for (HStore s : hr.stores.values()) {
1696          try {
1697            long multiplier = s.getCompactionCheckMultiplier();
1698            assert multiplier > 0;
1699            if (iteration % multiplier != 0) {
1700              continue;
1701            }
1702            if (s.needsCompaction()) {
1703              // Queue a compaction. Will recognize if major is needed.
1704              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1705                getName() + " requests compaction");
1706            } else if (s.shouldPerformMajorCompaction()) {
1707              s.triggerMajorCompaction();
1708              if (
1709                majorCompactPriority == DEFAULT_PRIORITY
1710                  || majorCompactPriority > hr.getCompactPriority()
1711              ) {
1712                this.instance.compactSplitThread.requestCompaction(hr, s,
1713                  getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
1714                  CompactionLifeCycleTracker.DUMMY, null);
1715              } else {
1716                this.instance.compactSplitThread.requestCompaction(hr, s,
1717                  getName() + " requests major compaction; use configured priority",
1718                  this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1719              }
1720            }
1721          } catch (IOException e) {
1722            LOG.warn("Failed major compaction check on " + hr, e);
1723          }
1724        }
1725      }
1726      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1727    }
1728  }
1729
1730  private static class PeriodicMemStoreFlusher extends ScheduledChore {
1731    private final HRegionServer server;
1732    private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
1733    private final static int MIN_DELAY_TIME = 0; // millisec
1734    private final long rangeOfDelayMs;
1735
1736    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1737      super("MemstoreFlusherChore", server, cacheFlushInterval);
1738      this.server = server;
1739
1740      final long configuredRangeOfDelay = server.getConfiguration()
1741        .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
1742      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
1743    }
1744
1745    @Override
1746    protected void chore() {
1747      final StringBuilder whyFlush = new StringBuilder();
1748      for (HRegion r : this.server.onlineRegions.values()) {
1749        if (r == null) {
1750          continue;
1751        }
1752        if (r.shouldFlush(whyFlush)) {
1753          FlushRequester requester = server.getFlushRequester();
1754          if (requester != null) {
1755            long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME;
1756            // Throttle the flushes by putting a delay. If we don't throttle, and there
1757            // is a balanced write-load on the regions in a table, we might end up
1758            // overwhelming the filesystem with too many flushes at once.
1759            if (requester.requestDelayedFlush(r, delay)) {
1760              LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(),
1761                r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay);
1762            }
1763          }
1764        }
1765      }
1766    }
1767  }
1768
1769  /**
1770   * Report the status of the server. A server is online once all the startup is completed (setting
1771   * up filesystem, starting executorService threads, etc.). This method is designed mostly to be
1772   * useful in tests.
1773   * @return true if online, false if not.
1774   */
1775  public boolean isOnline() {
1776    return online.get();
1777  }
1778
1779  /**
1780   * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
1781   * be hooked up to WAL.
1782   */
1783  private void setupWALAndReplication() throws IOException {
1784    WALFactory factory = new WALFactory(conf, serverName, this);
1785    // TODO Replication make assumptions here based on the default filesystem impl
1786    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1787    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1788
1789    Path logDir = new Path(walRootDir, logName);
1790    LOG.debug("logDir={}", logDir);
1791    if (this.walFs.exists(logDir)) {
1792      throw new RegionServerRunningException(
1793        "Region server has already created directory at " + this.serverName.toString());
1794    }
1795    // Always create wal directory as now we need this when master restarts to find out the live
1796    // region servers.
1797    if (!this.walFs.mkdirs(logDir)) {
1798      throw new IOException("Can not create wal directory " + logDir);
1799    }
1800    // Instantiate replication if replication enabled. Pass it the log directories.
1801    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
1802
1803    WALActionsListener walEventListener = getWALEventTrackerListener(conf);
1804    if (walEventListener != null && factory.getWALProvider() != null) {
1805      factory.getWALProvider().addWALActionsListener(walEventListener);
1806    }
1807    this.walFactory = factory;
1808  }
1809
1810  private WALActionsListener getWALEventTrackerListener(Configuration conf) {
1811    if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) {
1812      WALEventTrackerListener listener =
1813        new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName());
1814      return listener;
1815    }
1816    return null;
1817  }
1818
1819  /**
1820   * Start up replication source and sink handlers.
1821   */
1822  private void startReplicationService() throws IOException {
1823    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
1824      this.replicationSourceHandler.startReplicationService();
1825    } else {
1826      if (this.replicationSourceHandler != null) {
1827        this.replicationSourceHandler.startReplicationService();
1828      }
1829      if (this.replicationSinkHandler != null) {
1830        this.replicationSinkHandler.startReplicationService();
1831      }
1832    }
1833  }
1834
1835  /** Returns Master address tracker instance. */
1836  public MasterAddressTracker getMasterAddressTracker() {
1837    return this.masterAddressTracker;
1838  }
1839
1840  /**
1841   * Start maintenance Threads, Server, Worker and lease checker threads. Start all threads we need
1842   * to run. This is called after we've successfully registered with the Master. Install an
1843   * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception. We
1844   * cannot set the handler on all threads. Server's internal Listener thread is off limits. For
1845   * Server, if an OOME, it waits a while then retries. Meantime, a flush or a compaction that tries
1846   * to run should trigger same critical condition and the shutdown will run. On its way out, this
1847   * server will shut down Server. Leases are sort of inbetween. It has an internal thread that
1848   * while it inherits from Chore, it keeps its own internal stop mechanism so needs to be stopped
1849   * by this hosting server. Worker logs the exception and exits.
1850   */
1851  private void startServices() throws IOException {
1852    if (!isStopped() && !isAborted()) {
1853      initializeThreads();
1854    }
1855    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
1856    this.secureBulkLoadManager.start();
1857
1858    // Health checker thread.
1859    if (isHealthCheckerConfigured()) {
1860      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
1861        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1862      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1863    }
1864    // Executor status collect thread.
1865    if (
1866      this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
1867        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)
1868    ) {
1869      int sleepTime =
1870        this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ);
1871      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
1872        this.metricsRegionServer.getMetricsSource());
1873    }
1874
1875    this.walRoller = new LogRoller(this);
1876    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1877    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
1878
1879    // Create the CompactedFileDischarger chore executorService. This chore helps to
1880    // remove the compacted files that will no longer be used in reads.
1881    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
1882    // 2 mins so that compacted files can be archived before the TTLCleaner runs
1883    int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
1884    this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this);
1885    choreService.scheduleChore(compactedFileDischarger);
1886
1887    // Start executor services
1888    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
1889    executorService.startExecutorService(executorService.new ExecutorConfig()
1890      .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
1891    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
1892    executorService.startExecutorService(executorService.new ExecutorConfig()
1893      .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
1894    final int openPriorityRegionThreads =
1895      conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
1896    executorService.startExecutorService(
1897      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION)
1898        .setCorePoolSize(openPriorityRegionThreads));
1899    final int closeRegionThreads =
1900      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
1901    executorService.startExecutorService(executorService.new ExecutorConfig()
1902      .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
1903    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
1904    executorService.startExecutorService(executorService.new ExecutorConfig()
1905      .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
1906    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1907      final int storeScannerParallelSeekThreads =
1908        conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
1909      executorService.startExecutorService(
1910        executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK)
1911          .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true));
1912    }
1913    final int logReplayOpsThreads =
1914      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
1915    executorService.startExecutorService(
1916      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS)
1917        .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
1918    // Start the threads for compacted files discharger
1919    final int compactionDischargerThreads =
1920      conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
1921    executorService.startExecutorService(executorService.new ExecutorConfig()
1922      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)
1923      .setCorePoolSize(compactionDischargerThreads));
1924    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1925      final int regionReplicaFlushThreads =
1926        conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1927          conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1928      executorService.startExecutorService(executorService.new ExecutorConfig()
1929        .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS)
1930        .setCorePoolSize(regionReplicaFlushThreads));
1931    }
1932    final int refreshPeerThreads =
1933      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
1934    executorService.startExecutorService(executorService.new ExecutorConfig()
1935      .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
1936    final int replaySyncReplicationWALThreads =
1937      conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
1938    executorService.startExecutorService(executorService.new ExecutorConfig()
1939      .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL)
1940      .setCorePoolSize(replaySyncReplicationWALThreads));
1941    final int switchRpcThrottleThreads =
1942      conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
1943    executorService.startExecutorService(
1944      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE)
1945        .setCorePoolSize(switchRpcThrottleThreads));
1946    final int claimReplicationQueueThreads =
1947      conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
1948    executorService.startExecutorService(
1949      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE)
1950        .setCorePoolSize(claimReplicationQueueThreads));
1951    final int rsSnapshotOperationThreads =
1952      conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
1953    executorService.startExecutorService(
1954      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
1955        .setCorePoolSize(rsSnapshotOperationThreads));
1956    final int rsFlushOperationThreads =
1957      conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3);
1958    executorService.startExecutorService(executorService.new ExecutorConfig()
1959      .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads));
1960
1961    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
1962      uncaughtExceptionHandler);
1963    if (this.cacheFlusher != null) {
1964      this.cacheFlusher.start(uncaughtExceptionHandler);
1965    }
1966    Threads.setDaemonThreadRunning(this.procedureResultReporter,
1967      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
1968
1969    if (this.compactionChecker != null) {
1970      choreService.scheduleChore(compactionChecker);
1971    }
1972    if (this.periodicFlusher != null) {
1973      choreService.scheduleChore(periodicFlusher);
1974    }
1975    if (this.healthCheckChore != null) {
1976      choreService.scheduleChore(healthCheckChore);
1977    }
1978    if (this.executorStatusChore != null) {
1979      choreService.scheduleChore(executorStatusChore);
1980    }
1981    if (this.nonceManagerChore != null) {
1982      choreService.scheduleChore(nonceManagerChore);
1983    }
1984    if (this.storefileRefresher != null) {
1985      choreService.scheduleChore(storefileRefresher);
1986    }
1987    if (this.fsUtilizationChore != null) {
1988      choreService.scheduleChore(fsUtilizationChore);
1989    }
1990    if (this.namedQueueServiceChore != null) {
1991      choreService.scheduleChore(namedQueueServiceChore);
1992    }
1993    if (this.brokenStoreFileCleaner != null) {
1994      choreService.scheduleChore(brokenStoreFileCleaner);
1995    }
1996    if (this.rsMobFileCleanerChore != null) {
1997      choreService.scheduleChore(rsMobFileCleanerChore);
1998    }
1999    if (replicationMarkerChore != null) {
2000      LOG.info("Starting replication marker chore");
2001      choreService.scheduleChore(replicationMarkerChore);
2002    }
2003
2004    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
2005    // an unhandled exception, it will just exit.
2006    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
2007      uncaughtExceptionHandler);
2008
2009    // Create the log splitting worker and start it
2010    // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
2011    // quite a while inside Connection layer. The worker won't be available for other
2012    // tasks even after current task is preempted after a split task times out.
2013    Configuration sinkConf = HBaseConfiguration.create(conf);
2014    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2015      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
2016    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2017      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
2018    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
2019    if (
2020      this.csm != null
2021        && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
2022    ) {
2023      // SplitLogWorker needs csm. If none, don't start this.
2024      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
2025      splitLogWorker.start();
2026      LOG.debug("SplitLogWorker started");
2027    }
2028
2029    // Memstore services.
2030    startHeapMemoryManager();
2031    // Call it after starting HeapMemoryManager.
2032    initializeMemStoreChunkCreator(hMemManager);
2033  }
2034
2035  private void initializeThreads() {
2036    // Cache flushing thread.
2037    this.cacheFlusher = new MemStoreFlusher(conf, this);
2038
2039    // Compaction thread
2040    this.compactSplitThread = new CompactSplit(this);
2041
2042    // Background thread to check for compactions; needed if region has not gotten updates
2043    // in a while. It will take care of not checking too frequently on store-by-store basis.
2044    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
2045    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
2046    this.leaseManager = new LeaseManager(this.threadWakeFrequency);
2047
2048    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
2049      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
2050    final boolean walEventTrackerEnabled =
2051      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
2052
2053    if (isSlowLogTableEnabled || walEventTrackerEnabled) {
2054      // default chore duration: 10 min
2055      // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property
2056      final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY,
2057        DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION);
2058
2059      final int namedQueueChoreDuration =
2060        conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT);
2061      // Considering min of slowLogChoreDuration and namedQueueChoreDuration
2062      int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration);
2063
2064      namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration,
2065        this.namedQueueRecorder, this.getConnection());
2066    }
2067
2068    if (this.nonceManager != null) {
2069      // Create the scheduled chore that cleans up nonces.
2070      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
2071    }
2072
2073    // Setup the Quota Manager
2074    rsQuotaManager = new RegionServerRpcQuotaManager(this);
2075    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
2076
2077    if (QuotaUtil.isQuotaEnabled(conf)) {
2078      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
2079    }
2080
2081    boolean onlyMetaRefresh = false;
2082    int storefileRefreshPeriod =
2083      conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
2084        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2085    if (storefileRefreshPeriod == 0) {
2086      storefileRefreshPeriod =
2087        conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
2088          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2089      onlyMetaRefresh = true;
2090    }
2091    if (storefileRefreshPeriod > 0) {
2092      this.storefileRefresher =
2093        new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this);
2094    }
2095
2096    int brokenStoreFileCleanerPeriod =
2097      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
2098        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
2099    int brokenStoreFileCleanerDelay =
2100      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
2101        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
2102    double brokenStoreFileCleanerDelayJitter =
2103      conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
2104        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
2105    double jitterRate =
2106      (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
2107    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
2108    this.brokenStoreFileCleaner =
2109      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
2110        brokenStoreFileCleanerPeriod, this, conf, this);
2111
2112    this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
2113
2114    registerConfigurationObservers();
2115    initializeReplicationMarkerChore();
2116  }
2117
2118  private void registerConfigurationObservers() {
2119    // Register Replication if possible, as now we support recreating replication peer storage, for
2120    // migrating across different replication peer storages online
2121    if (replicationSourceHandler instanceof ConfigurationObserver) {
2122      configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
2123    }
2124    if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) {
2125      configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
2126    }
2127    // Registering the compactSplitThread object with the ConfigurationManager.
2128    configurationManager.registerObserver(this.compactSplitThread);
2129    configurationManager.registerObserver(this.cacheFlusher);
2130    configurationManager.registerObserver(this.rpcServices);
2131    configurationManager.registerObserver(this);
2132  }
2133
2134  /*
2135   * Verify that server is healthy
2136   */
2137  private boolean isHealthy() {
2138    if (!dataFsOk) {
2139      // File system problem
2140      return false;
2141    }
2142    // Verify that all threads are alive
2143    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
2144      && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2145      && (this.walRoller == null || this.walRoller.isAlive())
2146      && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2147      && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2148    if (!healthy) {
2149      stop("One or more threads are no longer alive -- stop");
2150    }
2151    return healthy;
2152  }
2153
2154  @Override
2155  public List<WAL> getWALs() {
2156    return walFactory.getWALs();
2157  }
2158
2159  @Override
2160  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2161    WAL wal = walFactory.getWAL(regionInfo);
2162    if (this.walRoller != null) {
2163      this.walRoller.addWAL(wal);
2164    }
2165    return wal;
2166  }
2167
2168  public LogRoller getWalRoller() {
2169    return walRoller;
2170  }
2171
2172  public WALFactory getWalFactory() {
2173    return walFactory;
2174  }
2175
2176  @Override
2177  public void stop(final String msg) {
2178    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2179  }
2180
2181  /**
2182   * Stops the regionserver.
2183   * @param msg   Status message
2184   * @param force True if this is a regionserver abort
2185   * @param user  The user executing the stop request, or null if no user is associated
2186   */
2187  public void stop(final String msg, final boolean force, final User user) {
2188    if (!this.stopped) {
2189      LOG.info("***** STOPPING region server '" + this + "' *****");
2190      if (this.rsHost != null) {
2191        // when forced via abort don't allow CPs to override
2192        try {
2193          this.rsHost.preStop(msg, user);
2194        } catch (IOException ioe) {
2195          if (!force) {
2196            LOG.warn("The region server did not stop", ioe);
2197            return;
2198          }
2199          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2200        }
2201      }
2202      this.stopped = true;
2203      LOG.info("STOPPED: " + msg);
2204      // Wakes run() if it is sleeping
2205      sleeper.skipSleepCycle();
2206    }
2207  }
2208
2209  public void waitForServerOnline() {
2210    while (!isStopped() && !isOnline()) {
2211      synchronized (online) {
2212        try {
2213          online.wait(msgInterval);
2214        } catch (InterruptedException ie) {
2215          Thread.currentThread().interrupt();
2216          break;
2217        }
2218      }
2219    }
2220  }
2221
2222  @Override
2223  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2224    HRegion r = context.getRegion();
2225    long openProcId = context.getOpenProcId();
2226    long masterSystemTime = context.getMasterSystemTime();
2227    rpcServices.checkOpen();
2228    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2229      r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2230    // Do checks to see if we need to compact (references or too many files)
2231    // Skip compaction check if region is read only
2232    if (!r.isReadOnly()) {
2233      for (HStore s : r.stores.values()) {
2234        if (s.hasReferences() || s.needsCompaction()) {
2235          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2236        }
2237      }
2238    }
2239    long openSeqNum = r.getOpenSeqNum();
2240    if (openSeqNum == HConstants.NO_SEQNUM) {
2241      // If we opened a region, we should have read some sequence number from it.
2242      LOG.error(
2243        "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2244      openSeqNum = 0;
2245    }
2246
2247    // Notify master
2248    if (
2249      !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2250        openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))
2251    ) {
2252      throw new IOException(
2253        "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2254    }
2255
2256    triggerFlushInPrimaryRegion(r);
2257
2258    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2259  }
2260
2261  /**
2262   * Helper method for use in tests. Skip the region transition report when there's no master around
2263   * to receive it.
2264   */
2265  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2266    final TransitionCode code = context.getCode();
2267    final long openSeqNum = context.getOpenSeqNum();
2268    long masterSystemTime = context.getMasterSystemTime();
2269    final RegionInfo[] hris = context.getHris();
2270
2271    if (code == TransitionCode.OPENED) {
2272      Preconditions.checkArgument(hris != null && hris.length == 1);
2273      if (hris[0].isMetaRegion()) {
2274        LOG.warn(
2275          "meta table location is stored in master local store, so we can not skip reporting");
2276        return false;
2277      } else {
2278        try {
2279          MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
2280            serverName, openSeqNum, masterSystemTime);
2281        } catch (IOException e) {
2282          LOG.info("Failed to update meta", e);
2283          return false;
2284        }
2285      }
2286    }
2287    return true;
2288  }
2289
2290  private ReportRegionStateTransitionRequest
2291    createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) {
2292    final TransitionCode code = context.getCode();
2293    final long openSeqNum = context.getOpenSeqNum();
2294    final RegionInfo[] hris = context.getHris();
2295    final long[] procIds = context.getProcIds();
2296
2297    ReportRegionStateTransitionRequest.Builder builder =
2298      ReportRegionStateTransitionRequest.newBuilder();
2299    builder.setServer(ProtobufUtil.toServerName(serverName));
2300    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2301    transition.setTransitionCode(code);
2302    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2303      transition.setOpenSeqNum(openSeqNum);
2304    }
2305    for (RegionInfo hri : hris) {
2306      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2307    }
2308    for (long procId : procIds) {
2309      transition.addProcId(procId);
2310    }
2311
2312    return builder.build();
2313  }
2314
2315  @Override
2316  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2317    if (TEST_SKIP_REPORTING_TRANSITION) {
2318      return skipReportingTransition(context);
2319    }
2320    final ReportRegionStateTransitionRequest request =
2321      createReportRegionStateTransitionRequest(context);
2322
2323    int tries = 0;
2324    long pauseTime = this.retryPauseTime;
2325    // Keep looping till we get an error. We want to send reports even though server is going down.
2326    // Only go down if clusterConnection is null. It is set to null almost as last thing as the
2327    // HRegionServer does down.
2328    while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
2329      RegionServerStatusService.BlockingInterface rss = rssStub;
2330      try {
2331        if (rss == null) {
2332          createRegionServerStatusStub();
2333          continue;
2334        }
2335        ReportRegionStateTransitionResponse response =
2336          rss.reportRegionStateTransition(null, request);
2337        if (response.hasErrorMessage()) {
2338          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2339          break;
2340        }
2341        // Log if we had to retry else don't log unless TRACE. We want to
2342        // know if were successful after an attempt showed in logs as failed.
2343        if (tries > 0 || LOG.isTraceEnabled()) {
2344          LOG.info("TRANSITION REPORTED " + request);
2345        }
2346        // NOTE: Return mid-method!!!
2347        return true;
2348      } catch (ServiceException se) {
2349        IOException ioe = ProtobufUtil.getRemoteException(se);
2350        boolean pause = ioe instanceof ServerNotRunningYetException
2351          || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException;
2352        if (pause) {
2353          // Do backoff else we flood the Master with requests.
2354          pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
2355        } else {
2356          pauseTime = this.retryPauseTime; // Reset.
2357        }
2358        LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#"
2359          + tries + ")"
2360          + (pause
2361            ? " after " + pauseTime + "ms delay (Master is coming online...)."
2362            : " immediately."),
2363          ioe);
2364        if (pause) {
2365          Threads.sleep(pauseTime);
2366        }
2367        tries++;
2368        if (rssStub == rss) {
2369          rssStub = null;
2370        }
2371      }
2372    }
2373    return false;
2374  }
2375
2376  /**
2377   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2378   * block this thread. See RegionReplicaFlushHandler for details.
2379   */
2380  private void triggerFlushInPrimaryRegion(final HRegion region) {
2381    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2382      return;
2383    }
2384    TableName tn = region.getTableDescriptor().getTableName();
2385    if (
2386      !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn)
2387        || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) ||
2388        // If the memstore replication not setup, we do not have to wait for observing a flush event
2389        // from primary before starting to serve reads, because gaps from replication is not
2390        // applicable,this logic is from
2391        // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by
2392        // HBASE-13063
2393        !region.getTableDescriptor().hasRegionMemStoreReplication()
2394    ) {
2395      region.setReadsEnabled(true);
2396      return;
2397    }
2398
2399    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2400    // RegionReplicaFlushHandler might reset this.
2401
2402    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2403    if (this.executorService != null) {
2404      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
2405    } else {
2406      LOG.info("Executor is null; not running flush of primary region replica for {}",
2407        region.getRegionInfo());
2408    }
2409  }
2410
2411  @InterfaceAudience.Private
2412  public RSRpcServices getRSRpcServices() {
2413    return rpcServices;
2414  }
2415
2416  /**
2417   * Cause the server to exit without closing the regions it is serving, the log it is using and
2418   * without notifying the master. Used unit testing and on catastrophic events such as HDFS is
2419   * yanked out from under hbase or we OOME. the reason we are aborting the exception that caused
2420   * the abort, or null
2421   */
2422  @Override
2423  public void abort(String reason, Throwable cause) {
2424    if (!setAbortRequested()) {
2425      // Abort already in progress, ignore the new request.
2426      LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason);
2427      return;
2428    }
2429    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2430    if (cause != null) {
2431      LOG.error(HBaseMarkers.FATAL, msg, cause);
2432    } else {
2433      LOG.error(HBaseMarkers.FATAL, msg);
2434    }
2435    // HBASE-4014: show list of coprocessors that were loaded to help debug
2436    // regionserver crashes.Note that we're implicitly using
2437    // java.util.HashSet's toString() method to print the coprocessor names.
2438    LOG.error(HBaseMarkers.FATAL,
2439      "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors());
2440    // Try and dump metrics if abort -- might give clue as to how fatal came about....
2441    try {
2442      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2443    } catch (MalformedObjectNameException | IOException e) {
2444      LOG.warn("Failed dumping metrics", e);
2445    }
2446
2447    // Do our best to report our abort to the master, but this may not work
2448    try {
2449      if (cause != null) {
2450        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2451      }
2452      // Report to the master but only if we have already registered with the master.
2453      RegionServerStatusService.BlockingInterface rss = rssStub;
2454      if (rss != null && this.serverName != null) {
2455        ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder();
2456        builder.setServer(ProtobufUtil.toServerName(this.serverName));
2457        builder.setErrorMessage(msg);
2458        rss.reportRSFatalError(null, builder.build());
2459      }
2460    } catch (Throwable t) {
2461      LOG.warn("Unable to report fatal error to master", t);
2462    }
2463
2464    scheduleAbortTimer();
2465    // shutdown should be run as the internal user
2466    stop(reason, true, null);
2467  }
2468
2469  /*
2470   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup logs but it does
2471   * close socket in case want to bring up server on old hostname+port immediately.
2472   */
2473  @InterfaceAudience.Private
2474  protected void kill() {
2475    this.killed = true;
2476    abort("Simulated kill");
2477  }
2478
2479  // Limits the time spent in the shutdown process.
2480  private void scheduleAbortTimer() {
2481    if (this.abortMonitor == null) {
2482      this.abortMonitor = new Timer("Abort regionserver monitor", true);
2483      TimerTask abortTimeoutTask = null;
2484      try {
2485        Constructor<? extends TimerTask> timerTaskCtor =
2486          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2487            .asSubclass(TimerTask.class).getDeclaredConstructor();
2488        timerTaskCtor.setAccessible(true);
2489        abortTimeoutTask = timerTaskCtor.newInstance();
2490      } catch (Exception e) {
2491        LOG.warn("Initialize abort timeout task failed", e);
2492      }
2493      if (abortTimeoutTask != null) {
2494        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2495      }
2496    }
2497  }
2498
2499  /**
2500   * Wait on all threads to finish. Presumption is that all closes and stops have already been
2501   * called.
2502   */
2503  protected void stopServiceThreads() {
2504    // clean up the scheduled chores
2505    stopChoreService();
2506    if (bootstrapNodeManager != null) {
2507      bootstrapNodeManager.stop();
2508    }
2509    if (this.cacheFlusher != null) {
2510      this.cacheFlusher.shutdown();
2511    }
2512    if (this.walRoller != null) {
2513      this.walRoller.close();
2514    }
2515    if (this.compactSplitThread != null) {
2516      this.compactSplitThread.join();
2517    }
2518    stopExecutorService();
2519    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
2520      this.replicationSourceHandler.stopReplicationService();
2521    } else {
2522      if (this.replicationSourceHandler != null) {
2523        this.replicationSourceHandler.stopReplicationService();
2524      }
2525      if (this.replicationSinkHandler != null) {
2526        this.replicationSinkHandler.stopReplicationService();
2527      }
2528    }
2529  }
2530
2531  /** Returns Return the object that implements the replication source executorService. */
2532  @Override
2533  public ReplicationSourceService getReplicationSourceService() {
2534    return replicationSourceHandler;
2535  }
2536
2537  /** Returns Return the object that implements the replication sink executorService. */
2538  public ReplicationSinkService getReplicationSinkService() {
2539    return replicationSinkHandler;
2540  }
2541
2542  /**
2543   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2544   * connection, the current rssStub must be null. Method will block until a master is available.
2545   * You can break from this block by requesting the server stop.
2546   * @return master + port, or null if server has been stopped
2547   */
2548  private synchronized ServerName createRegionServerStatusStub() {
2549    // Create RS stub without refreshing the master node from ZK, use cached data
2550    return createRegionServerStatusStub(false);
2551  }
2552
2553  /**
2554   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2555   * connection, the current rssStub must be null. Method will block until a master is available.
2556   * You can break from this block by requesting the server stop.
2557   * @param refresh If true then master address will be read from ZK, otherwise use cached data
2558   * @return master + port, or null if server has been stopped
2559   */
2560  @InterfaceAudience.Private
2561  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2562    if (rssStub != null) {
2563      return masterAddressTracker.getMasterAddress();
2564    }
2565    ServerName sn = null;
2566    long previousLogTime = 0;
2567    RegionServerStatusService.BlockingInterface intRssStub = null;
2568    LockService.BlockingInterface intLockStub = null;
2569    boolean interrupted = false;
2570    try {
2571      while (keepLooping()) {
2572        sn = this.masterAddressTracker.getMasterAddress(refresh);
2573        if (sn == null) {
2574          if (!keepLooping()) {
2575            // give up with no connection.
2576            LOG.debug("No master found and cluster is stopped; bailing out");
2577            return null;
2578          }
2579          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2580            LOG.debug("No master found; retry");
2581            previousLogTime = EnvironmentEdgeManager.currentTime();
2582          }
2583          refresh = true; // let's try pull it from ZK directly
2584          if (sleepInterrupted(200)) {
2585            interrupted = true;
2586          }
2587          continue;
2588        }
2589        try {
2590          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
2591            userProvider.getCurrent(), shortOperationTimeout);
2592          intRssStub = RegionServerStatusService.newBlockingStub(channel);
2593          intLockStub = LockService.newBlockingStub(channel);
2594          break;
2595        } catch (IOException e) {
2596          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2597            e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
2598            if (e instanceof ServerNotRunningYetException) {
2599              LOG.info("Master isn't available yet, retrying");
2600            } else {
2601              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2602            }
2603            previousLogTime = EnvironmentEdgeManager.currentTime();
2604          }
2605          if (sleepInterrupted(200)) {
2606            interrupted = true;
2607          }
2608        }
2609      }
2610    } finally {
2611      if (interrupted) {
2612        Thread.currentThread().interrupt();
2613      }
2614    }
2615    this.rssStub = intRssStub;
2616    this.lockStub = intLockStub;
2617    return sn;
2618  }
2619
2620  /**
2621   * @return True if we should break loop because cluster is going down or this server has been
2622   *         stopped or hdfs has gone bad.
2623   */
2624  private boolean keepLooping() {
2625    return !this.stopped && isClusterUp();
2626  }
2627
2628  /*
2629   * Let the master know we're here Run initialization using parameters passed us by the master.
2630   * @return A Map of key/value configurations we got from the Master else null if we failed to
2631   * register.
2632   */
2633  private RegionServerStartupResponse reportForDuty() throws IOException {
2634    if (this.masterless) {
2635      return RegionServerStartupResponse.getDefaultInstance();
2636    }
2637    ServerName masterServerName = createRegionServerStatusStub(true);
2638    RegionServerStatusService.BlockingInterface rss = rssStub;
2639    if (masterServerName == null || rss == null) {
2640      return null;
2641    }
2642    RegionServerStartupResponse result = null;
2643    try {
2644      rpcServices.requestCount.reset();
2645      rpcServices.rpcGetRequestCount.reset();
2646      rpcServices.rpcScanRequestCount.reset();
2647      rpcServices.rpcFullScanRequestCount.reset();
2648      rpcServices.rpcMultiRequestCount.reset();
2649      rpcServices.rpcMutateRequestCount.reset();
2650      LOG.info("reportForDuty to master=" + masterServerName + " with port="
2651        + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode);
2652      long now = EnvironmentEdgeManager.currentTime();
2653      int port = rpcServices.getSocketAddress().getPort();
2654      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2655      if (!StringUtils.isBlank(useThisHostnameInstead)) {
2656        request.setUseThisHostnameInstead(useThisHostnameInstead);
2657      }
2658      request.setPort(port);
2659      request.setServerStartCode(this.startcode);
2660      request.setServerCurrentTime(now);
2661      result = rss.regionServerStartup(null, request.build());
2662    } catch (ServiceException se) {
2663      IOException ioe = ProtobufUtil.getRemoteException(se);
2664      if (ioe instanceof ClockOutOfSyncException) {
2665        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
2666        // Re-throw IOE will cause RS to abort
2667        throw ioe;
2668      } else if (ioe instanceof DecommissionedHostRejectedException) {
2669        LOG.error(HBaseMarkers.FATAL,
2670          "Master rejected startup because the host is considered decommissioned", ioe);
2671        // Re-throw IOE will cause RS to abort
2672        throw ioe;
2673      } else if (ioe instanceof ServerNotRunningYetException) {
2674        LOG.debug("Master is not running yet");
2675      } else {
2676        LOG.warn("error telling master we are up", se);
2677      }
2678      rssStub = null;
2679    }
2680    return result;
2681  }
2682
2683  @Override
2684  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2685    try {
2686      GetLastFlushedSequenceIdRequest req =
2687        RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2688      RegionServerStatusService.BlockingInterface rss = rssStub;
2689      if (rss == null) { // Try to connect one more time
2690        createRegionServerStatusStub();
2691        rss = rssStub;
2692        if (rss == null) {
2693          // Still no luck, we tried
2694          LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2695          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2696            .build();
2697        }
2698      }
2699      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2700      return RegionStoreSequenceIds.newBuilder()
2701        .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2702        .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2703    } catch (ServiceException e) {
2704      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2705      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2706        .build();
2707    }
2708  }
2709
2710  /**
2711   * Close meta region if we carry it
2712   * @param abort Whether we're running an abort.
2713   */
2714  private void closeMetaTableRegions(final boolean abort) {
2715    HRegion meta = null;
2716    this.onlineRegionsLock.writeLock().lock();
2717    try {
2718      for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
2719        RegionInfo hri = e.getValue().getRegionInfo();
2720        if (hri.isMetaRegion()) {
2721          meta = e.getValue();
2722        }
2723        if (meta != null) {
2724          break;
2725        }
2726      }
2727    } finally {
2728      this.onlineRegionsLock.writeLock().unlock();
2729    }
2730    if (meta != null) {
2731      closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2732    }
2733  }
2734
2735  /**
2736   * Schedule closes on all user regions. Should be safe calling multiple times because it wont'
2737   * close regions that are already closed or that are closing.
2738   * @param abort Whether we're running an abort.
2739   */
2740  private void closeUserRegions(final boolean abort) {
2741    this.onlineRegionsLock.writeLock().lock();
2742    try {
2743      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
2744        HRegion r = e.getValue();
2745        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2746          // Don't update zk with this close transition; pass false.
2747          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2748        }
2749      }
2750    } finally {
2751      this.onlineRegionsLock.writeLock().unlock();
2752    }
2753  }
2754
2755  protected Map<String, HRegion> getOnlineRegions() {
2756    return this.onlineRegions;
2757  }
2758
2759  public int getNumberOfOnlineRegions() {
2760    return this.onlineRegions.size();
2761  }
2762
2763  /**
2764   * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM
2765   * as client; HRegion cannot be serialized to cross an rpc.
2766   */
2767  public Collection<HRegion> getOnlineRegionsLocalContext() {
2768    Collection<HRegion> regions = this.onlineRegions.values();
2769    return Collections.unmodifiableCollection(regions);
2770  }
2771
2772  @Override
2773  public void addRegion(HRegion region) {
2774    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2775    configurationManager.registerObserver(region);
2776  }
2777
2778  private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
2779    long size) {
2780    if (!sortedRegions.containsKey(size)) {
2781      sortedRegions.put(size, new ArrayList<>());
2782    }
2783    sortedRegions.get(size).add(region);
2784  }
2785
2786  /**
2787   * @return A new Map of online regions sorted by region off-heap size with the first entry being
2788   *         the biggest.
2789   */
2790  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2791    // we'll sort the regions in reverse
2792    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2793    // Copy over all regions. Regions are sorted by size with biggest first.
2794    for (HRegion region : this.onlineRegions.values()) {
2795      addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
2796    }
2797    return sortedRegions;
2798  }
2799
2800  /**
2801   * @return A new Map of online regions sorted by region heap size with the first entry being the
2802   *         biggest.
2803   */
2804  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2805    // we'll sort the regions in reverse
2806    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2807    // Copy over all regions. Regions are sorted by size with biggest first.
2808    for (HRegion region : this.onlineRegions.values()) {
2809      addRegion(sortedRegions, region, region.getMemStoreHeapSize());
2810    }
2811    return sortedRegions;
2812  }
2813
2814  /** Returns reference to FlushRequester */
2815  @Override
2816  public FlushRequester getFlushRequester() {
2817    return this.cacheFlusher;
2818  }
2819
2820  @Override
2821  public CompactionRequester getCompactionRequestor() {
2822    return this.compactSplitThread;
2823  }
2824
2825  @Override
2826  public LeaseManager getLeaseManager() {
2827    return leaseManager;
2828  }
2829
2830  /** Returns {@code true} when the data file system is available, {@code false} otherwise. */
2831  boolean isDataFileSystemOk() {
2832    return this.dataFsOk;
2833  }
2834
2835  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
2836    return this.rsHost;
2837  }
2838
2839  @Override
2840  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2841    return this.regionsInTransitionInRS;
2842  }
2843
2844  @Override
2845  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
2846    return rsQuotaManager;
2847  }
2848
2849  //
2850  // Main program and support routines
2851  //
2852  /**
2853   * Load the replication executorService objects, if any
2854   */
2855  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
2856    FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
2857    // read in the name of the source replication class from the config file.
2858    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2859      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2860
2861    // read in the name of the sink replication class from the config file.
2862    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2863      HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
2864
2865    // If both the sink and the source class names are the same, then instantiate
2866    // only one object.
2867    if (sourceClassname.equals(sinkClassname)) {
2868      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2869        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2870      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
2871      server.sameReplicationSourceAndSink = true;
2872    } else {
2873      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2874        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2875      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
2876        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2877      server.sameReplicationSourceAndSink = false;
2878    }
2879  }
2880
2881  private static <T extends ReplicationService> T newReplicationInstance(String classname,
2882    Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
2883    Path oldLogDir, WALFactory walFactory) throws IOException {
2884    final Class<? extends T> clazz;
2885    try {
2886      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2887      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
2888    } catch (java.lang.ClassNotFoundException nfe) {
2889      throw new IOException("Could not find class for " + classname);
2890    }
2891    T service = ReflectionUtils.newInstance(clazz, conf);
2892    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
2893    return service;
2894  }
2895
2896  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
2897    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
2898    if (!this.isOnline()) {
2899      return walGroupsReplicationStatus;
2900    }
2901    List<ReplicationSourceInterface> allSources = new ArrayList<>();
2902    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
2903    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
2904    for (ReplicationSourceInterface source : allSources) {
2905      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
2906    }
2907    return walGroupsReplicationStatus;
2908  }
2909
2910  /**
2911   * Utility for constructing an instance of the passed HRegionServer class.
2912   */
2913  static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass,
2914    final Configuration conf) {
2915    try {
2916      Constructor<? extends HRegionServer> c =
2917        regionServerClass.getConstructor(Configuration.class);
2918      return c.newInstance(conf);
2919    } catch (Exception e) {
2920      throw new RuntimeException(
2921        "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e);
2922    }
2923  }
2924
2925  /**
2926   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2927   */
2928  public static void main(String[] args) {
2929    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
2930    VersionInfo.logVersion();
2931    Configuration conf = HBaseConfiguration.create();
2932    @SuppressWarnings("unchecked")
2933    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2934      .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2935
2936    new HRegionServerCommandLine(regionServerClass).doMain(args);
2937  }
2938
2939  /**
2940   * Gets the online regions of the specified table. This method looks at the in-memory
2941   * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions.
2942   * If a region on this table has been closed during a disable, etc., it will not be included in
2943   * the returned list. So, the returned list may not necessarily be ALL regions in this table, its
2944   * all the ONLINE regions in the table.
2945   * @param tableName table to limit the scope of the query
2946   * @return Online regions from <code>tableName</code>
2947   */
2948  @Override
2949  public List<HRegion> getRegions(TableName tableName) {
2950    List<HRegion> tableRegions = new ArrayList<>();
2951    synchronized (this.onlineRegions) {
2952      for (HRegion region : this.onlineRegions.values()) {
2953        RegionInfo regionInfo = region.getRegionInfo();
2954        if (regionInfo.getTable().equals(tableName)) {
2955          tableRegions.add(region);
2956        }
2957      }
2958    }
2959    return tableRegions;
2960  }
2961
2962  @Override
2963  public List<HRegion> getRegions() {
2964    List<HRegion> allRegions;
2965    synchronized (this.onlineRegions) {
2966      // Return a clone copy of the onlineRegions
2967      allRegions = new ArrayList<>(onlineRegions.values());
2968    }
2969    return allRegions;
2970  }
2971
2972  /**
2973   * Gets the online tables in this RS. This method looks at the in-memory onlineRegions.
2974   * @return all the online tables in this RS
2975   */
2976  public Set<TableName> getOnlineTables() {
2977    Set<TableName> tables = new HashSet<>();
2978    synchronized (this.onlineRegions) {
2979      for (Region region : this.onlineRegions.values()) {
2980        tables.add(region.getTableDescriptor().getTableName());
2981      }
2982    }
2983    return tables;
2984  }
2985
2986  public String[] getRegionServerCoprocessors() {
2987    TreeSet<String> coprocessors = new TreeSet<>();
2988    try {
2989      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2990    } catch (IOException exception) {
2991      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; "
2992        + "skipping.");
2993      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2994    }
2995    Collection<HRegion> regions = getOnlineRegionsLocalContext();
2996    for (HRegion region : regions) {
2997      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2998      try {
2999        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
3000      } catch (IOException exception) {
3001        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region
3002          + "; skipping.");
3003        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3004      }
3005    }
3006    coprocessors.addAll(rsHost.getCoprocessors());
3007    return coprocessors.toArray(new String[0]);
3008  }
3009
3010  /**
3011   * Try to close the region, logs a warning on failure but continues.
3012   * @param region Region to close
3013   */
3014  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
3015    try {
3016      if (!closeRegion(region.getEncodedName(), abort, null)) {
3017        LOG
3018          .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing");
3019      }
3020    } catch (IOException e) {
3021      LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing",
3022        e);
3023    }
3024  }
3025
3026  /**
3027   * Close asynchronously a region, can be called from the master or internally by the regionserver
3028   * when stopping. If called from the master, the region will update the status.
3029   * <p>
3030   * If an opening was in progress, this method will cancel it, but will not start a new close. The
3031   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
3032   * </p>
3033   * <p>
3034   * If a close was in progress, this new request will be ignored, and an exception thrown.
3035   * </p>
3036   * <p>
3037   * Provides additional flag to indicate if this region blocks should be evicted from the cache.
3038   * </p>
3039   * @param encodedName Region to close
3040   * @param abort       True if we are aborting
3041   * @param destination Where the Region is being moved too... maybe null if unknown.
3042   * @return True if closed a region.
3043   * @throws NotServingRegionException if the region is not online
3044   */
3045  protected boolean closeRegion(String encodedName, final boolean abort,
3046    final ServerName destination) throws NotServingRegionException {
3047    // Check for permissions to close.
3048    HRegion actualRegion = this.getRegion(encodedName);
3049    // Can be null if we're calling close on a region that's not online
3050    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
3051      try {
3052        actualRegion.getCoprocessorHost().preClose(false);
3053      } catch (IOException exp) {
3054        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
3055        return false;
3056      }
3057    }
3058
3059    // previous can come back 'null' if not in map.
3060    final Boolean previous =
3061      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);
3062
3063    if (Boolean.TRUE.equals(previous)) {
3064      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already "
3065        + "trying to OPEN. Cancelling OPENING.");
3066      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
3067        // The replace failed. That should be an exceptional case, but theoretically it can happen.
3068        // We're going to try to do a standard close then.
3069        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
3070          + " Doing a standard close now");
3071        return closeRegion(encodedName, abort, destination);
3072      }
3073      // Let's get the region from the online region list again
3074      actualRegion = this.getRegion(encodedName);
3075      if (actualRegion == null) { // If already online, we still need to close it.
3076        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3077        // The master deletes the znode when it receives this exception.
3078        throw new NotServingRegionException(
3079          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
3080      }
3081    } else if (previous == null) {
3082      LOG.info("Received CLOSE for {}", encodedName);
3083    } else if (Boolean.FALSE.equals(previous)) {
3084      LOG.info("Received CLOSE for the region: " + encodedName
3085        + ", which we are already trying to CLOSE, but not completed yet");
3086      return true;
3087    }
3088
3089    if (actualRegion == null) {
3090      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3091      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
3092      // The master deletes the znode when it receives this exception.
3093      throw new NotServingRegionException(
3094        "The region " + encodedName + " is not online, and is not opening.");
3095    }
3096
3097    CloseRegionHandler crh;
3098    final RegionInfo hri = actualRegion.getRegionInfo();
3099    if (hri.isMetaRegion()) {
3100      crh = new CloseMetaHandler(this, this, hri, abort);
3101    } else {
3102      crh = new CloseRegionHandler(this, this, hri, abort, destination);
3103    }
3104    this.executorService.submit(crh);
3105    return true;
3106  }
3107
3108  /**
3109   * @return HRegion for the passed binary <code>regionName</code> or null if named region is not
3110   *         member of the online regions.
3111   */
3112  public HRegion getOnlineRegion(final byte[] regionName) {
3113    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3114    return this.onlineRegions.get(encodedRegionName);
3115  }
3116
3117  @Override
3118  public HRegion getRegion(final String encodedRegionName) {
3119    return this.onlineRegions.get(encodedRegionName);
3120  }
3121
3122  @Override
3123  public boolean removeRegion(final HRegion r, ServerName destination) {
3124    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3125    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
3126    if (destination != null) {
3127      long closeSeqNum = r.getMaxFlushedSeqId();
3128      if (closeSeqNum == HConstants.NO_SEQNUM) {
3129        // No edits in WAL for this region; get the sequence number when the region was opened.
3130        closeSeqNum = r.getOpenSeqNum();
3131        if (closeSeqNum == HConstants.NO_SEQNUM) {
3132          closeSeqNum = 0;
3133        }
3134      }
3135      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
3136      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
3137      if (selfMove) {
3138        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
3139          r.getRegionInfo().getEncodedName(),
3140          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
3141      }
3142    }
3143    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3144    configurationManager.deregisterObserver(r);
3145    return toReturn != null;
3146  }
3147
3148  /**
3149   * Protected Utility method for safely obtaining an HRegion handle.
3150   * @param regionName Name of online {@link HRegion} to return
3151   * @return {@link HRegion} for <code>regionName</code>
3152   */
3153  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
3154    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3155    return getRegionByEncodedName(regionName, encodedRegionName);
3156  }
3157
3158  public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException {
3159    return getRegionByEncodedName(null, encodedRegionName);
3160  }
3161
3162  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3163    throws NotServingRegionException {
3164    HRegion region = this.onlineRegions.get(encodedRegionName);
3165    if (region == null) {
3166      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3167      if (moveInfo != null) {
3168        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3169      }
3170      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3171      String regionNameStr =
3172        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
3173      if (isOpening != null && isOpening) {
3174        throw new RegionOpeningException(
3175          "Region " + regionNameStr + " is opening on " + this.serverName);
3176      }
3177      throw new NotServingRegionException(
3178        "" + regionNameStr + " is not online on " + this.serverName);
3179    }
3180    return region;
3181  }
3182
3183  /**
3184   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't
3185   * already.
3186   * @param t   Throwable
3187   * @param msg Message to log in error. Can be null.
3188   * @return Throwable converted to an IOE; methods can only let out IOEs.
3189   */
3190  private Throwable cleanup(final Throwable t, final String msg) {
3191    // Don't log as error if NSRE; NSRE is 'normal' operation.
3192    if (t instanceof NotServingRegionException) {
3193      LOG.debug("NotServingRegionException; " + t.getMessage());
3194      return t;
3195    }
3196    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3197    if (msg == null) {
3198      LOG.error("", e);
3199    } else {
3200      LOG.error(msg, e);
3201    }
3202    if (!rpcServices.checkOOME(t)) {
3203      checkFileSystem();
3204    }
3205    return t;
3206  }
3207
3208  /**
3209   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
3210   * @return Make <code>t</code> an IOE if it isn't already.
3211   */
3212  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3213    return (t instanceof IOException ? (IOException) t
3214      : msg == null || msg.length() == 0 ? new IOException(t)
3215      : new IOException(msg, t));
3216  }
3217
3218  /**
3219   * Checks to see if the file system is still accessible. If not, sets abortRequested and
3220   * stopRequested
3221   * @return false if file system is not available
3222   */
3223  boolean checkFileSystem() {
3224    if (this.dataFsOk && this.dataFs != null) {
3225      try {
3226        FSUtils.checkFileSystemAvailable(this.dataFs);
3227      } catch (IOException e) {
3228        abort("File System not available", e);
3229        this.dataFsOk = false;
3230      }
3231    }
3232    return this.dataFsOk;
3233  }
3234
3235  @Override
3236  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3237    List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3238    Address[] addr = new Address[favoredNodes.size()];
3239    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3240    // it is a map of region name to Address[]
3241    for (int i = 0; i < favoredNodes.size(); i++) {
3242      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort());
3243    }
3244    regionFavoredNodesMap.put(encodedRegionName, addr);
3245  }
3246
3247  /**
3248   * Return the favored nodes for a region given its encoded name. Look at the comment around
3249   * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here.
3250   * @param encodedRegionName the encoded region name.
3251   * @return array of favored locations
3252   */
3253  @Override
3254  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3255    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
3256  }
3257
3258  @Override
3259  public ServerNonceManager getNonceManager() {
3260    return this.nonceManager;
3261  }
3262
3263  private static class MovedRegionInfo {
3264    private final ServerName serverName;
3265    private final long seqNum;
3266
3267    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3268      this.serverName = serverName;
3269      this.seqNum = closeSeqNum;
3270    }
3271
3272    public ServerName getServerName() {
3273      return serverName;
3274    }
3275
3276    public long getSeqNum() {
3277      return seqNum;
3278    }
3279  }
3280
3281  /**
3282   * We need a timeout. If not there is a risk of giving a wrong information: this would double the
3283   * number of network calls instead of reducing them.
3284   */
3285  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3286
3287  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
3288    boolean selfMove) {
3289    if (selfMove) {
3290      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3291      return;
3292    }
3293    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid="
3294      + closeSeqNum);
3295    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3296  }
3297
3298  void removeFromMovedRegions(String encodedName) {
3299    movedRegionInfoCache.invalidate(encodedName);
3300  }
3301
3302  @InterfaceAudience.Private
3303  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
3304    return movedRegionInfoCache.getIfPresent(encodedRegionName);
3305  }
3306
3307  @InterfaceAudience.Private
3308  public int movedRegionCacheExpiredTime() {
3309    return TIMEOUT_REGION_MOVED;
3310  }
3311
3312  private String getMyEphemeralNodePath() {
3313    return zooKeeper.getZNodePaths().getRsPath(serverName);
3314  }
3315
3316  private boolean isHealthCheckerConfigured() {
3317    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3318    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
3319  }
3320
3321  /** Returns the underlying {@link CompactSplit} for the servers */
3322  public CompactSplit getCompactSplitThread() {
3323    return this.compactSplitThread;
3324  }
3325
3326  CoprocessorServiceResponse execRegionServerService(
3327    @SuppressWarnings("UnusedParameters") final RpcController controller,
3328    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3329    try {
3330      ServerRpcController serviceController = new ServerRpcController();
3331      CoprocessorServiceCall call = serviceRequest.getCall();
3332      String serviceName = call.getServiceName();
3333      Service service = coprocessorServiceHandlers.get(serviceName);
3334      if (service == null) {
3335        throw new UnknownProtocolException(null,
3336          "No registered coprocessor executorService found for " + serviceName);
3337      }
3338      ServiceDescriptor serviceDesc = service.getDescriptorForType();
3339
3340      String methodName = call.getMethodName();
3341      MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3342      if (methodDesc == null) {
3343        throw new UnknownProtocolException(service.getClass(),
3344          "Unknown method " + methodName + " called on executorService " + serviceName);
3345      }
3346
3347      Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3348      final Message.Builder responseBuilder =
3349        service.getResponsePrototype(methodDesc).newBuilderForType();
3350      service.callMethod(methodDesc, serviceController, request, message -> {
3351        if (message != null) {
3352          responseBuilder.mergeFrom(message);
3353        }
3354      });
3355      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3356      if (exception != null) {
3357        throw exception;
3358      }
3359      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3360    } catch (IOException ie) {
3361      throw new ServiceException(ie);
3362    }
3363  }
3364
3365  /**
3366   * May be null if this is a master which not carry table.
3367   * @return The block cache instance used by the regionserver.
3368   */
3369  @Override
3370  public Optional<BlockCache> getBlockCache() {
3371    return Optional.ofNullable(this.blockCache);
3372  }
3373
3374  /**
3375   * May be null if this is a master which not carry table.
3376   * @return The cache for mob files used by the regionserver.
3377   */
3378  @Override
3379  public Optional<MobFileCache> getMobFileCache() {
3380    return Optional.ofNullable(this.mobFileCache);
3381  }
3382
3383  CacheEvictionStats clearRegionBlockCache(Region region) {
3384    long evictedBlocks = 0;
3385
3386    for (Store store : region.getStores()) {
3387      for (StoreFile hFile : store.getStorefiles()) {
3388        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3389      }
3390    }
3391
3392    return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build();
3393  }
3394
3395  @Override
3396  public double getCompactionPressure() {
3397    double max = 0;
3398    for (Region region : onlineRegions.values()) {
3399      for (Store store : region.getStores()) {
3400        double normCount = store.getCompactionPressure();
3401        if (normCount > max) {
3402          max = normCount;
3403        }
3404      }
3405    }
3406    return max;
3407  }
3408
3409  @Override
3410  public HeapMemoryManager getHeapMemoryManager() {
3411    return hMemManager;
3412  }
3413
3414  public MemStoreFlusher getMemStoreFlusher() {
3415    return cacheFlusher;
3416  }
3417
3418  /**
3419   * For testing
3420   * @return whether all wal roll request finished for this regionserver
3421   */
3422  @InterfaceAudience.Private
3423  public boolean walRollRequestFinished() {
3424    return this.walRoller.walRollFinished();
3425  }
3426
3427  @Override
3428  public ThroughputController getFlushThroughputController() {
3429    return flushThroughputController;
3430  }
3431
3432  @Override
3433  public double getFlushPressure() {
3434    if (getRegionServerAccounting() == null || cacheFlusher == null) {
3435      // return 0 during RS initialization
3436      return 0.0;
3437    }
3438    return getRegionServerAccounting().getFlushPressure();
3439  }
3440
3441  @Override
3442  public void onConfigurationChange(Configuration newConf) {
3443    ThroughputController old = this.flushThroughputController;
3444    if (old != null) {
3445      old.stop("configuration change");
3446    }
3447    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3448    try {
3449      Superusers.initialize(newConf);
3450    } catch (IOException e) {
3451      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3452    }
3453
3454    // update region server coprocessor if the configuration has changed.
3455    if (
3456      CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
3457        CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY)
3458    ) {
3459      LOG.info("Update region server coprocessors because the configuration has changed");
3460      this.rsHost = new RegionServerCoprocessorHost(this, newConf);
3461    }
3462  }
3463
3464  @Override
3465  public MetricsRegionServer getMetrics() {
3466    return metricsRegionServer;
3467  }
3468
3469  @Override
3470  public SecureBulkLoadManager getSecureBulkLoadManager() {
3471    return this.secureBulkLoadManager;
3472  }
3473
3474  @Override
3475  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
3476    final Abortable abort) {
3477    final LockServiceClient client =
3478      new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
3479    return client.regionLock(regionInfo, description, abort);
3480  }
3481
3482  @Override
3483  public void unassign(byte[] regionName) throws IOException {
3484    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
3485  }
3486
3487  @Override
3488  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3489    return this.rsSpaceQuotaManager;
3490  }
3491
3492  @Override
3493  public boolean reportFileArchivalForQuotas(TableName tableName,
3494    Collection<Entry<String, Long>> archivedFiles) {
3495    if (TEST_SKIP_REPORTING_TRANSITION) {
3496      return false;
3497    }
3498    RegionServerStatusService.BlockingInterface rss = rssStub;
3499    if (rss == null || rsSpaceQuotaManager == null) {
3500      // the current server could be stopping.
3501      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3502      return false;
3503    }
3504    try {
3505      RegionServerStatusProtos.FileArchiveNotificationRequest request =
3506        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3507      rss.reportFileArchival(null, request);
3508    } catch (ServiceException se) {
3509      IOException ioe = ProtobufUtil.getRemoteException(se);
3510      if (ioe instanceof PleaseHoldException) {
3511        if (LOG.isTraceEnabled()) {
3512          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3513            + " This will be retried.", ioe);
3514        }
3515        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3516        return false;
3517      }
3518      if (rssStub == rss) {
3519        rssStub = null;
3520      }
3521      // re-create the stub if we failed to report the archival
3522      createRegionServerStatusStub(true);
3523      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3524      return false;
3525    }
3526    return true;
3527  }
3528
3529  void executeProcedure(long procId, RSProcedureCallable callable) {
3530    executorService.submit(new RSProcedureHandler(this, procId, callable));
3531  }
3532
3533  public void remoteProcedureComplete(long procId, Throwable error) {
3534    procedureResultReporter.complete(procId, error);
3535  }
3536
3537  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3538    RegionServerStatusService.BlockingInterface rss;
3539    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3540    for (;;) {
3541      rss = rssStub;
3542      if (rss != null) {
3543        break;
3544      }
3545      createRegionServerStatusStub();
3546    }
3547    try {
3548      rss.reportProcedureDone(null, request);
3549    } catch (ServiceException se) {
3550      if (rssStub == rss) {
3551        rssStub = null;
3552      }
3553      throw ProtobufUtil.getRemoteException(se);
3554    }
3555  }
3556
3557  /**
3558   * Will ignore the open/close region procedures which already submitted or executed. When master
3559   * had unfinished open/close region procedure and restarted, new active master may send duplicate
3560   * open/close region request to regionserver. The open/close request is submitted to a thread pool
3561   * and execute. So first need a cache for submitted open/close region procedures. After the
3562   * open/close region request executed and report region transition succeed, cache it in executed
3563   * region procedures cache. See {@link #finishRegionProcedure(long)}. After report region
3564   * transition succeed, master will not send the open/close region request to regionserver again.
3565   * And we thought that the ongoing duplicate open/close region request should not be delayed more
3566   * than 600 seconds. So the executed region procedures cache will expire after 600 seconds. See
3567   * HBASE-22404 for more details.
3568   * @param procId the id of the open/close region procedure
3569   * @return true if the procedure can be submitted.
3570   */
3571  boolean submitRegionProcedure(long procId) {
3572    if (procId == -1) {
3573      return true;
3574    }
3575    // Ignore the region procedures which already submitted.
3576    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3577    if (previous != null) {
3578      LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
3579      return false;
3580    }
3581    // Ignore the region procedures which already executed.
3582    if (executedRegionProcedures.getIfPresent(procId) != null) {
3583      LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
3584      return false;
3585    }
3586    return true;
3587  }
3588
3589  /**
3590   * See {@link #submitRegionProcedure(long)}.
3591   * @param procId the id of the open/close region procedure
3592   */
3593  public void finishRegionProcedure(long procId) {
3594    executedRegionProcedures.put(procId, procId);
3595    submittedRegionProcedures.remove(procId);
3596  }
3597
3598  /**
3599   * Force to terminate region server when abort timeout.
3600   */
3601  private static class SystemExitWhenAbortTimeout extends TimerTask {
3602
3603    public SystemExitWhenAbortTimeout() {
3604    }
3605
3606    @Override
3607    public void run() {
3608      LOG.warn("Aborting region server timed out, terminating forcibly"
3609        + " and does not wait for any running shutdown hooks or finalizers to finish their work."
3610        + " Thread dump to stdout.");
3611      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3612      Runtime.getRuntime().halt(1);
3613    }
3614  }
3615
3616  @InterfaceAudience.Private
3617  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3618    return compactedFileDischarger;
3619  }
3620
3621  /**
3622   * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}}
3623   * @return pause time
3624   */
3625  @InterfaceAudience.Private
3626  public long getRetryPauseTime() {
3627    return this.retryPauseTime;
3628  }
3629
3630  @Override
3631  public Optional<ServerName> getActiveMaster() {
3632    return Optional.ofNullable(masterAddressTracker.getMasterAddress());
3633  }
3634
3635  @Override
3636  public List<ServerName> getBackupMasters() {
3637    return masterAddressTracker.getBackupMasters();
3638  }
3639
3640  @Override
3641  public Iterator<ServerName> getBootstrapNodes() {
3642    return bootstrapNodeManager.getBootstrapNodes().iterator();
3643  }
3644
3645  @Override
3646  public List<HRegionLocation> getMetaLocations() {
3647    return metaRegionLocationCache.getMetaRegionLocations();
3648  }
3649
3650  @Override
3651  protected NamedQueueRecorder createNamedQueueRecord() {
3652    return NamedQueueRecorder.getInstance(conf);
3653  }
3654
3655  @Override
3656  protected boolean clusterMode() {
3657    // this method will be called in the constructor of super class, so we can not return masterless
3658    // directly here, as it will always be false.
3659    return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
3660  }
3661
3662  @InterfaceAudience.Private
3663  public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
3664    return brokenStoreFileCleaner;
3665  }
3666
3667  @InterfaceAudience.Private
3668  public RSMobFileCleanerChore getRSMobFileCleanerChore() {
3669    return rsMobFileCleanerChore;
3670  }
3671
3672  RSSnapshotVerifier getRsSnapshotVerifier() {
3673    return rsSnapshotVerifier;
3674  }
3675
3676  @Override
3677  protected void stopChores() {
3678    shutdownChore(nonceManagerChore);
3679    shutdownChore(compactionChecker);
3680    shutdownChore(compactedFileDischarger);
3681    shutdownChore(periodicFlusher);
3682    shutdownChore(healthCheckChore);
3683    shutdownChore(executorStatusChore);
3684    shutdownChore(storefileRefresher);
3685    shutdownChore(fsUtilizationChore);
3686    shutdownChore(namedQueueServiceChore);
3687    shutdownChore(brokenStoreFileCleaner);
3688    shutdownChore(rsMobFileCleanerChore);
3689    shutdownChore(replicationMarkerChore);
3690  }
3691
3692  @Override
3693  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
3694    return regionReplicationBufferManager;
3695  }
3696}