001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION;
023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
024import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
025import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
026import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
027import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT;
028import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
029import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT;
030import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
031import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
032import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
033import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
034
035import io.opentelemetry.api.trace.Span;
036import io.opentelemetry.api.trace.StatusCode;
037import io.opentelemetry.context.Scope;
038import java.io.IOException;
039import java.io.PrintWriter;
040import java.lang.management.MemoryUsage;
041import java.lang.reflect.Constructor;
042import java.net.InetSocketAddress;
043import java.time.Duration;
044import java.util.ArrayList;
045import java.util.Collection;
046import java.util.Collections;
047import java.util.Comparator;
048import java.util.HashSet;
049import java.util.Iterator;
050import java.util.List;
051import java.util.Map;
052import java.util.Map.Entry;
053import java.util.Objects;
054import java.util.Optional;
055import java.util.Set;
056import java.util.SortedMap;
057import java.util.Timer;
058import java.util.TimerTask;
059import java.util.TreeMap;
060import java.util.TreeSet;
061import java.util.concurrent.ConcurrentHashMap;
062import java.util.concurrent.ConcurrentMap;
063import java.util.concurrent.ConcurrentSkipListMap;
064import java.util.concurrent.ThreadLocalRandom;
065import java.util.concurrent.TimeUnit;
066import java.util.concurrent.atomic.AtomicBoolean;
067import java.util.concurrent.locks.ReentrantReadWriteLock;
068import java.util.function.Consumer;
069import java.util.stream.Collectors;
070import javax.management.MalformedObjectNameException;
071import javax.servlet.http.HttpServlet;
072import org.apache.commons.lang3.StringUtils;
073import org.apache.commons.lang3.mutable.MutableFloat;
074import org.apache.hadoop.conf.Configuration;
075import org.apache.hadoop.fs.FileSystem;
076import org.apache.hadoop.fs.Path;
077import org.apache.hadoop.hbase.Abortable;
078import org.apache.hadoop.hbase.CacheEvictionStats;
079import org.apache.hadoop.hbase.CallQueueTooBigException;
080import org.apache.hadoop.hbase.ClockOutOfSyncException;
081import org.apache.hadoop.hbase.DoNotRetryIOException;
082import org.apache.hadoop.hbase.ExecutorStatusChore;
083import org.apache.hadoop.hbase.HBaseConfiguration;
084import org.apache.hadoop.hbase.HBaseInterfaceAudience;
085import org.apache.hadoop.hbase.HBaseServerBase;
086import org.apache.hadoop.hbase.HConstants;
087import org.apache.hadoop.hbase.HDFSBlocksDistribution;
088import org.apache.hadoop.hbase.HRegionLocation;
089import org.apache.hadoop.hbase.HealthCheckChore;
090import org.apache.hadoop.hbase.MetaTableAccessor;
091import org.apache.hadoop.hbase.NotServingRegionException;
092import org.apache.hadoop.hbase.PleaseHoldException;
093import org.apache.hadoop.hbase.ScheduledChore;
094import org.apache.hadoop.hbase.ServerName;
095import org.apache.hadoop.hbase.Stoppable;
096import org.apache.hadoop.hbase.TableName;
097import org.apache.hadoop.hbase.YouAreDeadException;
098import org.apache.hadoop.hbase.ZNodeClearer;
099import org.apache.hadoop.hbase.client.ConnectionUtils;
100import org.apache.hadoop.hbase.client.RegionInfo;
101import org.apache.hadoop.hbase.client.RegionInfoBuilder;
102import org.apache.hadoop.hbase.client.locking.EntityLock;
103import org.apache.hadoop.hbase.client.locking.LockServiceClient;
104import org.apache.hadoop.hbase.conf.ConfigurationObserver;
105import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
106import org.apache.hadoop.hbase.exceptions.RegionMovedException;
107import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
108import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
109import org.apache.hadoop.hbase.executor.ExecutorType;
110import org.apache.hadoop.hbase.http.InfoServer;
111import org.apache.hadoop.hbase.io.hfile.BlockCache;
112import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
113import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
114import org.apache.hadoop.hbase.io.hfile.HFile;
115import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
116import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
117import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
118import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
119import org.apache.hadoop.hbase.ipc.RpcClient;
120import org.apache.hadoop.hbase.ipc.RpcServer;
121import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
122import org.apache.hadoop.hbase.ipc.ServerRpcController;
123import org.apache.hadoop.hbase.log.HBaseMarkers;
124import org.apache.hadoop.hbase.mob.MobFileCache;
125import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
126import org.apache.hadoop.hbase.monitoring.TaskMonitor;
127import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
128import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore;
129import org.apache.hadoop.hbase.net.Address;
130import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
131import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
132import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
133import org.apache.hadoop.hbase.quotas.QuotaUtil;
134import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
135import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
136import org.apache.hadoop.hbase.quotas.RegionSize;
137import org.apache.hadoop.hbase.quotas.RegionSizeStore;
138import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
139import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
140import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
141import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
142import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
143import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
144import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
145import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
146import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
147import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
148import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
149import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
150import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
151import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
152import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
153import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
154import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
155import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
156import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
157import org.apache.hadoop.hbase.security.SecurityConstants;
158import org.apache.hadoop.hbase.security.Superusers;
159import org.apache.hadoop.hbase.security.User;
160import org.apache.hadoop.hbase.security.UserProvider;
161import org.apache.hadoop.hbase.trace.TraceUtil;
162import org.apache.hadoop.hbase.util.Bytes;
163import org.apache.hadoop.hbase.util.CompressionTest;
164import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
165import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
166import org.apache.hadoop.hbase.util.FSUtils;
167import org.apache.hadoop.hbase.util.FutureUtils;
168import org.apache.hadoop.hbase.util.JvmPauseMonitor;
169import org.apache.hadoop.hbase.util.Pair;
170import org.apache.hadoop.hbase.util.RetryCounter;
171import org.apache.hadoop.hbase.util.RetryCounterFactory;
172import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
173import org.apache.hadoop.hbase.util.Threads;
174import org.apache.hadoop.hbase.util.VersionInfo;
175import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
176import org.apache.hadoop.hbase.wal.WAL;
177import org.apache.hadoop.hbase.wal.WALFactory;
178import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
179import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
180import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
181import org.apache.hadoop.hbase.zookeeper.ZKUtil;
182import org.apache.hadoop.ipc.RemoteException;
183import org.apache.hadoop.util.ReflectionUtils;
184import org.apache.yetus.audience.InterfaceAudience;
185import org.apache.zookeeper.KeeperException;
186import org.slf4j.Logger;
187import org.slf4j.LoggerFactory;
188
189import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
190import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
191import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
192import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
193import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
194import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
195import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
196import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
197import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
198import org.apache.hbase.thirdparty.com.google.protobuf.Message;
199import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
200import org.apache.hbase.thirdparty.com.google.protobuf.Service;
201import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
202import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
203import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
204
205import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
206import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
235
236/**
237 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
238 * are many HRegionServers in a single HBase deployment.
239 */
240@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
241@SuppressWarnings({ "deprecation" })
242public class HRegionServer extends HBaseServerBase<RSRpcServices>
243  implements RegionServerServices, LastSequenceId {
244
245  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
246
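  // Size unit helpers (bytes per MB and bytes per KB), used when converting byte counts for
  // reporting, e.g. the cached-region sizes added to the ServerLoad in buildServerLoad().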
247  int unitMB = 1024 * 1024;
248  int unitKB = 1024;
249
  /**
   * For testing only! Set to true to skip notifying the master of region assignments.
   */
253  @InterfaceAudience.Private
254  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
255  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
256
  /**
   * A map from RegionName to the current action in progress. The boolean value indicates whether
   * an open-region action (true) or a close-region action (false) is in progress.
   */
261  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
262    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
263
  /**
   * Used to cache the open/close region procedures which have already been submitted. See
   * {@link #submitRegionProcedure(long)}.
   */
268  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
  /**
   * Used to cache the open/close region procedures which have already been executed. See
   * {@link #submitRegionProcedure(long)}.
   */
273  private final Cache<Long, Long> executedRegionProcedures =
274    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
275
276  /**
277   * Used to cache the moved-out regions
278   */
279  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
280    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();
281
282  private MemStoreFlusher cacheFlusher;
283
284  private HeapMemoryManager hMemManager;
285
286  // Replication services. If no replication, this handler will be null.
287  private ReplicationSourceService replicationSourceHandler;
288  private ReplicationSinkService replicationSinkHandler;
289  private boolean sameReplicationSourceAndSink;
290
291  // Compactions
292  private CompactSplit compactSplitThread;
293
294  /**
295   * Map of regions currently being served by this region server. Key is the encoded region name.
296   * All access should be synchronized.
297   */
298  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
299  /**
300   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
301   * need to be a ConcurrentHashMap?
302   */
303  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();
304
  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS
   * API call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers
   * and here we really mean DataNode locations. We don't store it as InetSocketAddress here because
   * the on-demand conversion from Address to InetSocketAddress guarantees that the resolution
   * results are fresh when we need them.
   */
315  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();
316
317  private LeaseManager leaseManager;
318
319  private volatile boolean dataFsOk;
320
321  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds, to be safe.
323  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // This task is run if aborting takes longer than the timeout.
325  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
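
  // Illustrative example only (the task class below is hypothetical): both the timeout and the
  // task run on timeout are configurable, e.g.
  //   conf.setLong(ABORT_TIMEOUT, 600000); // 10 minutes
  //   conf.set(ABORT_TIMEOUT_TASK, "org.example.MyAbortTimeoutTask");
  // The configured task is used by the abortMonitor timer declared further below to shut the
  // process down if abort() takes longer than the timeout.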
326
327  // A state before we go into stopped state. At this stage we're closing user
328  // space regions.
329  private boolean stopping = false;
330  private volatile boolean killed = false;
331
332  private final int threadWakeFrequency;
333
334  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
335  private final int compactionCheckFrequency;
336  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
337  private final int flushCheckFrequency;
338
339  // Stub to do region server status calls against the master.
340  private volatile RegionServerStatusService.BlockingInterface rssStub;
341  private volatile LockService.BlockingInterface lockStub;
342  // RPC client. Used to make the stub above that does region server status checking.
343  private RpcClient rpcClient;
344
345  private UncaughtExceptionHandler uncaughtExceptionHandler;
346
347  private JvmPauseMonitor pauseMonitor;
348
349  private RSSnapshotVerifier rsSnapshotVerifier;
350
351  /** region server process name */
352  public static final String REGIONSERVER = "regionserver";
353
354  private MetricsRegionServer metricsRegionServer;
355  MetricsRegionServerWrapperImpl metricsRegionServerImpl;
356
357  /**
358   * Check for compactions requests.
359   */
360  private ScheduledChore compactionChecker;
361
362  /**
363   * Check for flushes
364   */
365  private ScheduledChore periodicFlusher;
366
367  private volatile WALFactory walFactory;
368
369  private LogRoller walRoller;
370
371  // A thread which calls reportProcedureDone
372  private RemoteProcedureResultReporter procedureResultReporter;
373
374  // flag set after we're done setting up server threads
375  final AtomicBoolean online = new AtomicBoolean(false);
376
377  // master address tracker
378  private final MasterAddressTracker masterAddressTracker;
379
380  // Log Splitting Worker
381  private SplitLogWorker splitLogWorker;
382
383  private final int shortOperationTimeout;
384
385  // Time to pause if master says 'please hold'
386  private final long retryPauseTime;
387
388  private final RegionServerAccounting regionServerAccounting;
389
390  private NamedQueueServiceChore namedQueueServiceChore = null;
391
392  // Block cache
393  private BlockCache blockCache;
394  // The cache for mob files
395  private MobFileCache mobFileCache;
396
397  /** The health check chore. */
398  private HealthCheckChore healthCheckChore;
399
400  /** The Executor status collect chore. */
401  private ExecutorStatusChore executorStatusChore;
402
403  /** The nonce manager chore. */
404  private ScheduledChore nonceManagerChore;
405
406  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
407
408  /**
409   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
410   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
411   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
412   */
413  @Deprecated
414  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
415  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
416    "hbase.regionserver.hostname.disable.master.reversedns";
417
  /**
   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive. An
   * exception will be thrown if both are set.
   */
422  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
423  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
424    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";
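
  // Illustrative sketch of the mutual exclusion described above (hostnames are made up):
  //   valid:   hbase.unsafe.regionserver.hostname.disable.master.reversedns = true
  //   invalid: the key above set to true together with hbase.unsafe.regionserver.hostname set to
  //            e.g. "rs1.example.com"; getUseThisHostnameInstead() throws an IOException then.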
425
426  /**
427   * Unique identifier for the cluster we are a part of.
428   */
429  private String clusterId;
430
431  // chore for refreshing store files for secondary regions
432  private StorefileRefresherChore storefileRefresher;
433
434  private volatile RegionServerCoprocessorHost rsHost;
435
436  private RegionServerProcedureManagerHost rspmHost;
437
438  private RegionServerRpcQuotaManager rsQuotaManager;
439  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
440
  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
   * case where the client doesn't receive the response from a successful operation and retries. We
   * track the successful ops for some time via a nonce sent by the client and handle duplicate
   * operations (currently by failing them; in the future we might use MVCC to return the result).
   * Nonces are also recovered from the WAL during recovery; however, the caveats (from HBASE-3787)
   * are:
   * <ul>
   * <li>WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth of
   * past records. If we don't read the records, we don't read and recover the nonces. Some WALs
   * within nonce-timeout at recovery may not even be present due to rolling/cleanup.</li>
   * <li>There's no WAL recovery during normal region move, so nonces will not be transferred. We
   * could have a separate additional "Nonce WAL". It would just contain a bunch of numbers and
   * would not be flushed on the main path; because the WAL itself also contains nonces, if we only
   * flush it before a memstore flush, then for a given nonce we will either see it in the WAL (if
   * it was never flushed to disk, it will be part of recovery), or we'll see it as part of the
   * nonce log (or both occasionally, which doesn't matter). The nonce log file can be deleted
   * after the latest nonce in it has expired. It can also be recovered during a move.</li>
   * </ul>
   */
458  final ServerNonceManager nonceManager;
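
  // Illustrative flow only (paraphrased; see ServerNonceManager for the actual API): a client
  // sends increment(+1) with nonce N, the server applies it but the response is lost, the client
  // retries with the same nonce N, and the server recognizes N within the nonce-timeout window and
  // rejects the duplicate instead of applying the increment twice. Nonces can be disabled (leaving
  // nonceManager null), e.g.:
  //   conf.setBoolean(HConstants.HBASE_RS_NONCES_ENABLED, false);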
459
460  private BrokenStoreFileCleaner brokenStoreFileCleaner;
461
462  private RSMobFileCleanerChore rsMobFileCleanerChore;
463
464  @InterfaceAudience.Private
465  CompactedHFilesDischarger compactedFileDischarger;
466
467  private volatile ThroughputController flushThroughputController;
468
469  private SecureBulkLoadManager secureBulkLoadManager;
470
471  private FileSystemUtilizationChore fsUtilizationChore;
472
473  private BootstrapNodeManager bootstrapNodeManager;
474
  /**
   * True if this RegionServer is coming up in a cluster where there is no Master; this means it
   * needs to just come up and make do without a Master to talk to: e.g. in tests, or when the
   * HRegionServer is doing something other than its usual duties, e.g. as a hollowed-out host whose
   * only purpose is to act as a Replication-stream sink; see HBASE-18846 for more. TODO: can this
   * replace {@link #TEST_SKIP_REPORTING_TRANSITION} ?
   */
482  private final boolean masterless;
483  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
484
485  /** regionserver codec list **/
486  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";
487
488  // A timer to shutdown the process if abort takes too long
489  private Timer abortMonitor;
490
491  private RegionReplicationBufferManager regionReplicationBufferManager;
492
493  /*
494   * Chore that creates replication marker rows.
495   */
496  private ReplicationMarkerChore replicationMarkerChore;
497
  // A timer that submits requests to the PrefetchExecutor
499  private PrefetchExecutorNotifier prefetchExecutorNotifier;
500
  /**
   * Starts an HRegionServer at the default location.
   * <p/>
   * Don't start any services or managers in the constructor. Defer as much as possible until after
   * we have registered with the Master. See {@link #startServices}.
   */
507  public HRegionServer(final Configuration conf) throws IOException {
508    super(conf, "RegionServer"); // thread name
509    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
510    try (Scope ignored = span.makeCurrent()) {
511      this.dataFsOk = true;
512      this.masterless = !clusterMode();
513      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
514      HFile.checkHFileVersion(this.conf);
515      checkCodecs(this.conf);
516      FSUtils.setupShortCircuitRead(this.conf);
517
518      // Disable usage of meta replicas in the regionserver
519      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
520      // Config'ed params
521      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
522      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
523      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
524
525      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
526      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
527
528      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
529        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
530
531      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
532        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);
533
534      regionServerAccounting = new RegionServerAccounting(conf);
535
536      blockCache = BlockCacheFactory.createBlockCache(conf);
537      mobFileCache = new MobFileCache(conf);
538
539      rsSnapshotVerifier = new RSSnapshotVerifier(conf);
540
541      uncaughtExceptionHandler =
542        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);
543
544      // If no master in cluster, skip trying to track one or look for a cluster status.
545      if (!this.masterless) {
546        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
547        masterAddressTracker.start();
548      } else {
549        masterAddressTracker = null;
550      }
551      this.rpcServices.start(zooKeeper);
552      span.setStatus(StatusCode.OK);
553    } catch (Throwable t) {
554      // Make sure we log the exception. HRegionServer is often started via reflection and the
555      // cause of failed startup is lost.
556      TraceUtil.setError(span, t);
557      LOG.error("Failed construction RegionServer", t);
558      throw t;
559    } finally {
560      span.end();
561    }
562  }
563
564  // HMaster should override this method to load the specific config for master
565  @Override
566  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
567    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
568    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
569      if (!StringUtils.isBlank(hostname)) {
570        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
571          + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
572          + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
573          + UNSAFE_RS_HOSTNAME_KEY + " is used";
574        throw new IOException(msg);
575      } else {
576        return rpcServices.getSocketAddress().getHostName();
577      }
578    } else {
579      return hostname;
580    }
581  }
582
583  @Override
584  protected void login(UserProvider user, String host) throws IOException {
585    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
586      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
587  }
588
589  @Override
590  protected String getProcessName() {
591    return REGIONSERVER;
592  }
593
594  @Override
595  protected RegionServerCoprocessorHost getCoprocessorHost() {
596    return getRegionServerCoprocessorHost();
597  }
598
599  @Override
600  protected boolean canCreateBaseZNode() {
601    return !clusterMode();
602  }
603
604  @Override
605  protected boolean canUpdateTableDescriptor() {
606    return false;
607  }
608
609  @Override
610  protected boolean cacheTableDescriptor() {
611    return false;
612  }
613
614  protected RSRpcServices createRpcServices() throws IOException {
615    return new RSRpcServices(this);
616  }
617
618  @Override
619  protected void configureInfoServer(InfoServer infoServer) {
620    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
621    infoServer.setAttribute(REGIONSERVER, this);
622  }
623
624  @Override
625  protected Class<? extends HttpServlet> getDumpServlet() {
626    return RSDumpServlet.class;
627  }
628
629  /**
630   * Used by {@link RSDumpServlet} to generate debugging information.
631   */
632  public void dumpRowLocks(final PrintWriter out) {
633    StringBuilder sb = new StringBuilder();
634    for (HRegion region : getRegions()) {
635      if (region.getLockedRows().size() > 0) {
636        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
637          sb.setLength(0);
638          sb.append(region.getTableDescriptor().getTableName()).append(",")
639            .append(region.getRegionInfo().getEncodedName()).append(",");
640          sb.append(rowLockContext.toString());
641          out.println(sb);
642        }
643      }
644    }
645  }
646
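  /**
   * Registers a coprocessor {@link Service} with this region server. Only one instance may be
   * registered per service name; a second registration under the same name is rejected and logged.
   */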
647  @Override
648  public boolean registerService(Service instance) {
649    // No stacking of instances is allowed for a single executorService name
650    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
651    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
652    if (coprocessorServiceHandlers.containsKey(serviceName)) {
653      LOG.error("Coprocessor executorService " + serviceName
654        + " already registered, rejecting request from " + instance);
655      return false;
656    }
657
658    coprocessorServiceHandlers.put(serviceName, instance);
659    if (LOG.isDebugEnabled()) {
660      LOG.debug(
661        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
662    }
663    return true;
664  }
665
  /**
   * Runs a test on the configured codecs to make sure the supporting libraries are in place.
   */
669  private static void checkCodecs(final Configuration c) throws IOException {
670    // check to see if the codec list is available:
671    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
672    if (codecs == null) {
673      return;
674    }
675    for (String codec : codecs) {
676      if (!CompressionTest.testCompression(codec)) {
677        throw new IOException(
678          "Compression codec " + codec + " not supported, aborting RS construction");
679      }
680    }
681  }
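
  // Illustrative example (codec names are examples only): setting
  //   hbase.regionserver.codecs = snappy,gz
  // in hbase-site.xml makes checkCodecs() fail RegionServer construction unless both the snappy
  // and gz codecs pass CompressionTest.testCompression().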
682
683  public String getClusterId() {
684    return this.clusterId;
685  }
686
  /**
   * All initialization needed before we go register with the Master.<br>
   * Do the bare minimum here; do the bulk of the initialization AFTER we've connected to the
   * Master.<br>
   * In here we just put up the RpcServer and set up the Connection and ZooKeeper.
   */
692  private void preRegistrationInitialization() {
693    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
694    try (Scope ignored = span.makeCurrent()) {
695      initializeZooKeeper();
696      setupClusterConnection();
697      bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
698      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
699      // Setup RPC client for master communication
700      this.rpcClient = asyncClusterConnection.getRpcClient();
701      span.setStatus(StatusCode.OK);
702    } catch (Throwable t) {
      // Call stop on error, or the process will stick around forever since the server
      // puts up non-daemon threads.
705      TraceUtil.setError(span, t);
706      this.rpcServices.stop();
707      abort("Initialization of RS failed.  Hence aborting RS.", t);
708    } finally {
709      span.end();
710    }
711  }
712
  /**
   * Bring up the connection to the zk ensemble, then wait until a master is available for this
   * cluster, and after that wait until the cluster 'up' flag has been set. This is the order in
   * which the master does things.
   * <p>
   * Finally, open the long-living server short-circuit connection.
   */
719  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
720      justification = "cluster Id znode read would give us correct response")
721  private void initializeZooKeeper() throws IOException, InterruptedException {
722    // Nothing to do in here if no Master in the mix.
723    if (this.masterless) {
724      return;
725    }
726
727    // Create the master address tracker, register with zk, and start it. Then
728    // block until a master is available. No point in starting up if no master
729    // running.
730    blockAndCheckIfStopped(this.masterAddressTracker);
731
732    // Wait on cluster being up. Master will set this flag up in zookeeper
733    // when ready.
734    blockAndCheckIfStopped(this.clusterStatusTracker);
735
736    // If we are HMaster then the cluster id should have already been set.
737    if (clusterId == null) {
738      // Retrieve clusterId
739      // Since cluster status is now up
740      // ID should have already been set by HMaster
741      try {
742        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
743        if (clusterId == null) {
744          this.abort("Cluster ID has not been set");
745        }
746        LOG.info("ClusterId : " + clusterId);
747      } catch (KeeperException e) {
748        this.abort("Failed to retrieve Cluster ID", e);
749      }
750    }
751
752    if (isStopped() || isAborted()) {
753      return; // No need for further initialization
754    }
755
756    // watch for snapshots and other procedures
757    try {
758      rspmHost = new RegionServerProcedureManagerHost();
759      rspmHost.loadProcedures(conf);
760      rspmHost.initialize(this);
761    } catch (KeeperException e) {
762      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
763    }
764  }
765
  /**
   * Utility method to wait indefinitely on znode availability while checking if the region server
   * is shut down.
   * @param tracker znode tracker to use
   * @throws IOException          any IO exception, plus if the RS is stopped
   * @throws InterruptedException if the waiting thread is interrupted
   */
773  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
774    throws IOException, InterruptedException {
775    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
776      if (this.stopped) {
777        throw new IOException("Received the shutdown message while waiting.");
778      }
779    }
780  }
781
782  /** Returns True if the cluster is up. */
783  @Override
784  public boolean isClusterUp() {
785    return this.masterless
786      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
787  }
788
789  private void initializeReplicationMarkerChore() {
790    boolean replicationMarkerEnabled =
791      conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
    // Only create the chore if the replication marker feature is enabled.
793    if (replicationMarkerEnabled) {
794      int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
795        REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
796      replicationMarkerChore = new ReplicationMarkerChore(this, this, period, conf);
797    }
798  }
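
  // Illustrative example (the period value is made up; see the imported *_DEFAULT constants for
  // the actual defaults): the chore above is only created when the marker feature is enabled, e.g.
  //   conf.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
  //   conf.setInt(REPLICATION_MARKER_CHORE_DURATION_KEY, 30000);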
799
800  @Override
801  public boolean isStopping() {
802    return stopping;
803  }
804
805  /**
806   * The HRegionServer sticks in this loop until closed.
807   */
808  @Override
809  public void run() {
810    if (isStopped()) {
811      LOG.info("Skipping run; stopped");
812      return;
813    }
814    try {
815      // Do pre-registration initializations; zookeeper, lease threads, etc.
816      preRegistrationInitialization();
817    } catch (Throwable e) {
818      abort("Fatal exception during initialization", e);
819    }
820
821    try {
822      if (!isStopped() && !isAborted()) {
823        installShutdownHook();
824        // Initialize the RegionServerCoprocessorHost now that our ephemeral
825        // node was created, in case any coprocessors want to use ZooKeeper
826        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
827
828        // Try and register with the Master; tell it we are here. Break if server is stopped or
829        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
830        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
831        // come up.
832        LOG.debug("About to register with Master.");
833        TraceUtil.trace(() -> {
834          RetryCounterFactory rcf =
835            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
836          RetryCounter rc = rcf.create();
837          while (keepLooping()) {
838            RegionServerStartupResponse w = reportForDuty();
839            if (w == null) {
840              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
841              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
842              this.sleeper.sleep(sleepTime);
843            } else {
844              handleReportForDutyResponse(w);
845              break;
846            }
847          }
848        }, "HRegionServer.registerWithMaster");
849      }
850
851      if (!isStopped() && isHealthy()) {
852        TraceUtil.trace(() -> {
853          // start the snapshot handler and other procedure handlers,
854          // since the server is ready to run
855          if (this.rspmHost != null) {
856            this.rspmHost.start();
857          }
858          // Start the Quota Manager
859          if (this.rsQuotaManager != null) {
860            rsQuotaManager.start(getRpcServer().getScheduler());
861          }
862          if (this.rsSpaceQuotaManager != null) {
863            this.rsSpaceQuotaManager.start();
864          }
865        }, "HRegionServer.startup");
866      }
867
868      // We registered with the Master. Go into run mode.
869      long lastMsg = EnvironmentEdgeManager.currentTime();
870      long oldRequestCount = -1;
871      // The main run loop.
872      while (!isStopped() && isHealthy()) {
873        if (!isClusterUp()) {
874          if (onlineRegions.isEmpty()) {
875            stop("Exiting; cluster shutdown set and not carrying any regions");
876          } else if (!this.stopping) {
877            this.stopping = true;
878            LOG.info("Closing user regions");
879            closeUserRegions(isAborted());
880          } else {
881            boolean allUserRegionsOffline = areAllUserRegionsOffline();
882            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
884              // since last time we went around the loop. Any open
885              // meta regions will be closed on our way out.
886              if (oldRequestCount == getWriteRequestCount()) {
887                stop("Stopped; only catalog regions remaining online");
888                break;
889              }
890              oldRequestCount = getWriteRequestCount();
891            } else {
892              // Make sure all regions have been closed -- some regions may
893              // have not got it because we were splitting at the time of
894              // the call to closeUserRegions.
895              closeUserRegions(this.abortRequested.get());
896            }
897            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
898          }
899        }
900        long now = EnvironmentEdgeManager.currentTime();
901        if ((now - lastMsg) >= msgInterval) {
902          tryRegionServerReport(lastMsg, now);
903          lastMsg = EnvironmentEdgeManager.currentTime();
904        }
905        if (!isStopped() && !isAborted()) {
906          this.sleeper.sleep();
907        }
      } // while
909    } catch (Throwable t) {
910      if (!rpcServices.checkOOME(t)) {
911        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
912        abort(prefix + t.getMessage(), t);
913      }
914    }
915
916    final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
917    try (Scope ignored = span.makeCurrent()) {
918      if (this.leaseManager != null) {
919        this.leaseManager.closeAfterLeasesExpire();
920      }
921      if (this.splitLogWorker != null) {
922        splitLogWorker.stop();
923      }
924      stopInfoServer();
925      // Send cache a shutdown.
926      if (blockCache != null) {
927        blockCache.shutdown();
928      }
929      if (mobFileCache != null) {
930        mobFileCache.shutdown();
931      }
932
933      // Send interrupts to wake up threads if sleeping so they notice shutdown.
      // TODO: Should we check they are alive? If they OOME'd, they could have exited already.
935      if (this.hMemManager != null) {
936        this.hMemManager.stop();
937      }
938      if (this.cacheFlusher != null) {
939        this.cacheFlusher.interruptIfNecessary();
940      }
941      if (this.compactSplitThread != null) {
942        this.compactSplitThread.interruptIfNecessary();
943      }
944
945      // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
946      if (rspmHost != null) {
947        rspmHost.stop(this.abortRequested.get() || this.killed);
948      }
949
950      if (this.killed) {
951        // Just skip out w/o closing regions. Used when testing.
952      } else if (abortRequested.get()) {
953        if (this.dataFsOk) {
954          closeUserRegions(abortRequested.get()); // Don't leave any open file handles
955        }
956        LOG.info("aborting server " + this.serverName);
957      } else {
958        closeUserRegions(abortRequested.get());
959        LOG.info("stopping server " + this.serverName);
960      }
961      regionReplicationBufferManager.stop();
962      closeClusterConnection();
963      // Closing the compactSplit thread before closing meta regions
964      if (!this.killed && containsMetaTableRegions()) {
965        if (!abortRequested.get() || this.dataFsOk) {
966          if (this.compactSplitThread != null) {
967            this.compactSplitThread.join();
968            this.compactSplitThread = null;
969          }
970          closeMetaTableRegions(abortRequested.get());
971        }
972      }
973
974      if (!this.killed && this.dataFsOk) {
975        waitOnAllRegionsToClose(abortRequested.get());
976        LOG.info("stopping server " + this.serverName + "; all regions closed.");
977      }
978
979      // Stop the quota manager
980      if (rsQuotaManager != null) {
981        rsQuotaManager.stop();
982      }
983      if (rsSpaceQuotaManager != null) {
984        rsSpaceQuotaManager.stop();
985        rsSpaceQuotaManager = null;
986      }
987
      // The dataFsOk flag may have been changed if closing regions threw an exception.
989      if (this.dataFsOk) {
990        shutdownWAL(!abortRequested.get());
991      }
992
993      // Make sure the proxy is down.
994      if (this.rssStub != null) {
995        this.rssStub = null;
996      }
997      if (this.lockStub != null) {
998        this.lockStub = null;
999      }
1000      if (this.rpcClient != null) {
1001        this.rpcClient.close();
1002      }
1003      if (this.leaseManager != null) {
1004        this.leaseManager.close();
1005      }
1006      if (this.pauseMonitor != null) {
1007        this.pauseMonitor.stop();
1008      }
1009
1010      if (!killed) {
1011        stopServiceThreads();
1012      }
1013
1014      if (this.rpcServices != null) {
1015        this.rpcServices.stop();
1016      }
1017
1018      try {
1019        deleteMyEphemeralNode();
1020      } catch (KeeperException.NoNodeException nn) {
1021        // pass
1022      } catch (KeeperException e) {
1023        LOG.warn("Failed deleting my ephemeral node", e);
1024      }
1025      // We may have failed to delete the znode at the previous step, but
1026      // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1027      ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1028
1029      closeZooKeeper();
1030      closeTableDescriptors();
1031      LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
1032      span.setStatus(StatusCode.OK);
1033    } finally {
1034      span.end();
1035    }
1036  }
1037
1038  private boolean containsMetaTableRegions() {
1039    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
1040  }
1041
1042  private boolean areAllUserRegionsOffline() {
1043    if (getNumberOfOnlineRegions() > 2) {
1044      return false;
1045    }
1046    boolean allUserRegionsOffline = true;
1047    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1048      if (!e.getValue().getRegionInfo().isMetaRegion()) {
1049        allUserRegionsOffline = false;
1050        break;
1051      }
1052    }
1053    return allUserRegionsOffline;
1054  }
1055
1056  /** Returns Current write count for all online regions. */
1057  private long getWriteRequestCount() {
1058    long writeCount = 0;
1059    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1060      writeCount += e.getValue().getWriteRequestsCount();
1061    }
1062    return writeCount;
1063  }
1064
1065  @InterfaceAudience.Private
1066  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1067    throws IOException {
1068    RegionServerStatusService.BlockingInterface rss = rssStub;
1069    if (rss == null) {
1070      // the current server could be stopping.
1071      return;
1072    }
1073    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1074    final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport");
1075    try (Scope ignored = span.makeCurrent()) {
1076      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1077      request.setServer(ProtobufUtil.toServerName(this.serverName));
1078      request.setLoad(sl);
1079      rss.regionServerReport(null, request.build());
1080      span.setStatus(StatusCode.OK);
1081    } catch (ServiceException se) {
1082      IOException ioe = ProtobufUtil.getRemoteException(se);
1083      if (ioe instanceof YouAreDeadException) {
1084        // This will be caught and handled as a fatal error in run()
1085        TraceUtil.setError(span, ioe);
1086        throw ioe;
1087      }
1088      if (rssStub == rss) {
1089        rssStub = null;
1090      }
1091      TraceUtil.setError(span, se);
1092      // Couldn't connect to the master, get location from zk and reconnect
1093      // Method blocks until new master is found or we are stopped
1094      createRegionServerStatusStub(true);
1095    } finally {
1096      span.end();
1097    }
1098  }
1099
1100  /**
1101   * Reports the given map of Regions and their size on the filesystem to the active Master.
1102   * @param regionSizeStore The store containing region sizes
1103   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1104   */
1105  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
1106    RegionServerStatusService.BlockingInterface rss = rssStub;
1107    if (rss == null) {
1108      // the current server could be stopping.
1109      LOG.trace("Skipping Region size report to HMaster as stub is null");
1110      return true;
1111    }
1112    try {
1113      buildReportAndSend(rss, regionSizeStore);
1114    } catch (ServiceException se) {
1115      IOException ioe = ProtobufUtil.getRemoteException(se);
1116      if (ioe instanceof PleaseHoldException) {
1117        LOG.trace("Failed to report region sizes to Master because it is initializing."
1118          + " This will be retried.", ioe);
1119        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1120        return true;
1121      }
1122      if (rssStub == rss) {
1123        rssStub = null;
1124      }
1125      createRegionServerStatusStub(true);
1126      if (ioe instanceof DoNotRetryIOException) {
1127        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1128        if (doNotRetryEx.getCause() != null) {
1129          Throwable t = doNotRetryEx.getCause();
1130          if (t instanceof UnsupportedOperationException) {
1131            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1132            return false;
1133          }
1134        }
1135      }
1136      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1137    }
1138    return true;
1139  }
1140
1141  /**
1142   * Builds the region size report and sends it to the master. Upon successful sending of the
1143   * report, the region sizes that were sent are marked as sent.
1144   * @param rss             The stub to send to the Master
1145   * @param regionSizeStore The store containing region sizes
1146   */
1147  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
1148    RegionSizeStore regionSizeStore) throws ServiceException {
1149    RegionSpaceUseReportRequest request =
1150      buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
1151    rss.reportRegionSpaceUse(null, request);
1152    // Record the number of size reports sent
1153    if (metricsRegionServer != null) {
1154      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
1155    }
1156  }
1157
1158  /**
1159   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1160   * @param regionSizes The size in bytes of regions
1161   * @return The corresponding protocol buffer message.
1162   */
1163  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
1164    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1165    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
1166      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
1167    }
1168    return request.build();
1169  }
1170
1171  /**
1172   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf
1173   * message.
1174   * @param regionInfo  The RegionInfo
1175   * @param sizeInBytes The size in bytes of the Region
1176   * @return The protocol buffer
1177   */
1178  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1179    return RegionSpaceUse.newBuilder()
1180      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1181      .setRegionSize(Objects.requireNonNull(sizeInBytes)).build();
1182  }
1183
1184  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1185    throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second, and other metrics. As long as metrics are part of ServerLoad it's best to use
1188    // the wrapper to compute those numbers in one place.
1189    // In the long term most of these should be moved off of ServerLoad and the heart beat.
1190    // Instead they should be stored in an HBase table so that external visibility into HBase is
1191    // improved; Additionally the load balancer will be able to take advantage of a more complete
1192    // history.
1193    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1194    Collection<HRegion> regions = getOnlineRegionsLocalContext();
1195    long usedMemory = -1L;
1196    long maxMemory = -1L;
1197    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
1198    if (usage != null) {
1199      usedMemory = usage.getUsed();
1200      maxMemory = usage.getMax();
1201    }
1202
1203    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
1204    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1205    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
1206    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
1207    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
1208    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
1209    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
1210    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1211    Coprocessor.Builder coprocessorBuilder = Coprocessor.newBuilder();
1212    for (String coprocessor : coprocessors) {
1213      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1214    }
1215    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1216    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1217    for (HRegion region : regions) {
1218      if (region.getCoprocessorHost() != null) {
1219        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1220        for (String regionCoprocessor : regionCoprocessors) {
1221          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
1222        }
1223      }
1224      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1225      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1226        .getCoprocessors()) {
1227        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1228      }
1229    }
1230
1231    getBlockCache().ifPresent(cache -> {
1232      cache.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
1233        regionCachedInfo.forEach((regionName, prefetchSize) -> {
1234          serverLoad.putRegionCachedInfo(regionName, roundSize(prefetchSize, unitMB));
1235        });
1236      });
1237    });
1238
1239    serverLoad.setReportStartTime(reportStartTime);
1240    serverLoad.setReportEndTime(reportEndTime);
1241    if (this.infoServer != null) {
1242      serverLoad.setInfoServerPort(this.infoServer.getPort());
1243    } else {
1244      serverLoad.setInfoServerPort(-1);
1245    }
1246    MetricsUserAggregateSource userSource =
1247      metricsRegionServer.getMetricsUserAggregate().getSource();
1248    if (userSource != null) {
1249      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
1250      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
1251        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
1252      }
1253    }
1254
1255    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
1256      // always refresh first to get the latest value
1257      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1258      if (rLoad != null) {
1259        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1260        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1261          .getReplicationLoadSourceEntries()) {
1262          serverLoad.addReplLoadSource(rLS);
1263        }
1264      }
1265    } else {
1266      if (replicationSourceHandler != null) {
1267        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1268        if (rLoad != null) {
1269          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1270            .getReplicationLoadSourceEntries()) {
1271            serverLoad.addReplLoadSource(rLS);
1272          }
1273        }
1274      }
1275      if (replicationSinkHandler != null) {
1276        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
1277        if (rLoad != null) {
1278          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1279        }
1280      }
1281    }
1282
1283    TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask
1284      .newBuilder().setDescription(task.getDescription())
1285      .setStatus(task.getStatus() != null ? task.getStatus() : "")
1286      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
1287      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build()));
1288
1289    return serverLoad.build();
1290  }
1291
1292  private String getOnlineRegionsAsPrintableString() {
1293    StringBuilder sb = new StringBuilder();
1294    for (Region r : this.onlineRegions.values()) {
1295      if (sb.length() > 0) {
1296        sb.append(", ");
1297      }
1298      sb.append(r.getRegionInfo().getEncodedName());
1299    }
1300    return sb.toString();
1301  }
1302
1303  /**
1304   * Wait on regions close.
1305   */
1306  private void waitOnAllRegionsToClose(final boolean abort) {
1307    // Wait till all regions are closed before going out.
1308    int lastCount = -1;
1309    long previousLogTime = 0;
1310    Set<String> closedRegions = new HashSet<>();
1311    boolean interrupted = false;
1312    try {
1313      while (!onlineRegions.isEmpty()) {
1314        int count = getNumberOfOnlineRegions();
1315        // Only print a message if the count of regions has changed.
1316        if (count != lastCount) {
1317          // Log every second at most
1318          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
1319            previousLogTime = EnvironmentEdgeManager.currentTime();
1320            lastCount = count;
1321            LOG.info("Waiting on " + count + " regions to close");
1322            // Only print out regions still closing if there are a small number, else we will
1323            // swamp the log.
1324            if (count < 10 && LOG.isDebugEnabled()) {
1325              LOG.debug("Online Regions=" + this.onlineRegions);
1326            }
1327          }
1328        }
1329        // Ensure all user regions have been sent a close. Use this to
1330        // protect against the case where an open comes in after we start the
1331        // iterator of onlineRegions to close all user regions.
1332        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1333          RegionInfo hri = e.getValue().getRegionInfo();
1334          if (
1335            !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1336              && !closedRegions.contains(hri.getEncodedName())
1337          ) {
1338            closedRegions.add(hri.getEncodedName());
1339            // Don't update zk with this close transition; pass false.
1340            closeRegionIgnoreErrors(hri, abort);
1341          }
1342        }
1343        // No regions in RIT, we could stop waiting now.
1344        if (this.regionsInTransitionInRS.isEmpty()) {
1345          if (!onlineRegions.isEmpty()) {
1346            LOG.info("We were exiting though online regions are not empty,"
1347              + " because some regions failed closing");
1348          }
1349          break;
1350        } else {
1351          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
1352            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1353        }
1354        if (sleepInterrupted(200)) {
1355          interrupted = true;
1356        }
1357      }
1358    } finally {
1359      if (interrupted) {
1360        Thread.currentThread().interrupt();
1361      }
1362    }
1363  }
1364
1365  private static boolean sleepInterrupted(long millis) {
1366    boolean interrupted = false;
1367    try {
1368      Thread.sleep(millis);
1369    } catch (InterruptedException e) {
1370      LOG.warn("Interrupted while sleeping");
1371      interrupted = true;
1372    }
1373    return interrupted;
1374  }
1375
1376  private void shutdownWAL(final boolean close) {
1377    if (this.walFactory != null) {
1378      try {
1379        if (close) {
1380          walFactory.close();
1381        } else {
1382          walFactory.shutdown();
1383        }
1384      } catch (Throwable e) {
1385        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1386        LOG.error("Shutdown / close of WAL failed: " + e);
1387        LOG.debug("Shutdown / close exception details:", e);
1388      }
1389    }
1390  }
1391
1392  /**
1393   * Run init. Sets up the WAL and starts up all server threads.
1394   * @param c Extra configuration.
1395   */
1396  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1397    throws IOException {
1398    try {
1399      boolean updateRootDir = false;
1400      for (NameStringPair e : c.getMapEntriesList()) {
1401        String key = e.getName();
1402        // The hostname the master sees us as.
1403        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1404          String hostnameFromMasterPOV = e.getValue();
1405          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1406            rpcServices.getSocketAddress().getPort(), this.startcode);
1407          String expectedHostName = rpcServices.getSocketAddress().getHostName();
1408          // If Master use-ip is enabled, RegionServer use-ip will be enabled by default even if it
1409          // is explicitly disabled, so we will use the ip of the RegionServer to compare with the
1410          // hostname passed by the Master; see HBASE-27304 for details.
1411          if (
1412            StringUtils.isBlank(useThisHostnameInstead) && getActiveMaster().isPresent()
1413              && InetAddresses.isInetAddress(getActiveMaster().get().getHostname())
1414          ) {
1415            expectedHostName = rpcServices.getSocketAddress().getAddress().getHostAddress();
1416          }
1417          boolean isHostnameConsistent = StringUtils.isBlank(useThisHostnameInstead)
1418            ? hostnameFromMasterPOV.equals(expectedHostName)
1419            : hostnameFromMasterPOV.equals(useThisHostnameInstead);
1420          if (!isHostnameConsistent) {
1421            String msg = "Master passed us a different hostname to use; was="
1422              + (StringUtils.isBlank(useThisHostnameInstead)
1423                ? expectedHostName
1424                : this.useThisHostnameInstead)
1425              + ", but now=" + hostnameFromMasterPOV;
1426            LOG.error(msg);
1427            throw new IOException(msg);
1428          }
1429          continue;
1430        }
1431
1432        String value = e.getValue();
1433        if (key.equals(HConstants.HBASE_DIR)) {
1434          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1435            updateRootDir = true;
1436          }
1437        }
1438
1439        if (LOG.isDebugEnabled()) {
1440          LOG.debug("Config from master: " + key + "=" + value);
1441        }
1442        this.conf.set(key, value);
1443      }
1444      // Set our ephemeral znode up in zookeeper now that we have a name.
1445      createMyEphemeralNode();
1446
1447      if (updateRootDir) {
1448        // Initialize the file system using the fs.defaultFS and hbase.rootdir configs from the master
1449        initializeFileSystem();
1450      }
1451
1452      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
1453      // config param for task trackers, but we can piggyback off of it.
1454      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1455        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1456      }
1457
1458      // Save it in a file; this will allow us to see if we crash
1459      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1460
1461      // This call sets up an initialized replication and WAL. Later we start it up.
1462      setupWALAndReplication();
1463      // Init in here rather than in constructor after thread name has been set
1464      final MetricsTable metricsTable =
1465        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1466      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1467      this.metricsRegionServer =
1468        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
1469      // Now that we have a metrics source, start the pause monitor
1470      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1471      pauseMonitor.start();
1472
1473      // There is a rare case where we do NOT want services to start. Check config.
1474      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1475        startServices();
1476      }
1477      // In here we start up the replication service; above we initialized it. TODO: reconcile,
1478      // or make sense of it.
1479      startReplicationService();
1480
1481      // Set up ZK
1482      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress()
1483        + ", sessionid=0x"
1484        + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1485
1486      // Wake up anyone waiting for this server to come online
1487      synchronized (online) {
1488        online.set(true);
1489        online.notifyAll();
1490      }
1491    } catch (Throwable e) {
1492      stop("Failed initialization");
1493      throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed");
1494    } finally {
1495      sleeper.skipSleepCycle();
1496    }
1497  }
1498
1499  private void startHeapMemoryManager() {
1500    if (this.blockCache != null) {
1501      this.hMemManager =
1502        new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1503      this.hMemManager.start(getChoreService());
1504    }
1505  }
1506
1507  private void createMyEphemeralNode() throws KeeperException {
1508    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1509    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1510    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1511    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1512    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1513  }
1514
1515  private void deleteMyEphemeralNode() throws KeeperException {
1516    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1517  }
1518
1519  @Override
1520  public RegionServerAccounting getRegionServerAccounting() {
1521    return regionServerAccounting;
1522  }
1523
1524  // Round the size to KB or MB.
1525  // A trick here is that if the sizeInBytes is less than sizeUnit but not 0, we will round the
1526  // size to 1 instead of 0, to avoid some schedulers thinking the region has no data. See
1527  // HBASE-26340 for more details on why this is important.
1528  private static int roundSize(long sizeInByte, int sizeUnit) {
1529    if (sizeInByte == 0) {
1530      return 0;
1531    } else if (sizeInByte < sizeUnit) {
1532      return 1;
1533    } else {
1534      return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE);
1535    }
1536  }
1537
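  // Runs the given computation only when the block cache is a CombinedBlockCache whose second
  // level is a BucketCache configured for persistence; otherwise this is a no-op.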
1538  private void computeIfPersistentBucketCache(Consumer<BucketCache> computation) {
1539    if (blockCache instanceof CombinedBlockCache) {
1540      BlockCache l2 = ((CombinedBlockCache) blockCache).getSecondLevelCache();
1541      if (l2 instanceof BucketCache && ((BucketCache) l2).isCachePersistent()) {
1542        computation.accept((BucketCache) l2);
1543      }
1544    }
1545  }
1546
1547  /**
1548   * @param r               Region to get RegionLoad for.
1549   * @param regionLoadBldr  the RegionLoad.Builder, can be null
1550   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1551   * @return RegionLoad instance.
1552   */
1553  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1554    RegionSpecifier.Builder regionSpecifier) throws IOException {
1555    byte[] name = r.getRegionInfo().getRegionName();
1556    String regionEncodedName = r.getRegionInfo().getEncodedName();
1557    int stores = 0;
1558    int storefiles = 0;
1559    int storeRefCount = 0;
1560    int maxCompactedStoreFileRefCount = 0;
1561    long storeUncompressedSize = 0L;
1562    long storefileSize = 0L;
1563    long storefileIndexSize = 0L;
1564    long rootLevelIndexSize = 0L;
1565    long totalStaticIndexSize = 0L;
1566    long totalStaticBloomSize = 0L;
1567    long totalCompactingKVs = 0L;
1568    long currentCompactedKVs = 0L;
1569    long totalRegionSize = 0L;
1570    List<HStore> storeList = r.getStores();
1571    stores += storeList.size();
1572    for (HStore store : storeList) {
1573      storefiles += store.getStorefilesCount();
1574      int currentStoreRefCount = store.getStoreRefCount();
1575      storeRefCount += currentStoreRefCount;
1576      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1577      maxCompactedStoreFileRefCount =
1578        Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount);
1579      storeUncompressedSize += store.getStoreSizeUncompressed();
1580      storefileSize += store.getStorefilesSize();
1581      totalRegionSize += store.getHFilesSize();
1582      // TODO: is storefileIndexSizeKB the same as rootLevelIndexSizeKB?
1583      storefileIndexSize += store.getStorefilesRootLevelIndexSize();
1584      CompactionProgress progress = store.getCompactionProgress();
1585      if (progress != null) {
1586        totalCompactingKVs += progress.getTotalCompactingKVs();
1587        currentCompactedKVs += progress.currentCompactedKVs;
1588      }
1589      rootLevelIndexSize += store.getStorefilesRootLevelIndexSize();
1590      totalStaticIndexSize += store.getTotalStaticIndexSize();
1591      totalStaticBloomSize += store.getTotalStaticBloomSize();
1592    }
1593
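    // Convert the raw byte counters accumulated above to MB/KB for the report; roundSize() returns
    // at least 1 for any non-zero value so small regions are not mistaken for empty ones.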
1594    int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB);
1595    int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB);
1596    int storefileSizeMB = roundSize(storefileSize, unitMB);
1597    int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB);
1598    int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB);
1599    int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB);
1600    int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB);
1601    int regionSizeMB = roundSize(totalRegionSize, unitMB);
1602    final MutableFloat currentRegionCachedRatio = new MutableFloat(0.0f);
1603    getBlockCache().ifPresent(bc -> {
1604      bc.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
1605        if (regionCachedInfo.containsKey(regionEncodedName)) {
1606          currentRegionCachedRatio.setValue(regionSizeMB == 0
1607            ? 0.0f
1608            : (float) roundSize(regionCachedInfo.get(regionEncodedName), unitMB) / regionSizeMB);
1609        }
1610      });
1611    });
1612
1613    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
1614    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
1615    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
1616    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
1617    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
1618    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
1619    if (regionLoadBldr == null) {
1620      regionLoadBldr = RegionLoad.newBuilder();
1621    }
1622    if (regionSpecifier == null) {
1623      regionSpecifier = RegionSpecifier.newBuilder();
1624    }
1625
1626    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1627    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1628    regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores)
1629      .setStorefiles(storefiles).setStoreRefCount(storeRefCount)
1630      .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1631      .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB)
1632      .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB)
1633      .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1634      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1635      .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount())
1636      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1637      .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs)
1638      .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality)
1639      .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight)
1640      .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight)
1641      .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
1642      .setLastMajorCompactionTs(r.getOldestHfileTs(true)).setRegionSizeMB(regionSizeMB)
1643      .setCurrentRegionCachedRatio(currentRegionCachedRatio.floatValue());
1644    r.setCompleteSequenceId(regionLoadBldr);
1645    return regionLoadBldr.build();
1646  }
1647
1648  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1649    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1650    userLoadBldr.setUserName(user);
1651    userSource.getClientMetrics().values().stream()
1652      .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1653        .setHostName(clientMetrics.getHostName())
1654        .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1655        .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1656        .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
1657      .forEach(userLoadBldr::addClientMetrics);
1658    return userLoadBldr.build();
1659  }
1660
1661  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1662    HRegion r = onlineRegions.get(encodedRegionName);
1663    return r != null ? createRegionLoad(r, null, null) : null;
1664  }
1665
1666  /**
1667   * Inner class that runs periodically, checking whether regions need compaction.
1668   */
1669  private static class CompactionChecker extends ScheduledChore {
1670    private final HRegionServer instance;
1671    private final int majorCompactPriority;
1672    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1673    // Iteration is 1-based rather than 0-based so we don't check for compaction
1674    // immediately upon region server startup
1675    private long iteration = 1;
1676
1677    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1678      super("CompactionChecker", stopper, sleepTime);
1679      this.instance = h;
1680      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1681
1682      /*
1683       * MajorCompactPriority is configurable. If not set, the compaction will use default priority.
1684       */
1685      this.majorCompactPriority = this.instance.conf
1686        .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY);
1687    }
1688
1689    @Override
1690    protected void chore() {
1691      for (HRegion hr : this.instance.onlineRegions.values()) {
1692        // If region is read only or compaction is disabled at table level, there's no need to
1693        // iterate through region's stores
1694        if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) {
1695          continue;
1696        }
1697
1698        for (HStore s : hr.stores.values()) {
1699          try {
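            // A store with a compaction check multiplier of N is only examined on every Nth
            // iteration of this chore.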
1700            long multiplier = s.getCompactionCheckMultiplier();
1701            assert multiplier > 0;
1702            if (iteration % multiplier != 0) {
1703              continue;
1704            }
1705            if (s.needsCompaction()) {
1706              // Queue a compaction. Will recognize if major is needed.
1707              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1708                getName() + " requests compaction");
1709            } else if (s.shouldPerformMajorCompaction()) {
1710              s.triggerMajorCompaction();
1711              if (
1712                majorCompactPriority == DEFAULT_PRIORITY
1713                  || majorCompactPriority > hr.getCompactPriority()
1714              ) {
1715                this.instance.compactSplitThread.requestCompaction(hr, s,
1716                  getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
1717                  CompactionLifeCycleTracker.DUMMY, null);
1718              } else {
1719                this.instance.compactSplitThread.requestCompaction(hr, s,
1720                  getName() + " requests major compaction; use configured priority",
1721                  this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1722              }
1723            }
1724          } catch (IOException e) {
1725            LOG.warn("Failed major compaction check on " + hr, e);
1726          }
1727        }
1728      }
1729      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1730    }
1731  }
1732
1733  private static class PeriodicMemStoreFlusher extends ScheduledChore {
1734    private final HRegionServer server;
1735    private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
1736    private final static int MIN_DELAY_TIME = 0; // millisec
1737    private final long rangeOfDelayMs;
1738
1739    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1740      super("MemstoreFlusherChore", server, cacheFlushInterval);
1741      this.server = server;
1742
1743      final long configuredRangeOfDelay = server.getConfiguration()
1744        .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
1745      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
1746    }
1747
1748    @Override
1749    protected void chore() {
1750      final StringBuilder whyFlush = new StringBuilder();
1751      for (HRegion r : this.server.onlineRegions.values()) {
1752        if (r == null) {
1753          continue;
1754        }
1755        if (r.shouldFlush(whyFlush)) {
1756          FlushRequester requester = server.getFlushRequester();
1757          if (requester != null) {
1758            long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME;
1759            // Throttle the flushes by putting a delay. If we don't throttle, and there
1760            // is a balanced write-load on the regions in a table, we might end up
1761            // overwhelming the filesystem with too many flushes at once.
1762            if (requester.requestDelayedFlush(r, delay)) {
1763              LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(),
1764                r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay);
1765            }
1766          }
1767        }
1768      }
1769    }
1770  }
1771
1772  /**
1773   * Report the status of the server. A server is online once all of its startup has completed
1774   * (setting up the filesystem, starting executorService threads, etc.). This method is designed
1775   * mostly to be useful in tests.
1776   * @return true if online, false if not.
1777   */
1778  public boolean isOnline() {
1779    return online.get();
1780  }
1781
1782  /**
1783   * Set up the WAL and replication if enabled. Replication setup is done in here because it needs
1784   * to be hooked up to the WAL.
1785   */
1786  private void setupWALAndReplication() throws IOException {
1787    WALFactory factory = new WALFactory(conf, serverName, this);
1788    // TODO Replication make assumptions here based on the default filesystem impl
1789    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1790    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1791
1792    Path logDir = new Path(walRootDir, logName);
1793    LOG.debug("logDir={}", logDir);
1794    if (this.walFs.exists(logDir)) {
1795      throw new RegionServerRunningException(
1796        "Region server has already created directory at " + this.serverName.toString());
1797    }
1798    // Always create the wal directory as we now need it when the master restarts to find out the
1799    // live region servers.
1800    if (!this.walFs.mkdirs(logDir)) {
1801      throw new IOException("Can not create wal directory " + logDir);
1802    }
1803    // Instantiate replication if replication enabled. Pass it the log directories.
1804    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
1805
1806    WALActionsListener walEventListener = getWALEventTrackerListener(conf);
1807    if (walEventListener != null && factory.getWALProvider() != null) {
1808      factory.getWALProvider().addWALActionsListener(walEventListener);
1809    }
1810    this.walFactory = factory;
1811  }
1812
1813  private WALActionsListener getWALEventTrackerListener(Configuration conf) {
1814    if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) {
1815      WALEventTrackerListener listener =
1816        new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName());
1817      return listener;
1818    }
1819    return null;
1820  }
1821
1822  /**
1823   * Start up replication source and sink handlers.
1824   */
1825  private void startReplicationService() throws IOException {
1826    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
1827      this.replicationSourceHandler.startReplicationService();
1828    } else {
1829      if (this.replicationSourceHandler != null) {
1830        this.replicationSourceHandler.startReplicationService();
1831      }
1832      if (this.replicationSinkHandler != null) {
1833        this.replicationSinkHandler.startReplicationService();
1834      }
1835    }
1836  }
1837
1838  /** Returns Master address tracker instance. */
1839  public MasterAddressTracker getMasterAddressTracker() {
1840    return this.masterAddressTracker;
1841  }
1842
1843  /**
1844   * Start maintenance threads, Server, Worker and lease checker threads. Starts all threads we
1845   * need to run. This is called after we've successfully registered with the Master. Installs an
1846   * UncaughtExceptionHandler that aborts the RegionServer if we get an unhandled exception; we
1847   * cannot set the handler on all threads because the Server's internal Listener thread is off
1848   * limits. For the Server, on an OOME it waits a while and then retries; meanwhile, a flush or a
1849   * compaction that tries to run should trigger the same critical condition and the shutdown will
1850   * run. On its way out, this server will shut down the Server. Leases are somewhere in between:
1851   * although it inherits from Chore, it keeps its own internal stop mechanism, so it needs to be
1852   * stopped by this hosting server. Worker logs the exception and exits.
1853   */
1854  private void startServices() throws IOException {
1855    if (!isStopped() && !isAborted()) {
1856      initializeThreads();
1857    }
1858    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
1859    this.secureBulkLoadManager.start();
1860
1861    // Health checker thread.
1862    if (isHealthCheckerConfigured()) {
1863      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
1864        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1865      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1866    }
1867    // Executor status collect thread.
1868    if (
1869      this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
1870        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)
1871    ) {
1872      int sleepTime =
1873        this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ);
1874      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
1875        this.metricsRegionServer.getMetricsSource());
1876    }
1877
1878    this.walRoller = new LogRoller(this);
1879    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1880    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
1881
1882    // Create the CompactedFileDischarger chore executorService. This chore helps to
1883    // remove the compacted files that will no longer be used in reads.
1884    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
1885    // 2 mins so that compacted files can be archived before the TTLCleaner runs
1886    int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
1887    this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this);
1888    choreService.scheduleChore(compactedFileDischarger);
1889
1890    // Start executor services
1891    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
1892    executorService.startExecutorService(executorService.new ExecutorConfig()
1893      .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
1894    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
1895    executorService.startExecutorService(executorService.new ExecutorConfig()
1896      .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
1897    final int openPriorityRegionThreads =
1898      conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
1899    executorService.startExecutorService(
1900      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION)
1901        .setCorePoolSize(openPriorityRegionThreads));
1902    final int closeRegionThreads =
1903      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
1904    executorService.startExecutorService(executorService.new ExecutorConfig()
1905      .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
1906    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
1907    executorService.startExecutorService(executorService.new ExecutorConfig()
1908      .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
1909    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1910      final int storeScannerParallelSeekThreads =
1911        conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
1912      executorService.startExecutorService(
1913        executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK)
1914          .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true));
1915    }
1916    final int logReplayOpsThreads =
1917      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
1918    executorService.startExecutorService(
1919      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS)
1920        .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
1921    // Start the threads for compacted files discharger
1922    final int compactionDischargerThreads =
1923      conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
1924    executorService.startExecutorService(executorService.new ExecutorConfig()
1925      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)
1926      .setCorePoolSize(compactionDischargerThreads));
1927    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1928      final int regionReplicaFlushThreads =
1929        conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1930          conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1931      executorService.startExecutorService(executorService.new ExecutorConfig()
1932        .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS)
1933        .setCorePoolSize(regionReplicaFlushThreads));
1934    }
1935    final int refreshPeerThreads =
1936      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
1937    executorService.startExecutorService(executorService.new ExecutorConfig()
1938      .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
1939    final int replaySyncReplicationWALThreads =
1940      conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
1941    executorService.startExecutorService(executorService.new ExecutorConfig()
1942      .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL)
1943      .setCorePoolSize(replaySyncReplicationWALThreads));
1944    final int switchRpcThrottleThreads =
1945      conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
1946    executorService.startExecutorService(
1947      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE)
1948        .setCorePoolSize(switchRpcThrottleThreads));
1949    final int claimReplicationQueueThreads =
1950      conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
1951    executorService.startExecutorService(
1952      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE)
1953        .setCorePoolSize(claimReplicationQueueThreads));
1954    final int rsSnapshotOperationThreads =
1955      conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
1956    executorService.startExecutorService(
1957      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
1958        .setCorePoolSize(rsSnapshotOperationThreads));
1959    final int rsFlushOperationThreads =
1960      conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3);
1961    executorService.startExecutorService(executorService.new ExecutorConfig()
1962      .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads));
1963    final int rsRefreshQuotasThreads =
1964      conf.getInt("hbase.regionserver.executor.refresh.quotas.threads", 1);
1965    executorService.startExecutorService(
1966      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS)
1967        .setCorePoolSize(rsRefreshQuotasThreads));
1968
1969    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
1970      uncaughtExceptionHandler);
1971    if (this.cacheFlusher != null) {
1972      this.cacheFlusher.start(uncaughtExceptionHandler);
1973    }
1974    Threads.setDaemonThreadRunning(this.procedureResultReporter,
1975      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
1976
1977    if (this.compactionChecker != null) {
1978      choreService.scheduleChore(compactionChecker);
1979    }
1980    if (this.periodicFlusher != null) {
1981      choreService.scheduleChore(periodicFlusher);
1982    }
1983    if (this.healthCheckChore != null) {
1984      choreService.scheduleChore(healthCheckChore);
1985    }
1986    if (this.executorStatusChore != null) {
1987      choreService.scheduleChore(executorStatusChore);
1988    }
1989    if (this.nonceManagerChore != null) {
1990      choreService.scheduleChore(nonceManagerChore);
1991    }
1992    if (this.storefileRefresher != null) {
1993      choreService.scheduleChore(storefileRefresher);
1994    }
1995    if (this.fsUtilizationChore != null) {
1996      choreService.scheduleChore(fsUtilizationChore);
1997    }
1998    if (this.namedQueueServiceChore != null) {
1999      choreService.scheduleChore(namedQueueServiceChore);
2000    }
2001    if (this.brokenStoreFileCleaner != null) {
2002      choreService.scheduleChore(brokenStoreFileCleaner);
2003    }
2004    if (this.rsMobFileCleanerChore != null) {
2005      choreService.scheduleChore(rsMobFileCleanerChore);
2006    }
2007    if (replicationMarkerChore != null) {
2008      LOG.info("Starting replication marker chore");
2009      choreService.scheduleChore(replicationMarkerChore);
2010    }
2011
2012    // The lease manager is not a Thread. Internally it runs a daemon thread. If that thread gets
2013    // an unhandled exception, it will just exit.
2014    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
2015      uncaughtExceptionHandler);
2016
2017    // Create the log splitting worker and start it
2018    // Set a smaller retry count to fail fast, otherwise the SplitLogWorker could be blocked for
2019    // quite a while inside the Connection layer. The worker won't be available for other
2020    // tasks even after the current task is preempted when a split task times out.
2021    Configuration sinkConf = HBaseConfiguration.create(conf);
2022    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2023      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
2024    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2025      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
2026    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
2027    if (
2028      this.csm != null
2029        && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
2030    ) {
2031      // SplitLogWorker needs csm. If none, don't start this.
2032      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
2033      splitLogWorker.start();
2034      LOG.debug("SplitLogWorker started");
2035    }
2036
2037    // Memstore services.
2038    startHeapMemoryManager();
2039    // Call it after starting HeapMemoryManager.
2040    initializeMemStoreChunkCreator(hMemManager);
2041  }
2042
2043  private void initializeThreads() {
2044    // Cache flushing thread.
2045    this.cacheFlusher = new MemStoreFlusher(conf, this);
2046
2047    // Compaction thread
2048    this.compactSplitThread = new CompactSplit(this);
2049
2050    // Prefetch Notifier
2051    this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
2052
2053    // Background thread to check for compactions; needed if a region has not gotten updates
2054    // in a while. It will take care of not checking too frequently, on a store-by-store basis.
2055    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
2056    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
2057    this.leaseManager = new LeaseManager(this.threadWakeFrequency);
2058
2059    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
2060      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
2061    final boolean walEventTrackerEnabled =
2062      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
2063
2064    if (isSlowLogTableEnabled || walEventTrackerEnabled) {
2065      // default chore duration: 10 min
2066      // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property
2067      final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY,
2068        DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION);
2069
2070      final int namedQueueChoreDuration =
2071        conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT);
2072      // Use the minimum of slowLogChoreDuration and namedQueueChoreDuration
2073      int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration);
2074
2075      namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration,
2076        this.namedQueueRecorder, this.getConnection());
2077    }
2078
2079    if (this.nonceManager != null) {
2080      // Create the scheduled chore that cleans up nonces.
2081      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
2082    }
2083
2084    // Setup the Quota Manager
2085    rsQuotaManager = new RegionServerRpcQuotaManager(this);
2086    configurationManager.registerObserver(rsQuotaManager);
2087    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
2088
2089    if (QuotaUtil.isQuotaEnabled(conf)) {
2090      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
2091    }
2092
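    // If the general storefile refresh period is disabled (0), fall back to the meta-only refresh
    // period so that only meta region storefiles are refreshed.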
2093    boolean onlyMetaRefresh = false;
2094    int storefileRefreshPeriod =
2095      conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
2096        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2097    if (storefileRefreshPeriod == 0) {
2098      storefileRefreshPeriod =
2099        conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
2100          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2101      onlyMetaRefresh = true;
2102    }
2103    if (storefileRefreshPeriod > 0) {
2104      this.storefileRefresher =
2105        new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this);
2106    }
2107
2108    int brokenStoreFileCleanerPeriod =
2109      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
2110        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
2111    int brokenStoreFileCleanerDelay =
2112      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
2113        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
2114    double brokenStoreFileCleanerDelayJitter =
2115      conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
2116        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
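    // jitterRate is a uniform random value in [-jitter/2, +jitter/2), so the effective initial
    // delay becomes brokenStoreFileCleanerDelay * (1 + jitterRate).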
2117    double jitterRate =
2118      (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
2119    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
2120    this.brokenStoreFileCleaner =
2121      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
2122        brokenStoreFileCleanerPeriod, this, conf, this);
2123
2124    this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
2125
2126    registerConfigurationObservers();
2127    initializeReplicationMarkerChore();
2128  }
2129
2130  private void registerConfigurationObservers() {
2131    // Register replication if possible, as we now support recreating replication peer storage for
2132    // migrating across different replication peer storages online.
2133    if (replicationSourceHandler instanceof ConfigurationObserver) {
2134      configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
2135    }
2136    if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) {
2137      configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
2138    }
2139    // Registering the compactSplitThread object with the ConfigurationManager.
2140    configurationManager.registerObserver(this.compactSplitThread);
2141    configurationManager.registerObserver(this.cacheFlusher);
2142    configurationManager.registerObserver(this.rpcServices);
2143    configurationManager.registerObserver(this.prefetchExecutorNotifier);
2144    configurationManager.registerObserver(this);
2145  }
2146
2147  /*
2148   * Verify that server is healthy
2149   */
2150  private boolean isHealthy() {
2151    if (!dataFsOk) {
2152      // File system problem
2153      return false;
2154    }
2155    // Verify that all threads are alive
2156    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
2157      && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2158      && (this.walRoller == null || this.walRoller.isAlive())
2159      && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2160      && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2161    if (!healthy) {
2162      stop("One or more threads are no longer alive -- stop");
2163    }
2164    return healthy;
2165  }
2166
2167  @Override
2168  public List<WAL> getWALs() {
2169    return walFactory.getWALs();
2170  }
2171
2172  @Override
2173  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2174    WAL wal = walFactory.getWAL(regionInfo);
2175    if (this.walRoller != null) {
2176      this.walRoller.addWAL(wal);
2177    }
2178    return wal;
2179  }
2180
2181  public LogRoller getWalRoller() {
2182    return walRoller;
2183  }
2184
2185  public WALFactory getWalFactory() {
2186    return walFactory;
2187  }
2188
2189  @Override
2190  public void stop(final String msg) {
2191    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2192  }
2193
2194  /**
2195   * Stops the regionserver.
2196   * @param msg   Status message
2197   * @param force True if this is a regionserver abort
2198   * @param user  The user executing the stop request, or null if no user is associated
2199   */
2200  public void stop(final String msg, final boolean force, final User user) {
2201    if (!this.stopped) {
2202      LOG.info("***** STOPPING region server '" + this + "' *****");
2203      if (this.rsHost != null) {
2204        // when forced via abort don't allow CPs to override
2205        try {
2206          this.rsHost.preStop(msg, user);
2207        } catch (IOException ioe) {
2208          if (!force) {
2209            LOG.warn("The region server did not stop", ioe);
2210            return;
2211          }
2212          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2213        }
2214      }
2215      this.stopped = true;
2216      LOG.info("STOPPED: " + msg);
2217      // Wakes run() if it is sleeping
2218      sleeper.skipSleepCycle();
2219    }
2220  }
2221
2222  public void waitForServerOnline() {
2223    while (!isStopped() && !isOnline()) {
2224      synchronized (online) {
2225        try {
2226          online.wait(msgInterval);
2227        } catch (InterruptedException ie) {
2228          Thread.currentThread().interrupt();
2229          break;
2230        }
2231      }
2232    }
2233  }
2234
2235  @Override
2236  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2237    HRegion r = context.getRegion();
2238    long openProcId = context.getOpenProcId();
2239    long masterSystemTime = context.getMasterSystemTime();
2240    long initiatingMasterActiveTime = context.getInitiatingMasterActiveTime();
2241    rpcServices.checkOpen();
2242    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2243      r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2244    // Do checks to see if we need to compact (references or too many files)
2245    // Skip compaction check if region is read only
2246    if (!r.isReadOnly()) {
2247      for (HStore s : r.stores.values()) {
2248        if (s.hasReferences() || s.needsCompaction()) {
2249          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2250        }
2251      }
2252    }
2253    long openSeqNum = r.getOpenSeqNum();
2254    if (openSeqNum == HConstants.NO_SEQNUM) {
2255      // If we opened a region, we should have read some sequence number from it.
2256      LOG.error(
2257        "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2258      openSeqNum = 0;
2259    }
2260
2261    // Notify master
2262    if (
2263      !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2264        openSeqNum, openProcId, masterSystemTime, r.getRegionInfo(), initiatingMasterActiveTime))
2265    ) {
2266      throw new IOException(
2267        "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2268    }
2269
2270    triggerFlushInPrimaryRegion(r);
2271
2272    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2273  }
2274
2275  /**
2276   * Helper method for use in tests. Skip the region transition report when there's no master around
2277   * to receive it.
2278   */
2279  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2280    final TransitionCode code = context.getCode();
2281    final long openSeqNum = context.getOpenSeqNum();
2282    long masterSystemTime = context.getMasterSystemTime();
2283    final RegionInfo[] hris = context.getHris();
2284
2285    if (code == TransitionCode.OPENED) {
2286      Preconditions.checkArgument(hris != null && hris.length == 1);
2287      if (hris[0].isMetaRegion()) {
2288        LOG.warn(
2289          "meta table location is stored in master local store, so we can not skip reporting");
2290        return false;
2291      } else {
2292        try {
2293          MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
2294            serverName, openSeqNum, masterSystemTime);
2295        } catch (IOException e) {
2296          LOG.info("Failed to update meta", e);
2297          return false;
2298        }
2299      }
2300    }
2301    return true;
2302  }
2303
2304  private ReportRegionStateTransitionRequest
2305    createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) {
2306    final TransitionCode code = context.getCode();
2307    final long openSeqNum = context.getOpenSeqNum();
2308    final RegionInfo[] hris = context.getHris();
2309    final long[] procIds = context.getProcIds();
2310
2311    ReportRegionStateTransitionRequest.Builder builder =
2312      ReportRegionStateTransitionRequest.newBuilder();
2313    builder.setServer(ProtobufUtil.toServerName(serverName));
2314    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2315    transition.setTransitionCode(code);
2316    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2317      transition.setOpenSeqNum(openSeqNum);
2318    }
2319    for (RegionInfo hri : hris) {
2320      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2321    }
2322    for (long procId : procIds) {
2323      transition.addProcId(procId);
2324    }
2325    transition.setInitiatingMasterActiveTime(context.getInitiatingMasterActiveTime());
2326
2327    return builder.build();
2328  }
2329
2330  @Override
2331  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2332    if (TEST_SKIP_REPORTING_TRANSITION) {
2333      return skipReportingTransition(context);
2334    }
2335    final ReportRegionStateTransitionRequest request =
2336      createReportRegionStateTransitionRequest(context);
2337
2338    int tries = 0;
2339    long pauseTime = this.retryPauseTime;
2340    // Keep looping till we get an error. We want to send reports even though the server is going
2341    // down. Only give up once the cluster connection is null or closed; it is torn down almost as
2342    // the last thing the HRegionServer does on its way down.
2343    while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
2344      RegionServerStatusService.BlockingInterface rss = rssStub;
2345      try {
2346        if (rss == null) {
2347          createRegionServerStatusStub();
2348          continue;
2349        }
2350        ReportRegionStateTransitionResponse response =
2351          rss.reportRegionStateTransition(null, request);
2352        if (response.hasErrorMessage()) {
2353          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2354          break;
2355        }
2356        // Log if we had to retry, else don't log unless TRACE. We want to
2357        // know if we were successful after an attempt showed in the logs as failed.
2358        if (tries > 0 || LOG.isTraceEnabled()) {
2359          LOG.info("TRANSITION REPORTED " + request);
2360        }
2361        // NOTE: Return mid-method!!!
2362        return true;
2363      } catch (ServiceException se) {
2364        IOException ioe = ProtobufUtil.getRemoteException(se);
2365        boolean pause = ioe instanceof ServerNotRunningYetException
2366          || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException;
2367        if (pause) {
2368          // Do backoff else we flood the Master with requests.
2369          pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
2370        } else {
2371          pauseTime = this.retryPauseTime; // Reset.
2372        }
2373        LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#"
2374          + tries + ")"
2375          + (pause
2376            ? " after " + pauseTime + "ms delay (Master is coming online...)."
2377            : " immediately."),
2378          ioe);
2379        if (pause) {
2380          Threads.sleep(pauseTime);
2381        }
2382        tries++;
2383        if (rssStub == rss) {
2384          rssStub = null;
2385        }
2386      }
2387    }
2388    return false;
2389  }
2390
2391  /**
2392   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2393   * block this thread. See RegionReplicaFlushHandler for details.
2394   */
2395  private void triggerFlushInPrimaryRegion(final HRegion region) {
2396    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2397      return;
2398    }
2399    TableName tn = region.getTableDescriptor().getTableName();
2400    if (
2401      !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn)
2402        || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) ||
2403        // If memstore replication is not set up, we do not have to wait to observe a flush event
2404        // from the primary before starting to serve reads, because gaps from replication are not
2405        // applicable. This logic is from
2406        // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by
2407        // HBASE-13063.
2408        !region.getTableDescriptor().hasRegionMemStoreReplication()
2409    ) {
2410      region.setReadsEnabled(true);
2411      return;
2412    }
2413
2414    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2415    // RegionReplicaFlushHandler might reset this.
2416
2417    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2418    if (this.executorService != null) {
2419      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
2420    } else {
2421      LOG.info("Executor is null; not running flush of primary region replica for {}",
2422        region.getRegionInfo());
2423    }
2424  }
2425
2426  @InterfaceAudience.Private
2427  public RSRpcServices getRSRpcServices() {
2428    return rpcServices;
2429  }
2430
  /**
   * Cause the server to exit without closing the regions it is serving, the log it is using and
   * without notifying the master. Used in unit testing and on catastrophic events such as HDFS
   * being yanked out from under hbase or an OOME.
   * @param reason the reason we are aborting
   * @param cause  the exception that caused the abort, or null
   */
2437  @Override
2438  public void abort(String reason, Throwable cause) {
2439    if (!setAbortRequested()) {
2440      // Abort already in progress, ignore the new request.
2441      LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason);
2442      return;
2443    }
2444    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2445    if (cause != null) {
2446      LOG.error(HBaseMarkers.FATAL, msg, cause);
2447    } else {
2448      LOG.error(HBaseMarkers.FATAL, msg);
2449    }
2450    // HBASE-4014: show list of coprocessors that were loaded to help debug
    // regionserver crashes. Note that we're implicitly using
2452    // java.util.HashSet's toString() method to print the coprocessor names.
2453    LOG.error(HBaseMarkers.FATAL,
2454      "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors());
2455    // Try and dump metrics if abort -- might give clue as to how fatal came about....
2456    try {
2457      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2458    } catch (MalformedObjectNameException | IOException e) {
2459      LOG.warn("Failed dumping metrics", e);
2460    }
2461
2462    // Do our best to report our abort to the master, but this may not work
2463    try {
2464      if (cause != null) {
2465        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2466      }
2467      // Report to the master but only if we have already registered with the master.
2468      RegionServerStatusService.BlockingInterface rss = rssStub;
2469      if (rss != null && this.serverName != null) {
2470        ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder();
2471        builder.setServer(ProtobufUtil.toServerName(this.serverName));
2472        builder.setErrorMessage(msg);
2473        rss.reportRSFatalError(null, builder.build());
2474      }
2475    } catch (Throwable t) {
2476      LOG.warn("Unable to report fatal error to master", t);
2477    }
2478
2479    scheduleAbortTimer();
2480    // shutdown should be run as the internal user
2481    stop(reason, true, null);
2482  }
2483
2484  /*
   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up logs, but it does
   * close the socket in case we want to bring up a server on the old hostname+port immediately.
2487   */
2488  @InterfaceAudience.Private
2489  protected void kill() {
2490    this.killed = true;
2491    abort("Simulated kill");
2492  }
2493
2494  // Limits the time spent in the shutdown process.
2495  private void scheduleAbortTimer() {
2496    if (this.abortMonitor == null) {
2497      this.abortMonitor = new Timer("Abort regionserver monitor", true);
2498      TimerTask abortTimeoutTask = null;
2499      try {
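        // The timeout task class is pluggable via ABORT_TIMEOUT_TASK; the default task forcibly
        // halts the JVM (see SystemExitWhenAbortTimeout below).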
2500        Constructor<? extends TimerTask> timerTaskCtor =
2501          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2502            .asSubclass(TimerTask.class).getDeclaredConstructor();
2503        timerTaskCtor.setAccessible(true);
2504        abortTimeoutTask = timerTaskCtor.newInstance();
2505      } catch (Exception e) {
2506        LOG.warn("Initialize abort timeout task failed", e);
2507      }
2508      if (abortTimeoutTask != null) {
2509        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2510      }
2511    }
2512  }
2513
2514  /**
2515   * Wait on all threads to finish. Presumption is that all closes and stops have already been
2516   * called.
2517   */
2518  protected void stopServiceThreads() {
2519    // clean up the scheduled chores
2520    stopChoreService();
2521    if (bootstrapNodeManager != null) {
2522      bootstrapNodeManager.stop();
2523    }
2524    if (this.cacheFlusher != null) {
2525      this.cacheFlusher.shutdown();
2526    }
2527    if (this.walRoller != null) {
2528      this.walRoller.close();
2529    }
2530    if (this.compactSplitThread != null) {
2531      this.compactSplitThread.join();
2532    }
2533    stopExecutorService();
2534    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
2535      this.replicationSourceHandler.stopReplicationService();
2536    } else {
2537      if (this.replicationSourceHandler != null) {
2538        this.replicationSourceHandler.stopReplicationService();
2539      }
2540      if (this.replicationSinkHandler != null) {
2541        this.replicationSinkHandler.stopReplicationService();
2542      }
2543    }
2544  }
2545
  /** Returns the object that implements the replication source service. */
2547  @Override
2548  public ReplicationSourceService getReplicationSourceService() {
2549    return replicationSourceHandler;
2550  }
2551
  /** Returns the object that implements the replication sink service. */
2553  public ReplicationSinkService getReplicationSinkService() {
2554    return replicationSinkHandler;
2555  }
2556
2557  /**
2558   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2559   * connection, the current rssStub must be null. Method will block until a master is available.
2560   * You can break from this block by requesting the server stop.
2561   * @return master + port, or null if server has been stopped
2562   */
2563  private synchronized ServerName createRegionServerStatusStub() {
2564    // Create RS stub without refreshing the master node from ZK, use cached data
2565    return createRegionServerStatusStub(false);
2566  }
2567
2568  /**
2569   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2570   * connection, the current rssStub must be null. Method will block until a master is available.
2571   * You can break from this block by requesting the server stop.
2572   * @param refresh If true then master address will be read from ZK, otherwise use cached data
2573   * @return master + port, or null if server has been stopped
2574   */
2575  @InterfaceAudience.Private
2576  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2577    if (rssStub != null) {
2578      return masterAddressTracker.getMasterAddress();
2579    }
2580    ServerName sn = null;
2581    long previousLogTime = 0;
2582    RegionServerStatusService.BlockingInterface intRssStub = null;
2583    LockService.BlockingInterface intLockStub = null;
2584    boolean interrupted = false;
2585    try {
2586      while (keepLooping()) {
2587        sn = this.masterAddressTracker.getMasterAddress(refresh);
2588        if (sn == null) {
2589          if (!keepLooping()) {
2590            // give up with no connection.
2591            LOG.debug("No master found and cluster is stopped; bailing out");
2592            return null;
2593          }
2594          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2595            LOG.debug("No master found; retry");
2596            previousLogTime = EnvironmentEdgeManager.currentTime();
2597          }
          refresh = true; // let's try to pull it from ZK directly
2599          if (sleepInterrupted(200)) {
2600            interrupted = true;
2601          }
2602          continue;
2603        }
2604        try {
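          // Open a blocking RPC channel to the master and build both the status and lock stubs
          // over it.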
2605          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
2606            userProvider.getCurrent(), shortOperationTimeout);
2607          intRssStub = RegionServerStatusService.newBlockingStub(channel);
2608          intLockStub = LockService.newBlockingStub(channel);
2609          break;
2610        } catch (IOException e) {
2611          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2612            e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
2613            if (e instanceof ServerNotRunningYetException) {
2614              LOG.info("Master isn't available yet, retrying");
2615            } else {
2616              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2617            }
2618            previousLogTime = EnvironmentEdgeManager.currentTime();
2619          }
2620          if (sleepInterrupted(200)) {
2621            interrupted = true;
2622          }
2623        }
2624      }
2625    } finally {
2626      if (interrupted) {
2627        Thread.currentThread().interrupt();
2628      }
2629    }
2630    this.rssStub = intRssStub;
2631    this.lockStub = intLockStub;
2632    return sn;
2633  }
2634
2635  /**
2636   * @return True if we should break loop because cluster is going down or this server has been
2637   *         stopped or hdfs has gone bad.
2638   */
2639  private boolean keepLooping() {
2640    return !this.stopped && isClusterUp();
2641  }
2642
2643  /*
   * Let the master know we're here. Run initialization using parameters the master passed us.
   * @return A Map of key/value configurations we got from the Master, else null if we failed to
   * register.
2647   */
2648  private RegionServerStartupResponse reportForDuty() throws IOException {
2649    if (this.masterless) {
2650      return RegionServerStartupResponse.getDefaultInstance();
2651    }
2652    ServerName masterServerName = createRegionServerStatusStub(true);
2653    RegionServerStatusService.BlockingInterface rss = rssStub;
2654    if (masterServerName == null || rss == null) {
2655      return null;
2656    }
2657    RegionServerStartupResponse result = null;
2658    try {
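      // Reset the RPC request counters before (re)registering with the master.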
2659      rpcServices.requestCount.reset();
2660      rpcServices.rpcGetRequestCount.reset();
2661      rpcServices.rpcScanRequestCount.reset();
2662      rpcServices.rpcFullScanRequestCount.reset();
2663      rpcServices.rpcMultiRequestCount.reset();
2664      rpcServices.rpcMutateRequestCount.reset();
2665      LOG.info("reportForDuty to master=" + masterServerName + " with port="
2666        + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode);
2667      long now = EnvironmentEdgeManager.currentTime();
2668      int port = rpcServices.getSocketAddress().getPort();
2669      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2670      if (!StringUtils.isBlank(useThisHostnameInstead)) {
2671        request.setUseThisHostnameInstead(useThisHostnameInstead);
2672      }
2673      request.setPort(port);
2674      request.setServerStartCode(this.startcode);
2675      request.setServerCurrentTime(now);
2676      result = rss.regionServerStartup(null, request.build());
2677    } catch (ServiceException se) {
2678      IOException ioe = ProtobufUtil.getRemoteException(se);
2679      if (ioe instanceof ClockOutOfSyncException) {
2680        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
        // Re-throwing the IOE will cause the RS to abort
2682        throw ioe;
2683      } else if (ioe instanceof DecommissionedHostRejectedException) {
2684        LOG.error(HBaseMarkers.FATAL,
2685          "Master rejected startup because the host is considered decommissioned", ioe);
        // Re-throwing the IOE will cause the RS to abort
2687        throw ioe;
2688      } else if (ioe instanceof ServerNotRunningYetException) {
2689        LOG.debug("Master is not running yet");
2690      } else {
2691        LOG.warn("error telling master we are up", se);
2692      }
2693      rssStub = null;
2694    }
2695    return result;
2696  }
2697
2698  @Override
2699  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2700    try {
2701      GetLastFlushedSequenceIdRequest req =
2702        RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2703      RegionServerStatusService.BlockingInterface rss = rssStub;
2704      if (rss == null) { // Try to connect one more time
2705        createRegionServerStatusStub();
2706        rss = rssStub;
2707        if (rss == null) {
2708          // Still no luck, we tried
2709          LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2710          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2711            .build();
2712        }
2713      }
2714      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2715      return RegionStoreSequenceIds.newBuilder()
2716        .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2717        .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2718    } catch (ServiceException e) {
2719      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2720      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2721        .build();
2722    }
2723  }
2724
2725  /**
2726   * Close meta region if we carry it
2727   * @param abort Whether we're running an abort.
2728   */
2729  private void closeMetaTableRegions(final boolean abort) {
2730    HRegion meta = null;
2731    this.onlineRegionsLock.writeLock().lock();
2732    try {
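      // Look for a meta region among the regions we are hosting; stop at the first match.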
2733      for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
2734        RegionInfo hri = e.getValue().getRegionInfo();
2735        if (hri.isMetaRegion()) {
2736          meta = e.getValue();
2737        }
2738        if (meta != null) {
2739          break;
2740        }
2741      }
2742    } finally {
2743      this.onlineRegionsLock.writeLock().unlock();
2744    }
2745    if (meta != null) {
2746      closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2747    }
2748  }
2749
2750  /**
   * Schedule closes on all user regions. Should be safe to call multiple times because it won't
   * close regions that are already closed or that are closing.
2753   * @param abort Whether we're running an abort.
2754   */
2755  private void closeUserRegions(final boolean abort) {
2756    this.onlineRegionsLock.writeLock().lock();
2757    try {
2758      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
2759        HRegion r = e.getValue();
2760        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2761          // Don't update zk with this close transition; pass false.
2762          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2763        }
2764      }
2765    } finally {
2766      this.onlineRegionsLock.writeLock().unlock();
2767    }
2768  }
2769
2770  protected Map<String, HRegion> getOnlineRegions() {
2771    return this.onlineRegions;
2772  }
2773
2774  public int getNumberOfOnlineRegions() {
2775    return this.onlineRegions.size();
2776  }
2777
2778  /**
2779   * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM
2780   * as client; HRegion cannot be serialized to cross an rpc.
2781   */
2782  public Collection<HRegion> getOnlineRegionsLocalContext() {
2783    Collection<HRegion> regions = this.onlineRegions.values();
2784    return Collections.unmodifiableCollection(regions);
2785  }
2786
2787  @Override
2788  public void addRegion(HRegion region) {
2789    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2790    configurationManager.registerObserver(region);
2791  }
2792
2793  private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
2794    long size) {
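    // Group regions by size; several regions may share the same size key.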
2795    if (!sortedRegions.containsKey(size)) {
2796      sortedRegions.put(size, new ArrayList<>());
2797    }
2798    sortedRegions.get(size).add(region);
2799  }
2800
2801  /**
2802   * @return A new Map of online regions sorted by region off-heap size with the first entry being
2803   *         the biggest.
2804   */
2805  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2806    // we'll sort the regions in reverse
2807    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2808    // Copy over all regions. Regions are sorted by size with biggest first.
2809    for (HRegion region : this.onlineRegions.values()) {
2810      addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
2811    }
2812    return sortedRegions;
2813  }
2814
2815  /**
2816   * @return A new Map of online regions sorted by region heap size with the first entry being the
2817   *         biggest.
2818   */
2819  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2820    // we'll sort the regions in reverse
2821    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2822    // Copy over all regions. Regions are sorted by size with biggest first.
2823    for (HRegion region : this.onlineRegions.values()) {
2824      addRegion(sortedRegions, region, region.getMemStoreHeapSize());
2825    }
2826    return sortedRegions;
2827  }
2828
2829  /** Returns reference to FlushRequester */
2830  @Override
2831  public FlushRequester getFlushRequester() {
2832    return this.cacheFlusher;
2833  }
2834
2835  @Override
2836  public CompactionRequester getCompactionRequestor() {
2837    return this.compactSplitThread;
2838  }
2839
2840  @Override
2841  public LeaseManager getLeaseManager() {
2842    return leaseManager;
2843  }
2844
2845  /** Returns {@code true} when the data file system is available, {@code false} otherwise. */
2846  boolean isDataFileSystemOk() {
2847    return this.dataFsOk;
2848  }
2849
2850  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
2851    return this.rsHost;
2852  }
2853
2854  @Override
2855  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2856    return this.regionsInTransitionInRS;
2857  }
2858
2859  @Override
2860  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
2861    return rsQuotaManager;
2862  }
2863
2864  //
2865  // Main program and support routines
2866  //
2867  /**
   * Load the replication service objects, if any
2869   */
2870  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
2871    FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
2872    // read in the name of the source replication class from the config file.
2873    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2874      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2875
2876    // read in the name of the sink replication class from the config file.
2877    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2878      HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
2879
2880    // If both the sink and the source class names are the same, then instantiate
2881    // only one object.
2882    if (sourceClassname.equals(sinkClassname)) {
2883      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2884        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2885      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
2886      server.sameReplicationSourceAndSink = true;
2887    } else {
2888      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2889        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2890      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
2891        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2892      server.sameReplicationSourceAndSink = false;
2893    }
2894  }
2895
2896  private static <T extends ReplicationService> T newReplicationInstance(String classname,
2897    Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
2898    Path oldLogDir, WALFactory walFactory) throws IOException {
2899    final Class<? extends T> clazz;
2900    try {
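      // Resolve the configured replication implementation through the current thread's context
      // class loader.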
2901      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2902      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
2903    } catch (java.lang.ClassNotFoundException nfe) {
2904      throw new IOException("Could not find class for " + classname);
2905    }
2906    T service = ReflectionUtils.newInstance(clazz, conf);
2907    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
2908    return service;
2909  }
2910
2911  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
2912    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
2913    if (!this.isOnline()) {
2914      return walGroupsReplicationStatus;
2915    }
2916    List<ReplicationSourceInterface> allSources = new ArrayList<>();
2917    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
2918    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
2919    for (ReplicationSourceInterface source : allSources) {
2920      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
2921    }
2922    return walGroupsReplicationStatus;
2923  }
2924
2925  /**
2926   * Utility for constructing an instance of the passed HRegionServer class.
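   * <p>
   * A minimal usage sketch (the subclass name and configuration shown here are hypothetical):
   *
   * <pre>{@code
   * HRegionServer rs = constructRegionServer(MyRegionServer.class, conf);
   * }</pre>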
2927   */
2928  static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass,
2929    final Configuration conf) {
2930    try {
2931      Constructor<? extends HRegionServer> c =
2932        regionServerClass.getConstructor(Configuration.class);
2933      return c.newInstance(conf);
2934    } catch (Exception e) {
2935      throw new RuntimeException(
2936        "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e);
2937    }
2938  }
2939
2940  /**
2941   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2942   */
2943  public static void main(String[] args) {
    LOG.info("STARTING service " + HRegionServer.class.getSimpleName());
2945    VersionInfo.logVersion();
2946    Configuration conf = HBaseConfiguration.create();
2947    @SuppressWarnings("unchecked")
2948    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2949      .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2950
2951    new HRegionServerCommandLine(regionServerClass).doMain(args);
2952  }
2953
2954  /**
2955   * Gets the online regions of the specified table. This method looks at the in-memory
2956   * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions.
2957   * If a region on this table has been closed during a disable, etc., it will not be included in
   * the returned list. So, the returned list may not necessarily be ALL regions in this table; it
   * is only the ONLINE regions in the table.
2960   * @param tableName table to limit the scope of the query
2961   * @return Online regions from <code>tableName</code>
2962   */
2963  @Override
2964  public List<HRegion> getRegions(TableName tableName) {
2965    List<HRegion> tableRegions = new ArrayList<>();
2966    synchronized (this.onlineRegions) {
2967      for (HRegion region : this.onlineRegions.values()) {
2968        RegionInfo regionInfo = region.getRegionInfo();
2969        if (regionInfo.getTable().equals(tableName)) {
2970          tableRegions.add(region);
2971        }
2972      }
2973    }
2974    return tableRegions;
2975  }
2976
2977  @Override
2978  public List<HRegion> getRegions() {
2979    List<HRegion> allRegions;
2980    synchronized (this.onlineRegions) {
2981      // Return a clone copy of the onlineRegions
2982      allRegions = new ArrayList<>(onlineRegions.values());
2983    }
2984    return allRegions;
2985  }
2986
2987  /**
2988   * Gets the online tables in this RS. This method looks at the in-memory onlineRegions.
2989   * @return all the online tables in this RS
2990   */
2991  public Set<TableName> getOnlineTables() {
2992    Set<TableName> tables = new HashSet<>();
2993    synchronized (this.onlineRegions) {
2994      for (Region region : this.onlineRegions.values()) {
2995        tables.add(region.getTableDescriptor().getTableName());
2996      }
2997    }
2998    return tables;
2999  }
3000
3001  public String[] getRegionServerCoprocessors() {
3002    TreeSet<String> coprocessors = new TreeSet<>();
3003    try {
3004      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
3005    } catch (IOException exception) {
3006      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; "
3007        + "skipping.");
3008      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3009    }
3010    Collection<HRegion> regions = getOnlineRegionsLocalContext();
3011    for (HRegion region : regions) {
3012      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
3013      try {
3014        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
3015      } catch (IOException exception) {
3016        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region
3017          + "; skipping.");
3018        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3019      }
3020    }
3021    coprocessors.addAll(rsHost.getCoprocessors());
3022    return coprocessors.toArray(new String[0]);
3023  }
3024
3025  /**
3026   * Try to close the region, logs a warning on failure but continues.
3027   * @param region Region to close
3028   */
3029  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
3030    try {
3031      if (!closeRegion(region.getEncodedName(), abort, null)) {
3032        LOG
3033          .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing");
3034      }
3035    } catch (IOException e) {
3036      LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing",
3037        e);
3038    }
3039  }
3040
3041  /**
   * Asynchronously close a region; can be called from the master or internally by the regionserver
3043   * when stopping. If called from the master, the region will update the status.
3044   * <p>
3045   * If an opening was in progress, this method will cancel it, but will not start a new close. The
3046   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
3047   * </p>
3048   * <p>
3049   * If a close was in progress, this new request will be ignored, and an exception thrown.
3050   * </p>
3051   * <p>
   * Provides an additional flag to indicate if this region's blocks should be evicted from the
   * cache.
3053   * </p>
3054   * @param encodedName Region to close
3055   * @param abort       True if we are aborting
   * @param destination Where the Region is being moved to; may be null if unknown.
3057   * @return True if closed a region.
3058   * @throws NotServingRegionException if the region is not online
3059   */
3060  protected boolean closeRegion(String encodedName, final boolean abort,
3061    final ServerName destination) throws NotServingRegionException {
3062    // Check for permissions to close.
3063    HRegion actualRegion = this.getRegion(encodedName);
3064    // Can be null if we're calling close on a region that's not online
3065    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
3066      try {
3067        actualRegion.getCoprocessorHost().preClose(false);
3068      } catch (IOException exp) {
3069        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
3070        return false;
3071      }
3072    }
3073
3074    // previous can come back 'null' if not in map.
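    // In regionsInTransitionInRS, TRUE means an OPEN is in progress and FALSE means a CLOSE is.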
3075    final Boolean previous =
3076      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);
3077
3078    if (Boolean.TRUE.equals(previous)) {
3079      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already "
3080        + "trying to OPEN. Cancelling OPENING.");
3081      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
3082        // The replace failed. That should be an exceptional case, but theoretically it can happen.
3083        // We're going to try to do a standard close then.
3084        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
3085          + " Doing a standard close now");
3086        return closeRegion(encodedName, abort, destination);
3087      }
3088      // Let's get the region from the online region list again
3089      actualRegion = this.getRegion(encodedName);
3090      if (actualRegion == null) { // If already online, we still need to close it.
3091        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3092        // The master deletes the znode when it receives this exception.
3093        throw new NotServingRegionException(
3094          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
3095      }
3096    } else if (previous == null) {
3097      LOG.info("Received CLOSE for {}", encodedName);
3098    } else if (Boolean.FALSE.equals(previous)) {
3099      LOG.info("Received CLOSE for the region: " + encodedName
3100        + ", which we are already trying to CLOSE, but not completed yet");
3101      return true;
3102    }
3103
3104    if (actualRegion == null) {
3105      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3106      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
3107      // The master deletes the znode when it receives this exception.
3108      throw new NotServingRegionException(
3109        "The region " + encodedName + " is not online, and is not opening.");
3110    }
3111
3112    CloseRegionHandler crh;
3113    final RegionInfo hri = actualRegion.getRegionInfo();
3114    if (hri.isMetaRegion()) {
3115      crh = new CloseMetaHandler(this, this, hri, abort);
3116    } else {
3117      crh = new CloseRegionHandler(this, this, hri, abort, destination);
3118    }
3119    this.executorService.submit(crh);
3120    return true;
3121  }
3122
3123  /**
   * @return HRegion for the passed binary <code>regionName</code> or null if the named region is
   *         not a member of the online regions.
3126   */
3127  public HRegion getOnlineRegion(final byte[] regionName) {
3128    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3129    return this.onlineRegions.get(encodedRegionName);
3130  }
3131
3132  @Override
3133  public HRegion getRegion(final String encodedRegionName) {
3134    return this.onlineRegions.get(encodedRegionName);
3135  }
3136
3137  @Override
3138  public boolean removeRegion(final HRegion r, ServerName destination) {
3139    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3140    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
3141    if (destination != null) {
3142      long closeSeqNum = r.getMaxFlushedSeqId();
3143      if (closeSeqNum == HConstants.NO_SEQNUM) {
3144        // No edits in WAL for this region; get the sequence number when the region was opened.
3145        closeSeqNum = r.getOpenSeqNum();
3146        if (closeSeqNum == HConstants.NO_SEQNUM) {
3147          closeSeqNum = 0;
3148        }
3149      }
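      // A move back onto this same server ("self move") retains the region's read/write request
      // counts so they can carry over when the region reopens here.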
3150      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
3151      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
3152      if (selfMove) {
3153        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
3154          r.getRegionInfo().getEncodedName(),
3155          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
3156      }
3157    }
3158    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3159    configurationManager.deregisterObserver(r);
3160    return toReturn != null;
3161  }
3162
3163  /**
   * Protected utility method for safely obtaining an HRegion handle.
3165   * @param regionName Name of online {@link HRegion} to return
3166   * @return {@link HRegion} for <code>regionName</code>
3167   */
3168  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
3169    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3170    return getRegionByEncodedName(regionName, encodedRegionName);
3171  }
3172
3173  public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException {
3174    return getRegionByEncodedName(null, encodedRegionName);
3175  }
3176
3177  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3178    throws NotServingRegionException {
3179    HRegion region = this.onlineRegions.get(encodedRegionName);
3180    if (region == null) {
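      // Not online here: report a recent move, an in-progress open, or plain not-serving, in that
      // order.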
3181      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3182      if (moveInfo != null) {
3183        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3184      }
3185      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3186      String regionNameStr =
3187        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
3188      if (isOpening != null && isOpening) {
3189        throw new RegionOpeningException(
3190          "Region " + regionNameStr + " is opening on " + this.serverName);
3191      }
3192      throw new NotServingRegionException(
3193        "" + regionNameStr + " is not online on " + this.serverName);
3194    }
3195    return region;
3196  }
3197
3198  /**
3199   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't
3200   * already.
3201   * @param t   Throwable
3202   * @param msg Message to log in error. Can be null.
3203   * @return Throwable converted to an IOE; methods can only let out IOEs.
3204   */
3205  private Throwable cleanup(final Throwable t, final String msg) {
3206    // Don't log as error if NSRE; NSRE is 'normal' operation.
3207    if (t instanceof NotServingRegionException) {
3208      LOG.debug("NotServingRegionException; " + t.getMessage());
3209      return t;
3210    }
3211    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3212    if (msg == null) {
3213      LOG.error("", e);
3214    } else {
3215      LOG.error(msg, e);
3216    }
3217    if (!rpcServices.checkOOME(t)) {
3218      checkFileSystem();
3219    }
3220    return t;
3221  }
3222
3223  /**
   * @param msg Message to put in a new IOE if the passed <code>t</code> is not an IOE
   * @return <code>t</code> as an IOE if it isn't already one.
3226   */
3227  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3228    return (t instanceof IOException ? (IOException) t
3229      : msg == null || msg.length() == 0 ? new IOException(t)
3230      : new IOException(msg, t));
3231  }
3232
3233  /**
   * Checks to see if the file system is still accessible. If not, sets abortRequested and
   * stopRequested.
3236   * @return false if file system is not available
3237   */
3238  boolean checkFileSystem() {
3239    if (this.dataFsOk && this.dataFs != null) {
3240      try {
3241        FSUtils.checkFileSystemAvailable(this.dataFs);
3242      } catch (IOException e) {
3243        abort("File System not available", e);
3244        this.dataFsOk = false;
3245      }
3246    }
3247    return this.dataFsOk;
3248  }
3249
3250  @Override
3251  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3252    List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3253    Address[] addr = new Address[favoredNodes.size()];
3254    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3255    // it is a map of region name to Address[]
3256    for (int i = 0; i < favoredNodes.size(); i++) {
3257      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort());
3258    }
3259    regionFavoredNodesMap.put(encodedRegionName, addr);
3260  }
3261
3262  /**
3263   * Return the favored nodes for a region given its encoded name. Look at the comment around
3264   * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here.
3265   * @param encodedRegionName the encoded region name.
3266   * @return array of favored locations
3267   */
3268  @Override
3269  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3270    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
3271  }
3272
3273  @Override
3274  public ServerNonceManager getNonceManager() {
3275    return this.nonceManager;
3276  }
3277
3278  private static class MovedRegionInfo {
3279    private final ServerName serverName;
3280    private final long seqNum;
3281
3282    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3283      this.serverName = serverName;
3284      this.seqNum = closeSeqNum;
3285    }
3286
3287    public ServerName getServerName() {
3288      return serverName;
3289    }
3290
3291    public long getSeqNum() {
3292      return seqNum;
3293    }
3294  }
3295
3296  /**
   * We need a timeout. If not, there is a risk of giving wrong information: this would double the
3298   * number of network calls instead of reducing them.
3299   */
3300  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3301
3302  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
3303    boolean selfMove) {
3304    if (selfMove) {
3305      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3306      return;
3307    }
3308    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid="
3309      + closeSeqNum);
3310    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3311  }
3312
3313  void removeFromMovedRegions(String encodedName) {
3314    movedRegionInfoCache.invalidate(encodedName);
3315  }
3316
3317  @InterfaceAudience.Private
3318  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
3319    return movedRegionInfoCache.getIfPresent(encodedRegionName);
3320  }
3321
3322  @InterfaceAudience.Private
3323  public int movedRegionCacheExpiredTime() {
3324    return TIMEOUT_REGION_MOVED;
3325  }
3326
3327  private String getMyEphemeralNodePath() {
3328    return zooKeeper.getZNodePaths().getRsPath(serverName);
3329  }
3330
3331  private boolean isHealthCheckerConfigured() {
3332    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3333    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
3334  }
3335
  /** Returns the underlying {@link CompactSplit} for the server */
3337  public CompactSplit getCompactSplitThread() {
3338    return this.compactSplitThread;
3339  }
3340
3341  CoprocessorServiceResponse execRegionServerService(
3342    @SuppressWarnings("UnusedParameters") final RpcController controller,
3343    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3344    try {
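      // Look up the registered coprocessor endpoint by service name and dispatch the requested
      // method on it.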
3345      ServerRpcController serviceController = new ServerRpcController();
3346      CoprocessorServiceCall call = serviceRequest.getCall();
3347      String serviceName = call.getServiceName();
3348      Service service = coprocessorServiceHandlers.get(serviceName);
3349      if (service == null) {
3350        throw new UnknownProtocolException(null,
3351          "No registered coprocessor executorService found for " + serviceName);
3352      }
3353      ServiceDescriptor serviceDesc = service.getDescriptorForType();
3354
3355      String methodName = call.getMethodName();
3356      MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3357      if (methodDesc == null) {
3358        throw new UnknownProtocolException(service.getClass(),
3359          "Unknown method " + methodName + " called on executorService " + serviceName);
3360      }
3361
3362      Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3363      final Message.Builder responseBuilder =
3364        service.getResponsePrototype(methodDesc).newBuilderForType();
3365      service.callMethod(methodDesc, serviceController, request, message -> {
3366        if (message != null) {
3367          responseBuilder.mergeFrom(message);
3368        }
3369      });
3370      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3371      if (exception != null) {
3372        throw exception;
3373      }
3374      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3375    } catch (IOException ie) {
3376      throw new ServiceException(ie);
3377    }
3378  }
3379
3380  /**
   * May be null if this is a master that does not carry tables.
3382   * @return The block cache instance used by the regionserver.
3383   */
3384  @Override
3385  public Optional<BlockCache> getBlockCache() {
3386    return Optional.ofNullable(this.blockCache);
3387  }
3388
3389  /**
   * May be null if this is a master that does not carry tables.
3391   * @return The cache for mob files used by the regionserver.
3392   */
3393  @Override
3394  public Optional<MobFileCache> getMobFileCache() {
3395    return Optional.ofNullable(this.mobFileCache);
3396  }
3397
3398  CacheEvictionStats clearRegionBlockCache(Region region) {
3399    long evictedBlocks = 0;
3400
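    // Evict every block belonging to each store file of the region from the block cache.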
3401    for (Store store : region.getStores()) {
3402      for (StoreFile hFile : store.getStorefiles()) {
3403        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3404      }
3405    }
3406
3407    return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build();
3408  }
3409
3410  @Override
3411  public double getCompactionPressure() {
3412    double max = 0;
3413    for (Region region : onlineRegions.values()) {
3414      for (Store store : region.getStores()) {
3415        double normCount = store.getCompactionPressure();
3416        if (normCount > max) {
3417          max = normCount;
3418        }
3419      }
3420    }
3421    return max;
3422  }
3423
3424  @Override
3425  public HeapMemoryManager getHeapMemoryManager() {
3426    return hMemManager;
3427  }
3428
3429  public MemStoreFlusher getMemStoreFlusher() {
3430    return cacheFlusher;
3431  }
3432
3433  /**
3434   * For testing
3435   * @return whether all wal roll request finished for this regionserver
3436   */
3437  @InterfaceAudience.Private
3438  public boolean walRollRequestFinished() {
3439    return this.walRoller.walRollFinished();
3440  }
3441
3442  @Override
3443  public ThroughputController getFlushThroughputController() {
3444    return flushThroughputController;
3445  }
3446
3447  @Override
3448  public double getFlushPressure() {
3449    if (getRegionServerAccounting() == null || cacheFlusher == null) {
3450      // return 0 during RS initialization
3451      return 0.0;
3452    }
3453    return getRegionServerAccounting().getFlushPressure();
3454  }
3455
3456  @Override
3457  public void onConfigurationChange(Configuration newConf) {
3458    ThroughputController old = this.flushThroughputController;
3459    if (old != null) {
3460      old.stop("configuration change");
3461    }
3462    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3463    try {
3464      Superusers.initialize(newConf);
3465    } catch (IOException e) {
3466      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3467    }
3468
3469    // update region server coprocessor if the configuration has changed.
3470    if (
3471      CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
3472        CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY)
3473    ) {
3474      LOG.info("Update region server coprocessors because the configuration has changed");
3475      this.rsHost = new RegionServerCoprocessorHost(this, newConf);
3476    }
3477  }
3478
3479  @Override
3480  public MetricsRegionServer getMetrics() {
3481    return metricsRegionServer;
3482  }
3483
3484  @Override
3485  public SecureBulkLoadManager getSecureBulkLoadManager() {
3486    return this.secureBulkLoadManager;
3487  }
3488
3489  @Override
3490  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
3491    final Abortable abort) {
3492    final LockServiceClient client =
3493      new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
3494    return client.regionLock(regionInfo, description, abort);
3495  }
3496
3497  @Override
3498  public void unassign(byte[] regionName) throws IOException {
3499    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
3500  }
3501
3502  @Override
3503  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3504    return this.rsSpaceQuotaManager;
3505  }
3506
3507  @Override
3508  public boolean reportFileArchivalForQuotas(TableName tableName,
3509    Collection<Entry<String, Long>> archivedFiles) {
3510    if (TEST_SKIP_REPORTING_TRANSITION) {
3511      return false;
3512    }
3513    RegionServerStatusService.BlockingInterface rss = rssStub;
3514    if (rss == null || rsSpaceQuotaManager == null) {
3515      // the current server could be stopping.
3516      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3517      return false;
3518    }
3519    try {
3520      RegionServerStatusProtos.FileArchiveNotificationRequest request =
3521        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3522      rss.reportFileArchival(null, request);
3523    } catch (ServiceException se) {
3524      IOException ioe = ProtobufUtil.getRemoteException(se);
3525      if (ioe instanceof PleaseHoldException) {
3526        if (LOG.isTraceEnabled()) {
3527          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3528            + " This will be retried.", ioe);
3529        }
3530        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3531        return false;
3532      }
3533      if (rssStub == rss) {
3534        rssStub = null;
3535      }
3536      // re-create the stub if we failed to report the archival
3537      createRegionServerStatusStub(true);
3538      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3539      return false;
3540    }
3541    return true;
3542  }
3543
3544  void executeProcedure(long procId, long initiatingMasterActiveTime,
3545    RSProcedureCallable callable) {
3546    executorService
3547      .submit(new RSProcedureHandler(this, procId, initiatingMasterActiveTime, callable));
3548  }
3549
3550  public void remoteProcedureComplete(long procId, long initiatingMasterActiveTime,
3551    Throwable error) {
3552    procedureResultReporter.complete(procId, initiatingMasterActiveTime, error);
3553  }
3554
3555  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3556    RegionServerStatusService.BlockingInterface rss;
3557    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3558    for (;;) {
3559      rss = rssStub;
3560      if (rss != null) {
3561        break;
3562      }
3563      createRegionServerStatusStub();
3564    }
3565    try {
3566      rss.reportProcedureDone(null, request);
3567    } catch (ServiceException se) {
3568      if (rssStub == rss) {
3569        rssStub = null;
3570      }
3571      throw ProtobufUtil.getRemoteException(se);
3572    }
3573  }
3574
3575  /**
   * Will ignore open/close region procedures which were already submitted or executed. When the
   * master had an unfinished open/close region procedure and restarted, the new active master may
   * send a duplicate open/close region request to the regionserver. The open/close request is
   * submitted to a thread pool and executed, so we first need a cache for submitted open/close
   * region procedures. After the open/close region request has been executed and the region
   * transition has been reported successfully, cache it in the executed region procedures cache.
   * See {@link #finishRegionProcedure(long)}. After the region transition has been reported
   * successfully, the master will not send the open/close region request to the regionserver
   * again. We assume an ongoing duplicate open/close region request should not be delayed for
   * more than 600 seconds, so the executed region procedures cache will expire after 600 seconds.
   * See HBASE-22404 for more details.
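   * <p>
   * A rough usage sketch of how the submit/finish pair is intended to be used (the caller shown
   * here is hypothetical):
   *
   * <pre>{@code
   * if (regionServer.submitRegionProcedure(procId)) {
   *   // run the open/close handler, then report the region transition ...
   *   regionServer.finishRegionProcedure(procId);
   * }
   * }</pre>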
3586   * @param procId the id of the open/close region procedure
3587   * @return true if the procedure can be submitted.
3588   */
3589  boolean submitRegionProcedure(long procId) {
3590    if (procId == -1) {
3591      return true;
3592    }
3593    // Ignore the region procedures which already submitted.
3594    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3595    if (previous != null) {
3596      LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
3597      return false;
3598    }
3599    // Ignore the region procedures which already executed.
3600    if (executedRegionProcedures.getIfPresent(procId) != null) {
3601      LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
3602      return false;
3603    }
3604    return true;
3605  }
3606
3607  /**
3608   * See {@link #submitRegionProcedure(long)}.
3609   * @param procId the id of the open/close region procedure
3610   */
3611  public void finishRegionProcedure(long procId) {
3612    executedRegionProcedures.put(procId, procId);
3613    submittedRegionProcedures.remove(procId);
3614  }
3615
3616  /**
   * Forcibly terminate the region server when the abort times out.
3618   */
3619  private static class SystemExitWhenAbortTimeout extends TimerTask {
3620
3621    public SystemExitWhenAbortTimeout() {
3622    }
3623
3624    @Override
3625    public void run() {
      LOG.warn("Aborting region server timed out, terminating forcibly"
        + " without waiting for any running shutdown hooks or finalizers to finish their work."
        + " Thread dump to stdout.");
3629      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3630      Runtime.getRuntime().halt(1);
3631    }
3632  }
3633
3634  @InterfaceAudience.Private
3635  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3636    return compactedFileDischarger;
3637  }
3638
3639  /**
   * Return the pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}
3641   * @return pause time
3642   */
3643  @InterfaceAudience.Private
3644  public long getRetryPauseTime() {
3645    return this.retryPauseTime;
3646  }
3647
3648  @Override
3649  public Optional<ServerName> getActiveMaster() {
3650    return Optional.ofNullable(masterAddressTracker.getMasterAddress());
3651  }
3652
3653  @Override
3654  public List<ServerName> getBackupMasters() {
3655    return masterAddressTracker.getBackupMasters();
3656  }
3657
3658  @Override
3659  public Iterator<ServerName> getBootstrapNodes() {
3660    return bootstrapNodeManager.getBootstrapNodes().iterator();
3661  }
3662
3663  @Override
3664  public List<HRegionLocation> getMetaLocations() {
3665    return metaRegionLocationCache.getMetaRegionLocations();
3666  }
3667
3668  @Override
3669  protected NamedQueueRecorder createNamedQueueRecord() {
3670    return NamedQueueRecorder.getInstance(conf);
3671  }
3672
3673  @Override
3674  protected boolean clusterMode() {
    // this method will be called in the constructor of the super class, so we cannot return
    // masterless directly here, as it will always be false.
3677    return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
3678  }
3679
3680  @InterfaceAudience.Private
3681  public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
3682    return brokenStoreFileCleaner;
3683  }
3684
3685  @InterfaceAudience.Private
3686  public RSMobFileCleanerChore getRSMobFileCleanerChore() {
3687    return rsMobFileCleanerChore;
3688  }
3689
3690  RSSnapshotVerifier getRsSnapshotVerifier() {
3691    return rsSnapshotVerifier;
3692  }
3693
3694  @Override
3695  protected void stopChores() {
3696    shutdownChore(nonceManagerChore);
3697    shutdownChore(compactionChecker);
3698    shutdownChore(compactedFileDischarger);
3699    shutdownChore(periodicFlusher);
3700    shutdownChore(healthCheckChore);
3701    shutdownChore(executorStatusChore);
3702    shutdownChore(storefileRefresher);
3703    shutdownChore(fsUtilizationChore);
3704    shutdownChore(namedQueueServiceChore);
3705    shutdownChore(brokenStoreFileCleaner);
3706    shutdownChore(rsMobFileCleanerChore);
3707    shutdownChore(replicationMarkerChore);
3708  }
3709
3710  @Override
3711  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
3712    return regionReplicationBufferManager;
3713  }
3714}