001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION;
023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
024import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
025import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
026import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
027import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT;
028import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
029import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT;
030import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
031import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
032import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
033import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
034
035import io.opentelemetry.api.trace.Span;
036import io.opentelemetry.api.trace.StatusCode;
037import io.opentelemetry.context.Scope;
038import java.io.IOException;
039import java.io.PrintWriter;
040import java.lang.management.MemoryUsage;
041import java.lang.reflect.Constructor;
042import java.net.InetSocketAddress;
043import java.time.Duration;
044import java.util.ArrayList;
045import java.util.Collection;
046import java.util.Collections;
047import java.util.Comparator;
048import java.util.HashSet;
049import java.util.Iterator;
050import java.util.List;
051import java.util.Map;
052import java.util.Map.Entry;
053import java.util.Objects;
054import java.util.Optional;
055import java.util.Set;
056import java.util.SortedMap;
057import java.util.Timer;
058import java.util.TimerTask;
059import java.util.TreeMap;
060import java.util.TreeSet;
061import java.util.concurrent.ConcurrentHashMap;
062import java.util.concurrent.ConcurrentMap;
063import java.util.concurrent.ConcurrentSkipListMap;
064import java.util.concurrent.ThreadLocalRandom;
065import java.util.concurrent.TimeUnit;
066import java.util.concurrent.atomic.AtomicBoolean;
067import java.util.concurrent.locks.ReentrantReadWriteLock;
068import java.util.stream.Collectors;
069import javax.management.MalformedObjectNameException;
070import javax.servlet.http.HttpServlet;
071import org.apache.commons.lang3.StringUtils;
072import org.apache.commons.lang3.mutable.MutableFloat;
073import org.apache.hadoop.conf.Configuration;
074import org.apache.hadoop.fs.FileSystem;
075import org.apache.hadoop.fs.Path;
076import org.apache.hadoop.hbase.Abortable;
077import org.apache.hadoop.hbase.CacheEvictionStats;
078import org.apache.hadoop.hbase.CallQueueTooBigException;
079import org.apache.hadoop.hbase.ClockOutOfSyncException;
080import org.apache.hadoop.hbase.DoNotRetryIOException;
081import org.apache.hadoop.hbase.ExecutorStatusChore;
082import org.apache.hadoop.hbase.HBaseConfiguration;
083import org.apache.hadoop.hbase.HBaseInterfaceAudience;
084import org.apache.hadoop.hbase.HBaseServerBase;
085import org.apache.hadoop.hbase.HConstants;
086import org.apache.hadoop.hbase.HDFSBlocksDistribution;
087import org.apache.hadoop.hbase.HRegionLocation;
088import org.apache.hadoop.hbase.HealthCheckChore;
089import org.apache.hadoop.hbase.MetaTableAccessor;
090import org.apache.hadoop.hbase.NotServingRegionException;
091import org.apache.hadoop.hbase.PleaseHoldException;
092import org.apache.hadoop.hbase.ScheduledChore;
093import org.apache.hadoop.hbase.ServerName;
094import org.apache.hadoop.hbase.Stoppable;
095import org.apache.hadoop.hbase.TableName;
096import org.apache.hadoop.hbase.YouAreDeadException;
097import org.apache.hadoop.hbase.ZNodeClearer;
098import org.apache.hadoop.hbase.client.ConnectionUtils;
099import org.apache.hadoop.hbase.client.RegionInfo;
100import org.apache.hadoop.hbase.client.RegionInfoBuilder;
101import org.apache.hadoop.hbase.client.locking.EntityLock;
102import org.apache.hadoop.hbase.client.locking.LockServiceClient;
103import org.apache.hadoop.hbase.conf.ConfigurationObserver;
104import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
105import org.apache.hadoop.hbase.exceptions.RegionMovedException;
106import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
107import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
108import org.apache.hadoop.hbase.executor.ExecutorType;
109import org.apache.hadoop.hbase.http.InfoServer;
110import org.apache.hadoop.hbase.io.hfile.BlockCache;
111import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
112import org.apache.hadoop.hbase.io.hfile.HFile;
113import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
114import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
115import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
116import org.apache.hadoop.hbase.ipc.RpcClient;
117import org.apache.hadoop.hbase.ipc.RpcServer;
118import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
119import org.apache.hadoop.hbase.ipc.ServerRpcController;
120import org.apache.hadoop.hbase.log.HBaseMarkers;
121import org.apache.hadoop.hbase.mob.MobFileCache;
122import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
123import org.apache.hadoop.hbase.monitoring.TaskMonitor;
124import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
125import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore;
126import org.apache.hadoop.hbase.net.Address;
127import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
128import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
129import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
130import org.apache.hadoop.hbase.quotas.QuotaUtil;
131import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
132import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
133import org.apache.hadoop.hbase.quotas.RegionSize;
134import org.apache.hadoop.hbase.quotas.RegionSizeStore;
135import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
136import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
137import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
138import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
139import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
140import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
141import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
142import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
143import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
144import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
145import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
146import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
147import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
148import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
149import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
150import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
151import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
152import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
153import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
154import org.apache.hadoop.hbase.security.SecurityConstants;
155import org.apache.hadoop.hbase.security.Superusers;
156import org.apache.hadoop.hbase.security.User;
157import org.apache.hadoop.hbase.security.UserProvider;
158import org.apache.hadoop.hbase.trace.TraceUtil;
159import org.apache.hadoop.hbase.util.Bytes;
160import org.apache.hadoop.hbase.util.CompressionTest;
161import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
162import org.apache.hadoop.hbase.util.DNS;
163import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
164import org.apache.hadoop.hbase.util.FSUtils;
165import org.apache.hadoop.hbase.util.FutureUtils;
166import org.apache.hadoop.hbase.util.JvmPauseMonitor;
167import org.apache.hadoop.hbase.util.Pair;
168import org.apache.hadoop.hbase.util.RetryCounter;
169import org.apache.hadoop.hbase.util.RetryCounterFactory;
170import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
171import org.apache.hadoop.hbase.util.Threads;
172import org.apache.hadoop.hbase.util.VersionInfo;
173import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
174import org.apache.hadoop.hbase.wal.WAL;
175import org.apache.hadoop.hbase.wal.WALFactory;
176import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
177import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
178import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
179import org.apache.hadoop.hbase.zookeeper.ZKUtil;
180import org.apache.hadoop.ipc.RemoteException;
181import org.apache.hadoop.util.ReflectionUtils;
182import org.apache.yetus.audience.InterfaceAudience;
183import org.apache.zookeeper.KeeperException;
184import org.slf4j.Logger;
185import org.slf4j.LoggerFactory;
186
187import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
188import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
189import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
190import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
191import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
192import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
193import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
194import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
195import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
196import org.apache.hbase.thirdparty.com.google.protobuf.Message;
197import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
198import org.apache.hbase.thirdparty.com.google.protobuf.Service;
199import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
200import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
201import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
202
203import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
204import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
233
234/**
235 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
236 * are many HRegionServers in a single HBase deployment.
237 */
238@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
239@SuppressWarnings({ "deprecation" })
240public class HRegionServer extends HBaseServerBase<RSRpcServices>
241  implements RegionServerServices, LastSequenceId {
242
243  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
244
  // Scale factor 1024 * 1024. NOTE(review): going by the name this is the number of bytes in one
  // MB, presumably used to convert byte counts for reporting; usage is outside this view.
  int unitMB = 1024 * 1024;
  // Scale factor 1024. NOTE(review): going by the name this is the number of bytes in one KB;
  // usage is outside this view.
  int unitKB = 1024;
247
248  /**
   * For testing only! Set to true to skip notifying the master of region assignments.
250   */
251  @InterfaceAudience.Private
252  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
253  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
254
255  /**
   * A map from RegionName to the current action in progress. The Boolean value indicates the
   * action: true if an open region action is in progress, false if a close region action is in
   * progress.
258   */
259  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
260    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
261
262  /**
   * Used to cache the open/close region procedures which have already been submitted. See
264   * {@link #submitRegionProcedure(long)}.
265   */
266  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
267  /**
   * Used to cache the open/close region procedures which have already been executed. See
269   * {@link #submitRegionProcedure(long)}.
270   */
271  private final Cache<Long, Long> executedRegionProcedures =
272    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
273
274  /**
275   * Used to cache the moved-out regions
276   */
277  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
278    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();
279
280  private MemStoreFlusher cacheFlusher;
281
282  private HeapMemoryManager hMemManager;
283
284  // Replication services. If no replication, this handler will be null.
285  private ReplicationSourceService replicationSourceHandler;
286  private ReplicationSinkService replicationSinkHandler;
287  private boolean sameReplicationSourceAndSink;
288
289  // Compactions
290  private CompactSplit compactSplitThread;
291
292  /**
293   * Map of regions currently being served by this region server. Key is the encoded region name.
294   * All access should be synchronized.
295   */
296  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
297  /**
298   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
299   * need to be a ConcurrentHashMap?
300   */
301  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();
302
303  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
305   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
306   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
307   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API
308   * call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and
309   * here we really mean DataNode locations. We don't store it as InetSocketAddress here because the
310   * conversion on demand from Address to InetSocketAddress will guarantee the resolution results
311   * will be fresh when we need it.
312   */
313  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();
314
315  private LeaseManager leaseManager;
316
317  private volatile boolean dataFsOk;
318
319  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds, for safety
321  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // This task will be run when the abort times out
323  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
324
325  // A state before we go into stopped state. At this stage we're closing user
326  // space regions.
327  private boolean stopping = false;
328  private volatile boolean killed = false;
329
330  private final int threadWakeFrequency;
331
332  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
333  private final int compactionCheckFrequency;
334  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
335  private final int flushCheckFrequency;
336
337  // Stub to do region server status calls against the master.
338  private volatile RegionServerStatusService.BlockingInterface rssStub;
339  private volatile LockService.BlockingInterface lockStub;
340  // RPC client. Used to make the stub above that does region server status checking.
341  private RpcClient rpcClient;
342
343  private UncaughtExceptionHandler uncaughtExceptionHandler;
344
345  private JvmPauseMonitor pauseMonitor;
346
347  private RSSnapshotVerifier rsSnapshotVerifier;
348
349  /** region server process name */
350  public static final String REGIONSERVER = "regionserver";
351
352  private MetricsRegionServer metricsRegionServer;
353  MetricsRegionServerWrapperImpl metricsRegionServerImpl;
354
355  /**
356   * Check for compactions requests.
357   */
358  private ScheduledChore compactionChecker;
359
360  /**
361   * Check for flushes
362   */
363  private ScheduledChore periodicFlusher;
364
365  private volatile WALFactory walFactory;
366
367  private LogRoller walRoller;
368
369  // A thread which calls reportProcedureDone
370  private RemoteProcedureResultReporter procedureResultReporter;
371
372  // flag set after we're done setting up server threads
373  final AtomicBoolean online = new AtomicBoolean(false);
374
375  // master address tracker
376  private final MasterAddressTracker masterAddressTracker;
377
378  // Log Splitting Worker
379  private SplitLogWorker splitLogWorker;
380
381  private final int shortOperationTimeout;
382
383  // Time to pause if master says 'please hold'
384  private final long retryPauseTime;
385
386  private final RegionServerAccounting regionServerAccounting;
387
388  private NamedQueueServiceChore namedQueueServiceChore = null;
389
390  // Block cache
391  private BlockCache blockCache;
392  // The cache for mob files
393  private MobFileCache mobFileCache;
394
395  /** The health check chore. */
396  private HealthCheckChore healthCheckChore;
397
398  /** The Executor status collect chore. */
399  private ExecutorStatusChore executorStatusChore;
400
401  /** The nonce manager chore. */
402  private ScheduledChore nonceManagerChore;
403
404  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
405
406  /**
407   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
408   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
409   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
410   */
411  @Deprecated
412  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
413  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
414    "hbase.regionserver.hostname.disable.master.reversedns";
415
416  /**
417   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
418   * Exception will be thrown if both are used.
419   */
420  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
421  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
422    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";
423
424  /**
425   * Unique identifier for the cluster we are a part of.
426   */
427  private String clusterId;
428
429  // chore for refreshing store files for secondary regions
430  private StorefileRefresherChore storefileRefresher;
431
432  private volatile RegionServerCoprocessorHost rsHost;
433
434  private RegionServerProcedureManagerHost rspmHost;
435
436  private RegionServerRpcQuotaManager rsQuotaManager;
437  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
438
439  /**
440   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
441   * case where client doesn't receive the response from a successful operation and retries. We
442   * track the successful ops for some time via a nonce sent by client and handle duplicate
443   * operations (currently, by failing them; in future we might use MVCC to return result). Nonces
   * are also recovered from WAL during recovery; however, the caveats (from HBASE-3787) are: - WAL
445   * recovery is optimized, and under high load we won't read nearly nonce-timeout worth of past
446   * records. If we don't read the records, we don't read and recover the nonces. Some WALs within
447   * nonce-timeout at recovery may not even be present due to rolling/cleanup. - There's no WAL
   * recovery during normal region move, so nonces will not be transferred. We can have separate
449   * additional "Nonce WAL". It will just contain bunch of numbers and won't be flushed on main path
450   * - because WAL itself also contains nonces, if we only flush it before memstore flush, for a
451   * given nonce we will either see it in the WAL (if it was never flushed to disk, it will be part
452   * of recovery), or we'll see it as part of the nonce log (or both occasionally, which doesn't
453   * matter). Nonce log file can be deleted after the latest nonce in it expired. It can also be
454   * recovered during move.
455   */
456  final ServerNonceManager nonceManager;
457
458  private BrokenStoreFileCleaner brokenStoreFileCleaner;
459
460  private RSMobFileCleanerChore rsMobFileCleanerChore;
461
462  @InterfaceAudience.Private
463  CompactedHFilesDischarger compactedFileDischarger;
464
465  private volatile ThroughputController flushThroughputController;
466
467  private SecureBulkLoadManager secureBulkLoadManager;
468
469  private FileSystemUtilizationChore fsUtilizationChore;
470
471  private BootstrapNodeManager bootstrapNodeManager;
472
473  /**
474   * True if this RegionServer is coming up in a cluster where there is no Master; means it needs to
475   * just come up and make do without a Master to talk to: e.g. in test or HRegionServer is doing
   * other than its usual duties: e.g. as a hollowed-out host whose only purpose is as a
477   * Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
478   * {@link #TEST_SKIP_REPORTING_TRANSITION} ?
479   */
480  private final boolean masterless;
481  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
482
483  /** regionserver codec list **/
484  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";
485
486  // A timer to shutdown the process if abort takes too long
487  private Timer abortMonitor;
488
489  private RegionReplicationBufferManager regionReplicationBufferManager;
490
491  /*
492   * Chore that creates replication marker rows.
493   */
494  private ReplicationMarkerChore replicationMarkerChore;
495
  // A timer that submits requests to the PrefetchExecutor
497  private PrefetchExecutorNotifier prefetchExecutorNotifier;
498
499  /**
500   * Starts a HRegionServer at the default location.
501   * <p/>
502   * Don't start any services or managers in here in the Constructor. Defer till after we register
503   * with the Master as much as possible. See {@link #startServices}.
504   */
505  public HRegionServer(final Configuration conf) throws IOException {
506    super(conf, "RegionServer"); // thread name
507    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
508    try (Scope ignored = span.makeCurrent()) {
509      this.dataFsOk = true;
510      this.masterless = !clusterMode();
511      MemorySizeUtil.validateRegionServerHeapMemoryAllocation(conf);
512      HFile.checkHFileVersion(this.conf);
513      checkCodecs(this.conf);
514      FSUtils.setupShortCircuitRead(this.conf);
515
516      // Disable usage of meta replicas in the regionserver
517      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
518      // Config'ed params
519      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
520      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
521      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
522
523      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
524      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
525
526      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
527        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
528
529      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
530        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);
531
532      regionServerAccounting = new RegionServerAccounting(conf);
533
534      blockCache = BlockCacheFactory.createBlockCache(conf);
535      // The call below, instantiates the DataTieringManager only when
536      // the configuration "hbase.regionserver.datatiering.enable" is set to true.
537      DataTieringManager.instantiate(conf, onlineRegions);
538
539      mobFileCache = new MobFileCache(conf);
540
541      rsSnapshotVerifier = new RSSnapshotVerifier(conf);
542
543      uncaughtExceptionHandler =
544        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);
545
546      // If no master in cluster, skip trying to track one or look for a cluster status.
547      if (!this.masterless) {
548        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
549        masterAddressTracker.start();
550      } else {
551        masterAddressTracker = null;
552      }
553      this.rpcServices.start(zooKeeper);
554      span.setStatus(StatusCode.OK);
555    } catch (Throwable t) {
556      // Make sure we log the exception. HRegionServer is often started via reflection and the
557      // cause of failed startup is lost.
558      TraceUtil.setError(span, t);
559      LOG.error("Failed construction RegionServer", t);
560      throw t;
561    } finally {
562      span.end();
563    }
564  }
565
566  // HMaster should override this method to load the specific config for master
567  @Override
568  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
569    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
570    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
571      if (!StringUtils.isBlank(hostname)) {
572        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
573          + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
574          + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
575          + UNSAFE_RS_HOSTNAME_KEY + " is used";
576        throw new IOException(msg);
577      } else {
578        return DNS.getHostname(conf, DNS.ServerType.REGIONSERVER);
579      }
580    } else {
581      return hostname;
582    }
583  }
584
585  @Override
586  protected DNS.ServerType getDNSServerType() {
587    return DNS.ServerType.REGIONSERVER;
588  }
589
590  @Override
591  protected void login(UserProvider user, String host) throws IOException {
592    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
593      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
594  }
595
596  @Override
597  protected String getProcessName() {
598    return REGIONSERVER;
599  }
600
601  @Override
602  protected RegionServerCoprocessorHost getCoprocessorHost() {
603    return getRegionServerCoprocessorHost();
604  }
605
606  @Override
607  protected boolean canCreateBaseZNode() {
608    return !clusterMode();
609  }
610
611  @Override
612  protected boolean canUpdateTableDescriptor() {
613    return false;
614  }
615
616  @Override
617  protected boolean cacheTableDescriptor() {
618    return false;
619  }
620
621  protected RSRpcServices createRpcServices() throws IOException {
622    return new RSRpcServices(this);
623  }
624
625  @Override
626  protected void configureInfoServer(InfoServer infoServer) {
627    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
628    infoServer.setAttribute(REGIONSERVER, this);
629  }
630
631  @Override
632  protected Class<? extends HttpServlet> getDumpServlet() {
633    return RSDumpServlet.class;
634  }
635
636  /**
637   * Used by {@link RSDumpServlet} to generate debugging information.
638   */
639  public void dumpRowLocks(final PrintWriter out) {
640    StringBuilder sb = new StringBuilder();
641    for (HRegion region : getRegions()) {
642      if (region.getLockedRows().size() > 0) {
643        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
644          sb.setLength(0);
645          sb.append(region.getTableDescriptor().getTableName()).append(",")
646            .append(region.getRegionInfo().getEncodedName()).append(",");
647          sb.append(rowLockContext.toString());
648          out.println(sb);
649        }
650      }
651    }
652  }
653
654  @Override
655  public boolean registerService(Service instance) {
656    // No stacking of instances is allowed for a single executorService name
657    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
658    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
659    if (coprocessorServiceHandlers.containsKey(serviceName)) {
660      LOG.error("Coprocessor executorService " + serviceName
661        + " already registered, rejecting request from " + instance);
662      return false;
663    }
664
665    coprocessorServiceHandlers.put(serviceName, instance);
666    if (LOG.isDebugEnabled()) {
667      LOG.debug(
668        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
669    }
670    return true;
671  }
672
673  /**
674   * Run test on configured codecs to make sure supporting libs are in place.
675   */
676  private static void checkCodecs(final Configuration c) throws IOException {
677    // check to see if the codec list is available:
678    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
679    if (codecs == null) {
680      return;
681    }
682    for (String codec : codecs) {
683      if (!CompressionTest.testCompression(codec)) {
684        throw new IOException(
685          "Compression codec " + codec + " not supported, aborting RS construction");
686      }
687    }
688  }
689
  /** Returns the value of the {@code clusterId} field; may be {@code null} if it was never set. */
  public String getClusterId() {
    return this.clusterId;
  }
693
  /**
   * All initialization needed before we go register with Master.<br>
   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
   * In here we initialize ZooKeeper, set up the cluster connection, create the bootstrap node
   * manager and the region replication buffer manager, and pick up the RPC client from the
   * cluster connection.
   * <p>
   * The whole sequence runs inside a tracing span. Any {@link Throwable} is recorded on the span,
   * after which the RPC services are stopped and the server is aborted; nothing is rethrown.
   */
  private void preRegistrationInitialization() {
    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
    try (Scope ignored = span.makeCurrent()) {
      initializeZooKeeper();
      setupClusterConnection();
      // Both managers below are built on top of the connection created just above.
      bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
      // Setup RPC client for master communication
      this.rpcClient = asyncClusterConnection.getRpcClient();
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Call stop if error or process will stick around for ever since server
      // puts up non-daemon threads.
      TraceUtil.setError(span, t);
      this.rpcServices.stop();
      abort("Initialization of RS failed.  Hence aborting RS.", t);
    } finally {
      // The span is ended on both the success and the failure path.
      span.end();
    }
  }
719
720  /**
721   * Bring up connection to zk ensemble and then wait until a master for this cluster and then after
722   * that, wait until cluster 'up' flag has been set. This is the order in which master does things.
723   * <p>
724   * Finally open long-living server short-circuit connection.
725   */
726  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
727      justification = "cluster Id znode read would give us correct response")
728  private void initializeZooKeeper() throws IOException, InterruptedException {
729    // Nothing to do in here if no Master in the mix.
730    if (this.masterless) {
731      return;
732    }
733
734    // Create the master address tracker, register with zk, and start it. Then
735    // block until a master is available. No point in starting up if no master
736    // running.
737    blockAndCheckIfStopped(this.masterAddressTracker);
738
739    // Wait on cluster being up. Master will set this flag up in zookeeper
740    // when ready.
741    blockAndCheckIfStopped(this.clusterStatusTracker);
742
743    // If we are HMaster then the cluster id should have already been set.
744    if (clusterId == null) {
745      // Retrieve clusterId
746      // Since cluster status is now up
747      // ID should have already been set by HMaster
748      try {
749        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
750        if (clusterId == null) {
751          this.abort("Cluster ID has not been set");
752        }
753        LOG.info("ClusterId : " + clusterId);
754      } catch (KeeperException e) {
755        this.abort("Failed to retrieve Cluster ID", e);
756      }
757    }
758
759    if (isStopped() || isAborted()) {
760      return; // No need for further initialization
761    }
762
763    // watch for snapshots and other procedures
764    try {
765      rspmHost = new RegionServerProcedureManagerHost();
766      rspmHost.loadProcedures(conf);
767      rspmHost.initialize(this);
768    } catch (KeeperException e) {
769      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
770    }
771  }
772
773  /**
774   * Utilty method to wait indefinitely on a znode availability while checking if the region server
775   * is shut down
776   * @param tracker znode tracker to use
777   * @throws IOException          any IO exception, plus if the RS is stopped
778   * @throws InterruptedException if the waiting thread is interrupted
779   */
780  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
781    throws IOException, InterruptedException {
782    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
783      if (this.stopped) {
784        throw new IOException("Received the shutdown message while waiting.");
785      }
786    }
787  }
788
789  /** Returns True if the cluster is up. */
790  @Override
791  public boolean isClusterUp() {
792    return this.masterless
793      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
794  }
795
796  private void initializeReplicationMarkerChore() {
797    boolean replicationMarkerEnabled =
798      conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
799    // If replication or replication marker is not enabled then return immediately.
800    if (replicationMarkerEnabled) {
801      int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
802        REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
803      replicationMarkerChore = new ReplicationMarkerChore(this, this, period, conf);
804    }
805  }
806
  /** Returns the current value of the {@code stopping} flag. */
  @Override
  public boolean isStopping() {
    return stopping;
  }
811
812  /**
813   * The HRegionServer sticks in this loop until closed.
814   */
815  @Override
816  public void run() {
817    if (isStopped()) {
818      LOG.info("Skipping run; stopped");
819      return;
820    }
821    try {
822      // Do pre-registration initializations; zookeeper, lease threads, etc.
823      preRegistrationInitialization();
824    } catch (Throwable e) {
825      abort("Fatal exception during initialization", e);
826    }
827
828    try {
829      if (!isStopped() && !isAborted()) {
830        installShutdownHook();
831        // Initialize the RegionServerCoprocessorHost now that our ephemeral
832        // node was created, in case any coprocessors want to use ZooKeeper
833        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
834
835        // Try and register with the Master; tell it we are here. Break if server is stopped or
836        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
837        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
838        // come up.
839        LOG.debug("About to register with Master.");
840        TraceUtil.trace(() -> {
841          RetryCounterFactory rcf =
842            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
843          RetryCounter rc = rcf.create();
844          while (keepLooping()) {
845            RegionServerStartupResponse w = reportForDuty();
846            if (w == null) {
847              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
848              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
849              this.sleeper.sleep(sleepTime);
850            } else {
851              handleReportForDutyResponse(w);
852              break;
853            }
854          }
855        }, "HRegionServer.registerWithMaster");
856      }
857
858      if (!isStopped() && isHealthy()) {
859        TraceUtil.trace(() -> {
860          // start the snapshot handler and other procedure handlers,
861          // since the server is ready to run
862          if (this.rspmHost != null) {
863            this.rspmHost.start();
864          }
865          // Start the Quota Manager
866          if (this.rsQuotaManager != null) {
867            rsQuotaManager.start(getRpcServer().getScheduler());
868          }
869          if (this.rsSpaceQuotaManager != null) {
870            this.rsSpaceQuotaManager.start();
871          }
872        }, "HRegionServer.startup");
873      }
874
875      // We registered with the Master. Go into run mode.
876      long lastMsg = EnvironmentEdgeManager.currentTime();
877      long oldRequestCount = -1;
878      // The main run loop.
879      while (!isStopped() && isHealthy()) {
880        if (!isClusterUp()) {
881          if (onlineRegions.isEmpty()) {
882            stop("Exiting; cluster shutdown set and not carrying any regions");
883          } else if (!this.stopping) {
884            this.stopping = true;
885            LOG.info("Closing user regions");
886            closeUserRegions(isAborted());
887          } else {
888            boolean allUserRegionsOffline = areAllUserRegionsOffline();
889            if (allUserRegionsOffline) {
890              // Set stopped if no more write requests tp meta tables
891              // since last time we went around the loop. Any open
892              // meta regions will be closed on our way out.
893              if (oldRequestCount == getWriteRequestCount()) {
894                stop("Stopped; only catalog regions remaining online");
895                break;
896              }
897              oldRequestCount = getWriteRequestCount();
898            } else {
899              // Make sure all regions have been closed -- some regions may
900              // have not got it because we were splitting at the time of
901              // the call to closeUserRegions.
902              closeUserRegions(this.abortRequested.get());
903            }
904            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
905          }
906        }
907        long now = EnvironmentEdgeManager.currentTime();
908        if ((now - lastMsg) >= msgInterval) {
909          tryRegionServerReport(lastMsg, now);
910          lastMsg = EnvironmentEdgeManager.currentTime();
911        }
912        if (!isStopped() && !isAborted()) {
913          this.sleeper.sleep();
914        }
915      } // for
916    } catch (Throwable t) {
917      if (!rpcServices.checkOOME(t)) {
918        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
919        abort(prefix + t.getMessage(), t);
920      }
921    }
922
923    final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
924    try (Scope ignored = span.makeCurrent()) {
925      if (this.leaseManager != null) {
926        this.leaseManager.closeAfterLeasesExpire();
927      }
928      if (this.splitLogWorker != null) {
929        splitLogWorker.stop();
930      }
931      stopInfoServer();
932      // Send cache a shutdown.
933      if (blockCache != null) {
934        blockCache.shutdown();
935      }
936      if (mobFileCache != null) {
937        mobFileCache.shutdown();
938      }
939
940      // Send interrupts to wake up threads if sleeping so they notice shutdown.
941      // TODO: Should we check they are alive? If OOME could have exited already
942      if (this.hMemManager != null) {
943        this.hMemManager.stop();
944      }
945      if (this.cacheFlusher != null) {
946        this.cacheFlusher.interruptIfNecessary();
947      }
948      if (this.compactSplitThread != null) {
949        this.compactSplitThread.interruptIfNecessary();
950      }
951
952      // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
953      if (rspmHost != null) {
954        rspmHost.stop(this.abortRequested.get() || this.killed);
955      }
956
957      if (this.killed) {
958        // Just skip out w/o closing regions. Used when testing.
959      } else if (abortRequested.get()) {
960        if (this.dataFsOk) {
961          closeUserRegions(abortRequested.get()); // Don't leave any open file handles
962        }
963        LOG.info("aborting server " + this.serverName);
964      } else {
965        closeUserRegions(abortRequested.get());
966        LOG.info("stopping server " + this.serverName);
967      }
968      regionReplicationBufferManager.stop();
969      closeClusterConnection();
970      // Closing the compactSplit thread before closing meta regions
971      if (!this.killed && containsMetaTableRegions()) {
972        if (!abortRequested.get() || this.dataFsOk) {
973          if (this.compactSplitThread != null) {
974            this.compactSplitThread.join();
975            this.compactSplitThread = null;
976          }
977          closeMetaTableRegions(abortRequested.get());
978        }
979      }
980
981      if (!this.killed && this.dataFsOk) {
982        waitOnAllRegionsToClose(abortRequested.get());
983        LOG.info("stopping server " + this.serverName + "; all regions closed.");
984      }
985
986      // Stop the quota manager
987      if (rsQuotaManager != null) {
988        rsQuotaManager.stop();
989      }
990      if (rsSpaceQuotaManager != null) {
991        rsSpaceQuotaManager.stop();
992        rsSpaceQuotaManager = null;
993      }
994
995      // flag may be changed when closing regions throws exception.
996      if (this.dataFsOk) {
997        shutdownWAL(!abortRequested.get());
998      }
999
1000      // Make sure the proxy is down.
1001      if (this.rssStub != null) {
1002        this.rssStub = null;
1003      }
1004      if (this.lockStub != null) {
1005        this.lockStub = null;
1006      }
1007      if (this.rpcClient != null) {
1008        this.rpcClient.close();
1009      }
1010      if (this.leaseManager != null) {
1011        this.leaseManager.close();
1012      }
1013      if (this.pauseMonitor != null) {
1014        this.pauseMonitor.stop();
1015      }
1016
1017      if (!killed) {
1018        stopServiceThreads();
1019      }
1020
1021      if (this.rpcServices != null) {
1022        this.rpcServices.stop();
1023      }
1024
1025      try {
1026        deleteMyEphemeralNode();
1027      } catch (KeeperException.NoNodeException nn) {
1028        // pass
1029      } catch (KeeperException e) {
1030        LOG.warn("Failed deleting my ephemeral node", e);
1031      }
1032      // We may have failed to delete the znode at the previous step, but
1033      // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1034      ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1035
1036      closeZooKeeper();
1037      closeTableDescriptors();
1038      LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
1039      span.setStatus(StatusCode.OK);
1040    } finally {
1041      span.end();
1042    }
1043  }
1044
1045  private boolean containsMetaTableRegions() {
1046    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
1047  }
1048
1049  private boolean areAllUserRegionsOffline() {
1050    if (getNumberOfOnlineRegions() > 2) {
1051      return false;
1052    }
1053    boolean allUserRegionsOffline = true;
1054    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1055      if (!e.getValue().getRegionInfo().isMetaRegion()) {
1056        allUserRegionsOffline = false;
1057        break;
1058      }
1059    }
1060    return allUserRegionsOffline;
1061  }
1062
1063  /** Returns Current write count for all online regions. */
1064  private long getWriteRequestCount() {
1065    long writeCount = 0;
1066    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1067      writeCount += e.getValue().getWriteRequestsCount();
1068    }
1069    return writeCount;
1070  }
1071
1072  @InterfaceAudience.Private
1073  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1074    throws IOException {
1075    RegionServerStatusService.BlockingInterface rss = rssStub;
1076    if (rss == null) {
1077      // the current server could be stopping.
1078      return;
1079    }
1080    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1081    final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport");
1082    try (Scope ignored = span.makeCurrent()) {
1083      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1084      request.setServer(ProtobufUtil.toServerName(this.serverName));
1085      request.setLoad(sl);
1086      rss.regionServerReport(null, request.build());
1087      span.setStatus(StatusCode.OK);
1088    } catch (ServiceException se) {
1089      IOException ioe = ProtobufUtil.getRemoteException(se);
1090      if (ioe instanceof YouAreDeadException) {
1091        // This will be caught and handled as a fatal error in run()
1092        TraceUtil.setError(span, ioe);
1093        throw ioe;
1094      }
1095      if (rssStub == rss) {
1096        rssStub = null;
1097      }
1098      TraceUtil.setError(span, se);
1099      // Couldn't connect to the master, get location from zk and reconnect
1100      // Method blocks until new master is found or we are stopped
1101      createRegionServerStatusStub(true);
1102    } finally {
1103      span.end();
1104    }
1105  }
1106
1107  /**
1108   * Reports the given map of Regions and their size on the filesystem to the active Master.
1109   * @param regionSizeStore The store containing region sizes
1110   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1111   */
1112  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
1113    RegionServerStatusService.BlockingInterface rss = rssStub;
1114    if (rss == null) {
1115      // the current server could be stopping.
1116      LOG.trace("Skipping Region size report to HMaster as stub is null");
1117      return true;
1118    }
1119    try {
1120      buildReportAndSend(rss, regionSizeStore);
1121    } catch (ServiceException se) {
1122      IOException ioe = ProtobufUtil.getRemoteException(se);
1123      if (ioe instanceof PleaseHoldException) {
1124        LOG.trace("Failed to report region sizes to Master because it is initializing."
1125          + " This will be retried.", ioe);
1126        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1127        return true;
1128      }
1129      if (rssStub == rss) {
1130        rssStub = null;
1131      }
1132      createRegionServerStatusStub(true);
1133      if (ioe instanceof DoNotRetryIOException) {
1134        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1135        if (doNotRetryEx.getCause() != null) {
1136          Throwable t = doNotRetryEx.getCause();
1137          if (t instanceof UnsupportedOperationException) {
1138            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1139            return false;
1140          }
1141        }
1142      }
1143      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1144    }
1145    return true;
1146  }
1147
1148  /**
1149   * Builds the region size report and sends it to the master. Upon successful sending of the
1150   * report, the region sizes that were sent are marked as sent.
1151   * @param rss             The stub to send to the Master
1152   * @param regionSizeStore The store containing region sizes
1153   */
1154  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
1155    RegionSizeStore regionSizeStore) throws ServiceException {
1156    RegionSpaceUseReportRequest request =
1157      buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
1158    rss.reportRegionSpaceUse(null, request);
1159    // Record the number of size reports sent
1160    if (metricsRegionServer != null) {
1161      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
1162    }
1163  }
1164
1165  /**
1166   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1167   * @param regionSizes The size in bytes of regions
1168   * @return The corresponding protocol buffer message.
1169   */
1170  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
1171    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1172    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
1173      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
1174    }
1175    return request.build();
1176  }
1177
1178  /**
1179   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf
1180   * message.
1181   * @param regionInfo  The RegionInfo
1182   * @param sizeInBytes The size in bytes of the Region
1183   * @return The protocol buffer
1184   */
1185  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1186    return RegionSpaceUse.newBuilder()
1187      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1188      .setRegionSize(Objects.requireNonNull(sizeInBytes)).build();
1189  }
1190
1191  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1192    throws IOException {
1193    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1194    // per second, and other metrics As long as metrics are part of ServerLoad it's best to use
1195    // the wrapper to compute those numbers in one place.
1196    // In the long term most of these should be moved off of ServerLoad and the heart beat.
1197    // Instead they should be stored in an HBase table so that external visibility into HBase is
1198    // improved; Additionally the load balancer will be able to take advantage of a more complete
1199    // history.
1200    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1201    Collection<HRegion> regions = getOnlineRegionsLocalContext();
1202    long usedMemory = -1L;
1203    long maxMemory = -1L;
1204    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
1205    if (usage != null) {
1206      usedMemory = usage.getUsed();
1207      maxMemory = usage.getMax();
1208    }
1209
1210    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
1211    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1212    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
1213    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
1214    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
1215    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
1216    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
1217    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1218    Coprocessor.Builder coprocessorBuilder = Coprocessor.newBuilder();
1219    for (String coprocessor : coprocessors) {
1220      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1221    }
1222    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1223    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1224    for (HRegion region : regions) {
1225      if (region.getCoprocessorHost() != null) {
1226        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1227        for (String regionCoprocessor : regionCoprocessors) {
1228          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
1229        }
1230      }
1231      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1232      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1233        .getCoprocessors()) {
1234        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1235      }
1236    }
1237
1238    getBlockCache().ifPresent(cache -> {
1239      cache.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
1240        regionCachedInfo.forEach((regionName, prefetchSize) -> {
1241          serverLoad.putRegionCachedInfo(regionName, roundSize(prefetchSize, unitMB));
1242        });
1243      });
1244    });
1245    serverLoad.setCacheFreeSize(regionServerWrapper.getBlockCacheFreeSize());
1246    if (DataTieringManager.getInstance() != null) {
1247      DataTieringManager.getInstance().getRegionColdDataSize()
1248        .forEach((regionName, coldDataSize) -> serverLoad.putRegionColdData(regionName,
1249          roundSize(coldDataSize.getSecond(), unitMB)));
1250    }
1251    serverLoad.setReportStartTime(reportStartTime);
1252    serverLoad.setReportEndTime(reportEndTime);
1253    if (this.infoServer != null) {
1254      serverLoad.setInfoServerPort(this.infoServer.getPort());
1255    } else {
1256      serverLoad.setInfoServerPort(-1);
1257    }
1258    MetricsUserAggregateSource userSource =
1259      metricsRegionServer.getMetricsUserAggregate().getSource();
1260    if (userSource != null) {
1261      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
1262      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
1263        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
1264      }
1265    }
1266
1267    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
1268      // always refresh first to get the latest value
1269      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1270      if (rLoad != null) {
1271        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1272        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1273          .getReplicationLoadSourceEntries()) {
1274          serverLoad.addReplLoadSource(rLS);
1275        }
1276      }
1277    } else {
1278      if (replicationSourceHandler != null) {
1279        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1280        if (rLoad != null) {
1281          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1282            .getReplicationLoadSourceEntries()) {
1283            serverLoad.addReplLoadSource(rLS);
1284          }
1285        }
1286      }
1287      if (replicationSinkHandler != null) {
1288        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
1289        if (rLoad != null) {
1290          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1291        }
1292      }
1293    }
1294
1295    TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask
1296      .newBuilder().setDescription(task.getDescription())
1297      .setStatus(task.getStatus() != null ? task.getStatus() : "")
1298      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
1299      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build()));
1300
1301    return serverLoad.build();
1302  }
1303
1304  private String getOnlineRegionsAsPrintableString() {
1305    StringBuilder sb = new StringBuilder();
1306    for (Region r : this.onlineRegions.values()) {
1307      if (sb.length() > 0) {
1308        sb.append(", ");
1309      }
1310      sb.append(r.getRegionInfo().getEncodedName());
1311    }
1312    return sb.toString();
1313  }
1314
1315  /**
1316   * Wait on regions close.
1317   */
1318  private void waitOnAllRegionsToClose(final boolean abort) {
1319    // Wait till all regions are closed before going out.
1320    int lastCount = -1;
1321    long previousLogTime = 0;
1322    Set<String> closedRegions = new HashSet<>();
1323    boolean interrupted = false;
1324    try {
1325      while (!onlineRegions.isEmpty()) {
1326        int count = getNumberOfOnlineRegions();
1327        // Only print a message if the count of regions has changed.
1328        if (count != lastCount) {
1329          // Log every second at most
1330          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
1331            previousLogTime = EnvironmentEdgeManager.currentTime();
1332            lastCount = count;
1333            LOG.info("Waiting on " + count + " regions to close");
1334            // Only print out regions still closing if a small number else will
1335            // swamp the log.
1336            if (count < 10 && LOG.isDebugEnabled()) {
1337              LOG.debug("Online Regions=" + this.onlineRegions);
1338            }
1339          }
1340        }
1341        // Ensure all user regions have been sent a close. Use this to
1342        // protect against the case where an open comes in after we start the
1343        // iterator of onlineRegions to close all user regions.
1344        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1345          RegionInfo hri = e.getValue().getRegionInfo();
1346          if (
1347            !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1348              && !closedRegions.contains(hri.getEncodedName())
1349          ) {
1350            closedRegions.add(hri.getEncodedName());
1351            // Don't update zk with this close transition; pass false.
1352            closeRegionIgnoreErrors(hri, abort);
1353          }
1354        }
1355        // No regions in RIT, we could stop waiting now.
1356        if (this.regionsInTransitionInRS.isEmpty()) {
1357          if (!onlineRegions.isEmpty()) {
1358            LOG.info("We were exiting though online regions are not empty,"
1359              + " because some regions failed closing");
1360          }
1361          break;
1362        } else {
1363          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
1364            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1365        }
1366        if (sleepInterrupted(200)) {
1367          interrupted = true;
1368        }
1369      }
1370    } finally {
1371      if (interrupted) {
1372        Thread.currentThread().interrupt();
1373      }
1374    }
1375  }
1376
1377  private static boolean sleepInterrupted(long millis) {
1378    boolean interrupted = false;
1379    try {
1380      Thread.sleep(millis);
1381    } catch (InterruptedException e) {
1382      LOG.warn("Interrupted while sleeping");
1383      interrupted = true;
1384    }
1385    return interrupted;
1386  }
1387
1388  private void shutdownWAL(final boolean close) {
1389    if (this.walFactory != null) {
1390      try {
1391        if (close) {
1392          walFactory.close();
1393        } else {
1394          walFactory.shutdown();
1395        }
1396      } catch (Throwable e) {
1397        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1398        LOG.error("Shutdown / close of WAL failed: " + e);
1399        LOG.debug("Shutdown / close exception details:", e);
1400      }
1401    }
1402  }
1403
1404  /**
1405   * Run init. Sets up wal and starts up all server threads.
1406   * @param c Extra configuration.
1407   */
1408  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1409    throws IOException {
1410    try {
1411      boolean updateRootDir = false;
1412      for (NameStringPair e : c.getMapEntriesList()) {
1413        String key = e.getName();
1414        // The hostname the master sees us as.
1415        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1416          String hostnameFromMasterPOV = e.getValue();
1417          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1418            rpcServices.getSocketAddress().getPort(), this.startcode);
1419          String expectedHostName = rpcServices.getSocketAddress().getHostName();
1420          // if Master use-ip is enabled, RegionServer use-ip will be enabled by default even if it
1421          // is set to disable. so we will use the ip of the RegionServer to compare with the
1422          // hostname passed by the Master, see HBASE-27304 for details.
1423          if (
1424            StringUtils.isBlank(useThisHostnameInstead) && getActiveMaster().isPresent()
1425              && InetAddresses.isInetAddress(getActiveMaster().get().getHostname())
1426          ) {
1427            expectedHostName = rpcServices.getSocketAddress().getAddress().getHostAddress();
1428          }
1429          boolean isHostnameConsist = StringUtils.isBlank(useThisHostnameInstead)
1430            ? hostnameFromMasterPOV.equals(expectedHostName)
1431            : hostnameFromMasterPOV.equals(useThisHostnameInstead);
1432          if (!isHostnameConsist) {
1433            String msg = "Master passed us a different hostname to use; was="
1434              + (StringUtils.isBlank(useThisHostnameInstead)
1435                ? expectedHostName
1436                : this.useThisHostnameInstead)
1437              + ", but now=" + hostnameFromMasterPOV;
1438            LOG.error(msg);
1439            throw new IOException(msg);
1440          }
1441          continue;
1442        }
1443
1444        String value = e.getValue();
1445        if (key.equals(HConstants.HBASE_DIR)) {
1446          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1447            updateRootDir = true;
1448          }
1449        }
1450
1451        if (LOG.isDebugEnabled()) {
1452          LOG.debug("Config from master: " + key + "=" + value);
1453        }
1454        this.conf.set(key, value);
1455      }
1456      // Set our ephemeral znode up in zookeeper now we have a name.
1457      createMyEphemeralNode();
1458
1459      if (updateRootDir) {
1460        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1461        initializeFileSystem();
1462      }
1463
1464      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
1465      // config param for task trackers, but we can piggyback off of it.
1466      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1467        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1468      }
1469
1470      // Save it in a file, this will allow to see if we crash
1471      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1472
1473      // This call sets up an initialized replication and WAL. Later we start it up.
1474      setupWALAndReplication();
1475      // Init in here rather than in constructor after thread name has been set
1476      final MetricsTable metricsTable =
1477        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1478      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1479      this.metricsRegionServer =
1480        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
1481      // Now that we have a metrics source, start the pause monitor
1482      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1483      pauseMonitor.start();
1484
1485      // There is a rare case where we do NOT want services to start. Check config.
1486      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1487        startServices();
1488      }
1489      // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
1490      // or make sense of it.
1491      startReplicationService();
1492
1493      // Set up ZK
1494      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress()
1495        + ", sessionid=0x"
1496        + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1497
1498      // Wake up anyone waiting for this server to online
1499      synchronized (online) {
1500        online.set(true);
1501        online.notifyAll();
1502      }
1503    } catch (Throwable e) {
1504      stop("Failed initialization");
1505      throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed");
1506    } finally {
1507      sleeper.skipSleepCycle();
1508    }
1509  }
1510
1511  private void startHeapMemoryManager() {
1512    if (this.blockCache != null) {
1513      this.hMemManager =
1514        new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1515      this.hMemManager.start(getChoreService());
1516    }
1517  }
1518
1519  private void createMyEphemeralNode() throws KeeperException {
1520    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1521    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1522    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1523    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1524    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1525  }
1526
1527  private void deleteMyEphemeralNode() throws KeeperException {
1528    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1529  }
1530
1531  @Override
1532  public RegionServerAccounting getRegionServerAccounting() {
1533    return regionServerAccounting;
1534  }
1535
1536  // Round the size with KB or MB.
1537  // A trick here is that if the sizeInBytes is less than sizeUnit, we will round the size to 1
1538  // instead of 0 if it is not 0, to avoid some schedulers think the region has no data. See
1539  // HBASE-26340 for more details on why this is important.
1540  private static int roundSize(long sizeInByte, int sizeUnit) {
1541    if (sizeInByte == 0) {
1542      return 0;
1543    } else if (sizeInByte < sizeUnit) {
1544      return 1;
1545    } else {
1546      return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE);
1547    }
1548  }
1549
1550  /**
1551   * @param r               Region to get RegionLoad for.
1552   * @param regionLoadBldr  the RegionLoad.Builder, can be null
1553   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1554   * @return RegionLoad instance.
1555   */
1556  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1557    RegionSpecifier.Builder regionSpecifier) throws IOException {
1558    byte[] name = r.getRegionInfo().getRegionName();
1559    String regionEncodedName = r.getRegionInfo().getEncodedName();
1560    int stores = 0;
1561    int storefiles = 0;
1562    int storeRefCount = 0;
1563    int maxCompactedStoreFileRefCount = 0;
1564    long storeUncompressedSize = 0L;
1565    long storefileSize = 0L;
1566    long storefileIndexSize = 0L;
1567    long rootLevelIndexSize = 0L;
1568    long totalStaticIndexSize = 0L;
1569    long totalStaticBloomSize = 0L;
1570    long totalCompactingKVs = 0L;
1571    long currentCompactedKVs = 0L;
1572    long totalRegionSize = 0L;
1573    List<HStore> storeList = r.getStores();
1574    stores += storeList.size();
1575    for (HStore store : storeList) {
1576      storefiles += store.getStorefilesCount();
1577      int currentStoreRefCount = store.getStoreRefCount();
1578      storeRefCount += currentStoreRefCount;
1579      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1580      maxCompactedStoreFileRefCount =
1581        Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount);
1582      storeUncompressedSize += store.getStoreSizeUncompressed();
1583      storefileSize += store.getStorefilesSize();
1584      totalRegionSize += store.getHFilesSize();
1585      // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
1586      storefileIndexSize += store.getStorefilesRootLevelIndexSize();
1587      CompactionProgress progress = store.getCompactionProgress();
1588      if (progress != null) {
1589        totalCompactingKVs += progress.getTotalCompactingKVs();
1590        currentCompactedKVs += progress.currentCompactedKVs;
1591      }
1592      rootLevelIndexSize += store.getStorefilesRootLevelIndexSize();
1593      totalStaticIndexSize += store.getTotalStaticIndexSize();
1594      totalStaticBloomSize += store.getTotalStaticBloomSize();
1595    }
1596
1597    int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB);
1598    int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB);
1599    int storefileSizeMB = roundSize(storefileSize, unitMB);
1600    int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB);
1601    int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB);
1602    int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB);
1603    int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB);
1604    int regionSizeMB = roundSize(totalRegionSize, unitMB);
1605    final MutableFloat currentRegionCachedRatio = new MutableFloat(0.0f);
1606    getBlockCache().ifPresent(bc -> {
1607      bc.getRegionCachedInfo().ifPresent(regionCachedInfo -> {
1608        if (regionCachedInfo.containsKey(regionEncodedName)) {
1609          currentRegionCachedRatio.setValue(regionSizeMB == 0
1610            ? 0.0f
1611            : (float) roundSize(regionCachedInfo.get(regionEncodedName), unitMB) / regionSizeMB);
1612        }
1613      });
1614    });
1615    final MutableFloat currentRegionColdDataRatio = new MutableFloat(0.0f);
1616    if (DataTieringManager.getInstance() != null) {
1617      DataTieringManager.getInstance().getRegionColdDataSize().computeIfPresent(regionEncodedName,
1618        (k, v) -> {
1619          int coldSizeMB = roundSize(v.getSecond(), unitMB);
1620          currentRegionColdDataRatio
1621            .setValue(regionSizeMB == 0 ? 0.0f : (float) coldSizeMB / regionSizeMB);
1622          return v;
1623        });
1624    }
1625
1626    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
1627    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
1628    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
1629    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
1630    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
1631    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
1632    if (regionLoadBldr == null) {
1633      regionLoadBldr = RegionLoad.newBuilder();
1634    }
1635    if (regionSpecifier == null) {
1636      regionSpecifier = RegionSpecifier.newBuilder();
1637    }
1638
1639    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1640    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1641    regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores)
1642      .setStorefiles(storefiles).setStoreRefCount(storeRefCount)
1643      .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1644      .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB)
1645      .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB)
1646      .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1647      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1648      .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount())
1649      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1650      .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs)
1651      .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality)
1652      .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight)
1653      .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight)
1654      .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
1655      .setLastMajorCompactionTs(r.getOldestHfileTs(true)).setRegionSizeMB(regionSizeMB)
1656      .setCurrentRegionCachedRatio(currentRegionCachedRatio.floatValue())
1657      .setCurrentRegionColdDataRatio(currentRegionColdDataRatio.floatValue());
1658    r.setCompleteSequenceId(regionLoadBldr);
1659    return regionLoadBldr.build();
1660  }
1661
1662  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1663    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1664    userLoadBldr.setUserName(user);
1665    userSource.getClientMetrics().values().stream()
1666      .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1667        .setHostName(clientMetrics.getHostName())
1668        .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1669        .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1670        .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
1671      .forEach(userLoadBldr::addClientMetrics);
1672    return userLoadBldr.build();
1673  }
1674
1675  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1676    HRegion r = onlineRegions.get(encodedRegionName);
1677    return r != null ? createRegionLoad(r, null, null) : null;
1678  }
1679
1680  /**
1681   * Inner class that runs on a long period checking if regions need compaction.
1682   */
1683  private static class CompactionChecker extends ScheduledChore {
1684    private final HRegionServer instance;
1685    private final int majorCompactPriority;
1686    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1687    // Iteration is 1-based rather than 0-based so we don't check for compaction
1688    // immediately upon region server startup
1689    private long iteration = 1;
1690
1691    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1692      super("CompactionChecker", stopper, sleepTime);
1693      this.instance = h;
1694      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1695
1696      /*
1697       * MajorCompactPriority is configurable. If not set, the compaction will use default priority.
1698       */
1699      this.majorCompactPriority = this.instance.conf
1700        .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY);
1701    }
1702
1703    @Override
1704    protected void chore() {
1705      for (HRegion hr : this.instance.onlineRegions.values()) {
1706        // If region is read only or compaction is disabled at table level, there's no need to
1707        // iterate through region's stores
1708        if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) {
1709          continue;
1710        }
1711
1712        for (HStore s : hr.stores.values()) {
1713          try {
1714            long multiplier = s.getCompactionCheckMultiplier();
1715            assert multiplier > 0;
1716            if (iteration % multiplier != 0) {
1717              continue;
1718            }
1719            if (s.needsCompaction()) {
1720              // Queue a compaction. Will recognize if major is needed.
1721              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1722                getName() + " requests compaction");
1723            } else if (s.shouldPerformMajorCompaction()) {
1724              s.triggerMajorCompaction();
1725              if (
1726                majorCompactPriority == DEFAULT_PRIORITY
1727                  || majorCompactPriority > hr.getCompactPriority()
1728              ) {
1729                this.instance.compactSplitThread.requestCompaction(hr, s,
1730                  getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
1731                  CompactionLifeCycleTracker.DUMMY, null);
1732              } else {
1733                this.instance.compactSplitThread.requestCompaction(hr, s,
1734                  getName() + " requests major compaction; use configured priority",
1735                  this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1736              }
1737            }
1738          } catch (IOException e) {
1739            LOG.warn("Failed major compaction check on " + hr, e);
1740          }
1741        }
1742      }
1743      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1744    }
1745  }
1746
1747  private static class PeriodicMemStoreFlusher extends ScheduledChore {
1748    private final HRegionServer server;
1749    private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
1750    private final static int MIN_DELAY_TIME = 0; // millisec
1751    private final long rangeOfDelayMs;
1752
1753    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1754      super("MemstoreFlusherChore", server, cacheFlushInterval);
1755      this.server = server;
1756
1757      final long configuredRangeOfDelay = server.getConfiguration()
1758        .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
1759      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
1760    }
1761
1762    @Override
1763    protected void chore() {
1764      final StringBuilder whyFlush = new StringBuilder();
1765      for (HRegion r : this.server.onlineRegions.values()) {
1766        if (r == null) {
1767          continue;
1768        }
1769        if (r.shouldFlush(whyFlush)) {
1770          FlushRequester requester = server.getFlushRequester();
1771          if (requester != null) {
1772            long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME;
1773            // Throttle the flushes by putting a delay. If we don't throttle, and there
1774            // is a balanced write-load on the regions in a table, we might end up
1775            // overwhelming the filesystem with too many flushes at once.
1776            if (requester.requestDelayedFlush(r, delay)) {
1777              LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(),
1778                r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay);
1779            }
1780          }
1781        }
1782      }
1783    }
1784  }
1785
1786  /**
1787   * Report the status of the server. A server is online once all the startup is completed (setting
1788   * up filesystem, starting executorService threads, etc.). This method is designed mostly to be
1789   * useful in tests.
1790   * @return true if online, false if not.
1791   */
1792  public boolean isOnline() {
1793    return online.get();
1794  }
1795
1796  /**
1797   * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
1798   * be hooked up to WAL.
1799   */
1800  private void setupWALAndReplication() throws IOException {
1801    WALFactory factory = new WALFactory(conf, serverName, this);
1802    // TODO Replication make assumptions here based on the default filesystem impl
1803    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1804    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1805
1806    Path logDir = new Path(walRootDir, logName);
1807    LOG.debug("logDir={}", logDir);
1808    if (this.walFs.exists(logDir)) {
1809      throw new RegionServerRunningException(
1810        "Region server has already created directory at " + this.serverName.toString());
1811    }
1812    // Create wal directory here and we will never create it again in other places. This is
1813    // important to make sure that our fencing way takes effect. See HBASE-29797 for more details.
1814    if (!this.walFs.mkdirs(logDir)) {
1815      throw new IOException("Can not create wal directory " + logDir);
1816    }
1817    // Instantiate replication if replication enabled. Pass it the log directories.
1818    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
1819
1820    WALActionsListener walEventListener = getWALEventTrackerListener(conf);
1821    if (walEventListener != null && factory.getWALProvider() != null) {
1822      factory.getWALProvider().addWALActionsListener(walEventListener);
1823    }
1824    this.walFactory = factory;
1825  }
1826
1827  private WALActionsListener getWALEventTrackerListener(Configuration conf) {
1828    if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) {
1829      WALEventTrackerListener listener =
1830        new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName());
1831      return listener;
1832    }
1833    return null;
1834  }
1835
1836  /**
1837   * Start up replication source and sink handlers.
1838   */
1839  private void startReplicationService() throws IOException {
1840    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
1841      this.replicationSourceHandler.startReplicationService();
1842    } else {
1843      if (this.replicationSourceHandler != null) {
1844        this.replicationSourceHandler.startReplicationService();
1845      }
1846      if (this.replicationSinkHandler != null) {
1847        this.replicationSinkHandler.startReplicationService();
1848      }
1849    }
1850  }
1851
1852  /** Returns Master address tracker instance. */
1853  public MasterAddressTracker getMasterAddressTracker() {
1854    return this.masterAddressTracker;
1855  }
1856
1857  /**
1858   * Start maintenance Threads, Server, Worker and lease checker threads. Start all threads we need
1859   * to run. This is called after we've successfully registered with the Master. Install an
1860   * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception. We
1861   * cannot set the handler on all threads. Server's internal Listener thread is off limits. For
1862   * Server, if an OOME, it waits a while then retries. Meantime, a flush or a compaction that tries
1863   * to run should trigger same critical condition and the shutdown will run. On its way out, this
1864   * server will shut down Server. Leases are sort of inbetween. It has an internal thread that
1865   * while it inherits from Chore, it keeps its own internal stop mechanism so needs to be stopped
1866   * by this hosting server. Worker logs the exception and exits.
1867   */
1868  private void startServices() throws IOException {
1869    if (!isStopped() && !isAborted()) {
1870      initializeThreads();
1871    }
1872    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
1873    this.secureBulkLoadManager.start();
1874
1875    // Health checker thread.
1876    if (isHealthCheckerConfigured()) {
1877      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
1878        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1879      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1880    }
1881    // Executor status collect thread.
1882    if (
1883      this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
1884        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)
1885    ) {
1886      int sleepTime =
1887        this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ);
1888      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
1889        this.metricsRegionServer.getMetricsSource());
1890    }
1891
1892    this.walRoller = new LogRoller(this);
1893    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1894    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
1895
1896    // Create the CompactedFileDischarger chore executorService. This chore helps to
1897    // remove the compacted files that will no longer be used in reads.
1898    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
1899    // 2 mins so that compacted files can be archived before the TTLCleaner runs
1900    int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
1901    this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this);
1902    choreService.scheduleChore(compactedFileDischarger);
1903
1904    // Start executor services
1905    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
1906    executorService.startExecutorService(executorService.new ExecutorConfig()
1907      .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
1908    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
1909    executorService.startExecutorService(executorService.new ExecutorConfig()
1910      .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
1911    final int openPriorityRegionThreads =
1912      conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
1913    executorService.startExecutorService(
1914      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION)
1915        .setCorePoolSize(openPriorityRegionThreads));
1916    final int closeRegionThreads =
1917      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
1918    executorService.startExecutorService(executorService.new ExecutorConfig()
1919      .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
1920    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
1921    executorService.startExecutorService(executorService.new ExecutorConfig()
1922      .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
1923    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1924      final int storeScannerParallelSeekThreads =
1925        conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
1926      executorService.startExecutorService(
1927        executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK)
1928          .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true));
1929    }
1930    final int logReplayOpsThreads =
1931      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
1932    executorService.startExecutorService(
1933      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS)
1934        .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
1935    // Start the threads for compacted files discharger
1936    final int compactionDischargerThreads =
1937      conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
1938    executorService.startExecutorService(executorService.new ExecutorConfig()
1939      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)
1940      .setCorePoolSize(compactionDischargerThreads));
1941    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1942      final int regionReplicaFlushThreads =
1943        conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1944          conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1945      executorService.startExecutorService(executorService.new ExecutorConfig()
1946        .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS)
1947        .setCorePoolSize(regionReplicaFlushThreads));
1948    }
1949    final int refreshPeerThreads =
1950      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
1951    executorService.startExecutorService(executorService.new ExecutorConfig()
1952      .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
1953    final int replaySyncReplicationWALThreads =
1954      conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
1955    executorService.startExecutorService(executorService.new ExecutorConfig()
1956      .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL)
1957      .setCorePoolSize(replaySyncReplicationWALThreads));
1958    final int switchRpcThrottleThreads =
1959      conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
1960    executorService.startExecutorService(
1961      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE)
1962        .setCorePoolSize(switchRpcThrottleThreads));
1963    final int claimReplicationQueueThreads =
1964      conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
1965    executorService.startExecutorService(
1966      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE)
1967        .setCorePoolSize(claimReplicationQueueThreads));
1968    final int rsSnapshotOperationThreads =
1969      conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
1970    executorService.startExecutorService(
1971      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
1972        .setCorePoolSize(rsSnapshotOperationThreads));
1973    final int rsFlushOperationThreads =
1974      conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3);
1975    executorService.startExecutorService(executorService.new ExecutorConfig()
1976      .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads));
1977    final int rsRefreshQuotasThreads =
1978      conf.getInt("hbase.regionserver.executor.refresh.quotas.threads", 1);
1979    executorService.startExecutorService(
1980      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS)
1981        .setCorePoolSize(rsRefreshQuotasThreads));
1982    final int logRollThreads = conf.getInt("hbase.regionserver.executor.log.roll.threads", 1);
1983    executorService.startExecutorService(executorService.new ExecutorConfig()
1984      .setExecutorType(ExecutorType.RS_LOG_ROLL).setCorePoolSize(logRollThreads));
1985
1986    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
1987      uncaughtExceptionHandler);
1988    if (this.cacheFlusher != null) {
1989      this.cacheFlusher.start(uncaughtExceptionHandler);
1990    }
1991    Threads.setDaemonThreadRunning(this.procedureResultReporter,
1992      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
1993
1994    if (this.compactionChecker != null) {
1995      choreService.scheduleChore(compactionChecker);
1996    }
1997    if (this.periodicFlusher != null) {
1998      choreService.scheduleChore(periodicFlusher);
1999    }
2000    if (this.healthCheckChore != null) {
2001      choreService.scheduleChore(healthCheckChore);
2002    }
2003    if (this.executorStatusChore != null) {
2004      choreService.scheduleChore(executorStatusChore);
2005    }
2006    if (this.nonceManagerChore != null) {
2007      choreService.scheduleChore(nonceManagerChore);
2008    }
2009    if (this.storefileRefresher != null) {
2010      choreService.scheduleChore(storefileRefresher);
2011    }
2012    if (this.fsUtilizationChore != null) {
2013      choreService.scheduleChore(fsUtilizationChore);
2014    }
2015    if (this.namedQueueServiceChore != null) {
2016      choreService.scheduleChore(namedQueueServiceChore);
2017    }
2018    if (this.brokenStoreFileCleaner != null) {
2019      choreService.scheduleChore(brokenStoreFileCleaner);
2020    }
2021    if (this.rsMobFileCleanerChore != null) {
2022      choreService.scheduleChore(rsMobFileCleanerChore);
2023    }
2024    if (replicationMarkerChore != null) {
2025      LOG.info("Starting replication marker chore");
2026      choreService.scheduleChore(replicationMarkerChore);
2027    }
2028
2029    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
2030    // an unhandled exception, it will just exit.
2031    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
2032      uncaughtExceptionHandler);
2033
2034    // Create the log splitting worker and start it
2035    // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
2036    // quite a while inside Connection layer. The worker won't be available for other
2037    // tasks even after current task is preempted after a split task times out.
2038    Configuration sinkConf = HBaseConfiguration.create(conf);
2039    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2040      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
2041    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2042      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
2043    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
2044    if (
2045      this.csm != null
2046        && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
2047    ) {
2048      // SplitLogWorker needs csm. If none, don't start this.
2049      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
2050      splitLogWorker.start();
2051      LOG.debug("SplitLogWorker started");
2052    }
2053
2054    // Memstore services.
2055    startHeapMemoryManager();
2056    // Call it after starting HeapMemoryManager.
2057    initializeMemStoreChunkCreator(hMemManager);
2058  }
2059
  /**
   * Creates the worker objects and chores this region server runs in the background and registers
   * the configuration observers. The chores built here are only constructed in this method; no
   * call in this method hands them to the chore service.
   */
  private void initializeThreads() {
    // Cache flushing thread.
    this.cacheFlusher = new MemStoreFlusher(conf, this);

    // Compaction thread
    this.compactSplitThread = new CompactSplit(this);

    // Prefetch Notifier
    this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);

    // Background thread to check for compactions; needed if region has not gotten updates
    // in a while. It will take care of not checking too frequently on store-by-store basis.
    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
    this.leaseManager = new LeaseManager(this.threadWakeFrequency);

    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
    final boolean walEventTrackerEnabled =
      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);

    // The named queue chore is created when either the slow log table or the WAL event tracker
    // is enabled.
    if (isSlowLogTableEnabled || walEventTrackerEnabled) {
      // default chore duration: 10 min
      // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property
      final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY,
        DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION);

      final int namedQueueChoreDuration =
        conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT);
      // Considering min of slowLogChoreDuration and namedQueueChoreDuration
      int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration);

      namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration,
        this.namedQueueRecorder, this.getConnection());
    }

    if (this.nonceManager != null) {
      // Create the scheduled chore that cleans up nonces.
      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
    }

    // Setup the Quota Manager
    rsQuotaManager = new RegionServerRpcQuotaManager(this);
    configurationManager.registerObserver(rsQuotaManager);
    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);

    // The file system utilization chore is only created when quotas are enabled.
    if (QuotaUtil.isQuotaEnabled(conf)) {
      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
    }

    // Store file refresh: a configured period of 0 falls back to the meta-only refresh period
    // (read with the same default constant) and flags the chore as meta-only. The chore is
    // created only when the resulting period is positive.
    boolean onlyMetaRefresh = false;
    int storefileRefreshPeriod =
      conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
    if (storefileRefreshPeriod == 0) {
      storefileRefreshPeriod =
        conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
      onlyMetaRefresh = true;
    }
    if (storefileRefreshPeriod > 0) {
      this.storefileRefresher =
        new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this);
    }

    int brokenStoreFileCleanerPeriod =
      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
    int brokenStoreFileCleanerDelay =
      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
    double brokenStoreFileCleanerDelayJitter =
      conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
    // nextDouble() is in [0, 1), so the rate lies in [-0.5, 0.5) times the configured jitter and
    // the delay passed to the cleaner is shifted up or down by that fraction of the base delay.
    double jitterRate =
      (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
    this.brokenStoreFileCleaner =
      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
        brokenStoreFileCleanerPeriod, this, conf, this);

    this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);

    // Runs last: the observers registered include cacheFlusher, compactSplitThread and
    // prefetchExecutorNotifier, which are assigned above.
    registerConfigurationObservers();
    initializeReplicationMarkerChore();
  }
2146
2147  private void registerConfigurationObservers() {
2148    // Register Replication if possible, as now we support recreating replication peer storage, for
2149    // migrating across different replication peer storages online
2150    if (replicationSourceHandler instanceof ConfigurationObserver) {
2151      configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
2152    }
2153    if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) {
2154      configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
2155    }
2156    // Registering the compactSplitThread object with the ConfigurationManager.
2157    configurationManager.registerObserver(this.compactSplitThread);
2158    configurationManager.registerObserver(this.cacheFlusher);
2159    configurationManager.registerObserver(this.rpcServices);
2160    configurationManager.registerObserver(this.prefetchExecutorNotifier);
2161    configurationManager.registerObserver(this);
2162  }
2163
2164  /*
2165   * Verify that server is healthy
2166   */
2167  private boolean isHealthy() {
2168    if (!dataFsOk) {
2169      // File system problem
2170      return false;
2171    }
2172    // Verify that all threads are alive
2173    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
2174      && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2175      && (this.walRoller == null || this.walRoller.isAlive())
2176      && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2177      && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2178    if (!healthy) {
2179      stop("One or more threads are no longer alive -- stop");
2180    }
2181    return healthy;
2182  }
2183
2184  @Override
2185  public List<WAL> getWALs() {
2186    return walFactory.getWALs();
2187  }
2188
2189  @Override
2190  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2191    WAL wal = walFactory.getWAL(regionInfo);
2192    if (this.walRoller != null) {
2193      this.walRoller.addWAL(wal);
2194    }
2195    return wal;
2196  }
2197
2198  public LogRoller getWalRoller() {
2199    return walRoller;
2200  }
2201
2202  public WALFactory getWalFactory() {
2203    return walFactory;
2204  }
2205
2206  @Override
2207  public void stop(final String msg) {
2208    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2209  }
2210
2211  /**
2212   * Stops the regionserver.
2213   * @param msg   Status message
2214   * @param force True if this is a regionserver abort
2215   * @param user  The user executing the stop request, or null if no user is associated
2216   */
2217  public void stop(final String msg, final boolean force, final User user) {
2218    if (!this.stopped) {
2219      LOG.info("***** STOPPING region server '{}' *****", this);
2220      if (this.rsHost != null) {
2221        // when forced via abort don't allow CPs to override
2222        try {
2223          this.rsHost.preStop(msg, user);
2224        } catch (IOException ioe) {
2225          if (!force) {
2226            LOG.warn("The region server did not stop", ioe);
2227            return;
2228          }
2229          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2230        }
2231      }
2232      this.stopped = true;
2233      LOG.info("STOPPED: " + msg);
2234      // Wakes run() if it is sleeping
2235      sleeper.skipSleepCycle();
2236    }
2237  }
2238
2239  public void waitForServerOnline() {
2240    while (!isStopped() && !isOnline()) {
2241      synchronized (online) {
2242        try {
2243          online.wait(msgInterval);
2244        } catch (InterruptedException ie) {
2245          Thread.currentThread().interrupt();
2246          break;
2247        }
2248      }
2249    }
2250  }
2251
2252  @Override
2253  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2254    HRegion r = context.getRegion();
2255    long openProcId = context.getOpenProcId();
2256    long masterSystemTime = context.getMasterSystemTime();
2257    long initiatingMasterActiveTime = context.getInitiatingMasterActiveTime();
2258    rpcServices.checkOpen();
2259    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2260      r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2261    // Do checks to see if we need to compact (references or too many files)
2262    // Skip compaction check if region is read only
2263    if (!r.isReadOnly()) {
2264      for (HStore s : r.stores.values()) {
2265        if (s.hasReferences() || s.needsCompaction()) {
2266          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2267        }
2268      }
2269    }
2270    long openSeqNum = r.getOpenSeqNum();
2271    if (openSeqNum == HConstants.NO_SEQNUM) {
2272      // If we opened a region, we should have read some sequence number from it.
2273      LOG.error(
2274        "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2275      openSeqNum = 0;
2276    }
2277
2278    // Notify master
2279    if (
2280      !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2281        openSeqNum, openProcId, masterSystemTime, r.getRegionInfo(), initiatingMasterActiveTime))
2282    ) {
2283      throw new IOException(
2284        "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2285    }
2286
2287    triggerFlushInPrimaryRegion(r);
2288
2289    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2290  }
2291
2292  /**
2293   * Helper method for use in tests. Skip the region transition report when there's no master around
2294   * to receive it.
2295   */
2296  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2297    final TransitionCode code = context.getCode();
2298    final long openSeqNum = context.getOpenSeqNum();
2299    long masterSystemTime = context.getMasterSystemTime();
2300    final RegionInfo[] hris = context.getHris();
2301
2302    if (code == TransitionCode.OPENED) {
2303      Preconditions.checkArgument(hris != null && hris.length == 1);
2304      if (hris[0].isMetaRegion()) {
2305        LOG.warn(
2306          "meta table location is stored in master local store, so we can not skip reporting");
2307        return false;
2308      } else {
2309        try {
2310          MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
2311            serverName, openSeqNum, masterSystemTime);
2312        } catch (IOException e) {
2313          LOG.info("Failed to update meta", e);
2314          return false;
2315        }
2316      }
2317    }
2318    return true;
2319  }
2320
2321  private ReportRegionStateTransitionRequest
2322    createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) {
2323    final TransitionCode code = context.getCode();
2324    final long openSeqNum = context.getOpenSeqNum();
2325    final RegionInfo[] hris = context.getHris();
2326    final long[] procIds = context.getProcIds();
2327
2328    ReportRegionStateTransitionRequest.Builder builder =
2329      ReportRegionStateTransitionRequest.newBuilder();
2330    builder.setServer(ProtobufUtil.toServerName(serverName));
2331    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2332    transition.setTransitionCode(code);
2333    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2334      transition.setOpenSeqNum(openSeqNum);
2335    }
2336    for (RegionInfo hri : hris) {
2337      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2338    }
2339    for (long procId : procIds) {
2340      transition.addProcId(procId);
2341    }
2342    transition.setInitiatingMasterActiveTime(context.getInitiatingMasterActiveTime());
2343
2344    return builder.build();
2345  }
2346
  /**
   * Reports a region state transition to the master, retrying after RPC failures for as long as
   * the cluster connection is non-null and not closed.
   * <p>
   * When {@code TEST_SKIP_REPORTING_TRANSITION} is set, the result of
   * {@code skipReportingTransition(context)} is returned instead and no RPC is made.
   * @param context describes the transition to report
   * @return true once the master answered without an error message; false if the master answered
   *         with an error message or the loop ended because the cluster connection is null or
   *         closed
   */
  @Override
  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
    if (TEST_SKIP_REPORTING_TRANSITION) {
      return skipReportingTransition(context);
    }
    final ReportRegionStateTransitionRequest request =
      createReportRegionStateTransitionRequest(context);

    int tries = 0;
    long pauseTime = this.retryPauseTime;
    // Keep looping till we get an error. We want to send reports even though server is going down.
    // Only go down if clusterConnection is null. It is set to null almost as last thing as the
    // HRegionServer goes down.
    while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
      // Work on a snapshot of the stub so a concurrent change of the field is detected below.
      RegionServerStatusService.BlockingInterface rss = rssStub;
      try {
        if (rss == null) {
          // No stub yet: try to create one and start the iteration over.
          createRegionServerStatusStub();
          continue;
        }
        ReportRegionStateTransitionResponse response =
          rss.reportRegionStateTransition(null, request);
        if (response.hasErrorMessage()) {
          // The master answered with an error: give up and return false.
          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
          break;
        }
        // Log if we had to retry else don't log unless TRACE. We want to
        // know if were successful after an attempt showed in logs as failed.
        if (tries > 0 || LOG.isTraceEnabled()) {
          LOG.info("TRANSITION REPORTED " + request);
        }
        // NOTE: Return mid-method!!!
        return true;
      } catch (ServiceException se) {
        IOException ioe = ProtobufUtil.getRemoteException(se);
        // Only these three exception types cause a sleep before the retry.
        boolean pause = ioe instanceof ServerNotRunningYetException
          || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException;
        if (pause) {
          // Do backoff else we flood the Master with requests.
          pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
        } else {
          pauseTime = this.retryPauseTime; // Reset.
        }
        LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#"
          + tries + ")"
          + (pause
            ? " after " + pauseTime + "ms delay (Master is coming online...)."
            : " immediately."),
          ioe);
        if (pause) {
          Threads.sleep(pauseTime);
        }
        tries++;
        // Drop the stub that just failed, unless the field has been replaced in the meantime.
        if (rssStub == rss) {
          rssStub = null;
        }
      }
    }
    return false;
  }
2407
2408  /**
2409   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2410   * block this thread. See RegionReplicaFlushHandler for details.
2411   */
2412  private void triggerFlushInPrimaryRegion(final HRegion region) {
2413    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2414      return;
2415    }
2416    TableName tn = region.getTableDescriptor().getTableName();
2417    if (
2418      !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn)
2419        || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) ||
2420        // If the memstore replication not setup, we do not have to wait for observing a flush event
2421        // from primary before starting to serve reads, because gaps from replication is not
2422        // applicable,this logic is from
2423        // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by
2424        // HBASE-13063
2425        !region.getTableDescriptor().hasRegionMemStoreReplication()
2426    ) {
2427      region.setReadsEnabled(true);
2428      return;
2429    }
2430
2431    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2432    // RegionReplicaFlushHandler might reset this.
2433
2434    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2435    if (this.executorService != null) {
2436      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
2437    } else {
2438      LOG.info("Executor is null; not running flush of primary region replica for {}",
2439        region.getRegionInfo());
2440    }
2441  }
2442
2443  @InterfaceAudience.Private
2444  public RSRpcServices getRSRpcServices() {
2445    return rpcServices;
2446  }
2447
  /**
   * Cause the server to exit without closing the regions it is serving, the log it is using and
   * without notifying the master. Used in unit testing and on catastrophic events such as HDFS
   * being yanked out from under hbase or an OOME.
   * <p>
   * Only the first abort request is acted upon. It logs the reason, the loaded coprocessors and a
   * metrics dump, makes a best-effort attempt to report the fatal error to the master, schedules
   * the abort timer and then requests a forced stop.
   * @param reason the reason we are aborting
   * @param cause  the exception that caused the abort, or null
   */
  @Override
  public void abort(String reason, Throwable cause) {
    if (!setAbortRequested()) {
      // Abort already in progress, ignore the new request.
      LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason);
      return;
    }
    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
    if (cause != null) {
      LOG.error(HBaseMarkers.FATAL, msg, cause);
    } else {
      LOG.error(HBaseMarkers.FATAL, msg);
    }
    // HBASE-4014: show list of coprocessors that were loaded to help debug
    // regionserver crashes.Note that we're implicitly using
    // java.util.HashSet's toString() method to print the coprocessor names.
    LOG.error(HBaseMarkers.FATAL,
      "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors());
    // Try and dump metrics if abort -- might give clue as to how fatal came about....
    try {
      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
    } catch (MalformedObjectNameException | IOException e) {
      LOG.warn("Failed dumping metrics", e);
    }

    // Do our best to report our abort to the master, but this may not work
    try {
      if (cause != null) {
        // The message sent to the master additionally carries the stack trace of the cause.
        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
      }
      // Report to the master but only if we have already registered with the master.
      RegionServerStatusService.BlockingInterface rss = rssStub;
      if (rss != null && this.serverName != null) {
        ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder();
        builder.setServer(ProtobufUtil.toServerName(this.serverName));
        builder.setErrorMessage(msg);
        rss.reportRSFatalError(null, builder.build());
      }
    } catch (Throwable t) {
      // Any failure while reporting is logged only; the abort itself carries on.
      LOG.warn("Unable to report fatal error to master", t);
    }

    scheduleAbortTimer();
    // shutdown should be run as the internal user
    stop(reason, true, null);
  }
2500
2501  /*
2502   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup logs but it does
2503   * close socket in case want to bring up server on old hostname+port immediately.
2504   */
2505  @InterfaceAudience.Private
2506  protected void kill() {
2507    this.killed = true;
2508    abort("Simulated kill");
2509  }
2510
2511  // Limits the time spent in the shutdown process.
2512  private void scheduleAbortTimer() {
2513    if (this.abortMonitor == null) {
2514      this.abortMonitor = new Timer("Abort regionserver monitor", true);
2515      TimerTask abortTimeoutTask = null;
2516      try {
2517        Constructor<? extends TimerTask> timerTaskCtor =
2518          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2519            .asSubclass(TimerTask.class).getDeclaredConstructor();
2520        timerTaskCtor.setAccessible(true);
2521        abortTimeoutTask = timerTaskCtor.newInstance();
2522      } catch (Exception e) {
2523        LOG.warn("Initialize abort timeout task failed", e);
2524      }
2525      if (abortTimeoutTask != null) {
2526        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2527      }
2528    }
2529  }
2530
2531  /**
2532   * Wait on all threads to finish. Presumption is that all closes and stops have already been
2533   * called.
2534   */
2535  protected void stopServiceThreads() {
2536    // clean up the scheduled chores
2537    stopChoreService();
2538    if (bootstrapNodeManager != null) {
2539      bootstrapNodeManager.stop();
2540    }
2541    if (this.cacheFlusher != null) {
2542      this.cacheFlusher.shutdown();
2543    }
2544    if (this.walRoller != null) {
2545      this.walRoller.close();
2546    }
2547    if (this.compactSplitThread != null) {
2548      this.compactSplitThread.join();
2549    }
2550    stopExecutorService();
2551    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
2552      this.replicationSourceHandler.stopReplicationService();
2553    } else {
2554      if (this.replicationSourceHandler != null) {
2555        this.replicationSourceHandler.stopReplicationService();
2556      }
2557      if (this.replicationSinkHandler != null) {
2558        this.replicationSinkHandler.stopReplicationService();
2559      }
2560    }
2561  }
2562
2563  /** Returns Return the object that implements the replication source executorService. */
2564  @Override
2565  public ReplicationSourceService getReplicationSourceService() {
2566    return replicationSourceHandler;
2567  }
2568
2569  /** Returns Return the object that implements the replication sink executorService. */
2570  public ReplicationSinkService getReplicationSinkService() {
2571    return replicationSinkHandler;
2572  }
2573
2574  /**
2575   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2576   * connection, the current rssStub must be null. Method will block until a master is available.
2577   * You can break from this block by requesting the server stop.
2578   * @return master + port, or null if server has been stopped
2579   */
2580  private synchronized ServerName createRegionServerStatusStub() {
2581    // Create RS stub without refreshing the master node from ZK, use cached data
2582    return createRegionServerStatusStub(false);
2583  }
2584
  /**
   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
   * connection, the current rssStub must be null. Method will block until a master is available.
   * You can break from this block by requesting the server stop.
   * <p>
   * If rssStub is already set, nothing is created and the tracked master address is returned. If
   * the loop ends because {@link #keepLooping()} turned false, both stub fields are assigned null
   * and the last address looked up (possibly null) is returned.
   * @param refresh If true then master address will be read from ZK, otherwise use cached data
   * @return master + port, or null if server has been stopped
   */
  @InterfaceAudience.Private
  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
    if (rssStub != null) {
      return masterAddressTracker.getMasterAddress();
    }
    ServerName sn = null;
    // Time of the last retry log line; used to log at most about once per second.
    long previousLogTime = 0;
    RegionServerStatusService.BlockingInterface intRssStub = null;
    LockService.BlockingInterface intLockStub = null;
    // Remembers an interrupt seen while sleeping so it can be restored on the way out.
    boolean interrupted = false;
    try {
      while (keepLooping()) {
        sn = this.masterAddressTracker.getMasterAddress(refresh);
        if (sn == null) {
          if (!keepLooping()) {
            // give up with no connection.
            LOG.debug("No master found and cluster is stopped; bailing out");
            return null;
          }
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            LOG.debug("No master found; retry");
            previousLogTime = EnvironmentEdgeManager.currentTime();
          }
          refresh = true; // let's try pull it from ZK directly
          if (sleepInterrupted(200)) {
            interrupted = true;
          }
          continue;
        }
        try {
          // Both stubs are created on the same channel to the master.
          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
            userProvider.getCurrent(), shortOperationTimeout);
          intRssStub = RegionServerStatusService.newBlockingStub(channel);
          intLockStub = LockService.newBlockingStub(channel);
          break;
        } catch (IOException e) {
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
            if (e instanceof ServerNotRunningYetException) {
              LOG.info("Master isn't available yet, retrying");
            } else {
              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
            }
            previousLogTime = EnvironmentEdgeManager.currentTime();
          }
          if (sleepInterrupted(200)) {
            interrupted = true;
          }
        }
      }
    } finally {
      if (interrupted) {
        // Restore the interrupt status for the caller.
        Thread.currentThread().interrupt();
      }
    }
    this.rssStub = intRssStub;
    this.lockStub = intLockStub;
    return sn;
  }
2651
2652  /**
2653   * @return True if we should break loop because cluster is going down or this server has been
2654   *         stopped or hdfs has gone bad.
2655   */
2656  private boolean keepLooping() {
2657    return !this.stopped && isClusterUp();
2658  }
2659
2660  /*
2661   * Let the master know we're here Run initialization using parameters passed us by the master.
2662   * @return A Map of key/value configurations we got from the Master else null if we failed to
2663   * register.
2664   */
2665  private RegionServerStartupResponse reportForDuty() throws IOException {
2666    if (this.masterless) {
2667      return RegionServerStartupResponse.getDefaultInstance();
2668    }
2669    ServerName masterServerName = createRegionServerStatusStub(true);
2670    RegionServerStatusService.BlockingInterface rss = rssStub;
2671    if (masterServerName == null || rss == null) {
2672      return null;
2673    }
2674    RegionServerStartupResponse result = null;
2675    try {
2676      rpcServices.requestCount.reset();
2677      rpcServices.rpcGetRequestCount.reset();
2678      rpcServices.rpcScanRequestCount.reset();
2679      rpcServices.rpcFullScanRequestCount.reset();
2680      rpcServices.rpcMultiRequestCount.reset();
2681      rpcServices.rpcMutateRequestCount.reset();
2682      LOG.info("reportForDuty to master=" + masterServerName + " with port="
2683        + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode);
2684      long now = EnvironmentEdgeManager.currentTime();
2685      int port = rpcServices.getSocketAddress().getPort();
2686      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2687      if (!StringUtils.isBlank(useThisHostnameInstead)) {
2688        request.setUseThisHostnameInstead(useThisHostnameInstead);
2689      }
2690      request.setPort(port);
2691      request.setServerStartCode(this.startcode);
2692      request.setServerCurrentTime(now);
2693      result = rss.regionServerStartup(null, request.build());
2694    } catch (ServiceException se) {
2695      IOException ioe = ProtobufUtil.getRemoteException(se);
2696      if (ioe instanceof ClockOutOfSyncException) {
2697        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
2698        // Re-throw IOE will cause RS to abort
2699        throw ioe;
2700      } else if (ioe instanceof DecommissionedHostRejectedException) {
2701        LOG.error(HBaseMarkers.FATAL,
2702          "Master rejected startup because the host is considered decommissioned", ioe);
2703        // Re-throw IOE will cause RS to abort
2704        throw ioe;
2705      } else if (ioe instanceof ServerNotRunningYetException) {
2706        LOG.debug("Master is not running yet");
2707      } else {
2708        LOG.warn("error telling master we are up", se);
2709      }
2710      rssStub = null;
2711    }
2712    return result;
2713  }
2714
2715  @Override
2716  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2717    try {
2718      GetLastFlushedSequenceIdRequest req =
2719        RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2720      RegionServerStatusService.BlockingInterface rss = rssStub;
2721      if (rss == null) { // Try to connect one more time
2722        createRegionServerStatusStub();
2723        rss = rssStub;
2724        if (rss == null) {
2725          // Still no luck, we tried
2726          LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2727          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2728            .build();
2729        }
2730      }
2731      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2732      return RegionStoreSequenceIds.newBuilder()
2733        .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2734        .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2735    } catch (ServiceException e) {
2736      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2737      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2738        .build();
2739    }
2740  }
2741
2742  /**
2743   * Close meta region if we carry it
2744   * @param abort Whether we're running an abort.
2745   */
2746  private void closeMetaTableRegions(final boolean abort) {
2747    HRegion meta = null;
2748    this.onlineRegionsLock.writeLock().lock();
2749    try {
2750      for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
2751        RegionInfo hri = e.getValue().getRegionInfo();
2752        if (hri.isMetaRegion()) {
2753          meta = e.getValue();
2754        }
2755        if (meta != null) {
2756          break;
2757        }
2758      }
2759    } finally {
2760      this.onlineRegionsLock.writeLock().unlock();
2761    }
2762    if (meta != null) {
2763      closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2764    }
2765  }
2766
2767  /**
2768   * Schedule closes on all user regions. Should be safe calling multiple times because it wont'
2769   * close regions that are already closed or that are closing.
2770   * @param abort Whether we're running an abort.
2771   */
2772  private void closeUserRegions(final boolean abort) {
2773    this.onlineRegionsLock.writeLock().lock();
2774    try {
2775      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
2776        HRegion r = e.getValue();
2777        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2778          // Don't update zk with this close transition; pass false.
2779          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2780        }
2781      }
2782    } finally {
2783      this.onlineRegionsLock.writeLock().unlock();
2784    }
2785  }
2786
2787  protected Map<String, HRegion> getOnlineRegions() {
2788    return this.onlineRegions;
2789  }
2790
2791  public int getNumberOfOnlineRegions() {
2792    return this.onlineRegions.size();
2793  }
2794
2795  /**
2796   * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM
2797   * as client; HRegion cannot be serialized to cross an rpc.
2798   */
2799  public Collection<HRegion> getOnlineRegionsLocalContext() {
2800    Collection<HRegion> regions = this.onlineRegions.values();
2801    return Collections.unmodifiableCollection(regions);
2802  }
2803
2804  @Override
2805  public void addRegion(HRegion region) {
2806    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2807    configurationManager.registerObserver(region);
2808  }
2809
2810  private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
2811    long size) {
2812    if (!sortedRegions.containsKey(size)) {
2813      sortedRegions.put(size, new ArrayList<>());
2814    }
2815    sortedRegions.get(size).add(region);
2816  }
2817
2818  /**
2819   * @return A new Map of online regions sorted by region off-heap size with the first entry being
2820   *         the biggest.
2821   */
2822  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2823    // we'll sort the regions in reverse
2824    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2825    // Copy over all regions. Regions are sorted by size with biggest first.
2826    for (HRegion region : this.onlineRegions.values()) {
2827      addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
2828    }
2829    return sortedRegions;
2830  }
2831
2832  /**
2833   * @return A new Map of online regions sorted by region heap size with the first entry being the
2834   *         biggest.
2835   */
2836  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2837    // we'll sort the regions in reverse
2838    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2839    // Copy over all regions. Regions are sorted by size with biggest first.
2840    for (HRegion region : this.onlineRegions.values()) {
2841      addRegion(sortedRegions, region, region.getMemStoreHeapSize());
2842    }
2843    return sortedRegions;
2844  }
2845
2846  /** Returns reference to FlushRequester */
2847  @Override
2848  public FlushRequester getFlushRequester() {
2849    return this.cacheFlusher;
2850  }
2851
2852  @Override
2853  public CompactionRequester getCompactionRequestor() {
2854    return this.compactSplitThread;
2855  }
2856
2857  @Override
2858  public LeaseManager getLeaseManager() {
2859    return leaseManager;
2860  }
2861
2862  /** Returns {@code true} when the data file system is available, {@code false} otherwise. */
2863  boolean isDataFileSystemOk() {
2864    return this.dataFsOk;
2865  }
2866
2867  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
2868    return this.rsHost;
2869  }
2870
2871  @Override
2872  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2873    return this.regionsInTransitionInRS;
2874  }
2875
2876  @Override
2877  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
2878    return rsQuotaManager;
2879  }
2880
2881  //
2882  // Main program and support routines
2883  //
2884  /**
2885   * Load the replication executorService objects, if any
2886   */
2887  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
2888    FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
2889    // read in the name of the source replication class from the config file.
2890    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2891      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2892
2893    // read in the name of the sink replication class from the config file.
2894    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2895      HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
2896
2897    // If both the sink and the source class names are the same, then instantiate
2898    // only one object.
2899    if (sourceClassname.equals(sinkClassname)) {
2900      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2901        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2902      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
2903      server.sameReplicationSourceAndSink = true;
2904    } else {
2905      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2906        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2907      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
2908        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2909      server.sameReplicationSourceAndSink = false;
2910    }
2911  }
2912
2913  private static <T extends ReplicationService> T newReplicationInstance(String classname,
2914    Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
2915    Path oldLogDir, WALFactory walFactory) throws IOException {
2916    final Class<? extends T> clazz;
2917    try {
2918      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2919      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
2920    } catch (java.lang.ClassNotFoundException nfe) {
2921      throw new IOException("Could not find class for " + classname);
2922    }
2923    T service = ReflectionUtils.newInstance(clazz, conf);
2924    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
2925    return service;
2926  }
2927
2928  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
2929    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
2930    if (!this.isOnline()) {
2931      return walGroupsReplicationStatus;
2932    }
2933    List<ReplicationSourceInterface> allSources = new ArrayList<>();
2934    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
2935    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
2936    for (ReplicationSourceInterface source : allSources) {
2937      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
2938    }
2939    return walGroupsReplicationStatus;
2940  }
2941
2942  /**
2943   * Utility for constructing an instance of the passed HRegionServer class.
2944   */
2945  static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass,
2946    final Configuration conf) {
2947    try {
2948      Constructor<? extends HRegionServer> c =
2949        regionServerClass.getConstructor(Configuration.class);
2950      return c.newInstance(conf);
2951    } catch (Exception e) {
2952      throw new RuntimeException(
2953        "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e);
2954    }
2955  }
2956
2957  /**
2958   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2959   */
2960  public static void main(String[] args) {
2961    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
2962    VersionInfo.logVersion();
2963    Configuration conf = HBaseConfiguration.create();
2964    @SuppressWarnings("unchecked")
2965    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2966      .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2967
2968    new HRegionServerCommandLine(regionServerClass).doMain(args);
2969  }
2970
2971  /**
2972   * Gets the online regions of the specified table. This method looks at the in-memory
2973   * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions.
2974   * If a region on this table has been closed during a disable, etc., it will not be included in
2975   * the returned list. So, the returned list may not necessarily be ALL regions in this table, its
2976   * all the ONLINE regions in the table.
2977   * @param tableName table to limit the scope of the query
2978   * @return Online regions from <code>tableName</code>
2979   */
2980  @Override
2981  public List<HRegion> getRegions(TableName tableName) {
2982    List<HRegion> tableRegions = new ArrayList<>();
2983    synchronized (this.onlineRegions) {
2984      for (HRegion region : this.onlineRegions.values()) {
2985        RegionInfo regionInfo = region.getRegionInfo();
2986        if (regionInfo.getTable().equals(tableName)) {
2987          tableRegions.add(region);
2988        }
2989      }
2990    }
2991    return tableRegions;
2992  }
2993
2994  @Override
2995  public List<HRegion> getRegions() {
2996    List<HRegion> allRegions;
2997    synchronized (this.onlineRegions) {
2998      // Return a clone copy of the onlineRegions
2999      allRegions = new ArrayList<>(onlineRegions.values());
3000    }
3001    return allRegions;
3002  }
3003
3004  /**
3005   * Gets the online tables in this RS. This method looks at the in-memory onlineRegions.
3006   * @return all the online tables in this RS
3007   */
3008  public Set<TableName> getOnlineTables() {
3009    Set<TableName> tables = new HashSet<>();
3010    synchronized (this.onlineRegions) {
3011      for (Region region : this.onlineRegions.values()) {
3012        tables.add(region.getTableDescriptor().getTableName());
3013      }
3014    }
3015    return tables;
3016  }
3017
3018  public String[] getRegionServerCoprocessors() {
3019    TreeSet<String> coprocessors = new TreeSet<>();
3020    try {
3021      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
3022    } catch (IOException exception) {
3023      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; "
3024        + "skipping.");
3025      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3026    }
3027    Collection<HRegion> regions = getOnlineRegionsLocalContext();
3028    for (HRegion region : regions) {
3029      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
3030      try {
3031        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
3032      } catch (IOException exception) {
3033        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region
3034          + "; skipping.");
3035        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3036      }
3037    }
3038    coprocessors.addAll(rsHost.getCoprocessors());
3039    return coprocessors.toArray(new String[0]);
3040  }
3041
3042  /**
3043   * Try to close the region, logs a warning on failure but continues.
3044   * @param region Region to close
3045   */
3046  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
3047    try {
3048      if (!closeRegion(region.getEncodedName(), abort, null)) {
3049        LOG
3050          .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing");
3051      }
3052    } catch (IOException e) {
3053      LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing",
3054        e);
3055    }
3056  }
3057
3058  /**
3059   * Close asynchronously a region, can be called from the master or internally by the regionserver
3060   * when stopping. If called from the master, the region will update the status.
3061   * <p>
3062   * If an opening was in progress, this method will cancel it, but will not start a new close. The
3063   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
3064   * </p>
3065   * <p>
3066   * If a close was in progress, this new request will be ignored, and an exception thrown.
3067   * </p>
3068   * <p>
3069   * Provides additional flag to indicate if this region blocks should be evicted from the cache.
3070   * </p>
3071   * @param encodedName Region to close
3072   * @param abort       True if we are aborting
3073   * @param destination Where the Region is being moved too... maybe null if unknown.
3074   * @return True if closed a region.
3075   * @throws NotServingRegionException if the region is not online
3076   */
3077  protected boolean closeRegion(String encodedName, final boolean abort,
3078    final ServerName destination) throws NotServingRegionException {
3079    // Check for permissions to close.
3080    HRegion actualRegion = this.getRegion(encodedName);
3081    // Can be null if we're calling close on a region that's not online
3082    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
3083      try {
3084        actualRegion.getCoprocessorHost().preClose(false);
3085      } catch (IOException exp) {
3086        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
3087        return false;
3088      }
3089    }
3090
3091    // previous can come back 'null' if not in map.
3092    final Boolean previous =
3093      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);
3094
3095    if (Boolean.TRUE.equals(previous)) {
3096      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already "
3097        + "trying to OPEN. Cancelling OPENING.");
3098      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
3099        // The replace failed. That should be an exceptional case, but theoretically it can happen.
3100        // We're going to try to do a standard close then.
3101        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
3102          + " Doing a standard close now");
3103        return closeRegion(encodedName, abort, destination);
3104      }
3105      // Let's get the region from the online region list again
3106      actualRegion = this.getRegion(encodedName);
3107      if (actualRegion == null) { // If already online, we still need to close it.
3108        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3109        // The master deletes the znode when it receives this exception.
3110        throw new NotServingRegionException(
3111          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
3112      }
3113    } else if (previous == null) {
3114      LOG.info("Received CLOSE for {}", encodedName);
3115    } else if (Boolean.FALSE.equals(previous)) {
3116      LOG.info("Received CLOSE for the region: " + encodedName
3117        + ", which we are already trying to CLOSE, but not completed yet");
3118      return true;
3119    }
3120
3121    if (actualRegion == null) {
3122      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3123      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
3124      // The master deletes the znode when it receives this exception.
3125      throw new NotServingRegionException(
3126        "The region " + encodedName + " is not online, and is not opening.");
3127    }
3128
3129    CloseRegionHandler crh;
3130    final RegionInfo hri = actualRegion.getRegionInfo();
3131    if (hri.isMetaRegion()) {
3132      crh = new CloseMetaHandler(this, this, hri, abort);
3133    } else {
3134      crh = new CloseRegionHandler(this, this, hri, abort, destination);
3135    }
3136    this.executorService.submit(crh);
3137    return true;
3138  }
3139
3140  /**
3141   * @return HRegion for the passed binary <code>regionName</code> or null if named region is not
3142   *         member of the online regions.
3143   */
3144  public HRegion getOnlineRegion(final byte[] regionName) {
3145    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3146    return this.onlineRegions.get(encodedRegionName);
3147  }
3148
3149  @Override
3150  public HRegion getRegion(final String encodedRegionName) {
3151    return this.onlineRegions.get(encodedRegionName);
3152  }
3153
3154  @Override
3155  public boolean removeRegion(final HRegion r, ServerName destination) {
3156    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3157    if (DataTieringManager.getInstance() != null) {
3158      DataTieringManager.getInstance().getRegionColdDataSize()
3159        .remove(r.getRegionInfo().getEncodedName());
3160    }
3161    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
3162    if (destination != null) {
3163      long closeSeqNum = r.getMaxFlushedSeqId();
3164      if (closeSeqNum == HConstants.NO_SEQNUM) {
3165        // No edits in WAL for this region; get the sequence number when the region was opened.
3166        closeSeqNum = r.getOpenSeqNum();
3167        if (closeSeqNum == HConstants.NO_SEQNUM) {
3168          closeSeqNum = 0;
3169        }
3170      }
3171      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
3172      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
3173      if (selfMove) {
3174        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
3175          r.getRegionInfo().getEncodedName(),
3176          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
3177      }
3178    }
3179    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3180    configurationManager.deregisterObserver(r);
3181    return toReturn != null;
3182  }
3183
3184  /**
3185   * Protected Utility method for safely obtaining an HRegion handle.
3186   * @param regionName Name of online {@link HRegion} to return
3187   * @return {@link HRegion} for <code>regionName</code>
3188   */
3189  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
3190    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3191    return getRegionByEncodedName(regionName, encodedRegionName);
3192  }
3193
3194  public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException {
3195    return getRegionByEncodedName(null, encodedRegionName);
3196  }
3197
3198  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3199    throws NotServingRegionException {
3200    HRegion region = this.onlineRegions.get(encodedRegionName);
3201    if (region == null) {
3202      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3203      if (moveInfo != null) {
3204        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3205      }
3206      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3207      String regionNameStr =
3208        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
3209      if (isOpening != null && isOpening) {
3210        throw new RegionOpeningException(
3211          "Region " + regionNameStr + " is opening on " + this.serverName);
3212      }
3213      throw new NotServingRegionException(
3214        "" + regionNameStr + " is not online on " + this.serverName);
3215    }
3216    return region;
3217  }
3218
3219  /**
3220   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't
3221   * already.
3222   * @param t   Throwable
3223   * @param msg Message to log in error. Can be null.
3224   * @return Throwable converted to an IOE; methods can only let out IOEs.
3225   */
3226  private Throwable cleanup(final Throwable t, final String msg) {
3227    // Don't log as error if NSRE; NSRE is 'normal' operation.
3228    if (t instanceof NotServingRegionException) {
3229      LOG.debug("NotServingRegionException; " + t.getMessage());
3230      return t;
3231    }
3232    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3233    if (msg == null) {
3234      LOG.error("", e);
3235    } else {
3236      LOG.error(msg, e);
3237    }
3238    if (!rpcServices.checkOOME(t)) {
3239      checkFileSystem();
3240    }
3241    return t;
3242  }
3243
3244  /**
3245   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
3246   * @return Make <code>t</code> an IOE if it isn't already.
3247   */
3248  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3249    return (t instanceof IOException ? (IOException) t
3250      : msg == null || msg.length() == 0 ? new IOException(t)
3251      : new IOException(msg, t));
3252  }
3253
3254  /**
3255   * Checks to see if the file system is still accessible. If not, sets abortRequested and
3256   * stopRequested
3257   * @return false if file system is not available
3258   */
3259  boolean checkFileSystem() {
3260    if (this.dataFsOk && this.dataFs != null) {
3261      try {
3262        FSUtils.checkFileSystemAvailable(this.dataFs);
3263      } catch (IOException e) {
3264        abort("File System not available", e);
3265        this.dataFsOk = false;
3266      }
3267    }
3268    return this.dataFsOk;
3269  }
3270
3271  @Override
3272  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3273    List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3274    Address[] addr = new Address[favoredNodes.size()];
3275    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3276    // it is a map of region name to Address[]
3277    for (int i = 0; i < favoredNodes.size(); i++) {
3278      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort());
3279    }
3280    regionFavoredNodesMap.put(encodedRegionName, addr);
3281  }
3282
3283  /**
3284   * Return the favored nodes for a region given its encoded name. Look at the comment around
3285   * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here.
3286   * @param encodedRegionName the encoded region name.
3287   * @return array of favored locations
3288   */
3289  @Override
3290  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3291    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
3292  }
3293
3294  @Override
3295  public ServerNonceManager getNonceManager() {
3296    return this.nonceManager;
3297  }
3298
3299  private static class MovedRegionInfo {
3300    private final ServerName serverName;
3301    private final long seqNum;
3302
3303    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3304      this.serverName = serverName;
3305      this.seqNum = closeSeqNum;
3306    }
3307
3308    public ServerName getServerName() {
3309      return serverName;
3310    }
3311
3312    public long getSeqNum() {
3313      return seqNum;
3314    }
3315  }
3316
3317  /**
3318   * We need a timeout. If not there is a risk of giving a wrong information: this would double the
3319   * number of network calls instead of reducing them.
3320   */
3321  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3322
3323  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
3324    boolean selfMove) {
3325    if (selfMove) {
3326      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3327      return;
3328    }
3329    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid="
3330      + closeSeqNum);
3331    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3332  }
3333
3334  void removeFromMovedRegions(String encodedName) {
3335    movedRegionInfoCache.invalidate(encodedName);
3336  }
3337
3338  @InterfaceAudience.Private
3339  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
3340    return movedRegionInfoCache.getIfPresent(encodedRegionName);
3341  }
3342
3343  @InterfaceAudience.Private
3344  public int movedRegionCacheExpiredTime() {
3345    return TIMEOUT_REGION_MOVED;
3346  }
3347
3348  private String getMyEphemeralNodePath() {
3349    return zooKeeper.getZNodePaths().getRsPath(serverName);
3350  }
3351
3352  private boolean isHealthCheckerConfigured() {
3353    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3354    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
3355  }
3356
3357  /** Returns the underlying {@link CompactSplit} for the servers */
3358  public CompactSplit getCompactSplitThread() {
3359    return this.compactSplitThread;
3360  }
3361
3362  CoprocessorServiceResponse execRegionServerService(
3363    @SuppressWarnings("UnusedParameters") final RpcController controller,
3364    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3365    try {
3366      ServerRpcController serviceController = new ServerRpcController();
3367      CoprocessorServiceCall call = serviceRequest.getCall();
3368      String serviceName = call.getServiceName();
3369      Service service = coprocessorServiceHandlers.get(serviceName);
3370      if (service == null) {
3371        throw new UnknownProtocolException(null,
3372          "No registered coprocessor executorService found for " + serviceName);
3373      }
3374      ServiceDescriptor serviceDesc = service.getDescriptorForType();
3375
3376      String methodName = call.getMethodName();
3377      MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3378      if (methodDesc == null) {
3379        throw new UnknownProtocolException(service.getClass(),
3380          "Unknown method " + methodName + " called on executorService " + serviceName);
3381      }
3382
3383      Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3384      final Message.Builder responseBuilder =
3385        service.getResponsePrototype(methodDesc).newBuilderForType();
3386      service.callMethod(methodDesc, serviceController, request, message -> {
3387        if (message != null) {
3388          responseBuilder.mergeFrom(message);
3389        }
3390      });
3391      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3392      if (exception != null) {
3393        throw exception;
3394      }
3395      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3396    } catch (IOException ie) {
3397      throw new ServiceException(ie);
3398    }
3399  }
3400
3401  /**
3402   * May be null if this is a master which not carry table.
3403   * @return The block cache instance used by the regionserver.
3404   */
3405  @Override
3406  public Optional<BlockCache> getBlockCache() {
3407    return Optional.ofNullable(this.blockCache);
3408  }
3409
3410  /**
3411   * May be null if this is a master which not carry table.
3412   * @return The cache for mob files used by the regionserver.
3413   */
3414  @Override
3415  public Optional<MobFileCache> getMobFileCache() {
3416    return Optional.ofNullable(this.mobFileCache);
3417  }
3418
3419  CacheEvictionStats clearRegionBlockCache(Region region) {
3420    long evictedBlocks = 0;
3421
3422    for (Store store : region.getStores()) {
3423      for (StoreFile hFile : store.getStorefiles()) {
3424        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3425      }
3426    }
3427
3428    return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build();
3429  }
3430
3431  @Override
3432  public double getCompactionPressure() {
3433    double max = 0;
3434    for (Region region : onlineRegions.values()) {
3435      for (Store store : region.getStores()) {
3436        double normCount = store.getCompactionPressure();
3437        if (normCount > max) {
3438          max = normCount;
3439        }
3440      }
3441    }
3442    return max;
3443  }
3444
3445  @Override
3446  public HeapMemoryManager getHeapMemoryManager() {
3447    return hMemManager;
3448  }
3449
3450  public MemStoreFlusher getMemStoreFlusher() {
3451    return cacheFlusher;
3452  }
3453
3454  /**
3455   * For testing
3456   * @return whether all wal roll request finished for this regionserver
3457   */
3458  @InterfaceAudience.Private
3459  public boolean walRollRequestFinished() {
3460    return this.walRoller.walRollFinished();
3461  }
3462
3463  @Override
3464  public ThroughputController getFlushThroughputController() {
3465    return flushThroughputController;
3466  }
3467
3468  @Override
3469  public double getFlushPressure() {
3470    if (getRegionServerAccounting() == null || cacheFlusher == null) {
3471      // return 0 during RS initialization
3472      return 0.0;
3473    }
3474    return getRegionServerAccounting().getFlushPressure();
3475  }
3476
3477  @Override
3478  public void onConfigurationChange(Configuration newConf) {
3479    ThroughputController old = this.flushThroughputController;
3480    if (old != null) {
3481      old.stop("configuration change");
3482    }
3483    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3484    try {
3485      Superusers.initialize(newConf);
3486    } catch (IOException e) {
3487      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3488    }
3489
3490    // update region server coprocessor if the configuration has changed.
3491    if (
3492      CoprocessorConfigurationUtil.checkConfigurationChange(this.rsHost, newConf,
3493        CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY)
3494    ) {
3495      LOG.info("Update region server coprocessors because the configuration has changed");
3496      this.rsHost = new RegionServerCoprocessorHost(this, newConf);
3497    }
3498  }
3499
3500  @Override
3501  public MetricsRegionServer getMetrics() {
3502    return metricsRegionServer;
3503  }
3504
3505  @Override
3506  public SecureBulkLoadManager getSecureBulkLoadManager() {
3507    return this.secureBulkLoadManager;
3508  }
3509
3510  @Override
3511  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
3512    final Abortable abort) {
3513    final LockServiceClient client =
3514      new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
3515    return client.regionLock(regionInfo, description, abort);
3516  }
3517
3518  @Override
3519  public void unassign(byte[] regionName) throws IOException {
3520    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
3521  }
3522
3523  @Override
3524  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3525    return this.rsSpaceQuotaManager;
3526  }
3527
3528  @Override
3529  public boolean reportFileArchivalForQuotas(TableName tableName,
3530    Collection<Entry<String, Long>> archivedFiles) {
3531    if (TEST_SKIP_REPORTING_TRANSITION) {
3532      return false;
3533    }
3534    RegionServerStatusService.BlockingInterface rss = rssStub;
3535    if (rss == null || rsSpaceQuotaManager == null) {
3536      // the current server could be stopping.
3537      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3538      return false;
3539    }
3540    try {
3541      RegionServerStatusProtos.FileArchiveNotificationRequest request =
3542        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3543      rss.reportFileArchival(null, request);
3544    } catch (ServiceException se) {
3545      IOException ioe = ProtobufUtil.getRemoteException(se);
3546      if (ioe instanceof PleaseHoldException) {
3547        if (LOG.isTraceEnabled()) {
3548          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3549            + " This will be retried.", ioe);
3550        }
3551        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3552        return false;
3553      }
3554      if (rssStub == rss) {
3555        rssStub = null;
3556      }
3557      // re-create the stub if we failed to report the archival
3558      createRegionServerStatusStub(true);
3559      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3560      return false;
3561    }
3562    return true;
3563  }
3564
3565  void executeProcedure(long procId, long initiatingMasterActiveTime,
3566    RSProcedureCallable callable) {
3567    executorService
3568      .submit(new RSProcedureHandler(this, procId, initiatingMasterActiveTime, callable));
3569  }
3570
3571  public void remoteProcedureComplete(long procId, long initiatingMasterActiveTime, Throwable error,
3572    byte[] procResultData) {
3573    procedureResultReporter.complete(procId, initiatingMasterActiveTime, error, procResultData);
3574  }
3575
3576  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3577    RegionServerStatusService.BlockingInterface rss;
3578    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3579    for (;;) {
3580      rss = rssStub;
3581      if (rss != null) {
3582        break;
3583      }
3584      createRegionServerStatusStub();
3585    }
3586    try {
3587      rss.reportProcedureDone(null, request);
3588    } catch (ServiceException se) {
3589      if (rssStub == rss) {
3590        rssStub = null;
3591      }
3592      throw ProtobufUtil.getRemoteException(se);
3593    }
3594  }
3595
3596  /**
3597   * Will ignore the open/close region procedures which already submitted or executed. When master
3598   * had unfinished open/close region procedure and restarted, new active master may send duplicate
3599   * open/close region request to regionserver. The open/close request is submitted to a thread pool
3600   * and execute. So first need a cache for submitted open/close region procedures. After the
3601   * open/close region request executed and report region transition succeed, cache it in executed
3602   * region procedures cache. See {@link #finishRegionProcedure(long)}. After report region
3603   * transition succeed, master will not send the open/close region request to regionserver again.
3604   * And we thought that the ongoing duplicate open/close region request should not be delayed more
3605   * than 600 seconds. So the executed region procedures cache will expire after 600 seconds. See
3606   * HBASE-22404 for more details.
3607   * @param procId the id of the open/close region procedure
3608   * @return true if the procedure can be submitted.
3609   */
3610  boolean submitRegionProcedure(long procId) {
3611    if (procId == -1) {
3612      return true;
3613    }
3614    // Ignore the region procedures which already submitted.
3615    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3616    if (previous != null) {
3617      LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
3618      return false;
3619    }
3620    // Ignore the region procedures which already executed.
3621    if (executedRegionProcedures.getIfPresent(procId) != null) {
3622      LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
3623      return false;
3624    }
3625    return true;
3626  }
3627
3628  /**
3629   * See {@link #submitRegionProcedure(long)}.
3630   * @param procId the id of the open/close region procedure
3631   */
3632  public void finishRegionProcedure(long procId) {
3633    executedRegionProcedures.put(procId, procId);
3634    submittedRegionProcedures.remove(procId);
3635  }
3636
3637  /**
3638   * Force to terminate region server when abort timeout.
3639   */
3640  private static class SystemExitWhenAbortTimeout extends TimerTask {
3641
3642    public SystemExitWhenAbortTimeout() {
3643    }
3644
3645    @Override
3646    public void run() {
3647      LOG.warn("Aborting region server timed out, terminating forcibly"
3648        + " and does not wait for any running shutdown hooks or finalizers to finish their work."
3649        + " Thread dump to stdout.");
3650      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3651      Runtime.getRuntime().halt(1);
3652    }
3653  }
3654
3655  @InterfaceAudience.Private
3656  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3657    return compactedFileDischarger;
3658  }
3659
3660  /**
3661   * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}}
3662   * @return pause time
3663   */
3664  @InterfaceAudience.Private
3665  public long getRetryPauseTime() {
3666    return this.retryPauseTime;
3667  }
3668
3669  @Override
3670  public Optional<ServerName> getActiveMaster() {
3671    return Optional.ofNullable(masterAddressTracker.getMasterAddress());
3672  }
3673
3674  @Override
3675  public List<ServerName> getBackupMasters() {
3676    return masterAddressTracker.getBackupMasters();
3677  }
3678
3679  @Override
3680  public Iterator<ServerName> getBootstrapNodes() {
3681    return bootstrapNodeManager.getBootstrapNodes().iterator();
3682  }
3683
3684  @Override
3685  public List<HRegionLocation> getMetaLocations() {
3686    return metaRegionLocationCache.getMetaRegionLocations();
3687  }
3688
3689  @Override
3690  protected NamedQueueRecorder createNamedQueueRecord() {
3691    return NamedQueueRecorder.getInstance(conf);
3692  }
3693
3694  @Override
3695  protected boolean clusterMode() {
3696    // this method will be called in the constructor of super class, so we can not return masterless
3697    // directly here, as it will always be false.
3698    return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
3699  }
3700
3701  @InterfaceAudience.Private
3702  public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
3703    return brokenStoreFileCleaner;
3704  }
3705
3706  @InterfaceAudience.Private
3707  public RSMobFileCleanerChore getRSMobFileCleanerChore() {
3708    return rsMobFileCleanerChore;
3709  }
3710
3711  RSSnapshotVerifier getRsSnapshotVerifier() {
3712    return rsSnapshotVerifier;
3713  }
3714
3715  @Override
3716  protected void stopChores() {
3717    shutdownChore(nonceManagerChore);
3718    shutdownChore(compactionChecker);
3719    shutdownChore(compactedFileDischarger);
3720    shutdownChore(periodicFlusher);
3721    shutdownChore(healthCheckChore);
3722    shutdownChore(executorStatusChore);
3723    shutdownChore(storefileRefresher);
3724    shutdownChore(fsUtilizationChore);
3725    shutdownChore(namedQueueServiceChore);
3726    shutdownChore(brokenStoreFileCleaner);
3727    shutdownChore(rsMobFileCleanerChore);
3728    shutdownChore(replicationMarkerChore);
3729  }
3730
3731  @Override
3732  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
3733    return regionReplicationBufferManager;
3734  }
3735}