001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION;
023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
024import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
025import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
026import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
027import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT;
028import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
029import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT;
030import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
031import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
032import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
033import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
034
035import io.opentelemetry.api.trace.Span;
036import io.opentelemetry.api.trace.StatusCode;
037import io.opentelemetry.context.Scope;
038import java.io.IOException;
039import java.io.PrintWriter;
040import java.lang.management.MemoryUsage;
041import java.lang.reflect.Constructor;
042import java.net.InetSocketAddress;
043import java.time.Duration;
044import java.util.ArrayList;
045import java.util.Collection;
046import java.util.Collections;
047import java.util.Comparator;
048import java.util.HashSet;
049import java.util.Iterator;
050import java.util.List;
051import java.util.Map;
052import java.util.Map.Entry;
053import java.util.Objects;
054import java.util.Optional;
055import java.util.Set;
056import java.util.SortedMap;
057import java.util.Timer;
058import java.util.TimerTask;
059import java.util.TreeMap;
060import java.util.TreeSet;
061import java.util.concurrent.ConcurrentHashMap;
062import java.util.concurrent.ConcurrentMap;
063import java.util.concurrent.ConcurrentSkipListMap;
064import java.util.concurrent.ThreadLocalRandom;
065import java.util.concurrent.TimeUnit;
066import java.util.concurrent.atomic.AtomicBoolean;
067import java.util.concurrent.locks.ReentrantReadWriteLock;
068import java.util.stream.Collectors;
069import javax.management.MalformedObjectNameException;
070import javax.servlet.http.HttpServlet;
071import org.apache.commons.lang3.StringUtils;
072import org.apache.hadoop.conf.Configuration;
073import org.apache.hadoop.fs.FileSystem;
074import org.apache.hadoop.fs.Path;
075import org.apache.hadoop.hbase.Abortable;
076import org.apache.hadoop.hbase.CacheEvictionStats;
077import org.apache.hadoop.hbase.CallQueueTooBigException;
078import org.apache.hadoop.hbase.ClockOutOfSyncException;
079import org.apache.hadoop.hbase.DoNotRetryIOException;
080import org.apache.hadoop.hbase.ExecutorStatusChore;
081import org.apache.hadoop.hbase.HBaseConfiguration;
082import org.apache.hadoop.hbase.HBaseInterfaceAudience;
083import org.apache.hadoop.hbase.HBaseServerBase;
084import org.apache.hadoop.hbase.HConstants;
085import org.apache.hadoop.hbase.HDFSBlocksDistribution;
086import org.apache.hadoop.hbase.HRegionLocation;
087import org.apache.hadoop.hbase.HealthCheckChore;
088import org.apache.hadoop.hbase.MetaTableAccessor;
089import org.apache.hadoop.hbase.NotServingRegionException;
090import org.apache.hadoop.hbase.PleaseHoldException;
091import org.apache.hadoop.hbase.ScheduledChore;
092import org.apache.hadoop.hbase.ServerName;
093import org.apache.hadoop.hbase.Stoppable;
094import org.apache.hadoop.hbase.TableName;
095import org.apache.hadoop.hbase.YouAreDeadException;
096import org.apache.hadoop.hbase.ZNodeClearer;
097import org.apache.hadoop.hbase.client.ConnectionUtils;
098import org.apache.hadoop.hbase.client.RegionInfo;
099import org.apache.hadoop.hbase.client.RegionInfoBuilder;
100import org.apache.hadoop.hbase.client.locking.EntityLock;
101import org.apache.hadoop.hbase.client.locking.LockServiceClient;
102import org.apache.hadoop.hbase.conf.ConfigurationObserver;
103import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
104import org.apache.hadoop.hbase.exceptions.RegionMovedException;
105import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
106import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
107import org.apache.hadoop.hbase.executor.ExecutorType;
108import org.apache.hadoop.hbase.http.InfoServer;
109import org.apache.hadoop.hbase.io.hfile.BlockCache;
110import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
111import org.apache.hadoop.hbase.io.hfile.HFile;
112import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
113import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
114import org.apache.hadoop.hbase.ipc.RpcClient;
115import org.apache.hadoop.hbase.ipc.RpcServer;
116import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
117import org.apache.hadoop.hbase.ipc.ServerRpcController;
118import org.apache.hadoop.hbase.log.HBaseMarkers;
119import org.apache.hadoop.hbase.mob.MobFileCache;
120import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
121import org.apache.hadoop.hbase.monitoring.TaskMonitor;
122import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
123import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore;
124import org.apache.hadoop.hbase.net.Address;
125import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
126import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
127import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
128import org.apache.hadoop.hbase.quotas.QuotaUtil;
129import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
130import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
131import org.apache.hadoop.hbase.quotas.RegionSize;
132import org.apache.hadoop.hbase.quotas.RegionSizeStore;
133import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
134import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
135import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
136import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
137import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
138import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
139import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
140import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
141import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
142import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
143import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
144import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
145import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
146import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
147import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
148import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
149import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
150import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
151import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
152import org.apache.hadoop.hbase.security.SecurityConstants;
153import org.apache.hadoop.hbase.security.Superusers;
154import org.apache.hadoop.hbase.security.User;
155import org.apache.hadoop.hbase.security.UserProvider;
156import org.apache.hadoop.hbase.trace.TraceUtil;
157import org.apache.hadoop.hbase.util.Bytes;
158import org.apache.hadoop.hbase.util.CompressionTest;
159import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
160import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
161import org.apache.hadoop.hbase.util.FSUtils;
162import org.apache.hadoop.hbase.util.FutureUtils;
163import org.apache.hadoop.hbase.util.JvmPauseMonitor;
164import org.apache.hadoop.hbase.util.Pair;
165import org.apache.hadoop.hbase.util.RetryCounter;
166import org.apache.hadoop.hbase.util.RetryCounterFactory;
167import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
168import org.apache.hadoop.hbase.util.Threads;
169import org.apache.hadoop.hbase.util.VersionInfo;
170import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
171import org.apache.hadoop.hbase.wal.WAL;
172import org.apache.hadoop.hbase.wal.WALFactory;
173import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
174import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
175import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
176import org.apache.hadoop.hbase.zookeeper.ZKUtil;
177import org.apache.hadoop.ipc.RemoteException;
178import org.apache.hadoop.util.ReflectionUtils;
179import org.apache.yetus.audience.InterfaceAudience;
180import org.apache.zookeeper.KeeperException;
181import org.slf4j.Logger;
182import org.slf4j.LoggerFactory;
183
184import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
185import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
186import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
187import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
188import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
189import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
190import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
191import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
192import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
193import org.apache.hbase.thirdparty.com.google.protobuf.Message;
194import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
195import org.apache.hbase.thirdparty.com.google.protobuf.Service;
196import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
197import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
198import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
199
200import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
201import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
230
231/**
232 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
233 * are many HRegionServers in a single HBase deployment.
234 */
235@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
236@SuppressWarnings({ "deprecation" })
237public class HRegionServer extends HBaseServerBase<RSRpcServices>
238  implements RegionServerServices, LastSequenceId {
239
240  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
241
242  /**
243   * For testing only! Set to true to skip notifying region assignment to master .
244   */
245  @InterfaceAudience.Private
246  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
247  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
248
249  /**
250   * A map from RegionName to current action in progress. Boolean value indicates: true - if open
251   * region action in progress false - if close region action in progress
252   */
253  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
254    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
255
256  /**
257   * Used to cache the open/close region procedures which already submitted. See
258   * {@link #submitRegionProcedure(long)}.
259   */
260  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
261  /**
262   * Used to cache the open/close region procedures which already executed. See
263   * {@link #submitRegionProcedure(long)}.
264   */
265  private final Cache<Long, Long> executedRegionProcedures =
266    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
267
268  /**
269   * Used to cache the moved-out regions
270   */
271  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
272    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();
273
274  private MemStoreFlusher cacheFlusher;
275
276  private HeapMemoryManager hMemManager;
277
278  // Replication services. If no replication, this handler will be null.
279  private ReplicationSourceService replicationSourceHandler;
280  private ReplicationSinkService replicationSinkHandler;
281  private boolean sameReplicationSourceAndSink;
282
283  // Compactions
284  private CompactSplit compactSplitThread;
285
286  /**
287   * Map of regions currently being served by this region server. Key is the encoded region name.
288   * All access should be synchronized.
289   */
290  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
291  /**
292   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
293   * need to be a ConcurrentHashMap?
294   */
295  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();
296
297  /**
298   * Map of encoded region names to the DataNode locations they should be hosted on We store the
299   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
300   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
301   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API
302   * call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and
303   * here we really mean DataNode locations. We don't store it as InetSocketAddress here because the
304   * conversion on demand from Address to InetSocketAddress will guarantee the resolution results
305   * will be fresh when we need it.
306   */
307  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();
308
309  private LeaseManager leaseManager;
310
311  private volatile boolean dataFsOk;
312
313  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
314  // Default abort timeout is 1200 seconds for safe
315  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
316  // Will run this task when abort timeout
317  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
318
319  // A state before we go into stopped state. At this stage we're closing user
320  // space regions.
321  private boolean stopping = false;
322  private volatile boolean killed = false;
323
324  private final int threadWakeFrequency;
325
326  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
327  private final int compactionCheckFrequency;
328  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
329  private final int flushCheckFrequency;
330
331  // Stub to do region server status calls against the master.
332  private volatile RegionServerStatusService.BlockingInterface rssStub;
333  private volatile LockService.BlockingInterface lockStub;
334  // RPC client. Used to make the stub above that does region server status checking.
335  private RpcClient rpcClient;
336
337  private UncaughtExceptionHandler uncaughtExceptionHandler;
338
339  private JvmPauseMonitor pauseMonitor;
340
341  private RSSnapshotVerifier rsSnapshotVerifier;
342
343  /** region server process name */
344  public static final String REGIONSERVER = "regionserver";
345
346  private MetricsRegionServer metricsRegionServer;
347  MetricsRegionServerWrapperImpl metricsRegionServerImpl;
348
349  /**
350   * Check for compactions requests.
351   */
352  private ScheduledChore compactionChecker;
353
354  /**
355   * Check for flushes
356   */
357  private ScheduledChore periodicFlusher;
358
359  private volatile WALFactory walFactory;
360
361  private LogRoller walRoller;
362
363  // A thread which calls reportProcedureDone
364  private RemoteProcedureResultReporter procedureResultReporter;
365
366  // flag set after we're done setting up server threads
367  final AtomicBoolean online = new AtomicBoolean(false);
368
369  // master address tracker
370  private final MasterAddressTracker masterAddressTracker;
371
372  // Log Splitting Worker
373  private SplitLogWorker splitLogWorker;
374
375  private final int shortOperationTimeout;
376
377  // Time to pause if master says 'please hold'
378  private final long retryPauseTime;
379
380  private final RegionServerAccounting regionServerAccounting;
381
382  private NamedQueueServiceChore namedQueueServiceChore = null;
383
384  // Block cache
385  private BlockCache blockCache;
386  // The cache for mob files
387  private MobFileCache mobFileCache;
388
389  /** The health check chore. */
390  private HealthCheckChore healthCheckChore;
391
392  /** The Executor status collect chore. */
393  private ExecutorStatusChore executorStatusChore;
394
395  /** The nonce manager chore. */
396  private ScheduledChore nonceManagerChore;
397
398  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
399
400  /**
401   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
402   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
403   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
404   */
405  @Deprecated
406  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
407  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
408    "hbase.regionserver.hostname.disable.master.reversedns";
409
410  /**
411   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
412   * Exception will be thrown if both are used.
413   */
414  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
415  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
416    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";
417
418  /**
419   * Unique identifier for the cluster we are a part of.
420   */
421  private String clusterId;
422
423  // chore for refreshing store files for secondary regions
424  private StorefileRefresherChore storefileRefresher;
425
426  private volatile RegionServerCoprocessorHost rsHost;
427
428  private RegionServerProcedureManagerHost rspmHost;
429
430  private RegionServerRpcQuotaManager rsQuotaManager;
431  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
432
433  /**
434   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
435   * case where client doesn't receive the response from a successful operation and retries. We
436   * track the successful ops for some time via a nonce sent by client and handle duplicate
437   * operations (currently, by failing them; in future we might use MVCC to return result). Nonces
438   * are also recovered from WAL during, recovery; however, the caveats (from HBASE-3787) are: - WAL
439   * recovery is optimized, and under high load we won't read nearly nonce-timeout worth of past
440   * records. If we don't read the records, we don't read and recover the nonces. Some WALs within
441   * nonce-timeout at recovery may not even be present due to rolling/cleanup. - There's no WAL
442   * recovery during normal region move, so nonces will not be transfered. We can have separate
443   * additional "Nonce WAL". It will just contain bunch of numbers and won't be flushed on main path
444   * - because WAL itself also contains nonces, if we only flush it before memstore flush, for a
445   * given nonce we will either see it in the WAL (if it was never flushed to disk, it will be part
446   * of recovery), or we'll see it as part of the nonce log (or both occasionally, which doesn't
447   * matter). Nonce log file can be deleted after the latest nonce in it expired. It can also be
448   * recovered during move.
449   */
450  final ServerNonceManager nonceManager;
451
452  private BrokenStoreFileCleaner brokenStoreFileCleaner;
453
454  private RSMobFileCleanerChore rsMobFileCleanerChore;
455
456  @InterfaceAudience.Private
457  CompactedHFilesDischarger compactedFileDischarger;
458
459  private volatile ThroughputController flushThroughputController;
460
461  private SecureBulkLoadManager secureBulkLoadManager;
462
463  private FileSystemUtilizationChore fsUtilizationChore;
464
465  private BootstrapNodeManager bootstrapNodeManager;
466
467  /**
468   * True if this RegionServer is coming up in a cluster where there is no Master; means it needs to
469   * just come up and make do without a Master to talk to: e.g. in test or HRegionServer is doing
470   * other than its usual duties: e.g. as an hollowed-out host whose only purpose is as a
471   * Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
472   * {@link #TEST_SKIP_REPORTING_TRANSITION} ?
473   */
474  private final boolean masterless;
475  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
476
477  /** regionserver codec list **/
478  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";
479
480  // A timer to shutdown the process if abort takes too long
481  private Timer abortMonitor;
482
483  private RegionReplicationBufferManager regionReplicationBufferManager;
484
485  /*
486   * Chore that creates replication marker rows.
487   */
488  private ReplicationMarkerChore replicationMarkerChore;
489
490  /**
491   * Starts a HRegionServer at the default location.
492   * <p/>
493   * Don't start any services or managers in here in the Constructor. Defer till after we register
494   * with the Master as much as possible. See {@link #startServices}.
495   */
496  public HRegionServer(final Configuration conf) throws IOException {
497    super(conf, "RegionServer"); // thread name
498    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
499    try (Scope ignored = span.makeCurrent()) {
500      this.dataFsOk = true;
501      this.masterless = !clusterMode();
502      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
503      HFile.checkHFileVersion(this.conf);
504      checkCodecs(this.conf);
505      FSUtils.setupShortCircuitRead(this.conf);
506
507      // Disable usage of meta replicas in the regionserver
508      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
509      // Config'ed params
510      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
511      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
512      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
513
514      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
515      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
516
517      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
518        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
519
520      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
521        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);
522
523      regionServerAccounting = new RegionServerAccounting(conf);
524
525      blockCache = BlockCacheFactory.createBlockCache(conf);
526      mobFileCache = new MobFileCache(conf);
527
528      rsSnapshotVerifier = new RSSnapshotVerifier(conf);
529
530      uncaughtExceptionHandler =
531        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);
532
533      // If no master in cluster, skip trying to track one or look for a cluster status.
534      if (!this.masterless) {
535        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
536        masterAddressTracker.start();
537      } else {
538        masterAddressTracker = null;
539      }
540      this.rpcServices.start(zooKeeper);
541      span.setStatus(StatusCode.OK);
542    } catch (Throwable t) {
543      // Make sure we log the exception. HRegionServer is often started via reflection and the
544      // cause of failed startup is lost.
545      TraceUtil.setError(span, t);
546      LOG.error("Failed construction RegionServer", t);
547      throw t;
548    } finally {
549      span.end();
550    }
551  }
552
553  // HMaster should override this method to load the specific config for master
554  @Override
555  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
556    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
557    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
558      if (!StringUtils.isBlank(hostname)) {
559        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
560          + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
561          + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
562          + UNSAFE_RS_HOSTNAME_KEY + " is used";
563        throw new IOException(msg);
564      } else {
565        return rpcServices.getSocketAddress().getHostName();
566      }
567    } else {
568      return hostname;
569    }
570  }
571
572  @Override
573  protected void login(UserProvider user, String host) throws IOException {
574    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
575      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
576  }
577
578  @Override
579  protected String getProcessName() {
580    return REGIONSERVER;
581  }
582
583  @Override
584  protected boolean canCreateBaseZNode() {
585    return !clusterMode();
586  }
587
588  @Override
589  protected boolean canUpdateTableDescriptor() {
590    return false;
591  }
592
593  @Override
594  protected boolean cacheTableDescriptor() {
595    return false;
596  }
597
598  protected RSRpcServices createRpcServices() throws IOException {
599    return new RSRpcServices(this);
600  }
601
602  @Override
603  protected void configureInfoServer(InfoServer infoServer) {
604    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
605    infoServer.setAttribute(REGIONSERVER, this);
606  }
607
608  @Override
609  protected Class<? extends HttpServlet> getDumpServlet() {
610    return RSDumpServlet.class;
611  }
612
613  /**
614   * Used by {@link RSDumpServlet} to generate debugging information.
615   */
616  public void dumpRowLocks(final PrintWriter out) {
617    StringBuilder sb = new StringBuilder();
618    for (HRegion region : getRegions()) {
619      if (region.getLockedRows().size() > 0) {
620        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
621          sb.setLength(0);
622          sb.append(region.getTableDescriptor().getTableName()).append(",")
623            .append(region.getRegionInfo().getEncodedName()).append(",");
624          sb.append(rowLockContext.toString());
625          out.println(sb);
626        }
627      }
628    }
629  }
630
631  @Override
632  public boolean registerService(Service instance) {
633    // No stacking of instances is allowed for a single executorService name
634    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
635    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
636    if (coprocessorServiceHandlers.containsKey(serviceName)) {
637      LOG.error("Coprocessor executorService " + serviceName
638        + " already registered, rejecting request from " + instance);
639      return false;
640    }
641
642    coprocessorServiceHandlers.put(serviceName, instance);
643    if (LOG.isDebugEnabled()) {
644      LOG.debug(
645        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
646    }
647    return true;
648  }
649
650  /**
651   * Run test on configured codecs to make sure supporting libs are in place.
652   */
653  private static void checkCodecs(final Configuration c) throws IOException {
654    // check to see if the codec list is available:
655    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
656    if (codecs == null) {
657      return;
658    }
659    for (String codec : codecs) {
660      if (!CompressionTest.testCompression(codec)) {
661        throw new IOException(
662          "Compression codec " + codec + " not supported, aborting RS construction");
663      }
664    }
665  }
666
667  public String getClusterId() {
668    return this.clusterId;
669  }
670
671  /**
672   * All initialization needed before we go register with Master.<br>
673   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
674   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
675   */
676  private void preRegistrationInitialization() {
677    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
678    try (Scope ignored = span.makeCurrent()) {
679      initializeZooKeeper();
680      setupClusterConnection();
681      bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
682      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
683      // Setup RPC client for master communication
684      this.rpcClient = asyncClusterConnection.getRpcClient();
685      span.setStatus(StatusCode.OK);
686    } catch (Throwable t) {
687      // Call stop if error or process will stick around for ever since server
688      // puts up non-daemon threads.
689      TraceUtil.setError(span, t);
690      this.rpcServices.stop();
691      abort("Initialization of RS failed.  Hence aborting RS.", t);
692    } finally {
693      span.end();
694    }
695  }
696
697  /**
698   * Bring up connection to zk ensemble and then wait until a master for this cluster and then after
699   * that, wait until cluster 'up' flag has been set. This is the order in which master does things.
700   * <p>
701   * Finally open long-living server short-circuit connection.
702   */
703  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
704      justification = "cluster Id znode read would give us correct response")
705  private void initializeZooKeeper() throws IOException, InterruptedException {
706    // Nothing to do in here if no Master in the mix.
707    if (this.masterless) {
708      return;
709    }
710
711    // Create the master address tracker, register with zk, and start it. Then
712    // block until a master is available. No point in starting up if no master
713    // running.
714    blockAndCheckIfStopped(this.masterAddressTracker);
715
716    // Wait on cluster being up. Master will set this flag up in zookeeper
717    // when ready.
718    blockAndCheckIfStopped(this.clusterStatusTracker);
719
720    // If we are HMaster then the cluster id should have already been set.
721    if (clusterId == null) {
722      // Retrieve clusterId
723      // Since cluster status is now up
724      // ID should have already been set by HMaster
725      try {
726        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
727        if (clusterId == null) {
728          this.abort("Cluster ID has not been set");
729        }
730        LOG.info("ClusterId : " + clusterId);
731      } catch (KeeperException e) {
732        this.abort("Failed to retrieve Cluster ID", e);
733      }
734    }
735
736    if (isStopped() || isAborted()) {
737      return; // No need for further initialization
738    }
739
740    // watch for snapshots and other procedures
741    try {
742      rspmHost = new RegionServerProcedureManagerHost();
743      rspmHost.loadProcedures(conf);
744      rspmHost.initialize(this);
745    } catch (KeeperException e) {
746      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
747    }
748  }
749
750  /**
751   * Utilty method to wait indefinitely on a znode availability while checking if the region server
752   * is shut down
753   * @param tracker znode tracker to use
754   * @throws IOException          any IO exception, plus if the RS is stopped
755   * @throws InterruptedException if the waiting thread is interrupted
756   */
757  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
758    throws IOException, InterruptedException {
759    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
760      if (this.stopped) {
761        throw new IOException("Received the shutdown message while waiting.");
762      }
763    }
764  }
765
766  /** Returns True if the cluster is up. */
767  @Override
768  public boolean isClusterUp() {
769    return this.masterless
770      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
771  }
772
773  private void initializeReplicationMarkerChore() {
774    boolean replicationMarkerEnabled =
775      conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
776    // If replication or replication marker is not enabled then return immediately.
777    if (replicationMarkerEnabled) {
778      int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
779        REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
780      replicationMarkerChore = new ReplicationMarkerChore(this, this, period, conf);
781    }
782  }
783
784  /**
785   * The HRegionServer sticks in this loop until closed.
786   */
787  @Override
788  public void run() {
789    if (isStopped()) {
790      LOG.info("Skipping run; stopped");
791      return;
792    }
793    try {
794      // Do pre-registration initializations; zookeeper, lease threads, etc.
795      preRegistrationInitialization();
796    } catch (Throwable e) {
797      abort("Fatal exception during initialization", e);
798    }
799
800    try {
801      if (!isStopped() && !isAborted()) {
802        installShutdownHook();
803        // Initialize the RegionServerCoprocessorHost now that our ephemeral
804        // node was created, in case any coprocessors want to use ZooKeeper
805        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
806
807        // Try and register with the Master; tell it we are here. Break if server is stopped or
808        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
809        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
810        // come up.
811        LOG.debug("About to register with Master.");
812        TraceUtil.trace(() -> {
813          RetryCounterFactory rcf =
814            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
815          RetryCounter rc = rcf.create();
816          while (keepLooping()) {
817            RegionServerStartupResponse w = reportForDuty();
818            if (w == null) {
819              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
820              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
821              this.sleeper.sleep(sleepTime);
822            } else {
823              handleReportForDutyResponse(w);
824              break;
825            }
826          }
827        }, "HRegionServer.registerWithMaster");
828      }
829
830      if (!isStopped() && isHealthy()) {
831        TraceUtil.trace(() -> {
832          // start the snapshot handler and other procedure handlers,
833          // since the server is ready to run
834          if (this.rspmHost != null) {
835            this.rspmHost.start();
836          }
837          // Start the Quota Manager
838          if (this.rsQuotaManager != null) {
839            rsQuotaManager.start(getRpcServer().getScheduler());
840          }
841          if (this.rsSpaceQuotaManager != null) {
842            this.rsSpaceQuotaManager.start();
843          }
844        }, "HRegionServer.startup");
845      }
846
847      // We registered with the Master. Go into run mode.
848      long lastMsg = EnvironmentEdgeManager.currentTime();
849      long oldRequestCount = -1;
850      // The main run loop.
851      while (!isStopped() && isHealthy()) {
852        if (!isClusterUp()) {
853          if (onlineRegions.isEmpty()) {
854            stop("Exiting; cluster shutdown set and not carrying any regions");
855          } else if (!this.stopping) {
856            this.stopping = true;
857            LOG.info("Closing user regions");
858            closeUserRegions(isAborted());
859          } else {
860            boolean allUserRegionsOffline = areAllUserRegionsOffline();
861            if (allUserRegionsOffline) {
862              // Set stopped if no more write requests tp meta tables
863              // since last time we went around the loop. Any open
864              // meta regions will be closed on our way out.
865              if (oldRequestCount == getWriteRequestCount()) {
866                stop("Stopped; only catalog regions remaining online");
867                break;
868              }
869              oldRequestCount = getWriteRequestCount();
870            } else {
871              // Make sure all regions have been closed -- some regions may
872              // have not got it because we were splitting at the time of
873              // the call to closeUserRegions.
874              closeUserRegions(this.abortRequested.get());
875            }
876            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
877          }
878        }
879        long now = EnvironmentEdgeManager.currentTime();
880        if ((now - lastMsg) >= msgInterval) {
881          tryRegionServerReport(lastMsg, now);
882          lastMsg = EnvironmentEdgeManager.currentTime();
883        }
884        if (!isStopped() && !isAborted()) {
885          this.sleeper.sleep();
886        }
887      } // for
888    } catch (Throwable t) {
889      if (!rpcServices.checkOOME(t)) {
890        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
891        abort(prefix + t.getMessage(), t);
892      }
893    }
894
895    final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
896    try (Scope ignored = span.makeCurrent()) {
897      if (this.leaseManager != null) {
898        this.leaseManager.closeAfterLeasesExpire();
899      }
900      if (this.splitLogWorker != null) {
901        splitLogWorker.stop();
902      }
903      stopInfoServer();
904      // Send cache a shutdown.
905      if (blockCache != null) {
906        blockCache.shutdown();
907      }
908      if (mobFileCache != null) {
909        mobFileCache.shutdown();
910      }
911
912      // Send interrupts to wake up threads if sleeping so they notice shutdown.
913      // TODO: Should we check they are alive? If OOME could have exited already
914      if (this.hMemManager != null) {
915        this.hMemManager.stop();
916      }
917      if (this.cacheFlusher != null) {
918        this.cacheFlusher.interruptIfNecessary();
919      }
920      if (this.compactSplitThread != null) {
921        this.compactSplitThread.interruptIfNecessary();
922      }
923
924      // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
925      if (rspmHost != null) {
926        rspmHost.stop(this.abortRequested.get() || this.killed);
927      }
928
929      if (this.killed) {
930        // Just skip out w/o closing regions. Used when testing.
931      } else if (abortRequested.get()) {
932        if (this.dataFsOk) {
933          closeUserRegions(abortRequested.get()); // Don't leave any open file handles
934        }
935        LOG.info("aborting server " + this.serverName);
936      } else {
937        closeUserRegions(abortRequested.get());
938        LOG.info("stopping server " + this.serverName);
939      }
940      regionReplicationBufferManager.stop();
941      closeClusterConnection();
942      // Closing the compactSplit thread before closing meta regions
943      if (!this.killed && containsMetaTableRegions()) {
944        if (!abortRequested.get() || this.dataFsOk) {
945          if (this.compactSplitThread != null) {
946            this.compactSplitThread.join();
947            this.compactSplitThread = null;
948          }
949          closeMetaTableRegions(abortRequested.get());
950        }
951      }
952
953      if (!this.killed && this.dataFsOk) {
954        waitOnAllRegionsToClose(abortRequested.get());
955        LOG.info("stopping server " + this.serverName + "; all regions closed.");
956      }
957
958      // Stop the quota manager
959      if (rsQuotaManager != null) {
960        rsQuotaManager.stop();
961      }
962      if (rsSpaceQuotaManager != null) {
963        rsSpaceQuotaManager.stop();
964        rsSpaceQuotaManager = null;
965      }
966
967      // flag may be changed when closing regions throws exception.
968      if (this.dataFsOk) {
969        shutdownWAL(!abortRequested.get());
970      }
971
972      // Make sure the proxy is down.
973      if (this.rssStub != null) {
974        this.rssStub = null;
975      }
976      if (this.lockStub != null) {
977        this.lockStub = null;
978      }
979      if (this.rpcClient != null) {
980        this.rpcClient.close();
981      }
982      if (this.leaseManager != null) {
983        this.leaseManager.close();
984      }
985      if (this.pauseMonitor != null) {
986        this.pauseMonitor.stop();
987      }
988
989      if (!killed) {
990        stopServiceThreads();
991      }
992
993      if (this.rpcServices != null) {
994        this.rpcServices.stop();
995      }
996
997      try {
998        deleteMyEphemeralNode();
999      } catch (KeeperException.NoNodeException nn) {
1000        // pass
1001      } catch (KeeperException e) {
1002        LOG.warn("Failed deleting my ephemeral node", e);
1003      }
1004      // We may have failed to delete the znode at the previous step, but
1005      // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1006      ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1007
1008      closeZooKeeper();
1009      closeTableDescriptors();
1010      LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
1011      span.setStatus(StatusCode.OK);
1012    } finally {
1013      span.end();
1014    }
1015  }
1016
1017  private boolean containsMetaTableRegions() {
1018    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
1019  }
1020
1021  private boolean areAllUserRegionsOffline() {
1022    if (getNumberOfOnlineRegions() > 2) {
1023      return false;
1024    }
1025    boolean allUserRegionsOffline = true;
1026    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1027      if (!e.getValue().getRegionInfo().isMetaRegion()) {
1028        allUserRegionsOffline = false;
1029        break;
1030      }
1031    }
1032    return allUserRegionsOffline;
1033  }
1034
1035  /** Returns Current write count for all online regions. */
1036  private long getWriteRequestCount() {
1037    long writeCount = 0;
1038    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1039      writeCount += e.getValue().getWriteRequestsCount();
1040    }
1041    return writeCount;
1042  }
1043
1044  @InterfaceAudience.Private
1045  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1046    throws IOException {
1047    RegionServerStatusService.BlockingInterface rss = rssStub;
1048    if (rss == null) {
1049      // the current server could be stopping.
1050      return;
1051    }
1052    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1053    final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport");
1054    try (Scope ignored = span.makeCurrent()) {
1055      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1056      request.setServer(ProtobufUtil.toServerName(this.serverName));
1057      request.setLoad(sl);
1058      rss.regionServerReport(null, request.build());
1059      span.setStatus(StatusCode.OK);
1060    } catch (ServiceException se) {
1061      IOException ioe = ProtobufUtil.getRemoteException(se);
1062      if (ioe instanceof YouAreDeadException) {
1063        // This will be caught and handled as a fatal error in run()
1064        TraceUtil.setError(span, ioe);
1065        throw ioe;
1066      }
1067      if (rssStub == rss) {
1068        rssStub = null;
1069      }
1070      TraceUtil.setError(span, se);
1071      // Couldn't connect to the master, get location from zk and reconnect
1072      // Method blocks until new master is found or we are stopped
1073      createRegionServerStatusStub(true);
1074    } finally {
1075      span.end();
1076    }
1077  }
1078
1079  /**
1080   * Reports the given map of Regions and their size on the filesystem to the active Master.
1081   * @param regionSizeStore The store containing region sizes
1082   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1083   */
1084  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
1085    RegionServerStatusService.BlockingInterface rss = rssStub;
1086    if (rss == null) {
1087      // the current server could be stopping.
1088      LOG.trace("Skipping Region size report to HMaster as stub is null");
1089      return true;
1090    }
1091    try {
1092      buildReportAndSend(rss, regionSizeStore);
1093    } catch (ServiceException se) {
1094      IOException ioe = ProtobufUtil.getRemoteException(se);
1095      if (ioe instanceof PleaseHoldException) {
1096        LOG.trace("Failed to report region sizes to Master because it is initializing."
1097          + " This will be retried.", ioe);
1098        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1099        return true;
1100      }
1101      if (rssStub == rss) {
1102        rssStub = null;
1103      }
1104      createRegionServerStatusStub(true);
1105      if (ioe instanceof DoNotRetryIOException) {
1106        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1107        if (doNotRetryEx.getCause() != null) {
1108          Throwable t = doNotRetryEx.getCause();
1109          if (t instanceof UnsupportedOperationException) {
1110            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1111            return false;
1112          }
1113        }
1114      }
1115      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1116    }
1117    return true;
1118  }
1119
1120  /**
1121   * Builds the region size report and sends it to the master. Upon successful sending of the
1122   * report, the region sizes that were sent are marked as sent.
1123   * @param rss             The stub to send to the Master
1124   * @param regionSizeStore The store containing region sizes
1125   */
1126  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
1127    RegionSizeStore regionSizeStore) throws ServiceException {
1128    RegionSpaceUseReportRequest request =
1129      buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
1130    rss.reportRegionSpaceUse(null, request);
1131    // Record the number of size reports sent
1132    if (metricsRegionServer != null) {
1133      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
1134    }
1135  }
1136
1137  /**
1138   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1139   * @param regionSizes The size in bytes of regions
1140   * @return The corresponding protocol buffer message.
1141   */
1142  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
1143    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1144    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
1145      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
1146    }
1147    return request.build();
1148  }
1149
1150  /**
1151   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf
1152   * message.
1153   * @param regionInfo  The RegionInfo
1154   * @param sizeInBytes The size in bytes of the Region
1155   * @return The protocol buffer
1156   */
1157  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1158    return RegionSpaceUse.newBuilder()
1159      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1160      .setRegionSize(Objects.requireNonNull(sizeInBytes)).build();
1161  }
1162
1163  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1164    throws IOException {
1165    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1166    // per second, and other metrics As long as metrics are part of ServerLoad it's best to use
1167    // the wrapper to compute those numbers in one place.
1168    // In the long term most of these should be moved off of ServerLoad and the heart beat.
1169    // Instead they should be stored in an HBase table so that external visibility into HBase is
1170    // improved; Additionally the load balancer will be able to take advantage of a more complete
1171    // history.
1172    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1173    Collection<HRegion> regions = getOnlineRegionsLocalContext();
1174    long usedMemory = -1L;
1175    long maxMemory = -1L;
1176    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
1177    if (usage != null) {
1178      usedMemory = usage.getUsed();
1179      maxMemory = usage.getMax();
1180    }
1181
1182    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
1183    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1184    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
1185    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
1186    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
1187    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
1188    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
1189    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1190    Coprocessor.Builder coprocessorBuilder = Coprocessor.newBuilder();
1191    for (String coprocessor : coprocessors) {
1192      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1193    }
1194    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1195    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1196    for (HRegion region : regions) {
1197      if (region.getCoprocessorHost() != null) {
1198        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1199        for (String regionCoprocessor : regionCoprocessors) {
1200          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
1201        }
1202      }
1203      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1204      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1205        .getCoprocessors()) {
1206        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1207      }
1208    }
1209    serverLoad.setReportStartTime(reportStartTime);
1210    serverLoad.setReportEndTime(reportEndTime);
1211    if (this.infoServer != null) {
1212      serverLoad.setInfoServerPort(this.infoServer.getPort());
1213    } else {
1214      serverLoad.setInfoServerPort(-1);
1215    }
1216    MetricsUserAggregateSource userSource =
1217      metricsRegionServer.getMetricsUserAggregate().getSource();
1218    if (userSource != null) {
1219      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
1220      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
1221        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
1222      }
1223    }
1224
1225    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
1226      // always refresh first to get the latest value
1227      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1228      if (rLoad != null) {
1229        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1230        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1231          .getReplicationLoadSourceEntries()) {
1232          serverLoad.addReplLoadSource(rLS);
1233        }
1234      }
1235    } else {
1236      if (replicationSourceHandler != null) {
1237        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1238        if (rLoad != null) {
1239          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1240            .getReplicationLoadSourceEntries()) {
1241            serverLoad.addReplLoadSource(rLS);
1242          }
1243        }
1244      }
1245      if (replicationSinkHandler != null) {
1246        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
1247        if (rLoad != null) {
1248          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1249        }
1250      }
1251    }
1252
1253    TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask
1254      .newBuilder().setDescription(task.getDescription())
1255      .setStatus(task.getStatus() != null ? task.getStatus() : "")
1256      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
1257      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build()));
1258
1259    return serverLoad.build();
1260  }
1261
1262  private String getOnlineRegionsAsPrintableString() {
1263    StringBuilder sb = new StringBuilder();
1264    for (Region r : this.onlineRegions.values()) {
1265      if (sb.length() > 0) {
1266        sb.append(", ");
1267      }
1268      sb.append(r.getRegionInfo().getEncodedName());
1269    }
1270    return sb.toString();
1271  }
1272
1273  /**
1274   * Wait on regions close.
1275   */
1276  private void waitOnAllRegionsToClose(final boolean abort) {
1277    // Wait till all regions are closed before going out.
1278    int lastCount = -1;
1279    long previousLogTime = 0;
1280    Set<String> closedRegions = new HashSet<>();
1281    boolean interrupted = false;
1282    try {
1283      while (!onlineRegions.isEmpty()) {
1284        int count = getNumberOfOnlineRegions();
1285        // Only print a message if the count of regions has changed.
1286        if (count != lastCount) {
1287          // Log every second at most
1288          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
1289            previousLogTime = EnvironmentEdgeManager.currentTime();
1290            lastCount = count;
1291            LOG.info("Waiting on " + count + " regions to close");
1292            // Only print out regions still closing if a small number else will
1293            // swamp the log.
1294            if (count < 10 && LOG.isDebugEnabled()) {
1295              LOG.debug("Online Regions=" + this.onlineRegions);
1296            }
1297          }
1298        }
1299        // Ensure all user regions have been sent a close. Use this to
1300        // protect against the case where an open comes in after we start the
1301        // iterator of onlineRegions to close all user regions.
1302        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1303          RegionInfo hri = e.getValue().getRegionInfo();
1304          if (
1305            !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1306              && !closedRegions.contains(hri.getEncodedName())
1307          ) {
1308            closedRegions.add(hri.getEncodedName());
1309            // Don't update zk with this close transition; pass false.
1310            closeRegionIgnoreErrors(hri, abort);
1311          }
1312        }
1313        // No regions in RIT, we could stop waiting now.
1314        if (this.regionsInTransitionInRS.isEmpty()) {
1315          if (!onlineRegions.isEmpty()) {
1316            LOG.info("We were exiting though online regions are not empty,"
1317              + " because some regions failed closing");
1318          }
1319          break;
1320        } else {
1321          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
1322            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1323        }
1324        if (sleepInterrupted(200)) {
1325          interrupted = true;
1326        }
1327      }
1328    } finally {
1329      if (interrupted) {
1330        Thread.currentThread().interrupt();
1331      }
1332    }
1333  }
1334
1335  private static boolean sleepInterrupted(long millis) {
1336    boolean interrupted = false;
1337    try {
1338      Thread.sleep(millis);
1339    } catch (InterruptedException e) {
1340      LOG.warn("Interrupted while sleeping");
1341      interrupted = true;
1342    }
1343    return interrupted;
1344  }
1345
1346  private void shutdownWAL(final boolean close) {
1347    if (this.walFactory != null) {
1348      try {
1349        if (close) {
1350          walFactory.close();
1351        } else {
1352          walFactory.shutdown();
1353        }
1354      } catch (Throwable e) {
1355        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1356        LOG.error("Shutdown / close of WAL failed: " + e);
1357        LOG.debug("Shutdown / close exception details:", e);
1358      }
1359    }
1360  }
1361
1362  /**
1363   * Run init. Sets up wal and starts up all server threads.
1364   * @param c Extra configuration.
1365   */
1366  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1367    throws IOException {
1368    try {
1369      boolean updateRootDir = false;
1370      for (NameStringPair e : c.getMapEntriesList()) {
1371        String key = e.getName();
1372        // The hostname the master sees us as.
1373        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1374          String hostnameFromMasterPOV = e.getValue();
1375          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1376            rpcServices.getSocketAddress().getPort(), this.startcode);
1377          String expectedHostName = rpcServices.getSocketAddress().getHostName();
1378          // if Master use-ip is enabled, RegionServer use-ip will be enabled by default even if it
1379          // is set to disable. so we will use the ip of the RegionServer to compare with the
1380          // hostname passed by the Master, see HBASE-27304 for details.
1381          if (
1382            StringUtils.isBlank(useThisHostnameInstead) && getActiveMaster().isPresent()
1383              && InetAddresses.isInetAddress(getActiveMaster().get().getHostname())
1384          ) {
1385            expectedHostName = rpcServices.getSocketAddress().getAddress().getHostAddress();
1386          }
1387          boolean isHostnameConsist = StringUtils.isBlank(useThisHostnameInstead)
1388            ? hostnameFromMasterPOV.equals(expectedHostName)
1389            : hostnameFromMasterPOV.equals(useThisHostnameInstead);
1390          if (!isHostnameConsist) {
1391            String msg = "Master passed us a different hostname to use; was="
1392              + (StringUtils.isBlank(useThisHostnameInstead)
1393                ? rpcServices.getSocketAddress().getHostName()
1394                : this.useThisHostnameInstead)
1395              + ", but now=" + hostnameFromMasterPOV;
1396            LOG.error(msg);
1397            throw new IOException(msg);
1398          }
1399          continue;
1400        }
1401
1402        String value = e.getValue();
1403        if (key.equals(HConstants.HBASE_DIR)) {
1404          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1405            updateRootDir = true;
1406          }
1407        }
1408
1409        if (LOG.isDebugEnabled()) {
1410          LOG.debug("Config from master: " + key + "=" + value);
1411        }
1412        this.conf.set(key, value);
1413      }
1414      // Set our ephemeral znode up in zookeeper now we have a name.
1415      createMyEphemeralNode();
1416
1417      if (updateRootDir) {
1418        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1419        initializeFileSystem();
1420      }
1421
1422      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
1423      // config param for task trackers, but we can piggyback off of it.
1424      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1425        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1426      }
1427
1428      // Save it in a file, this will allow to see if we crash
1429      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1430
1431      // This call sets up an initialized replication and WAL. Later we start it up.
1432      setupWALAndReplication();
1433      // Init in here rather than in constructor after thread name has been set
1434      final MetricsTable metricsTable =
1435        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1436      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1437      this.metricsRegionServer =
1438        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
1439      // Now that we have a metrics source, start the pause monitor
1440      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1441      pauseMonitor.start();
1442
1443      // There is a rare case where we do NOT want services to start. Check config.
1444      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1445        startServices();
1446      }
1447      // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
1448      // or make sense of it.
1449      startReplicationService();
1450
1451      // Set up ZK
1452      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.getSocketAddress()
1453        + ", sessionid=0x"
1454        + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1455
1456      // Wake up anyone waiting for this server to online
1457      synchronized (online) {
1458        online.set(true);
1459        online.notifyAll();
1460      }
1461    } catch (Throwable e) {
1462      stop("Failed initialization");
1463      throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed");
1464    } finally {
1465      sleeper.skipSleepCycle();
1466    }
1467  }
1468
1469  private void startHeapMemoryManager() {
1470    if (this.blockCache != null) {
1471      this.hMemManager =
1472        new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1473      this.hMemManager.start(getChoreService());
1474    }
1475  }
1476
1477  private void createMyEphemeralNode() throws KeeperException {
1478    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1479    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1480    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1481    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1482    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1483  }
1484
1485  private void deleteMyEphemeralNode() throws KeeperException {
1486    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1487  }
1488
1489  @Override
1490  public RegionServerAccounting getRegionServerAccounting() {
1491    return regionServerAccounting;
1492  }
1493
1494  // Round the size with KB or MB.
1495  // A trick here is that if the sizeInBytes is less than sizeUnit, we will round the size to 1
1496  // instead of 0 if it is not 0, to avoid some schedulers think the region has no data. See
1497  // HBASE-26340 for more details on why this is important.
1498  private static int roundSize(long sizeInByte, int sizeUnit) {
1499    if (sizeInByte == 0) {
1500      return 0;
1501    } else if (sizeInByte < sizeUnit) {
1502      return 1;
1503    } else {
1504      return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE);
1505    }
1506  }
1507
1508  /**
1509   * @param r               Region to get RegionLoad for.
1510   * @param regionLoadBldr  the RegionLoad.Builder, can be null
1511   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1512   * @return RegionLoad instance.
1513   */
1514  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1515    RegionSpecifier.Builder regionSpecifier) throws IOException {
1516    byte[] name = r.getRegionInfo().getRegionName();
1517    int stores = 0;
1518    int storefiles = 0;
1519    int storeRefCount = 0;
1520    int maxCompactedStoreFileRefCount = 0;
1521    long storeUncompressedSize = 0L;
1522    long storefileSize = 0L;
1523    long storefileIndexSize = 0L;
1524    long rootLevelIndexSize = 0L;
1525    long totalStaticIndexSize = 0L;
1526    long totalStaticBloomSize = 0L;
1527    long totalCompactingKVs = 0L;
1528    long currentCompactedKVs = 0L;
1529    List<HStore> storeList = r.getStores();
1530    stores += storeList.size();
1531    for (HStore store : storeList) {
1532      storefiles += store.getStorefilesCount();
1533      int currentStoreRefCount = store.getStoreRefCount();
1534      storeRefCount += currentStoreRefCount;
1535      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1536      maxCompactedStoreFileRefCount =
1537        Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount);
1538      storeUncompressedSize += store.getStoreSizeUncompressed();
1539      storefileSize += store.getStorefilesSize();
1540      // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
1541      storefileIndexSize += store.getStorefilesRootLevelIndexSize();
1542      CompactionProgress progress = store.getCompactionProgress();
1543      if (progress != null) {
1544        totalCompactingKVs += progress.getTotalCompactingKVs();
1545        currentCompactedKVs += progress.currentCompactedKVs;
1546      }
1547      rootLevelIndexSize += store.getStorefilesRootLevelIndexSize();
1548      totalStaticIndexSize += store.getTotalStaticIndexSize();
1549      totalStaticBloomSize += store.getTotalStaticBloomSize();
1550    }
1551
1552    int unitMB = 1024 * 1024;
1553    int unitKB = 1024;
1554
1555    int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB);
1556    int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB);
1557    int storefileSizeMB = roundSize(storefileSize, unitMB);
1558    int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB);
1559    int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB);
1560    int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB);
1561    int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB);
1562
1563    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
1564    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
1565    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
1566    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
1567    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
1568    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
1569    if (regionLoadBldr == null) {
1570      regionLoadBldr = RegionLoad.newBuilder();
1571    }
1572    if (regionSpecifier == null) {
1573      regionSpecifier = RegionSpecifier.newBuilder();
1574    }
1575
1576    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1577    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1578    regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores)
1579      .setStorefiles(storefiles).setStoreRefCount(storeRefCount)
1580      .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1581      .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB)
1582      .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB)
1583      .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1584      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1585      .setReadRequestsCount(r.getReadRequestsCount()).setCpRequestsCount(r.getCpRequestsCount())
1586      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1587      .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs)
1588      .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality)
1589      .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight)
1590      .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight)
1591      .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
1592      .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1593    r.setCompleteSequenceId(regionLoadBldr);
1594    return regionLoadBldr.build();
1595  }
1596
1597  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1598    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1599    userLoadBldr.setUserName(user);
1600    userSource.getClientMetrics().values().stream()
1601      .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1602        .setHostName(clientMetrics.getHostName())
1603        .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1604        .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1605        .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
1606      .forEach(userLoadBldr::addClientMetrics);
1607    return userLoadBldr.build();
1608  }
1609
1610  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1611    HRegion r = onlineRegions.get(encodedRegionName);
1612    return r != null ? createRegionLoad(r, null, null) : null;
1613  }
1614
1615  /**
1616   * Inner class that runs on a long period checking if regions need compaction.
1617   */
1618  private static class CompactionChecker extends ScheduledChore {
1619    private final HRegionServer instance;
1620    private final int majorCompactPriority;
1621    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1622    // Iteration is 1-based rather than 0-based so we don't check for compaction
1623    // immediately upon region server startup
1624    private long iteration = 1;
1625
1626    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1627      super("CompactionChecker", stopper, sleepTime);
1628      this.instance = h;
1629      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1630
1631      /*
1632       * MajorCompactPriority is configurable. If not set, the compaction will use default priority.
1633       */
1634      this.majorCompactPriority = this.instance.conf
1635        .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY);
1636    }
1637
1638    @Override
1639    protected void chore() {
1640      for (Region r : this.instance.onlineRegions.values()) {
1641        // Skip compaction if region is read only
1642        if (r == null || r.isReadOnly()) {
1643          continue;
1644        }
1645
1646        HRegion hr = (HRegion) r;
1647        for (HStore s : hr.stores.values()) {
1648          try {
1649            long multiplier = s.getCompactionCheckMultiplier();
1650            assert multiplier > 0;
1651            if (iteration % multiplier != 0) {
1652              continue;
1653            }
1654            if (s.needsCompaction()) {
1655              // Queue a compaction. Will recognize if major is needed.
1656              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1657                getName() + " requests compaction");
1658            } else if (s.shouldPerformMajorCompaction()) {
1659              s.triggerMajorCompaction();
1660              if (
1661                majorCompactPriority == DEFAULT_PRIORITY
1662                  || majorCompactPriority > hr.getCompactPriority()
1663              ) {
1664                this.instance.compactSplitThread.requestCompaction(hr, s,
1665                  getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
1666                  CompactionLifeCycleTracker.DUMMY, null);
1667              } else {
1668                this.instance.compactSplitThread.requestCompaction(hr, s,
1669                  getName() + " requests major compaction; use configured priority",
1670                  this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1671              }
1672            }
1673          } catch (IOException e) {
1674            LOG.warn("Failed major compaction check on " + r, e);
1675          }
1676        }
1677      }
1678      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1679    }
1680  }
1681
1682  private static class PeriodicMemStoreFlusher extends ScheduledChore {
1683    private final HRegionServer server;
1684    private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
1685    private final static int MIN_DELAY_TIME = 0; // millisec
1686    private final long rangeOfDelayMs;
1687
1688    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1689      super("MemstoreFlusherChore", server, cacheFlushInterval);
1690      this.server = server;
1691
1692      final long configuredRangeOfDelay = server.getConfiguration()
1693        .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
1694      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
1695    }
1696
1697    @Override
1698    protected void chore() {
1699      final StringBuilder whyFlush = new StringBuilder();
1700      for (HRegion r : this.server.onlineRegions.values()) {
1701        if (r == null) {
1702          continue;
1703        }
1704        if (r.shouldFlush(whyFlush)) {
1705          FlushRequester requester = server.getFlushRequester();
1706          if (requester != null) {
1707            long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME;
1708            // Throttle the flushes by putting a delay. If we don't throttle, and there
1709            // is a balanced write-load on the regions in a table, we might end up
1710            // overwhelming the filesystem with too many flushes at once.
1711            if (requester.requestDelayedFlush(r, delay)) {
1712              LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(),
1713                r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay);
1714            }
1715          }
1716        }
1717      }
1718    }
1719  }
1720
1721  /**
1722   * Report the status of the server. A server is online once all the startup is completed (setting
1723   * up filesystem, starting executorService threads, etc.). This method is designed mostly to be
1724   * useful in tests.
1725   * @return true if online, false if not.
1726   */
1727  public boolean isOnline() {
1728    return online.get();
1729  }
1730
1731  /**
1732   * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
1733   * be hooked up to WAL.
1734   */
1735  private void setupWALAndReplication() throws IOException {
1736    WALFactory factory = new WALFactory(conf, serverName, this, true);
1737    // TODO Replication make assumptions here based on the default filesystem impl
1738    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1739    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1740
1741    Path logDir = new Path(walRootDir, logName);
1742    LOG.debug("logDir={}", logDir);
1743    if (this.walFs.exists(logDir)) {
1744      throw new RegionServerRunningException(
1745        "Region server has already created directory at " + this.serverName.toString());
1746    }
1747    // Always create wal directory as now we need this when master restarts to find out the live
1748    // region servers.
1749    if (!this.walFs.mkdirs(logDir)) {
1750      throw new IOException("Can not create wal directory " + logDir);
1751    }
1752    // Instantiate replication if replication enabled. Pass it the log directories.
1753    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
1754
1755    WALActionsListener walEventListener = getWALEventTrackerListener(conf);
1756    if (walEventListener != null && factory.getWALProvider() != null) {
1757      factory.getWALProvider().addWALActionsListener(walEventListener);
1758    }
1759    this.walFactory = factory;
1760  }
1761
1762  private WALActionsListener getWALEventTrackerListener(Configuration conf) {
1763    if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) {
1764      WALEventTrackerListener listener =
1765        new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName());
1766      return listener;
1767    }
1768    return null;
1769  }
1770
1771  /**
1772   * Start up replication source and sink handlers.
1773   */
1774  private void startReplicationService() throws IOException {
1775    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
1776      this.replicationSourceHandler.startReplicationService();
1777    } else {
1778      if (this.replicationSourceHandler != null) {
1779        this.replicationSourceHandler.startReplicationService();
1780      }
1781      if (this.replicationSinkHandler != null) {
1782        this.replicationSinkHandler.startReplicationService();
1783      }
1784    }
1785  }
1786
1787  /** Returns Master address tracker instance. */
1788  public MasterAddressTracker getMasterAddressTracker() {
1789    return this.masterAddressTracker;
1790  }
1791
1792  /**
1793   * Start maintenance Threads, Server, Worker and lease checker threads. Start all threads we need
1794   * to run. This is called after we've successfully registered with the Master. Install an
1795   * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception. We
1796   * cannot set the handler on all threads. Server's internal Listener thread is off limits. For
1797   * Server, if an OOME, it waits a while then retries. Meantime, a flush or a compaction that tries
1798   * to run should trigger same critical condition and the shutdown will run. On its way out, this
1799   * server will shut down Server. Leases are sort of inbetween. It has an internal thread that
1800   * while it inherits from Chore, it keeps its own internal stop mechanism so needs to be stopped
1801   * by this hosting server. Worker logs the exception and exits.
1802   */
1803  private void startServices() throws IOException {
1804    if (!isStopped() && !isAborted()) {
1805      initializeThreads();
1806    }
1807    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
1808    this.secureBulkLoadManager.start();
1809
1810    // Health checker thread.
1811    if (isHealthCheckerConfigured()) {
1812      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
1813        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1814      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1815    }
1816    // Executor status collect thread.
1817    if (
1818      this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
1819        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)
1820    ) {
1821      int sleepTime =
1822        this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ);
1823      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
1824        this.metricsRegionServer.getMetricsSource());
1825    }
1826
1827    this.walRoller = new LogRoller(this);
1828    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1829    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
1830
1831    // Create the CompactedFileDischarger chore executorService. This chore helps to
1832    // remove the compacted files that will no longer be used in reads.
1833    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
1834    // 2 mins so that compacted files can be archived before the TTLCleaner runs
1835    int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
1836    this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this);
1837    choreService.scheduleChore(compactedFileDischarger);
1838
1839    // Start executor services
1840    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
1841    executorService.startExecutorService(executorService.new ExecutorConfig()
1842      .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
1843    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
1844    executorService.startExecutorService(executorService.new ExecutorConfig()
1845      .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
1846    final int openPriorityRegionThreads =
1847      conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
1848    executorService.startExecutorService(
1849      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION)
1850        .setCorePoolSize(openPriorityRegionThreads));
1851    final int closeRegionThreads =
1852      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
1853    executorService.startExecutorService(executorService.new ExecutorConfig()
1854      .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
1855    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
1856    executorService.startExecutorService(executorService.new ExecutorConfig()
1857      .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
1858    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1859      final int storeScannerParallelSeekThreads =
1860        conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
1861      executorService.startExecutorService(
1862        executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK)
1863          .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true));
1864    }
1865    final int logReplayOpsThreads =
1866      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
1867    executorService.startExecutorService(
1868      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS)
1869        .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
1870    // Start the threads for compacted files discharger
1871    final int compactionDischargerThreads =
1872      conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
1873    executorService.startExecutorService(executorService.new ExecutorConfig()
1874      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)
1875      .setCorePoolSize(compactionDischargerThreads));
1876    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1877      final int regionReplicaFlushThreads =
1878        conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1879          conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1880      executorService.startExecutorService(executorService.new ExecutorConfig()
1881        .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS)
1882        .setCorePoolSize(regionReplicaFlushThreads));
1883    }
1884    final int refreshPeerThreads =
1885      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
1886    executorService.startExecutorService(executorService.new ExecutorConfig()
1887      .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
1888    final int replaySyncReplicationWALThreads =
1889      conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
1890    executorService.startExecutorService(executorService.new ExecutorConfig()
1891      .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL)
1892      .setCorePoolSize(replaySyncReplicationWALThreads));
1893    final int switchRpcThrottleThreads =
1894      conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
1895    executorService.startExecutorService(
1896      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE)
1897        .setCorePoolSize(switchRpcThrottleThreads));
1898    final int claimReplicationQueueThreads =
1899      conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
1900    executorService.startExecutorService(
1901      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE)
1902        .setCorePoolSize(claimReplicationQueueThreads));
1903    final int rsSnapshotOperationThreads =
1904      conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
1905    executorService.startExecutorService(
1906      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
1907        .setCorePoolSize(rsSnapshotOperationThreads));
1908
1909    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
1910      uncaughtExceptionHandler);
1911    if (this.cacheFlusher != null) {
1912      this.cacheFlusher.start(uncaughtExceptionHandler);
1913    }
1914    Threads.setDaemonThreadRunning(this.procedureResultReporter,
1915      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
1916
1917    if (this.compactionChecker != null) {
1918      choreService.scheduleChore(compactionChecker);
1919    }
1920    if (this.periodicFlusher != null) {
1921      choreService.scheduleChore(periodicFlusher);
1922    }
1923    if (this.healthCheckChore != null) {
1924      choreService.scheduleChore(healthCheckChore);
1925    }
1926    if (this.executorStatusChore != null) {
1927      choreService.scheduleChore(executorStatusChore);
1928    }
1929    if (this.nonceManagerChore != null) {
1930      choreService.scheduleChore(nonceManagerChore);
1931    }
1932    if (this.storefileRefresher != null) {
1933      choreService.scheduleChore(storefileRefresher);
1934    }
1935    if (this.fsUtilizationChore != null) {
1936      choreService.scheduleChore(fsUtilizationChore);
1937    }
1938    if (this.namedQueueServiceChore != null) {
1939      choreService.scheduleChore(namedQueueServiceChore);
1940    }
1941    if (this.brokenStoreFileCleaner != null) {
1942      choreService.scheduleChore(brokenStoreFileCleaner);
1943    }
1944    if (this.rsMobFileCleanerChore != null) {
1945      choreService.scheduleChore(rsMobFileCleanerChore);
1946    }
1947    if (replicationMarkerChore != null) {
1948      LOG.info("Starting replication marker chore");
1949      choreService.scheduleChore(replicationMarkerChore);
1950    }
1951
1952    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1953    // an unhandled exception, it will just exit.
1954    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
1955      uncaughtExceptionHandler);
1956
1957    // Create the log splitting worker and start it
1958    // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
1959    // quite a while inside Connection layer. The worker won't be available for other
1960    // tasks even after current task is preempted after a split task times out.
1961    Configuration sinkConf = HBaseConfiguration.create(conf);
1962    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1963      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1964    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1965      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1966    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
1967    if (
1968      this.csm != null
1969        && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
1970    ) {
1971      // SplitLogWorker needs csm. If none, don't start this.
1972      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
1973      splitLogWorker.start();
1974      LOG.debug("SplitLogWorker started");
1975    }
1976
1977    // Memstore services.
1978    startHeapMemoryManager();
1979    // Call it after starting HeapMemoryManager.
1980    initializeMemStoreChunkCreator(hMemManager);
1981  }
1982
1983  private void initializeThreads() {
1984    // Cache flushing thread.
1985    this.cacheFlusher = new MemStoreFlusher(conf, this);
1986
1987    // Compaction thread
1988    this.compactSplitThread = new CompactSplit(this);
1989
1990    // Background thread to check for compactions; needed if region has not gotten updates
1991    // in a while. It will take care of not checking too frequently on store-by-store basis.
1992    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
1993    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
1994    this.leaseManager = new LeaseManager(this.threadWakeFrequency);
1995
1996    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
1997      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
1998    final boolean walEventTrackerEnabled =
1999      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
2000
2001    if (isSlowLogTableEnabled || walEventTrackerEnabled) {
2002      // default chore duration: 10 min
2003      // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property
2004      final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY,
2005        DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION);
2006
2007      final int namedQueueChoreDuration =
2008        conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT);
2009      // Considering min of slowLogChoreDuration and namedQueueChoreDuration
2010      int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration);
2011
2012      namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration,
2013        this.namedQueueRecorder, this.getConnection());
2014    }
2015
2016    if (this.nonceManager != null) {
2017      // Create the scheduled chore that cleans up nonces.
2018      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
2019    }
2020
2021    // Setup the Quota Manager
2022    rsQuotaManager = new RegionServerRpcQuotaManager(this);
2023    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
2024
2025    if (QuotaUtil.isQuotaEnabled(conf)) {
2026      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
2027    }
2028
2029    boolean onlyMetaRefresh = false;
2030    int storefileRefreshPeriod =
2031      conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
2032        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2033    if (storefileRefreshPeriod == 0) {
2034      storefileRefreshPeriod =
2035        conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
2036          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2037      onlyMetaRefresh = true;
2038    }
2039    if (storefileRefreshPeriod > 0) {
2040      this.storefileRefresher =
2041        new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this);
2042    }
2043
2044    int brokenStoreFileCleanerPeriod =
2045      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
2046        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
2047    int brokenStoreFileCleanerDelay =
2048      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
2049        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
2050    double brokenStoreFileCleanerDelayJitter =
2051      conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
2052        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
2053    double jitterRate =
2054      (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
2055    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
2056    this.brokenStoreFileCleaner =
2057      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
2058        brokenStoreFileCleanerPeriod, this, conf, this);
2059
2060    this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
2061
2062    registerConfigurationObservers();
2063    initializeReplicationMarkerChore();
2064  }
2065
2066  private void registerConfigurationObservers() {
2067    // Register Replication if possible, as now we support recreating replication peer storage, for
2068    // migrating across different replication peer storages online
2069    if (replicationSourceHandler instanceof ConfigurationObserver) {
2070      configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
2071    }
2072    if (!sameReplicationSourceAndSink && replicationSinkHandler instanceof ConfigurationObserver) {
2073      configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
2074    }
2075    // Registering the compactSplitThread object with the ConfigurationManager.
2076    configurationManager.registerObserver(this.compactSplitThread);
2077    configurationManager.registerObserver(this.cacheFlusher);
2078    configurationManager.registerObserver(this.rpcServices);
2079    configurationManager.registerObserver(this);
2080  }
2081
2082  /*
2083   * Verify that server is healthy
2084   */
2085  private boolean isHealthy() {
2086    if (!dataFsOk) {
2087      // File system problem
2088      return false;
2089    }
2090    // Verify that all threads are alive
2091    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
2092      && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2093      && (this.walRoller == null || this.walRoller.isAlive())
2094      && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2095      && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2096    if (!healthy) {
2097      stop("One or more threads are no longer alive -- stop");
2098    }
2099    return healthy;
2100  }
2101
2102  @Override
2103  public List<WAL> getWALs() {
2104    return walFactory.getWALs();
2105  }
2106
2107  @Override
2108  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2109    WAL wal = walFactory.getWAL(regionInfo);
2110    if (this.walRoller != null) {
2111      this.walRoller.addWAL(wal);
2112    }
2113    return wal;
2114  }
2115
2116  public LogRoller getWalRoller() {
2117    return walRoller;
2118  }
2119
2120  WALFactory getWalFactory() {
2121    return walFactory;
2122  }
2123
2124  @Override
2125  public void stop(final String msg) {
2126    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2127  }
2128
2129  /**
2130   * Stops the regionserver.
2131   * @param msg   Status message
2132   * @param force True if this is a regionserver abort
2133   * @param user  The user executing the stop request, or null if no user is associated
2134   */
2135  public void stop(final String msg, final boolean force, final User user) {
2136    if (!this.stopped) {
2137      LOG.info("***** STOPPING region server '" + this + "' *****");
2138      if (this.rsHost != null) {
2139        // when forced via abort don't allow CPs to override
2140        try {
2141          this.rsHost.preStop(msg, user);
2142        } catch (IOException ioe) {
2143          if (!force) {
2144            LOG.warn("The region server did not stop", ioe);
2145            return;
2146          }
2147          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2148        }
2149      }
2150      this.stopped = true;
2151      LOG.info("STOPPED: " + msg);
2152      // Wakes run() if it is sleeping
2153      sleeper.skipSleepCycle();
2154    }
2155  }
2156
2157  public void waitForServerOnline() {
2158    while (!isStopped() && !isOnline()) {
2159      synchronized (online) {
2160        try {
2161          online.wait(msgInterval);
2162        } catch (InterruptedException ie) {
2163          Thread.currentThread().interrupt();
2164          break;
2165        }
2166      }
2167    }
2168  }
2169
2170  @Override
2171  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2172    HRegion r = context.getRegion();
2173    long openProcId = context.getOpenProcId();
2174    long masterSystemTime = context.getMasterSystemTime();
2175    rpcServices.checkOpen();
2176    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2177      r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2178    // Do checks to see if we need to compact (references or too many files)
2179    // Skip compaction check if region is read only
2180    if (!r.isReadOnly()) {
2181      for (HStore s : r.stores.values()) {
2182        if (s.hasReferences() || s.needsCompaction()) {
2183          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2184        }
2185      }
2186    }
2187    long openSeqNum = r.getOpenSeqNum();
2188    if (openSeqNum == HConstants.NO_SEQNUM) {
2189      // If we opened a region, we should have read some sequence number from it.
2190      LOG.error(
2191        "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2192      openSeqNum = 0;
2193    }
2194
2195    // Notify master
2196    if (
2197      !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2198        openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))
2199    ) {
2200      throw new IOException(
2201        "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2202    }
2203
2204    triggerFlushInPrimaryRegion(r);
2205
2206    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2207  }
2208
2209  /**
2210   * Helper method for use in tests. Skip the region transition report when there's no master around
2211   * to receive it.
2212   */
2213  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2214    final TransitionCode code = context.getCode();
2215    final long openSeqNum = context.getOpenSeqNum();
2216    long masterSystemTime = context.getMasterSystemTime();
2217    final RegionInfo[] hris = context.getHris();
2218
2219    if (code == TransitionCode.OPENED) {
2220      Preconditions.checkArgument(hris != null && hris.length == 1);
2221      if (hris[0].isMetaRegion()) {
2222        LOG.warn(
2223          "meta table location is stored in master local store, so we can not skip reporting");
2224        return false;
2225      } else {
2226        try {
2227          MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
2228            serverName, openSeqNum, masterSystemTime);
2229        } catch (IOException e) {
2230          LOG.info("Failed to update meta", e);
2231          return false;
2232        }
2233      }
2234    }
2235    return true;
2236  }
2237
2238  private ReportRegionStateTransitionRequest
2239    createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) {
2240    final TransitionCode code = context.getCode();
2241    final long openSeqNum = context.getOpenSeqNum();
2242    final RegionInfo[] hris = context.getHris();
2243    final long[] procIds = context.getProcIds();
2244
2245    ReportRegionStateTransitionRequest.Builder builder =
2246      ReportRegionStateTransitionRequest.newBuilder();
2247    builder.setServer(ProtobufUtil.toServerName(serverName));
2248    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2249    transition.setTransitionCode(code);
2250    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2251      transition.setOpenSeqNum(openSeqNum);
2252    }
2253    for (RegionInfo hri : hris) {
2254      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2255    }
2256    for (long procId : procIds) {
2257      transition.addProcId(procId);
2258    }
2259
2260    return builder.build();
2261  }
2262
2263  @Override
2264  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2265    if (TEST_SKIP_REPORTING_TRANSITION) {
2266      return skipReportingTransition(context);
2267    }
2268    final ReportRegionStateTransitionRequest request =
2269      createReportRegionStateTransitionRequest(context);
2270
2271    int tries = 0;
2272    long pauseTime = this.retryPauseTime;
2273    // Keep looping till we get an error. We want to send reports even though server is going down.
2274    // Only go down if clusterConnection is null. It is set to null almost as last thing as the
2275    // HRegionServer does down.
2276    while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
2277      RegionServerStatusService.BlockingInterface rss = rssStub;
2278      try {
2279        if (rss == null) {
2280          createRegionServerStatusStub();
2281          continue;
2282        }
2283        ReportRegionStateTransitionResponse response =
2284          rss.reportRegionStateTransition(null, request);
2285        if (response.hasErrorMessage()) {
2286          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2287          break;
2288        }
2289        // Log if we had to retry else don't log unless TRACE. We want to
2290        // know if were successful after an attempt showed in logs as failed.
2291        if (tries > 0 || LOG.isTraceEnabled()) {
2292          LOG.info("TRANSITION REPORTED " + request);
2293        }
2294        // NOTE: Return mid-method!!!
2295        return true;
2296      } catch (ServiceException se) {
2297        IOException ioe = ProtobufUtil.getRemoteException(se);
2298        boolean pause = ioe instanceof ServerNotRunningYetException
2299          || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException;
2300        if (pause) {
2301          // Do backoff else we flood the Master with requests.
2302          pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
2303        } else {
2304          pauseTime = this.retryPauseTime; // Reset.
2305        }
2306        LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#"
2307          + tries + ")"
2308          + (pause
2309            ? " after " + pauseTime + "ms delay (Master is coming online...)."
2310            : " immediately."),
2311          ioe);
2312        if (pause) {
2313          Threads.sleep(pauseTime);
2314        }
2315        tries++;
2316        if (rssStub == rss) {
2317          rssStub = null;
2318        }
2319      }
2320    }
2321    return false;
2322  }
2323
2324  /**
2325   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2326   * block this thread. See RegionReplicaFlushHandler for details.
2327   */
2328  private void triggerFlushInPrimaryRegion(final HRegion region) {
2329    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2330      return;
2331    }
2332    TableName tn = region.getTableDescriptor().getTableName();
2333    if (
2334      !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn)
2335        || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) ||
2336        // If the memstore replication not setup, we do not have to wait for observing a flush event
2337        // from primary before starting to serve reads, because gaps from replication is not
2338        // applicable,this logic is from
2339        // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by
2340        // HBASE-13063
2341        !region.getTableDescriptor().hasRegionMemStoreReplication()
2342    ) {
2343      region.setReadsEnabled(true);
2344      return;
2345    }
2346
2347    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2348    // RegionReplicaFlushHandler might reset this.
2349
2350    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2351    if (this.executorService != null) {
2352      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
2353    } else {
2354      LOG.info("Executor is null; not running flush of primary region replica for {}",
2355        region.getRegionInfo());
2356    }
2357  }
2358
2359  @InterfaceAudience.Private
2360  public RSRpcServices getRSRpcServices() {
2361    return rpcServices;
2362  }
2363
2364  /**
2365   * Cause the server to exit without closing the regions it is serving, the log it is using and
2366   * without notifying the master. Used unit testing and on catastrophic events such as HDFS is
2367   * yanked out from under hbase or we OOME. the reason we are aborting the exception that caused
2368   * the abort, or null
2369   */
2370  @Override
2371  public void abort(String reason, Throwable cause) {
2372    if (!setAbortRequested()) {
2373      // Abort already in progress, ignore the new request.
2374      LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason);
2375      return;
2376    }
2377    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2378    if (cause != null) {
2379      LOG.error(HBaseMarkers.FATAL, msg, cause);
2380    } else {
2381      LOG.error(HBaseMarkers.FATAL, msg);
2382    }
2383    // HBASE-4014: show list of coprocessors that were loaded to help debug
2384    // regionserver crashes.Note that we're implicitly using
2385    // java.util.HashSet's toString() method to print the coprocessor names.
2386    LOG.error(HBaseMarkers.FATAL,
2387      "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors());
2388    // Try and dump metrics if abort -- might give clue as to how fatal came about....
2389    try {
2390      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2391    } catch (MalformedObjectNameException | IOException e) {
2392      LOG.warn("Failed dumping metrics", e);
2393    }
2394
2395    // Do our best to report our abort to the master, but this may not work
2396    try {
2397      if (cause != null) {
2398        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2399      }
2400      // Report to the master but only if we have already registered with the master.
2401      RegionServerStatusService.BlockingInterface rss = rssStub;
2402      if (rss != null && this.serverName != null) {
2403        ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder();
2404        builder.setServer(ProtobufUtil.toServerName(this.serverName));
2405        builder.setErrorMessage(msg);
2406        rss.reportRSFatalError(null, builder.build());
2407      }
2408    } catch (Throwable t) {
2409      LOG.warn("Unable to report fatal error to master", t);
2410    }
2411
2412    scheduleAbortTimer();
2413    // shutdown should be run as the internal user
2414    stop(reason, true, null);
2415  }
2416
2417  /*
2418   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup logs but it does
2419   * close socket in case want to bring up server on old hostname+port immediately.
2420   */
2421  @InterfaceAudience.Private
2422  protected void kill() {
2423    this.killed = true;
2424    abort("Simulated kill");
2425  }
2426
2427  // Limits the time spent in the shutdown process.
2428  private void scheduleAbortTimer() {
2429    if (this.abortMonitor == null) {
2430      this.abortMonitor = new Timer("Abort regionserver monitor", true);
2431      TimerTask abortTimeoutTask = null;
2432      try {
2433        Constructor<? extends TimerTask> timerTaskCtor =
2434          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2435            .asSubclass(TimerTask.class).getDeclaredConstructor();
2436        timerTaskCtor.setAccessible(true);
2437        abortTimeoutTask = timerTaskCtor.newInstance();
2438      } catch (Exception e) {
2439        LOG.warn("Initialize abort timeout task failed", e);
2440      }
2441      if (abortTimeoutTask != null) {
2442        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2443      }
2444    }
2445  }
2446
2447  /**
2448   * Wait on all threads to finish. Presumption is that all closes and stops have already been
2449   * called.
2450   */
2451  protected void stopServiceThreads() {
2452    // clean up the scheduled chores
2453    stopChoreService();
2454    if (bootstrapNodeManager != null) {
2455      bootstrapNodeManager.stop();
2456    }
2457    if (this.cacheFlusher != null) {
2458      this.cacheFlusher.shutdown();
2459    }
2460    if (this.walRoller != null) {
2461      this.walRoller.close();
2462    }
2463    if (this.compactSplitThread != null) {
2464      this.compactSplitThread.join();
2465    }
2466    stopExecutorService();
2467    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
2468      this.replicationSourceHandler.stopReplicationService();
2469    } else {
2470      if (this.replicationSourceHandler != null) {
2471        this.replicationSourceHandler.stopReplicationService();
2472      }
2473      if (this.replicationSinkHandler != null) {
2474        this.replicationSinkHandler.stopReplicationService();
2475      }
2476    }
2477  }
2478
2479  /** Returns Return the object that implements the replication source executorService. */
2480  @Override
2481  public ReplicationSourceService getReplicationSourceService() {
2482    return replicationSourceHandler;
2483  }
2484
2485  /** Returns Return the object that implements the replication sink executorService. */
2486  public ReplicationSinkService getReplicationSinkService() {
2487    return replicationSinkHandler;
2488  }
2489
2490  /**
2491   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2492   * connection, the current rssStub must be null. Method will block until a master is available.
2493   * You can break from this block by requesting the server stop.
2494   * @return master + port, or null if server has been stopped
2495   */
2496  private synchronized ServerName createRegionServerStatusStub() {
2497    // Create RS stub without refreshing the master node from ZK, use cached data
2498    return createRegionServerStatusStub(false);
2499  }
2500
2501  /**
2502   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2503   * connection, the current rssStub must be null. Method will block until a master is available.
2504   * You can break from this block by requesting the server stop.
2505   * @param refresh If true then master address will be read from ZK, otherwise use cached data
2506   * @return master + port, or null if server has been stopped
2507   */
2508  @InterfaceAudience.Private
2509  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2510    if (rssStub != null) {
2511      return masterAddressTracker.getMasterAddress();
2512    }
2513    ServerName sn = null;
2514    long previousLogTime = 0;
2515    RegionServerStatusService.BlockingInterface intRssStub = null;
2516    LockService.BlockingInterface intLockStub = null;
2517    boolean interrupted = false;
2518    try {
2519      while (keepLooping()) {
2520        sn = this.masterAddressTracker.getMasterAddress(refresh);
2521        if (sn == null) {
2522          if (!keepLooping()) {
2523            // give up with no connection.
2524            LOG.debug("No master found and cluster is stopped; bailing out");
2525            return null;
2526          }
2527          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2528            LOG.debug("No master found; retry");
2529            previousLogTime = EnvironmentEdgeManager.currentTime();
2530          }
2531          refresh = true; // let's try pull it from ZK directly
2532          if (sleepInterrupted(200)) {
2533            interrupted = true;
2534          }
2535          continue;
2536        }
2537        try {
2538          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
2539            userProvider.getCurrent(), shortOperationTimeout);
2540          intRssStub = RegionServerStatusService.newBlockingStub(channel);
2541          intLockStub = LockService.newBlockingStub(channel);
2542          break;
2543        } catch (IOException e) {
2544          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2545            e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
2546            if (e instanceof ServerNotRunningYetException) {
2547              LOG.info("Master isn't available yet, retrying");
2548            } else {
2549              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2550            }
2551            previousLogTime = EnvironmentEdgeManager.currentTime();
2552          }
2553          if (sleepInterrupted(200)) {
2554            interrupted = true;
2555          }
2556        }
2557      }
2558    } finally {
2559      if (interrupted) {
2560        Thread.currentThread().interrupt();
2561      }
2562    }
2563    this.rssStub = intRssStub;
2564    this.lockStub = intLockStub;
2565    return sn;
2566  }
2567
2568  /**
2569   * @return True if we should break loop because cluster is going down or this server has been
2570   *         stopped or hdfs has gone bad.
2571   */
2572  private boolean keepLooping() {
2573    return !this.stopped && isClusterUp();
2574  }
2575
2576  /*
2577   * Let the master know we're here Run initialization using parameters passed us by the master.
2578   * @return A Map of key/value configurations we got from the Master else null if we failed to
2579   * register.
2580   */
2581  private RegionServerStartupResponse reportForDuty() throws IOException {
2582    if (this.masterless) {
2583      return RegionServerStartupResponse.getDefaultInstance();
2584    }
2585    ServerName masterServerName = createRegionServerStatusStub(true);
2586    RegionServerStatusService.BlockingInterface rss = rssStub;
2587    if (masterServerName == null || rss == null) {
2588      return null;
2589    }
2590    RegionServerStartupResponse result = null;
2591    try {
2592      rpcServices.requestCount.reset();
2593      rpcServices.rpcGetRequestCount.reset();
2594      rpcServices.rpcScanRequestCount.reset();
2595      rpcServices.rpcFullScanRequestCount.reset();
2596      rpcServices.rpcMultiRequestCount.reset();
2597      rpcServices.rpcMutateRequestCount.reset();
2598      LOG.info("reportForDuty to master=" + masterServerName + " with port="
2599        + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode);
2600      long now = EnvironmentEdgeManager.currentTime();
2601      int port = rpcServices.getSocketAddress().getPort();
2602      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2603      if (!StringUtils.isBlank(useThisHostnameInstead)) {
2604        request.setUseThisHostnameInstead(useThisHostnameInstead);
2605      }
2606      request.setPort(port);
2607      request.setServerStartCode(this.startcode);
2608      request.setServerCurrentTime(now);
2609      result = rss.regionServerStartup(null, request.build());
2610    } catch (ServiceException se) {
2611      IOException ioe = ProtobufUtil.getRemoteException(se);
2612      if (ioe instanceof ClockOutOfSyncException) {
2613        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
2614        // Re-throw IOE will cause RS to abort
2615        throw ioe;
2616      } else if (ioe instanceof ServerNotRunningYetException) {
2617        LOG.debug("Master is not running yet");
2618      } else {
2619        LOG.warn("error telling master we are up", se);
2620      }
2621      rssStub = null;
2622    }
2623    return result;
2624  }
2625
2626  @Override
2627  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2628    try {
2629      GetLastFlushedSequenceIdRequest req =
2630        RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2631      RegionServerStatusService.BlockingInterface rss = rssStub;
2632      if (rss == null) { // Try to connect one more time
2633        createRegionServerStatusStub();
2634        rss = rssStub;
2635        if (rss == null) {
2636          // Still no luck, we tried
2637          LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2638          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2639            .build();
2640        }
2641      }
2642      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2643      return RegionStoreSequenceIds.newBuilder()
2644        .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2645        .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2646    } catch (ServiceException e) {
2647      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2648      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2649        .build();
2650    }
2651  }
2652
2653  /**
2654   * Close meta region if we carry it
2655   * @param abort Whether we're running an abort.
2656   */
2657  private void closeMetaTableRegions(final boolean abort) {
2658    HRegion meta = null;
2659    this.onlineRegionsLock.writeLock().lock();
2660    try {
2661      for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
2662        RegionInfo hri = e.getValue().getRegionInfo();
2663        if (hri.isMetaRegion()) {
2664          meta = e.getValue();
2665        }
2666        if (meta != null) {
2667          break;
2668        }
2669      }
2670    } finally {
2671      this.onlineRegionsLock.writeLock().unlock();
2672    }
2673    if (meta != null) {
2674      closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2675    }
2676  }
2677
2678  /**
2679   * Schedule closes on all user regions. Should be safe calling multiple times because it wont'
2680   * close regions that are already closed or that are closing.
2681   * @param abort Whether we're running an abort.
2682   */
2683  private void closeUserRegions(final boolean abort) {
2684    this.onlineRegionsLock.writeLock().lock();
2685    try {
2686      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
2687        HRegion r = e.getValue();
2688        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2689          // Don't update zk with this close transition; pass false.
2690          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2691        }
2692      }
2693    } finally {
2694      this.onlineRegionsLock.writeLock().unlock();
2695    }
2696  }
2697
2698  protected Map<String, HRegion> getOnlineRegions() {
2699    return this.onlineRegions;
2700  }
2701
2702  public int getNumberOfOnlineRegions() {
2703    return this.onlineRegions.size();
2704  }
2705
2706  /**
2707   * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM
2708   * as client; HRegion cannot be serialized to cross an rpc.
2709   */
2710  public Collection<HRegion> getOnlineRegionsLocalContext() {
2711    Collection<HRegion> regions = this.onlineRegions.values();
2712    return Collections.unmodifiableCollection(regions);
2713  }
2714
2715  @Override
2716  public void addRegion(HRegion region) {
2717    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2718    configurationManager.registerObserver(region);
2719  }
2720
2721  private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
2722    long size) {
2723    if (!sortedRegions.containsKey(size)) {
2724      sortedRegions.put(size, new ArrayList<>());
2725    }
2726    sortedRegions.get(size).add(region);
2727  }
2728
2729  /**
2730   * @return A new Map of online regions sorted by region off-heap size with the first entry being
2731   *         the biggest.
2732   */
2733  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2734    // we'll sort the regions in reverse
2735    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2736    // Copy over all regions. Regions are sorted by size with biggest first.
2737    for (HRegion region : this.onlineRegions.values()) {
2738      addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
2739    }
2740    return sortedRegions;
2741  }
2742
2743  /**
2744   * @return A new Map of online regions sorted by region heap size with the first entry being the
2745   *         biggest.
2746   */
2747  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2748    // we'll sort the regions in reverse
2749    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2750    // Copy over all regions. Regions are sorted by size with biggest first.
2751    for (HRegion region : this.onlineRegions.values()) {
2752      addRegion(sortedRegions, region, region.getMemStoreHeapSize());
2753    }
2754    return sortedRegions;
2755  }
2756
2757  /** Returns reference to FlushRequester */
2758  @Override
2759  public FlushRequester getFlushRequester() {
2760    return this.cacheFlusher;
2761  }
2762
2763  @Override
2764  public CompactionRequester getCompactionRequestor() {
2765    return this.compactSplitThread;
2766  }
2767
2768  @Override
2769  public LeaseManager getLeaseManager() {
2770    return leaseManager;
2771  }
2772
2773  /** Returns {@code true} when the data file system is available, {@code false} otherwise. */
2774  boolean isDataFileSystemOk() {
2775    return this.dataFsOk;
2776  }
2777
2778  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
2779    return this.rsHost;
2780  }
2781
2782  @Override
2783  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2784    return this.regionsInTransitionInRS;
2785  }
2786
2787  @Override
2788  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
2789    return rsQuotaManager;
2790  }
2791
2792  //
2793  // Main program and support routines
2794  //
2795  /**
2796   * Load the replication executorService objects, if any
2797   */
2798  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
2799    FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
2800    // read in the name of the source replication class from the config file.
2801    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2802      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2803
2804    // read in the name of the sink replication class from the config file.
2805    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2806      HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
2807
2808    // If both the sink and the source class names are the same, then instantiate
2809    // only one object.
2810    if (sourceClassname.equals(sinkClassname)) {
2811      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2812        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2813      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
2814      server.sameReplicationSourceAndSink = true;
2815    } else {
2816      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2817        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2818      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
2819        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2820      server.sameReplicationSourceAndSink = false;
2821    }
2822  }
2823
2824  private static <T extends ReplicationService> T newReplicationInstance(String classname,
2825    Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
2826    Path oldLogDir, WALFactory walFactory) throws IOException {
2827    final Class<? extends T> clazz;
2828    try {
2829      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2830      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
2831    } catch (java.lang.ClassNotFoundException nfe) {
2832      throw new IOException("Could not find class for " + classname);
2833    }
2834    T service = ReflectionUtils.newInstance(clazz, conf);
2835    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
2836    return service;
2837  }
2838
2839  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
2840    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
2841    if (!this.isOnline()) {
2842      return walGroupsReplicationStatus;
2843    }
2844    List<ReplicationSourceInterface> allSources = new ArrayList<>();
2845    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
2846    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
2847    for (ReplicationSourceInterface source : allSources) {
2848      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
2849    }
2850    return walGroupsReplicationStatus;
2851  }
2852
2853  /**
2854   * Utility for constructing an instance of the passed HRegionServer class.
2855   */
2856  static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass,
2857    final Configuration conf) {
2858    try {
2859      Constructor<? extends HRegionServer> c =
2860        regionServerClass.getConstructor(Configuration.class);
2861      return c.newInstance(conf);
2862    } catch (Exception e) {
2863      throw new RuntimeException(
2864        "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e);
2865    }
2866  }
2867
2868  /**
2869   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2870   */
2871  public static void main(String[] args) {
2872    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
2873    VersionInfo.logVersion();
2874    Configuration conf = HBaseConfiguration.create();
2875    @SuppressWarnings("unchecked")
2876    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2877      .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2878
2879    new HRegionServerCommandLine(regionServerClass).doMain(args);
2880  }
2881
2882  /**
2883   * Gets the online regions of the specified table. This method looks at the in-memory
2884   * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions.
2885   * If a region on this table has been closed during a disable, etc., it will not be included in
2886   * the returned list. So, the returned list may not necessarily be ALL regions in this table, its
2887   * all the ONLINE regions in the table.
2888   * @param tableName table to limit the scope of the query
2889   * @return Online regions from <code>tableName</code>
2890   */
2891  @Override
2892  public List<HRegion> getRegions(TableName tableName) {
2893    List<HRegion> tableRegions = new ArrayList<>();
2894    synchronized (this.onlineRegions) {
2895      for (HRegion region : this.onlineRegions.values()) {
2896        RegionInfo regionInfo = region.getRegionInfo();
2897        if (regionInfo.getTable().equals(tableName)) {
2898          tableRegions.add(region);
2899        }
2900      }
2901    }
2902    return tableRegions;
2903  }
2904
2905  @Override
2906  public List<HRegion> getRegions() {
2907    List<HRegion> allRegions;
2908    synchronized (this.onlineRegions) {
2909      // Return a clone copy of the onlineRegions
2910      allRegions = new ArrayList<>(onlineRegions.values());
2911    }
2912    return allRegions;
2913  }
2914
2915  /**
2916   * Gets the online tables in this RS. This method looks at the in-memory onlineRegions.
2917   * @return all the online tables in this RS
2918   */
2919  public Set<TableName> getOnlineTables() {
2920    Set<TableName> tables = new HashSet<>();
2921    synchronized (this.onlineRegions) {
2922      for (Region region : this.onlineRegions.values()) {
2923        tables.add(region.getTableDescriptor().getTableName());
2924      }
2925    }
2926    return tables;
2927  }
2928
2929  public String[] getRegionServerCoprocessors() {
2930    TreeSet<String> coprocessors = new TreeSet<>();
2931    try {
2932      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2933    } catch (IOException exception) {
2934      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; "
2935        + "skipping.");
2936      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2937    }
2938    Collection<HRegion> regions = getOnlineRegionsLocalContext();
2939    for (HRegion region : regions) {
2940      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2941      try {
2942        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2943      } catch (IOException exception) {
2944        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region
2945          + "; skipping.");
2946        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2947      }
2948    }
2949    coprocessors.addAll(rsHost.getCoprocessors());
2950    return coprocessors.toArray(new String[0]);
2951  }
2952
2953  /**
2954   * Try to close the region, logs a warning on failure but continues.
2955   * @param region Region to close
2956   */
2957  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
2958    try {
2959      if (!closeRegion(region.getEncodedName(), abort, null)) {
2960        LOG
2961          .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing");
2962      }
2963    } catch (IOException e) {
2964      LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing",
2965        e);
2966    }
2967  }
2968
2969  /**
2970   * Close asynchronously a region, can be called from the master or internally by the regionserver
2971   * when stopping. If called from the master, the region will update the status.
2972   * <p>
2973   * If an opening was in progress, this method will cancel it, but will not start a new close. The
2974   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
2975   * </p>
2976   * <p>
2977   * If a close was in progress, this new request will be ignored, and an exception thrown.
2978   * </p>
2979   * <p>
2980   * Provides additional flag to indicate if this region blocks should be evicted from the cache.
2981   * </p>
2982   * @param encodedName Region to close
2983   * @param abort       True if we are aborting
2984   * @param destination Where the Region is being moved too... maybe null if unknown.
2985   * @return True if closed a region.
2986   * @throws NotServingRegionException if the region is not online
2987   */
2988  protected boolean closeRegion(String encodedName, final boolean abort,
2989    final ServerName destination) throws NotServingRegionException {
2990    // Check for permissions to close.
2991    HRegion actualRegion = this.getRegion(encodedName);
2992    // Can be null if we're calling close on a region that's not online
2993    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2994      try {
2995        actualRegion.getCoprocessorHost().preClose(false);
2996      } catch (IOException exp) {
2997        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2998        return false;
2999      }
3000    }
3001
3002    // previous can come back 'null' if not in map.
3003    final Boolean previous =
3004      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);
3005
3006    if (Boolean.TRUE.equals(previous)) {
3007      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already "
3008        + "trying to OPEN. Cancelling OPENING.");
3009      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
3010        // The replace failed. That should be an exceptional case, but theoretically it can happen.
3011        // We're going to try to do a standard close then.
3012        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
3013          + " Doing a standard close now");
3014        return closeRegion(encodedName, abort, destination);
3015      }
3016      // Let's get the region from the online region list again
3017      actualRegion = this.getRegion(encodedName);
3018      if (actualRegion == null) { // If already online, we still need to close it.
3019        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3020        // The master deletes the znode when it receives this exception.
3021        throw new NotServingRegionException(
3022          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
3023      }
3024    } else if (previous == null) {
3025      LOG.info("Received CLOSE for {}", encodedName);
3026    } else if (Boolean.FALSE.equals(previous)) {
3027      LOG.info("Received CLOSE for the region: " + encodedName
3028        + ", which we are already trying to CLOSE, but not completed yet");
3029      return true;
3030    }
3031
3032    if (actualRegion == null) {
3033      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3034      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
3035      // The master deletes the znode when it receives this exception.
3036      throw new NotServingRegionException(
3037        "The region " + encodedName + " is not online, and is not opening.");
3038    }
3039
3040    CloseRegionHandler crh;
3041    final RegionInfo hri = actualRegion.getRegionInfo();
3042    if (hri.isMetaRegion()) {
3043      crh = new CloseMetaHandler(this, this, hri, abort);
3044    } else {
3045      crh = new CloseRegionHandler(this, this, hri, abort, destination);
3046    }
3047    this.executorService.submit(crh);
3048    return true;
3049  }
3050
3051  /**
3052   * @return HRegion for the passed binary <code>regionName</code> or null if named region is not
3053   *         member of the online regions.
3054   */
3055  public HRegion getOnlineRegion(final byte[] regionName) {
3056    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3057    return this.onlineRegions.get(encodedRegionName);
3058  }
3059
3060  @Override
3061  public HRegion getRegion(final String encodedRegionName) {
3062    return this.onlineRegions.get(encodedRegionName);
3063  }
3064
3065  @Override
3066  public boolean removeRegion(final HRegion r, ServerName destination) {
3067    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3068    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
3069    if (destination != null) {
3070      long closeSeqNum = r.getMaxFlushedSeqId();
3071      if (closeSeqNum == HConstants.NO_SEQNUM) {
3072        // No edits in WAL for this region; get the sequence number when the region was opened.
3073        closeSeqNum = r.getOpenSeqNum();
3074        if (closeSeqNum == HConstants.NO_SEQNUM) {
3075          closeSeqNum = 0;
3076        }
3077      }
3078      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
3079      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
3080      if (selfMove) {
3081        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
3082          r.getRegionInfo().getEncodedName(),
3083          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
3084      }
3085    }
3086    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3087    configurationManager.deregisterObserver(r);
3088    return toReturn != null;
3089  }
3090
3091  /**
3092   * Protected Utility method for safely obtaining an HRegion handle.
3093   * @param regionName Name of online {@link HRegion} to return
3094   * @return {@link HRegion} for <code>regionName</code>
3095   */
3096  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
3097    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3098    return getRegionByEncodedName(regionName, encodedRegionName);
3099  }
3100
3101  public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException {
3102    return getRegionByEncodedName(null, encodedRegionName);
3103  }
3104
3105  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3106    throws NotServingRegionException {
3107    HRegion region = this.onlineRegions.get(encodedRegionName);
3108    if (region == null) {
3109      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3110      if (moveInfo != null) {
3111        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3112      }
3113      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3114      String regionNameStr =
3115        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
3116      if (isOpening != null && isOpening) {
3117        throw new RegionOpeningException(
3118          "Region " + regionNameStr + " is opening on " + this.serverName);
3119      }
3120      throw new NotServingRegionException(
3121        "" + regionNameStr + " is not online on " + this.serverName);
3122    }
3123    return region;
3124  }
3125
3126  /**
3127   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't
3128   * already.
3129   * @param t   Throwable
3130   * @param msg Message to log in error. Can be null.
3131   * @return Throwable converted to an IOE; methods can only let out IOEs.
3132   */
3133  private Throwable cleanup(final Throwable t, final String msg) {
3134    // Don't log as error if NSRE; NSRE is 'normal' operation.
3135    if (t instanceof NotServingRegionException) {
3136      LOG.debug("NotServingRegionException; " + t.getMessage());
3137      return t;
3138    }
3139    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3140    if (msg == null) {
3141      LOG.error("", e);
3142    } else {
3143      LOG.error(msg, e);
3144    }
3145    if (!rpcServices.checkOOME(t)) {
3146      checkFileSystem();
3147    }
3148    return t;
3149  }
3150
3151  /**
3152   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
3153   * @return Make <code>t</code> an IOE if it isn't already.
3154   */
3155  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3156    return (t instanceof IOException ? (IOException) t
3157      : msg == null || msg.length() == 0 ? new IOException(t)
3158      : new IOException(msg, t));
3159  }
3160
3161  /**
3162   * Checks to see if the file system is still accessible. If not, sets abortRequested and
3163   * stopRequested
3164   * @return false if file system is not available
3165   */
3166  boolean checkFileSystem() {
3167    if (this.dataFsOk && this.dataFs != null) {
3168      try {
3169        FSUtils.checkFileSystemAvailable(this.dataFs);
3170      } catch (IOException e) {
3171        abort("File System not available", e);
3172        this.dataFsOk = false;
3173      }
3174    }
3175    return this.dataFsOk;
3176  }
3177
3178  @Override
3179  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3180    List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3181    Address[] addr = new Address[favoredNodes.size()];
3182    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3183    // it is a map of region name to Address[]
3184    for (int i = 0; i < favoredNodes.size(); i++) {
3185      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort());
3186    }
3187    regionFavoredNodesMap.put(encodedRegionName, addr);
3188  }
3189
3190  /**
3191   * Return the favored nodes for a region given its encoded name. Look at the comment around
3192   * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here.
3193   * @param encodedRegionName the encoded region name.
3194   * @return array of favored locations
3195   */
3196  @Override
3197  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3198    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
3199  }
3200
3201  @Override
3202  public ServerNonceManager getNonceManager() {
3203    return this.nonceManager;
3204  }
3205
3206  private static class MovedRegionInfo {
3207    private final ServerName serverName;
3208    private final long seqNum;
3209
3210    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3211      this.serverName = serverName;
3212      this.seqNum = closeSeqNum;
3213    }
3214
3215    public ServerName getServerName() {
3216      return serverName;
3217    }
3218
3219    public long getSeqNum() {
3220      return seqNum;
3221    }
3222  }
3223
3224  /**
3225   * We need a timeout. If not there is a risk of giving a wrong information: this would double the
3226   * number of network calls instead of reducing them.
3227   */
3228  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3229
3230  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
3231    boolean selfMove) {
3232    if (selfMove) {
3233      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3234      return;
3235    }
3236    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid="
3237      + closeSeqNum);
3238    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3239  }
3240
3241  void removeFromMovedRegions(String encodedName) {
3242    movedRegionInfoCache.invalidate(encodedName);
3243  }
3244
3245  @InterfaceAudience.Private
3246  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
3247    return movedRegionInfoCache.getIfPresent(encodedRegionName);
3248  }
3249
3250  @InterfaceAudience.Private
3251  public int movedRegionCacheExpiredTime() {
3252    return TIMEOUT_REGION_MOVED;
3253  }
3254
3255  private String getMyEphemeralNodePath() {
3256    return zooKeeper.getZNodePaths().getRsPath(serverName);
3257  }
3258
3259  private boolean isHealthCheckerConfigured() {
3260    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3261    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
3262  }
3263
3264  /** Returns the underlying {@link CompactSplit} for the servers */
3265  public CompactSplit getCompactSplitThread() {
3266    return this.compactSplitThread;
3267  }
3268
3269  CoprocessorServiceResponse execRegionServerService(
3270    @SuppressWarnings("UnusedParameters") final RpcController controller,
3271    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3272    try {
3273      ServerRpcController serviceController = new ServerRpcController();
3274      CoprocessorServiceCall call = serviceRequest.getCall();
3275      String serviceName = call.getServiceName();
3276      Service service = coprocessorServiceHandlers.get(serviceName);
3277      if (service == null) {
3278        throw new UnknownProtocolException(null,
3279          "No registered coprocessor executorService found for " + serviceName);
3280      }
3281      ServiceDescriptor serviceDesc = service.getDescriptorForType();
3282
3283      String methodName = call.getMethodName();
3284      MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3285      if (methodDesc == null) {
3286        throw new UnknownProtocolException(service.getClass(),
3287          "Unknown method " + methodName + " called on executorService " + serviceName);
3288      }
3289
3290      Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3291      final Message.Builder responseBuilder =
3292        service.getResponsePrototype(methodDesc).newBuilderForType();
3293      service.callMethod(methodDesc, serviceController, request, message -> {
3294        if (message != null) {
3295          responseBuilder.mergeFrom(message);
3296        }
3297      });
3298      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3299      if (exception != null) {
3300        throw exception;
3301      }
3302      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3303    } catch (IOException ie) {
3304      throw new ServiceException(ie);
3305    }
3306  }
3307
3308  /**
3309   * May be null if this is a master which not carry table.
3310   * @return The block cache instance used by the regionserver.
3311   */
3312  @Override
3313  public Optional<BlockCache> getBlockCache() {
3314    return Optional.ofNullable(this.blockCache);
3315  }
3316
3317  /**
3318   * May be null if this is a master which not carry table.
3319   * @return The cache for mob files used by the regionserver.
3320   */
3321  @Override
3322  public Optional<MobFileCache> getMobFileCache() {
3323    return Optional.ofNullable(this.mobFileCache);
3324  }
3325
3326  CacheEvictionStats clearRegionBlockCache(Region region) {
3327    long evictedBlocks = 0;
3328
3329    for (Store store : region.getStores()) {
3330      for (StoreFile hFile : store.getStorefiles()) {
3331        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3332      }
3333    }
3334
3335    return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build();
3336  }
3337
3338  @Override
3339  public double getCompactionPressure() {
3340    double max = 0;
3341    for (Region region : onlineRegions.values()) {
3342      for (Store store : region.getStores()) {
3343        double normCount = store.getCompactionPressure();
3344        if (normCount > max) {
3345          max = normCount;
3346        }
3347      }
3348    }
3349    return max;
3350  }
3351
3352  @Override
3353  public HeapMemoryManager getHeapMemoryManager() {
3354    return hMemManager;
3355  }
3356
3357  public MemStoreFlusher getMemStoreFlusher() {
3358    return cacheFlusher;
3359  }
3360
3361  /**
3362   * For testing
3363   * @return whether all wal roll request finished for this regionserver
3364   */
3365  @InterfaceAudience.Private
3366  public boolean walRollRequestFinished() {
3367    return this.walRoller.walRollFinished();
3368  }
3369
3370  @Override
3371  public ThroughputController getFlushThroughputController() {
3372    return flushThroughputController;
3373  }
3374
3375  @Override
3376  public double getFlushPressure() {
3377    if (getRegionServerAccounting() == null || cacheFlusher == null) {
3378      // return 0 during RS initialization
3379      return 0.0;
3380    }
3381    return getRegionServerAccounting().getFlushPressure();
3382  }
3383
3384  @Override
3385  public void onConfigurationChange(Configuration newConf) {
3386    ThroughputController old = this.flushThroughputController;
3387    if (old != null) {
3388      old.stop("configuration change");
3389    }
3390    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3391    try {
3392      Superusers.initialize(newConf);
3393    } catch (IOException e) {
3394      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3395    }
3396
3397    // update region server coprocessor if the configuration has changed.
3398    if (
3399      CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
3400        CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY)
3401    ) {
3402      LOG.info("Update region server coprocessors because the configuration has changed");
3403      this.rsHost = new RegionServerCoprocessorHost(this, newConf);
3404    }
3405  }
3406
3407  @Override
3408  public MetricsRegionServer getMetrics() {
3409    return metricsRegionServer;
3410  }
3411
3412  @Override
3413  public SecureBulkLoadManager getSecureBulkLoadManager() {
3414    return this.secureBulkLoadManager;
3415  }
3416
3417  @Override
3418  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
3419    final Abortable abort) {
3420    final LockServiceClient client =
3421      new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
3422    return client.regionLock(regionInfo, description, abort);
3423  }
3424
3425  @Override
3426  public void unassign(byte[] regionName) throws IOException {
3427    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
3428  }
3429
3430  @Override
3431  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3432    return this.rsSpaceQuotaManager;
3433  }
3434
3435  @Override
3436  public boolean reportFileArchivalForQuotas(TableName tableName,
3437    Collection<Entry<String, Long>> archivedFiles) {
3438    if (TEST_SKIP_REPORTING_TRANSITION) {
3439      return false;
3440    }
3441    RegionServerStatusService.BlockingInterface rss = rssStub;
3442    if (rss == null || rsSpaceQuotaManager == null) {
3443      // the current server could be stopping.
3444      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3445      return false;
3446    }
3447    try {
3448      RegionServerStatusProtos.FileArchiveNotificationRequest request =
3449        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3450      rss.reportFileArchival(null, request);
3451    } catch (ServiceException se) {
3452      IOException ioe = ProtobufUtil.getRemoteException(se);
3453      if (ioe instanceof PleaseHoldException) {
3454        if (LOG.isTraceEnabled()) {
3455          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3456            + " This will be retried.", ioe);
3457        }
3458        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3459        return false;
3460      }
3461      if (rssStub == rss) {
3462        rssStub = null;
3463      }
3464      // re-create the stub if we failed to report the archival
3465      createRegionServerStatusStub(true);
3466      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3467      return false;
3468    }
3469    return true;
3470  }
3471
3472  void executeProcedure(long procId, RSProcedureCallable callable) {
3473    executorService.submit(new RSProcedureHandler(this, procId, callable));
3474  }
3475
3476  public void remoteProcedureComplete(long procId, Throwable error) {
3477    procedureResultReporter.complete(procId, error);
3478  }
3479
3480  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3481    RegionServerStatusService.BlockingInterface rss;
3482    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3483    for (;;) {
3484      rss = rssStub;
3485      if (rss != null) {
3486        break;
3487      }
3488      createRegionServerStatusStub();
3489    }
3490    try {
3491      rss.reportProcedureDone(null, request);
3492    } catch (ServiceException se) {
3493      if (rssStub == rss) {
3494        rssStub = null;
3495      }
3496      throw ProtobufUtil.getRemoteException(se);
3497    }
3498  }
3499
3500  /**
3501   * Will ignore the open/close region procedures which already submitted or executed. When master
3502   * had unfinished open/close region procedure and restarted, new active master may send duplicate
3503   * open/close region request to regionserver. The open/close request is submitted to a thread pool
3504   * and execute. So first need a cache for submitted open/close region procedures. After the
3505   * open/close region request executed and report region transition succeed, cache it in executed
3506   * region procedures cache. See {@link #finishRegionProcedure(long)}. After report region
3507   * transition succeed, master will not send the open/close region request to regionserver again.
3508   * And we thought that the ongoing duplicate open/close region request should not be delayed more
3509   * than 600 seconds. So the executed region procedures cache will expire after 600 seconds. See
3510   * HBASE-22404 for more details.
3511   * @param procId the id of the open/close region procedure
3512   * @return true if the procedure can be submitted.
3513   */
3514  boolean submitRegionProcedure(long procId) {
3515    if (procId == -1) {
3516      return true;
3517    }
3518    // Ignore the region procedures which already submitted.
3519    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3520    if (previous != null) {
3521      LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
3522      return false;
3523    }
3524    // Ignore the region procedures which already executed.
3525    if (executedRegionProcedures.getIfPresent(procId) != null) {
3526      LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
3527      return false;
3528    }
3529    return true;
3530  }
3531
3532  /**
3533   * See {@link #submitRegionProcedure(long)}.
3534   * @param procId the id of the open/close region procedure
3535   */
3536  public void finishRegionProcedure(long procId) {
3537    executedRegionProcedures.put(procId, procId);
3538    submittedRegionProcedures.remove(procId);
3539  }
3540
3541  /**
3542   * Force to terminate region server when abort timeout.
3543   */
3544  private static class SystemExitWhenAbortTimeout extends TimerTask {
3545
3546    public SystemExitWhenAbortTimeout() {
3547    }
3548
3549    @Override
3550    public void run() {
3551      LOG.warn("Aborting region server timed out, terminating forcibly"
3552        + " and does not wait for any running shutdown hooks or finalizers to finish their work."
3553        + " Thread dump to stdout.");
3554      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3555      Runtime.getRuntime().halt(1);
3556    }
3557  }
3558
3559  @InterfaceAudience.Private
3560  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3561    return compactedFileDischarger;
3562  }
3563
3564  /**
3565   * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}}
3566   * @return pause time
3567   */
3568  @InterfaceAudience.Private
3569  public long getRetryPauseTime() {
3570    return this.retryPauseTime;
3571  }
3572
3573  @Override
3574  public Optional<ServerName> getActiveMaster() {
3575    return Optional.ofNullable(masterAddressTracker.getMasterAddress());
3576  }
3577
3578  @Override
3579  public List<ServerName> getBackupMasters() {
3580    return masterAddressTracker.getBackupMasters();
3581  }
3582
3583  @Override
3584  public Iterator<ServerName> getBootstrapNodes() {
3585    return bootstrapNodeManager.getBootstrapNodes().iterator();
3586  }
3587
3588  @Override
3589  public List<HRegionLocation> getMetaLocations() {
3590    return metaRegionLocationCache.getMetaRegionLocations();
3591  }
3592
3593  @Override
3594  protected NamedQueueRecorder createNamedQueueRecord() {
3595    return NamedQueueRecorder.getInstance(conf);
3596  }
3597
3598  @Override
3599  protected boolean clusterMode() {
3600    // this method will be called in the constructor of super class, so we can not return masterless
3601    // directly here, as it will always be false.
3602    return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
3603  }
3604
3605  @InterfaceAudience.Private
3606  public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
3607    return brokenStoreFileCleaner;
3608  }
3609
3610  @InterfaceAudience.Private
3611  public RSMobFileCleanerChore getRSMobFileCleanerChore() {
3612    return rsMobFileCleanerChore;
3613  }
3614
3615  RSSnapshotVerifier getRsSnapshotVerifier() {
3616    return rsSnapshotVerifier;
3617  }
3618
3619  @Override
3620  protected void stopChores() {
3621    shutdownChore(nonceManagerChore);
3622    shutdownChore(compactionChecker);
3623    shutdownChore(compactedFileDischarger);
3624    shutdownChore(periodicFlusher);
3625    shutdownChore(healthCheckChore);
3626    shutdownChore(executorStatusChore);
3627    shutdownChore(storefileRefresher);
3628    shutdownChore(fsUtilizationChore);
3629    shutdownChore(namedQueueServiceChore);
3630    shutdownChore(brokenStoreFileCleaner);
3631    shutdownChore(rsMobFileCleanerChore);
3632    shutdownChore(replicationMarkerChore);
3633  }
3634
3635  @Override
3636  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
3637    return regionReplicationBufferManager;
3638  }
3639}